Python 多进程编程
本章内容主要讲解一下 Python 脚本任务处理中常用的多进程编程,这里主要使用的是 python 内置的 mutilprocessing 模块。
效果展示
这里我们首先利用一个例子来让我们看到多进程的作用。我们设置了一个 worker 方法,它负责每隔 1s 输出一次 success(实际应用中是一个处理或者多个处理过程)!假设我们这时候需要执行这个操作两次或者更多次,为了充分利用系统资源,减少时间开销,就需要我们使用多线程或者多进程来进行处理。
from multiprocessing import Processimport timedef worker (num ): for i in range (3 ): print (f"worker_{num} :success_{i} " ) time.sleep(1 ) if __name__ == "__main__" : start = time.time() worker_1 = Process(target=worker, args=[1 ]) worker_2 = Process(target=worker, args=[2 ]) worker_1.start() worker_2.start() worker_1.join() worker_2.join() end = time.time() print (end-start)
根据输出结果我们能看到,当使用多进程时,整个流程处理完所花时间为 3.39s ,而不使用时,整个流程所花时间为 6.06s ,效率几乎提升了 1 倍 ,当然这个结果因程序而异。
Process 类介绍
Process 是我们使用 python 时负责进程创建、运行的一个类,其主要属性及方法如下:
属性名或方法名
功能
target(func)
指定需要实例化为进程的函数;
name(Str)
可以指定进程名;也可调用属性获取进程名;
daemon(Bool)
是否使用 daemon,类似于守护进程;
pid(Int)
可调用属性获取进程名;
args
为进程函数提供的参数;
start()
**可体现多进程。**进程启动,等待 CPU 的切片时间进行调用;
run()
**未体现多进程。**是一个进程体,进程运行,直到进程运行完毕;
join()
进程合并,父进程要在调用 join 的子进程结束后才能结束。在多进程执行过程,其他进程必须等到调用 join() 方法的进程执行完毕(或者执行规定的 timeout 时间)后,才能继续执行;
is_alive()
判断该进程是否存活。
terminate()
中断该进程。
这里有几个需要注意的小细节。
args:当我们提供给 args 参数如果只有一个的话,这时候如果我们使用中括号,例如 args = ["name"]
是没问题的;但当我们使用小括号时,例如 args=("name",)
,这时候我们是不能省略逗号的,否则会报错。
start() 和 run():这两个方法是有区别的,正如表格中描述,当我们需要使用多进程时,一定要使用 start(),这个会使得我们的进程启动,而后等待 CPU 时间片的调度;如果使用了 run(),他将作为一个进程体,直到运行完成后,才会进行下面代码的执行。这里举个例子来进行讲解。
from multiprocessing import Process import timedef download (): for i in range (3 ): print ("download------" , i) time.sleep(1 ) def show (): for i in range (3 ): print ("show------" , i) time.sleep(1 ) if __name__ == "__main__" : shows = Process(target = show) downloads = Process(target = download) downloads.start() shows.start()
当我们用上述代码分别运行 run() 和 start() 时,我们能够发现,run 是运行完一个后才进行另一个实例的运行,而 start 是同时实例化,并且在后台等时间片操作,同步运行。
进程创建
一个进程的创建主要有两种创建方法。(1)使用内置类;(2)继承自定义类;
我们只需要传入一个需要运行的方法作为一个进程去跑就可以。
from multiprocessing import Processdef testFunc (name ): print (f"process_{name} " , "is running" ) if __name__ == "__main__" : process_list = [] for i in range (5 ): p = Process(target=testFunc, args=(str (i),)) p.start() process_list.append(p) for i in process_list: i.join() print ("end" )
往往继承原生的 Process 类,接着在该类中重新 __init__
和 run
两个方法。__init__
方法是对象初始化函数,往往用于我们传递参数;run
是我们自定义类实例化后,进程对象使用 start()
时自调用的方法(即进程执行方法)。
from multiprocessing import Processclass MyProcess (Process ): def __init__ (self,name ): super (MyProcess,self).__init__() self.name = name def run (self ): print (f"process_{self.name} " , "is running" ) if __name__ == '__main__' : process_list = [] for i in range (5 ): p = MyProcess(str (i)) p.start() process_list.append(p) for i in process_list: i.join() print ("end" )
进程通信
Queue
所谓 Queue,就是队列的意思。本质上就是利用一个安全队列来进行多个进程间的数据传递,主要使用 put()
和 get()
两个方法。这里两个方法都有两个参数 block
和 timeout
。当 block
为 False 时,直接写入或读取数据,当读取时队列为空,直接抛出 Empty 异常,当写入时队列已满,直接抛出 Full 异常;当 block
为 True 并且 timeout
为正数时,读取和写入直接进行,当存在队列已满(写入)或者为空(读取),会阻塞 Timeout 时长,超时后再抛出异常。此外其他方法还包括 empty()
判断是否为空、 qsize()
返回大致大小、full()
判断是否为满等。
Queue 返回一个队列对象,负责在吞吐信息。其他类型还包括 SimpleQueue 、LifoQueue 、PriorityQueue 。具体用法如下:
from multiprocessing import Process, Queueclass MyProcess (Process ): def __init__ (self, queue, name ): super (MyProcess,self).__init__() self.name = name self.queue = queue def run (self ): print (f"process_{self.name} " , "is running" ) if self.name == "1" : self.queue.put('data 1' ) elif self.name == "2" : print (self.queue.get(block=True , timeout=1 )) if __name__ == '__main__' : queue = Queue() process_list = [] p_1 = MyProcess(queue, "1" ) p_2 = MyProcess(queue, "2" ) p_1.start() p_2.start() p_1.join() p_2.join() queue.get(block=True , timeout=1 ) print ("end" )
输出结果:发现队列完成输入输出;设置时长后,队列长时间没有数据,队列会抛出队列为空的异常。
Pipe
所谓 Pipe,就是管道的意思。本质上就是利用管道数据进行数据传递,而不是数据共享。与 socket 很像,需要使用 send()
和 recv()
函数来进行数据的传递。
Pipe 返回两个连接对象,分别作为管道的两端,两端都可以收发消息。具体用法如下:
from multiprocessing import Process, Pipeclass MyProcess (Process ): def __init__ (self, con, name ): super (MyProcess,self).__init__() self.name = name self.con = con def run (self ): print (f"process_{self.name} " , "is running" ) if self.name == "1" : self.con.send("test for pipe!" ) elif self.name == "2" : print (self.con.recv()) if __name__ == '__main__' : con1, con2 = Pipe() process_list = [] p_1 = MyProcess(con1, "1" ) p_2 = MyProcess(con2, "2" ) p_1.start() p_2.start() p_1.join() p_2.join()
输出结果:进程 1 传入数据,进程 2 成功接收并打印。
Managers
前面两个操作都实现了数据传输,但未实现数据共享,Manager 就是来实现该功能的,使得一个进程可以修改另一进程的数据。Manager 支持的数据类型包括 list(常用), dict(常用), Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Queue, Value和 Array。
Manager 返回管理器对象控制一个服务进程,以供多个进程共同访问。具体用法如下:
from multiprocessing import Process, Managerclass MyProcess (Process ): def __init__ (self, dict , name ): super (MyProcess,self).__init__() self.name = name self.dict = dict def run (self ): print (f"process_{self.name} " , "is running" ) self.dict [f"name_{self.name} " ] = self.name if __name__ == '__main__' : mgr = Manager() dict = mgr.dict () process_list = [] p_1 = MyProcess(dict , "1" ) p_2 = MyProcess(dict , "2" ) p_1.start() p_2.start() p_1.join() p_2.join() print (dict ) print ("end" )
输出结果:进程 1 完成对数据的添加,进程 2 修改数据,并添加其他信息。
进程互斥
锁 Lock
锁,顾名思义就是对共享资源的一种访问限制,是上一篇文章所举例到的车厢厕所(只能有一个用户使用)的问题。
Lock 返回一个锁对象,用来限制对共享资源的访问,此外还有 RLock(递归锁,会在多线程中介绍)。具体用法如下:
from multiprocessing import Lock, Processimport timedef worker_no_lock (f, name ): fs = open (f, 'a+' ) n = 10 while n > 1 : fs.write(f"write by {name} \n" ) time.sleep(0.5 ) n -= 1 fs.close() def worker_with_lock (lock, f, name ): with lock: fs = open (f, 'a+' ) n = 10 while n > 1 : fs.write(f"write(with lock) by {name} \n" ) time.sleep(0.5 ) n -= 1 fs.close() def worker_with_lock_2 (lock, f ): lock.acquire() try : fs = open (f, 'a+' ) n = 10 while n > 1 : fs.write(f"write(with lock) by {name} \n" ) time.sleep(0.5 ) n -= 1 fs.close() finally : lock.release() if __name__ == "__main__" : lock = Lock() f = "file.txt" w_1 = Process(target = worker_with_lock, args=(lock, f, "1" )) w_2 = Process(target = worker_with_lock, args=(lock, f, "2" )) w_1.start() w_2.start() w_1.join() w_2.join() print ("end" )
输出结果:可以看到如果不加锁对文件进行多线程书写时,会因为访问冲突原因导致数据不完全(只有 2 进程写入);而加锁后由于同一时间实际只有一个进程在书写,因此文件正常。
信号量 semaphore
信号量简单来说就是对共享资源的访问数量,是对 Lock 的另一种表现形式,也是就上一篇文章所提到的餐厅里最多容纳多少个人吃饭的问题。
Semaphore 返回一个内部计数器,用来记录当前剩余可访问量,此外还有 BoundedSemaphore(有界信号量)。具体用法如下:
from multiprocessing import Semaphore, Processimport timedef worker (s ): s.acquire() print (multiprocessing.current_process().name + "acquire" ); time.sleep(0.5 ) print (multiprocessing.current_process().name + "release" ); s.release() if __name__ == "__main__" : s = Semaphore(3 ) p_list = [] for i in range (5 ): p = Process(target = worker, args=(s,)) p.start() p_list.append(p) for i in p_list: i.join() print ("end" )
输出结果:可以看到每个最开始有 3 个进程访问了,于是其他进程不能再访问,只能等到有进程结束访问才可以继续有别的进程进行访问。
进程池
前面我们创建多进程时往往要写多个初始化,而进程池 Pool 可以帮助我们解决这个问题,一步创建多个进程。其常用方法如下表:
方法名
功能
apply()
阻塞(相当于单进程,应该废弃使用)
apply_async()
非阻塞
terminate()
立刻关闭进程池
join()
主进程等待所有子进程执行完毕。在 close 和 terminate 之后使用。
close()
等待所有进程结束后,关闭进程池
Pool 返回一个进程池对象,负责管理一定数目的进程运行,有进程运行完毕后,添加新进程进入。具体代码如下:
from multiprocessing import Poolimport timedef func (msg ): print ("msg:" , msg) time.sleep(3 ) print ("end" ) if __name__ == "__main__" : pool = Pool(processes = 3 ) start = time.time() for i in range (4 ): msg = "hello %d" %(i) pool.apply(func, (msg, )) print ("pool test" ) pool.close() pool.join() print (f"Sub-process(es) done. Total time:{time.time() - start} " )
输出结果:可以看到非阻塞的子进程是同时运行,有进程运行完毕后,新进程加入进程池,运行时间短;阻塞的子进程是一个一个运行完毕,同样有进程运行完毕后,新进程加入进程池,运行时间长。