首頁 > 後端開發 > Golang > Go 中的 Redis 隊列和 Cron

Go 中的 Redis 隊列和 Cron

DDD
發布: 2024-12-31 04:40:23
原創
858 人瀏覽過

Redis Queue and Cron in Go

原文在這裡

在本教程中,我們將與佇列互動並將其放入 Redis 伺服器
使用 github.com/hibiken/asynq 套件並為
建立調度程序 使用 github.com/robfig/cron 套件的排程任務。這一步一步
指南解釋如何設定隊列、安排任務以及優雅地處理
關閉。

初始化模組

先為專案建立一個新的 Go 模組:

go mod init learn_queue_and_cron
登入後複製

創建 cron.go

cron.go 檔案負責在特定時間排程和執行任務
間隔。下面是實作:

package main

import (
    "fmt"
    "log"
    "time"

    "github.com/robfig/cron/v3"
)

func runCron(c *cron.Cron) {

    // Schedule a task to run every minute
    _, err := c.AddFunc("@every 1m", func() {
        fmt.Printf("Task executed every minute at: %v \n", time.Now().Local())
    })
    if err != nil {
        log.Fatal(err)
    }

    // Start the cron scheduler
    c.Start()
    log.Println("Cron scheduler started")

    // Keep the main goroutine running
    select {}
}
登入後複製

此程式碼安排任務每分鐘運行一次並保持應用程式運行
確保調度程序連續工作。

創建queue.go

queue.go 檔案使用 Asynq 管理任務處理。程式碼如下:

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"

    "github.com/hibiken/asynq"
)

func runQueue(server *asynq.Server) {
    mux := asynq.NewServeMux()
    mux.HandleFunc("send_email", emailHandler)
    mux.HandleFunc("generate_report", reportHandler)

    if err := server.Run(mux); err != nil {
        log.Fatalf("Failed to run Asynq server: %v", err)
    }
}

func emailHandler(ctx context.Context, task *asynq.Task) error {
    var payload struct {
        To string `json:"to"`
    }
    if err := json.Unmarshal(task.Payload(), &payload); err != nil {
        return fmt.Errorf("failed to unmarshal payload: %w", err)
    }
    fmt.Printf("Sending email to: %s\n", payload.To)
    return nil
}

func reportHandler(ctx context.Context, task *asynq.Task) error {
    var payload struct {
        ReportID int `json:"report_id"`
    }
    if err := json.Unmarshal(task.Payload(), &payload); err != nil {
        return fmt.Errorf("failed to unmarshal payload: %w", err)
    }
    fmt.Printf("Generating report for ID: %d\n", payload.ReportID)
    return nil
}
登入後複製

解釋

  • Handlers: emailHandler 和 reportHandler 透過解析處理任務 它們的有效負載並執行相應的操作。
  • 任務佇列:定義了「send_email」和「generate_report」等任務 並透過Asynq的任務佇列進行處理。

建立router.go

router.go 檔案設定 HTTP 端點來將任務排入佇列:

package main

import (
    "encoding/json"
    "net/http"

    "github.com/gin-gonic/gin"
    "github.com/hibiken/asynq"
)

func setupRouter(client *asynq.Client) *gin.Engine {
    r := gin.Default()

    r.POST("/enqueue/email", func(c *gin.Context) {
        var payload struct {
            To string `json:"to"`
        }
        if err := c.ShouldBindJSON(&payload); err != nil {
            c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid request body"})
            return
        }

        jsonPayload, err := json.Marshal(payload)
        if err != nil {
            c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to marshal payload"})
            return
        }

        task := asynq.NewTask("send_email", jsonPayload)
        _, err = client.Enqueue(task)
        if err != nil {
            c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to enqueue task"})
            return
        }

        c.JSON(http.StatusOK, gin.H{"message": "Email job enqueued"})
    })

    r.POST("/enqueue/report", func(c *gin.Context) {
        var payload struct {
            ReportID int `json:"report_id"`
        }
        if err := c.ShouldBindJSON(&payload); err != nil {
            c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid request body"})
            return
        }

        jsonPayload, err := json.Marshal(payload)
        if err != nil {
            c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to marshal payload"})
            return
        }

        task := asynq.NewTask("generate_report", jsonPayload)
        _, err = client.Enqueue(task)
        if err != nil {
            c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to enqueue task"})
            return
        }

        c.JSON(http.StatusOK, gin.H{"message": "Report job enqueued"})
    })

    return r
}
登入後複製

此程式碼使用 Gin 框架公開兩個端點以用於排隊任務。

建立main.go

main.go 檔案將所有內容整合在一起:

package main

import (
    "context"
    "log"
    "net/http"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/hibiken/asynq"
    "github.com/robfig/cron/v3"
)

func main() {
    c := cron.New()

    server := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{
            Concurrency: 10,
        },
    )

    client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
    defer client.Close()

    router := setupRouter(client)

    httpServer := &http.Server{
        Addr:    ":8080",
        Handler: router,
    }

    // Prepare shutdown context
    ctx, stop := context.WithCancel(context.Background())
    defer stop()
    quit := make(chan os.Signal, 1)
    signal.Notify(quit, os.Interrupt, syscall.SIGTERM)

    go runQueue(server)
    go runCron(c)
    go func() {
        if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
            log.Fatalf("Failed to run HTTP server: %v", err)
        }
    }()

    appShutdown(ctx, httpServer, c, server, quit)
}

func appShutdown(ctx context.Context, httpServer *http.Server, c *cron.Cron, server *asynq.Server, quit chan os.Signal) {
    // Wait for termination signal
    <-quit
    log.Println("Shutting down gracefully...")

    httpCtx, httpCancel := context.WithTimeout(ctx, 5*time.Second)
    defer httpCancel()
    if err := httpServer.Shutdown(httpCtx); err != nil {
        log.Printf("HTTP server shutdown error: %v", err)
    }

    server.Shutdown()
    c.Stop()

    log.Println("Application stopped")
}
登入後複製

此檔案結合了佇列、cron、HTTP 伺服器和關閉邏輯。

安裝依賴項

安裝所有必要的依賴項:

go mod tidy
登入後複製

建置並運行應用程式

使用以下命令建置並執行應用程式:

go build -o run *.go && ./run
登入後複製

測試應用程式

存取以下端點將任務排隊:

  • http://localhost:8080/enqueue/email
  • http://localhost:8080/enqueue/report

觀察終端的任務執行日誌。

規範網址

欲了解更多詳細信息,請訪問我博客上的原始帖子。

以上是Go 中的 Redis 隊列和 Cron的詳細內容。更多資訊請關注PHP中文網其他相關文章!

來源:dev.to
本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
熱門教學
更多>
最新下載
更多>
網站特效
網站源碼
網站素材
前端模板