Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
224 views
in Technique[技术] by (71.8m points)

django+celery+ansibleApi无返回

1.python调用AnsibleApi远程执行任务,不用celery的情况下能正确运行,使用的话返回为空.pdb调试发现是调用Ansible返回异常,但具体原因几天实在无法查出

2.代码复现如现如下:

  • tasks.py

from celery import shared_task
from .deploy_tomcat2 import django_process


@shared_task
def deploy(jira_num):
    #return 'hello world {0}'.format(jira_num)
    #rdb.set_trace()
    return django_process(jira_num)
  • deploy_tomcat2.py

from .AnsibleApi import CallApi

def django_process(jira_num):
    server = '10.10.10.30'
    name = 'abc'
    port = 11011
    code = 'efs'
    jdk = '1.12.13'
    jvm = 'xxxx'

    if str.isdigit(jira_num):
        # import pdb
        # pdb.set_trace()
        call = CallApi(server,name,port,code,jdk,jvm)
        return call.run_task()
  • AnsibleApi.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
from .Logger import Logger
from django.conf import settings
from collections import namedtuple
from ansible.parsing.dataloader import DataLoader
from ansible.vars import VariableManager
from ansible.inventory import Inventory
from ansible.playbook.play import Play
from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.plugins.callback import CallbackBase

Log = Logger('/tmp/auto_deploy_tomcat.log',logging.INFO)


class ResultCallback(CallbackBase):
    def __init__(self, *args, **kwargs):
        super(ResultCallback ,self).__init__(*args, **kwargs)
        self.host_ok = {}
        self.host_unreachable = {}
        self.host_failed = {}

    def v2_runner_on_unreachable(self, result):
        self.host_unreachable[result._host.get_name()] = result

    def v2_runner_on_ok(self, result, *args, **kwargs):
        self.host_ok[result._host.get_name()] = result

    def v2_runner_on_failed(self, result, *args, **kwargs):
        self.host_failed[result._host.get_name()] = result


class CallApi(object):
    user = settings.SSH_USER
    ssh_private_key_file = settings.SSH_PRIVATE_KEY_FILE
    results_callback = ResultCallback()
    Options = namedtuple('Options',
                         ['connection', 'module_path', 'private_key_file', 'forks', 'become', 'become_method',
                          'become_user', 'check']) 

    def __init__(self,ip,name,port,code,jdk,jvm):
        self.ip = ip
        self.name = name
        self.port = port
        self.code = code
        self.jdk = jdk
        self.jvm = jvm
        self.results_callback = ResultCallback()
        self.results_raw = {}

    def _gen_user_task(self):
        tasks = []
        deploy_script = 'autodeploy/tomcat_deploy.sh'
        dst_script = '/tmp/tomcat_deploy.sh'
        cargs = dict(src=deploy_script, dest=dst_script, owner=self.user, group=self.user, mode='0755')
        args = "%s %s %d %s %s '%s'" % (dst_script, self.name, self.port, self.code, self.jdk, self.jvm)
        tasks.append(dict(action=dict(module='copy', args=cargs),register='shell_out'))
        tasks.append(dict(action=dict(module='debug', args=dict(msg='{{shell_out}}'))))
        # tasks.append(dict(action=dict(module='command', args=args)))
        # tasks.append(dict(action=dict(module='command', args=args), register='result'))
        # tasks.append(dict(action=dict(module='debug', args=dict(msg='{{result.stdout}}'))))
        self.tasks = tasks

    def _set_option(self):
        self._gen_user_task()

        self.variable_manager = VariableManager()
        self.loader = DataLoader()
        self.options = self.Options(connection='smart', module_path=None, private_key_file=self.ssh_private_key_file, forks=None,
                                    become=True, become_method='sudo', become_user='root', check=False)
        self.inventory = Inventory(loader=self.loader, variable_manager=self.variable_manager, host_list=[self.ip])
        self.variable_manager.set_inventory(self.inventory)

        play_source = dict(
        name = "auto deploy tomcat",
            hosts = self.ip,
            remote_user = self.user,
            gather_facts='no',
            tasks = self.tasks
        )
        self.play = Play().load(play_source, variable_manager=self.variable_manager, loader=self.loader)

    def run_task(self):
        self.results_raw = {'success':{}, 'failed':{}, 'unreachable':{}}
        tqm = None
        from celery.contrib import rdb;rdb.set_trace()
        #import pdb;pdb.set_trace()
        self._set_option()
        try:
            tqm = TaskQueueManager(
                inventory=self.inventory,
                variable_manager=self.variable_manager,
                loader=self.loader,
                options=self.options,
                passwords=None,
                stdout_callback=self.results_callback,
            )
            result = tqm.run(self.play)
        finally:
            if tqm is not None:
                tqm.cleanup()

        for host, result in self.results_callback.host_ok.items():
            self.results_raw['success'][host] = result._result

        for host, result in self.results_callback.host_failed.items():
            self.results_raw['failed'][host] = result._result

        for host, result in self.results_callback.host_unreachable.items():
            self.results_raw['unreachable'][host]= result._result
        Log.info("result is :%s" % self.results_raw)
        return self.results_raw

  • 复现方法

  • 启动celery worker:
    celery -A jira worker -Q queue.ops.deploy -n "deploy.%h" -l info

  • 另一窗口生产消息:
    deploy.apply_async(args=['150'], queue='queue.ops.deploy', routing_key='ops.deploy')


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

有两种方法解决这个问题,就是关闭assert:
1.在celery 的worker启动窗口设置export PYTHONOPTIMIZE=1或打开celery这个参数-O OPTIMIZATION
2.注释掉python包multiprocessing下面process.py中102行,关闭assert


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...