行程
Python 中的多線程其實不是真正的多線程,如果想要充分地使用多核心 CPU 的資源,在 Python 中大部分情況都需要使用多進程。 Python 提供了非常好用的多進程套件 multiprocessing,只需要定義一個函數,Python 會完成其他所有事情。借助這個套件,可以輕鬆完成從單一進程到並發執行的轉換。 multiprocessing 支援子進程、通訊和共享資料、執行不同形式的同步,提供了 Process、Queue、Pipe、Lock 等元件。
1、類別Process
建立流程的類別:Process([group [, target [, name [, args [, kwargs]]]]])
target 表示呼叫物件
args 表示呼叫物件的位置參數元組
#kwargs表示呼叫物件的字典
name為別名
group實質上不使用
下面看一個創建函數並將其作為多個進程的例子:
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
import multiprocessing
import time
def worker(interval, name):
print(name + '【start】')
time.sleep(interval)
print(name + '【end】')
if __name__ == "__main__":
p1 = multiprocessing.Process(target=worker, args=(2, '两点水1'))
p2 = multiprocessing.Process(target=worker, args=(3, '两点水2'))
p3 = multiprocessing.Process(target=worker, args=(4, '两点水3'))
p1.start()
p2.start()
p3.start()
print("The number of CPU is:" + str(multiprocessing.cpu_count()))
for p in multiprocessing.active_children():
print("child p.name:" + p.name + "\tp.id" + str(p.pid))
print("END!!!!!!!!!!!!!!!!!")輸出的結果:
多進程輸出結果
2、把進程創建成類
當然我們也可以把進程創建成一個類,如下面的例子,當進程p 呼叫start() 時,自動調用run() 方法。
# -*- coding: UTF-8 -*-
import multiprocessing
import time
class ClockProcess(multiprocessing.Process):
def __init__(self, interval):
multiprocessing.Process.__init__(self)
self.interval = interval
def run(self):
n = 5
while n > 0:
print("当前时间: {0}".format(time.ctime()))
time.sleep(self.interval)
n -= 1
if __name__ == '__main__':
p = ClockProcess(3)
p.start()輸出結果如下:
建立進程類別
#3、daemon 屬性
想知道daemon 屬性有什麼用,看下面兩個例子吧,一個加了daemon 屬性,一個沒有加,對比輸出的結果:
沒有加deamon 屬性的例子:
# -*- coding: UTF-8 -*-
import multiprocessing
import time
def worker(interval):
print('工作开始时间:{0}'.format(time.ctime()))
time.sleep(interval)
print('工作结果时间:{0}'.format(time.ctime()))
if __name__ == '__main__':
p = multiprocessing.Process(target=worker, args=(3,))
p.start()
print('【EMD】')輸出結果:
【EMD】 工作开始时间:Mon Oct 9 17:47:06 2017 工作结果时间:Mon Oct 9 17:47:09 2017
在上面範例中,進程p 新增daemon 屬性:
# -*- coding: UTF-8 -*-
import multiprocessing
import time
def worker(interval):
print('工作开始时间:{0}'.format(time.ctime()))
time.sleep(interval)
print('工作结果时间:{0}'.format(time.ctime()))
if __name__ == '__main__':
p = multiprocessing.Process(target=worker, args=(3,))
p.daemon = True
p.start()
print('【EMD】')輸出結果:
【EMD】
根據輸出結果可見,如果在子進程中新增了daemon 屬性,那麼當主進程結束的時候,子進程也會跟著結束。所以沒有列印子進程的資訊。
4、join 方法
結合上面的範例繼續,如果我們想要讓子執行緒執行完該怎麼做呢?
那麼我們可以用到 join 方法,join 方法的主要作用是:阻塞目前進程,直到呼叫 join 方法的那個進程執行完,然後再繼續執行目前進程。
因此看下加了join 方法的例子:
import multiprocessing
import time
def worker(interval):
print('工作开始时间:{0}'.format(time.ctime()))
time.sleep(interval)
print('工作结果时间:{0}'.format(time.ctime()))
if __name__ == '__main__':
p = multiprocessing.Process(target=worker, args=(3,))
p.daemon = True
p.start()
p.join()
print('【EMD】')輸出的結果:
工作开始时间:Tue Oct 10 11:30:08 2017 工作结果时间:Tue Oct 10 11:30:11 2017 【EMD】
5、Pool
如果需要很多的子進程,我們就需要一個一個的去創建嗎?
當然不用,我們可以使用進程池的方法批次建立子進程。
範例如下:
# -*- coding: UTF-8 -*-
from multiprocessing import Pool
import os, time, random
def long_time_task(name):
print('进程的名称:{0} ;进程的PID: {1} '.format(name, os.getpid()))
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print('进程 {0} 运行了 {1} 秒'.format(name, (end - start)))
if __name__ == '__main__':
print('主进程的 PID:{0}'.format(os.getpid()))
p = Pool(4)
for i in range(6):
p.apply_async(long_time_task, args=(i,))
p.close()
# 等待所有子进程结束后在关闭主进程
p.join()
print('【End】')輸出的結果如下:
主进程的 PID:7256 进程的名称:0 ;进程的PID: 1492 进程的名称:1 ;进程的PID: 12232 进程的名称:2 ;进程的PID: 4332 进程的名称:3 ;进程的PID: 11604 进程 2 运行了 0.6500370502471924 秒 进程的名称:4 ;进程的PID: 4332 进程 1 运行了 1.0830621719360352 秒 进程的名称:5 ;进程的PID: 12232 进程 5 运行了 0.029001712799072266 秒 进程 4 运行了 0.9720554351806641 秒 进程 0 运行了 2.3181326389312744 秒 进程 3 运行了 2.5331451892852783 秒 【End】
這裡有一點要注意: Pool 物件呼叫join() 方法會等待所有子程序執行完畢,呼叫join() 之前必須先呼叫close() ,呼叫close() 之後就不能繼續加入新的Process 了。
請注意輸出的結果,子進程0,1,2,3是立刻執行的,而子進程4 要等待前面某個子進程完成後才執行,這是因為Pool 的預設大小在我的電腦上是4,因此,最多同時執行4 個行程。這是 Pool 有意設計的限制,並不是作業系統的限制。如果改成:
p = Pool(5)
就可以同時跑 5 個進程。
6、進程間通訊
Process 之間一定是需要通訊的,作業系統提供了許多機制來實現進程間的通訊。 Python 的 multiprocessing 模組包裝了底層的機制,提供了Queue、Pipes 等多種方式來交換資料。
以 Queue 為例,在父進程中建立兩個子進程,一個往 Queue 寫數據,一個從 Queue 讀取資料:
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
from multiprocessing import Process, Queue
import os, time, random
def write(q):
# 写数据进程
print('写进程的PID:{0}'.format(os.getpid()))
for value in ['两点水', '三点水', '四点水']:
print('写进 Queue 的值为:{0}'.format(value))
q.put(value)
time.sleep(random.random())
def read(q):
# 读取数据进程
print('读进程的PID:{0}'.format(os.getpid()))
while True:
value = q.get(True)
print('从 Queue 读取的值为:{0}'.format(value))
if __name__ == '__main__':
# 父进程创建 Queue,并传给各个子进程
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 启动子进程 pw
pw.start()
# 启动子进程pr
pr.start()
# 等待pw结束:
pw.join()
# pr 进程里是死循环,无法等待其结束,只能强行终止
pr.terminate()輸出的結果為:
读进程的PID:13208 写进程的PID:10864 写进 Queue 的值为:两点水 从 Queue 读取的值为:两点水 写进 Queue 的值为:三点水 从 Queue 读取的值为:三点水 写进 Queue 的值为:四点水 从 Queue 读取的值为:四点水
- 課程推薦
- 課件下載
課件暫不提供下載,工作人員正在整理中,後期請多關注該課程~ 















