Python异步编程与事件循环
2026/3/20 · 大约 11 分钟
Python 异步编程与事件循环
提示
Python 的 asyncio 模块提供了强大的异步编程支持。本文将深入剖析 Python 的事件循环机制、协程原理,以及如何构建高性能的异步应用。
Python 异步编程的演进
发展历程
从生成器到协程
# Python 3.4 - 基于生成器的协程(asyncio 不支持 Python 2.x;@asyncio.coroutine 已在 Python 3.11 中移除)
@asyncio.coroutine
def old_style_coroutine():
yield from asyncio.sleep(1)
return "done"
# Python 3.5+ - 原生协程
async def native_coroutine() -> str:
    """Sleep for one second without blocking the loop, then return "done"."""
    await asyncio.sleep(1)
    return "done"
asyncio 核心概念
架构概览
核心组件
| 组件 | 说明 | 示例 |
|---|---|---|
| Coroutine | 协程函数,使用 async def 定义 | async def fetch() |
| Task | 协程的包装器,可以被调度 | asyncio.create_task() |
| Future | 表示异步操作的最终结果 | asyncio.Future() |
| Event Loop | 事件循环,调度和执行协程 | asyncio.get_running_loop() |
事件循环详解
事件循环的工作原理
事件循环的生命周期
import asyncio
async def main():
print("Hello")
await asyncio.sleep(1)
print("World")
# Python 3.7+ 推荐方式
asyncio.run(main())
# 大致等价于以下代码(asyncio.run() 在关闭循环前还会取消剩余任务,并关闭异步生成器和默认执行器)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(main())
finally:
loop.close()
事件循环方法详解
import asyncio
async def example():
loop = asyncio.get_running_loop()
# 调度回调
loop.call_soon(callback, arg1, arg2) # 尽快执行
loop.call_later(delay, callback, arg1) # 延迟执行
loop.call_at(when, callback, arg1) # 指定时间执行
# 创建Future和Task
future = loop.create_future() # 创建Future
task = loop.create_task(some_coroutine()) # 创建Task
# 运行阻塞代码
result = await loop.run_in_executor(
None, # 使用默认线程池
blocking_function, # 阻塞函数
arg1, arg2 # 参数
)
# 网络操作
reader, writer = await asyncio.open_connection(
'example.com', 80
)
协程深入理解
协程 vs 普通函数
import asyncio


# Regular function: the body runs as soon as it is called.
def regular_function():
    """Return the string "result" synchronously."""
    return "result"


result = regular_function()  # evaluates immediately to "result"


# Coroutine function: calling it only builds a coroutine object.
async def coroutine_function():
    """Return the string "result" once the coroutine is actually driven."""
    return "result"


coro = coroutine_function()  # coroutine object; nothing has executed yet

# A coroutine runs only when it is awaited or handed to an event loop.
# `result = await coro` is valid solely inside another `async def`; in
# synchronous, module-level code it is a SyntaxError, so use asyncio.run().
result = asyncio.run(coro)
协程的三种状态
await 的本质
async def fetch_data():
# await 做了什么?
# 1. 暂停当前协程的执行
# 2. 将控制权交还给事件循环
# 3. 等待右侧的协程/Future完成
# 4. 获取结果并恢复执行
result = await some_async_operation()
# 底层相当于:
# future = some_async_operation()
# yield from future # 挂起
# result = future.result() # 恢复后获取结果
return result
Task 与 Future
Task 的创建和管理
import asyncio
async def task_demo():
    """Create tasks three different ways, then await, cancel and inspect them."""
    # Three ways to create a task
    # Option 1: asyncio.create_task() (recommended)
    task1 = asyncio.create_task(some_coro())

    # Option 2: asyncio.ensure_future()
    task2 = asyncio.ensure_future(some_coro())

    # Option 3: loop.create_task()
    loop = asyncio.get_running_loop()
    task3 = loop.create_task(some_coro())

    # Cancel a task. cancel() only *requests* cancellation, so it has to be
    # issued while the task is still pending, and the task must be given a
    # chance to run before cancelled() can report True.
    task2.cancel()
    await asyncio.gather(task2, return_exceptions=True)

    # Wait for the remaining tasks; awaiting a task yields its return value.
    result = await task1
    await task3

    # Inspect task state
    if task1.done():
        print("Task completed")
    if task2.cancelled():
        print("Task was cancelled")
async def some_coro() -> str:
    """Sleep for one second, then return "done"."""
    await asyncio.sleep(1)
    return "done"
并发执行多个任务
import asyncio
async def fetch(url: str) -> str:
print(f"Fetching {url}")
await asyncio.sleep(1) # 模拟网络请求
return f"Data from {url}"
async def main():
    """Run the same fetches with gather(), wait() and as_completed()."""
    urls = [
        "http://example.com/1",
        "http://example.com/2",
        "http://example.com/3",
    ]

    # Option 1: asyncio.gather() - wait for all of them to finish
    results = await asyncio.gather(*(fetch(url) for url in urls))
    print(results)

    # Option 2: asyncio.wait() - finer-grained control
    tasks = [asyncio.create_task(fetch(url)) for url in urls]
    done, pending = await asyncio.wait(
        tasks,
        timeout=2.0,
        return_when=asyncio.FIRST_COMPLETED,
    )
    # wait() never cancels anything: whatever is still pending keeps running
    # in the background unless it is cancelled (or awaited) explicitly.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)

    # Option 3: asyncio.as_completed() - handle results in completion order
    for coro in asyncio.as_completed([fetch(url) for url in urls]):
        result = await coro
        print(f"Got: {result}")
asyncio.run(main())
Future 详解
import asyncio
async def future_demo():
    """Create a bare Future, resolve it from another task, and await it."""
    loop = asyncio.get_running_loop()

    # Create a Future
    future = loop.create_future()

    # Future state
    print(future.done())  # False - not finished yet
    print(future.cancelled())  # False - not cancelled

    # Set the result (normally done by another coroutine or a callback)
    async def set_result_later():
        await asyncio.sleep(1)
        future.set_result("Hello, Future!")

    # Keep a reference to the task: the event loop only holds weak
    # references to tasks, so a fire-and-forget task may be collected early.
    setter = asyncio.create_task(set_result_later())

    # Wait for the Future to complete
    result = await future
    print(result)  # "Hello, Future!"

    # Let the helper task finish so nothing is left pending on return.
    await setter
# 实际应用:包装回调式API
async def callback_to_async(callback_api):
    """Adapt a callback-style API so that its result can be awaited.

    Args:
        callback_api: Callable taking one argument, a single-parameter
            callback, which it invokes with the result. The callback may be
            invoked from another thread.

    Returns:
        The value passed to the callback the first time it is invoked.
    """
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    def _resolve(result):
        # Runs on the loop thread. A repeated or late invocation (after the
        # future was resolved or cancelled) is ignored; calling set_result()
        # again would raise InvalidStateError inside the event loop.
        if not future.done():
            future.set_result(result)

    def callback(result):
        # Futures are not thread-safe, so hop onto the loop thread first.
        loop.call_soon_threadsafe(_resolve, result)

    callback_api(callback)  # start the callback-style API
    return await future  # suspend until the callback has delivered a value
异步上下文管理器和迭代器
异步上下文管理器
import asyncio
import aiohttp
# 使用 async with
async def fetch_with_session():
async with aiohttp.ClientSession() as session:
async with session.get('https://api.example.com') as response:
return await response.json()
# 自定义异步上下文管理器
class AsyncResource:
    """Async context manager that simulates acquiring and releasing a resource."""

    async def __aenter__(self):
        """Announce the acquisition, pause briefly, and return this instance."""
        print("Acquiring resource")
        await asyncio.sleep(0.1)  # simulate asynchronous acquisition
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Announce the release and pause briefly; never suppress exceptions."""
        print("Releasing resource")
        await asyncio.sleep(0.1)  # simulate asynchronous release
        # A falsy return lets an exception raised in the `async with` body
        # propagate to the caller.
        return False
async def main():
async with AsyncResource() as resource:
print("Using resource")
asyncio.run(main())
异步迭代器
import asyncio
# 自定义异步迭代器
class AsyncRange:
    """Async iterator yielding the integers from ``start`` up to, not including, ``end``."""

    def __init__(self, start: int, end: int):
        self.start = start
        self.end = end
        # Next value to hand out; advanced by __anext__.
        self.current = start

    def __aiter__(self):
        """Return the iterator itself, as `async for` requires."""
        return self

    async def __anext__(self):
        """Return the next integer, or raise StopAsyncIteration when exhausted."""
        if self.current >= self.end:
            raise StopAsyncIteration
        await asyncio.sleep(0.1)  # simulate an asynchronous operation
        value = self.current
        self.current += 1
        return value
# 使用 async for
async def main():
async for i in AsyncRange(0, 5):
print(i)
asyncio.run(main())
# 异步生成器 (Python 3.6+)
async def async_generator():
for i in range(5):
await asyncio.sleep(0.1)
yield i
async def main():
async for value in async_generator():
print(value)
异步同步原语
asyncio 提供的同步工具
使用示例
import asyncio
# Lock - 互斥锁
async def lock_demo():
lock = asyncio.Lock()
async def critical_section(name):
async with lock: # 自动获取和释放
print(f"{name} acquired lock")
await asyncio.sleep(1)
print(f"{name} releasing lock")
await asyncio.gather(
critical_section("Task 1"),
critical_section("Task 2"),
)
# Semaphore - 限制并发数
async def semaphore_demo():
sem = asyncio.Semaphore(3) # 最多3个并发
async def limited_task(n):
async with sem:
print(f"Task {n} running")
await asyncio.sleep(1)
print(f"Task {n} done")
await asyncio.gather(
*[limited_task(i) for i in range(10)]
)
# Queue - 生产者消费者模式
async def queue_demo():
    """Run one producer and three consumers over a bounded asyncio.Queue."""
    queue = asyncio.Queue(maxsize=10)

    async def producer():
        for i in range(20):
            await queue.put(i)
            print(f"Produced: {i}")
            await asyncio.sleep(0.1)

    async def consumer(name):
        while True:
            item = await queue.get()
            try:
                print(f"{name} consumed: {item}")
                await asyncio.sleep(0.2)  # simulated processing time
            finally:
                # Acknowledge the item only after its processing is over, so
                # that queue.join() cannot return while work is in flight.
                queue.task_done()

    # Start the consumers
    consumers = [
        asyncio.create_task(consumer(f"Consumer-{i}"))
        for i in range(3)
    ]

    # Run the producer
    await producer()

    # Wait until every queued item has been processed
    await queue.join()

    # Cancel the consumers, which are now idle in queue.get()
    for c in consumers:
        c.cancel()
    # cancel() only requests cancellation; await the tasks so that it is
    # actually delivered before this coroutine returns.
    await asyncio.gather(*consumers, return_exceptions=True)
与同步代码集成
在异步中运行同步代码
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
def blocking_io():
"""模拟阻塞I/O操作"""
time.sleep(1)
return "IO Result"
def cpu_intensive():
"""模拟CPU密集型操作"""
return sum(i * i for i in range(10**6))
async def main():
    """Off-load blocking I/O and CPU-bound work from the event loop to executors."""
    loop = asyncio.get_running_loop()

    # Run blocking I/O in the loop's default thread pool
    result = await loop.run_in_executor(None, blocking_io)
    print(f"IO result: {result}")

    # Use a custom thread pool
    with ThreadPoolExecutor(max_workers=4) as pool:
        result = await loop.run_in_executor(pool, blocking_io)

    # Use a process pool for the CPU-bound task
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_intensive)
        print(f"CPU result: {result}")


# Worker processes started with the "spawn" method re-import this module, so
# the entry point must be guarded or every worker would run main() again.
if __name__ == "__main__":
    asyncio.run(main())
在同步代码中运行异步
import asyncio
async def async_function():
await asyncio.sleep(1)
return "async result"
# 方式1:asyncio.run() (Python 3.7+)
result = asyncio.run(async_function())
# 方式2:手动创建并管理一个新的事件循环
loop = asyncio.new_event_loop()
try:
result = loop.run_until_complete(async_function())
finally:
loop.close()
# 方式3:使用 nest_asyncio(解决嵌套事件循环问题)
# pip install nest_asyncio
import nest_asyncio
nest_asyncio.apply()
# 现在可以在已运行的事件循环中再次调用 asyncio.run()
高性能异步框架
uvloop - 高性能事件循环
# pip install uvloop
import asyncio
import uvloop
# 设置uvloop作为事件循环策略
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# 或者在 Python 3.12+(asyncio.run() 的 loop_factory 参数自 3.12 起提供)
# asyncio.run(main(), loop_factory=uvloop.new_event_loop)
async def main():
# 使用uvloop,性能提升2-4倍
await asyncio.sleep(1)
print("Hello with uvloop!")
asyncio.run(main())
性能对比
实战:异步 HTTP 客户端
import asyncio
import aiohttp
from typing import List, Dict
async def fetch_url(
    session: aiohttp.ClientSession,
    url: str
) -> Dict:
    """Fetch a single URL and summarise the response.

    Args:
        session: Open aiohttp client session used to issue the request.
        url: Address to request with GET.

    Returns:
        ``{"url", "status", "length"}`` on success, where ``length`` is the
        size of the response body in bytes, or ``{"url", "error"}`` when the
        request fails.
    """
    try:
        # aiohttp expects a ClientTimeout object; total=10 caps the whole
        # request (connect + read) at ten seconds.
        async with session.get(
            url, timeout=aiohttp.ClientTimeout(total=10)
        ) as response:
            # Measure the raw payload so "length" really is a byte count,
            # matching how the caller reports it.
            body = await response.read()
            return {
                "url": url,
                "status": response.status,
                "length": len(body)
            }
    except Exception as e:
        # Per-URL boundary: report the failure as data so that one bad URL
        # does not abort the whole batch.
        return {
            "url": url,
            "error": str(e)
        }
async def fetch_all(urls: List[str], concurrency: int = 10) -> List[Dict]:
"""并发获取多个URL,限制并发数"""
semaphore = asyncio.Semaphore(concurrency)
async def fetch_with_limit(session: aiohttp.ClientSession, url: str):
async with semaphore:
return await fetch_url(session, url)
async with aiohttp.ClientSession() as session:
tasks = [fetch_with_limit(session, url) for url in urls]
return await asyncio.gather(*tasks)
async def main():
urls = [
"https://www.google.com",
"https://www.github.com",
"https://www.python.org",
"https://www.stackoverflow.com",
]
results = await fetch_all(urls, concurrency=3)
for result in results:
if "error" in result:
print(f"❌ {result['url']}: {result['error']}")
else:
print(f"✓ {result['url']}: {result['status']} ({result['length']} bytes)")
if __name__ == "__main__":
asyncio.run(main())
调试与监控
开启调试模式
import asyncio
# 方式1:环境变量
# PYTHONASYNCIODEBUG=1 python script.py
# 方式2:代码设置
asyncio.run(main(), debug=True)
# 方式3:设置循环
loop = asyncio.new_event_loop()
loop.set_debug(True)
调试技巧
import asyncio
import warnings
# 检测未await的协程
warnings.filterwarnings("error", category=RuntimeWarning)
# 获取所有任务
async def show_all_tasks():
for task in asyncio.all_tasks():
print(f"Task: {task.get_name()}")
print(f" Coroutine: {task.get_coro()}")
print(f" Done: {task.done()}")
print(f" Stack: {task.get_stack()}")
# 任务超时检测
async def with_timeout():
try:
await asyncio.wait_for(
long_running_task(),
timeout=5.0
)
except asyncio.TimeoutError:
print("Task timed out!")
# 取消处理
async def cancellable_task():
    """Sleep for 100 seconds; on cancellation, log a message and re-raise."""
    try:
        await asyncio.sleep(100)
    except asyncio.CancelledError:
        print("Task was cancelled, cleaning up...")
        raise  # re-raise so the caller can see that the task was cancelled
常见陷阱与最佳实践
陷阱 1:忘记 await
# ❌ 错误:忘记await
async def wrong():
asyncio.sleep(1) # 没有效果!返回的协程未被执行
# ✅ 正确
async def correct():
await asyncio.sleep(1)
陷阱 2:阻塞事件循环
import time
# ❌ 错误:使用同步sleep阻塞整个事件循环
async def wrong():
time.sleep(1) # 阻塞!
# ✅ 正确:使用异步sleep
async def correct():
await asyncio.sleep(1)
# ✅ 对于必须的阻塞操作,使用执行器
async def with_blocking():
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, time.sleep, 1)
陷阱 3:在错误位置创建任务
# ❌ 问题:事件循环只保存任务的弱引用,未被引用的任务可能在执行完成前就被垃圾回收
async def wrong():
asyncio.create_task(background_task())
return "done" # background_task可能不会完成
# ✅ 正确:保持对任务的引用
background_tasks = set()
async def correct():
task = asyncio.create_task(background_task())
background_tasks.add(task)
task.add_done_callback(background_tasks.discard)
return "done"
最佳实践总结
本章小结
核心要点
- asyncio 是 Python 的异步框架:提供事件循环、协程、任务等抽象
- 协程是轻量级的:可以创建大量协程而不会消耗太多资源
- await 是让出控制权的点:理解 await 的本质是掌握 asyncio 的关键
- 与同步代码集成:使用 run_in_executor 处理阻塞操作