Python 多进程编程

本章内容主要讲解一下 Python 脚本任务处理中常用的多进程编程,这里主要使用的是 python 内置的 mutilprocessing 模块。

效果展示

这里我们首先利用一个例子来让我们看到多进程的作用。我们设置了一个 worker 方法,它负责每隔 1s 输出一次 success(实际应用中是一个处理或者多个处理过程)!假设我们这时候需要执行这个操作两次或者更多次,为了充分利用系统资源,减少时间开销,就需要我们使用多线程或者多进程来进行处理。

from multiprocessing import Process
import time

def worker(num):
for i in range(3):
print(f"worker_{num}:success_{i}")
time.sleep(1)

if __name__ == "__main__":
start = time.time()
# 正常流程
# worker(1)
# worker(2)
# 多进程流程
worker_1 = Process(target=worker, args=[1])
worker_2 = Process(target=worker, args=[2])
worker_1.start()
worker_2.start()
worker_1.join()
worker_2.join()

end = time.time()
print(end-start)

1

根据输出结果我们能看到,当使用多进程时,整个流程处理完所花时间为 3.39s,而不使用时,整个流程所花时间为 6.06s效率几乎提升了 1 倍,当然这个结果因程序而异。

Process 类介绍

Process 是我们使用 python 时负责进程创建、运行的一个类,其主要属性及方法如下:

属性名或方法名 功能
target(func) 指定需要实例化为进程的函数;
name(Str) 可以指定进程名;也可调用属性获取进程名;
daemon(Bool) 是否使用 daemon,类似于守护进程;
pid(Int) 可调用属性获取进程名;
args 为进程函数提供的参数;
start() **可体现多进程。**进程启动,等待 CPU 的切片时间进行调用;
run() **未体现多进程。**是一个进程体,进程运行,直到进程运行完毕;
join() 进程合并,父进程要在调用 join 的子进程结束后才能结束。在多进程执行过程,其他进程必须等到调用 join() 方法的进程执行完毕(或者执行规定的 timeout 时间)后,才能继续执行;
is_alive() 判断该进程是否存活。
terminate() 中断该进程。

这里有几个需要注意的小细节。

  • args:当我们提供给 args 参数如果只有一个的话,这时候如果我们使用中括号,例如 args = ["name"] 是没问题的;但当我们使用小括号时,例如 args=("name",) ,这时候我们是不能省略逗号的,否则会报错。
  • start() 和 run():这两个方法是有区别的,正如表格中描述,当我们需要使用多进程时,一定要使用 start(),这个会使得我们的进程启动,而后等待 CPU 时间片的调度;如果使用了 run(),他将作为一个进程体,直到运行完成后,才会进行下面代码的执行。这里举个例子来进行讲解。
from multiprocessing import Process  
import time
def download():
for i in range(3):
print("download------", i)
time.sleep(1)

def show():
for i in range(3):
print("show------", i)
time.sleep(1)

if __name__ == "__main__":
shows = Process(target = show)
downloads = Process(target = download)
# downloads.run()
# shows.run()
downloads.start()
shows.start()

当我们用上述代码分别运行 run() 和 start() 时,我们能够发现,run 是运行完一个后才进行另一个实例的运行,而 start 是同时实例化,并且在后台等时间片操作,同步运行。

1

进程创建

一个进程的创建主要有两种创建方法。(1)使用内置类;(2)继承自定义类;

  • 内置类

我们只需要传入一个需要运行的方法作为一个进程去跑就可以。

# 内置类
from multiprocessing import Process
def testFunc(name):
print(f"process_{name}", "is running")

if __name__ == "__main__":
process_list = []
for i in range(5):
p = Process(target=testFunc, args=(str(i),))
p.start()
process_list.append(p)

for i in process_list:
i.join()

print("end")

# 输出结果
# process_0 is running
# process_2 is running
# process_1 is running
# process_3 is running
# process_4 is running
  • 自定义类

往往继承原生的 Process 类,接着在该类中重新 __init__run 两个方法。__init__ 方法是对象初始化函数,往往用于我们传递参数;run 是我们自定义类实例化后,进程对象使用 start() 时自调用的方法(即进程执行方法)。

# 自定义类
from multiprocessing import Process
class MyProcess(Process):
def __init__(self,name):
super(MyProcess,self).__init__()
self.name = name

def run(self):
print(f"process_{self.name}", "is running")

if __name__ == '__main__':
process_list = []
for i in range(5):
p = MyProcess(str(i))
p.start()
process_list.append(p)

for i in process_list:
i.join()

print("end")

# 输出结果同上

进程通信

Queue

所谓 Queue,就是队列的意思。本质上就是利用一个安全队列来进行多个进程间的数据传递,主要使用 put()get() 两个方法。这里两个方法都有两个参数 blocktimeout 。当 block 为 False 时,直接写入或读取数据,当读取时队列为空,直接抛出 Empty 异常,当写入时队列已满,直接抛出 Full 异常;当 block 为 True 并且 timeout 为正数时,读取和写入直接进行,当存在队列已满(写入)或者为空(读取),会阻塞 Timeout 时长,超时后再抛出异常。此外其他方法还包括 empty() 判断是否为空、 qsize() 返回大致大小、full() 判断是否为满等。

Queue 返回一个队列对象,负责在吞吐信息。其他类型还包括 SimpleQueueLifoQueuePriorityQueue。具体用法如下:

from multiprocessing import Process, Queue

class MyProcess(Process):
def __init__(self, queue, name):
super(MyProcess,self).__init__()
self.name = name
self.queue = queue

def run(self):
print(f"process_{self.name}", "is running")
# 1 进程放入队列
if self.name == "1":
self.queue.put('data 1')
# 2 进程取出队列并打印
elif self.name == "2":
print(self.queue.get(block=True, timeout=1))

if __name__ == '__main__':
queue = Queue()
process_list = []
p_1 = MyProcess(queue, "1")
p_2 = MyProcess(queue, "2")
p_1.start()
p_2.start()
p_1.join()
p_2.join()
# 再次取时队列为空,抛出empty异常
queue.get(block=True, timeout=1)
print("end")

输出结果:发现队列完成输入输出;设置时长后,队列长时间没有数据,队列会抛出队列为空的异常。

3

Pipe

所谓 Pipe,就是管道的意思。本质上就是利用管道数据进行数据传递,而不是数据共享。与 socket 很像,需要使用 send()recv() 函数来进行数据的传递。

Pipe 返回两个连接对象,分别作为管道的两端,两端都可以收发消息。具体用法如下:

from multiprocessing import Process, Pipe

class MyProcess(Process):
def __init__(self, con, name):
super(MyProcess,self).__init__()
self.name = name
self.con = con
def run(self):
print(f"process_{self.name}", "is running")
# 1 进程发送信息
if self.name == "1":
self.con.send("test for pipe!")
# 2 进程接收并打印信息
elif self.name == "2":
print(self.con.recv())

if __name__ == '__main__':
con1, con2 = Pipe()
process_list = []
p_1 = MyProcess(con1, "1")
p_2 = MyProcess(con2, "2")
p_1.start()
p_2.start()
p_1.join()
p_2.join()

输出结果:进程 1 传入数据,进程 2 成功接收并打印。

4

Managers

前面两个操作都实现了数据传输,但未实现数据共享,Manager 就是来实现该功能的,使得一个进程可以修改另一进程的数据。Manager 支持的数据类型包括 list(常用), dict(常用), Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Queue, Value和 Array。

Manager 返回管理器对象控制一个服务进程,以供多个进程共同访问。具体用法如下:

from multiprocessing import Process, Manager

class MyProcess(Process):
def __init__(self, dict, name):
super(MyProcess,self).__init__()
self.name = name
self.dict = dict

def run(self):
print(f"process_{self.name}", "is running")
# 修改字典
self.dict[f"name_{self.name}"] = self.name

if __name__ == '__main__':
mgr = Manager()
dict = mgr.dict()
process_list = []
p_1 = MyProcess(dict, "1")
p_2 = MyProcess(dict, "2")
p_1.start()
p_2.start()
p_1.join()
p_2.join()
# 打印字典
print(dict)
print("end")

输出结果:进程 1 完成对数据的添加,进程 2 修改数据,并添加其他信息。

5

进程互斥

锁 Lock

锁,顾名思义就是对共享资源的一种访问限制,是上一篇文章所举例到的车厢厕所(只能有一个用户使用)的问题。

Lock 返回一个锁对象,用来限制对共享资源的访问,此外还有 RLock(递归锁,会在多线程中介绍)。具体用法如下:

from multiprocessing import Lock, Process
import time

# 不请求锁
def worker_no_lock(f, name):
fs = open(f, 'a+')
n = 10
while n > 1:
fs.write(f"write by {name}\n")
time.sleep(0.5)
n -= 1
fs.close()

# 使用 with 语句请求锁
def worker_with_lock(lock, f, name):
with lock:
fs = open(f, 'a+')
n = 10
while n > 1:
fs.write(f"write(with lock) by {name}\n")
time.sleep(0.5)
n -= 1
fs.close()

# 效果同上一个函数,只不过用 lock.acquire() 和 release() 请求和释放锁
def worker_with_lock_2(lock, f):
lock.acquire()
try:
fs = open(f, 'a+')
n = 10
while n > 1:
fs.write(f"write(with lock) by {name}\n")
time.sleep(0.5)
n -= 1
fs.close()
finally:
lock.release()

if __name__ == "__main__":
lock = Lock()
f = "file.txt"
w_1 = Process(target = worker_with_lock, args=(lock, f, "1"))
w_2 = Process(target = worker_with_lock, args=(lock, f, "2"))
# w_1 = Process(target = worker_no_lock, args=(f, "1"))
# w_2 = Process(target = worker_no_lock, args=(f, "2"))
# nw = Process(target = worker_no_with, args=(lock, f))
w_1.start()
w_2.start()
w_1.join()
w_2.join()

print("end")

输出结果:可以看到如果不加锁对文件进行多线程书写时,会因为访问冲突原因导致数据不完全(只有 2 进程写入);而加锁后由于同一时间实际只有一个进程在书写,因此文件正常。

6

信号量 semaphore

信号量简单来说就是对共享资源的访问数量,是对 Lock 的另一种表现形式,也是就上一篇文章所提到的餐厅里最多容纳多少个人吃饭的问题。

Semaphore 返回一个内部计数器,用来记录当前剩余可访问量,此外还有 BoundedSemaphore(有界信号量)。具体用法如下:

from multiprocessing import Semaphore, Process
import time

def worker(s):
s.acquire()
print(multiprocessing.current_process().name + "acquire");
time.sleep(0.5)
print(multiprocessing.current_process().name + "release");
s.release()

if __name__ == "__main__":
# 这里我们设置最多可以容纳 3 个进程
s = Semaphore(3)
p_list = []
for i in range(5):
p = Process(target = worker, args=(s,))
p.start()
p_list.append(p)
for i in p_list:
i.join()
print("end")

输出结果:可以看到每个最开始有 3 个进程访问了,于是其他进程不能再访问,只能等到有进程结束访问才可以继续有别的进程进行访问。

7

进程池

前面我们创建多进程时往往要写多个初始化,而进程池 Pool 可以帮助我们解决这个问题,一步创建多个进程。其常用方法如下表:

方法名 功能
apply() 阻塞(相当于单进程,应该废弃使用)
apply_async() 非阻塞
terminate() 立刻关闭进程池
join() 主进程等待所有子进程执行完毕。在 close 和 terminate 之后使用。
close() 等待所有进程结束后,关闭进程池

Pool 返回一个进程池对象,负责管理一定数目的进程运行,有进程运行完毕后,添加新进程进入。具体代码如下:

from multiprocessing import Pool
import time

def func(msg):
print("msg:", msg)
time.sleep(3)
print("end")

if __name__ == "__main__":
pool = Pool(processes = 3) # 维持执行的进程总数为 processes = 3,当一个进程执行完毕后会添加新的进程进去
start = time.time()
for i in range(4):
msg = "hello %d" %(i)
# pool.apply_async(func, (msg, )) # 非阻塞
pool.apply(func, (msg, )) # 阻塞

print("pool test")
pool.close()
pool.join() # 调用join之前,先调用close函数,否则会出错。执行完 close 后不会有新的进程加入到 pool, join 函数等待所有子进程结束
print(f"Sub-process(es) done. Total time:{time.time() - start}")

输出结果:可以看到非阻塞的子进程是同时运行,有进程运行完毕后,新进程加入进程池,运行时间短;阻塞的子进程是一个一个运行完毕,同样有进程运行完毕后,新进程加入进程池,运行时间长。

8