サードパーティ API と対話する分散アプリケーションがあるシナリオを想像してみましょう。通常、サードパーティ API には、クライアントがリクエストをバーストさせてサービスのダウンタイムを引き起こすことを避けるために、レート制限制御メカニズムが備わっています。このようなシナリオでは、呼び出し元は分散環境でサードパーティ API への送信リクエストの速度をどのように制御できるでしょうか?この投稿では、この問題に対する考えられる戦略について説明します。
リクエストのレートを制御するアルゴリズムは複数ありますが、理解と実装が比較的簡単なため、ここではトークン バケット アルゴリズムに焦点を当てます。このアルゴリズムは、バケットは最大T個のトークンを保持でき、アプリケーションがサードパーティ API にリクエストを行う場合は、1を必要とすることを示しています。バケットからのトークン。バケットが空の場合は、バケット内に少なくとも1個のトークンが存在するまで待機する必要があります。また、バケットにはRトークン/ミリ秒の固定レートで1トークンが補充されます。
トークン バケット アルゴリズムは非常に簡単に理解できますが、分散環境でこれを使用してサードパーティ API への送信リクエストを制御するにはどうすればよいでしょうか?
分散環境で送信レート制限を制御したい場合は、現在のレート制限に関する信頼できる一元的な情報源が必要です。真実の情報源を実装するには複数の方法がありますが、私は考えられる実装を示して次の図を理想化しました。
上の図では、複数のポッドに分散アプリケーションがあり、各ポッドはサードパーティ API にリクエストを行うことができます。アプリケーション インフラストラクチャには、トークン バケット アルゴリズムを使用してレート制限を制御するTCP サーバーがあります。サードパーティ API にリクエストを行う前に、ポッドは TCP サーバーに新しいトークンを要求し、使用可能なトークンが少なくとも 1 つ存在するまで、TCP サーバーからの応答を待ちます。トークンが利用可能になった後、ポッドはサードパーティ API にリクエストを作成します。
TCP サーバーの実装は、このリポジトリ https://github.com/rafaquelhodev/rlimit/ にあります。次のセクションでは、golang でのトークン バケットの実装について簡単に説明します。
以下に、トークン バケットの実装の背後にある主なアイデアを示します。詳細な実装を理解するには、https://github.com/rafaquelhodev/rlimit/ リポジトリをご覧ください。
レート制限制御は TokenBucket 構造体に集中されています:
type TokenBucket struct { id string mu sync.Mutex tokens int64 maxTokens int64 refillPeriod int64 cron chan bool subs []chan bool }
TokenBucket 構造体に subs プロパティがあることがわかります。基本的に、これは特定のトークン バケットのサブスクライバーの配列です。クライアントからトークンがリクエストされるたびに、クライアントは subs 配列に追加され、新しいトークンがバケットに追加されるとクライアントに通知されます。
バケットを開始するとき、バケットがサポートできるトークンの最大数 (maxTokens) と、トークンがバケットに追加される時間 (refillPeriod) を指定する必要があります。
func newTokenBucket(id string, maxTokens int64, refillPeriod int64) *TokenBucket { bucket := &TokenBucket{ id: id, tokens: 0, maxTokens: maxTokens, refillPeriod: refillPeriod, cron: make(chan bool), subs: make([]chan bool, 0), } fmt.Printf("refill period = %d\n", refillPeriod) bucket.startCron() return bucket }
ここで、「トークンはどのようにしてバケットに追加されるのか?」と疑問に思うかもしれません。そのため、バケットの作成時に cron ジョブが開始され、refillPeriod ミリ秒ごとに新しいトークンがバケットに追加されます。
func (tb *TokenBucket) startCron() { ticker := time.NewTicker(time.Duration(tb.refillPeriod) * time.Millisecond) go func() { for { select { case <-tb.cron: ticker.Stop() return case <-ticker.C: if tb.tokens < tb.maxTokens { tb.tokens += 1 fmt.Printf("[TOKEN REFIL] | currTokens = %d\n", tb.tokens) if len(tb.subs) > 0 { sub := tb.subs[0] tb.subs = tb.subs[1:] sub <- true } } } } }() }
最後に、クライアントがバケットからトークンを取得したい場合は、waitAvailable 関数を呼び出す必要があります。
func (tb *TokenBucket) waitAvailable() bool { tb.mu.Lock() if tb.tokens > 0 { fmt.Printf("[CONSUMING TOKEN] - id = %s\n", tb.id) tb.tokens -= 1 tb.mu.Unlock() return true } fmt.Printf("[WAITING TOKEN] - id %s\n", tb.id) ch := tb.tokenSubscribe() tb.mu.Unlock() <-ch fmt.Printf("[NEW TOKEN AVAILABLED] - id %s\n", tb.id) tb.tokens -= 1 return true }
https://github.com/Mohamed-khattab/Token-bucket-rate-limiter からインスピレーションを得た
以上が送信レート制限の制御の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。