首頁 > 後端開發 > Golang > 在 Go 中使用 CloudEvents

在 Go 中使用 CloudEvents

Susan Sarandon
發布: 2024-12-04 22:40:21
原創
856 人瀏覽過

Using CloudEvents in Go

採用事件驅動架構(EDA)來提高可擴展性並減少元件/服務之間的耦合在複雜環境中相對常見。

雖然這種方法解決了許多問題,但團隊面臨的挑戰之一是標準化事件以確保所有組件之間的兼容性。為了緩解這項挑戰,我們可以使用 CloudEvents 專案。

該項目旨在成為標準化和描述事件的規範,帶來一致性、可訪問性和可移植性。另一個優點是,該專案除了作為規範之外,還提供了一系列 SDK 來加速團隊採用。

在這篇文章中,我想在一個虛構的專案中示範 Go SDK(Python SDK 的特殊外觀)的使用。

讓我們考慮一個由兩個微服務組成的環境:一個用於管理用戶 (CRUD) 的用戶,以及一個審核服務,用於在環境中存儲重要事件以供將來分析。

使用者服務的服務代碼如下:

package main

import (
    "context"
    "encoding/json"
    "log"
    "net/http"
    "time"

    cloudevents "github.com/cloudevents/sdk-go/v2"
    "github.com/cloudevents/sdk-go/v2/protocol"
    "github.com/go-chi/chi/v5"
    "github.com/go-chi/httplog"
    "github.com/google/uuid"
)

const auditService = "http://localhost:8080/"

func main() {
    logger := httplog.NewLogger("user", httplog.Options{
        JSON: true,
    })
    ctx := context.Background()
    ceClient, err := cloudevents.NewClientHTTP()
    if err != nil {
        log.Fatalf("failed to create client, %v", err)
    }

    r := chi.NewRouter()
    r.Use(httplog.RequestLogger(logger))
    r.Post("/v1/user", storeUser(ctx, ceClient))

    http.Handle("/", r)
    srv := &http.Server{
        ReadTimeout:  30 * time.Second,
        WriteTimeout: 30 * time.Second,
        Addr:         ":3000",
        Handler:      http.DefaultServeMux,
    }
    err = srv.ListenAndServe()
    if err != nil {
        logger.Panic().Msg(err.Error())
    }
}

type userRequest struct {
    ID       uuid.UUID
    Name     string `json:"name"`
    Password string `json:"password"`
}

func storeUser(ctx context.Context, ceClient cloudevents.Client) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        oplog := httplog.LogEntry(r.Context())

        var ur userRequest
        err := json.NewDecoder(r.Body).Decode(&ur)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            oplog.Error().Msg(err.Error())
            return
        }
        ur.ID = uuid.New()
        //TODO: store user in a database

        // Create an Event.
        event := cloudevents.NewEvent()
        event.SetSource("github.com/eminetto/post-cloudevents")
        event.SetType("user.storeUser")
        event.SetData(cloudevents.ApplicationJSON, map[string]string{"id": ur.ID.String()})

        // Set a target.
        ctx := cloudevents.ContextWithTarget(context.Background(), auditService)

        // Send that Event.
        var result protocol.Result
        if result = ceClient.Send(ctx, event); cloudevents.IsUndelivered(result) {
            oplog.Error().Msgf("failed to send, %v", result)
            w.WriteHeader(http.StatusInternalServerError)
            return
        }

        return
    }
}

登入後複製

在程式碼中,您可以看到事件的建立以及將其發送到審核服務,如下所示:

package main

import (
    "context"
    "fmt"
    "log"

    cloudevents "github.com/cloudevents/sdk-go/v2"
)

func receive(event cloudevents.Event) {
    // do something with event.
    fmt.Printf("%s", event)
}

func main() {
    // The default client is HTTP.
    c, err := cloudevents.NewClientHTTP()
    if err != nil {
        log.Fatalf("failed to create client, %v", err)
    }
    if err = c.StartReceiver(context.Background(), receive); err != nil {
        log.Fatalf("failed to start receiver: %v", err)
    }
}

登入後複製

透過執行這兩個服務,您可以透過向使用者發送請求來了解它們的工作原理:

curl -X "POST" "http://localhost:3000/v1/user" \
     -H 'Accept: application/json' \
     -H 'Content-Type: application/json' \
     -d $'{
  "name": "Ozzy Osbourne",
  "password": "12345"
}'

登入後複製

使用者輸出是:

{"level":"info","service":"user","httpRequest":{"proto":"HTTP/1.1","remoteIP":"[::1]:50894","requestID":"Macbook-Air-de-Elton.local/3YUAnzEbis-000001","requestMethod":"POST","requestPath":"/v1/user","requestURL":"http://localhost:3000/v1/user"},"httpRequest":{"header":{"accept":"application/json","content-length":"52","content-type":"application/json","user-agent":"curl/8.7.1"},"proto":"HTTP/1.1","remoteIP":"[::1]:50894","requestID":"Macbook-Air-de-Elton.local/3YUAnzEbis-000001","requestMethod":"POST","requestPath":"/v1/user","requestURL":"http://localhost:3000/v1/user","scheme":"http"},"timestamp":"2024-11-28T15:52:27.947355-03:00","message":"Request: POST /v1/user"}
{"level":"warn","service":"user","httpRequest":{"proto":"HTTP/1.1","remoteIP":"[::1]:50894","requestID":"Macbook-Air-de-Elton.local/3YUAnzEbis-000001","requestMethod":"POST","requestPath":"/v1/user","requestURL":"http://localhost:3000/v1/user"},"httpResponse":{"bytes":0,"elapsed":2.33225,"status":0},"timestamp":"2024-11-28T15:52:27.949877-03:00","message":"Response: 0 Unknown"}

登入後複製

審核服務的輸出表示事件已收到。

❯ go run main.go
Context Attributes,
  specversion: 1.0
  type: user.storeUser
  source: github.com/eminetto/post-cloudevents
  id: 5190bc29-a3d5-4fca-9a88-85fccffc16b6
  time: 2024-11-28T18:53:17.474154Z
  datacontenttype: application/json
Data,
  {
    "id": "8aadf8c5-9c4e-4c11-af24-beac2fb9a4b7"
  }
登入後複製

為了驗證可移植性目標,我使用 Python SDK 實作了一個版本的稽核服務:

from flask import Flask, request

from cloudevents.http import from_http

app = Flask(__name__)


# create an endpoint at http://localhost:/3000/
@app.route("/", methods=["POST"])
def home():
    # create a CloudEvent
    event = from_http(request.headers, request.get_data())

    # you can access cloudevent fields as seen below
    print(
        f"Found {event['id']} from {event['source']} with type "
        f"{event['type']} and specversion {event['specversion']}"
    )

    return "", 204


if __name__ == "__main__":
    app.run(port=8080)

登入後複製

應用程式輸出顯示事件的接收,無需更改服務使用者:

(.venv) eminetto@Macbook-Air-de-Elton audit-python % python3 main.py
 * Serving Flask app 'main'
 * Debug mode: off
WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
 * Running on http://127.0.0.1:8080
Press CTRL+C to quit
Found ce1abe22-dce5-40f0-8c82-12093b707ed7 from github.com/eminetto/post-cloudevents with type user.storeUser and specversion 1.0
127.0.0.1 - - [28/Nov/2024 15:59:31] "POST / HTTP/1.1" 204 -
登入後複製

前面的範例介紹了 CloudEvents SDK,但它違反了基於事件的架構的一個原則:鬆散耦合。應用程式用戶了解審核應用程式並與之綁定,這不是一個好的做法。我們可以透過使用其他 CloudEvents 功能(例如 pub/sub)或添加 Kafka 等功能來改善這種情況。以下範例使用 Kafka 來解耦兩個應用程式。

第一步是建立一個 docker-compose.yaml 來使用 Kafka:

services:
  kafka:
    image: bitnami/kafka:latest
    restart: on-failure
    ports:
      - 9092:9092
    environment:
      - KAFKA_CFG_BROKER_ID=1
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_NUM_PARTITIONS=3
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper

  zookeeper:
    image: bitnami/zookeeper:latest
    ports:
      - 2181:2181
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes

登入後複製

服務用戶進行了以下更改:

package main

import (
    "context"
    "encoding/json"
    "log"
    "net/http"
    "time"

    "github.com/IBM/sarama"
    "github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2"
    cloudevents "github.com/cloudevents/sdk-go/v2"
    "github.com/go-chi/chi/v5"
    "github.com/go-chi/httplog"
    "github.com/google/uuid"
)

const (
    auditService = "127.0.0.1:9092"
    auditTopic   = "audit"
)

func main() {
    logger := httplog.NewLogger("user", httplog.Options{
        JSON: true,
    })
    ctx := context.Background()

    saramaConfig := sarama.NewConfig()
    saramaConfig.Version = sarama.V2_0_0_0

    sender, err := kafka_sarama.NewSender([]string{auditService}, saramaConfig, auditTopic)
    if err != nil {
        log.Fatalf("failed to create protocol: %s", err.Error())
    }

    defer sender.Close(context.Background())

    ceClient, err := cloudevents.NewClient(sender, cloudevents.WithTimeNow(), cloudevents.WithUUIDs())
    if err != nil {
        log.Fatalf("failed to create client, %v", err)
    }

    r := chi.NewRouter()
    r.Use(httplog.RequestLogger(logger))
    r.Post("/v1/user", storeUser(ctx, ceClient))

    http.Handle("/", r)
    srv := &http.Server{
        ReadTimeout:  30 * time.Second,
        WriteTimeout: 30 * time.Second,
        Addr:         ":3000",
        Handler:      http.DefaultServeMux,
    }
    err = srv.ListenAndServe()
    if err != nil {
        logger.Panic().Msg(err.Error())
    }
}

type userRequest struct {
    ID       uuid.UUID
    Name     string `json:"name"`
    Password string `json:"password"`
}

func storeUser(ctx context.Context, ceClient cloudevents.Client) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        oplog := httplog.LogEntry(r.Context())

        var ur userRequest
        err := json.NewDecoder(r.Body).Decode(&ur)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            oplog.Error().Msg(err.Error())
            return
        }
        ur.ID = uuid.New()
        //TODO: store user in a database

        // Create an Event.
        event := cloudevents.NewEvent()
        event.SetID(uuid.New().String())
        event.SetSource("github.com/eminetto/post-cloudevents")
        event.SetType("user.storeUser")
        event.SetData(cloudevents.ApplicationJSON, map[string]string{"id": ur.ID.String()})

        // Send that Event.
        if result := ceClient.Send(
            // Set the producer message key
            kafka_sarama.WithMessageKey(context.Background(), sarama.StringEncoder(event.ID())),
            event,
        ); cloudevents.IsUndelivered(result) {
            oplog.Error().Msgf("failed to send, %v", result)
            w.WriteHeader(http.StatusInternalServerError)
            return
        }

        return
    }
}


登入後複製

需要進行一些更改,主要是為了與 Kafka 建立連接,但事件本身沒有改變。

我對審核服務進行了類似的更改:

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/IBM/sarama"

    "github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2"
    cloudevents "github.com/cloudevents/sdk-go/v2"
)

const (
    auditService = "127.0.0.1:9092"
    auditTopic   = "audit"
    auditGroupID = "audit-group-id"
)

func receive(event cloudevents.Event) {
    // do something with event.
    fmt.Printf("%s", event)
}

func main() {
    saramaConfig := sarama.NewConfig()
    saramaConfig.Version = sarama.V2_0_0_0

    receiver, err := kafka_sarama.NewConsumer([]string{auditService}, saramaConfig, auditGroupID, auditTopic)
    if err != nil {
        log.Fatalf("failed to create protocol: %s", err.Error())
    }

    defer receiver.Close(context.Background())

    c, err := cloudevents.NewClient(receiver)
    if err != nil {
        log.Fatalf("failed to create client, %v", err)
    }

    if err = c.StartReceiver(context.Background(), receive); err != nil {
        log.Fatalf("failed to start receiver: %v", err)
    }
}

登入後複製

應用程式的輸出保持不變。

引入 Kafka 後,我們解耦了應用程序,不再違反 EDA 原則,同時保留了 CloudEvents 提供的優勢。

這篇文章的目標是介紹該標準並示範使用 SDK 實現的簡易性。我可以更深入地討論這個主題,但我希望我已經實現了目標並激發了對該技術的研究和使用。

如果您已經使用/已經使用過 CloudEvents 並想在評論中分享您的經驗,這將非常有用。

您可以在 GitHub 上的儲存庫中找到我在這篇文章中提供的程式碼。

原於 2024 年 11 月 29 日發佈於 https://eltonminetto.dev。

以上是在 Go 中使用 CloudEvents的詳細內容。更多資訊請關注PHP中文網其他相關文章!

來源:dev.to
本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
作者最新文章
熱門教學
更多>
最新下載
更多>
網站特效
網站源碼
網站素材
前端模板