最新python 协程 asyncio案例及详细刨析

【注意:此文章为博主原创文章!转载需注意,请带原文链接,至少也要是txt格式!】
首先关于python的协程可以参考官网的说明:https://docs.python.org/zh-cn/3/library/asyncio-task.html#asyncio.create_task 如果难以看懂,可以看本文,本文会做一个解释。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 | # -*- coding:utf-8 -*- # __author__ = shrimp # __Blog__ = https://woj.app # __Date__ = 2021/8/20 # __ver__ = python3 import asyncio import time ##注意这里,就是你最终要执行的协程函数 async def get_pokemon(fut, num): print("我开始执行", num) await asyncio.sleep(num) print("我已经执行完毕", num) ##设置 *num* 作为 *fut* Future 的结果。 fut.set_result(num) ##类似于 return num async def main(): loop = asyncio.get_running_loop() ##获取当前事件循环 ##创建一个新的 Future 对象,如果你想获取协程函数执行的任务结果,就必须创建这个 fut = loop.create_future() task_list = [] ##创建要并行的任务列表 loop.create_task(get_pokemon(fut, 5)) # 使用高层级的 asyncio.create_task() 函数来创建 Task 对象,也可用低层级的loop.create_task() 或 ensure_future() 函数。不建议手动实例化 Task 对象。 print(await fut) #等待fut有结果,打印结果 if __name__ == '__main__': tt = time.time() asyncio.run(main()) print(time.time() - tt) |
下面再做一个高级函数的详细示例讲解:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 | # -*- coding:utf-8 -*- # __author__ = shrimp # __Blog__ = https://woj.app # __Date__ = 2021/8/20 # __ver__ = python3 import asyncio import time ##注意这里,就是你最终要执行的协程函数 async def get_pokemon(num): print("我开始执行", num) await asyncio.sleep(num) print("我休息完毕", num, "我继续执行") await asyncio.sleep(2) print("我已经执行完毕", num) return num async def main(): task_list = [] ##创建要并行的任务列表 for i in [5, 3, 8, 6, 10, 15]: ## asyncio.create_task(coro, *, name=None) ## 将 coro 协程 封装为一个 Task 并调度其执行。返回 Task 对象。 ## name 不为 None,它将使用 Task.set_name() 来设为任务的名称。 ## 该任务会在 get_running_loop() 返回的循环中执行,如果当前线程没有在运行的循环则会引发 RuntimeError。 task = asyncio.create_task(get_pokemon(i)) # 其实它的本质是先get_running_loog 然后 loop.create_task task_list.append(task) ## awaitable asyncio.gather(*aws, loop=None, return_exceptions=False) ## 并发 运行 aws 序列中的 可等待对象。 ## 如果 aws 中的某个可等待对象为协程,它将自动被作为一个任务调度。 ## 如果所有可等待对象都成功完成,结果将是一个由所有返回值聚合而成的列表。结果值的顺序与 aws 中可等待对象的顺序一致。 datas = await asyncio.gather(*task_list) ##注意结合asyncio.create_task封装任务,你就全明白了 for data in datas: ###除了函数本身外,下面是打印整个任务序列结果 print(data) # 等待所有协程结果 if __name__ == '__main__': tt = time.time() asyncio.run(main()) print(time.time() - tt) |
下面再放一个官方的示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 | import asyncio import concurrent.futures def blocking_io(): # File operations (such as logging) can block the # event loop: run them in a thread pool. with open('123123.txt', 'wb') as f: for i in range(3000): f.write(b"asdflkjasdlfkjasldkfj\n") def cpu_bound(): # CPU-bound operations will block the event loop: # in general it is preferable to run them in a # process pool. return sum(i * i for i in range(10 ** 7)) async def main(): loop = asyncio.get_running_loop() ## Options: # 1. 在默认循环的执行器中运行一个协程函数(或者说运行一个协程任务) result = await loop.run_in_executor( None, blocking_io) print('default thread pool', result) # 2. 在一个线程中运行一个协程函数 with concurrent.futures.ThreadPoolExecutor() as pool: result = await loop.run_in_executor( pool, blocking_io) print('custom thread pool', result) # 3. 在一个进程中运行一个协程函数 with concurrent.futures.ProcessPoolExecutor() as pools: result = await loop.run_in_executor( pools, cpu_bound) print('custom process pool', result) if __name__ == "__main__": asyncio.run(main()) |
对了如果官方的代码报错:concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending,你就复制我上面的,绝对不会出现这个问题。主要就是你没有加入:if __name__ == "__main__":
这句导致的。
超级详细解释 Python 中的 async await 概念
最新python request使用异步协程async/await详解
同时参考了官方的:
https://docs.python.org/zh-cn/3/library/asyncio-eventloop.html
https://docs.python.org/zh-cn/3/library/asyncio-future.html
https://docs.python.org/zh-cn/3/library/asyncio-task.html
Python:asyncio.wait 和 asyncio.gather 的异同
1. 异同点综述
- 相同:从功能上看,
asyncio.wait
和asyncio.gather
实现的效果是相同的,都是把所有 Task 任务结果收集起来。 - 不同:
asyncio.wait
会返回两个值:done
和pending
,done
为已完成的协程Task
,pending
为超时未完成的协程Task
,需通过future.result
调用Task
的result
;而asyncio.gather
返回的是所有已完成Task
的result
,不需要再进行调用或其他操作,就可以得到全部结果。
2. asyncio.wait 用法:
最常见的写法是:await asyncio.wait(task_list)
。
import asyncio
import arrow
def current_time():
'''
获取当前时间
:return:
'''
cur_time = arrow.now().to('Asia/Shanghai').format('YYYY-MM-DD HH:mm:ss')
return cur_time
async def func(sleep_time):
func_name_suffix = sleep_time # 使用 sleep_time(函数 I/O 等待时长)作为函数名后缀,以区分任务对象
print(f"[{current_time()}] 执行异步函数 {func.__name__}-{func_name_suffix}")
await asyncio.sleep(sleep_time)
print(f"[{current_time()}] 函数 {func.__name__}-{func_name_suffix} 执行完毕")
return f"【[{current_time()}] 得到函数 {func.__name__}-{func_name_suffix} 执行结果】"
async def run():
task_list = []
for i in range(5):
task = asyncio.create_task(async_func(i))
task_list.append(task)
done, pending = await asyncio.wait(task_list, timeout=None)
for done_task in done:
print((f"[{current_time()}] 得到执行结果 {done_task.result()}"))
def main():
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
if __name__ == '__main__':
main()
代码执行结果如下:
[2020-11-03 22:45:53] 执行异步函数 func-0
[2020-11-03 22:45:53] 执行异步函数 func-1
[2020-11-03 22:45:53] 执行异步函数 func-2
[2020-11-03 22:45:53] 执行异步函数 func-3
[2020-11-03 22:45:53] 执行异步函数 func-4
[2020-11-03 22:45:53] 函数 func-0 执行完毕
[2020-11-03 22:45:54] 函数 func-1 执行完毕
[2020-11-03 22:45:55] 函数 func-2 执行完毕
[2020-11-03 22:45:56] 函数 func-3 执行完毕
[2020-11-03 22:45:57] 函数 func-4 执行完毕
[2020-11-03 22:45:57] 得到执行结果 【[2020-11-03 22:45:57] 得到函数 func-4 执行结果】
[2020-11-03 22:45:57] 得到执行结果 【[2020-11-03 22:45:55] 得到函数 func-2 执行结果】
[2020-11-03 22:45:57] 得到执行结果 【[2020-11-03 22:45:53] 得到函数 func-0 执行结果】
[2020-11-03 22:45:57] 得到执行结果 【[2020-11-03 22:45:56] 得到函数 func-3 执行结果】
[2020-11-03 22:45:57] 得到执行结果 【[2020-11-03 22:45:54] 得到函数 func-1 执行结果】
3. asyncio.gather 用法:
最常见的用法是:await asyncio.gather(*task_list)
,注意这里 task_list
前面有一个 *
。
import asyncio
import arrow
def current_time():
'''
获取当前时间
:return:
'''
cur_time = arrow.now().to('Asia/Shanghai').format('YYYY-MM-DD HH:mm:ss')
return cur_time
async def func(sleep_time):
func_name_suffix = sleep_time # 使用 sleep_time(函数 I/O 等待时长)作为函数名后缀,以区分任务对象
print(f"[{current_time()}] 执行异步函数 {func.__name__}-{func_name_suffix}")
await asyncio.sleep(sleep_time)
print(f"[{current_time()}] 函数 {func.__name__}-{func_name_suffix} 执行完毕")
return f"【[{current_time()}] 得到函数 {func.__name__}-{func_name_suffix} 执行结果】"
async def run():
task_list = []
for i in range(5):
task = asyncio.create_task(func(i))
task_list.append(task)
results = await asyncio.gather(*task_list)
for result in results:
print((f"[{current_time()}] 得到执行结果 {result}"))
def main():
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
if __name__ == '__main__':
main()
代码执行结果如下:
[2020-11-03 22:35:54] 执行异步函数 func-0
[2020-11-03 22:35:54] 执行异步函数 func-1
[2020-11-03 22:35:54] 执行异步函数 func-2
[2020-11-03 22:35:54] 执行异步函数 func-3
[2020-11-03 22:35:54] 执行异步函数 func-4
[2020-11-03 22:35:54] 函数 func-0 执行完毕
[2020-11-03 22:35:55] 函数 func-1 执行完毕
[2020-11-03 22:35:56] 函数 func-2 执行完毕
[2020-11-03 22:35:57] 函数 func-3 执行完毕
[2020-11-03 22:35:58] 函数 func-4 执行完毕
[2020-11-03 22:35:58] 得到执行结果 【[2020-11-03 22:35:54] 得到函数 func-0 执行结果】
[2020-11-03 22:35:58] 得到执行结果 【[2020-11-03 22:35:55] 得到函数 func-1 执行结果】
[2020-11-03 22:35:58] 得到执行结果 【[2020-11-03 22:35:56] 得到函数 func-2 执行结果】
[2020-11-03 22:35:58] 得到执行结果 【[2020-11-03 22:35:57] 得到函数 func-3 执行结果】
[2020-11-03 22:35:58] 得到执行结果 【[2020-11-03 22:35:58] 得到函数 func-4 执行结果】
上面的文章来源:https://www.jianshu.com/p/6872bf356af7
布施恩德可便相知重
微信扫一扫打赏
支付宝扫一扫打赏