事件循环实战应用
2026/3/20 · 大约 11 分钟
事件循环实战应用
高并发 HTTP 服务器
基于 Node.js 的实现
const http = require("http");
const cluster = require("cluster");
const os = require("os");
// 请求处理逻辑
// Handle one HTTP request: run the simulated async DB lookup for the
// requested URL, then answer with a JSON payload tagged with this
// worker's PID (so cluster load distribution is visible to clients).
async function handleRequest(req, res) {
  const payload = {
    pid: process.pid,
    data: await simulateDBQuery(req.url),
  };
  res.writeHead(200, { "Content-Type": "application/json" });
  res.end(JSON.stringify(payload));
}
function simulateDBQuery(url) {
return new Promise((resolve) => {
setTimeout(() => {
resolve({ url, timestamp: Date.now() });
}, 10);
});
}
// Master/worker bootstrap (Node "cluster" pre-fork model): the master forks
// one worker per CPU core; each worker runs its own event loop and HTTP
// server, and all workers share port 3000 (the master distributes accepted
// connections among them).
// NOTE(review): cluster.isMaster is a deprecated alias of cluster.isPrimary
// (Node >= 16) — consider migrating when old-Node support is dropped.
if (cluster.isMaster) {
// Master process: fork one worker per CPU.
const numCPUs = os.cpus().length;
console.log(`Master ${process.pid} starting ${numCPUs} workers`);
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
// Self-healing: replace any worker that dies.
cluster.on("exit", (worker) => {
console.log(`Worker ${worker.process.pid} died, restarting...`);
cluster.fork();
});
} else {
// Worker process: create the HTTP server.
const server = http.createServer(handleRequest);
server.listen(3000, () => {
console.log(`Worker ${process.pid} listening on port 3000`);
});
}
架构说明
基于 Python FastAPI 的实现
from fastapi import FastAPI
from contextlib import asynccontextmanager
import asyncio
import uvicorn
# Simulated connection pool
class ConnectionPool:
    """Simulated async DB connection pool gated by a semaphore.

    NOTE(review): the semaphore is released as soon as get_connection()
    returns (the ``async with`` exits on return), so ``size`` only limits
    how many coroutines are *acquiring* at once — it does not cap
    concurrent use of the returned connections. Acceptable for this
    simulation; a real pool needs an explicit release/return path.
    """

    def __init__(self, size: int = 10):
        self.size = size                          # nominal pool capacity
        self.semaphore = asyncio.Semaphore(size)  # gates concurrent acquisition
        self.connections = []                     # unused in this simulation

    async def get_connection(self):
        async with self.semaphore:
            # Simulate the latency of checking a connection out of the pool.
            await asyncio.sleep(0.001)
            return {"connection_id": id(self)}
# Module-level pool shared by all request handlers in this worker process.
pool = ConnectionPool(size=100)
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan hook: startup work before ``yield``, shutdown after."""
    # Startup-time initialization
    print("Server starting...")
    yield
    # Shutdown-time cleanup
    print("Server shutting down...")
# Wire the lifespan hook into the application.
app = FastAPI(lifespan=lifespan)

@app.get("/")
async def root():
    """Health-check style root endpoint."""
    return {"message": "Hello World"}
@app.get("/data/{item_id}")
async def get_data(item_id: int):
# 模拟异步数据库操作
conn = await pool.get_connection()
await asyncio.sleep(0.01) # 模拟查询
return {"item_id": item_id, "connection": conn}
@app.get("/parallel")
async def parallel_requests():
# 并行执行多个异步操作
results = await asyncio.gather(
fetch_from_service_a(),
fetch_from_service_b(),
fetch_from_service_c(),
)
return {"results": results}
async def fetch_from_service_a():
    """Simulated downstream call to service A (~100 ms latency)."""
    await asyncio.sleep(0.1)
    return dict(service="A", data="data_a")


async def fetch_from_service_b():
    """Simulated downstream call to service B (~100 ms latency)."""
    await asyncio.sleep(0.1)
    return dict(service="B", data="data_b")


async def fetch_from_service_c():
    """Simulated downstream call to service C (~100 ms latency)."""
    await asyncio.sleep(0.1)
    return dict(service="C", data="data_c")
if __name__ == "__main__":
uvicorn.run(
"server:app",
host="0.0.0.0",
port=8000,
workers=4, # 多进程
loop="uvloop", # 使用uvloop
)
WebSocket 实时通信
聊天室实现
import asyncio
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Dict, Set
import json
# FastAPI application hosting the WebSocket chat endpoint.
app = FastAPI()
class ConnectionManager:
    """Tracks WebSocket connections grouped into chat rooms.

    ``rooms`` maps room name -> set of live sockets; ``connections`` maps
    each socket back to its {room, user} info so it can be cleaned up on
    disconnect.
    """

    def __init__(self):
        # room name -> set of sockets currently in that room
        self.rooms: Dict[str, Set[WebSocket]] = {}
        # socket -> {"room": ..., "user": ...}
        self.connections: Dict[WebSocket, dict] = {}

    async def connect(self, websocket: WebSocket, room: str, user: str):
        """Accept the socket, register it in ``room``, announce the join."""
        await websocket.accept()
        self.rooms.setdefault(room, set()).add(websocket)
        self.connections[websocket] = {"room": room, "user": user}
        # Tell everyone already in the room (not the newcomer).
        await self.broadcast(room, {
            "type": "system",
            "message": f"{user} joined the room"
        }, exclude=websocket)

    async def disconnect(self, websocket: WebSocket):
        """Unregister a socket and announce the departure (idempotent)."""
        info = self.connections.pop(websocket, None)
        if info is None:
            return  # already disconnected
        room = info["room"]
        user = info["user"]
        peers = self.rooms.get(room)
        if peers is not None:
            peers.discard(websocket)
            if not peers:
                # Fix: drop empty rooms so a long-running server does not
                # leak one dict entry per room ever created.
                del self.rooms[room]
        await self.broadcast(room, {
            "type": "system",
            "message": f"{user} left the room"
        })

    async def broadcast(
        self,
        room: str,
        message: dict,
        exclude: WebSocket = None
    ):
        """Send ``message`` to every socket in ``room`` (optionally excluding one)."""
        if room not in self.rooms:
            return
        # Snapshot the member set first: _send_safe may call disconnect(),
        # which mutates self.rooms while sends are in flight.
        targets = [ws for ws in self.rooms[room] if ws is not exclude]
        await asyncio.gather(*(self._send_safe(ws, message) for ws in targets))

    async def _send_safe(self, websocket: WebSocket, message: dict):
        """Send, treating any failure as a dead socket to be cleaned up."""
        try:
            await websocket.send_json(message)
        except Exception:
            await self.disconnect(websocket)
# Single process-wide connection manager shared by the endpoint below.
manager = ConnectionManager()
@app.websocket("/ws/{room}/{user}")
async def websocket_endpoint(websocket: WebSocket, room: str, user: str):
await manager.connect(websocket, room, user)
try:
while True:
# 接收消息
data = await websocket.receive_text()
message = json.loads(data)
# 广播消息
await manager.broadcast(room, {
"type": "message",
"user": user,
"content": message.get("content", "")
})
except WebSocketDisconnect:
await manager.disconnect(websocket)
架构图
分布式任务队列
基于 Redis 的任务队列
import asyncio
import aioredis
import json
import uuid
from typing import Callable, Dict, Any
from dataclasses import dataclass
from datetime import datetime
@dataclass
class Task:
    """A serializable unit of work queued through Redis.

    NOTE(review): after a JSON round-trip ``args`` comes back as a list,
    not a tuple; handlers are invoked with ``*args`` so this is harmless,
    but code should not assume tuple here.
    """
    id: str          # unique task id (uuid4 string)
    name: str        # handler name registered via TaskQueue.register
    args: tuple      # positional arguments for the handler
    kwargs: dict     # keyword arguments for the handler
    created_at: str  # ISO-8601 creation timestamp
class TaskQueue:
    """Async task queue backed by a Redis list.

    Producers enqueue() JSON-serialized Task records; worker() pops them
    with BRPOP and dispatches to handlers registered via @register.

    NOTE(review): the ``aioredis`` package is deprecated — its API now
    lives at ``redis.asyncio`` — but the call shape used here is identical.
    """

    def __init__(self, redis_url: str = "redis://localhost"):
        self.redis_url = redis_url
        self.redis = None                        # set by connect()
        self.handlers: Dict[str, Callable] = {}  # task name -> handler
        self.running = False
        # Strong references to in-flight executions: the event loop keeps
        # only weak refs to tasks created via create_task, so without this
        # set a running task could be garbage-collected mid-execution.
        self._inflight: set = set()

    async def connect(self):
        """Open the Redis connection (call before enqueue()/worker())."""
        self.redis = await aioredis.from_url(self.redis_url)

    async def close(self):
        """Close the Redis connection if it was opened."""
        if self.redis:
            await self.redis.close()

    def register(self, name: str):
        """Decorator: register a (sync or async) handler under ``name``."""
        def decorator(func: Callable):
            self.handlers[name] = func
            return func
        return decorator

    async def enqueue(
        self,
        name: str,
        *args,
        queue: str = "default",
        **kwargs
    ) -> str:
        """Serialize a task and LPUSH it onto ``queue``; return the task id."""
        task = Task(
            id=str(uuid.uuid4()),
            name=name,
            args=args,
            kwargs=kwargs,
            created_at=datetime.now().isoformat()
        )
        await self.redis.lpush(
            f"queue:{queue}",
            json.dumps(task.__dict__)
        )
        return task.id

    async def worker(self, queue: str = "default", concurrency: int = 10):
        """Consume tasks from ``queue`` until stopped.

        BRPOP blocks at most 1 s so the loop can notice cancellation or
        ``self.running = False``. Execution is fanned out to background
        tasks; ``concurrency`` bounds how many run at once.
        """
        self.running = True
        semaphore = asyncio.Semaphore(concurrency)
        while self.running:
            try:
                # Block waiting for a task (up to 1 s).
                result = await self.redis.brpop(
                    f"queue:{queue}",
                    timeout=1
                )
                if result:
                    _, task_data = result
                    task = Task(**json.loads(task_data))
                    # Fix: keep a strong reference to the background task
                    # and drop it automatically when it finishes.
                    t = asyncio.create_task(
                        self._execute_task(task, semaphore)
                    )
                    self._inflight.add(t)
                    t.add_done_callback(self._inflight.discard)
            except asyncio.CancelledError:
                break
            except Exception as e:
                print(f"Worker error: {e}")

    async def _execute_task(self, task: Task, semaphore: asyncio.Semaphore):
        """Run one task under the concurrency semaphore; log the outcome."""
        async with semaphore:
            handler = self.handlers.get(task.name)
            if not handler:
                print(f"No handler for task: {task.name}")
                return
            try:
                print(f"Executing task: {task.id} ({task.name})")
                if asyncio.iscoroutinefunction(handler):
                    result = await handler(*task.args, **task.kwargs)
                else:
                    result = handler(*task.args, **task.kwargs)
                print(f"Task {task.id} completed: {result}")
            except Exception as e:
                print(f"Task {task.id} failed: {e}")
# --- Usage example -----------------------------------------------------------
queue = TaskQueue()

@queue.register("send_email")
async def send_email(to: str, subject: str, body: str):
    # Simulate sending an email (~1 s of I/O).
    await asyncio.sleep(1)
    return f"Email sent to {to}"

@queue.register("process_image")
async def process_image(image_url: str):
    # Simulate image processing (~2 s of work).
    await asyncio.sleep(2)
    return f"Processed: {image_url}"
async def main():
    """Demo driver: connect, enqueue two tasks, then run a worker.

    Note: worker() loops until cancelled, so this coroutine blocks there.
    """
    await queue.connect()
    # Enqueue sample tasks.
    await queue.enqueue(
        "send_email",
        to="user@example.com",
        subject="Hello",
        body="World"
    )
    await queue.enqueue(
        "process_image",
        image_url="https://example.com/image.jpg"
    )
    # Start the worker loop (does not return until cancelled).
    await queue.worker(concurrency=5)

if __name__ == "__main__":
    asyncio.run(main())
爬虫与数据采集
高性能异步爬虫
import asyncio
import aiohttp
from typing import List, Set, Optional
from urllib.parse import urljoin, urlparse
from bs4 import BeautifulSoup
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class AsyncCrawler:
    """High-performance async crawler with bounded fetch concurrency.

    The semaphore is held only while fetching/parsing a page — never while
    awaiting child crawls. The original held it across the recursive
    gather(), so every in-flight ancestor pinned a slot while waiting on
    descendants, and the crawl deadlocked once the link tree grew wider
    than ``max_concurrency`` (e.g. defaults: depth 3 with 10 links/page).
    """

    def __init__(
        self,
        max_concurrency: int = 10,
        max_depth: int = 3,
        timeout: int = 10
    ):
        self.max_concurrency = max_concurrency  # max simultaneous HTTP fetches
        self.max_depth = max_depth              # how deep to follow links
        self.timeout = timeout                  # per-request timeout (seconds)
        self.visited: Set[str] = set()          # URLs already scheduled
        self.results: List[dict] = []           # collected page records
        self.semaphore: Optional[asyncio.Semaphore] = None

    async def crawl(self, start_url: str) -> List[dict]:
        """Crawl from ``start_url``; return [{url, title, depth}, ...]."""
        self.semaphore = asyncio.Semaphore(self.max_concurrency)
        async with aiohttp.ClientSession() as session:
            await self._crawl_url(session, start_url, 0)
        return self.results

    async def _crawl_url(
        self,
        session: aiohttp.ClientSession,
        url: str,
        depth: int
    ):
        """Fetch one URL, record it, then recurse into same-domain links."""
        if url in self.visited or depth > self.max_depth:
            return
        # Mark before awaiting so concurrent siblings don't double-fetch.
        self.visited.add(url)
        links: List[str] = []
        async with self.semaphore:  # bound only the fetch/parse phase
            try:
                logger.info(f"Crawling: {url} (depth={depth})")
                async with session.get(
                    url,
                    timeout=aiohttp.ClientTimeout(total=self.timeout)
                ) as response:
                    if response.status != 200:
                        return
                    html = await response.text()
                # Parse the page.
                soup = BeautifulSoup(html, 'html.parser')
                title = soup.title.string if soup.title else ""
                # Record the result.
                self.results.append({
                    "url": url,
                    "title": title,
                    "depth": depth
                })
                # Collect child links while we still have the parsed soup.
                if depth < self.max_depth:
                    links = self._extract_links(soup, url)
            except Exception as e:
                logger.error(f"Error crawling {url}: {e}")
                return
        # Fix: recurse OUTSIDE the semaphore so a parent never holds a slot
        # while waiting on its children (the deadlock in the original).
        if links:
            await asyncio.gather(*(
                self._crawl_url(session, link, depth + 1) for link in links
            ))

    def _extract_links(self, soup: BeautifulSoup, base_url: str) -> List[str]:
        """Return up to 10 same-domain http(s) links found on the page."""
        links = []
        base_domain = urlparse(base_url).netloc
        for a in soup.find_all('a', href=True):
            href = a['href']
            full_url = urljoin(base_url, href)
            parsed = urlparse(full_url)
            # Same-domain links only.
            if parsed.netloc == base_domain and parsed.scheme in ('http', 'https'):
                # Canonicalize: drop fragment and query.
                clean_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}"
                if clean_url not in self.visited:
                    links.append(clean_url)
        return links[:10]  # cap per-page fan-out
async def main():
    """Demo: crawl example.com two levels deep and print the first results."""
    crawler = AsyncCrawler(
        max_concurrency=20,
        max_depth=2
    )
    results = await crawler.crawl("https://example.com")
    print(f"\nCrawled {len(results)} pages:")
    for r in results[:10]:
        # NOTE(review): r['title'] may be None when a page's <title> tag is
        # empty (soup.title.string is None) — slicing would raise; confirm.
        print(f" [{r['depth']}] {r['title'][:50]} - {r['url']}")

if __name__ == "__main__":
    asyncio.run(main())
定时任务调度器
类似 Cron 的异步调度器
import asyncio
from datetime import datetime, timedelta
from typing import Callable, Optional, List
from dataclasses import dataclass, field
import heapq
@dataclass(order=True)
class ScheduledTask:
    """Heap entry for the scheduler, ordered by ``next_run`` only.

    Every other field is compare=False so heapq sorts purely on time
    (entries with equal next_run compare equal, which heapq tolerates).
    """
    next_run: datetime                     # when the task should fire next
    name: str = field(compare=False)       # display name
    func: Callable = field(compare=False)  # sync or async callable
    interval: Optional[timedelta] = field(default=None, compare=False)  # None = one-shot
    args: tuple = field(default=(), compare=False)
    kwargs: dict = field(default_factory=dict, compare=False)
class AsyncScheduler:
"""异步任务调度器"""
def __init__(self):
self.tasks: List[ScheduledTask] = []
self.running = False
def schedule(
self,
func: Callable,
run_at: Optional[datetime] = None,
interval: Optional[timedelta] = None,
name: Optional[str] = None,
*args,
**kwargs
):
"""调度任务"""
if run_at is None:
run_at = datetime.now()
task = ScheduledTask(
next_run=run_at,
name=name or func.__name__,
func=func,
interval=interval,
args=args,
kwargs=kwargs
)
heapq.heappush(self.tasks, task)
return task
def every(self, seconds: float):
"""便捷的周期调度装饰器"""
def decorator(func: Callable):
self.schedule(
func,
interval=timedelta(seconds=seconds),
name=func.__name__
)
return func
return decorator
async def run(self):
"""运行调度器"""
self.running = True
while self.running and self.tasks:
# 获取下一个任务
task = self.tasks[0]
now = datetime.now()
if task.next_run <= now:
# 执行任务
heapq.heappop(self.tasks)
asyncio.create_task(self._execute_task(task))
# 如果是周期任务,重新调度
if task.interval:
task.next_run = now + task.interval
heapq.heappush(self.tasks, task)
else:
# 等待到下一个任务时间
wait_time = (task.next_run - now).total_seconds()
await asyncio.sleep(min(wait_time, 1.0))
async def _execute_task(self, task: ScheduledTask):
"""执行任务"""
try:
print(f"[{datetime.now()}] Running: {task.name}")
if asyncio.iscoroutinefunction(task.func):
result = await task.func(*task.args, **task.kwargs)
else:
result = task.func(*task.args, **task.kwargs)
print(f"[{datetime.now()}] Completed: {task.name} -> {result}")
except Exception as e:
print(f"[{datetime.now()}] Error in {task.name}: {e}")
def stop(self):
self.running = False
# --- Usage example -----------------------------------------------------------
scheduler = AsyncScheduler()

@scheduler.every(5)  # run every 5 seconds
async def heartbeat():
    return "alive"

@scheduler.every(60)  # run every minute
async def cleanup():
    # Placeholder for temp-file cleanup work.
    return "cleaned"
async def main():
    """Demo: add a one-shot task, then run the scheduler.

    run() blocks for as long as periodic tasks remain on the heap.
    """
    # One-shot task firing ~3 seconds from now.
    scheduler.schedule(
        lambda: print("One-time task"),
        run_at=datetime.now() + timedelta(seconds=3)
    )
    # Run the scheduler loop.
    await scheduler.run()

if __name__ == "__main__":
    asyncio.run(main())
数据库连接池
异步连接池实现
import asyncio
from typing import Optional, List, Any
from dataclasses import dataclass
from contextlib import asynccontextmanager
@dataclass
class Connection:
    """Stand-in for a real database connection."""
    id: int               # pool-assigned connection id
    in_use: bool = False  # checked-out flag managed by the pool

    async def execute(self, query: str) -> List[dict]:
        """Pretend to run ``query`` and return a canned single-row result."""
        await asyncio.sleep(0.01)  # simulated query latency
        row = {"result": f"data from connection {self.id}"}
        return [row]

    async def close(self):
        """Nothing to release in the simulation."""
        pass
class AsyncConnectionPool:
    """Async database connection pool (simulated).

    Connections are handed out via acquire()/release() or the connection()
    context manager. A Condition (sharing one Lock) guards the pool list;
    one waiter is woken per release().

    NOTE(review): _create_connection() is awaited while the pool lock is
    held, so its 0.1 s "connect" delay serializes all acquirers during
    pool growth — consider creating outside the lock.
    NOTE(review): wrapping Condition.wait() in asyncio.wait_for() relies
    on wait() re-acquiring the lock cleanly on cancellation; correct on
    modern CPython but historically fragile — verify on the target version.
    """

    def __init__(
        self,
        min_size: int = 5,
        max_size: int = 20,
        connection_timeout: float = 30.0
    ):
        self.min_size = min_size                      # connections opened up-front
        self.max_size = max_size                      # hard cap on pool growth
        self.connection_timeout = connection_timeout  # acquire() wait limit (seconds)
        self._pool: List[Connection] = []             # all connections, busy or idle
        self._size = 0                                # number created so far (<= max_size)
        self._lock = asyncio.Lock()
        self._available = asyncio.Condition(self._lock)  # signaled by release()
        self._connection_id = 0                       # monotonically increasing id

    async def initialize(self):
        """Pre-create min_size connections."""
        async with self._lock:
            for _ in range(self.min_size):
                conn = await self._create_connection()
                self._pool.append(conn)

    async def _create_connection(self) -> Connection:
        """Create a new connection (caller must hold the lock)."""
        self._connection_id += 1
        self._size += 1
        print(f"Creating connection {self._connection_id}")
        await asyncio.sleep(0.1)  # simulate connection establishment
        return Connection(id=self._connection_id)

    async def acquire(self) -> Connection:
        """Check out a connection, growing the pool up to max_size.

        Raises Exception("Connection pool exhausted") after
        connection_timeout seconds with no connection available.
        """
        async with self._available:
            # Wait for an available connection.
            while True:
                # Reuse an idle connection if there is one.
                for conn in self._pool:
                    if not conn.in_use:
                        conn.in_use = True
                        return conn
                # Grow the pool if under the cap.
                if self._size < self.max_size:
                    conn = await self._create_connection()
                    conn.in_use = True
                    self._pool.append(conn)
                    return conn
                # Otherwise wait (bounded) for a release() to wake us.
                try:
                    await asyncio.wait_for(
                        self._available.wait(),
                        timeout=self.connection_timeout
                    )
                except asyncio.TimeoutError:
                    raise Exception("Connection pool exhausted")

    async def release(self, conn: Connection):
        """Return a connection to the pool and wake one waiter."""
        async with self._available:
            conn.in_use = False
            self._available.notify()

    @asynccontextmanager
    async def connection(self):
        """``async with pool.connection() as conn:`` — auto-release on exit."""
        conn = await self.acquire()
        try:
            yield conn
        finally:
            await self.release(conn)

    async def close(self):
        """Close every connection and empty the pool."""
        async with self._lock:
            for conn in self._pool:
                await conn.close()
            self._pool.clear()
            self._size = 0
# --- Usage example -----------------------------------------------------------
async def main():
    """Demo: 20 concurrent queries sharing a pool capped at 10 connections."""
    pool = AsyncConnectionPool(min_size=3, max_size=10)
    await pool.initialize()

    async def query_task(task_id: int):
        # Each task checks out a connection just long enough for one query.
        async with pool.connection() as conn:
            result = await conn.execute(f"SELECT * FROM table_{task_id}")
            print(f"Task {task_id}: {result}")

    # Run all queries concurrently.
    tasks = [query_task(i) for i in range(20)]
    await asyncio.gather(*tasks)
    await pool.close()

if __name__ == "__main__":
    asyncio.run(main())
实时数据流处理
股票行情处理系统
import asyncio
from typing import Dict, List, Callable, Any
from dataclasses import dataclass
from datetime import datetime
import random
@dataclass
class StockQuote:
    """One market tick for a single symbol."""
    symbol: str          # ticker, e.g. "AAPL"
    price: float         # last trade price
    volume: int          # shares traded in this tick
    timestamp: datetime  # time the quote was generated
class StockStream:
    """Pub/sub hub for stock quotes.

    Subscribers register a callback per symbol; publish() fans a quote out
    to every callback registered for that symbol, concurrently.
    """

    def __init__(self):
        self.subscribers: Dict[str, List[Callable]] = {}  # symbol -> callbacks
        self.running = False  # flag polled by the market simulator

    def subscribe(self, symbol: str, callback: Callable):
        """Register ``callback`` (sync or async) for quotes on ``symbol``."""
        self.subscribers.setdefault(symbol, []).append(callback)

    async def publish(self, quote: StockQuote):
        """Deliver ``quote`` to all subscribers of its symbol, concurrently."""
        callbacks = self.subscribers.get(quote.symbol)
        if not callbacks:
            return
        await asyncio.gather(*(self._notify(cb, quote) for cb in callbacks))

    async def _notify(self, callback: Callable, quote: StockQuote):
        """Invoke one subscriber, isolating its failures from the others."""
        try:
            if asyncio.iscoroutinefunction(callback):
                await callback(quote)
            else:
                callback(quote)
        except Exception as e:
            print(f"Error in subscriber: {e}")
class MovingAverageCalculator:
    """Maintains a simple moving average per symbol over the last N quotes."""

    def __init__(self, window_size: int = 10):
        self.window_size = window_size
        self.prices: Dict[str, List[float]] = {}  # symbol -> recent prices

    async def on_quote(self, quote: StockQuote):
        """Fold a new quote into the window and print the updated MA."""
        window = self.prices.setdefault(quote.symbol, [])
        window.append(quote.price)
        # Keep only the most recent window_size prices.
        if len(window) > self.window_size:
            window.pop(0)
        average = sum(window) / len(window)
        print(f"{quote.symbol} MA({self.window_size}): {average:.2f}")
class AlertSystem:
    """Prints an alert whenever a price leaves its (low, high) band."""

    def __init__(self, thresholds: Dict[str, tuple]):
        self.thresholds = thresholds  # symbol -> (low, high)

    async def on_quote(self, quote: StockQuote):
        """Compare the quote against its band, if one is configured."""
        band = self.thresholds.get(quote.symbol)
        if band is None:
            return
        low, high = band
        if quote.price < low:
            print(f"ALERT: {quote.symbol} price {quote.price} below {low}")
        elif quote.price > high:
            print(f"ALERT: {quote.symbol} price {quote.price} above {high}")
async def simulate_market(stream: StockStream, symbols: List[str]):
    """Generate random-walk quotes for ``symbols`` until stream.running is False."""
    last_price = {sym: 100.0 for sym in symbols}
    while stream.running:
        for sym in symbols:
            # Random walk: drift each price by up to +/-0.5 per tick.
            last_price[sym] += random.uniform(-0.5, 0.5)
            tick = StockQuote(
                symbol=sym,
                price=last_price[sym],
                volume=random.randint(100, 10000),
                timestamp=datetime.now()
            )
            await stream.publish(tick)
        await asyncio.sleep(0.1)  # ~100 ms tick interval
async def main():
    """Wire up the stream, processors, and market simulator; run ~5 seconds."""
    stream = StockStream()
    stream.running = True
    # Create the processors.
    ma_calc = MovingAverageCalculator(window_size=5)
    alert_sys = AlertSystem({
        "AAPL": (95.0, 105.0),
        "GOOGL": (95.0, 105.0),
    })
    # Subscriptions: both processors receive every symbol's quotes.
    symbols = ["AAPL", "GOOGL", "MSFT"]
    for symbol in symbols:
        stream.subscribe(symbol, ma_calc.on_quote)
        stream.subscribe(symbol, alert_sys.on_quote)
    # Run the simulator for a while, then shut it down.
    market_task = asyncio.create_task(simulate_market(stream, symbols))
    await asyncio.sleep(5)  # run for 5 seconds
    stream.running = False
    market_task.cancel()

if __name__ == "__main__":
    asyncio.run(main())
本章小结
实战要点
最佳实践
- 控制并发:使用 Semaphore 限制并发数,避免资源耗尽
- 优雅关闭:处理取消信号,释放资源
- 错误处理:每个异步任务都应有异常处理
- 监控指标:记录关键性能指标
- 连接复用:使用连接池减少开销