Process Pools and Thread Pools

I had never quite understood how process and thread pools actually work; I recently came across this article and now have a better grasp of them.

Thread pools:

import time
from concurrent.futures import ThreadPoolExecutor

# Thread pool

def return_future_result(message):
    time.sleep(2)
    return message

pool = ThreadPoolExecutor(max_workers=2)  # create a thread pool with at most two workers
future1 = pool.submit(return_future_result, 'hello')  # submit a task to the pool
future2 = pool.submit(return_future_result, 'world')  # submit a task to the pool
print(future1.done())  # check whether task1 has finished [still sleeping, so False]
time.sleep(3)          # main thread sleeps 3 s so that task2 can finish
print(future2.done())  # check whether task2 has finished
# fetch the results
print(future1.result())
print(future2.result())

Output:

False
True
hello
world

Inspecting the running program (Pools.py) with `ps`, you can see one process with three threads:
UID PID PPID LWP C NLWP STIME TTY TIME CMD

Process pools:
import time
from concurrent.futures import ProcessPoolExecutor

def return_future_result(message):
    time.sleep(2)
    return message

pool = ProcessPoolExecutor(max_workers=2)  # create a process pool with at most two workers
future1 = pool.submit(return_future_result, 'hello')  # submit a task to the pool
future2 = pool.submit(return_future_result, 'world')  # submit a task to the pool
print(future1.done())  # check whether task1 has finished
time.sleep(3)
print(future2.done())  # check whether task2 has finished
# fetch the results
print(future1.result())
print(future2.result())

Output:

False
True
hello
world

Submitting tasks:

import concurrent.futures
import urllib.request

URLS = ['http://httpbin.org', 'http://example.com/', 'http://api.github.com/']

def load_url(url):
    with urllib.request.urlopen(url, timeout=60) as conn:
        return conn.read()

# Plain submit()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url): url for url in URLS}
    print(future_to_url)  # print the current state of each task
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        # print(future_to_url)  # print the current state of each task
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))


Here we submit 3 tasks via submit() with max_workers=3, so once all three are submitted they are all in the running state; the same holds if max_workers is greater than 3. But if max_workers is smaller than the number of submitted tasks, some tasks will sit in the pending state.
For example, with max_workers=2, you can see the last task stays pending.

Using map:

[It accomplishes the same task as above.]


import concurrent.futures
import urllib.request

URLS = ['http://httpbin.org', 'http://example.com/', 'http://api.github.com/']

def load_url(url):
    with urllib.request.urlopen(url, timeout=60) as conn:
        return conn.read()

# Map
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    for url, data in zip(URLS, executor.map(load_url, URLS)):
        print('%r page is %d bytes' % (url, len(data)))

Using wait:
# wait
import time
from random import randint
from time import sleep
from concurrent.futures import ThreadPoolExecutor, wait

def return_after_random_secs(num):
    t = randint(1, 5)
    print('I will sleep:', str(t), 's')
    sleep(t)
    return 'Return of {}'.format(num)

if __name__ == '__main__':
    s = time.time()
    pool = ThreadPoolExecutor(5)
    futures = []
    for x in range(5):
        futures.append(pool.submit(return_after_random_secs, x))

    print(wait(futures))  # block until all tasks finish
    # print(wait(futures, timeout=None, return_when='FIRST_COMPLETED'))
    e = time.time()

    print('total time-->', e - s)

As you can see, the program stays blocked while the worker threads are still running, and only exits once all tasks have completed.
If you change:

print(wait(futures))

to:

print(wait(futures, timeout=None, return_when='FIRST_COMPLETED'))

then the program finishes as soon as the first thread completes.

击蒙御寇