1.python调用AnsibleApi远程执行任务,不用celery的情况下能正确运行,使用的话返回为空.pdb调试发现是调用Ansible返回异常,但具体原因几天实在无法查出
2.代码复现如现如下:
tasks.py
from celery import shared_task
from .deploy_tomcat2 import django_process
@shared_task
def deploy(jira_num):
#return 'hello world {0}'.format(jira_num)
#rdb.set_trace()
return django_process(jira_num)
deploy_tomcat2.py
from .AnsibleApi import CallApi
def django_process(jira_num):
server = '10.10.10.30'
name = 'abc'
port = 11011
code = 'efs'
jdk = '1.12.13'
jvm = 'xxxx'
if str.isdigit(jira_num):
# import pdb
# pdb.set_trace()
call = CallApi(server,name,port,code,jdk,jvm)
return call.run_task()
AnsibleApi.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
from .Logger import Logger
from django.conf import settings
from collections import namedtuple
from ansible.parsing.dataloader import DataLoader
from ansible.vars import VariableManager
from ansible.inventory import Inventory
from ansible.playbook.play import Play
from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.plugins.callback import CallbackBase
Log = Logger('/tmp/auto_deploy_tomcat.log',logging.INFO)
class ResultCallback(CallbackBase):
def __init__(self, *args, **kwargs):
super(ResultCallback ,self).__init__(*args, **kwargs)
self.host_ok = {}
self.host_unreachable = {}
self.host_failed = {}
def v2_runner_on_unreachable(self, result):
self.host_unreachable[result._host.get_name()] = result
def v2_runner_on_ok(self, result, *args, **kwargs):
self.host_ok[result._host.get_name()] = result
def v2_runner_on_failed(self, result, *args, **kwargs):
self.host_failed[result._host.get_name()] = result
class CallApi(object):
user = settings.SSH_USER
ssh_private_key_file = settings.SSH_PRIVATE_KEY_FILE
results_callback = ResultCallback()
Options = namedtuple('Options',
['connection', 'module_path', 'private_key_file', 'forks', 'become', 'become_method',
'become_user', 'check'])
def __init__(self,ip,name,port,code,jdk,jvm):
self.ip = ip
self.name = name
self.port = port
self.code = code
self.jdk = jdk
self.jvm = jvm
self.results_callback = ResultCallback()
self.results_raw = {}
def _gen_user_task(self):
tasks = []
deploy_script = 'autodeploy/tomcat_deploy.sh'
dst_script = '/tmp/tomcat_deploy.sh'
cargs = dict(src=deploy_script, dest=dst_script, owner=self.user, group=self.user, mode='0755')
args = "%s %s %d %s %s '%s'" % (dst_script, self.name, self.port, self.code, self.jdk, self.jvm)
tasks.append(dict(action=dict(module='copy', args=cargs),register='shell_out'))
tasks.append(dict(action=dict(module='debug', args=dict(msg='{{shell_out}}'))))
# tasks.append(dict(action=dict(module='command', args=args)))
# tasks.append(dict(action=dict(module='command', args=args), register='result'))
# tasks.append(dict(action=dict(module='debug', args=dict(msg='{{result.stdout}}'))))
self.tasks = tasks
def _set_option(self):
self._gen_user_task()
self.variable_manager = VariableManager()
self.loader = DataLoader()
self.options = self.Options(connection='smart', module_path=None, private_key_file=self.ssh_private_key_file, forks=None,
become=True, become_method='sudo', become_user='root', check=False)
self.inventory = Inventory(loader=self.loader, variable_manager=self.variable_manager, host_list=[self.ip])
self.variable_manager.set_inventory(self.inventory)
play_source = dict(
name = "auto deploy tomcat",
hosts = self.ip,
remote_user = self.user,
gather_facts='no',
tasks = self.tasks
)
self.play = Play().load(play_source, variable_manager=self.variable_manager, loader=self.loader)
def run_task(self):
self.results_raw = {'success':{}, 'failed':{}, 'unreachable':{}}
tqm = None
from celery.contrib import rdb;rdb.set_trace()
#import pdb;pdb.set_trace()
self._set_option()
try:
tqm = TaskQueueManager(
inventory=self.inventory,
variable_manager=self.variable_manager,
loader=self.loader,
options=self.options,
passwords=None,
stdout_callback=self.results_callback,
)
result = tqm.run(self.play)
finally:
if tqm is not None:
tqm.cleanup()
for host, result in self.results_callback.host_ok.items():
self.results_raw['success'][host] = result._result
for host, result in self.results_callback.host_failed.items():
self.results_raw['failed'][host] = result._result
for host, result in self.results_callback.host_unreachable.items():
self.results_raw['unreachable'][host]= result._result
Log.info("result is :%s" % self.results_raw)
return self.results_raw
复现方法
启动celery worker:celery -A jira worker -Q queue.ops.deploy -n "deploy.%h" -l info
另一窗口生产消息:deploy.apply_async(args=['150'], queue='queue.ops.deploy', routing_key='ops.deploy')
이 문제를 해결하는 방법에는 두 가지가 있습니다. 즉, 어설션을 끄는 것입니다.
1. 셀러리의 작업자 시작 창에서 내보내기 PYTHONOPTIMIZE=1을 설정하거나 셀러리 매개변수 -O OPTIMIZATION을 설정합니다
2. process.py의 102번째 줄 아래에 있는 Python 패키지 다중 처리를 주석 처리하고 Assert를 닫습니다
우리 모두 django를 사용하므로 CRUD가 표준인 것 같으므로 post_save 신호를 사용해 보는 것이 좋습니다
직접 배포.delay(**params)
해결하셨나요? 동일한 문제가 발생했어야 했는데, 지연 실행에서 출력이 나오지만 실제로는 실행되지 않습니다