我来详细解释Python异步编程中的核心概念,让你彻底理解async/await的工作原理。
1. 基本概念
async
async关键字用于定义异步函数(协程函数):
1 2 3 4 5
| async def my_function(): return "Hello"
coroutine = my_function()
|
await
await用于挂起当前协程,等待另一个协程完成:
1 2 3
| async def main(): result = await my_function() print(result)
|
2. Future对象
Future代表一个异步操作的最终结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| import asyncio
async def example_future(): future = asyncio.Future() future.set_result("Future completed!") result = await future print(result)
asyncio.run(example_future())
|
3. 实际应用示例
基本异步操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| import asyncio import time
async def say_after(delay, message): await asyncio.sleep(delay) return message
async def main(): print(f"开始时间: {time.strftime('%X')}") result1 = await say_after(1, "Hello") result2 = await say_after(2, "World") print(result1, result2) print(f"结束时间: {time.strftime('%X')}")
asyncio.run(main())
|
并发执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| async def main_concurrent(): print(f"开始时间: {time.strftime('%X')}") task1 = asyncio.create_task(say_after(1, "Hello")) task2 = asyncio.create_task(say_after(2, "World")) result1 = await task1 result2 = await task2 print(result1, result2) print(f"结束时间: {time.strftime('%X')}")
asyncio.run(main_concurrent())
|
使用asyncio.gather
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| async def main_gather(): print(f"开始时间: {time.strftime('%X')}") results = await asyncio.gather( say_after(1, "Hello"), say_after(2, "World"), say_after(1.5, "Async") ) print(results) print(f"结束时间: {time.strftime('%X')}")
asyncio.run(main_gather())
|
4. 高级用法
自定义Future
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| async def set_after_delay(future, delay, value): await asyncio.sleep(delay) future.set_result(value)
async def custom_future_example(): loop = asyncio.get_running_loop() future = loop.create_future() asyncio.create_task(set_after_delay(future, 2, "Future result")) print("等待future...") result = await future print(f"收到结果: {result}")
asyncio.run(custom_future_example())
|
超时处理
1 2 3 4 5 6 7 8 9 10 11 12 13
| async def might_timeout(): await asyncio.sleep(5) return "Success"
async def with_timeout(): try: result = await asyncio.wait_for(might_timeout(), timeout=3.0) print(result) except asyncio.TimeoutError: print("操作超时!")
asyncio.run(with_timeout())
|
事件循环控制
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| async def event_loop_example(): loop = asyncio.get_running_loop() future = loop.create_future() def callback(): print("回调执行") future.set_result("回调完成") loop.call_soon(callback) result = await future print(f"结果: {result}")
asyncio.run(event_loop_example())
|
5. 实际应用场景
网络请求
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| import aiohttp import asyncio
async def fetch_url(session, url): async with session.get(url) as response: return await response.text()
async def multiple_requests(): async with aiohttp.ClientSession() as session: urls = [ 'https://httpbin.org/delay/1', 'https://httpbin.org/delay/2', 'https://httpbin.org/delay/1' ] tasks = [fetch_url(session, url) for url in urls] results = await asyncio.gather(*tasks) for i, result in enumerate(results): print(f"请求 {i+1} 完成,长度: {len(result)}")
asyncio.run(multiple_requests())
|
数据库操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| import asyncpg import asyncio
async def database_example(): conn = await asyncpg.connect('postgresql://user:pass@localhost/db') try: user_query = conn.fetch("SELECT * FROM users WHERE id = $1", 1) post_query = conn.fetch("SELECT * FROM posts WHERE user_id = $1", 1) user, posts = await asyncio.gather(user_query, post_query) print(f"用户: {user}, 文章数: {len(posts)}") finally: await conn.close()
|
关键要点总结
- async def:定义协程函数
- await:挂起当前协程,等待其他操作完成
- Future:代表异步操作的最终结果
- Task:包装协程,用于并发执行
- 事件循环:管理和调度所有异步任务
异步编程的核心思想是:在等待I/O操作时让出控制权,而不是阻塞线程,从而实现高效的并发处理。