如何在Python中實施用於並發編程的線程安全隊列?
使用queue.Queue是實現Python線程安全隊列的最可靠方法,1. 它內置鎖機制,避免競態條件;2. put()和get()默認阻塞,支持timeout避免無限等待;3. 配合task_done()和join()可協調任務完成;4. 可設置maxsize實現有界隊列以控制內存;5. 自定義封裝可提供更清晰接口;6. 注意queue.Queue僅用於線程間通信,不適用於多進程場景,且若使用join()則必須在消費者中調用task_done(),否則程序可能掛起。
Implementing a thread-safe queue in Python is straightforward thanks to the built-in queue.Queue
class, which is designed specifically for safe use across multiple threads. Here's how to do it properly and what to keep in mind.

Use queue.Queue
for Thread Safety
The most reliable and simplest way to have a thread-safe queue in Python is to use the queue.Queue
module. It handles all locking mechanisms internally, so you don't have to worry about race conditions when putting or getting items.
import queue import threading import time # Create a thread-safe queue q = queue.Queue() def producer(): for i in range(5): item = f"item-{i}" q.put(item) print(f"Produced: {item}") time.sleep(0.1) # Simulate work def consumer(): while True: try: # Use timeout to avoid blocking forever item = q.get(timeout=1) print(f"Consumed: {item}") q.task_done() # Indicate that the task is done except queue.Empty: print("No more items, exiting.") break # Create threads t1 = threading.Thread(target=producer) t2 = threading.Thread(target=consumer) # Start threads t1.start() t2.start() # Wait for both threads to finish t1.join() t2.join()
Key Points for Using queue.Queue
- Blocking Operations :
put()
andget()
block by default, which is useful for coordinating threads. -
task_done()
andjoin()
: If you usetask_done()
after processing each item, you can callq.join()
in the producer to wait until all items are processed. - Avoid
queue.Empty
andqueue.Full
exceptions : Always handle them when using non-blocking or timeout-based calls.
Example with q.join()
:

def producer_with_join(): for i in range(3): q.put(f"task-{i}") q.join() # Wait until all tasks are marked as done print("All tasks completed.") def consumer_with_task_done(): while True: item = q.get() if item is None: break print(f"Processing {item}") time.sleep(0.5) q.task_done() # Mark task as done
Alternative: Bounded Queue (With Size Limit)
You can limit the queue size to control memory usage and enable backpressure:
q = queue.Queue(maxsize=3)
Now, put()
will block if the queue is full until an item is consumed.

Custom Thread-Safe Queue (Advanced Use Case)
While queue.Queue
covers most needs, you might want to wrap it for specific behavior:
import queue from typing import Any class SafeQueue: def __init__(self, maxsize: int = 0): self._q = queue.Queue(maxsize=maxsize) def put(self, item: Any): self._q.put(item) def get(self) -> Any: return self._q.get() def empty(self) -> bool: return self._q.empty() def size(self) -> int: return self._q.qsize() def task_done(self): self._q.task_done() def join(self): self._q.join()
This encapsulates the queue and provides a clean interface.
Important Notes
- Don't use
list
with locks manually unless you have a very specific reason — it's error-prone. -
queue.Queue
is for threads only. For multiprocessing, usemultiprocessing.Queue
. - Always call
task_done()
in consumers if you usejoin()
, or your program may hang.
Basically, just use queue.Queue
— it's well-tested, efficient, and built for this exact purpose.
以上是如何在Python中實施用於並發編程的線程安全隊列?的詳細內容。更多資訊請關注PHP中文網其他相關文章!

熱AI工具

Undress AI Tool
免費脫衣圖片

Undresser.AI Undress
人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover
用於從照片中去除衣服的線上人工智慧工具。

Clothoff.io
AI脫衣器

Video Face Swap
使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱門文章

熱工具

記事本++7.3.1
好用且免費的程式碼編輯器

SublimeText3漢化版
中文版,非常好用

禪工作室 13.0.1
強大的PHP整合開發環境

Dreamweaver CS6
視覺化網頁開發工具

SublimeText3 Mac版
神級程式碼編輯軟體(SublimeText3)

安裝pyodbc:使用pipinstallpyodbc命令安裝庫;2.連接SQLServer:通過pyodbc.connect()方法,使用包含DRIVER、SERVER、DATABASE、UID/PWD或Trusted_Connection的連接字符串,分別支持SQL身份驗證或Windows身份驗證;3.查看已安裝驅動:運行pyodbc.drivers()並篩選含'SQLServer'的驅動名,確保使用如'ODBCDriver17forSQLServer'等正確驅動名稱;4.連接字符串關鍵參數

使用httpx.AsyncClient可高效发起异步HTTP请求,1.基本GET请求通过asyncwith管理客户端并用awaitclient.get发起非阻塞请求;2.并发多个请求时结合asyncio.gather可显著提升性能,总耗时等于最慢请求;3.支持自定义headers、认证、base_url和超时设置;4.可发送POST请求并携带JSON数据;5.注意避免混用同步异步代码,代理支持需注意后端兼容性,适合用于爬虫或API聚合等场景。

pythoncanbeoptimizedFormized-formemory-boundoperationsbyreducingOverHeadThroughGenerator,有效dattratsures,andManagingObjectLifetimes.first,useGeneratorSInsteadoFlistSteadoflistSteadoFocessLargedAtasetSoneItematatime,desceedingingLoadeGingloadInterveringerverneDraineNterveingerverneDraineNterveInterveIntMory.second.second.second.second,Choos,Choos

本文旨在幫助 SQLAlchemy 初學者解決在使用 create_engine 時遇到的 "RemovedIn20Warning" 警告,以及隨之而來的 "ResourceClosedError" 連接關閉錯誤。文章將詳細解釋該警告的原因,並提供消除警告以及修復連接問題的具體步驟和代碼示例,確保你能夠順利地查詢和操作數據庫。

shutil.rmtree()是Python中用於遞歸刪除整個目錄樹的函數,能刪除指定文件夾及其所有內容。 1.基本用法:使用shutil.rmtree(path)刪除目錄,需處理FileNotFoundError、PermissionError等異常。 2.實際應用:可一鍵清除包含子目錄和文件的文件夾,如臨時數據或緩存目錄。 3.注意事項:刪除操作不可恢復;路徑不存在時拋出FileNotFoundError;可能因權限或文件佔用導致失敗。 4.可選參數:可通過ignore_errors=True忽略錯

安裝對應數據庫驅動;2.使用connect()連接數據庫;3.創建cursor對象;4.用execute()或executemany()執行SQL並用參數化查詢防注入;5.用fetchall()等獲取結果;6.修改後需commit();7.最後關閉連接或使用上下文管理器自動處理;完整流程確保安全且高效執行SQL操作。

Python是實現ETL流程的高效工具,1.數據抽取:通過pandas、sqlalchemy、requests等庫可從數據庫、API、文件等來源提取數據;2.數據轉換:使用pandas進行清洗、類型轉換、關聯、聚合等操作,確保數據質量並優化性能;3.數據加載:利用pandas的to_sql方法或云平台SDK將數據寫入目標系統,注意寫入方式與批次處理;4.工具推薦:Airflow、Dagster、Prefect用於流程調度與管理,結合日誌報警與虛擬環境提升穩定性與可維護性。

使用psycopg2.pool.SimpleConnectionPool可有效管理數據庫連接,避免頻繁創建和銷毀連接帶來的性能開銷。 1.創建連接池時指定最小和最大連接數及數據庫連接參數,確保連接池初始化成功;2.通過getconn()獲取連接,執行數據庫操作後使用putconn()將連接歸還池中,禁止直接調用conn.close();3.SimpleConnectionPool是線程安全的,適用於多線程環境;4.推薦結合contextmanager實現上下文管理器,確保連接在異常時也能正確歸還;
