首页 > 后端开发 > Python教程 > 掌握 Python 协程:为强大的并发应用程序创建自定义异步工具

掌握 Python 协程:为强大的并发应用程序创建自定义异步工具

DDD
发布: 2024-11-29 12:18:14
原创
976 人浏览过

Master Python Coroutines: Create Custom Async Tools for Powerful Concurrent Apps

Python 中的协程是编写异步代码的强大工具。它们彻底改变了我们处理并发操作的方式,使构建可扩展且高效的应用程序变得更加容易。我花了很多时间使用协程,并且很高兴能分享一些关于创建自定义异步原语的见解。

让我们从基础开始。协程是可以暂停和恢复的特殊函数,允许协作式多任务处理。它们是 Python 的 async/await 语法的基础。当您定义协程时,您实际上是在创建一个函数,该函数可以将控制权交还给事件循环,从而允许其他任务运行。

要创建自定义可等待对象,您需要实现 await 方法。该方法应该返回一个迭代器。这是一个简单的例子:

class CustomAwaitable:
    def __init__(self, value):
        self.value = value

    def __await__(self):
        yield
        return self.value

async def use_custom_awaitable():
    result = await CustomAwaitable(42)
    print(result)  # Output: 42
登录后复制
登录后复制

这个CustomAwaitable类可以与await关键字一起使用,就像内置的awaitables一样。当等待时,它会产生一次控制,然后返回它的值。

但是如果我们想创建更复杂的异步原语怎么办?让我们看看如何实现自定义信号量。信号量用于控制多个协程对共享资源的访问:

import asyncio

class CustomSemaphore:
    def __init__(self, value=1):
        self._value = value
        self._waiters = []

    async def acquire(self):
        while self._value <= 0:
            fut = asyncio.get_running_loop().create_future()
            self._waiters.append(fut)
            await fut
        self._value -= 1

    def release(self):
        self._value += 1
        if self._waiters:
            asyncio.get_running_loop().call_soon_threadsafe(self._waiters.pop().set_result, None)

    async def __aenter__(self):
        await self.acquire()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.release()

async def worker(semaphore, num):
    async with semaphore:
        print(f"Worker {num} acquired the semaphore")
        await asyncio.sleep(1)
    print(f"Worker {num} released the semaphore")

async def main():
    semaphore = CustomSemaphore(2)
    tasks = [asyncio.create_task(worker(semaphore, i)) for i in range(5)]
    await asyncio.gather(*tasks)

asyncio.run(main())
登录后复制
登录后复制

此 CustomSemaphore 类实现获取和释放方法,以及异步上下文管理器协议(aenteraexit)。它允许最多两个协程同时获取信号量。

现在,我们来谈谈创建高效的事件循环。虽然 Python 的 asyncio 提供了强大的事件循环实现,但在某些情况下您可能需要自定义一个。这是自定义事件循环的基本示例:

import time
from collections import deque

class CustomEventLoop:
    def __init__(self):
        self._ready = deque()
        self._stopping = False

    def call_soon(self, callback, *args):
        self._ready.append((callback, args))

    def run_forever(self):
        while not self._stopping:
            self._run_once()

    def _run_once(self):
        ntodo = len(self._ready)
        for _ in range(ntodo):
            callback, args = self._ready.popleft()
            callback(*args)

    def stop(self):
        self._stopping = True

    def run_until_complete(self, coro):
        def _done_callback(fut):
            self.stop()

        task = self.create_task(coro)
        task.add_done_callback(_done_callback)
        self.run_forever()
        return task.result()

    def create_task(self, coro):
        task = Task(coro, self)
        self.call_soon(task._step)
        return task

class Task:
    def __init__(self, coro, loop):
        self._coro = coro
        self._loop = loop
        self._done = False
        self._result = None
        self._callbacks = []

    def _step(self):
        try:
            if self._done:
                return
            result = self._coro.send(None)
            if isinstance(result, SleepHandle):
                result._task = self
                self._loop.call_soon(result._wake_up)
            else:
                self._loop.call_soon(self._step)
        except StopIteration as e:
            self.set_result(e.value)

    def set_result(self, result):
        self._result = result
        self._done = True
        for callback in self._callbacks:
            self._loop.call_soon(callback, self)

    def add_done_callback(self, callback):
        if self._done:
            self._loop.call_soon(callback, self)
        else:
            self._callbacks.append(callback)

    def result(self):
        if not self._done:
            raise RuntimeError('Task is not done')
        return self._result

class SleepHandle:
    def __init__(self, duration):
        self._duration = duration
        self._task = None
        self._start_time = time.time()

    def _wake_up(self):
        if time.time() - self._start_time >= self._duration:
            self._task._loop.call_soon(self._task._step)
        else:
            self._task._loop.call_soon(self._wake_up)

async def sleep(duration):
    return SleepHandle(duration)

async def example():
    print("Start")
    await sleep(1)
    print("After 1 second")
    await sleep(2)
    print("After 2 more seconds")
    return "Done"

loop = CustomEventLoop()
result = loop.run_until_complete(example())
print(result)
登录后复制

这个自定义事件循环实现了基本功能,例如运行任务、处理协程,甚至是简单的睡眠功能。它不像 Python 的内置事件循环那样功能丰富,但它演示了核心概念。

编写异步代码的挑战之一是管理任务优先级。虽然Python的asyncio没有为任务提供内置的优先级队列,但我们可以实现自己的:

import asyncio
import heapq

class PriorityEventLoop(asyncio.AbstractEventLoop):
    def __init__(self):
        self._ready = []
        self._stopping = False
        self._clock = 0

    def call_at(self, when, callback, *args, context=None):
        handle = asyncio.Handle(callback, args, self, context)
        heapq.heappush(self._ready, (when, handle))
        return handle

    def call_later(self, delay, callback, *args, context=None):
        return self.call_at(self._clock + delay, callback, *args, context=context)

    def call_soon(self, callback, *args, context=None):
        return self.call_at(self._clock, callback, *args, context=context)

    def time(self):
        return self._clock

    def stop(self):
        self._stopping = True

    def is_running(self):
        return not self._stopping

    def run_forever(self):
        while self._ready and not self._stopping:
            self._run_once()

    def _run_once(self):
        if not self._ready:
            return
        when, handle = heapq.heappop(self._ready)
        self._clock = when
        handle._run()

    def create_task(self, coro):
        return asyncio.Task(coro, loop=self)

    def run_until_complete(self, future):
        asyncio.futures._chain_future(future, self.create_future())
        self.run_forever()
        if not future.done():
            raise RuntimeError('Event loop stopped before Future completed.')
        return future.result()

    def create_future(self):
        return asyncio.Future(loop=self)

async def low_priority_task():
    print("Low priority task started")
    await asyncio.sleep(2)
    print("Low priority task finished")

async def high_priority_task():
    print("High priority task started")
    await asyncio.sleep(1)
    print("High priority task finished")

async def main():
    loop = asyncio.get_event_loop()
    loop.call_later(0.1, loop.create_task, low_priority_task())
    loop.call_later(0, loop.create_task, high_priority_task())
    await asyncio.sleep(3)

asyncio.run(main())
登录后复制

此 PriorityEventLoop 使用堆队列根据任务的计划执行时间来管理任务。您可以通过安排具有不同延迟的任务来分配优先级。

优雅地处理取消是使用协程的另一个重要方面。以下是如何实现可取消任务的示例:

import asyncio

async def cancellable_operation():
    try:
        print("Operation started")
        await asyncio.sleep(5)
        print("Operation completed")
    except asyncio.CancelledError:
        print("Operation was cancelled")
        # Perform any necessary cleanup
        raise  # Re-raise the CancelledError

async def main():
    task = asyncio.create_task(cancellable_operation())
    await asyncio.sleep(2)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Main: task was cancelled")

asyncio.run(main())
登录后复制

在此示例中,cancellable_operation 捕获 CancelledError,执行任何必要的清理,然后重新引发异常。这允许优雅地处理取消,同时仍然传播取消状态。

让我们探索实现自定义异步迭代器。这些对于创建可以异步迭代的序列非常有用:

class CustomAwaitable:
    def __init__(self, value):
        self.value = value

    def __await__(self):
        yield
        return self.value

async def use_custom_awaitable():
    result = await CustomAwaitable(42)
    print(result)  # Output: 42
登录后复制
登录后复制

这个 AsyncRange 类实现了异步迭代器协议,允许它在异步 for 循环中使用。

最后,让我们看看如何实现自定义异步上下文管理器。这些对于管理需要异步获取和释放的资源很有用:

import asyncio

class CustomSemaphore:
    def __init__(self, value=1):
        self._value = value
        self._waiters = []

    async def acquire(self):
        while self._value <= 0:
            fut = asyncio.get_running_loop().create_future()
            self._waiters.append(fut)
            await fut
        self._value -= 1

    def release(self):
        self._value += 1
        if self._waiters:
            asyncio.get_running_loop().call_soon_threadsafe(self._waiters.pop().set_result, None)

    async def __aenter__(self):
        await self.acquire()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.release()

async def worker(semaphore, num):
    async with semaphore:
        print(f"Worker {num} acquired the semaphore")
        await asyncio.sleep(1)
    print(f"Worker {num} released the semaphore")

async def main():
    semaphore = CustomSemaphore(2)
    tasks = [asyncio.create_task(worker(semaphore, i)) for i in range(5)]
    await asyncio.gather(*tasks)

asyncio.run(main())
登录后复制
登录后复制

这个 AsyncResource 类实现了 aenteraexit 方法,允许它与 async with 语句一起使用。

总之,Python 的协程系统为构建自定义异步原语提供了强大的基础。通过了解底层机制和协议,您可以针对特定的异步挑战创建量身定制的解决方案,优化复杂并发场景中的性能,并扩展 Python 的异步功能。请记住,虽然这些自定义实现非常适合学习和特定用例,但 Python 的内置 asyncio 库经过了高度优化,应该成为大多数场景的首选。快乐编码!


我们的创作

一定要看看我们的创作:

投资者中心 | 智能生活 | 时代与回响 | 令人费解的谜团 | 印度教 | 精英开发 | JS学校


我们在媒体上

科技考拉洞察 | 时代与回响世界 | 投资者中央媒体 | 令人费解的谜团 | 科学与时代媒介 | 现代印度教

以上是掌握 Python 协程:为强大的并发应用程序创建自定义异步工具的详细内容。更多信息请关注PHP中文网其他相关文章!

来源:dev.to
本站声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板