异步编程与性能优化
2026/3/20 · 大约 9 分钟
异步编程与性能优化
第一章:异步编程基础
Python 异步编程概述
FastAPI 基于 ASGI(Asynchronous Server Gateway Interface)构建,原生支持异步编程。理解 Python 的 asyncio 对于充分发挥 FastAPI 性能至关重要。
async/await 基础
import asyncio
from fastapi import FastAPI

app = FastAPI()


# Synchronous helper: blocks the calling thread for one second.
def sync_task():
    import time
    time.sleep(1)
    return "sync result"


# Asynchronous helper: yields to the event loop while waiting.
async def async_task():
    await asyncio.sleep(1)
    return "async result"


# Async endpoint (recommended): runs directly on the event loop.
@app.get("/async")
async def async_endpoint():
    result = await async_task()
    return {"result": result}


# Sync endpoint: FastAPI runs it in a thread pool so it does not
# block the event loop.
@app.get("/sync")
def sync_endpoint():
    result = sync_task()
    return {"result": result}
并发执行
import asyncio
from fastapi import FastAPI

app = FastAPI()


async def fetch_user(user_id: int):
    await asyncio.sleep(0.5)  # simulate a database query
    return {"id": user_id, "name": f"User {user_id}"}


async def fetch_orders(user_id: int):
    await asyncio.sleep(0.5)  # simulate a database query
    return [{"order_id": i} for i in range(3)]


async def fetch_profile(user_id: int):
    await asyncio.sleep(0.5)  # simulate a database query
    return {"bio": "User bio", "avatar": "avatar.jpg"}


# Serial execution (slow): each await completes before the next starts.
@app.get("/user/{user_id}/serial")
async def get_user_serial(user_id: int):
    user = await fetch_user(user_id)        # 0.5s
    orders = await fetch_orders(user_id)    # 0.5s
    profile = await fetch_profile(user_id)  # 0.5s
    # total: roughly 1.5s
    return {"user": user, "orders": orders, "profile": profile}


# Concurrent execution (fast): the three coroutines overlap.
@app.get("/user/{user_id}/concurrent")
async def get_user_concurrent(user_id: int):
    user, orders, profile = await asyncio.gather(
        fetch_user(user_id),
        fetch_orders(user_id),
        fetch_profile(user_id),
    )
    # total: roughly 0.5s
    return {"user": user, "orders": orders, "profile": profile}


# Concurrency with per-task error handling.
@app.get("/user/{user_id}/safe-concurrent")
async def get_user_safe_concurrent(user_id: int):
    results = await asyncio.gather(
        fetch_user(user_id),
        fetch_orders(user_id),
        fetch_profile(user_id),
        return_exceptions=True,  # one failure does not abort the others
    )
    # Keep only the results that succeeded.
    keys = ("user", "orders", "profile")
    return {
        key: value
        for key, value in zip(keys, results)
        if not isinstance(value, Exception)
    }
任务超时
import asyncio
from fastapi import FastAPI, HTTPException

app = FastAPI()


async def slow_operation():
    await asyncio.sleep(10)
    return "done"


@app.get("/with-timeout")
async def with_timeout():
    try:
        # Abort the operation if it takes longer than 5 seconds.
        result = await asyncio.wait_for(slow_operation(), timeout=5.0)
    except asyncio.TimeoutError:
        raise HTTPException(status_code=408, detail="Operation timed out")
    return {"result": result}


# Several tasks sharing one timeout.
# NOTE(review): fetch_user / fetch_orders come from the earlier example.
@app.get("/multi-timeout")
async def multi_timeout():
    tasks = [
        asyncio.create_task(fetch_user(1)),
        asyncio.create_task(fetch_orders(1)),
    ]
    done, pending = await asyncio.wait(
        tasks,
        timeout=2.0,
        return_when=asyncio.ALL_COMPLETED,
    )
    # Cancel whatever did not finish in time.
    for task in pending:
        task.cancel()
    results = [task.result() for task in done]
    return {"results": results, "pending_count": len(pending)}
第二章:后台任务
FastAPI 后台任务
import asyncio

# Fix: the original snippet used Depends (in the /items/ route) without
# importing it; it is now included in the fastapi import.
from fastapi import BackgroundTasks, Depends, FastAPI
from pydantic import BaseModel, EmailStr

app = FastAPI()


class EmailSchema(BaseModel):
    email: EmailStr
    subject: str
    body: str


# Background task: simulate sending an email.
async def send_email(email: str, subject: str, body: str):
    await asyncio.sleep(2)  # simulate the send
    print(f"Email sent to {email}: {subject}")


def write_log(message: str):
    with open("log.txt", "a") as f:
        f.write(f"{message}\n")


# Schedule work to run after the response has been sent.
@app.post("/send-notification")
async def send_notification(
    email: EmailSchema,
    background_tasks: BackgroundTasks,
):
    background_tasks.add_task(
        send_email,
        email.email,
        email.subject,
        email.body,
    )
    background_tasks.add_task(write_log, f"Notification sent to {email.email}")
    # Respond immediately; the tasks run afterwards.
    return {"message": "Notification will be sent in the background"}


# Background tasks can also be registered from a dependency.
async def common_background_tasks(background_tasks: BackgroundTasks):
    background_tasks.add_task(write_log, "Request processed")


@app.get("/items/", dependencies=[Depends(common_background_tasks)])
async def read_items():
    return {"items": []}
使用 Celery 处理复杂任务
# tasks.py
from celery import Celery

celery_app = Celery(
    "tasks",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0",
)


@celery_app.task
def process_data(data: dict):
    # Long-running work.
    import time
    time.sleep(10)
    return {"processed": data}


@celery_app.task
def send_batch_emails(emails: list):
    for email in emails:
        # Send the email here.
        pass
    return {"sent": len(emails)}


# main.py
from fastapi import FastAPI
from tasks import process_data, send_batch_emails

app = FastAPI()


@app.post("/process")
async def process(data: dict):
    # Submit the job without blocking the request.
    task = process_data.delay(data)
    return {"task_id": task.id}


@app.get("/task/{task_id}")
async def get_task_status(task_id: str):
    task = process_data.AsyncResult(task_id)
    if task.ready():
        return {"status": "completed", "result": task.result}
    return {"status": "pending"}
使用 ARQ 异步任务队列
# worker.py
from arq import create_pool
from arq.connections import RedisSettings


async def process_task(ctx, data: dict):
    """Async task."""
    import asyncio
    await asyncio.sleep(5)
    return {"processed": data}


async def send_email_task(ctx, to: str, subject: str):
    """Email-sending task."""
    # Send the email asynchronously here.
    return {"sent_to": to}


class WorkerSettings:
    functions = [process_task, send_email_task]
    redis_settings = RedisSettings()


# main.py
from fastapi import FastAPI
from arq import create_pool
from arq.connections import RedisSettings

app = FastAPI()

redis_pool = None


@app.on_event("startup")
async def startup():
    global redis_pool
    redis_pool = await create_pool(RedisSettings())


@app.post("/enqueue")
async def enqueue_task(data: dict):
    job = await redis_pool.enqueue_job("process_task", data)
    return {"job_id": job.job_id}


@app.get("/job/{job_id}")
async def get_job(job_id: str):
    job = await redis_pool.job(job_id)
    status = await job.status()
    # NOTE(review): result(timeout=0) raises if the job has not finished
    # yet — confirm that is the intended behavior here.
    result = await job.result(timeout=0)
    return {"status": status, "result": result}
第三章:缓存策略
内存缓存
import asyncio
from functools import lru_cache

from cachetools import TTLCache
from fastapi import Depends, FastAPI

app = FastAPI()


# Cache the settings object for the life of the process.
@lru_cache()
def get_settings():
    return {"db_url": "postgresql://...", "secret": "..."}


# TTLCache entries expire automatically (5 minutes here).
cache = TTLCache(maxsize=100, ttl=300)


async def get_user_from_db(user_id: int):
    await asyncio.sleep(0.5)  # simulate a database query
    return {"id": user_id, "name": f"User {user_id}"}


@app.get("/users/{user_id}")
async def get_user(user_id: int):
    cache_key = f"user:{user_id}"
    if cache_key in cache:
        # Served straight from memory.
        return {"source": "cache", "data": cache[cache_key]}
    # Miss: load from the database and remember it.
    user = await get_user_from_db(user_id)
    cache[cache_key] = user
    return {"source": "database", "data": user}
Redis 缓存
import functools
import json
from typing import Optional

import redis.asyncio as redis
from fastapi import Depends, FastAPI

app = FastAPI()


class RedisCache:
    """Thin async wrapper around a Redis connection used as a JSON cache."""

    def __init__(self):
        self.redis = None

    async def init(self):
        # Fix: redis.asyncio.from_url is a synchronous factory — awaiting
        # it is an error; the connection is established lazily on first use.
        self.redis = redis.from_url(
            "redis://localhost:6379",
            encoding="utf-8",
            decode_responses=True,
        )

    async def get(self, key: str) -> Optional[dict]:
        """Return the cached value for *key*, or None on a miss."""
        data = await self.redis.get(key)
        if data:
            return json.loads(data)
        return None

    async def set(self, key: str, value: dict, expire: int = 300):
        """Store *value* under *key* with an expiry in seconds."""
        await self.redis.setex(key, expire, json.dumps(value))

    async def delete(self, key: str):
        await self.redis.delete(key)

    async def close(self):
        await self.redis.close()


cache = RedisCache()


@app.on_event("startup")
async def startup():
    await cache.init()


@app.on_event("shutdown")
async def shutdown():
    await cache.close()


# Caching decorator for async functions.
def cached(prefix: str, expire: int = 300):
    def decorator(func):
        # Fix: functools.wraps preserves the wrapped function's metadata
        # (name, signature) — important if the decorated function is ever
        # used as a route handler.
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # Build the cache key from the positional arguments.
            cache_key = f"{prefix}:{':'.join(str(v) for v in args)}"
            cached_data = await cache.get(cache_key)
            # Fix: compare against None so falsy cached values (e.g. an
            # empty dict) still count as cache hits.
            if cached_data is not None:
                return cached_data
            result = await func(*args, **kwargs)
            await cache.set(cache_key, result, expire)
            return result
        return wrapper
    return decorator


@cached("user", expire=600)
async def get_user_cached(user_id: int):
    # Database query would go here.
    return {"id": user_id, "name": f"User {user_id}"}


@app.get("/users/{user_id}")
async def get_user(user_id: int):
    return await get_user_cached(user_id)


# Invalidate the cache when the underlying data changes.
@app.put("/users/{user_id}")
async def update_user(user_id: int, name: str):
    # Update the database...
    # Then drop the stale cache entry.
    await cache.delete(f"user:{user_id}")
    return {"id": user_id, "name": name}
HTTP 缓存
from fastapi import FastAPI, Header, HTTPException, Response
from datetime import datetime, timedelta

app = FastAPI()


@app.get("/static-data")
async def static_data(response: Response):
    # Shared caches and browsers may keep this for an hour.
    response.headers["Cache-Control"] = "public, max-age=3600"
    response.headers["ETag"] = '"abc123"'
    return {"data": "This is static data"}


@app.get("/data/{item_id}")
async def get_data(item_id: int, response: Response):
    # Private cache: browser only, one minute.
    response.headers["Cache-Control"] = "private, max-age=60"
    return {"item_id": item_id, "value": "some value"}


# Conditional requests (If-None-Match / ETag).
@app.get("/conditional")
async def conditional_get(
    response: Response,
    if_none_match: str = Header(None),
):
    current_etag = '"version123"'
    if if_none_match == current_etag:
        # Fix: a 304 must not carry a body; raising HTTPException(304)
        # would make FastAPI serialize a JSON "detail" payload. Return an
        # empty Response instead.
        return Response(status_code=304)
    response.headers["ETag"] = current_etag
    return {"data": "Full response"}
第四章:数据库性能优化
连接池配置
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker

# Tuned connection-pool configuration.
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    pool_size=20,        # base number of pooled connections
    max_overflow=10,     # extra connections allowed beyond pool_size
    pool_timeout=30,     # seconds to wait for a free connection
    pool_recycle=1800,   # recycle connections after 30 minutes
    pool_pre_ping=True,  # validate each connection before checkout
    echo=False,          # disable SQL logging in production
    echo_pool=False,     # disable pool logging as well
)
查询优化
from sqlalchemy import select, func
from sqlalchemy.orm import selectinload, load_only
from sqlalchemy.ext.asyncio import AsyncSession


# Select only the columns you actually need.
async def get_user_names(db: AsyncSession):
    result = await db.execute(select(User.id, User.name))
    return result.all()


# Restrict which columns are loaded onto the ORM objects.
async def get_users_partial(db: AsyncSession):
    stmt = select(User).options(load_only(User.id, User.name, User.email))
    result = await db.execute(stmt)
    return result.scalars().all()


# One IN query instead of N round trips.
async def get_users_by_ids(db: AsyncSession, user_ids: list[int]):
    stmt = select(User).where(User.id.in_(user_ids))
    result = await db.execute(stmt)
    return result.scalars().all()


# Offset/limit pagination.
async def get_users_paginated(
    db: AsyncSession,
    page: int = 1,
    size: int = 20,
):
    offset = (page - 1) * size
    stmt = select(User).order_by(User.id).offset(offset).limit(size)
    result = await db.execute(stmt)
    return result.scalars().all()


# Cursor (keyset) pagination — cheaper on large tables.
async def get_users_cursor(
    db: AsyncSession,
    cursor: int = 0,
    size: int = 20,
):
    stmt = (
        select(User)
        .where(User.id > cursor)
        .order_by(User.id)
        .limit(size)
    )
    result = await db.execute(stmt)
    users = result.scalars().all()
    next_cursor = users[-1].id if users else None
    return {"users": users, "next_cursor": next_cursor}


# Count in the database rather than len(all()).
async def get_user_count(db: AsyncSession):
    result = await db.execute(select(func.count()).select_from(User))
    return result.scalar()
批量操作
from sqlalchemy import insert, update, delete


# Bulk insert.
async def bulk_insert_users(db: AsyncSession, users_data: list[dict]):
    await db.execute(insert(User), users_data)
    await db.commit()


# Bulk update.
async def bulk_update_status(
    db: AsyncSession,
    user_ids: list[int],
    is_active: bool,
):
    stmt = (
        update(User)
        .where(User.id.in_(user_ids))
        .values(is_active=is_active)
    )
    await db.execute(stmt)
    await db.commit()


# Bulk delete.
async def bulk_delete_users(db: AsyncSession, user_ids: list[int]):
    await db.execute(delete(User).where(User.id.in_(user_ids)))
    await db.commit()


# add_all + a single commit for ORM objects (efficient batch insert).
async def insert_many_users(db: AsyncSession, users: list[User]):
    db.add_all(users)
    await db.commit()
第五章:应用性能优化
响应压缩
from fastapi import FastAPI
from fastapi.middleware.gzip import GZipMiddleware

app = FastAPI()

# Compress responses larger than 1000 bytes.
app.add_middleware(GZipMiddleware, minimum_size=1000)
流式响应
import asyncio

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()


async def generate_large_data():
    """Generator yielding a large payload piece by piece."""
    for i in range(1000):
        yield f"data line {i}\n"
        await asyncio.sleep(0.01)


@app.get("/stream")
async def stream_data():
    # The client starts receiving bytes before generation finishes.
    return StreamingResponse(generate_large_data(), media_type="text/plain")


# SSE (Server-Sent Events)
async def event_generator():
    while True:
        # NOTE(review): get_next_event is assumed to be defined elsewhere.
        data = await get_next_event()
        yield f"data: {data}\n\n"
        await asyncio.sleep(1)


@app.get("/events")
async def stream_events():
    return StreamingResponse(event_generator(), media_type="text/event-stream")
请求并发限制
import asyncio
from fastapi import FastAPI, HTTPException

app = FastAPI()

# A semaphore caps concurrency at 10 simultaneous operations.
semaphore = asyncio.Semaphore(10)


async def limited_operation():
    async with semaphore:
        await asyncio.sleep(1)
        return "done"


@app.get("/limited")
async def limited_endpoint():
    return await limited_operation()


# A bounded asyncio.Queue acts as an admission-control mechanism.
request_queue = asyncio.Queue(maxsize=100)


@app.get("/queued")
async def queued_endpoint():
    try:
        # Wait up to 5 seconds for a free slot in the queue.
        await asyncio.wait_for(request_queue.put("request"), timeout=5.0)
    except asyncio.TimeoutError:
        raise HTTPException(503, "Service temporarily unavailable")
    try:
        return await process_request()
    finally:
        # Release the slot whether or not processing succeeded.
        await request_queue.get()
启动优化
from fastapi import FastAPI
from contextlib import asynccontextmanager


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Runs once at startup.
    await init_database_pool()
    await init_cache()
    await warm_up_cache()
    yield
    # Runs once at shutdown.
    await close_database_pool()
    await close_cache()


app = FastAPI(lifespan=lifespan)


async def warm_up_cache():
    """Preload frequently used data into the cache at startup."""
    popular_items = await get_popular_items()
    for item in popular_items:
        await cache.set(f"item:{item.id}", item.dict())
第六章:监控与分析
性能指标收集
import time

from fastapi import FastAPI, Request
from prometheus_client import Counter, Histogram, generate_latest
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import Response

app = FastAPI()

# Prometheus metrics.
REQUEST_COUNT = Counter(
    "http_requests_total",
    "Total HTTP requests",
    ["method", "endpoint", "status"],
)
REQUEST_LATENCY = Histogram(
    "http_request_duration_seconds",
    "HTTP request latency",
    ["method", "endpoint"],
)


class MetricsMiddleware(BaseHTTPMiddleware):
    """Records a request counter and a latency histogram per request."""

    async def dispatch(self, request: Request, call_next):
        start_time = time.time()
        response = await call_next(request)
        duration = time.time() - start_time
        # Record the metrics after the response is produced.
        REQUEST_COUNT.labels(
            method=request.method,
            endpoint=request.url.path,
            status=response.status_code,
        ).inc()
        REQUEST_LATENCY.labels(
            method=request.method,
            endpoint=request.url.path,
        ).observe(duration)
        return response


app.add_middleware(MetricsMiddleware)


@app.get("/metrics")
async def metrics():
    # Expose the metrics in the Prometheus text format.
    return Response(content=generate_latest(), media_type="text/plain")
性能分析
import cProfile
import io
import pstats

from fastapi import FastAPI

app = FastAPI()
def profile_route(func):
    """Decorator that profiles an async handler with cProfile and prints
    the 20 most expensive calls (by cumulative time) after each call."""
    import functools

    # Fix: functools.wraps preserves the wrapped coroutine's name and
    # signature — FastAPI inspects the signature when the decorator is
    # applied to a route, so without it the route breaks.
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        profiler = cProfile.Profile()
        profiler.enable()
        result = await func(*args, **kwargs)
        profiler.disable()
        # Render the profile report to a string and print it.
        stream = io.StringIO()
        stats = pstats.Stats(profiler, stream=stream)
        stats.sort_stats("cumulative")
        stats.print_stats(20)
        print(stream.getvalue())
        return result
    return wrapper
@app.get("/profile-test")
@profile_route
async def profile_test():
    # The code being analyzed.
    return await some_complex_operation()
慢查询日志
import logging
import time

from sqlalchemy import event
from sqlalchemy.engine import Engine

logger = logging.getLogger("slowquery")

# Queries slower than this many seconds are logged (500ms).
SLOW_QUERY_THRESHOLD = 0.5


@event.listens_for(Engine, "before_cursor_execute")
def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    # Stack the start time so nested executions pair up correctly.
    conn.info.setdefault("query_start_time", []).append(time.time())


@event.listens_for(Engine, "after_cursor_execute")
def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    total_time = time.time() - conn.info["query_start_time"].pop()
    if total_time > SLOW_QUERY_THRESHOLD:
        logger.warning(
            f"Slow query ({total_time:.2f}s): {statement[:200]}..."
        )
常见问题
Q1:什么时候使用 async def,什么时候使用 def?
# Use async def when:
# - you need to await other coroutines
# - the work is I/O bound (database, HTTP requests, file access)
@app.get("/async")
async def async_endpoint():
    data = await async_db_query()  # asynchronous database query
    return data


# Use plain def when:
# - the work is CPU bound
# - you call synchronous libraries
# FastAPI automatically runs plain def endpoints in a thread pool.
@app.get("/sync")
def sync_endpoint():
    result = cpu_intensive_calculation()  # CPU-heavy work
    return result
Q2:如何处理阻塞操作?
import asyncio
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=10)


def blocking_io():
    # Synchronous, blocking work.
    import time
    time.sleep(1)
    return "done"


@app.get("/non-blocking")
async def non_blocking():
    # Run the blocking call in the thread pool so the event loop stays free.
    # Fix: asyncio.get_event_loop() is deprecated inside coroutines;
    # asyncio.get_running_loop() is the supported way to get the active loop.
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(executor, blocking_io)
    return {"result": result}
Q3:如何优化大量小请求的场景?
# 1. Use a connection pool
# 2. Enable HTTP keep-alive
# 3. Offer a batch API
@app.post("/batch")
async def batch_operation(items: list[dict]):
    """Process several operations in a single request."""
    results = await asyncio.gather(*(process_item(item) for item in items))
    return {"results": results}
学习资源
- Python asyncio 文档:https://docs.python.org/3/library/asyncio.html
- FastAPI 并发文档:https://fastapi.tiangolo.com/async/
- SQLAlchemy 异步文档:https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html
- Redis 异步库:https://redis-py.readthedocs.io/en/stable/examples/asyncio_examples.html