PHP editor Zimo Dalam proses pembangunan perisian, baris gilir mesej ialah mekanisme komunikasi biasa yang digunakan untuk mencapai komunikasi tak segerak antara pengeluar dan pengguna. Walau bagaimanapun, kadangkala kami ingin mengawal pembacaan mesej oleh pengeluar dan pengguna untuk mengurus sumber sistem dengan lebih baik dan mengendalikan permintaan semasa waktu sibuk. Artikel ini akan memperkenalkan beberapa kaedah untuk menyekat pengeluar dan pengguna daripada membaca mesej untuk membantu pembangun mengoptimumkan prestasi sistem dan meningkatkan kestabilan aplikasi.
Saya ingin mendapatkan pengeluar aplikasi-pengguna (ditutup melalui isyarat) menggunakan go.
Pengeluar terus menjana mesej dalam baris gilir, dengan had 10. Sesetengah pengguna membaca dan memproses saluran. Jika bilangan mesej dalam baris gilir ialah 0, pengeluar menjana 10 mesej sekali lagi. Apabila isyarat berhenti diterima, pengeluar berhenti menjana mesej baharu dan pengguna memproses segala-galanya dalam saluran.
Saya menjumpai sekeping kod tetapi tidak dapat memahami sama ada ia berfungsi dengan betul kerana saya menemui sesuatu yang pelik:
Hasil:
Contoh kod:
package main import ( "context" "fmt" "math/rand" "os" "os/signal" "sync" "syscall" "time" ) func main() { const nConsumers = 2 in := make(chan int, 10) p := Producer{&in} c := Consumer{&in, make(chan int, nConsumers)} go p.Produce() ctx, cancelFunc := context.WithCancel(context.Background()) go c.Consume(ctx) wg := &sync.WaitGroup{} wg.Add(nConsumers) for i := 1; i <= nConsumers; i++ { go c.Work(wg, i) } termChan := make(chan os.Signal, 1) signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM) <-termChan cancelFunc() wg.Wait() } type Consumer struct { in *chan int jobs chan int } func (c Consumer) Work(wg *sync.WaitGroup, i int) { defer wg.Done() for job := range c.jobs { fmt.Printf("Worker #%d start job %d\n", i, job) time.Sleep(time.Millisecond * time.Duration(3000+rand.Intn(3000))) fmt.Printf("Worker #%d finish job %d\n", i, job) } fmt.Printf("Worker #%d interrupted\n", i) } func (c Consumer) Consume(ctx context.Context) { for { select { case job := <-*c.in: c.jobs <- job case <-ctx.Done(): close(c.jobs) fmt.Println("Consumer close channel") return } } } type Producer struct { in *chan int } func (p Producer) Produce() { task := 1 for { *p.in <- task fmt.Printf("Send value %d\n", task) task++ time.Sleep(time.Millisecond * 500) } }
Mengapa selepas menghentikan program, tidak semua mesej dalam baris gilir diproses, dan beberapa data nampaknya hilang.
Ini kerana apabilactx
完成后,(consumer).consume
停止从in
通道读取,但go p.produce()
创建的 goroutine 仍然写入in
ch.
Demo di bawah menyelesaikan masalah ini dan memudahkan kod sumber.
Nota:
produce
在ctx
完成后停止。并且它关闭了in
Saluran.
Bidangjobs
已从consumer
中删除,工作人员直接从in
Baca saluran.
Permintaan berikut tidak diendahkan kerana pelik. Tingkah laku biasa ialah apabila kerja dijana, jika saluranin
通道未满,则作业会立即发送到in
通道;当它已满时,发送操作将阻塞,直到从in
membaca kerja itu sehingga itu.
Jika bilangan mesej dalam baris gilir ialah 0, pengeluar menjana 10 mesej lagi
package main import ( "context" "fmt" "math/rand" "os/signal" "sync" "syscall" "time" ) func main() { const nConsumers = 2 ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer stop() in := make(chan int, 10) p := Producer{in} c := Consumer{in} go p.Produce(ctx) var wg sync.WaitGroup wg.Add(nConsumers) for i := 1; i <= nConsumers; i++ { go c.Work(&wg, i) } <-ctx.Done() fmt.Printf("\nGot end signal, waiting for %d jobs to finish\n", len(in)) wg.Wait() } type Consumer struct { in chan int } func (c *Consumer) Work(wg *sync.WaitGroup, i int) { defer wg.Done() for job := range c.in { fmt.Printf("Worker #%d start job %d\n", i, job) time.Sleep(time.Millisecond * time.Duration(3000+rand.Intn(3000))) fmt.Printf("Worker #%d finish job %d\n", i, job) } fmt.Printf("Worker #%d interrupted\n", i) } type Producer struct { in chan int } func (p *Producer) Produce(ctx context.Context) { task := 1 for { select { case p.in <- task: fmt.Printf("Send value %d\n", task) task++ time.Sleep(time.Millisecond * 500) case <-ctx.Done(): close(p.in) return } } }
Atas ialah kandungan terperinci Bagaimana untuk menyekat pengeluar dan pengguna daripada membaca mesej?. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!