多点登录机制详解
2026/3/20大约 14 分钟
多点登录机制详解
什么是多点登录
基本概念
多点登录(Multi-Device Login)是指同一用户账号可以同时在多个设备或客户端上登录并保持活跃会话的机制。与单点登录(SSO)关注"一次登录访问多系统"不同,多点登录关注的是"同一系统多设备同时在线"的管理。
多点登录的应用场景
| 场景 | 需求描述 | 典型产品 |
|---|---|---|
| 社交应用 | 手机、电脑、平板同时在线 | 微信、QQ |
| 视频平台 | 限制同时观看设备数 | Netflix、爱奇艺 |
| 办公软件 | 多设备无缝切换工作 | 钉钉、飞书 |
| 游戏平台 | 防止账号共享,保护收益 | Steam、原神 |
| 金融应用 | 高安全要求,限制登录 | 银行 App |
多点登录策略
策略类型总览
设备类型识别
from dataclasses import dataclass
from enum import Enum
from user_agents import parse
class DeviceType(Enum):
    """Categories of client device a login can come from."""

    # Handheld phone
    MOBILE = "mobile"
    # Tablet
    TABLET = "tablet"
    # Desktop or laptop computer
    DESKTOP = "desktop"
    # Web client
    WEB = "web"
    # Programmatic API caller
    API = "api"
    # Device that could not be classified
    UNKNOWN = "unknown"
@dataclass
class DeviceInfo:
    """Snapshot of the client device behind a login request.

    ``DeviceDetector.detect`` fills these fields from the parsed User-Agent
    plus the IP address and device id passed in by the caller.
    """

    device_type: DeviceType  # Coarse device category
    device_name: str  # Human-readable device name
    os: str  # Operating system family
    os_version: str  # Operating system version
    browser: str  # Browser family
    browser_version: str  # Browser version
    ip_address: str  # Client IP address
    user_agent: str  # Raw User-Agent string
    # Unique device id (app clients); None when the client did not send one.
    # Annotated as optional because the default really is None.
    device_id: str | None = None
class DeviceDetector:
    """Turns raw request metadata into a ``DeviceInfo`` record."""

    @staticmethod
    def detect(user_agent: str, ip_address: str, device_id: str = None) -> DeviceInfo:
        """Parse a User-Agent string and describe the device it belongs to.

        Args:
            user_agent: Raw User-Agent header value.
            ip_address: Client IP address, copied through unchanged.
            device_id: Optional device id, copied through unchanged.

        Returns:
            A ``DeviceInfo`` populated from the parsed User-Agent.
        """
        parsed = parse(user_agent)

        # The first flag that is set decides the category; when none is set
        # the device is reported as UNKNOWN.
        candidates = (
            (parsed.is_mobile, DeviceType.MOBILE),
            (parsed.is_tablet, DeviceType.TABLET),
            (parsed.is_pc, DeviceType.DESKTOP),
        )
        category = next(
            (kind for matched, kind in candidates if matched),
            DeviceType.UNKNOWN,
        )

        # Prefer "<brand> <model>"; fall back to an OS-based label when the
        # parser reports neither.
        brand = parsed.device.brand or ""
        model = parsed.device.model or ""
        label = f"{brand} {model}".strip() or f"{parsed.os.family} 设备"

        return DeviceInfo(
            device_type=category,
            device_name=label,
            os=parsed.os.family or "Unknown",
            os_version=parsed.os.version_string or "",
            browser=parsed.browser.family or "Unknown",
            browser_version=parsed.browser.version_string or "",
            ip_address=ip_address,
            user_agent=user_agent,
            device_id=device_id,
        )
# Usage example
if __name__ == "__main__":
    # Sample User-Agent strings: an iPhone, a Windows desktop and an iPad.
    sample_agents = (
        "Mozilla/5.0 (iPhone; CPU iPhone OS 15_0 like Mac OS X) AppleWebKit/605.1.15",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/120.0",
        "Mozilla/5.0 (iPad; CPU OS 15_0 like Mac OS X) AppleWebKit/605.1.15",
    )
    # ``detect`` is a static method, so no detector instance is required.
    for raw_agent in sample_agents:
        detected = DeviceDetector.detect(raw_agent, "192.168.1.100")
        print(f"{detected.device_type.value}: {detected.device_name} ({detected.os})")
会话管理实现
数据模型设计
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import List, Optional
import uuid
class SessionStatus(Enum):
    """Lifecycle states of a user session."""

    # Session is live
    ACTIVE = "active"
    # Session has expired
    EXPIRED = "expired"
    # Session was forced offline
    KICKED = "kicked"
    # User logged out on their own
    LOGOUT = "logout"
@dataclass
class UserSession:
    """One login session of one user on one device."""

    session_id: str  # Session id
    user_id: int  # Owning user id
    device_info: DeviceInfo  # Device the session was opened from
    created_at: datetime  # Creation time
    last_active_at: datetime  # Time of the last recorded activity
    expires_at: datetime  # Expiry time
    status: SessionStatus = SessionStatus.ACTIVE
    login_ip: str = ""  # IP address used at login
    login_location: str = ""  # Location resolved for the login

    @staticmethod
    def create(user_id: int, device_info: DeviceInfo, ttl_seconds: int = 7200) -> "UserSession":
        """Create a new ACTIVE session starting now.

        Args:
            user_id: Id of the user logging in.
            device_info: Device description; its ``ip_address`` becomes
                ``login_ip``.
            ttl_seconds: Lifetime of the session in seconds.

        Returns:
            A session with a random UUID4 id whose ``created_at`` and
            ``last_active_at`` are the current naive-UTC time and whose
            ``expires_at`` lies ``ttl_seconds`` later.
        """
        # Imported here because the import block preceding this class only
        # brings in ``datetime``; without this the name ``timedelta`` is
        # not guaranteed to be defined when the method runs.
        from datetime import timedelta

        now = datetime.utcnow()
        return UserSession(
            session_id=str(uuid.uuid4()),
            user_id=user_id,
            device_info=device_info,
            created_at=now,
            last_active_at=now,
            expires_at=now + timedelta(seconds=ttl_seconds),
            login_ip=device_info.ip_address,
        )
@dataclass
class LoginPolicy:
    """Tunable rules for how many sessions a user may hold at once."""

    # Maximum number of sessions; 0 means no limit
    max_sessions: int = 0
    # Eviction strategy name: oldest/all/same_type
    kick_strategy: str = "oldest"
    # Whether several devices of the same type may be logged in together
    allow_same_type_multi: bool = True
    # Whether evicting a session needs confirmation first
    require_confirm_kick: bool = False
    # Session lifetime in seconds
    session_ttl: int = 7200
    # "Remember me" lifetime in seconds
    remember_me_ttl: int = 604800
Redis 会话存储
import redis
import json
from typing import List, Optional
from datetime import datetime, timedelta
class RedisSessionStore:
    """Session repository backed by Redis.

    Three kinds of keys are written, all under ``prefix``:

    * ``data:<session_id>`` - JSON document of one session; expires
      together with the session.
    * ``user:<user_id>`` - set holding the ids of the user's sessions.
    * ``device:<user_id>:<device_type>`` - id of the session most recently
      saved for that user and device type.
    """

    def __init__(self, redis_client: redis.Redis, prefix: str = "session:"):
        """Bind the store to a Redis client and a key prefix."""
        self.redis = redis_client
        self.prefix = prefix

    @staticmethod
    def _to_text(value) -> str:
        """Return a Redis reply as ``str`` whether it arrived as bytes or str."""
        return value.decode() if isinstance(value, bytes) else value

    def _session_key(self, session_id: str) -> str:
        """Key of the JSON document for one session."""
        return f"{self.prefix}data:{session_id}"

    def _user_sessions_key(self, user_id: int) -> str:
        """Key of the set holding a user's session ids."""
        return f"{self.prefix}user:{user_id}"

    def _device_session_key(self, user_id: int, device_type: str) -> str:
        """Key of the per-device-type pointer (used for same-type eviction)."""
        return f"{self.prefix}device:{user_id}:{device_type}"

    def save_session(self, session: UserSession) -> bool:
        """Write a session and its index entries.

        Returns:
            False, without writing anything, when the session has already
            expired; True after the pipeline has executed.
        """
        session_key = self._session_key(session.session_id)
        user_sessions_key = self._user_sessions_key(session.user_id)
        device_key = self._device_session_key(
            session.user_id,
            session.device_info.device_type.value,
        )

        # Remaining lifetime; an already-expired session is not stored.
        ttl = int((session.expires_at - datetime.utcnow()).total_seconds())
        if ttl <= 0:
            return False

        device = session.device_info
        session_data = {
            "session_id": session.session_id,
            "user_id": session.user_id,
            "device_type": device.device_type.value,
            "device_name": device.device_name,
            "os": device.os,
            "browser": device.browser,
            "ip_address": device.ip_address,
            "login_ip": session.login_ip,
            "login_location": session.login_location,
            "created_at": session.created_at.isoformat(),
            "last_active_at": session.last_active_at.isoformat(),
            "expires_at": session.expires_at.isoformat(),
            "status": session.status.value,
            # Persisted so that a load/save round trip (as done by
            # update_activity) no longer blanks these device fields.
            "os_version": device.os_version,
            "browser_version": device.browser_version,
            "user_agent": device.user_agent,
            "device_id": device.device_id,
        }

        pipe = self.redis.pipeline()
        # 1. Session document
        pipe.setex(session_key, ttl, json.dumps(session_data))
        # 2. Membership in the user's session set; the set outlives the
        #    session by an hour
        pipe.sadd(user_sessions_key, session.session_id)
        pipe.expire(user_sessions_key, ttl + 3600)
        # 3. Device-type pointer (for same-type eviction lookups)
        pipe.setex(device_key, ttl, session.session_id)
        pipe.execute()
        return True

    def get_session(self, session_id: str) -> Optional[UserSession]:
        """Load one session, or return None when its document is missing."""
        data = self.redis.get(self._session_key(session_id))
        if not data:
            return None
        return self._deserialize_session(json.loads(data))

    def get_user_sessions(self, user_id: int) -> List[UserSession]:
        """Return the user's ACTIVE sessions, most recently active first.

        Ids whose session document no longer exists (it expired on its own)
        are removed from the user's set on the way; every save extends the
        set's lifetime, so they would otherwise pile up for active users.
        """
        user_sessions_key = self._user_sessions_key(user_id)

        sessions = []
        stale_ids = []
        for raw_id in self.redis.smembers(user_sessions_key):
            session_id = self._to_text(raw_id)
            session = self.get_session(session_id)
            if session is None:
                stale_ids.append(session_id)
            elif session.status == SessionStatus.ACTIVE:
                sessions.append(session)

        if stale_ids:
            self.redis.srem(user_sessions_key, *stale_ids)

        sessions.sort(key=lambda s: s.last_active_at, reverse=True)
        return sessions

    def get_session_by_device_type(self, user_id: int, device_type: DeviceType) -> Optional[UserSession]:
        """Return the session the device-type pointer refers to, if any."""
        device_key = self._device_session_key(user_id, device_type.value)
        session_id = self.redis.get(device_key)
        if session_id:
            return self.get_session(self._to_text(session_id))
        return None

    def delete_session(self, session_id: str, reason: SessionStatus = SessionStatus.LOGOUT) -> bool:
        """Remove a session and the index entries that refer to it.

        Args:
            session_id: Id of the session to remove.
            reason: Why the session ends. Accepted for interface
                compatibility; nothing is persisted for it because the
                session document itself is deleted.

        Returns:
            False when no such session exists, True after removal.
        """
        session = self.get_session(session_id)
        if not session:
            return False

        device_key = self._device_session_key(
            session.user_id,
            session.device_info.device_type.value,
        )
        pointed_id = self.redis.get(device_key)

        pipe = self.redis.pipeline()
        pipe.delete(self._session_key(session_id))
        pipe.srem(self._user_sessions_key(session.user_id), session_id)
        # Clear the device-type pointer only when it refers to this very
        # session; a pointer to another live session must survive.
        if pointed_id is not None and self._to_text(pointed_id) == session_id:
            pipe.delete(device_key)
        pipe.execute()
        return True

    def update_activity(self, session_id: str) -> bool:
        """Stamp the session with the current time and save it again.

        Returns:
            False when the session does not exist or can no longer be
            saved, True otherwise.
        """
        session = self.get_session(session_id)
        if not session:
            return False
        session.last_active_at = datetime.utcnow()
        return self.save_session(session)

    def _deserialize_session(self, data: dict) -> UserSession:
        """Rebuild a ``UserSession`` from its stored JSON dictionary.

        Fields that older documents may lack are read with defaults.
        """
        device_info = DeviceInfo(
            device_type=DeviceType(data["device_type"]),
            device_name=data["device_name"],
            os=data["os"],
            os_version=data.get("os_version", ""),
            browser=data["browser"],
            browser_version=data.get("browser_version", ""),
            ip_address=data["ip_address"],
            user_agent=data.get("user_agent", ""),
            device_id=data.get("device_id"),
        )
        return UserSession(
            session_id=data["session_id"],
            user_id=data["user_id"],
            device_info=device_info,
            created_at=datetime.fromisoformat(data["created_at"]),
            last_active_at=datetime.fromisoformat(data["last_active_at"]),
            expires_at=datetime.fromisoformat(data["expires_at"]),
            status=SessionStatus(data["status"]),
            login_ip=data.get("login_ip", ""),
            login_location=data.get("login_location", ""),
        )
互踢策略实现
多点登录管理器
from typing import List, Tuple, Optional
from dataclasses import dataclass
from enum import Enum
class KickStrategy(str, Enum):
    """How existing sessions are evicted to make room for a new login.

    Members are also ``str`` instances, so a strategy configured as a plain
    string (for example ``"oldest"``) compares equal to the matching member.
    A plain ``Enum`` member never equals a string, which silently disables
    every strategy comparison against string-valued configuration.
    """

    # Evict nobody
    NONE = "none"
    # Evict the earliest sessions
    OLDEST = "oldest"
    # Evict every existing session
    ALL = "all"
    # Evict sessions on the same device type
    SAME_TYPE = "same_type"
    # Ask the user to confirm before evicting
    CONFIRM = "confirm"
@dataclass
class LoginResult:
    """Outcome of a login attempt."""

    # Whether the attempt succeeded
    success: bool
    # The session that was created, if any
    session: Optional[UserSession] = None
    # Sessions that were forced offline
    kicked_sessions: List[UserSession] = field(default_factory=list)
    # Whether the caller has to confirm an eviction first
    need_confirm: bool = False
    # Sessions standing in the way of the login
    conflict_sessions: List[UserSession] = field(default_factory=list)
    # Message describing a failure or a pending confirmation
    error_message: str = ""
class MultiDeviceLoginManager:
    """Applies a ``LoginPolicy`` to logins, evictions and logouts."""

    def __init__(self, session_store: RedisSessionStore, policy: LoginPolicy):
        """Remember the session store and the policy to enforce."""
        self.session_store = session_store
        self.policy = policy

    def login(
        self,
        user_id: int,
        device_info: DeviceInfo,
        force_kick: bool = False
    ) -> LoginResult:
        """Handle a login request.

        Args:
            user_id: Id of the user logging in.
            device_info: Description of the device logging in.
            force_kick: Evict conflicting sessions without asking.

        Returns:
            A ``LoginResult``. It carries the new session and the evicted
            sessions on success, the conflicting sessions when confirmation
            is needed, or an error message when the session could not be
            saved.
        """
        current_sessions = self.session_store.get_user_sessions(user_id)

        kick_result = self._check_and_kick(
            user_id, device_info, current_sessions, force_kick
        )
        if kick_result.need_confirm and not force_kick:
            # The caller has to confirm before anything is evicted.
            return kick_result

        # A login that carries a device id gets the longer lifetime.
        ttl = self.policy.remember_me_ttl if device_info.device_id else self.policy.session_ttl
        new_session = UserSession.create(user_id, device_info, ttl)

        if self.session_store.save_session(new_session):
            return LoginResult(
                success=True,
                session=new_session,
                kicked_sessions=kick_result.kicked_sessions
            )
        return LoginResult(
            success=False,
            error_message="会话创建失败"
        )

    def _resolve_kick_strategy(self) -> KickStrategy:
        """Return the policy's eviction strategy as a ``KickStrategy`` member.

        The policy may hold the strategy as a plain string; comparing that
        string with enum members directly would never match. Values that
        name no strategy resolve to ``KickStrategy.NONE``.
        """
        configured = self.policy.kick_strategy
        if isinstance(configured, KickStrategy):
            return configured
        try:
            return KickStrategy(configured)
        except ValueError:
            return KickStrategy.NONE

    def _check_and_kick(
        self,
        user_id: int,
        device_info: DeviceInfo,
        current_sessions: List[UserSession],
        force_kick: bool
    ) -> LoginResult:
        """Decide which sessions must go, then evict them.

        All confirmation checks run before the first deletion, so a result
        with ``need_confirm=True`` means nothing has been evicted yet.

        Returns:
            A ``LoginResult`` with ``need_confirm=True`` and the conflicting
            sessions when confirmation is required, otherwise a successful
            result listing the sessions that were evicted.
        """
        to_kick = []

        # Rule 1: sessions on the same device type evict each other.
        if not self.policy.allow_same_type_multi:
            same_type_sessions = [
                s for s in current_sessions
                if s.device_info.device_type == device_info.device_type
            ]
            if same_type_sessions:
                if self.policy.require_confirm_kick and not force_kick:
                    return LoginResult(
                        success=False,
                        need_confirm=True,
                        conflict_sessions=same_type_sessions,
                        error_message=f"您的{device_info.device_type.value}设备已登录,是否踢掉?"
                    )
                to_kick.extend(same_type_sessions)

        # Rule 2: cap on the total number of sessions.
        if self.policy.max_sessions > 0:
            planned_ids = {s.session_id for s in to_kick}
            remaining_sessions = [
                s for s in current_sessions
                if s.session_id not in planned_ids
            ]
            # "+ 1" leaves room for the session about to be created.
            excess = len(remaining_sessions) - self.policy.max_sessions + 1
            if excess > 0:
                strategy = self._resolve_kick_strategy()
                # One ordering for both the confirmation preview and the
                # actual eviction, so the user confirms exactly the sessions
                # that will be removed.
                oldest_first = sorted(remaining_sessions, key=lambda s: s.created_at)
                if strategy == KickStrategy.CONFIRM and not force_kick:
                    return LoginResult(
                        success=False,
                        need_confirm=True,
                        conflict_sessions=to_kick + oldest_first[:excess],
                        error_message=f"已达最大设备数({self.policy.max_sessions}),需下线旧设备"
                    )
                if strategy in (KickStrategy.OLDEST, KickStrategy.CONFIRM):
                    to_kick.extend(oldest_first[:excess])
                elif strategy == KickStrategy.ALL:
                    to_kick.extend(remaining_sessions)

        for session in to_kick:
            self.session_store.delete_session(session.session_id, SessionStatus.KICKED)

        return LoginResult(
            success=True,
            kicked_sessions=to_kick
        )

    def logout(self, session_id: str) -> bool:
        """Log one session out; returns whether it existed."""
        return self.session_store.delete_session(session_id, SessionStatus.LOGOUT)

    def logout_all(self, user_id: int, except_session: str = None) -> int:
        """Log out every session of a user, optionally sparing one.

        Returns:
            The number of sessions that were removed.
        """
        sessions = self.session_store.get_user_sessions(user_id)
        return sum(
            1
            for session in sessions
            if session.session_id != except_session
            and self.session_store.delete_session(session.session_id, SessionStatus.LOGOUT)
        )

    def kick_session(self, session_id: str) -> bool:
        """Force one session offline; returns whether it existed."""
        return self.session_store.delete_session(session_id, SessionStatus.KICKED)

    def get_online_devices(self, user_id: int) -> List[dict]:
        """List the user's active sessions as plain dictionaries.

        ``is_current`` is always False here; the caller is expected to set
        it for the session making the request.
        """
        sessions = self.session_store.get_user_sessions(user_id)
        return [
            {
                "session_id": s.session_id,
                "device_type": s.device_info.device_type.value,
                "device_name": s.device_info.device_name,
                "os": s.device_info.os,
                "browser": s.device_info.browser,
                "login_ip": s.login_ip,
                "login_location": s.login_location,
                "login_time": s.created_at.isoformat(),
                "last_active": s.last_active_at.isoformat(),
                "is_current": False
            }
            for s in sessions
        ]
异地登录检测
IP 地理位置服务
import requests
from dataclasses import dataclass
from typing import Optional
from functools import lru_cache
@dataclass
class GeoLocation:
    """Geographic details resolved for an IP address."""

    # Country
    country: str
    # Province
    province: str
    # City
    city: str
    # Internet service provider
    isp: str
    # Latitude
    latitude: float
    # Longitude
    longitude: float
class GeoIPService:
    """Resolves IP addresses to geographic locations via ip-api.com."""

    def __init__(self, api_key: str = None):
        """Store the API key and set up the per-instance lookup cache.

        Args:
            api_key: Kept on the instance; the lookups in this class do not
                send it.
        """
        self.api_key = api_key
        # The cache wraps the bound lookup instead of decorating the method:
        # a decorated method shares one cache across all instances and keeps
        # each of them alive through ``self`` in the cache key. Failed
        # lookups raise and are therefore never cached, so a transient
        # network error does not stick to an address.
        self._cached_lookup = lru_cache(maxsize=10000)(self._lookup_remote)

    def get_location(self, ip: str) -> "Optional[GeoLocation]":
        """Return the location of an IP address.

        Args:
            ip: IPv4 or IPv6 address in text form.

        Returns:
            A placeholder location for private and loopback addresses, the
            resolved location for other addresses, or None when ``ip`` is
            not a valid address or the lookup fails.
        """
        address = self._parse_ip(ip)
        if address is None:
            # Never interpolate unvalidated text into the request URL.
            return None

        # Private addresses are not sent to the remote service.
        if address.is_private or address.is_loopback:
            return GeoLocation(
                country="本地",
                province="",
                city="内网",
                isp="",
                latitude=0,
                longitude=0
            )

        try:
            return self._cached_lookup(str(address))
        except LookupError:
            return None

    def _lookup_remote(self, ip: str) -> "GeoLocation":
        """Query ip-api.com for one address.

        Raises:
            LookupError: When the request fails or the service does not
                report success. Raising, rather than returning None, keeps
                failures out of the cache.
        """
        try:
            # Free lookup service (a commercial service should be used in
            # real deployments).
            response = requests.get(
                f"http://ip-api.com/json/{ip}?lang=zh-CN",
                timeout=5
            )
            if response.status_code == 200:
                data = response.json()
                if data.get("status") == "success":
                    return GeoLocation(
                        country=data.get("country", ""),
                        province=data.get("regionName", ""),
                        city=data.get("city", ""),
                        isp=data.get("isp", ""),
                        latitude=data.get("lat", 0),
                        longitude=data.get("lon", 0)
                    )
        except Exception as e:
            # Deliberately broad: the lookup is best effort and must not
            # break the caller, whatever goes wrong with the request.
            print(f"IP 查询失败: {e}")
        raise LookupError(f"no location available for {ip}")

    @staticmethod
    def _parse_ip(ip):
        """Return the parsed address object, or None for invalid input."""
        import ipaddress

        try:
            return ipaddress.ip_address(ip)
        except ValueError:
            return None

    def _is_private_ip(self, ip: str) -> bool:
        """Tell whether ``ip`` is a private or loopback address.

        Ranges are checked with the ``ipaddress`` module rather than by
        string prefix, so the whole 172.16.0.0/12 block is covered, not only
        172.16.x.x. Invalid input yields False.
        """
        address = self._parse_ip(ip)
        return address is not None and (address.is_private or address.is_loopback)
class LoginAnomalyDetector:
    """Flags logins whose location differs from the user's latest session."""

    def __init__(self, geo_service: GeoIPService, session_store: RedisSessionStore):
        self.geo_service = geo_service
        self.session_store = session_store

    def detect_anomaly(self, user_id: int, current_ip: str) -> dict:
        """Compare the location of ``current_ip`` with the latest session's.

        Returns:
            ``{"is_anomaly": False}`` when nothing is flagged or a location
            cannot be resolved; otherwise a dictionary of the form::

                {
                    "is_anomaly": bool,
                    "anomaly_type": str,
                    "risk_level": str,
                    "message": str,
                    "details": dict
                }
        """
        here = self.geo_service.get_location(current_ip)
        if not here:
            return {"is_anomaly": False}

        history = self.session_store.get_user_sessions(user_id)
        if not history:
            # No existing session to compare with.
            return {"is_anomaly": False}

        # The store returns sessions most recently active first.
        previous = history[0]
        there = self.geo_service.get_location(previous.login_ip)
        if not there:
            return {"is_anomaly": False}

        # 1. Different country
        if here.country != there.country:
            return {
                "is_anomaly": True,
                "anomaly_type": "cross_country",
                "risk_level": "high",
                "message": f"检测到跨国登录:从 {there.country} 变为 {here.country}",
                "details": {
                    "last_country": there.country,
                    "current_country": here.country
                }
            }

        # Same country and same province: nothing to report.
        if here.province == there.province:
            return {"is_anomaly": False}

        # 2. Different province; under an hour since the last activity is
        #    treated as the more serious case.
        elapsed = (datetime.utcnow() - previous.last_active_at).total_seconds()
        route = {
            "last_city": there.city,
            "current_city": here.city
        }
        if elapsed < 3600:
            return {
                "is_anomaly": True,
                "anomaly_type": "impossible_travel",
                "risk_level": "high",
                "message": f"短时间内跨省登录:{there.city} → {here.city}",
                "details": {**route, "time_diff_minutes": int(elapsed / 60)}
            }
        return {
            "is_anomaly": True,
            "anomaly_type": "cross_province",
            "risk_level": "medium",
            "message": f"检测到跨省登录:{there.city} → {here.city}",
            "details": route
        }
API 接口设计
Flask 多点登录 API
from flask import Flask, request, jsonify, g
from functools import wraps
import redis
# Flask application object that the routes below are registered on.
app = Flask(__name__)
# Shared infrastructure: one Redis client (configured to return str rather
# than bytes) and the services built on top of it.
redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
session_store = RedisSessionStore(redis_client)
geo_service = GeoIPService()
anomaly_detector = LoginAnomalyDetector(geo_service, session_store)
# Login policy for this app: at most 5 sessions, "oldest" eviction, one
# session per device type, confirmation before evicting, 2-hour sessions.
login_policy = LoginPolicy(
    max_sessions=5,
    kick_strategy="oldest",
    allow_same_type_multi=False,
    require_confirm_kick=True,
    session_ttl=7200
)
login_manager = MultiDeviceLoginManager(session_store, login_policy)
def require_auth(f):
    """Decorator admitting only requests that carry an active session.

    The session id is read from the ``X-Session-ID`` header. On success the
    session's activity time is refreshed and the session and its user id
    are exposed as ``g.current_session`` and ``g.current_user_id``;
    otherwise a 401 JSON response is returned.
    """

    @wraps(f)
    def wrapper(*args, **kwargs):
        session_id = request.headers.get("X-Session-ID")
        if not session_id:
            return jsonify({"error": "未提供会话凭证"}), 401

        session = session_store.get_session(session_id)
        is_active = bool(session) and session.status == SessionStatus.ACTIVE
        if not is_active:
            return jsonify({"error": "会话无效或已过期"}), 401

        # Refresh the activity time before handing over to the view.
        session_store.update_activity(session_id)
        g.current_session = session
        g.current_user_id = session.user_id
        return f(*args, **kwargs)

    return wrapper
@app.route("/api/auth/login", methods=["POST"])
def login():
    """Log a user in from the calling device.

    Request body (JSON)::

        {
            "username": "user",
            "password": "pass",
            "device_id": "xxx",     // optional, device id of an app client
            "force_kick": false     // optional, evict conflicting devices
        }

    Response (JSON)::

        {
            "success": true,
            "session_id": "xxx",
            "user": {...},
            "kicked_devices": [...],   // devices that were forced offline
            "need_confirm": false,     // whether eviction needs confirming
            "conflict_devices": [...]  // conflicting devices (when confirming)
        }

    Status codes: 400 for a body that is not a JSON object or a session
    that could not be created, 401 for bad credentials, 403 for a high-risk
    login.
    """
    # silent=True yields None for a missing or malformed body instead of
    # raising; anything that is not a JSON object gets a clean 400 rather
    # than failing later on ``.get``.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"success": False, "error": "请求体必须是 JSON 对象"}), 400

    username = data.get("username")
    password = data.get("password")
    device_id = data.get("device_id")
    force_kick = data.get("force_kick", False)

    # Check the credentials (example implementation).
    user = authenticate_user(username, password)
    if not user:
        return jsonify({"success": False, "error": "用户名或密码错误"}), 401

    # Describe the calling device.
    device_info = DeviceDetector.detect(
        request.headers.get("User-Agent", ""),
        request.remote_addr,
        device_id
    )

    # Location check: a high-risk login is stopped here and may need a
    # second verification step.
    anomaly = anomaly_detector.detect_anomaly(user["id"], request.remote_addr)
    if anomaly.get("is_anomaly") and anomaly.get("risk_level") == "high":
        return jsonify({
            "success": False,
            "need_verification": True,
            "anomaly": anomaly
        }), 403

    result = login_manager.login(user["id"], device_info, force_kick)

    if result.need_confirm:
        return jsonify({
            "success": False,
            "need_confirm": True,
            "message": result.error_message,
            "conflict_devices": [
                {
                    "session_id": s.session_id,
                    "device_name": s.device_info.device_name,
                    "login_time": s.created_at.isoformat(),
                    "login_location": s.login_location
                }
                for s in result.conflict_sessions
            ]
        })

    if result.success:
        # Record where the login came from and save the session again.
        location = geo_service.get_location(request.remote_addr)
        if location:
            result.session.login_location = f"{location.province} {location.city}"
            session_store.save_session(result.session)
        return jsonify({
            "success": True,
            "session_id": result.session.session_id,
            "user": {
                "id": user["id"],
                "username": user["username"]
            },
            "kicked_devices": [
                {
                    "device_name": s.device_info.device_name,
                    "login_time": s.created_at.isoformat()
                }
                for s in result.kicked_sessions
            ],
            "anomaly_warning": anomaly if anomaly.get("is_anomaly") else None
        })

    return jsonify({"success": False, "error": result.error_message}), 400
@app.route("/api/auth/logout", methods=["POST"])
@require_auth
def logout():
    """Log out the session that made this request."""
    current_id = g.current_session.session_id
    login_manager.logout(current_id)
    payload = {"success": True, "message": "已登出"}
    return jsonify(payload)
@app.route("/api/auth/logout-all", methods=["POST"])
@require_auth
def logout_all():
    """Log out every device of the user except the one calling."""
    keep_id = g.current_session.session_id
    removed = login_manager.logout_all(g.current_user_id, except_session=keep_id)
    payload = {
        "success": True,
        "message": f"已登出 {removed} 个设备"
    }
    return jsonify(payload)
@app.route("/api/auth/devices", methods=["GET"])
@require_auth
def get_devices():
    """Return the user's online devices, flagging the calling one."""
    current_id = g.current_session.session_id
    devices = login_manager.get_online_devices(g.current_user_id)
    # The manager leaves is_current False; fill it in for this request.
    for entry in devices:
        entry["is_current"] = entry["session_id"] == current_id
    payload = {
        "success": True,
        "devices": devices,
        "current_session_id": current_id
    }
    return jsonify(payload)
@app.route("/api/auth/devices/<session_id>", methods=["DELETE"])
@require_auth
def kick_device(session_id):
    """Force one of the caller's other sessions offline.

    Responds 400 when the target is the calling session and 404 when the
    target does not exist or belongs to another user.
    """
    # The calling session cannot evict itself.
    if session_id == g.current_session.session_id:
        return jsonify({"success": False, "error": "不能踢掉当前设备"}), 400

    # Only sessions owned by the caller may be evicted.
    target = session_store.get_session(session_id)
    owned_by_caller = target is not None and target.user_id == g.current_user_id
    if not owned_by_caller:
        return jsonify({"success": False, "error": "会话不存在"}), 404

    login_manager.kick_session(session_id)
    return jsonify({"success": True, "message": "设备已下线"})
@app.route("/api/auth/session", methods=["GET"])
@require_auth
def get_session_info():
    """Return details of the session that made this request."""
    current = g.current_session
    device = current.device_info
    session_view = {
        "session_id": current.session_id,
        "device_name": device.device_name,
        "device_type": device.device_type.value,
        "login_time": current.created_at.isoformat(),
        "login_ip": current.login_ip,
        "login_location": current.login_location,
        "last_active": current.last_active_at.isoformat(),
        "expires_at": current.expires_at.isoformat()
    }
    return jsonify({"success": True, "session": session_view})
def authenticate_user(username: str, password: str) -> dict | None:
"""验证用户凭证(示例)"""
# 实际应查询数据库并验证密码哈希
if username == "admin" and password == "admin123":
return {"id": 1, "username": "admin"}
return None
消息推送通知
被踢下线通知
import json
from typing import Optional
from redis import Redis
class KickNotificationService:
    """Publishes and subscribes to forced-logout notices over Redis pub/sub."""

    def __init__(self, redis_client: Redis):
        self.redis = redis_client
        self.channel_prefix = "kick_notification:"

    def _channel_for(self, session_id: str) -> str:
        """Name of the channel carrying notices for one session."""
        return f"{self.channel_prefix}{session_id}"

    def notify_kick(
        self,
        session_id: str,
        reason: str,
        new_device_info: Optional[dict] = None
    ):
        """Publish a forced-logout notice for a session.

        Args:
            session_id: Id of the session being forced offline.
            reason: Why the session is going offline.
            new_device_info: Details of the newly logged-in device.
        """
        target_channel = self._channel_for(session_id)
        payload = json.dumps({
            "type": "kick",
            "reason": reason,
            "new_device": new_device_info,
            "timestamp": datetime.utcnow().isoformat()
        })
        self.redis.publish(target_channel, payload)

    def subscribe_kick(self, session_id: str):
        """Subscribe to a session's notices (for WebSocket use).

        Returns:
            The pub/sub object subscribed to the session's channel.
        """
        subscription = self.redis.pubsub()
        subscription.subscribe(self._channel_for(session_id))
        return subscription
# WebSocket 处理示例(使用 Flask-SocketIO)
from flask_socketio import SocketIO, emit, join_room
# Socket.IO server attached to the Flask app.
# NOTE(review): cors_allowed_origins="*" accepts every origin - confirm this
# is intended outside of a demo.
socketio = SocketIO(app, cors_allowed_origins="*")
# Notification service sharing the module-level Redis client.
kick_service = KickNotificationService(redis_client)
@socketio.on("connect")
def handle_connect():
    """Handle a new WebSocket connection.

    A client that passes ``session_id`` in the query string is placed in a
    room named after that id; other clients are left outside any such room.
    """
    session_id = request.args.get("session_id")
    if not session_id:
        return
    # The room name is the session id itself.
    join_room(session_id)
    print(f"客户端 {session_id} 已连接")
def background_kick_listener():
    """Relay forced-logout notices from Redis to WebSocket clients.

    Subscribes to every channel under the notification prefix and, for each
    notice, emits a ``kicked`` event to the room named after the session id
    taken from the channel name. Runs until the subscription ends.
    """
    prefix = kick_service.channel_prefix
    pubsub = redis_client.pubsub()
    pubsub.psubscribe(f"{prefix}*")

    for message in pubsub.listen():
        if message["type"] != "pmessage":
            continue

        # The channel arrives as str when the client decodes responses and
        # as bytes otherwise; calling .decode() unconditionally fails on str
        # and would end this thread on the first notice.
        channel = message["channel"]
        if isinstance(channel, bytes):
            channel = channel.decode()
        # Strip only the leading prefix so a session id that happens to
        # contain the prefix text stays intact.
        session_id = channel.removeprefix(prefix)

        try:
            data = json.loads(message["data"])
        except (TypeError, ValueError) as exc:
            # One malformed notice must not stop delivery of later ones.
            print(f"忽略无法解析的下线通知: {exc}")
            continue

        # Push the notice to the client over WebSocket.
        socketio.emit("kicked", data, room=session_id)
# 启动后台监听
import threading
# Run the listener on a daemon thread, started as soon as this code executes;
# being a daemon, it does not keep the process alive on its own.
kick_listener_thread = threading.Thread(target=background_kick_listener, daemon=True)
kick_listener_thread.start()
本章小结
核心要点
- 多点登录策略:根据业务需求选择允许多登、限制数量、同类型互踢等策略
- 设备识别:通过 User-Agent 和设备 ID 识别设备类型
- 会话管理:使用 Redis 存储会话,支持分布式部署
- 异地登录检测:通过 IP 地理位置识别异常登录
- 实时通知:使用 WebSocket 实时通知用户被踢下线