最佳实践与常见问题
2026/3/20 · 阅读时间大约 15 分钟
最佳实践与常见问题
SSO 实施最佳实践
规划阶段
开发阶段最佳实践
1. Token 设计原则
"""Token 设计最佳实践"""
# ✅ 好的做法
class TokenBestPractices:
    """Reference implementation of the recommended token design rules.

    Access tokens are short-lived and paired with a longer-lived refresh
    token, claims are limited to non-sensitive identifiers, signing uses an
    asymmetric algorithm, and issued tokens can be revoked by their ``jti``.
    """

    # 1. Short lifetime combined with a refresh mechanism.
    ACCESS_TOKEN_EXPIRE = 15 * 60  # 15 minutes, in seconds
    REFRESH_TOKEN_EXPIRE = 7 * 24 * 3600  # 7 days, in seconds

    # 3. Asymmetric signing for distributed deployments:
    #    sign with the private key, verify with the public key.
    ALGORITHM = "RS256"

    def __init__(self):
        # IDs (``jti``) of revoked tokens. Created here so that
        # ``revoke_token`` never hits a missing attribute.
        self.blacklist = set()

    # 2. Put only the necessary information into the token.
    def create_token(self, user_id: int, username: str):
        """Build the claim set for a new access token.

        Args:
            user_id: Identifier of the user; stored as a string in ``sub``.
            username: User name intended for display only.

        Returns:
            A dict with ``sub``, ``username``, ``exp``, ``iat`` and ``jti``.
            ``exp`` is ``iat`` plus ``ACCESS_TOKEN_EXPIRE``; both are integer
            Unix timestamps. ``jti`` is a fresh random hex string.
        """
        import time
        import uuid

        issued_at = int(time.time())
        return {
            "sub": str(user_id),  # user identifier
            "username": username,  # user name for display
            "exp": issued_at + self.ACCESS_TOKEN_EXPIRE,  # expiry time
            "iat": issued_at,  # issue time
            "jti": uuid.uuid4().hex,  # unique token ID, used for revocation
            # Never include passwords, e-mail addresses, phone numbers or
            # other sensitive data.
        }

    # 4. Provide a way to revoke tokens.
    def revoke_token(self, token_id: str):
        """Add ``token_id`` (the token's ``jti``) to the blacklist."""
        self.blacklist.add(token_id)
# ❌ 不好的做法
class TokenBadPractices:
    """Counter-example: every member below shows a pattern to avoid."""

    # 1. Lifetime that is far too long for an access token (30 days).
    ACCESS_TOKEN_EXPIRE = 30 * 24 * 3600

    # 3. Signing key hard-coded in the source -- do not do this.
    SECRET_KEY = "my-secret-key"

    # 2. Sensitive information placed inside the token.
    def create_token(self, user):
        """Return claims copied straight from ``user`` (anti-pattern)."""
        leaked_claims = {}
        leaked_claims["password_hash"] = user.password_hash  # never do this
        leaked_claims["phone"] = user.phone  # sensitive information
        return leaked_claims
2. Session 安全配置
"""Session 安全配置最佳实践"""
# Flask Session 配置
app.config.update(
    # Load the signing key from the environment. It must be generated once
    # (for example with os.urandom(32).hex()) and then shared by every
    # worker process: a key created at import time changes on each restart
    # and differs between workers, so existing session cookies would stop
    # validating. A missing variable fails fast with KeyError at start-up.
    SECRET_KEY=os.environ["SECRET_KEY"],
    # Hardened session cookie settings.
    SESSION_COOKIE_SECURE=True,  # send over HTTPS only
    SESSION_COOKIE_HTTPONLY=True,  # not readable from JavaScript
    SESSION_COOKIE_SAMESITE='Lax',  # CSRF mitigation
    # Reasonable session lifetime.
    PERMANENT_SESSION_LIFETIME=timedelta(hours=2),
)
# Cookie 设置最佳实践
def set_secure_cookie(response, name, value, *, max_age=7200,
                      domain='.example.com'):
    """Set a cookie on ``response`` with hardened attributes.

    The cookie is always marked ``Secure``, ``HttpOnly`` and
    ``SameSite=Lax``; only its lifetime and domain scope are configurable.

    Args:
        response: Object exposing ``set_cookie(name, value, **options)``.
        name: Cookie name.
        value: Cookie value.
        max_age: Lifetime in seconds. Defaults to 7200 (two hours).
        domain: Domain the cookie is scoped to. The default
            ``'.example.com'`` is a placeholder for sharing the cookie
            across subdomains; pass the real domain in practice.
    """
    response.set_cookie(
        name,
        value,
        secure=True,  # HTTPS only
        httponly=True,  # not readable from JavaScript
        samesite='Lax',  # CSRF mitigation
        max_age=max_age,
        domain=domain,
    )
3. 密码处理
"""密码处理最佳实践"""
import bcrypt
from argon2 import PasswordHasher
class PasswordHandler:
    """Password hashing and verification backed by Argon2."""

    def __init__(self):
        # Argon2 is the recommended choice; bcrypt is an acceptable
        # alternative. Cost parameters are passed to PasswordHasher as-is.
        self.hasher = PasswordHasher(
            time_cost=2,
            memory_cost=65536,
            parallelism=4,
        )

    def hash_password(self, password: str) -> str:
        """Return the hash of ``password``; plaintext is never stored."""
        return self.hasher.hash(password)

    def verify_password(self, password: str, hash: str) -> bool:
        """Check ``password`` against a stored ``hash``.

        The parameter name ``hash`` shadows the builtin but is kept so that
        keyword callers continue to work.

        Returns:
            True when the password matches. False when it does not match or
            when ``hash`` is not a valid hash. Any other error (for example
            a wrong argument type) propagates instead of being reported as
            a failed login.
        """
        # InvalidHash is the name available on older and newer argon2-cffi
        # releases alike (newer ones alias it to InvalidHashError).
        from argon2.exceptions import InvalidHash, VerificationError

        try:
            self.hasher.verify(hash, password)
        except (VerificationError, InvalidHash):
            return False
        return True

    def needs_rehash(self, hash: str) -> bool:
        """Report whether ``hash`` should be regenerated (algorithm upgrade)."""
        return self.hasher.check_needs_rehash(hash)
# ❌ 错误做法
def bad_password_handling():
    """Counter-examples of password storage; do not copy any of these."""
    # Anti-pattern: storing an MD5 (or SHA-1) digest of the password.
    md5_digest = hashlib.md5(password.encode())
    password_hash = md5_digest.hexdigest()

    # Anti-pattern: storing the password in plaintext.
    user.password = password

    # Anti-pattern: a simple salted hash.
    salted_bytes = (salt + password).encode()
    password_hash = hashlib.sha256(salted_bytes).hexdigest()
运维阶段最佳实践
1. 日志规范
"""审计日志最佳实践"""
import logging
import json
from datetime import datetime
class AuditLogger:
    """Writes structured (JSON) audit events to the ``audit`` logger."""

    def __init__(self):
        self.logger = logging.getLogger('audit')

    def _get_request_id(self) -> str:
        """Return the ID used to correlate this event with a request.

        This default produces a fresh random hex ID per call. Override it
        in a subclass to return the ID of the request being handled.
        """
        import uuid

        return uuid.uuid4().hex

    def log_event(
        self,
        event_type: str,
        user_id: int = None,
        ip_address: str = None,
        details: dict = None,
        success: bool = True
    ):
        """Record one audit event as a single JSON log line at INFO level.

        Every entry carries: timestamp, event type, user identifier,
        IP address, outcome, details and a request ID for tracing.

        Args:
            event_type: Name of the event being recorded.
            user_id: Identifier of the acting user, if known.
            ip_address: Client IP address, if known.
            details: Extra JSON-serialisable data; ``None`` is logged as an
                empty object.
            success: Whether the operation succeeded.
        """
        from datetime import timezone

        log_entry = {
            # Timezone-aware UTC timestamp; the ISO string carries an
            # explicit "+00:00" offset so readers need not guess the zone.
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "event_type": event_type,
            "user_id": user_id,
            "ip_address": ip_address,
            "success": success,
            "details": details or {},
            "request_id": self._get_request_id()
        }
        # Structured logging: one JSON document per record.
        self.logger.info(json.dumps(log_entry))
# 必须记录的事件
AUDIT_EVENTS = [
    # Authentication lifecycle
    "login_success",  # successful login
    "login_failed",  # failed login
    "logout",  # logout
    # Account and credential changes
    "password_changed",  # password changed
    "account_locked",  # account locked
    # Authorisation and security signals
    "permission_denied",  # permission denied
    "token_revoked",  # token revoked
    "suspicious_activity",  # suspicious activity
]
2. 监控指标
# 关键监控指标
sso_metrics:
  # Availability metrics
  availability:
    - name: "服务可用率"
      target: ">= 99.9%"
      calculation: "成功请求数 / 总请求数"

  # Performance metrics
  performance:
    - name: "登录 P99 延迟"
      target: "< 2s"
      alert_threshold: "3s"
    - name: "票据验证 P99 延迟"
      target: "< 100ms"
      alert_threshold: "200ms"

  # Security metrics
  security:
    - name: "登录失败率"
      target: "< 5%"
      alert_threshold: "10%"
    - name: "异常登录检测"
      alert: "异地登录、暴力破解"

  # Business metrics
  business:
    - name: "日活跃用户数"
    - name: "峰值并发登录数"
    - name: "各应用登录分布"
常见问题与解决方案
问题一:跨域 Cookie 问题
# 解决方案示例:重定向 + 票据
@app.route('/callback')
def sso_callback():
    """Handle the redirect back from the SSO server.

    Reads the ``ticket`` query parameter, validates it with the SSO client
    and, on success, establishes a local session (a cookie on this
    application's own domain) before redirecting to ``/``.

    Returns:
        A redirect to the SSO login URL when no ticket is present, a 401
        response when validation fails, otherwise a redirect to ``/``.
    """
    ticket = request.args.get('ticket')
    if not ticket:
        return redirect(sso_client.get_login_url())

    # Validate the ticket with the SSO server.
    user_info = sso_client.validate_ticket(ticket)
    if not user_info:
        return "票据验证失败", 401

    # Drop whatever the pre-login session contained so that state planted
    # before authentication cannot carry over into the logged-in session.
    session.clear()
    session['user'] = user_info
    return redirect('/')
问题二:Session 超时不一致
// 前端静默刷新示例
/**
 * Periodically pings the SSO server from a hidden iframe so the SSO
 * session stays alive while the application is in use.
 */
class SessionKeepAlive {
  /**
   * @param {string} ssoUrl Base URL of the SSO server.
   * @param {number} [checkInterval] Delay between pings in milliseconds
   *   (default: five minutes).
   */
  constructor(ssoUrl, checkInterval = 5 * 60 * 1000) {
    this.ssoUrl = ssoUrl;
    this.checkInterval = checkInterval;
    // Handle of the running interval, or null while stopped.
    this.timerId = null;
  }

  /** Start pinging. Calling it again while running does nothing. */
  start() {
    if (this.timerId !== null) {
      return;
    }
    this.timerId = setInterval(() => this.refresh(), this.checkInterval);
  }

  /** Stop pinging. Safe to call when not running. */
  stop() {
    if (this.timerId !== null) {
      clearInterval(this.timerId);
      this.timerId = null;
    }
  }

  /** Load the SSO keep-alive endpoint once in a hidden iframe. */
  refresh() {
    const iframe = document.createElement("iframe");
    iframe.style.display = "none";
    iframe.src = `${this.ssoUrl}/sso/keep-alive`;
    document.body.appendChild(iframe);
    // Remove the iframe after 3 seconds.
    setTimeout(() => iframe.remove(), 3000);
  }
}
问题三:单点登出不完整
# 后端通道登出通知(带重试)
import asyncio
import aiohttp
from typing import List
class LogoutNotifier:
    """Back-channel logout notifier with per-service retries."""

    def __init__(self, max_retries=3, retry_delay=1, timeout=5):
        """
        Args:
            max_retries: Maximum number of POST attempts per service.
            retry_delay: Base delay in seconds; the wait before the next
                attempt is ``retry_delay * attempt_number``.
            timeout: Total time allowed for one request, in seconds.
        """
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.timeout = timeout

    async def notify_all(self, services: List[dict]):
        """Notify every service concurrently.

        Args:
            services: Dicts that each provide ``'url'`` and
                ``'session_id'``.

        Returns:
            One entry per service, in input order: the result of
            ``notify_service``, or the exception it raised (exceptions are
            collected rather than propagated).
        """
        pending = [
            self.notify_service(entry['url'], entry['session_id'])
            for entry in services
        ]
        return await asyncio.gather(*pending, return_exceptions=True)

    async def notify_service(self, url: str, session_id: str) -> bool:
        """POST a logout callback to one service, retrying on failure.

        Returns:
            True as soon as a request is answered with HTTP 200, False once
            all attempts are used up.
        """
        # An explicit ClientTimeout replaces the bare number previously
        # passed to post(); the session is shared by all attempts instead
        # of being rebuilt for each one.
        request_timeout = aiohttp.ClientTimeout(total=self.timeout)
        async with aiohttp.ClientSession(timeout=request_timeout) as session:
            for attempt in range(self.max_retries):
                try:
                    async with session.post(
                        f"{url}/sso/logout-callback",
                        json={"session_id": session_id},
                    ) as response:
                        if response.status == 200:
                            return True
                except Exception as e:
                    # Deliberately broad: notification is best-effort and
                    # any failure simply leads to the next attempt.
                    print(f"通知失败 (尝试 {attempt + 1}): {e}")
                # Wait longer after each failed attempt; no wait after the
                # final one.
                if attempt < self.max_retries - 1:
                    await asyncio.sleep(self.retry_delay * (attempt + 1))
        return False
问题四:性能瓶颈
# 性能优化示例
# 1. 用户缓存
class CachedUserService:
"""带缓存的用户服务"""
def __init__(self, cache, db, ttl=300):
self.cache = cache
self.db = db
self.ttl = ttl
def get_user(self, user_id: int) -> dict | None:
"""获取用户(缓存优先)"""
cache_key = f"user:{user_id}"
# 1. 查缓存
cached = self.cache.get(cache_key)
if cached:
return json.loads(cached)
# 2. 查数据库
user = self.db.query(User).get(user_id)
if user:
user_dict = user.to_dict()
self.cache.setex(cache_key, self.ttl, json.dumps(user_dict))
return user_dict
return None
# 2. Redis 连接池
import redis
# Options for the shared connection pool, kept in one place.
_REDIS_POOL_OPTIONS = {
    "host": 'localhost',
    "port": 6379,
    "max_connections": 100,
    "decode_responses": True,
}
redis_pool = redis.ConnectionPool(**_REDIS_POOL_OPTIONS)
# Client that draws its connections from the pool above.
redis_client = redis.Redis(connection_pool=redis_pool)
# 3. 异步审计日志
from concurrent.futures import ThreadPoolExecutor
audit_executor = ThreadPoolExecutor(max_workers=4)


def _report_audit_failure(future):
    """Log the exception of a finished audit-write task, if it raised."""
    import logging

    if future.cancelled():
        return
    error = future.exception()
    if error is not None:
        logging.getLogger('audit').error(
            "audit log write failed", exc_info=error
        )


def async_log(event: dict):
    """Write an audit event on the background thread pool.

    The call returns immediately. A failure inside ``write_audit_log`` is
    logged by a completion callback instead of being lost with the
    discarded Future.

    Returns:
        The ``Future`` for the submitted write; callers may ignore it.
    """
    future = audit_executor.submit(write_audit_log, event)
    future.add_done_callback(_report_audit_failure)
    return future
问题五:Token 泄露
故障排查指南
排查流程
常见错误码
| 错误码 | 说明 | 排查方向 |
|---|---|---|
| INVALID_TICKET | 票据无效 | 检查票据是否过期、是否重复使用 |
| SERVICE_NOT_ALLOWED | 服务未授权 | 检查应用是否注册、回调地址是否匹配 |
| SESSION_EXPIRED | 会话过期 | 检查 Redis 连接、会话 TTL 配置 |
| INVALID_CREDENTIALS | 凭证错误 | 检查密码是否正确、账户是否锁定 |
| ACCOUNT_LOCKED | 账户锁定 | 检查失败次数、锁定时间 |
| TOKEN_REVOKED | Token 已撤销 | 检查黑名单、版本号 |
调试工具
# SSO 调试工具
class SSODebugger:
    """Read-only helpers for inspecting SSO state stored in Redis."""

    def __init__(self, redis_client, db):
        self.redis = redis_client
        self.db = db

    def check_tgt(self, tgt_id: str) -> dict:
        """Inspect a TGT stored under ``tgt:<tgt_id>``.

        Returns:
            A dict with ``exists``, the decoded ``data`` (or ``None``), the
            raw ``ttl`` reported by Redis, and ``status``. ``status`` is
            ``"valid"`` whenever the key is present, including keys stored
            without an expiry (Redis reports a TTL of -1 for those), and
            ``"expired"`` otherwise.
        """
        key = f"tgt:{tgt_id}"
        data = self.redis.get(key)
        ttl = self.redis.ttl(key)
        exists = data is not None
        return {
            "exists": exists,
            "data": json.loads(data) if data else None,
            "ttl": ttl,
            # Decided by presence, not by ``ttl > 0``: a key with no expiry
            # has a negative TTL but is still usable.
            "status": "valid" if exists else "expired",
        }

    def check_st(self, st_id: str) -> dict:
        """Inspect a service ticket stored under ``st:<st_id>``.

        Returns:
            For a stored ticket: ``exists``, ``used``, ``service`` and
            ``ttl``. Otherwise ``{"exists": False,
            "status": "not_found_or_expired"}``.
        """
        key = f"st:{st_id}"
        data = self.redis.get(key)
        ttl = self.redis.ttl(key)
        if not data:
            return {"exists": False, "status": "not_found_or_expired"}

        st_data = json.loads(data)
        return {
            "exists": True,
            "used": st_data.get("used", False),
            "service": st_data.get("service_url"),
            "ttl": ttl,
        }

    def check_user_sessions(self, user_id: int) -> list:
        """List the sessions whose stored ``user_id`` equals ``user_id``.

        Every ``session:*`` key is scanned and decoded, so the cost grows
        with the total number of sessions; intended for debugging only.
        Assumes the client returns keys as ``str`` -- confirm that it was
        created with ``decode_responses=True``.
        """
        sessions = []
        for key in self.redis.scan_iter("session:*"):
            data = self.redis.get(key)
            if not data:
                continue
            session = json.loads(data)
            if session.get("user_id") != user_id:
                continue
            sessions.append({
                # Strip only the leading namespace; a session ID that
                # itself contains "session:" is left intact.
                "session_id": key.removeprefix("session:"),
                "created_at": session.get("created_at"),
                "ttl": self.redis.ttl(key),
            })
        return sessions

    def check_user_lockout(self, username: str) -> dict:
        """Report the lockout state recorded for ``username``.

        Returns:
            ``locked`` (bool), ``lock_ttl`` (raw TTL of the lock key) and
            ``failed_attempts`` (0 when no counter is stored).
        """
        lock_key = f"lockout:{username}"
        attempts_key = f"login_attempts:{username}"
        return {
            # exists() returns a count; expose a real boolean.
            "locked": bool(self.redis.exists(lock_key)),
            "lock_ttl": self.redis.ttl(lock_key),
            "failed_attempts": int(self.redis.get(attempts_key) or 0),
        }
# 使用示例
debugger = SSODebugger(redis_client, db)

# Ticket checks first, then the per-user checks; each result is printed
# right after it is computed.
probes = (
    (debugger.check_tgt, "TGT-xxx"),
    (debugger.check_st, "ST-xxx"),
    (debugger.check_user_sessions, 12345),
    (debugger.check_user_lockout, "admin"),
)
for probe, target in probes:
    print(probe(target))