ホームページ > バックエンド開発 > Golang > Go の Redis Queue と Cron

Go の Redis Queue と Cron

DDD
リリース: 2024-12-31 04:40:23
オリジナル
858 人が閲覧しました

Redis Queue and Cron in Go

元の投稿はこちら

このチュートリアルでは、キューを操作し、それを Redis サーバーに配置します
github.com/hibiken/asynq パッケージを使用して、
のスケジューラーを作成します。 github.com/robfig/cron パッケージを使用してスケジュールされたタスク。このステップバイステップ
このガイドでは、キューの設定、タスクのスケジュール設定、およびグレースフルな処理の方法について説明します
シャットダウン

モジュールの初期化

プロジェクト用に新しい Go モジュールを作成することから始めます:

go mod init learn_queue_and_cron
ログイン後にコピー

cron.goを作成する

cron.go ファイルは、特定の時間でのタスクのスケジュールと実行を担当します
間隔。以下は実装です:

package main

import (
    "fmt"
    "log"
    "time"

    "github.com/robfig/cron/v3"
)

func runCron(c *cron.Cron) {

    // Schedule a task to run every minute
    _, err := c.AddFunc("@every 1m", func() {
        fmt.Printf("Task executed every minute at: %v \n", time.Now().Local())
    })
    if err != nil {
        log.Fatal(err)
    }

    // Start the cron scheduler
    c.Start()
    log.Println("Cron scheduler started")

    // Keep the main goroutine running
    select {}
}
ログイン後にコピー

このコードは、毎分実行するタスクをスケジュールし、アプリケーションの実行を維持します
スケジューラが継続的に動作するようにします。

queue.go を作成する

queue.go ファイルは、Asynch を使用してタスク処理を管理します。コードは次のとおりです:

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"

    "github.com/hibiken/asynq"
)

func runQueue(server *asynq.Server) {
    mux := asynq.NewServeMux()
    mux.HandleFunc("send_email", emailHandler)
    mux.HandleFunc("generate_report", reportHandler)

    if err := server.Run(mux); err != nil {
        log.Fatalf("Failed to run Asynq server: %v", err)
    }
}

func emailHandler(ctx context.Context, task *asynq.Task) error {
    var payload struct {
        To string `json:"to"`
    }
    if err := json.Unmarshal(task.Payload(), &payload); err != nil {
        return fmt.Errorf("failed to unmarshal payload: %w", err)
    }
    fmt.Printf("Sending email to: %s\n", payload.To)
    return nil
}

func reportHandler(ctx context.Context, task *asynq.Task) error {
    var payload struct {
        ReportID int `json:"report_id"`
    }
    if err := json.Unmarshal(task.Payload(), &payload); err != nil {
        return fmt.Errorf("failed to unmarshal payload: %w", err)
    }
    fmt.Printf("Generating report for ID: %d\n", payload.ReportID)
    return nil
}
ログイン後にコピー

説明

  • ハンドラー: emailHandler と reportHandler は解析によってタスクを処理します ペイロードとそれぞれのアクションの実行。
  • タスクキュー: 「send_email」や「generate_report」などのタスクが定義されています そして、Asynch のタスク キューを介して処理されます。

router.go を作成する

router.go ファイルは、タスクをキューに入れるための HTTP エンドポイントを設定します。

package main

import (
    "encoding/json"
    "net/http"

    "github.com/gin-gonic/gin"
    "github.com/hibiken/asynq"
)

func setupRouter(client *asynq.Client) *gin.Engine {
    r := gin.Default()

    r.POST("/enqueue/email", func(c *gin.Context) {
        var payload struct {
            To string `json:"to"`
        }
        if err := c.ShouldBindJSON(&payload); err != nil {
            c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid request body"})
            return
        }

        jsonPayload, err := json.Marshal(payload)
        if err != nil {
            c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to marshal payload"})
            return
        }

        task := asynq.NewTask("send_email", jsonPayload)
        _, err = client.Enqueue(task)
        if err != nil {
            c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to enqueue task"})
            return
        }

        c.JSON(http.StatusOK, gin.H{"message": "Email job enqueued"})
    })

    r.POST("/enqueue/report", func(c *gin.Context) {
        var payload struct {
            ReportID int `json:"report_id"`
        }
        if err := c.ShouldBindJSON(&payload); err != nil {
            c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid request body"})
            return
        }

        jsonPayload, err := json.Marshal(payload)
        if err != nil {
            c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to marshal payload"})
            return
        }

        task := asynq.NewTask("generate_report", jsonPayload)
        _, err = client.Enqueue(task)
        if err != nil {
            c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to enqueue task"})
            return
        }

        c.JSON(http.StatusOK, gin.H{"message": "Report job enqueued"})
    })

    return r
}
ログイン後にコピー

このコードは、Gin フレームワークを使用して、タスクをキューに入れるための 2 つのエンドポイントを公開します。

main.go を作成する

main.go ファイルはすべてを統合します:

package main

import (
    "context"
    "log"
    "net/http"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/hibiken/asynq"
    "github.com/robfig/cron/v3"
)

func main() {
    c := cron.New()

    server := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{
            Concurrency: 10,
        },
    )

    client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
    defer client.Close()

    router := setupRouter(client)

    httpServer := &http.Server{
        Addr:    ":8080",
        Handler: router,
    }

    // Prepare shutdown context
    ctx, stop := context.WithCancel(context.Background())
    defer stop()
    quit := make(chan os.Signal, 1)
    signal.Notify(quit, os.Interrupt, syscall.SIGTERM)

    go runQueue(server)
    go runCron(c)
    go func() {
        if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
            log.Fatalf("Failed to run HTTP server: %v", err)
        }
    }()

    appShutdown(ctx, httpServer, c, server, quit)
}

func appShutdown(ctx context.Context, httpServer *http.Server, c *cron.Cron, server *asynq.Server, quit chan os.Signal) {
    // Wait for termination signal
    <-quit
    log.Println("Shutting down gracefully...")

    httpCtx, httpCancel := context.WithTimeout(ctx, 5*time.Second)
    defer httpCancel()
    if err := httpServer.Shutdown(httpCtx); err != nil {
        log.Printf("HTTP server shutdown error: %v", err)
    }

    server.Shutdown()
    c.Stop()

    log.Println("Application stopped")
}
ログイン後にコピー

このファイルは、キュー、cron、HTTP サーバー、シャットダウン ロジックを組み合わせたものです。

依存関係のインストール

必要な依存関係をすべてインストールします:

go mod tidy
ログイン後にコピー

アプリケーションを構築して実行する

以下を使用してアプリケーションを構築して実行します。

go build -o run *.go && ./run
ログイン後にコピー

アプリケーションをテストする

次のエンドポイントにアクセスしてタスクをキューに入れます:

  • http://localhost:8080/enqueue/email
  • http://localhost:8080/enqueue/report

ターミナルでタスクの実行ログを監視します。

正規の URL

さらに詳しい情報については、私のブログの元の投稿をご覧ください。

以上がGo の Redis Queue と Cronの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

ソース:dev.to
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート