测试与调试
2026/3/20大约 10 分钟
测试与调试
第一章:测试基础
测试类型
- 单元测试:测试最小代码单元(函数、方法)
- 集成测试:测试多个组件的协作
- 功能测试:测试完整的用户场景
- 端到端测试:测试整个应用流程
pytest 安装与配置
pip install pytest pytest-cov pytest-flask
# pytest.ini
[pytest]
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*
addopts = -v --tb=short
# conftest.py
import pytest
from app import create_app, db
from app.models import User
@pytest.fixture
def app():
"""创建测试应用"""
app = create_app('testing')
with app.app_context():
db.create_all()
yield app
db.session.remove()
db.drop_all()
@pytest.fixture
def client(app):
"""创建测试客户端"""
return app.test_client()
@pytest.fixture
def runner(app):
"""创建 CLI 测试运行器"""
return app.test_cli_runner()
@pytest.fixture
def auth_client(app, client):
"""创建已认证的测试客户端"""
with app.app_context():
user = User(username='testuser', email='test@example.com')
user.set_password('password123')
db.session.add(user)
db.session.commit()
# 登录
client.post('/auth/login', data={
'username': 'testuser',
'password': 'password123'
})
return client
第二章:Flask 测试客户端
基础请求测试
# tests/test_basic.py
def test_index(client):
"""测试首页"""
response = client.get('/')
assert response.status_code == 200
assert b'Welcome' in response.data
def test_not_found(client):
"""测试 404"""
response = client.get('/nonexistent')
assert response.status_code == 404
def test_redirect(client):
"""测试重定向"""
response = client.get('/old-url')
assert response.status_code == 302
assert response.location.endswith('/new-url')
# 跟随重定向
response = client.get('/old-url', follow_redirects=True)
assert response.status_code == 200
POST 请求测试
def test_login_success(client):
"""测试登录成功"""
# 先创建用户
with client.application.app_context():
user = User(username='alice', email='alice@example.com')
user.set_password('password123')
db.session.add(user)
db.session.commit()
# 登录
response = client.post('/auth/login', data={
'username': 'alice',
'password': 'password123'
}, follow_redirects=True)
assert response.status_code == 200
assert b'Dashboard' in response.data
def test_login_failure(client):
"""测试登录失败"""
response = client.post('/auth/login', data={
'username': 'nonexistent',
'password': 'wrongpassword'
})
assert response.status_code == 200
assert b'Invalid username or password' in response.data
def test_create_post(auth_client):
"""测试创建文章"""
response = auth_client.post('/posts/create', data={
'title': 'Test Post',
'content': 'This is test content.'
}, follow_redirects=True)
assert response.status_code == 200
assert b'Test Post' in response.data
JSON API 测试
def test_api_get_users(client):
"""测试获取用户列表 API"""
response = client.get('/api/users')
assert response.status_code == 200
assert response.content_type == 'application/json'
data = response.get_json()
assert 'users' in data
assert isinstance(data['users'], list)
def test_api_create_user(client):
"""测试创建用户 API"""
response = client.post('/api/users', json={
'username': 'newuser',
'email': 'new@example.com',
'password': 'password123'
})
assert response.status_code == 201
data = response.get_json()
assert data['username'] == 'newuser'
assert 'id' in data
def test_api_with_auth(client):
"""测试需要认证的 API"""
# 先登录获取 token
login_response = client.post('/api/auth/login', json={
'username': 'testuser',
'password': 'password123'
})
token = login_response.get_json()['access_token']
# 带 token 请求
response = client.get('/api/profile', headers={
'Authorization': f'Bearer {token}'
})
assert response.status_code == 200
def test_api_validation_error(client):
"""测试数据验证错误"""
response = client.post('/api/users', json={
'username': 'a', # 太短
'email': 'invalid-email' # 无效格式
})
assert response.status_code == 400
data = response.get_json()
assert 'errors' in data
文件上传测试
import io
def test_file_upload(auth_client):
"""测试文件上传"""
data = {
'file': (io.BytesIO(b'test file content'), 'test.txt')
}
response = auth_client.post(
'/upload',
data=data,
content_type='multipart/form-data'
)
assert response.status_code == 200
assert b'uploaded' in response.data
def test_image_upload(auth_client):
"""测试图片上传"""
# 创建一个简单的 PNG 图片
png_content = b'\x89PNG\r\n\x1a\n...' # 简化的 PNG 头
data = {
'image': (io.BytesIO(png_content), 'test.png', 'image/png')
}
response = auth_client.post(
'/upload/image',
data=data,
content_type='multipart/form-data'
)
assert response.status_code == 200
第三章:模型测试
基础模型测试
# tests/test_models.py
import pytest
from app.models import User, Post
from app import db
class TestUserModel:
def test_password_hashing(self, app):
"""测试密码哈希"""
with app.app_context():
user = User(username='test', email='test@example.com')
user.set_password('password123')
assert user.password_hash is not None
assert user.password_hash != 'password123'
assert user.check_password('password123')
assert not user.check_password('wrongpassword')
def test_user_creation(self, app):
"""测试用户创建"""
with app.app_context():
user = User(
username='alice',
email='alice@example.com'
)
user.set_password('password')
db.session.add(user)
db.session.commit()
assert user.id is not None
assert User.query.count() == 1
def test_unique_username(self, app):
"""测试用户名唯一性"""
with app.app_context():
user1 = User(username='alice', email='alice1@example.com')
user1.set_password('password')
db.session.add(user1)
db.session.commit()
user2 = User(username='alice', email='alice2@example.com')
user2.set_password('password')
db.session.add(user2)
with pytest.raises(Exception): # IntegrityError
db.session.commit()
def test_user_repr(self, app):
"""测试用户字符串表示"""
with app.app_context():
user = User(username='test', email='test@example.com')
assert '<User test>' in repr(user)
class TestPostModel:
def test_create_post(self, app):
"""测试创建文章"""
with app.app_context():
user = User(username='author', email='author@example.com')
user.set_password('password')
db.session.add(user)
db.session.commit()
post = Post(
title='Test Post',
content='Test content',
author=user
)
db.session.add(post)
db.session.commit()
assert post.id is not None
assert post.author.username == 'author'
def test_user_posts_relationship(self, app):
"""测试用户与文章关系"""
with app.app_context():
user = User(username='author', email='author@example.com')
user.set_password('password')
db.session.add(user)
post1 = Post(title='Post 1', content='Content 1', author=user)
post2 = Post(title='Post 2', content='Content 2', author=user)
db.session.add_all([post1, post2])
db.session.commit()
assert user.posts.count() == 2
复杂查询测试
class TestUserQueries:
@pytest.fixture
def sample_users(self, app):
"""创建测试用户"""
with app.app_context():
users = [
User(username=f'user{i}', email=f'user{i}@example.com', is_active=i % 2 == 0)
for i in range(10)
]
for u in users:
u.set_password('password')
db.session.add_all(users)
db.session.commit()
return users
def test_filter_active_users(self, app, sample_users):
"""测试过滤活跃用户"""
with app.app_context():
active_users = User.query.filter_by(is_active=True).all()
assert len(active_users) == 5
def test_search_users(self, app, sample_users):
"""测试搜索用户"""
with app.app_context():
results = User.query.filter(
User.username.ilike('%user1%')
).all()
assert len(results) == 1
def test_pagination(self, app, sample_users):
"""测试分页"""
with app.app_context():
page1 = User.query.paginate(page=1, per_page=5)
assert page1.total == 10
assert page1.pages == 2
assert len(page1.items) == 5
assert page1.has_next
assert not page1.has_prev
第四章:视图测试
认证测试
# tests/test_auth.py
class TestAuth:
def test_login_page(self, client):
"""测试登录页面显示"""
response = client.get('/auth/login')
assert response.status_code == 200
assert b'Login' in response.data
def test_register(self, client):
"""测试用户注册"""
response = client.post('/auth/register', data={
'username': 'newuser',
'email': 'new@example.com',
'password': 'password123',
'confirm_password': 'password123'
}, follow_redirects=True)
assert response.status_code == 200
assert b'Registration successful' in response.data
def test_register_duplicate_email(self, client, app):
"""测试重复邮箱注册"""
with app.app_context():
user = User(username='existing', email='existing@example.com')
user.set_password('password')
db.session.add(user)
db.session.commit()
response = client.post('/auth/register', data={
'username': 'newuser',
'email': 'existing@example.com',
'password': 'password123',
'confirm_password': 'password123'
})
assert b'Email already registered' in response.data
def test_logout(self, auth_client):
"""测试登出"""
response = auth_client.get('/auth/logout', follow_redirects=True)
assert response.status_code == 200
def test_protected_route(self, client):
"""测试受保护路由"""
response = client.get('/dashboard')
assert response.status_code == 302 # 重定向到登录页
def test_protected_route_authenticated(self, auth_client):
"""测试认证后访问受保护路由"""
response = auth_client.get('/dashboard')
assert response.status_code == 200
表单测试
# tests/test_forms.py
from app.forms import LoginForm, RegistrationForm
class TestLoginForm:
def test_valid_form(self, app):
"""测试有效的登录表单"""
with app.test_request_context():
form = LoginForm(data={
'username': 'testuser',
'password': 'password123'
})
assert form.validate()
def test_empty_username(self, app):
"""测试空用户名"""
with app.test_request_context():
form = LoginForm(data={
'username': '',
'password': 'password123'
})
assert not form.validate()
assert 'username' in form.errors
def test_short_password(self, app):
"""测试过短密码"""
with app.test_request_context():
form = LoginForm(data={
'username': 'testuser',
'password': '123'
})
assert not form.validate()
assert 'password' in form.errors
class TestRegistrationForm:
def test_password_mismatch(self, app):
"""测试密码不匹配"""
with app.test_request_context():
form = RegistrationForm(data={
'username': 'testuser',
'email': 'test@example.com',
'password': 'password123',
'confirm_password': 'different'
})
assert not form.validate()
assert 'confirm_password' in form.errors
def test_invalid_email(self, app):
"""测试无效邮箱"""
with app.test_request_context():
form = RegistrationForm(data={
'username': 'testuser',
'email': 'invalid-email',
'password': 'password123',
'confirm_password': 'password123'
})
assert not form.validate()
assert 'email' in form.errors
第五章:API 测试
RESTful API 测试
# tests/test_api.py
import pytest
class TestUserAPI:
@pytest.fixture
def api_token(self, client, app):
"""获取 API 令牌"""
with app.app_context():
user = User(username='apiuser', email='api@example.com')
user.set_password('password123')
db.session.add(user)
db.session.commit()
response = client.post('/api/auth/login', json={
'username': 'apiuser',
'password': 'password123'
})
return response.get_json()['access_token']
def test_get_users_list(self, client, api_token):
"""测试获取用户列表"""
response = client.get('/api/users', headers={
'Authorization': f'Bearer {api_token}'
})
assert response.status_code == 200
data = response.get_json()
assert 'users' in data
assert 'pagination' in data
def test_get_single_user(self, client, api_token, app):
"""测试获取单个用户"""
with app.app_context():
user = User.query.first()
user_id = user.id
response = client.get(f'/api/users/{user_id}', headers={
'Authorization': f'Bearer {api_token}'
})
assert response.status_code == 200
data = response.get_json()
assert data['id'] == user_id
def test_create_user(self, client, api_token):
"""测试创建用户"""
response = client.post('/api/users', json={
'username': 'newuser',
'email': 'new@example.com',
'password': 'password123'
}, headers={
'Authorization': f'Bearer {api_token}'
})
assert response.status_code == 201
data = response.get_json()
assert data['username'] == 'newuser'
def test_update_user(self, client, api_token, app):
"""测试更新用户"""
with app.app_context():
user = User.query.first()
user_id = user.id
response = client.put(f'/api/users/{user_id}', json={
'username': 'updated_name'
}, headers={
'Authorization': f'Bearer {api_token}'
})
assert response.status_code == 200
data = response.get_json()
assert data['username'] == 'updated_name'
def test_delete_user(self, client, api_token, app):
"""测试删除用户"""
with app.app_context():
user = User(username='todelete', email='delete@example.com')
user.set_password('password')
db.session.add(user)
db.session.commit()
user_id = user.id
response = client.delete(f'/api/users/{user_id}', headers={
'Authorization': f'Bearer {api_token}'
})
assert response.status_code == 204
def test_unauthorized_access(self, client):
"""测试未授权访问"""
response = client.get('/api/users')
assert response.status_code == 401
def test_invalid_token(self, client):
"""测试无效令牌"""
response = client.get('/api/users', headers={
'Authorization': 'Bearer invalid_token'
})
assert response.status_code == 401
API 错误处理测试
class TestAPIErrors:
def test_validation_error(self, client, api_token):
"""测试数据验证错误"""
response = client.post('/api/users', json={
'username': 'a', # 太短
'email': 'invalid' # 无效格式
}, headers={
'Authorization': f'Bearer {api_token}'
})
assert response.status_code == 400
data = response.get_json()
assert 'errors' in data
def test_not_found_error(self, client, api_token):
"""测试资源不存在"""
response = client.get('/api/users/99999', headers={
'Authorization': f'Bearer {api_token}'
})
assert response.status_code == 404
data = response.get_json()
assert 'error' in data
def test_conflict_error(self, client, api_token, app):
"""测试资源冲突"""
with app.app_context():
user = User(username='existing', email='existing@example.com')
user.set_password('password')
db.session.add(user)
db.session.commit()
response = client.post('/api/users', json={
'username': 'existing',
'email': 'new@example.com',
'password': 'password123'
}, headers={
'Authorization': f'Bearer {api_token}'
})
assert response.status_code in [400, 409]
第六章:Mock 与测试替身
使用 unittest.mock
from unittest.mock import Mock, patch, MagicMock
def test_send_email_called(client, app):
"""测试邮件发送被调用"""
with patch('app.email.send_email') as mock_send:
mock_send.return_value = True
response = client.post('/auth/register', data={
'username': 'newuser',
'email': 'new@example.com',
'password': 'password123',
'confirm_password': 'password123'
})
mock_send.assert_called_once()
call_args = mock_send.call_args[0]
assert call_args[0] == 'new@example.com'
def test_external_api_call(client):
"""测试外部 API 调用"""
with patch('requests.get') as mock_get:
mock_get.return_value = Mock(
status_code=200,
json=lambda: {'data': 'mocked'}
)
response = client.get('/api/external-data')
assert response.status_code == 200
assert response.get_json()['data'] == 'mocked'
def test_database_error(client, app):
"""测试数据库错误处理"""
with patch.object(db.session, 'commit') as mock_commit:
mock_commit.side_effect = Exception('Database error')
response = client.post('/api/users', json={
'username': 'newuser',
'email': 'new@example.com',
'password': 'password123'
})
assert response.status_code == 500
Factory Boy 测试数据工厂
pip install factory-boy
# tests/factories.py
import factory
from factory.alchemy import SQLAlchemyModelFactory
from app import db
from app.models import User, Post
class UserFactory(SQLAlchemyModelFactory):
class Meta:
model = User
sqlalchemy_session = db.session
username = factory.Sequence(lambda n: f'user{n}')
email = factory.LazyAttribute(lambda obj: f'{obj.username}@example.com')
is_active = True
@factory.lazy_attribute
def password_hash(self):
from werkzeug.security import generate_password_hash
return generate_password_hash('password123')
class PostFactory(SQLAlchemyModelFactory):
class Meta:
model = Post
sqlalchemy_session = db.session
title = factory.Faker('sentence')
content = factory.Faker('paragraph')
author = factory.SubFactory(UserFactory)
# 使用
def test_with_factory(app):
with app.app_context():
user = UserFactory()
posts = PostFactory.create_batch(5, author=user)
assert user.posts.count() == 5
Faker 测试数据生成
from faker import Faker
fake = Faker('zh_CN')
@pytest.fixture
def fake_user_data():
return {
'username': fake.user_name(),
'email': fake.email(),
'password': fake.password(length=12)
}
def test_create_user_with_fake_data(client, fake_user_data):
response = client.post('/api/users', json={
**fake_user_data,
'password': 'password123'
})
assert response.status_code == 201
第七章:测试覆盖率
配置覆盖率
# .coveragerc
[run]
source = app
omit =
app/__init__.py
app/config.py
*/__pycache__/*
*/tests/*
[report]
exclude_lines =
pragma: no cover
def __repr__
raise NotImplementedError
if __name__ == .__main__.:
[html]
directory = htmlcov
运行覆盖率测试
# 运行测试并生成覆盖率报告
pytest --cov=app --cov-report=html --cov-report=term-missing
# 查看覆盖率摘要
pytest --cov=app --cov-report=term
# 设置覆盖率阈值
pytest --cov=app --cov-fail-under=80
覆盖率报告分析
# 确保关键路径被测试
def test_all_routes_covered(client, app):
"""测试所有路由都被访问"""
routes_tested = []
for rule in app.url_map.iter_rules():
if 'GET' in rule.methods and not rule.endpoint.startswith('static'):
routes_tested.append(rule.rule)
# 确保至少有一个测试覆盖每个路由
# 这里可以添加具体的断言
第八章:调试技巧
Flask 调试模式
# 开发环境启用调试
app.run(debug=True)
# 或通过配置
app.config['DEBUG'] = True
# 环境变量
# export FLASK_DEBUG=1
Flask Debug Toolbar
pip install flask-debugtoolbar
from flask import Flask
from flask_debugtoolbar import DebugToolbarExtension
app = Flask(__name__)
app.config['DEBUG'] = True
app.config['SECRET_KEY'] = 'development-key'
app.config['DEBUG_TB_INTERCEPT_REDIRECTS'] = False
toolbar = DebugToolbarExtension(app)
日志调试
import logging
from logging.handlers import RotatingFileHandler
def configure_logging(app):
# 控制台日志
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)
console_handler.setFormatter(logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
))
# 文件日志
file_handler = RotatingFileHandler(
'logs/debug.log',
maxBytes=10240,
backupCount=10
)
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(logging.Formatter(
'%(asctime)s %(levelname)s: %(message)s [in %(pathname)s:%(lineno)d]'
))
app.logger.addHandler(console_handler)
app.logger.addHandler(file_handler)
app.logger.setLevel(logging.DEBUG)
# 使用日志
@app.route('/debug')
def debug_route():
app.logger.debug('Debug message')
app.logger.info('Info message')
app.logger.warning('Warning message')
app.logger.error('Error message')
return 'Check logs'
pdb 调试
# 在代码中设置断点
@app.route('/debug')
def debug_route():
data = get_some_data()
import pdb; pdb.set_trace() # 断点
return process_data(data)
# 使用 breakpoint() (Python 3.7+)
@app.route('/debug')
def debug_route():
data = get_some_data()
breakpoint() # 断点
return process_data(data)
# 运行时指定
# python -m pdb run.py
# pdb 常用命令
# n - 下一行
# s - 进入函数
# c - 继续执行
# p variable - 打印变量
# l - 查看代码
# q - 退出
请求上下文调试
# 在 Shell 中调试
# flask shell
from app import create_app, db
from app.models import User
app = create_app()
ctx = app.app_context()
ctx.push()
# 现在可以使用应用上下文
users = User.query.all()
# 完成后清理
ctx.pop()
性能分析
from flask import Flask
from werkzeug.middleware.profiler import ProfilerMiddleware
app = Flask(__name__)
# 添加性能分析中间件
app.wsgi_app = ProfilerMiddleware(
app.wsgi_app,
restrictions=[30], # 只显示前 30 个
profile_dir='profiles'
)
# 使用 line_profiler
# pip install line_profiler
from line_profiler import profile
@profile
def slow_function():
# 函数代码
pass
# 使用 memory_profiler
# pip install memory_profiler
from memory_profiler import profile
@profile
def memory_heavy_function():
# 函数代码
pass
第九章:持续集成
GitHub Actions
# .github/workflows/tests.yml
name: Tests
on:
push:
branches: [main, develop]
pull_request:
branches: [main]
jobs:
test:
runs-on: ubuntu-latest
services:
postgres:
image: postgres:13
env:
POSTGRES_USER: test
POSTGRES_PASSWORD: test
POSTGRES_DB: test
ports:
- 5432:5432
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: "3.11"
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements/dev.txt
- name: Run tests
env:
DATABASE_URL: postgresql://test:test@localhost:5432/test
SECRET_KEY: test-secret-key
run: |
pytest --cov=app --cov-report=xml
- name: Upload coverage
uses: codecov/codecov-action@v3
with:
file: ./coverage.xml
测试配置
# config.py
class TestingConfig(Config):
TESTING = True
SQLALCHEMY_DATABASE_URI = 'sqlite:///:memory:'
WTF_CSRF_ENABLED = False
SERVER_NAME = 'localhost'
# 禁用邮件发送
MAIL_SUPPRESS_SEND = True
# 禁用缓存
CACHE_TYPE = 'null'
第十章:测试最佳实践
测试组织
tests/
├── conftest.py # 共享 fixtures
├── factories.py # 测试数据工厂
├── unit/ # 单元测试
│ ├── test_models.py
│ ├── test_forms.py
│ └── test_utils.py
├── integration/ # 集成测试
│ ├── test_auth.py
│ ├── test_views.py
│ └── test_api.py
└── e2e/ # 端到端测试
├── test_user_flow.py
└── test_checkout.py
测试命名规范
# 使用描述性名称
def test_user_can_login_with_valid_credentials():
pass
def test_user_cannot_login_with_invalid_password():
pass
def test_creating_post_requires_authentication():
pass
# 使用类组织相关测试
class TestUserAuthentication:
def test_login_success(self):
pass
def test_login_failure(self):
pass
def test_logout(self):
pass
AAA 模式
def test_create_user():
# Arrange - 准备
user_data = {
'username': 'testuser',
'email': 'test@example.com',
'password': 'password123'
}
# Act - 执行
response = client.post('/api/users', json=user_data)
# Assert - 断言
assert response.status_code == 201
assert response.get_json()['username'] == 'testuser'
测试隔离
@pytest.fixture(autouse=True)
def clean_db(app):
"""每个测试后清理数据库"""
yield
with app.app_context():
db.session.remove()
db.drop_all()
db.create_all()
@pytest.fixture
def isolated_client(app):
"""隔离的测试客户端"""
with app.test_client() as client:
with app.app_context():
db.create_all()
yield client
with app.app_context():
db.drop_all()
总结
本章详细介绍了 Flask 测试与调试:
- 测试基础:pytest 配置、fixtures
- 测试客户端:请求测试、表单测试、文件上传
- 模型测试:CRUD 操作、关系、查询
- 视图测试:认证、表单验证
- API 测试:RESTful API、错误处理
- Mock:unittest.mock、Factory Boy、Faker
- 覆盖率:配置、报告、阈值
- 调试:Debug Toolbar、日志、pdb、性能分析
- 持续集成:GitHub Actions
- 最佳实践:组织、命名、AAA 模式
下一章我们将学习部署与生产环境。