並發使我們能夠獨立處理多個任務。 Goroutine 是一種獨立處理多個任務的簡單方法。在這篇文章中,我們逐步增強了一個 http 處理程序,該程序接受文件,並利用通道和同步包探索 Go 中的各種並發模式。
在進入並發模式之前,讓我們先做好準備。想像一下,我們有一個 HTTP 處理程序,它透過表單接受多個檔案並以某種方式處理這些檔案。
func processFile(file multipart.File) { // do something with the file fmt.Println("Processing file...") time.Sleep(100 * time.Millisecond) // Simulating file processing time } func UploadHandler(w http.ResponseWriter, r *http.Request) { // limit to 10mb if err := r.ParseMultipartForm(10 << 20); err != nil { http.Error(w, "Unable to parse form", http.StatusBadRequest) return } // iterate through all files and process them sequentially for _, file := range r.MultipartForm.File["files"] { f, err := file.Open() if err != nil { http.Error(w, "Unable to read file", http.StatusInternalServerError) return } processFile(f) f.Close() } }
在上面的範例中,我們從表單接收檔案並按順序處理它們。如果上傳 10 個文件,則需要 1 秒鐘才能完成該過程並向客戶端發送回應。
當處理許多文件時,這可能會成為瓶頸,但是透過 Go 的並發支持,我們可以輕鬆解決這個問題。
為了解決這個問題,我們可以並發處理文件。要產生一個新的 goroutine,我們可以在函數呼叫前加上 go 關鍵字,例如去處理檔案(f)。然而,由於 goroutine 是非阻塞的,處理程序可能會在進程完成之前返回,從而導致檔案可能未處理或返回不正確的狀態。要等待所有檔案的處理,我們可以使用sync.WaitGroup。
WaitGroup 等待多個 goroutine 完成,對於我們產生的每個 goroutine,我們也應該增加 WaitGroup 中的計數器,這可以透過 Add 函數來完成。當 goroutine 完成時,應該呼叫 Done,以便計數器減一。在從函數返回之前,應該呼叫 Wait,函數會阻塞,直到 WaitGroup 的計數器為 0。
func UploadHandler(w http.ResponseWriter, r *http.Request) { if err := r.ParseMultipartForm(10 << 20); err != nil { http.Error(w, "Unable to parse form", http.StatusBadRequest) return } // create WaitGroup var wg sync.WaitGroup for _, file := range r.MultipartForm.File["files"] { f, err := file.Open() if err != nil { http.Error(w, "Unable to read file", http.StatusInternalServerError) return } wg.Add(1) // Add goroutine to the WaitGroup by incrementing the WaitGroup counter, this should be called before starting a goroutine // Process file concurrently go func(file multipart.File) { defer wg.Done() // decrement the counter by calling Done, utilize defer to guarantee that Done is called. defer file.Close() processFile(f) }(f) } // Wait for all goroutines to complete wg.Wait() fmt.Fprintln(w, "All files processed successfully!") }
現在,對於每個上傳的文件,都會產生一個新的 goroutine,這可能會壓垮系統。一種解決方案是限制生成的 goroutine 的數量。
信號量只是一個變量,我們可以用它來控制多個線程(或在本例中為 goroutine)對公共資源的存取。
在 Go 中,我們可以利用緩衝通道來實現訊號量。
在進入實作之前,我們先來看看什麼是通道以及緩衝通道和非緩衝通道之間的差異。
通道是一個管道,我們可以透過它發送和接收數據,以便在 go 例程之間安全地通訊。
通道必須使用 make 函數建立。
func processFile(file multipart.File) { // do something with the file fmt.Println("Processing file...") time.Sleep(100 * time.Millisecond) // Simulating file processing time } func UploadHandler(w http.ResponseWriter, r *http.Request) { // limit to 10mb if err := r.ParseMultipartForm(10 << 20); err != nil { http.Error(w, "Unable to parse form", http.StatusBadRequest) return } // iterate through all files and process them sequentially for _, file := range r.MultipartForm.File["files"] { f, err := file.Open() if err != nil { http.Error(w, "Unable to read file", http.StatusInternalServerError) return } processFile(f) f.Close() } }
通道有一個特殊的運算子
請操作員指向通道 ch
該動畫形像地展示了生產者透過無緩衝通道發送值 1 以及消費者從該通道讀取資料的情況。
如果生產者發送事件的速度比消費者處理的速度快,那麼我們可以選擇利用緩衝通道來排隊多個訊息,而不會阻塞生產者,直到緩衝區已滿。同時,消費者可以按照自己的步調處理訊息。
func UploadHandler(w http.ResponseWriter, r *http.Request) { if err := r.ParseMultipartForm(10 << 20); err != nil { http.Error(w, "Unable to parse form", http.StatusBadRequest) return } // create WaitGroup var wg sync.WaitGroup for _, file := range r.MultipartForm.File["files"] { f, err := file.Open() if err != nil { http.Error(w, "Unable to read file", http.StatusInternalServerError) return } wg.Add(1) // Add goroutine to the WaitGroup by incrementing the WaitGroup counter, this should be called before starting a goroutine // Process file concurrently go func(file multipart.File) { defer wg.Done() // decrement the counter by calling Done, utilize defer to guarantee that Done is called. defer file.Close() processFile(f) }(f) } // Wait for all goroutines to complete wg.Wait() fmt.Fprintln(w, "All files processed successfully!") }
在此範例中,生產者最多可以發送兩個項目而不會阻塞。當緩衝區達到容量時,生產者將阻塞,直到消費者處理了至少一則訊息。
回到最初的問題,我們要限制同時處理文件的 goroutine 數量。為此,我們可以利用緩衝通道。
ch := make(chan int)
在這個範例中,我們新增了一個容量為 5 的緩衝通道,這使我們能夠同時處理 5 個檔案並限制系統壓力。
但是如果並非所有檔案都相等怎麼辦?我們可以可靠地預測不同的文件類型或文件大小需要更多的資源來處理。在這種情況下,我們可以使用加權信號量。
簡單地說,使用加權訊號量,我們可以為單一任務分配更多資源。 Go 已經在擴展同步包中提供了加權信號量的實作。
ch := make(chan int, 2)
在此版本中,我們建立了一個具有5 個槽的加權訊號量,如果只上傳影像,例如進程會同時處理5 個影像,但是如果上傳PDF,則會取得2 個槽,這將減少可處理的文件量同時。
我們探討了 Go 中的一些並發模式,利用sync.WaitGroup 和訊號量來控制並發任務的數量。然而,還有更多可用的工具,我們可以利用通道來建立工作池、新增逾時或使用扇入/扇出模式。
此外,錯誤處理是一個重要方面,但為了簡單起見,大多數情況下都忽略了這一點。
處理錯誤的一種方法是利用通道來聚合錯誤並在所有 goroutine 完成後處理它們。
Go 也提供了 errgroup.Group ,它與sync.WaitGroups 相關,但增加了對傳回錯誤的任務的處理。
該包可以在擴展同步包中找到。
以上是Goroutines 和 Channels:Go 中的並發模式的詳細內容。更多資訊請關注PHP中文網其他相關文章!