Python的多线程(threading) 与多进程(multiprocessing)

Python的多线程实际上并不能真正利用多核,所以如果使用多线程实际上还是在一个核上做并发处理。

不过,如果使用多进程就可以真正利用多核,因为各进程之间是相互独立的,不共享资源,可以在不同的核上执行不同的进程,达到并行的效果。

multiprocessing模块[^1]

multiprocessing是Python中的多进程管理包。它与 threading.Thread类似,可以利用multiprocessing.Process对象来创建一个进程。

该进程可以允许放在Python程序内部编写的函数中。该Process对象与Thread对象的用法相同,拥有is_alive()join([timeout])run()start()terminate()等方法。属性有:authkey、daemon(要通过start()设置)、exitcode(进程在运行时为None、如果为–N,表示被信号N结束)、name、pid。

此外multiprocessing包中也有Lock/Event/Semaphore/Condition类,用来同步进程,其用法也与threading包中的同名类一样。

Process类

1
__init__(self, group=None, target=None, name=None, args=(), kwargs={})
  • group:进程所属组。基本不用
  • target:表示调用对象。
  • args:表示调用对象的位置参数元组。
  • name:别名
  • kwargs:表示调用对象的字典。

例:创建进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#coding=utf-8
import multiprocessing

def do(n) :
#获取当前进程的名字
name = multiprocessing.current_process().name
print name,'starting'
print "worker ", n
return

if __name__ == '__main__' :
numList = []
for i in xrange(5) : # 多个子进程
p = multiprocessing.Process(target=do, args=(i,)) # 创建一个子进程p,目标函数是do,args是参数列表
numList.append(p)
p.start() # 开始执行进程
p.join() # 等待子进程结束
print "Process end."

执行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Process-1 starting
worker 0
Process end.
Process-2 starting
worker 1
Process end.
Process-3 starting
worker 2
Process end.
Process-4 starting
worker 3
Process end.
Process-5 starting
worker 4
Process end.

在Windows上要想使用进程模块,就必须把有关进程的代码写在当前.py文件的if __name__ == __main__ :语句的下面,才能正常使用Windows下的进程模块。Unix/Linux下则不需要。

Pool类

Pool类可以提供指定数量的进程供用户调用,当有新的请求提交到Pool中时,如果池还没有满,就会创建一个新的进程来执行请求。如果池满,请求就会告知先等待,直到池中有进程结束,才会创建新的进程来执行这些请求。

apply()

1
apply(func[, args=()[, kwds={}]])

传递不定参数,主进程会被阻塞知道函数执行完毕。

apply_async()

1
apply_async(func[, args=()[, kwds={}[, callback=None]]])

类似apply(),但为非阻塞且支持结果返回回调。

map()

1
map(func, iterable[, chunksize=None])

使进程阻塞直到返回结果。 

虽然第二个参数是一个迭代器,但在实际使用中,必须在整个队列都就绪后,程序才会运行子进程。

close()

关闭进程池,不再接受新任务。

terminate()

结束工作进程,不再处理数据。

join()

主进程阻塞等待子进程退出,必须在close或terminate后使用。

例1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import time
from multiprocessing import Pool
def run(fn):
#fn: 函数参数是数据列表的一个元素
time.sleep(1)
return fn*fn

if __name__ == "__main__":
testFL = [1,2,3,4,5,6]
print 'shunxu:' #顺序执行(也就是串行执行,单进程)
s = time.time()
for fn in testFL:
run(fn)

e1 = time.time()
print "顺序执行时间:", int(e1 - s)

print 'concurrent:' #创建多个进程,并行执行
pool = Pool(5) #创建拥有5个进程数量的进程池
#testFL:要处理的数据列表,run:处理testFL列表中数据的函数
rl =pool.map(run, testFL)
pool.close()#关闭进程池,不再接受新的进程
pool.join()#主进程阻塞等待子进程的退出
e2 = time.time()
print "并行执行时间:", int(e2-e1)
print rl

执行结果

1
2
3
4
5
shunxu:
顺序执行时间: 6
concurrent:
并行执行时间: 2
[1, 4, 9, 16, 25, 36]

例2:Pool.map()多参数任务[^2]

1
2
3
4
5
6
7
8
9
10
def job(x ,y):
return x * y

def job1(z):
return job(z[0], z[1])

if __name__ == "__main__":
pool = multiprocessing.Pool()
res = pool.map(job1, [(2, 3), (3, 4)])
print res

实例

例1:文件字符数处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import os
import time
from multiprocessing import Pool

def getFile(path) :
#获取目录下的文件list
fileList = []
for root, dirs, files in list(os.walk(path)) :
for i in files :
if i.endswith('.txt') or i.endswith('.10w') :
fileList.append(root + "\\" + i)
return fileList

def operFile(filePath) :
#统计每个文件中行数和字符数,并返回
filePath = filePath
fp = open(filePath)
content = fp.readlines()
fp.close()
lines = len(content)
alphaNum = 0
for i in content :
alphaNum += len(i.strip('\n'))
return lines,alphaNum,filePath

def out(list1, writeFilePath) :
#将统计结果写入结果文件中
fileLines = 0
charNum = 0
fp = open(writeFilePath,'a')
for i in list1 :
fp.write(i[2] + " 行数:"+ str(i[0]) + " 字符数:"+str(i[1]) + "\n")
fileLines += i[0]
charNum += i[1]
fp.close()
print fileLines, charNum

if __name__ == "__main__":
#创建多个进程去统计目录中所有文件的行数和字符数
startTime = time.time()
filePath = "C:\\wcx\\a"
fileList = getFile(filePath)
pool = Pool(5)
resultList =pool.map(operFile, fileList)
pool.close()
pool.join()

writeFilePath = "c:\\wcx\\res.txt"
print resultList
out(resultList, writeFilePath)
endTime = time.time()
print "used time is ", endTime - startTime

例2:DnCnn-keras-master

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def gen_patches(file_name):

# read image
img = cv2.imread(file_name, 0) # gray scale
h, w = img.shape
scales = [1, 0.9, 0.8, 0.7]
patches = []

for s in scales:
h_scaled, w_scaled = int(h*s),int(w*s)
img_scaled = cv2.resize(img, (h_scaled,w_scaled), interpolation=cv2.INTER_CUBIC)
# extract patches
for i in range(0, h_scaled-patch_size+1, stride):
for j in range(0, w_scaled-patch_size+1, stride):
x = img_scaled[i:i+patch_size, j:j+patch_size]
# data aug
for k in range(0, aug_times):
#x_aug = data_aug(x, mode=np.random.randint(0,8))
x_aug = data_aug(x, mode=0)
patches.append(x_aug)
return patches

if __name__ == '__main__':
# parameters
src_dir = './data/Train400/'
save_dir = './data/npy_data/'
file_list = glob.glob(src_dir+'*.png') # get name list of all .png files
num_threads = 16
print('Start...')
# initrialize
res = []
# generate patches
for i in range(0,len(file_list),num_threads):
# use multi-process to speed up
p = Pool(num_threads)
patch = p.map(gen_patches,file_list[i:min(i+num_threads,len(file_list))])
#patch = p.map(gen_patches,file_list[i:i+num_threads])
for x in patch:
res += x
print('Picture '+str(i)+' to '+str(i+num_threads)+' are finished...')

[^1]: Python 多进程 multiprocessing.Pool类详解
[^2]: 记录python multiprocessing Pool的map和apply_async方法
[^3]: Python 线程(threading) 进程(multiprocessing)