异步#
CPU 的速度远远快于磁盘、网络等 IO。在一个线程中,CPU 执行代码的速度极快,但,一旦遇到 IO 操作,如读写文件、发送网络数据时,就需要等待 IO 操作完成,才能继续进行下一步操作。这种情况称为同步 IO。
考虑到 CPU 和 IO 之间巨大的速度差异,一个任务在执行的过程中大部分时间都在等待 IO 操作,单进程单线程模型会导致别的任务无法并行执行。多线程和多进程的模型虽然解决了并发问题,但,系统不能无上限地增加线程。由于系统切换线程的开销也很大,故,一旦线程数量过多,CPU 的时间就花在线程切换上了,真正运行代码的时间就少了,结果导致性能严重下降。
一种解决 IO 问题的方法是异步 IO。当代码需要执行一个耗时的 IO 操作时,它只发出 IO 指令,并不等待 IO 结果,然后就去执行其他代码了。一段时间后,当 IO 返回结果时,再通知 CPU 进行处理。
1. 协程#
对应到 Python 语言,单线程的异步编程模型称为协程(coroutine),有了协程的支持,就可基于事件驱动编写高效的多任务程序。
协程,又称微线程,纤程。线程是系统级别的,它们是由操作系统调度;协程是程序级别的,由程序员根据需要自己调度。一个线程中的一个个函数称为子程序,则子程序在执行过程中可中断去执行别的子程序;别的子程序也可中断回来继续执行之前的子程序,这就是协程。
Python 用协程解决了高并发 I/O 的需求。协程使 Python 程序中拥有大量的看似同时运行的函数。它们使用 async
和 await
关键字以及与生成器相同的基础设施来实现。启动一个协程的成本是一个函数调用。一旦一个协程被激活,它使用不到 1 KB 的内存,直到用完为止。
和线程一样,协程是独立的函数,可从环境中消耗输入并产生结果输出。不同的是,协程在每个 await
表达式时都会暂停,并在解决了待定的 awaitable
后继续执行一个异步函数(类似于生成器中的 yield
行为)。
协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。
协程的优点:
无需线程上下文切换的开销,协程避免了无意义的调度,由此可提高性能
无需原多线程的锁机制及同步的开销
方便切换控制流,简化编程模型
高并发 + 高扩展性 + 低成本:一个 CPU 支持上万的协程都不是问题。故很适合用于高并发处理。
协程的缺点:
无法利用多核资源:协程的本质是个单线程,它不能同时将单个 CPU 的多个核用上,协程需要和进程配合才能运行在多 CPU 上。当然我们日常所编写的绝大部分应用都没有这个必要,除非是 CPU 密集型应用。
进行阻塞操作 IO 时,会阻塞掉整个程序
1.1. 协程返回值#
可以用 inspect.getgeneratorstate(gen)
输出生成器状态。
1.2. 创建#
子程序,或称为函数,在所有语言中均是层级调用,子程序调用是通过栈(stack)实现的,一个线程就是执行一个子程序。子程序调用总是一个入口,一次返回,调用顺序是明确的。
协程的调用和子程序不同。协程看上去亦为子程序,但执行过程中,在子程序内部可中断,然后转而执行别的子程序,在适当的时候再返回来接着执行。
注意,在一个子程序中中断,去执行其他子程序,不是函数调用,有点类似 CPU 的中断。
def consumer():
print("[CONSUMER] start")
r = 'start'
while True:
n = yield r
if not n:
print("n is empty")
continue
print(f"[CONSUMER] Consumer is consuming {n}")
r = "200 ok"
def producer(c):
# 启动生成器
start_value = c.send(None)
print(start_value)
n = 0
while n < 3:
n += 1
print(f"[PRODUCER] Producer is producing {n}")
r = c.send(n)
print(f'[PRODUCER] Consumer return: {r}')
# 关闭生成器
c.close()
# 创建生成器
c = consumer()
# 传入生成器
producer(c)
2. 并行#
并行(parallelism)指实际上在同一时间做许多不同的事情。一台拥有多个 CPU 核的计算机可同时执行多个程序。每个 CPU 核运行一个独立程序的指令,使每个程序在同一时刻都能向前推进。
2.1. 创建#
通过 concurrent.futures
内置模块可很容易地访问多处理内置模块,它可能正是我们所需要的。它通过将额外的解释器作为子进程运行,使 Python 能够并行地利用多个 CPU 核。这些子进程与主解释器是分开的,故它们的全局解释器锁亦为分开的。每个子进程可充分利用一个 CPU 核。每个子进程都有一个与主进程的链接,在这里接收指令进行计算并返回结果。
ThreadPoolExecutor
使用线程池中的一个线程执行给定的任务。池中一共有 5 个线程,每个线程从池中取得一个任务然后执行它。当任务执行完成,再从池中拿到另一个任务。
如同 ThreadPoolExecutor
一样,ProcessPoolExecutor
是一个 executor
,使用一个线程池来并行执行任务。但,不同的是,ProcessPoolExecutor
使用了多核处理的模块,让我们可不受 GIL 的限制,大大缩短执行时间。
3. 并发#
并发(concurrency)使计算机能够在同一时间做许多不同的事情。例如,在一台只有一个 CPU 核的计算机上,操作系统会迅速地改变哪个程序正在单处理器上运行。在这样做的过程中,它交错执行程序,提供了程序同时运行的假象。
3.1. 创建#
标准库 asyncio
内置对异步 IO 的支持。 asyncio
的编程模型就是一个消息循环。async
把一个生成器标记为协程类型,然后,把这个协程扔到 EventLoop
中执行。
由于 asyncio.sleep()
亦为一个协程,故线程不会等待 asyncio.sleep()
,而是直接中断并执行下一个消息循环。当 asyncio.sleep()
返回时,线程就可从 await
拿到返回值(此处是 None
),然后接着执行下一行语句。把 asyncio.sleep(1)
看成是一个耗时 1 秒的 IO 操作,在此期间,主线程并未等待,而是去执行 EventLoop
中其他可执行的 coroutine
了,因此可实现并发执行。
4. 读写#
4.1. pathlib
#
from pathlib import Path
def unify_ext_with_pathlib(path):
for fpath in Path(path).glob('*.txt'):
fpath.rename(fpath.with_suffix('.csv'))
4.2. 流式读取#
def read_from_file(filename, block_size=1024 * 8):
with open(filename, "r") as fp:
while chunk := fp.read(block_size):
yield chunk
4.3. 条件读取#
from collections import deque
# Get last n lines of a file:
def tail(filename, n=10):
with open(filename) as f:
return deque(f, n)