
Resilience in communication between microservices using the failsafe-go lib

PHPz
Released: 2024-08-27 06:04:02


Let's start at the beginning. What is resilience? I like the definition in this post:

The intrinsic ability of a system to adjust its functioning before, during, or after changes and disturbances, so that it can sustain required operations under both expected and unexpected conditions

Since this is a broad term, I will focus on communication between microservices in this article. To do so, I created two services in Go: serviceA and serviceB (my creativity was not at its peak when writing this article).

The initial code for both was as follows:

package main

// serviceA
import (
    "encoding/json"
    "io"
    "log/slog"
    "net/http"
    "os"

    "github.com/go-chi/chi/v5"
    slogchi "github.com/samber/slog-chi"
)

func main() {
    logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
    r := chi.NewRouter()
    r.Use(slogchi.New(logger))
    r.Get("/", func(w http.ResponseWriter, r *http.Request) {
        type response struct {
            Message string `json:"message"`
        }
        resp, err := http.Get("http://localhost:3001")
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        defer resp.Body.Close() // close the body even if reading it fails
        body, err := io.ReadAll(resp.Body)
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        var data response
        err = json.Unmarshal(body, &data)
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        w.Header().Set("Content-Type", "application/json")
        w.Write([]byte(`{"messageA": "hello from service A","messageB": "` + data.Message + `"}`))
    })
    http.ListenAndServe(":3000", r)
}

package main

//serviceB
import (
    "net/http"

    "github.com/go-chi/chi/v5"
    "github.com/go-chi/chi/v5/middleware"
)

func main() {
    r := chi.NewRouter()
    r.Use(middleware.Logger)
    r.Get("/", func(w http.ResponseWriter, r *http.Request) {
        w.Header().Set("Content-Type", "application/json")
        w.Write([]byte(`{"message": "hello from service B"}`))
    })
    http.ListenAndServe(":3001", r)
}

As you can see in the code, if serviceB has a problem, serviceA is affected as well, because serviceA does not handle any communication failure. We will improve this using the failsafe-go library.

According to the documentation on the official website:

Failsafe-go is a library for building resilient, fault-tolerant Go applications. It works by wrapping functions with one or more resilience policies, which can be combined and composed as needed.

Let's start by applying some of the available policies and then test composing them.

Timeout

The first policy we will test is the simplest one: a timeout, which ensures that the connection is interrupted if serviceB takes too long to respond and that the client knows why.

The first step was to change serviceB to include a delay, which makes the scenario easier to demonstrate:

package main
//serviceB
import (
    "net/http"
    "time"

    "github.com/go-chi/chi/v5"
    "github.com/go-chi/chi/v5/middleware"
)

func main() {
    r := chi.NewRouter()
    r.Use(middleware.Logger)
    r.Get("/", func(w http.ResponseWriter, r *http.Request) {
        time.Sleep(5 * time.Second) //add a delay to simulate a slow service
        w.Header().Set("Content-Type", "application/json")
        w.Write([]byte(`{"message": "hello from service B"}`))
    })
    http.ListenAndServe(":3001", r)
}

After installing failsafe-go with the following commands:

❯ cd serviceA
❯ go get github.com/failsafe-go/failsafe-go

The code of serviceA/main.go is:

package main

import (
    "encoding/json"
    "io"
    "log/slog"
    "net/http"
    "os"
    "time"

    "github.com/failsafe-go/failsafe-go"
    "github.com/failsafe-go/failsafe-go/failsafehttp"
    "github.com/failsafe-go/failsafe-go/timeout"
    "github.com/go-chi/chi/v5"
    slogchi "github.com/samber/slog-chi"
)

func main() {
    logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
    r := chi.NewRouter()
    r.Use(slogchi.New(logger))
    r.Get("/", func(w http.ResponseWriter, r *http.Request) {
        type response struct {
            Message string `json:"message"`
        }
        // Create a Timeout for 1 second
        timeout := newTimeout(logger)

        // Use the Timeout with a failsafe RoundTripper
        roundTripper := failsafehttp.NewRoundTripper(nil, timeout)
        client := &http.Client{Transport: roundTripper}
        resp, err := client.Get("http://localhost:3001")
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        defer resp.Body.Close() // close the body even if reading it fails
        body, err := io.ReadAll(resp.Body)
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        var data response
        err = json.Unmarshal(body, &data)
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        w.Header().Set("Content-Type", "application/json")
        w.Write([]byte(`{"messageA": "hello from service A","messageB": "` + data.Message + `"}`))
    })
    http.ListenAndServe(":3000", r)
}

func newTimeout(logger *slog.Logger) timeout.Timeout[*http.Response] {
    return timeout.Builder[*http.Response](1 * time.Second).
        OnTimeoutExceeded(func(e failsafe.ExecutionDoneEvent[*http.Response]) {
            logger.Info("Connection timed out")
        }).Build()
}

To test it, I used curl to access serviceA:

❯ curl -v http://localhost:3000
* Host localhost:3000 was resolved.
* IPv6: ::1
* IPv4: 127.0.0.1
*   Trying [::1]:3000...
* Connected to localhost (::1) port 3000
> GET / HTTP/1.1
> Host: localhost:3000
> User-Agent: curl/8.7.1
> Accept: */*
>
* Request completely sent off
< HTTP/1.1 500 Internal Server Error
< Date: Fri, 23 Aug 2024 19:43:23 GMT
< Content-Length: 45
< Content-Type: text/plain; charset=utf-8
<
* Connection #0 to host localhost left intact
Get "http://localhost:3001": timeout exceeded⏎

And serviceA produces the following output:

go run main.go
{"time":"2024-08-20T08:37:36.852886-03:00","level":"INFO","msg":"Connection timed out"}
{"time":"2024-08-20T08:37:36.856079-03:00","level":"ERROR","msg":"500: Internal Server Error","request":{"time":"2024-08-20T08:37:35.851262-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:63409","referer":"","length":0},"response":{"time":"2024-08-20T08:37:36.856046-03:00","latency":1004819000,"status":500,"length":45},"id":""}

This shows that the client (curl, in this case) received a clear error after about one second instead of waiting for serviceB, and that serviceA was not significantly affected.
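For comparison, the idea behind a timeout policy can be sketched with the standard library alone. This is not how failsafe-go implements it: callWithTimeout, slowCall, errTimeoutExceeded and the constants are hypothetical names, and the durations are shortened so the sketch runs quickly.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Durations for the demo (assumed values, much shorter than in the article).
const (
	limit     = 100 * time.Millisecond
	fastDelay = time.Millisecond
	slowDelay = time.Second
)

// errTimeoutExceeded is a hypothetical sentinel error for this sketch.
var errTimeoutExceeded = errors.New("timeout exceeded")

// callWithTimeout runs fn and returns errTimeoutExceeded if fn has not
// finished within d. fn receives a context that is cancelled on timeout.
func callWithTimeout(d time.Duration, fn func(ctx context.Context) (string, error)) (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), d)
	defer cancel()

	type result struct {
		value string
		err   error
	}
	done := make(chan result, 1) // buffered so the goroutine never blocks
	go func() {
		v, err := fn(ctx)
		done <- result{v, err}
	}()

	select {
	case r := <-done:
		// fn may notice the deadline first; report it the same way.
		if errors.Is(r.err, context.DeadlineExceeded) {
			return "", errTimeoutExceeded
		}
		return r.value, r.err
	case <-ctx.Done():
		return "", errTimeoutExceeded
	}
}

// slowCall simulates a dependency that answers after delay, such as the
// delayed serviceB, and stops early when the context is cancelled.
func slowCall(delay time.Duration) func(ctx context.Context) (string, error) {
	return func(ctx context.Context) (string, error) {
		select {
		case <-time.After(delay):
			return "hello from service B", nil
		case <-ctx.Done():
			return "", ctx.Err()
		}
	}
}

func main() {
	_, err := callWithTimeout(limit, slowCall(slowDelay))
	fmt.Println("slow call:", err)

	msg, err := callWithTimeout(limit, slowCall(fastDelay))
	fmt.Println("fast call:", msg, err)
}
```

The caller gets a distinct error as soon as the limit is reached, which is the behavior the curl output shows for serviceA.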

Let's improve the resilience of our application by looking at another useful strategy: retry.

Retry

Once again, serviceB had to be changed, this time to return random errors:

package main

import (
    "math/rand"
    "net/http"
    "strconv"
    "time"

    "github.com/go-chi/chi/v5"
    "github.com/go-chi/chi/v5/middleware"
)

func main() {
    r := chi.NewRouter()
    r.Use(middleware.Logger)
    r.Get("/", func(w http.ResponseWriter, r *http.Request) {
        retryAfterDelay := 1 * time.Second
        if fail() {
            w.Header().Add("Retry-After", strconv.Itoa(int(retryAfterDelay.Seconds())))
            w.WriteHeader(http.StatusServiceUnavailable)
            return
        }
        w.Header().Set("Content-Type", "application/json")
        w.Write([]byte(`{"message": "hello from service B"}`))
    })
    http.ListenAndServe(":3001", r)
}

func fail() bool {
    if flipint := rand.Intn(2); flipint == 0 {
        return true
    }
    return false
}

To make things easier to follow, I am showing one strategy at a time, which is why serviceA was reverted to its original version rather than the version with the timeout. Later, we will look at how to compose several policies to make the application more resilient.

The code of serviceA/main.go looked like this:

package main

import (
    "encoding/json"
    "fmt"
    "io"
    "log/slog"
    "net/http"
    "os"
    "time"

    "github.com/failsafe-go/failsafe-go"
    "github.com/failsafe-go/failsafe-go/failsafehttp"
    "github.com/failsafe-go/failsafe-go/retrypolicy"
    "github.com/go-chi/chi/v5"
    slogchi "github.com/samber/slog-chi"
)

func main() {
    logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
    r := chi.NewRouter()
    r.Use(slogchi.New(logger))
    r.Get("/", func(w http.ResponseWriter, r *http.Request) {
        type response struct {
            Message string `json:"message"`
        }
        // Create a RetryPolicy that only handles 503 responses, with backoff delays between retries
        retryPolicy := newRetryPolicy(logger)

        // Use the RetryPolicy with a failsafe RoundTripper
        roundTripper := failsafehttp.NewRoundTripper(nil, retryPolicy)
        client := &http.Client{Transport: roundTripper}

        resp, err := client.Get("http://localhost:3001")
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        defer resp.Body.Close() // close the body even if reading it fails
        body, err := io.ReadAll(resp.Body)
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        var data response
        err = json.Unmarshal(body, &data)
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        w.Header().Set("Content-Type", "application/json")
        w.Write([]byte(`{"messageA": "hello from service A","messageB": "` + data.Message + `"}`))
    })
    http.ListenAndServe(":3000", r)
}

func newRetryPolicy(logger *slog.Logger) retrypolicy.RetryPolicy[*http.Response] {
    return retrypolicy.Builder[*http.Response]().
        HandleIf(func(response *http.Response, _ error) bool {
            return response != nil && response.StatusCode == http.StatusServiceUnavailable
        }).
        WithBackoff(time.Second, 10*time.Second).
        OnRetryScheduled(func(e failsafe.ExecutionScheduledEvent[*http.Response]) {
            logger.Info(fmt.Sprintf("Retry %d after delay of %d", e.Attempts(), e.Delay))
        }).Build()
}

This way, if serviceB returns StatusServiceUnavailable (code 503), the request is retried at increasing intervals, as configured by WithBackoff. The output of serviceA, when accessed via curl, should look like this:

go run main.go
{"time":"2024-08-20T08:43:38.297621-03:00","level":"INFO","msg":"200: OK","request":{"time":"2024-08-20T08:43:38.283715-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:63542","referer":"","length":0},"response":{"time":"2024-08-20T08:43:38.297556-03:00","latency":13840708,"status":200,"length":71},"id":""}
{"time":"2024-08-20T08:43:39.946562-03:00","level":"INFO","msg":"200: OK","request":{"time":"2024-08-20T08:43:39.943394-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:63544","referer":"","length":0},"response":{"time":"2024-08-20T08:43:39.946545-03:00","latency":3151000,"status":200,"length":71},"id":""}
{"time":"2024-08-20T08:43:40.845862-03:00","level":"INFO","msg":"Retry 1 after delay of 1000000000"}
{"time":"2024-08-20T08:43:41.85287-03:00","level":"INFO","msg":"Retry 2 after delay of 2000000000"}
{"time":"2024-08-20T08:43:43.860694-03:00","level":"INFO","msg":"200: OK","request":{"time":"2024-08-20T08:43:40.841468-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:63545","referer":"","length":0},"response":{"time":"2024-08-20T08:43:43.860651-03:00","latency":3019287458,"status":200,"length":71},"id":""}

In this example, we can see that errors occurred when accessing serviceB and that the library retried the request until it succeeded. If the request keeps failing, the client receives the error message Get "http://localhost:3001": retries exceeded.
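The delays in the log (1 second, then 2 seconds) are what exponential backoff produces. Here is a minimal sketch of that calculation, assuming the delay doubles on each retry and is capped at the maximum. backoffDelay is a hypothetical helper, not part of failsafe-go.

```go
package main

import (
	"fmt"
	"time"
)

// backoffDelay returns the wait before retry number attempt (1-based),
// doubling from initial and never exceeding max.
// Assumption: a growth factor of 2, which matches the delays in the log.
func backoffDelay(attempt int, initial, max time.Duration) time.Duration {
	delay := initial
	for i := 1; i < attempt && delay < max; i++ {
		delay *= 2
	}
	if delay > max {
		delay = max
	}
	return delay
}

func main() {
	// Same bounds as WithBackoff(time.Second, 10*time.Second) in the article.
	for attempt := 1; attempt <= 5; attempt++ {
		fmt.Printf("retry %d after %s\n", attempt, backoffDelay(attempt, time.Second, 10*time.Second))
	}
}
```

With these bounds the delays grow as 1s, 2s, 4s, 8s and then stay at the 10s cap, so a struggling service is given progressively more time between attempts.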

Let's go deeper into resilience by adding a circuit breaker to our project.

Circuit breaker

A circuit breaker is a more advanced policy that gives finer control over access to a service. The circuit breaker pattern works in three states: closed (no errors), open (errors occurred, so requests are blocked), and half-open (a limited number of requests is sent to the struggling service to test whether it has recovered).
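The three states can be sketched as a very small state machine. This is only an illustration of the pattern under simplifying assumptions (a single failure opens the circuit and a single success closes it); the breaker type and its methods are hypothetical and much simpler than failsafe-go's circuit breaker.

```go
package main

import (
	"fmt"
	"time"
)

type state string

const (
	closed   state = "closed"
	open     state = "open"
	halfOpen state = "half-open"
)

// breaker is a deliberately small circuit breaker: one failure opens it,
// and once delay has elapsed it lets a trial request through.
type breaker struct {
	state    state
	openedAt time.Time
	delay    time.Duration
}

// allow reports whether a request may be sent at time now.
func (b *breaker) allow(now time.Time) bool {
	if b.state == open && now.Sub(b.openedAt) >= b.delay {
		b.state = halfOpen // delay elapsed: permit a trial request
	}
	return b.state != open
}

// record updates the state with the outcome of a request.
func (b *breaker) record(success bool, now time.Time) {
	if success {
		b.state = closed
		return
	}
	b.state = open
	b.openedAt = now
}

// simulate walks through one failure followed by a recovery and returns
// the state observed after each step.
func simulate() []state {
	now := time.Now()
	b := &breaker{state: closed, delay: time.Second}
	var seen []state

	b.record(false, now) // the service fails: closed -> open
	seen = append(seen, b.state)

	b.allow(now.Add(500 * time.Millisecond)) // too early: still open
	seen = append(seen, b.state)

	now = now.Add(time.Second)
	b.allow(now) // delay elapsed: open -> half-open
	seen = append(seen, b.state)

	b.record(true, now) // trial request succeeds: half-open -> closed
	seen = append(seen, b.state)
	return seen
}

func main() {
	fmt.Println(simulate())
}
```

While the circuit is open, allow returns false and no request reaches the failing service, which is what gives it room to recover.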

To use this strategy, I created a new version of serviceB that produces more error and delay scenarios:

package main

import (
    "math/rand"
    "net/http"
    "strconv"
    "time"

    "github.com/go-chi/chi/v5"
    "github.com/go-chi/chi/v5/middleware"
)

func main() {
    r := chi.NewRouter()
    r.Use(middleware.Logger)
    r.Get("/", func(w http.ResponseWriter, r *http.Request) {
        retryAfterDelay := 1 * time.Second
        if fail() {
            w.Header().Add("Retry-After", strconv.Itoa(int(retryAfterDelay.Seconds())))
            w.WriteHeader(http.StatusServiceUnavailable)
            return
        }
        if sleep() {
            time.Sleep(1 * time.Second)
        }
        w.Header().Set("Content-Type", "application/json")
        w.Write([]byte(`{"message": "hello from service B"}`))
    })
    http.ListenAndServe(":3001", r)
}

func fail() bool {
    if flipint := rand.Intn(2); flipint == 0 {
        return true
    }
    return false
}

func sleep() bool {
    if flipint := rand.Intn(2); flipint == 0 {
        return true
    }
    return false
}

And the code of serviceA:

package main

import (
    "encoding/json"
    "fmt"
    "io"
    "log/slog"
    "net/http"
    "os"
    "time"

    "github.com/failsafe-go/failsafe-go/circuitbreaker"
    "github.com/failsafe-go/failsafe-go/failsafehttp"
    "github.com/go-chi/chi/v5"
    slogchi "github.com/samber/slog-chi"
)

func main() {
    logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
    r := chi.NewRouter()
    r.Use(slogchi.New(logger))
    r.Get("/", func(w http.ResponseWriter, r *http.Request) {
        type response struct {
            Message string `json:"message"`
        }
        // Create a CircuitBreaker that handles 503 responses and uses a half-open delay based on the Retry-After header
        circuitBreaker := newCircuitBreaker(logger)

        // Use the CircuitBreaker with a failsafe RoundTripper
        roundTripper := failsafehttp.NewRoundTripper(nil, circuitBreaker)
        client := &http.Client{Transport: roundTripper}

        sendGet := func() (*http.Response, error) {
            resp, err := client.Get("http://localhost:3001")
            return resp, err
        }
        maxRetries := 3
        resp, err := sendGet()
        for i := 0; i < maxRetries; i++ {
            if err == nil && resp != nil && resp.StatusCode != http.StatusServiceUnavailable && resp.StatusCode != http.StatusTooManyRequests {
                break
            }
            time.Sleep(circuitBreaker.RemainingDelay()) // Wait for circuit breaker's delay, provided by the Retry-After header
            resp, err = sendGet()
        }
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        defer resp.Body.Close() // close the body even if reading it fails
        body, err := io.ReadAll(resp.Body)
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        var data response
        err = json.Unmarshal(body, &data)
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        w.Header().Set("Content-Type", "application/json")
        w.Write([]byte(`{"messageA": "hello from service A","messageB": "` + data.Message + `"}`))
    })
    http.ListenAndServe(":3000", r)
}

func newCircuitBreaker(logger *slog.Logger) circuitbreaker.CircuitBreaker[*http.Response] {
    return circuitbreaker.Builder[*http.Response]().
        HandleIf(func(response *http.Response, err error) bool {
            return response != nil && response.StatusCode == http.StatusServiceUnavailable
        }).
        WithDelayFunc(failsafehttp.DelayFunc).
        OnStateChanged(func(event circuitbreaker.StateChangedEvent) {
            logger.Info(fmt.Sprintf("circuit breaker state changed from %s to %s", event.OldState.String(), event.NewState.String()))
        }).
        Build()
}

As we can see in the serviceA output, the circuit breaker is working:

❯ go run main.go
{"time":"2024-08-20T08:51:37.770611-03:00","level":"INFO","msg":"circuit breaker state changed from closed to open"}
{"time":"2024-08-20T08:51:38.771682-03:00","level":"INFO","msg":"circuit breaker state changed from open to half-open"}
{"time":"2024-08-20T08:51:38.776743-03:00","level":"INFO","msg":"circuit breaker state changed from half-open to open"}
{"time":"2024-08-20T08:51:39.777821-03:00","level":"INFO","msg":"circuit breaker state changed from open to half-open"}
{"time":"2024-08-20T08:51:39.784897-03:00","level":"INFO","msg":"circuit breaker state changed from half-open to open"}
{"time":"2024-08-20T08:51:40.786209-03:00","level":"INFO","msg":"circuit breaker state changed from open to half-open"}
{"time":"2024-08-20T08:51:40.792457-03:00","level":"INFO","msg":"circuit breaker state changed from half-open to closed"}
{"time":"2024-08-20T08:51:40.792733-03:00","level":"INFO","msg":"200: OK","request":{"time":"2024-08-20T08:51:37.756947-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:63699","referer":"","length":0},"response":{"time":"2024-08-20T08:51:40.792709-03:00","latency":3036065875,"status":200,"length":71},"id":""}

This policy gives better control over errors and gives serviceB room to recover when it runs into a problem.

But what can we do when serviceB cannot come back, for whatever reason? In those cases, we can use a fallback.

Fallback

The idea of this policy is to have an alternative when the desired service has a more serious problem and takes a long time to come back. To do this, we will change the serviceA code:

package main

import (
    "bytes"
    "encoding/json"
    "io"
    "log/slog"
    "net/http"
    "os"

    "github.com/failsafe-go/failsafe-go"
    "github.com/failsafe-go/failsafe-go/failsafehttp"
    "github.com/failsafe-go/failsafe-go/fallback"
    "github.com/go-chi/chi/v5"
    slogchi "github.com/samber/slog-chi"
)

func main() {
    logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
    r := chi.NewRouter()
    r.Use(slogchi.New(logger))
    r.Get("/", func(w http.ResponseWriter, r *http.Request) {
        fallback := newFallback(logger)

        roundTripper := failsafehttp.NewRoundTripper(nil, fallback)
        client := &http.Client{Transport: roundTripper}

        resp, err := client.Get("http://localhost:3001")
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        defer resp.Body.Close() // close the body even if reading it fails
        body, err := io.ReadAll(resp.Body)
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        type response struct {
            Message string `json:"message"`
        }
        var data response
        err = json.Unmarshal(body, &data)
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        w.Header().Set("Content-Type", "application/json")
        w.Write([]byte(`{"messageA": "hello from service A","messageB": "` + data.Message + `"}`))
    })
    http.ListenAndServe(":3000", r)
}

func newFallback(logger *slog.Logger) fallback.Fallback[*http.Response] {
    resp := &http.Response{
        StatusCode: http.StatusOK,
        Header:     map[string][]string{"Content-Type": {"application/json"}},
        Body:       io.NopCloser(bytes.NewBufferString(`{"message": "error accessing service B"}`)),
    }
    return fallback.BuilderWithResult[*http.Response](resp).
        HandleIf(func(response *http.Response, err error) bool {
            return response != nil && response.StatusCode == http.StatusServiceUnavailable
        }).
        OnFallbackExecuted(func(e failsafe.ExecutionDoneEvent[*http.Response]) {
            logger.Info("Fallback executed result")
        }).
        Build()
}

In the newFallback function, we can see the creation of an http.Response that the library uses when serviceB does not respond successfully (here, when it returns a 503).

This feature lets us keep answering the client while the team responsible for serviceB works on bringing the service back up.
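Stripped of the HTTP details, a fallback comes down to substituting a prepared value when the call fails. The sketch below illustrates that idea only; withFallback and errUnavailable are hypothetical names and not part of failsafe-go.

```go
package main

import (
	"errors"
	"fmt"
)

// errUnavailable is a hypothetical error standing in for a failing serviceB.
var errUnavailable = errors.New("service unavailable")

// withFallback calls fn and returns fallback instead of the error when fn fails.
func withFallback(fn func() (string, error), fallback string) string {
	msg, err := fn()
	if err != nil {
		return fallback
	}
	return msg
}

func main() {
	healthy := func() (string, error) { return "hello from service B", nil }
	failing := func() (string, error) { return "", errUnavailable }

	fmt.Println(withFallback(healthy, "error accessing service B"))
	fmt.Println(withFallback(failing, "error accessing service B"))
}
```

The caller always gets a usable value, so the decision about what a degraded answer looks like is made once, in one place.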

The output of serviceA looks like this:

❯ go run main.go
{"time":"2024-08-20T08:55:27.326475-03:00","level":"INFO","msg":"200: OK","request":{"time":"2024-08-20T08:55:27.31306-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:63772","referer":"","length":0},"response":{"time":"2024-08-20T08:55:27.326402-03:00","latency":13343208,"status":200,"length":71},"id":""}
{"time":"2024-08-20T08:55:31.756765-03:00","level":"INFO","msg":"200: OK","request":{"time":"2024-08-20T08:55:31.754348-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:63774","referer":"","length":0},"response":{"time":"2024-08-20T08:55:31.756753-03:00","latency":2404750,"status":200,"length":71},"id":""}
{"time":"2024-08-20T08:55:34.091845-03:00","level":"INFO","msg":"200: OK","request":{"time":"2024-08-20T08:55:33.086273-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:63775","referer":"","length":0},"response":{"time":"2024-08-20T08:55:34.091812-03:00","latency":1005580625,"status":200,"length":71},"id":""}
{"time":"2024-08-20T08:55:37.386512-03:00","level":"INFO","msg":"Fallback executed result"}
{"time":"2024-08-20T08:55:37.386553-03:00","level":"INFO","msg":"200: OK","request":{"time":"2024-08-20T08:55:37.38415-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:63777","referer":"","length":0},"response":{"time":"2024-08-20T08:55:37.386544-03:00","latency":2393916,"status":200,"length":76},"id":""}

In the next step, we will combine the concepts we've seen to create a more resilient application.

Policy composition

To do this, we need to change the code of serviceA so that it makes use of the policies we have seen so far:

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "io"
    "log/slog"
    "net/http"
    "os"
    "time"

    "github.com/failsafe-go/failsafe-go"
    "github.com/failsafe-go/failsafe-go/circuitbreaker"
    "github.com/failsafe-go/failsafe-go/failsafehttp"
    "github.com/failsafe-go/failsafe-go/fallback"
    "github.com/failsafe-go/failsafe-go/retrypolicy"
    "github.com/failsafe-go/failsafe-go/timeout"
    "github.com/go-chi/chi/v5"
    slogchi "github.com/samber/slog-chi"
)

func main() {
    logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
    r := chi.NewRouter()
    r.Use(slogchi.New(logger))
    r.Get("/", func(w http.ResponseWriter, r *http.Request) {
        type response struct {
            Message string `json:"message"`
        }
        retryPolicy := newRetryPolicy(logger)
        fallback := newFallback(logger)
        circuitBreaker := newCircuitBreaker(logger)
        timeout := newTimeout(logger)

        roundTripper := failsafehttp.NewRoundTripper(nil, fallback, retryPolicy, circuitBreaker, timeout)
        client := &http.Client{Transport: roundTripper}

        sendGet := func() (*http.Response, error) {
            resp, err := client.Get("http://localhost:3001")
            return resp, err
        }
        maxRetries := 3
        resp, err := sendGet()
        for i := 0; i < maxRetries; i++ {
            if err == nil && resp != nil && resp.StatusCode != http.StatusServiceUnavailable && resp.StatusCode != http.StatusTooManyRequests {
                break
            }
            time.Sleep(circuitBreaker.RemainingDelay()) // Wait for circuit breaker's delay, provided by the Retry-After header
            resp, err = sendGet()
        }
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        defer resp.Body.Close() // close the body even if reading it fails
        body, err := io.ReadAll(resp.Body)
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        var data response
        err = json.Unmarshal(body, &data)
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        w.Header().Set("Content-Type", "application/json")
        w.Write([]byte(`{"messageA": "hello from service A","messageB": "` + data.Message + `"}`))
    })
    http.ListenAndServe(":3000", r)
}

func newTimeout(logger *slog.Logger) timeout.Timeout[*http.Response] {
    return timeout.Builder[*http.Response](10 * time.Second).
        OnTimeoutExceeded(func(e failsafe.ExecutionDoneEvent[*http.Response]) {
            logger.Info("Connection timed out")
        }).Build()
}

func newFallback(logger *slog.Logger) fallback.Fallback[*http.Response] {
    resp := &http.Response{
        StatusCode: http.StatusOK,
        Header:     map[string][]string{"Content-Type": {"application/json"}},
        Body:       io.NopCloser(bytes.NewBufferString(`{"message": "error accessing service B"}`)),
    }
    return fallback.BuilderWithResult[*http.Response](resp).
        HandleIf(func(response *http.Response, err error) bool {
            return response != nil && response.StatusCode == http.StatusServiceUnavailable
        }).
        OnFallbackExecuted(func(e failsafe.ExecutionDoneEvent[*http.Response]) {
            logger.Info("Fallback executed result")
        }).
        Build()
}

func newRetryPolicy(logger *slog.Logger) retrypolicy.RetryPolicy[*http.Response] {
    return retrypolicy.Builder[*http.Response]().
        HandleIf(func(response *http.Response, _ error) bool {
            return response != nil && response.StatusCode == http.StatusServiceUnavailable
        }).
        WithBackoff(time.Second, 10*time.Second).
        OnRetryScheduled(func(e failsafe.ExecutionScheduledEvent[*http.Response]) {
            logger.Info(fmt.Sprintf("Retry %d after delay of %d", e.Attempts(), e.Delay))
        }).Build()
}

func newCircuitBreaker(logger *slog.Logger) circuitbreaker.CircuitBreaker[*http.Response] {
    return circuitbreaker.Builder[*http.Response]().
        HandleIf(func(response *http.Response, err error) bool {
            return response != nil && response.StatusCode == http.StatusServiceUnavailable
        }).
        WithDelayFunc(failsafehttp.DelayFunc).
        OnStateChanged(func(event circuitbreaker.StateChangedEvent) {
            logger.Info(fmt.Sprintf("circuit breaker state changed from %s to %s", event.OldState.String(), event.NewState.String()))
        }).
        Build()
}



In the code:

roundTripper := failsafehttp.NewRoundTripper(nil, fallback, retryPolicy, circuitBreaker, timeout)

Here all the defined policies are used together. The library composes them so that the first policy is the outermost one, which means a result is handled starting from the rightmost policy, that is:

timeout -> circuitBreaker -> retryPolicy -> fallback
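This ordering follows from how wrappers nest. The sketch below models each policy as a function that wraps the next one and records when it sees the result. The call and policy types, named, compose and handlingOrder are hypothetical and only illustrate the ordering, not the library's internals.

```go
package main

import "fmt"

// call is the operation being protected; policy wraps a call with extra behavior.
type call func() string
type policy func(next call) call

// named returns a policy that only records its name once it sees the result.
func named(name string, trace *[]string) policy {
	return func(next call) call {
		return func() string {
			result := next() // inner policies and the call itself run first
			*trace = append(*trace, name)
			return result
		}
	}
}

// compose wraps fn so that the first policy is outermost and the last is innermost.
func compose(fn call, policies ...policy) call {
	for i := len(policies) - 1; i >= 0; i-- {
		fn = policies[i](fn)
	}
	return fn
}

// handlingOrder composes four named policies in the same order as the
// article's NewRoundTripper call and returns the order in which they
// handled the result.
func handlingOrder() []string {
	var trace []string
	fn := compose(
		func() string { return "hello from service B" },
		named("fallback", &trace),
		named("retryPolicy", &trace),
		named("circuitBreaker", &trace),
		named("timeout", &trace),
	)
	fn()
	return trace
}

func main() {
	fmt.Println(handlingOrder())
}
```

Because the fallback is outermost, it sees the result last, after the timeout, circuit breaker and retry policy have each had their turn.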

We can see the execution of the policies by observing the serviceA output:

go run main.go
{"time":"2024-08-19T10:15:29.226553-03:00","level":"INFO","msg":"circuit breaker state changed from closed to open"}
{"time":"2024-08-19T10:15:29.226841-03:00","level":"INFO","msg":"Retry 1 after delay of 1000000000"}
{"time":"2024-08-19T10:15:30.227941-03:00","level":"INFO","msg":"circuit breaker state changed from open to half-open"}
{"time":"2024-08-19T10:15:30.234182-03:00","level":"INFO","msg":"circuit breaker state changed from half-open to open"}
{"time":"2024-08-19T10:15:30.234258-03:00","level":"INFO","msg":"Retry 2 after delay of 2000000000"}
{"time":"2024-08-19T10:15:32.235282-03:00","level":"INFO","msg":"circuit breaker state changed from open to half-open"}
{"time":"2024-08-19T10:15:42.23622-03:00","level":"INFO","msg":"Connection timed out"}
{"time":"2024-08-19T10:15:42.237942-03:00","level":"INFO","msg":"circuit breaker state changed from half-open to closed"}
{"time":"2024-08-19T10:15:42.238043-03:00","level":"ERROR","msg":"500: Internal Server Error","request":{"time":"2024-08-19T10:15:29.215709-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:52527","referer":"","length":0},"response":{"time":"2024-08-19T10:15:42.238008-03:00","latency":13022704750,"status":500,"length":45},"id":""}
{"time":"2024-08-19T10:15:56.53476-03:00","level":"INFO","msg":"circuit breaker state changed from closed to open"}
{"time":"2024-08-19T10:15:56.534803-03:00","level":"INFO","msg":"Retry 1 after delay of 1000000000"}
{"time":"2024-08-19T10:15:57.535108-03:00","level":"INFO","msg":"circuit breaker state changed from open to half-open"}
{"time":"2024-08-19T10:15:57.53889-03:00","level":"INFO","msg":"circuit breaker state changed from half-open to open"}
{"time":"2024-08-19T10:15:57.538911-03:00","level":"INFO","msg":"Retry 2 after delay of 2000000000"}
{"time":"2024-08-19T10:15:59.539948-03:00","level":"INFO","msg":"circuit breaker state changed from open to half-open"}
{"time":"2024-08-19T10:15:59.544425-03:00","level":"INFO","msg":"circuit breaker state changed from half-open to open"}
{"time":"2024-08-19T10:15:59.544575-03:00","level":"ERROR","msg":"500: Internal Server Error","request":{"time":"2024-08-19T10:15:56.5263-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:52542","referer":"","length":0},"response":{"time":"2024-08-19T10:15:59.544557-03:00","latency":3018352000,"status":500,"length":245},"id":""}
{"time":"2024-08-19T10:16:11.044207-03:00","level":"INFO","msg":"Connection timed out"}
{"time":"2024-08-19T10:16:11.046026-03:00","level":"ERROR","msg":"500: Internal Server Error","request":{"time":"2024-08-19T10:16:01.043317-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:52544","referer":"","length":0},"response":{"time":"2024-08-19T10:16:11.045601-03:00","latency":10002596334,"status":500,"length":45},"id":""}

Conclusion

One of the advantages of microservices architecture is that we can break a complex domain into smaller, specialized services that communicate with each other to complete the necessary logic. Ensuring that this communication is resilient and will continue to work even in the face of failures and unforeseen events is fundamental. Using libraries such as failsafe-go makes this process easier.

You can find the code presented in this post on my GitHub.

Originally published at https://eltonminetto.dev on August 24, 2024


source:dev.to