확장성을 높이고 구성요소/서비스 간의 결합을 줄이기 위해 이벤트 중심 아키텍처(EDA)를 채택하는 것은 복잡한 환경에서 비교적 일반적입니다.
이 접근 방식은 여러 문제를 해결하지만 팀이 직면한 과제 중 하나는 이벤트를 표준화하여 모든 구성 요소 간의 호환성을 보장하는 것입니다. 이 문제를 완화하기 위해 CloudEvents 프로젝트를 사용할 수 있습니다.
이 프로젝트는 이벤트를 표준화 및 설명하고 일관성, 접근성 및 이식성을 제공하기 위한 사양을 목표로 합니다. 또 다른 장점은 프로젝트가 사양일 뿐만 아니라 팀 채택을 가속화하기 위해 일련의 SDK를 제공한다는 것입니다.
이 게시물에서는 가상 프로젝트에서 Go SDK(Python SDK의 특별한 모습 포함)를 사용하는 방법을 시연하고 싶습니다.
두 개의 마이크로서비스, 즉 사용자를 관리하는 사용자(CRUD)와 향후 분석을 위해 중요한 이벤트를 환경에 저장하는 감사 서비스로 구성된 환경을 생각해 보겠습니다.
사용자 서비스의 서비스 코드는 다음과 같습니다.
package main import ( "context" "encoding/json" "log" "net/http" "time" cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/cloudevents/sdk-go/v2/protocol" "github.com/go-chi/chi/v5" "github.com/go-chi/httplog" "github.com/google/uuid" ) const auditService = "http://localhost:8080/" func main() { logger := httplog.NewLogger("user", httplog.Options{ JSON: true, }) ctx := context.Background() ceClient, err := cloudevents.NewClientHTTP() if err != nil { log.Fatalf("failed to create client, %v", err) } r := chi.NewRouter() r.Use(httplog.RequestLogger(logger)) r.Post("/v1/user", storeUser(ctx, ceClient)) http.Handle("/", r) srv := &http.Server{ ReadTimeout: 30 * time.Second, WriteTimeout: 30 * time.Second, Addr: ":3000", Handler: http.DefaultServeMux, } err = srv.ListenAndServe() if err != nil { logger.Panic().Msg(err.Error()) } } type userRequest struct { ID uuid.UUID Name string `json:"name"` Password string `json:"password"` } func storeUser(ctx context.Context, ceClient cloudevents.Client) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { oplog := httplog.LogEntry(r.Context()) var ur userRequest err := json.NewDecoder(r.Body).Decode(&ur) if err != nil { w.WriteHeader(http.StatusBadRequest) oplog.Error().Msg(err.Error()) return } ur.ID = uuid.New() //TODO: store user in a database // Create an Event. event := cloudevents.NewEvent() event.SetSource("github.com/eminetto/post-cloudevents") event.SetType("user.storeUser") event.SetData(cloudevents.ApplicationJSON, map[string]string{"id": ur.ID.String()}) // Set a target. ctx := cloudevents.ContextWithTarget(context.Background(), auditService) // Send that Event. var result protocol.Result if result = ceClient.Send(ctx, event); cloudevents.IsUndelivered(result) { oplog.Error().Msgf("failed to send, %v", result) w.WriteHeader(http.StatusInternalServerError) return } return } }
코드에서는 다음과 같이 이벤트 생성 및 감사 서비스로의 전송을 볼 수 있습니다.
package main import ( "context" "fmt" "log" cloudevents "github.com/cloudevents/sdk-go/v2" ) func receive(event cloudevents.Event) { // do something with event. fmt.Printf("%s", event) } func main() { // The default client is HTTP. c, err := cloudevents.NewClientHTTP() if err != nil { log.Fatalf("failed to create client, %v", err) } if err = c.StartReceiver(context.Background(), receive); err != nil { log.Fatalf("failed to start receiver: %v", err) } }
두 서비스를 모두 실행하면 사용자에게 요청을 보내 서비스가 어떻게 작동하는지 확인할 수 있습니다.
curl -X "POST" "http://localhost:3000/v1/user" \ -H 'Accept: application/json' \ -H 'Content-Type: application/json' \ -d $'{ "name": "Ozzy Osbourne", "password": "12345" }'
사용자 출력은 다음과 같습니다.
{"level":"info","service":"user","httpRequest":{"proto":"HTTP/1.1","remoteIP":"[::1]:50894","requestID":"Macbook-Air-de-Elton.local/3YUAnzEbis-000001","requestMethod":"POST","requestPath":"/v1/user","requestURL":"http://localhost:3000/v1/user"},"httpRequest":{"header":{"accept":"application/json","content-length":"52","content-type":"application/json","user-agent":"curl/8.7.1"},"proto":"HTTP/1.1","remoteIP":"[::1]:50894","requestID":"Macbook-Air-de-Elton.local/3YUAnzEbis-000001","requestMethod":"POST","requestPath":"/v1/user","requestURL":"http://localhost:3000/v1/user","scheme":"http"},"timestamp":"2024-11-28T15:52:27.947355-03:00","message":"Request: POST /v1/user"} {"level":"warn","service":"user","httpRequest":{"proto":"HTTP/1.1","remoteIP":"[::1]:50894","requestID":"Macbook-Air-de-Elton.local/3YUAnzEbis-000001","requestMethod":"POST","requestPath":"/v1/user","requestURL":"http://localhost:3000/v1/user"},"httpResponse":{"bytes":0,"elapsed":2.33225,"status":0},"timestamp":"2024-11-28T15:52:27.949877-03:00","message":"Response: 0 Unknown"}
감사 서비스의 출력은 이벤트 수신을 보여줍니다.
❯ go run main.go Context Attributes, specversion: 1.0 type: user.storeUser source: github.com/eminetto/post-cloudevents id: 5190bc29-a3d5-4fca-9a88-85fccffc16b6 time: 2024-11-28T18:53:17.474154Z datacontenttype: application/json Data, { "id": "8aadf8c5-9c4e-4c11-af24-beac2fb9a4b7" }
이식성 목표를 검증하기 위해 Python SDK를 사용하여 감사 서비스 버전을 구현했습니다.
from flask import Flask, request from cloudevents.http import from_http app = Flask(__name__) # create an endpoint at http://localhost:/3000/ @app.route("/", methods=["POST"]) def home(): # create a CloudEvent event = from_http(request.headers, request.get_data()) # you can access cloudevent fields as seen below print( f"Found {event['id']} from {event['source']} with type " f"{event['type']} and specversion {event['specversion']}" ) return "", 204 if __name__ == "__main__": app.run(port=8080)
애플리케이션 출력에는 서비스 사용자를 변경할 필요 없이 이벤트 수신이 표시됩니다.
(.venv) eminetto@Macbook-Air-de-Elton audit-python % python3 main.py * Serving Flask app 'main' * Debug mode: off WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead. * Running on http://127.0.0.1:8080 Press CTRL+C to quit Found ce1abe22-dce5-40f0-8c82-12093b707ed7 from github.com/eminetto/post-cloudevents with type user.storeUser and specversion 1.0 127.0.0.1 - - [28/Nov/2024 15:59:31] "POST / HTTP/1.1" 204 -
이전 예에서는 CloudEvents SDK를 소개했지만 이벤트 기반 아키텍처의 원칙인 느슨한 결합을 위반했습니다. 애플리케이션 사용자는 감사 애플리케이션을 인식하고 이에 연결되어 있지만 이는 좋은 관행이 아닙니다. 게시/구독과 같은 다른 CloudEvents 기능을 사용하거나 Kafka와 같은 기능을 추가하여 이러한 상황을 개선할 수 있습니다. 다음 예에서는 Kafka를 사용하여 두 애플리케이션을 분리합니다.
첫 번째 단계는 Kafka를 사용하기 위해 하나의 docker-compose.yaml을 만드는 것이었습니다.
services: kafka: image: bitnami/kafka:latest restart: on-failure ports: - 9092:9092 environment: - KAFKA_CFG_BROKER_ID=1 - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092 - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 - KAFKA_CFG_NUM_PARTITIONS=3 - ALLOW_PLAINTEXT_LISTENER=yes depends_on: - zookeeper zookeeper: image: bitnami/zookeeper:latest ports: - 2181:2181 environment: - ALLOW_ANONYMOUS_LOGIN=yes
서비스 사용자가 다음과 같이 변경되었습니다.
package main import ( "context" "encoding/json" "log" "net/http" "time" "github.com/IBM/sarama" "github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2" cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/go-chi/chi/v5" "github.com/go-chi/httplog" "github.com/google/uuid" ) const ( auditService = "127.0.0.1:9092" auditTopic = "audit" ) func main() { logger := httplog.NewLogger("user", httplog.Options{ JSON: true, }) ctx := context.Background() saramaConfig := sarama.NewConfig() saramaConfig.Version = sarama.V2_0_0_0 sender, err := kafka_sarama.NewSender([]string{auditService}, saramaConfig, auditTopic) if err != nil { log.Fatalf("failed to create protocol: %s", err.Error()) } defer sender.Close(context.Background()) ceClient, err := cloudevents.NewClient(sender, cloudevents.WithTimeNow(), cloudevents.WithUUIDs()) if err != nil { log.Fatalf("failed to create client, %v", err) } r := chi.NewRouter() r.Use(httplog.RequestLogger(logger)) r.Post("/v1/user", storeUser(ctx, ceClient)) http.Handle("/", r) srv := &http.Server{ ReadTimeout: 30 * time.Second, WriteTimeout: 30 * time.Second, Addr: ":3000", Handler: http.DefaultServeMux, } err = srv.ListenAndServe() if err != nil { logger.Panic().Msg(err.Error()) } } type userRequest struct { ID uuid.UUID Name string `json:"name"` Password string `json:"password"` } func storeUser(ctx context.Context, ceClient cloudevents.Client) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { oplog := httplog.LogEntry(r.Context()) var ur userRequest err := json.NewDecoder(r.Body).Decode(&ur) if err != nil { w.WriteHeader(http.StatusBadRequest) oplog.Error().Msg(err.Error()) return } ur.ID = uuid.New() //TODO: store user in a database // Create an Event. event := cloudevents.NewEvent() event.SetID(uuid.New().String()) event.SetSource("github.com/eminetto/post-cloudevents") event.SetType("user.storeUser") event.SetData(cloudevents.ApplicationJSON, map[string]string{"id": ur.ID.String()}) // Send that Event. if result := ceClient.Send( // Set the producer message key kafka_sarama.WithMessageKey(context.Background(), sarama.StringEncoder(event.ID())), event, ); cloudevents.IsUndelivered(result) { oplog.Error().Msgf("failed to send, %v", result) w.WriteHeader(http.StatusInternalServerError) return } return } }
주로 Kafka와의 연결을 위해 몇 가지 변경이 필요했지만 이벤트 자체는 변경되지 않았습니다.
감사 서비스에도 비슷한 변경을 했습니다.
package main import ( "context" "fmt" "log" "github.com/IBM/sarama" "github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2" cloudevents "github.com/cloudevents/sdk-go/v2" ) const ( auditService = "127.0.0.1:9092" auditTopic = "audit" auditGroupID = "audit-group-id" ) func receive(event cloudevents.Event) { // do something with event. fmt.Printf("%s", event) } func main() { saramaConfig := sarama.NewConfig() saramaConfig.Version = sarama.V2_0_0_0 receiver, err := kafka_sarama.NewConsumer([]string{auditService}, saramaConfig, auditGroupID, auditTopic) if err != nil { log.Fatalf("failed to create protocol: %s", err.Error()) } defer receiver.Close(context.Background()) c, err := cloudevents.NewClient(receiver) if err != nil { log.Fatalf("failed to create client, %v", err) } if err = c.StartReceiver(context.Background(), receive); err != nil { log.Fatalf("failed to start receiver: %v", err) } }
애플리케이션의 출력은 동일하게 유지됩니다.
Kafka를 포함하면서 CloudEvents가 제공하는 이점을 유지하면서 더 이상 EDA 원칙을 위반하지 않고 애플리케이션을 분리했습니다.
이 게시물의 목표는 표준을 소개하고 SDK를 사용한 구현의 용이성을 보여주는 것이었습니다. 주제를 더 깊이 다룰 수 있지만 기술의 목적과 영감을 주는 연구 및 사용을 달성했기를 바랍니다.
이미 CloudEvents를 사용하고 계시거나, 댓글로 경험을 공유하고 싶으시다면 매우 유용할 것입니다.
이 게시물에서 제가 제시한 코드는 GitHub 저장소에서 찾을 수 있습니다.
원본은 2024년 11월 29일 https://eltonminetto.dev에 게시되었습니다.
위 내용은 Go에서 CloudEvents 사용의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!