A-A+

最新python 协程 asyncio案例及详细刨析

2021年08月20日 17:07 学习笔记 暂无评论 共6502字 (阅读1,066 views次)

【注意:此文章为博主原创文章!转载需注意,请带原文链接,至少也要是txt格式!】

首先关于python的协程可以参考官网的说明:https://docs.python.org/zh-cn/3/library/asyncio-task.html#asyncio.create_task 如果难以看懂,可以看本文,本文会做一个解释。

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# -*- coding:utf-8 -*-
# __author__ = shrimp
# __Blog__ = https://woj.app
# __Date__ = 2021/8/20
# __ver__ = python3
 
import asyncio
import time
 
 
##注意这里,就是你最终要执行的协程函数
async def get_pokemon(fut, num):
    print("我开始执行", num)
    await asyncio.sleep(num)
    print("我已经执行完毕", num)
    ##设置 *num* 作为 *fut* Future 的结果。
    fut.set_result(num)  ##类似于 return num
 
 
async def main():
    loop = asyncio.get_running_loop()  ##获取当前事件循环
 
    ##创建一个新的 Future 对象,如果你想获取协程函数执行的任务结果,就必须创建这个
    fut = loop.create_future()
 
    task_list = []  ##创建要并行的任务列表
 
    loop.create_task(get_pokemon(fut, 5))  # 使用高层级的 asyncio.create_task() 函数来创建 Task 对象,也可用低层级的loop.create_task() 或 ensure_future() 函数。不建议手动实例化 Task 对象。
 
    print(await fut)    #等待fut有结果,打印结果
 
 
if __name__ == '__main__':
    tt = time.time()
    asyncio.run(main())
    print(time.time() - tt)

下面再做一个高级函数的详细示例讲解:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# -*- coding:utf-8 -*-
# __author__ = shrimp
# __Blog__ = https://woj.app
# __Date__ = 2021/8/20
# __ver__ = python3
 
import asyncio
import time
 
 
##注意这里,就是你最终要执行的协程函数
async def get_pokemon(num):
    print("我开始执行", num)
    await asyncio.sleep(num)
    print("我休息完毕", num, "我继续执行")
    await asyncio.sleep(2)
    print("我已经执行完毕", num)
    return num
 
 
async def main():
    task_list = []  ##创建要并行的任务列表
    for i in [5, 3, 8, 6, 10, 15]:
        ## asyncio.create_task(coro, *, name=None)
        ## 将 coro 协程 封装为一个 Task 并调度其执行。返回 Task 对象。
        ## name 不为 None,它将使用 Task.set_name() 来设为任务的名称。
        ## 该任务会在 get_running_loop() 返回的循环中执行,如果当前线程没有在运行的循环则会引发 RuntimeError。
        task = asyncio.create_task(get_pokemon(i))  # 其实它的本质是先get_running_loog 然后 loop.create_task
        task_list.append(task)
 
    ## awaitable asyncio.gather(*aws, loop=None, return_exceptions=False)
    ## 并发 运行 aws 序列中的 可等待对象。
    ## 如果 aws 中的某个可等待对象为协程,它将自动被作为一个任务调度。
    ## 如果所有可等待对象都成功完成,结果将是一个由所有返回值聚合而成的列表。结果值的顺序与 aws 中可等待对象的顺序一致。
    datas = await asyncio.gather(*task_list)  ##注意结合asyncio.create_task封装任务,你就全明白了
    for data in datas:
        ###除了函数本身外,下面是打印整个任务序列结果
        print(data)  # 等待所有协程结果
 
 
if __name__ == '__main__':
    tt = time.time()
    asyncio.run(main())
    print(time.time() - tt)

下面再放一个官方的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import asyncio
import concurrent.futures
 
 
def blocking_io():
    # File operations (such as logging) can block the
    # event loop: run them in a thread pool.
    with open('123123.txt', 'wb') as f:
        for i in range(3000):
            f.write(b"asdflkjasdlfkjasldkfj\n")
 
 
def cpu_bound():
    # CPU-bound operations will block the event loop:
    # in general it is preferable to run them in a
    # process pool.
    return sum(i * i for i in range(10 ** 7))
 
 
async def main():
    loop = asyncio.get_running_loop()
 
    ## Options:
 
    # 1. 在默认循环的执行器中运行一个协程函数(或者说运行一个协程任务)
    result = await loop.run_in_executor(
        None, blocking_io)
    print('default thread pool', result)
 
    # 2. 在一个线程中运行一个协程函数
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, blocking_io)
        print('custom thread pool', result)
 
    # 3. 在一个进程中运行一个协程函数
    with concurrent.futures.ProcessPoolExecutor() as pools:
        result = await loop.run_in_executor(
            pools, cpu_bound)
        print('custom process pool', result)
 
 
if __name__ == "__main__":
    asyncio.run(main())

 

对了如果官方的代码报错:concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending,你就复制我上面的,绝对不会出现这个问题。主要就是你没有加入:if __name__ == "__main__": 这句导致的。

 

本文参考:python 百万「并发」基础之异步编程详解

超级详细解释 Python 中的 async await 概念

最新python request使用异步协程async/await详解

同时参考了官方的:

https://docs.python.org/zh-cn/3/library/asyncio-eventloop.html

https://docs.python.org/zh-cn/3/library/asyncio-future.html

https://docs.python.org/zh-cn/3/library/asyncio-task.html

 

 

Python:asyncio.wait 和 asyncio.gather 的异同

1. 异同点综述

  • 相同:从功能上看,asyncio.wait 和 asyncio.gather 实现的效果是相同的,都是把所有 Task 任务结果收集起来。
  • 不同asyncio.wait 会返回两个值:done 和 pendingdone 为已完成的协程 Taskpending 为超时未完成的协程 Task,需通过 future.result 调用 Task 的 result;而asyncio.gather 返回的是所有已完成 Task 的 result,不需要再进行调用或其他操作,就可以得到全部结果。

2. asyncio.wait 用法:

最常见的写法是:await asyncio.wait(task_list)

 

import asyncio
import arrow


def current_time():
    '''
    获取当前时间
    :return:
    '''
    cur_time = arrow.now().to('Asia/Shanghai').format('YYYY-MM-DD HH:mm:ss')
    return cur_time


async def func(sleep_time):
    func_name_suffix = sleep_time        # 使用 sleep_time(函数 I/O 等待时长)作为函数名后缀,以区分任务对象
    print(f"[{current_time()}] 执行异步函数 {func.__name__}-{func_name_suffix}")
    await asyncio.sleep(sleep_time)
    print(f"[{current_time()}] 函数 {func.__name__}-{func_name_suffix} 执行完毕")
    return f"【[{current_time()}] 得到函数 {func.__name__}-{func_name_suffix} 执行结果】"


async def run():
    task_list = []
    for i in range(5):
        task = asyncio.create_task(async_func(i))
        task_list.append(task)

    done, pending = await asyncio.wait(task_list, timeout=None)
    for done_task in done:
        print((f"[{current_time()}] 得到执行结果 {done_task.result()}"))

def main():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())


if __name__ == '__main__':
    main()

代码执行结果如下:

 

[2020-11-03 22:45:53] 执行异步函数 func-0
[2020-11-03 22:45:53] 执行异步函数 func-1
[2020-11-03 22:45:53] 执行异步函数 func-2
[2020-11-03 22:45:53] 执行异步函数 func-3
[2020-11-03 22:45:53] 执行异步函数 func-4
[2020-11-03 22:45:53] 函数 func-0 执行完毕
[2020-11-03 22:45:54] 函数 func-1 执行完毕
[2020-11-03 22:45:55] 函数 func-2 执行完毕
[2020-11-03 22:45:56] 函数 func-3 执行完毕
[2020-11-03 22:45:57] 函数 func-4 执行完毕
[2020-11-03 22:45:57] 得到执行结果 【[2020-11-03 22:45:57] 得到函数 func-4 执行结果】
[2020-11-03 22:45:57] 得到执行结果 【[2020-11-03 22:45:55] 得到函数 func-2 执行结果】
[2020-11-03 22:45:57] 得到执行结果 【[2020-11-03 22:45:53] 得到函数 func-0 执行结果】
[2020-11-03 22:45:57] 得到执行结果 【[2020-11-03 22:45:56] 得到函数 func-3 执行结果】
[2020-11-03 22:45:57] 得到执行结果 【[2020-11-03 22:45:54] 得到函数 func-1 执行结果】

3. asyncio.gather 用法:

最常见的用法是:await asyncio.gather(*task_list),注意这里 task_list 前面有一个 *

 

import asyncio
import arrow


def current_time():
    '''
    获取当前时间
    :return:
    '''
    cur_time = arrow.now().to('Asia/Shanghai').format('YYYY-MM-DD HH:mm:ss')
    return cur_time


async def func(sleep_time):
    func_name_suffix = sleep_time     # 使用 sleep_time(函数 I/O 等待时长)作为函数名后缀,以区分任务对象
    print(f"[{current_time()}] 执行异步函数 {func.__name__}-{func_name_suffix}")
    await asyncio.sleep(sleep_time)
    print(f"[{current_time()}] 函数 {func.__name__}-{func_name_suffix} 执行完毕")
    return f"【[{current_time()}] 得到函数 {func.__name__}-{func_name_suffix} 执行结果】"


async def run():
    task_list = []
    for i in range(5):
        task = asyncio.create_task(func(i))
        task_list.append(task)
    results = await asyncio.gather(*task_list)
    for result in results:
        print((f"[{current_time()}] 得到执行结果 {result}"))


def main():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())


if __name__ == '__main__':
    main()

代码执行结果如下:

 

[2020-11-03 22:35:54] 执行异步函数 func-0
[2020-11-03 22:35:54] 执行异步函数 func-1
[2020-11-03 22:35:54] 执行异步函数 func-2
[2020-11-03 22:35:54] 执行异步函数 func-3
[2020-11-03 22:35:54] 执行异步函数 func-4
[2020-11-03 22:35:54] 函数 func-0 执行完毕
[2020-11-03 22:35:55] 函数 func-1 执行完毕
[2020-11-03 22:35:56] 函数 func-2 执行完毕
[2020-11-03 22:35:57] 函数 func-3 执行完毕
[2020-11-03 22:35:58] 函数 func-4 执行完毕
[2020-11-03 22:35:58] 得到执行结果 【[2020-11-03 22:35:54] 得到函数 func-0 执行结果】
[2020-11-03 22:35:58] 得到执行结果 【[2020-11-03 22:35:55] 得到函数 func-1 执行结果】
[2020-11-03 22:35:58] 得到执行结果 【[2020-11-03 22:35:56] 得到函数 func-2 执行结果】
[2020-11-03 22:35:58] 得到执行结果 【[2020-11-03 22:35:57] 得到函数 func-3 执行结果】
[2020-11-03 22:35:58] 得到执行结果 【[2020-11-03 22:35:58] 得到函数 func-4 执行结果】

上面的文章来源:https://www.jianshu.com/p/6872bf356af7

布施恩德可便相知重

微信扫一扫打赏

支付宝扫一扫打赏

×

给我留言