现代计算机CPU物理核心普遍比较多,我们在编写程序时经常会用到多线程技术来提高程序运行的效率。作为python萌新,我在掌握基本语法后就很想摆弄一下python的多线程,使用起来确实很有python的特点,代码量少、操作方便。之后断断续续写了一些多线程程序,直到今天下午逛论坛,看到很多人说python多线程机制的一些内情,我才意识到自己有多愚蠢哈哈。我把自己学到的内容系统的整理一下,以备后忘,也帮帮后来者。

1. 线程创建与管理

省流:python多线程效率堪忧,想了解这方面的去看第2小节GIL,想继续看看怎么使用的继续接着看。

1.1 创建线程

  Python提供了thread、threading等模块来进行线程的创建与管理,后者在线程管理能力上更进一步,因此我们通常使用threading模块。创建一个线程需要指定该线程执行的任务(函数名)、以及该函数需要的参数,示例代码如下所示:

from threading import Thread, current_thread

def target01(args1, args2):
	print("这里是{}".format(current_thread().name))

# 创建线程
thread01 = Thread(target=target01, args="参数", name="线程1")
# 设置守护线程【可选】
thread01.setDaemon(True)
# 启动线程
thread01.start() 

1.2 设置守护线程

  线程是程序执行的最小单位,Python在进程启动起来后,会自动创建一个主线程,之后使用多线程机制可以在此基础上进行分支,产生新的子线程。子线程启动起来后,主线程默认会等待所有线程执行完成之后再退出。但是我们可以将子线程设置为守护线程,此时主线程任务一旦完成,所有子线程将会和主线程一起结束(就算子线程没有执行完也会退出)。
  守护线程可以在线程启动之前,通过setDaemon(True)的形式进行设置,或者在创建子线程对象时,以参数的形式指定:

thread01 = Thread(target=target01, args="", name="线程1", daemon=True)

  但是需要注意,如果希望主程序不等待任何线程直接退出,只有所有的线程都被设置为守护线程才有用。

1.3 设置线程阻塞

  我们可以用join()方法使主线程陷入阻塞,以等待某个线程执行完毕。因此这也是实现线程同步的一种方式。参数 t i m e o u t timeout timeout 可以用来设置主线程陷入阻塞的时间,如果线程不是守护线程,即没有设置daemon为True,那么参数 t i m e o u t timeout timeout 是无效的,主线程会一直阻塞,直到子线程执行结束。
  测试代码如下:

import time
from threading import Thread, current_thread


def target():
    if current_thread().name == "1":
        time.sleep(5)
    else:
        time.sleep(6)
    print("线程{}已退出".format(current_thread().name))


thread01 = Thread(target=target, daemon=True, name="1")
thread02 = Thread(target=target, daemon=True, name="2")

thread01.start()
thread02.start()

print("程序因线程1陷入阻塞")
thread01.join(timeout=3)
print("程序因线程2陷入阻塞")
thread02.join(timeout=3)
print("主线程已退出")

1.4 线程间通信的方法

  我们知道,线程之间共享同一块内存。子线程虽然可以通过指定target来执行一个函数,但是这个函数的返回值是没有办法直接传回主线程的。我们使用多线程一般是用于并行执行一些其他任务,因此获取子线程的执行结果十分有必要。
  直接使用全局变量虽然可行,但是资源的并发读写会引来线程安全问题。下面给出常用的两种处理方式:

1.4.1 线程锁

   其一是可以考虑使用锁来处理,当多个线程对同一份资源进行读写操作时,我们可以通过加锁来确保数据安全。Python中给出了多种锁的实现,例如:同步锁 Lock,递归锁 RLock,条件锁 Condition,事件锁 Event,信号量锁 Semaphore,这里只给出 Lock 的使用方式,其余的大家感兴趣可以自己查阅。
   可以通过threading.lock类来创建锁对象,一旦一个线程获得一个锁,会阻塞之后所有尝试获得该锁对象的线程,直到它被重新释放。这里举一个例子,通过加锁来确保两个线程在对同一个全局变量进行读写时的数据安全:

from threading import Thread, Lock
from time import sleep

book_num = 100  # 图书馆最开始有100本图书
bookLock = Lock()


def books_return():
    global book_num
    while True:
        bookLock.acquire()
        book_num += 1
        print("归还1本,现有图书{}本".format(book_num))
        bookLock.release()
        sleep(1)  # 模拟事件发生周期


def books_lease():
    global book_num
    while True:
        bookLock.acquire()
        book_num -= 1
        print("借走1本,现有图书{}本".format(book_num))
        bookLock.release()
        sleep(2)  # 模拟事件发生周期


if __name__ == "__main__":
    thread_lease = Thread(target=books_lease)
    thread_return = Thread(target=books_return)
    thread_lease.start()
    thread_return.start()

线程锁示例程序执行结果
   从结果中可以看出,其中没有出现由于读写冲突导致的数据错误。

1.4.2 queue模块(同步队列类)

   或者,我们可以采用Python的queue模块来实现线程通信。Python中的 q u e u e queue queue 模块实现了多生产者、多消费者队列,特别适用于在多线程间安全的进行信息交换。该模块提供了4种我们可以利用的队列容器,分别 Q u e u e Queue Queue(先进先出队列)、 L i f o Q u e u e LifoQueue LifoQueue(先进后出队列)、 P r i o r t y Q u e u e PriortyQueue PriortyQueue(优先级队列)、 S i m p l e Q u e u e SimpleQueue SimpleQueue(无界的先进先出队列,简单实现,缺少Queue中的任务跟踪等高级功能)。下面我们以 Q u e u e Queue Queue 为例介绍其使用方法,其他容器请自行查阅。

Queue(maxsize=5)  # 创建一个FIFO队列,并制定队列大小,若maxsize被指定为小于等于0,则队列无限大

Queue.qsize() # 返回队列的大致大小,注意并不是确切值,所以不能被用来当做后续线程是否会被阻塞的依据

Queue.empty() # 判断队列为空是否成立,同样不能作为阻塞依据

Queue.full()  # 判断队列为满是否成立,同样不能作为阻塞依据

Queue.put(item, block=True, timeout=None) # 投放元素进入队列,block为True表示如果队列满了投放失败,将阻塞该线程,timeout可用来设置线程阻塞的时间长短(秒);
# 注意,如果block为False,如果队列为满,则将直接引发Full异常,timeout将被忽略(在外界用try处理异常即可)
Queue.put_nowait(item) # 相当于put(item, block=False)

Queue.get(block=True, timeout=False) # 从队列中取出元素,block为False而队列为空时,会引发Empty异常
Queue.get_nowait() # 相当于get(block=False)

Queue.task_done() # 每个线程使用get方法从队列中获取一个元素,该线程通过调用task_done()表示该元素已处理完成。

Queue.join() # 阻塞至队列中所有元素都被处理完成,即队列中所有元素都已被接收,且接收线程全已调用task_done()。

   下面给出一个例子,场景是3个厨师给4个客人上菜,这是对多生产者多消费者场景的模拟:

import queue
from random import choice
from threading import Thread

q = queue.Queue(maxsize=5)
dealList = ["红烧猪蹄", "卤鸡爪", "酸菜鱼", "糖醋里脊", "九转大肠", "阳春面", "烤鸭", "烧鸡", "剁椒鱼头", "酸汤肥牛", "炖羊肉"]


def cooking(chefname: str):
    for i in range(4):
        deal = choice(dealList)
        q.put(deal, block=True)
        print("厨师{}给大家带来一道:{}  ".format(chefname, deal))


def eating(custname: str):
    for i in range(3):
        deal = q.get(block=True)
        print("顾客{}吃掉了:{}  ".format(custname, deal))
        q.task_done()


if __name__ == "__main__":
    # 创建并启动厨师ABC线程,创建并启动顾客1234线程
    threadlist_chef = [Thread(target=cooking, args=chefname).start() for chefname in ["A", "B", "C"]]
    threadlist_cust = [Thread(target=eating, args=str(custname)).start() for custname in range(4)]
    # 队列阻塞,直到所有线程对每个元素都调用了task_done
    q.join()

   上述程序执行结果如下图所示:

在这里插入图片描述

1.5 杀死线程

   在一些场景下,我们可能需要杀死某个线程,但是在这之前,请仔细的考量代码的上下文环境。强制杀死线程可能会带来一些意想不到的结果,并且从程序设计来讲,这本身就是不合理的。而且,锁资源并不会因为当前线程的退出而释放,这在程序运行过程中,可能会成为典型的死锁场景。所以杀死线程之前,请一定慎重。杀死线程的方法网上有好几种,我这里给出一种我觉得比较稳妥的方式。
   前面我们提到过如何做线程通信,这里可以用全局变量给出一个flag,线程任务采用循环形式进行,每次循环都会检查该flag,外界可以通过修改这一flag来通知这一线程退出循环,结束任务,从而起到杀死线程的目的,但请注意,为了线程安全,退出前一定要释放这一线程所占用的资源。下面给出一个示例程序:

from threading import Lock, Thread
from time import sleep

flag = True
lock = Lock()


def tar():
    global flag, lock
    while True:
        lock.acquire()
        "线程任务逻辑"
        if flag is False:
            break
        lock.release()
    lock.release()


if __name__ == "__main__":
    thread = Thread(target=tar)
    thread.start()
    print("3秒后线程会被杀死")
    sleep(3)
    flag = False
    print("线程已被杀死")

   执行结果如图所示,如果需要其他的方法请自行查阅,网上有不少。

在这里插入图片描述

1.6 线程池的使用

   在程序运行过程之中,临时创建一个线程需要耗费不小的代价(包括与操作系统的交互部分),尤其是我们只对一个线程分配一个简短的任务,此时,频繁的线程创建将会严重拖垮程序的执行的效率
   因此,在这种情形下,我们可以选择采用线程池技术,即通过预先创建几个空闲线程,在需要多线程来处理任务时,将任务分配给一个处于空闲状态的线程,该线程在执行完成后,将会回归空闲状态,而不是直接销毁;而如果申请从线程池中分配一个空闲线程时,遇到所有线程均处于运行状态,则当前线程可以选择阻塞来等待线程资源的空闲。如此一来,程序对于线程的管理将会更加灵活。
   Python从3.2开始,就将线程池作为内置模块包含了进来,可以通过concurrent.futures.ThreadPoolExecutor来调用,使用方法也很简单。下面给出线程池的程序例子:

from concurrent.futures import ThreadPoolExecutor
from time import sleep

tasklist = ["任务1", "任务2", "任务3", "任务4"]


def task(taskname: str):
    sleep(5)
    print(taskname + " 已完成\n")
    return taskname + " 的执行结果"


executor = ThreadPoolExecutor(max_workers=3)  # 创建线程池(是一个ThreadPoolExecutor对象),线程数为3
future_a = executor.submit(task, tasklist[0])  # 通过submit方法向线程池提交任务,返回一个对应的Future对象
future_b = executor.submit(task, tasklist[1])
future_c = executor.submit(task, tasklist[2])
future_d = executor.submit(task, tasklist[3])  # 如果提交时,线程池中没有空余线程,则该线程会进入等待状态,主线程不会阻塞
print(future_a.result(), future_b.result())  # 通过Future对象的result()方法获取任务的返回值,若没有执行完,则会陷入阻塞

  有关于线程池的详细使用方法,我后面还会出一篇文章,大家这里没理解的可以去看一下。

2. GIL 全局解释器锁

2.1 GIL是什么?

   G I L GIL GIL G l o b a l I n t e r p r e t e r L o c k Global Interpreter Lock GlobalInterpreterLock,全局解释器锁)是CPython中采用的一种机制,它确保同一时刻只有一个线程在执行Python字节码。给整个解释器加锁使得解释器多线程运行更方便,而且开发的CPython也更易于维护,但是代价是牺牲了在多处理器上的并行性。因此,在相当多的场景中,CPython解释器下的多线程机制的性能都不尽如人意。
GIL执行模型

2.2 GIL给Python带来的影响?

  上图是David Beazley的UnderstandGIL幻灯片中的一张,用于描述GIL的执行模型。
  从这套幻灯篇的介绍中,我们可以得知,GIL本质上是条件锁与互斥锁结合的一种二值信号量类的一个实例,在程序执行过程中,一个线程通过acquire操作获得GIL,从而执行其字节码,而当其遇到IO操作时,他将会release释放掉GIL锁资源GIL这时可以被其他线程获得以执行该线程的任务而原先线程的IO操作将会同时进行
  由此我们可以看到,GIL使得Python多线程程序在计算密集的场景下,不能充分利用多核心并发的优势(因为无论机器有多少核心,并且无论有多少线程来执行计算任务,同时运行的只有1个),而在IO密集的场景下,其性能受到的影响则较小

2.3 如何绕过GIL?

  那么如何避免GIL对多线程性能带来的影响呢?

  1. 绕过CPython,使用JPython(Java实现的)等别的Python解释器

  首先,GIL是CPython解释器中的实现,在JPython等其他解释器中并没有采用,因此我们可以考虑更换解释器实现。我们现在从官网下载,或者通过Anaconda部署的解释器普遍采用的是CPython,可以通过下面的方法安装其它实现:以JPython为例,去JPython官网下载其安装包(jar包),然后用 java -jar (前提是你的电脑安装了Java环境)去执行它,最后再配置一下环境变量即可。

  2. 把关键性能代码,放到别的语言(一般是C++)中实现

  这个是常用的一种方式,很多追求执行性能的模块例如Numpy、Pytorch等都是将自身性能代码放在C语言扩展中来完成的,如何开发Python模块的C语言扩展部分,可以参考这个链接http://t.csdn.cn/4YuDO

  3. 并行改并发,在Python中,多进程有时比多线程管用

  Python程序的每个进程都有自己的GIL锁,互补干涉,因此我们也可以直接使用多线程来处理一些计算任务,Python的多线程可以使用 m u l t i p r o c e s s i n g multiprocessing multiprocessing模块来完成,示例程序如下。

from multiprocessing import Process


def task(procName: int):
    print("这是线程{}".format(procName))


if __name__ == "__main__":
    proc1 = Process(target=task, args=(1,))
    proc2 = Process(target=task, args=(2,))
    proc1.start()
    proc2.start()

2.4 使用GIL并非绝对线程安全

  前面讲了这么多,总而言之就是Python的多线程机制很奇怪,就算有多个物理核心,在任何时刻只会有一个线程在执行。那么有人就会问,那我还要给公共资源加锁干什么?
  各位记住,Python的GIL只是负责Python解释器的线程安全,也只能保证同时只有一个线程在执行字节码,而Python程序本身的线程安全,Python一概不负责。想了解详情的,可以参考下网上的讨论,比如这个:https://www.zhihu.com/question/521650365

参考资料:
[1] https://zhuanlan.zhihu.com/p/504157513
[2] http://c.biancheng.net/view/5537.html
[3] https://zhuanlan.zhihu.com/p/379930260
[4] https://zhuanlan.zhihu.com/p/76343641
[5] https://zhuanlan.zhihu.com/p/77674796
[6] Python线程池
[7] UnderstandingGIL @ David Beazley
[8] Python守护线程
[9] Python 线程的join方法原理解析
[10] Python 线程同步机制解析
[11] Python threading 模块官方文档

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐