In software development, a message queue is a common mechanism for asynchronous communication between producers and consumers. Sometimes we want to control how producers write messages and how consumers read them, in order to manage system resources and handle requests during peak hours. This article walks through one such case in Go: a bounded queue whose producer and consumers shut down cleanly without losing messages.
I want to build a producer-consumer application in Go that shuts down on a signal.
The producer continuously puts messages into a queue with a limit of 10. Several consumers read from the channel and process the messages. When the number of messages in the queue drops to 0, the producer generates 10 messages again. When a stop signal is received, the producer stops generating new messages and the consumers process everything left in the channel.
I found the code below, but I can't tell whether it works properly, because the result looks strange.
Code example:
```go
package main

import (
	"context"
	"fmt"
	"math/rand"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

func main() {
	const nConsumers = 2

	in := make(chan int, 10)
	p := Producer{&in}
	c := Consumer{&in, make(chan int, nConsumers)}
	go p.Produce()
	ctx, cancelFunc := context.WithCancel(context.Background())
	go c.Consume(ctx)
	wg := &sync.WaitGroup{}
	wg.Add(nConsumers)
	for i := 1; i <= nConsumers; i++ {
		go c.Work(wg, i)
	}
	termChan := make(chan os.Signal, 1)
	signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)

	<-termChan

	cancelFunc()
	wg.Wait()
}

type Consumer struct {
	in   *chan int
	jobs chan int
}

func (c Consumer) Work(wg *sync.WaitGroup, i int) {
	defer wg.Done()
	for job := range c.jobs {
		fmt.Printf("Worker #%d start job %d\n", i, job)
		time.Sleep(time.Millisecond * time.Duration(3000+rand.Intn(3000)))
		fmt.Printf("Worker #%d finish job %d\n", i, job)
	}
	fmt.Printf("Worker #%d interrupted\n", i)
}

func (c Consumer) Consume(ctx context.Context) {
	for {
		select {
		case job := <-*c.in:
			c.jobs <- job
		case <-ctx.Done():
			close(c.jobs)
			fmt.Println("Consumer close channel")
			return
		}
	}
}

type Producer struct {
	in *chan int
}

func (p Producer) Produce() {
	task := 1
	for {
		*p.in <- task
		fmt.Printf("Send value %d\n", task)
		task++
		time.Sleep(time.Millisecond * 500)
	}
}
```
Why are not all the messages in the queue processed after the program is stopped? It seems that some data is lost.
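This is because when ctx is done, (Consumer).Consume stops reading from the in channel, but the goroutine created by go p.Produce() keeps writing to the in channel. Anything still buffered in the in channel at that point is never forwarded to the jobs channel, so the workers never see it.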
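Losing the buffered messages is not inevitable, because closing a Go channel only forbids further sends; values that are already buffered can still be received. The following is a minimal sketch of that property, not part of the original code: drainAfterClose is a hypothetical helper that fills a buffered channel, closes it, and counts what a receiver still gets.

```go
package main

import "fmt"

// drainAfterClose is a hypothetical helper for illustration only.
// It buffers n values, closes the channel, and then counts how many
// values a receiver still gets by ranging over the closed channel.
func drainAfterClose(n int) int {
	ch := make(chan int, n)
	for i := 1; i <= n; i++ {
		ch <- i // never blocks: the buffer has room for n values
	}
	close(ch) // forbids further sends; buffered values stay readable

	received := 0
	for range ch { // the loop ends once the buffer has been emptied
		received++
	}
	return received
}

func main() {
	fmt.Println(drainAfterClose(10)) // prints 10
}
```

This is why it is enough for the sending side to close the channel when it stops: workers that range over the channel finish the remaining jobs and then exit on their own.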
The demo below solves this problem and simplifies the source code.
Notes:
1. Produce stops after ctx is done, and it closes the in channel.
2. The jobs field has been removed from Consumer; the workers read directly from the in channel.
3. The requirement that the producer generate 10 new messages only once the queue is empty is ignored because it is unusual. The common behavior is that when a job is produced and the in channel is not full, the job is sent to the in channel immediately; when the channel is full, the send blocks until a job is read from the in channel.
```go
package main

import (
	"context"
	"fmt"
	"math/rand"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

func main() {
	const nConsumers = 2

	// ctx is cancelled when SIGINT or SIGTERM arrives.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	in := make(chan int, 10)
	p := Producer{in}
	c := Consumer{in}
	go p.Produce(ctx)

	var wg sync.WaitGroup
	wg.Add(nConsumers)
	for i := 1; i <= nConsumers; i++ {
		go c.Work(&wg, i)
	}

	<-ctx.Done()
	fmt.Printf("\nGot end signal, waiting for %d jobs to finish\n", len(in))
	// The workers exit once in is closed and drained.
	wg.Wait()
}

type Consumer struct {
	in chan int
}

func (c *Consumer) Work(wg *sync.WaitGroup, i int) {
	defer wg.Done()
	// range keeps receiving until in is closed and its buffer is empty.
	for job := range c.in {
		fmt.Printf("Worker #%d start job %d\n", i, job)
		time.Sleep(time.Millisecond * time.Duration(3000+rand.Intn(3000)))
		fmt.Printf("Worker #%d finish job %d\n", i, job)
	}
	fmt.Printf("Worker #%d interrupted\n", i)
}

type Producer struct {
	in chan int
}

func (p *Producer) Produce(ctx context.Context) {
	task := 1
	for {
		select {
		case p.in <- task:
			fmt.Printf("Send value %d\n", task)
			task++
			time.Sleep(time.Millisecond * 500)
		case <-ctx.Done():
			// The producer is the only sender, so it is safe for it to close in.
			close(p.in)
			return
		}
	}
}
```
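The demo above deliberately leaves out the requirement to refill the queue with 10 messages only once it is empty. If that behavior is really needed, one possible approach is sketched below. Everything here is an assumption for illustration rather than part of the original answer: the names produceInBatches, collect and batchSize, the polling interval, and the premise that there is exactly one producer, which is what makes checking len(in) safe.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

const batchSize = 10 // assumed to equal the capacity of the in channel

// produceInBatches is a hypothetical variant of Produce. It polls the
// queue and, whenever it is empty, enqueues batchSize jobs at once.
// It assumes it is the only sender on in, so the sends cannot block.
func produceInBatches(ctx context.Context, in chan int, poll time.Duration) {
	defer close(in) // lets the consumers drain the queue and exit
	ticker := time.NewTicker(poll)
	defer ticker.Stop()

	task := 1
	for ctx.Err() == nil {
		if len(in) == 0 {
			for i := 0; i < batchSize; i++ {
				in <- task
				task++
			}
		}
		// Wait for the next poll or for cancellation, whichever is first.
		select {
		case <-ctx.Done():
		case <-ticker.C:
		}
	}
}

// collect runs the producer with a single consumer, cancels once
// stopAfter jobs (at least 1) have been read, and returns every job
// received, including those drained after cancellation.
func collect(stopAfter int) []int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	in := make(chan int, batchSize)
	go produceInBatches(ctx, in, time.Millisecond)

	var got []int
	for job := range in {
		got = append(got, job)
		if len(got) == stopAfter {
			cancel()
		}
	}
	return got
}

func main() {
	jobs := collect(15)
	fmt.Println(len(jobs)) // always a multiple of batchSize
}
```

Because a batch is never interrupted halfway and the channel is closed only after the producer stops, the consumer always receives whole batches and nothing is lost. Polling len(in) is a simplification: with more than one producer the check would be racy and another coordination mechanism would be needed.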