Tutoriel Python de démarrage rapide de RabbitMQ

高洛峰
Libérer: 2017-03-09 09:28:19
original
1795 Les gens l'ont consulté

HelloWorld

Introduction

RabbitMQ : acceptant les messages puis les remettant, il peut être considéré comme un « bureau de poste ». Les expéditeurs et les destinataires interagissent via des files d'attente. La taille de la file d'attente peut être considérée comme illimitée. Plusieurs expéditeurs peuvent envoyer des messages à une file d'attente, et plusieurs destinataires peuvent également recevoir des messages d'une file d'attente.

code

Le protocole utilisé par lapinmq est amqp, et le client recommandé pour python est pika

pip install pika -i https://pypi.douban.com/simple/
Copier après la connexion

send.py

# coding: utf8
import pika

# 建立一个连接
connection = pika.BlockingConnection(pika.ConnectionParameters(
           'localhost'))  # 连接本地的RabbitMQ服务器
channel = connection.channel()  # 获得channel
Copier après la connexion

Le lien ici concerne cette machine. Si vous souhaitez vous connecter à un serveur sur une autre machine, remplissez simplement l'adresse ou le nom d'hôte.

Ensuite, nous commençons à envoyer des messages. Assurez-vous que la file d'attente qui accepte le message existe, sinon RabbitMQ supprimera le message.

channel.queue_declare(queue='hello')  # 在RabbitMQ中创建hello这个队列
channel.basic_publish(exchange='',  # 使用默认的exchange来发送消息到队列
                  routing_key='hello',  # 发送到该队列 hello 中
                  body='Hello World!')  # 消息内容

connection.close()  # 关闭 同时flush
Copier après la connexion

RabbitMQ nécessite 1 Go d'espace disque libre par défaut. l'envoi échouera.

À l'heure actuelle, un message a été stocké dans la file d'attente locale bonjour. Si vous utilisez Rabbitmqctl list_queues, vous pouvez voir

hello 1
Copier après la connexion

indiquant qu'il y a un message stocké dans la file d'attente bonjour.

receive.py

# coding: utf8
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))
channel = connection.channel()
Copier après la connexion

Connectez-vous toujours au serveur en premier, comme lors de l'envoi avant

channel.queue_declare(queue='hello')  # 此处就是声明了 来确保该队列 hello 存在 可以多次声明 这里主要是为了防止接受程序先运行时出错

def callback(ch, method, properties, body):  # 用于接收到消息后的回调
    print(" [x] Received %r" % body)

channel.basic_consume(callback,
                      queue='hello',  # 收指定队列hello的消息
                      no_ack=True)  #在处理完消息后不发送ack给服务器
channel.start_consuming()  # 启动消息接受 这会进入一个死循环
Copier après la connexion

File d'attente des tâches (file d'attente des tâches)

La file d'attente de travail est utilisée pour distribuer les tâches fastidieuses sur plusieurs processus de travail. Au lieu d'effectuer immédiatement des tâches gourmandes en ressources (vous devez attendre que ces tâches soient terminées), planifiez ces tâches pour une exécution ultérieure. Par exemple, nous envoyons la tâche sous forme de message à la file d'attente, démarrons un processus de travail pour l'accepter et éventuellement l'exécuter, et pouvons démarrer plusieurs processus de travail pour qu'ils fonctionnent. Cela s'applique aux applications Web, où les tâches complexes ne doivent pas être effectuées dans la fenêtre de traitement d'une requête http.

channel.basic_publish(exchange='',
                  routing_key='task_queue',
                  body=message,
                  properties=pika.BasicProperties(
                     delivery_mode = 2, # 使得消息持久化
                  ))
Copier après la connexion

La façon de distribuer les messages est le sondage, c'est-à-dire que chaque processus de travail reçoit le même nombre de messages.

Accusation de message

Si un message est affecté à un processus de travail, mais que le processus de travail se bloque avant la fin du traitement, le message peut être perdu, car une fois que RabbitMQ distribue un message au processus de travail , il supprime le message.

Afin d'éviter la perte de messages, Rabbitmq fournit un accusé de réception, c'est-à-dire qu'une fois que le processus de travail a reçu le message et l'a traité, il envoie un accusé de réception à Rabbitmq pour informer Rabbitmq que le message peut être supprimé de la file d'attente à ce moment-là. temps. Si le processus de travail meurt et que Rabbitmq ne reçoit pas l'accusé de réception, le message sera redistribué aux autres processus de travail. Il n'est pas nécessaire de définir un délai d'attente, même si la tâche prend beaucoup de temps, elle peut être traitée.

ack est activé par défaut. Auparavant, notre processus de travail spécifiait no_ack=True

channel.basic_consume(callback, queue='hello')  # 会启用ack
Copier après la connexion

rappel avec ack :

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count('.') )
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)  # 发送ack
Copier après la connexion

Persistance du message

Cependant, RabbitMQ est parfois redémarré et les messages sont perdus. La persistance peut être paramétrée lors de la création de la file d'attente :
(la nature de la file d'attente ne peut plus être modifiée une fois déterminée)

channel.queue_declare(queue='task_queue', durable=True)
Copier après la connexion

Parallèlement, l'attribut de persistance du message doit également être paramétré lors de l'envoi le message :

channel.basic_publish(exchange='',

                  routing_key="task_queue",
                  body=message,
                  properties=pika.BasicProperties(
                     delivery_mode = 2, # make message persistent
                  ))
Copier après la connexion

Cependant, si RabbitMQ vient de recevoir un message et n'a pas eu le temps de le stocker, le message sera quand même perdu. En même temps, RabbitMQ ne reçoit pas tous les messages. Si vous avez besoin d'une garantie plus complète, vous devez utiliser la confirmation de l'éditeur

Distribution équitable des messages

Polling. La distribution des messages en mode peut ne pas être équitable, par exemple, les messages impairs sont tous des tâches lourdes, certains processus exécuteront toujours des tâches lourdes, même s'il y a un retard de messages sur un certain processus de travail qui n'a pas été traité, par exemple. de nombreux accusés de réception n'ont pas été envoyés, RabbitMQ lui enverra toujours des messages dans l'ordre :

channel.basic_qos(prefetch_count=1)
Copier après la connexion

Informez RabbitMQ afin que si un processus de travail ne renvoie pas d'accusé de réception, il ne lui attribuera pas de messages.

Envoi groupé

En général, un message est envoyé à un processus de travail puis terminé. Parfois, je souhaite envoyer un message à plusieurs processus en même temps :

échange

L'expéditeur envoie-t-il directement le message à la file d'attente ? En fait, l'expéditeur ne sait pas à quelle file d'attente le message sera envoyé. L'expéditeur ne peut envoyer le message qu'à l'échange. d'une part, l'échange reçoit les messages du producteur et d'autre part, il les pousse vers la file d'attente. Ainsi, en tant qu'échange, vous devez savoir quoi faire lorsqu'un message est reçu, s'il doit être ajouté à un message spécial. file d'attente ou mis dans plusieurs files d'attente, ou rejeté. Exchange a des types directs, sujets, en-têtes, fanout et autres, c'est-à-dire que l'envoi de masse est utilisé lors de la publication d'un message, c'est-à-dire. l'échange par défaut a été utilisé, peut être utilisé lors de l'envoi ou de la réception

channel.exchange_declare(exchange='logs', type='fanout')  # 该exchange会把消息发送给所有它知道的队列中
Copier après la connexion
Échange de liaison et file d'attente

result = channel.queue_declare()  # 创建一个随机队列
result = channel.queue_declare(exclusive=True)  # 创建一个随机队列,同时在没有接收者连接该队列后则销毁它
queue_name = result.method.queue
Copier après la connexion
Les journaux seront également envoyés à hello lors de l'envoi. Lors de l'envoi. Le message est acheminé à l'aide de l'échange de journaux nouvellement créé

channel.queue_bind(exchange='logs',
               queue='hello')
Copier après la connexion
Bind a déjà été utilisé, c'est-à-dire que la relation entre l'échange et la file d'attente est établie (la file d'attente est intéressé par les messages de l'échange), Vous pouvez également spécifier l'option router_key lors de la liaison. Utilisez l'échange direct

pour envoyer le message correspondant à la clé de routage à la file d'attente liée à la même clé de routage

.

   channel.basic_publish(exchange='logs',
                  routing_key='',
                  body=message)
Copier après la connexion
Fonction Envoyer. , publier des messages de différentes gravités :

Lier la gravité correspondante dans la fonction d'acceptation :

Utiliser l'échange de sujets

L'échange direct utilisé auparavant ne peut que Pour lier une clé de routage, vous pouvez utiliser ce sujet d'échange qui sépare les clés de routage, par exemple :
channel.exchange_declare(exchange='direct_logs',
                     type='direct')
Copier après la connexion
"stock.usd.nyse" "nyse.vmw"
Copier après la connexion

和direct exchange一样,在接受者那边绑定的key与发送时指定的routing key相同即可,另外有些特殊的值:

* 代表1个单词
# 代表0个或多个单词
Copier après la connexion

如果发送者发出的routing key都是3个部分的,如:celerity.colour.species。

Q1:
*.orange.*  对应的是中间的colour都为orange的

Q2:
*.*.rabbit  对应的是最后部分的species为rabbit的
lazy.#      对应的是第一部分是lazy的
Copier après la connexion

qucik.orange.rabbit Q1 Q2都可接收到,quick.orange.fox 只有Q1能接受到,对于lazy.pink.rabbit虽然匹配到了Q2两次,但是只会发送一次。如果绑定时直接绑定#,则会收到所有的。

 RPC

在远程机器上运行一个函数然后获得结果。

1、客户端启动 同时设置一个临时队列用于接受回调,绑定该队列

    self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    self.channel = self.connection.channel()
    result = self.channel.queue_declare(exclusive=True)
    self.callback_queue = result.method.queue
    self.channel.basic_consume(self.on_response, no_ack=True,
                               queue=self.callback_queue)
Copier après la connexion

2、客户端发送rpc请求,同时附带reply_to对应回调队列,correlation_id设置为每个请求的唯一id(虽然说可以为每一次RPC请求都创建一个回调队列,但是这样效率不高,如果一个客户端只使用一个队列,则需要使用correlation_id来匹配是哪个请求),之后阻塞在回调队列直到收到回复

注意:如果收到了非法的correlation_id直接丢弃即可,因为有这种情况--服务器已经发了响应但是还没发ack就挂了,等一会服务器重启了又会重新处理该任务,又发了一遍相应,但是这时那个请求已经被处理掉了

channel.basic_publish(exchange='',
                       routing_key='rpc_queue',
                       properties=pika.BasicProperties(
                             reply_to = self.callback_queue,
                             correlation_id = self.corr_id,
                             ),
                       body=str(n))  # 发出调用

while self.response is None:  # 这边就相当于阻塞了
    self.connection.process_data_events()  # 查看回调队列
return int(self.response)
Copier après la connexion

3、请求会发送到rpc_queue队列
4、RPC服务器从rpc_queue中取出,执行,发送回复

channel.basic_consume(on_request, queue='rpc_queue')  # 绑定 等待请求

# 处理之后:
ch.basic_publish(exchange='',
                 routing_key=props.reply_to,
                 properties=pika.BasicProperties(correlation_id = \
                                                     props.correlation_id),
                 body=str(response))  # 发送回复到回调队列
ch.basic_ack(delivery_tag = method.delivery_tag)  # 发送ack
Copier après la connexion

5、客户端从回调队列中取出数据,检查correlation_id,执行相应操作

if self.corr_id == props.correlation_id:
        self.response = body
Copier après la connexion

                                               

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Étiquettes associées:
source:php.cn
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal
À propos de nous Clause de non-responsabilité Sitemap
Site Web PHP chinois:Formation PHP en ligne sur le bien-être public,Aidez les apprenants PHP à grandir rapidement!