多任务#

对于操作系统来说,一个任务就是一个进程(Process),如打开一个浏览器就是启动一个浏览器进程,打开一个记事本就启动了一个记事本进程,打开两个记事本就启动了两个记事本进程。

有些进程还不止同时干一件事,如 Word,它可同时进行打字、拼写检查、打印等事情。在一个进程内部,要同时干多件事,就需要同时运行多个子任务,我们把进程内的这些子任务称为线程(Thread)。

由于每个进程至少要干一件事,故,一个进程至少有一个线程。真正地同时执行多线程需要多核 CPU 才可能实现。同时执行多个任务通常各个任务之间并不是没有关联的,而是需要相互通信和协调,有时,任务 1 必须暂停等待任务 2 完成后才能继续执行,有时,任务 3 和任务 4 又不能同时执行。但,有很多时候,没有多任务还真不行。幸运的是,Python 既支持多进程,又支持多线程。

1. 进程#

1.1. 多进程#

Unix/Linux 操作系统提供了一个 fork() 系统调用,它非常特殊。普通的函数调用,调用一次,返回一次,但,fork() 调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。子进程永远返回 0,而父进程返回子进程的 ID。这样做的理由是,一个父进程可 fork 出很多子进程,故,父进程要记下每个子进程的 ID,而子进程只需要调用 getppid() 就可拿到父进程的 ID。

import os

print(f'Process ({os.getpid()}) start...')
# Only works on Unix/Linux/Mac:
pid = os.fork()
if pid == 0:
    print(f'I am child process ({os.getpid()}) and my parent is {os.getppid}.')
else:
    print(f'I ({os.getpid()}) just created a child process ({pid}).')

若打算编写多进程的服务程序,Unix/Linux 无疑是正确的选择。由于 Windows 没有 fork 调用,可使用 multiprocessing 模块。multiprocessing 模块提供了一个 Process 类来代表一个进程对象。

  • 创建一个 Process 实例

  • start() 启动

  • join() 加入任务,并同步进程

from multiprocessing import Process
import os


# 子进程要执行的代码
def run_proc(name):
    print(f'Run child process {name} ({os.getpid()})...')


if __name__ == '__main__':
    print(f'Parent process {os.getpid()}.')
    p = Process(target=run_proc, args=('test', ))
    print('Child process will start.')
    p.start()
    p.join()
    print('Child process end.')

1.2. 进程池#

若要启动大量的子进程,可用进程池的方式批量创建子进程。对 Pool 对象调用 join() 方法会等待所有子进程执行完毕,调用 join() 之前必须先调用 close(),调用 close() 之后就不能继续添加新的 Process 了。

from multiprocessing import Pool
import os, time, random


def long_time_task(name):
    print(f'Run task {name} ({os.getpid()})...')
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print(f'Task {name} runs {(end - start):0.2f} seconds.')


if __name__ == '__main__':
    print(f'Parent process {os.getpid()}.')
    p = Pool(4)
    for i in range(5):
        p.apply_async(long_time_task, args=(i, ))
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All subprocesses done.')

请注意输出的结果,task 0,1,2,3 是立刻执行的,而 task 4 要等待前面某个 task 完成后才执行,这是因为 Pool 的默认大小是 4(默认大小是 CPU 的核数),故,最多同时执行 4 个进程。

1.3. 子进程#

很多时候,子进程并不是自身,而是一个外部进程。创建了子进程后,还需要控制子进程的输入和输出。管理子进程的最佳选择是使用 subprocess 内置模块。若子进程还需要输入,则可通过 communicate() 方法输入:

from subprocess import Popen, PIPE

print('$nslookup')
p = Popen(['nslookup'], stdin=PIPE, stdout=PIPE, stderr=PIPE)
output, err = p.communicate(b'set q=mx\npython.org!=xit\n')
print(output.decode('utf-8'))
print('Exit code:', p.returncode)

上面的代码相当于在命令行执行命令 nslookup,然后手动输入:

set q=mx
python.org
exit

1.4. 进程队列#

进程之间肯定是需要通信的。Python 的 multiprocessing 模块包装了底层的通信机制,提供了 QueuePipes 等多种方式来交换数据。以 Queue 为例,在父进程中创建两个子进程,一个往 Queue 里写数据,一个从 Queue 里读数据:

from multiprocessing import Process, Queue
import os, time, random


# 写数据进程执行的代码:
def write(q):
    print(f'Process to write: {os.getpid()}')
    for value in ['A', 'B', 'C']:
        print(f'Put {value} to queue...')
        q.put(value)
        time.sleep(random.random())


# 读数据进程执行的代码:
def read(q):
    print(f'Process to read: {os.getpid()}')
    while True:
        value = q.get(True)
        print(f'Get {value} from queue.')


if __name__ == '__main__':
    # 父进程创建 Queue,并传给各个子进程:
    q = Queue()
    pw = Process(target=write, args=(q, ))
    pr = Process(target=read, args=(q, ))
    # 启动子进程 pw,写入:
    pw.start()
    # 启动子进程 pr,读取:
    pr.start()
    # 等待 pw 结束:
    pw.join()
    # pr 进程里是死循环,无法等待其结束,只能强行终止:
    pr.terminate()

1.5. 管道#

Python 可将程序中的数据以管道的形式输入到一个子进程中并检索其输出。这允许您利用许多其他程序来并行地完成工作。例如,设我想使用 openssl 命令行工具来加密一些数据。用命令行参数和 I/O 管道启动子进程很容易。

import os
from subprocess import Popen, PIPE


def run_encrypt(data):
    env = os.environ.copy()
    env['password'] = 'zf7ShyBhZOraQDdE/FiZpm/m/8f9X+M1'
    proc = Popen(['openssl', 'enc', '-pbkdf2', '-pass', 'env:password'],
                 env=env,
                 stdin=PIPE,
                 stdout=PIPE)
    proc.stdin.write(data)
    proc.stdin.flush()  # Ensure that the child gets input
    return proc


procs = []
for _ in range(3):
    data = os.urandom(10)
    proc = run_encrypt(data)
    procs.append(proc)

for proc in procs:
    out, _ = proc.communicate()
    print(out[-10:])

2. 线程#

2.1. GIL#

Python 的标准实现称为 CPython。CPython 分两步运行一个 Python 程序。

  • 首先,它解析并编译源文本为字节码,是 8 位或 16 位指令的字码。

  • 然后,CPython 使用一个基于堆栈的解释器来运行字节码。

CPython 解释器本身就不是线程安全的,CPython 通过一个称为全局解释器锁(global interpreter lock,GIL)的机制来强制执行一致性。GIL 是一个互斥锁(mutex),一次只允许使用一个线程执行 Python 字节码。因此,一个 Python 进程通常不能同时使用多个 CPU 核心。

2.2. 阻塞 I/O#

有一些方法可让 CPython 利用多核,但它们不能与标准的 Thread 类一起工作,且可能需要大量努力。考虑到这些限制,为什么 Python 还支持线程呢?有两个很好的理由。

  • 有了线程,可让 Python 来并发运行你的函数。

  • 为了处理阻塞 I/O。当 Python 进行某些类型的系统调用时,就会出现这种情况。一个 Python 程序使用系统调用于要求计算机的操作系统代表它与外部环境交互。阻塞 I/O 包括读写文件、与网络交互、与显示器等设备通信等。线程通过使程序与操作系统响应请求所需的时间绝缘来帮助处理阻塞 I/O。

GIL 阻止了 Python 代码的并行运行,但它没有影响系统调用。这是因为 Python 线程在进行系统调用之前就释放了 GIL,而一旦系统调用完成,它们就会重新获取 GIL。

即便可突破 GIL,线程的代价也不可忽视。

由于任何进程默认就会启动一个线程,我们把该线程称为主线程,主线程又可启动新的线程,Python 的 threading 模块有个 current_thread() 函数,它永远返回当前线程的实例。

import select
import socket
import time
from threading import Thread


def slow_systemcall():
    select.select([socket.socket()], [], [], 0.1)


start = time.time()
for _ in range(5):
    slow_systemcall()
end = time.time()

delta = end - start
print(f'Took {delta:.3f} seconds')

start = time.time()
threads = []
for _ in range(5):
    thread = Thread(target=slow_systemcall)
    thread.start()
    threads.append(thread)

for thread in threads:
    thread.join()
end = time.time()

delta = end - start
print(f'Took {delta:.3f} seconds')

2.3. 锁步#

GIL 不会保护你。多线程和多进程最大的不同在于

  • 多进程中,同一个变量,各自有一份复制存在于每个进程中,互不影响

  • 多线程中,所有变量都由所有线程共享

故,任何一个变量都可被任何一个线程修改,故,线程之间共享数据最大的危险在于多个线程同时改一个变量,把内容给改乱了。

from threading import Thread, Lock


def worker(sensor_index, how_many, counter):
    for _ in range(how_many):
        # Read from the sensor
        counter.increment(1)


def threadRun():
    for i in range(5):
        thread = Thread(target=worker, args=(i, how_many, counter))
        threads.append(thread)
        thread.start()
    for thread in threads:
        thread.join()

    expected = how_many * 5
    found = counter.count
    print(f'Counter should be {expected}, got {found}')


how_many = 10**5
threads = []


class Counter:

    def __init__(self):
        self.count = 0

    def increment(self, offset):
        self.count += offset


counter = Counter()
threadRun()


class LockingCounter:

    def __init__(self):
        self.lock = Lock()
        self.count = 0

    def increment(self, offset):
        with self.lock:
            self.count += offset


counter = LockingCounter()
threadRun()

2.4. ThreadLocal#

在多线程环境下,每个线程都有自己的数据。一个线程使用自己的局部变量比使用全局变量好,因为局部变量只有线程自己能看见,不会影响其他线程,而全局变量的修改必须加锁。

但,局部变量也有问题,就是在函数调用的时候,传递起来很麻烦。 ThreadLocal 应运而生。

from threading import Thread, local, current_thread

# 创建全局 ThreadLocal 对象:
local_school = local()


def process_student():
    # 获取当前线程关联的 student:
    std = local_school.student
    print(f'Hello, {std} (in {current_thread().name})')


def process_thread(name):
    # 绑定 ThreadLocal 的 student:
    local_school.student = name
    process_student()


t1 = Thread(target=process_thread, args=('Alice', ), name='Thread-A')
t2 = Thread(target=process_thread, args=('Bob', ), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()

全局变量 local_school 是一个 ThreadLocal 对象,每个 Thread 对它都可读写 student 属性,但互不影响。可看成全局 dict,但每个属性如 local_school.student 均是线程的局部变量,可任意读写而互不干扰,也不用管理锁的问题,ThreadLocal 内部会处理。

ThreadLocal 最常用的地方就是为每个线程绑定一个数据库连接,HTTP 请求,用户身份信息等,这样一个线程的所有调用到的处理函数都可非常方便地访问这些资源。

2.5. 线程队列#

Python 程序若要同时做很多事情,往往需要协调它们的工作。并发工作最有用的安排之一是函数管道。

管道有许多串行(serial)的阶段,每个阶段有一个特定的函数。新的工作件不断地被添加到管道的开头。各功能可同时运行,每个功能都在其阶段的工作件上工作。这种方法特别适用于包括阻塞 I/O 或子进程的工作,这些活动可很容易地使用 Python 进行并行化。

queue 内置模块中的 Queue 类提供了解决上述问题所需的所有功能。 Queue 通过使 get 方法阻塞,直到有新的数据可用,从而消除了 worker 中繁忙的等待。

from queue import Queue
from threading import Thread
import time

my_queue = Queue(1)  # Buffer size of 1


def consumer():
    time.sleep(0.1)  # Wait
    my_queue.get()  # Runs 2nd
    print('Consumer got 1')
    my_queue.get()  # Runs 4th
    print('Consumer got 2')
    print('Consumer done')


thread = Thread(target=consumer)
thread.start()

my_queue.put(object())  # Runs 1st
print('Producer put 1')
my_queue.put(object())  # Runs 3rd
print('Producer put 2')
print('Producer done')
thread.join()

Queue 类还可使用 task_done 方法跟踪工作进度。这让你可等待一个阶段的输入队列耗尽,且不需要轮询管道的最后一个阶段(就像上面的 done_queue )。

from queue import Queue
from threading import Thread

in_queue = Queue()


def consumer():
    print('Consumer waiting')
    work = in_queue.get()  # Runs 2nd
    print('Consumer working')
    print('Consumer done')
    in_queue.task_done()  # Runs 3rd


thread = Thread(target=consumer)
thread.start()

print('Producer putting')
in_queue.put(object())  # Runs 1st
print('Producer waiting')
in_queue.join()  # Runs 4th
print('Producer done')
thread.join()

3. 进程与线程#

3.1. 开销#

要实现多任务,通常会设计 Master-Worker 模式,Master 负责分配任务,Worker 负责执行任务,故,多任务环境下,通常是一个 Master,多个 Worker

  • 多进程

    • 优点:稳定性高,一个子进程崩溃了,不会影响主进程和其他子进程。

    • 缺点:

      • 创建进程的代价大,尤其在 Windows 下。

      • 在内存和 CPU 的限制下,操作系统能同时运行的进程数亦为有限的。

  • 多线程模式

    • 优点:通常比多进程快一点

    • 缺点:稳定性差,任何一个线程挂掉都可能直接造成整个进程崩溃,因为所有线程共享进程的内存。

3.2. 密集型#

任务可分为计算密集型和 IO 密集型。

计算密集型任务

  • CPU 消耗大,Python 这样的脚本语言运行效率很低,完全不适合计算密集型任务。对于计算密集型任务,最好用 C 语言编写。

  • 计算密集型任务同时进行的数量应当等于 CPU 的核心数。

IO 密集型

  • CPU 消耗很少

  • 任务的大部分时间都在等待 IO 操作完成(因为 IO 的速度远远低于 CPU 和内存的速度)。

  • 任务越多,CPU 效率越高,但也有一个限度。

3.3. 扇出与扇入#

为每个工作单元产生一条并发执行线的过程称为扇出(fan-out)。等待所有这些并发的工作单元完成后,再在一个协调的过程中进入下一阶段,称为扇入(fan-in)。

应尽量避免通过扇出来创建新的线程。