Python に詳しい方なら、Celery について聞いたことがあるでしょう。画像処理や電子メールの送信など、非同期でタスクを処理する場合によく使用される選択肢です。
何人かと話していると、多くの開発者が最初は Celery に魅力を感じているものの、プロジェクトの規模が大きくなり複雑さが増すにつれて、その興奮は薄れ始めていることに気付き始めました。正当な理由で Celery から離れる人もいますが、単純にセロリの核心を自分のニーズに合わせて調整できるほど深く探求していない人もいます。
このブログでは、一部の開発者が代替案を探し始めたり、カスタムのバックグラウンド ワーカー フレームワークを構築したりする理由の 1 つである公平な処理について説明したいと思います。ユーザー/テナントがさまざまなサイズのタスクを送信する環境では、あるテナントの重いワークロードが他のテナントに影響を与えるリスクがボトルネックを生み出し、フラストレーションを引き起こす可能性があります。
Celery で公平な処理を実装し、単一のテナントがリソースを独占できないようにバランスのとれたタスク分散を確保するための戦略について説明します。
マルチテナント アプリケーション、特にバッチ処理を処理するアプリケーションが直面する一般的な課題を詳しく見てみましょう。ユーザーが画像処理タスクをキューに入れて、少し待った後に処理済みの画像を受け取ることができるシステムがあると想像してください。この設定により、API の応答性が維持されるだけでなく、負荷を効率的に処理するために必要に応じてワーカーをスケールすることもできます。
あるテナントが処理のために画像の膨大なバッチを送信することを決定するまでは、すべてがスムーズに実行されます。複数のワーカーを配置し、需要の増加に合わせて自動スケールすることもできるため、インフラストラクチャに自信を持っています。ただし、問題は、他のテナントが小さなバッチ (おそらく数枚の画像のみ) をキューに入れようとしたときに突然始まり、更新が行われずに長い待ち時間に直面することになります。気づかないうちに、サポート チケットが殺到し始め、サービスが遅い、または応答しないというユーザーの苦情が寄せられます。
Celery はデフォルトでタスクを受け取った順に処理するため、このシナリオは非常に一般的です。 1 つのテナントが大量のタスクの流入でワーカーを圧倒している場合、最適な自動スケーリング戦略であっても、他のテナントの遅延を防ぐには十分ではない可能性があります。その結果、これらのユーザーは、約束または期待されたサービス レベルを下回る可能性があります。
公正な処理を確保するための効果的な戦略の 1 つは、レート制限を実装することです。これにより、各テナントが特定の時間枠内に送信できるタスクの数を制御できます。これにより、単一のテナントがワーカーを独占することがなくなり、すべてのテナントがタスクを処理する公平な機会を確保できるようになります。
Celery には、タスク レベルでレート制限を行うための機能が組み込まれています。
# app.py from celery import Celery app = Celery("app", broker="redis://localhost:6379/0") @app.task(rate_limit="10/m") # Limit to 10 tasks per minute def process_data(data): print(f"Processing data: {data}") # Call the task if __name__ == "__main__": for i in range(20): process_data.delay(f"data_{i}")
以下を実行してワーカーを実行できます:
celery -A app worker --loglevel=warning --concurrency 1 --prefetch-multiplier 1
次に、app.py スクリプトを実行して 20 のタスクをトリガーします。
python app.py
ローカルで実行できた場合は、レート制限が確実に適用されるように各タスク間に遅延があることがわかります。さて、あなたはおそらくこれが私たちの問題の解決にはあまり役に立たないと考えているでしょうが、あなたの言うことはまったく正しい。 Celery によるこの組み込みのレート制限は、タスクに厳しいレート制限がある外部サービスへの呼び出しが含まれる可能性があるシナリオに役立ちます。
この例では、組み込み機能が複雑なシナリオには単純すぎる可能性があることを強調しています。ただし、Celery のフレームワークをさらに詳しく調査することで、この制限を克服できます。テナントごとに自動再試行を使用して適切なレート制限を設定する方法を見てみましょう。
Redis を使用してテナントごとのレート制限を追跡します。 Redis は Celery の人気のあるデータベースおよびブローカーであるため、おそらくすでにスタックにあるこのコンポーネントを活用しましょう。
いくつかのライブラリをインポートしましょう:
import time import redis from celery import Celery, Task
次に、レート制限タスクのカスタム基本タスク クラスを実装します。
# Initialize a Redis client redis_client = redis.StrictRedis(host="localhost", port=6379, db=0) class RateLimitedTask(Task): def __init__(self, *args, **kwargs): # Set default rate limit if not hasattr(self, "custom_rate_limit"): self.custom_rate_limit = 10 super().__init__(*args, **kwargs) def __call__(self, tenant_id, *args, **kwargs): # Rate limiting logic key = f"rate_limit:{tenant_id}:{self.name}" # Increment the count for this minute current_count = redis_client.incr(key) if current_count == 1: # Set expiration for the key if it's the first request redis_client.expire(key, 10) if current_count > self.custom_rate_limit: print(f"Rate limit exceeded for tenant {tenant_id}. Retrying...") raise self.retry(countdown=10) return super().__call__(tenant_id, *args, **kwargs)
このカスタム クラスは、Redis を使用して特定のテナントによってトリガーされたタスクの量を追跡し、10 秒の TTL を設定します。レート制限を超えると、タスクは 10 秒後に再試行されます。したがって、基本的にデフォルトのレート制限は 10 秒以内に 10 タスクです。
処理をエミュレートするサンプル タスクを定義してみましょう:
@app.task(base=RateLimitedTask, custom_rate_limit=5) def process(tenant_id: int, data): """ Mock processing task that takes 0.3 seconds to complete. """ print(f"Processing data: {data} for tenant: {tenant_id}") time.sleep(0.3)
ここではプロセスタスクを定義しており、タスクレベルでcustom_rate_limitを変更できることがわかります。 Custom_rate_limit を指定しない場合は、デフォルト値の 10 が割り当てられます。 これで、レート制限が 10 秒以内に 5 つのタスクに変更されました。
次に、さまざまなテナントに対していくつかのタスクをトリガーしてみましょう:
if __name__ == "__main__": for i in range(20): process.apply_async(args=(1, f"data_{i}")) for i in range(10): process.apply_async(args=(2, f"data_{i}"))
テナント ID 1 には 20 個のタスク、テナント ID 2 には 10 個のタスクを定義しています。
完全なコードは次のようになります:
# app.py import time import redis from celery import Celery, Task app = Celery( "app", broker="redis://localhost:6379/0", broker_connection_retry_on_startup=False, ) # Initialize a Redis client redis_client = redis.StrictRedis(host="localhost", port=6379, db=0) class RateLimitedTask(Task): def __init__(self, *args, **kwargs): if not hasattr(self, "custom_rate_limit"): self.custom_rate_limit = 10 super().__init__(*args, **kwargs) def __call__(self, tenant_id, *args, **kwargs): # Rate limiting logic key = f"rate_limit:{tenant_id}:{self.name}" # Increment the count for this minute current_count = redis_client.incr(key) if current_count == 1: # Set expiration for the key if it's the first request redis_client.expire(key, 10) if current_count > self.custom_rate_limit: print(f"Rate limit exceeded for tenant {tenant_id}. Retrying...") raise self.retry(countdown=10) return super().__call__(tenant_id, *args, **kwargs) @app.task(base=RateLimitedTask, custom_rate_limit=5) def process(tenant_id: int, data): """ Mock processing task that takes 0.3 seconds to complete. """ print(f"Processing data: {data} for tenant: {tenant_id}") time.sleep(0.3) if __name__ == "__main__": for i in range(20): process.apply_async(args=(1, f"data_{i}")) for i in range(10): process.apply_async(args=(2, f"data_{i}"))
ワーカーを実行しましょう:
celery -A app worker --loglevel=warning --concurrency 1 --prefetch-multiplier 1
次に、app.py スクリプトを実行してタスクをトリガーします。
python app.py
ご覧のとおり、ワーカーは最初のテナントの 5 つのタスクを処理し、他のすべてのタスクに対して再試行を設定します。次に、2 番目のテナントの 5 つのタスクを取得し、他のタスクの再試行を設定し、続行します。
このアプローチでは、テナントごとにレート制限を定義できますが、この例でわかるように、非常に高速に実行されるタスクの場合、レート制限を厳密に設定しすぎると、ワーカーがしばらく何もしない状態になってしまいます。レート制限パラメーターを微調整することは非常に重要であり、特定のタスクと量に応じて異なります。最適なバランスが見つかるまで、ためらわずに試してみてください。
Celery のデフォルトのタスク処理がマルチテナント環境でどのように不公平を引き起こす可能性があるか、またレート制限がこの問題の解決にどのように役立つかを調査してきました。テナント固有のレート制限を実装することで、単一のテナントによるリソースの独占を防ぎ、処理能力のより公平な配分を確保できます。
このアプローチは、Celery で公正な処理を実現するための強固な基盤を提供します。ただし、マルチテナント アプリケーションでのタスク処理をさらに最適化するために検討する価値のある手法は他にもあります。当初は 1 つの投稿ですべてをカバーする予定でしたが、このトピックは非常に広範囲にわたることが判明しました。明確さを確保し、この記事の焦点を絞るため、記事を 2 つの部分に分割することにしました。
このシリーズの次のパートでは、公平性と効率を高める別のメカニズムとして、タスクの優先順位について詳しく説明します。このアプローチにより、さまざまな基準に基づいてタスクにさまざまな優先度レベルを割り当てることができるため、需要が高い期間であっても重要なタスクが迅速に処理されるようになります。
次回もお楽しみに!
以上がセロリによる公正な処理の確保 — パート Iの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。