天青色等烟雨,而我在等你。

Understanding Asynchronous IO with Python 3.4's Asyncio and Node.js

这是一篇译文: 原文

介绍

今年夏天我都在开发运行于 Node.js 的 Web 平台。这是我第一次全职在写 Node.js,然后几周之后问题明显的暴露出来,就是好多开发者,包括我,缺乏对 Node.js 中异步特性的清晰认识以及在底层是如何实现的。因为我确信要想高效使用一个平台需要清楚的理解它是如何工作的,所以我决定深入研究一下。这份好奇心让我开始研究其他语言中的异步特性是如何实现的,尤其是 Python,是我每次学习和试验用的语言。这让我去研究 Python 3.4 中的异步 IO 库 asyncio ,跟我之前研究的 coroutines (看我之前的文章 combinatorial generation using coroutines in Python)。这篇文章探讨了一些我在学习这块主题时候遇到的问题和解答,我希望能够帮助大家理解和回答一些问题。

所有的 Python 代码需要 Python 3.4 环境。这主要是因为 Python 3.4 中介绍了 selectorsasyncio 模块。对于一些 Python 的早期版本中,类似 Twisted,gevent,tornado 等库也提供了类似的功能。

在下面的一些例子中,我决定几乎完全忽略了错误和异常处理。这主要是为了简单起见,但是需要注意的是适当的异常处理在下面的编码设计方式中也很重要。我会在最后用一些例子展示 Python 3.4 的 asyncio 模块中如果对异常进行处理。

Getting Sarted: 重访 Hello World

让我们从编写一个解决非常简单的问题的代码开始。我们从头到尾都将以这个问题和它的细微变化来展示概念。

Write a program to print “Hello world!” every three seconds, and at the same time wait for input from the user. Each line of user input will contain a single positive number n. As soon as input is entered, calculate and output the Fibonacci number F(n) and continue to wait for more input.

注意当用户输入的时候,周期性的「Hello World!」将会有可能出现,但是我们不关心这点。

熟悉 Node.js 和 JavaScript 的人可能已经想出了解决方案,可能看起来是这样:

log_execution_time = require('./utils').log_execution_time;

var fib = function fib(n) {
    if (n < 2) return n;
    return fib(n - 1) + fib(n - 2);
};

var timed_fib = log_execution_time(fib);

var sayHello = function sayHello() {
    console.log(Math.floor((new Date()).getTime() / 1000) + " - Hello world!");
};

var handleInput = function handleInput(data) {
    n = parseInt(data.toString());
    console.log('fib(' + n + ') = ' + timed_fib(n));
};

process.stdin.on('data', handleInput);
setInterval(sayHello, 3000);

大家可以看到,在 Node.js 中很容易实现。我们只需要设置一个间隔计时器来打印 「Hello World!」然后在吗process.stdin 上 attach 一个 data evnet handler,就做完了。在抽象层面很容易理解,并且也很容易使用。It just works!。But how? 为了回答这个问题,我们在 Python 里面做些同样的事情。

同样需要注意的,我们使用了一个 log_execution_time 的修饰符来输出计算 Fibonacci 数用掉的时间。

在这里我们用来计算 Fibonacci 数的算法我们有意选择了所有算法中最慢的一种(指数增长)。这是由于本篇文章不是关于 Fibonacci 数的(关于这个话题,参考这篇文章, 里面讲了一种对数时间复杂度的算法) 并且我就是想让代码运算得慢些来展示一些概念。这是 Python 实现,会用上几倍的时间。

from log_execution_time import log_execution_time


def fib(n):
    return fib(n - 1) + fib(n - 2) if n > 1 else n


    timed_fib = log_execution_time(fib)

现在回到手上的问题。我们是怎么开始的? Python 没用提供内置的 setInterval 或者 setTimeout。所以第一种可能的解决方案是使用系统级别的并发来解决。我们来使用两条线程来实现。我们需要对线程的细节多些了解。

from threading import Thread
from time import sleep
from time import time
from fib import timed_fib


def print_hello():
    while True:
        print("{} - Hello world!".format(int(time())))
        sleep(3)


def read_and_process_input():
    while True:
        n = int(input())
        print('fib({}) = {}'.format(n, timed_fib(n)))


def main():
    # Second thread will print the hello message. Starting as a daemon means
    # the thread will not prevent the process from exiting.
    t = Thread(target=print_hello)
    t.daemon = True
    t.start()
    # Main thread will read and process input
    read_and_process_input()

if __name__ == '__main__':
    main()

也很简单。那么基于线程的 Python 实现与 Node.js 中的实现相同吗? 我们来做个试验。像我们之前讨论过的,我们的 Fibonacci 数的计算代码非常的慢,让我们试一个非常大的数字,比如 Python 中用 37,Node.js 用45(JavaScript 在数字计算中确实比 Python 快些)。

$ python3.4 hello_threads.py
1412360472 - Hello world!
37
1412360475 - Hello world!
1412360478 - Hello world!
1412360481 - Hello world!
Executing fib took 8.96 seconds.
fib(37) = 24157817
1412360484 - Hello world!

在 Node.js 中,当计算 Fibbonacci 数时候,打印 「Hello World!」消息就停止了。让我们看看这是怎么回事。

事件循环和线程

为了理解前面部分中出现的两种解决方案之间的区别,我们需要简单的了解一下线程和事件循环。首先,我们从线程开始。线程被认为是一个单一的操作序列和 CPU 的执行它们的当前状态(CPU 状态指的是例如寄存器中的值,特别是下一条指令的地址)。

简单的同步程序通常在一条单一的线程中运行,这就是为什么有的操作需要等待,比如等待 IO 操作或者计时器,程序的执行会被暂停直到阻塞操作结束。最简单的阻塞操作之一就是sleep。事实上,sleep所做的全部事情,也就是按照给定的时间长度阻塞当前执行的线程。一个建成可以运行许多线程。在同一条进程里面的线程共享进程级的资源,比如内存和它的地址空间,文件描述符等等。

操作系统会控制操作线程,系统内部的调度器会关心进程内部的线程切换(和进程切换,但是我们在这里不太关心它,因为它不在本文的讨论范围)。操作系统的调度器会选择什么时候暂停一条线程然后把 CPU 的控制权交给另外一条线程执行。这个叫做上下文切换,需要关系到当前线程上下文的保存(比如,CPU 寄存器的值)然后载入目标线程的上下文。上下文切换的代价会比较大因为需要 CPU 的循环。

有很多原因可以让 CPU 决定切换到下一条线程。比如高优先级的进程或者线程需要立即执行(比如,处理硬件中断的代码),或者线程自己要求暂定一会(比如调用sleep),或者线程的执行超过了为它分配的时间(也被称作「量子时间」(时间片?,译者注))然后必须回到等待队列等待下一次被调度继续执行。

回到我们之前的解决方案,Python 解法明显是基于多线程的。这也解释了为什么两个任务可以同时执行,以及为什么计算大的 Fibonacci 数这种 CPU 密集型操作没有阻塞另一条线程执行。

那么 Node.js 是怎么样的呢? 事实证明,计算是阻塞了另外一个任务的,也就是说我们的代码是单线程执行的。这也是事实上 Node.js 是如何实现的。至于操作系统方面,你的程序也是运行在单线程的(这里我稍微简化了一些东西,以为基于 libuv 平台可能会对一些 IO 事件应用到线程池,但是这不影响你的 Javascript 代码还是运行在单线程的)。

有很多原因需要你在一些场景避免使用多线程。其中之一就是线程在计算机和资源方面是昂贵的,另外真正的基于线程的并行带来的内存共享问题会牵扯到比如死锁和竞争条件,需要在编程是考虑保持线程安全,是代码更加复杂。(当然,这些都是相对的,线程有自己的时间和空间(这里每太看懂,译者)。但是这不是本文的重点!)

让我们看看如果我们不用多线程解决这个问题。这样我们需要模仿 Node.js 背后的场景:事件循环。首先,我们需要一个办法来查询stdin来保持输入,也就是一个系统调用来等待一个文件描述符(这里就是stdin)是否有可读的输入。基于操作系统,有很多系统调用来实现比如pollselectkqueue等待。在 Python 3.4 中,selectors 模块提供了这些系统调用的一个抽象,你可以安全地(在某种意义上)在不同的机器上面使用它。

当我们有了查询功能,我们的事件循环将会非常简单:在每次循环迭代中,我们查询是否有可读输入,如果有,把它读进来处理。然后,我们检查是不是距离上次打印「Hello World!」是不是已经过了三秒钟,如果是,我们再打印。来看下。

import selectors
import sys
from time import time
from fib import timed_fib


def process_input(stream):
    text = stream.readline()
    n = int(text.strip())
    print('fib({}) = {}'.format(n, timed_fib(n)))


def print_hello():
    print("{} - Hello world!".format(int(time())))


def main():
    selector = selectors.DefaultSelector()
    # Register the selector to poll for "read" readiness on stdin
    selector.register(sys.stdin, selectors.EVENT_READ)
    last_hello = 0  # Setting to 0 means the timer will start right away
    while True:
        # Wait at most 10 milliseconds for input to be available
        for event, mask in selector.select(0.1):
            process_input(event.fileobj)
        if time() - last_hello > 3:
            last_hello = time()
            print_hello()


if __name__ == '__main__':
    main()

这是输出结果:

$ python3.4 hello_eventloop.py
1412376429 - Hello world!
1412376432 - Hello world!
1412376435 - Hello world!
37
Executing fib took 9.7 seconds.
fib(37) = 24157817
1412376447 - Hello world!
1412376450 - Hello world!

正如我们所料,由于我们使用了单线程,程序表现的跟 Node.js 一样,就是说,计算操作阻塞了打印操作。好的,优雅! 但是我们的解决是针对某种情形的硬编码。下面的部分,我们会关注通用化我们的事件循环代码来让它更强大和容易编写,首先应用回调,然后应用协程。

基于回调的事件循环

对于前面部分的事件循环的一个自然通用概括就是支持通用事件处理。使用回调可以相对容易的得到:对于每种事件类型(在我们的例子中,我们只有两种,stdin上面的输入和计时器计时),让用户可以添加任意的方法作为事件处理。代码很简单,我们直接跳过了。只有一点稍微有些复杂,就是应用bisect.insort来处理计时事件。这里的算法是维护一个有序的计时器事件,最早的计时器事件先执行。这样,在每次的循环迭代中,我们只需检查是否有计时器,如果有,在开始的时候执行然后执行所有过期的计时器。bisect.insort简化了列表的正确插入。有很多方法实现这一点,这里我选择了这种。

from bisect import insort
from fib import timed_fib
from time import time
import selectors
import sys


class EventLoop(object):
    """
    Implements a callback based single-threaded event loop as a simple
    demonstration.
    """
    def __init__(self, *tasks):
        self._running = False
        self._stdin_handlers = []
        self._timers = []
        self._selector = selectors.DefaultSelector()
        self._selector.register(sys.stdin, selectors.EVENT_READ)

    def run_forever(self):
        self._running = True
        while self._running:
            # First check for available IO input
            for key, mask in self._selector.select(0):
                line = key.fileobj.readline().strip()
                for callback in self._stdin_handlers:
                    callback(line)

            # Handle timer events
            while self._timers and self._timers[0][0] < time():
                handler = self._timers[0][1]
                del self._timers[0]
                handler()

    def add_stdin_handler(self, callback):
        self._stdin_handlers.append(callback)

    def add_timer(self, wait_time, callback):
        insort(self._timers, (time() + wait_time, callback))

    def stop(self):
        self._running = False


def main():
    loop = EventLoop()

    def on_stdin_input(line):
        if line == 'exit':
            loop.stop()
            return
        n = int(line)
        print("fib({}) = {}", n, timed_fib(n))

    def print_hello():
        print("{} - Hello world!".format(int(time())))
        loop.add_timer(3, print_hello)

    def f(x):
        def g():
            print(x)
        return g

    loop.add_stdin_handler(on_stdin_input)
    loop.add_timer(0, print_hello)
    loop.run_forever()


if __name__ == '__main__':
    main()

这个看起来很简单,在实践中,这也是 Node.js 中常用的方法。然而,在更复杂的程序中,这种书写异步代码的方式,特别是加入了异常处理之后,将会成为所谓的 callback hell。引用 Guido van Rossum 关于回调的说法:

It requires super human discipline to write readable code in callbacks and if you don’t believe me look at any piece of JavaScript code. - Guido van Rossum

有好多种其他的方案可以实现,比如 promises 和协程(已经有无数的 NPM 库做了对应的实现)。我更偏向的是(不是密码,我觉得协程很酷!)使用协程。下一部分我们基于协程实现了一个类似的事件循环。

基于协程的事件循环

协程就是当一个函数「返回」时仍然保存它返回时的状态(局部作用域的变量,以及下一步要执行的操作)。这样允许协程下一次被调用时直接进入它上次放回时的状态。这种形式的「返回」通常称为yielding。我在我的文章combinatorial generation using coroutines article中研究过许多协程的细节和他们在 Python 中的实现。下面在我们应用它之前,我提供了一个非常简要的介绍。

在 Python 中,yield关键字可以用来创建协程。当简单地被当作一条语句使用时,比如 yield value,给出的值将会被产出,控制权交还给调用者。想要从 yield 语句下一步继续执行协程,调用者需要调用内置的 next 函数。当作为表达式使用时,比如 y = yield xx被产出,当需要继续协程时,可以调用协程的 send 方法,这样传给 send 方法的值会被发送回去赋给协程的返回表达式的返回值(这个例子中是 y)。

这意味着我们可以利用协程编写异步代码,当我们需要等待异步操作时,简单的调用 yield。这样,我们可以 yield 需要继续执行的任务或者协程。这样代码看起来会非常有序而且看上去很像同步代码。这是一个产生 Fibonocci 数一部分的例子:

def read_input():
    while True:
        line = yield sys.stdin
        n = int(line)
        print("fib({}) = {}", n, timed_fib(n))

当然,为此我们需要一个事件循环来处理协程。为此,我们需要维护一个任务队列,在事件循环中执行。当有输入或者定时器到时(或者更通用一些,任何其他我们关心的事件),我们维护一个需要继续执行的协程列表(可能还有需要传入的值)。对于每一个任务,我们维护一个绑定的 stack 变量来跟踪协程的栈,保持他们链式执行,各自根据下一个协程完毕。这是基于 PEP342 中给出的 「Trampoline」的例子实现。我还用到了 Python 当中的 functools.partial,等价于 Javascript 中的 Function.prototyp.bind,也就是函数柯里化

像这样:

from bisect import insort
from collections import deque
from fib import timed_fib
from functools import partial
from time import time
import selectors
import sys
import types


class sleep_for_seconds(object):
    """
    Yield and object of this type from a coroutine to have it "sleep" for the
    given number of seconds.
    """
    def __init__(self, wait_time):
        self._wait_time = wait_time


class EventLoop(object):
    """
    Implements a simplified coroutine-based event loop as a demonstration.
    Very similar to the "Trampoline" example in PEP 342, with exception
    handling taken out for simplicity, and selectors added to handle file IO
    """
    def __init__(self, *tasks):
        self._running = False
        self._selector = selectors.DefaultSelector()

        # Queue of functions scheduled to run
        self._tasks = deque(tasks)

        # (coroutine, stack) pair of tasks waiting for input from stdin
        self._tasks_waiting_on_stdin = []

        # List of (time_to_run, task) pairs, in sorted order
        self._timers = []

        # Register for polling stdin for input to read
        self._selector.register(sys.stdin, selectors.EVENT_READ)

    def resume_task(self, coroutine, value=None, stack=()):
        result = coroutine.send(value)
        if isinstance(result, types.GeneratorType):
            self.schedule(result, None, (coroutine, stack))
        elif isinstance(result, sleep_for_seconds):
            self.schedule(coroutine, None, stack, time() + result._wait_time)
        elif result is sys.stdin:
            self._tasks_waiting_on_stdin.append((coroutine, stack))
        elif stack:
            self.schedule(stack[0], result, stack[1])

    def schedule(self, coroutine, value=None, stack=(), when=None):
        """
        Schedule a coroutine task to be run, with value to be sent to it, and
        stack containing the coroutines that are waiting for the value yielded
        by this coroutine.
        """
        # Bind the parameters to a function to be scheduled as a function with
        # no parameters.
        task = partial(self.resume_task, coroutine, value, stack)
        if when:
            insort(self._timers, (when, task))
        else:
            self._tasks.append(task)

    def stop(self):
        self._running = False

    def do_on_next_tick(self, func, *args, **kwargs):
        self._tasks.appendleft(partial(func, *args, **kwargs))

    def run_forever(self):
        self._running = True
        while self._running:
            # First check for available IO input
            for key, mask in self._selector.select(0):
                line = key.fileobj.readline().strip()
                for task, stack in self._tasks_waiting_on_stdin:
                    self.schedule(task, line, stack)
                self._tasks_waiting_on_stdin.clear()

            # Next, run the next task
            if self._tasks:
                task = self._tasks.popleft()
                task()

            # Finally run time scheduled tasks
            while self._timers and self._timers[0][0] < time():
                task = self._timers[0][1]
                del self._timers[0]
                task()

        self._running = False


def print_every(message, interval):
    """
    Coroutine task to repeatedly print the message at the given interval
    (in seconds)
    """
    while True:
        print("{} - {}".format(int(time()), message))
        yield sleep_for_seconds(interval)


def read_input(loop):
    """
    Coroutine task to repeatedly read new lines of input from stdin, treat
    the input as a number n, and calculate and display fib(n).
    """
    while True:
        line = yield sys.stdin
        if line == 'exit':
            loop.do_on_next_tick(loop.stop)
            continue
        n = int(line)
        print("fib({}) = {}".format(n, timed_fib(n)))


def main():
    loop = EventLoop()
    hello_task = print_every('Hello world!', 3)
    fib_task = read_input(loop)
    loop.schedule(hello_task)
    loop.schedule(fib_task)
    loop.run_forever()


if __name__ == '__main__':
    main()

注意到这种实现允许我们添加一个简单的 do_on_next_tick 方法,或多或少做些 Node.js 里面 process.nextTick 一样的事情。我用它实现了一个简单的键入 exit 就退出的功能(当然,我不是必须要用到 do_on_next_tick,我可以直接调用 loop.stop() !)

另一个有意思的事情就是我们可以重新实现我们的递归 Fibonacci 算法,使用协程,而不是递归调用,这样我们可以让它于其他协程一起「并行」执行,包括打印 hello。看起来像这样:

from event_loop_coroutine import EventLoop
from event_loop_coroutine import print_every
import sys


def fib(n):
    if n <= 1:
        yield n
    else:
        a = yield fib(n - 1)
        b = yield fib(n - 2)
        yield a + b


def read_input(loop):
    while True:
        line = yield sys.stdin
        n = int(line)
        fib_n = yield fib(n)
        print("fib({}) = {}".format(n, fib_n))


def main():
    loop = EventLoop()
    hello_task = print_every('Hello world!', 3)
    fib_task = read_input(loop)
    loop.schedule(hello_task)
    loop.schedule(fib_task)
    loop.run_forever()


if __name__ == '__main__':
    main()

程序的输出将会是:

$ python3.4 fib_coroutine.py
1412727829 - Hello world!
1412727832 - Hello world!
28
1412727835 - Hello world!
1412727838 - Hello world!
fib(28) = 317811
1412727841 - Hello world!
1412727844 - Hello world!

不是重复造轮子

在前面两部分里面,我们用通用的方法实现了允许使用回调或者协程编写异步代码的事件循环。这对实践和学习来说有很大的意义但是在实践中,已经有好多成熟的 Python 库提供了事件循环。另外,在 Python 3.4 中带有一个 asyncio 模块,提供了事件循环和 IO 操作,网络等等。让我们先用 asyncio 解决上面的问题,然后来看一些更有趣的例子。

import asyncio
import sys
from time import time
from fib import timed_fib


def process_input():
    text = sys.stdin.readline()
    n = int(text.strip())
    print('fib({}) = {}'.format(n, timed_fib(n)))


@asyncio.coroutine
def print_hello():
    while True:
        print("{} - Hello world!".format(int(time())))
        yield from asyncio.sleep(3)


def main():
    loop = asyncio.get_event_loop()
    loop.add_reader(sys.stdin, process_input)
    loop.run_until_complete(print_hello())


if __name__ == '__main__':
    main()

注意 @asyncio.coroutine 是如何修饰协程的,而且用 yield from 而不是仅仅 yield 在其他协程的返回值上面。

异常处理

Python 的协程允许在协程运行时栈帧内抛出异常,这是协程已经暂停。我们来看个简单的例子:

def coroutine():
    print("Starting")
    try:
        yield "Let's pause until continued."
        print("Continuing")
    except Exception as e:
        yield "Got an exception: " + str(e)


def main():
    c = coroutine()
    next(c)  # Execute until the first yield
    # Now throw an exception at the point where the coroutine has paused
    value = c.throw(Exception("Have an exceptional day!"))
    print(value)


if __name__ == '__main__':
    main()

会输出:

Starting
Got an exception: Have an exceptional day!

这将使用抛出异常来处理错误变得非常简单,在同步或者异步代码中,让事件循环可以适当的捕获和传递异常。例如,我们来看一个链式协程和事件循环:

import asyncio


@asyncio.coroutine
def A():
    raise Exception("Something went wrong in A!")


@asyncio.coroutine
def B():
    a = yield from A()
    yield a + 1


@asyncio.coroutine
def C():
    try:
        b = yield from B()
        print(b)
    except Exception as e:
        print("C got exception:", e)


def main():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(C())


if __name__ == '__main__':
    main()

在这个例子中,协程 C 依赖 B 的结果,B 依赖 AA 抛出异常。所以如你所见,异常一直被传递到 C 之后被捕获,打印出消息。如你所见,这样的表现几乎跟同步代码一样。不需要手动在回调里面传递和捕获异常!

当然,这个例子更偏向理论也比较乏味。我们来看一个现实中的例子:我们来写一段代码,通过 ipify 异步获取外网 IP。因为 asyncio 并不具有 HTTP 客户端(无论如何,到目前为止!),我们必须在 TCP 层手写 HTTP 请求并且处理相应。因为我们已经想好了一个特定的 API (作为一个例子,不是生产环境代码!),我们直接开始。在实践中,用针对这个功能的现成的模块,比如 aiohttp,会好很多。让我们来看看代码是什么样的:

import asyncio
import json


host = 'api.ipify.org'
request_headers = {'User-Agent': 'python/3.4',
                   'Host': host,
                   'Accept': 'application/json',
                   'Accept-Charset': 'UTF-8'}


@asyncio.coroutine
def write_headers(writer):
    for key, value in request_headers.items():
        writer.write((key + ': ' + value + '\r\n').encode())
    writer.write(b'\r\n')
    yield from writer.drain()


@asyncio.coroutine
def read_headers(reader):
    response_headers = {}
    while True:
        line_bytes = yield from reader.readline()
        line = line_bytes.decode().strip()
        if not line:
            break
        key, value = line.split(':', 1)
        response_headers[key.strip()] = value.strip()
    return response_headers


@asyncio.coroutine
def get_my_ip_address(verbose):
    reader, writer = yield from asyncio.open_connection(host, 80)
    writer.write(b'GET /?format=json HTTP/1.1\r\n')
    yield from write_headers(writer)
    status_line = yield from reader.readline()
    status_line = status_line.decode().strip()
    http_version, status_code, status = status_line.split(' ')
    if verbose:
        print('Got status {} {}'.format(status_code, status))
    response_headers = yield from read_headers(reader)
    if verbose:
        print('Response headers:')
        for key, value in response_headers.items():
            print(key + ': ' + value)
    # Assume the content length is sent by the server, which is the case
    # with ipify
    content_length = int(response_headers['Content-Length'])
    response_body_bytes = yield from reader.read(content_length)
    response_body = response_body_bytes.decode()
    response_object = json.loads(response_body)
    writer.close()
    return response_object['ip']


@asyncio.coroutine
def print_my_ip_address(verbose):
    try:
        ip_address = yield from get_my_ip_address(verbose)
        print("My IP address is:")
        print(ip_address)
    except Exception as e:
        print("Error: ", e)


def main():
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(print_my_ip_address(verbose=True))
    finally:
        loop.close()


if __name__ == '__main__':
    main()

再一次,注意与同步代码的相似之处: 没有回调,没有复杂的错误处理,仅仅是非常简单而且可读的代码。我们来看看它怎么工作,没有发生任何错误:

$ python3.4 ipify.py
Got status 200 OK
Response headers:
Content-Length: 21
Server: Cowboy
Connection: keep-alive
Via: 1.1 vegur
Content-Type: application/json
Date: Fri, 10 Oct 2014 03:46:31 GMT
My IP address is:
<my IP address here, hidden for privacy!>

另外,如果发生了错误,例如没有网络连接,会输出:

$ python3.4 ipify.py
Error:  [Errno 8] nodename nor servname provided, or not known

我认为这是使用协程编代替编写异步代码的主要好处: 与同步代码中的错误处理是一致的。比如上面的例子,万一协程链中的某一步失败了,或者某一个同步调用失败,异常都会被同样的捕获和处理。

依赖多个相关协程的执行结果

在前面的例子中,我们编写的代码在内部都是有一定顺序的,就是说协程内部的每条语句依赖钱一个协程完成。有时候,我们会需要执行一系列相关的任务,使用他们的 完成时,而不关心他们执行的顺序。例如,写一个爬虫,我们可能会针对页面内所有链接发送一些异步请求然后把这些相应加入到一个队列里面,按照我们的需求来处理。

协程允许我们编写有序执行的异步代码,但是执行相关的任务然后处理他们的结果,可能是全部统一处理,也可能是单独逐一处理每一个完成的任务,比如可能用到回调,可能看起来会更好。然而,Python 3.4 的 asyncio 模块恰好自带了两个方法符合这两种场景,就是 asyncio.as_completedasyncio.gather 这两个方法。

我们来看两个简单的载入 URL 的例子,首先,应用 asyncio.as_completed 依次处理每一个完成的请求,然后,应用 asyncio.gather 等待所有全部完成之后统一处理。我们用随机暂停几秒的方法来代替真正的发送请求。这是代码:

import asyncio
import random


@asyncio.coroutine
def get_url(url):
    wait_time = random.randint(1, 4)
    yield from asyncio.sleep(wait_time)
    print('Done: URL {} took {}s to get!'.format(url, wait_time))
    return url, wait_time


@asyncio.coroutine
def process_as_results_come_in():
    coroutines = [get_url(url) for url in ['URL1', 'URL2', 'URL3']]
    for coroutine in asyncio.as_completed(coroutines):
        url, wait_time = yield from coroutine
        print('Coroutine for {} is done'.format(url))


@asyncio.coroutine
def process_once_everything_ready():
    coroutines = [get_url(url) for url in ['URL1', 'URL2', 'URL3']]
    results = yield from asyncio.gather(*coroutines)
    print(results)


def main():
    loop = asyncio.get_event_loop()
    print("First, process results as they come in:")
    loop.run_until_complete(process_as_results_come_in())
    print("\nNow, process results once they are all ready:")
    loop.run_until_complete(process_once_everything_ready())


if __name__ == '__main__':
    main()

输出:

$ python3.4 gather.py
First, process results as they come in:
Done: URL URL2 took 2s to get!
Coroutine for URL2 is done
Done: URL URL3 took 3s to get!
Coroutine for URL3 is done
Done: URL URL1 took 4s to get!
Coroutine for URL1 is done

Now, process results once they are all ready:
Done: URL URL1 took 1s to get!
Done: URL URL2 took 3s to get!
Done: URL URL3 took 4s to get!
[('URL1', 1), ('URL2', 3), ('URL3', 4)]

深入挖掘

还有很多我没有提到的,举个例子Futureslibuv。还有 Guido’s talk on asynchronous IO in Python 3.4。还有好多内容我忘记提到所以欢迎在评论里面提出推荐。

comments powered by Disqus