How to cancel channel job based on ID in golang

PHPz
Release: 2024-02-08 23:42:11

Canceling a running job by its ID is a common requirement when handling concurrent tasks in Go. The approach discussed here gives each job its own cancelable context (context.WithCancel) and stores the resulting cancel function in a map keyed by the job ID, so that an HTTP handler can look up the ID and call it. Canceling the context does not stop the job by itself: the job's code has to watch ctx.Done() and return once it is closed. The question and answer below show why a job that sleeps or blocks between checks keeps running, and how to fix that.

Question content

So I have a POST endpoint that creates a job and adds it to a channel: workerjobschan = make(chan job, maxqueuesize)

This is how I execute the jobs from the channel (main.go):

for i := 1; i <= maxworkers; i++ {
    go func(i int) {
        for job := range workerjobschan {
            // one cancelable context per job, registered under the job's ID
            ctx, cancel := context.WithCancel(context.Background())
            storejob(job.search.id, cancel)
            job.execute(ctx, c.db, i)
        }
    }(i)
}

I store the cancel function in a map: canceljobfuncs = make(map[int]context.CancelFunc).
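
The question does not show storejob or cancelJob. As a rough sketch, and assuming the map is shared between the worker goroutines and the HTTP handler, the two operations could be wrapped in a small mutex-protected type (Go maps are not safe for concurrent writes). jobRegistry and its method names are hypothetical, not the asker's code:

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// jobRegistry maps a job ID to the function that cancels that job.
// The type and its methods are hypothetical; the question only shows
// a bare map.
type jobRegistry struct {
	mu      sync.Mutex
	cancels map[int]context.CancelFunc
}

func newJobRegistry() *jobRegistry {
	return &jobRegistry{cancels: make(map[int]context.CancelFunc)}
}

// store remembers the cancel function of a running job.
func (r *jobRegistry) store(id int, cancel context.CancelFunc) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.cancels[id] = cancel
}

// cancel cancels the job with the given ID and forgets it.
// It reports whether a job with that ID was registered.
func (r *jobRegistry) cancel(id int) bool {
	r.mu.Lock()
	cancel, ok := r.cancels[id]
	delete(r.cancels, id)
	r.mu.Unlock()
	if ok {
		cancel()
	}
	return ok
}

// cancelTwice registers job 42, cancels it twice, and returns both results
// together with whether the job's context ended up canceled.
func cancelTwice() (first, second, canceled bool) {
	reg := newJobRegistry()
	ctx, cancel := context.WithCancel(context.Background())
	reg.store(42, cancel)
	first = reg.cancel(42)
	second = reg.cancel(42)
	return first, second, ctx.Err() == context.Canceled
}

func main() {
	first, second, canceled := cancelTwice()
	fmt.Println(first, second, canceled) // prints: true false true
}
```

Note that in the worker loop shown earlier the cancel function is stored only once a worker picks the job up, so a job that is still waiting in the channel has no entry to cancel yet.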

This is the job function:

func (j *job) execute(ctx context.Context, db *sql.DB, workerid int) error {
    for {
        select {
        // check for cancellation signal
        case <-ctx.Done():
            if err := ctx.Err(); err != nil {
                fmt.Println("worker", workerid, "error", err)
            }
            fmt.Println("worker", workerid, "cancelled")
            return nil

        default:
            fmt.Printf("worker%d: processing %s\n", workerid, j.search.query)
            time.Sleep(2 * time.Second)
            fmt.Printf("worker%d: active %s\n", workerid, j.search.query)
            time.Sleep(5 * time.Second)
            fmt.Printf("worker%d: completed %s!\n", workerid, j.search.query)
        }
    }
}

I cancel the context (in the http handler) like this:

cancelJob(search.ID)

But the job continues to run. I've tried a lot of things but can't seem to get it to work.

Solution

The key point is simple: if your code doesn't check ctx.Done(), it has no way of knowing that it has been canceled.
(BTW, this is just a paraphrase of what @jimb wrote in the comment on your question.)

So when the code in the .execute(...) method starts executing this block:

fmt.Printf("worker%d: processing %s\n", workerid, j.search.query)
time.Sleep(2 * time.Second)
fmt.Printf("worker%d: active %s\n", workerid, j.search.query)
time.Sleep(5 * time.Second)
fmt.Printf("worker%d: completed %s!\n", workerid, j.search.query)

it runs all the way to the end of the block (7 seconds), because no instruction inside the block tells it to stop on cancellation.

If you want your function to be able to detect cancellation during a "sleep" instruction, you must change your code.

Here is one way to do this, based on your code:

func (j *job) execute(ctx context.Context, db *sql.DB, workerid int) error {
    for {
        fmt.Printf("worker%d: processing %s\n", workerid, j.search.query)

        // replace time.Sleep() with time.After() so that the wait can be
        // combined with ctx.Done() in a select statement:
        select {
        case <-ctx.Done():
            fmt.Println("worker", workerid, "cancelled")
            return nil
        case <-time.After(2 * time.Second):
            // keep going
        }

        fmt.Printf("worker%d: active %s\n", workerid, j.search.query)

        select {
        case <-ctx.Done():
            fmt.Println("worker", workerid, "cancelled")
            return nil
        case <-time.After(5 * time.Second):
            // keep going
        }

        fmt.Printf("worker%d: completed %s!\n", workerid, j.search.query)
    }
}

//m.sbmmt.com/link/3bc31a430954d8326605fc690ed22f4d
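
Since the same select appears twice, it could be factored into a small helper. The following is only a sketch: sleepCtx is a hypothetical name, and it uses time.NewTimer rather than time.After so the timer can be stopped explicitly when the context is canceled first.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// sleepCtx (a hypothetical helper) waits for d to elapse or for ctx to be
// canceled, whichever happens first. It returns nil after a full sleep and
// ctx.Err() if the context ended first.
func sleepCtx(ctx context.Context, d time.Duration) error {
	t := time.NewTimer(d)
	defer t.Stop() // release the timer when we return early
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-t.C:
		return nil
	}
}

// runDemo sleeps once with a live context and once with a context that is
// canceled after 20ms, and returns both results formatted as strings.
func runDemo() (string, string) {
	full := sleepCtx(context.Background(), 10*time.Millisecond)

	ctx, cancel := context.WithCancel(context.Background())
	time.AfterFunc(20*time.Millisecond, cancel)
	interrupted := sleepCtx(ctx, time.Minute)

	return fmt.Sprint(full), fmt.Sprint(interrupted)
}

func main() {
	full, interrupted := runDemo()
	fmt.Println("full sleep:", full)               // full sleep: <nil>
	fmt.Println("interrupted sleep:", interrupted) // interrupted sleep: context canceled
}
```

With such a helper, each wait in execute would shrink to a single call followed by an error check that returns when the error is non-nil.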

I guess your actual code doesn't call time.Sleep(), but rather something like processsearch(...) or doquery(...) or...

If you need these functions to be cancelable during execution, you need to pass the cancellation context to them somehow and have them check for cancellation in some way.

One way to "pass context" is obviously to add it to the parameters of said function:

processsearch(ctx, ...)
doquery(ctx, ...)
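
What "check for cancellation" looks like inside such a function depends on the work it does. As a minimal sketch, assuming the work can be split into items, the function can test ctx.Err() between items. processSearch, its parameters, and cancelAfterTwo are hypothetical and only illustrate the pattern:

```go
package main

import (
	"context"
	"fmt"
)

// processSearch is a hypothetical stand-in for long-running work. It handles
// items one at a time and checks ctx before each item, so a cancellation is
// noticed after at most one item of extra work.
func processSearch(ctx context.Context, items []string, handle func(string)) (int, error) {
	done := 0
	for _, item := range items {
		if err := ctx.Err(); err != nil {
			return done, err // stop early and report how far we got
		}
		handle(item)
		done++
	}
	return done, nil
}

// cancelAfterTwo runs processSearch over five items and cancels the context
// while the second item is being handled.
func cancelAfterTwo() (int, string) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	seen := 0
	done, err := processSearch(ctx, []string{"a", "b", "c", "d", "e"}, func(string) {
		seen++
		if seen == 2 {
			cancel()
		}
	})
	return done, fmt.Sprint(err)
}

func main() {
	done, err := cancelAfterTwo()
	fmt.Println("items handled:", done, "error:", err)
	// items handled: 2 error: context canceled
}
```

The finer the unit of work between two checks, the faster a cancellation takes effect.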

But depending on your existing code, some parameters may already carry a context or offer a built-in way to cancel.
For example:

// an http.Request carries a context:
func doQuery(req *http.Request, ....) {
    ...
}

// at call site:
    ...
    // NewRequestWithContext returns a request and an error
    req, err := http.NewRequestWithContext(ctx, "GET", "https://some.other.service/", nil)
    if err != nil {
        // handle the error
    }
    doQuery(req, ...)
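
To see the effect end to end, here is a self-contained sketch that replaces the remote service with a local, deliberately slow test server from net/http/httptest. The helper name fetchWithCancel and the 50ms delay are assumptions made for the demonstration:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"
)

// fetchWithCancel sends a request to a slow local test server, cancels the
// request's context after 50ms, and reports whether the client call failed
// with context.Canceled.
func fetchWithCancel() bool {
	// The handler waits until the client goes away, or 5 seconds at most.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		select {
		case <-r.Context().Done():
		case <-time.After(5 * time.Second):
		}
	}))
	defer srv.Close()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	time.AfterFunc(50*time.Millisecond, cancel)

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, srv.URL, nil)
	if err != nil {
		return false
	}
	resp, err := srv.Client().Do(req)
	if err == nil {
		resp.Body.Close()
		return false // the request unexpectedly completed
	}
	// The client wraps the cause in a *url.Error; errors.Is unwraps it.
	return errors.Is(err, context.Canceled)
}

func main() {
	fmt.Println("request stopped by cancellation:", fetchWithCancel())
}
```

The call to Do returns as soon as the context is canceled, without waiting for the server's reply.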

source:stackoverflow.com