python - 使用multiprocessing.Process调用start方法后,有较小的几率子进程中run方法未执行
ringa_lee
ringa_lee 2017-04-18 09:04:50
0
1
904

继承multiprocessing.Process实现了一个worker类,在父进程中,自己实现了一个最多启动N的限制(出问题的环境是30个)。
实际运行中发现,大约有万分之二(当前每天运行46000+次,大约出现11次)的概率,子进程创建后run方法未执行。
代码和日志如下,注意打印日志的语句
父进程启动子进程(父进程里还有一个控制并发进程数量的逻辑,如果需要的话我贴出来):

...
    def run_task(self, task):
        logging.info('execute monitor %s' % task['id'])
        worker = execute_worker.ExecuteWorkerProcess(task)
        logging.debug('execute process start %s' % task['id'])
        worker.start()
        logging.info('worker pid is %s (%s)' % (worker.pid, task['id']))
        logging.debug('execute process started %s' % task['id'])
        self.worker_pool.append(worker)
...

子进程run方法

class ExecuteWorkerProcess(multiprocessing.Process):
...
    def __init__(self, task):
        super(ExecuteWorkerProcess, self).__init__()
        self.stopping = False
        self.task = task
        self.worker = ExecuteWorker(task)
        if 'task' in task:
            self.routine = False
        else:
            self.routine = True
        self.zk = None
        logging.debug('process created %s' % self.task['id'])
...
    def run(self):
        logging.debug('process start %s' % self.task['id'])
        try:
            logging.debug('process run before %s' % self.task['id'])
            self._run()
            logging.debug('process run after %s' % self.task['id'])
        except:
            logging.exception('')
            title = u'监控执行进程报错'
            text = u'监控项id:%s\n错误信息:\n%s' % (self.task['id'], traceback.format_exc())
            warning.email_warning(title, text, to_dev=True)
        logging.debug('process start done %s' % self.task['id'])
...

出现问题的进程日志如下:

正常任务日志如下:

可以看到正常和异常的日志主进程中都打印除了子进程的pid,但是异常继承子进程run行数的第一行没有执行。
是否有人遇到过?这个是不是multiprocessing.Process的坑,有没有规避办法...

ringa_lee
ringa_lee

ringa_lee

répondre à tous(1)
左手右手慢动作

La raison a été trouvée. En raison de l'utilisation de thread+mutiprocessing (fork) dans le processus principal, un blocage se produit dans la journalisation. Le phénomène est que la première phrase de journalisation dans le processus enfant se bloque. Le problème ne se produit que sous Linux.
J'ai trouvé la méthode de reproduction après avoir lu cette réponse sur stckoverflow, une autre réponse, la solution
Démo de reproduction :

#coding=utf-8
import os
import time
import logging
import threading
import multiprocessing
import logging.handlers


def init_log(log_path, level=logging.INFO, when="midnight", backup=7,
             format="%(levelname)s:[%(asctime)s][%(filename)s:%(lineno)d][%(process)s][%(thread)d] %(message)s",
             datefmt="%Y-%m-%d %H:%M:%S"):
    formatter = logging.Formatter(format, datefmt)
    logger = logging.getLogger()
    logger.setLevel(level)

    dir = os.path.dirname(log_path)
    if not os.path.isdir(dir):
        os.makedirs(dir)

    handler = logging.handlers.TimedRotatingFileHandler(log_path + ".log",
                                                        when=when,
                                                        backupCount=backup)
    handler.setLevel(level)
    handler.setFormatter(formatter)
    logger.addHandler(handler)


stop = False

class Worker(multiprocessing.Process):
    u"""
    多进程方式执行任务,使用CPU密集型
    """
    def __init__(self):
        super(Worker, self).__init__()
        self.reset_ts = time.time() + 3
        self.stopping = False

    def run(self):
        logging.info('Process pid is %s' % os.getpid())

    def check_timeout(self, now):
        u"""
        若当前时间已超过复位时间,则结束进程
        :param now:
        :return:
        """
        global stop
        if now > self.reset_ts and not self.stopping:
            self.stopping = True
            logging.info('error pid is %s' % os.getpid())
            stop = True


def main():
    global stop
    worker_pool = []
    while 1:
        if worker_pool:
            now = time.time()
            for worker in worker_pool:
                worker.check_timeout(now)
                if stop:
                    logging.error('Process not run, exit!')
                    exit(-1)

        alive_workers = [worker for worker in worker_pool if worker.is_alive()]
        over_workers = list(set(alive_workers) ^ set(worker_pool))
        for over_worker in over_workers:
            over_worker.join()
        worker_pool = alive_workers
        if len(worker_pool) < 1000:
            logging.info('create worker')
            worker = Worker()
            worker.start()
            logging.info('worker pid is %s' % worker.pid)
            worker_pool.append(worker)
        time.sleep(0.001)


class ExecuteThread(threading.Thread):
    def run(self):
        main()

class ExecuteThread2(threading.Thread):
    def run(self):
        global stop
        while 1:
            logging.info('main thread')
            time.sleep(0.001)
            if stop:
                exit(-1)

if __name__ == '__main__':
    init_log('/yourpath/timeout')
    #main()

    thread = ExecuteThread()
    thread2 = ExecuteThread2()
    thread.start()
    thread2.start()
    thread.join()
    thread2.join()

Après la reproduction, n'oubliez pas d'effacer le processus bloqué....

Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal