1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
| #!/usr/bin/env python
# -*- coding:utf-8 -*-
# author:love_cat
# 为什么需要线程池
# 1.主线程中可以获取某一个线程的状态或者某一个任务的状态,以及返回值
# 2.当一个线程完成时,主线程能够立即知道
# 3.futures可以让多线程和多进程编码接口一致
# 导入相应的模块
from concurrent.futures import ThreadPoolExecutor
import time
def get_sleep(name, t):
time.sleep(t)
print(f"{name}睡了{t}秒")
# 创建一个一定容量的线程池
# max_workers=3表示池子里最多可以容纳三个线程
executor = ThreadPoolExecutor(max_workers=3)
# 往池子里添加任务
# 第一个是函数名,注意不要加括号,否则变成调用了
# 然后依次写参数
executor.submit(get_sleep, "satori", 4)
executor.submit(get_sleep, "mashiro", 3)
executor.submit(get_sleep, "miku", 2)
# 注意:submit不会阻塞,submit相当于开启了一个线程
# 然后主线程会立即往下执行
print("i love satori") # 因此这句话会最先被打印出来
# 程序运行结果
'''
i love satori
miku睡了2秒
mashiro睡了3秒
satori睡了4秒
''' |
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# author:love_cat
# 为什么需要线程池
# 1.主线程中可以获取某一个线程的状态或者某一个任务的状态,以及返回值
# 2.当一个线程完成时,主线程能够立即知道
# 3.futures可以让多线程和多进程编码接口一致
# 导入相应的模块
from concurrent.futures import ThreadPoolExecutor
import time
def get_sleep(name, t):
time.sleep(t)
print(f"{name}睡了{t}秒")
# 创建一个一定容量的线程池
# max_workers=3表示池子里最多可以容纳三个线程
executor = ThreadPoolExecutor(max_workers=3)
# 往池子里添加任务
# 第一个是函数名,注意不要加括号,否则变成调用了
# 然后依次写参数
executor.submit(get_sleep, "satori", 4)
executor.submit(get_sleep, "mashiro", 3)
executor.submit(get_sleep, "miku", 2)
# 注意:submit不会阻塞,submit相当于开启了一个线程
# 然后主线程会立即往下执行
print("i love satori") # 因此这句话会最先被打印出来
# 程序运行结果
'''
i love satori
miku睡了2秒
mashiro睡了3秒
satori睡了4秒
'''
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
| #!/usr/bin/env python
# -*- coding:utf-8 -*-
# author:love_cat
from concurrent.futures import ThreadPoolExecutor
import time
def get_sleep(name, t):
time.sleep(t)
print(f"{name}睡了{t}秒")
return f"{name}--{t}秒"
executor = ThreadPoolExecutor(max_workers=3)
# 我们submit函数是具有返回值的,比方说我们赋值为task
# 那么task1,task2,task3可以获取对应线程的执行状态
task1 = executor.submit(get_sleep, "satori", 4)
task2 = executor.submit(get_sleep, "mashiro", 3)
task3 = executor.submit(get_sleep, "miku", 2)
# task.done()表示任务是否完成
print(task1.done())
print(task2.done())
print(task3.done())
# 我们等五秒,因此上面的任务肯定已经全部执行完毕
# 再打印状态
print("-------等待五秒钟-------")
time.sleep(5)
print(task1.done())
print(task2.done())
print(task3.done())
# 当然我们也可以获取任务的返回值
print(task1.result())
print(task2.result())
print(task3.result())
# 程序运行结果
'''
False
False
False
-------等待五秒钟-------
miku睡了2秒
mashiro睡了3秒
satori睡了4秒
True
True
True
satori--4秒
mashiro--3秒
miku--2秒
'''
# 首先主线程在添加完任务之后,会立刻执行task.done(),此时三个任务还没有执行完毕,因此打印三个False
# 打印等待五秒钟
# 主线程等待五秒钟之后,三个任务已经执行完毕,并且会打印各自的内容。
# 执行task.done(),由于此时三个任务执行完毕,因此打印三个True
# 然后通过task.result()会得到任务的返回值 |
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# author:love_cat
from concurrent.futures import ThreadPoolExecutor
import time
def get_sleep(name, t):
time.sleep(t)
print(f"{name}睡了{t}秒")
return f"{name}--{t}秒"
executor = ThreadPoolExecutor(max_workers=3)
# 我们submit函数是具有返回值的,比方说我们赋值为task
# 那么task1,task2,task3可以获取对应线程的执行状态
task1 = executor.submit(get_sleep, "satori", 4)
task2 = executor.submit(get_sleep, "mashiro", 3)
task3 = executor.submit(get_sleep, "miku", 2)
# task.done()表示任务是否完成
print(task1.done())
print(task2.done())
print(task3.done())
# 我们等五秒,因此上面的任务肯定已经全部执行完毕
# 再打印状态
print("-------等待五秒钟-------")
time.sleep(5)
print(task1.done())
print(task2.done())
print(task3.done())
# 当然我们也可以获取任务的返回值
print(task1.result())
print(task2.result())
print(task3.result())
# 程序运行结果
'''
False
False
False
-------等待五秒钟-------
miku睡了2秒
mashiro睡了3秒
satori睡了4秒
True
True
True
satori--4秒
mashiro--3秒
miku--2秒
'''
# 首先主线程在添加完任务之后,会立刻执行task.done(),此时三个任务还没有执行完毕,因此打印三个False
# 打印等待五秒钟
# 主线程等待五秒钟之后,三个任务已经执行完毕,并且会打印各自的内容。
# 执行task.done(),由于此时三个任务执行完毕,因此打印三个True
# 然后通过task.result()会得到任务的返回值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
| #!/usr/bin/env python
# -*- coding:utf-8 -*-
# author:love_cat
# 关于task.result()方法,这个方法是阻塞的
# 只有在获取到函数的返回值之后才会执行,那么此时任务也已经执行完毕
from concurrent.futures import ThreadPoolExecutor
import time
def get_sleep(t):
print(f"{t}")
time.sleep(t)
return f"我睡了{t}秒"
executor = ThreadPoolExecutor(max_workers=3)
task1 = executor.submit(get_sleep, 3)
task2 = executor.submit(get_sleep, 4)
task3 = executor.submit(get_sleep, 1)
task2.result()
print("主线程执行完毕··········")
'''
3
4
1
主线程执行完毕··········
'''
# 可以看到先打印3,4,1,然后等待大概四秒钟,打印"主线程执行完毕··········"
# task.result()方法是会阻塞的,其实也很好理解,task.result()是为了获取任务的返回值,如果任务都还没有执行完,那么当然会卡住 |
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# author:love_cat
# 关于task.result()方法,这个方法是阻塞的
# 只有在获取到函数的返回值之后才会执行,那么此时任务也已经执行完毕
from concurrent.futures import ThreadPoolExecutor
import time
def get_sleep(t):
print(f"{t}")
time.sleep(t)
return f"我睡了{t}秒"
executor = ThreadPoolExecutor(max_workers=3)
task1 = executor.submit(get_sleep, 3)
task2 = executor.submit(get_sleep, 4)
task3 = executor.submit(get_sleep, 1)
task2.result()
print("主线程执行完毕··········")
'''
3
4
1
主线程执行完毕··········
'''
# 可以看到先打印3,4,1,然后等待大概四秒钟,打印"主线程执行完毕··········"
# task.result()方法是会阻塞的,其实也很好理解,task.result()是为了获取任务的返回值,如果任务都还没有执行完,那么当然会卡住
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
| #!/usr/bin/env python
# -*- coding:utf-8 -*-
# author:love_cat
# 一般我们要确保所有的任务都执行完毕,才选择让主线程往下走
from concurrent.futures import ThreadPoolExecutor
import time
def get_sleep(t):
print(f"{t}")
time.sleep(t)
return f"我睡了{t}秒"
executor = ThreadPoolExecutor(max_workers=3)
task1 = executor.submit(get_sleep, 3)
task2 = executor.submit(get_sleep, 4)
task3 = executor.submit(get_sleep, 1)
# 将所有的task添加到一个列表中
all_task = [task1, task2, task3]
for task in all_task:
print(task.result())
print("主线程执行完毕···")
'''
4
我睡了3秒
我睡了4秒
我睡了1秒
主线程执行完毕···
'''
# 首先打印3,4,1很好理解,但是为什么先是"我睡了3秒",难道不应该是"我睡了1秒"吗?
# 关于task.result()的返回顺序问题,是按照添加任务的顺序返回的
# 先执行的是task1.result(),所以必须要先等待三秒
# 再执行task2.result(),由于已经过了三秒,因此再等待一秒
# 最后执行task3.result(),此时task3早已经执行完毕,然后直接打印"我睡了1秒"
# 因此task.result()的返回顺序,是由任务的添加顺序决定的 |
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# author:love_cat
# 一般我们要确保所有的任务都执行完毕,才选择让主线程往下走
from concurrent.futures import ThreadPoolExecutor
import time
def get_sleep(t):
print(f"{t}")
time.sleep(t)
return f"我睡了{t}秒"
executor = ThreadPoolExecutor(max_workers=3)
task1 = executor.submit(get_sleep, 3)
task2 = executor.submit(get_sleep, 4)
task3 = executor.submit(get_sleep, 1)
# 将所有的task添加到一个列表中
all_task = [task1, task2, task3]
for task in all_task:
print(task.result())
print("主线程执行完毕···")
'''
4
我睡了3秒
我睡了4秒
我睡了1秒
主线程执行完毕···
'''
# 首先打印3,4,1很好理解,但是为什么先是"我睡了3秒",难道不应该是"我睡了1秒"吗?
# 关于task.result()的返回顺序问题,是按照添加任务的顺序返回的
# 先执行的是task1.result(),所以必须要先等待三秒
# 再执行task2.result(),由于已经过了三秒,因此再等待一秒
# 最后执行task3.result(),此时task3早已经执行完毕,然后直接打印"我睡了1秒"
# 因此task.result()的返回顺序,是由任务的添加顺序决定的
1
2
3
4
5
6
7
8
9
10
11
12
| # 比如,我换一种方式添加
all_task = [task3, task2, task1]
for task in all_task:
print(task.result())
'''
我睡了1秒
我睡了4秒
我睡了3秒
'''
# 返回结果也验证了我们上述的判断 |
# 比如,我换一种方式添加
all_task = [task3, task2, task1]
for task in all_task:
print(task.result())
'''
我睡了1秒
我睡了4秒
我睡了3秒
'''
# 返回结果也验证了我们上述的判断
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
| #!/usr/bin/env python
# -*- coding:utf-8 -*-
# author:love_cat
# 如何取消一个任务,注意任务一旦开始执行那么便不能被取消了
from concurrent.futures import ThreadPoolExecutor
import time
def get_sleep(t):
print(f"{t}")
time.sleep(t)
return f"我睡了{t}秒"
executor = ThreadPoolExecutor(max_workers=3)
task1 = executor.submit(get_sleep, 3)
task2 = executor.submit(get_sleep, 4)
task3 = executor.submit(get_sleep, 1)
# task.cancel()表示取消一个函数,返回值是一个布尔类型。
# 通过True或者False,来判断是否取消成功
print(task3.cancel())
'''
4
False
'''
# 可以看到返回结果为False,因为程序已经执行了 |
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# author:love_cat
# 如何取消一个任务,注意任务一旦开始执行那么便不能被取消了
from concurrent.futures import ThreadPoolExecutor
import time
def get_sleep(t):
print(f"{t}")
time.sleep(t)
return f"我睡了{t}秒"
executor = ThreadPoolExecutor(max_workers=3)
task1 = executor.submit(get_sleep, 3)
task2 = executor.submit(get_sleep, 4)
task3 = executor.submit(get_sleep, 1)
# task.cancel()表示取消一个函数,返回值是一个布尔类型。
# 通过True或者False,来判断是否取消成功
print(task3.cancel())
'''
4
False
'''
# 可以看到返回结果为False,因为程序已经执行了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
| #!/usr/bin/env python
# -*- coding:utf-8 -*-
# author:love_cat
# 取消任务,只有任务在还没有执行的时候
from concurrent.futures import ThreadPoolExecutor
import time
def get_sleep(t):
print(f"{t}")
time.sleep(t)
return f"我睡了{t}秒"
# 这次我们只添加两个任务,首先task3肯定被添加到了任务队列里
# 但由于最大工作数是2,因此暂时是不会执行的,只有当某个其他的任务执行完毕,才会被执行
# 那么此时就可以取消了,因为task3还没有被执行
executor = ThreadPoolExecutor(max_workers=2)
task1 = executor.submit(get_sleep, 3)
task2 = executor.submit(get_sleep, 4)
task3 = executor.submit(get_sleep, 1)
print(task3.cancel())
'''
4
True
'''
# 可以看到返回结果True,说明取消成功了,而且task3的任务也没有执行 |
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# author:love_cat
# 取消任务,只有任务在还没有执行的时候
from concurrent.futures import ThreadPoolExecutor
import time
def get_sleep(t):
print(f"{t}")
time.sleep(t)
return f"我睡了{t}秒"
# 这次我们只添加两个任务,首先task3肯定被添加到了任务队列里
# 但由于最大工作数是2,因此暂时是不会执行的,只有当某个其他的任务执行完毕,才会被执行
# 那么此时就可以取消了,因为task3还没有被执行
executor = ThreadPoolExecutor(max_workers=2)
task1 = executor.submit(get_sleep, 3)
task2 = executor.submit(get_sleep, 4)
task3 = executor.submit(get_sleep, 1)
print(task3.cancel())
'''
4
True
'''
# 可以看到返回结果True,说明取消成功了,而且task3的任务也没有执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
| #!/usr/bin/env python
# -*- coding:utf-8 -*-
# author:love_cat
# 如何获取已完成的任务的返回值
# 这里使用as_complete函数
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
def get_sleep(name, t):
time.sleep(t)
return f"{name}睡了{t}秒"
executor = ThreadPoolExecutor(max_workers=3)
task1 = executor.submit(get_sleep, "mashiro", 3)
task2 = executor.submit(get_sleep, "satori", 4)
task3 = executor.submit(get_sleep, "miku", 1)
all_task = [task1, task2, task3]
# 会异步检测有哪些任务完成
# 既然这样的话,那么我们可以推测,返回值应该和添加顺序无关,哪个先完成,哪个就先被返回
for task in as_completed(all_task):
print(task.result())
'''
miku睡了1秒
mashiro睡了3秒
satori睡了4秒
'''
# 因此我们分析的是正确的,as_complete函数会不断地检测有哪些任务完成
# 那么as_complete是如何做到这一点的呢?
# 我们可以看一下源码
# as_complete源码
'''
def as_completed(fs, timeout=None):
"""An iterator over the given futures that yields each as it completes.
Args:
fs: The sequence of Futures (possibly created by different Executors) to
iterate over.
timeout: The maximum number of seconds to wait. If None, then there
is no limit on the wait time.
Returns:
An iterator that yields the given Futures as they complete (finished or
cancelled). If any given Futures are duplicated, they will be returned
once.
Raises:
TimeoutError: If the entire result iterator could not be generated
before the given timeout.
"""
if timeout is not None:
end_time = timeout + time.time()
fs = set(fs)
total_futures = len(fs)
with _AcquireFutures(fs):
finished = set(
f for f in fs
if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
pending = fs - finished
waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
finished = list(finished)
try:
yield from _yield_finished_futures(finished, waiter,
ref_collect=(fs,))
while pending:
if timeout is None:
wait_timeout = None
else:
wait_timeout = end_time - time.time()
if wait_timeout < 0:
raise TimeoutError(
'%d (of %d) futures unfinished' % (
len(pending), total_futures))
waiter.event.wait(wait_timeout)
with waiter.lock:
finished = waiter.finished_futures
waiter.finished_futures = []
waiter.event.clear()
# reverse to keep finishing order
finished.reverse()
yield from _yield_finished_futures(finished, waiter,
ref_collect=(fs, pending))
finally:
# Remove waiter from unfinished futures
for f in fs:
with f._condition:
f._waiters.remove(waiter)
'''
# 可以看到函数当中出现了yield from,说明这个as_complete是一个生成器
# yield from 出现了两次
# 第一次:有可能主线程当中遇到了阻塞,在执行到as_complete函数的时候,已经有一部分任务执行完毕,所以第一个yield from将那些已经结束的任务yield出去
# 第二次:然后对于那些没有完成的任务,就不断地循环检测它们的状态,如果有完成的任务,那么继续yield出去
# 因此异步的特点就在于此,当任务完成之后,我们主线程会立刻感受到,从而获取已经完成的任务的返回值 |
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# author:love_cat
# 如何获取已完成的任务的返回值
# 这里使用as_complete函数
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
def get_sleep(name, t):
time.sleep(t)
return f"{name}睡了{t}秒"
executor = ThreadPoolExecutor(max_workers=3)
task1 = executor.submit(get_sleep, "mashiro", 3)
task2 = executor.submit(get_sleep, "satori", 4)
task3 = executor.submit(get_sleep, "miku", 1)
all_task = [task1, task2, task3]
# 会异步检测有哪些任务完成
# 既然这样的话,那么我们可以推测,返回值应该和添加顺序无关,哪个先完成,哪个就先被返回
for task in as_completed(all_task):
print(task.result())
'''
miku睡了1秒
mashiro睡了3秒
satori睡了4秒
'''
# 因此我们分析的是正确的,as_complete函数会不断地检测有哪些任务完成
# 那么as_complete是如何做到这一点的呢?
# 我们可以看一下源码
# as_complete源码
'''
def as_completed(fs, timeout=None):
"""An iterator over the given futures that yields each as it completes.
Args:
fs: The sequence of Futures (possibly created by different Executors) to
iterate over.
timeout: The maximum number of seconds to wait. If None, then there
is no limit on the wait time.
Returns:
An iterator that yields the given Futures as they complete (finished or
cancelled). If any given Futures are duplicated, they will be returned
once.
Raises:
TimeoutError: If the entire result iterator could not be generated
before the given timeout.
"""
if timeout is not None:
end_time = timeout + time.time()
fs = set(fs)
total_futures = len(fs)
with _AcquireFutures(fs):
finished = set(
f for f in fs
if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
pending = fs - finished
waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
finished = list(finished)
try:
yield from _yield_finished_futures(finished, waiter,
ref_collect=(fs,))
while pending:
if timeout is None:
wait_timeout = None
else:
wait_timeout = end_time - time.time()
if wait_timeout < 0:
raise TimeoutError(
'%d (of %d) futures unfinished' % (
len(pending), total_futures))
waiter.event.wait(wait_timeout)
with waiter.lock:
finished = waiter.finished_futures
waiter.finished_futures = []
waiter.event.clear()
# reverse to keep finishing order
finished.reverse()
yield from _yield_finished_futures(finished, waiter,
ref_collect=(fs, pending))
finally:
# Remove waiter from unfinished futures
for f in fs:
with f._condition:
f._waiters.remove(waiter)
'''
# 可以看到函数当中出现了yield from,说明这个as_complete是一个生成器
# yield from 出现了两次
# 第一次:有可能主线程当中遇到了阻塞,在执行到as_complete函数的时候,已经有一部分任务执行完毕,所以第一个yield from将那些已经结束的任务yield出去
# 第二次:然后对于那些没有完成的任务,就不断地循环检测它们的状态,如果有完成的任务,那么继续yield出去
# 因此异步的特点就在于此,当任务完成之后,我们主线程会立刻感受到,从而获取已经完成的任务的返回值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
| #!/usr/bin/env python
# -*- coding:utf-8 -*-
# author:love_cat
# 我们也可以通过executor获取返回结果
# 可以用到executor下的map方法
from concurrent.futures import ThreadPoolExecutor
import time
def get_sleep(t):
time.sleep(t)
return f"睡了{t}秒"
executor = ThreadPoolExecutor(max_workers=3)
# 首先这里的map和python内置的map是比较类似的
# map里面传入函数,后面是任务的参数,作为一个可迭代对象
# 这样就可以自动执行了,不需要submit,map函数是位于executor下的一个方法
for data in executor.map(get_sleep, [3, 4, 1]):
print(data)
# 程序运行结果
'''
睡了3秒
睡了4秒
睡了1秒
'''
# 可以看到返回值的顺序和你添加任务的顺序保持一致
# map函数不像as_complete一样,具有异步检测机制,可以先返回已经完成的任务
# 并且也不用再使用result()方法了,返回的直接就是任务的返回值
# 至于原因我们也可以看一下源码
'''
def map(self, fn, *iterables, timeout=None, chunksize=1):
"""Returns an iterator equivalent to map(fn, iter).
Args:
fn: A callable that will take as many arguments as there are
passed iterables.
timeout: The maximum number of seconds to wait. If None, then there
is no limit on the wait time.
chunksize: The size of the chunks the iterable will be broken into
before being passed to a child process. This argument is only
used by ProcessPoolExecutor; it is ignored by
ThreadPoolExecutor.
Returns:
An iterator equivalent to: map(func, *iterables) but the calls may
be evaluated out-of-order.
Raises:
TimeoutError: If the entire result iterator could not be generated
before the given timeout.
Exception: If fn(*args) raises for any values.
"""
if timeout is not None:
end_time = timeout + time.time()
fs = [self.submit(fn, *args) for args in zip(*iterables)]
# Yield must be hidden in closure so that the futures are submitted
# before the first iterator value is required.
def result_iterator():
try:
# reverse to keep finishing order
fs.reverse()
while fs:
# Careful not to keep a reference to the popped future
if timeout is None:
yield fs.pop().result()
else:
yield fs.pop().result(end_time - time.time())
finally:
for future in fs:
future.cancel()
return result_iterator()
'''
# 可以到在yield的时候,并没有yield出来task,而是直接将task.result()给yield出来了
# 因此循环得到的就是task的返回值
'''
fs = [self.submit(fn, *args) for args in zip(*iterables)]
'''
# 而且从这句源码我们也可以看到,当调用map的时候,本质上调用的还是executor.submit,因为self就相当于我们这里的executor |
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# author:love_cat
# 我们也可以通过executor获取返回结果
# 可以用到executor下的map方法
from concurrent.futures import ThreadPoolExecutor
import time
def get_sleep(t):
time.sleep(t)
return f"睡了{t}秒"
executor = ThreadPoolExecutor(max_workers=3)
# 首先这里的map和python内置的map是比较类似的
# map里面传入函数,后面是任务的参数,作为一个可迭代对象
# 这样就可以自动执行了,不需要submit,map函数是位于executor下的一个方法
for data in executor.map(get_sleep, [3, 4, 1]):
print(data)
# 程序运行结果
'''
睡了3秒
睡了4秒
睡了1秒
'''
# 可以看到返回值的顺序和你添加任务的顺序保持一致
# map函数不像as_complete一样,具有异步检测机制,可以先返回已经完成的任务
# 并且也不用再使用result()方法了,返回的直接就是任务的返回值
# 至于原因我们也可以看一下源码
'''
def map(self, fn, *iterables, timeout=None, chunksize=1):
"""Returns an iterator equivalent to map(fn, iter).
Args:
fn: A callable that will take as many arguments as there are
passed iterables.
timeout: The maximum number of seconds to wait. If None, then there
is no limit on the wait time.
chunksize: The size of the chunks the iterable will be broken into
before being passed to a child process. This argument is only
used by ProcessPoolExecutor; it is ignored by
ThreadPoolExecutor.
Returns:
An iterator equivalent to: map(func, *iterables) but the calls may
be evaluated out-of-order.
Raises:
TimeoutError: If the entire result iterator could not be generated
before the given timeout.
Exception: If fn(*args) raises for any values.
"""
if timeout is not None:
end_time = timeout + time.time()
fs = [self.submit(fn, *args) for args in zip(*iterables)]
# Yield must be hidden in closure so that the futures are submitted
# before the first iterator value is required.
def result_iterator():
try:
# reverse to keep finishing order
fs.reverse()
while fs:
# Careful not to keep a reference to the popped future
if timeout is None:
yield fs.pop().result()
else:
yield fs.pop().result(end_time - time.time())
finally:
for future in fs:
future.cancel()
return result_iterator()
'''
# 可以到在yield的时候,并没有yield出来task,而是直接将task.result()给yield出来了
# 因此循环得到的就是task的返回值
'''
fs = [self.submit(fn, *args) for args in zip(*iterables)]
'''
# 而且从这句源码我们也可以看到,当调用map的时候,本质上调用的还是executor.submit,因为self就相当于我们这里的executor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
| #!/usr/bin/env python
# -*- coding:utf-8 -*-
# author:love_cat
# 关于map方法,再补充一点
# 有个地方,个人觉得非常恶心(只是本人这样觉得)
from concurrent.futures import ThreadPoolExecutor
import time
# 之前的例子是一个参数,那么如果改成两个,map函数那里如何传参呢?
def get_sleep(name, t):
time.sleep(t)
return f"{name}睡了{t}秒"
executor = ThreadPoolExecutor(max_workers=3)
'''
def map(self, fn, *iterables, timeout=None, chunksize=1):
'''
# 通过源码,得到map函数定义如下。可以看到接收很多参数,但参数都是可迭代的类型
# 于是我们想到了 executor.map(get_sleep,["mashiro",3],["satori",4],["miku",1])
# 但是这样的话就完了。我们可以看看源码对我们传入的参数是怎么处理的
'''
fs = [self.submit(fn, *args) for args in zip(*iterables)]
'''
# 可以看到源码干了这么一件事
# 如果我们像之前那样传参的话,举个栗子
'''
def mmp(*iterable):
for args in zip(*iterable):
print(*args)
mmp(["mashiro", 3], ["satori", 4], ["miku", 1])
'mashiro', 'satori', 'miku'
3, 4, 1
'''
# 那么此时self.submit(fn,*args) ===>self.submit(fn, "mashiro", "satori", "miku")
# 这与我们传参完全搞反了
# 因此,当我们要传入多个参数的是,应该这样传,executor.map(get_sleep,["mashiro","satori","miku"],[3,4,1])
for data in executor.map(get_sleep, ["mashiro", "satori", "miku"], [3, 4, 1]):
print(data)
# 程序运行结果
'''
mashiro睡了3秒
satori睡了4秒
miku睡了1秒
'''
# 这样程序便可正确执行
# 关于传参的方式,我个人的话可能会这么设计
'''
def mmp(*iterable):
for args in iterable:
print(*args)
mmp(["mashiro", 3], ["satori", 4], ["miku", 1])
输出结果:
mashiro 3
satori 4
miku 1
''' |
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# author:love_cat
# 关于map方法,再补充一点
# 有个地方,个人觉得非常恶心(只是本人这样觉得)
from concurrent.futures import ThreadPoolExecutor
import time
# 之前的例子是一个参数,那么如果改成两个,map函数那里如何传参呢?
def get_sleep(name, t):
time.sleep(t)
return f"{name}睡了{t}秒"
executor = ThreadPoolExecutor(max_workers=3)
'''
def map(self, fn, *iterables, timeout=None, chunksize=1):
'''
# 通过源码,得到map函数定义如下。可以看到接收很多参数,但参数都是可迭代的类型
# 于是我们想到了 executor.map(get_sleep,["mashiro",3],["satori",4],["miku",1])
# 但是这样的话就完了。我们可以看看源码对我们传入的参数是怎么处理的
'''
fs = [self.submit(fn, *args) for args in zip(*iterables)]
'''
# 可以看到源码干了这么一件事
# 如果我们像之前那样传参的话,举个栗子
'''
def mmp(*iterable):
for args in zip(*iterable):
print(*args)
mmp(["mashiro", 3], ["satori", 4], ["miku", 1])
'mashiro', 'satori', 'miku'
3, 4, 1
'''
# 那么此时self.submit(fn,*args) ===>self.submit(fn, "mashiro", "satori", "miku")
# 这与我们传参完全搞反了
# 因此,当我们要传入多个参数的是,应该这样传,executor.map(get_sleep,["mashiro","satori","miku"],[3,4,1])
for data in executor.map(get_sleep, ["mashiro", "satori", "miku"], [3, 4, 1]):
print(data)
# 程序运行结果
'''
mashiro睡了3秒
satori睡了4秒
miku睡了1秒
'''
# 这样程序便可正确执行
# 关于传参的方式,我个人的话可能会这么设计
'''
def mmp(*iterable):
for args in iterable:
print(*args)
mmp(["mashiro", 3], ["satori", 4], ["miku", 1])
输出结果:
mashiro 3
satori 4
miku 1
'''
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
| #!/usr/bin/env python
# -*- coding:utf-8 -*-
# author:love_cat
# 让主线程等待
# 可以使用wait方法
from concurrent.futures import ThreadPoolExecutor, wait
import time
def get_sleep(name, t):
print(f"{name}睡完{t}秒了,主线程你走了吗?")
time.sleep(t)
return f"{name}睡了{t}秒"
executor = ThreadPoolExecutor(max_workers=3)
task1 = executor.submit(get_sleep, "mashiro", 3)
task2 = executor.submit(get_sleep, "satori", 4)
task3 = executor.submit(get_sleep, "miku", 1)
all_task = [task1, task2, task3]
# wait(all_task),会使主线程卡在这里,只有等待所有任务完成才会往下走
wait(all_task) # 里面有个return_when参数,默认是ALL_COMPLETE,会等待所有任务完成。也可以指定FIRST_COMPLETE,等待第一个任务完成就往下走
print("你们都睡完了,我才能往下走···")
# 程序运行结果
'''
mashiro睡完3秒了,主线程你走了吗?
satori睡完4秒了,主线程你走了吗?
miku睡完1秒了,主线程你走了吗?
你们都睡完了,我才能往下走···
''' |
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# author:love_cat
# 让主线程等待
# 可以使用wait方法
from concurrent.futures import ThreadPoolExecutor, wait
import time
def get_sleep(name, t):
print(f"{name}睡完{t}秒了,主线程你走了吗?")
time.sleep(t)
return f"{name}睡了{t}秒"
executor = ThreadPoolExecutor(max_workers=3)
task1 = executor.submit(get_sleep, "mashiro", 3)
task2 = executor.submit(get_sleep, "satori", 4)
task3 = executor.submit(get_sleep, "miku", 1)
all_task = [task1, task2, task3]
# wait(all_task),会使主线程卡在这里,只有等待所有任务完成才会往下走
wait(all_task) # 里面有个return_when参数,默认是ALL_COMPLETE,会等待所有任务完成。也可以指定FIRST_COMPLETE,等待第一个任务完成就往下走
print("你们都睡完了,我才能往下走···")
# 程序运行结果
'''
mashiro睡完3秒了,主线程你走了吗?
satori睡完4秒了,主线程你走了吗?
miku睡完1秒了,主线程你走了吗?
你们都睡完了,我才能往下走···
'''
文章来源:https://www.cnblogs.com/traditional/p/9221269.html
布施恩德可便相知重
微信扫一扫打赏
支付宝扫一扫打赏