表单处理与数据验证
2026/3/20大约 15 分钟
表单处理与数据验证
第一章:原生表单处理
基础表单处理
在没有使用表单库的情况下,Flask 通过 request 对象处理表单:
from flask import Flask, request, render_template, redirect, url_for, flash
app = Flask(__name__)
app.secret_key = 'your-secret-key'
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
username = request.form.get('username')
password = request.form.get('password')
remember = request.form.get('remember', False)
# 简单验证
errors = []
if not username:
errors.append('Username is required')
if not password:
errors.append('Password is required')
if len(password) < 6:
errors.append('Password must be at least 6 characters')
if errors:
for error in errors:
flash(error, 'error')
return render_template('login.html', username=username)
# 验证用户
user = User.query.filter_by(username=username).first()
if user and user.check_password(password):
login_user(user, remember=remember)
return redirect(url_for('dashboard'))
flash('Invalid username or password', 'error')
return render_template('login.html')
<!-- templates/login.html -->
<!DOCTYPE html>
<html>
<head>
<title>Login</title>
</head>
<body>
{% with messages = get_flashed_messages(with_categories=true) %} {% if
messages %} {% for category, message in messages %}
<div class="alert alert-{{ category }}">{{ message }}</div>
{% endfor %} {% endif %} {% endwith %}
<form method="POST">
<div>
<label for="username">Username:</label>
<input
type="text"
id="username"
name="username"
value="{{ username | default('') }}"
required
/>
</div>
<div>
<label for="password">Password:</label>
<input type="password" id="password" name="password" required />
</div>
<div>
<input type="checkbox" id="remember" name="remember" />
<label for="remember">Remember me</label>
</div>
<button type="submit">Login</button>
</form>
</body>
</html>
文件上传
import os
from flask import Flask, request, redirect, url_for, flash
from werkzeug.utils import secure_filename
app = Flask(__name__)
app.config['UPLOAD_FOLDER'] = 'uploads'
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB
app.config['ALLOWED_EXTENSIONS'] = {'png', 'jpg', 'jpeg', 'gif', 'pdf'}
def allowed_file(filename):
return '.' in filename and \
filename.rsplit('.', 1)[1].lower() in app.config['ALLOWED_EXTENSIONS']
@app.route('/upload', methods=['GET', 'POST'])
def upload_file():
if request.method == 'POST':
# 检查文件是否存在
if 'file' not in request.files:
flash('No file selected', 'error')
return redirect(request.url)
file = request.files['file']
# 检查文件名
if file.filename == '':
flash('No file selected', 'error')
return redirect(request.url)
if file and allowed_file(file.filename):
# 安全处理文件名
filename = secure_filename(file.filename)
# 生成唯一文件名
import uuid
unique_filename = f"{uuid.uuid4().hex}_{filename}"
# 保存文件
filepath = os.path.join(app.config['UPLOAD_FOLDER'], unique_filename)
file.save(filepath)
flash('File uploaded successfully', 'success')
return redirect(url_for('upload_file'))
flash('File type not allowed', 'error')
return render_template('upload.html')
多文件上传
@app.route('/upload-multiple', methods=['POST'])
def upload_multiple():
if 'files' not in request.files:
return {'error': 'No files'}, 400
files = request.files.getlist('files')
uploaded = []
for file in files:
if file and file.filename and allowed_file(file.filename):
filename = secure_filename(file.filename)
filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
file.save(filepath)
uploaded.append(filename)
return {'uploaded': uploaded, 'count': len(uploaded)}
第二章:Flask-WTF 表单
安装与配置
pip install flask-wtf
from flask import Flask
from flask_wtf import CSRFProtect
app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key'
app.config['WTF_CSRF_ENABLED'] = True
app.config['WTF_CSRF_TIME_LIMIT'] = 3600 # CSRF token 过期时间(秒)
csrf = CSRFProtect(app)
定义表单类
from flask_wtf import FlaskForm
from wtforms import (
StringField, PasswordField, TextAreaField, BooleanField,
SelectField, SelectMultipleField, RadioField,
IntegerField, FloatField, DecimalField,
DateField, DateTimeField, TimeField,
HiddenField, SubmitField
)
from wtforms.validators import (
DataRequired, Email, Length, EqualTo, URL,
NumberRange, Regexp, Optional, InputRequired,
ValidationError
)
class LoginForm(FlaskForm):
username = StringField('Username', validators=[
DataRequired(message='Username is required'),
Length(min=3, max=20, message='Username must be 3-20 characters')
])
password = PasswordField('Password', validators=[
DataRequired(message='Password is required'),
Length(min=6, message='Password must be at least 6 characters')
])
remember = BooleanField('Remember Me')
submit = SubmitField('Login')
class RegistrationForm(FlaskForm):
username = StringField('Username', validators=[
DataRequired(),
Length(min=3, max=20),
Regexp('^[A-Za-z][A-Za-z0-9_.]*$', message='Username must start with a letter')
])
email = StringField('Email', validators=[
DataRequired(),
Email(message='Invalid email address')
])
password = PasswordField('Password', validators=[
DataRequired(),
Length(min=8),
Regexp(
r'^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)',
message='Password must contain uppercase, lowercase and number'
)
])
confirm_password = PasswordField('Confirm Password', validators=[
DataRequired(),
EqualTo('password', message='Passwords must match')
])
accept_terms = BooleanField('I accept the terms', validators=[
DataRequired(message='You must accept the terms')
])
submit = SubmitField('Register')
class ProfileForm(FlaskForm):
bio = TextAreaField('Bio', validators=[
Optional(),
Length(max=500)
])
website = StringField('Website', validators=[
Optional(),
URL(message='Invalid URL')
])
age = IntegerField('Age', validators=[
Optional(),
NumberRange(min=1, max=150)
])
gender = SelectField('Gender', choices=[
('', 'Select...'),
('male', 'Male'),
('female', 'Female'),
('other', 'Other')
])
interests = SelectMultipleField('Interests', choices=[
('tech', 'Technology'),
('sports', 'Sports'),
('music', 'Music'),
('travel', 'Travel')
])
newsletter = RadioField('Newsletter', choices=[
('daily', 'Daily'),
('weekly', 'Weekly'),
('monthly', 'Monthly'),
('never', 'Never')
], default='weekly')
submit = SubmitField('Update Profile')
使用表单
from flask import Flask, render_template, redirect, url_for, flash
@app.route('/login', methods=['GET', 'POST'])
def login():
form = LoginForm()
# validate_on_submit() 检查是否是 POST 请求且验证通过
if form.validate_on_submit():
username = form.username.data
password = form.password.data
remember = form.remember.data
user = User.query.filter_by(username=username).first()
if user and user.check_password(password):
login_user(user, remember=remember)
flash('Login successful!', 'success')
return redirect(url_for('dashboard'))
flash('Invalid username or password', 'error')
return render_template('login.html', form=form)
@app.route('/register', methods=['GET', 'POST'])
def register():
form = RegistrationForm()
if form.validate_on_submit():
user = User(
username=form.username.data,
email=form.email.data
)
user.set_password(form.password.data)
db.session.add(user)
db.session.commit()
flash('Registration successful! Please login.', 'success')
return redirect(url_for('login'))
return render_template('register.html', form=form)
模板渲染
<!-- templates/login.html -->
{% extends 'base.html' %} {% block content %}
<div class="form-container">
<h1>Login</h1>
<form method="POST" novalidate>
{{ form.hidden_tag() }}
<!-- 包含 CSRF token -->
<div class="form-group">
{{ form.username.label(class='form-label') }} {{
form.username(class='form-control' ~ (' is-invalid' if
form.username.errors else '')) }} {% if form.username.errors %}
<div class="invalid-feedback">
{% for error in form.username.errors %}
<span>{{ error }}</span>
{% endfor %}
</div>
{% endif %}
</div>
<div class="form-group">
{{ form.password.label(class='form-label') }} {{
form.password(class='form-control' ~ (' is-invalid' if
form.password.errors else '')) }} {% if form.password.errors %}
<div class="invalid-feedback">
{% for error in form.password.errors %}
<span>{{ error }}</span>
{% endfor %}
</div>
{% endif %}
</div>
<div class="form-check">
{{ form.remember(class='form-check-input') }} {{
form.remember.label(class='form-check-label') }}
</div>
{{ form.submit(class='btn btn-primary') }}
</form>
</div>
{% endblock %}
表单宏(可复用模板)
<!-- templates/macros/forms.html -->
{% macro render_field(field, label_visible=true) %}
<div class="form-group {% if field.errors %}has-error{% endif %}">
{% if label_visible and field.type != 'HiddenField' %} {{
field.label(class='form-label') }} {% endif %} {% if field.type ==
'TextAreaField' %} {{ field(class='form-control', rows=5, **kwargs) }} {% elif
field.type == 'BooleanField' %}
<div class="form-check">
{{ field(class='form-check-input', **kwargs) }} {{
field.label(class='form-check-label') }}
</div>
{% elif field.type == 'RadioField' %} {% for subfield in field %}
<div class="form-check">
{{ subfield(class='form-check-input') }} {{
subfield.label(class='form-check-label') }}
</div>
{% endfor %} {% elif field.type == 'SelectField' %} {{
field(class='form-select', **kwargs) }} {% elif field.type ==
'SelectMultipleField' %} {{ field(class='form-select', multiple=true,
**kwargs) }} {% else %} {{ field(class='form-control', **kwargs) }} {% endif
%} {% if field.description %}
<small class="form-text text-muted">{{ field.description }}</small>
{% endif %} {% if field.errors %}
<ul class="errors">
{% for error in field.errors %}
<li class="text-danger">{{ error }}</li>
{% endfor %}
</ul>
{% endif %}
</div>
{% endmacro %} {% macro render_form(form, action='', method='POST',
enctype=None, submit_text='Submit') %}
<form
method="{{ method }}"
action="{{ action }}"
{%
if
enctype
%}enctype="{{ enctype }}"
{%
endif
%}
novalidate
>
{{ form.hidden_tag() }} {% for field in form %} {% if field.type not in
['HiddenField', 'CSRFTokenField', 'SubmitField'] %} {{ render_field(field) }}
{% endif %} {% endfor %}
<button type="submit" class="btn btn-primary">{{ submit_text }}</button>
</form>
{% endmacro %}
<!-- 使用表单宏 -->
{% from 'macros/forms.html' import render_field, render_form %}
<!-- 方式一:逐字段渲染 -->
<form method="POST" novalidate>
{{ form.hidden_tag() }} {{ render_field(form.username) }} {{
render_field(form.email) }} {{ render_field(form.password) }}
<button type="submit">Submit</button>
</form>
<!-- 方式二:一键渲染整个表单 -->
{{ render_form(form, submit_text='Register') }}
第三章:验证器详解
内置验证器
from wtforms.validators import *
class ExampleForm(FlaskForm):
# DataRequired - 必填字段
name = StringField('Name', validators=[
DataRequired(message='This field is required')
])
# InputRequired - 必须输入(区别于 DataRequired,空字符串也视为无效)
code = StringField('Code', validators=[
InputRequired(message='Please input a code')
])
# Email - 电子邮件格式
email = StringField('Email', validators=[
Email(message='Invalid email format')
])
# Length - 长度限制
username = StringField('Username', validators=[
Length(min=3, max=20, message='Must be 3-20 characters')
])
# EqualTo - 字段匹配
password = PasswordField('Password')
confirm = PasswordField('Confirm', validators=[
EqualTo('password', message='Passwords must match')
])
# NumberRange - 数值范围
age = IntegerField('Age', validators=[
NumberRange(min=1, max=150, message='Age must be 1-150')
])
# URL - URL 格式
website = StringField('Website', validators=[
URL(message='Invalid URL')
])
# UUID - UUID 格式
user_id = StringField('User ID', validators=[
UUID(message='Invalid UUID')
])
# Regexp - 正则表达式
phone = StringField('Phone', validators=[
Regexp(r'^1[3-9]\d{9}$', message='Invalid phone number')
])
# AnyOf - 值在列表中
status = StringField('Status', validators=[
AnyOf(['active', 'inactive', 'pending'], message='Invalid status')
])
# NoneOf - 值不在列表中
word = StringField('Word', validators=[
NoneOf(['admin', 'root'], message='This word is not allowed')
])
# Optional - 可选字段(允许为空)
bio = TextAreaField('Bio', validators=[
Optional(),
Length(max=500)
])
# IPAddress - IP 地址格式
ip = StringField('IP', validators=[
IPAddress(message='Invalid IP address')
])
# MacAddress - MAC 地址格式
mac = StringField('MAC', validators=[
MacAddress(message='Invalid MAC address')
])
自定义验证器
from wtforms.validators import ValidationError
from flask_wtf import FlaskForm
from wtforms import StringField, PasswordField
# 方式一:行内验证器(validate_fieldname)
class RegistrationForm(FlaskForm):
username = StringField('Username', validators=[DataRequired()])
email = StringField('Email', validators=[DataRequired(), Email()])
def validate_username(self, field):
"""验证用户名是否已存在"""
user = User.query.filter_by(username=field.data).first()
if user:
raise ValidationError('Username already exists')
def validate_email(self, field):
"""验证邮箱是否已注册"""
user = User.query.filter_by(email=field.data).first()
if user:
raise ValidationError('Email already registered')
# 方式二:可复用的验证器函数
def username_exists(form, field):
if User.query.filter_by(username=field.data).first():
raise ValidationError('Username already exists')
def email_exists(form, field):
if User.query.filter_by(email=field.data).first():
raise ValidationError('Email already registered')
class RegistrationForm(FlaskForm):
username = StringField('Username', validators=[
DataRequired(),
username_exists
])
email = StringField('Email', validators=[
DataRequired(),
Email(),
email_exists
])
# 方式三:带参数的验证器类
class Unique:
"""检查数据库中是否已存在该值"""
def __init__(self, model, field, message='This value already exists'):
self.model = model
self.field = field
self.message = message
def __call__(self, form, field):
check = self.model.query.filter(
getattr(self.model, self.field) == field.data
).first()
if check:
raise ValidationError(self.message)
class RegistrationForm(FlaskForm):
username = StringField('Username', validators=[
DataRequired(),
Unique(User, 'username', 'Username already taken')
])
email = StringField('Email', validators=[
DataRequired(),
Email(),
Unique(User, 'email', 'Email already registered')
])
# 方式四:带参数的验证器工厂函数
def unique(model, field, message='This value already exists'):
def validator(form, form_field):
check = model.query.filter(
getattr(model, field) == form_field.data
).first()
if check:
raise ValidationError(message)
return validator
class RegistrationForm(FlaskForm):
username = StringField('Username', validators=[
DataRequired(),
unique(User, 'username', 'Username is taken')
])
复杂验证逻辑
from flask_wtf import FlaskForm
from wtforms import StringField, PasswordField, SelectField
from wtforms.validators import DataRequired, ValidationError
import re
class PasswordForm(FlaskForm):
password = PasswordField('Password', validators=[DataRequired()])
confirm = PasswordField('Confirm', validators=[DataRequired()])
def validate_password(self, field):
password = field.data
errors = []
if len(password) < 8:
errors.append('Password must be at least 8 characters')
if not re.search(r'[A-Z]', password):
errors.append('Password must contain uppercase letter')
if not re.search(r'[a-z]', password):
errors.append('Password must contain lowercase letter')
if not re.search(r'\d', password):
errors.append('Password must contain a digit')
if not re.search(r'[!@#$%^&*]', password):
errors.append('Password must contain a special character')
if errors:
raise ValidationError(' | '.join(errors))
def validate_confirm(self, field):
if field.data != self.password.data:
raise ValidationError('Passwords do not match')
# 跨字段验证
class OrderForm(FlaskForm):
start_date = DateField('Start Date', validators=[DataRequired()])
end_date = DateField('End Date', validators=[DataRequired()])
quantity = IntegerField('Quantity', validators=[DataRequired()])
discount = FloatField('Discount', validators=[Optional()])
def validate_end_date(self, field):
if field.data and self.start_date.data:
if field.data < self.start_date.data:
raise ValidationError('End date must be after start date')
def validate_discount(self, field):
if field.data:
if field.data < 0 or field.data > 100:
raise ValidationError('Discount must be 0-100%')
if self.quantity.data and self.quantity.data < 10 and field.data > 10:
raise ValidationError('Discount > 10% requires quantity >= 10')
第四章:高级表单功能
动态表单字段
from flask_wtf import FlaskForm
from wtforms import StringField, SelectField, FieldList, FormField
# 动态选项
class CategoryForm(FlaskForm):
category = SelectField('Category', choices=[])
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# 动态加载选项
self.category.choices = [
(c.id, c.name) for c in Category.query.all()
]
# 动态添加字段
def create_survey_form(questions):
class SurveyForm(FlaskForm):
pass
for i, question in enumerate(questions):
field_name = f'question_{i}'
if question['type'] == 'text':
setattr(SurveyForm, field_name, StringField(question['text']))
elif question['type'] == 'choice':
choices = [(c, c) for c in question['choices']]
setattr(SurveyForm, field_name, SelectField(
question['text'], choices=choices
))
elif question['type'] == 'rating':
choices = [(str(i), str(i)) for i in range(1, 6)]
setattr(SurveyForm, field_name, RadioField(
question['text'], choices=choices
))
return SurveyForm
# 使用
questions = [
{'type': 'text', 'text': 'What is your name?'},
{'type': 'choice', 'text': 'Favorite color?', 'choices': ['Red', 'Blue', 'Green']},
{'type': 'rating', 'text': 'Rate our service'}
]
SurveyForm = create_survey_form(questions)
form = SurveyForm()
嵌套表单
from flask_wtf import FlaskForm
from wtforms import StringField, FormField, FieldList
# 子表单
class AddressForm(FlaskForm):
class Meta:
csrf = False # 子表单不需要 CSRF
street = StringField('Street')
city = StringField('City')
state = StringField('State')
zip_code = StringField('ZIP Code')
class PhoneForm(FlaskForm):
class Meta:
csrf = False
type = SelectField('Type', choices=[
('home', 'Home'),
('work', 'Work'),
('mobile', 'Mobile')
])
number = StringField('Number')
# 主表单
class ContactForm(FlaskForm):
name = StringField('Name', validators=[DataRequired()])
email = StringField('Email', validators=[Email()])
# 单个嵌套表单
address = FormField(AddressForm)
# 多个嵌套表单(列表)
phones = FieldList(FormField(PhoneForm), min_entries=1, max_entries=5)
submit = SubmitField('Save')
<!-- 渲染嵌套表单 -->
<form method="POST">
{{ form.hidden_tag() }} {{ form.name.label() }} {{ form.name() }} {{
form.email.label() }} {{ form.email() }}
<fieldset>
<legend>Address</legend>
{{ form.address.street.label() }} {{ form.address.street() }} {{
form.address.city.label() }} {{ form.address.city() }} {{
form.address.state.label() }} {{ form.address.state() }} {{
form.address.zip_code.label() }} {{ form.address.zip_code() }}
</fieldset>
<fieldset>
<legend>Phone Numbers</legend>
{% for phone in form.phones %}
<div class="phone-entry">
{{ phone.type.label() }} {{ phone.type() }} {{ phone.number.label() }} {{
phone.number() }}
</div>
{% endfor %}
</fieldset>
{{ form.submit() }}
</form>
文件上传表单
from flask_wtf import FlaskForm
from flask_wtf.file import FileField, FileRequired, FileAllowed, FileSize
from wtforms import MultipleFileField
class UploadForm(FlaskForm):
# 单文件上传
avatar = FileField('Avatar', validators=[
FileRequired(message='Please select a file'),
FileAllowed(['jpg', 'png', 'gif'], message='Images only!')
])
# 多文件上传
documents = MultipleFileField('Documents', validators=[
FileAllowed(['pdf', 'doc', 'docx'], message='Documents only!')
])
# 带大小限制
photo = FileField('Photo', validators=[
FileRequired(),
FileAllowed(['jpg', 'png']),
FileSize(max_size=5 * 1024 * 1024, message='File too large (max 5MB)')
])
submit = SubmitField('Upload')
# 处理上传
@app.route('/upload', methods=['GET', 'POST'])
def upload():
form = UploadForm()
if form.validate_on_submit():
# 单文件
avatar = form.avatar.data
avatar_filename = secure_filename(avatar.filename)
avatar.save(os.path.join(app.config['UPLOAD_FOLDER'], avatar_filename))
# 多文件
for doc in form.documents.data:
if doc:
filename = secure_filename(doc.filename)
doc.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
flash('Files uploaded successfully!')
return redirect(url_for('upload'))
return render_template('upload.html', form=form)
<!-- 文件上传表单必须设置 enctype -->
<form method="POST" enctype="multipart/form-data">
{{ form.hidden_tag() }}
<div class="form-group">
{{ form.avatar.label() }} {{ form.avatar(class='form-control-file') }} {%
for error in form.avatar.errors %}
<span class="text-danger">{{ error }}</span>
{% endfor %}
</div>
<div class="form-group">
{{ form.documents.label() }} {{ form.documents(class='form-control-file',
multiple=true) }}
</div>
{{ form.submit(class='btn btn-primary') }}
</form>
AJAX 表单验证
from flask import Flask, jsonify, request
@app.route('/api/validate/username')
def validate_username():
"""AJAX 验证用户名"""
username = request.args.get('username', '')
if len(username) < 3:
return jsonify({'valid': False, 'message': 'Username too short'})
if User.query.filter_by(username=username).first():
return jsonify({'valid': False, 'message': 'Username taken'})
return jsonify({'valid': True, 'message': 'Username available'})
@app.route('/api/validate/email')
def validate_email():
"""AJAX 验证邮箱"""
email = request.args.get('email', '')
import re
if not re.match(r'^[^@]+@[^@]+\.[^@]+$', email):
return jsonify({'valid': False, 'message': 'Invalid email format'})
if User.query.filter_by(email=email).first():
return jsonify({'valid': False, 'message': 'Email already registered'})
return jsonify({'valid': True, 'message': 'Email available'})
# 完整表单 AJAX 提交
@app.route('/api/register', methods=['POST'])
def api_register():
form = RegistrationForm()
if form.validate_on_submit():
user = User(
username=form.username.data,
email=form.email.data
)
user.set_password(form.password.data)
db.session.add(user)
db.session.commit()
return jsonify({
'success': True,
'message': 'Registration successful',
'redirect': url_for('login')
})
# 返回验证错误
errors = {}
for field_name, field_errors in form.errors.items():
errors[field_name] = field_errors[0] # 只取第一个错误
return jsonify({
'success': False,
'errors': errors
}), 400
// 前端 JavaScript
document
.getElementById("registerForm")
.addEventListener("submit", async (e) => {
e.preventDefault();
const formData = new FormData(e.target);
try {
const response = await fetch("/api/register", {
method: "POST",
body: formData,
});
const data = await response.json();
if (data.success) {
alert(data.message);
window.location.href = data.redirect;
} else {
// 显示错误
Object.keys(data.errors).forEach((field) => {
const input = document.querySelector(`[name="${field}"]`);
const errorDiv = input.nextElementSibling;
input.classList.add("is-invalid");
if (errorDiv) {
errorDiv.textContent = data.errors[field];
}
});
}
} catch (error) {
console.error("Error:", error);
}
});
// 实时验证用户名
document.getElementById("username").addEventListener("blur", async (e) => {
const username = e.target.value;
if (username.length < 3) return;
const response = await fetch(`/api/validate/username?username=${username}`);
const data = await response.json();
const feedback = e.target.nextElementSibling;
if (data.valid) {
e.target.classList.remove("is-invalid");
e.target.classList.add("is-valid");
feedback.textContent = data.message;
feedback.classList.remove("invalid-feedback");
feedback.classList.add("valid-feedback");
} else {
e.target.classList.remove("is-valid");
e.target.classList.add("is-invalid");
feedback.textContent = data.message;
feedback.classList.remove("valid-feedback");
feedback.classList.add("invalid-feedback");
}
});
第五章:CSRF 保护
CSRF 工作原理
from flask import Flask
from flask_wtf.csrf import CSRFProtect, CSRFError
app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key'
app.config['WTF_CSRF_TIME_LIMIT'] = 3600 # Token 有效期(秒)
csrf = CSRFProtect(app)
# 处理 CSRF 错误
@app.errorhandler(CSRFError)
def handle_csrf_error(e):
return render_template('errors/csrf_error.html', reason=e.description), 400
在模板中使用
<!-- 方式一:使用 form.hidden_tag() -->
<form method="POST">
{{ form.hidden_tag() }}
<!-- 表单字段 -->
</form>
<!-- 方式二:手动添加 CSRF token -->
<form method="POST">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}" />
<!-- 表单字段 -->
</form>
<!-- 方式三:在 meta 标签中(用于 AJAX) -->
<meta name="csrf-token" content="{{ csrf_token() }}" />
AJAX 请求中的 CSRF
// 从 meta 标签获取 token
const csrfToken = document.querySelector('meta[name="csrf-token"]').content;
// 方式一:在请求头中发送
fetch("/api/data", {
method: "POST",
headers: {
"Content-Type": "application/json",
"X-CSRFToken": csrfToken,
},
body: JSON.stringify(data),
});
// 方式二:在请求体中发送
const formData = new FormData();
formData.append("csrf_token", csrfToken);
formData.append("data", JSON.stringify(data));
fetch("/api/data", {
method: "POST",
body: formData,
});
// 全局设置(jQuery)
$.ajaxSetup({
beforeSend: function (xhr, settings) {
if (!/^(GET|HEAD|OPTIONS|TRACE)$/i.test(settings.type)) {
xhr.setRequestHeader("X-CSRFToken", csrfToken);
}
},
});
// 全局设置(Axios)
axios.defaults.headers.common["X-CSRFToken"] = csrfToken;
豁免 CSRF 保护
from flask_wtf.csrf import CSRFProtect
csrf = CSRFProtect(app)
# 豁免特定视图
@app.route('/webhook', methods=['POST'])
@csrf.exempt
def webhook():
# 第三方回调,不需要 CSRF
return process_webhook(request.json)
# 豁免整个蓝图
from flask import Blueprint
api = Blueprint('api', __name__)
@csrf.exempt
@api.route('/data', methods=['POST'])
def api_data():
return jsonify(data)
# 使用装饰器豁免
from flask_wtf.csrf import CSRFProtect
csrf = CSRFProtect()
@app.route('/external-api', methods=['POST'])
@csrf.exempt
def external_api():
return 'OK'
第六章:表单安全最佳实践
输入验证
import re
import bleach
from wtforms.validators import ValidationError
class SecureForm(FlaskForm):
# 1. 白名单验证
status = SelectField('Status', choices=[
('draft', 'Draft'),
('published', 'Published'),
('archived', 'Archived')
])
# 2. 格式验证
phone = StringField('Phone', validators=[
Regexp(r'^\+?1?\d{10,14}$', message='Invalid phone number')
])
# 3. 长度限制
title = StringField('Title', validators=[
Length(min=1, max=200)
])
# 4. HTML 过滤
content = TextAreaField('Content')
def validate_content(self, field):
# 使用 bleach 过滤危险 HTML
allowed_tags = ['p', 'br', 'strong', 'em', 'a', 'ul', 'ol', 'li']
allowed_attrs = {'a': ['href', 'title']}
field.data = bleach.clean(
field.data,
tags=allowed_tags,
attributes=allowed_attrs,
strip=True
)
# 5. SQL 注入防护(使用参数化查询)
def validate_username(self, field):
# 不要这样做:
# User.query.filter(f"username = '{field.data}'")
#
# 正确方式(参数化):
user = User.query.filter_by(username=field.data).first()
防止重复提交
from flask import Flask, session, redirect, url_for
import uuid
app = Flask(__name__)
def generate_form_token():
"""生成表单令牌"""
token = str(uuid.uuid4())
session['form_token'] = token
return token
def validate_form_token(token):
"""验证并消费表单令牌"""
stored_token = session.pop('form_token', None)
return stored_token and stored_token == token
class OrderForm(FlaskForm):
form_token = HiddenField()
# ... 其他字段
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if not self.form_token.data:
self.form_token.data = generate_form_token()
@app.route('/order', methods=['GET', 'POST'])
def create_order():
form = OrderForm()
if form.validate_on_submit():
if not validate_form_token(form.form_token.data):
flash('Form already submitted', 'error')
return redirect(url_for('create_order'))
# 处理订单
order = create_order_from_form(form)
# PRG 模式:Post-Redirect-Get
return redirect(url_for('order_success', order_id=order.id))
return render_template('order.html', form=form)
文件上传安全
import os
import magic
from werkzeug.utils import secure_filename
from flask_wtf.file import FileField, FileRequired, FileAllowed
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'pdf'}
ALLOWED_MIMETYPES = {
'image/png', 'image/jpeg', 'image/gif', 'application/pdf'
}
def validate_file(file):
"""安全验证上传文件"""
# 1. 检查文件名
filename = secure_filename(file.filename)
if not filename:
raise ValueError('Invalid filename')
# 2. 检查扩展名
ext = filename.rsplit('.', 1)[-1].lower() if '.' in filename else ''
if ext not in ALLOWED_EXTENSIONS:
raise ValueError('File extension not allowed')
# 3. 检查 MIME 类型(使用 python-magic)
file.seek(0)
mime = magic.from_buffer(file.read(2048), mime=True)
file.seek(0)
if mime not in ALLOWED_MIMETYPES:
raise ValueError('File type not allowed')
# 4. 检查文件大小(已通过 MAX_CONTENT_LENGTH 限制)
return True
@app.route('/upload', methods=['POST'])
def upload():
file = request.files.get('file')
try:
validate_file(file)
except ValueError as e:
return jsonify({'error': str(e)}), 400
# 生成安全的文件名
filename = secure_filename(file.filename)
unique_name = f"{uuid.uuid4().hex}_{filename}"
# 保存到安全目录(不在 Web 根目录)
filepath = os.path.join(app.config['UPLOAD_FOLDER'], unique_name)
file.save(filepath)
return jsonify({'filename': unique_name})
速率限制
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
limiter = Limiter(
app,
key_func=get_remote_address,
default_limits=["200 per day", "50 per hour"]
)
@app.route('/login', methods=['POST'])
@limiter.limit("5 per minute") # 每分钟最多 5 次
def login():
form = LoginForm()
if form.validate_on_submit():
# 登录逻辑
pass
return render_template('login.html', form=form)
@app.route('/register', methods=['POST'])
@limiter.limit("3 per hour") # 每小时最多 3 次
def register():
form = RegistrationForm()
if form.validate_on_submit():
# 注册逻辑
pass
return render_template('register.html', form=form)
第七章:实战表单示例
完整用户注册表单
# forms.py
from flask_wtf import FlaskForm
from flask_wtf.file import FileField, FileAllowed
from wtforms import (
StringField, PasswordField, BooleanField,
SelectField, TextAreaField, DateField, SubmitField
)
from wtforms.validators import (
DataRequired, Email, Length, EqualTo, Regexp, Optional, ValidationError
)
from models import User
class RegistrationForm(FlaskForm):
username = StringField('Username', validators=[
DataRequired('Please enter a username'),
Length(min=3, max=20, message='Username must be 3-20 characters'),
Regexp(
'^[A-Za-z][A-Za-z0-9_]*$',
message='Username must start with a letter and contain only letters, numbers, and underscores'
)
])
email = StringField('Email', validators=[
DataRequired('Please enter your email'),
Email('Please enter a valid email address')
])
password = PasswordField('Password', validators=[
DataRequired('Please enter a password'),
Length(min=8, message='Password must be at least 8 characters')
])
confirm_password = PasswordField('Confirm Password', validators=[
DataRequired('Please confirm your password'),
EqualTo('password', message='Passwords must match')
])
first_name = StringField('First Name', validators=[
Optional(),
Length(max=50)
])
last_name = StringField('Last Name', validators=[
Optional(),
Length(max=50)
])
birth_date = DateField('Birth Date', validators=[Optional()])
gender = SelectField('Gender', choices=[
('', 'Prefer not to say'),
('male', 'Male'),
('female', 'Female'),
('other', 'Other')
], validators=[Optional()])
bio = TextAreaField('Bio', validators=[
Optional(),
Length(max=500, message='Bio must not exceed 500 characters')
])
avatar = FileField('Profile Picture', validators=[
Optional(),
FileAllowed(['jpg', 'png', 'gif'], 'Images only!')
])
accept_terms = BooleanField('I accept the Terms of Service', validators=[
DataRequired('You must accept the terms to register')
])
newsletter = BooleanField('Subscribe to newsletter')
submit = SubmitField('Create Account')
def validate_username(self, field):
if User.query.filter_by(username=field.data.lower()).first():
raise ValidationError('This username is already taken')
def validate_email(self, field):
if User.query.filter_by(email=field.data.lower()).first():
raise ValidationError('This email is already registered')
def validate_password(self, field):
password = field.data
errors = []
if not any(c.isupper() for c in password):
errors.append('at least one uppercase letter')
if not any(c.islower() for c in password):
errors.append('at least one lowercase letter')
if not any(c.isdigit() for c in password):
errors.append('at least one number')
if errors:
raise ValidationError(f'Password must contain {", ".join(errors)}')
# views.py
from flask import render_template, redirect, url_for, flash
from forms import RegistrationForm
from models import User, db
@app.route('/register', methods=['GET', 'POST'])
def register():
if current_user.is_authenticated:
return redirect(url_for('dashboard'))
form = RegistrationForm()
if form.validate_on_submit():
user = User(
username=form.username.data.lower(),
email=form.email.data.lower(),
first_name=form.first_name.data,
last_name=form.last_name.data,
birth_date=form.birth_date.data,
gender=form.gender.data,
bio=form.bio.data,
newsletter=form.newsletter.data
)
user.set_password(form.password.data)
# 处理头像上传
if form.avatar.data:
filename = save_avatar(form.avatar.data)
user.avatar = filename
db.session.add(user)
db.session.commit()
# 发送验证邮件
send_verification_email(user)
flash('Registration successful! Please check your email to verify your account.', 'success')
return redirect(url_for('login'))
return render_template('auth/register.html', form=form)
多步骤表单向导
# forms.py
class Step1Form(FlaskForm):
"""基本信息"""
email = StringField('Email', validators=[DataRequired(), Email()])
username = StringField('Username', validators=[DataRequired(), Length(3, 20)])
next_step = SubmitField('Next')
class Step2Form(FlaskForm):
"""密码设置"""
password = PasswordField('Password', validators=[DataRequired(), Length(8)])
confirm = PasswordField('Confirm', validators=[EqualTo('password')])
prev_step = SubmitField('Back')
next_step = SubmitField('Next')
class Step3Form(FlaskForm):
"""个人资料"""
full_name = StringField('Full Name', validators=[Optional()])
bio = TextAreaField('Bio', validators=[Optional(), Length(max=500)])
prev_step = SubmitField('Back')
submit = SubmitField('Complete Registration')
# views.py
@app.route('/register/wizard', methods=['GET', 'POST'])
def register_wizard():
# 从 session 获取当前步骤和数据
step = session.get('register_step', 1)
form_data = session.get('register_data', {})
if step == 1:
form = Step1Form(data=form_data)
if form.validate_on_submit():
session['register_data'] = {
**form_data,
'email': form.email.data,
'username': form.username.data
}
session['register_step'] = 2
return redirect(url_for('register_wizard'))
elif step == 2:
form = Step2Form(data=form_data)
if 'prev_step' in request.form:
session['register_step'] = 1
return redirect(url_for('register_wizard'))
if form.validate_on_submit():
session['register_data'] = {
**form_data,
'password': form.password.data
}
session['register_step'] = 3
return redirect(url_for('register_wizard'))
elif step == 3:
form = Step3Form(data=form_data)
if 'prev_step' in request.form:
session['register_step'] = 2
return redirect(url_for('register_wizard'))
if form.validate_on_submit():
# 创建用户
data = session.get('register_data', {})
user = User(
email=data['email'],
username=data['username'],
full_name=form.full_name.data,
bio=form.bio.data
)
user.set_password(data['password'])
db.session.add(user)
db.session.commit()
# 清理 session
session.pop('register_step', None)
session.pop('register_data', None)
flash('Registration complete!', 'success')
return redirect(url_for('login'))
return render_template(
f'register/step{step}.html',
form=form,
step=step
)
总结
本章详细介绍了 Flask 表单处理:
- 原生表单处理:使用 request 对象处理表单数据
- Flask-WTF:表单类定义、验证、渲染
- 验证器:内置验证器、自定义验证器、跨字段验证
- 高级功能:动态表单、嵌套表单、文件上传
- CSRF 保护:配置、模板使用、AJAX 处理
- 安全最佳实践:输入验证、防重复提交、文件安全
- 实战示例:完整注册表单、多步骤向导
下一章我们将学习数据库操作与 SQLAlchemy。