RabbitMQ는 MQ 기반의 서버입니다. Python은 Pika 라이브러리를 통해 프로그램 제어에 사용할 수 있습니다. 여기서는 RabbitMQ 서버 메시지 큐의 원격 결과 반환에 대해 자세히 설명합니다. 작성자의 테스트에 대해 먼저 이야기하세요. 환경: Ubuntu14.04 + Python 2.7.4
RabbitMQ 서버sudo apt-get install rabbitmq-server
RabbitMQ를 사용하는 Python에는 Pika 라이브러리가 필요합니다
sudo pip install pika
메시지 보낸 사람이 메시지를 보낸 후 결과가 반환되지 않습니다. 그냥 메시지를 보내면 당연히 문제가 없지만, 실제로는 수신측에서 수신된 메시지를 처리해서 송신측에 돌려주어야 하는 경우가 많습니다.
처리 방법 설명: 정보를 보내기 전에 송신 측에서 메시지 수신을 위한 임시 대기열을 생성합니다. 이 대기열은 반환된 결과를 수신하는 데 사용됩니다. 실제로 여기에서는 수신 측과 송신 측의 개념이 상대적으로 모호합니다. 왜냐하면 송신 측도 메시지를 받아야 하고, 수신 측도 메시지를 보내야 하기 때문입니다. 따라서 여기에서는 이 프로세스를 설명하기 위해 또 다른 예를 사용합니다.
예제 내용: 제어 센터와 컴퓨팅 노드가 있다고 가정합니다. 제어 센터는 컴퓨팅 노드에 자연수 N을 전송하고 컴퓨팅 노드는 N 값에 1을 더하여 제어 센터로 반환합니다. . 여기서 center.py는 제어 센터를 시뮬레이션하는 데 사용되고, Compute.py는 컴퓨팅 노드를 시뮬레이션하는 데 사용됩니다.
compute.py 코드 분석
#!/usr/bin/env python #coding=utf8 import pika #连接rabbitmq服务器 connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() #定义队列 channel.queue_declare(queue='compute_queue') print ' [*] Waiting for n' #将n值加1 def increase(n): return n + 1 #定义接收到消息的处理方法 def request(ch, method, properties, body): print " [.] increase(%s)" % (body,) response = increase(int(body)) #将计算结果发送回控制中心 ch.basic_publish(exchange='', routing_key=properties.reply_to, body=str(response)) ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(request, queue='compute_queue') channel.start_consuming()
계산 노드의 코드는 원래 수신 방법이 비교적 간단하다는 점을 언급할 가치가 있습니다. all direct 메시지가 인쇄되고 여기에서 더하기 1의 계산이 수행되며 결과가 제어 센터로 다시 전송됩니다.
center.py 코드 분석
#!/usr/bin/env python #coding=utf8 import pika class Center(object): def __init__(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) self.channel = self.connection.channel() #定义接收返回消息的队列 result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue) #定义接收到返回消息的处理方法 def on_response(self, ch, method, props, body): self.response = body def request(self, n): self.response = None #发送计算请求,并声明返回队列 self.channel.basic_publish(exchange='', routing_key='compute_queue', properties=pika.BasicProperties( reply_to = self.callback_queue, ), body=str(n)) #接收返回的数据 while self.response is None: self.connection.process_data_events() return int(self.response) center = Center() print " [x] Requesting increase(30)" response = center.request(30) print " [.] Got %r" % (response,)
위의 예제 코드는 반환된 데이터를 수신하고 요청을 보낼 때 대기열 및 처리 방법을 정의합니다. reply_to에 대기열을 할당합니다. 이 매개변수는 계산 노드 코드에서 반환 대기열을 얻는 데 사용됩니다.
두 개의 터미널을 엽니다. 하나는 python comput.py 코드를 실행하고, 다른 하나는 center.py를 실행합니다. 실행이 성공하면 효과를 볼 수 있습니다.
테스트 중에 작성자는 몇 가지 사소한 문제에 직면했습니다. 즉, center.py가 메시지를 보낼 때 반환 대기열이 지정되지 않았습니다. 그 결과, Compute.py가 이후 데이터를 다시 보낼 때 오류를 보고했습니다. 결과를 계산하면 Routing_key가 존재하지 않는다는 메시지가 표시되고 다시 실행하면 오류가 보고됩니다. Rabbitmqctl list_queues를 사용하여 큐를 확인하고 Compute_queue 큐에 1개의 데이터가 있는지 확인하세요. 이 데이터는 Compute.py가 다시 실행될 때마다 다시 처리됩니다. 나중에 /etc/init.d/rabbitmq-server restart를 사용하여 Rabbitmq를 다시 시작했는데 모든 것이 정상이었습니다.
상관 ID앞의 예에서는 원격 결과 반환의 예를 보여 주었는데 한 가지 언급되지 않은 것이 바로 상관 ID입니다.
여러 컴퓨팅 노드가 있고 제어 센터가 여러 스레드를 시작하고 이러한 컴퓨팅 노드에 숫자를 보내고 계산 결과를 요구하여 반환하지만 제어 센터는 하나의 대기열만 열고 모든 스레드는 다음에서 시작한다고 가정합니다. 메시지를 얻기 위해 각 스레드는 수신된 메시지가 해당 스레드에 해당하는지 어떻게 확인합니까? 상관관계 ID를 활용한 것입니다. 상관관계(correlation)는 중국어로 상호상관(mutual correlation)으로 번역되는데, 이것도 이런 의미를 표현합니다.
Correlation id 작동 원리: 관제 센터는 계산 요청을 보낼 때 Correlation id를 설정하고, 계산 노드는 수신된 Correlation id와 함께 계산 결과를 반환하여 관제 센터가 이를 통해 요청을 식별할 수 있도록 합니다. 상관관계 ID 실제로 상관 ID는 요청의 고유 식별 코드로도 이해될 수 있습니다.
예시 내용: 제어 센터는 여러 스레드를 시작하고 각 스레드는 상관 ID를 통해 계산 요청을 시작하며 각 스레드는 해당 계산 결과를 정확하게 수신할 수 있습니다.
compute.py 코드 분석
이전 글과 비교해서 한 곳만 수정하면 됩니다. 계산 결과를 제어 센터로 다시 보낼 때 매개변수 Correlation_id 설정, 값을 추가하면 됩니다. 이 매개변수의 사실은 관제센터에서 보냈는데, 그냥 다시 돌려보내는 것입니다. 코드는 다음과 같습니다.
#!/usr/bin/env python #coding=utf8 import pika #连接rabbitmq服务器 connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() #定义队列 channel.queue_declare(queue='compute_queue') print ' [*] Waiting for n' #将n值加1 def increase(n): return n + 1 #定义接收到消息的处理方法 def request(ch, method, props, body): print " [.] increase(%s)" % (body,) response = increase(int(body)) #将计算结果发送回控制中心,增加correlation_id的设定 ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id = \ props.correlation_id), body=str(response)) ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(request, queue='compute_queue') channel.start_consuming()
center.py 코드 분석
통제센터 코드는 조금 더 복잡하며 키는 3개입니다. 장소:
파이썬의 uuid를 사용하여 고유한 Correlation_id를 생성합니다.
계산 요청을 보낼 때 Correlation_id 매개변수를 설정하세요.반환된 데이터를 저장할 딕셔너리를 정의하고, 키값은 해당 스레드에서 생성한 Correlation_id입니다.
코드는 다음과 같습니다.
#!/usr/bin/env python #coding=utf8 import pika, threading, uuid #自定义线程类,继承threading.Thread class MyThread(threading.Thread): def __init__(self, func, num): super(MyThread, self).__init__() self.func = func self.num = num def run(self): print " [x] Requesting increase(%d)" % self.num response = self.func(self.num) print " [.] increase(%d)=%d" % (self.num, response) #控制中心类 class Center(object): def __init__(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) self.channel = self.connection.channel() #定义接收返回消息的队列 result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue) #返回的结果都会存储在该字典里 self.response = {} #定义接收到返回消息的处理方法 def on_response(self, ch, method, props, body): self.response[props.correlation_id] = body def request(self, n): corr_id = str(uuid.uuid4()) self.response[corr_id] = None #发送计算请求,并设定返回队列和correlation_id self.channel.basic_publish(exchange='', routing_key='compute_queue', properties=pika.BasicProperties( reply_to = self.callback_queue, correlation_id = corr_id, ), body=str(n)) #接收返回的数据 while self.response[corr_id] is None: self.connection.process_data_events() return int(self.response[corr_id]) center = Center() #发起5次计算请求 nums= [10, 20, 30, 40 ,50] threads = [] for num in nums: threads.append(MyThread(center.request, num)) for thread in threads: thread.start() for thread in threads: thread.join()
작가는 Compute.py를 실행하기 위해 두 개의 터미널을 열고, center.py를 실행하기 위해 터미널을 열었고, 최종 결과가 출력되었습니다. 스크린샷은 다음과 같습니다.
획득한 결과가 순차적으로 출력되지는 않지만 소스 데이터와 일치하는 결과를 볼 수 있습니다.
여기의 예는 대기열을 생성하고 상관 관계 ID를 사용하여 각 요청을 식별하는 것입니다. Correlation ID를 사용하지 않는 방법도 있는데, 이는 요청이 있을 때마다 임시 대기열을 생성하는 것입니다. 그러나 이는 성능을 너무 많이 소비하므로 공식적으로 권장되지 않습니다.
Python의 RabbitMQ 서버 메시지 대기열 운영 및 원격 결과 반환과 관련된 더 많은 기사를 보려면 PHP 중국어 웹사이트를 주목하세요!