用户认证与安全
2026/3/20 · 大约 14 分钟
用户认证与安全
第一章:Flask-Login 用户认证
安装与配置
pip install flask-login
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager, UserMixin
# Create the application and bind the database and login extensions to it.
app = Flask(__name__)
# NOTE(review): placeholder value — load a real secret from the environment before deploying.
app.config['SECRET_KEY'] = 'your-secret-key'
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///app.db'
db = SQLAlchemy(app)
login_manager = LoginManager(app)
# Configure the login view
login_manager.login_view = 'auth.login' # endpoint unauthenticated users are redirected to
login_manager.login_message = '请先登录'
login_manager.login_message_category = 'warning'
# Session protection level
login_manager.session_protection = 'strong' # None, 'basic', 'strong'
用户模型
from flask_login import UserMixin
from werkzeug.security import generate_password_hash, check_password_hash
from datetime import datetime
class User(UserMixin, db.Model):
    """Application account stored in the ``users`` table.

    Passwords are never stored directly; only the salted hash produced by
    ``set_password`` is persisted in ``password_hash``.
    """

    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)
    password_hash = db.Column(db.String(256))
    is_active = db.Column(db.Boolean, default=True)
    is_admin = db.Column(db.Boolean, default=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    last_login = db.Column(db.DateTime)

    def set_password(self, password):
        """Hash ``password`` and store the result in ``password_hash``."""
        self.password_hash = generate_password_hash(
            password,
            method='pbkdf2:sha256',
            salt_length=16,
        )

    def check_password(self, password):
        """Return True when ``password`` matches the stored hash.

        Returns False (instead of raising) when no hash has been set yet or
        when ``password`` is None, e.g. because a form field was missing.
        """
        if not self.password_hash or password is None:
            return False
        return check_password_hash(self.password_hash, password)

    def get_id(self):
        """Return the primary key as a string, as Flask-Login expects."""
        return str(self.id)

    # The two properties below repeat what UserMixin already provides; they
    # are kept explicit so the contract is visible in one place.
    @property
    def is_authenticated(self):
        return True

    @property
    def is_anonymous(self):
        return False
# User loader callback
@login_manager.user_loader
def load_user(user_id):
    """Return the user for the id stored in the session, or None.

    A malformed id (tampered or stale cookie) yields None instead of an
    exception so the visitor is simply treated as anonymous.
    """
    try:
        primary_key = int(user_id)
    except (TypeError, ValueError):
        return None
    return User.query.get(primary_key)
登录与登出
from flask import Blueprint, render_template, redirect, url_for, flash, request
from flask_login import login_user, logout_user, login_required, current_user
from datetime import datetime
auth = Blueprint('auth', __name__)
@auth.route('/login', methods=['GET', 'POST'])
def login():
    """Render the login form (GET) or authenticate the submitted credentials (POST).

    On success the user is logged in, ``last_login`` is updated and the visitor
    is redirected to a validated ``next`` URL or to ``main.index``.
    """
    if current_user.is_authenticated:
        return redirect(url_for('main.index'))
    if request.method != 'POST':
        return render_template('auth/login.html')

    # Default to empty strings so a missing field fails verification
    # instead of reaching the hashing code as None.
    username = request.form.get('username', '')
    password = request.form.get('password', '')
    remember = request.form.get('remember') == 'on'
    next_page = request.args.get('next')

    user = User.query.filter_by(username=username).first()
    if user is None or not user.check_password(password):
        flash('用户名或密码错误', 'error')
        # Keep ``next`` so the original destination survives a failed attempt.
        return redirect(url_for('auth.login', next=next_page))
    if not user.is_active:
        flash('账户已被禁用', 'error')
        return redirect(url_for('auth.login', next=next_page))

    # Log the user in
    login_user(user, remember=remember)
    # Record the time of this login
    user.last_login = datetime.utcnow()
    db.session.commit()

    # Flash before branching so the message is shown on either redirect.
    flash('登录成功!', 'success')
    if next_page and is_safe_url(next_page):
        return redirect(next_page)
    return redirect(url_for('main.index'))
@auth.route('/logout')
@login_required
def logout():
    """End the current session and send the visitor back to the index page."""
    logout_user()
    flash('已退出登录', 'info')
    landing_url = url_for('main.index')
    return redirect(landing_url)
def is_safe_url(target):
    """Return True when ``target`` is an http(s) URL on this application's host.

    Used to validate the ``next`` redirect parameter. Empty targets, targets
    with surrounding whitespace, and targets containing a backslash or control
    characters are rejected: browsers normalise those in ways ``urlparse``
    does not (``/\\evil.com`` is read as ``//evil.com``).
    """
    from urllib.parse import urlparse, urljoin

    if not target or target != target.strip():
        return False
    if '\\' in target or any(ord(ch) < 0x20 or ord(ch) == 0x7f for ch in target):
        return False

    ref_url = urlparse(request.host_url)
    test_url = urlparse(urljoin(request.host_url, target))
    return test_url.scheme in ('http', 'https') and ref_url.netloc == test_url.netloc
用户注册
from flask_wtf import FlaskForm
from wtforms import StringField, PasswordField, SubmitField
from wtforms.validators import DataRequired, Email, Length, EqualTo, ValidationError
class RegistrationForm(FlaskForm):
    """Sign-up form with per-field validation and uniqueness checks."""

    username = StringField('用户名', validators=[
        DataRequired(),
        Length(min=3, max=20),
    ])
    email = StringField('邮箱', validators=[
        DataRequired(),
        Email(),
    ])
    password = PasswordField('密码', validators=[
        DataRequired(),
        Length(min=8),
    ])
    confirm_password = PasswordField('确认密码', validators=[
        DataRequired(),
        EqualTo('password', message='密码不匹配'),
    ])
    submit = SubmitField('注册')

    def validate_username(self, field):
        """Reject a username that is already taken."""
        if User.query.filter_by(username=field.data).first():
            raise ValidationError('用户名已存在')

    def validate_email(self, field):
        """Reject an email that is already registered.

        The address is lowercased before the lookup because ``register()``
        stores it lowercased; comparing the raw input would let
        ``Bob@x.com`` slip past an existing ``bob@x.com``.
        """
        normalized = (field.data or '').lower()
        if User.query.filter_by(email=normalized).first():
            raise ValidationError('邮箱已注册')
@auth.route('/register', methods=['GET', 'POST'])
def register():
    """Show the sign-up form and create the account once the form validates."""
    if current_user.is_authenticated:
        return redirect(url_for('main.index'))

    form = RegistrationForm()
    if not form.validate_on_submit():
        # First visit or failed validation: (re-)render the form.
        return render_template('auth/register.html', form=form)

    new_user = User(username=form.username.data, email=form.email.data.lower())
    new_user.set_password(form.password.data)
    db.session.add(new_user)
    db.session.commit()
    flash('注册成功,请登录!', 'success')
    return redirect(url_for('auth.login'))
保护视图
from flask_login import login_required, current_user
from functools import wraps
# Basic protection
@app.route('/dashboard')
@login_required
def dashboard():
    """Render the dashboard; reachable only by logged-in users."""
    page = render_template('dashboard.html')
    return page
# Custom decorator: administrator privileges
def admin_required(f):
    """Wrap view ``f`` so that only logged-in users with ``is_admin`` may call it."""
    @wraps(f)
    @login_required
    def guarded_view(*args, **kwargs):
        if current_user.is_admin:
            return f(*args, **kwargs)
        flash('需要管理员权限', 'error')
        return redirect(url_for('main.index'))
    return guarded_view


@app.route('/admin')
@admin_required
def admin_panel():
    """Render the administrator panel."""
    page = render_template('admin/panel.html')
    return page
# Role-based permission decorator
def role_required(*roles):
    """Return a decorator that admits only logged-in users whose role is in ``roles``.

    A user object without a ``role`` attribute is treated as having no role
    and is refused, rather than raising AttributeError.
    """
    def decorator(f):
        @wraps(f)
        @login_required
        def decorated_function(*args, **kwargs):
            if getattr(current_user, 'role', None) not in roles:
                flash('权限不足', 'error')
                return redirect(url_for('main.index'))
            return f(*args, **kwargs)
        return decorated_function
    return decorator


@app.route('/editor')
@role_required('editor', 'admin')
def editor_panel():
    """Render the editor panel for editors and administrators."""
    return render_template('editor/panel.html')
第二章:Flask-Principal 权限管理
安装与配置
pip install flask-principal
from flask import Flask
from flask_principal import Principal, Identity, AnonymousIdentity
from flask_principal import identity_changed, identity_loaded
from flask_principal import Permission, RoleNeed, UserNeed
# Create the application and attach Flask-Principal to it.
app = Flask(__name__)
principal = Principal(app)
# Define permissions: each one is satisfied by an identity providing the named role
admin_permission = Permission(RoleNeed('admin'))
editor_permission = Permission(RoleNeed('editor'))
moderator_permission = Permission(RoleNeed('moderator'))
身份管理
from flask import g, session
from flask_login import current_user
from flask_principal import identity_changed, identity_loaded
from flask_principal import Identity, AnonymousIdentity, RoleNeed, UserNeed
# Set the identity at login
@auth.route('/login', methods=['POST'])
def login():
    """Log the user in and notify Flask-Principal of the new identity."""
    from flask import current_app

    # ... authenticate the request and obtain ``user`` here ...
    login_user(user)
    # Tell Principal the identity changed. The signal sender must be the real
    # application object, obtained by unwrapping the ``current_app`` proxy
    # (a Flask instance itself has no ``_get_current_object``).
    identity_changed.send(
        current_app._get_current_object(),
        identity=Identity(user.id),
    )
    return redirect(url_for('main.index'))
# Clear the identity at logout
@auth.route('/logout')
@login_required
def logout():
    """Log the user out and reset the Flask-Principal identity to anonymous."""
    from flask import current_app

    logout_user()
    # Drop the identity keys Flask-Principal keeps in the session so the old
    # identity cannot be restored on the next request.
    for key in ('identity.name', 'identity.auth_type'):
        session.pop(key, None)
    # The sender must be the real application object, not the proxy.
    identity_changed.send(
        current_app._get_current_object(),
        identity=AnonymousIdentity(),
    )
    return redirect(url_for('main.index'))
# Load the user's permissions
@identity_loaded.connect_via(app)
def on_identity_loaded(sender, identity):
    """Attach the current user and the needs they provide to ``identity``."""
    identity.user = current_user
    provided = identity.provides

    if hasattr(current_user, 'id'):
        provided.add(UserNeed(current_user.id))

    # Single role
    if hasattr(current_user, 'role'):
        provided.add(RoleNeed(current_user.role))

    # Multiple roles (when the user model exposes a ``roles`` collection)
    if hasattr(current_user, 'roles'):
        provided.update(RoleNeed(role.name) for role in current_user.roles)
权限检查
from flask_principal import Permission, RoleNeed
admin_permission = Permission(RoleNeed('admin'))


# Enforce the permission with a decorator
@app.route('/admin')
@admin_permission.require(http_exception=403)
def admin_panel():
    """Render the admin panel; ``require`` is configured with ``http_exception=403``."""
    page = render_template('admin/panel.html')
    return page
# Check the permission inside a view
@app.route('/settings')
@login_required
def settings():
    """Render the admin settings page for admins and the user page otherwise."""
    template = 'admin/settings.html' if admin_permission.can() else 'user/settings.html'
    return render_template(template)


# Check the permission inside a template
# {% if admin_permission.can() %}
# <a href="/admin">Admin Panel</a>
# {% endif %}
自定义权限
from flask_principal import Permission, Need
from collections import namedtuple

# Custom Needs. A namedtuple compares (and hashes) as a plain tuple, so the
# type name alone does NOT distinguish EditPostNeed from DeletePostNeed; the
# ``method`` field must carry the action ('edit' / 'delete') or an edit grant
# would also satisfy a delete permission.
EditPostNeed = namedtuple('EditPostNeed', ['method', 'value'])
DeletePostNeed = namedtuple('DeletePostNeed', ['method', 'value'])


class EditPostPermission(Permission):
    """Permission satisfied by identities allowed to edit post ``post_id``."""

    def __init__(self, post_id):
        need = EditPostNeed('edit', post_id)
        super().__init__(need)


class DeletePostPermission(Permission):
    """Permission satisfied by identities allowed to delete post ``post_id``."""

    def __init__(self, post_id):
        need = DeletePostNeed('delete', post_id)
        super().__init__(need)


# Load the user's permissions
@identity_loaded.connect_via(app)
def on_identity_loaded(sender, identity):
    """Grant the current user edit and delete needs for each of their posts."""
    identity.user = current_user
    if hasattr(current_user, 'id'):
        identity.provides.add(UserNeed(current_user.id))
    # Anonymous users have no ``posts`` attribute; treat that as "no posts".
    for post in getattr(current_user, 'posts', ()):
        identity.provides.add(EditPostNeed('edit', post.id))
        identity.provides.add(DeletePostNeed('delete', post.id))
# Use the permission
@app.route('/post/<int:post_id>/edit')
@login_required
def edit_post(post_id):
    """Render the edit page for ``post_id`` if the current identity may edit it."""
    allowed = EditPostPermission(post_id).can()
    if not allowed:
        # The permission check runs before the post lookup.
        abort(403)
    return render_template('post/edit.html', post=Post.query.get_or_404(post_id))
第三章:密码安全
密码哈希
from werkzeug.security import generate_password_hash, check_password_hash
import bcrypt
import hashlib
import secrets
class PasswordManager:
    """Helpers for hashing, verifying and generating passwords."""

    @staticmethod
    def hash_password(password, method='pbkdf2:sha256'):
        """Hash ``password`` with Werkzeug using ``method`` and a 16-character salt."""
        return generate_password_hash(
            password,
            method=method,
            salt_length=16,
        )

    @staticmethod
    def verify_password(password_hash, password):
        """Return True when ``password`` matches the Werkzeug ``password_hash``."""
        return check_password_hash(password_hash, password)

    @staticmethod
    def hash_with_bcrypt(password):
        """Hash ``password`` with bcrypt (12 rounds) and return the hash bytes."""
        salt = bcrypt.gensalt(rounds=12)
        return bcrypt.hashpw(password.encode('utf-8'), salt)

    @staticmethod
    def verify_bcrypt(hashed, password):
        """Return True when ``password`` matches the bcrypt hash ``hashed``."""
        return bcrypt.checkpw(password.encode('utf-8'), hashed)

    @staticmethod
    def generate_random_password(length=16):
        """Return a random password of ``length`` characters.

        When ``length`` is at least 4 the result is guaranteed to contain a
        lowercase letter, an uppercase letter, a digit and a special
        character, so it satisfies a policy that requires all four classes.
        Shorter lengths cannot satisfy that and are returned without the
        guarantee.
        """
        groups = (
            'abcdefghijklmnopqrstuvwxyz',
            'ABCDEFGHIJKLMNOPQRSTUVWXYZ',
            '0123456789',
            '!@#$%^&*',
        )
        alphabet = ''.join(groups)
        while True:
            candidate = ''.join(secrets.choice(alphabet) for _ in range(length))
            # Redraw the whole password rather than patching characters in,
            # which keeps every position uniformly random.
            if length < len(groups) or all(
                any(ch in group for ch in candidate) for group in groups
            ):
                return candidate
密码策略验证
import re
class PasswordValidator:
    """Configurable password policy checker and strength estimator."""

    # Compiled once and shared by validate() and get_strength().
    _UPPER = re.compile(r'[A-Z]')
    _LOWER = re.compile(r'[a-z]')
    _DIGIT = re.compile(r'\d')
    # Every ASCII punctuation character counts as "special", so passwords
    # using '-', '_', '+', '=' and similar are not wrongly rejected.
    _SPECIAL = re.compile(r'''[!"#$%&'()*+,\-./:;<=>?@\[\\\]^_`{|}~]''')

    def __init__(self,
                 min_length=8,
                 require_uppercase=True,
                 require_lowercase=True,
                 require_digit=True,
                 require_special=True):
        """Store the policy: minimum length and which character classes are required."""
        self.min_length = min_length
        self.require_uppercase = require_uppercase
        self.require_lowercase = require_lowercase
        self.require_digit = require_digit
        self.require_special = require_special

    def validate(self, password):
        """Return a list of policy violation messages; an empty list means valid.

        ``None`` is treated as an empty password.
        """
        password = password or ''
        errors = []
        if len(password) < self.min_length:
            errors.append(f'密码长度至少 {self.min_length} 个字符')
        if self.require_uppercase and not self._UPPER.search(password):
            errors.append('密码需要包含大写字母')
        if self.require_lowercase and not self._LOWER.search(password):
            errors.append('密码需要包含小写字母')
        if self.require_digit and not self._DIGIT.search(password):
            errors.append('密码需要包含数字')
        if self.require_special and not self._SPECIAL.search(password):
            errors.append('密码需要包含特殊字符')
        return errors

    def is_valid(self, password):
        """Return True when ``password`` violates none of the configured rules."""
        return not self.validate(password)

    def get_strength(self, password):
        """Return a heuristic strength score between 0 and 100.

        Up to 40 points come from length (4 per character), 10 each for
        lowercase, uppercase and digits, 15 for a special character, and a
        15-point bonus when lowercase, uppercase and digits are all present.
        """
        password = password or ''
        has_lower = bool(self._LOWER.search(password))
        has_upper = bool(self._UPPER.search(password))
        has_digit = bool(self._DIGIT.search(password))
        has_special = bool(self._SPECIAL.search(password))

        score = min(len(password) * 4, 40)
        score += 10 * (has_lower + has_upper + has_digit)
        if has_special:
            score += 15
        if has_lower and has_upper and has_digit:
            score += 15
        return min(score, 100)
# Using the validator in a form
class RegistrationForm(FlaskForm):
    """Registration form whose password field is checked against PasswordValidator."""

    password = PasswordField('密码', validators=[DataRequired()])

    def validate_password(self, field):
        """Raise ValidationError listing every policy rule the password breaks."""
        problems = PasswordValidator().validate(field.data)
        if not problems:
            return
        raise ValidationError(' | '.join(problems))
密码重置
from itsdangerous import URLSafeTimedSerializer, SignatureExpired, BadSignature
from flask_mail import Mail, Message
# Flask-Mail instance bound to the application; used to send the reset email.
mail = Mail(app)
def generate_reset_token(email, expires_in=3600):
    """Return a signed, timestamped password-reset token carrying ``email``.

    NOTE(review): ``expires_in`` is accepted but never used in this function;
    the lifetime is enforced by ``max_age`` when the token is verified.
    """
    signer = URLSafeTimedSerializer(app.config['SECRET_KEY'])
    token = signer.dumps(email, salt='password-reset-salt')
    return token
def verify_reset_token(token, max_age=3600):
    """Return the email stored in ``token``, or None if it is invalid or older than ``max_age`` seconds."""
    signer = URLSafeTimedSerializer(app.config['SECRET_KEY'])
    try:
        recovered_email = signer.loads(
            token,
            salt='password-reset-salt',
            max_age=max_age,
        )
    except (SignatureExpired, BadSignature):
        return None
    else:
        return recovered_email
@auth.route('/forgot-password', methods=['GET', 'POST'])
def forgot_password():
    """Show the "forgot password" form and email a reset link when the address is known.

    The same message is flashed whether or not the address exists, so the
    form cannot be used to discover which emails are registered.
    """
    if request.method != 'POST':
        return render_template('auth/forgot_password.html')

    # Match case-insensitively: sign-up stores addresses lowercased, so a
    # user typing "Bob@Example.com" must still find their account.
    email = (request.form.get('email') or '').strip().lower()
    user = None
    if email:
        user = User.query.filter(db.func.lower(User.email) == email).first()

    if user:
        # Sign the stored address so the reset view finds the same row.
        token = generate_reset_token(user.email)
        reset_url = url_for('auth.reset_password', token=token, _external=True)
        msg = Message(
            '重置密码',
            sender=app.config['MAIL_DEFAULT_SENDER'],
            recipients=[user.email],
        )
        msg.body = f'点击链接重置密码:{reset_url}\n\n此链接1小时内有效。'
        mail.send(msg)

    # Same message regardless of whether the user exists (security).
    flash('如果该邮箱已注册,您将收到密码重置邮件', 'info')
    return redirect(url_for('auth.login'))
@auth.route('/reset-password/<token>', methods=['GET', 'POST'])
def reset_password(token):
    """Validate the reset ``token`` and, on POST, store the new password.

    The new password must be confirmed and at least 8 characters long (the
    same minimum the registration form enforces); an empty or missing value
    is rejected instead of being hashed.
    """
    email = verify_reset_token(token)
    if not email:
        flash('无效或过期的重置链接', 'error')
        return redirect(url_for('auth.forgot_password'))

    if request.method != 'POST':
        return render_template('auth/reset_password.html')

    password = request.form.get('password') or ''
    confirm = request.form.get('confirm_password') or ''
    if password != confirm:
        flash('两次密码不一致', 'error')
        return redirect(request.url)
    if len(password) < 8:
        flash('密码长度至少 8 个字符', 'error')
        return redirect(request.url)

    user = User.query.filter_by(email=email).first()
    if user:
        user.set_password(password)
        db.session.commit()
    flash('密码已重置,请登录', 'success')
    return redirect(url_for('auth.login'))
第四章:JWT 令牌认证
PyJWT 基础
pip install pyjwt
import jwt
from datetime import datetime, timedelta
from functools import wraps
from flask import request, jsonify, current_app
class JWTManager:
    """Minimal JWT helper that issues and decodes access and refresh tokens.

    Settings are read from the active application's config. When
    ``init_app`` was never called for that application, built-in defaults
    are used instead of raising KeyError.
    """

    # Fallback values, also installed into app.config by init_app().
    _DEFAULTS = {
        'JWT_ACCESS_TOKEN_EXPIRES': timedelta(hours=1),
        'JWT_REFRESH_TOKEN_EXPIRES': timedelta(days=30),
        'JWT_ALGORITHM': 'HS256',
    }

    def __init__(self, app=None):
        """Optionally bind to ``app`` immediately."""
        self.app = app
        if app:
            self.init_app(app)

    def init_app(self, app):
        """Install default JWT settings into ``app.config`` without overriding existing ones."""
        app.config.setdefault('JWT_SECRET_KEY', app.config['SECRET_KEY'])
        for key, value in self._DEFAULTS.items():
            app.config.setdefault(key, value)

    def _setting(self, name):
        """Return config value ``name`` for the current app, falling back to the defaults."""
        config = current_app.config
        if name == 'JWT_SECRET_KEY':
            return config.get('JWT_SECRET_KEY') or config['SECRET_KEY']
        return config.get(name, self._DEFAULTS[name])

    def _encode(self, payload):
        """Sign ``payload`` with the configured key and algorithm."""
        return jwt.encode(
            payload,
            self._setting('JWT_SECRET_KEY'),
            algorithm=self._setting('JWT_ALGORITHM'),
        )

    def create_access_token(self, identity, additional_claims=None):
        """Return a signed access token for ``identity``.

        ``identity`` is stored as a string in ``sub`` because the JWT spec,
        and recent PyJWT releases, require a string subject.
        ``additional_claims`` are merged into the payload last.
        """
        now = datetime.utcnow()
        payload = {
            'sub': str(identity),
            'iat': now,
            'exp': now + self._setting('JWT_ACCESS_TOKEN_EXPIRES'),
            'type': 'access',
        }
        if additional_claims:
            payload.update(additional_claims)
        return self._encode(payload)

    def create_refresh_token(self, identity):
        """Return a signed refresh token for ``identity`` (stored as a string ``sub``)."""
        now = datetime.utcnow()
        payload = {
            'sub': str(identity),
            'iat': now,
            'exp': now + self._setting('JWT_REFRESH_TOKEN_EXPIRES'),
            'type': 'refresh',
        }
        return self._encode(payload)

    def decode_token(self, token):
        """Return the decoded payload, or None when the token is expired or invalid."""
        try:
            return jwt.decode(
                token,
                self._setting('JWT_SECRET_KEY'),
                algorithms=[self._setting('JWT_ALGORITHM')],
            )
        except jwt.InvalidTokenError:
            # ExpiredSignatureError is a subclass, so expiry is covered too.
            return None


jwt_manager = JWTManager()
JWT 认证装饰器
from functools import wraps
from flask import request, jsonify, g
def jwt_required(f):
    """Decorator that requires a valid ``Authorization: Bearer <access token>`` header.

    On success the matching user is stored in ``g.current_user``; otherwise a
    JSON error with status 401 is returned and the view is not called.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        # Read the token from the request header
        auth_header = request.headers.get('Authorization')
        if not auth_header:
            return jsonify({'error': 'Token is missing'}), 401

        scheme, _, token = auth_header.partition(' ')
        token = token.strip()
        if scheme.lower() != 'bearer' or not token:
            return jsonify({'error': 'Invalid token format'}), 401

        # Decode the token
        payload = jwt_manager.decode_token(token)
        if not payload:
            return jsonify({'error': 'Token is invalid or expired'}), 401
        if payload.get('type') != 'access':
            return jsonify({'error': 'Invalid token type'}), 401

        # The subject may be an int or a numeric string; a token without a
        # usable subject is rejected rather than raising KeyError.
        try:
            user_id = int(payload['sub'])
        except (KeyError, TypeError, ValueError):
            return jsonify({'error': 'Token is invalid or expired'}), 401

        # Load the user
        user = User.query.get(user_id)
        if not user:
            return jsonify({'error': 'User not found'}), 401
        g.current_user = user
        return f(*args, **kwargs)
    return decorated
def get_current_user():
    """Return the user stored on ``g`` as ``current_user``, or None if none was stored."""
    if hasattr(g, 'current_user'):
        return g.current_user
    return None
JWT API 端点
from flask import Blueprint
api = Blueprint('api', __name__, url_prefix='/api')


@api.route('/auth/login', methods=['POST'])
def api_login():
    """Exchange a username and password for an access token and a refresh token."""
    # silent=True makes a non-JSON body return None instead of aborting, so
    # the caller always receives the JSON error below.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Missing JSON data'}), 400

    username = data.get('username')
    password = data.get('password')
    if not username or not password:
        return jsonify({'error': 'Missing username or password'}), 400

    user = User.query.filter_by(username=username).first()
    if not user or not user.check_password(password):
        return jsonify({'error': 'Invalid credentials'}), 401
    # Mirror the web login: disabled accounts must not obtain tokens.
    if not user.is_active:
        return jsonify({'error': 'Account disabled'}), 403

    access_token = jwt_manager.create_access_token(
        identity=user.id,
        additional_claims={'role': user.role},
    )
    refresh_token = jwt_manager.create_refresh_token(identity=user.id)
    return jsonify({
        'access_token': access_token,
        'refresh_token': refresh_token,
        'user': user.to_dict(),
    })
@api.route('/auth/refresh', methods=['POST'])
def api_refresh():
    """Issue a new access token in exchange for a valid refresh token."""
    # A missing or non-object JSON body is handled like a missing token
    # instead of crashing on ``None.get``.
    data = request.get_json(silent=True)
    refresh_token = data.get('refresh_token') if isinstance(data, dict) else None
    if not refresh_token:
        return jsonify({'error': 'Missing refresh token'}), 400

    payload = jwt_manager.decode_token(refresh_token)
    if not payload or payload.get('type') != 'refresh':
        return jsonify({'error': 'Invalid refresh token'}), 401

    # The subject may be an int or a numeric string.
    try:
        user_id = int(payload['sub'])
    except (KeyError, TypeError, ValueError):
        return jsonify({'error': 'Invalid refresh token'}), 401

    user = User.query.get(user_id)
    # A disabled account must not be able to keep minting access tokens.
    if not user or not user.is_active:
        return jsonify({'error': 'User not found'}), 401

    access_token = jwt_manager.create_access_token(
        identity=user.id,
        additional_claims={'role': user.role},
    )
    return jsonify({'access_token': access_token})
@api.route('/profile', methods=['GET'])
@jwt_required
def api_profile():
    """Return the authenticated user's profile as JSON."""
    profile = get_current_user().to_dict()
    return jsonify(profile)
Flask-JWT-Extended
pip install flask-jwt-extended
from flask import Flask
from flask_jwt_extended import JWTManager, create_access_token
from flask_jwt_extended import jwt_required, get_jwt_identity
from flask_jwt_extended import create_refresh_token, get_jwt
# Create the application and configure Flask-JWT-Extended.
app = Flask(__name__)
# NOTE(review): placeholder value — load a real secret from the environment before deploying.
app.config['JWT_SECRET_KEY'] = 'your-secret-key'
# NOTE(review): ``timedelta`` is not imported in this snippet — confirm ``from datetime import timedelta`` is in scope.
app.config['JWT_ACCESS_TOKEN_EXPIRES'] = timedelta(hours=1)
app.config['JWT_REFRESH_TOKEN_EXPIRES'] = timedelta(days=30)
jwt = JWTManager(app)
# Token blocklist: an in-process set holding the ``jti`` of revoked tokens
blacklist = set()
@jwt.token_in_blocklist_loader
def check_if_token_revoked(jwt_header, jwt_payload):
    """Return True when the token's ``jti`` has been added to the blocklist."""
    return jwt_payload['jti'] in blacklist
@app.route('/api/login', methods=['POST'])
def login():
    """Exchange a username and password for an access token and a refresh token."""
    # silent=True: a non-JSON body becomes None instead of aborting, and is
    # then handled as invalid credentials.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    username = data.get('username')
    password = data.get('password')

    user = None
    if username and password:
        user = User.query.filter_by(username=username).first()
    if not user or not user.check_password(password):
        return jsonify({'error': 'Invalid credentials'}), 401

    # The identity becomes the JWT ``sub`` claim, which must be a string.
    identity = str(user.id)
    access_token = create_access_token(
        identity=identity,
        additional_claims={'role': user.role},
    )
    refresh_token = create_refresh_token(identity=identity)
    return jsonify({
        'access_token': access_token,
        'refresh_token': refresh_token,
    })
@app.route('/api/protected', methods=['GET'])
@jwt_required()
def protected():
    """Return the caller's identity and role claim from the verified token."""
    identity = get_jwt_identity()
    token_claims = get_jwt()
    body = {'user_id': identity, 'role': token_claims.get('role')}
    return jsonify(body)
@app.route('/api/logout', methods=['POST'])
@jwt_required()
def logout():
    """Revoke the presented token by adding its ``jti`` to the blocklist."""
    token_id = get_jwt()['jti']
    blacklist.add(token_id)
    return jsonify({'message': 'Successfully logged out'})
第五章:OAuth 2.0 认证
Flask-Dance
pip install Flask-Dance
from flask import Flask, redirect, url_for
from flask_dance.contrib.github import make_github_blueprint, github
from flask_dance.contrib.google import make_google_blueprint, google
from flask_login import login_user, current_user
# Create the application and register the Flask-Dance OAuth blueprints.
app = Flask(__name__)
# NOTE(review): placeholder secrets below — load real values from the environment before deploying.
app.config['SECRET_KEY'] = 'your-secret-key'
# GitHub OAuth
github_bp = make_github_blueprint(
client_id='your-github-client-id',
client_secret='your-github-client-secret',
scope='read:user,user:email'
)
app.register_blueprint(github_bp, url_prefix='/login')
# Google OAuth
google_bp = make_google_blueprint(
client_id='your-google-client-id',
client_secret='your-google-client-secret',
scope=[
'openid',
'https://www.googleapis.com/auth/userinfo.email',
'https://www.googleapis.com/auth/userinfo.profile'
]
)
# Both blueprints are mounted under the same '/login' prefix.
app.register_blueprint(google_bp, url_prefix='/login')
# NOTE(review): the GitHub blueprint is also mounted under '/login'; confirm
# this URL does not collide with the blueprint's own login route.
@app.route('/login/github')
def github_login():
    """Log in, creating the account on first use, from the user's GitHub profile."""
    if not github.authorized:
        return redirect(url_for('github.login'))

    resp = github.get('/user')
    if not resp.ok:
        return redirect(url_for('auth.login'))

    github_info = resp.json()
    github_id = str(github_info['id'])

    # Find or create the user
    user = User.query.filter_by(github_id=github_id).first()
    if not user:
        user = User(
            username=github_info['login'],
            # GitHub sends ``"email": null`` for private addresses. Store
            # NULL rather than '' so the unique email column is not violated
            # by the second such account.
            email=github_info.get('email') or None,
            github_id=github_id,
        )
        db.session.add(user)
        db.session.commit()

    login_user(user)
    return redirect(url_for('main.index'))
@app.route('/login/google')
def google_login():
    """Log in, creating the account on first use, from the user's Google profile."""
    if not google.authorized:
        return redirect(url_for('google.login'))

    profile_response = google.get('/oauth2/v2/userinfo')
    if not profile_response.ok:
        return redirect(url_for('auth.login'))

    google_info = profile_response.json()
    google_id = google_info['id']
    account = User.query.filter_by(google_id=google_id).first()
    if not account:
        # First login with this Google account: create the local user.
        account = User(
            username=google_info['name'],
            email=google_info['email'],
            google_id=google_id,
        )
        db.session.add(account)
        db.session.commit()
    login_user(account)
    return redirect(url_for('main.index'))
OAuth 用户模型
class User(UserMixin, db.Model):
    """User account that may be linked to third-party OAuth identities."""

    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80))
    email = db.Column(db.String(120), unique=True)
    # Nullable password hash column
    password_hash = db.Column(db.String(256), nullable=True)

    # Provider-side account ids (unique per provider)
    github_id = db.Column(db.String(50), unique=True, nullable=True)
    google_id = db.Column(db.String(50), unique=True, nullable=True)
    facebook_id = db.Column(db.String(50), unique=True, nullable=True)

    # OAuth tokens (optional, for calling third-party APIs)
    oauth_tokens = db.relationship(
        'OAuthToken',
        backref='user',
        lazy='dynamic',
    )
class OAuthToken(db.Model):
    """Stored OAuth credentials for one user at one provider."""

    __tablename__ = 'oauth_tokens'

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'))
    # Provider name: github, google, etc.
    provider = db.Column(db.String(50))
    access_token = db.Column(db.String(500))
    refresh_token = db.Column(db.String(500), nullable=True)
    expires_at = db.Column(db.DateTime, nullable=True)
第六章:安全最佳实践
XSS 防护
from markupsafe import escape
# ``Markup`` lives in markupsafe; the ``flask.Markup`` alias was removed in Flask 2.3.
from markupsafe import Markup
# 1. Auto-escaping (enabled by default in Jinja2)
# In a template, {{ user_input }} is escaped automatically
# 2. Manual escaping
user_input = '<script>alert("XSS")</script>'
safe_output = escape(user_input)
# 3. Marking content as safe (use with care: Markup is not escaped again)
trusted_html = Markup('<strong>Safe HTML</strong>')
# 4. Content Security Policy (CSP)
@app.after_request
def add_security_headers(response):
    """Attach a Content-Security-Policy header to every response.

    Google Fonts serves its stylesheets from fonts.googleapis.com and the
    font files from fonts.gstatic.com, so the former belongs in
    ``style-src`` and the latter in ``font-src``.
    """
    response.headers['Content-Security-Policy'] = (
        "default-src 'self'; "
        "script-src 'self' https://cdn.example.com; "
        "style-src 'self' 'unsafe-inline' https://fonts.googleapis.com; "
        "img-src 'self' data: https:; "
        "font-src 'self' https://fonts.gstatic.com; "
        "connect-src 'self'; "
        "frame-ancestors 'none'; "
        "base-uri 'self'; "
        "form-action 'self'"
    )
    return response
# 5. 清理 HTML(使用 bleach)
import bleach
def clean_html(html):
    """Return ``html`` with every tag and attribute outside the allow-lists stripped."""
    permitted_tags = [
        'a', 'abbr', 'acronym', 'b', 'blockquote', 'code',
        'em', 'i', 'li', 'ol', 'strong', 'ul', 'p', 'br',
    ]
    permitted_attributes = {
        'a': ['href', 'title'],
        'abbr': ['title'],
        'acronym': ['title'],
    }
    # strip=True removes disallowed tags instead of escaping them.
    sanitized = bleach.clean(
        html,
        tags=permitted_tags,
        attributes=permitted_attributes,
        strip=True,
    )
    return sanitized
SQL 注入防护
# Always use parameterized queries; never build SQL by string concatenation
# Dangerous (do not do this):
# User.query.filter(f"username = '{user_input}'")
# db.engine.execute(f"SELECT * FROM users WHERE username = '{user_input}'")
# Safe (values are passed as bound parameters):
User.query.filter_by(username=user_input).first()
User.query.filter(User.username == user_input).first()
# Parameterize when using text() as well
from sqlalchemy import text
result = db.session.execute(
text("SELECT * FROM users WHERE username = :username"),
{"username": user_input}
)
# Parameterized raw SQL: values are supplied separately from the statement text
db.session.execute(
text("UPDATE users SET role = :role WHERE id = :id"),
{"role": "admin", "id": user_id}
)
安全请求头
from flask_talisman import Talisman
# Add security headers with Flask-Talisman
talisman = Talisman(
app,
force_https=True,
strict_transport_security=True,
# 31536000 seconds = 365 days
strict_transport_security_max_age=31536000,
content_security_policy={
'default-src': "'self'",
'script-src': "'self'",
'style-src': "'self' 'unsafe-inline'",
},
x_content_type_options=True,
x_xss_protection=True,
referrer_policy='strict-origin-when-cross-origin'
)
# Or add the headers by hand
@app.after_request
def add_security_headers(response):
    """Set a fixed group of security-related headers on every response."""
    hardening_headers = (
        # Prevent MIME type sniffing
        ('X-Content-Type-Options', 'nosniff'),
        # Prevent clickjacking
        ('X-Frame-Options', 'DENY'),
        # XSS filter
        ('X-XSS-Protection', '1; mode=block'),
        # Enforce HTTPS
        ('Strict-Transport-Security', 'max-age=31536000; includeSubDomains'),
        # Referrer policy
        ('Referrer-Policy', 'strict-origin-when-cross-origin'),
        # Permissions policy
        ('Permissions-Policy', 'geolocation=(), microphone=()'),
    )
    for header_name, header_value in hardening_headers:
        response.headers[header_name] = header_value
    return response
会话安全
from datetime import timedelta
# Session configuration
app.config.update(
# Secret key (use a strong random key in production).
# NOTE(review): when the SECRET_KEY environment variable is unset, a new
# random key is generated on every start — presumably invalidating existing
# sessions and differing between worker processes; confirm this is intended.
SECRET_KEY=os.environ.get('SECRET_KEY') or secrets.token_hex(32),
# Session lifetime
PERMANENT_SESSION_LIFETIME=timedelta(hours=24),
# Cookie security settings
SESSION_COOKIE_SECURE=True, # HTTPS only
SESSION_COOKIE_HTTPONLY=True, # not readable from JavaScript
SESSION_COOKIE_SAMESITE='Lax', # CSRF mitigation
SESSION_COOKIE_NAME='__Host-session', # security prefix
# Session refresh
SESSION_REFRESH_EACH_REQUEST=True,
)
# 会话固定攻击防护
from flask import session
@auth.route('/login', methods=['POST'])
def login():
    """Start a fresh session after a successful login (session-fixation defence).

    NOTE(review): ``validate_login`` and ``user`` are not defined in this
    snippet; they are assumed to come from the surrounding application.
    """
    if not validate_login():
        return None
    # Discard whatever the pre-login session held before storing the user.
    session.clear()
    session['user_id'] = user.id
    session.permanent = True
    return redirect(url_for('main.index'))
速率限制
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
# Pass every argument by keyword: newer Flask-Limiter releases take
# ``key_func`` as the first positional parameter and ``app`` as keyword-only,
# so a positional ``app`` would be taken for the key function.
limiter = Limiter(
    key_func=get_remote_address,
    app=app,
    default_limits=["200 per day", "50 per hour"],
    storage_uri="redis://localhost:6379",
)
# Per-route limit that overrides the defaults
@app.route('/api/data')
@limiter.limit("100 per minute")
def get_data():
    """Return ``data`` as JSON, limited to 100 requests per minute."""
    response = jsonify(data)
    return response
# Login limits (stricter)
@auth.route('/login', methods=['POST'])
@limiter.limit("5 per minute")
@limiter.limit("20 per hour")
def login():
    """Placeholder login view limited to 5 requests per minute and 20 per hour."""
    # Login logic goes here
    return None
# Per-user limits
def get_user_identifier():
    """Return the rate-limit key: the user id when logged in, else the client address."""
    if not current_user.is_authenticated:
        return get_remote_address()
    return current_user.id


@app.route('/api/expensive')
@limiter.limit("10 per minute", key_func=get_user_identifier)
def expensive_operation():
    """Return ``result`` as JSON, limited to 10 requests per minute per user or address."""
    response = jsonify(result)
    return response
# Exempt selected routes from rate limiting
@limiter.exempt
@app.route('/health')
def health_check():
    """Liveness probe; exempt from rate limits."""
    status_text = 'OK'
    return status_text
输入验证
import re
from wtforms.validators import ValidationError
class InputValidator:
    """Static helpers that validate or sanitize individual input values.

    Each ``validate_*`` method returns the accepted (possibly normalized)
    value and raises ValueError otherwise. Patterns are applied with
    ``re.fullmatch`` so a trailing newline can never slip through, which
    ``^...$`` with ``re.match`` would allow.
    """

    @staticmethod
    def validate_username(username):
        """Accept 3-20 characters: a letter followed by letters, digits or underscores."""
        if not re.fullmatch(r'[a-zA-Z][a-zA-Z0-9_]{2,19}', username):
            raise ValueError('用户名格式无效')
        return username

    @staticmethod
    def validate_email(email):
        """Accept a syntactically plausible address and return it lowercased."""
        pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
        if not re.fullmatch(pattern, email):
            raise ValueError('邮箱格式无效')
        return email.lower()

    @staticmethod
    def validate_phone(phone):
        """Accept an 11-digit number starting with 1 followed by a digit from 3 to 9."""
        if not re.fullmatch(r'1[3-9]\d{9}', phone):
            raise ValueError('手机号格式无效')
        return phone

    @staticmethod
    def sanitize_filename(filename):
        """Return ``filename`` passed through Werkzeug's ``secure_filename``."""
        from werkzeug.utils import secure_filename
        return secure_filename(filename)

    @staticmethod
    def validate_url(url):
        """Accept an http(s) URL that has a network location and return it unchanged."""
        from urllib.parse import urlparse
        try:
            parsed = urlparse(url)
        except (AttributeError, TypeError, ValueError) as exc:
            raise ValueError('URL 格式无效') from exc
        if parsed.scheme not in ('http', 'https') or not parsed.netloc:
            raise ValueError('URL 格式无效')
        return url
第七章:审计日志
日志模型
from datetime import datetime
from flask import request, g
class AuditLog(db.Model):
    """Audit log entry: who did what to which resource, from where and when."""

    __tablename__ = 'audit_logs'

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=True)
    action = db.Column(db.String(50), nullable=False)  # login, logout, create, update, delete
    resource_type = db.Column(db.String(50))  # user, post, comment
    resource_id = db.Column(db.Integer)
    details = db.Column(db.JSON)
    ip_address = db.Column(db.String(45))
    user_agent = db.Column(db.String(500))
    timestamp = db.Column(db.DateTime, default=datetime.utcnow)

    user = db.relationship('User', backref='audit_logs')

    @classmethod
    def log(cls, action, resource_type=None, resource_id=None, details=None):
        """Create, commit and return an audit entry for the current request.

        The acting user is taken from ``g.current_user`` (set by the JWT
        decorator) and otherwise from the Flask-Login session user, so
        session-authenticated actions are attributed too. ``user_id`` is
        None for anonymous requests.
        """
        actor = getattr(g, 'current_user', None)
        if not actor:
            from flask_login import current_user as session_user
            # getattr with a default also covers apps without Flask-Login set up.
            if getattr(session_user, 'is_authenticated', False):
                actor = session_user

        agent = request.user_agent
        entry = cls(
            user_id=actor.id if actor else None,
            action=action,
            resource_type=resource_type,
            resource_id=resource_id,
            details=details,
            ip_address=request.remote_addr,
            # Truncate to the column width.
            user_agent=agent.string[:500] if agent else None,
        )
        db.session.add(entry)
        db.session.commit()
        return entry
审计日志装饰器
from functools import wraps
def audit_log(action, resource_type=None):
    """Return a decorator that records an AuditLog entry after the view returns.

    The resource id is read from the ``id`` keyword argument or, failing
    that, from ``<resource_type>_id``. Nothing is logged when the view raises.
    """
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            # Run the wrapped function first
            result = f(*args, **kwargs)

            # Resolve the resource id; compare with None so an id of 0 is kept.
            resource_id = kwargs.get('id')
            if resource_id is None:
                resource_id = kwargs.get(f'{resource_type}_id')

            # Stringify all arguments: ``details`` is a JSON column and
            # arbitrary objects are not JSON-serializable.
            AuditLog.log(
                action=action,
                resource_type=resource_type,
                resource_id=resource_id,
                details={
                    'args': [str(arg) for arg in args],
                    'kwargs': {k: str(v) for k, v in kwargs.items()},
                },
            )
            return result
        return decorated_function
    return decorator
# Using the decorator
@app.route('/user/<int:id>', methods=['DELETE'])
@login_required
@admin_required
@audit_log('delete', 'user')
def delete_user(id):
    """Delete the user with primary key ``id`` (404 if absent) and confirm as JSON."""
    doomed_user = User.query.get_or_404(id)
    db.session.delete(doomed_user)
    db.session.commit()
    confirmation = {'message': 'User deleted'}
    return jsonify(confirmation)
登录审计
class LoginAttempt(db.Model):
    """One recorded login attempt, successful or not."""

    __tablename__ = 'login_attempts'

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80))
    ip_address = db.Column(db.String(45))
    user_agent = db.Column(db.String(500))
    success = db.Column(db.Boolean)
    failure_reason = db.Column(db.String(100))
    timestamp = db.Column(db.DateTime, default=datetime.utcnow)

    @classmethod
    def record(cls, username, success, failure_reason=None):
        """Persist and return an attempt for the current request."""
        agent = request.user_agent
        # Truncate the agent string to the column width.
        agent_text = agent.string[:500] if agent else None
        entry = cls(
            username=username,
            ip_address=request.remote_addr,
            user_agent=agent_text,
            success=success,
            failure_reason=failure_reason,
        )
        db.session.add(entry)
        db.session.commit()
        return entry

    @classmethod
    def get_recent_failures(cls, username=None, ip_address=None, minutes=30):
        """Count failed attempts in the last ``minutes``, optionally filtered by username and/or IP."""
        window_start = datetime.utcnow() - timedelta(minutes=minutes)
        conditions = [
            cls.success == False,  # noqa: E712 - SQL expression, not a Python comparison
            cls.timestamp > window_start,
        ]
        if username:
            conditions.append(cls.username == username)
        if ip_address:
            conditions.append(cls.ip_address == ip_address)
        return cls.query.filter(*conditions).count()
# Using it in the login view
@auth.route('/login', methods=['POST'])
def login():
    """Authenticate the form credentials, recording every attempt.

    Requests from an address with 5 or more recent failures are refused
    before any credential check.
    """
    username = request.form.get('username')
    client_ip = request.remote_addr
    login_page = 'auth/login.html'

    # Lockout check
    if LoginAttempt.get_recent_failures(ip_address=client_ip) >= 5:
        flash('登录尝试次数过多,请稍后再试', 'error')
        return render_template(login_page)

    account = User.query.filter_by(username=username).first()
    credentials_ok = bool(account) and account.check_password(request.form.get('password'))
    if not credentials_ok:
        LoginAttempt.record(username, success=False, failure_reason='invalid_credentials')
        flash('用户名或密码错误', 'error')
        return render_template(login_page)

    if not account.is_active:
        LoginAttempt.record(username, success=False, failure_reason='account_disabled')
        flash('账户已被禁用', 'error')
        return render_template(login_page)

    # Successful login
    LoginAttempt.record(username, success=True)
    login_user(account)
    return redirect(url_for('main.index'))
总结
本章详细介绍了 Flask 用户认证与安全:
- Flask-Login:用户会话管理、登录保护
- Flask-Principal:权限管理、角色控制
- 密码安全:哈希存储、强度验证、密码重置
- JWT 认证:令牌生成、验证、刷新
- OAuth 2.0:第三方登录集成
- 安全最佳实践:XSS/SQL 注入防护、安全头、会话安全
- 审计日志:操作记录、登录监控
下一章我们将学习蓝图与项目结构。