How to use Python asynchronous methods

WBOY
Release: 2023-05-10 23:28:04
forward
1387 people have browsed it

Why asynchronous programming?

To understand the motivation for asynchronous programming, we first have to understand what limits the speed at which our code runs. Ideally, we want our code to run at the speed of light, skipping our code instantly without any delay. However, the code actually runs much slower due to two factors:

  • CPU time (how long it takes the processor to execute instructions)

  • IO time (time waiting for network requests or storage reads/writes)

When our code is waiting for IO, the CPU is basically idle, waiting for a response from some external device. Normally, the kernel detects this and immediately switches execution to other threads in the system. So if we want to speed up a set of IO-intensive tasks, we can create a thread for each task. When one of the threads stops, waiting for IO, the kernel will switch to another thread to continue processing.

This works well in practice, but has two drawbacks:

  • Threads have overhead (especially in Python)

  • We have no control over when the kernel chooses to switch between threads

For example, if we wanted to execute 10,000 tasks, we would either have to create 10,000 threads, which would take A lot of RAM, either we need to create less number of worker threads and execute tasks with less concurrency. Additionally, initially spawning these threads consumes CPU time.

Because the kernel can choose to switch between threads at any time, races can occur at any time in our code.

Introducing Asynchrony

In traditional synchronous thread-based code, the kernel must detect when a thread is IO bound and choose to switch between threads at will. With Python async, the programmer uses the keyword await to confirm the line of code that declares the IO binding and confirms that permission is granted to perform other tasks. For example, consider the following code that performs a web request:

async def request_google():
    reader, writer = await asyncio.open_connection('google.com', 80)
    writer.write(b'GET / HTTP/2\n\n')
    await writer.drain()
    response = await reader.read()
    return response.decode()
Copy after login

Here, here we see this code await in two places. So, while waiting for our bytes to be sent to the server (writer.drain()), while waiting for the server to reply with some bytes (reader.read()), We know that other code may execute and global variables may change. However, from the start of the function to the first wait, we can ensure that the code runs line by line without switching to other code in the running program. This is the beauty of async.

asyncio is a standard library that allows us to do some interesting things with these asynchronous functions. For example, if we wanted to perform two requests to Google at the same time, we could:

async def request_google_twice():
    response_1, response_2 = await asyncio.gather(request_google(), request_google())
    return response_1, response_2
Copy after login

When we call request_google_twice(), the magic asyncio.gather kicks in One function call, but when we call await writer.drain(), it starts executing the second function call so that both requests happen in parallel. It then waits for the first or second requested writer.drain() call to complete and continues executing the function.

Finally, there is an important detail that was left out: asyncio.run. To actually call an asynchronous function from a regular [synchronous] Python function, we wrap the call in asyncio.run(...):

async def async_main():
    r1, r2 = await request_google_twice()
    print('Response one:', r1)
    print('Response two:', r2)
    return 12

return_val = asyncio.run(async_main())
Copy after login

Note that if we just call async_main() Without calling await ... or asyncio.run(...), nothing will happen. This is just limited by the nature of how asynchronous works.

So, how does asynchronous work, and what are the functions of these magical asyncio.run and asyncio.gather functions? Read below to learn more.

How async works

To understand the magic of async, we first need to understand a simpler Python construct: generator

Generator

A generator is a Python function that returns a series of values ​​one by one (iterable). For example:

def get_numbers():
    print("|| get_numbers begin")
    print("|| get_numbers Giving 1...")
    yield 1
    print("|| get_numbers Giving 2...")
    yield 2
    print("|| get_numbers Giving 3...")
    yield 3
    print("|| get_numbers end")

print("| for begin")
for number in get_numbers():
    print(f"| Got {number}.")
print("| for end")
Copy after login
| for begin
|| get_numbers begin
|| get_numbers Giving 1...
| Got 1.
|| get_numbers Giving 2...
| Got 2.
|| get_numbers Giving 3...
| Got 3.
|| get_numbers end
| for end
Copy after login

So, we see that for each iteration of the for loop, we execute it only once in the generator. We can perform this iteration more explicitly using Python's next() function:

In [3]: generator = get_numbers()                                                                                                                                                            

In [4]: next(generator)                                                                                                                                                                      
|| get_numbers begin
|| get_numbers Giving 1...
Out[4]: 1

In [5]: next(generator)                                                                                                                                                                      
|| get_numbers Giving 2...
Out[5]: 2

In [6]: next(generator)                                                                                                                                                                      
|| get_numbers Giving 3...
Out[6]: 3

In [7]: next(generator)                                                                                                                                                                      
|| get_numbers end
---------------------------------------
StopIteration       Traceback (most recent call last)
 in 
----> 1 next(generator)

StopIteration:
Copy after login

这与异步函数的行为非常相似。正如异步函数从函数开始直到第一次等待时连续执行代码一样,我们第一次调用next()时,生成器将从函数顶部执行到第一个yield 语句。然而,现在我们只是从生成器返回数字。我们将使用相同的思想,但返回一些不同的东西来使用生成器创建类似异步的函数。

使用生成器进行异步

让我们使用生成器来创建我们自己的小型异步框架。

但是,为简单起见,让我们将实际 IO 替换为睡眠(即。time.sleep)。让我们考虑一个需要定期发送更新的应用程序:

def send_updates(count: int, interval_seconds: float):
    for i in range(1, count + 1):
        time.sleep(interval_seconds)
        print('[{}] Sending update {}/{}.'.format(interval_seconds, i, count))
Copy after login

因此,如果我们调用send_updates(3, 1.0),它将输出这三条消息,每条消息间隔 1 秒:

[1.0] Sending update 1/3.
[1.0] Sending update 2/3.
[1.0] Sending update 3/3.
Copy after login

现在,假设我们要同时运行几个不同的时间间隔。例如,send_updates(10, 1.0)send_updates(5, 2.0)send_updates(4, 3.0)。我们可以使用线程来做到这一点,如下所示:

threads = [
    threading.Thread(target=send_updates, args=(10, 1.0)),
    threading.Thread(target=send_updates, args=(5, 2.0)),
    threading.Thread(target=send_updates, args=(4, 3.0))
]
for i in threads:
    i.start()
for i in threads:
    i.join()
Copy after login

这可行,在大约 12 秒内完成,但使用具有前面提到的缺点的线程。让我们使用生成器构建相同的东西。

在演示生成器的示例中,我们返回了整数。为了获得类似异步的行为,而不是返回任意值,我们希望返回一些描述要等待的IO的对象。在我们的例子中,我们的“IO”只是一个计时器,它将等待一段时间。因此,让我们创建一个计时器对象,用于此目的:

class AsyncTimer:
    def __init__(self, duration: float):
        self.done_time = time.time() + duration
Copy after login

现在,让我们从我们的函数中产生这个而不是调用time.sleep

def send_updates(count: int, interval_seconds: float):
    for i in range(1, count + 1):
        yield AsyncTimer(interval_seconds)
        print('[{}] Sending update {}/{}.'.format(interval_seconds, i, count))
Copy after login

现在,每次我们调用send_updates(...)时调用next(...),我们都会得到一个AsyncTimer对象,告诉我们直到我们应该等待什么时候:

generator = send_updates(3, 1.5)
timer = next(generator)  # [1.5] Sending update 1/3.
print(timer.done_time - time.time())  # 1.498...
Copy after login

由于我们的代码现在实际上并没有调用time.sleep,我们现在可以同时执行另一个send_updates调用。

所以,为了把这一切放在一起,我们需要退后一步,意识到一些事情:

  • 生成器就像部分执行的函数,等待一些 IO(计时器)。

  • 每个部分执行的函数都有一些 IO(计时器),它在继续执行之前等待。

  • 因此,我们程序的当前状态是每个部分执行的函数(生成器)和该函数正在等待的 IO(计时器)对的对列表

  • 现在,要运行我们的程序,我们只需要等到某个 IO 准备就绪(即我们的一个计时器已过期),然后再向前一步执行相应的函数,得到一个阻塞该函数的新 IO。

实现此逻辑为我们提供了以下信息:

# Initialize each generator with a timer of 0 so it immediately executes
generator_timer_pairs = [
    (send_updates(10, 1.0), AsyncTimer(0)),
    (send_updates(5, 2.0), AsyncTimer(0)),
    (send_updates(4, 3.0), AsyncTimer(0))
]

while generator_timer_pairs:
    pair = min(generator_timer_pairs, key=lambda x: x[1].done_time)
    generator, min_timer = pair

    # Wait until this timer is ready
    time.sleep(max(0, min_timer.done_time - time.time()))
    del generator_timer_pairs[generator_timer_pairs.index(pair)]

    try:  # Execute one more step of this function
        new_timer = next(generator)
        generator_timer_pairs.append((generator, new_timer))
    except StopIteration:  # When the function is complete
        pass
Copy after login

有了这个,我们有了一个使用生成器的类似异步函数的工作示例。请注意,当生成器完成时,它会引发StopIteration,并且当我们不再有部分执行的函数(生成器)时,我们的函数就完成了

现在,我们把它包装在一个函数中,我们得到了类似于asyncio.run的东西。结合asyncio.gather运行:

def async_run_all(*generators):
    generator_timer_pairs = [
        (generator, AsyncTimer(0))
        for generator in generators
    ]
    while generator_timer_pairs:
        pair = min(generator_timer_pairs, key=lambda x: x[1].done_time)
        generator, min_timer = pair

        time.sleep(max(0, min_timer.done_time - time.time()))
        del generator_timer_pairs[generator_timer_pairs.index(pair)]

        try:
            new_timer = next(generator)
            generator_timer_pairs.append((generator, new_timer))
        except StopIteration:
            pass

async_run_all(
    send_updates(10, 1.0),
    send_updates(5, 2.0),
    send_updates(4, 3.0)
)
Copy after login

使用 async/await 进行异步

实现我们的caveman版本的asyncio的最后一步是支持Python 3.5中引入的async/await语法。await的行为类似于yield,只是它不是直接返回提供的值,而是返回next((...).__await__())async函数返回“协程”,其行为类似于生成器,但需要使用.send(None)而不是next()(请注意,正如生成器在最初调用时不返回任何内容一样,异步函数在逐步执行之前不会执行任何操作,这解释了我们前面提到的)。

因此,鉴于这些信息,我们只需进行一些调整即可将我们的示例转换为async/await。以下是最终结果:

class AsyncTimer:
    def __init__(self, duration: float):
        self.done_time = time.time() + duration
    def __await__(self):
        yield self

async def send_updates(count: int, interval_seconds: float):
    for i in range(1, count + 1):
        await AsyncTimer(interval_seconds)
        print('[{}] Sending update {}/{}.'.format(interval_seconds, i, count))

def _wait_until_io_ready(ios):
    min_timer = min(ios, key=lambda x: x.done_time)
    time.sleep(max(0, min_timer.done_time - time.time()))
    return ios.index(min_timer)

def async_run_all(*coroutines):
    coroutine_io_pairs = [
        (coroutine, AsyncTimer(0))
        for coroutine in coroutines
    ]
    while coroutine_io_pairs:
        ios = [io for cor, io in coroutine_io_pairs]
        ready_index = _wait_until_io_ready(ios)
        coroutine, _ = coroutine_io_pairs.pop(ready_index)

        try:
            new_io = coroutine.send(None)
            coroutine_io_pairs.append((coroutine, new_io))
        except StopIteration:
            pass

async_run_all(
    send_updates(10, 1.0),
    send_updates(5, 2.0),
    send_updates(4, 3.0)
)
Copy after login

我们有了它,我们的迷你异步示例完成了,使用async/await. 现在,您可能已经注意到我将 timer 重命名为 io 并将查找最小计时器的逻辑提取到一个名为_wait_until_io_ready. 这是有意将这个示例与最后一个主题联系起来:真实 IO。

Here we have completed our small async example, using async/await. Now, you may have noticed that I renamed timer to io and extracted the logic for finding the minimum timer into a function called _wait_until_io_ready. This is to connect this example with the last topic: Real IO.

Real IO (not just timers)

So, all these examples are great, but how do they relate to real asyncio, we want to wait on TCP on real IO Socket and file reading/writing? Well, the beauty is in that _wait_until_io_ready function. To get real IO working, all we have to do is create some AsyncReadFile similar to AsyncTimer new object that contains the file descriptor. Then, AsyncReadFile the set of objects we are waiting for corresponds to a set of file descriptors. Finally, we can use the function (syscall) select() to wait for one of these file descriptors to be ready. Since TCP/UDP sockets are implemented using file descriptors, this also covers network requests.

So, all these examples are great, but what do they have to do with real asynchronous IO? Do we want to wait for actual IO, like TCP sockets and file reads/writes? Well, the advantage lies in the _wait_until_io_ready function. To make real IO work, all we need to do is create some new AsyncReadFile, similar to AsyncTimer, which contains a file descriptor. Then, the set of AsyncReadFile objects we are waiting for corresponds to a set of file descriptors. Finally, we can use the function (syscall)select() to wait for one of these file descriptors to be ready. Since TCP/UDP sockets are implemented using file descriptors, this covers network requests as well.

The above is the detailed content of How to use Python asynchronous methods. For more information, please follow other related articles on the PHP Chinese website!

Related labels:
source:yisu.com
Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
Popular Tutorials
More>
Latest Downloads
More>
Web Effects
Website Source Code
Website Materials
Front End Template
About us Disclaimer Sitemap
php.cn:Public welfare online PHP training,Help PHP learners grow quickly!