协奏曲

协程是在 Tornado 中编写异步代码的推荐方法。协程使用 yieldawait 关键字来挂起和恢复执行,而不是回调链(gevent 等框架中的协作轻量级线程有时也称为协程,但在 Tornado 中,所有协程都使用显式上下文切换并称为异步函数)。

协程没有线程的开销,它们还通过减少可能发生上下文切换的位置数量,使并发性更容易推理。

1
2
3
4
async def fetch_coroutine(url):
client = AsyncHTTPClient()
resp = await client.fetch(url)
return resp.body

本机 <-> 装饰 coroutines

Python 3.5(或更高版本)使用 yieldawait 来构建本机协程。使用旧版本的 Python,您可以使用装饰协程或基于收益的协程。在龙卷风中, tornado.gen.coroutine 提供它,

您可以通过以下两种方法使用协程:

1
2
3
4
@gen.coroutine
def fun():
data = yield fun_wait()
raise gen.Return(data)
1
2
3
async def func():
data = await func_wait()
return data

两种形式的协程的区别:

  • 原生协程通常更快。
  • 本机协程可以使用 async forasync with 语句,这使得某些模式更加简单。
  • 本机协程根本不会运行,除非您 awaityield 它们。装饰协程一旦被调用就可以开始在后台运行。这两种协程对于使用 awaityield 都很重要,这样任何异常都有地方可去。
  • 装饰协程与 coroutine.future 包进行了额外的集成,允许直接生成 executor.submit 的结果。对于本机协程,请使用 tornado.gen.convert_yielded
  • 装饰协程总是返回一个 Future 对象。本机协程返回一个可等待的对象。

它是如何工作的

本机协程在概念上类似,但由于与 Python 运行时的额外集成而稍微复杂一些。

包含yield的函数是生成器。 所有生成器都是异步的,调用时它们返回一个生成器对象,而不是运行到完成。 @gen.coroutine 装饰器通过yield 表达式与生成器通信,并通过返回 Future 与协程的调用者通信。

这是协程装饰器版本的示例:

1
2
3
4
5
6
7
8
# Inner code in tornado.gen.Runner
def run(self):
# send(x) makes the current_yield return x,
# It returns when the next yield is reached.
def callback(f):
self.next = f.result()
self.run()
future.add_done_callback(callback)

装饰器从生成器接收一个 Future,等待(无阻塞)该 Future 完成,然后 unwarps Future 并将结果发送回生成器作为yield 表达式的结果。

如何调用协程

协程不会以正常方式引发异常:它们引发的任何异常都将被捕获在生成的可等待对象中。这意味着以正确的方式调用协程非常重要,否则您可能会出现未被注意到的错误:

1
2
3
4
5
6
async def divide(x, y):
return x/y

def bad_call():
# This would raise a ZeroDivisionError, but it won't because the coroutine is call incorrectly.
divide(1, 0)

几乎在所有情况下,任何调用协程的函数本身都必须是协程,并在调用中使用await 或yield。当您重写超类中定义的方法时,请查阅文档以查看是否允许协程(文档应该说明该方法可能是协程或可能返回 Future

1
2
3
async def good_call():
# await will unwrap the object returned by divide() and raise the exception
await divide(1, 0)

有时,您可能想要“解雇并忘记”协程而不等待其结果。在这种情况下,建议使用 IOLoop.spawn_callback,这使得 IOLOOP 负责调用。如果失败,IOLoop 将记录堆栈跟踪。

1
2
# The IOLoop will catch the exception and print a stack trace in the logs. Note that this doesn't look like a normal call, since we pass the function object to be called by the IOLoop.
IOLoop.current().spawn_callback(divide, 1, 0)

对于使用 @gen.coroutines 的函数,建议以这种方式使用 IOLoop.spawn_callback,但对于使用 async def 的函数来说,这是必需的(否则协程运行器将无法启动)。

最后,在程序的顶层,如果 IOLoop 尚未运行,您可以启动 IOLoop,运行协程,然后使用 IOLoop.run_sync 方法停止 IOLoop。这通常用于启动面向批处理的程序的主要功能:

1
2
# run_sync() dosen't take arguments, so we must wrap the call in a lambda.
IOLoop.current().run_sync(lambda: divide(0, 1))

协程模式

调用阻塞函数

从协程调用阻塞函数的最简单方法是使用 IOLoop.run_in_executor,它返回 Future 对象。

1
2
async def call_blocking():
await IOLoop.current().run_in_executor(blocking_func, args)

并行性

mutli 函数接受值为 Future 的列表和字典,并并行等待所有这些 Future:

1
2
3
4
5
6
7
8
9
from tornado.gen import mutil

async def parallel_fetch(url1, url2):
resp1, resp2 = await mutil([
http_client.fetch(url1), http_client.fetch(url2)
])

async def parallel_fetch_many(urls):
respones = await mutil([http_client.fetch(url) for url in urls])

在装饰协程中,您应该在列表或字典中使用yield。

1
2
3
4
5
6
@gen.coroutine
def parallel_fetch_decorated(url1, url2):
resp1, resp2 = yield [
http_client.fetch(url1),
http_client.fetch(url2)
]

交错

有时,保存 Future 对象而不是立即生成它很有用,因此您可以在等待之前启动另一个操作。

1
2
3
4
5
6
7
8
9
10
11
12
from tornado.gen import convert_yielded

async def get(self):
# convert_yielded() starts the native coroutine in the background.
# This is equivalent to asyncio.ensure_future().
fetch_future = convert_yielded(self.fetch_next_chunk())
while True:
chunk = yield fetch_future
if chunk is None: break
self.write(chunk)
fetch_future = self.fetch_next_chunk()
yield self.flush()

循环

在原生协程中,可以使用 async for 。在旧版本的 Python 中,协程的循环很棘手,因为无法在 for 或 while 循环的每次迭代中产生并捕获产生的结果。相反,您需要将循环条件与结果访问分开。

1
2
3
4
5
6
7
8
import motor
db = motor.MotorClient().test

@gen.coroutine
def loop_example(collection):
cursor = db.connection.find()
while (yield cursor.fetch_next()):
doc = cursor.next_object()

后台运行

periodiccallback 通常不与协程一起使用。相反,协程可以包含 while Ture 循环并使用 tornado.gen.sleep

1
2
3
4
5
6
7
async def minute_loop():
while True:
await do_something()
await gen.sleep(60)

# Coroutines that loop forever are generally started with spawn_callback()
IOLoop.current().spwan_callback(minute_loop)

可能需要更复杂的循环。例如,前面的循环每 60+X 秒运行一次,其中 X 是 do_something() 的运行次数。要恰好每 60 秒运行一次,您可以使用上面的交错模式:

1
2
3
4
5
async def minute_loop():
while True:
nxt = gen.sleep(60)
await do_something()
await nxt