分布式Session管理
2026/3/20大约 12 分钟
分布式 Session 管理
Session 共享问题
问题背景
在传统单机部署中,Session 存储在应用服务器内存中,不存在共享问题。但在分布式环境下,多台服务器需要共享 Session 数据:
解决方案概览
方案详解
方案一:会话保持
通过负载均衡器配置,确保同一用户的请求始终路由到同一台服务器。
Nginx 配置示例:
upstream backend {
# 方式1: IP Hash
ip_hash;
# 方式2: Cookie 绑定(推荐)
# hash $cookie_session_id consistent;
server 192.168.1.10:8080;
server 192.168.1.11:8080;
server 192.168.1.12:8080;
}
server {
location / {
proxy_pass http://backend;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
}
}
优缺点:
| 优点 | 缺点 |
|---|---|
| 实现简单,无需改造应用 | 服务器宕机导致 Session 丢失 |
| 性能好,无额外网络开销 | 负载不均衡 |
| 无一致性问题 | 不利于弹性扩缩容 |
方案二:Session 复制
服务器之间实时同步 Session 数据,常见于 Tomcat 集群。
不推荐原因:复杂度高、扩展性差、网络开销大。
方案三:集中式存储(推荐)
使用 Redis 等集中式存储统一管理 Session,是目前最主流的方案。
Redis Session 实现
基础实现
import redis
import json
import secrets
from datetime import datetime, timedelta
from typing import Optional, Any
from dataclasses import dataclass, asdict
@dataclass
class SessionData:
"""Session 数据"""
user_id: int
username: str
roles: list
created_at: str
last_accessed: str
extra: dict = None
def to_dict(self) -> dict:
return asdict(self)
@classmethod
def from_dict(cls, data: dict) -> "SessionData":
return cls(**data)
class RedisSessionManager:
"""Redis Session 管理器"""
def __init__(
self,
redis_client: redis.Redis,
prefix: str = "session:",
default_ttl: int = 1800, # 30 分钟
max_ttl: int = 86400 # 24 小时
):
self.redis = redis_client
self.prefix = prefix
self.default_ttl = default_ttl
self.max_ttl = max_ttl
def create_session(
self,
user_id: int,
username: str,
roles: list = None,
ttl: int = None,
extra: dict = None
) -> str:
"""
创建新 Session
Args:
user_id: 用户 ID
username: 用户名
roles: 角色列表
ttl: 有效期(秒)
extra: 额外数据
Returns:
Session ID
"""
session_id = self._generate_session_id()
now = datetime.utcnow().isoformat()
session_data = SessionData(
user_id=user_id,
username=username,
roles=roles or [],
created_at=now,
last_accessed=now,
extra=extra or {}
)
key = self._get_key(session_id)
ttl = min(ttl or self.default_ttl, self.max_ttl)
self.redis.setex(
key,
ttl,
json.dumps(session_data.to_dict())
)
# 记录用户的所有 Session(用于管理)
self._add_user_session(user_id, session_id, ttl)
return session_id
def get_session(self, session_id: str) -> Optional[SessionData]:
"""获取 Session 数据"""
key = self._get_key(session_id)
data = self.redis.get(key)
if not data:
return None
return SessionData.from_dict(json.loads(data))
def update_session(self, session_id: str, **kwargs) -> bool:
"""更新 Session 数据"""
session = self.get_session(session_id)
if not session:
return False
# 更新字段
for field, value in kwargs.items():
if hasattr(session, field):
setattr(session, field, value)
session.last_accessed = datetime.utcnow().isoformat()
key = self._get_key(session_id)
ttl = self.redis.ttl(key)
if ttl > 0:
self.redis.setex(key, ttl, json.dumps(session.to_dict()))
return True
return False
def refresh_session(self, session_id: str, ttl: int = None) -> bool:
"""
刷新 Session(滑动过期)
Args:
session_id: Session ID
ttl: 新的有效期
Returns:
是否成功
"""
session = self.get_session(session_id)
if not session:
return False
session.last_accessed = datetime.utcnow().isoformat()
key = self._get_key(session_id)
new_ttl = min(ttl or self.default_ttl, self.max_ttl)
self.redis.setex(key, new_ttl, json.dumps(session.to_dict()))
return True
def destroy_session(self, session_id: str) -> bool:
"""销毁 Session"""
session = self.get_session(session_id)
if not session:
return False
key = self._get_key(session_id)
self.redis.delete(key)
# 从用户 Session 列表移除
self._remove_user_session(session.user_id, session_id)
return True
def destroy_user_sessions(self, user_id: int, except_session: str = None) -> int:
"""销毁用户所有 Session"""
user_sessions_key = f"{self.prefix}user:{user_id}"
session_ids = self.redis.smembers(user_sessions_key)
count = 0
for session_id in session_ids:
sid = session_id.decode() if isinstance(session_id, bytes) else session_id
if sid != except_session:
if self.destroy_session(sid):
count += 1
return count
def get_user_sessions(self, user_id: int) -> list:
"""获取用户所有 Session"""
user_sessions_key = f"{self.prefix}user:{user_id}"
session_ids = self.redis.smembers(user_sessions_key)
sessions = []
for session_id in session_ids:
sid = session_id.decode() if isinstance(session_id, bytes) else session_id
session = self.get_session(sid)
if session:
sessions.append({
"session_id": sid,
"data": session.to_dict()
})
else:
# 清理过期的 Session 引用
self.redis.srem(user_sessions_key, sid)
return sessions
def _get_key(self, session_id: str) -> str:
"""获取 Redis 键"""
return f"{self.prefix}data:{session_id}"
def _generate_session_id(self) -> str:
"""生成 Session ID"""
return secrets.token_urlsafe(32)
def _add_user_session(self, user_id: int, session_id: str, ttl: int):
"""记录用户 Session"""
key = f"{self.prefix}user:{user_id}"
self.redis.sadd(key, session_id)
self.redis.expire(key, ttl + 3600) # 稍长于 Session TTL
def _remove_user_session(self, user_id: int, session_id: str):
"""移除用户 Session"""
key = f"{self.prefix}user:{user_id}"
self.redis.srem(key, session_id)
Flask 集成
from flask import Flask, request, g, jsonify
from functools import wraps
import redis
app = Flask(__name__)
# Redis 连接
redis_client = redis.Redis(
host='localhost',
port=6379,
db=0,
decode_responses=True
)
session_manager = RedisSessionManager(redis_client)
def session_required(f):
"""Session 验证装饰器"""
@wraps(f)
def decorated(*args, **kwargs):
# 从 Cookie 或 Header 获取 Session ID
session_id = request.cookies.get("session_id")
if not session_id:
session_id = request.headers.get("X-Session-ID")
if not session_id:
return jsonify({"error": "未提供 Session"}), 401
session = session_manager.get_session(session_id)
if not session:
return jsonify({"error": "Session 无效或已过期"}), 401
# 刷新 Session(滑动过期)
session_manager.refresh_session(session_id)
g.session_id = session_id
g.session = session
g.user_id = session.user_id
return f(*args, **kwargs)
return decorated
@app.route("/api/login", methods=["POST"])
def login():
"""登录"""
data = request.get_json()
username = data.get("username")
password = data.get("password")
# 验证用户(示例)
user = authenticate_user(username, password)
if not user:
return jsonify({"error": "认证失败"}), 401
# 创建 Session
session_id = session_manager.create_session(
user_id=user["id"],
username=user["username"],
roles=user.get("roles", []),
extra={"login_ip": request.remote_addr}
)
response = jsonify({
"message": "登录成功",
"user": {"id": user["id"], "username": user["username"]}
})
# 设置 Cookie
response.set_cookie(
"session_id",
session_id,
httponly=True,
secure=True,
samesite="Lax",
max_age=1800
)
return response
@app.route("/api/logout", methods=["POST"])
@session_required
def logout():
"""登出"""
session_manager.destroy_session(g.session_id)
response = jsonify({"message": "登出成功"})
response.delete_cookie("session_id")
return response
@app.route("/api/logout-all", methods=["POST"])
@session_required
def logout_all():
"""登出所有设备"""
count = session_manager.destroy_user_sessions(
g.user_id,
except_session=g.session_id
)
return jsonify({"message": f"已登出 {count} 个设备"})
@app.route("/api/sessions", methods=["GET"])
@session_required
def get_sessions():
"""获取所有登录设备"""
sessions = session_manager.get_user_sessions(g.user_id)
return jsonify({
"current_session": g.session_id,
"sessions": sessions
})
@app.route("/api/profile", methods=["GET"])
@session_required
def get_profile():
"""获取用户信息"""
return jsonify({
"user_id": g.session.user_id,
"username": g.session.username,
"roles": g.session.roles
})
def authenticate_user(username: str, password: str) -> dict | None:
"""验证用户(示例)"""
if username == "admin" and password == "admin123":
return {"id": 1, "username": "admin", "roles": ["admin"]}
return None
Redis 高可用架构
Redis 集群方案对比
Sentinel 模式连接
from redis.sentinel import Sentinel
class SentinelSessionManager(RedisSessionManager):
"""基于 Sentinel 的 Session 管理器"""
def __init__(
self,
sentinel_hosts: list,
master_name: str,
password: str = None,
**kwargs
):
"""
初始化 Sentinel 连接
Args:
sentinel_hosts: Sentinel 节点列表 [('host1', 26379), ('host2', 26379)]
master_name: Master 名称
password: Redis 密码
"""
sentinel = Sentinel(
sentinel_hosts,
socket_timeout=0.5,
password=password
)
# 获取 Master 连接(用于写操作)
self.master = sentinel.master_for(
master_name,
socket_timeout=0.5,
password=password
)
# 获取 Slave 连接(用于读操作)
self.slave = sentinel.slave_for(
master_name,
socket_timeout=0.5,
password=password
)
super().__init__(self.master, **kwargs)
def get_session(self, session_id: str) -> Optional[SessionData]:
"""从 Slave 读取 Session(读写分离)"""
key = self._get_key(session_id)
data = self.slave.get(key)
if not data:
return None
return SessionData.from_dict(json.loads(data))
# 使用示例
sentinel_manager = SentinelSessionManager(
sentinel_hosts=[
('sentinel1.example.com', 26379),
('sentinel2.example.com', 26379),
('sentinel3.example.com', 26379)
],
master_name='mymaster',
password='your_password'
)
Cluster 模式连接
from redis.cluster import RedisCluster
class ClusterSessionManager(RedisSessionManager):
"""基于 Cluster 的 Session 管理器"""
def __init__(
self,
startup_nodes: list,
password: str = None,
**kwargs
):
"""
初始化 Cluster 连接
Args:
startup_nodes: 集群节点列表 [{'host': 'xxx', 'port': 7000}]
password: Redis 密码
"""
cluster = RedisCluster(
startup_nodes=startup_nodes,
password=password,
decode_responses=True,
skip_full_coverage_check=True
)
super().__init__(cluster, **kwargs)
def _add_user_session(self, user_id: int, session_id: str, ttl: int):
"""
Cluster 模式下使用 Hash Tag 确保相关数据在同一槽
使用 {user:xxx} 作为前缀,确保同一用户的数据落在同一节点
"""
key = f"{{user:{user_id}}}:sessions"
self.redis.sadd(key, session_id)
self.redis.expire(key, ttl + 3600)
# 使用示例
cluster_manager = ClusterSessionManager(
startup_nodes=[
{'host': 'redis1.example.com', 'port': 7000},
{'host': 'redis2.example.com', 'port': 7000},
{'host': 'redis3.example.com', 'port': 7000}
],
password='your_password'
)
Session 一致性问题
常见一致性问题
解决方案
import redis
from contextlib import contextmanager
class ConsistentSessionManager(RedisSessionManager):
"""保证一致性的 Session 管理器"""
def __init__(self, redis_client: redis.Redis, **kwargs):
super().__init__(redis_client, **kwargs)
self.lock_prefix = "lock:session:"
self.lock_timeout = 5 # 锁超时时间
@contextmanager
def session_lock(self, session_id: str):
"""Session 锁(用于并发更新)"""
lock_key = f"{self.lock_prefix}{session_id}"
lock = self.redis.lock(lock_key, timeout=self.lock_timeout)
acquired = lock.acquire(blocking=True, blocking_timeout=3)
if not acquired:
raise Exception("获取 Session 锁失败")
try:
yield
finally:
lock.release()
def atomic_update(self, session_id: str, update_func) -> bool:
"""
原子更新 Session
Args:
session_id: Session ID
update_func: 更新函数,接收当前 Session 数据,返回更新后的数据
Returns:
是否成功
"""
key = self._get_key(session_id)
# 使用 Redis 事务 + Watch 实现乐观锁
with self.redis.pipeline() as pipe:
while True:
try:
pipe.watch(key)
# 读取当前数据
data = pipe.get(key)
if not data:
pipe.unwatch()
return False
session = SessionData.from_dict(json.loads(data))
ttl = pipe.ttl(key)
# 执行更新
updated_session = update_func(session)
# 开始事务
pipe.multi()
pipe.setex(key, ttl, json.dumps(updated_session.to_dict()))
pipe.execute()
return True
except redis.WatchError:
# 数据被其他客户端修改,重试
continue
def add_to_cart(self, session_id: str, item_id: str) -> bool:
"""
添加商品到购物车(原子操作示例)
"""
def update(session: SessionData) -> SessionData:
cart = session.extra.get("cart", [])
if item_id not in cart:
cart.append(item_id)
session.extra["cart"] = cart
return session
return self.atomic_update(session_id, update)
def get_session_from_master(self, session_id: str) -> Optional[SessionData]:
"""
强制从 Master 读取(解决主从延迟问题)
在关键操作后调用,确保读取最新数据
"""
# 如果使用的是 Sentinel 或 Cluster,这里需要特殊处理
return self.get_session(session_id)
# 使用 Lua 脚本实现原子操作(更高效)
class LuaSessionManager(RedisSessionManager):
"""使用 Lua 脚本的 Session 管理器"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._register_scripts()
def _register_scripts(self):
"""注册 Lua 脚本"""
# 原子添加购物车商品
self._add_to_cart_script = self.redis.register_script("""
local key = KEYS[1]
local item_id = ARGV[1]
local data = redis.call('GET', key)
if not data then
return 0
end
local session = cjson.decode(data)
local cart = session.extra and session.extra.cart or {}
-- 检查是否已存在
for _, v in ipairs(cart) do
if v == item_id then
return 1 -- 已存在
end
end
-- 添加商品
table.insert(cart, item_id)
session.extra = session.extra or {}
session.extra.cart = cart
-- 获取 TTL
local ttl = redis.call('TTL', key)
-- 写回
redis.call('SETEX', key, ttl, cjson.encode(session))
return 1
""")
def add_to_cart_lua(self, session_id: str, item_id: str) -> bool:
"""使用 Lua 脚本添加购物车商品"""
key = self._get_key(session_id)
result = self._add_to_cart_script(keys=[key], args=[item_id])
return result == 1
Session 数据结构优化
大对象拆分
class OptimizedSessionManager(RedisSessionManager):
"""优化的 Session 管理器"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.large_data_prefix = "session_data:"
def store_large_data(self, session_id: str, data_key: str, data: Any) -> bool:
"""
存储大数据到独立的 key
适用于:购物车、浏览历史等大对象
"""
key = f"{self.large_data_prefix}{session_id}:{data_key}"
session = self.get_session(session_id)
if not session:
return False
ttl = self.redis.ttl(self._get_key(session_id))
if ttl <= 0:
return False
self.redis.setex(key, ttl, json.dumps(data))
return True
def get_large_data(self, session_id: str, data_key: str) -> Any:
"""获取大数据"""
key = f"{self.large_data_prefix}{session_id}:{data_key}"
data = self.redis.get(key)
return json.loads(data) if data else None
def delete_large_data(self, session_id: str, data_key: str) -> bool:
"""删除大数据"""
key = f"{self.large_data_prefix}{session_id}:{data_key}"
return self.redis.delete(key) > 0
# 使用示例
@app.route("/api/cart/add", methods=["POST"])
@session_required
def add_to_cart():
"""添加商品到购物车"""
data = request.get_json()
item = data.get("item")
# 获取当前购物车
cart = optimized_manager.get_large_data(g.session_id, "cart") or []
# 添加商品
cart.append(item)
# 存储(独立的 key,不影响 Session 主体)
optimized_manager.store_large_data(g.session_id, "cart", cart)
return jsonify({"cart_size": len(cart)})
本章小结
核心要点
- 集中式存储:Redis 是分布式 Session 的首选方案
- 高可用:生产环境应使用 Sentinel 或 Cluster 模式
- 一致性:使用锁或 Lua 脚本处理并发更新
- 优化:大对象拆分存储,避免 Session 膨胀