• 技术文章 >后端开发 >Python教程

    python中进程间数据通讯模块multiprocessing.Manager的介绍

    不言不言2019-03-23 11:08:47转载2831

    本篇文章给大家带来的内容是关于python中进程间数据通讯模块multiprocessing.Manager的介绍,有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助。

    目前开发中有遇到进程间需要共享数据的情况. 所以研究了下multiprocessing.Manager, 主要会以dict为例子, 说明下进程间共享(同一个父进程).

    dict使用说明

    import multiprocessing
    # 1. 创建一个Manger对象
    manager = multiprocessing.Manager()
    # 2. 创建一个dict
    temp_dict = manager.dict()
    # 3. 创建一个测试程序
    def test(idx, test_dict):
        test_dict[idx] = idx
    # 4. 创建进程池进行测试
    pool = multiprocessing.Pool(4)
    for i in range(100):
        pool.apply_async(test, args=(i, temp_dict))
    pool.close()
    pool.join()
    print(temp_dict)

    too simple.

    简单的源码分析

    这时我们再看一个例子

    import multiprocessing
    # 1. 创建一个Manger对象
    manager = multiprocessing.Manager()
    # 2. 创建一个dict
    temp_dict = manager.dict()
    temp_dict['test'] = {}
    # 3. 创建一个测试程序
    def test(idx, test_dict):
        test_dict['test'][idx] = idx
    # 4. 创建进程池进行测试
    pool = multiprocessing.Pool(4)
    for i in range(100):
        pool.apply_async(test, args=(i, temp_dict))
    pool.close()
    pool.join()
    print(temp_dict)

    可以看到输出结果是奇怪的{'test': {}}
    如果我们简单修改一下代码

    import multiprocessing
    # 1. 创建一个Manger对象
    manager = multiprocessing.Manager()
    # 2. 创建一个dict
    temp_dict = manager.dict()
    temp_dict['test'] = {}
    # 3. 创建一个测试程序
    def test(idx, test_dict):
        row = test_dict['test']
        row[idx] = idx
        test_dict['test'] = row
    # 4. 创建进程池进行测试
    pool = multiprocessing.Pool(4)
    for i in range(100):
        pool.apply_async(test, args=(i, temp_dict))
    pool.close()
    pool.join()
    print(temp_dict)

    这时输出结果就符合预期了.

    为了了解这个现象背后的原因, 我简单去读了一下源码, 主要有以下几段代码很关键.

    def Manager():
        '''
        Returns a manager associated with a running server process
    
        The managers methods such as `Lock()`, `Condition()` and `Queue()`
        can be used to create shared objects.
        '''
        from multiprocessing.managers import SyncManager
        m = SyncManager()
        m.start()
        return m
        
    ...
        def start(self, initializer=None, initargs=()):
            '''
            Spawn a server process for this manager object
            '''
            assert self._state.value == State.INITIAL
    
            if initializer is not None and not hasattr(initializer, '__call__'):
                raise TypeError('initializer must be a callable')
    
            # pipe over which we will retrieve address of server
            reader, writer = connection.Pipe(duplex=False)
    
            # spawn process which runs a server
            self._process = Process(
                target=type(self)._run_server,
                args=(self._registry, self._address, self._authkey,
                      self._serializer, writer, initializer, initargs),
                )
            ident = ':'.join(str(i) for i in self._process._identity)
            self._process.name = type(self).__name__  + '-' + ident
            self._process.start()
    ...

    上面代码可以看出, 当我们声明了一个Manager对象的时候, 程序实际在其他进程启动了一个server服务, 这个server是阻塞的, 以此来实现进程间数据安全.
    我的理解就是不同进程之间操作都是互斥的, 一个进程向server请求到这部分数据, 再把这部分数据修改, 返回给server, 之后server再去处理其他进程的请求.

    回到上面的奇怪现象上, 这个操作test_dict['test'][idx] = idx实际上在拉取到server上的数据后进行了修改, 但并没有返回给server, 所以temp_dict的数据根本没有变化. 在第二段正常代码, 就相当于先向服务器请求数据, 再向服务器传送修改后的数据. 这样就可以解释这个现象了.

    进程间数据安全

    这个时候如果出现一种情况, 两个进程同时请求了一份相同的数据, 分别进行修改, 再提交到server上会怎么样呢? 那当然是数据产生异常. 基于此, 我们需要Manager的另一个对象, Lock(). 这个对象也不难理解, Manager本身就是一个server, dict跟lock都来自于这个server, 所以当你lock住的时候, 其他进程是不能取到数据, 自然也不会出现上面那种异常情况.

    代码示例:

    import multiprocessing
    # 1. 创建一个Manger对象
    manager = multiprocessing.Manager()
    # 2. 创建一个dict
    temp_dict = manager.dict()
    lock = manager.Lock()
    temp_dict['test'] = {}
    # 3. 创建一个测试程序
    def test(idx, test_dict, lock):
        lock.acquire()
        row = test_dict['test']
        row[idx] = idx
        test_dict['test'] = row
        lock.release()
    # 4. 创建进程池进行测试
    pool = multiprocessing.Pool(4)
    for i in range(100):
        pool.apply_async(test, args=(i, temp_dict, lock))
    pool.close()
    pool.join()
    print(temp_dict)

    切忌不要进程里自己新建lock对象, 要使用统一的lock对象.

    本篇文章到这里就已经全部结束了,更多其他精彩内容可以关注PHP中文网的python视频教程栏目!

    以上就是python中进程间数据通讯模块multiprocessing.Manager的介绍的详细内容,更多请关注php中文网其它相关文章!

    声明:本文转载于:segmentfault,如有侵犯,请联系admin@php.cn删除
    专题推荐:python
    上一篇:Python如何获取列表长度?(代码示例) 下一篇:自己动手写 PHP MVC 框架(40节精讲/巨细/新人进阶必看)

    相关文章推荐

    • Python如何删除除字母和数字之外的所有字符?(代码示例)• Python如何计算列表中所有数字的乘积?(代码示例)• 如何获取当前使用的Python版本信息?(代码示例)• Python实现给照片换底色(附代码)• Python中is 和 ==的详细解析(附代码)
    1/1

    PHP中文网