SSO Protocols and Standards
2026/3/20
Protocol Overview
Several mature protocol standards exist for SSO, each suited to different scenarios:
Protocol Comparison
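| Feature | CAS | SAML 2.0 | OAuth 2.0 | OIDC |
|---|---|---|---|---|
| Primary use | Web SSO | Enterprise federated identity | Delegated authorization | Authentication + authorization |
| Data format | XML/custom | XML | JSON | JSON |
| Transport | Redirect | Redirect/POST | Redirect | Redirect |
| Complexity | Low | High | Medium | Medium |
| Mobile support | Fair | Poor | Excellent | Excellent |
| Typical scenario | Campus/enterprise intranet | Large enterprises/government | Third-party authorization | SSO for modern applications |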
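CAS Protocol
CAS Overview
CAS (Central Authentication Service) was developed at Yale University and is one of the earliest open-source SSO solutions. Its design is simple, which makes it a good fit for internal enterprise applications.
CAS Protocol Flow
CAS Validation Response Format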
<!-- Success response -->
<cas:serviceResponse xmlns:cas="http://www.yale.edu/tp/cas">
  <cas:authenticationSuccess>
    <cas:user>admin</cas:user>
    <cas:attributes>
      <cas:email>admin@example.com</cas:email>
      <cas:name>System Administrator</cas:name>
      <cas:roles>
        <cas:role>ADMIN</cas:role>
        <cas:role>USER</cas:role>
      </cas:roles>
    </cas:attributes>
  </cas:authenticationSuccess>
</cas:serviceResponse>

<!-- Failure response -->
<cas:serviceResponse xmlns:cas="http://www.yale.edu/tp/cas">
  <cas:authenticationFailure code="INVALID_TICKET">
    Ticket ST-xxx does not exist or has expired
  </cas:authenticationFailure>
</cas:serviceResponse>
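To make the two response shapes concrete, here is a minimal sketch that reads either one and reports the outcome, including the `code` attribute on a failure. The function name `summarize_cas_response` and the two sample constants are illustrative assumptions, not part of any CAS library; the sketch uses only the Python standard library.

```python
import xml.etree.ElementTree as ET

# Namespace used by the CAS responses shown above
CAS_NS = {"cas": "http://www.yale.edu/tp/cas"}

# Hypothetical sample payloads, trimmed from the examples above
SUCCESS_XML = """<cas:serviceResponse xmlns:cas="http://www.yale.edu/tp/cas">
  <cas:authenticationSuccess>
    <cas:user>admin</cas:user>
  </cas:authenticationSuccess>
</cas:serviceResponse>"""

FAILURE_XML = """<cas:serviceResponse xmlns:cas="http://www.yale.edu/tp/cas">
  <cas:authenticationFailure code="INVALID_TICKET">
    Ticket ST-xxx does not exist or has expired
  </cas:authenticationFailure>
</cas:serviceResponse>"""


def summarize_cas_response(xml_text: str) -> dict:
    """Return a small dict describing a CAS validation response."""
    root = ET.fromstring(xml_text)

    success = root.find("cas:authenticationSuccess", CAS_NS)
    if success is not None:
        user = success.findtext("cas:user", default="", namespaces=CAS_NS)
        return {"ok": True, "user": user.strip()}

    failure = root.find("cas:authenticationFailure", CAS_NS)
    if failure is not None:
        # The machine-readable reason is in the "code" attribute;
        # the element text is a human-readable message.
        return {
            "ok": False,
            "code": failure.get("code"),
            "message": (failure.text or "").strip(),
        }

    return {"ok": False, "code": None, "message": "unrecognized response"}


if __name__ == "__main__":
    print(summarize_cas_response(SUCCESS_XML))
    print(summarize_cas_response(FAILURE_XML))
```

Branching on the `code` attribute rather than the message text is usually the safer choice, since the message wording can differ between servers.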
Python CAS Client Implementation
import requests
import xml.etree.ElementTree as ET
from flask import Flask, request, redirect, session, jsonify
from functools import wraps
from urllib.parse import urlencode
from markupsafe import escape

app = Flask(__name__)
app.secret_key = "your-secret-key"  # Placeholder; load a random secret from configuration in production

# CAS configuration
CAS_SERVER = "https://cas.example.com"
APP_SERVICE_URL = "https://app.example.com"


class CASClient:
    """CAS client."""

    def __init__(self, cas_server: str, service_url: str):
        self.cas_server = cas_server
        self.service_url = service_url

    def get_login_url(self, target_url: str | None = None) -> str:
        """Build the CAS login URL."""
        service = target_url or self.service_url
        params = urlencode({"service": service})
        return f"{self.cas_server}/login?{params}"

    def get_logout_url(self, target_url: str | None = None) -> str:
        """Build the CAS logout URL."""
        if target_url:
            params = urlencode({"service": target_url})
            return f"{self.cas_server}/logout?{params}"
        return f"{self.cas_server}/logout"

    def validate_ticket(self, ticket: str, service_url: str) -> dict | None:
        """Validate a service ticket."""
        # CAS 2.0 endpoint; CAS 3.0 servers also expose /p3/serviceValidate,
        # which is the endpoint specified to return user attributes
        validate_url = f"{self.cas_server}/serviceValidate"
        params = {
            "ticket": ticket,
            "service": service_url
        }
        try:
            response = requests.get(validate_url, params=params, timeout=10)
            if response.status_code == 200:
                return self._parse_cas_response(response.text)
        except requests.RequestException as e:
            print(f"CAS validation request failed: {e}")
        return None

    def _parse_cas_response(self, xml_text: str) -> dict | None:
        """Parse the CAS XML response."""
        try:
            # Define the namespace
            ns = {"cas": "http://www.yale.edu/tp/cas"}
            root = ET.fromstring(xml_text)

            # Look for the success element
            success = root.find(".//cas:authenticationSuccess", ns)
            if success is not None:
                user = success.find("cas:user", ns)
                result = {"username": user.text if user is not None else None}

                # Parse attributes
                attributes = success.find("cas:attributes", ns)
                if attributes is not None:
                    for child in attributes:
                        # Strip the namespace prefix
                        tag = child.tag.replace(f"{{{ns['cas']}}}", "")
                        if len(child):
                            # Nested element such as <cas:roles>: collect the child values
                            result[tag] = [item.text for item in child]
                        else:
                            result[tag] = child.text
                return result

            # Look for the failure element
            failure = root.find(".//cas:authenticationFailure", ns)
            if failure is not None:
                message = (failure.text or "").strip()
                print(f"CAS authentication failed [{failure.get('code')}]: {message}")
        except ET.ParseError as e:
            print(f"XML parse error: {e}")
        return None


cas_client = CASClient(CAS_SERVER, APP_SERVICE_URL)


def cas_required(f):
    """CAS authentication decorator."""
    @wraps(f)
    def decorated(*args, **kwargs):
        # 1. Check the local session
        if "cas_user" in session:
            return f(*args, **kwargs)

        # The service URL sent at validation must match the one sent at login,
        # so use the request URL without its query string in both places
        service_url = request.base_url

        # 2. Check for a ticket parameter
        ticket = request.args.get("ticket")
        if ticket:
            # Validate the ticket
            user_info = cas_client.validate_ticket(ticket, service_url)
            if user_info:
                session["cas_user"] = user_info
                # Redirect to drop the ticket parameter from the URL
                return redirect(service_url)

        # 3. Redirect to CAS login
        login_url = cas_client.get_login_url(service_url)
        return redirect(login_url)
    return decorated


@app.route("/")
def index():
    """Home page (no login required)."""
    return "Welcome! <a href='/dashboard'>Enter the system</a>"


@app.route("/dashboard")
@cas_required
def dashboard():
    """Dashboard (login required)."""
    user = session.get("cas_user", {})
    # Escape the username before embedding it in HTML
    return f"Welcome, {escape(user.get('username'))}!"


@app.route("/logout")
def logout():
    """Log out."""
    session.clear()
    return redirect(cas_client.get_logout_url(APP_SERVICE_URL))


@app.route("/api/user")
@cas_required
def get_user():
    """Return the current user's information."""
    return jsonify(session.get("cas_user"))
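SAML 2.0 Protocol
SAML Overview
SAML (Security Assertion Markup Language) is an XML-based open standard for exchanging authentication and authorization data between an identity provider (IdP) and a service provider (SP).
SAML Web SSO Flow
SAML Message Examples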
AuthnRequest (authentication request):
<samlp:AuthnRequest
    xmlns:samlp="urn:oasis:names:tc:SAML:2.0:protocol"
    xmlns:saml="urn:oasis:names:tc:SAML:2.0:assertion"
    ID="_abc123"
    Version="2.0"
    IssueInstant="2024-01-15T10:00:00Z"
    Destination="https://idp.example.com/sso"
    AssertionConsumerServiceURL="https://sp.example.com/acs"
    ProtocolBinding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST">
  <saml:Issuer>https://sp.example.com</saml:Issuer>
  <samlp:NameIDPolicy
      Format="urn:oasis:names:tc:SAML:1.1:nameid-format:emailAddress"
      AllowCreate="true"/>
</samlp:AuthnRequest>
SAML Assertion:
<saml:Assertion
    xmlns:saml="urn:oasis:names:tc:SAML:2.0:assertion"
    ID="_xyz789"
    Version="2.0"
    IssueInstant="2024-01-15T10:01:00Z">
  <saml:Issuer>https://idp.example.com</saml:Issuer>
  <ds:Signature xmlns:ds="http://www.w3.org/2000/09/xmldsig#">
    <!-- Digital signature -->
  </ds:Signature>
  <saml:Subject>
    <saml:NameID Format="urn:oasis:names:tc:SAML:1.1:nameid-format:emailAddress">user@example.com</saml:NameID>
    <saml:SubjectConfirmation Method="urn:oasis:names:tc:SAML:2.0:cm:bearer">
      <saml:SubjectConfirmationData
          NotOnOrAfter="2024-01-15T10:06:00Z"
          Recipient="https://sp.example.com/acs"/>
    </saml:SubjectConfirmation>
  </saml:Subject>
  <saml:Conditions
      NotBefore="2024-01-15T10:00:00Z"
      NotOnOrAfter="2024-01-15T10:06:00Z">
    <saml:AudienceRestriction>
      <saml:Audience>https://sp.example.com</saml:Audience>
    </saml:AudienceRestriction>
  </saml:Conditions>
  <saml:AuthnStatement
      AuthnInstant="2024-01-15T10:00:30Z"
      SessionIndex="_session123">
    <saml:AuthnContext>
      <saml:AuthnContextClassRef>urn:oasis:names:tc:SAML:2.0:ac:classes:PasswordProtectedTransport</saml:AuthnContextClassRef>
    </saml:AuthnContext>
  </saml:AuthnStatement>
  <saml:AttributeStatement>
    <saml:Attribute Name="email">
      <saml:AttributeValue>user@example.com</saml:AttributeValue>
    </saml:Attribute>
    <saml:Attribute Name="displayName">
      <saml:AttributeValue>Zhang San</saml:AttributeValue>
    </saml:Attribute>
    <saml:Attribute Name="roles">
      <saml:AttributeValue>admin</saml:AttributeValue>
      <saml:AttributeValue>user</saml:AttributeValue>
    </saml:Attribute>
  </saml:AttributeStatement>
</saml:Assertion>
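The `Conditions` element in the assertion above is what limits where and when the assertion may be accepted. As a rough illustration only, the sketch below checks the validity window and the audience with the standard library. The names `check_conditions`, `parse_instant`, and `SAMPLE_ASSERTION` are hypothetical, the timestamp parser assumes whole-second UTC values like those in the example, and the sketch does not verify the XML signature at all. A real service provider should rely on a SAML library for full validation.

```python
import xml.etree.ElementTree as ET
from datetime import datetime, timezone

SAML_NS = {"saml": "urn:oasis:names:tc:SAML:2.0:assertion"}

# Hypothetical, trimmed-down assertion carrying only the Conditions element
SAMPLE_ASSERTION = """<saml:Assertion
    xmlns:saml="urn:oasis:names:tc:SAML:2.0:assertion"
    ID="_xyz789" Version="2.0" IssueInstant="2024-01-15T10:01:00Z">
  <saml:Conditions NotBefore="2024-01-15T10:00:00Z"
                   NotOnOrAfter="2024-01-15T10:06:00Z">
    <saml:AudienceRestriction>
      <saml:Audience>https://sp.example.com</saml:Audience>
    </saml:AudienceRestriction>
  </saml:Conditions>
</saml:Assertion>"""


def parse_instant(value: str) -> datetime:
    """Parse a whole-second UTC timestamp such as 2024-01-15T10:00:00Z."""
    parsed = datetime.strptime(value, "%Y-%m-%dT%H:%M:%SZ")
    return parsed.replace(tzinfo=timezone.utc)


def check_conditions(assertion_xml: str, expected_audience: str,
                     now: datetime) -> list[str]:
    """Return a list of problems; an empty list means these checks passed."""
    root = ET.fromstring(assertion_xml)
    conditions = root.find("saml:Conditions", SAML_NS)
    if conditions is None:
        return ["missing Conditions element"]

    problems = []
    not_before = conditions.get("NotBefore")
    not_on_or_after = conditions.get("NotOnOrAfter")
    if not_before and now < parse_instant(not_before):
        problems.append("assertion is not yet valid")
    if not_on_or_after and now >= parse_instant(not_on_or_after):
        problems.append("assertion has expired")

    audiences = [
        element.text.strip()
        for element in conditions.findall(
            "saml:AudienceRestriction/saml:Audience", SAML_NS)
        if element.text
    ]
    if expected_audience not in audiences:
        problems.append("audience mismatch")
    return problems


if __name__ == "__main__":
    at = datetime(2024, 1, 15, 10, 3, 0, tzinfo=timezone.utc)
    print(check_conditions(SAMPLE_ASSERTION, "https://sp.example.com", at))
```

The time `now` is passed in as a parameter so the check can be exercised with fixed timestamps. Production code would also allow a small clock-skew tolerance.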
Python SAML Implementation (using python3-saml)
from flask import Flask, request, redirect, session, make_response
from markupsafe import escape
from onelogin.saml2.auth import OneLogin_Saml2_Auth

app = Flask(__name__)
app.secret_key = "your-secret-key"  # Placeholder; load a random secret from configuration in production


def init_saml_auth(req):
    """Initialize the SAML auth object; settings are loaded from saml/settings.json."""
    auth = OneLogin_Saml2_Auth(req, custom_base_path="saml/")
    return auth


def prepare_flask_request(request):
    """Convert the Flask request into the dict the SAML library expects."""
    return {
        "https": "on" if request.scheme == "https" else "off",
        "http_host": request.host,
        "server_port": request.environ.get("SERVER_PORT"),
        "script_name": request.path,
        "get_data": request.args.copy(),
        "post_data": request.form.copy(),
        "query_string": request.query_string.decode("utf-8")
    }


# Example contents of the SAML settings file (saml/settings.json), shown as a
# Python dict for reference; init_saml_auth reads the JSON file, not this variable
SAML_SETTINGS = {
    "strict": True,
    "debug": False,
    "sp": {
        "entityId": "https://sp.example.com/metadata",
        "assertionConsumerService": {
            "url": "https://sp.example.com/acs",
            "binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST"
        },
        "singleLogoutService": {
            "url": "https://sp.example.com/sls",
            "binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect"
        },
        "NameIDFormat": "urn:oasis:names:tc:SAML:1.1:nameid-format:emailAddress"
    },
    "idp": {
        "entityId": "https://idp.example.com",
        "singleSignOnService": {
            "url": "https://idp.example.com/sso",
            "binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect"
        },
        "singleLogoutService": {
            "url": "https://idp.example.com/slo",
            "binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect"
        },
        "x509cert": "-----BEGIN CERTIFICATE-----\n...\n-----END CERTIFICATE-----"
    }
}


@app.route("/")
def index():
    """Home page."""
    if "saml_user" in session:
        # Escape the NameID before embedding it in HTML
        return f"Logged in: {escape(session['saml_user']['name_id'])}"
    return '<a href="/login">SAML login</a>'


@app.route("/login")
def login():
    """Start a SAML login request."""
    req = prepare_flask_request(request)
    auth = init_saml_auth(req)
    # Generate the AuthnRequest and redirect to the IdP
    return redirect(auth.login())


@app.route("/acs", methods=["POST"])
def acs():
    """Assertion Consumer Service: receives the SAML Response."""
    req = prepare_flask_request(request)
    auth = init_saml_auth(req)

    # Process the SAML Response
    auth.process_response()
    errors = auth.get_errors()
    if errors:
        return f"SAML error: {', '.join(errors)}", 400
    if not auth.is_authenticated():
        return "Authentication failed", 401

    # Store the user information
    session["saml_user"] = {
        "name_id": auth.get_nameid(),
        "session_index": auth.get_session_index(),
        "attributes": auth.get_attributes()
    }

    # Redirect to the originally requested page. RelayState comes from the
    # request, so only follow same-site relative paths to avoid an open redirect
    relay_state = request.form.get("RelayState", "/")
    if not relay_state.startswith("/") or relay_state.startswith("//"):
        relay_state = "/"
    return redirect(relay_state)


@app.route("/logout")
def logout():
    """Start a SAML logout request."""
    req = prepare_flask_request(request)
    auth = init_saml_auth(req)
    name_id = session.get("saml_user", {}).get("name_id")
    session_index = session.get("saml_user", {}).get("session_index")
    return redirect(auth.logout(name_id=name_id, session_index=session_index))


@app.route("/sls", methods=["GET", "POST"])
def sls():
    """Single Logout Service: handles logout responses and requests."""
    req = prepare_flask_request(request)
    auth = init_saml_auth(req)

    # Process the logout message
    url = auth.process_slo()
    errors = auth.get_errors()
    if errors:
        return f"Logout error: {', '.join(errors)}", 400

    # Clear the local session
    session.clear()
    if url:
        return redirect(url)
    return redirect("/")


@app.route("/metadata")
def metadata():
    """Serve the SP metadata."""
    req = prepare_flask_request(request)
    auth = init_saml_auth(req)
    settings = auth.get_settings()
    metadata = settings.get_sp_metadata()
    errors = settings.validate_metadata(metadata)
    if errors:
        return f"Metadata error: {', '.join(errors)}", 500
    response = make_response(metadata)
    response.headers["Content-Type"] = "application/xml"
    return response
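OAuth 2.0 Protocol
OAuth 2.0 Overview
OAuth 2.0 is an authorization framework that lets a third-party application access a user's resources with the user's consent, without ever obtaining the user's password.
OAuth 2.0 Roles
| Role | Description | Example |
|---|---|---|
| Resource Owner | Owner of the resource, usually the user | A WeChat user |
| Client | Application requesting access to the resource | A third-party app |
| Authorization Server | Issues tokens | WeChat Open Platform |
| Resource Server | Hosts the protected resources | WeChat API servers |
OAuth 2.0 Grant Types
Authorization Code Flow in Detail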
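As a rough, framework-free illustration of the front-channel half of the authorization code flow, the sketch below builds the authorization request URL and then checks the parameters the authorization server sends back to the redirect URI. The function names and every URL and identifier in it (`auth.example.com`, `client-1`, and so on) are placeholders invented for the example, not values from a real provider. The later back-channel steps (exchanging the code at the token endpoint, then calling the resource server) are not shown.

```python
import secrets
from urllib.parse import parse_qs, urlencode, urlparse


def build_authorization_url(authorize_endpoint: str, client_id: str,
                            redirect_uri: str, scope: str, state: str) -> str:
    """Step 1: the client redirects the browser to the authorization server."""
    params = {
        "response_type": "code",
        "client_id": client_id,
        "redirect_uri": redirect_uri,
        "scope": scope,
        "state": state,
    }
    return f"{authorize_endpoint}?{urlencode(params)}"


def parse_callback(callback_url: str, expected_state: str) -> str:
    """Step 3: validate the redirect back and return the authorization code."""
    query = parse_qs(urlparse(callback_url).query)

    # The state must be present and equal to the value stored in step 1
    state = query.get("state", [""])[0]
    if not expected_state or not secrets.compare_digest(
            state.encode(), expected_state.encode()):
        raise ValueError("state mismatch")

    if "error" in query:
        raise ValueError(f"authorization failed: {query['error'][0]}")
    if "code" not in query:
        raise ValueError("missing authorization code")
    return query["code"][0]


if __name__ == "__main__":
    state = secrets.token_urlsafe(32)
    url = build_authorization_url(
        "https://auth.example.com/authorize",    # placeholder endpoint
        "client-1",                              # placeholder client id
        "https://app.example.com/callback",      # placeholder redirect URI
        "read:user",
        state,
    )
    print(url)
    # Step 2 happens in the browser: the user signs in and grants consent.
    callback = f"https://app.example.com/callback?code=abc123&state={state}"
    print(parse_callback(callback, state))
```

Comparing the state as bytes with `secrets.compare_digest` avoids both timing differences and the `TypeError` that the function raises for non-ASCII `str` input.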
Python OAuth 2.0 Implementation
from flask import Flask, request, redirect, session, jsonify
from markupsafe import escape
import requests
import secrets
from urllib.parse import urlencode

app = Flask(__name__)
app.secret_key = "your-flask-secret-key"  # Placeholder; load a random secret from configuration in production

# OAuth configuration (using GitHub as the example)
OAUTH_CONFIG = {
    "client_id": "your_github_client_id",
    "client_secret": "your_github_client_secret",
    "authorize_url": "https://github.com/login/oauth/authorize",
    "token_url": "https://github.com/login/oauth/access_token",
    "api_base_url": "https://api.github.com",
    "redirect_uri": "http://localhost:5000/callback",
    "scope": "read:user user:email"
}


@app.route("/")
def index():
    """Home page."""
    if "user" in session:
        user = session["user"]
        return f"Welcome, {escape(user['login'])}! <a href='/logout'>Log out</a>"
    return '<a href="/login">Sign in with GitHub</a>'


@app.route("/login")
def login():
    """Start the OAuth authorization request."""
    # Generate a state value to protect against CSRF
    state = secrets.token_urlsafe(32)
    session["oauth_state"] = state

    # Build the authorization URL
    params = {
        "client_id": OAUTH_CONFIG["client_id"],
        "redirect_uri": OAUTH_CONFIG["redirect_uri"],
        "scope": OAUTH_CONFIG["scope"],
        "state": state,
        "response_type": "code"
    }
    auth_url = f"{OAUTH_CONFIG['authorize_url']}?{urlencode(params)}"
    return redirect(auth_url)


@app.route("/callback")
def callback():
    """Handle the OAuth callback."""
    # 1. Validate state; reject the request when either value is missing
    state = request.args.get("state", "")
    expected_state = session.pop("oauth_state", None)
    if not expected_state or not secrets.compare_digest(
            state.encode(), expected_state.encode()):
        return "State validation failed (possible CSRF attack)", 400

    # 2. Check for an error
    error = request.args.get("error")
    if error:
        error_desc = request.args.get("error_description", "unknown error")
        return f"Authorization failed: {escape(error_desc)}", 400

    # 3. Read the authorization code
    code = request.args.get("code")
    if not code:
        return "No authorization code received", 400

    # 4. Exchange the authorization code for a token
    token_data = exchange_code_for_token(code)
    if not token_data:
        return "Failed to obtain a token", 400
    access_token = token_data.get("access_token")

    # 5. Use the token to fetch the user's information
    user_info = get_user_info(access_token)
    if not user_info:
        return "Failed to fetch user information", 400

    # 6. Store in the session
    session["user"] = user_info
    session["access_token"] = access_token
    return redirect("/")


def exchange_code_for_token(code: str) -> dict | None:
    """Exchange the authorization code for an access token."""
    data = {
        "client_id": OAUTH_CONFIG["client_id"],
        "client_secret": OAUTH_CONFIG["client_secret"],
        "code": code,
        "redirect_uri": OAUTH_CONFIG["redirect_uri"],
        "grant_type": "authorization_code"
    }
    headers = {"Accept": "application/json"}
    try:
        response = requests.post(
            OAUTH_CONFIG["token_url"],
            data=data,
            headers=headers,
            timeout=10
        )
        if response.status_code == 200:
            token_data = response.json()
            # GitHub can report a failed exchange with HTTP 200 and an "error"
            # field, so require the access token to be present
            if "access_token" in token_data:
                return token_data
    except requests.RequestException as e:
        print(f"Token request failed: {e}")
    return None


def get_user_info(access_token: str) -> dict | None:
    """Fetch the user's information with the access token."""
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Accept": "application/json"
    }
    try:
        response = requests.get(
            f"{OAUTH_CONFIG['api_base_url']}/user",
            headers=headers,
            timeout=10
        )
        if response.status_code == 200:
            return response.json()
    except requests.RequestException as e:
        print(f"Failed to fetch user information: {e}")
    return None


@app.route("/logout")
def logout():
    """Log out."""
    session.clear()
    return redirect("/")


@app.route("/api/repos")
def get_repos():
    """List the user's repositories (login required)."""
    access_token = session.get("access_token")
    if not access_token:
        return jsonify({"error": "Not logged in"}), 401
    headers = {"Authorization": f"Bearer {access_token}"}
    try:
        response = requests.get(
            f"{OAUTH_CONFIG['api_base_url']}/user/repos",
            headers=headers,
            timeout=10
        )
        # Pass the upstream status through instead of always returning 200
        return jsonify(response.json()), response.status_code
    except requests.RequestException:
        return jsonify({"error": "Request failed"}), 502
OpenID Connect (OIDC)
OIDC Overview
OpenID Connect is an identity layer built on top of OAuth 2.0; it adds authentication to OAuth 2.0's authorization capabilities.
OIDC Core Concepts
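The central artifact OIDC adds to OAuth 2.0 is the ID Token, a JWT whose payload carries claims about the authenticated user. To show what is inside one, the sketch below builds an unsigned demonstration token and decodes its header and payload using only the standard library. The helper names and all claim values are made up for illustration, and `peek_id_token` deliberately performs no signature or claim validation, so it is suitable for inspection only and never for authentication decisions.

```python
import base64
import json


def b64url_encode(data: bytes) -> str:
    """Base64url-encode without padding, as JWT segments are encoded."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def b64url_decode(segment: str) -> bytes:
    """Decode a base64url segment, restoring the stripped padding."""
    padding = "=" * (-len(segment) % 4)
    return base64.urlsafe_b64decode(segment + padding)


def peek_id_token(id_token: str) -> tuple[dict, dict]:
    """Return (header, claims) WITHOUT verifying the signature."""
    header_b64, payload_b64, _signature = id_token.split(".")
    header = json.loads(b64url_decode(header_b64))
    claims = json.loads(b64url_decode(payload_b64))
    return header, claims


# Hypothetical claims for demonstration; the values are invented
SAMPLE_HEADER = {"alg": "RS256", "typ": "JWT"}
SAMPLE_CLAIMS = {
    "iss": "https://idp.example.com",  # who issued the token
    "sub": "user-123",                 # stable identifier of the user
    "aud": "my-client",                # the client the token is meant for
    "exp": 1705313160,                 # expiry (Unix time)
    "iat": 1705312860,                 # issued-at (Unix time)
    "nonce": "n-0S6_WzA2Mj",           # echoes the value sent in the request
}
SAMPLE_TOKEN = ".".join([
    b64url_encode(json.dumps(SAMPLE_HEADER).encode("utf-8")),
    b64url_encode(json.dumps(SAMPLE_CLAIMS).encode("utf-8")),
    "signature-placeholder",           # not a real signature
])

if __name__ == "__main__":
    header, claims = peek_id_token(SAMPLE_TOKEN)
    print(header["alg"], claims["iss"], claims["sub"], claims["aud"])
```

In a real client, the `iss`, `aud`, `exp`, and `nonce` claims are checked together with the signature before the `sub` value is trusted.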
OIDC Authentication Flow
Python OIDC Implementation
from flask import Flask, request, redirect, session, jsonify
from markupsafe import escape
import requests
import jwt
import secrets
from urllib.parse import urlencode
from jwt import PyJWKClient

app = Flask(__name__)
app.secret_key = "your-flask-secret-key"  # Placeholder; load a random secret from configuration in production

# OIDC configuration (using Keycloak as the example)
OIDC_CONFIG = {
    "issuer": "https://keycloak.example.com/realms/myrealm",
    "client_id": "my-client",
    "client_secret": "my-client-secret",
    "redirect_uri": "http://localhost:5000/callback",
    "scope": "openid profile email"
}


class OIDCClient:
    """OIDC client."""

    def __init__(self, config: dict):
        self.config = config
        self._discovery = None
        self._jwks_client = None

    @property
    def discovery(self) -> dict:
        """Fetch (and cache) the OIDC Discovery document."""
        if self._discovery is None:
            discovery_url = f"{self.config['issuer']}/.well-known/openid-configuration"
            response = requests.get(discovery_url, timeout=10)
            # Fail early instead of caching an error page as configuration
            response.raise_for_status()
            self._discovery = response.json()
        return self._discovery

    @property
    def jwks_client(self) -> PyJWKClient:
        """Return the JWKS client (used to verify token signatures)."""
        if self._jwks_client is None:
            self._jwks_client = PyJWKClient(self.discovery["jwks_uri"])
        return self._jwks_client

    def get_auth_url(self, state: str, nonce: str) -> str:
        """Build the authorization URL."""
        params = {
            "client_id": self.config["client_id"],
            "redirect_uri": self.config["redirect_uri"],
            "scope": self.config["scope"],
            "response_type": "code",
            "state": state,
            "nonce": nonce
        }
        return f"{self.discovery['authorization_endpoint']}?{urlencode(params)}"

    def exchange_code(self, code: str) -> dict | None:
        """Exchange the authorization code for tokens."""
        data = {
            "grant_type": "authorization_code",
            "code": code,
            "redirect_uri": self.config["redirect_uri"],
            "client_id": self.config["client_id"],
            "client_secret": self.config["client_secret"]
        }
        try:
            response = requests.post(
                self.discovery["token_endpoint"],
                data=data,
                timeout=10
            )
            if response.status_code == 200:
                return response.json()
        except requests.RequestException as e:
            print(f"Token request failed: {e}")
        return None

    def verify_id_token(self, id_token: str, nonce: str | None) -> dict | None:
        """Verify the ID Token."""
        try:
            # Fetch the signing key
            signing_key = self.jwks_client.get_signing_key_from_jwt(id_token)

            # Verify and decode the token
            payload = jwt.decode(
                id_token,
                signing_key.key,
                algorithms=["RS256"],
                audience=self.config["client_id"],
                issuer=self.config["issuer"]
            )

            # Validate the nonce; a missing nonce on either side is a failure
            if not nonce or payload.get("nonce") != nonce:
                print("Nonce mismatch")
                return None
            return payload
        except jwt.PyJWTError as e:
            # PyJWTError covers both invalid tokens and JWKS lookup errors
            print(f"ID Token verification failed: {e}")
            return None

    def get_userinfo(self, access_token: str) -> dict | None:
        """Fetch claims from the UserInfo endpoint."""
        headers = {"Authorization": f"Bearer {access_token}"}
        try:
            response = requests.get(
                self.discovery["userinfo_endpoint"],
                headers=headers,
                timeout=10
            )
            if response.status_code == 200:
                return response.json()
        except requests.RequestException as e:
            print(f"UserInfo request failed: {e}")
        return None

    def get_logout_url(self, id_token_hint: str | None = None,
                       redirect_uri: str | None = None) -> str:
        """Build the logout URL; returns "" if the IdP advertises no end-session endpoint."""
        params = {}
        if id_token_hint:
            params["id_token_hint"] = id_token_hint
        if redirect_uri:
            params["post_logout_redirect_uri"] = redirect_uri
            params["client_id"] = self.config["client_id"]
        base_url = self.discovery.get("end_session_endpoint", "")
        if base_url and params:
            return f"{base_url}?{urlencode(params)}"
        return base_url


oidc_client = OIDCClient(OIDC_CONFIG)


@app.route("/")
def index():
    """Home page."""
    if "user" in session:
        name = session["user"].get("name") or "user"
        return f"Welcome, {escape(name)}! <a href='/logout'>Log out</a>"
    return '<a href="/login">OIDC login</a>'


@app.route("/login")
def login():
    """Start the OIDC login."""
    state = secrets.token_urlsafe(32)
    nonce = secrets.token_urlsafe(32)
    session["oauth_state"] = state
    session["oauth_nonce"] = nonce
    auth_url = oidc_client.get_auth_url(state, nonce)
    return redirect(auth_url)


@app.route("/callback")
def callback():
    """OIDC callback."""
    # Validate state; reject the request when either value is missing
    state = request.args.get("state", "")
    expected_state = session.pop("oauth_state", None)
    nonce = session.pop("oauth_nonce", None)
    if not expected_state or not secrets.compare_digest(
            state.encode(), expected_state.encode()):
        return "State validation failed", 400

    # Check for an error
    if request.args.get("error"):
        error_desc = request.args.get("error_description", "unknown error")
        return f"Authorization failed: {escape(error_desc)}", 400

    # Read the authorization code
    code = request.args.get("code")
    if not code:
        return "No authorization code received", 400

    # Exchange it for tokens
    tokens = oidc_client.exchange_code(code)
    if not tokens:
        return "Failed to obtain tokens", 400

    # Verify the ID Token
    id_token = tokens.get("id_token")
    claims = oidc_client.verify_id_token(id_token, nonce) if id_token else None
    if not claims:
        return "ID Token verification failed", 400

    # Store the user information
    session["user"] = {
        "sub": claims.get("sub"),
        "name": claims.get("name"),
        "email": claims.get("email"),
        "preferred_username": claims.get("preferred_username")
    }
    session["id_token"] = id_token
    session["access_token"] = tokens.get("access_token")
    return redirect("/")


@app.route("/userinfo")
def userinfo():
    """Fetch UserInfo (demo)."""
    access_token = session.get("access_token")
    if not access_token:
        return jsonify({"error": "Not logged in"}), 401
    info = oidc_client.get_userinfo(access_token)
    if info is None:
        return jsonify({"error": "UserInfo request failed"}), 502
    return jsonify(info)


@app.route("/logout")
def logout():
    """OIDC logout."""
    id_token = session.get("id_token")
    session.clear()
    # Redirect to the IdP logout; fall back to the home page if the IdP
    # does not advertise an end-session endpoint
    logout_url = oidc_client.get_logout_url(
        id_token_hint=id_token,
        redirect_uri="http://localhost:5000/"
    )
    return redirect(logout_url or "/")
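Protocol Selection Guide
Chapter Summary
Key Takeaways
- CAS: a simple SSO protocol, well suited to internal enterprise applications
- SAML 2.0: the enterprise federated-identity standard; XML-based and feature-complete
- OAuth 2.0: an authorization framework that solves the problem of "a user authorizing a third party to access resources"
- OIDC: OAuth 2.0 plus authentication; the first choice for modern applications
Next Chapter Preview
The next chapter takes a closer look at multi-login mechanisms and how to implement enterprise features such as multi-device login management, mutual kick-out policies, and login history tracking.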