The Python standard library provides us with the threading and multiprocessing modules to write corresponding multi-threading/multi-process code. However, when the project reaches a certain scale, frequent creation/destruction of processes or threads is very resource intensive. Yes, at this time we have to write our own thread pool/process pool to trade space for time. But starting from Python3.2, the standard library provides us with the concurrent.futures module, which provides two classes: ThreadPoolExecutor and ProcessPoolExecutor, which realizes further abstraction of threading and multiprocessing. , provides direct support for writing thread pools/process pools.
The basis of the concurrent.futures module is Executor. Executor is an abstract class and cannot be used directly. However, the two subclasses ThreadPoolExecutor and ProcessPoolExecutor it provides are very useful. As the names suggest, they are used to create thread pool and process pool codes respectively. We can put the corresponding tasks directly into the thread pool/process pool, and there is no need to maintain the Queue to worry about deadlocks. The thread pool/process pool will automatically schedule it for us.
Future I believe that friends who have experience in programming under java and nodejs will be familiar with this concept. You can understand it as an operation to be completed in the future, this It is the basis of asynchronous programming. In traditional programming mode, for example, when we operate queue.get, blocking will occur before waiting for the result to be returned, and the CPU cannot be freed to do other things. The introduction of Future helps us to complete the task during the waiting period. Other operations. Regarding asynchronous IO in Python, you can refer to my Python concurrent programming coroutine/asynchronous IO after reading this article.
p.s: If you are still sticking to Python2.x, please install the futures module first.
pip install futures
Let’s first understand the concept of thread pool through the following code
# example1.py from concurrent.futures import ThreadPoolExecutor import time def return_future_result(message): time.sleep(2) return message pool = ThreadPoolExecutor(max_workers=2) # 创建一个最大可容纳2个task的线程池 future1 = pool.submit(return_future_result, ("hello")) # 往线程池里面加入一个task future2 = pool.submit(return_future_result, ("world")) # 往线程池里面加入一个task print(future1.done()) # 判断task1是否结束 time.sleep(3) print(future2.done()) # 判断task2是否结束 print(future1.result()) # 查看task1返回的结果 print(future2.result()) # 查看task2返回的结果
We based on the running results Let’s analyze it. We use the submit method to add a task to the thread pool, and submit returns a Future object. The Future object can be simply understood as an operation completed in the future. In the first print statement, it is obvious that our future1 has not been completed because of time.sleep(2), because we used time.sleep(3) to pause the main thread, so when it comes to the second print statement, our thread pool All tasks here have been completed.
ziwenxie :: ~ » python example1.py False True hello world # 在上述程序执行的过程中,通过ps命令我们可以看到三个线程同时在后台运行 ziwenxie :: ~ » ps -eLf | grep python ziwenxie 8361 7557 8361 3 3 19:45 pts/0 00:00:00 python example1.py ziwenxie 8361 7557 8362 0 3 19:45 pts/0 00:00:00 python example1.py ziwenxie 8361 7557 8363 0 3 19:45 pts/0 00:00:00 python example1.py
We can also rewrite the above code into a process pool form. The API and thread pool are exactly the same, so I won’t be wordy.
# example2.py from concurrent.futures import ProcessPoolExecutor import time def return_future_result(message): time.sleep(2) return message pool = ProcessPoolExecutor(max_workers=2) future1 = pool.submit(return_future_result, ("hello")) future2 = pool.submit(return_future_result, ("world")) print(future1.done()) time.sleep(3) print(future2.done()) print(future1.result()) print(future2.result())
The following are the running results
ziwenxie :: ~ » python example2.py False True hello world ziwenxie :: ~ » ps -eLf | grep python ziwenxie 8560 7557 8560 3 3 19:53 pts/0 00:00:00 python example2.py ziwenxie 8560 7557 8563 0 3 19:53 pts/0 00:00:00 python example2.py ziwenxie 8560 7557 8564 0 3 19:53 pts/0 00:00:00 python example2.py ziwenxie 8561 8560 8561 0 1 19:53 pts/0 00:00:00 python example2.py ziwenxie 8562 8560 8562 0 1 19:53 pts/0 00:00:00 python example2.py
In addition to submit, Exectuor also provides us with the map method and built-in The usage of map is similar. Let's compare the difference between the two through two examples.
# example3.py import concurrent.futures import urllib.request URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/'] def load_url(url, timeout): with urllib.request.urlopen(url, timeout=timeout) as conn: return conn.read() # We can use a with statement to ensure threads are cleaned up promptly with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: # Start the load operations and mark each future with its URL future_to_url = {executor.submit(load_url, url, 60): url for url in URLS} for future in concurrent.futures.as_completed(future_to_url): url = future_to_url[future] try: data = future.result() except Exception as exc: print('%r generated an exception: %s' % (url, exc)) else: print('%r page is %d bytes' % (url, len(data)))
As can be seen from the running results, as_completed is not returned in the order of the URLS list elements.
ziwenxie :: ~ » python example3.py 'http://example.com/' page is 1270 byte 'https://api.github.com/' page is 2039 bytes 'http://httpbin.org' page is 12150 bytes
# example4.py import concurrent.futures import urllib.request URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/'] def load_url(url): with urllib.request.urlopen(url, timeout=60) as conn: return conn.read() # We can use a with statement to ensure threads are cleaned up promptly with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: for url, data in zip(URLS, executor.map(load_url, URLS)): print('%r page is %d bytes' % (url, len(data)))
As can be seen from the running results, map returns in the order of the URLS list elements, and the code written is more concise and intuitive. We You can choose any one according to your specific needs.
ziwenxie :: ~ » python example4.py 'http://httpbin.org' page is 12150 bytes 'http://example.com/' page is 1270 bytes 'https://api.github.com/' page is 2039 bytes
The wait method will return a tuple (tuple). The tuple contains two sets (sets), one is completed (completed) and the other Is uncompleted (unfinished). One advantage of using the wait method is to gain greater freedom. It receives three parameters: FIRST_COMPLETED, FIRST_EXCEPTION and ALL_COMPLETE. The default setting is ALL_COMPLETED.
Let’s take a look at the difference between the three parameters through the following example
from concurrent.futures import ThreadPoolExecutor, wait, as_completed from time import sleep from random import randint def return_after_random_secs(num): sleep(randint(1, 5)) return "Return of {}".format(num) pool = ThreadPoolExecutor(5) futures = [] for x in range(5): futures.append(pool.submit(return_after_random_secs, x)) print(wait(futures)) # print(wait(futures, timeout=None, return_when='FIRST_COMPLETED'))
If the default ALL_COMPLETED is used, the program will block until all tasks in the thread pool are completed.
ziwenxie :: ~ » python example5.py DoneAndNotDoneFutures(done={ <Future at 0x7f0b06c9bc88 state=finished returned str>, <Future at 0x7f0b06cbaa90 state=finished returned str>, <Future at 0x7f0b06373898 state=finished returned str>, <Future at 0x7f0b06352ba8 state=finished returned str>, <Future at 0x7f0b06373b00 state=finished returned str>}, not_done=set())
If the FIRST_COMPLETED parameter is used, the program will not wait until all tasks in the thread pool are completed.
ziwenxie :: ~ » python example5.py DoneAndNotDoneFutures(done={ <Future at 0x7f84109edb00 state=finished returned str>, <Future at 0x7f840e2e9320 state=finished returned str>, <Future at 0x7f840f25ccc0 state=finished returned str>}, not_done={<Future at 0x7f840e2e9ba8 state=running>, <Future at 0x7f840e2e9940 state=running>})
Write a small program to compare the execution efficiency gap between multiprocessing.pool(ThreadPool) and ProcessPollExecutor(ThreadPoolExecutor), and think about why this happens based on the Future mentioned above result.
The above is the detailed content of Python concurrent programming thread pool/process pool. For more information, please follow other related articles on the PHP Chinese website!