Das Warteschlangenmodul ist ein Modul, das Warteschlangenoperationen bereitstellt. Warteschlangen sind die am häufigsten verwendete Form des Datenaustauschs zwischen Threads. Dieses Modul stellt drei Warteschlangen bereit:
Queue.Queue(maxsize): First in, first out, maxsize ist die Größe der Warteschlange, und wenn ihr Wert eine nicht positive Zahl ist, handelt es sich um eine drahtlose Ringwarteschlange
Warteschlange. LifoQueue(maxsize): Zuletzt rein, zuerst raus, entspricht Stapel
Queue.PriorityQueue(maxsize): Prioritätswarteschlange.
Unter diesen sind LifoQueue und PriorityQueue Unterklassen von Queue. Die drei haben die folgenden gemeinsamen Methoden:
qsize(): Gibt die ungefähre Warteschlangengröße zurück. Warum das Wort „ungefähr“ hinzufügen? Denn wenn der Wert größer als 0 ist, kann nicht garantiert werden, dass die Methode get() während der gleichzeitigen Ausführung nicht blockiert wird. Dies gilt auch für die Methode put().
empty(): Gibt einen booleschen Wert zurück. Wenn die Warteschlange leer ist, wird True zurückgegeben, andernfalls wird False zurückgegeben.
full(): Wenn die Warteschlangengröße festgelegt ist und die Warteschlange voll ist, wird True zurückgegeben, andernfalls wird False zurückgegeben.
put(item[,block[,timeout]]): Füge ein Element zur Warteschlange hinzu. Wenn block auf False gesetzt ist, wird eine Full-Ausnahme ausgelöst, wenn die Warteschlange voll ist. Wenn „Block“ auf „True“ und „Timeout“ auf „None“ gesetzt ist, wird gewartet, bis Platz vorhanden ist, bevor der Block zur Warteschlange hinzugefügt wird. Andernfalls wird eine vollständige Ausnahme basierend auf dem durch „Timeout“ festgelegten Timeout-Wert ausgelöst.
put_nowwait(item): Entspricht put(item,False). Wenn der Block auf „False“ gesetzt ist und die Warteschlange leer ist, wird eine Empty-Ausnahme ausgelöst. Wenn der Block auf „True“ und das Timeout auf „None“ gesetzt ist, wird gewartet, bis Platz vorhanden ist, bevor er zur Warteschlange hinzugefügt wird. Andernfalls wird eine leere Ausnahme basierend auf dem durch „timeout“ festgelegten Timeout-Wert ausgelöst.
get([block[,timeout]]): Entfernt ein Element aus der Warteschlange und gibt den Wert des Elements zurück. Wenn timeout eine positive Zahl ist, wird es für bis zu timeout Sekunden blockiert, und wenn innerhalb Wenn zu diesem Zeitpunkt keine Elemente verfügbar sind, wird eine Empty-Ausnahme ausgelöst.
get_nowwait(): Äquivalent zu get(False)
task_done(): Sendet ein Signal, um anzuzeigen, dass die in die Warteschlange gestellte Aufgabe abgeschlossen wurde, was häufig in Verbraucherthreads verwendet wird.
join(): Blockiert, bis alle Elemente der Warteschlange verarbeitet sind, und verarbeitet dann andere Vorgänge.
(1) Quellcode-Analyse
Das Queue-Modul ist sehr einfach zu verwenden, aber ich denke, es ist notwendig, den relevanten Quellcode des Moduls zu veröffentlichen und zu analysieren Schauen Sie sich an, wie schön, strukturiert und modular der von den Meistern geschriebene Code ist, und denken Sie dann an den Code, den ich geschrieben habe, der mir Tränen in die Augen treibt. Um die Länge zu reduzieren, wurde der Kommentarteil des Quellcodes gelöscht.
from time import time as _time try: import threading as _threading except ImportError: import dummy_threading as _threading from collections import deque import heapq __all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue'] class Empty(Exception): "Exception raised by Queue.get(block=0)/get_nowait()." pass class Full(Exception): "Exception raised by Queue.put(block=0)/put_nowait()." pass class Queue: def __init__(self, maxsize=0): self.maxsize = maxsize self._init(maxsize) self.mutex = _threading.Lock() self.not_empty = _threading.Condition(self.mutex) self.not_full = _threading.Condition(self.mutex) self.all_tasks_done = _threading.Condition(self.mutex) self.unfinished_tasks = def get_nowait(self): return self.get(False) def _init(self, maxsize): self.queue = deque() def _qsize(self, len=len): return len(self.queue) def _put(self, item): self.queue.append(item) def _get(self): return self.queue.popleft()
Durch die Analyse der folgenden Funktionen wissen wir, dass das Queue-Objekt auf der Warteschlange des Sammlungsmoduls basiert (Informationen zum Sammlungsmodul finden Sie unter Python: Counter zum Zählen von Statistiken und zum Sammlungsmodul verwenden). ), plus das Threading-Modul zur Interaktion. Ausschlusssperren und Bedingungsvariablen sind gekapselt.
deque ist eine doppelendige Warteschlange, die sich sehr gut für Warteschlangen und Stapel eignet. Das obige Queue-Objekt ist eine First-in-First-out-Warteschlange, daher definiert die Funktion _init() zunächst eine doppelendige Warteschlange und dann die Funktionen _put() und _get(), die sich auf der rechten Seite befinden Das Hinzufügen von Elementen und das Löschen von Elementen auf der linken Seite stellen eine First-In-First-Out-Warteschlange dar. Ebenso ist es einfach, sich die Implementierung von LifoQueue (Last-In-First-Out-Warteschlange) vorzustellen dass die rechte Seite der Warteschlange hinzugefügt und die rechte Seite gelöscht wird. Sie können den Quellcode für einen Blick posten.
class LifoQueue(Queue): '''Variant of Queue that retrieves most recently added entries first.''' def _init(self, maxsize): self.queue = [] def _qsize(self, len=len): return len(self.queue) def _put(self, item): self.queue.append(item) def _get(self): return self.queue.pop()
Obwohl seine „Warteschlange“ nicht queue() verwendet, ist es dasselbe mit einer Liste, da die Operationen list append() und pop() Elemente zum Element ganz rechts hinzufügen und das Element ganz rechts löschen Element.
Werfen wir einen Blick auf PriorityQueue? Es handelt sich um eine Prioritätswarteschlange. Die Funktionen heappush() und heappop() des heapq-Moduls werden hier verwendet. Das Heapq-Modul modularisiert die Datenstruktur des Heaps und kann diese Datenstruktur aufbauen. Gleichzeitig stellt das Heapq-Modul auch entsprechende Methoden zum Betreiben des Heaps bereit. Unter diesen kann self.queue=[] in der Funktion _init() als Erstellung eines leeren Heaps angesehen werden. heappush() fügt einen neuen Wert in den Heap ein und heappop() entnimmt den Mindestwert aus dem Heap, wodurch die Priorität aktiviert wird (hier finden Sie eine kurze Einführung in das Heapq-Modul). Der Quellcode lautet wie folgt:
class PriorityQueue(Queue): '''Variant of Queue that retrieves open entries in priority order (lowest first). Entries are typically tuples of the form: (priority number, data). ''' def _init(self, maxsize): self.queue = [] def _qsize(self, len=len): return len(self.queue) def _put(self, item, heappush=heapq.heappush): heappush(self.queue, item) def _get(self, heappop=heapq.heappop): return heappop(self.queue)
Nachdem wir die grundlegende Datenstruktur analysiert haben, analysieren wir andere Teile.
mutex ist ein threading.Lock()-Objekt, eine Mutex-Sperre; not_empty, not_full und all_tasks_done sind alle threading.Condition()-Objekte und Bedingungsvariablen, und sie behalten das gleiche Sperrobjekt mutex( Weitere Informationen zu Informationen zum Lock-Objekt und Condition-Objekt im Threading-Modul finden Sie im vorherigen Blog-Beitrag „Python: Threads, Prozesse und Coroutinen (2) – Threading-Modul“).
Unter ihnen:
self.mutex Mutex-Sperre: jede Operation, um den Status der Warteschlange abzurufen (empty(), qsize() usw.) oder den Inhalt der Warteschlange zu ändern (Get, Put usw.) Alle müssen die Mutex-Sperre halten. acquire() erwirbt die Sperre und release() gibt die Sperre frei. Gleichzeitig wird die Mutex-Sperre gemeinsam durch drei Bedingungsvariablen aufrechterhalten.
Bedingungsvariable self.not_empty: Nachdem der Thread Daten zur Warteschlange hinzugefügt hat, ruft er self.not_empty.notify() auf, um andere Threads zu benachrichtigen, und aktiviert dann einen Thread, der Elemente entfernt.
self.not_full Bedingungsvariable: Wenn ein Element aus der Warteschlange entfernt wird, wird ein Thread aktiviert, der Elemente hinzufügt.
self.all_tasks_done-Bedingungsvariable: Wenn die Anzahl der unvollendeten Aufgaben auf 0 gelöscht wird, benachrichtigen Sie alle Aufgaben, die abgeschlossen werden sollen
self.unfinished_tasks: Definieren Sie die Anzahl der unvollendeten Aufgaben
Werfen wir einen Blick auf die wichtigsten Methoden:
(1) put()
源代码如下:
def put(self, item, block=True, timeout=None): self.not_full.acquire() #not_full获得锁 try: if self.maxsize > 0: #如果队列长度有限制 if not block: #如果没阻塞 if self._qsize() == self.maxsize: #如果队列满了抛异常 raise Full elif timeout is None: #有阻塞且超时为空,等待 while self._qsize() == self.maxsize: self.not_full.wait() elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") else: #如果有阻塞,且超时非负时,结束时间=当前时间+超时时间 endtime = _time() + timeout while self._qsize() == self.maxsize: remaining = endtime - _time() if remaining <= 0.0: #到时后,抛异常 raise Full #如果没到时,队列是满的就会一直被挂起,直到有“位置”腾出 self.not_full.wait(remaining) self._put(item) #调用_put方法,添加元素 self.unfinished_tasks += 1 #未完成任务+1 self.not_empty.notify() #通知非空,唤醒非空挂起的任务 finally: self.not_full.release() #not_full释放锁
默认情况下block为True,timeout为None。如果队列满则会等待,未满则会调用_put方法将进程加入deque中(后面介绍),并且未完成任务加1还会通知队列非空。
如果设置block参数为Flase,队列满时则会抛异常。如果设置了超时那么在时间到之前进行阻塞,时间一到抛异常。这个方法使用not_full对象进行操作。
(2)get()
源码如下:
def get(self, block=True, timeout=None): self.not_empty.acquire() #not_empty获得锁 try: if not block: #不阻塞时 if not self._qsize(): #队列为空时抛异常 raise Empty elif timeout is None: #不限时时,队列为空则会等待 while not self._qsize(): self.not_empty.wait() elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") else: endtime = _time() + timeout while not self._qsize(): remaining = endtime - _time() if remaining <= 0.0: raise Empty self.not_empty.wait(remaining) item = self._get() #调用_get方法,移除并获得项目 self.not_full.notify() #通知非满 return item #返回项目 finally: self.not_empty.release() #释放锁
逻辑跟put()函数一样,参数默认情况下队列空了则会等待,否则将会调用_get方法(往下看)移除并获得一个项,最后返回这个项。这个方法使用not_empty对象进行操作。
不过我觉得put()与get()两个函数结合起来理解比较好。not_full与not_empty代表的是两种不同操作类型的线程,not_full可以理解成is-not-full,即队列是否满了,默认是没有满,没有满时not_full这个条件变量才能获取锁,并做一些条件判断,只有符合条件才能向队列里加元素,添加成功后就会通知not_empty条件变量队列里不是空的,“我”刚刚添加进了一个元素,满足可以执行删除动作的基本条件了(队列不是空的,想想如果是空的执行删除动作就没有意义了),同时唤醒一些被挂起的执行移除动作的线程,让这些线程重新判断条件,如果条件准许就会执行删除动作,然后又通知not_full条件变量,告诉“它”队列不是满的,因为“我”刚才删除了一个元素(想想如果队列满了添加元素就添加不进呀,就没意义了),满足了添加元素的基本条件(队列不是满的),同时唤醒一些被挂起的执行添加动作的线程,这些线程又会进行条件判断,符合条件就会添加元素,否则继续挂起,依次类推,同时这样也保证了线程的安全。正与前面所说,当一个元素被移除出队列时,会唤醒一个添加元素的线程;当添加一个元素时会唤醒一个删除元素的线程。
(3)task_done()
源码如下:
def task_done(self): self.all_tasks_done.acquire() #获得锁 try: unfinished = self.unfinished_tasks - 1 #判断队列中一个线程的任务是否全部完成 if unfinished <= 0: #是则进行通知,或在过量调用时报异常 if unfinished < 0: raise ValueError('task_done() called too many times') self.all_tasks_done.notify_all() self.unfinished_tasks = unfinished #否则未完成任务数量-1 finally: self.all_tasks_done.release() #最后释放锁
这个方法判断队列中一个线程的任务是否全部完成,首先会通过all_tasks_done对象获得锁,如果是则进行通知,最后释放锁。
(4)join()
源码如下:
def join(self): self.all_tasks_done.acquire() try: while self.unfinished_tasks: #如果有未完成的任务,将调用wait()方法等待 self.all_tasks_done.wait() finally: self.all_tasks_done.release()
阻塞方法,当队列中有未完成进程时,调用join方法来阻塞,直到他们都完成。
其它的方法都比较简单,也比较好理解,有兴趣可以去看看Queue.py里的源码,要注意的是任何获取队列的状态(empty(),qsize()等),或者修改队列的内容的操作(get,put等)都必须持有互斥锁mutex。
(二)简单例子
实现一个线程不断生成一个随机数到一个队列中
实现一个线程从上面的队列里面不断的取出奇数
实现另外一个线程从上面的队列里面不断取出偶数
import random,threading,time from Queue import Queue is_product = True class Producer(threading.Thread): """生产数据""" def __init__(self, t_name, queue): threading.Thread.__init__(self,name=t_name) self.data=queue def run(self): while 1: if self.data.full(): global is_product is_product = False else: if self.data.qsize() <= 7:#队列长度小于等于7时添加元素 is_product = True for i in range(2): #每次向队列里添加两个元素 randomnum=random.randint(1,99) print "%s: %s is producing %d to the queue!" % (time.ctime(), self.getName(), randomnum) self.data.put(randomnum,False) #将数据依次存入队列 time.sleep(1) print "deque length is %s"%self.data.qsize() else: if is_product: for i in range(2): # randomnum = random.randint(1, 99) print "%s: %s is producing %d to the queue!" % (time.ctime(), self.getName(), randomnum) self.data.put(randomnum,False) # 将数据依次存入队列 time.sleep(1) print "deque length is %s" % self.data.qsize() else: pass print "%s: %s finished!" %(time.ctime(), self.getName()) #Consumer thread class Consumer_even(threading.Thread): def __init__(self,t_name,queue): threading.Thread.__init__(self,name=t_name) self.data=queue def run(self): while 1: if self.data.qsize() > 7:#队列长度大于7时开始取元素 val_even = self.data.get(False) if val_even%2==0: print "%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(),self.getName(),val_even) time.sleep(2) else: self.data.put(val_even) time.sleep(2) print "deque length is %s" % self.data.qsize() else: pass class Consumer_odd(threading.Thread): def __init__(self,t_name,queue): threading.Thread.__init__(self, name=t_name) self.data=queue def run(self): while 1: if self.data.qsize() > 7: val_odd = self.data.get(False) if val_odd%2!=0: print "%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(), self.getName(), val_odd) time.sleep(2) else: self.data.put(val_odd) time.sleep(2) print "deque length is %s" % self.data.qsize() else: pass #Main thread def main(): queue = Queue(20) producer = Producer('Pro.', queue) consumer_even = Consumer_even('Con_even.', queue) consumer_odd = Consumer_odd('Con_odd.',queue) producer.start() consumer_even.start() consumer_odd.start() producer.join() consumer_even.join() consumer_odd.join() if __name__ == '__main__': main()