案例(二)-KisFlow-Golang流即時計算-流並行運算

PHPz
發布: 2024-07-16 17:28:10
原創
689 人瀏覽過

Case (II) - KisFlow-Golang Stream Real-Time Computing - Flow Parallel Operation

Github:https://github.com/aceld/kis-flow
文件:https://github.com/aceld/kis-flow/wiki


第 1 部分-概覽
Part2.1-專案建置/基礎模組
Part2.2-專案建置/基礎模組
第三部分-資料流
Part4-功能調度
第5部-連接器
Part6-配置導入與導出
Part7-KisFlow 動作
Part8-Cache/Params 資料快取和資料參數
Part9-流程的多份副本
Part10-Prometheus Metrics 統計量
Part11-基於反射的FaaS參數類型自適應註冊


案例1-快速入門
Case2-Flow並行運行
Case3-KisFlow在多Goroutine中的應用
案例4-訊息佇列(MQ)應用程式中的KisFlow


下載 KisFlow 原始碼

雷雷

KisFlow 開發者文件

原始碼範例

https://github.com/aceld/kis-flow-usage/tree/main/8-connector

KisFlow 可以透過 Connector 實現兩個流的組合

結合以下兩個流程,本次介紹將涵蓋Connector的介面和使用方法。

數據流程圖

Flow Diagram

案例介紹

假設一個學生有四個屬性:

雷雷

定義Flow1:CalStuAvgScore-1-2,計算學生學分1(score_1)和學分2(score_2)的平均分數(avg_score_1_2)。
定義 Flow2:CalStuAvgScore-3,計算學生的學分 3 的平均分數 (score_3) 和 avg_score_1_2,即學分 1、學分 2 和學分 3 的平均值。學分 1 和學分 2 的平均值由 Flow1 提供。

流程1

Flow1 由 4 個函數組成:

V(函數:VerifyStu)驗證StuId的有效性
C(函數:AvgStuScore12)計算學分1和學分2的平均分數
S(函數:SaveScoreAvg12)將avg_score_1_2儲存在Redis中
E(函數:PrintStuAvgScore)列印Credit 1和Credit 2的平均分數。

流程2

Flow2 由 4 個函數組成:

V(函數:VerifyStu)驗證StuId的有效性
L(函數:LoadScoreAvg12)讀取Flow1計算出的目前學生學分1和學分2的平均分數(avg_score_1_2)
C(函數:AvgStuScore3)計算學分3的平均分數以及學分1和學分2的平均分數
E(函數:PrintStuAvgScore)列印Credit 1、Credit 2和Credit 3的平均分數。

conf/func/func-AvgStuScore-3.yml

雷雷

conf/func/func-LoadScoreAvg-1-2.yml

雷雷

基本數據協議

stu_proto.go

雷雷

連接器初始化

本專案中定義的Connector Score12Cache是與Redis關聯的連結資源。此連接器需要一個初始化方法,用於在 KisFlow 啟動時建立連線。

conn_init.go

雷雷

這裡,連接成功的Redis實例儲存在連接器的快取變數「rdb」中。

雷雷

FaaS 實施

功能(五):VerifyStu

faas_stu_verify.go

雷雷

VerifyStu() 用於驗證資料。如果資料不符合要求,則終止目前資料流。最後,資料被重複使用,透過flow.Next(kis.ActionDataReuse)傳遞到下一層。

函數(C):AvgStuScore12

faas_avg_score_1_2.go

雷雷

AvgStuScore12()計算score_1和score_2的平均分數,得到avg_score。

函數(S):儲存ScoreAvg12

faas_save_score_avg_1_2.go

雷雷

SaveScoreAvg12() 透過綁定的 Connector 將資料儲存到 Redis 中,使用的是 Connector 中配置的 key。最後將來源資料透傳給下一個函數

函數(E):PrintStuAvgScore

faas_stu_score_avg_print.go

雷雷

PrintStuAvgScore() 列印目前學生的平均成績。

函數(L):LoadScoreAvg12

faas_load_score_avg_1_2.go

package main import ( "context" "fmt" "github.com/aceld/kis-flow/kis" "github.com/aceld/kis-flow/serialize" "github.com/go-redis/redis/v8" "strconv" ) type LoadStuScoreIn struct { serialize.DefaultSerialize StuScore3 } type LoadStuScoreOut struct { serialize.DefaultSerialize StuScore3 } func GetStuScoresByStuId(ctx context.Context, conn kis.Connector, stuId int) (float64, error) { var rdb *redis.Client // Get Redis Client rdb = conn.GetMetaData("rdb").(*redis.Client) // make key key := conn.GetConfig().Key + strconv.Itoa(stuId) // get data from redis result, err := rdb.HGetAll(ctx, key).Result() if err != nil { return 0, err } // get value avgScoreStr, ok := result["avg_score"] if !ok { return 0, fmt.Errorf("avg_score not found for stuId: %d", stuId) } // parse to float64 avgScore, err := strconv.ParseFloat(avgScoreStr, 64) if err != nil { return 0, err } return avgScore, nil } func LoadScoreAvg12(ctx context.Context, flow kis.Flow, rows []*LoadStuScoreIn) error { fmt.Printf("->Call Func LoadScoreAvg12\n") conn, err := flow.GetConnector() if err != nil { fmt.Printf("LoadScoreAvg12(): GetConnector err = %s\n", err.Error()) return err } for _, row := range rows { stuScoreAvg1_2, err := GetStuScoresByStuId(ctx, conn, row.StuId) if err != nil { fmt.Printf("LoadScoreAvg12(): GetStuScoresByStuId err = %s\n", err.Error()) return err } out := LoadStuScoreOut{ StuScore3: StuScore3{ StuId: row.StuId, Score3: row.Score3, AvgScore12: stuScoreAvg1_2, // avg score of score1 and score2 (load from redis) }, } // commit result _ = flow.CommitRow(out) } return flow.Next() }
登入後複製

LoadScoreAvg12() reads the average score of score_1 and score_2 from Redis through the linked resource Redis of the bound Connector using the key configured in the Connector. It then sends the source data from upstream, along with the newly read average score of score1 and score2, to the next layer.

Function(C): AvgStuScore3

faas_stu_score_avg_3.go

package main import ( "context" "fmt" "github.com/aceld/kis-flow/kis" "github.com/aceld/kis-flow/serialize" ) type AvgStuScore3In struct { serialize.DefaultSerialize StuScore3 } type AvgStuScore3Out struct { serialize.DefaultSerialize StuScoreAvg } func AvgStuScore3(ctx context.Context, flow kis.Flow, rows []*AvgStuScore3In) error { fmt.Printf("->Call Func AvgStuScore3\n") for _, row := range rows { out := AvgStuScore3Out{ StuScoreAvg: StuScoreAvg{ StuId: row.StuId, AvgScore: (float64(row.Score3) + row.AvgScore12*2) / 3, }, } // Submit result data _ = flow.CommitRow(out) } return flow.Next() }
登入後複製

AvgStuScore3() recalculates the average score of three scores by adding score_3 and the average score of score_1 and score_2, resulting in the final average score avg_score.

Register FaaS & CaaSInit/CaaS (Register Function/Connector)

main.go

func init() { // Register functions kis.Pool().FaaS("VerifyStu", VerifyStu) kis.Pool().FaaS("AvgStuScore12", AvgStuScore12) kis.Pool().FaaS("SaveScoreAvg12", SaveScoreAvg12) kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore) kis.Pool().FaaS("LoadScoreAvg12", LoadScoreAvg12) kis.Pool().FaaS("AvgStuScore3", AvgStuScore3) // Register connectors kis.Pool().CaaSInit("Score12Cache", InitScore12Cache) }
登入後複製

Main Process

main.go

package main import ( "context" "github.com/aceld/kis-flow/file" "github.com/aceld/kis-flow/kis" "sync" ) func RunFlowCalStuAvgScore12(ctx context.Context, flow kis.Flow) error { // Commit data _ = flow.CommitRow(`{"stu_id":101, "score_1":100, "score_2":90}`) _ = flow.CommitRow(`{"stu_id":102, "score_1":100, "score_2":80}`) // Run the flow if err := flow.Run(ctx); err != nil { return err } return nil } func RunFlowCalStuAvgScore3(ctx context.Context, flow kis.Flow) error { // Commit data _ = flow.CommitRow(`{"stu_id":101, "score_3": 80}`) _ = flow.CommitRow(`{"stu_id":102, "score_3": 70}`) // Run the flow if err := flow.Run(ctx); err != nil { return err } return nil } func main() { ctx := context.Background() // Load Configuration from file if err := file.ConfigImportYaml("conf/"); err != nil { panic(err) } var wg sync.WaitGroup wg.Add(2) go func() { // Run flow1 concurrently defer wg.Done() flow1 := kis.Pool().GetFlow("CalStuAvgScore12") if flow1 == nil { panic("flow1 is nil") } if err := RunFlowCalStuAvgScore12(ctx, flow1); err != nil { panic(err) } }() go func() { // Run flow2 concurrently defer wg.Done() flow2 := kis.Pool().GetFlow("CalStuAvgScore3") if flow2 == nil { panic("flow2 is nil") } if err := RunFlowCalStuAvgScore3(ctx, flow2); err != nil { panic(err) } }() wg.Wait() return }
登入後複製

Two Goroutines are launched concurrently to execute Flow1 and Flow2, calculating the final average scores for student 101 and student 102.

Execution Results

===> Call Connector InitScore12Cache Connected to Redis: PONG Add FlowRouter FlowName=CalStuAvgScore12 ===> Call Connector InitScore12Cache Connected to Redis: PONG Add FlowRouter FlowName=CalStuAvgScore3 ->Call Func VerifyStu ->Call Func VerifyStu ->Call Func AvgStuScore12 ->Call Func LoadScoreAvg12 ->Call Func SaveScoreAvg12 ->Call Func PrintStuAvgScore, in Flow[CalStuAvgScore12] stuid: [101], avg score: [95] stuid: [102], avg score: [90] ->Call Func AvgStuScore3 ->Call Func PrintStuAvgScore, in Flow[CalStuAvgScore3] stuid: [101], avg score: [90] stuid: [102], avg score: [83.33333333333333]
登入後複製

In Flow[CalStuAvgScore3], we observe the final computed average scores for scores 1, 2, and 3.


Author: Aceld
GitHub: https://github.com/aceld

KisFlow Open Source Project Address: https://github.com/aceld/kis-flow

Document: https://github.com/aceld/kis-flow/wiki


Part1-OverView
Part2.1-Project Construction / Basic Modules
Part2.2-Project Construction / Basic Modules
Part3-Data Stream
Part4-Function Scheduling
Part5-Connector
Part6-Configuration Import and Export
Part7-KisFlow Action
Part8-Cache/Params Data Caching and Data Parameters
Part9-Multiple Copies of Flow
Part10-Prometheus Metrics Statistics
Part11-Adaptive Registration of FaaS Parameter Types Based on Reflection


Case1-Quick Start
Case2-Flow Parallel Operation
Case3-Application of KisFlow in Multi-Goroutines
Case4-KisFlow in Message Queue (MQ) Applications

以上是案例(二)-KisFlow-Golang流即時計算-流並行運算的詳細內容。更多資訊請關注PHP中文網其他相關文章!

來源:dev.to
本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
最新下載
更多>
網站特效
網站源碼
網站素材
前端模板
關於我們 免責聲明 Sitemap
PHP中文網:公益線上PHP培訓,幫助PHP學習者快速成長!