1. 用 Python 调用 AnsibleApi 远程执行任务:不使用 celery 时能正确运行,使用 celery 时返回结果为空。用 pdb 调试发现是调用 Ansible 时返回异常,但排查了几天仍无法找出具体原因。
2. 复现代码如下:
tasks.py
from celery import shared_task

from .deploy_tomcat2 import django_process


@shared_task
def deploy(jira_num):
    """Celery shared task that hands ``jira_num`` to ``django_process``.

    The argument is forwarded unchanged, and the value produced by
    ``django_process`` becomes the task's return value.
    """
    outcome = django_process(jira_num)
    return outcome
deploy_tomcat2.py
from .AnsibleApi import CallApi


def django_process(jira_num):
    """Run the Tomcat deployment when ``jira_num`` is an all-digit string.

    ``jira_num`` is only used as a guard here; it is not passed on to
    ``CallApi``. The deployment parameters are the fixed values below.

    Args:
        jira_num: Text value to validate (for example ``'150'``).

    Returns:
        Whatever ``CallApi.run_task()`` returns when ``jira_num`` consists
        solely of digits, otherwise ``None``.
    """
    # Use the bound method rather than ``str.isdigit(jira_num)``: the unbound
    # form raises TypeError for any text type that is not exactly ``str``
    # (e.g. a Python 2 ``unicode`` value from a deserialized task message).
    if not jira_num.isdigit():
        return None

    server = '10.10.10.30'
    name = 'abc'
    port = 11011
    code = 'efs'
    jdk = '1.12.13'
    jvm = 'xxxx'

    call = CallApi(server, name, port, code, jdk, jvm)
    return call.run_task()
AnsibleApi.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
from collections import namedtuple

from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.inventory import Inventory
from ansible.parsing.dataloader import DataLoader
from ansible.playbook.play import Play
from ansible.plugins.callback import CallbackBase
from ansible.vars import VariableManager
from django.conf import settings

from .Logger import Logger

Log = Logger('/tmp/auto_deploy_tomcat.log', logging.INFO)


class ResultCallback(CallbackBase):
    """Ansible callback that keeps the most recent task result per host.

    Results are stored in three dicts keyed by host name: ``host_ok``,
    ``host_failed`` and ``host_unreachable``. A later result for the same
    host in the same dict overwrites the earlier one.
    """

    def __init__(self, *args, **kwargs):
        super(ResultCallback, self).__init__(*args, **kwargs)
        self.host_ok = {}
        self.host_unreachable = {}
        self.host_failed = {}

    def v2_runner_on_unreachable(self, result):
        """Record ``result`` for a host reported as unreachable."""
        self.host_unreachable[result._host.get_name()] = result

    def v2_runner_on_ok(self, result, *args, **kwargs):
        """Record ``result`` for a task reported as ok."""
        self.host_ok[result._host.get_name()] = result

    def v2_runner_on_failed(self, result, *args, **kwargs):
        """Record ``result`` for a task reported as failed."""
        self.host_failed[result._host.get_name()] = result


class CallApi(object):
    """Build and run a one-host Ansible play that deploys Tomcat.

    The play copies ``autodeploy/tomcat_deploy.sh`` to the target host and
    runs it with the name, port, code, JDK and JVM values given to the
    constructor. SSH user and private key come from Django settings.
    """

    user = settings.SSH_USER
    ssh_private_key_file = settings.SSH_PRIVATE_KEY_FILE
    # Class-level default; every instance replaces it with its own callback
    # in __init__, so results are never shared between instances.
    results_callback = ResultCallback()
    Options = namedtuple(
        'Options',
        [
            'connection',
            'module_path',
            'private_key_file',
            'forks',
            'become',
            'become_method',
            'become_user',
            'check',
        ],
    )

    def __init__(self, ip, name, port, code, jdk, jvm):
        """Store the target host and the arguments for the deploy script.

        Args:
            ip: Target host; used as the inventory host list and play hosts.
            name: Second argument passed to the deploy script.
            port: Integer port (formatted with ``%d``) passed to the script.
            code: Passed through to the deploy script.
            jdk: Passed through to the deploy script.
            jvm: Passed to the deploy script wrapped in single quotes.
        """
        self.ip = ip
        self.name = name
        self.port = port
        self.code = code
        self.jdk = jdk
        self.jvm = jvm
        self.results_callback = ResultCallback()
        self.results_raw = {}

    def _gen_user_task(self):
        """Populate ``self.tasks`` with the task dicts for the play."""
        deploy_script = 'autodeploy/tomcat_deploy.sh'
        dst_script = '/tmp/tomcat_deploy.sh'
        cargs = dict(
            src=deploy_script,
            dest=dst_script,
            owner=self.user,
            group=self.user,
            mode='0755',
        )
        args = "%s %s %d %s %s '%s'" % (
            dst_script, self.name, self.port, self.code, self.jdk, self.jvm)

        self.tasks = [
            # Copy the deploy script to the host and echo the copy result.
            dict(action=dict(module='copy', args=cargs), register='shell_out'),
            dict(action=dict(module='debug', args=dict(msg='{{shell_out}}'))),
            # NOTE(review): the original formatting left it unclear whether
            # this task was enabled or commented out - confirm before use.
            dict(action=dict(module='command', args=args)),
        ]

    def _set_option(self):
        """Create the loader, inventory, options and play used by run_task."""
        self._gen_user_task()
        self.variable_manager = VariableManager()
        self.loader = DataLoader()
        self.options = self.Options(
            connection='smart',
            module_path=None,
            private_key_file=self.ssh_private_key_file,
            forks=None,
            become=True,
            become_method='sudo',
            become_user='root',
            check=False,
        )
        self.inventory = Inventory(
            loader=self.loader,
            variable_manager=self.variable_manager,
            host_list=[self.ip],
        )
        self.variable_manager.set_inventory(self.inventory)
        play_source = dict(
            name="auto deploy tomcat",
            hosts=self.ip,
            remote_user=self.user,
            gather_facts='no',
            tasks=self.tasks,
        )
        self.play = Play().load(
            play_source,
            variable_manager=self.variable_manager,
            loader=self.loader,
        )

    def run_task(self):
        """Execute the play and return the collected per-host results.

        Returns:
            A dict with the keys ``'success'``, ``'failed'`` and
            ``'unreachable'``; each maps host name to the ``_result`` of the
            last task result recorded for that host by the callback.
        """
        self.results_raw = {'success': {}, 'failed': {}, 'unreachable': {}}
        tqm = None
        self._set_option()
        try:
            tqm = TaskQueueManager(
                inventory=self.inventory,
                variable_manager=self.variable_manager,
                loader=self.loader,
                options=self.options,
                passwords=None,
                stdout_callback=self.results_callback,
            )
            # The return code is not used; outcomes are read from the callback.
            tqm.run(self.play)
        finally:
            # Always release the queue manager, even if the run raised.
            if tqm is not None:
                tqm.cleanup()

        recorded_by_status = (
            ('success', self.results_callback.host_ok),
            ('failed', self.results_callback.host_failed),
            ('unreachable', self.results_callback.host_unreachable),
        )
        for status, recorded in recorded_by_status:
            for host, task_result in recorded.items():
                self.results_raw[status][host] = task_result._result

        Log.info("result is :%s" % self.results_raw)
        return self.results_raw
复现方法
启动celery worker:celery -A jira worker -Q queue.ops.deploy -n "deploy.%h" -l info
另一窗口生产消息:deploy.apply_async(args=['150'], queue='queue.ops.deploy', routing_key='ops.deploy')
There are two ways to solve this problem, both of which work by disabling the assert:
1. Run `export PYTHONOPTIMIZE=1` in the shell that starts the celery worker, or start the worker with the celery option `-O OPTIMIZATION`.
2. Comment out line 102 of process.py in Python's multiprocessing package to disable the assert.
Since we all use django, CRUD seems to be standard, so you might as well try the post_save signal
directly deploy.delay(**params)
Excuse me, have you solved this? I seem to have run into the same problem: the task run via delay produces output, but in fact no Ansible operation is executed.