A-A+

多线程,多进程,线程池,进程池学习与实践

2019年03月23日 11:04 汪洋大海 暂无评论 阅读 65 views 次
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# coding:utf-8
"""
    多线程,多进程,线程池,进程池学习与实践
"""
import urllib2
 
def worker(url):
    resp = urllib2.urlopen(url)
    print resp.url
    data = resp.readline()
    resp.close()
    return data
 
def threadBase():
    '创建线程'
    import threading
    # 定义线程列表, 参数详细见 Thread 的 __init__ 方法
    # 注意: args=(arg,): 加逗号表示参数只有arg一个; 不加逗号 如果arg可拆分则将arg拆成数组作为一堆参数传入(如string会被拆成一堆char,当成len个参数传入)
    threads = [threading.Thread(target=worker,args=(arg,)) for arg in ["http://www.zhihu.com","http://www.baidu.com"]]
    for thread in threads:
        # 启动线程
        thread.start()
    for thread in threads:
        # t.join()方法阻塞调用此方法的线程(calling thread), 直到线程t完. 此线程再继续; 通常用于在main()主线程内, 等待其它线程完成再结束main()主线程
        thread.join()
 
def threadPoolDemo():
    '线程池示例'
    import threadpool
    # 创建线程数
    pool = threadpool.ThreadPool(2)
    # 添加任务, 参数为: 开启多线程的函数, 相关参数, 回调函数
    requests = threadpool.makeRequests(worker,["https://www.zhihu.com","http://www.baidu.com"])
    # 将任务放到线程池去执行
    [pool.putRequest(req) for req in requests]
    # 主线程等待所有线程完成任务 
    pool.wait()
 
def multiProcess():
    '多进程示例'
    import multiprocessing
    # 获取CPU计数
    cpu_cnt = multiprocessing.cpu_count()
    results = []
    # 初始化进程池
    pool = multiprocessing.Pool(cpu_cnt)
    for url in ["http://www.zhihu.com","http://www.baidu.com"]:
        # 添加任务
        # apply: 顺序执行子进程(主进程等待), 可以直接获取执行结果而不需要等待. (内部实现: 调用 apply_async().get())
        # apply_async: 异步执行, 进程执行完毕后(即 join() 后)才可以通过get()方法获取返回值
        result = pool.apply_async(worker,(url,))
        print result
        # 保存进程返回的 结果(class对象,使用 get() 获取)
        results.append(result)
    # 关闭进程池, 即当前线程池不会再放入新的任务, 且一旦所有任务完成, 工作进程将退出
    pool.close()
    # 进程合并. 使用方法类似Java里的join. 不过Python中需要在 join() 前调用 close() 或 terminate()
    pool.join()
    print [results[index].get() for index in range(0,2)]
 
def futuresDemo():
    'concurrent.futures: https://docs.python.org/3/library/concurrent.futures.html'
    import concurrent.futures
    # 线程池
    # with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # 进程池
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        # submit(fn,*args,**kwargs): 将fn方法安排为可执行, 返回Future对象
        results = {executor.submit(worker,url) for url in ["http://www.zhihu.com","http://www.baidu.com"]}
        # as_completed(fs,timeout): 当线程/进程执行完毕时,返回由fs给出的Future实例的迭代器
        for result in concurrent.futures.as_completed(results):
            # result(): 返回进程方法执行的结果. 如果进程没有完成, 如果没有超时就等待, 如果超时则 raise concurrent.futures.TimeoutError.
            print result.result()
 
def futuresDemo_Map():
    'concurrent.futures: map版本'
    import concurrent.futures
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        # map(func, *iterables,chunksize): chunksize 表示初始数据块大小(3.5以后版本支持),ThreadPoolExecutor设置chunksize无效
        # 返回值: 以参数传入的顺序返回执行结果的迭代器, as_completed()是乱序返回的(内部调用的submit执行任务,但是返回时按参数传入顺序排序)
        for result in executor.map(worker,[url for url in ["http://www.zhihu.com","http://www.baidu.com"]]):
            print result
 
if __name__ == "__main__":
    # threadBase()
    # threadPoolDemo()
    multiProcess()
    # futuresDemo()
    # futuresDemo_Map()

文章来源:https://github.com/everywan/note/blob/1d64df574db7b101be487c28c807f69d57883ade/draft/Lib/ThreadAndProcess.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
from concurrent import futures
from chapter_17.demo_17_2 import save_flag, get_flag, show, main
 
'''
concurrent.futures 模块的主要特色是 ThreadPoolExecutor 和 ProcessPoolExecutor 类,
这两个类实现的接口能分别在不同的线程或进程中执行可调用的对象。
这两个类在内部维护着一个工作线程或进程池,以及要执行的任务队列。
不过,这个接口抽象的层级很高,像下载国旗这种简单的案例,无需关心任何实现细节。
示例 17-3 展示如何使用 ThreadPoolExecutor.map 方法,以最简单的方式实现并发下载。
示例 17-3 flags_threadpool.py:使用 futures.ThreadPoolExecutor 类实现多线程下载的脚本
'''
# 设定 ThreadPoolExecutor 类最多使用几个线程
MAX_WORKERS = 20
 
 
# 下载一个图像的函数;这是在各个线程中执行的函数
def download_one(cc):
	image = get_flag(cc)
	show(cc)
	save_flag(image, cc.lower() + '.gif')
	return cc
 
 
def download_many(cc_list):
	# 设定工作的线程数量:使用允许的最大值(MAX_WORKERS)与要处理的数量之间较小的那个值,以免创建多余的线程
	workers = min(MAX_WORKERS, len(cc_list))
	# 使 用 工 作 的 线 程 数 实 例 化 ThreadPoolExecutor 类;
	# executor.__exit__ 方 法 会 调 用executor.shutdown(wait=True) 方法,它会在所有线程都执行完毕前阻塞线程
	with futures.ThreadPoolExecutor(workers) as executor:
		# map 方法的作用与内置的 map 函数类似,不过 download_one 函数会在多个线程中并发调用;
		# map 方法返回一个生成器,因此可以迭代,获取各个函数返回的值
		res = executor.map(download_one, sorted(cc_list))
	# 返回获取的结果数量;如果有线程抛出异常,异常会在这里抛出,这与隐式调用 next() 函数从迭代器中获取相应的返回值一样
	return len(list(res))
 
 
def download_many(cc_list):
	# 这次演示只使用人口最多的 5 个国家
	cc_list = cc_list[:5]
	# 把 max_workers 硬编码为 3,以便在输出中观察待完成的期物
	with futures.ThreadPoolExecutor(max_workers=3) as executor:
		to_do = []
		# 按照字母表顺序迭代国家代码,明确表明输出的顺序与输入一致
		for cc in sorted(cc_list):
			# executor.submit 方法排定可调用对象的执行时间,然后返回一个期物,表示这个待执行的操作
			future = executor.submit(download_one, cc)
			# 存储各个期物,后面传给 as_completed 函数
			to_do.append(future)
			msg = 'Scheduled for {}: {}'
			# 显示一个消息,包含国家代码和对应的期物
			print(msg.format(cc, future))
		results = []
		# as_completed 函数在期物运行结束后产出期物
		for future in futures.as_completed(to_do):
			# 获取该期物的结果
			res = future.result()
			msg = '{} result: {!r}'
			# 显示期物及其结果
			print(msg.format(future, res))
			results.append(res)
		return len(results)
 
 
if __name__ == '__main__':
	main(download_many)

文章来源:https://github.com/linsanityHuang/the_fluent_python/blob/f5a0e91a522c5028364942d4012f644a6663ab96/chapter_17/demo_17_3.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# coding=utf-8
 
"""  concurrent
Python并发之concurrent.futures
Python标准库为我们提供了threading和multiprocessing模块编写相应的多线程/多进程代码。从Python3.2开始,标准库为我们提供了concurrent.futures模块,它提供了ThreadPoolExecutor和ProcessPoolExecutor两个类,实现了对threading和multiprocessing的更高级的抽象,对编写线程池/进程池提供了直接的支持。 concurrent.futures基础模块是executor和future。
Executor
Executor是一个抽象类,它不能被直接使用。它为具体的异步执行定义了一些基本的方法。 ThreadPoolExecutor和ProcessPoolExecutor继承了Executor,分别被用来创建线程池和进程池的代码。
ThreadPoolExecutor对象
ThreadPoolExecutor类是Executor子类,使用线程池执行异步调用。
class concurrent.futures.ThreadPoolExecutor(max_workers)
使用max_workers数目的线程池执行异步调用。
ProcessPoolExecutor对象
ThreadPoolExecutor类是Executor子类,使用进程池执行异步调用.
class concurrent.futures.ProcessPoolExecutor(max_workers=None)
使用max_workers数目的进程池执行异步调用,如果max_workers为None则使用机器的处理器数目(如4核机器max_worker配置为None时,则使用4个进程进行异步并发)。
submit()方法
Executor中定义了submit()方法,这个方法的作用是提交一个可执行的回调task,并返回一个future实例。future对象代表的就是给定的调用。
Executor.submit(fn, *args, **kwargs)
fn:需要异步执行的函数
*args, **kwargs:fn参数
使用示例:
from concurrent import futures
def test(num):
    import time
    return time.ctime(), num
with futures.ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(test, 1)
    print(future.result())
"""
 
"""  map()方法
除了submit,Exectuor还为我们提供了map方法,这个方法返回一个map(func, *iterables)迭代器,迭代器中的回调执行返回的结果有序的。
Executor.map(func, *iterables, timeout=None)
func:需要异步执行的函数
*iterables:可迭代对象,如列表等。每一次func执行,都会从iterables中取参数。
timeout:设置每次异步操作的超时时间,timeout的值可以是int或float,如果操作超时,会返回raisesTimeoutError;如果不指定timeout参数,则不设置超时间。
使用示例:
from concurrent import futures
def test(num):
    import time
    return time.ctime(), num
data = [1, 2, 3]
with futures.ThreadPoolExecutor(max_workers=1) as executor:
    for future in executor.map(test, data):
        print(future)
"""
 
""" shutdown()
shutdown()方法
释放系统资源,在Executor.submit()或 Executor.map()等异步操作后调用。使用with语句可以避免显式调用此方法。
Executor.shutdown(wait=True)
"""
 
""" Future
Future可以理解为一个在未来完成的操作,这是异步编程的基础。通常情况下,我们执行io操作,访问url时(如下)在等待结果返回之前会产生阻塞,cpu不能做其他事情,而Future的引入帮助我们在等待的这段时间可以完成其他的操作。
Future类封装了可调用的异步执行。Future 实例通过 Executor.submit()方法创建。
cancel():试图取消调用。如果调用当前正在执行,并且不能被取消,那么该方法将返回False,否则调用将被取消,方法将返回True。
cancelled():如果成功取消调用,返回True。
running():如果调用当前正在执行并且不能被取消,返回True。
done():如果调用成功地取消或结束了,返回True。
result(timeout=None):返回调用返回的值。如果调用还没有完成,那么这个方法将等待超时秒。如果调用在超时秒内没有完成,那么就会有一个Futures.TimeoutError将报出。timeout可以是一个整形或者浮点型数值,如果timeout不指定或者为None,等待时间无限。如果futures在完成之前被取消了,那么 CancelledError 将会报出。
exception(timeout=None):返回调用抛出的异常,如果调用还未完成,该方法会等待timeout指定的时长,如果该时长后调用还未完成,就会报出超时错误futures.TimeoutError。timeout可以是一个整形或者浮点型数值,如果timeout不指定或者为None,等待时间无限。如果futures在完成之前被取消了,那么 CancelledError 将会报出。如果调用完成并且无异常报出,返回None.
add_done_callback(fn):将可调用fn捆绑到future上,当Future被取消或者结束运行,fn作为future的唯一参数将会被调用。如果future已经运行完成或者取消,fn将会被立即调用。
wait(fs, timeout=None, return_when=ALL_COMPLETED)
等待fs提供的 Future 实例(possibly created by different Executor instances) 运行结束。返回一个命名的2元集合,分表代表已完成的和未完成的
return_when 表明什么时候函数应该返回。它的值必须是一下值之一:
FIRST_COMPLETED :函数在任何future结束或者取消的时候返回。
FIRST_EXCEPTION :函数在任何future因为异常结束的时候返回,如果没有future报错,效果等于
ALL_COMPLETED :函数在所有future结束后才会返回。
as_completed(fs, timeout=None): 参数是一个 Future 实例列表,返回值是一个迭代器,在运行结束后产出 Future实例 。
使用示例:
from concurrent.futures import ThreadPoolExecutor, wait, as_completed
from time import sleep
from random import randint
def return_after_5_secs(num):
    sleep(randint(1, 5))
    return "Return of {}".format(num)
pool = ThreadPoolExecutor(5)
futures = []
for x in range(5):
    futures.append(pool.submit(return_after_5_secs, x))
print(1)
for x in as_completed(futures):
    print(x.result())
print(2)

文章来源:https://github.com/xiang12835/python-learning/blob/227e99734d8a13bd2c9dc86f398b4415c3b9f9b6/process_and_thread/concurrent.py

布施恩德可便相知重

微信扫一扫打赏

支付宝扫一扫打赏

×
标签:

给我留言