Python 中的协程是编写异步代码的强大工具。它们彻底改变了我们处理并发操作的方式,使构建可扩展且高效的应用程序变得更加容易。我花了很多时间使用协程,并且很高兴能分享一些关于创建自定义异步原语的见解。
让我们从基础开始。协程是可以暂停和恢复的特殊函数,允许协作式多任务处理。它们是 Python 的 async/await 语法的基础。当您定义协程时,您实际上是在创建一个函数,该函数可以将控制权交还给事件循环,从而允许其他任务运行。
要创建自定义可等待对象,您需要实现 await 方法。该方法应该返回一个迭代器。这是一个简单的例子:
class CustomAwaitable: def __init__(self, value): self.value = value def __await__(self): yield return self.value async def use_custom_awaitable(): result = await CustomAwaitable(42) print(result) # Output: 42
这个CustomAwaitable类可以与await关键字一起使用,就像内置的awaitables一样。当等待时,它会产生一次控制,然后返回它的值。
但是如果我们想创建更复杂的异步原语怎么办?让我们看看如何实现自定义信号量。信号量用于控制多个协程对共享资源的访问:
import asyncio class CustomSemaphore: def __init__(self, value=1): self._value = value self._waiters = [] async def acquire(self): while self._value <= 0: fut = asyncio.get_running_loop().create_future() self._waiters.append(fut) await fut self._value -= 1 def release(self): self._value += 1 if self._waiters: asyncio.get_running_loop().call_soon_threadsafe(self._waiters.pop().set_result, None) async def __aenter__(self): await self.acquire() return self async def __aexit__(self, exc_type, exc, tb): self.release() async def worker(semaphore, num): async with semaphore: print(f"Worker {num} acquired the semaphore") await asyncio.sleep(1) print(f"Worker {num} released the semaphore") async def main(): semaphore = CustomSemaphore(2) tasks = [asyncio.create_task(worker(semaphore, i)) for i in range(5)] await asyncio.gather(*tasks) asyncio.run(main())
此 CustomSemaphore 类实现获取和释放方法,以及异步上下文管理器协议(aenter 和 aexit)。它允许最多两个协程同时获取信号量。
现在,我们来谈谈创建高效的事件循环。虽然 Python 的 asyncio 提供了强大的事件循环实现,但在某些情况下您可能需要自定义一个。这是自定义事件循环的基本示例:
import time from collections import deque class CustomEventLoop: def __init__(self): self._ready = deque() self._stopping = False def call_soon(self, callback, *args): self._ready.append((callback, args)) def run_forever(self): while not self._stopping: self._run_once() def _run_once(self): ntodo = len(self._ready) for _ in range(ntodo): callback, args = self._ready.popleft() callback(*args) def stop(self): self._stopping = True def run_until_complete(self, coro): def _done_callback(fut): self.stop() task = self.create_task(coro) task.add_done_callback(_done_callback) self.run_forever() return task.result() def create_task(self, coro): task = Task(coro, self) self.call_soon(task._step) return task class Task: def __init__(self, coro, loop): self._coro = coro self._loop = loop self._done = False self._result = None self._callbacks = [] def _step(self): try: if self._done: return result = self._coro.send(None) if isinstance(result, SleepHandle): result._task = self self._loop.call_soon(result._wake_up) else: self._loop.call_soon(self._step) except StopIteration as e: self.set_result(e.value) def set_result(self, result): self._result = result self._done = True for callback in self._callbacks: self._loop.call_soon(callback, self) def add_done_callback(self, callback): if self._done: self._loop.call_soon(callback, self) else: self._callbacks.append(callback) def result(self): if not self._done: raise RuntimeError('Task is not done') return self._result class SleepHandle: def __init__(self, duration): self._duration = duration self._task = None self._start_time = time.time() def _wake_up(self): if time.time() - self._start_time >= self._duration: self._task._loop.call_soon(self._task._step) else: self._task._loop.call_soon(self._wake_up) async def sleep(duration): return SleepHandle(duration) async def example(): print("Start") await sleep(1) print("After 1 second") await sleep(2) print("After 2 more seconds") return "Done" loop = CustomEventLoop() result = loop.run_until_complete(example()) print(result)
这个自定义事件循环实现了基本功能,例如运行任务、处理协程,甚至是简单的睡眠功能。它不像 Python 的内置事件循环那样功能丰富,但它演示了核心概念。
编写异步代码的挑战之一是管理任务优先级。虽然Python的asyncio没有为任务提供内置的优先级队列,但我们可以实现自己的:
import asyncio import heapq class PriorityEventLoop(asyncio.AbstractEventLoop): def __init__(self): self._ready = [] self._stopping = False self._clock = 0 def call_at(self, when, callback, *args, context=None): handle = asyncio.Handle(callback, args, self, context) heapq.heappush(self._ready, (when, handle)) return handle def call_later(self, delay, callback, *args, context=None): return self.call_at(self._clock + delay, callback, *args, context=context) def call_soon(self, callback, *args, context=None): return self.call_at(self._clock, callback, *args, context=context) def time(self): return self._clock def stop(self): self._stopping = True def is_running(self): return not self._stopping def run_forever(self): while self._ready and not self._stopping: self._run_once() def _run_once(self): if not self._ready: return when, handle = heapq.heappop(self._ready) self._clock = when handle._run() def create_task(self, coro): return asyncio.Task(coro, loop=self) def run_until_complete(self, future): asyncio.futures._chain_future(future, self.create_future()) self.run_forever() if not future.done(): raise RuntimeError('Event loop stopped before Future completed.') return future.result() def create_future(self): return asyncio.Future(loop=self) async def low_priority_task(): print("Low priority task started") await asyncio.sleep(2) print("Low priority task finished") async def high_priority_task(): print("High priority task started") await asyncio.sleep(1) print("High priority task finished") async def main(): loop = asyncio.get_event_loop() loop.call_later(0.1, loop.create_task, low_priority_task()) loop.call_later(0, loop.create_task, high_priority_task()) await asyncio.sleep(3) asyncio.run(main())
此 PriorityEventLoop 使用堆队列根据任务的计划执行时间来管理任务。您可以通过安排具有不同延迟的任务来分配优先级。
优雅地处理取消是使用协程的另一个重要方面。以下是如何实现可取消任务的示例:
import asyncio async def cancellable_operation(): try: print("Operation started") await asyncio.sleep(5) print("Operation completed") except asyncio.CancelledError: print("Operation was cancelled") # Perform any necessary cleanup raise # Re-raise the CancelledError async def main(): task = asyncio.create_task(cancellable_operation()) await asyncio.sleep(2) task.cancel() try: await task except asyncio.CancelledError: print("Main: task was cancelled") asyncio.run(main())
在此示例中,cancellable_operation 捕获 CancelledError,执行任何必要的清理,然后重新引发异常。这允许优雅地处理取消,同时仍然传播取消状态。
让我们探索实现自定义异步迭代器。这些对于创建可以异步迭代的序列非常有用:
class CustomAwaitable: def __init__(self, value): self.value = value def __await__(self): yield return self.value async def use_custom_awaitable(): result = await CustomAwaitable(42) print(result) # Output: 42
这个 AsyncRange 类实现了异步迭代器协议,允许它在异步 for 循环中使用。
最后,让我们看看如何实现自定义异步上下文管理器。这些对于管理需要异步获取和释放的资源很有用:
import asyncio class CustomSemaphore: def __init__(self, value=1): self._value = value self._waiters = [] async def acquire(self): while self._value <= 0: fut = asyncio.get_running_loop().create_future() self._waiters.append(fut) await fut self._value -= 1 def release(self): self._value += 1 if self._waiters: asyncio.get_running_loop().call_soon_threadsafe(self._waiters.pop().set_result, None) async def __aenter__(self): await self.acquire() return self async def __aexit__(self, exc_type, exc, tb): self.release() async def worker(semaphore, num): async with semaphore: print(f"Worker {num} acquired the semaphore") await asyncio.sleep(1) print(f"Worker {num} released the semaphore") async def main(): semaphore = CustomSemaphore(2) tasks = [asyncio.create_task(worker(semaphore, i)) for i in range(5)] await asyncio.gather(*tasks) asyncio.run(main())
这个 AsyncResource 类实现了 aenter 和 aexit 方法,允许它与 async with 语句一起使用。
总之,Python 的协程系统为构建自定义异步原语提供了强大的基础。通过了解底层机制和协议,您可以针对特定的异步挑战创建量身定制的解决方案,优化复杂并发场景中的性能,并扩展 Python 的异步功能。请记住,虽然这些自定义实现非常适合学习和特定用例,但 Python 的内置 asyncio 库经过了高度优化,应该成为大多数场景的首选。快乐编码!
一定要看看我们的创作:
投资者中心 | 智能生活 | 时代与回响 | 令人费解的谜团 | 印度教 | 精英开发 | JS学校
科技考拉洞察 | 时代与回响世界 | 投资者中央媒体 | 令人费解的谜团 | 科学与时代媒介 | 现代印度教
以上是掌握 Python 协程:为强大的并发应用程序创建自定义异步工具的详细内容。更多信息请关注PHP中文网其他相关文章!