Python 中的協程是編寫非同步程式碼的強大工具。它們徹底改變了我們處理並發操作的方式,使建立可擴展且高效的應用程式變得更加容易。我花了很多時間使用協程,很高興能分享一些關於創建自訂非同步原語的見解。
讓我們從基礎開始。協程是可以暫停和恢復的特殊函數,允許協作式多工處理。它們是 Python 的 async/await 語法的基礎。當您定義協程時,您實際上正在建立一個函數,該函數可以將控制權交還給事件循環,從而允許其他任務運行。
要建立自訂可等待對象,您需要實作 await 方法。該方法應該傳回一個迭代器。這是一個簡單的例子:
class CustomAwaitable: def __init__(self, value): self.value = value def __await__(self): yield return self.value async def use_custom_awaitable(): result = await CustomAwaitable(42) print(result) # Output: 42
這個CustomAwaitable類別可以與await關鍵字一起使用,就像內建的awaitables一樣。當等待時,它會產生一次控制,然後傳回它的值。
但是如果我們想創建更複雜的非同步原語怎麼辦?讓我們看看如何實現自訂信號量。信號量用於控制多個協程對共享資源的存取:
import asyncio class CustomSemaphore: def __init__(self, value=1): self._value = value self._waiters = [] async def acquire(self): while self._value <= 0: fut = asyncio.get_running_loop().create_future() self._waiters.append(fut) await fut self._value -= 1 def release(self): self._value += 1 if self._waiters: asyncio.get_running_loop().call_soon_threadsafe(self._waiters.pop().set_result, None) async def __aenter__(self): await self.acquire() return self async def __aexit__(self, exc_type, exc, tb): self.release() async def worker(semaphore, num): async with semaphore: print(f"Worker {num} acquired the semaphore") await asyncio.sleep(1) print(f"Worker {num} released the semaphore") async def main(): semaphore = CustomSemaphore(2) tasks = [asyncio.create_task(worker(semaphore, i)) for i in range(5)] await asyncio.gather(*tasks) asyncio.run(main())
此 CustomSemaphore 類別實作取得和釋放方法,以及非同步上下文管理器協定(aenter 和 aexit)。它允許最多兩個協程同時獲取信號量。
現在,我們來談談創造高效率的事件循環。雖然 Python 的 asyncio 提供了強大的事件循環實現,但在某些情況下您可能需要自訂一個。這是自訂事件循環的基本範例:
import time from collections import deque class CustomEventLoop: def __init__(self): self._ready = deque() self._stopping = False def call_soon(self, callback, *args): self._ready.append((callback, args)) def run_forever(self): while not self._stopping: self._run_once() def _run_once(self): ntodo = len(self._ready) for _ in range(ntodo): callback, args = self._ready.popleft() callback(*args) def stop(self): self._stopping = True def run_until_complete(self, coro): def _done_callback(fut): self.stop() task = self.create_task(coro) task.add_done_callback(_done_callback) self.run_forever() return task.result() def create_task(self, coro): task = Task(coro, self) self.call_soon(task._step) return task class Task: def __init__(self, coro, loop): self._coro = coro self._loop = loop self._done = False self._result = None self._callbacks = [] def _step(self): try: if self._done: return result = self._coro.send(None) if isinstance(result, SleepHandle): result._task = self self._loop.call_soon(result._wake_up) else: self._loop.call_soon(self._step) except StopIteration as e: self.set_result(e.value) def set_result(self, result): self._result = result self._done = True for callback in self._callbacks: self._loop.call_soon(callback, self) def add_done_callback(self, callback): if self._done: self._loop.call_soon(callback, self) else: self._callbacks.append(callback) def result(self): if not self._done: raise RuntimeError('Task is not done') return self._result class SleepHandle: def __init__(self, duration): self._duration = duration self._task = None self._start_time = time.time() def _wake_up(self): if time.time() - self._start_time >= self._duration: self._task._loop.call_soon(self._task._step) else: self._task._loop.call_soon(self._wake_up) async def sleep(duration): return SleepHandle(duration) async def example(): print("Start") await sleep(1) print("After 1 second") await sleep(2) print("After 2 more seconds") return "Done" loop = CustomEventLoop() result = loop.run_until_complete(example()) print(result)
這個自訂事件循環實現了基本功能,例如運行任務、處理協程,甚至是簡單的睡眠功能。它不像 Python 的內建事件循環那樣功能豐富,但它演示了核心概念。
編寫非同步程式碼的挑戰之一是管理任務優先順序。雖然Python的asyncio沒有為任務提供內建的優先權隊列,但我們可以實現自己的:
import asyncio import heapq class PriorityEventLoop(asyncio.AbstractEventLoop): def __init__(self): self._ready = [] self._stopping = False self._clock = 0 def call_at(self, when, callback, *args, context=None): handle = asyncio.Handle(callback, args, self, context) heapq.heappush(self._ready, (when, handle)) return handle def call_later(self, delay, callback, *args, context=None): return self.call_at(self._clock + delay, callback, *args, context=context) def call_soon(self, callback, *args, context=None): return self.call_at(self._clock, callback, *args, context=context) def time(self): return self._clock def stop(self): self._stopping = True def is_running(self): return not self._stopping def run_forever(self): while self._ready and not self._stopping: self._run_once() def _run_once(self): if not self._ready: return when, handle = heapq.heappop(self._ready) self._clock = when handle._run() def create_task(self, coro): return asyncio.Task(coro, loop=self) def run_until_complete(self, future): asyncio.futures._chain_future(future, self.create_future()) self.run_forever() if not future.done(): raise RuntimeError('Event loop stopped before Future completed.') return future.result() def create_future(self): return asyncio.Future(loop=self) async def low_priority_task(): print("Low priority task started") await asyncio.sleep(2) print("Low priority task finished") async def high_priority_task(): print("High priority task started") await asyncio.sleep(1) print("High priority task finished") async def main(): loop = asyncio.get_event_loop() loop.call_later(0.1, loop.create_task, low_priority_task()) loop.call_later(0, loop.create_task, high_priority_task()) await asyncio.sleep(3) asyncio.run(main())
此 PriorityEventLoop 使用堆疊佇列根據任務的計畫執行時間來管理任務。您可以透過安排具有不同延遲的任務來分配優先順序。
優雅地處理取消是使用協程的另一個重要面向。以下是如何實現可取消任務的範例:
import asyncio async def cancellable_operation(): try: print("Operation started") await asyncio.sleep(5) print("Operation completed") except asyncio.CancelledError: print("Operation was cancelled") # Perform any necessary cleanup raise # Re-raise the CancelledError async def main(): task = asyncio.create_task(cancellable_operation()) await asyncio.sleep(2) task.cancel() try: await task except asyncio.CancelledError: print("Main: task was cancelled") asyncio.run(main())
在此範例中,cancellable_operation 捕捉 CancelledError,執行任何必要的清理,然後重新引發例外狀況。這允許優雅地處理取消,同時仍然傳播取消狀態。
讓我們來探索實作自訂非同步迭代器。這些對於創建可以非同步迭代的序列非常有用:
class CustomAwaitable: def __init__(self, value): self.value = value def __await__(self): yield return self.value async def use_custom_awaitable(): result = await CustomAwaitable(42) print(result) # Output: 42
這個 AsyncRange 類別實作了非同步迭代器協議,允許它在非同步 for 迴圈中使用。
最後,讓我們看看如何實作自訂非同步上下文管理器。這些對於管理需要非同步取得和釋放的資源很有用:
import asyncio class CustomSemaphore: def __init__(self, value=1): self._value = value self._waiters = [] async def acquire(self): while self._value <= 0: fut = asyncio.get_running_loop().create_future() self._waiters.append(fut) await fut self._value -= 1 def release(self): self._value += 1 if self._waiters: asyncio.get_running_loop().call_soon_threadsafe(self._waiters.pop().set_result, None) async def __aenter__(self): await self.acquire() return self async def __aexit__(self, exc_type, exc, tb): self.release() async def worker(semaphore, num): async with semaphore: print(f"Worker {num} acquired the semaphore") await asyncio.sleep(1) print(f"Worker {num} released the semaphore") async def main(): semaphore = CustomSemaphore(2) tasks = [asyncio.create_task(worker(semaphore, i)) for i in range(5)] await asyncio.gather(*tasks) asyncio.run(main())
這個 AsyncResource 類別實作了 aenter 和 aexit 方法,允許它與 async with 語句一起使用。
總之,Python 的協程系統為建立自訂非同步原語提供了強大的基礎。透過了解底層機制和協議,您可以針對特定的非同步挑戰創建量身定制的解決方案,優化複雜並發場景中的效能,並擴展 Python 的非同步功能。請記住,雖然這些自訂實作非常適合學習和特定用例,但 Python 的內建 asyncio 程式庫經過了高度優化,應該成為大多數場景的首選。快樂編碼!
一定要看看我們的創作:
投資者中心 | 智能生活 | 時代與迴響 | 令人費解的謎團 | 印度教 | 精英開發 | JS學校
科技無尾熊洞察 | 時代與迴響世界 | 投資人中央媒體 | 令人費解的謎團 | | 令人費解的謎團 | >科學與時代媒介 |
現代印度教以上是掌握 Python 協程:為強大的並發應用程式建立自訂非同步工具的詳細內容。更多資訊請關注PHP中文網其他相關文章!