Parallelität ermöglicht es uns, mehrere Aufgaben unabhängig voneinander zu bearbeiten. Goroutinen sind eine einfache Möglichkeit, mehrere Aufgaben unabhängig voneinander zu bearbeiten. In diesem Beitrag verbessern wir schrittweise einen HTTP-Handler, der Dateien akzeptiert und verschiedene Parallelitätsmuster in Go mithilfe von Kanälen und dem Synchronisierungspaket untersucht.
Bevor wir uns mit Parallelitätsmustern befassen, bereiten wir die Bühne. Stellen Sie sich vor, wir haben einen HTTP-Handler, der mehrere Dateien über ein Formular akzeptiert und die Dateien auf irgendeine Weise verarbeitet.
func processFile(file multipart.File) { // do something with the file fmt.Println("Processing file...") time.Sleep(100 * time.Millisecond) // Simulating file processing time } func UploadHandler(w http.ResponseWriter, r *http.Request) { // limit to 10mb if err := r.ParseMultipartForm(10 << 20); err != nil { http.Error(w, "Unable to parse form", http.StatusBadRequest) return } // iterate through all files and process them sequentially for _, file := range r.MultipartForm.File["files"] { f, err := file.Open() if err != nil { http.Error(w, "Unable to read file", http.StatusInternalServerError) return } processFile(f) f.Close() } }
Im obigen Beispiel empfangen wir Dateien aus einem Formular und verarbeiten sie nacheinander. Wenn 10 Dateien hochgeladen werden, würde es 1 Sekunde dauern, bis der Vorgang abgeschlossen ist und eine Antwort an den Client gesendet wird.
Bei der Verarbeitung vieler Dateien kann dies zu einem Engpass werden. Mit der Parallelitätsunterstützung von Go können wir dieses Problem jedoch leicht lösen.
Um dieses Problem zu lösen, können wir Dateien gleichzeitig verarbeiten. Um eine neue Goroutine zu erzeugen, können wir einem Funktionsaufruf das Schlüsselwort go voranstellen, z. B. Gehen Sie zu ProcessFile(f). Da Goroutinen jedoch nicht blockieren, kehrt der Handler möglicherweise zurück, bevor der Prozess abgeschlossen ist, wodurch Dateien möglicherweise unverarbeitet bleiben oder einen falschen Status zurückgeben. Um auf die Verarbeitung aller Dateien zu warten, können wir sync.WaitGroup.
verwenden
Eine WaitGroup wartet darauf, dass eine Reihe von Goroutinen abgeschlossen werden. Für jede Goroutine, die wir erzeugen, sollten wir zusätzlich den Zähler in der WaitGroup erhöhen. Dies kann mit der Add-Funktion erfolgen. Wenn eine Goroutine fertig ist, sollte Done aufgerufen werden, damit der Zähler um eins verringert wird. Vor der Rückkehr von der Funktion sollte Wait aufgerufen werden, das blockiert, bis der Zähler der WaitGroup 0 ist.
func UploadHandler(w http.ResponseWriter, r *http.Request) { if err := r.ParseMultipartForm(10 << 20); err != nil { http.Error(w, "Unable to parse form", http.StatusBadRequest) return } // create WaitGroup var wg sync.WaitGroup for _, file := range r.MultipartForm.File["files"] { f, err := file.Open() if err != nil { http.Error(w, "Unable to read file", http.StatusInternalServerError) return } wg.Add(1) // Add goroutine to the WaitGroup by incrementing the WaitGroup counter, this should be called before starting a goroutine // Process file concurrently go func(file multipart.File) { defer wg.Done() // decrement the counter by calling Done, utilize defer to guarantee that Done is called. defer file.Close() processFile(f) }(f) } // Wait for all goroutines to complete wg.Wait() fmt.Fprintln(w, "All files processed successfully!") }
Jetzt wird für jede hochgeladene Datei eine neue Goroutine erzeugt, was das System überfordern könnte. Eine Lösung besteht darin, die Anzahl der erzeugten Goroutinen zu begrenzen.
Ein Semaphor ist einfach eine Variable, die wir verwenden können, um den Zugriff auf gemeinsame Ressourcen durch mehrere Threads oder in diesem Fall Goroutinen zu steuern.
In Go können wir gepufferte Kanäle nutzen, um ein Semaphor zu implementieren.
Bevor wir mit der Implementierung beginnen, schauen wir uns an, was Kanäle sind und welchen Unterschied zwischen gepufferten und ungepufferten Kanälen besteht.
Kanäle sind eine Leitung, über die wir Daten senden und empfangen können, um sicher zwischen Go-Routinen zu kommunizieren.
Kanäle müssen mit der Make-Funktion erstellt werden.
func processFile(file multipart.File) { // do something with the file fmt.Println("Processing file...") time.Sleep(100 * time.Millisecond) // Simulating file processing time } func UploadHandler(w http.ResponseWriter, r *http.Request) { // limit to 10mb if err := r.ParseMultipartForm(10 << 20); err != nil { http.Error(w, "Unable to parse form", http.StatusBadRequest) return } // iterate through all files and process them sequentially for _, file := range r.MultipartForm.File["files"] { f, err := file.Open() if err != nil { http.Error(w, "Unable to read file", http.StatusInternalServerError) return } processFile(f) f.Close() } }
Kanäle haben einen speziellen Operator <-, der zum Senden oder Lesen von einem Kanal verwendet wird.
Wenn der Operator auf den Kanal ch <-1 zeigt, werden Daten an den Kanal gesendet. Zeigt der Pfeil vom Kanal <-ch weg, wird der Wert empfangen. Sende- und Empfangsvorgänge blockieren standardmäßig. Dies bedeutet, dass jeder Vorgang wartet, bis die andere Seite bereit ist.
Die Animation visualisiert, wie ein Produzent den Wert 1 über einen ungepufferten Kanal sendet und der Verbraucher aus dem Kanal liest.
Wenn der Produzent Ereignisse schneller senden kann, als der Verbraucher verarbeiten kann, haben wir die Möglichkeit, einen gepufferten Kanal zu verwenden, um mehrere Nachrichten in die Warteschlange zu stellen, ohne den Produzenten zu blockieren, bis der Puffer voll ist. Gleichzeitig kann der Verbraucher die Nachrichten in seinem eigenen Tempo verarbeiten.
func UploadHandler(w http.ResponseWriter, r *http.Request) { if err := r.ParseMultipartForm(10 << 20); err != nil { http.Error(w, "Unable to parse form", http.StatusBadRequest) return } // create WaitGroup var wg sync.WaitGroup for _, file := range r.MultipartForm.File["files"] { f, err := file.Open() if err != nil { http.Error(w, "Unable to read file", http.StatusInternalServerError) return } wg.Add(1) // Add goroutine to the WaitGroup by incrementing the WaitGroup counter, this should be called before starting a goroutine // Process file concurrently go func(file multipart.File) { defer wg.Done() // decrement the counter by calling Done, utilize defer to guarantee that Done is called. defer file.Close() processFile(f) }(f) } // Wait for all goroutines to complete wg.Wait() fmt.Fprintln(w, "All files processed successfully!") }
In diesem Beispiel kann der Produzent bis zu zwei Artikel ohne Blockierung versenden. Wenn die Kapazität des Puffers erreicht ist, blockiert der Produzent, bis der Verbraucher mindestens eine Nachricht verarbeitet hat.
Zurück zum ursprünglichen Problem: Wir möchten die Anzahl der Goroutinen begrenzen, die Dateien gleichzeitig verarbeiten. Dazu können wir gepufferte Kanäle nutzen.
ch := make(chan int)
In diesem Beispiel haben wir einen gepufferten Kanal mit einer Kapazität von 5 hinzugefügt. Dadurch können wir 5 Dateien gleichzeitig verarbeiten und die Belastung des Systems begrenzen.
Aber was ist, wenn nicht alle Dateien gleich sind? Wir können möglicherweise zuverlässig vorhersagen, dass unterschiedliche Dateitypen oder Dateigrößen mehr Ressourcen für die Verarbeitung erfordern. In diesem Fall können wir ein gewichtetes Semaphor verwenden.
Einfach ausgedrückt können wir mit einem gewichteten Semaphor einer einzelnen Aufgabe mehr Ressourcen zuweisen. Go bietet bereits eine Implementierung für ein gewichtetes Semaphor innerhalb des Extend-Sync-Pakets.
ch := make(chan int, 2)
In dieser Version haben wir ein gewichtetes Semaphor mit 5 Slots erstellt. Wenn beispielsweise nur Bilder hochgeladen werden, verarbeitet der Prozess 5 Bilder gleichzeitig. Wenn jedoch ein PDF hochgeladen wird, werden 2 Slots erfasst, was die Anzahl der zu verarbeitenden Dateien verringern würde gleichzeitig.
Wir haben einige Parallelitätsmuster in Go untersucht und dabei sync.WaitGroup und Semaphoren verwendet, um die Anzahl gleichzeitiger Aufgaben zu steuern. Es stehen jedoch weitere Tools zur Verfügung. Wir könnten Kanäle nutzen, um einen Worker-Pool zu erstellen, Zeitüberschreitungen hinzuzufügen oder Fan-In/Out-Muster zu verwenden.
Darüber hinaus ist die Fehlerbehandlung ein wichtiger Aspekt, der der Einfachheit halber größtenteils weggelassen wurde.
Eine Möglichkeit, mit Fehlern umzugehen, wäre die Nutzung von Kanälen, um Fehler zu aggregieren und zu behandeln, nachdem alle Goroutinen abgeschlossen sind.
Go bietet auch eine errgroup.Group, die mit sync.WaitGroups zusammenhängt, aber die Behandlung von Aufgaben hinzufügt, die Fehler zurückgeben.
Das Paket finden Sie im Paket „Extend Sync“.
Das obige ist der detaillierte Inhalt vonGoroutinen und Kanäle: Parallelitätsmuster in Go. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!