在上一篇文章中,我們展示如何從 MQTT 代理程式接收物聯網設備資料。在這篇文章中,我們將把資料儲存到資料庫中。
在一個強大的系統中,我們可以選擇將原始資料事件儲存在資料湖中。也許,我們將來會對此進行探索;但為了簡單起見,現在我們將其儲存在 PostGres 中。
上一篇文章示範了接收原始資料並將其解組到已經用 gorm 標籤註釋的結構中。 Gorm 是 Go 的流行 ORM。如果您不熟悉,可以在這裡了解更多。
type IoTDeviceMessage struct { BaseModel Time time.Time `json:"time" gorm:"index"` DeviceID string `json:"device_id"` DeviceType string `json:"device_type"` DeviceData json.RawMessage `json:"device_data"` }
所以我們需要做的就是配置 Postgres 連接,然後使用 gorm 保存事件資料。
func setupPostgres(logger *zerolog.Logger) *Repository { dbHost := os.Getenv("POSTGRES_HOST") dbName := os.Getenv("POSTGRES_DB") dbPort := os.Getenv("POSTGRES_PORT") dbUser := os.Getenv("POSTGRES_USER") dbPassword := os.Getenv("POSTGRES_PASSWORD") dsn := fmt.Sprintf("host=%s user=%s password=%s dbname=%s port=%s sslmode=disable TimeZone=UTC", dbHost, dbUser, dbPassword, dbName, dbPort) logger.Info().Msg(fmt.Sprintf("Connecting to PostgreSQL at %s", dsn)) db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{}) if err != nil { logger.Fatal().Err(err).Msg("failed to connect to database") } // Auto-migrate the schema err = db.AutoMigrate(&IoTDataEvent{}, &IoTRawDeviceMessage{}, &DeviceModel{}, &TempRHDevice{}) if err != nil { logger.Fatal().Err(err).Msg("failed to migrate models") } sqlDB, err := db.DB() sqlDB.SetMaxIdleConns(10) sqlDB.SetMaxOpenConns(100) sqlDB.SetConnMaxLifetime(time.Hour) repo := NewRepository(db, logger) return repo }
在這裡我們設定 Postgres 連線。請注意,我們使用環境變數來儲存敏感資訊。對於生產系統來說,這都是一個很好的實踐,無論它們是否容器化。
我們也初始化一個名為 Repository 的結構。這個結構體包含我們實際的儲存和檢索方法。這為我們提供了與 postgres 配置的一些分離。
type Repository struct { db *gorm.DB logger *zerolog.Logger } func NewRepository(db *gorm.DB, logger *zerolog.Logger) *Repository { return &Repository{db: db, logger: logger} } func (r *Repository) Close() { sqlDb, err := r.db.DB() if err != nil { r.logger.Error().Err(err).Msg("failed to close database") return } _ = sqlDb.Close() } ... // Message-related functions func (r *Repository) CreateMessage(message *IoTDeviceMessage) error { return r.db.Create(message).Error } func (r *Repository) GetMessagesByDeviceID(deviceID uint, limit int) ([]IoTDeviceMessage, error) { var messages []IoTDeviceMessage err := r.db.Where("device_id = ?", deviceID).Order("timestamp desc").Limit(limit).Find(&messages).Error return messages, err } func (r *Repository) DeleteMessagesByDeviceID(deviceID uint) error { return r.db.Where("device_id = ?", deviceID).Delete(&IoTDeviceMessage{}).Error }
現在只需要保留該訊息即可。由於我們使用管道模式來處理訊息,因此我們將在管道中添加持久步驟一個新階段。
// pipeline stage to persist the message func persistIoTEvent(ctx context.Context, logger *zerolog.Logger, repo *Repository, input <-chan IoTRawDeviceMessage) chan IoTRawDeviceMessage { out := make(chan IoTRawDeviceMessage) go func() { defer close(out) for iotMsg := range input { logger.Info().Msg(fmt.Sprintf("Persist iot msg for device: %s", iotMsg.DeviceID)) err := repo.CreateMessage(&iotMsg) if err != nil { logger.Error().Err(err).Msg("Error creating IoTRawDeviceMessage") } } }() return out } ... finalChan := persistIoTEvent(ctx, logger, repo, processMsg(ctx, logger, mqttMsgChan)) for iotMsg := range finalChan { // now we have the IoTRawDeviceMessage that has been persisted logger.Info().Msg(fmt.Sprintf("Received iot msg: %+v", iotMsg)) // do something like check for alert conditions }
這就是全部。
可以在此處找到此程式碼。您可以將其與上一篇文章中的相同發布商代碼一起使用。請務必將您的 Postgres 設定配置為環境變數。
以上是儲存物聯網設備數據的詳細內容。更多資訊請關注PHP中文網其他相關文章!