這篇文章主要介紹了Python透過future處理並發問題,非常不錯,具有參考借鑒價值,需要的朋友可以參考下
future初識
透過下面腳本來對future進行一個初步了解:
#範例1:普通透過循環的方式
import os import time import sys import requests POP20_CC = ( "CN IN US ID BR PK NG BD RU JP MX PH VN ET EG DE IR TR CD FR" ).split() BASE_URL = 'http://flupy.org/data/flags' DEST_DIR = 'downloads/' def save_flag(img,filename): path = os.path.join(DEST_DIR,filename) with open(path,'wb') as fp: fp.write(img) def get_flag(cc): url = "{}/{cc}/{cc}.gif".format(BASE_URL,cc=cc.lower()) resp = requests.get(url) return resp.content def show(text): print(text,end=" ") sys.stdout.flush() def download_many(cc_list): for cc in sorted(cc_list): image = get_flag(cc) show(cc) save_flag(image,cc.lower()+".gif") return len(cc_list) def main(download_many): t0 = time.time() count = download_many(POP20_CC) elapsed = time.time()-t0 msg = "\n{} flags downloaded in {:.2f}s" print(msg.format(count,elapsed)) if __name__ == '__main__': main(download_many)
範例2:透過future方式實現,這裡對上面的部分程式碼進行了重複使用
from concurrent import futures from flags import save_flag, get_flag, show, main MAX_WORKERS = 20 def download_one(cc): image = get_flag(cc) show(cc) save_flag(image, cc.lower()+".gif") return cc def download_many(cc_list): workers = min(MAX_WORKERS,len(cc_list)) with futures.ThreadPoolExecutor(workers) as executor: res = executor.map(download_one, sorted(cc_list)) return len(list(res)) if __name__ == '__main__': main(download_many)
分別運作三次,兩者的平均速度:13.67和1.59s,可以看到差別還是非常大的。
future
future是concurrent.futures模組和asyncio模組的重要元件
#從python3.4開始標準函式庫中有兩個名為Future的類別:concurrent.futures.Future和asyncio.Future
這兩個類別的作用相同:兩個Future類別的實例都表示可能完成或尚未完成的延遲計算。與Twisted中的Deferred類別、Tornado框架中的Future類別的功能類似
注意:通常情況下自己不應該創建future,而是由並發框架(concurrent.futures或asyncio)實例化
原因:future表示終將發生的事情,而確定某件事情會發生的唯一方式是執行的時間已經安排好,因此只有把某件事情交給concurrent.futures.Executor子類處理時,才會建立concurrent.futures.Future實例。
如:Executor.submit()方法的參數是一個可呼叫的對象,呼叫這個方法後會為傳入的可呼叫物件排定時間,並傳回一個
future
客戶端程式碼不能應該改變future的狀態,並發框架在future表示的延遲計算結束後會改變期物的狀態,我們無法控制計算何時結束。
這兩種future都有.done()方法,這個方法不阻塞,回傳值是布林值,指明future連結的可呼叫物件是否已經執行。客戶端程式碼通常不會詢問future是否運行結束,而是會等待通知。因此兩個Future類別都有.add_done_callback()方法,這個方法只有一個參數,型別是可呼叫的對象,future運行結束後會呼叫指定的可呼叫對象。
.result()方法是在兩個Future類別中的作用相同:傳回可呼叫物件的結果,或是重新拋出執行可呼叫的物件時拋出的例外。但是如果future沒有運行結束,result方法在兩個Futrue類別中的行為差異非常大。
對concurrent.futures.Future實例來說,呼叫.result()方法會阻塞呼叫方所在的線程,直到有結果可返回,此時,result方法可以接收可選的timeout參數,如果在指定的時間內future沒有運行完畢,會拋出TimeoutError異常。
而asyncio.Future.result方法不支援設定超時時間,在取得future結果最好使用yield from結構,但是concurrent.futures.Future不能這樣做
#不管是asyncio還是concurrent.futures.Future都會有幾個函數是返回future,其他函數則是使用future,在最開始的例子中我們使用的Executor.map就是在使用future,返回值是一個迭代器,迭代器的__next__方法呼叫各個future的result方法,因此我們得到的是各個futrue的結果,而不是future本身
關於future.as_completed函數的使用,這裡我們用了兩個循環,一個用於創建並排定future,另外一個用於獲取future的結果
from concurrent import futures from flags import save_flag, get_flag, show, main MAX_WORKERS = 20 def download_one(cc): image = get_flag(cc) show(cc) save_flag(image, cc.lower()+".gif") return cc def download_many(cc_list): cc_list = cc_list[:5] with futures.ThreadPoolExecutor(max_workers=3) as executor: to_do = [] for cc in sorted(cc_list): future = executor.submit(download_one,cc) to_do.append(future) msg = "Secheduled for {}:{}" print(msg.format(cc,future)) results = [] for future in futures.as_completed(to_do): res = future.result() msg = "{}result:{!r}" print(msg.format(future,res)) results.append(res) return len(results) if __name__ == '__main__': main(download_many)
結果如下:
注意:Python程式碼是無法控制GIL,標準庫中所有執行阻塞型IO操作的函數,在等待作業系統返回結果時都會釋放GIL.運行其他線程執行,也正是因為這樣,Python線程可以在IO密集型應用中發揮作用
以上都是concurrent.futures啟動線程,下面透過它啟動進程
concurrent.futures啟動進程
##concurrent.futures中的ProcessPoolExecutor類把工作分配給多個Python進程處理,因此,如果需要做CPU密集型處理,使用這個模組能繞過GIL,利用所有的CPU核心。 其原理是一個ProcessPoolExecutor創建了N個獨立的Python解釋器,N是系統上面可用的CPU核數。 使用方法和ThreadPoolExecutor方法一樣以上是Python如何透過future處理並發問題的實例詳解的詳細內容。更多資訊請關注PHP中文網其他相關文章!