目錄
Use queue.Queue for Thread Safety
Key Points for Using queue.Queue
Alternative: Bounded Queue (With Size Limit)
Custom Thread-Safe Queue (Advanced Use Case)
Important Notes
首頁 後端開發 Python教學 如何在Python中實施用於並發編程的線程安全隊列?

如何在Python中實施用於並發編程的線程安全隊列?

Aug 04, 2025 am 07:40 AM

使用queue.Queue是實現Python線程安全隊列的最可靠方法,1. 它內置鎖機制,避免競態條件;2. put()和get()默認阻塞,支持timeout避免無限等待;3. 配合task_done()和join()可協調任務完成;4. 可設置maxsize實現有界隊列以控制內存;5. 自定義封裝可提供更清晰接口;6. 注意queue.Queue僅用於線程間通信,不適用於多進程場景,且若使用join()則必須在消費者中調用task_done(),否則程序可能掛起。

How to implement a thread-safe queue for concurrent programming in Python?

Implementing a thread-safe queue in Python is straightforward thanks to the built-in queue.Queue class, which is designed specifically for safe use across multiple threads. Here's how to do it properly and what to keep in mind.

How to implement a thread-safe queue for concurrent programming in Python?

Use queue.Queue for Thread Safety

The most reliable and simplest way to have a thread-safe queue in Python is to use the queue.Queue module. It handles all locking mechanisms internally, so you don't have to worry about race conditions when putting or getting items.

 import queue
import threading
import time

# Create a thread-safe queue
q = queue.Queue()

def producer():
    for i in range(5):
        item = f"item-{i}"
        q.put(item)
        print(f"Produced: {item}")
        time.sleep(0.1) # Simulate work

def consumer():
    while True:
        try:
            # Use timeout to avoid blocking forever
            item = q.get(timeout=1)
            print(f"Consumed: {item}")
            q.task_done() # Indicate that the task is done
        except queue.Empty:
            print("No more items, exiting.")
            break

# Create threads
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)

# Start threads
t1.start()
t2.start()

# Wait for both threads to finish
t1.join()
t2.join()

Key Points for Using queue.Queue

  • Blocking Operations : put() and get() block by default, which is useful for coordinating threads.
  • task_done() and join() : If you use task_done() after processing each item, you can call q.join() in the producer to wait until all items are processed.
  • Avoid queue.Empty and queue.Full exceptions : Always handle them when using non-blocking or timeout-based calls.

Example with q.join() :

How to implement a thread-safe queue for concurrent programming in Python?
 def producer_with_join():
    for i in range(3):
        q.put(f"task-{i}")
    q.join() # Wait until all tasks are marked as done
    print("All tasks completed.")

def consumer_with_task_done():
    while True:
        item = q.get()
        if item is None:
            break
        print(f"Processing {item}")
        time.sleep(0.5)
        q.task_done() # Mark task as done

Alternative: Bounded Queue (With Size Limit)

You can limit the queue size to control memory usage and enable backpressure:

 q = queue.Queue(maxsize=3)

Now, put() will block if the queue is full until an item is consumed.

How to implement a thread-safe queue for concurrent programming in Python?

Custom Thread-Safe Queue (Advanced Use Case)

While queue.Queue covers most needs, you might want to wrap it for specific behavior:

 import queue
from typing import Any

class SafeQueue:
    def __init__(self, maxsize: int = 0):
        self._q = queue.Queue(maxsize=maxsize)

    def put(self, item: Any):
        self._q.put(item)

    def get(self) -> Any:
        return self._q.get()

    def empty(self) -> bool:
        return self._q.empty()

    def size(self) -> int:
        return self._q.qsize()

    def task_done(self):
        self._q.task_done()

    def join(self):
        self._q.join()

This encapsulates the queue and provides a clean interface.

Important Notes

  • Don't use list with locks manually unless you have a very specific reason — it's error-prone.
  • queue.Queue is for threads only. For multiprocessing, use multiprocessing.Queue .
  • Always call task_done() in consumers if you use join() , or your program may hang.

Basically, just use queue.Queue — it's well-tested, efficient, and built for this exact purpose.

以上是如何在Python中實施用於並發編程的線程安全隊列?的詳細內容。更多資訊請關注PHP中文網其他相關文章!

本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn

熱AI工具

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Clothoff.io

Clothoff.io

AI脫衣器

Video Face Swap

Video Face Swap

使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱工具

記事本++7.3.1

記事本++7.3.1

好用且免費的程式碼編輯器

SublimeText3漢化版

SublimeText3漢化版

中文版,非常好用

禪工作室 13.0.1

禪工作室 13.0.1

強大的PHP整合開發環境

Dreamweaver CS6

Dreamweaver CS6

視覺化網頁開發工具

SublimeText3 Mac版

SublimeText3 Mac版

神級程式碼編輯軟體(SublimeText3)

熱門話題

PHP教程
1596
276
Python連接到SQL Server PYODBC示例 Python連接到SQL Server PYODBC示例 Jul 30, 2025 am 02:53 AM

安裝pyodbc:使用pipinstallpyodbc命令安裝庫;2.連接SQLServer:通過pyodbc.connect()方法,使用包含DRIVER、SERVER、DATABASE、UID/PWD或Trusted_Connection的連接字符串,分別支持SQL身份驗證或Windows身份驗證;3.查看已安裝驅動:運行pyodbc.drivers()並篩選含'SQLServer'的驅動名,確保使用如'ODBCDriver17forSQLServer'等正確驅動名稱;4.連接字符串關鍵參數

python httpx async客戶端示例 python httpx async客戶端示例 Jul 29, 2025 am 01:08 AM

使用httpx.AsyncClient可高效发起异步HTTP请求,1.基本GET请求通过asyncwith管理客户端并用awaitclient.get发起非阻塞请求;2.并发多个请求时结合asyncio.gather可显著提升性能,总耗时等于最慢请求;3.支持自定义headers、认证、base_url和超时设置;4.可发送POST请求并携带JSON数据;5.注意避免混用同步异步代码,代理支持需注意后端兼容性,适合用于爬虫或API聚合等场景。

優化用於內存操作的Python 優化用於內存操作的Python Jul 28, 2025 am 03:22 AM

pythoncanbeoptimizedFormized-formemory-boundoperationsbyreducingOverHeadThroughGenerator,有效dattratsures,andManagingObjectLifetimes.first,useGeneratorSInsteadoFlistSteadoflistSteadoFocessLargedAtasetSoneItematatime,desceedingingLoadeGingloadInterveringerverneDraineNterveingerverneDraineNterveInterveIntMory.second.second.second.second,Choos,Choos

SQLAlchemy 2.0 棄用警告及連接關閉問題解決指南 SQLAlchemy 2.0 棄用警告及連接關閉問題解決指南 Aug 05, 2025 pm 07:57 PM

本文旨在幫助 SQLAlchemy 初學者解決在使用 create_engine 時遇到的 "RemovedIn20Warning" 警告,以及隨之而來的 "ResourceClosedError" 連接關閉錯誤。文章將詳細解釋該警告的原因,並提供消除警告以及修復連接問題的具體步驟和代碼示例,確保你能夠順利地查詢和操作數據庫。

python shutil rmtree示例 python shutil rmtree示例 Aug 01, 2025 am 05:47 AM

shutil.rmtree()是Python中用於遞歸刪除整個目錄樹的函數,能刪除指定文件夾及其所有內容。 1.基本用法:使用shutil.rmtree(path)刪除目錄,需處理FileNotFoundError、PermissionError等異常。 2.實際應用:可一鍵清除包含子目錄和文件的文件夾,如臨時數據或緩存目錄。 3.注意事項:刪除操作不可恢復;路徑不存在時拋出FileNotFoundError;可能因權限或文件佔用導致失敗。 4.可選參數:可通過ignore_errors=True忽略錯

如何在Python中執行SQL查詢? 如何在Python中執行SQL查詢? Aug 02, 2025 am 01:56 AM

安裝對應數據庫驅動;2.使用connect()連接數據庫;3.創建cursor對象;4.用execute()或executemany()執行SQL並用參數化查詢防注入;5.用fetchall()等獲取結果;6.修改後需commit();7.最後關閉連接或使用上下文管理器自動處理;完整流程確保安全且高效執行SQL操作。

數據工程ETL的Python 數據工程ETL的Python Aug 02, 2025 am 08:48 AM

Python是實現ETL流程的高效工具,1.數據抽取:通過pandas、sqlalchemy、requests等庫可從數據庫、API、文件等來源提取數據;2.數據轉換:使用pandas進行清洗、類型轉換、關聯、聚合等操作,確保數據質量並優化性能;3.數據加載:利用pandas的to_sql方法或云平台SDK將數據寫入目標系統,注意寫入方式與批次處理;4.工具推薦:Airflow、Dagster、Prefect用於流程調度與管理,結合日誌報警與虛擬環境提升穩定性與可維護性。

Python Psycopg2連接池示例 Python Psycopg2連接池示例 Jul 28, 2025 am 03:01 AM

使用psycopg2.pool.SimpleConnectionPool可有效管理數據庫連接,避免頻繁創建和銷毀連接帶來的性能開銷。 1.創建連接池時指定最小和最大連接數及數據庫連接參數,確保連接池初始化成功;2.通過getconn()獲取連接,執行數據庫操作後使用putconn()將連接歸還池中,禁止直接調用conn.close();3.SimpleConnectionPool是線程安全的,適用於多線程環境;4.推薦結合contextmanager實現上下文管理器,確保連接在異常時也能正確歸還;

See all articles