首頁 > 後端開發 > Python教學 > Python rabbitmq的使用(二)

Python rabbitmq的使用(二)

黄舟
發布: 2017-01-17 14:51:41
原創
1315 人瀏覽過

上一篇介紹了rabbitmq的安裝和經典的hello world!實例。這裡將對工作隊列(Work Queues)做一個了解。因為是接上一篇說明的,所以如果沒看過上一篇,看這篇可能會比較難懂。上一篇的地址是:ubuntu安裝rabbitmq和python的使用實作


訊息也可以理解為任務,訊息發送者可以理解為任務分配者,訊息接收者可以理解為工作者,當工作者接收到一個任務,還沒完成的時候,任務分配者又發一個任務過來,那就忙不過來了,於是就需要多個工作者來共同處理這些任務,這些工作者,就稱為工作隊列。結構圖如下:

Python rabbitmq的使用(二)

rabbitmq的python實例工作佇列


準備工作(Preparation)


準備工作(Preparation)


工作者。

修改send.py,從命令列參數接收訊息,並發送

import sys
message= ' '.join(sys.argv[1:])or "Hello World!"
channel.basic_publish(exchange='',
routing_key='hello',
body=message)
print " [x] Sent %r" % (message,)
登入後複製

修改receive.py的回呼函數。

import time
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
time.sleep( body.count('.') )
print " [x] Done"
登入後複製

這邊先打開兩個終端,都運行worker.py,處於監聽狀態,這邊就相當於兩個工作者。打開第三個終端,執行new_task.py

$ python new_task.py First message.
$ python new_task.py Second message..
$ python new_task.py Third message...
$ python new_task.py Fourth message....
$ python new_task.py Fifth message.....
登入後複製

觀察worker.py接收到任務,其中一個工作者接收到3個任務:

$ python worker.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'First message.'
[x] Received 'Third message...'
[x] Received 'Fifth message.....'
登入後複製

另外一個工作者接收到2個任務:

$ python worker.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Second message..'
[x] Received 'Fourth message....'
登入後複製

從上面來看,每個工作者,都會依序分配到任務。那如果一個工作者,在處理任務的時候掛掉,這個任務就沒有完成,應當交由其他工作者處理。所以應當有一種機制,當一個工作者完成任務時,就會回饋訊息。

訊息確​​認(Message acknowledgment)



訊息確​​認就是當工作者完成任務後,會回饋給rabbitmq。修改worker.py中的回呼函數:

def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
time.sleep(5)
print " [x] Done"
ch.basic_ack(delivery_tag= method.delivery_tag)
登入後複製

這邊停頓5秒,可以方便ctrl+c退出。

去除no_ack=True參數或設定為False也可以。

channel.basic_consume(callback, queue='hello', no_ack=False)
登入後複製

用這個程式碼運行,即使其中一個工作者ctrl+c退出後,正在執行的任務也不會遺失,rabbitmq會將任務重新分配給其他工作者。


訊息持久化儲存(Message durability)

雖然有了訊息回饋機制,但是如果rabbitmq自身掛掉的話,那麼任務還是會遺失。所以需要將任務持久化儲存起來。聲明持久化儲存:
channel.queue_declare(queue='hello', durable=True)
登入後複製
但是這個程式會執行錯誤,因為hello這個佇列已經存在,並且是非持久化的,rabbitmq不允許使用不同的參數來重新定義存在的佇列。重新定義一個佇列:

channel.queue_declare(queue='task_queue', durable=True)
登入後複製


在發送任務的時候,用delivery_mode=2來標記任務為持久化儲存:

channel.basic_publish(exchange='',
routing_key="task_queue",
body=message,
properties=pika.BasicProperties(
delivery_mode= 2,# make message persistent
))
登入後複製

中,雖然每個工作者是依序分配到任務,但是每個任務不一定一樣。可能有的任務比較重,執行時間比較久;有的任務比較輕,執行時間比較短。如果能公平調度就最好了,使用basic_qos設定prefetch_count=1,使得rabbitmq不會在同一時間給工作者分配多個任務,即只有工作者完成任務之後,才會再次接收到任務。

channel.basic_qos(prefetch_count=1)
登入後複製

new_task.py完整程式碼

#!/usr/bin/env python
import pika
import sys
connection= pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel= connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message= ' '.join(sys.argv[1:])or "Hello World!"
channel.basic_publish(exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode= 2,# make message persistent
))
print " [x] Sent %r" % (message,)
connection.close()
登入後複製

worker.py完整程式碼

#!/usr/bin/env python
import pika
import time
connection= pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel= connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
time.sleep( body.count('.') )
print " [x] Done"
ch.basic_ack(delivery_tag= method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue='task_queue')
channel.start_consuming()
登入後複製

以上就是Python rabbitmq的使用(二)的內容,更多相關內容請關注PHPcn)!

相關標籤:
來源:php.cn
本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
熱門教學
更多>
最新下載
更多>
網站特效
網站源碼
網站素材
前端模板