企业级架构设计
2026/3/20 · 阅读时间大约 10 分钟
企业级 SSO 架构设计
高可用架构
整体架构
Nginx 配置示例
# SSO high-availability load-balancing configuration
upstream sso_servers {
# Session affinity via IP hash (not needed once sessions live in shared storage).
# NOTE(review): per the nginx upstream docs, a `backup` server cannot be combined
# with ip_hash — confirm/remove the backup line before enabling this.
# ip_hash;
# Primary servers. NOTE(review): max_fails/fail_timeout are passive failure
# detection (based on real proxied requests), not active health probes — confirm
# that this is the intended "health check".
server 192.168.1.10:8080 weight=10 max_fails=3 fail_timeout=30s;
server 192.168.1.11:8080 weight=10 max_fails=3 fail_timeout=30s;
server 192.168.1.12:8080 weight=10 max_fails=3 fail_timeout=30s;
# Backup server
server 192.168.1.20:8080 backup;
# Pool of idle keepalive connections to the upstream servers
keepalive 32;
}
# Shared-memory zone for request rate limiting, keyed by client address.
# limit_req_zone is only valid in the http context, so it is declared here,
# next to (not inside) the server block that uses it.
limit_req_zone $binary_remote_addr zone=sso_limit:10m rate=10r/s;

server {
    listen 443 ssl http2;
    server_name sso.company.com;

    # TLS configuration
    ssl_certificate /etc/nginx/ssl/sso.crt;
    ssl_certificate_key /etc/nginx/ssl/sso.key;
    ssl_protocols TLSv1.2 TLSv1.3;
    ssl_ciphers ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256;
    ssl_prefer_server_ciphers on;
    ssl_session_cache shared:SSL:10m;
    ssl_session_timeout 10m;

    # Security headers
    add_header Strict-Transport-Security "max-age=31536000; includeSubDomains" always;
    add_header X-Frame-Options "SAMEORIGIN" always;
    add_header X-Content-Type-Options "nosniff" always;

    # Rate limiting (zone declared above, at http level)
    limit_req zone=sso_limit burst=20 nodelay;

    location / {
        proxy_pass http://sso_servers;
        proxy_http_version 1.1;

        # Forwarded request metadata
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;

        # Timeouts
        proxy_connect_timeout 5s;
        proxy_read_timeout 60s;
        proxy_send_timeout 60s;

        # Clear the Connection header so upstream keepalive connections are reused
        proxy_set_header Connection "";
    }

    # Health-check endpoint
    location /health {
        proxy_pass http://sso_servers;
        # Same HTTP/1.1 + empty Connection header as "/" so this location also
        # benefits from the upstream keepalive pool.
        proxy_http_version 1.1;
        proxy_set_header Connection "";
        access_log off;
    }

    # Static assets. The trailing slashes on both the location and the alias
    # keep "/staticfoo" from being mapped onto the filesystem.
    location /static/ {
        alias /var/www/sso/static/;
        expires 7d;
        # An add_header at this level stops inheritance of the server-level
        # add_header directives, so the security headers are repeated here.
        add_header Strict-Transport-Security "max-age=31536000; includeSubDomains" always;
        add_header X-Frame-Options "SAMEORIGIN" always;
        add_header X-Content-Type-Options "nosniff" always;
        add_header Cache-Control "public, immutable";
    }
}
Docker 部署配置
# docker-compose.yml
# Required environment variables (e.g. via an .env file):
#   SECRET_KEY, MYSQL_ROOT_PASSWORD, MYSQL_PASSWORD, REDIS_PASSWORD
version: "3.8"

services:
  # SSO service (horizontally scalable)
  sso:
    build: ./sso_server
    image: sso-server:latest
    deploy:
      replicas: 3
      resources:
        limits:
          cpus: "1"
          memory: 1G
    environment:
      - SECRET_KEY=${SECRET_KEY}
      # Use the same password the mysql service is initialised with, instead
      # of a hard-coded literal that would not match MYSQL_PASSWORD.
      - DATABASE_URI=mysql+pymysql://sso:${MYSQL_PASSWORD}@mysql:3306/sso
      - REDIS_HOST=redis-master
      - REDIS_PORT=6379
      # redis-master is started with --requirepass, so the client needs the
      # password too.
      # NOTE(review): confirm the variable name the SSO application reads.
      - REDIS_PASSWORD=${REDIS_PASSWORD}
    depends_on:
      - mysql
      - redis-master
    networks:
      - sso-network
    healthcheck:
      # NOTE(review): requires curl to be installed in the sso-server image.
      test: ["CMD", "curl", "-f", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  # Nginx load balancer
  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx/nginx.conf:/etc/nginx/nginx.conf:ro
      - ./nginx/ssl:/etc/nginx/ssl:ro
    depends_on:
      - sso
    networks:
      - sso-network

  # MySQL primary
  mysql:
    image: mysql:8.0
    environment:
      - MYSQL_ROOT_PASSWORD=${MYSQL_ROOT_PASSWORD}
      - MYSQL_DATABASE=sso
      - MYSQL_USER=sso
      - MYSQL_PASSWORD=${MYSQL_PASSWORD}
    volumes:
      - mysql-data:/var/lib/mysql
      # The init script is only read, so mount it read-only.
      - ./mysql/init.sql:/docker-entrypoint-initdb.d/init.sql:ro
    networks:
      - sso-network

  # Redis primary
  redis-master:
    image: redis:7-alpine
    command: redis-server --requirepass ${REDIS_PASSWORD} --appendonly yes
    volumes:
      - redis-data:/data
    networks:
      - sso-network

volumes:
  mysql-data:
  redis-data:

networks:
  sso-network:
    # NOTE(review): the overlay driver presumably targets Docker Swarm; for a
    # plain single-host `docker compose up`, confirm whether `bridge` is needed.
    driver: overlay
多数据中心架构
跨地域部署
数据同步策略
# Data synchronisation strategy across multiple data centers
class DataSyncStrategy:
    """Per-category replication settings used between data centers."""

    # 1. User data - strong consistency
    USER_SYNC = dict(
        mode="sync",  # synchronous replication
        timeout=3000,  # timeout in ms
        fallback="local_cache",  # degradation strategy
        conflict_resolution="last_write_wins",
    )

    # 2. Session data - eventual consistency
    SESSION_SYNC = dict(
        mode="async",  # asynchronous replication
        delay=100,  # delay in ms
        ttl=1800,  # time to live
        local_priority=True,  # prefer the local copy
    )

    # 3. Ticket data - stored locally only (never synchronised)
    TICKET_SYNC = dict(
        mode="local_only",  # local only
        reason="短生命周期,无需同步",
    )
class CrossDCSessionManager:
"""跨数据中心 Session 管理"""
def __init__(self, local_redis, remote_redis_list):
self.local = local_redis
self.remotes = remote_redis_list
def create_session(self, session_id: str, data: dict, dc_id: str):
"""创建 Session(带数据中心标识)"""
session_data = {
**data,
"dc_id": dc_id,
"created_dc": dc_id
}
# 本地写入
self.local.setex(f"session:{session_id}", 1800, json.dumps(session_data))
# 异步同步到其他数据中心
self._async_sync(session_id, session_data)
def get_session(self, session_id: str) -> dict | None:
"""获取 Session(本地优先)"""
# 先查本地
data = self.local.get(f"session:{session_id}")
if data:
return json.loads(data)
# 本地没有,查询其他数据中心
for remote in self.remotes:
try:
data = remote.get(f"session:{session_id}")
if data:
# 缓存到本地
session_data = json.loads(data)
self.local.setex(
f"session:{session_id}",
1800,
data
)
return session_data
except Exception:
continue
return None
def _async_sync(self, session_id: str, data: dict):
"""异步同步到其他数据中心"""
import threading
def sync_task():
for remote in self.remotes:
try:
remote.setex(
f"session:{session_id}",
1800,
json.dumps(data)
)
except Exception as e:
print(f"同步失败: {e}")
thread = threading.Thread(target=sync_task)
thread.start()
灰度发布
灰度策略
灰度实现
import hashlib
import random
from typing import Optional
from dataclasses import dataclass
from enum import Enum
class GrayStrategy(Enum):
    """Criterion used to decide which requests get the new (gray) version."""

    USER_ID = "user_id"
    IP = "ip"
    COOKIE = "cookie"
    RANDOM = "random"
    CLIENT = "client"


@dataclass
class GrayConfig:
    """Gray-release configuration."""

    enabled: bool = False
    strategy: GrayStrategy = GrayStrategy.RANDOM
    percentage: int = 10  # share of traffic routed to the new version, in percent
    whitelist_users: Optional[list] = None  # user ids always routed to the new version
    whitelist_ips: Optional[list] = None  # IPs always routed to the new version
    whitelist_clients: Optional[list] = None  # client ids always routed to the new version


class GrayReleaseManager:
    """Decides per request whether the new version should be served."""

    def __init__(self, config: GrayConfig):
        self.config = config

    def should_use_new_version(
        self,
        user_id: Optional[int] = None,
        ip_address: Optional[str] = None,
        client_id: Optional[str] = None,
        cookies: Optional[dict] = None
    ) -> bool:
        """Return whether this request should be served by the new version.

        Whitelists take precedence over the configured strategy. When the
        attribute the strategy needs is missing, the old version is used.

        Returns:
            True to use the new version, False to use the old one.
        """
        config = self.config
        if not config.enabled:
            return False

        if self._is_whitelisted(user_id, ip_address, client_id):
            return True

        strategy = config.strategy
        # `is not None` rather than truthiness: user id 0 is a valid id.
        if strategy == GrayStrategy.USER_ID and user_id is not None:
            return self._hash_mod(str(user_id)) < config.percentage
        if strategy == GrayStrategy.IP and ip_address:
            return self._hash_mod(ip_address) < config.percentage
        if strategy == GrayStrategy.COOKIE and cookies:
            return cookies.get('gray_version') == 'v2'
        if strategy == GrayStrategy.RANDOM:
            return random.randint(1, 100) <= config.percentage
        # GrayStrategy.CLIENT admits only whitelisted clients, and those were
        # already accepted by the whitelist check above.
        return False

    def _is_whitelisted(
        self,
        user_id: Optional[int],
        ip_address: Optional[str],
        client_id: Optional[str],
    ) -> bool:
        """Return True when any supplied identifier is on its whitelist."""
        config = self.config
        if user_id is not None and config.whitelist_users:
            if user_id in config.whitelist_users:
                return True
        if ip_address and config.whitelist_ips:
            if ip_address in config.whitelist_ips:
                return True
        if client_id and config.whitelist_clients:
            if client_id in config.whitelist_clients:
                return True
        return False

    def _hash_mod(self, value: str) -> int:
        """Map *value* to a stable bucket in ``range(100)``.

        The hash is deterministic, so the same user/IP always lands in the
        same bucket and therefore on the same version. MD5 is used only for
        bucketing, not for security.
        """
        hash_value = int(hashlib.md5(value.encode()).hexdigest(), 16)
        return hash_value % 100
# Example Nginx configuration for gray (canary) release, kept as a plain string.
# It routes to the sso_canary upstream when the gray_version cookie equals "v2"
# and to sso_stable otherwise.
NGINX_GRAY_CONFIG = """
# 灰度发布 Nginx 配置
# 定义灰度版本 upstream
upstream sso_stable {
server 192.168.1.10:8080;
server 192.168.1.11:8080;
}
upstream sso_canary {
server 192.168.1.20:8080;
}
# 基于 Cookie 的灰度
map $cookie_gray_version $backend {
v2 sso_canary;
default sso_stable;
}
# 基于用户 ID 的灰度(需要 Lua)
# 或者基于请求头
server {
listen 443 ssl;
server_name sso.company.com;
location / {
proxy_pass http://$backend;
# 设置灰度标记 Cookie
# add_header Set-Cookie "gray_version=v2; Path=/; Max-Age=3600";
}
}
"""
监控与告警
监控指标
from prometheus_client import Counter, Histogram, Gauge, generate_latest
from functools import wraps
import time
class SSOMetrics:
    """Prometheus metrics exported by the SSO service.

    All metrics are registered in the default prometheus_client registry on
    construction, so this class is meant to be instantiated once per process
    (see the module-level ``metrics`` instance).
    """

    def __init__(self):
        # Counters
        self.login_total = Counter(
            'sso_login_total',
            '登录总数',
            ['status', 'method']  # success/failed, password/OAuth
        )
        self.logout_total = Counter(
            'sso_logout_total',
            '登出总数'
        )
        self.ticket_created = Counter(
            'sso_ticket_created_total',
            '票据创建总数',
            ['type']  # TGT/ST
        )
        self.ticket_validated = Counter(
            'sso_ticket_validated_total',
            '票据验证总数',
            ['status']  # success/failed
        )
        # Referenced by the RedisConnectionFailed alert rule; without this
        # counter that alert would query a series that is never exported.
        self.redis_connection_errors = Counter(
            'sso_redis_connection_errors_total',
            'Redis 连接错误总数'
        )
        # Histograms (latency, in seconds)
        self.login_latency = Histogram(
            'sso_login_latency_seconds',
            '登录延迟',
            buckets=[0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]
        )
        self.ticket_validation_latency = Histogram(
            'sso_ticket_validation_latency_seconds',
            '票据验证延迟',
            buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5]
        )
        # Gauges (current values)
        self.active_sessions = Gauge(
            'sso_active_sessions',
            '活跃会话数'
        )
        self.tgt_count = Gauge(
            'sso_tgt_count',
            'TGT 数量'
        )

    def record_login(self, success: bool, method: str, duration: float):
        """Count one login attempt and observe its duration in seconds."""
        status = 'success' if success else 'failed'
        self.login_total.labels(status=status, method=method).inc()
        self.login_latency.observe(duration)

    def record_logout(self):
        """Count one logout."""
        self.logout_total.inc()

    def record_ticket_created(self, ticket_type: str):
        """Count one created ticket of the given type (label ``type``)."""
        self.ticket_created.labels(type=ticket_type).inc()

    def record_ticket_validated(self, success: bool, duration: float):
        """Count one ticket validation and observe its duration in seconds."""
        status = 'success' if success else 'failed'
        self.ticket_validated.labels(status=status).inc()
        self.ticket_validation_latency.observe(duration)

    def record_redis_connection_error(self):
        """Count one failed attempt to talk to Redis."""
        self.redis_connection_errors.inc()

    def update_active_sessions(self, count: int):
        """Set the active-session gauge to *count*."""
        self.active_sessions.set(count)

    def update_tgt_count(self, count: int):
        """Set the TGT gauge to *count*."""
        self.tgt_count.set(count)


# Process-wide metrics instance
metrics = SSOMetrics()
def track_latency(metric_func):
    """Build a decorator that reports a function's run time to *metric_func*.

    Args:
        metric_func: Callable taking the elapsed time in seconds (a float),
            e.g. a Prometheus ``Histogram.observe``.

    Returns:
        A decorator. The decorated function returns (or raises) exactly what
        the wrapped function does; the duration is reported in both cases.
    """
    def decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            # perf_counter is monotonic, so the measured duration cannot go
            # negative or jump when the wall clock is adjusted.
            start = time.perf_counter()
            try:
                return f(*args, **kwargs)
            finally:
                metric_func(time.perf_counter() - start)
        return wrapper
    return decorator
# Flask endpoint that exposes the metrics for Prometheus to scrape
from flask import Blueprint

metrics_bp = Blueprint('metrics', __name__)


@metrics_bp.route('/metrics')
def prometheus_metrics():
    """Serve the default registry in the Prometheus text exposition format."""
    from flask import Response
    from prometheus_client import CONTENT_TYPE_LATEST

    # CONTENT_TYPE_LATEST carries the exposition-format version and charset,
    # which a bare 'text/plain' omits.
    return Response(
        generate_latest(),
        content_type=CONTENT_TYPE_LATEST
    )
告警规则
# prometheus/alerts.yml
groups:
  - name: sso_alerts
    rules:
      # Login failure rate too high
      - alert: HighLoginFailureRate
        expr: |
          sum(rate(sso_login_total{status="failed"}[5m])) /
          sum(rate(sso_login_total[5m])) > 0.1
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "SSO 登录失败率过高"
          description: "登录失败率超过 10%,当前: {{ $value | humanizePercentage }}"

      # Login latency too high
      - alert: HighLoginLatency
        # Aggregate the buckets across instances (keeping `le`) so the
        # quantile is computed for the service as a whole.
        expr: |
          histogram_quantile(0.95, sum by (le) (rate(sso_login_latency_seconds_bucket[5m]))) > 2
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "SSO 登录延迟过高"
          description: "P95 登录延迟超过 2 秒"

      # Service unavailable
      - alert: SSOServiceDown
        expr: up{job="sso"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "SSO 服务不可用"
          description: "SSO 服务 {{ $labels.instance }} 已下线"

      # Abnormal active-session count
      - alert: AbnormalSessionCount
        # The `> 0` filter on the baseline avoids dividing by zero (which
        # would yield +Inf and fire spuriously) when there were no sessions
        # one hour ago.
        expr: |
          abs(sso_active_sessions - sso_active_sessions offset 1h) /
          (sso_active_sessions offset 1h > 0) > 0.5
        for: 15m
        labels:
          severity: warning
        annotations:
          summary: "活跃会话数异常波动"
          description: "会话数在 1 小时内变化超过 50%"

      # Redis connection failures
      - alert: RedisConnectionFailed
        # A counter only ever grows, so comparing its raw value with 0 would
        # keep the alert firing forever after the first error. Alert on
        # recent growth instead.
        expr: increase(sso_redis_connection_errors_total[5m]) > 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Redis 连接失败"
          description: "SSO 无法连接 Redis"
容灾与降级
降级策略
from typing import Callable, Any
from functools import wraps
import time
class CircuitBreakerOpenError(Exception):
    """Raised when a call is rejected because the breaker is open."""


class CircuitBreaker:
    """Circuit breaker with closed / open / half-open states.

    * closed: calls pass through; ``failure_threshold`` consecutive failures
      open the breaker.
    * open: calls are rejected (fallback or ``CircuitBreakerOpenError``)
      until ``reset_timeout`` seconds have passed since the last failure.
    * half_open: calls pass through again; ``half_open_max_calls`` successes
      close the breaker, any failure re-opens it.

    Not synchronised; guard it externally if shared between threads.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        reset_timeout: int = 60,
        half_open_max_calls: int = 3
    ):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.half_open_max_calls = half_open_max_calls
        self.failures = 0
        # Reading of time.monotonic() at the most recent failure.
        self.last_failure_time = 0
        self.state = "closed"  # closed, open, half_open
        self.half_open_calls = 0

    def call(self, func: Callable, fallback: Callable = None) -> Any:
        """Run *func* under the breaker.

        Args:
            func: Zero-argument callable to protect.
            fallback: Optional zero-argument callable used when the breaker
                is open or *func* raises.

        Returns:
            The result of *func*, or of *fallback* when it was used.

        Raises:
            CircuitBreakerOpenError: The breaker is open and no fallback was
                given.
            Exception: Whatever *func* raised, when no fallback was given.
        """
        if self.state == "open":
            # Monotonic clock: elapsed time is immune to wall-clock changes.
            if time.monotonic() - self.last_failure_time > self.reset_timeout:
                self.state = "half_open"
                self.half_open_calls = 0
            elif fallback:
                return fallback()
            else:
                raise CircuitBreakerOpenError("Circuit breaker is open")

        try:
            result = func()
        except Exception:
            self._on_failure()
            if fallback:
                return fallback()
            raise
        self._on_success()
        return result

    def _on_success(self):
        """Record a successful call."""
        if self.state == "half_open":
            self.half_open_calls += 1
            if self.half_open_calls >= self.half_open_max_calls:
                self.state = "closed"
                self.failures = 0
        else:
            # Only consecutive failures should open the breaker; without this
            # reset, sporadic errors spread over any length of time would
            # eventually add up to the threshold.
            self.failures = 0

    def _on_failure(self):
        """Record a failed call and open the breaker when warranted."""
        self.failures += 1
        self.last_failure_time = time.monotonic()
        # A failed probe in half_open re-opens immediately.
        if self.state == "half_open" or self.failures >= self.failure_threshold:
            self.state = "open"
import json


class SSOFallbackService:
    """SSO operations that degrade to a local cache when a dependency fails."""

    def __init__(self, local_cache):
        """Store the cache and create the circuit breakers.

        Args:
            local_cache: Cache client providing ``get(key)``; values are
                expected to be JSON strings.
        """
        self.cache = local_cache
        # One breaker per dependency, so failures of the ticket service do not
        # cut off user lookups (and vice versa).
        self.circuit_breaker = CircuitBreaker()
        self.user_circuit_breaker = CircuitBreaker()

    def validate_ticket_with_fallback(self, ticket: str, service: str) -> dict | None:
        """Validate a service ticket, falling back to the local cache.

        Returns:
            The validation result, the cached result when the normal path is
            unavailable, or ``None`` when nothing is cached.
        """
        def normal_validate():
            # Normal validation path
            return ticket_service.validate_st(ticket, service)

        def fallback_validate():
            # Degraded path: use the locally cached result, if any
            cached = self.cache.get(f"ticket_cache:{ticket}")
            if cached:
                return json.loads(cached)
            return None

        return self.circuit_breaker.call(normal_validate, fallback_validate)

    def get_user_with_fallback(self, user_id: int) -> dict | None:
        """Look up a user, falling back to cached or minimal information.

        Returns:
            The user as a dict; ``None`` when the lookup succeeded but the
            user does not exist; the cached or minimal record when the lookup
            itself failed.
        """
        def normal_query():
            user = User.query.get(user_id)
            # A missing user is a valid answer, not a failure: it must not
            # count against the breaker or trigger the degraded path.
            return user.to_dict() if user is not None else None

        def fallback_query():
            # Degraded path: use the cached user information
            cached = self.cache.get(f"user_cache:{user_id}")
            if cached:
                return json.loads(cached)
            # NOTE(review): this fabricates a record with the "user" role for
            # an id that could not be verified — confirm that is acceptable.
            return {"id": user_id, "username": "unknown", "roles": ["user"]}

        return self.user_circuit_breaker.call(normal_query, fallback_query)
本章小结
核心要点
- 高可用:多实例部署 + 负载均衡 + 健康检查
- 数据中心:主从架构 + 数据同步 + 就近访问
- 灰度发布:多种灰度策略,渐进式发布
- 监控告警:关键指标监控 + 智能告警
- 容灾降级:熔断器 + 降级策略 + 本地缓存