Queue 모듈은 Queue 작업을 제공하는 모듈입니다. Queue는 스레드 간 데이터 교환에 가장 일반적으로 사용되는 형태입니다. 이 모듈은 세 가지 대기열을 제공합니다:
Queue.Queue(maxsize): 선입선출, maxsize는 대기열의 크기이며 값이 양수가 아닌 경우 무선 순환 대기열입니다.
Queue.LifoQueue(maxsize): 후입선출, 스택과 동일
Queue.PriorityQueue(maxsize): 우선순위 대기열.
그 중 LifoQueue와 PriorityQueue는 Queue의 하위 클래스입니다. 세 가지에는 다음과 같은 일반적인 메서드가 있습니다.
qsize(): 대략적인 대기열 크기를 반환합니다. 왜 "대략"이라는 단어를 추가합니까? 값이 0보다 크면 동시 실행 중에 get() 메서드가 차단되지 않는다는 보장이 없기 때문입니다. 마찬가지로 put() 메서드에도 유효합니다.
empty(): 부울 값을 반환합니다. 대기열이 비어 있으면 True를 반환하고, 그렇지 않으면 False를 반환합니다.
full(): 대기열 크기가 설정된 경우 대기열이 가득 차면 True를 반환하고, 그렇지 않으면 False를 반환합니다.
put(item[,block[,timeout]]): 요소 항목을 대기열에 추가합니다. block이 False로 설정된 경우 대기열이 가득 차면 Full 예외가 발생합니다. block이 True로 설정되고 timeout이 None으로 설정된 경우 대기열에 추가하기 전에 공간이 생길 때까지 기다립니다. 그렇지 않으면 timeout으로 설정된 시간 초과 값에 따라 전체 예외가 발생합니다.
put_nowwait(item): put(item,False)와 동일합니다. block이 False로 설정된 경우 대기열이 비어 있으면 비어 있음 예외가 발생합니다. block이 True로 설정되고 timeout이 None으로 설정된 경우 대기열에 추가하기 전에 공간이 생길 때까지 기다립니다. 그렇지 않으면 timeout으로 설정된 시간 초과 값에 따라 빈 예외가 발생합니다.
get([block[,timeout]]): 대기열에서 요소를 제거하고 요소의 값을 반환합니다. timeout이 양수이면 최대 timeout 초 동안 차단됩니다. 해당 시간에 사용 가능한 항목이 없으면 빈 예외가 발생합니다.
get_nowwait(): get(False)과 동일
task_done(): 대기열에 넣기 작업이 완료되었음을 나타내는 신호를 보냅니다. 이는 소비자 스레드에서 자주 사용됩니다.
join(): 대기열의 모든 요소가 처리될 때까지 차단한 다음 다른 작업을 처리합니다.
(1) 소스코드 분석
Queue 모듈은 사용법이 매우 간단하지만, 모듈의 관련 소스코드를 게시하고 분석하는 것이 필요하다고 생각합니다. 고수들이 작성한 코드가 얼마나 아름답고 구조적이며 모듈화되어 있는지 보고, 내가 작성한 코드에 대해 생각해 보면 눈물이 납니다. 길이를 줄이기 위해 소스코드의 주석 부분을 삭제했습니다.
from time import time as _time try: import threading as _threading except ImportError: import dummy_threading as _threading from collections import deque import heapq __all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue'] class Empty(Exception): "Exception raised by Queue.get(block=0)/get_nowait()." pass class Full(Exception): "Exception raised by Queue.put(block=0)/put_nowait()." pass class Queue: def __init__(self, maxsize=0): self.maxsize = maxsize self._init(maxsize) self.mutex = _threading.Lock() self.not_empty = _threading.Condition(self.mutex) self.not_full = _threading.Condition(self.mutex) self.all_tasks_done = _threading.Condition(self.mutex) self.unfinished_tasks = def get_nowait(self): return self.get(False) def _init(self, maxsize): self.queue = deque() def _qsize(self, len=len): return len(self.queue) def _put(self, item): self.queue.append(item) def _get(self): return self.queue.popleft()
다음 함수 분석을 통해 Queue 개체가 컬렉션 모듈의 대기열을 기반으로 한다는 것을 알 수 있습니다(컬렉션 모듈에 대해서는 Python: 통계 계산 및 컬렉션 계산을 위해 카운터 사용 모듈을 참조하세요). ), 제외 잠금 및 조건 변수와 상호 작용하는 스레딩 모듈이 캡슐화됩니다.
deque는 양방향 큐로, 큐와 스택에 매우 적합합니다. 위의 Queue 개체는 선입 선출 대기열이므로 먼저 _init() 함수는 이중 종료 대기열을 정의한 다음 오른쪽에 있는 _put() 및 _get() 함수를 정의합니다. 왼쪽의 요소를 추가하고 삭제하는 것은 선입선출(FIFO) 대기열을 구성합니다. 마찬가지로 LifoQueue(후입선출 대기열)의 구현을 생각하기 쉽습니다. 대기열의 오른쪽이 추가되고 오른쪽이 삭제됩니다. 살펴보기 위해 소스 코드를 게시할 수 있습니다.
class LifoQueue(Queue): '''Variant of Queue that retrieves most recently added entries first.''' def _init(self, maxsize): self.queue = [] def _qsize(self, len=len): return len(self.queue) def _put(self, item): self.queue.append(item) def _get(self): return self.queue.pop()
"큐"는 queue()를 사용하지 않지만 목록과 동일합니다. 목록의 추가() 및 팝() 작업은 가장 오른쪽 요소에 요소를 추가하고 가장 오른쪽 요소를 삭제하기 때문입니다. 요소.
PriorityQueue를 살펴보겠습니다. 여기서는 heapq 모듈의 heappush() 및 heappop() 함수가 사용됩니다. heapq 모듈은 힙의 데이터 구조를 모듈화하고 이 데이터 구조를 구축할 수 있습니다. 동시에 heapq 모듈은 힙을 작동하는 해당 방법도 제공합니다. 그 중 _init() 함수의 self.queue=[]는 빈 힙을 생성하는 것으로 볼 수 있습니다. heappush()는 힙에 새 값을 삽입하고, heappop()은 힙에서 최소값을 팝하여 우선순위를 달성할 수 있습니다(heapq 모듈에 대한 간략한 소개는 다음과 같습니다). 소스코드는 다음과 같습니다.
class PriorityQueue(Queue): '''Variant of Queue that retrieves open entries in priority order (lowest first). Entries are typically tuples of the form: (priority number, data). ''' def _init(self, maxsize): self.queue = [] def _qsize(self, len=len): return len(self.queue) def _put(self, item, heappush=heapq.heappush): heappush(self.queue, item) def _get(self, heappop=heapq.heappop): return heappop(self.queue)
기본적인 데이터 구조를 분석한 후, 다른 부분도 분석해 보겠습니다.
mutex는 threading.Lock() 개체이고, mutex 잠금은 not_empty, not_full 및 all_tasks_done은 모두 threading.Condition() 개체 및 조건 변수이며 동일한 잠금 개체 mutex를 유지합니다. 스레딩 모듈의 Lock 개체 및 Condition 개체에 대한 자세한 내용은 이전 블로그 게시물 Python: 스레드, 프로세스 및 코루틴(2) - 스레딩 모듈)을 참조하세요.
그 중:
self.mutex 뮤텍스 잠금: 대기열의 상태(empty(), qsize() 등)를 얻거나 대기열의 내용을 수정하는 모든 작업 (get, put 등) 모두 뮤텍스 잠금을 보유해야 합니다. acquire()는 잠금을 획득하고 release()는 잠금을 해제합니다. 동시에 뮤텍스 잠금은 세 가지 조건 변수에 의해 공동으로 유지됩니다.
self.not_empty 조건 변수: 스레드가 큐에 데이터를 추가한 후 self.not_empty.notify()를 호출하여 다른 스레드에 알리고 요소를 제거하는 스레드를 깨웁니다.
self.not_full 조건 변수: 요소가 대기열에서 제거되면 요소를 추가하는 스레드가 활성화됩니다.
self.all_tasks_done 조건변수: 완료되지 않은 작업 개수가 0으로 삭제되면 모든 작업 완료를 알림
self.unfinished_tasks: 완료되지 않은 작업 개수 정의
주요 메소드를 살펴보겠습니다:
(1) put()
源代码如下:
def put(self, item, block=True, timeout=None): self.not_full.acquire() #not_full获得锁 try: if self.maxsize > 0: #如果队列长度有限制 if not block: #如果没阻塞 if self._qsize() == self.maxsize: #如果队列满了抛异常 raise Full elif timeout is None: #有阻塞且超时为空,等待 while self._qsize() == self.maxsize: self.not_full.wait() elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") else: #如果有阻塞,且超时非负时,结束时间=当前时间+超时时间 endtime = _time() + timeout while self._qsize() == self.maxsize: remaining = endtime - _time() if remaining <= 0.0: #到时后,抛异常 raise Full #如果没到时,队列是满的就会一直被挂起,直到有“位置”腾出 self.not_full.wait(remaining) self._put(item) #调用_put方法,添加元素 self.unfinished_tasks += 1 #未完成任务+1 self.not_empty.notify() #通知非空,唤醒非空挂起的任务 finally: self.not_full.release() #not_full释放锁
默认情况下block为True,timeout为None。如果队列满则会等待,未满则会调用_put方法将进程加入deque中(后面介绍),并且未完成任务加1还会通知队列非空。
如果设置block参数为Flase,队列满时则会抛异常。如果设置了超时那么在时间到之前进行阻塞,时间一到抛异常。这个方法使用not_full对象进行操作。
(2)get()
源码如下:
def get(self, block=True, timeout=None): self.not_empty.acquire() #not_empty获得锁 try: if not block: #不阻塞时 if not self._qsize(): #队列为空时抛异常 raise Empty elif timeout is None: #不限时时,队列为空则会等待 while not self._qsize(): self.not_empty.wait() elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") else: endtime = _time() + timeout while not self._qsize(): remaining = endtime - _time() if remaining <= 0.0: raise Empty self.not_empty.wait(remaining) item = self._get() #调用_get方法,移除并获得项目 self.not_full.notify() #通知非满 return item #返回项目 finally: self.not_empty.release() #释放锁
逻辑跟put()函数一样,参数默认情况下队列空了则会等待,否则将会调用_get方法(往下看)移除并获得一个项,最后返回这个项。这个方法使用not_empty对象进行操作。
不过我觉得put()与get()两个函数结合起来理解比较好。not_full与not_empty代表的是两种不同操作类型的线程,not_full可以理解成is-not-full,即队列是否满了,默认是没有满,没有满时not_full这个条件变量才能获取锁,并做一些条件判断,只有符合条件才能向队列里加元素,添加成功后就会通知not_empty条件变量队列里不是空的,“我”刚刚添加进了一个元素,满足可以执行删除动作的基本条件了(队列不是空的,想想如果是空的执行删除动作就没有意义了),同时唤醒一些被挂起的执行移除动作的线程,让这些线程重新判断条件,如果条件准许就会执行删除动作,然后又通知not_full条件变量,告诉“它”队列不是满的,因为“我”刚才删除了一个元素(想想如果队列满了添加元素就添加不进呀,就没意义了),满足了添加元素的基本条件(队列不是满的),同时唤醒一些被挂起的执行添加动作的线程,这些线程又会进行条件判断,符合条件就会添加元素,否则继续挂起,依次类推,同时这样也保证了线程的安全。正与前面所说,当一个元素被移除出队列时,会唤醒一个添加元素的线程;当添加一个元素时会唤醒一个删除元素的线程。
(3)task_done()
源码如下:
def task_done(self): self.all_tasks_done.acquire() #获得锁 try: unfinished = self.unfinished_tasks - 1 #判断队列中一个线程的任务是否全部完成 if unfinished <= 0: #是则进行通知,或在过量调用时报异常 if unfinished < 0: raise ValueError('task_done() called too many times') self.all_tasks_done.notify_all() self.unfinished_tasks = unfinished #否则未完成任务数量-1 finally: self.all_tasks_done.release() #最后释放锁
这个方法判断队列中一个线程的任务是否全部完成,首先会通过all_tasks_done对象获得锁,如果是则进行通知,最后释放锁。
(4)join()
源码如下:
def join(self): self.all_tasks_done.acquire() try: while self.unfinished_tasks: #如果有未完成的任务,将调用wait()方法等待 self.all_tasks_done.wait() finally: self.all_tasks_done.release()
阻塞方法,当队列中有未完成进程时,调用join方法来阻塞,直到他们都完成。
其它的方法都比较简单,也比较好理解,有兴趣可以去看看Queue.py里的源码,要注意的是任何获取队列的状态(empty(),qsize()等),或者修改队列的内容的操作(get,put等)都必须持有互斥锁mutex。
(二)简单例子
实现一个线程不断生成一个随机数到一个队列中
实现一个线程从上面的队列里面不断的取出奇数
实现另外一个线程从上面的队列里面不断取出偶数
import random,threading,time from Queue import Queue is_product = True class Producer(threading.Thread): """生产数据""" def __init__(self, t_name, queue): threading.Thread.__init__(self,name=t_name) self.data=queue def run(self): while 1: if self.data.full(): global is_product is_product = False else: if self.data.qsize() <= 7:#队列长度小于等于7时添加元素 is_product = True for i in range(2): #每次向队列里添加两个元素 randomnum=random.randint(1,99) print "%s: %s is producing %d to the queue!" % (time.ctime(), self.getName(), randomnum) self.data.put(randomnum,False) #将数据依次存入队列 time.sleep(1) print "deque length is %s"%self.data.qsize() else: if is_product: for i in range(2): # randomnum = random.randint(1, 99) print "%s: %s is producing %d to the queue!" % (time.ctime(), self.getName(), randomnum) self.data.put(randomnum,False) # 将数据依次存入队列 time.sleep(1) print "deque length is %s" % self.data.qsize() else: pass print "%s: %s finished!" %(time.ctime(), self.getName()) #Consumer thread class Consumer_even(threading.Thread): def __init__(self,t_name,queue): threading.Thread.__init__(self,name=t_name) self.data=queue def run(self): while 1: if self.data.qsize() > 7:#队列长度大于7时开始取元素 val_even = self.data.get(False) if val_even%2==0: print "%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(),self.getName(),val_even) time.sleep(2) else: self.data.put(val_even) time.sleep(2) print "deque length is %s" % self.data.qsize() else: pass class Consumer_odd(threading.Thread): def __init__(self,t_name,queue): threading.Thread.__init__(self, name=t_name) self.data=queue def run(self): while 1: if self.data.qsize() > 7: val_odd = self.data.get(False) if val_odd%2!=0: print "%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(), self.getName(), val_odd) time.sleep(2) else: self.data.put(val_odd) time.sleep(2) print "deque length is %s" % self.data.qsize() else: pass #Main thread def main(): queue = Queue(20) producer = Producer('Pro.', queue) consumer_even = Consumer_even('Con_even.', queue) consumer_odd = Consumer_odd('Con_odd.',queue) producer.start() consumer_even.start() consumer_odd.start() producer.join() consumer_even.join() consumer_odd.join() if __name__ == '__main__': main()