异步#

CPU 的速度远远快于磁盘、网络等 IO。在一个线程中,CPU 执行代码的速度极快,但,一旦遇到 IO 操作,如读写文件、发送网络数据时,就需要等待 IO 操作完成,才能继续进行下一步操作。这种情况称为同步 IO。

考虑到 CPU 和 IO 之间巨大的速度差异,一个任务在执行的过程中大部分时间都在等待 IO 操作,单进程单线程模型会导致别的任务无法并行执行。多线程和多进程的模型虽然解决了并发问题,但,系统不能无上限地增加线程。由于系统切换线程的开销也很大,故,一旦线程数量过多,CPU 的时间就花在线程切换上了,真正运行代码的时间就少了,结果导致性能严重下降。

一种解决 IO 问题的方法是异步 IO。当代码需要执行一个耗时的 IO 操作时,它只发出 IO 指令,并不等待 IO 结果,然后就去执行其他代码了。一段时间后,当 IO 返回结果时,再通知 CPU 进行处理。

1. 协程#

对应到 Python 语言,单线程的异步编程模型称为协程(coroutine),有了协程的支持,就可基于事件驱动编写高效的多任务程序。

协程,又称微线程,纤程。线程是系统级别的,它们是由操作系统调度;协程是程序级别的,由程序员根据需要自己调度。一个线程中的一个个函数称为子程序,则子程序在执行过程中可中断去执行别的子程序;别的子程序也可中断回来继续执行之前的子程序,这就是协程。

Python 用协程解决了高并发 I/O 的需求。协程使 Python 程序中拥有大量的看似同时运行的函数。它们使用 asyncawait 关键字以及与生成器相同的基础设施来实现。启动一个协程的成本是一个函数调用。一旦一个协程被激活,它使用不到 1 KB 的内存,直到用完为止。

和线程一样,协程是独立的函数,可从环境中消耗输入并产生结果输出。不同的是,协程在每个 await 表达式时都会暂停,并在解决了待定的 awaitable 后继续执行一个异步函数(类似于生成器中的 yield 行为)。

协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。

协程的优点:

  • 无需线程上下文切换的开销,协程避免了无意义的调度,由此可提高性能

  • 无需原多线程的锁机制及同步的开销

  • 方便切换控制流,简化编程模型

  • 高并发 + 高扩展性 + 低成本:一个 CPU 支持上万的协程都不是问题。故很适合用于高并发处理。

协程的缺点:

  • 无法利用多核资源:协程的本质是个单线程,它不能同时将单个 CPU 的多个核用上,协程需要和进程配合才能运行在多 CPU 上。当然我们日常所编写的绝大部分应用都没有这个必要,除非是 CPU 密集型应用。

  • 进行阻塞操作 IO 时,会阻塞掉整个程序

1.1. 协程返回值#

可以用 inspect.getgeneratorstate(gen) 输出生成器状态。

1.2. 创建#

子程序,或称为函数,在所有语言中均是层级调用,子程序调用是通过栈(stack)实现的,一个线程就是执行一个子程序。子程序调用总是一个入口,一次返回,调用顺序是明确的。

协程的调用和子程序不同。协程看上去亦为子程序,但执行过程中,在子程序内部可中断,然后转而执行别的子程序,在适当的时候再返回来接着执行。

注意,在一个子程序中中断,去执行其他子程序,不是函数调用,有点类似 CPU 的中断。

def consumer():
    print("[CONSUMER] start")
    r = 'start'
    while True:
        n = yield r
        if not n:
            print("n is empty")
            continue
        print(f"[CONSUMER] Consumer is consuming {n}")
        r = "200 ok"


def producer(c):
    # 启动生成器
    start_value = c.send(None)
    print(start_value)
    n = 0
    while n < 3:
        n += 1
        print(f"[PRODUCER] Producer is producing {n}")
        r = c.send(n)
        print(f'[PRODUCER] Consumer return: {r}')
    # 关闭生成器
    c.close()


# 创建生成器
c = consumer()
# 传入生成器
producer(c)

2. 并行#

并行(parallelism)指实际上在同一时间做许多不同的事情。一台拥有多个 CPU 核的计算机可同时执行多个程序。每个 CPU 核运行一个独立程序的指令,使每个程序在同一时刻都能向前推进。

2.1. 创建#

通过 concurrent.futures 内置模块可很容易地访问多处理内置模块,它可能正是我们所需要的。它通过将额外的解释器作为子进程运行,使 Python 能够并行地利用多个 CPU 核。这些子进程与主解释器是分开的,故它们的全局解释器锁亦为分开的。每个子进程可充分利用一个 CPU 核。每个子进程都有一个与主进程的链接,在这里接收指令进行计算并返回结果。

ThreadPoolExecutor 使用线程池中的一个线程执行给定的任务。池中一共有 5 个线程,每个线程从池中取得一个任务然后执行它。当任务执行完成,再从池中拿到另一个任务。

如同 ThreadPoolExecutor 一样,ProcessPoolExecutor 是一个 executor,使用一个线程池来并行执行任务。但,不同的是,ProcessPoolExecutor 使用了多核处理的模块,让我们可不受 GIL 的限制,大大缩短执行时间。

3. 并发#

并发(concurrency)使计算机能够在同一时间做许多不同的事情。例如,在一台只有一个 CPU 核的计算机上,操作系统会迅速地改变哪个程序正在单处理器上运行。在这样做的过程中,它交错执行程序,提供了程序同时运行的假象。

3.1. 创建#

标准库 asyncio 内置对异步 IO 的支持。 asyncio 的编程模型就是一个消息循环。async 把一个生成器标记为协程类型,然后,把这个协程扔到 EventLoop 中执行。

由于 asyncio.sleep() 亦为一个协程,故线程不会等待 asyncio.sleep(),而是直接中断并执行下一个消息循环。当 asyncio.sleep() 返回时,线程就可从 await 拿到返回值(此处是 None ),然后接着执行下一行语句。把 asyncio.sleep(1) 看成是一个耗时 1 秒的 IO 操作,在此期间,主线程并未等待,而是去执行 EventLoop 中其他可执行的 coroutine 了,因此可实现并发执行。

4. 读写#

4.1. pathlib#

from pathlib import Path


def unify_ext_with_pathlib(path):
    for fpath in Path(path).glob('*.txt'):
        fpath.rename(fpath.with_suffix('.csv'))

4.2. 流式读取#

def read_from_file(filename, block_size=1024 * 8):
    with open(filename, "r") as fp:
        while chunk := fp.read(block_size):
            yield chunk

4.3. 条件读取#

from collections import deque


# Get last n lines of a file:
def tail(filename, n=10):
    with open(filename) as f:
        return deque(f, n)