python - django+celery+ansibleApi无返回
阿神
阿神 2017-04-18 10:22:50
0
3
1274

1.python调用AnsibleApi远程执行任务,不使用celery的情况下能正确运行,使用celery时返回为空。用pdb调试发现是调用Ansible时返回异常,但查了几天仍无法找出具体原因。

2.代码复现如下:

  • tasks.py

from celery import shared_task
from .deploy_tomcat2 import django_process


@shared_task
def deploy(jira_num):
    """Celery task entry point.

    Hands ``jira_num`` to ``django_process`` and returns its result
    unchanged.
    """
    outcome = django_process(jira_num)
    return outcome
  • deploy_tomcat2.py

from .AnsibleApi import CallApi


def django_process(jira_num):
    """Run the deployment through ``CallApi`` for a numeric JIRA number.

    Args:
        jira_num: JIRA issue number. A string of digits is expected;
            integers and other string types are also accepted.

    Returns:
        The result dict produced by ``CallApi.run_task()``, or ``None``
        when ``jira_num`` is not made up solely of digits.
    """
    # Validate through the value's own ``isdigit`` instead of the unbound
    # ``str.isdigit``: the latter raises TypeError for anything that is not
    # exactly ``str`` (e.g. an int, or a Python 2 ``unicode`` string).
    text = jira_num if hasattr(jira_num, 'isdigit') else str(jira_num)
    if not text.isdigit():
        return None

    # Deployment parameters are hard-coded in this module.
    server = '10.10.10.30'
    name = 'abc'
    port = 11011
    code = 'efs'
    jdk = '1.12.13'
    jvm = 'xxxx'

    call = CallApi(server, name, port, code, jdk, jvm)
    return call.run_task()
  • AnsibleApi.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
from .Logger import Logger
from django.conf import settings
from collections import namedtuple
from ansible.parsing.dataloader import DataLoader
from ansible.vars import VariableManager
from ansible.inventory import Inventory
from ansible.playbook.play import Play
from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.plugins.callback import CallbackBase

Log = Logger('/tmp/auto_deploy_tomcat.log',logging.INFO)


class ResultCallback(CallbackBase):
    """Ansible callback that records task results per host, by outcome.

    Each dict is keyed by host name; a later result for the same host and
    outcome overwrites the earlier one.
    """

    def __init__(self, *args, **kwargs):
        super(ResultCallback, self).__init__(*args, **kwargs)
        self.host_ok = {}
        self.host_unreachable = {}
        self.host_failed = {}

    def v2_runner_on_unreachable(self, result):
        self.host_unreachable[result._host.get_name()] = result

    def v2_runner_on_ok(self, result, *args, **kwargs):
        self.host_ok[result._host.get_name()] = result

    def v2_runner_on_failed(self, result, *args, **kwargs):
        self.host_failed[result._host.get_name()] = result


class CallApi(object):
    """Build and run a one-host Ansible play that copies and runs a deploy script.

    The SSH user and private key file are read from the Django settings.
    """

    user = settings.SSH_USER
    ssh_private_key_file = settings.SSH_PRIVATE_KEY_FILE
    Options = namedtuple('Options', ['connection', 'module_path',
                                     'private_key_file', 'forks', 'become',
                                     'become_method', 'become_user', 'check'])

    def __init__(self, ip, name, port, code, jdk, jvm):
        """Store the target host and the arguments passed to the deploy script.

        ``port`` must be an integer: it is formatted with ``%d`` when the
        command line is built.
        """
        self.ip = ip
        self.name = name
        self.port = port
        self.code = code
        self.jdk = jdk
        self.jvm = jvm
        self.results_callback = ResultCallback()
        self.results_raw = {}

    def _gen_user_task(self):
        """Build the task list and store it in ``self.tasks``.

        Tasks: copy the deploy script to the remote host, print the copy
        result via ``debug``, then run the script with ``command``.
        """
        deploy_script = 'autodeploy/tomcat_deploy.sh'
        dst_script = '/tmp/tomcat_deploy.sh'
        cargs = dict(src=deploy_script, dest=dst_script, owner=self.user,
                     group=self.user, mode='0755')
        args = "%s %s %d %s %s '%s'" % (dst_script, self.name, self.port,
                                        self.code, self.jdk, self.jvm)
        self.tasks = [
            dict(action=dict(module='copy', args=cargs), register='shell_out'),
            dict(action=dict(module='debug', args=dict(msg='{{shell_out}}'))),
            dict(action=dict(module='command', args=args)),
        ]

    def _set_option(self):
        """Create the loader, options, inventory and play used by ``run_task``."""
        self._gen_user_task()
        self.variable_manager = VariableManager()
        self.loader = DataLoader()
        self.options = self.Options(
            connection='smart',
            module_path=None,
            private_key_file=self.ssh_private_key_file,
            forks=None,
            become=True,
            become_method='sudo',
            become_user='root',
            check=False,
        )
        self.inventory = Inventory(loader=self.loader,
                                   variable_manager=self.variable_manager,
                                   host_list=[self.ip])
        self.variable_manager.set_inventory(self.inventory)
        play_source = dict(
            name="auto deploy tomcat",
            hosts=self.ip,
            remote_user=self.user,
            gather_facts='no',
            tasks=self.tasks,
        )
        self.play = Play().load(play_source,
                                variable_manager=self.variable_manager,
                                loader=self.loader)

    def run_task(self):
        """Run the play and return the collected per-host results.

        Returns:
            dict with the keys ``'success'``, ``'failed'`` and
            ``'unreachable'``; each maps a host name to the raw result dict
            of the last task recorded for that host with that outcome.

        NOTE(review): when called from inside a Celery prefork worker,
        ``tqm.run`` may be unable to start its own worker processes
        (multiprocessing asserts that a daemonic process has no children),
        which would leave every bucket empty -- confirm against the Celery
        pool in use.
        """
        self.results_raw = {'success': {}, 'failed': {}, 'unreachable': {}}
        # Start from a fresh callback so results recorded by an earlier run
        # on this instance cannot leak into this one.
        self.results_callback = ResultCallback()
        tqm = None
        self._set_option()
        try:
            tqm = TaskQueueManager(
                inventory=self.inventory,
                variable_manager=self.variable_manager,
                loader=self.loader,
                options=self.options,
                passwords=None,
                stdout_callback=self.results_callback,
            )
            tqm.run(self.play)
        finally:
            if tqm is not None:
                tqm.cleanup()

        collected = (
            ('success', self.results_callback.host_ok),
            ('failed', self.results_callback.host_failed),
            ('unreachable', self.results_callback.host_unreachable),
        )
        for bucket, host_results in collected:
            for host, task_result in host_results.items():
                self.results_raw[bucket][host] = task_result._result

        Log.info("result is :%s" % self.results_raw)
        return self.results_raw
  • 复现方法

  • 启动celery worker:
    celery -A jira worker -Q queue.ops.deploy -n "deploy.%h" -l info

  • 另一窗口生产消息:
    deploy.apply_async(args=['150'], queue='queue.ops.deploy', routing_key='ops.deploy')

阿神
阿神

闭关修行中......

全部回覆 (3)
PHPzhong

有兩種方法可以解決這個問題,就是關閉assert:
1.在啟動celery worker的終端視窗中設定export PYTHONOPTIMIZE=1,或在啟動celery時加上參數-O OPTIMIZATION
2.註解掉python套件multiprocessing下面process.py中102行,關閉assert

    黄舟

    既然都用django,CRUD看來是標配了,那你不如試試 post_save 這個 signal
    直接 deploy.delay(**params)

      刘奇


      請問解決了沒,我應該是遇到同樣的問題,delay執行有輸出,可實際上沒執行到ansible的操作

        最新下載
        更多>
        網站特效
        網站源碼
        網站素材
        前端模板
        關於我們 免責聲明 Sitemap
        PHP中文網:公益線上PHP培訓,幫助PHP學習者快速成長!