Die Einführung einer ereignisgesteuerten Architektur (EDA) zur Erhöhung der Skalierbarkeit und zur Reduzierung der Kopplung zwischen Komponenten/Diensten ist in komplexen Umgebungen relativ häufig.
Während dieser Ansatz eine Reihe von Problemen löst, besteht eine der Herausforderungen für Teams darin, Ereignisse zu standardisieren, um die Kompatibilität zwischen allen Komponenten sicherzustellen. Um diese Herausforderung zu mildern, können wir das CloudEvents-Projekt nutzen.
Ziel des Projekts ist es, eine Spezifikation zur Standardisierung und Beschreibung von Ereignissen zu erstellen, die Konsistenz, Zugänglichkeit und Portabilität gewährleistet. Ein weiterer Vorteil besteht darin, dass das Projekt nicht nur eine Spezifikation ist, sondern auch eine Reihe von SDKs bereitstellt, um die Teamakzeptanz zu beschleunigen.
In diesem Beitrag möchte ich die Verwendung des Go SDK (mit einer besonderen Darstellung des Python SDK) in einem fiktiven Projekt demonstrieren.
Betrachten wir eine Umgebung, die aus zwei Mikrodiensten besteht: einem Benutzer, der Benutzer verwaltet (CRUD), und einem Prüfdienst, der wichtige Ereignisse in der Umgebung für zukünftige Analysen speichert.
Der Dienstcode des Benutzerdienstes lautet wie folgt:
package main import ( "context" "encoding/json" "log" "net/http" "time" cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/cloudevents/sdk-go/v2/protocol" "github.com/go-chi/chi/v5" "github.com/go-chi/httplog" "github.com/google/uuid" ) const auditService = "http://localhost:8080/" func main() { logger := httplog.NewLogger("user", httplog.Options{ JSON: true, }) ctx := context.Background() ceClient, err := cloudevents.NewClientHTTP() if err != nil { log.Fatalf("failed to create client, %v", err) } r := chi.NewRouter() r.Use(httplog.RequestLogger(logger)) r.Post("/v1/user", storeUser(ctx, ceClient)) http.Handle("/", r) srv := &http.Server{ ReadTimeout: 30 * time.Second, WriteTimeout: 30 * time.Second, Addr: ":3000", Handler: http.DefaultServeMux, } err = srv.ListenAndServe() if err != nil { logger.Panic().Msg(err.Error()) } } type userRequest struct { ID uuid.UUID Name string `json:"name"` Password string `json:"password"` } func storeUser(ctx context.Context, ceClient cloudevents.Client) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { oplog := httplog.LogEntry(r.Context()) var ur userRequest err := json.NewDecoder(r.Body).Decode(&ur) if err != nil { w.WriteHeader(http.StatusBadRequest) oplog.Error().Msg(err.Error()) return } ur.ID = uuid.New() //TODO: store user in a database // Create an Event. event := cloudevents.NewEvent() event.SetSource("github.com/eminetto/post-cloudevents") event.SetType("user.storeUser") event.SetData(cloudevents.ApplicationJSON, map[string]string{"id": ur.ID.String()}) // Set a target. ctx := cloudevents.ContextWithTarget(context.Background(), auditService) // Send that Event. var result protocol.Result if result = ceClient.Send(ctx, event); cloudevents.IsUndelivered(result) { oplog.Error().Msgf("failed to send, %v", result) w.WriteHeader(http.StatusInternalServerError) return } return } }
Im Code können Sie die Erstellung eines Ereignisses und dessen Übermittlung an den Prüfungsdienst sehen, was wie folgt aussieht:
package main import ( "context" "fmt" "log" cloudevents "github.com/cloudevents/sdk-go/v2" ) func receive(event cloudevents.Event) { // do something with event. fmt.Printf("%s", event) } func main() { // The default client is HTTP. c, err := cloudevents.NewClientHTTP() if err != nil { log.Fatalf("failed to create client, %v", err) } if err = c.StartReceiver(context.Background(), receive); err != nil { log.Fatalf("failed to start receiver: %v", err) } }
Wenn Sie beide Dienste ausführen, können Sie sehen, wie sie funktionieren, indem Sie eine Anfrage an den Benutzer senden:
curl -X "POST" "http://localhost:3000/v1/user" \ -H 'Accept: application/json' \ -H 'Content-Type: application/json' \ -d $'{ "name": "Ozzy Osbourne", "password": "12345" }'
Die Benutzerausgabe lautet:
{"level":"info","service":"user","httpRequest":{"proto":"HTTP/1.1","remoteIP":"[::1]:50894","requestID":"Macbook-Air-de-Elton.local/3YUAnzEbis-000001","requestMethod":"POST","requestPath":"/v1/user","requestURL":"http://localhost:3000/v1/user"},"httpRequest":{"header":{"accept":"application/json","content-length":"52","content-type":"application/json","user-agent":"curl/8.7.1"},"proto":"HTTP/1.1","remoteIP":"[::1]:50894","requestID":"Macbook-Air-de-Elton.local/3YUAnzEbis-000001","requestMethod":"POST","requestPath":"/v1/user","requestURL":"http://localhost:3000/v1/user","scheme":"http"},"timestamp":"2024-11-28T15:52:27.947355-03:00","message":"Request: POST /v1/user"} {"level":"warn","service":"user","httpRequest":{"proto":"HTTP/1.1","remoteIP":"[::1]:50894","requestID":"Macbook-Air-de-Elton.local/3YUAnzEbis-000001","requestMethod":"POST","requestPath":"/v1/user","requestURL":"http://localhost:3000/v1/user"},"httpResponse":{"bytes":0,"elapsed":2.33225,"status":0},"timestamp":"2024-11-28T15:52:27.949877-03:00","message":"Response: 0 Unknown"}
Die Ausgabe des Prüfdienstes zeigt den Eingang des Ereignisses.
❯ go run main.go Context Attributes, specversion: 1.0 type: user.storeUser source: github.com/eminetto/post-cloudevents id: 5190bc29-a3d5-4fca-9a88-85fccffc16b6 time: 2024-11-28T18:53:17.474154Z datacontenttype: application/json Data, { "id": "8aadf8c5-9c4e-4c11-af24-beac2fb9a4b7" }
Um das Portabilitätsziel zu validieren, habe ich das Python SDK verwendet, um eine Version des Prüfdienstes zu implementieren:
from flask import Flask, request from cloudevents.http import from_http app = Flask(__name__) # create an endpoint at http://localhost:/3000/ @app.route("/", methods=["POST"]) def home(): # create a CloudEvent event = from_http(request.headers, request.get_data()) # you can access cloudevent fields as seen below print( f"Found {event['id']} from {event['source']} with type " f"{event['type']} and specversion {event['specversion']}" ) return "", 204 if __name__ == "__main__": app.run(port=8080)
Die Anwendungsausgabe zeigt den Empfang des Ereignisses, ohne dass Änderungen am Dienstbenutzer erforderlich sind:
(.venv) eminetto@Macbook-Air-de-Elton audit-python % python3 main.py * Serving Flask app 'main' * Debug mode: off WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead. * Running on http://127.0.0.1:8080 Press CTRL+C to quit Found ce1abe22-dce5-40f0-8c82-12093b707ed7 from github.com/eminetto/post-cloudevents with type user.storeUser and specversion 1.0 127.0.0.1 - - [28/Nov/2024 15:59:31] "POST / HTTP/1.1" 204 -
Das vorherige Beispiel stellt die CloudEvents SDKs vor, verstößt jedoch gegen ein Prinzip ereignisbasierter Architekturen: die Kopplung lockern. Der Anwendungsbenutzer ist sich der Überwachungsanwendung bewusst und an sie gebunden, was keine gute Praxis ist. Wir können diese Situation verbessern, indem wir andere CloudEvents-Funktionen wie Pub/Sub verwenden oder etwas wie Kafka hinzufügen. Im folgenden Beispiel wird Kafka verwendet, um die beiden Anwendungen zu entkoppeln.
Der erste Schritt bestand darin, eine docker-compose.yaml zur Verwendung von Kafka zu erstellen:
services: kafka: image: bitnami/kafka:latest restart: on-failure ports: - 9092:9092 environment: - KAFKA_CFG_BROKER_ID=1 - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092 - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 - KAFKA_CFG_NUM_PARTITIONS=3 - ALLOW_PLAINTEXT_LISTENER=yes depends_on: - zookeeper zookeeper: image: bitnami/zookeeper:latest ports: - 2181:2181 environment: - ALLOW_ANONYMOUS_LOGIN=yes
Die folgende Änderung gab es beim Dienstbenutzer:
package main import ( "context" "encoding/json" "log" "net/http" "time" "github.com/IBM/sarama" "github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2" cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/go-chi/chi/v5" "github.com/go-chi/httplog" "github.com/google/uuid" ) const ( auditService = "127.0.0.1:9092" auditTopic = "audit" ) func main() { logger := httplog.NewLogger("user", httplog.Options{ JSON: true, }) ctx := context.Background() saramaConfig := sarama.NewConfig() saramaConfig.Version = sarama.V2_0_0_0 sender, err := kafka_sarama.NewSender([]string{auditService}, saramaConfig, auditTopic) if err != nil { log.Fatalf("failed to create protocol: %s", err.Error()) } defer sender.Close(context.Background()) ceClient, err := cloudevents.NewClient(sender, cloudevents.WithTimeNow(), cloudevents.WithUUIDs()) if err != nil { log.Fatalf("failed to create client, %v", err) } r := chi.NewRouter() r.Use(httplog.RequestLogger(logger)) r.Post("/v1/user", storeUser(ctx, ceClient)) http.Handle("/", r) srv := &http.Server{ ReadTimeout: 30 * time.Second, WriteTimeout: 30 * time.Second, Addr: ":3000", Handler: http.DefaultServeMux, } err = srv.ListenAndServe() if err != nil { logger.Panic().Msg(err.Error()) } } type userRequest struct { ID uuid.UUID Name string `json:"name"` Password string `json:"password"` } func storeUser(ctx context.Context, ceClient cloudevents.Client) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { oplog := httplog.LogEntry(r.Context()) var ur userRequest err := json.NewDecoder(r.Body).Decode(&ur) if err != nil { w.WriteHeader(http.StatusBadRequest) oplog.Error().Msg(err.Error()) return } ur.ID = uuid.New() //TODO: store user in a database // Create an Event. event := cloudevents.NewEvent() event.SetID(uuid.New().String()) event.SetSource("github.com/eminetto/post-cloudevents") event.SetType("user.storeUser") event.SetData(cloudevents.ApplicationJSON, map[string]string{"id": ur.ID.String()}) // Send that Event. if result := ceClient.Send( // Set the producer message key kafka_sarama.WithMessageKey(context.Background(), sarama.StringEncoder(event.ID())), event, ); cloudevents.IsUndelivered(result) { oplog.Error().Msgf("failed to send, %v", result) w.WriteHeader(http.StatusInternalServerError) return } return } }
Einige Änderungen waren nötig, hauptsächlich um eine Verbindung zu Kafka herzustellen, aber das Ereignis selbst änderte sich nicht.
Ich habe eine ähnliche Änderung am Prüfungsdienst vorgenommen:
package main import ( "context" "fmt" "log" "github.com/IBM/sarama" "github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2" cloudevents "github.com/cloudevents/sdk-go/v2" ) const ( auditService = "127.0.0.1:9092" auditTopic = "audit" auditGroupID = "audit-group-id" ) func receive(event cloudevents.Event) { // do something with event. fmt.Printf("%s", event) } func main() { saramaConfig := sarama.NewConfig() saramaConfig.Version = sarama.V2_0_0_0 receiver, err := kafka_sarama.NewConsumer([]string{auditService}, saramaConfig, auditGroupID, auditTopic) if err != nil { log.Fatalf("failed to create protocol: %s", err.Error()) } defer receiver.Close(context.Background()) c, err := cloudevents.NewClient(receiver) if err != nil { log.Fatalf("failed to create client, %v", err) } if err = c.StartReceiver(context.Background(), receive); err != nil { log.Fatalf("failed to start receiver: %v", err) } }
Die Ausgabe der Anwendungen bleibt gleich.
Durch die Einbeziehung von Kafka haben wir die Anwendungen entkoppelt, wodurch die Prinzipien von EDA nicht mehr verletzt werden und gleichzeitig die Vorteile von CloudEvents erhalten bleiben.
Ziel dieses Beitrags war es, den Standard vorzustellen und die einfache Implementierung mithilfe der SDKs zu demonstrieren. Ich könnte das Thema ausführlicher behandeln, hoffe aber, dass ich das Ziel erreicht und die Forschung und Nutzung der Technologie inspiriert habe.
Es wäre sehr nützlich, wenn Sie CloudEvents bereits nutzen/genutzt haben und Ihre Erfahrungen in den Kommentaren teilen möchten.
Die Codes, die ich in diesem Beitrag vorgestellt habe, finden Sie im Repository auf GitHub.
Ursprünglich veröffentlicht unter https://eltonminetto.dev am 29. November 2024.
Das obige ist der detaillierte Inhalt vonVerwenden von CloudEvents in Go. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!