Python 多线程编程

本章内容主要讲解一下 Python 脚本任务处理中常用的多线程编程,这里主要使用的是 python 内置的 threading 模块。本章内容与多进程处理方式大致相同,可能主要集中在库和函数不同,此外线程是可以共享全局变量(global)的,这部分会在后面与多进程进行对比。

至于为什么选择多线程和多进程编程,以及两种的区别和适用场景我已经在之前的一篇文章中介绍过了,大家可以看一下,讲的有问题的地方大家可以留言呢。

Thread 类介绍

Thread 是我们使用 python 时负责线程创建、运行的一个类,其主要属性及方法如下(基本与 Process 相同):

属性名或方法名 功能
target(func) 指定需要实例化为线程的函数;
name(Str) 可以指定名;也可调用属性获取线程名;
daemon(Bool) 是否使用 daemon,类似于守护进程;
pid(Int) 可调用属性获取线程名;
args 为线程函数提供的参数;
start() **可体现多线程。**线程启动,等待 CPU 的切片时间进行调用;
run() **未体现多线程。**是一个线程体,线程运行,直到线程运行完毕;
join() 线程合并,父进程要在调用 join 的子线程结束后才能结束。在多线程执行过程,其他线程必须等到调用 join() 方法的线程执行完毕(或者执行规定的 timeout 时间)后,才能继续执行;
is_alive() 判断该线程是否存活。
terminate() 中断该线程。

线程创建

一个线程的创建同样有两种创建方法。(1)使用内置类;(2)继承自定义类;

  • 内置类

我们只需要传入一个需要运行的方法作为进程的一个线程去跑就可以。

# 内置类
from threading import Thread

def testFunc(name):
print(f"threading_{name}", "is running")

if __name__ == "__main__":
process_list = []
for i in range(5):
t = Thread(target=testFunc, args=(str(i),))
t.start()
process_list.append(t)

for i in process_list:
i.join()

print("end")
  • 自定义类

往往继承原生的 Thread 类,接着在该类中重新 __init__run 两个方法。__init__ 方法是对象初始化函数,往往用于我们传递参数;run 是我们自定义类实例化后,线程对象使用 start() 时自调用的方法(即线程执行方法)。

# 自定义类
from threading import Thread

class MyThread(Thread):
def __init__(self,name):
super(MyThread,self).__init__()
self.name = name

def run(self):
print(f"threading_{self.name} is running")

if __name__ == '__main__':
process_list = []
for i in range(5):
t = MyThread(str(i))
t.start()
process_list.append(t)

for i in process_list:
i.join()

print("end")

结果可以看到两种初始化输出的效果是相同的。

1

线程通信

由于多线程和多进程的通信原理基本相同,因此这里就不过多介绍了,这里主要展示一下多线程和多进程在全局变量上共享的区别(即不同火车和不同车厢共享资源)。

global 全局变量,我们在一个进程里面可以设置全局变量,而不同进程之间是不能直接修改该变量的,然而对于多线程来说,他们运行在同一进程下,因此可以访问并修改该变量,以下代码可以帮助测试。

from multiprocessing import Lock, Process
num = 5
def worker(lock):
with lock:
global num
print(num)
n = 0
while n < 5:
n += 1
num += 1
print(num)

if __name__ == "__main__":
lock = Lock()
w_1 = Process(target = worker, args=(lock,))
w_2 = Process(target = worker, args=(lock,))
# 不注释时使用线程,注释时使用进程
# from threading import Lock, Thread
# lock = Lock()
# w_1 = Thread(target = worker, args=(lock,))
# w_2 = Thread(target = worker, args=(lock,))
w_1.start()
w_2.start()
w_1.join()
w_2.join()
print("end")

输出结果:可以看到当我们使用多线程时,内部声明的 global 将该进程当中的 num 作为全局变量引入子线程,此时子线程对 num 的操作被记录;当使用多进程时,子进程对主进程中的 num 修改时,并不会影响到该变量。

2

线程互斥

可重入锁(递归锁) RLock

锁 Lock,在线程中和在进程中的作用是基本相同的,同样是对资源的访问限制。而之前已经介绍了 Lock 的使用,这里作为补充介绍一下 RLock 的特性和适用场景。

  • 可以多次加锁,并且只能由加锁的线程(进程)解锁;
  • 递归及类似操作(当使用递归时,如果使用普通锁,加锁后未解锁就进入下一步,这时候就会导致死锁,因此就需要循环锁来多次加锁、解锁);

RLock 返回一个循环锁对象,用来限制对共享资源的访问。具体用法如下:

from threading import Lock, RLock

# 使用 lock = Lock() 会发生死锁无法运行
lock = RLock()

def factorial(n):
assert n > 0
if n == 1:
return 1

with lock:
out = n * factorial(n - 1)

return out

print(factorial(3))
# print 6

上述例子可以看出 RLock 的作用,此外我们还可以通过另一个例子来查看关于加锁释放的主体问题。

from threading import Lock, Thread, RLock
import time

def t1():
lock.acquire()
time.sleep(3)
try:
lock.release()
except:
print("error in t1")

def t2():
try:
lock.release()
print('lock released')
except:
print("error in t2")


if __name__ == "__main__":

lock = Lock()
# lock = RLock() # 使用 RLock

t1 = Thread(target = t1)
t2 = Thread(target = t2)

t1.start()
t2.start()

t1.join()
t2.join()

输出结果:两个线程功能不同,第一个线程申请一个锁,沉默 3 秒,然后解锁,第二个线程解锁;(1)使用 Lock 时,第一个线程加锁后,第二个线程启动并解锁,第一个线程沉默完毕后进行解锁时,输出不能释放一个没有加锁的 Lock;(2)使用 RLock 时,第一个线程加锁后,第二个线程尝试解锁,会发生无法解锁问题(不是线程 2 的锁);

3

其他

守护进程

守护进程与 join() 效果刚好相反,如果我们需要在主线程结束时终止子线程(不论是否执行完毕),我们可以使用setDaemon(bool)函数。它的作用是设置子线程是否随主线程一起结束,必须在 start() 之前调用,默认设置为 False

from threading import Thread
import time

def func(msg):
print("msg:", msg)
time.sleep(3)
print("thread_end")

if __name__ == "__main__":
t = Thread(target = func, args=["start"])
# t.setDaemon(1) # 添加守护进程
t.start()
print("end")

输出结果:当开启守护进程时,主进程结束,线程结束;不开启时,主进程结束,线程继续直至结束。

4

定时器

定时器,顾名思义就是为线程开始定时的装置。如果我们需要规定多长时间后启动该线程,我们就可以使用一个 Timer

Timer 返回一个定时器对象,指定时间启动线程。具体用法如下:

from threading import Thread, Timer
import time
def show():
print("Python")

if __name__ == "__main__":
print("start")
print(time.time())
# 指定一秒钟之后执行 show 函数
t = Timer(1, show)
t.start()
t.join()
print(time.time())
print("end")

输出结果:可以看到该线程执行约 1s(正常一个普通的 print 非常快),是完成计时等待之后才执行的。

5

以上就是我对多线程和多进程的一些理解和总结,如果后面发现一些常用的也会添加新的文章呢。