python asyncio隊列示例
asyncio.Queue 是用於異步任務間安全通信的隊列工具,1. 生產者通過await queue.put(item) 添加數據,消費者用await queue.get() 獲取數據;2. 每處理完一項需調用queue.task_done(),以便queue.join() 等待所有任務完成;3. 使用None 作為結束信號通知消費者停止;4. 多個消費者時,需發送多個結束信號或在取消任務前確保所有任務已處理完畢;5. 隊列支持設置maxsize 限制容量,put 和get 操作自動掛起不阻塞事件循環,程序最終通過cancel() 終止仍在等待的消費者任務,完整實現高效異步生產者-消費者模式。
下面是一個實用的Python asyncio.Queue
示例,展示如何在異步任務中使用隊列進行生產者-消費者模式的通信。

什麼是asyncio.Queue
?
asyncio.Queue
是一個線程安全、異步安全的隊列,用於在多個協程之間安全地傳遞數據。常用於生產者生成數據、消費者處理數據的場景。
基本示例:生產者和消費者
import asyncio import random async def producer(queue, n): for i in range(n): item = f"item-{i}" await queue.put(item) print(f"Produced: {item}") # 模擬異步耗時(如網絡請求、IO) await asyncio.sleep(random.uniform(0.1, 0.5)) # 發送結束信號await queue.put(None) print("Producer done.") async def consumer(queue): while True: item = await queue.get() if item is None: # 收到結束信號,把None 放回去,以便其他消費者也能收到queue.task_done() break print(f"Consumed: {item}") await asyncio.sleep(random.uniform(0.2, 0.7)) # 模擬處理時間queue.task_done() # 標記任務完成print("Consumer done.") async def main(): queue = asyncio.Queue() num_items = 5 # 同時運行生產者和消費者producer_task = asyncio.create_task(producer(queue, num_items)) consumer_task = asyncio.create_task(consumer(queue)) # 等待消費者處理完所有任務(包括None) await producer_task await queue.join() # 等待所有任務被標記為task_done consumer_task.cancel() # 手動取消消費者(因為它在等待更多任務) # 運行asyncio.run(main())
關鍵點說明
✅ queue.put()
和queue.get()
- 都是awaitable的,不會阻塞事件循環。
-
put()
在隊列滿時掛起;get()
在隊列空時掛起。
✅ queue.task_done()
- 每當消費者處理完一個任務,必須調用
task_done()
。 -
queue.join()
會等待所有任務都被task_done()
標記。
✅ 使用None
作為結束信號
- 通知消費者停止。
- 多個消費者時,要小心處理結束信號(避免多個消費者重複處理)。
✅ queue.join()
vs task.cancel()
-
queue.join()
等待所有任務完成。 - 消費者是無限循環,需要用
cancel()
停止。
多個消費者示例(可選)
async def main_multiple_consumers(): queue = asyncio.Queue() num_items = 10 num_consumers = 3 # 創建多個消費者consumers = [asyncio.create_task(consumer(queue)) for _ in range(num_consumers)] producer_task = asyncio.create_task(producer(queue, num_items)) await producer_task await queue.join() # 等待所有item 被處理# 取消所有消費者(它們在等待更多任務) for c in consumers: c.cancel() # 等待它們被取消await asyncio.gather(*consumers, return_exceptions=True) # 運行# asyncio.run(main_multiple_consumers())
小貼士
- 隊列默認無大小限制。可以用
asyncio.Queue(maxsize=5)
限制容量。 - 如果隊列滿了,
put()
會自動等待,直到有空間。 - 不要用
.get_nowait()
或.put_nowait()
在協程中,除非你確定不會出錯,否則會拋異常。
基本上就這些。 asyncio.Queue
是寫異步爬蟲、任務調度、消息處理系統的常用工具,掌握它對寫高效的異步程序很有幫助。

以上是python asyncio隊列示例的詳細內容。更多資訊請關注PHP中文網其他相關文章!

熱AI工具

Undress AI Tool
免費脫衣圖片

Undresser.AI Undress
人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover
用於從照片中去除衣服的線上人工智慧工具。

Clothoff.io
AI脫衣器

Video Face Swap
使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱門文章

熱工具

記事本++7.3.1
好用且免費的程式碼編輯器

SublimeText3漢化版
中文版,非常好用

禪工作室 13.0.1
強大的PHP整合開發環境

Dreamweaver CS6
視覺化網頁開發工具

SublimeText3 Mac版
神級程式碼編輯軟體(SublimeText3)

確保已安裝Python並將其添加到系統PATH,通過終端運行python--version或python3--version驗證;2.將Python文件保存為.py擴展名,如hello.py;3.在SublimeText中創建自定義構建系統,Windows用戶使用{"cmd":["python","-u","$file"]},macOS/Linux用戶使用{"cmd":["python3

要調試Python腳本,需先安裝Python擴展並配置解釋器,然後創建launch.json文件設置調試配置,接著在代碼中設置斷點並按F5啟動調試,腳本將在斷點處暫停,允許檢查變量和單步執行,最終通過查看控制台輸出、添加日誌或調整參數等方式排查問題,確保環境正確後調試過程簡單高效。

ClassmethodsinPythonareboundtotheclassandnottoinstances,allowingthemtobecalledwithoutcreatinganobject.1.Theyaredefinedusingthe@classmethoddecoratorandtakeclsasthefirstparameter,referringtotheclassitself.2.Theycanaccessclassvariablesandarecommonlyused

asyncio.Queue是用於異步任務間安全通信的隊列工具,1.生產者通過awaitqueue.put(item)添加數據,消費者用awaitqueue.get()獲取數據;2.每處理完一項需調用queue.task_done(),以便queue.join()等待所有任務完成;3.使用None作為結束信號通知消費者停止;4.多個消費者時,需發送多個結束信號或在取消任務前確保所有任務已處理完畢;5.隊列支持設置maxsize限制容量,put和get操作自動掛起不阻塞事件循環,程序最終通過canc

yield關鍵字用於定義生成器函數,使其能暫停執行並逐個返回值,之後從暫停處恢復;生成器函數返回生成器對象,具有惰性求值特性,可節省內存,適用於處理大文件、流數據和無限序列等場景,且生成器是迭代器,支持next()和for循環,但無法倒回,必須重新創建才能再次迭代。

InstallSublimeTextandPython,thenconfigureabuildsystembycreatingaPython3.sublime-buildfilewiththeappropriatecmdandselectorsettingstoenablerunningPythonscriptsviaCtrl B.2.OrganizeyourprojectbycreatingadedicatedfolderwithPythonfilesandsupportingdocument

toseepythonOutputiNaseparatePanelInSubliMeText,Usethebuilt-InbuildSystembysavingYourfileWitha.pyExtensionandensionAndPressingCtrl b(orcmd b)

ToavoidgettingblockedwhilewebscrapingwithPython,userealisticrequestheaders,addrandomizeddelays,rotateIPaddresseswithproxies,maintainsessions,respectrobots.txt,anduseheadlessbrowserswhennecessary,ensuringethicalandstealthybehaviortomimicrealusersandpr
