我正在嘗試建立 goroutine 來完成任務
所以我寫了這段程式碼。 像 a、b、c 這樣沒有依賴關係的任務很容易實現並且運作良好。 只是在實現依賴任務 d 和 e 時遇到一些問題,每個任務都有 2 個任務的依賴關係。
只剩下一個連接點,它為每個任務建立一個通道,然後傳遞訊息,該訊息將由依賴任務讀取,以減少依賴任務完成後的依賴數量。請參閱程式碼中的 checkpoint 1
註解。
有人可以幫我解決這個問題嗎?我只是停留在如何在這種情況下實現 goroutine 的部分。
程式碼:
package main import ( "fmt" "sync" ) type task struct { isdone bool dependencies []*task subscribers []*task donechan chan bool numdependencies int taskname string informsubchannel chan bool // } func (t *task) executetask() { fmt.printf("task %s is getting executed...\n", t.taskname) // <-time.after(5 * time.second) fmt.printf("task %s is done!! <-------\n", t.taskname) } func (t *task) updatedependency() { var updateddependencies []*task for _, t := range t.dependencies { if !t.isdone { updateddependencies = append(updateddependencies, t) } } t.numdependencies = len(updateddependencies) fmt.printf("updating dependency for task: %s to %d\n", t.taskname, t.numdependencies) t.dependencies = updateddependencies } // if we are having dependencies for a task subscribe to those dependent task. // when the dependent task is done inform it and reduce the no of dependencies. // a --> d (d depends on a), a has finished its task so inform it subscribed task which is d here and reduce d dependencies. func (t *task) informsubscriber() { if len(t.subscribers) > 0 { for _, sub := range t.subscribers { fmt.printf("task %s has informed subscriber %s\n", t.taskname, sub.taskname) sub.updatedependency() } } } // task is subscribed to dependent task. d has been subscribed to a, d will watch over the activity of a func (t *task) setsubscriber(sub *task) { fmt.printf("set subscriber %s to task %s\n", sub.taskname, t.taskname) t.subscribers = append(t.subscribers, sub) } // go routine - background task execution // mark it as completed func (t *task) markcompleted() { for { select { case <-t.donechan: { t.isdone = true t.executetask() // inform all the subscribers that the task is completed and adjust their dependencies t.informsubscriber() close(t.donechan) return } default: } } } func (t *task) setdependency(tasks []*task) { t.dependencies = tasks t.numdependencies = len(t.dependencies) } // this will be use if dependent task are already done. will be used in checkpoint 1. func (t *task) trackdependency() { t.numdependencies -= 1 fmt.printf("no of dependencies for task %s is: %d\n", t.taskname, t.numdependencies) if t.numdependencies == 0 { // execute task t.donechan <- true } } func (t *task) start() { fmt.printf("running task %s\n", t.taskname) t.updatedependency() go t.markcompleted() if t.numdependencies > 0 { // for every dependent task for _, dep := range t.dependencies { // create subscribers dep.setsubscriber(t) // what if all dependencies are already executed. subscriber won't help as they won't be marked completed as already done. // say a and c are already done then d won't be able to complete itself since it's still waiting for them // if dependencies are already finished mark it as completed too // code: handle the dependent case here(unable to implement) // background function for tracking dependency // checkpoint 1: read dependent task channel value & reduce dependencies if done go t.trackdependency() } fmt.printf("task %s has %d dependencies and waiting for them to get finished\n", t.taskname, t.numdependencies) } else { // if no dependencies. mark it as finished t.donechan <- true } } func createtask(taskname string) *task { return &task{ isdone: false, taskname: taskname, dependencies: nil, subscribers: nil, numdependencies: 0, donechan: make(chan bool), } } func main() { taska := createtask("a") taskb := createtask("b") taskc := createtask("c") taskd := createtask("d") taske := createtask("e") taskd.setdependency([]*task{taska, taskb}) taske.setdependency([]*task{taskc, taskd}) alltasks := []*task{taska, taskb, taskc, taskd, taske} var wg sync.waitgroup for _, t := range alltasks { wg.add(1) go func(t *task) { defer wg.done() t.start() }(t) } wg.wait() }
範例輸出:
#(base) ninjakx@Kritis-MacBook-Pro Practice % go run task.go Running Task D Running Task B Running Task C Updating dependency for task: B to 0 Running Task E Task B is getting executed... Updating dependency for task: C to 0 Running Task A Task C is getting executed... Task C is done!! <------- Updating dependency for task: D to 2 Set subscriber D to task A Set subscriber D to task B Task D has 2 dependencies and waiting for them to get finished Task B is done!! <------- No of dependencies for task D is: 2 Updating dependency for task: E to 2 Set subscriber E to task C Set subscriber E to task D Task E has 2 dependencies and waiting for them to get finished No of dependencies for task E is: 2 No of dependencies for task D is: 2 No of dependencies for task E is: 2 Updating dependency for task: A to 0 task B has informed subscriber D Updating dependency for task: D to 0 Task A is getting executed... Task A is done!! <-------
由於上述缺失實現,目前 發現 5 個資料競爭
。
我認為您可以使用較小的任務結構和 waitgroup
的一些幫助來實現上述場景進行同步。
這是我將一些註解放在一起進行解釋的範例。
package main import ( "fmt" "math/rand" "sync" "time" ) // tasks holds an id ( for ease of debugging ) // a buffered channel that is only used for signaling when the task is executed // and finally a list of dependency tasks type task struct { id string done chan struct{} dependencies []*task } // run is where all the logic happens // // we create a waitgroup that will be the size of the dependencies for the current task // and we will wait until all tasks have signaled that they have executed. // // when all the dependencies have signaled through their channel that they are done // then the current task is free to execute and then signal any potential waiting task. func (t *task) run(done func()) { wg := sync.waitgroup{} wg.add(len(t.dependencies)) for _, task := range t.dependencies { go func(dep *task) { fmt.printf("%s is waiting for task %s to finish\n", t.id, dep.id) <-dep.done wg.done() }(task) } wg.wait() // emulate work time.sleep(time.duration(rand.intn(5-1)+1) * time.second) fmt.printf("job %s ran\n", t.id) t.done <- struct{}{} done() } func newtask(id string) *task { return &task{ id: id, // we need buffered size here, else the task will be blocked until someone will read the channel on `run` done: make(chan struct{}, 1), } } func (t *task) setdeps(deps ...*task) { t.dependencies = append(t.dependencies, deps...) } // executetasks simply runs all the tasks concurrently and waits until every tasks is completed func executetasks(tasks ...*task) { fmt.println("starting execution") wg := sync.waitgroup{} wg.add(len(tasks)) for _, task := range tasks { go task.run(wg.done) } wg.wait() fmt.println("end of execution") } func main() { // initialise the tasks a := newtask("a") b := newtask("b") c := newtask("c") d := newtask("d") e := newtask("e") // and set dependencies // a.setdeps(d) d.setdeps(a, b) e.setdeps(d, c) // then we "try" to execute all the tasks. executetasks(a, b, c, d, e) }
當然這不是完美的解決方案,我可以認為已經有很多情況沒有得到處理
例如
a => d
和 d => a
為了解決第一個問題,您可能需要建立依賴圖並檢查它是否是循環的。對於第二個, hacky
方式可能是
go func(dep *Task) { fmt.Printf("%s is waiting for task %s to finish\n", t.id, dep.id) <-dep.done // put the value back if anyone else is also dependent dep.done <- struct{}{} wg.Done() }(task)
以上是使用 go 例程建立 pub sub的詳細內容。更多資訊請關注PHP中文網其他相關文章!