L'éditeur PHP Zimo introduit les canaux tampons synchronisés et les groupes d'attente, qui sont une technologie couramment utilisée dans la programmation simultanée. Les canaux tampons synchrones permettent le transfert de données entre plusieurs threads et réalisent la synchronisation entre les threads via des tampons. Le groupe d'attente permet de gérer un groupe de threads, en attendant qu'une condition spécifique soit remplie avant de les exécuter simultanément. Ces deux technologies peuvent résoudre efficacement le problème de synchronisation entre les threads dans la programmation multi-thread et améliorer les performances de concurrence et la fiabilité du programme.
J'utilise waitgroup
与 buffered
通道时遇到问题。问题是 waitgroup
pour fermer avant que la chaîne ne soit entièrement lue, ce qui rend ma chaîne à moitié lue et s'interrompt au milieu.
func main() { var wg sync.waitgroup var err error start := time.now() students := make([]studentdetails, 0) studentch := make(chan studentdetail, 10000) errorch := make(chan error, 1) wg.add(1) go s.getdetailstudents(rctx, studentch , errorch, &wg, s.link, false) go func(ch chan studentdetail, e chan error) { loop: for { select { case p, ok := <-ch: if ok { l.printf("links %s: [%s]\n", p.title, p.link) students = append(students, p) } else { l.print("closed channel") break loop } case err = <-e: if err != nil { break } } } }(studentch, errorch) wg.wait() close(studentch) close(errorch) l.warnln("closed: all wait-groups completed!") l.warnf("total items fetched: %d", len(students)) elapsed := time.since(start) l.warnf("operation took %s", elapsed) }
Le problème est que cette fonction est recursive
。我的意思是一些 http 调用来获取 students
et effectue ensuite plus d'appels en fonction de la condition.
func (s Student) getDetailStudents(rCtx context.Context, content chan<- studentDetail, errorCh chan<- error, wg *sync.WaitGroup, url string, subSection bool) { util.MustNotNil(rCtx) L := logger.GetLogger(rCtx) defer func() { L.Println("Closing all waitgroup!") wg.Done() }() wc := getWC() httpClient := wc.Registry.MustHTTPClient() res, err := httpClient.Get(url) if err != nil { L.Fatal(err) } defer res.Body.Close() if res.StatusCode != 200 { L.Errorf("status code error: %d %s", res.StatusCode, res.Status) errorCh <- errors.New("service_status_code") return } // parse response and return error if found some through errorCh as done above. // decide page subSection based on response if it is more. if !subSection { wg.Add(1) go s.getDetailStudents(rCtx, content, errorCh, wg, link, true) // L.Warnf("total pages found %d", pageSub.Length()+1) } // Find students from response list and parse each Student students := s.parseStudentItemList(rCtx, item) for _, student := range students { content <- student } L.Warnf("Calling HTTP Service for %q with total %d record", url, elementsSub.Length()) }
Modifiez les variables pour éviter la base de code d'origine.
Le problème est qu'une fois le groupe d'attente terminé, les élèves sont lus au hasard. Je souhaite continuer à exécuter jusqu'à ce que tous les étudiants aient lu, et si une erreur se produit, elle devrait s'interrompre dès qu'elle rencontre une erreur.
Vous devez savoir quand la goroutine de réception est terminée. Le groupe d'attente fait cela pour générer des goroutines. Vous pouvez donc utiliser deux groupes d'attente :
wg.Add(1) go s.getDetailStudents(rCtx, studentCh , errorCh, &wg, s.Link, false) wgReader.Add(1) go func(ch chan studentDetail, e chan error) { defer wgReader.Done() ... } wg.Wait() close(studentCh) close(errorCh) wgReader.Wait() // Wait for the readers to complete
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!