Home  >  Article  >  Backend Development  >  Python custom master-slave distributed architecture example analysis

Python custom master-slave distributed architecture example analysis

高洛峰
高洛峰Original
2017-02-22 16:22:061317browse

This article mainly introduces Python's custom master-slave distributed architecture, and analyzes the structure, principles and specific code implementation techniques of the master-slave distributed architecture in the form of examples. Friends in need can refer to it

The example in this article describes Python’s custom master-slave distributed architecture. Share it with everyone for your reference, the details are as follows:

Environment: Win7 x64, Python 2.7, APScheduler 2.1.2.

The schematic diagram is as follows:

Python custom master-slave distributed architecture example analysis

Code part:

(1), Center node:

#encoding=utf-8
#author: walker
#date: 2014-12-03
#function: 中心节点(主要功能是分配任务)
import SocketServer, socket, Queue
CenterIP = '127.0.0.1'  #中心节点IP
CenterListenPort = 9999  #中心节点监听端口
CenterClient = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #中心节点用于发送网络消息的socket
TaskQueue = Queue.Queue() #任务队列
#获取任务队列
def GetTaskQueue():
  for i in range(1, 11):
    TaskQueue.put(str(i))
#CenterServer的回调函数,在接受到udp报文是触发
class MyUDPHandler(SocketServer.BaseRequestHandler):
  def handle(self):
    data = self.request[0].strip()
    socket = self.request[1]
    print(data)
    if data.startswith('wait'):
      vec = data.split(':')
      if len(vec) != 3:
        print('Error: len(vec) != 3')
      else:
        nodeIP = vec[1]
        nodeListenPort = vec[2]
        nodeID = nodeIP + ':' + nodeListenPort
        if not TaskQueue.empty():
          task = TaskQueue.get()
          print('send task ' + task + ' to ' + nodeID)
          CenterClient.sendto('task:' + task, (nodeIP, int(nodeListenPort)))
        else:
          print('TaskQueue is empty!')
GetTaskQueue() #获取任务队列
CenterServer = SocketServer.UDPServer((CenterIP, CenterListenPort), MyUDPHandler)
print('Listen port ' + str(CenterListenPort) + ' ...')
CenterServer.serve_forever()

(2), task node:

#encoding=utf-8
#author: walker
#date: 2014-12-03
#function: 任务节点(请求/接收/执行任务)
import time, socket, SocketServer
from apscheduler.scheduler import Scheduler
CenterIP = '127.0.0.1'  #中心节点IP
CenterListenPort = 9999  #中心节点监听端口
NodeIP = socket.gethostbyname(socket.gethostname())  #任务节点自身IP
NodeClient = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)  #任务节点用于发送网络消息的socket
#任务:发送网络信息
def jobSendNetMsg():
  msg = ''
  if NodeServer.TaskState == 'wait':
    msg = 'wait:' + NodeIP + ':' + str(NodeListenPort)
  elif NodeServer.TaskState == 'exec':
    msg = 'exec:' + NodeIP + ':' + str(NodeListenPort)
  print(msg)
  NodeClient.sendto(msg, (CenterIP, CenterListenPort))
#添加并启动定时任务
def InitTimer():
  sched = Scheduler()
  sched.add_interval_job(jobSendNetMsg, seconds=1)
  sched.start()
#执行任务
def ExecTask(task):
  print('ExecTask ' + task + ' ...')
  time.sleep(2)
  print('ExecTask ' + task + ' over')
#NodeServer的回调函数,在接受到udp报文是触发
class MyUDPHandler(SocketServer.BaseRequestHandler):
  def handle(self):
    data = self.request[0].strip()
    socket = self.request[1]
    print('recv data: ' + data)
    if data.startswith('task'):
      vec = data.split(':')
      if len(vec) != 2:
        print('Error: len(vec) != 2')
      else:
        task = vec[1]
        self.server.TaskState = 'exec'
        ExecTask(task)
        self.server.TaskState = 'wait'
InitTimer()
NodeServer = SocketServer.UDPServer(('', 0), MyUDPHandler)
NodeServer.TaskState = 'wait' #(exec/wait)
NodeListenPort = NodeServer.server_address[1]
print('NodeListenPort:' + str(NodeListenPort))
NodeServer.serve_forever()


For more Python custom master-slave distributed architecture example analysis related articles, please pay attention to the PHP Chinese website!

Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn