キュー モジュールは、キュー操作を提供するモジュールです。キューは、スレッド間でデータを交換する最も一般的に使用される形式です。このモジュールは 3 つのキューを提供します:
Queue.Queue(maxsize): 先入れ先出し、maxsize はキューのサイズであり、その値が正でない数値の場合、それはワイヤレス循環キュー
Queue です。 LifoQueue(maxsize): 後入れ先出し、スタックと同等
Queue.PriorityQueue(maxsize): 優先キュー。
その中で、LifoQueue と PriorityQueue は Queue のサブクラスです。 3 つは次の共通メソッドを持っています:
qsize(): おおよそのキュー サイズを返します。なぜ「約」という言葉を追加するのでしょうか?値が 0 より大きい場合、get() メソッドが同時実行中にブロックされないことが保証されないため、同様に put() メソッドに対しても有効です。
empty(): キューが空の場合は True を返し、そうでない場合は False を返します。
full(): キュー サイズが設定されている場合、キューがいっぱいの場合は True を返し、それ以外の場合は False を返します。
put(item[,block[,timeout]]): 要素 item をキューに追加します。 block が False に設定されている場合、キューがいっぱいの場合は Full 例外がスローされます。 block が True に設定され、timeout が None に設定されている場合、スペースが空くまで待機してからキューに追加されます。それ以外の場合は、timeout で設定されたタイムアウト値に基づいて完全例外がスローされます。
put_nowwait(item): put(item,False) と同等。 block が False に設定されている場合、キューが空の場合は、Empty 例外がスローされます。 block が True に設定され、timeout が None に設定されている場合、スペースができるまで待機してからキューに追加します。それ以外の場合は、timeout で設定されたタイムアウト値に基づいて空の例外がスローされます。
get([block[,timeout]]): キューから要素を削除し、その要素の値を返します。timeout が正の数の場合、最大 timeout 秒間ブロックされ、その範囲内に使用可能な項目がない場合は、ブロックされます。 time になると、空の例外がスローされます。
get_nowwait(): get(False) と同等
task_done(): キューに登録されているタスクが完了したことを示すシグナルを送信します。これはコンシューマー スレッドでよく使用されます。
join(): キューのすべての要素が処理されるまでブロックし、その後、他の操作を処理します。
(1) ソースコード分析
Queue モジュールの使い方は非常に簡単ですが、モジュールの関連するソースコードを投稿して分析する必要があると思います。 by the masters は美しく、なんと構造化され、モジュール化されているので、自分が書いたコードのことを考えると泣けてきます。ぜひ学びに来てください。長さを減らすため、ソースコードのコメント部分を削除しました。
from time import time as _time try: import threading as _threading except ImportError: import dummy_threading as _threading from collections import deque import heapq __all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue'] class Empty(Exception): "Exception raised by Queue.get(block=0)/get_nowait()." pass class Full(Exception): "Exception raised by Queue.put(block=0)/put_nowait()." pass class Queue: def __init__(self, maxsize=0): self.maxsize = maxsize self._init(maxsize) self.mutex = _threading.Lock() self.not_empty = _threading.Condition(self.mutex) self.not_full = _threading.Condition(self.mutex) self.all_tasks_done = _threading.Condition(self.mutex) self.unfinished_tasks = def get_nowait(self): return self.get(False) def _init(self, maxsize): self.queue = deque() def _qsize(self, len=len): return len(self.queue) def _put(self, item): self.queue.append(item) def _get(self): return self.queue.popleft()
以下の関数の分析を通じて、Queue オブジェクトがコレクション モジュール (コレクション モジュールについては、Python: 統計とコレクション モジュールをカウントするためのカウンターの使用を参照) のキューに加えて、スレッド モジュールに基づいていることがわかります。ミューテックスロックと条件変数のカプセル化。
deque は両端キューであり、キューやスタックに非常に適しています。上記の Queue オブジェクトは先入れ先出しキューであるため、最初に _init() 関数が両端キューを定義し、次に、右側の _put() 関数と _get() 関数を定義します。同様に、左側の要素の追加と削除は、LifoQueue (後入れ先出しキュー) の実装を考えるのが簡単です。キューの右側が追加され、右側が削除されていることを確認してください。ソースコードを投稿してご覧ください。
class LifoQueue(Queue): '''Variant of Queue that retrieves most recently added entries first.''' def _init(self, maxsize): self.queue = [] def _qsize(self, len=len): return len(self.queue) def _put(self, item): self.queue.append(item) def _get(self): return self.queue.pop()
その「キュー」は queue() を使用しませんが、リストの append() および Pop() 操作は右端の要素に要素を追加し、右端の要素を削除するため、リストでも同じです。
PriorityQueue を見てみましょう。ここでは heapq モジュールの heappush() 関数と heappop() 関数が使用されます。 heapq モジュールはヒープのデータ構造をモジュール化し、このデータ構造を構築できると同時に、ヒープを操作するための対応するメソッドも提供します。このうち、_init()関数のself.queue=[]は空のヒープを作成しているとみなすことができます。 heappush() は新しい値をヒープに挿入し、 heappop() はヒープから最小値をポップします。これにより優先度を達成できます (ここでは heapq モジュールの簡単な紹介を示します)。ソースコードは次のとおりです。
class PriorityQueue(Queue): '''Variant of Queue that retrieves open entries in priority order (lowest first). Entries are typically tuples of the form: (priority number, data). ''' def _init(self, maxsize): self.queue = [] def _qsize(self, len=len): return len(self.queue) def _put(self, item, heappush=heapq.heappush): heappush(self.queue, item) def _get(self, heappop=heapq.heappop): return heappop(self.queue)
基本的なデータ構造が分析され、その後、他の部分が分析されます。
mutex は threading.Lock() オブジェクト、ミューテックス ロックです。not_empty、not_full、および all_tasks_done はすべて threading.Condition() オブジェクトと条件変数であり、同じロック オブジェクトのミューテックスを維持します (ロック オブジェクトのスレッド モジュールについて)および Condition オブジェクトについては、以前のブログ投稿「Python: スレッド、プロセス、コルーチン (2) - スレッド モジュール」を参照してください。
その中には:
self.mutex mutex lock: キューのステータスを取得する操作 (empty()、qsize() など)、またはキューの内容を変更する操作 (get、put など) は、このミューテックス ロックを保持します。 acquire() はロックを取得し、release() はロックを解放します。同時に、ミューテックス ロックは 3 つの条件変数によって共同で維持されます。
self.not_empty 条件変数: スレッドはデータをキューに追加した後、self.not_empty.notify() を呼び出して他のスレッドに通知し、要素を削除するスレッドを起動します。
self.not_full 条件変数: 要素がキューから削除されると、要素を追加するスレッドが起動されます。
self.all_tasks_done 条件変数: 未完了のタスクの数が削除されて 0 になったら、すべてのタスクに完了を通知します
self.unfinished_tasks : 未完了のタスクの数を定義します
main メソッドをもう一度見てみましょう:
(1 )put()
源代码如下:
def put(self, item, block=True, timeout=None): self.not_full.acquire() #not_full获得锁 try: if self.maxsize > 0: #如果队列长度有限制 if not block: #如果没阻塞 if self._qsize() == self.maxsize: #如果队列满了抛异常 raise Full elif timeout is None: #有阻塞且超时为空,等待 while self._qsize() == self.maxsize: self.not_full.wait() elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") else: #如果有阻塞,且超时非负时,结束时间=当前时间+超时时间 endtime = _time() + timeout while self._qsize() == self.maxsize: remaining = endtime - _time() if remaining <= 0.0: #到时后,抛异常 raise Full #如果没到时,队列是满的就会一直被挂起,直到有“位置”腾出 self.not_full.wait(remaining) self._put(item) #调用_put方法,添加元素 self.unfinished_tasks += 1 #未完成任务+1 self.not_empty.notify() #通知非空,唤醒非空挂起的任务 finally: self.not_full.release() #not_full释放锁
默认情况下block为True,timeout为None。如果队列满则会等待,未满则会调用_put方法将进程加入deque中(后面介绍),并且未完成任务加1还会通知队列非空。
如果设置block参数为Flase,队列满时则会抛异常。如果设置了超时那么在时间到之前进行阻塞,时间一到抛异常。这个方法使用not_full对象进行操作。
(2)get()
源码如下:
def get(self, block=True, timeout=None): self.not_empty.acquire() #not_empty获得锁 try: if not block: #不阻塞时 if not self._qsize(): #队列为空时抛异常 raise Empty elif timeout is None: #不限时时,队列为空则会等待 while not self._qsize(): self.not_empty.wait() elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") else: endtime = _time() + timeout while not self._qsize(): remaining = endtime - _time() if remaining <= 0.0: raise Empty self.not_empty.wait(remaining) item = self._get() #调用_get方法,移除并获得项目 self.not_full.notify() #通知非满 return item #返回项目 finally: self.not_empty.release() #释放锁
逻辑跟put()函数一样,参数默认情况下队列空了则会等待,否则将会调用_get方法(往下看)移除并获得一个项,最后返回这个项。这个方法使用not_empty对象进行操作。
不过我觉得put()与get()两个函数结合起来理解比较好。not_full与not_empty代表的是两种不同操作类型的线程,not_full可以理解成is-not-full,即队列是否满了,默认是没有满,没有满时not_full这个条件变量才能获取锁,并做一些条件判断,只有符合条件才能向队列里加元素,添加成功后就会通知not_empty条件变量队列里不是空的,“我”刚刚添加进了一个元素,满足可以执行删除动作的基本条件了(队列不是空的,想想如果是空的执行删除动作就没有意义了),同时唤醒一些被挂起的执行移除动作的线程,让这些线程重新判断条件,如果条件准许就会执行删除动作,然后又通知not_full条件变量,告诉“它”队列不是满的,因为“我”刚才删除了一个元素(想想如果队列满了添加元素就添加不进呀,就没意义了),满足了添加元素的基本条件(队列不是满的),同时唤醒一些被挂起的执行添加动作的线程,这些线程又会进行条件判断,符合条件就会添加元素,否则继续挂起,依次类推,同时这样也保证了线程的安全。正与前面所说,当一个元素被移除出队列时,会唤醒一个添加元素的线程;当添加一个元素时会唤醒一个删除元素的线程。
(3)task_done()
源码如下:
def task_done(self): self.all_tasks_done.acquire() #获得锁 try: unfinished = self.unfinished_tasks - 1 #判断队列中一个线程的任务是否全部完成 if unfinished <= 0: #是则进行通知,或在过量调用时报异常 if unfinished < 0: raise ValueError('task_done() called too many times') self.all_tasks_done.notify_all() self.unfinished_tasks = unfinished #否则未完成任务数量-1 finally: self.all_tasks_done.release() #最后释放锁
这个方法判断队列中一个线程的任务是否全部完成,首先会通过all_tasks_done对象获得锁,如果是则进行通知,最后释放锁。
(4)join()
源码如下:
def join(self): self.all_tasks_done.acquire() try: while self.unfinished_tasks: #如果有未完成的任务,将调用wait()方法等待 self.all_tasks_done.wait() finally: self.all_tasks_done.release()
阻塞方法,当队列中有未完成进程时,调用join方法来阻塞,直到他们都完成。
其它的方法都比较简单,也比较好理解,有兴趣可以去看看Queue.py里的源码,要注意的是任何获取队列的状态(empty(),qsize()等),或者修改队列的内容的操作(get,put等)都必须持有互斥锁mutex。
(二)简单例子
实现一个线程不断生成一个随机数到一个队列中
实现一个线程从上面的队列里面不断的取出奇数
实现另外一个线程从上面的队列里面不断取出偶数
import random,threading,time from Queue import Queue is_product = True class Producer(threading.Thread): """生产数据""" def __init__(self, t_name, queue): threading.Thread.__init__(self,name=t_name) self.data=queue def run(self): while 1: if self.data.full(): global is_product is_product = False else: if self.data.qsize() <= 7:#队列长度小于等于7时添加元素 is_product = True for i in range(2): #每次向队列里添加两个元素 randomnum=random.randint(1,99) print "%s: %s is producing %d to the queue!" % (time.ctime(), self.getName(), randomnum) self.data.put(randomnum,False) #将数据依次存入队列 time.sleep(1) print "deque length is %s"%self.data.qsize() else: if is_product: for i in range(2): # randomnum = random.randint(1, 99) print "%s: %s is producing %d to the queue!" % (time.ctime(), self.getName(), randomnum) self.data.put(randomnum,False) # 将数据依次存入队列 time.sleep(1) print "deque length is %s" % self.data.qsize() else: pass print "%s: %s finished!" %(time.ctime(), self.getName()) #Consumer thread class Consumer_even(threading.Thread): def __init__(self,t_name,queue): threading.Thread.__init__(self,name=t_name) self.data=queue def run(self): while 1: if self.data.qsize() > 7:#队列长度大于7时开始取元素 val_even = self.data.get(False) if val_even%2==0: print "%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(),self.getName(),val_even) time.sleep(2) else: self.data.put(val_even) time.sleep(2) print "deque length is %s" % self.data.qsize() else: pass class Consumer_odd(threading.Thread): def __init__(self,t_name,queue): threading.Thread.__init__(self, name=t_name) self.data=queue def run(self): while 1: if self.data.qsize() > 7: val_odd = self.data.get(False) if val_odd%2!=0: print "%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(), self.getName(), val_odd) time.sleep(2) else: self.data.put(val_odd) time.sleep(2) print "deque length is %s" % self.data.qsize() else: pass #Main thread def main(): queue = Queue(20) producer = Producer('Pro.', queue) consumer_even = Consumer_even('Con_even.', queue) consumer_odd = Consumer_odd('Con_odd.',queue) producer.start() consumer_even.start() consumer_odd.start() producer.join() consumer_even.join() consumer_odd.join() if __name__ == '__main__': main()