Token机制与JWT深度剖析
2026/3/20大约 16 分钟
Token 机制与 JWT 深度剖析
Token 认证概述
什么是 Token
Token(令牌)是一种无状态的身份凭证,服务端不存储会话信息,而是将用户信息编码在 Token 中,由客户端保存和传递。
Token 类型
JWT 深度解析
JWT 结构
JWT(JSON Web Token)由三部分组成,用点号(.)分隔:
标准声明 (Registered Claims)
| 声明 | 全称 | 说明 |
|---|---|---|
iss | Issuer | 签发者 |
sub | Subject | 主题(通常是用户 ID) |
aud | Audience | 受众(接收方) |
exp | Expiration Time | 过期时间(Unix 时间戳) |
nbf | Not Before | 生效时间 |
iat | Issued At | 签发时间 |
jti | JWT ID | 唯一标识符 |
JWT 签名算法
Python JWT 实现
import jwt
import time
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
from dataclasses import dataclass
from enum import Enum
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.backends import default_backend
class TokenType(Enum):
"""Token 类型"""
ACCESS = "access"
REFRESH = "refresh"
ID = "id"
@dataclass
class TokenConfig:
"""Token 配置"""
secret_key: str # 对称加密密钥
private_key: str = None # RSA 私钥
public_key: str = None # RSA 公钥
algorithm: str = "HS256" # 签名算法
access_token_expire: int = 900 # 访问令牌有效期(秒)
refresh_token_expire: int = 604800 # 刷新令牌有效期(秒)
issuer: str = "my-app" # 签发者
audience: str = "my-app" # 受众
class JWTManager:
"""JWT 管理器"""
def __init__(self, config: TokenConfig):
self.config = config
def create_access_token(
self,
user_id: int,
username: str,
roles: list = None,
extra_claims: dict = None
) -> str:
"""
创建访问令牌
Args:
user_id: 用户 ID
username: 用户名
roles: 用户角色列表
extra_claims: 额外的声明
Returns:
JWT 字符串
"""
now = datetime.utcnow()
payload = {
# 标准声明
"iss": self.config.issuer,
"sub": str(user_id),
"aud": self.config.audience,
"iat": now,
"exp": now + timedelta(seconds=self.config.access_token_expire),
"jti": self._generate_jti(),
# 自定义声明
"type": TokenType.ACCESS.value,
"username": username,
"roles": roles or [],
}
# 添加额外声明
if extra_claims:
payload.update(extra_claims)
return self._encode(payload)
def create_refresh_token(self, user_id: int, device_id: str = None) -> str:
"""
创建刷新令牌
Args:
user_id: 用户 ID
device_id: 设备 ID(用于多设备管理)
Returns:
JWT 字符串
"""
now = datetime.utcnow()
payload = {
"iss": self.config.issuer,
"sub": str(user_id),
"aud": self.config.audience,
"iat": now,
"exp": now + timedelta(seconds=self.config.refresh_token_expire),
"jti": self._generate_jti(),
"type": TokenType.REFRESH.value,
"device_id": device_id,
}
return self._encode(payload)
def create_token_pair(
self,
user_id: int,
username: str,
roles: list = None,
device_id: str = None
) -> Dict[str, str]:
"""
创建令牌对(访问令牌 + 刷新令牌)
Returns:
{
"access_token": "xxx",
"refresh_token": "xxx",
"token_type": "Bearer",
"expires_in": 900
}
"""
access_token = self.create_access_token(user_id, username, roles)
refresh_token = self.create_refresh_token(user_id, device_id)
return {
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "Bearer",
"expires_in": self.config.access_token_expire
}
def verify_token(
self,
token: str,
expected_type: TokenType = None,
verify_exp: bool = True
) -> Optional[Dict[str, Any]]:
"""
验证并解码 Token
Args:
token: JWT 字符串
expected_type: 期望的 Token 类型
verify_exp: 是否验证过期时间
Returns:
解码后的 payload,验证失败返回 None
"""
try:
payload = self._decode(token, verify_exp=verify_exp)
# 验证 Token 类型
if expected_type and payload.get("type") != expected_type.value:
return None
return payload
except jwt.ExpiredSignatureError:
# Token 已过期
return None
except jwt.InvalidTokenError:
# Token 无效
return None
def decode_without_verify(self, token: str) -> Optional[Dict[str, Any]]:
"""
不验证签名,仅解码 Token(用于调试或获取过期 Token 的信息)
⚠️ 警告:不要在生产环境中依赖此方法的结果
"""
try:
return jwt.decode(
token,
options={"verify_signature": False}
)
except jwt.InvalidTokenError:
return None
def refresh_access_token(self, refresh_token: str) -> Optional[Dict[str, str]]:
"""
使用刷新令牌获取新的访问令牌
Args:
refresh_token: 刷新令牌
Returns:
新的令牌对,刷新失败返回 None
"""
payload = self.verify_token(refresh_token, TokenType.REFRESH)
if not payload:
return None
# 这里应该查询数据库获取最新的用户信息
user_id = int(payload["sub"])
# 创建新的访问令牌
# 注意:通常不会刷新 refresh_token,除非快过期
return {
"access_token": self.create_access_token(
user_id=user_id,
username=payload.get("username", ""),
roles=payload.get("roles", [])
),
"token_type": "Bearer",
"expires_in": self.config.access_token_expire
}
def _encode(self, payload: dict) -> str:
"""编码 JWT"""
if self.config.algorithm.startswith("RS"):
# RSA 算法使用私钥签名
key = self.config.private_key
else:
# HMAC 算法使用对称密钥
key = self.config.secret_key
return jwt.encode(payload, key, algorithm=self.config.algorithm)
def _decode(self, token: str, verify_exp: bool = True) -> dict:
"""解码 JWT"""
if self.config.algorithm.startswith("RS"):
# RSA 算法使用公钥验证
key = self.config.public_key
else:
key = self.config.secret_key
options = {
"verify_exp": verify_exp,
"verify_aud": True,
"verify_iss": True
}
return jwt.decode(
token,
key,
algorithms=[self.config.algorithm],
audience=self.config.audience,
issuer=self.config.issuer,
options=options
)
def _generate_jti(self) -> str:
"""生成唯一的 Token ID"""
import uuid
return str(uuid.uuid4())
# 使用示例
if __name__ == "__main__":
# 配置
config = TokenConfig(
secret_key="your-256-bit-secret-key-here-keep-safe",
algorithm="HS256",
access_token_expire=900, # 15 分钟
refresh_token_expire=604800, # 7 天
issuer="my-app",
audience="my-app"
)
jwt_manager = JWTManager(config)
# 创建令牌
tokens = jwt_manager.create_token_pair(
user_id=12345,
username="john_doe",
roles=["user", "admin"],
device_id="device_abc123"
)
print("Access Token:", tokens["access_token"][:50] + "...")
print("Refresh Token:", tokens["refresh_token"][:50] + "...")
# 验证令牌
payload = jwt_manager.verify_token(tokens["access_token"], TokenType.ACCESS)
print("Verified Payload:", payload)
# 刷新令牌
new_tokens = jwt_manager.refresh_access_token(tokens["refresh_token"])
print("New Access Token:", new_tokens["access_token"][:50] + "...")
RSA 非对称签名
密钥生成与管理
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.backends import default_backend
import os
class RSAKeyManager:
"""RSA 密钥管理器"""
@staticmethod
def generate_key_pair(key_size: int = 2048) -> tuple[str, str]:
"""
生成 RSA 密钥对
Args:
key_size: 密钥长度(推荐 2048 或 4096)
Returns:
(private_key_pem, public_key_pem)
"""
# 生成私钥
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=key_size,
backend=default_backend()
)
# 序列化私钥
private_pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
).decode('utf-8')
# 获取并序列化公钥
public_key = private_key.public_key()
public_pem = public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
).decode('utf-8')
return private_pem, public_pem
@staticmethod
def save_keys(private_key: str, public_key: str, directory: str = "./keys"):
"""保存密钥到文件"""
os.makedirs(directory, exist_ok=True)
with open(os.path.join(directory, "private.pem"), "w") as f:
f.write(private_key)
with open(os.path.join(directory, "public.pem"), "w") as f:
f.write(public_key)
@staticmethod
def load_keys(directory: str = "./keys") -> tuple[str, str]:
"""从文件加载密钥"""
with open(os.path.join(directory, "private.pem"), "r") as f:
private_key = f.read()
with open(os.path.join(directory, "public.pem"), "r") as f:
public_key = f.read()
return private_key, public_key
# RSA JWT 使用示例
def create_rsa_jwt_example():
"""RSA JWT 使用示例"""
# 生成密钥对
private_key, public_key = RSAKeyManager.generate_key_pair()
# 配置
config = TokenConfig(
secret_key="", # RSA 不需要
private_key=private_key,
public_key=public_key,
algorithm="RS256",
issuer="my-auth-server"
)
jwt_manager = JWTManager(config)
# 创建 Token
token = jwt_manager.create_access_token(
user_id=123,
username="alice",
roles=["user"]
)
print("RSA Signed Token:", token[:80] + "...")
# 验证 Token(只需要公钥)
payload = jwt_manager.verify_token(token)
print("Verified:", payload)
return public_key # 公钥可以分发给其他服务
JWKS(JSON Web Key Set)
用于公开发布公钥,便于其他服务验证 Token:
import json
import base64
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend
class JWKSProvider:
"""JWKS 提供者"""
def __init__(self, public_key_pem: str, key_id: str = "default"):
self.public_key_pem = public_key_pem
self.key_id = key_id
def get_jwks(self) -> dict:
"""
生成 JWKS(用于 /.well-known/jwks.json 端点)
Returns:
JWKS 格式的公钥集合
"""
public_key = serialization.load_pem_public_key(
self.public_key_pem.encode(),
backend=default_backend()
)
# 获取公钥的数字参数
numbers = public_key.public_numbers()
# 转换为 Base64Url 编码
def int_to_base64url(n: int, length: int) -> str:
data = n.to_bytes(length, byteorder='big')
return base64.urlsafe_b64encode(data).rstrip(b'=').decode('ascii')
# 计算字节长度
n_bytes = (numbers.n.bit_length() + 7) // 8
e_bytes = (numbers.e.bit_length() + 7) // 8
jwk = {
"kty": "RSA",
"use": "sig",
"alg": "RS256",
"kid": self.key_id,
"n": int_to_base64url(numbers.n, n_bytes),
"e": int_to_base64url(numbers.e, e_bytes)
}
return {"keys": [jwk]}
# Flask JWKS 端点示例
from flask import Flask, jsonify
app = Flask(__name__)
# 初始化 JWKS 提供者
jwks_provider = JWKSProvider(public_key_pem, key_id="my-key-2024")
@app.route("/.well-known/jwks.json")
def jwks():
"""JWKS 端点"""
return jsonify(jwks_provider.get_jwks())
Token 刷新机制
刷新策略对比
完整的 Token 刷新实现
from datetime import datetime, timedelta
from typing import Optional
import redis
import json
class TokenRefreshManager:
"""Token 刷新管理器"""
def __init__(
self,
jwt_manager: JWTManager,
redis_client: redis.Redis,
refresh_window: int = 180 # 刷新窗口(秒)
):
self.jwt_manager = jwt_manager
self.redis = redis_client
self.refresh_window = refresh_window
self.blacklist_prefix = "token_blacklist:"
self.refresh_token_prefix = "refresh_token:"
def should_refresh(self, token: str) -> bool:
"""
检查 Token 是否应该刷新
Args:
token: Access Token
Returns:
是否应该刷新
"""
payload = self.jwt_manager.decode_without_verify(token)
if not payload:
return False
exp = payload.get("exp")
if not exp:
return False
# 检查是否在刷新窗口内
exp_time = datetime.utcfromtimestamp(exp)
time_left = (exp_time - datetime.utcnow()).total_seconds()
return 0 < time_left <= self.refresh_window
def refresh_with_rotation(
self,
refresh_token: str
) -> Optional[dict]:
"""
刷新 Token(带轮换)
刷新 Access Token 的同时生成新的 Refresh Token,
旧的 Refresh Token 失效
Args:
refresh_token: 刷新令牌
Returns:
新的令牌对,失败返回 None
"""
# 验证 Refresh Token
payload = self.jwt_manager.verify_token(refresh_token, TokenType.REFRESH)
if not payload:
return None
jti = payload.get("jti")
user_id = payload.get("sub")
# 检查是否在黑名单中
if self._is_blacklisted(jti):
return None
# 检查是否是有效的 Refresh Token(防止重放)
stored_jti = self._get_active_refresh_token(user_id, payload.get("device_id"))
if stored_jti and stored_jti != jti:
# 可能是 Token 被盗用,撤销所有 Token
self._revoke_all_tokens(user_id)
return None
# 将旧的 Refresh Token 加入黑名单
self._blacklist_token(jti, payload.get("exp"))
# 获取用户信息(应从数据库查询)
user_info = self._get_user_info(int(user_id))
if not user_info:
return None
# 创建新的令牌对
new_tokens = self.jwt_manager.create_token_pair(
user_id=user_info["id"],
username=user_info["username"],
roles=user_info.get("roles", []),
device_id=payload.get("device_id")
)
# 存储新的 Refresh Token JTI
new_payload = self.jwt_manager.decode_without_verify(new_tokens["refresh_token"])
self._store_active_refresh_token(
user_id,
payload.get("device_id"),
new_payload["jti"],
new_payload["exp"]
)
return new_tokens
def _is_blacklisted(self, jti: str) -> bool:
"""检查 Token 是否在黑名单中"""
return self.redis.exists(f"{self.blacklist_prefix}{jti}")
def _blacklist_token(self, jti: str, exp: int):
"""将 Token 加入黑名单"""
# 黑名单过期时间与原 Token 一致
ttl = exp - int(datetime.utcnow().timestamp())
if ttl > 0:
self.redis.setex(f"{self.blacklist_prefix}{jti}", ttl, "1")
def _get_active_refresh_token(self, user_id: str, device_id: str) -> Optional[str]:
"""获取用户当前有效的 Refresh Token JTI"""
key = f"{self.refresh_token_prefix}{user_id}:{device_id or 'default'}"
return self.redis.get(key)
def _store_active_refresh_token(
self,
user_id: str,
device_id: str,
jti: str,
exp: int
):
"""存储用户当前有效的 Refresh Token JTI"""
key = f"{self.refresh_token_prefix}{user_id}:{device_id or 'default'}"
ttl = exp - int(datetime.utcnow().timestamp())
if ttl > 0:
self.redis.setex(key, ttl, jti)
def _revoke_all_tokens(self, user_id: str):
"""撤销用户所有 Token(安全措施)"""
# 删除所有 Refresh Token 记录
pattern = f"{self.refresh_token_prefix}{user_id}:*"
for key in self.redis.scan_iter(pattern):
self.redis.delete(key)
# 实际应用中可能还需要:
# 1. 通知用户账号可能被盗
# 2. 记录安全日志
# 3. 强制用户重新登录
def _get_user_info(self, user_id: int) -> Optional[dict]:
"""获取用户信息(示例)"""
# 实际应从数据库查询
return {
"id": user_id,
"username": "user",
"roles": ["user"]
}
Token 撤销方案
撤销策略对比
混合撤销方案实现
from typing import Set
from datetime import datetime
import redis
class TokenRevocationManager:
"""Token 撤销管理器(混合方案)"""
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
self.blacklist_prefix = "revoked:"
self.version_prefix = "user_token_version:"
def revoke_token(self, jti: str, exp: int) -> bool:
"""
撤销单个 Token(黑名单方式)
Args:
jti: Token ID
exp: Token 过期时间戳
Returns:
是否成功
"""
ttl = exp - int(datetime.utcnow().timestamp())
if ttl <= 0:
return True # 已过期,无需处理
key = f"{self.blacklist_prefix}{jti}"
self.redis.setex(key, ttl, "1")
return True
def revoke_all_user_tokens(self, user_id: int) -> bool:
"""
撤销用户所有 Token(版本号方式)
Args:
user_id: 用户 ID
Returns:
新的版本号
"""
key = f"{self.version_prefix}{user_id}"
return self.redis.incr(key)
def is_token_revoked(self, jti: str, user_id: int, token_version: int = None) -> bool:
"""
检查 Token 是否被撤销
Args:
jti: Token ID
user_id: 用户 ID
token_version: Token 中的版本号(可选)
Returns:
是否被撤销
"""
# 检查黑名单
if self.redis.exists(f"{self.blacklist_prefix}{jti}"):
return True
# 检查版本号
if token_version is not None:
current_version = self.redis.get(f"{self.version_prefix}{user_id}")
if current_version:
if token_version < int(current_version):
return True
return False
def get_user_token_version(self, user_id: int) -> int:
"""获取用户当前 Token 版本"""
version = self.redis.get(f"{self.version_prefix}{user_id}")
return int(version) if version else 0
# 集成到 JWT 验证中
class SecureJWTManager(JWTManager):
"""带撤销检查的 JWT 管理器"""
def __init__(self, config: TokenConfig, revocation_manager: TokenRevocationManager):
super().__init__(config)
self.revocation = revocation_manager
def create_access_token(
self,
user_id: int,
username: str,
roles: list = None,
extra_claims: dict = None
) -> str:
"""创建带版本号的访问令牌"""
# 获取当前版本号
token_version = self.revocation.get_user_token_version(user_id)
extra = extra_claims or {}
extra["token_version"] = token_version
return super().create_access_token(user_id, username, roles, extra)
def verify_token(
self,
token: str,
expected_type: TokenType = None,
verify_exp: bool = True
) -> Optional[dict]:
"""验证 Token(包含撤销检查)"""
payload = super().verify_token(token, expected_type, verify_exp)
if not payload:
return None
# 检查是否被撤销
is_revoked = self.revocation.is_token_revoked(
jti=payload.get("jti"),
user_id=int(payload.get("sub")),
token_version=payload.get("token_version")
)
if is_revoked:
return None
return payload
安全最佳实践
JWT 安全清单
客户端 Token 存储
"""
客户端 Token 存储建议(前端参考)
"""
# 浏览器端存储方案对比
STORAGE_COMPARISON = """
┌──────────────────┬────────────┬────────────┬─────────────────┐
│ 存储方式 │ XSS 风险 │ CSRF 风险 │ 推荐场景 │
├──────────────────┼────────────┼────────────┼─────────────────┤
│ localStorage │ 高 │ 低 │ ❌ 不推荐 │
├──────────────────┼────────────┼────────────┼─────────────────┤
│ sessionStorage │ 高 │ 低 │ ❌ 不推荐 │
├──────────────────┼────────────┼────────────┼─────────────────┤
│ HttpOnly Cookie │ 低 │ 高 │ ✅ 推荐 │
│ (配合SameSite) │ │ (可防) │ Web 应用 │
├──────────────────┼────────────┼────────────┼─────────────────┤
│ 内存 (变量) │ 低 │ 低 │ ✅ 推荐 │
│ │ │ │ SPA 应用 │
└──────────────────┴────────────┴────────────┴─────────────────┘
"""
# 推荐方案:HttpOnly Cookie + CSRF Token
def set_token_cookie_example():
"""
服务端设置 Token Cookie 示例
"""
from flask import make_response
response = make_response({"message": "登录成功"})
# Access Token 放在 HttpOnly Cookie 中
response.set_cookie(
key="access_token",
value=access_token,
httponly=True, # 防止 XSS 读取
secure=True, # 仅 HTTPS
samesite="Strict", # 防止 CSRF
max_age=900 # 15 分钟
)
# Refresh Token 放在更严格的 Cookie 中
response.set_cookie(
key="refresh_token",
value=refresh_token,
httponly=True,
secure=True,
samesite="Strict",
path="/api/auth/refresh", # 限制路径
max_age=604800 # 7 天
)
return response
本章小结
核心要点
- JWT 结构:Header + Payload + Signature,Base64Url 编码
- 签名算法:对称(HS256)用于内部服务,非对称(RS256)用于分布式系统
- 刷新机制:Access Token 短有效期 + Refresh Token 长有效期
- 撤销方案:黑名单、版本号、短有效期各有适用场景