Maison > développement back-end > Golang > File d'attente Redis et Cron en Go

File d'attente Redis et Cron en Go

DDD
Libérer: 2024-12-31 04:40:23
original
860 Les gens l'ont consulté

Redis Queue and Cron in Go

Le message original est ici

Dans ce tutoriel, nous allons interagir avec une file d'attente et la placer sur un serveur Redis
en utilisant le package github.com/hibiken/asynq et créez un planificateur pour un
tâche planifiée à l'aide du package github.com/robfig/cron. Ce pas à pas
Le guide explique comment créer une file d'attente, planifier des tâches et gérer gracieusement
arrêts.

Initialiser le module

Commencez par créer un nouveau module Go pour le projet :

go mod init learn_queue_and_cron
Copier après la connexion

Créer cron.go

Le fichier cron.go est responsable de la planification et de l'exécution de tâches à des endroits spécifiques
intervalles. Ci-dessous la mise en œuvre :

package main

import (
    "fmt"
    "log"
    "time"

    "github.com/robfig/cron/v3"
)

func runCron(c *cron.Cron) {

    // Schedule a task to run every minute
    _, err := c.AddFunc("@every 1m", func() {
        fmt.Printf("Task executed every minute at: %v \n", time.Now().Local())
    })
    if err != nil {
        log.Fatal(err)
    }

    // Start the cron scheduler
    c.Start()
    log.Println("Cron scheduler started")

    // Keep the main goroutine running
    select {}
}
Copier après la connexion

Ce code planifie l'exécution d'une tâche toutes les minutes et maintient l'application en cours d'exécution
pour garantir que le planificateur fonctionne en continu.

Créer une file d'attente.go

Le fichier queue.go gère le traitement des tâches à l'aide d'Asynq. Voici le code :

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"

    "github.com/hibiken/asynq"
)

func runQueue(server *asynq.Server) {
    mux := asynq.NewServeMux()
    mux.HandleFunc("send_email", emailHandler)
    mux.HandleFunc("generate_report", reportHandler)

    if err := server.Run(mux); err != nil {
        log.Fatalf("Failed to run Asynq server: %v", err)
    }
}

func emailHandler(ctx context.Context, task *asynq.Task) error {
    var payload struct {
        To string `json:"to"`
    }
    if err := json.Unmarshal(task.Payload(), &payload); err != nil {
        return fmt.Errorf("failed to unmarshal payload: %w", err)
    }
    fmt.Printf("Sending email to: %s\n", payload.To)
    return nil
}

func reportHandler(ctx context.Context, task *asynq.Task) error {
    var payload struct {
        ReportID int `json:"report_id"`
    }
    if err := json.Unmarshal(task.Payload(), &payload); err != nil {
        return fmt.Errorf("failed to unmarshal payload: %w", err)
    }
    fmt.Printf("Generating report for ID: %d\n", payload.ReportID)
    return nil
}
Copier après la connexion

Explication

  • Handlers : emailHandler et reportHandler traitent les tâches en analysant leurs charges utiles et exécutant les actions respectives.
  • File d'attente des tâches : Des tâches telles que "send_email" et "generate_report" sont définies et traité via la file d'attente des tâches d'Asynq.

Créer un routeur.go

Le fichier router.go configure les points de terminaison HTTP pour mettre les tâches en file d'attente :

package main

import (
    "encoding/json"
    "net/http"

    "github.com/gin-gonic/gin"
    "github.com/hibiken/asynq"
)

func setupRouter(client *asynq.Client) *gin.Engine {
    r := gin.Default()

    r.POST("/enqueue/email", func(c *gin.Context) {
        var payload struct {
            To string `json:"to"`
        }
        if err := c.ShouldBindJSON(&payload); err != nil {
            c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid request body"})
            return
        }

        jsonPayload, err := json.Marshal(payload)
        if err != nil {
            c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to marshal payload"})
            return
        }

        task := asynq.NewTask("send_email", jsonPayload)
        _, err = client.Enqueue(task)
        if err != nil {
            c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to enqueue task"})
            return
        }

        c.JSON(http.StatusOK, gin.H{"message": "Email job enqueued"})
    })

    r.POST("/enqueue/report", func(c *gin.Context) {
        var payload struct {
            ReportID int `json:"report_id"`
        }
        if err := c.ShouldBindJSON(&payload); err != nil {
            c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid request body"})
            return
        }

        jsonPayload, err := json.Marshal(payload)
        if err != nil {
            c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to marshal payload"})
            return
        }

        task := asynq.NewTask("generate_report", jsonPayload)
        _, err = client.Enqueue(task)
        if err != nil {
            c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to enqueue task"})
            return
        }

        c.JSON(http.StatusOK, gin.H{"message": "Report job enqueued"})
    })

    return r
}
Copier après la connexion

Ce code utilise le framework Gin pour exposer deux points de terminaison pour la mise en file d'attente des tâches.

Créer main.go

Le fichier main.go intègre le tout ensemble :

package main

import (
    "context"
    "log"
    "net/http"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/hibiken/asynq"
    "github.com/robfig/cron/v3"
)

func main() {
    c := cron.New()

    server := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{
            Concurrency: 10,
        },
    )

    client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
    defer client.Close()

    router := setupRouter(client)

    httpServer := &http.Server{
        Addr:    ":8080",
        Handler: router,
    }

    // Prepare shutdown context
    ctx, stop := context.WithCancel(context.Background())
    defer stop()
    quit := make(chan os.Signal, 1)
    signal.Notify(quit, os.Interrupt, syscall.SIGTERM)

    go runQueue(server)
    go runCron(c)
    go func() {
        if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
            log.Fatalf("Failed to run HTTP server: %v", err)
        }
    }()

    appShutdown(ctx, httpServer, c, server, quit)
}

func appShutdown(ctx context.Context, httpServer *http.Server, c *cron.Cron, server *asynq.Server, quit chan os.Signal) {
    // Wait for termination signal
    <-quit
    log.Println("Shutting down gracefully...")

    httpCtx, httpCancel := context.WithTimeout(ctx, 5*time.Second)
    defer httpCancel()
    if err := httpServer.Shutdown(httpCtx); err != nil {
        log.Printf("HTTP server shutdown error: %v", err)
    }

    server.Shutdown()
    c.Stop()

    log.Println("Application stopped")
}
Copier après la connexion

Ce fichier combine la file d'attente, le cron, le serveur HTTP et la logique d'arrêt.

Installer les dépendances

Installez toutes les dépendances requises :

go mod tidy
Copier après la connexion

Créer et exécuter l'application

Créez et exécutez l'application en utilisant :

go build -o run *.go && ./run
Copier après la connexion

Testez l'application

Visitez les points de terminaison suivants pour mettre les tâches en file d'attente :

  • http://localhost:8080/enqueue/email
  • http://localhost:8080/enqueue/report

Surveillez le terminal pour les journaux d'exécution des tâches.

URL canonique

Pour des informations plus détaillées, visitez l'article original sur mon blog.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

source:dev.to
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal