


How to implement a thread-safe queue for concurrent programming in Python?
Use queue.Queue is the most reliable method to implement Python thread-safe queues. 1. It has a built-in lock mechanism to avoid race conditions; 2. Put() and get() block by default, and support timeout to avoid infinite waiting; 3. Use task_done() and join() to coordinate task completion; 4. You can set maxsize to implement bounded queues to control memory; 5. Custom encapsulation can provide a clearer interface; 6. Note that queue.Queue is only used for inter-thread communication and is not suitable for multi-process scenarios. If you use join(), task_done() must be called in the consumer, otherwise the program may hang.
Implementing a thread-safe queue in Python is straightforward thanks to the built-in queue.Queue
class, which is designed specifically for safe use across multiple threads. Here's how to do it properly and what to keep in mind.

Use queue.Queue
for Thread Safety
The most reliable and simplest way to have a thread-safe queue in Python is to use the queue.Queue
module. It handles all locking mechanisms internally, so you don't have to worry about race conditions when putting or getting items.
import queue import threading import time # Create a thread-safe queue q = queue.Queue() def producer(): for i in range(5): item = f"item-{i}" q.put(item) print(f"Produced: {item}") time.sleep(0.1) # Simulate work def consumer(): While True: try: # Use timeout to avoid blocking forever item = q.get(timeout=1) print(f"Consumed: {item}") q.task_done() # Indicate that the task is done except queue.Empty: print("No more items, exiting.") break # Create threads t1 = threading.Thread(target=producer) t2 = threading.Thread(target=consumer) # Start threads t1.start() t2.start() # Wait for both threads to finish t1.join() t2.join()
Key Points for Using queue.Queue
- Blocking Operations :
put()
andget()
block by default, which is useful for coordinating threads. -
task_done()
andjoin()
: If you usetask_done()
after processing each item, you can callq.join()
in the producer to wait until all items are processed. - Avoid
queue.Empty
andqueue.Full
exceptions : Always handle them when using non-blocking or timeout-based calls.
Example with q.join()
:

def producer_with_join(): for i in range(3): q.put(f"task-{i}") q.join() # Wait until all tasks are marked as done print("All tasks completed.") def consumer_with_task_done(): While True: item = q.get() if item is None: break print(f"Processing {item}") time.sleep(0.5) q.task_done() # Mark task as done
Alternative: Bounded Queue (With Size Limit)
You can limit the queue size to control memory usage and enable backpressure:
q = queue.Queue(maxsize=3)
Now, put()
will block if the queue is full until an item is consumed.

Custom Thread-Safe Queue (Advanced Use Case)
While queue.Queue
covers most needs, you might want to wrap it for specific behavior:
import queue from typing import Any class SafeQueue: def __init__(self, maxsize: int = 0): self._q = queue.Queue(maxsize=maxsize) def put(self, item: Any): self._q.put(item) def get(self) -> Any: return self._q.get() def empty(self) -> bool: return self._q.empty() def size(self) -> int: return self._q.qsize() def task_done(self): self._q.task_done() def join(self): self._q.join()
This encapsulates the queue and provides a clean interface.
Important Notes
- Don't use
list
with locks manually unless you have a very specific reason — it's error-prone. -
queue.Queue
is for threads only. For multiprocessing, usemultiprocessing.Queue
. - Always call
task_done()
in consumers if you usejoin()
, or your program may hang.
Basically, just use queue.Queue
— it's well-tested, efficient, and built for this exact purpose.
The above is the detailed content of How to implement a thread-safe queue for concurrent programming in Python?. For more information, please follow other related articles on the PHP Chinese website!

Hot AI Tools

Undress AI Tool
Undress images for free

Undresser.AI Undress
AI-powered app for creating realistic nude photos

AI Clothes Remover
Online AI tool for removing clothes from photos.

Clothoff.io
AI clothes remover

Video Face Swap
Swap faces in any video effortlessly with our completely free AI face swap tool!

Hot Article

Hot Tools

Notepad++7.3.1
Easy-to-use and free code editor

SublimeText3 Chinese version
Chinese version, very easy to use

Zend Studio 13.0.1
Powerful PHP integrated development environment

Dreamweaver CS6
Visual web development tools

SublimeText3 Mac version
God-level code editing software (SublimeText3)

Use subprocess.run() to safely execute shell commands and capture output. It is recommended to pass parameters in lists to avoid injection risks; 2. When shell characteristics are required, you can set shell=True, but beware of command injection; 3. Use subprocess.Popen to realize real-time output processing; 4. Set check=True to throw exceptions when the command fails; 5. You can directly call chains to obtain output in a simple scenario; you should give priority to subprocess.run() in daily life to avoid using os.system() or deprecated modules. The above methods override the core usage of executing shell commands in Python.

Use Seaborn's jointplot to quickly visualize the relationship and distribution between two variables; 2. The basic scatter plot is implemented by sns.jointplot(data=tips,x="total_bill",y="tip",kind="scatter"), the center is a scatter plot, and the histogram is displayed on the upper and lower and right sides; 3. Add regression lines and density information to a kind="reg", and combine marginal_kws to set the edge plot style; 4. When the data volume is large, it is recommended to use "hex"

Use httpx.AsyncClient to efficiently initiate asynchronous HTTP requests. 1. Basic GET requests manage clients through asyncwith and use awaitclient.get to initiate non-blocking requests; 2. Combining asyncio.gather to combine with asyncio.gather can significantly improve performance, and the total time is equal to the slowest request; 3. Support custom headers, authentication, base_url and timeout settings; 4. Can send POST requests and carry JSON data; 5. Pay attention to avoid mixing synchronous asynchronous code. Proxy support needs to pay attention to back-end compatibility, which is suitable for crawlers or API aggregation and other scenarios.

String lists can be merged with join() method, such as ''.join(words) to get "HelloworldfromPython"; 2. Number lists must be converted to strings with map(str, numbers) or [str(x)forxinnumbers] before joining; 3. Any type list can be directly converted to strings with brackets and quotes, suitable for debugging; 4. Custom formats can be implemented by generator expressions combined with join(), such as '|'.join(f"[{item}]"foriteminitems) output"[a]|[

Pythoncanbeoptimizedformemory-boundoperationsbyreducingoverheadthroughgenerators,efficientdatastructures,andmanagingobjectlifetimes.First,usegeneratorsinsteadofliststoprocesslargedatasetsoneitematatime,avoidingloadingeverythingintomemory.Second,choos

Install pyodbc: Use the pipinstallpyodbc command to install the library; 2. Connect SQLServer: Use the connection string containing DRIVER, SERVER, DATABASE, UID/PWD or Trusted_Connection through the pyodbc.connect() method, and support SQL authentication or Windows authentication respectively; 3. Check the installed driver: Run pyodbc.drivers() and filter the driver name containing 'SQLServer' to ensure that the correct driver name is used such as 'ODBCDriver17 for SQLServer'; 4. Key parameters of the connection string

shutil.rmtree() is a function in Python that recursively deletes the entire directory tree. It can delete specified folders and all contents. 1. Basic usage: Use shutil.rmtree(path) to delete the directory, and you need to handle FileNotFoundError, PermissionError and other exceptions. 2. Practical application: You can clear folders containing subdirectories and files in one click, such as temporary data or cached directories. 3. Notes: The deletion operation is not restored; FileNotFoundError is thrown when the path does not exist; it may fail due to permissions or file occupation. 4. Optional parameters: Errors can be ignored by ignore_errors=True

This article aims to help SQLAlchemy beginners resolve the "RemovedIn20Warning" warning encountered when using create_engine and the subsequent "ResourceClosedError" connection closing error. The article will explain the cause of this warning in detail and provide specific steps and code examples to eliminate the warning and fix connection issues to ensure that you can query and operate the database smoothly.
