安全防护与攻击防范
2026/3/20 · 大约 12 分钟
安全防护与攻击防范
认证系统安全威胁
威胁全景图
凭证攻击防护
暴力破解防护
import redis
import time
from dataclasses import dataclass
from typing import Optional, Tuple
from enum import Enum
class LockoutLevel(Enum):
    """Severity of the login restriction currently applied."""

    # No restriction.
    NONE = "none"
    # Soft lock: a CAPTCHA is additionally required.
    SOFT = "soft"
    # Hard lock: login is refused.
    HARD = "hard"
@dataclass
class LoginAttemptResult:
    """Outcome of checking whether a login attempt may proceed."""

    # Whether the attempt may go ahead at all.
    allowed: bool
    # Restriction level that applies to this attempt.
    lockout_level: LockoutLevel
    # Failures left before the account is hard-locked.
    remaining_attempts: int
    # Seconds until a hard lock lifts; 0 when not reported.
    lockout_seconds: int = 0
    # True when the caller must also pass a CAPTCHA.
    require_captcha: bool = False
    # Human-readable explanation for the client.
    message: str = ""
class BruteForceProtection:
    """Redis-backed guard against password brute-forcing.

    Failed logins are counted per username and per IP address under the
    ``login_protect:`` key prefix. Reaching ``soft_lock_attempts`` makes the
    check demand a CAPTCHA; reaching ``hard_lock_attempts`` locks the account
    for ``hard_lock_duration`` seconds; reaching ``ip_limit_attempts`` blocks
    the IP address until its counter expires.
    """

    def __init__(
        self,
        redis_client: redis.Redis,
        # Soft-lock settings
        soft_lock_attempts: int = 3,      # failures that trigger the CAPTCHA requirement
        soft_lock_window: int = 300,      # TTL of the per-user failure counter (seconds)
        # Hard-lock settings
        hard_lock_attempts: int = 10,     # failures that trigger the account lock
        hard_lock_duration: int = 1800,   # how long the account stays locked (seconds)
        # Per-IP settings
        ip_limit_attempts: int = 50,      # failures allowed per IP address
        ip_limit_window: int = 3600       # TTL of the per-IP failure counter (seconds)
    ):
        self.redis = redis_client
        self.soft_lock_attempts = soft_lock_attempts
        self.soft_lock_window = soft_lock_window
        self.hard_lock_attempts = hard_lock_attempts
        self.hard_lock_duration = hard_lock_duration
        self.ip_limit_attempts = ip_limit_attempts
        self.ip_limit_window = ip_limit_window
        self.prefix = "login_protect:"

    def check_login_allowed(
        self,
        username: str,
        ip_address: str
    ) -> LoginAttemptResult:
        """Decide whether a login attempt may proceed.

        Checks are applied in order: account hard lock, per-IP limit, then the
        per-user failure counter (which may require a CAPTCHA).

        Args:
            username: Account name being logged into.
            ip_address: Client IP address.

        Returns:
            A LoginAttemptResult describing whether the attempt is allowed and
            which extra verification, if any, is required.
        """
        # 1. Is the account hard-locked? A positive TTL means the lock key exists.
        lock_key = f"{self.prefix}lock:{username}"
        lock_ttl = self.redis.ttl(lock_key)
        if lock_ttl > 0:
            return LoginAttemptResult(
                allowed=False,
                lockout_level=LockoutLevel.HARD,
                remaining_attempts=0,
                lockout_seconds=lock_ttl,
                message=f"账户已锁定,请 {lock_ttl} 秒后重试"
            )

        # 2. Is the IP address over its limit?
        ip_key = f"{self.prefix}ip:{ip_address}"
        ip_attempts = int(self.redis.get(ip_key) or 0)
        if ip_attempts >= self.ip_limit_attempts:
            return LoginAttemptResult(
                allowed=False,
                lockout_level=LockoutLevel.HARD,
                remaining_attempts=0,
                message="当前 IP 请求过于频繁,请稍后重试"
            )

        # 3. How many failures does this user already have?
        attempt_key = f"{self.prefix}attempts:{username}"
        attempts = int(self.redis.get(attempt_key) or 0)

        # Never report a negative number of remaining attempts.
        remaining = max(0, self.hard_lock_attempts - attempts)
        if attempts >= self.soft_lock_attempts:
            return LoginAttemptResult(
                allowed=True,
                lockout_level=LockoutLevel.SOFT,
                remaining_attempts=remaining,
                require_captcha=True,
                message=f"请完成验证码验证,剩余 {remaining} 次尝试机会"
            )
        return LoginAttemptResult(
            allowed=True,
            lockout_level=LockoutLevel.NONE,
            remaining_attempts=remaining
        )

    def record_failed_attempt(self, username: str, ip_address: str):
        """Count one failed login for the user and the IP address.

        Each failure refreshes the TTL of both counters, so a counter only
        disappears once no failure has been recorded for a full window. When
        the per-user count reaches ``hard_lock_attempts`` the account is
        hard-locked.
        """
        attempt_key = f"{self.prefix}attempts:{username}"
        ip_key = f"{self.prefix}ip:{ip_address}"

        pipe = self.redis.pipeline()
        pipe.incr(attempt_key)
        pipe.expire(attempt_key, self.soft_lock_window)
        pipe.incr(ip_key)
        pipe.expire(ip_key, self.ip_limit_window)
        results = pipe.execute()

        # Use the value INCR itself returned (first queued command) instead of
        # a follow-up GET: the counter could expire or be changed by another
        # request between the two round trips.
        attempts = int(results[0])
        if attempts >= self.hard_lock_attempts:
            self._hard_lock_account(username)

    def record_successful_login(self, username: str, ip_address: str):
        """Clear the user's failure counter after a successful login.

        ``ip_address`` is accepted for interface symmetry; the per-IP counter
        is left untouched.
        """
        attempt_key = f"{self.prefix}attempts:{username}"
        self.redis.delete(attempt_key)

    def _hard_lock_account(self, username: str):
        """Lock the account for ``hard_lock_duration`` seconds and log it."""
        lock_key = f"{self.prefix}lock:{username}"
        self.redis.setex(lock_key, self.hard_lock_duration, "1")
        self._log_security_event(
            event_type="account_locked",
            username=username,
            reason="too_many_failed_attempts"
        )

    def unlock_account(self, username: str):
        """Remove both the hard lock and the failure counter for the user."""
        lock_key = f"{self.prefix}lock:{username}"
        attempt_key = f"{self.prefix}attempts:{username}"
        self.redis.delete(lock_key)
        self.redis.delete(attempt_key)

    def _log_security_event(self, event_type: str, **kwargs):
        """Emit a security event record (currently printed to stdout)."""
        event = {
            "type": event_type,
            "timestamp": time.time(),
            **kwargs
        }
        # A real deployment should write this to a security log system.
        print(f"Security Event: {event}")
# Flask integration example
from flask import Flask, request, jsonify
app = Flask(__name__)
# NOTE(review): `redis_client` is not defined anywhere in the visible file; it
# must be created (presumably a configured redis.Redis instance) before this
# line runs.
brute_force = BruteForceProtection(redis_client)
@app.route("/api/login", methods=["POST"])
def login():
    """Login endpoint guarded by brute-force protection.

    Expects a JSON object with ``username``, ``password`` and, when a CAPTCHA
    is required, ``captcha``. Responds with 400 for a malformed body or a wrong
    CAPTCHA, 429 when the account or IP is blocked, 401 for bad credentials,
    and the user summary on success.

    NOTE(review): ``verify_captcha`` and ``authenticate_user`` are not defined
    in the visible file; they are assumed to be provided elsewhere.
    """
    data = request.get_json()
    # A JSON body that is not an object (null, list, string, ...) has no
    # ``.get``; reject it instead of failing with an AttributeError.
    if not isinstance(data, dict):
        return jsonify({"error": "请求体必须是 JSON 对象"}), 400
    username = data.get("username")
    password = data.get("password")
    captcha = data.get("captcha")
    # Missing or non-string credentials can never authenticate and would
    # otherwise be counted against the literal key "None".
    if not (isinstance(username, str) and username
            and isinstance(password, str) and password):
        return jsonify({"error": "用户名和密码不能为空"}), 400
    ip_address = request.remote_addr

    # 1. Is this attempt allowed at all?
    check_result = brute_force.check_login_allowed(username, ip_address)
    if not check_result.allowed:
        return jsonify({
            "error": check_result.message,
            "lockout_seconds": check_result.lockout_seconds
        }), 429

    # 2. Verify the CAPTCHA when the soft lock demands one.
    if check_result.require_captcha:
        if not verify_captcha(captcha):
            return jsonify({
                "error": "验证码错误",
                "require_captcha": True
            }), 400

    # 3. Verify the credentials.
    user = authenticate_user(username, password)
    if not user:
        brute_force.record_failed_attempt(username, ip_address)
        # Re-check so the response reflects the state after this failure.
        new_check = brute_force.check_login_allowed(username, ip_address)
        return jsonify({
            "error": "用户名或密码错误",
            "remaining_attempts": new_check.remaining_attempts,
            "require_captcha": new_check.require_captcha
        }), 401

    # 4. Success: clear the failure counter.
    brute_force.record_successful_login(username, ip_address)
    return jsonify({
        "message": "登录成功",
        "user": {"id": user["id"], "username": user["username"]}
    })
密码安全策略
import re
import hashlib
from typing import Tuple, List
from dataclasses import dataclass
@dataclass
class PasswordValidationResult:
    """Result of checking a password against a policy."""

    # True when no rule was violated.
    valid: bool
    # Strength score from 0 to 100.
    score: int
    # Messages for each violated rule.
    errors: List[str]
    # Optional hints for a stronger password.
    suggestions: List[str]
class PasswordPolicy:
    """Configurable password-strength policy.

    ``validate`` checks length, character classes, repeated characters,
    common sequences, inclusion of the username, and a list of common
    passwords, and returns a score together with the violated rules.
    """

    def __init__(
        self,
        min_length: int = 8,
        max_length: int = 128,
        require_uppercase: bool = True,
        require_lowercase: bool = True,
        require_digit: bool = True,
        require_special: bool = True,
        special_chars: str = "!@#$%^&*()_+-=[]{}|;:,.<>?",
        max_consecutive_chars: int = 3,
        check_common_passwords: bool = True
    ):
        self.min_length = min_length
        self.max_length = max_length
        self.require_uppercase = require_uppercase
        self.require_lowercase = require_lowercase
        self.require_digit = require_digit
        self.require_special = require_special
        self.special_chars = special_chars
        self.max_consecutive_chars = max_consecutive_chars
        self.check_common_passwords = check_common_passwords
        # SHA-256 hashes of known weak passwords.
        self.common_passwords = self._load_common_passwords()

    def validate(self, password: str, username: Optional[str] = None) -> PasswordValidationResult:
        """Check ``password`` against the policy.

        Args:
            password: Candidate password.
            username: When given, the password must not contain it
                (case-insensitive).

        Returns:
            A PasswordValidationResult; ``valid`` is True only when no rule
            was violated, and ``score`` starts at 100 and never drops below 0.
        """
        errors = []
        suggestions = []
        score = 100

        # Length
        if len(password) < self.min_length:
            errors.append(f"密码长度不能少于 {self.min_length} 个字符")
            score -= 30
        elif len(password) > self.max_length:
            errors.append(f"密码长度不能超过 {self.max_length} 个字符")

        # Character classes
        has_upper = bool(re.search(r'[A-Z]', password))
        has_lower = bool(re.search(r'[a-z]', password))
        has_digit = bool(re.search(r'\d', password))
        has_special = self._has_special_char(password)
        if self.require_uppercase and not has_upper:
            errors.append("密码必须包含大写字母")
            score -= 15
        if self.require_lowercase and not has_lower:
            errors.append("密码必须包含小写字母")
            score -= 15
        if self.require_digit and not has_digit:
            errors.append("密码必须包含数字")
            score -= 15
        if self.require_special and not has_special:
            errors.append("密码必须包含特殊字符")
            score -= 15

        # Runs of the same character
        if self._has_consecutive_chars(password, self.max_consecutive_chars):
            errors.append(f"密码不能包含超过 {self.max_consecutive_chars} 个连续相同字符")
            score -= 10

        # Alphabet / keyboard / digit sequences
        if self._has_common_sequence(password):
            errors.append("密码不能包含常见序列(如 123、abc)")
            score -= 10

        # Username contained in the password
        if username and username.lower() in password.lower():
            errors.append("密码不能包含用户名")
            score -= 20

        # Known weak passwords
        if self.check_common_passwords:
            if self._is_common_password(password):
                errors.append("密码过于常见,请使用更复杂的密码")
                score -= 30

        # Suggestions do not affect validity.
        if len(password) < 12:
            suggestions.append("建议使用 12 个字符以上的密码")
        if not has_special:
            suggestions.append("添加特殊字符可以增强密码安全性")

        return PasswordValidationResult(
            valid=len(errors) == 0,
            score=max(0, score),
            errors=errors,
            suggestions=suggestions
        )

    def _has_special_char(self, password: str) -> bool:
        """Return True if ``password`` contains any configured special character.

        A plain membership test is used instead of a regex character class so
        that an empty ``special_chars`` simply yields False rather than an
        invalid pattern.
        """
        return any(ch in self.special_chars for ch in password)

    def _has_consecutive_chars(self, password: str, max_count: int) -> bool:
        """Return True if any character repeats more than ``max_count`` times in a row."""
        count = 1
        for i in range(1, len(password)):
            if password[i] == password[i-1]:
                count += 1
                if count > max_count:
                    return True
            else:
                count = 1
        return False

    def _has_common_sequence(self, password: str) -> bool:
        """Return True if the password contains a 4-character run of a known
        sequence (alphabet, keyboard order, digits), forwards or backwards."""
        sequences = [
            'abcdefghijklmnopqrstuvwxyz',
            'qwertyuiopasdfghjklzxcvbnm',
            '01234567890',
        ]
        password_lower = password.lower()
        for seq in sequences:
            for start in range(len(seq) - 3):
                window = seq[start:start + 4]
                if window in password_lower or window[::-1] in password_lower:
                    return True
        return False

    def _is_common_password(self, password: str) -> bool:
        """Return True if the lower-cased password is in the weak-password set."""
        password_hash = hashlib.sha256(password.lower().encode()).hexdigest()
        return password_hash in self.common_passwords

    def _load_common_passwords(self) -> set:
        """Return the SHA-256 hashes of the built-in weak-password list."""
        # A real deployment should load a larger list from a file (e.g. rockyou.txt).
        common = [
            "123456", "password", "12345678", "qwerty", "123456789",
            "12345", "1234", "111111", "1234567", "dragon",
            "master", "monkey", "letmein", "abc123", "admin"
        ]
        return {hashlib.sha256(p.encode()).hexdigest() for p in common}
会话安全防护
CSRF 防护
import secrets
import hmac
import hashlib
from functools import wraps
from flask import Flask, request, session, jsonify, abort
# NOTE(review): this rebinds `app`, which is already assigned earlier in this
# file; routes registered before this line stay attached to the previous
# Flask instance.
app = Flask(__name__)
class CSRFProtection:
    """Stateless CSRF tokens bound to a session id with HMAC-SHA256.

    A token has the form ``<random_hex>.<signature>`` where the signature is
    ``HMAC(secret_key, session_id + random_hex)``.
    """

    # Characters produced by ``secrets.token_hex``.
    _HEX_DIGITS = frozenset("0123456789abcdef")

    def __init__(self, secret_key: str, token_length: int = 32):
        self.secret_key = secret_key
        self.token_length = token_length

    def _sign(self, session_id: str, random_part: str) -> str:
        """Return the hex HMAC-SHA256 of ``session_id + random_part``."""
        return hmac.new(
            self.secret_key.encode(),
            f"{session_id}{random_part}".encode(),
            hashlib.sha256
        ).hexdigest()

    def generate_token(self, session_id: str) -> str:
        """Create a CSRF token bound to ``session_id``.

        The random part is ``token_length // 2`` random bytes rendered as hex.
        """
        random_part = secrets.token_hex(self.token_length // 2)
        return f"{random_part}.{self._sign(session_id, random_part)}"

    def validate_token(self, token: str, session_id: str) -> bool:
        """Return True if ``token`` was issued for ``session_id``.

        Malformed input (wrong type, wrong number of parts, non-ASCII text)
        yields False instead of raising.
        """
        try:
            random_part, signature = token.split(".")
            # The signed message is a plain concatenation, so the random part
            # must have exactly the length this instance generates; otherwise
            # characters could be moved between session id and random part
            # while keeping the same signature.
            expected_length = 2 * (self.token_length // 2)
            if len(random_part) != expected_length:
                return False
            if not all(ch in self._HEX_DIGITS for ch in random_part):
                return False
            expected_signature = self._sign(session_id, random_part)
            return hmac.compare_digest(signature, expected_signature)
        except (AttributeError, TypeError, ValueError):
            return False
# NOTE(review): no secret key is configured on `app` in the visible file. If
# `app.secret_key` is still None here, token generation will fail when it
# calls `.encode()` on the key — confirm it is set before this line.
csrf = CSRFProtection(app.secret_key)
def csrf_required(f):
    """Decorator that rejects requests lacking a valid CSRF token.

    The token is read from the ``X-CSRF-Token`` header, falling back to the
    ``csrf_token`` form field, and checked against the ``session_id`` stored
    in the Flask session (empty string when absent). A missing or invalid
    token aborts the request with HTTP 403.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        supplied = request.headers.get("X-CSRF-Token") or request.form.get("csrf_token")
        if not supplied:
            abort(403, "缺少 CSRF Token")
        if not csrf.validate_token(supplied, session.get("session_id", "")):
            abort(403, "CSRF Token 无效")
        return f(*args, **kwargs)
    return decorated
@app.route("/api/csrf-token", methods=["GET"])
def get_csrf_token():
    """Issue a CSRF token bound to the current session id (empty if unset)."""
    current_session = session.get("session_id", "")
    return jsonify({"csrf_token": csrf.generate_token(current_session)})
@app.route("/api/transfer", methods=["POST"])
@csrf_required
def transfer():
    """Sensitive operation that is only reachable with a valid CSRF token."""
    payload = request.get_json()
    # The transfer logic itself is omitted in this example.
    return jsonify({"message": "转账成功"})
Session 劫持防护
import hashlib
from dataclasses import dataclass
from typing import Optional
@dataclass
class SessionFingerprint:
    """Client attributes used to recognise the browser behind a session."""

    # Value of the User-Agent request header.
    user_agent: str
    # Value of the Accept-Language request header.
    accept_language: str
    # Client IP address.
    ip_address: str
class SessionSecurityManager:
    """Adds client fingerprinting on top of an existing session manager.

    The wrapped ``session_manager`` must provide ``create_session`` and
    ``get_session``; sessions returned by ``get_session`` are expected to
    expose ``extra`` (a dict) and ``user_id``.
    """

    def __init__(self, session_manager, strict_ip_binding: bool = False):
        self.session_manager = session_manager
        self.strict_ip_binding = strict_ip_binding

    def create_secure_session(
        self,
        user_id: int,
        username: str,
        fingerprint: SessionFingerprint,
        **kwargs
    ) -> str:
        """Create a session that records the client's fingerprint.

        Any ``extra`` dict passed by the caller is preserved, but the
        ``fingerprint`` and ``ip_address`` entries are always the ones computed
        here. Remaining keyword arguments are forwarded unchanged to
        ``session_manager.create_session``.

        Returns:
            Whatever ``session_manager.create_session`` returns.
        """
        # Tolerate both a missing ``extra`` and an explicit ``extra=None``.
        caller_extra = kwargs.pop("extra", None) or {}
        return self.session_manager.create_session(
            user_id=user_id,
            username=username,
            extra={
                **caller_extra,
                # Written last so caller-supplied data cannot overwrite the
                # values that validate_session relies on.
                "fingerprint": self._hash_fingerprint(fingerprint),
                "ip_address": fingerprint.ip_address,
            },
            **kwargs
        )

    def validate_session(
        self,
        session_id: str,
        current_fingerprint: SessionFingerprint
    ) -> tuple[bool, str]:
        """Check that the session exists and still belongs to the same client.

        Returns:
            ``(is_valid, error_message)``; the message is empty when valid.
        """
        session = self.session_manager.get_session(session_id)
        if not session:
            return False, "Session 不存在"

        stored_fingerprint = session.extra.get("fingerprint")
        stored_ip = session.extra.get("ip_address")

        # A changed fingerprint may indicate session hijacking.
        current_hash = self._hash_fingerprint(current_fingerprint)
        if stored_fingerprint != current_hash:
            self._log_security_event(
                "fingerprint_mismatch",
                session_id=session_id,
                user_id=session.user_id
            )
            return False, "会话异常,请重新登录"

        # The IP address is only enforced in strict mode.
        if self.strict_ip_binding:
            if stored_ip != current_fingerprint.ip_address:
                self._log_security_event(
                    "ip_changed",
                    session_id=session_id,
                    user_id=session.user_id,
                    old_ip=stored_ip,
                    new_ip=current_fingerprint.ip_address
                )
                return False, "IP 地址变化,请重新登录"

        return True, ""

    def _hash_fingerprint(self, fingerprint: SessionFingerprint) -> str:
        """Return the SHA-256 hex digest of the user agent and accept-language.

        The IP address is deliberately not part of the hash; it is compared
        separately when strict IP binding is enabled.
        """
        data = f"{fingerprint.user_agent}|{fingerprint.accept_language}"
        return hashlib.sha256(data.encode()).hexdigest()

    def _log_security_event(self, event_type: str, **kwargs):
        """Emit a security event record (currently printed to stdout)."""
        print(f"Security: {event_type} - {kwargs}")
# Usage example
@app.route("/api/protected")
@session_required
def protected_resource():
    """Serve a protected resource after re-validating the session fingerprint.

    NOTE(review): ``session_required``, ``security_manager`` and ``g`` are not
    defined or imported in the visible file; they are assumed to exist elsewhere.
    """
    # Fingerprint of the client making this request.
    current = SessionFingerprint(
        user_agent=request.headers.get("User-Agent", ""),
        accept_language=request.headers.get("Accept-Language", ""),
        ip_address=request.remote_addr
    )
    ok, reason = security_manager.validate_session(g.session_id, current)
    if ok:
        return jsonify({"data": "sensitive information"})
    return jsonify({"error": reason}), 401
XSS 防护
输出编码
import html
from markupsafe import Markup, escape
from functools import wraps
class XSSProtection:
    """Context-specific output encoders and an HTML sanitizer."""

    # Single-pass translation table for text placed inside a quoted
    # JavaScript string literal.
    _JS_ESCAPE_TABLE = str.maketrans({
        '\\': '\\\\',
        "'": "\\'",
        '"': '\\"',
        '\n': '\\n',
        '\r': '\\r',
        '<': '\\x3c',
        '>': '\\x3e',
        '&': '\\x26',
        # A backtick would terminate a template literal.
        '`': '\\x60',
        # U+2028 / U+2029 are line terminators in older JavaScript engines and
        # would end a string literal just like a newline.
        '\u2028': '\\u2028',
        '\u2029': '\\u2029',
    })

    @staticmethod
    def escape_html(text: str) -> str:
        """Encode ``text`` as HTML entities (including quotes)."""
        return html.escape(text)

    @staticmethod
    def escape_js(text: str) -> str:
        """Escape ``text`` for use inside a quoted JavaScript string literal.

        Backslashes, quotes, line breaks and the HTML-significant characters
        ``<``, ``>`` and ``&`` are replaced by escape sequences, so the value
        can neither end the string nor close an enclosing ``<script>`` element.
        """
        return text.translate(XSSProtection._JS_ESCAPE_TABLE)

    @staticmethod
    def escape_url(text: str) -> str:
        """Percent-encode ``text`` with no characters left unescaped as safe."""
        from urllib.parse import quote
        return quote(text, safe='')

    @staticmethod
    def sanitize_html(html_content: str, allowed_tags: list = None) -> str:
        """Strip everything except a small set of safe tags and attributes.

        Uses the ``bleach`` library. Only ``href`` and ``title`` on ``<a>`` are
        kept as attributes; disallowed tags are stripped, not escaped.

        Args:
            html_content: Untrusted HTML.
            allowed_tags: Tag names to keep; defaults to basic inline
                formatting tags plus ``p``, ``br`` and ``a``.
        """
        import bleach
        if allowed_tags is None:
            allowed_tags = ['p', 'br', 'b', 'i', 'u', 'strong', 'em', 'a']
        allowed_attrs = {
            'a': ['href', 'title'],
        }
        return bleach.clean(
            html_content,
            tags=allowed_tags,
            attributes=allowed_attrs,
            strip=True
        )
# Content Security Policy configuration
class CSPMiddleware:
    """Registers hooks on a Flask app that add CSP and related headers.

    A fresh nonce is generated for every request and stored as
    ``flask.g.csp_nonce`` so views and templates can put it on the
    ``<script nonce="...">`` elements they render; the same nonce is then
    written into the ``script-src`` directive of the response.
    """

    def __init__(self, app):
        self.app = app

        @app.before_request
        def assign_csp_nonce():
            # Imported locally so this class does not depend on the
            # surrounding module's import list.
            import secrets
            from flask import g
            g.csp_nonce = secrets.token_urlsafe(16)

        @app.after_request
        def add_csp_header(response):
            import secrets
            from flask import g
            # Fall back to a fresh nonce if the before_request hook did not
            # run for this request.
            nonce = getattr(g, "csp_nonce", None) or secrets.token_urlsafe(16)
            response.headers['Content-Security-Policy'] = self._build_policy(nonce)
            response.headers['X-Content-Type-Options'] = 'nosniff'
            response.headers['X-Frame-Options'] = 'DENY'
            response.headers['X-XSS-Protection'] = '1; mode=block'
            return response

    @staticmethod
    def _build_policy(nonce: str) -> str:
        """Return the strict CSP header value with ``nonce`` substituted in."""
        return "; ".join([
            "default-src 'self'",
            f"script-src 'self' 'nonce-{nonce}'",   # scripts: same origin or carrying the nonce
            "style-src 'self' 'unsafe-inline'",     # styles: inline allowed
            "img-src 'self' data: https:",          # images: data URIs and any HTTPS source
            "font-src 'self'",
            "connect-src 'self'",                   # AJAX: same origin only
            "frame-ancestors 'none'",               # must not be embedded in frames
            "form-action 'self'",                   # forms may only post to this origin
            "base-uri 'self'",
            "object-src 'none'"                     # no plugins
        ])
# Usage example
xss = XSSProtection()
@app.route("/api/comment", methods=["POST"])
def post_comment():
    """Accept a comment and return its sanitized HTML.

    Expects a JSON object with a string ``content`` field (default empty).
    Responds with 400 when the body is not a JSON object or ``content`` is not
    a string.
    """
    data = request.get_json()
    # A JSON body that is not an object (null, list, ...) has no ``.get``.
    if not isinstance(data, dict):
        return jsonify({"error": "请求体必须是 JSON 对象"}), 400
    content = data.get("content", "")
    # The sanitizer only accepts text.
    if not isinstance(content, str):
        return jsonify({"error": "content 必须是字符串"}), 400
    # Strip unsafe markup from the user-supplied HTML.
    safe_content = xss.sanitize_html(content)
    # Persisting the comment is omitted in this example.
    return jsonify({"content": safe_content})
JWT 安全防护
JWT 攻击防范
import jwt
from jwt import PyJWKClient
from typing import Optional
class SecureJWTValidator:
    """JWT validator with an explicit algorithm allow-list.

    ``secret_key`` is used for HS* algorithms and ``public_key`` for every
    other allowed algorithm.
    """

    def __init__(
        self,
        secret_key: str = None,
        public_key: str = None,
        issuer: str = None,
        audience: str = None,
        algorithms: list = None
    ):
        self.secret_key = secret_key
        self.public_key = public_key
        self.issuer = issuer
        self.audience = audience
        # Explicit allow-list; guards against algorithm-confusion attacks.
        self.algorithms = algorithms or ["RS256", "HS256"]

    def validate(self, token: str) -> Optional[dict]:
        """Verify ``token`` and return its payload, or None if it is rejected.

        Safeguards:
            1. The header algorithm must be in the allow-list and must not be
               ``none``.
            2. The key is chosen by algorithm family, and verification is
               pinned to exactly that algorithm.
            3. Issuer and audience are verified when configured.
            4. ``exp``, ``iat`` and ``sub`` are required; ``exp``/``nbf``/``iat``
               are verified.
        """
        try:
            # Read the header without verifying the signature.
            header = jwt.get_unverified_header(token)

            alg = header.get("alg")
            if alg not in self.algorithms:
                raise jwt.InvalidAlgorithmError(f"不允许的算法: {alg}")
            # Reject alg=none even if it was put on the allow-list.
            if alg.lower() == "none":
                raise jwt.InvalidAlgorithmError("不允许使用 none 算法")

            # Symmetric algorithms use the shared secret, all others the public key.
            key = self.secret_key if alg.startswith("HS") else self.public_key
            if key is None:
                # Without this check a missing key would reach the crypto
                # layer as None instead of producing a clean rejection.
                raise jwt.InvalidKeyError(f"未配置算法 {alg} 所需的密钥")

            payload = jwt.decode(
                token,
                key,
                # Pin verification to the algorithm the key was selected for,
                # so a key is never tried with another algorithm family.
                algorithms=[alg],
                issuer=self.issuer,
                audience=self.audience,
                options={
                    "verify_signature": True,
                    "verify_exp": True,
                    "verify_nbf": True,
                    "verify_iat": True,
                    "verify_aud": self.audience is not None,
                    "verify_iss": self.issuer is not None,
                    "require": ["exp", "iat", "sub"]
                }
            )
            return payload
        except jwt.ExpiredSignatureError:
            print("Token 已过期")
        except jwt.InvalidAudienceError:
            print("Token 受众无效")
        except jwt.InvalidIssuerError:
            print("Token 签发者无效")
        except jwt.InvalidAlgorithmError as e:
            print(f"算法错误: {e}")
        except jwt.InvalidKeyError as e:
            print(f"密钥错误: {e}")
        except jwt.InvalidTokenError as e:
            print(f"Token 无效: {e}")
        return None
# Guard against JWT key-confusion attacks
class JWKSValidator:
    """JWT validator that fetches signing keys from a JWKS endpoint."""

    def __init__(self, jwks_url: str, issuer: str, audience: str):
        self.jwks_client = PyJWKClient(jwks_url)
        self.issuer = issuer
        self.audience = audience

    def validate(self, token: str) -> Optional[dict]:
        """Verify ``token`` against the JWKS key it names.

        Only RS256 is accepted. Returns the payload, or None after printing
        the reason when anything fails.
        """
        try:
            # Look up the signing key referenced by the token.
            key = self.jwks_client.get_signing_key_from_jwt(token).key
            return jwt.decode(
                token,
                key,
                algorithms=["RS256"],
                issuer=self.issuer,
                audience=self.audience
            )
        except Exception as exc:
            print(f"JWKS 验证失败: {exc}")
            return None
安全审计日志
审计日志系统
import json
import time
from datetime import datetime
from typing import Optional
from dataclasses import dataclass, asdict
from enum import Enum
import redis
class AuditEventType(Enum):
    """Kinds of events recorded in the audit log."""

    # --- Authentication ---
    LOGIN_SUCCESS = "login_success"
    LOGIN_FAILED = "login_failed"
    LOGOUT = "logout"
    PASSWORD_CHANGED = "password_changed"
    MFA_ENABLED = "mfa_enabled"
    MFA_DISABLED = "mfa_disabled"

    # --- Sessions ---
    SESSION_CREATED = "session_created"
    SESSION_DESTROYED = "session_destroyed"
    SESSION_HIJACK_ATTEMPT = "session_hijack_attempt"

    # --- Authorization ---
    PERMISSION_DENIED = "permission_denied"
    ROLE_CHANGED = "role_changed"

    # --- Security ---
    ACCOUNT_LOCKED = "account_locked"
    ACCOUNT_UNLOCKED = "account_unlocked"
    BRUTE_FORCE_DETECTED = "brute_force_detected"
    SUSPICIOUS_ACTIVITY = "suspicious_activity"
@dataclass
class AuditEvent:
    """One audit-log record."""

    # String value of the event kind.
    event_type: str
    # Acting user, when known.
    user_id: Optional[int]
    username: Optional[str]
    # Request origin.
    ip_address: str
    user_agent: str
    # Time the event was recorded, as a float timestamp.
    timestamp: float
    # Whether the audited action succeeded.
    success: bool
    # Free-form additional data.
    details: dict
    # Optional correlation identifiers.
    session_id: Optional[str] = None
    request_id: Optional[str] = None
class AuditLogger:
    """Writes audit events to Redis and answers simple queries over them.

    Every event is appended to the stream ``<prefix>stream``. Events that
    carry a user id are also pushed onto the per-user list
    ``<prefix>user:<user_id>``, which keeps the most recent 1000 entries and
    expires after ``retention_days``.
    """

    def __init__(
        self,
        redis_client: redis.Redis,
        prefix: str = "audit:",
        retention_days: int = 90
    ):
        self.redis = redis_client
        self.prefix = prefix
        self.retention_seconds = retention_days * 86400

    @staticmethod
    def _encode_stream_fields(record: dict) -> dict:
        """Return ``record`` with every value converted to a string.

        Stream entries can only hold flat scalar values, while an event holds
        None, booleans and a nested ``details`` dict. Strings are kept as they
        are; every other value is JSON-encoded.
        """
        return {
            name: value if isinstance(value, str) else json.dumps(value)
            for name, value in record.items()
        }

    def log(
        self,
        event_type: AuditEventType,
        user_id: int = None,
        username: str = None,
        ip_address: str = "",
        user_agent: str = "",
        success: bool = True,
        details: dict = None,
        session_id: str = None,
        request_id: str = None
    ):
        """Record one audit event and raise an alert for high-risk kinds."""
        event = AuditEvent(
            event_type=event_type.value,
            user_id=user_id,
            username=username,
            ip_address=ip_address,
            user_agent=user_agent,
            timestamp=time.time(),
            success=success,
            details=details or {},
            session_id=session_id,
            request_id=request_id
        )
        record = asdict(event)

        # Append to the global stream.
        stream_key = f"{self.prefix}stream"
        self.redis.xadd(
            stream_key,
            self._encode_stream_fields(record),
            maxlen=1000000  # keep at most one million entries
        )

        # Index by user. ``is not None`` so that user id 0 is indexed too.
        if user_id is not None:
            user_key = f"{self.prefix}user:{user_id}"
            self.redis.lpush(user_key, json.dumps(record))
            self.redis.ltrim(user_key, 0, 999)  # keep the latest 1000
            self.redis.expire(user_key, self.retention_seconds)

        # Alert on high-risk events.
        if event_type in [
            AuditEventType.BRUTE_FORCE_DETECTED,
            AuditEventType.SESSION_HIJACK_ATTEMPT,
            AuditEventType.ACCOUNT_LOCKED
        ]:
            self._send_security_alert(event)

    def get_user_events(
        self,
        user_id: int,
        limit: int = 100,
        event_type: AuditEventType = None
    ) -> list:
        """Return up to ``limit`` of the user's most recent events, newest first.

        When ``event_type`` is given, the filter is applied after the
        ``limit`` newest events have been fetched, so fewer than ``limit``
        entries may be returned.
        """
        user_key = f"{self.prefix}user:{user_id}"
        events = self.redis.lrange(user_key, 0, limit - 1)
        result = []
        for event_data in events:
            event = json.loads(event_data)
            if event_type is None or event["event_type"] == event_type.value:
                result.append(event)
        return result

    def get_login_history(self, user_id: int, limit: int = 20) -> list:
        """Return up to ``limit`` login successes/failures among the user's
        100 most recent events."""
        events = self.get_user_events(user_id, limit=100)
        login_events = [
            e for e in events
            if e["event_type"] in [
                AuditEventType.LOGIN_SUCCESS.value,
                AuditEventType.LOGIN_FAILED.value
            ]
        ]
        return login_events[:limit]

    def detect_anomaly(self, user_id: int, current_ip: str) -> dict:
        """Flag logins from an unseen IP and bursts of recent failures.

        Looks at the user's 50 most recent events.

        Returns:
            ``{"has_anomaly": bool, "anomalies": [...]}``.
        """
        events = self.get_user_events(user_id, limit=50)

        # IP addresses of recent successful logins, and recent failures.
        ip_counts = {}
        failed_count = 0
        recent_ips = set()
        for event in events:
            if event["event_type"] == AuditEventType.LOGIN_SUCCESS.value:
                ip = event["ip_address"]
                ip_counts[ip] = ip_counts.get(ip, 0) + 1
                recent_ips.add(ip)
            elif event["event_type"] == AuditEventType.LOGIN_FAILED.value:
                # Only failures within the last hour count.
                if time.time() - event["timestamp"] < 3600:
                    failed_count += 1

        anomalies = []
        # Login from an IP not seen among recent successes.
        if current_ip not in recent_ips and len(recent_ips) > 0:
            anomalies.append({
                "type": "new_ip",
                "message": "从新的 IP 地址登录",
                "risk_level": "medium"
            })
        # Many failed attempts.
        if failed_count > 5:
            anomalies.append({
                "type": "many_failures",
                "message": f"最近 1 小时内有 {failed_count} 次失败登录",
                "risk_level": "high"
            })
        return {
            "has_anomaly": len(anomalies) > 0,
            "anomalies": anomalies
        }

    def _send_security_alert(self, event: AuditEvent):
        """Emit a security alert (currently printed to stdout)."""
        # A real deployment should forward this to an alerting system.
        print(f"🚨 安全告警: {event.event_type} - User: {event.user_id}")
# Flask integration
# NOTE(review): `redis_client` is not defined anywhere in the visible file; it
# must exist before this line runs.
audit_logger = AuditLogger(redis_client)
@app.route("/api/login", methods=["POST"])
def login_with_audit():
    """Login endpoint that records every attempt in the audit log.

    Expects a JSON object with ``username`` and ``password``. Responds with
    400 for a malformed body, 401 for bad credentials, and a success message
    otherwise.

    NOTE(review): ``authenticate_user`` is not defined in the visible file; it
    is assumed to return a mapping with an ``id`` key, or a falsy value.
    """
    data = request.get_json()
    # A JSON body that is not an object (null, list, ...) has no ``.get``.
    if not isinstance(data, dict):
        return jsonify({"error": "请求体必须是 JSON 对象"}), 400
    username = data.get("username")
    password = data.get("password")
    # Missing or non-string credentials can never authenticate.
    if not (isinstance(username, str) and username
            and isinstance(password, str) and password):
        return jsonify({"error": "用户名和密码不能为空"}), 400

    # Request metadata for the audit record.
    ip_address = request.remote_addr
    user_agent = request.headers.get("User-Agent", "")
    request_id = request.headers.get("X-Request-ID", "")

    user = authenticate_user(username, password)
    if user:
        audit_logger.log(
            event_type=AuditEventType.LOGIN_SUCCESS,
            user_id=user["id"],
            username=username,
            ip_address=ip_address,
            user_agent=user_agent,
            success=True,
            request_id=request_id,
            details={"login_method": "password"}
        )
        return jsonify({"message": "登录成功"})
    else:
        # No user id is known for a failed login.
        audit_logger.log(
            event_type=AuditEventType.LOGIN_FAILED,
            username=username,
            ip_address=ip_address,
            user_agent=user_agent,
            success=False,
            request_id=request_id,
            details={"reason": "invalid_credentials"}
        )
        return jsonify({"error": "认证失败"}), 401
@app.route("/api/login-history", methods=["GET"])
@session_required
def get_login_history():
    """Return the current user's recent login history.

    NOTE(review): ``session_required`` and ``g`` are not defined or imported
    in the visible file; they are assumed to exist elsewhere.
    """
    return jsonify({"history": audit_logger.get_login_history(g.user_id)})