Der vorherige Artikel stellte die Installation von Rabbitmq und die klassische Hallo-Welt vor! Beispiel. Hier erfahren Sie mehr über Arbeitswarteschlangen. Da es sich um eine Fortsetzung des vorherigen Artikels handelt, ist dieser Artikel möglicherweise schwer zu verstehen, wenn Sie den vorherigen Artikel nicht gelesen haben. Die Adresse des vorherigen Artikels lautet: So installieren Sie Rabbitmq und Python auf Ubuntu
Nachrichten können auch als Aufgaben verstanden werden, und der Absender der Nachricht kann als Aufgabenzuweiser verstanden werden und der Nachrichtenempfänger kann als Arbeiter verstanden werden. Wenn der Arbeiter eine Aufgabe empfängt und sie nicht abgeschlossen hat, sendet der Aufgabenzuteiler eine andere Aufgabe, sodass mehrere Arbeiter erforderlich sind, um diese Aufgaben gemeinsam zu erledigen werden Arbeitswarteschlangen genannt. Das Strukturdiagramm lautet wie folgt:
rabbitmqs Python-Instanz-Arbeitswarteschlange
Vorbereitung
Verwenden Sie im Beispielprogramm new_task.py, um den Aufgabenzuteiler zu simulieren, und worker.py, um den Worker zu simulieren.
Ändern Sie send.py, um Informationen von Befehlszeilenparametern zu empfangen und zu senden.
import sys message= ' '.join(sys.argv[1:])or "Hello World!" channel.basic_publish(exchange='', routing_key='hello', body=message) print " [x] Sent %r" % (message,)
Ändern Sie die Rückruffunktion von require.py.
import time def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count('.') ) print " [x] Done"
Öffnen Sie hier zwei Terminals, beide führen worker.py aus und befinden sich im Überwachungsstatus. Dies entspricht zwei Workern. Öffnen Sie das dritte Terminal, führen Sie new_task.py
$ python new_task.py First message. $ python new_task.py Second message.. $ python new_task.py Third message... $ python new_task.py Fourth message.... $ python new_task.py Fifth message.....
aus und beobachten Sie, dass worker.py 3 Aufgaben erhält:
$ python worker.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'First message.' [x] Received 'Third message...' [x] Received 'Fifth message.....'
Ein anderer Arbeiter hat 2 Aufgaben erhalten:
$ python worker.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'Second message..' [x] Received 'Fourth message....'
Aus der oben genannten Sicht werden jedem Arbeiter nacheinander Aufgaben zugewiesen. Wenn also ein Arbeiter während der Bearbeitung einer Aufgabe stirbt, ist die Aufgabe nicht erledigt und sollte an andere Arbeiter übergeben werden. Daher sollte es einen Mechanismus geben, der Feedback gibt, wenn ein Mitarbeiter eine Aufgabe erledigt.
Nachrichtenbestätigung
Nachrichtenbestätigung erfolgt, wenn der Arbeiter die Aufgabe abschließt, sie wird an Rabbitmq zurückgemeldet. Ändern Sie die Rückruffunktion in worker.py:
def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep(5) print " [x] Done" ch.basic_ack(delivery_tag= method.delivery_tag)
Halten Sie hier 5 Sekunden lang an, um das Beenden mit Strg+C zu erleichtern.
Sie können den Parameter no_ack=True auch entfernen oder auf False setzen.
channel.basic_consume(callback, queue='hello', no_ack=False)
Wenn Sie diesen Code ausführen, gehen die ausgeführten Aufgaben nicht verloren, auch wenn einer der Arbeiter mit Strg+C beendet wird und Rabbitmq die Aufgaben an andere Arbeiter neu verteilt.
Nachrichtenbeständigkeit (Nachrichtenbeständigkeit)
Obwohl es einen Nachrichten-Feedback-Mechanismus gibt, wenn Rabbitmq selbst hängt Lass es fallen, die Mission ist trotzdem verloren. Daher müssen Aufgaben dauerhaft gespeichert werden. Persistenten Speicher deklarieren:
channel.queue_declare(queue='hello', durable=True)
Aber dieses Programm führt einen Fehler aus, da die Hello-Warteschlange bereits vorhanden ist und die Verwendung anderer Parameter zum Neudefinieren vorhandener Warteschlangen nicht zulässt. Definieren Sie eine Warteschlange neu:
channel.queue_declare(queue='task_queue', durable=True)
Wenn Sie eine Aufgabe senden, verwenden Sie Delivery_mode=2, um die Aufgabe als dauerhaften Speicher zu markieren:
channel.basic_publish(exchange='', routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode= 2,# make message persistent ))
Fairer Versand
Im obigen Beispiel werden zwar jedem Arbeiter nacheinander Aufgaben zugewiesen, nicht jedoch jeder Aufgabe zwangsläufig das Gleiche. Einige Aufgaben können schwerer sein und eine längere Ausführungszeit in Anspruch nehmen; andere können leichter sein und eine kürzere Ausführungszeit in Anspruch nehmen. Es wäre am besten, wenn es fair geplant werden kann, um prefetch_count=1 zu setzen, damit Rabbitmq nicht mehrere Aufgaben gleichzeitig an Worker zuweist. Das heißt, erst nachdem der Worker die Aufgabe abgeschlossen hat, erhält es die Aufgabe erneut .
channel.basic_qos(prefetch_count=1)
new_task.py vollständiger Code
#!/usr/bin/env python import pika import sys connection= pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel= connection.channel() channel.queue_declare(queue='task_queue', durable=True) message= ' '.join(sys.argv[1:])or "Hello World!" channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode= 2,# make message persistent )) print " [x] Sent %r" % (message,) connection.close()
worker.py vollständiger Code
#!/usr/bin/env python import pika import time connection= pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel= connection.channel() channel.queue_declare(queue='task_queue', durable=True) print ' [*] Waiting for messages. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count('.') ) print " [x] Done" ch.basic_ack(delivery_tag= method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='task_queue') channel.start_consuming()
Das Obige ist der Inhalt der Verwendung von Python Rabbitmq (2), Weitere verwandte Inhalte finden Sie auf der chinesischen PHP-Website (m.sbmmt.com)!