Detailed explanation of Python threads, processes and coroutines

Release: 2017-03-18 11:58:33


Interpreter environment: Python 3.5.1

We all know that socket and socketserver are two must-learn modules for Python network programming. Of the two, socketserver supports I/O multiplexing as well as multi-threading and multi-processing. A socketserver server script usually contains a line like this:

server = socketserver.ThreadingTCPServer(settings.IP_PORT, MyServer)

ThreadingTCPServer is the socketserver class that supports multi-threading over the TCP protocol. Its inheritance relationship is as follows:

class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass

The TCPServer on the right is the parent class that provides the main functionality, and the ThreadingMixIn on the left is the class that implements multi-threading. ThreadingTCPServer itself contains no code.

MixIn is very common in Python class naming. It is generally called a "mix-in": a class that subclasses inherit from in order to pick up one important piece of functionality.

class ThreadingMixIn:

    daemon_threads = False

    def process_request_thread(self, request, client_address):
        try:
            self.finish_request(request, client_address)
            self.shutdown_request(request)
        except:
            self.handle_error(request, client_address)
            self.shutdown_request(request)

    def process_request(self, request, client_address):
        t = threading.Thread(target = self.process_request_thread,
                             args = (request, client_address))
        t.daemon = self.daemon_threads
        t.start()

The ThreadingMixIn class defines one attribute and two methods. The process_request method actually calls threading, Python's standard multi-threading module. This module is the basis for all multithreading in Python, and socketserver essentially builds on it.
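To see the mix-in at work, here is a minimal sketch of a threaded server and one client round trip. The handler class UpperCaseHandler and the helper echo_once are hypothetical names made up for this illustration; binding to port 0 asks the operating system for any free port.

```python
import socket
import socketserver
import threading


class UpperCaseHandler(socketserver.BaseRequestHandler):
    """Hypothetical handler: replies with the upper-cased request."""

    def handle(self):
        data = self.request.recv(1024)
        self.request.sendall(data.upper())


def echo_once(message):
    # ThreadingTCPServer handles every connection in its own thread,
    # via ThreadingMixIn.process_request shown above.
    server = socketserver.ThreadingTCPServer(("127.0.0.1", 0), UpperCaseHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    try:
        with socket.create_connection(server.server_address) as client:
            client.sendall(message)
            return client.recv(1024)
    finally:
        server.shutdown()      # stop the serve_forever loop
        server.server_close()  # release the listening socket


if __name__ == "__main__":
    print(echo_once(b"hello"))
```

The server loop runs in a daemon thread here only so that the sketch can act as its own client; a real server would normally call serve_forever in the main thread.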

1. Threads

Threads, sometimes called lightweight processes (LWP), are the smallest unit of program execution flow. A standard thread consists of a thread ID, a current instruction pointer (PC), a register set and a stack. A thread is an entity within a process and is the basic unit that the system schedules and dispatches independently. A thread owns no system resources of its own, but it shares all the resources of its process with the other threads of that process. One thread can create and destroy another, and multiple threads in the same process can execute concurrently. Because threads constrain one another, each of them runs intermittently. A thread has three basic states: ready, blocked and running. Ready means the thread has everything it needs to run and is waiting for a processor; running means the thread currently occupies a processor; blocked means the thread is waiting for an event (such as a semaphore) and cannot logically proceed. Every application has at least one process and one thread. A thread is a single sequential flow of control in a program; running several threads at once in a single program, each handling a different piece of the work, is called multithreading.

You don't need to read the above paragraph! Here is an analogy instead. A manufacturer that wants to produce a certain product builds several factories on its production base, and each factory has several production lines. All the factories cooperate to produce the whole product, and all the production lines in one factory produce the part that this factory is responsible for. Each factory has its own materials warehouse, and the production lines in that factory share those materials. To produce anything at all, a manufacturer needs at least one factory and one production line. The manufacturer is the application, each factory is a process, and each production line is a thread.

1.1 Ordinary multi-threading

In python, the threading module provides threading functions. Through it, we can easily create multiple threads in the process. The following is an example:

import threading
import time

def show(arg):
    time.sleep(1)
    print('thread'+str(arg))

for i in range(10):
    t = threading.Thread(target=show, args=(i,))
    t.start()

print('main thread stop')

The above code creates 10 "foreground" threads and then hands control over to the CPU. The threads are scheduled according to the system's scheduling algorithm, and their instructions are executed in time slices.

The following are the main methods of the Thread class:

  • start: marks the thread as ready, waiting for CPU scheduling

  • setName: sets the thread's name

  • getName: gets the thread's name

  • setDaemon: makes the thread a background thread or a foreground thread (the default)
    A background thread runs while the main thread is running; once the main thread finishes, the background thread is stopped whether or not it has completed. A foreground thread also runs while the main thread is running, but after the main thread finishes, the program waits for the foreground thread to complete before it exits.

  • join: blocks the caller until the thread has finished. Calling join on each thread right after starting it runs the threads one by one, which makes multi-threading meaningless.

  • run: the method of the thread object that is executed automatically once the thread is scheduled by the CPU
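As a small sketch of how join is usually meant to be used: start every thread first, and only then wait for all of them. The helper names worker and run_all are invented for this example.

```python
import threading
import time


def worker(results, i):
    time.sleep(0.1)      # simulate some work
    results.append(i)


def run_all(count):
    results = []
    threads = [threading.Thread(target=worker, args=(results, i))
               for i in range(count)]
    for t in threads:    # start everything first ...
        t.start()
    for t in threads:    # ... then wait, so the sleeps overlap
        t.join()
    return sorted(results)


if __name__ == "__main__":
    print(run_all(5))
```

Because the joins come after all the starts, the threads sleep at the same time instead of one after another.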

1.2 Custom thread class

The Thread class in the threading module essentially just executes its run method. You can therefore define your own thread class that inherits from Thread and overrides run.

import threading

class MyThreading(threading.Thread):

    def __init__(self, func, arg):
        super(MyThreading, self).__init__()
        self.func = func
        self.arg = arg

    def run(self):
        self.func(self.arg)

def f1(args):
    print(args)

obj = MyThreading(f1, 123)
obj.start()

1.3 Thread lock

When the CPU executes tasks, it is scheduled among threads unpredictably, and a thread may execute only a few instructions before the CPU switches to another thread. Because the threads of a process share resources and data, contention for resources and dirty data arise easily. Hence the concept of a lock, which restricts access to a given piece of data to one thread at a time.

1.3.1 Without a lock

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time

NUM = 0

def show():
    global NUM
    NUM += 1
    name = threading.current_thread().name  # name of the thread running show()
    time.sleep(1)  # Note: the position of this line matters. It must come after NUM is modified, otherwise the dirty data cannot be observed.
    print(name, "finished, NUM is:", NUM)

for i in range(10):
    t = threading.Thread(target=show)
    t.start()

print('main thread stop')

After the above code is run, the results are as follows:

main thread stop
Thread-1 finished, NUM is: 10
Thread-2 finished, NUM is: 10
Thread-4 finished, NUM is: 10
Thread-9 finished, NUM is: 10
Thread-3 finished, NUM is: 10
Thread-6 finished, NUM is: 10
Thread-8 finished, NUM is: 10
Thread-7 finished, NUM is: 10
Thread-5 finished, NUM is: 10
Thread-10 finished, NUM is: 10

Every thread prints 10 instead of the value it produced itself, because all ten threads had already modified the shared NUM before any of them printed it. To solve this kind of problem, Python defines several lock classes in the threading module, namely:

  • Lock: ordinary lock (cannot be nested)

  • RLock: ordinary lock (can be nested), commonly used

  • Semaphore: semaphore

  • Event: event

  • Condition: condition

1.3.2 Ordinary locks: Lock and RLock

import time
import threading

NUM = 10

def func(lock):
    global NUM
    lock.acquire()  # acquire the lock
    NUM -= 1
    time.sleep(1)
    print(NUM)
    lock.release()  # release the lock

lock = threading.Lock()  # create a lock object
for i in range(10):
    t = threading.Thread(target=func, args=(lock,))  # remember to pass the lock to func as an argument
    t.start()
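The following sketch complements the example above in two ways, under the assumption that a shared counter stands in for the protected data. locked_count uses the lock as a context manager so that it is always released, and nested_with_rlock shows the nesting that RLock allows. Both function names are made up for this illustration.

```python
import threading


def locked_count(thread_count=4, per_thread=10000):
    counter = 0
    lock = threading.Lock()

    def work():
        nonlocal counter
        for _ in range(per_thread):
            with lock:            # acquire, and release even on error
                counter += 1

    threads = [threading.Thread(target=work) for _ in range(thread_count)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter


def nested_with_rlock():
    rlock = threading.RLock()
    with rlock:
        # The same thread may acquire an RLock again; a plain Lock
        # would block forever at this point.
        with rlock:
            return True


if __name__ == "__main__":
    print(locked_count())
    print(nested_with_rlock())
```

With the lock in place, the counter always ends at thread_count * per_thread.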


1.3.3 Semaphore

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threading

def run(n):
    semaphore.acquire()
    print("run the thread: %s" % n)
    time.sleep(1)
    semaphore.release()

num = 0
semaphore = threading.BoundedSemaphore(5)  # allow at most 5 threads to run at the same time
for i in range(20):
    t = threading.Thread(target=run, args=(i,))
    t.start()
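To check that the semaphore really caps concurrency, one option is to record how many threads are inside the guarded section at once. This is only a sketch; peak_concurrency and its bookkeeping dictionary are assumptions of this example, not part of the threading API.

```python
import threading
import time


def peak_concurrency(limit=3, thread_count=10):
    semaphore = threading.BoundedSemaphore(limit)
    state_lock = threading.Lock()
    state = {"running": 0, "peak": 0}

    def work():
        with semaphore:                       # at most `limit` threads get in
            with state_lock:
                state["running"] += 1
                state["peak"] = max(state["peak"], state["running"])
            time.sleep(0.05)                  # simulate some work
            with state_lock:
                state["running"] -= 1

    threads = [threading.Thread(target=work) for _ in range(thread_count)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return state["peak"]


if __name__ == "__main__":
    print(peak_concurrency())
```

The returned peak never exceeds the limit passed to BoundedSemaphore.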

1.3.4 Event

An Event mainly provides three methods: set, wait and clear.

import threading

def func(e, i):
    print(i)
    e.wait()  # check the event's state: block on a red light, continue on a green light. The default is red.
    print(i+100)

event = threading.Event()
for i in range(10):
    t = threading.Thread(target=func, args=(event, i))
    t.start()

event.clear()  # explicitly set the state to red
inp = input(">>>")
if inp == "1":
    event.set()  # explicitly set the state to green
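The same red light / green light idea can be sketched without keyboard input, which makes it easier to experiment with. release_waiters is a hypothetical helper written for this illustration.

```python
import threading


def release_waiters(count=3):
    event = threading.Event()
    passed = []

    def waiter(i):
        event.wait()          # blocks while the event is unset (red light)
        passed.append(i)

    threads = [threading.Thread(target=waiter, args=(i,)) for i in range(count)]
    for t in threads:
        t.start()
    was_set = event.is_set()  # still red: nobody has called set() yet
    event.set()               # green light: every waiter is released
    for t in threads:
        t.join()
    return was_set, sorted(passed)


if __name__ == "__main__":
    print(release_waiters())
```

A single call to set releases all waiting threads at once, which is the main difference from a lock.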

1.3.5 Condition

import threading

def condition():
    ret = False
    r = input(">>>")
    if r == "yes":
        ret = True
    return ret

def func(conn, i):
    print(i)
    conn.acquire()
    conn.wait_for(condition)  # wait_for takes a callable and blocks until it returns True
    print(i+100)
    conn.release()

c = threading.Condition()
for i in range(10):
    t = threading.Thread(target=func, args=(c, i,))
    t.start()

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading

def run(n):
    con.acquire()
    con.wait()
    print("run the thread: %s" %n)
    con.release()

if __name__ == '__main__':
    con = threading.Condition()
    for i in range(10):
        t = threading.Thread(target=run, args=(i,))
        t.start()

    while True:
        inp = input('>>>')
        if inp == "q":
            break
        # the following three lines are the fixed pattern
        con.acquire()
        con.notify(int(inp))  # takes an integer: how many waiting threads to let through
        con.release()
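A common use of Condition is handing data from one thread to another. The sketch below assumes a plain list as the shared "box" and uses wait_for with a predicate, so the consumer also copes with items that arrived before it started waiting. The names handoff, box and received are invented for this example.

```python
import threading


def handoff(items):
    cond = threading.Condition()
    box = []        # shared data protected by the condition's lock
    received = []

    def consumer():
        for _ in range(len(items)):
            with cond:
                # Sleep until the predicate is true; it is re-checked
                # after every notify().
                cond.wait_for(lambda: len(box) > 0)
                received.append(box.pop(0))

    t = threading.Thread(target=consumer)
    t.start()
    for item in items:
        with cond:
            box.append(item)
            cond.notify()   # wake one waiting thread
    t.join()
    return received


if __name__ == "__main__":
    print(handoff([1, 2, 3]))
```

Using `with cond:` is equivalent to the acquire/release pairs in the examples above.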

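1.3 Global Interpreter Lock (GIL)

A new GIL was implemented in Python 3.2, with some positive results. It was the most significant change to the GIL since 1992. The old GIL decided when to give up the lock by counting Python instructions. The new implementation uses a fixed timeout to tell the current thread to give up the lock: when the current thread holds the lock and a second thread requests it, the current thread is forced to release it after 5 ms (in other words, the current thread has to check every 5 ms whether it needs to release the lock). When tasks are runnable, this makes switching between threads more predictable.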
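In CPython this timeout is exposed as the "switch interval" in the sys module. A minimal sketch for inspecting it (the wrapper function current_switch_interval is just a name chosen for this example):

```python
import sys


def current_switch_interval():
    # Returns the interpreter's thread switch interval in seconds.
    return sys.getswitchinterval()


if __name__ == "__main__":
    print(current_switch_interval())
```

sys.setswitchinterval can change the value, although that is rarely necessary in practice.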





Original article: Python's Hardest Problem
Chinese translation: Python 最难的问题

1.4 Timer

from threading import Timer

def hello():
    print("hello, world")

t = Timer(1, hello)  # run hello after 1 second
t.start()
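A Timer can also be cancelled before it fires. The following sketch starts two timers and cancels one of them; run_timers and the recorded strings are assumptions made for this illustration.

```python
import threading


def run_timers():
    fired = []
    t1 = threading.Timer(0.2, fired.append, args=("first",))
    t2 = threading.Timer(0.2, fired.append, args=("cancelled",))
    t1.start()
    t2.start()
    t2.cancel()   # only has an effect while the timer is still waiting
    t1.join()     # Timer is a Thread subclass, so join() works
    t2.join()
    return fired


if __name__ == "__main__":
    print(run_timers())
```

Only the timer that was not cancelled calls its function.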

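1.5 Queues

  • queue.Queue: first-in, first-out queue

  • queue.LifoQueue: last-in, first-out queue

  • queue.PriorityQueue: priority queue

  • queue.deque: double-ended queue (this is collections.deque, which the queue module imports)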

1.5.1 Queue: first-in, first-out queue

import queue

q = queue.Queue(5)
q.put(11)
q.put(22)
q.put(33)

print(q.get())
print(q.get())
print(q.get())


  • maxsize: the maximum number of elements in the queue, i.e. the 5 in queue.Queue(5). Once the queue holds that many elements, further put calls block by default and wait for the queue to free up space.

    def __init__(self, maxsize=0):
        self.maxsize = maxsize
        self._init(maxsize)

  • qsize(): returns the number of elements currently in the queue, i.e. the size of the queue

  • empty(): tells whether the queue is currently empty; returns True or False

  • full(): tells whether the queue is currently full; returns True or False

  • put(self, item, block=True, timeout=None)

  • get(self, block=True, timeout=None)

  • join(): blocks the caller until all tasks are done; it has to be used together with task_done.

    def join(self):
        with self.all_tasks_done:
            while self.unfinished_tasks:
                self.all_tasks_done.wait()

  • task_done(): marks one task as finished. Every get statement should be followed by one task_done.

    import queue

    q = queue.Queue(5)
    q.put(11)
    q.put(22)
    print(q.get())
    q.task_done()
    print(q.get())
    q.task_done()
    q.join()
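The block and timeout parameters listed above decide what happens when the queue is full or empty. A short sketch, with bounded_queue_demo as a made-up name:

```python
import queue


def bounded_queue_demo():
    q = queue.Queue(2)
    q.put("a")
    q.put("b")
    try:
        q.put("c", block=False)   # queue is full: raises instead of blocking
        overflow = False
    except queue.Full:
        overflow = True

    first, second = q.get(), q.get()
    try:
        q.get(timeout=0.1)        # queue is empty: gives up after 0.1 s
        underflow = False
    except queue.Empty:
        underflow = True
    return overflow, [first, second], underflow


if __name__ == "__main__":
    print(bounded_queue_demo())
```

With the defaults (block=True, timeout=None) both calls would wait indefinitely instead of raising.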

1.5.2 LifoQueue: last-in, first-out queue

import queue

q = queue.LifoQueue()
q.put(123)
q.put(456)
print(q.get())


1.5.3 PriorityQueue: priority queue

import queue

q = queue.PriorityQueue()
q.put((1,"alex1"))
q.put((1,"alex2"))
q.put((1,"alex3"))
q.put((3,"alex3"))
print(q.get())
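Entries come out in sorted order, lowest first, and entries with the same priority number are compared by their second element. The sketch below drains a queue to show this; the sample tuples and the name drain_in_priority_order are assumptions of this example.

```python
import queue


def drain_in_priority_order():
    q = queue.PriorityQueue()
    for entry in [(3, "low"), (1, "b"), (1, "a"), (2, "mid")]:
        q.put(entry)
    out = []
    while not q.empty():
        out.append(q.get())   # smallest tuple first
    return out


if __name__ == "__main__":
    print(drain_in_priority_order())
```

Because ties fall back to comparing the payload, payloads that cannot be compared with each other would raise a TypeError when priorities are equal.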

1.5.4 deque: double-ended queue

import queue

q = queue.deque()
q.append(123)
q.append(333)
q.appendleft(456)
q.pop()
q.popleft()

1.6 Producer-consumer model

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author:Liu Jiang
import time
import queue
import threading

q = queue.Queue(10)

def productor(i):
    while True:
        q.put("bun made by chef %s!" % i)
        time.sleep(2)

def consumer(k):
    while True:
        print("customer %s ate a %s" % (k, q.get()))
        time.sleep(1)

for i in range(3):
    t = threading.Thread(target=productor, args=(i,))
    t.start()

for k in range(10):
    v = threading.Thread(target=consumer, args=(k,))
    v.start()
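The example above runs forever. One common way to let such a pipeline finish is to send a sentinel object through the queue for each consumer. This is a sketch of that pattern, not the author's code; run_pipeline, DONE and the item strings are hypothetical.

```python
import queue
import threading


def run_pipeline(item_count=5, consumer_count=2):
    q = queue.Queue(3)
    eaten = []
    eaten_lock = threading.Lock()
    DONE = object()                      # sentinel: "no more items"

    def producer():
        for i in range(item_count):
            q.put("bun %d" % i)          # blocks while the queue is full
        for _ in range(consumer_count):
            q.put(DONE)                  # one sentinel per consumer

    def consumer():
        while True:
            item = q.get()               # blocks while the queue is empty
            if item is DONE:
                break
            with eaten_lock:
                eaten.append(item)

    threads = [threading.Thread(target=producer)]
    threads += [threading.Thread(target=consumer) for _ in range(consumer_count)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sorted(eaten)


if __name__ == "__main__":
    print(run_pipeline())
```

Each consumer stops after reading exactly one sentinel, so every item is consumed once and all threads exit.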

1.7 Thread pool

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author:Liu Jiang
import queue
import time
import threading

class MyThreadPool:

    def __init__(self, maxsize=5):
        self.maxsize = maxsize
        self._q = queue.Queue(maxsize)
        for i in range(maxsize):
            self._q.put(threading.Thread)

    def get_thread(self):
        return self._q.get()

    def add_thread(self):
        self._q.put(threading.Thread)

def task(i, pool):
    print(i)
    time.sleep(1)
    pool.add_thread()

pool = MyThreadPool(5)
for i in range(100):
    t = pool.get_thread()
    obj = t(target=task, args=(i, pool))
    obj.start()



#!/usr/bin/env python
# -*- coding:utf-8 -*-
import queue
import threading
import contextlib
import time

StopEvent = object()  # sentinel object that tells a worker to stop


class ThreadPool(object):

    def __init__(self, max_num, max_task_num=None):
        if max_task_num:
            self.q = queue.Queue(max_task_num)
        else:
            self.q = queue.Queue()
        self.max_num = max_num
        self.cancel = False
        self.terminal = False
        self.generate_list = []
        self.free_list = []

    def run(self, func, args, callback=None):
        """
        Submit one task to the thread pool
        :param func: the task function
        :param args: the arguments the task function needs
        :param callback: callback executed after the task fails or succeeds; it takes two arguments: 1. the execution status of the task function; 2. its return value (default None, i.e. no callback)
        :return: None; the task is ignored if the pool has already been closed
        """
        if self.cancel:
            return
        if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
            self.generate_thread()
        w = (func, args, callback,)
        self.q.put(w)

    def generate_thread(self):
        """
        Create one worker thread
        """
        t = threading.Thread(target=self.call)
        t.start()

    def call(self):
        """
        Fetch task functions in a loop and execute them
        """
        current_thread = threading.current_thread()
        self.generate_list.append(current_thread)

        event = self.q.get()
        while event != StopEvent:

            func, arguments, callback = event
            try:
                result = func(*arguments)
                success = True
            except Exception as e:
                success = False
                result = None

            if callback is not None:
                try:
                    callback(success, result)
                except Exception as e:
                    pass

            with self.worker_state(self.free_list, current_thread):
                if self.terminal:
                    event = StopEvent
                else:
                    event = self.q.get()
        else:
            self.generate_list.remove(current_thread)

    def close(self):
        """
        Stop all threads once all tasks have been executed
        """
        self.cancel = True
        full_size = len(self.generate_list)
        while full_size:
            self.q.put(StopEvent)
            full_size -= 1

    def terminate(self):
        """
        Stop the threads whether or not tasks remain
        """
        self.terminal = True

        while self.generate_list:
            self.q.put(StopEvent)

        self.q.empty()  # note: empty() only reports whether the queue is empty; it does not clear it

    @contextlib.contextmanager
    def worker_state(self, state_list, worker_thread):
        """
        Record the threads that are currently waiting for a task
        """
        state_list.append(worker_thread)
        try:
            yield
        finally:
            state_list.remove(worker_thread)


# How to use
pool = ThreadPool(5)

def callback(status, result):
    # status, execute action status
    # result, execute action return value
    pass

def action(i):
    print(i)

for i in range(30):
    ret = pool.run(action, (i,), callback)

time.sleep(5)
print(len(pool.generate_list), len(pool.free_list))
print(len(pool.generate_list), len(pool.free_list))
# pool.close()
# pool.terminate()
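For comparison, the standard library also ships a ready-made thread pool in concurrent.futures. This is a different technique from the hand-written pool above, shown here only as a sketch; square and squares_with_pool are names invented for the example.

```python
from concurrent.futures import ThreadPoolExecutor


def square(x):
    return x * x


def squares_with_pool(values, workers=5):
    # The executor keeps at most `workers` threads alive and reuses
    # them for all submitted tasks.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(square, values))   # results keep input order


if __name__ == "__main__":
    print(squares_with_pool(range(5)))
```

Leaving the with block waits for all submitted tasks and then shuts the worker threads down.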


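2. Processes

In Python, the multiprocessing module provides the Process class for process-related functionality. On Unix-like systems it creates child processes with fork. Windows has no fork, so there each child process starts a fresh interpreter that imports the main module again. For that reason the code that creates processes must be placed under if __name__ == '__main__':, otherwise importing the module would start new processes recursively.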


from multiprocessing import Process

def foo(i):
    print("This is Process ", i)

if __name__ == '__main__':
    for i in range(5):
        p = Process(target=foo, args=(i,))
        p.start()

2.1 Sharing data between processes

from multiprocessing import Process

list_1 = []

def foo(i):
    list_1.append(i)
    print("This is Process ", i, " and list_1 is ", list_1)

if __name__ == '__main__':
    for i in range(5):
        p = Process(target=foo, args=(i,))
        p.start()

    print("The end of list_1:", list_1)


2.1.1 Sharing data with Array

from multiprocessing import Process
from multiprocessing import Array

def Foo(i, temp):
    temp[0] += 100
    for item in temp:
        print(i, '----->', item)

if __name__ == '__main__':
    temp = Array('i', [11, 22, 33, 44])
    for i in range(2):
        p = Process(target=Foo, args=(i, temp))
        p.start()

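For the Array class, the 'i' in the parentheses means that all of its elements are of type int; it does not mean the character i. The elements can be given up front as a list, or only the length can be given. In short, an Array must be told its element type and its size when it is instantiated, for example temp = Array('i', 5). The type codes map to ctypes types as follows:

'c': ctypes.c_char,   'u': ctypes.c_wchar,
'b': ctypes.c_byte,   'B': ctypes.c_ubyte,
'h': ctypes.c_short,  'H': ctypes.c_ushort,
'i': ctypes.c_int,    'I': ctypes.c_uint,
'l': ctypes.c_long,   'L': ctypes.c_ulong,
'f': ctypes.c_float,  'd': ctypes.c_double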

2.1.2 Sharing data with Manager

from multiprocessing import Process, Manager

def Foo(i, dic):
    dic[i] = 100+i
    print(dic.values())

if __name__ == '__main__':
    manage = Manager()
    dic = manage.dict()
    for i in range(10):
        p = Process(target=Foo, args=(i, dic))
        p.start()
        p.join()


2.1.3 Sharing data with the Queue class from queues

import multiprocessing
from multiprocessing import Process
from multiprocessing import queues

def foo(i, arg):
    arg.put(i)
    print('The Process is ', i, "and the queue's size is ", arg.qsize())

if __name__ == "__main__":
    li = queues.Queue(20, ctx=multiprocessing)
    for i in range(10):
        p = Process(target=foo, args=(i, li,))
        p.start()


2.2 Process locks

To prevent the same contention and dirty-data problems that occur with multi-threading, process locks are needed as well. As in threading, multiprocessing has lock classes with the same names: RLock, Lock, Event, Condition and Semaphore, and they are even used in exactly the same way. (I like that!)

from multiprocessing import Process
from multiprocessing import queues
from multiprocessing import Array
from multiprocessing import RLock, Lock, Event, Condition, Semaphore
import multiprocessing
import time

def foo(i, lis, lc):
    lc.acquire()
    lis[0] = lis[0] - 1
    time.sleep(1)
    print('say hi', lis[0])
    lc.release()

if __name__ == "__main__":
    # li = []
    li = Array('i', 1)
    li[0] = 10
    lock = RLock()
    for i in range(10):
        p = Process(target=foo, args=(i, li, lock))
        p.start()

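2.3 Process pool

Since there are thread pools, there are naturally process pools too. Python comes with a built-in process pool, so unlike the thread pool above you do not have to write one yourself; a simple from multiprocessing import Pool is enough.

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from multiprocessing import Pool
import time

def f1(args):
    time.sleep(1)
    print(args)

if __name__ == '__main__':
    p = Pool(5)
    for i in range(30):
        p.apply_async(func=f1, args=(i,))
    p.close()  # stop accepting tasks; the pool shuts down once the child processes have finished
    # time.sleep(2)
    # p.terminate()  # shut the pool down immediately
    p.join()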










3. Coroutines

3.1 greenlet

from greenlet import greenlet

def test1():
    print(12)
    gr2.switch()
    print(34)
    gr2.switch()

def test2():
    print(56)
    gr1.switch()
    print(78)

gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()
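The cooperative switching that greenlet performs can be imitated with plain generators, which may help when greenlet is not installed. This is a sketch of a different technique, not greenlet's API: each yield hands control back to a tiny round-robin scheduler, and round_robin and run are names invented here.

```python
def test1(log):
    log.append(12)
    yield               # give up control voluntarily
    log.append(34)


def test2(log):
    log.append(56)
    yield
    log.append(78)


def round_robin(*tasks):
    pending = list(tasks)
    while pending:
        task = pending.pop(0)
        try:
            next(task)            # run the task until its next yield
            pending.append(task)  # not finished: queue it again
        except StopIteration:
            pass                  # task finished: drop it


def run():
    log = []
    round_robin(test1(log), test2(log))
    return log


if __name__ == "__main__":
    print(run())
```

The recorded order is 12, 56, 34, 78, the same order in which the greenlet example prints its numbers.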


3.2 gevent

from gevent import monkey; monkey.patch_all()
import gevent
import requests

def f(url):
    print('GET: %s' % url)
    resp = requests.get(url)
    data = resp.text
    print('%d bytes received from %s.' % (len(data), url))

gevent.joinall([
    gevent.spawn(f, ''),
    gevent.spawn(f, ''),
    gevent.spawn(f, ''),
])

