メッセージデータ送信におけるメッセージキュー(MQ、Message Queue)の保持の役割は、データ通信の保証とリアルタイム処理の利便性を提供します。ここでは、Python でのスレッドの MQ メッセージキューの実装とその分析について見ていきます。メッセージキューの利点
「メッセージキュー」は、送信中にメッセージを保存するコンテナです。メッセージ キュー マネージャーは、メッセージを送信元から宛先に中継するときに仲介者として機能します。キューの主な目的は、ルーティングを提供し、メッセージの配信を保証することです。メッセージの送信時に受信者が不在の場合、メッセージ キューはメッセージが正常に配信されるまでメッセージを保持します。メッセージ キューは、あらゆるアーキテクチャやアプリケーションにとって重要なコンポーネントであると私は考えています。以下に 10 の理由があります:
Python メッセージ キューの例:
#!/usr/bin/env python import Queue import threading import time queue = Queue.Queue() class ThreadNum(threading.Thread): """没打印一个数字等待1秒,并发打印10个数字需要多少秒?""" def __init__(self, queue): threading.Thread.__init__(self) self.queue = queue def run(self): whileTrue: #消费者端,从队列中获取num num = self.queue.get() print "i'm num %s"%(num) time.sleep(1) #在完成这项工作之后,使用 queue.task_done() 函数向任务已经完成的队列发送一个信号 self.queue.task_done() start = time.time() def main(): #产生一个 threads pool, 并把消息传递给thread函数进行处理,这里开启10个并发 for i in range(10): t = ThreadNum(queue) t.setDaemon(True) t.start() #往队列中填错数据 for num in range(10): queue.put(num) #wait on the queue until everything has been processed queue.join() main() print "Elapsed Time: %s" % (time.time() - start)
実行結果:
i'm num 0 i'm num 1 i'm num 2 i'm num 3 i'm num 4 i'm num 5 i'm num 6 i'm num 7 i'm num 8 i'm num 9 Elapsed Time: 1.01399993896
解釈:
具体的な作業手順は次のとおりです。
1. Queue.Queue() のインスタンスを作成し、データを入力します。
2. threading.Threadを継承して作成されたスレッドクラスに、入力されたデータインスタンスを渡します。
3. デーモンスレッドプールを生成します。
4. 毎回キューから 1 つのアイテムを取り出し、このスレッドのデータと実行メソッドを使用して、対応する作業を実行します。
5. この作業が完了したら、queue.task_done() 関数を使用して、タスクが完了したことを示すシグナルをキューに送信します。
6. キューで結合操作を実行するということは、実際には、キューが空になるまでメイン プログラムを終了するまで待機することを意味します。
このモードを使用するときに注意すべき点が 1 つあります。デーモン スレッドを true に設定すると、プログラムは実行後に自動的に終了します。利点は、キューに対して結合操作を実行したり、キューが空になるまで待ってから終了したりできることです。
いわゆる複数のキュー。1 つのキューの出力を別のキューの入力として使用できます
#!/usr/bin/env python import Queue import threading import time queue = Queue.Queue() out_queue = Queue.Queue() class ThreadNum(threading.Thread): def __init__(self, queue, out_queue): threading.Thread.__init__(self) self.queue = queue self.out_queue = out_queue def run(self): whileTrue: #从队列中取消息 num = self.queue.get() bkeep = num #将bkeep放入队列中 self.out_queue.put(bkeep) #signals to queue job is done self.queue.task_done() class PrintLove(threading.Thread): def __init__(self, out_queue): threading.Thread.__init__(self) self.out_queue = out_queue def run(self): whileTrue: #从队列中获取消息并赋值给bkeep bkeep = self.out_queue.get() keke = "I love " + str(bkeep) print keke, print self.getName() time.sleep(1) #signals to queue job is done self.out_queue.task_done() start = time.time() def main(): #populate queue with data for num in range(10): queue.put(num) #spawn a pool of threads, and pass them queue instance for i in range(5): t = ThreadNum(queue, out_queue) t.setDaemon(True) t.start() for i in range(5): pl = PrintLove(out_queue) pl.setDaemon(True) pl.start() #wait on the queue until everything has been processed queue.join() out_queue.join() main() print "Elapsed Time: %s" % (time.time() - start)
実行結果:
I love 0 Thread-6 I love 1 Thread-7 I love 2 Thread-8 I love 3 Thread-9 I love 4 Thread-10 I love 5 Thread-7 I love 6 Thread-6 I love 7 Thread-9 I love 8 Thread-8 I love 9 Thread-10 Elapsed Time: 2.00300002098
解釈:
ThreadNum クラスのワークフロー
定義キュー - -->スレッドの継承--->キューの初期化--->実行関数の定義--->キュー内のデータの取得--->データの処理--->データの挿入別のキューへ --->アイテムが処理されたことをキューに伝えるシグナルを送信します
メイン関数のワークフロー:
--->データをカスタムキューにスローします
---> for ループが開始されると決定される スレッドの数----> ThreadNum クラスをインスタンス化する----> スレッドを開始し、デーモンをセットアップする
---> ---->PrintLove クラスをインスタンス化します--- >スレッドを開始してガードとして設定します
--->キュー内のメッセージが処理されるのを待ってから結合を実行します。つまり、メインプログラムを終了します。
MQ の一般的な実装を理解した後、メッセージ キューの利点をまとめてみましょう:
プロジェクトの開始時に、将来のプロジェクトでどのようなニーズが発生するかを予測することは非常に困難です。メッセージ キューは、処理プロセスの途中に暗黙的なデータベースのインターフェイス層を挿入し、両側の処理プロセスはこのインターフェイスを実装する必要があります。これにより、同じインターフェイスの制約に従っている限り、両側のプロセスを独立して拡張または変更できます。
データの処理中にプロセスが失敗することがあります。データは永続化しない限り、永久に失われます。メッセージ キューは、データが完全に処理されるまで保持することで、データ損失のリスクを回避します。多くのメッセージ キューで使用される「挿入-取得-削除」パラダイムでは、キューからメッセージを削除する前に、処理プロセスでメッセージが処理されたことを明確に示し、データが安全であることを確認する必要があります。それを使用して完了しました。
メッセージ キューは処理を分離するため、追加の処理を追加するだけでメッセージのエンキューと処理の頻度を簡単に増やすことができます。コードを変更したりパラメータを調整したりする必要はありません。拡張は電源ボタンを押すだけで簡単です。
あなたのアプリケーションが Hacker News のホームページに掲載されていると、トラフィックが異常なレベルに達していることがわかります。訪問数が劇的に増加した場合でもアプリケーションは機能し続ける必要がありますが、このようなトラフィックのバーストはまれです。そのようなピークの訪問を処理できるという基準に基づいてリソースをスタンバイに投資するのは非常に無駄です。メッセージ キューを使用すると、要求の過負荷によって完全に機能不全に陥ることなく、重要なコンポーネントが増大するアクセス圧力に耐えることができます。 詳細については、ピーク処理能力に関するブログ投稿をご覧ください。
システムの一部のコンポーネントに障害が発生しても、システム全体には影響しません。メッセージ キューによりプロセス間の結合が軽減されるため、メッセージを処理するプロセスがハングアップした場合でも、キューに追加されたメッセージはシステムの回復後に引き続き処理できます。多くの場合、リクエストの再試行または延期を許可できるかどうかで、多少不便を感じるユーザーとイライラするユーザーの違いが決まります。
メッセージ キューによって提供される冗長メカニズムにより、1 つのプロセスがキューを読み取る限り、メッセージが実際に処理されることが保証されます。これに基づいて、IronMQ は「1 回のみの配信」保証を提供します。キューからデータを受信しているプロセスの数に関係なく、各メッセージは 1 回しか処理できません。これが可能になるのは、メッセージの取得が単にメッセージを「サブスクライブ」し、メッセージをキューから一時的に削除するだけであるためです。クライアントがメッセージの処理を終了したことを明示的に示さない限り、メッセージはキューに戻され、構成可能な時間が経過した後に再度処理できるようになります。
多くの場合、データが処理される順序が重要です。メッセージ キューは本質的にソートされており、データが特定の順序で処理されることを保証できます。 IronMO は、メッセージが FIFO (先入れ先出し) 順序で処理されることを保証するため、キュー内のメッセージの位置はメッセージが取得された位置になります。
重要なシステムには、さまざまな量の処理時間を必要とする要素が存在します。たとえば、画像の読み込みは、フィルターを適用するよりも時間がかかりません。メッセージ キューはバッファリング層を使用して、タスクを最も効率的に実行できるようにします。キューへの書き込みは、キューからの読み取りの準備処理に制約されることなく、できるだけ早く処理されます。このバッファリングは、システムを流れるデータの速度を制御し、最適化するのに役立ちます。
分散システムでは、ユーザーの操作にどのくらいの時間がかかるのか、またその理由を全体的に把握するのは大きな課題です。メッセージ シリーズは、データ フローが十分に最適化されていない、メッセージの処理頻度によってパフォーマンスが低下しているプロセスや領域を特定するのに役立ちます。
多くの場合、メッセージをすぐに処理したくない、またはその必要はありません。メッセージ キューは、メッセージをキューに入れてもすぐには処理できない非同期処理メカニズムを提供します。必要な数のメッセージをキューに入れて、必要なときに処理できます。
以上がMQ メッセージ キュー実装の概要と Python でのスレッドの長所と短所の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。