Struktur kod
Kami mencipta pakej general workerPool untuk menggunakan pekerja memproses tugas mengikut keselarasan yang diperlukan oleh perniagaan. Mari kita lihat struktur direktori:
workerpool ├── pool.go ├── task.go └── worker.go
Direktori kumpulan pekerja berada dalam direktori akar projek. Tugas ialah satu unit kerja yang perlu diproses;
Pertama lihat kod Tugas:
// workerpool/task.go package workerpool import ( "fmt" ) type Task struct { Err error Data interface{} f func(interface{}) error } func NewTask(f func(interface{}) error, data interface{}) *Task { return &Task{f: f, Data: data} } func process(workerID int, task *Task) { fmt.Printf("Worker %d processes task %v\n", workerID, task.Data) task.Err = task.f(task.Data) }
Task ialah struktur ringkas yang menyimpan semua data yang diperlukan untuk memproses tugasan. Apabila membuat tugasan, Data dan fungsi f yang akan dilaksanakan diluluskan, dan fungsi process() akan memproses tugasan tersebut. Apabila memproses tugasan, hantar Data sebagai parameter untuk berfungsi f dan simpan hasil pelaksanaan dalam Tugasan.Err.
Mari kita lihat cara Pekerja mengendalikan tugas:
// workerpool/worker.go package workerpool import ( "fmt" "sync" ) // Worker handles all the work type Worker struct { ID int taskChan chan *Task } // NewWorker returns new instance of worker func NewWorker(channel chan *Task, ID int) *Worker { return &Worker{ ID: ID, taskChan: channel, } } // Start starts the worker func (wr *Worker) Start(wg *sync.WaitGroup) { fmt.Printf("Starting worker %d\n", wr.ID) wg.Add(1) go func() { defer wg.Done() for task := range wr.taskChan { process(wr.ID, task) } }() }
Kami mencipta struktur Pekerja kecil, termasuk ID pekerja dan saluran untuk menyimpan tugasan yang belum selesai. Dalam kaedah Mula(), gunakan julat untuk membaca tugasan daripada taskChan dan memprosesnya. Seperti yang anda boleh bayangkan, berbilang pekerja boleh melaksanakan tugas secara serentak.
Kami mengendalikan tugas dengan melaksanakan Tugas dan Pekerja, tetapi nampaknya ada sesuatu yang hilang Siapa yang bertanggungjawab untuk menjana pekerja ini dan menghantar tugas kepada mereka? Jawapannya ialah: Kolam Pekerja.
// workerpoo/pool.go package workerpool import ( "fmt" "sync" "time" ) // Pool is the worker pool type Pool struct { Tasks []*Task concurrency int collector chan *Task wg sync.WaitGroup } // NewPool initializes a new pool with the given tasks and // at the given concurrency. func NewPool(tasks []*Task, concurrency int) *Pool { return &Pool{ Tasks: tasks, concurrency: concurrency, collector: make(chan *Task, 1000), } } // Run runs all work within the pool and blocks until it's // finished. func (p *Pool) Run() { for i := 1; i <= p.concurrency; i++ { worker := NewWorker(p.collector, i) worker.Start(&p.wg) } for i := range p.Tasks { p.collector <- p.Tasks[i] } close(p.collector) p.wg.Wait() }
Dalam kod di atas, kumpulan menyimpan semua tugasan yang belum selesai dan menjana beberapa goroutin yang konsisten dengan serentak untuk pemprosesan tugasan serentak. Saluran cache dikongsi -- pengumpul antara pekerja.
Jadi, apabila kami menjalankan kumpulan kerja ini, kami boleh menjana bilangan pekerja yang diperlukan, dan saluran pengumpul dikongsi di kalangan pekerja. Seterusnya, gunakan untuk julat untuk membaca tugasan dan menulis tugasan baca ke dalam pengumpul. Kami menggunakan sync.WaitGroup untuk mencapai penyegerakan antara coroutine. Sekarang kita mempunyai penyelesaian yang baik, mari kita mengujinya.
// main.go package main import ( "fmt" "time" "github.com/Joker666/goworkerpool/workerpool" ) func main() { var allTask []*workerpool.Task for i := 1; i <= 100; i++ { task := workerpool.NewTask(func(data interface{}) error { taskID := data.(int) time.Sleep(100 * time.Millisecond) fmt.Printf("Task %d processed\n", taskID) return nil }, i) allTask = append(allTask, task) } pool := workerpool.NewPool(allTask, 5) pool.Run() }
Kod di atas mencipta 100 tugasan dan menggunakan 5 serentak untuk memproses tugasan ini.
输出如下:
Worker 3 processes task 98 Task 92 processed Worker 2 processes task 99 Task 98 processed Worker 5 processes task 100 Task 99 processed Task 100 processed Took ===============> 2.0056295s
处理 100 个任务花费了 2s,如何我们将并发数提高到 10,我们会看到处理完所有任务只需要大约 1s。
我们通过实现 workerPool 构建了一个健壮的解决方案,具有并发性、错误处理、数据处理等功能。这是个通用的包,不耦合具体的实现。我们可以使用它来解决一些大问题。
实际上,我们还可以进一步扩展上面的解决方案,以便 worker 可以在后台等待我们投递新的任务并处理。为此,代码需要做一些修改,Task 结构体保持不变,但是需要小改下 Worker,看下面代码:
// workerpool/worker.go // Worker handles all the work type Worker struct { ID int taskChan chan *Task quit chan bool } // NewWorker returns new instance of worker func NewWorker(channel chan *Task, ID int) *Worker { return &Worker{ ID: ID, taskChan: channel, quit: make(chan bool), } } .... // StartBackground starts the worker in background waiting func (wr *Worker) StartBackground() { fmt.Printf("Starting worker %d\n", wr.ID) for { select { case task := <-wr.taskChan: process(wr.ID, task) case <-wr.quit: return } } } // Stop quits the worker func (wr *Worker) Stop() { fmt.Printf("Closing worker %d\n", wr.ID) go func() { wr.quit <- true }() }
Worker 结构体新加 quit channel,并且新加了两个方法。StartBackgorund() 在 for 循环里使用 select-case 从 taskChan 队列读取任务并处理,如果从 quit 读取到结束信号就立即返回。Stop() 方法负责往 quit 写入结束信号。
添加完这两个新的方法之后,我们来修改下 Pool:
// workerpool/pool.go type Pool struct { Tasks []*Task Workers []*Worker concurrency int collector chan *Task runBackground chan bool wg sync.WaitGroup } // AddTask adds a task to the pool func (p *Pool) AddTask(task *Task) { p.collector <- task } // RunBackground runs the pool in background func (p *Pool) RunBackground() { go func() { for { fmt.Print("⌛ Waiting for tasks to come in ...\n") time.Sleep(10 * time.Second) } }() for i := 1; i <= p.concurrency; i++ { worker := NewWorker(p.collector, i) p.Workers = append(p.Workers, worker) go worker.StartBackground() } for i := range p.Tasks { p.collector <- p.Tasks[i] } p.runBackground = make(chan bool) <-p.runBackground } // Stop stops background workers func (p *Pool) Stop() { for i := range p.Workers { p.Workers[i].Stop() } p.runBackground <- true }
Pool 结构体添加了两个成员:Workers 和 runBackground,Workers 保存所有的 worker,runBackground 用于维持 pool 存活状态。
添加了三个新的方法,AddTask() 方法用于往 collector 添加任务;RunBackground() 方法衍生出一个无限运行的 goroutine,以便 pool 维持存活状态,因为 runBackground 信道是空,读取空的 channel 会阻塞,所以 pool 能维持运行状态。接着,在协程里面启动 worker;Stop() 方法用于停止 worker,并且给 runBackground 发送停止信号以便结束 RunBackground() 方法。
我们来看下具体是如何工作的。
如果是在现实的业务场景中,pool 将会与 HTTP 服务器一块运行并消耗任务。我们通过 for 无限循环模拟这种这种场景,如果满足某一条件,pool 将会停止。
// main.go ... pool := workerpool.NewPool(allTask, 5) go func() { for { taskID := rand.Intn(100) + 20 if taskID%7 == 0 { pool.Stop() } time.Sleep(time.Duration(rand.Intn(5)) * time.Second) task := workerpool.NewTask(func(data interface{}) error { taskID := data.(int) time.Sleep(100 * time.Millisecond) fmt.Printf("Task %d processed\n", taskID) return nil }, taskID) pool.AddTask(task) } }() pool.RunBackground()
当执行上面的代码时,我们就会看到有随机的 task 被投递到后台运行的 workers,其中某一个 worker 会读取到任务并完成处理。当满足某一条件时,程序便会停止退出。
Atas ialah kandungan terperinci Concurrency dan WorkerPool dalam bahasa Go - Bahagian 2. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!