首页 > 后端开发 > Golang > Go 中的 Redis 队列和 Cron

Go 中的 Redis 队列和 Cron

DDD
发布: 2024-12-31 04:40:23
原创
859 人浏览过

Redis Queue and Cron in Go

原帖在这里

在本教程中,我们将与队列交互并将其放入 Redis 服务器
使用 github.com/hibiken/asynq 包并为
创建一个调度程序 使用 github.com/robfig/cron 包的计划任务。这一步一步
指南解释了如何设置队列、安排任务以及优雅地处理
关闭。

初始化模块

首先为项目创建一个新的 Go 模块:

go mod init learn_queue_and_cron
登录后复制

创建 cron.go

cron.go 文件负责在特定时间安排和运行任务
间隔。下面是实现:

package main

import (
    "fmt"
    "log"
    "time"

    "github.com/robfig/cron/v3"
)

func runCron(c *cron.Cron) {

    // Schedule a task to run every minute
    _, err := c.AddFunc("@every 1m", func() {
        fmt.Printf("Task executed every minute at: %v \n", time.Now().Local())
    })
    if err != nil {
        log.Fatal(err)
    }

    // Start the cron scheduler
    c.Start()
    log.Println("Cron scheduler started")

    // Keep the main goroutine running
    select {}
}
登录后复制

此代码安排任务每分钟运行一次并保持应用程序运行
确保调度程序连续工作。

创建queue.go

queue.go 文件使用 Asynq 管理任务处理。代码如下:

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"

    "github.com/hibiken/asynq"
)

func runQueue(server *asynq.Server) {
    mux := asynq.NewServeMux()
    mux.HandleFunc("send_email", emailHandler)
    mux.HandleFunc("generate_report", reportHandler)

    if err := server.Run(mux); err != nil {
        log.Fatalf("Failed to run Asynq server: %v", err)
    }
}

func emailHandler(ctx context.Context, task *asynq.Task) error {
    var payload struct {
        To string `json:"to"`
    }
    if err := json.Unmarshal(task.Payload(), &payload); err != nil {
        return fmt.Errorf("failed to unmarshal payload: %w", err)
    }
    fmt.Printf("Sending email to: %s\n", payload.To)
    return nil
}

func reportHandler(ctx context.Context, task *asynq.Task) error {
    var payload struct {
        ReportID int `json:"report_id"`
    }
    if err := json.Unmarshal(task.Payload(), &payload); err != nil {
        return fmt.Errorf("failed to unmarshal payload: %w", err)
    }
    fmt.Printf("Generating report for ID: %d\n", payload.ReportID)
    return nil
}
登录后复制

解释

  • Handlers: emailHandler 和 reportHandler 通过解析处理任务 它们的有效负载并执行相应的操作。
  • 任务队列:定义了“send_email”和“generate_report”等任务 并通过Asynq的任务队列进行处理。

创建router.go

router.go 文件设置 HTTP 端点来将任务排入队列:

package main

import (
    "encoding/json"
    "net/http"

    "github.com/gin-gonic/gin"
    "github.com/hibiken/asynq"
)

func setupRouter(client *asynq.Client) *gin.Engine {
    r := gin.Default()

    r.POST("/enqueue/email", func(c *gin.Context) {
        var payload struct {
            To string `json:"to"`
        }
        if err := c.ShouldBindJSON(&payload); err != nil {
            c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid request body"})
            return
        }

        jsonPayload, err := json.Marshal(payload)
        if err != nil {
            c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to marshal payload"})
            return
        }

        task := asynq.NewTask("send_email", jsonPayload)
        _, err = client.Enqueue(task)
        if err != nil {
            c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to enqueue task"})
            return
        }

        c.JSON(http.StatusOK, gin.H{"message": "Email job enqueued"})
    })

    r.POST("/enqueue/report", func(c *gin.Context) {
        var payload struct {
            ReportID int `json:"report_id"`
        }
        if err := c.ShouldBindJSON(&payload); err != nil {
            c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid request body"})
            return
        }

        jsonPayload, err := json.Marshal(payload)
        if err != nil {
            c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to marshal payload"})
            return
        }

        task := asynq.NewTask("generate_report", jsonPayload)
        _, err = client.Enqueue(task)
        if err != nil {
            c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to enqueue task"})
            return
        }

        c.JSON(http.StatusOK, gin.H{"message": "Report job enqueued"})
    })

    return r
}
登录后复制

此代码使用 Gin 框架公开两个端点以用于排队任务。

创建main.go

main.go 文件将所有内容集成在一起:

package main

import (
    "context"
    "log"
    "net/http"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/hibiken/asynq"
    "github.com/robfig/cron/v3"
)

func main() {
    c := cron.New()

    server := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{
            Concurrency: 10,
        },
    )

    client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
    defer client.Close()

    router := setupRouter(client)

    httpServer := &http.Server{
        Addr:    ":8080",
        Handler: router,
    }

    // Prepare shutdown context
    ctx, stop := context.WithCancel(context.Background())
    defer stop()
    quit := make(chan os.Signal, 1)
    signal.Notify(quit, os.Interrupt, syscall.SIGTERM)

    go runQueue(server)
    go runCron(c)
    go func() {
        if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
            log.Fatalf("Failed to run HTTP server: %v", err)
        }
    }()

    appShutdown(ctx, httpServer, c, server, quit)
}

func appShutdown(ctx context.Context, httpServer *http.Server, c *cron.Cron, server *asynq.Server, quit chan os.Signal) {
    // Wait for termination signal
    <-quit
    log.Println("Shutting down gracefully...")

    httpCtx, httpCancel := context.WithTimeout(ctx, 5*time.Second)
    defer httpCancel()
    if err := httpServer.Shutdown(httpCtx); err != nil {
        log.Printf("HTTP server shutdown error: %v", err)
    }

    server.Shutdown()
    c.Stop()

    log.Println("Application stopped")
}
登录后复制

该文件结合了队列、cron、HTTP 服务器和关闭逻辑。

安装依赖项

安装所有必需的依赖项:

go mod tidy
登录后复制

构建并运行应用程序

使用以下命令构建并运行应用程序:

go build -o run *.go && ./run
登录后复制

测试应用程序

访问以下端点将任务排队:

  • http://localhost:8080/enqueue/email
  • http://localhost:8080/enqueue/report

观察终端的任务执行日志。

规范网址

欲了解更多详细信息,请访问我博客上的原始帖子。

以上是Go 中的 Redis 队列和 Cron的详细内容。更多信息请关注PHP中文网其他相关文章!

来源:dev.to
本站声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板