我来详细解释Python异步编程中的核心概念,让你彻底理解async/await的工作原理。

1. 基本概念

async

async关键字用于定义异步函数(协程函数):

1
2
3
4
5
async def my_function():
return "Hello"

# 调用异步函数会返回一个协程对象,而不是直接执行
coroutine = my_function()

await

await用于挂起当前协程,等待另一个协程完成:

1
2
3
async def main():
result = await my_function() # 等待my_function完成
print(result)

2. Future对象

Future代表一个异步操作的最终结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import asyncio

async def example_future():
# 创建Future对象
future = asyncio.Future()

# 设置结果
future.set_result("Future completed!")

# 等待Future完成
result = await future
print(result) # 输出: Future completed!

# 运行
asyncio.run(example_future())

3. 实际应用示例

基本异步操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio
import time

async def say_after(delay, message):
await asyncio.sleep(delay)
return message

async def main():
print(f"开始时间: {time.strftime('%X')}")

# 顺序执行
result1 = await say_after(1, "Hello")
result2 = await say_after(2, "World")

print(result1, result2)
print(f"结束时间: {time.strftime('%X')}")

# 运行
asyncio.run(main())

并发执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
async def main_concurrent():
print(f"开始时间: {time.strftime('%X')}")

# 创建任务并发执行
task1 = asyncio.create_task(say_after(1, "Hello"))
task2 = asyncio.create_task(say_after(2, "World"))

# 等待所有任务完成
result1 = await task1
result2 = await task2

print(result1, result2)
print(f"结束时间: {time.strftime('%X')}")

asyncio.run(main_concurrent())

使用asyncio.gather

1
2
3
4
5
6
7
8
9
10
11
12
13
14
async def main_gather():
print(f"开始时间: {time.strftime('%X')}")

# 同时等待多个协程
results = await asyncio.gather(
say_after(1, "Hello"),
say_after(2, "World"),
say_after(1.5, "Async")
)

print(results) # ['Hello', 'World', 'Async']
print(f"结束时间: {time.strftime('%X')}")

asyncio.run(main_gather())

4. 高级用法

自定义Future

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
async def set_after_delay(future, delay, value):
await asyncio.sleep(delay)
future.set_result(value)

async def custom_future_example():
loop = asyncio.get_running_loop()
future = loop.create_future()

# 在后台设置future的结果
asyncio.create_task(set_after_delay(future, 2, "Future result"))

print("等待future...")
result = await future
print(f"收到结果: {result}")

asyncio.run(custom_future_example())

超时处理

1
2
3
4
5
6
7
8
9
10
11
12
13
async def might_timeout():
await asyncio.sleep(5)
return "Success"

async def with_timeout():
try:
# 设置超时
result = await asyncio.wait_for(might_timeout(), timeout=3.0)
print(result)
except asyncio.TimeoutError:
print("操作超时!")

asyncio.run(with_timeout())

事件循环控制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
async def event_loop_example():
# 获取当前事件循环
loop = asyncio.get_running_loop()

# 在事件循环中安排回调
future = loop.create_future()

def callback():
print("回调执行")
future.set_result("回调完成")

# 立即执行回调
loop.call_soon(callback)

result = await future
print(f"结果: {result}")

asyncio.run(event_loop_example())

5. 实际应用场景

网络请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import aiohttp
import asyncio

async def fetch_url(session, url):
async with session.get(url) as response:
return await response.text()

async def multiple_requests():
async with aiohttp.ClientSession() as session:
urls = [
'https://httpbin.org/delay/1',
'https://httpbin.org/delay/2',
'https://httpbin.org/delay/1'
]

tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)

for i, result in enumerate(results):
print(f"请求 {i+1} 完成,长度: {len(result)}")

asyncio.run(multiple_requests())

数据库操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import asyncpg
import asyncio

async def database_example():
# 连接数据库
conn = await asyncpg.connect('postgresql://user:pass@localhost/db')

try:
# 并发执行多个查询
user_query = conn.fetch("SELECT * FROM users WHERE id = $1", 1)
post_query = conn.fetch("SELECT * FROM posts WHERE user_id = $1", 1)

user, posts = await asyncio.gather(user_query, post_query)
print(f"用户: {user}, 文章数: {len(posts)}")
finally:
await conn.close()

关键要点总结

  1. async def:定义协程函数
  2. await:挂起当前协程,等待其他操作完成
  3. Future:代表异步操作的最终结果
  4. Task:包装协程,用于并发执行
  5. 事件循环:管理和调度所有异步任务

异步编程的核心思想是:在等待I/O操作时让出控制权,而不是阻塞线程,从而实现高效的并发处理。