数据库操作
2026/3/20大约 12 分钟
数据库操作
第一章:Flask-SQLAlchemy 入门
什么是 SQLAlchemy?
SQLAlchemy 是 Python 中最流行的 ORM(对象关系映射)库。Flask-SQLAlchemy 是 SQLAlchemy 的 Flask 扩展,简化了配置和使用。
安装与配置
pip install flask-sqlalchemy
# 可选:安装数据库驱动
pip install pymysql # MySQL
pip install psycopg2-binary # PostgreSQL
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
# 数据库配置
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///app.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SQLALCHEMY_ECHO'] = True # 调试:打印 SQL 语句
db = SQLAlchemy(app)
数据库 URI 格式
# SQLite
SQLALCHEMY_DATABASE_URI = 'sqlite:///app.db' # 相对路径
SQLALCHEMY_DATABASE_URI = 'sqlite:////absolute/path/to/app.db' # 绝对路径
SQLALCHEMY_DATABASE_URI = 'sqlite:///:memory:' # 内存数据库
# MySQL
SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://user:password@localhost:3306/dbname'
# 带字符集
SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://user:password@localhost/dbname?charset=utf8mb4'
# PostgreSQL
SQLALCHEMY_DATABASE_URI = 'postgresql://user:password@localhost:5432/dbname'
# 使用 psycopg2
SQLALCHEMY_DATABASE_URI = 'postgresql+psycopg2://user:password@localhost/dbname'
# 从环境变量读取
import os
SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL') or 'sqlite:///app.db'
使用工厂函数
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
def create_app(config_name='default'):
    """Application factory: build, configure and return a Flask app.

    Args:
        config_name: Key used to look up the settings object in ``config``.

    Returns:
        The Flask application with the database extension initialised, the
        main blueprint registered and the database tables created.
    """
    flask_app = Flask(__name__)

    # Load the settings selected by ``config_name``
    settings = config[config_name]
    flask_app.config.from_object(settings)

    # Bind the shared SQLAlchemy extension to this application
    db.init_app(flask_app)

    # Register the blueprint (imported inside the factory, as before)
    from .main import main as main_blueprint
    flask_app.register_blueprint(main_blueprint)

    # create_all() needs an active application context
    with flask_app.app_context():
        db.create_all()

    return flask_app
第二章:模型定义
基础模型
from flask_sqlalchemy import SQLAlchemy
from datetime import datetime
db = SQLAlchemy()
class User(db.Model):
    """User account model that stores a password hash, never the password."""

    # Explicit table name. Optional: when omitted, Flask-SQLAlchemy derives
    # the table name from the class name (converted to snake_case).
    __tablename__ = 'users'

    # Primary key
    id = db.Column(db.Integer, primary_key=True)

    # String fields
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)

    # Password hash (never store the plain-text password)
    password_hash = db.Column(db.String(256))

    # Boolean flag
    is_active = db.Column(db.Boolean, default=True)

    # Timestamps
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    def __repr__(self):
        return f'<User {self.username}>'

    def set_password(self, password):
        """Hash ``password`` and store the result in ``password_hash``."""
        from werkzeug.security import generate_password_hash
        self.password_hash = generate_password_hash(password)

    def check_password(self, password):
        """Return True when ``password`` matches the stored hash.

        Returns False when no hash has been stored yet: ``password_hash`` is
        nullable, and passing None on to ``check_password_hash`` would raise
        instead of rejecting the login.
        """
        from werkzeug.security import check_password_hash
        if not self.password_hash:
            return False
        return check_password_hash(self.password_hash, password)
常用字段类型
from sqlalchemy.dialects.postgresql import JSONB, ARRAY
class Product(db.Model):
    """Showcase model: one attribute for each commonly used column type."""
    __tablename__ = 'products'
    id = db.Column(db.Integer, primary_key=True)
    # String types
    name = db.Column(db.String(100), nullable=False) # VARCHAR
    description = db.Column(db.Text) # TEXT
    sku = db.Column(db.String(50), unique=True)
    # Numeric types
    price = db.Column(db.Numeric(10, 2), nullable=False) # DECIMAL
    quantity = db.Column(db.Integer, default=0)
    weight = db.Column(db.Float)
    rating = db.Column(db.SmallInteger) # small integer
    # Boolean type
    is_available = db.Column(db.Boolean, default=True)
    # Date and time types
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    release_date = db.Column(db.Date)
    sale_time = db.Column(db.Time)
    # Binary type
    thumbnail = db.Column(db.LargeBinary) # BLOB
    # Enum type
    status = db.Column(
        db.Enum('draft', 'published', 'archived', name='product_status'),
        default='draft'
    )
    # JSON type (SQLAlchemy 1.1+)
    metadata_json = db.Column(db.JSON)
    # PostgreSQL-specific types
    # tags = db.Column(ARRAY(db.String))
    # extra = db.Column(JSONB)
字段选项
class Article(db.Model):
    """Showcase model for the options accepted by ``db.Column``."""
    id = db.Column(db.Integer, primary_key=True)
    # Common options
    title = db.Column(
        db.String(200),
        nullable=False, # NULL not allowed
        unique=True, # unique constraint
        index=True, # create an index
        default='Untitled' # default value
    )
    # Server-side default
    created_at = db.Column(
        db.DateTime,
        server_default=db.func.now() # default applied at the database level
    )
    # Refreshed automatically on update
    updated_at = db.Column(
        db.DateTime,
        default=datetime.utcnow,
        onupdate=datetime.utcnow
    )
    # Custom column name: the attribute is author_name, the column is "author"
    author_name = db.Column('author', db.String(100))
    # Column comment.
    # NOTE(review): the original note cites "MySQL 8.0+"; which backends
    # support column comments should be confirmed against the SQLAlchemy docs.
    content = db.Column(db.Text, comment='Article content')
复合主键和索引
class OrderItem(db.Model):
    """Order line identified by a composite (order_id, product_id) key."""
    __tablename__ = 'order_items'
    # Composite primary key
    order_id = db.Column(db.Integer, db.ForeignKey('orders.id'), primary_key=True)
    product_id = db.Column(db.Integer, db.ForeignKey('products.id'), primary_key=True)
    quantity = db.Column(db.Integer, default=1)
    price = db.Column(db.Numeric(10, 2))
    # Table-level index and constraints.
    # NOTE(review): the index and the unique constraint cover the same two
    # columns as the composite primary key, so they look redundant here and
    # presumably only demonstrate the __table_args__ syntax - confirm.
    __table_args__ = (
        db.Index('idx_order_product', 'order_id', 'product_id'),
        db.UniqueConstraint('order_id', 'product_id', name='uq_order_product'),
        db.CheckConstraint('quantity > 0', name='ck_quantity_positive'),
    )
第三章:关系定义
一对多关系
class User(db.Model):
    """The "one" side of the one-to-many User -> Post example."""
    __tablename__ = 'users'
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    # One-to-many: one user has many posts; the backref adds Post.author
    posts = db.relationship('Post', backref='author', lazy='dynamic')
class Post(db.Model):
    """The "many" side of the one-to-many example; holds the foreign key."""
    __tablename__ = 'posts'
    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(200), nullable=False)
    content = db.Column(db.Text)
    # Foreign key to users.id; required for every post
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
# 使用
user = User.query.get(1)
user.posts # 获取用户的所有文章
post = Post.query.get(1)
post.author # 获取文章的作者
一对一关系
class User(db.Model):
    """User side of the one-to-one User <-> Profile example."""
    __tablename__ = 'users'
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80))
    # One-to-one: uselist=False exposes a single Profile instead of a list
    profile = db.relationship('Profile', backref='user', uselist=False)
class Profile(db.Model):
    """Profile side of the one-to-one example; owns the foreign key."""
    __tablename__ = 'profiles'
    id = db.Column(db.Integer, primary_key=True)
    bio = db.Column(db.Text)
    avatar = db.Column(db.String(200))
    # unique=True keeps at most one profile row per user
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'), unique=True)
# 使用
user.profile # 返回单个 Profile 对象,而不是列表
多对多关系
# 关联表
# Association table linking posts and tags; each (post_id, tag_id) pair is
# the composite primary key.
post_tags = db.Table('post_tags',
    db.Column('post_id', db.Integer, db.ForeignKey('posts.id'), primary_key=True),
    db.Column('tag_id', db.Integer, db.ForeignKey('tags.id'), primary_key=True)
)
class Post(db.Model):
    """Post side of the many-to-many Post <-> Tag example."""
    __tablename__ = 'posts'
    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(200))
    # Many-to-many through the post_tags table; the backref adds Tag.posts
    tags = db.relationship('Tag', secondary=post_tags, backref='posts')
class Tag(db.Model):
    """Tag side of the many-to-many example; tag names are unique."""
    __tablename__ = 'tags'
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(50), unique=True)
# 使用
post = Post.query.get(1)
post.tags # 获取文章的所有标签
tag = Tag.query.get(1)
tag.posts # 获取标签的所有文章
# 添加标签
post.tags.append(tag)
db.session.commit()
# 移除标签
post.tags.remove(tag)
db.session.commit()
带额外字段的多对多
class User(db.Model):
    """User side of the many-to-many example that uses an association model."""
    __tablename__ = 'users'
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80))
class Course(db.Model):
    """Course side of the many-to-many example that uses an association model."""
    __tablename__ = 'courses'
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100))
# 关联模型(带额外字段)
class Enrollment(db.Model):
    """Association model linking a user to a course, with extra fields."""
    __tablename__ = 'enrollments'
    # Composite primary key made of the two foreign keys
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'), primary_key=True)
    course_id = db.Column(db.Integer, db.ForeignKey('courses.id'), primary_key=True)
    # Extra fields carried by the association itself
    enrolled_at = db.Column(db.DateTime, default=datetime.utcnow)
    grade = db.Column(db.Float)
    status = db.Column(db.String(20), default='enrolled')
    # Relationships; each backref adds an "enrollments" collection
    user = db.relationship('User', backref='enrollments')
    course = db.relationship('Course', backref='enrollments')
# 使用
enrollment = Enrollment(user_id=1, course_id=1, grade=95.5)
db.session.add(enrollment)
db.session.commit()
# 查询
user = User.query.get(1)
for enrollment in user.enrollments:
print(f'{enrollment.course.name}: {enrollment.grade}')
自引用关系
class Category(db.Model):
    """Category tree node; each row may point at a parent category."""
    __tablename__ = 'categories'
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100))
    # Foreign key to the same table
    parent_id = db.Column(db.Integer, db.ForeignKey('categories.id'))
    # Self-referential relationship: "children" here, "parent" via the
    # backref; remote_side=[id] marks id as the remote (parent) column.
    children = db.relationship(
        'Category',
        backref=db.backref('parent', remote_side=[id]),
        lazy='dynamic'
    )
# 使用
root = Category(name='Electronics')
child = Category(name='Phones', parent=root)
db.session.add_all([root, child])
db.session.commit()
root.children.all() # [<Category Phones>]
child.parent # <Category Electronics>
关系选项详解
class User(db.Model):
    """Fragment demonstrating the most common ``relationship()`` options."""
    posts = db.relationship(
        'Post',
        backref='author',
        # lazy: controls how the related objects are loaded
        # 'select' - default; runs a separate query on first access
        # 'dynamic' - returns a query object that can be filtered further
        # 'joined' - loads everything at once with a JOIN
        # 'subquery' - loads with a subquery
        # 'selectin' - loads with an IN query (recommended for one-to-many)
        lazy='dynamic',
        # Cascade all operations and delete orphaned children
        cascade='all, delete-orphan',
        # Passive deletes.
        # NOTE(review): the original note said this "sets the foreign key to
        # NULL". Per the SQLAlchemy docs, passive_deletes=True instead leaves
        # child rows to the database's ON DELETE rule - confirm the foreign
        # key declares a matching ondelete.
        passive_deletes=True,
        # Ordering
        order_by='Post.created_at.desc()',
        # Join condition
        primaryjoin='User.id == Post.user_id'
    )
# cascade 选项
# 'save-update' - 添加到 session 时级联
# 'merge' - merge() 时级联
# 'delete' - 删除时级联删除关联对象
# 'delete-orphan' - 解除关联时删除孤儿对象
# 'all' - 等价于 save-update, merge, refresh-expire, expunge, delete(不包含 delete-orphan)
第四章:CRUD 操作
创建(Create)
# 创建单个对象
user = User(username='alice', email='alice@example.com')
user.set_password('password123')
db.session.add(user)
db.session.commit()
# 获取新创建对象的 ID
print(user.id) # 自动填充
# 批量创建
users = [
User(username='bob', email='bob@example.com'),
User(username='charlie', email='charlie@example.com'),
]
db.session.add_all(users)
db.session.commit()
# 带关系的创建
post = Post(
title='Hello World',
content='This is my first post',
author=user # 直接设置关系
)
db.session.add(post)
db.session.commit()
# 或者
post = Post(title='Hello', content='...')
post.user_id = user.id
db.session.add(post)
db.session.commit()
查询(Read)
# 获取所有记录
users = User.query.all()
# 按主键获取
user = User.query.get(1) # 废弃警告
user = db.session.get(User, 1) # SQLAlchemy 2.0 推荐
# 获取第一条
user = User.query.first()
# 条件查询
user = User.query.filter_by(username='alice').first()
user = User.query.filter(User.username == 'alice').first()
# 获取或 404
from flask import abort
user = User.query.get_or_404(1)
user = User.query.filter_by(username='alice').first_or_404()
# 统计
count = User.query.count()
count = User.query.filter(User.is_active == True).count()
# 检查是否存在
exists = db.session.query(
User.query.filter_by(username='alice').exists()
).scalar()
过滤条件
from sqlalchemy import and_, or_, not_, func
# 等于
User.query.filter(User.username == 'alice')
User.query.filter_by(username='alice')
# 不等于
User.query.filter(User.username != 'alice')
# 比较
User.query.filter(User.age > 18)
User.query.filter(User.age >= 18)
User.query.filter(User.age < 60)
User.query.filter(User.age <= 60)
User.query.filter(User.age.between(18, 60))
# LIKE
User.query.filter(User.username.like('%alice%'))
User.query.filter(User.username.ilike('%ALICE%')) # 不区分大小写
User.query.filter(User.username.startswith('a'))
User.query.filter(User.username.endswith('e'))
User.query.filter(User.username.contains('lic'))
# IN
User.query.filter(User.id.in_([1, 2, 3]))
User.query.filter(~User.id.in_([1, 2, 3])) # NOT IN
# IS NULL
User.query.filter(User.bio.is_(None))
User.query.filter(User.bio.isnot(None))
# AND
User.query.filter(and_(User.is_active == True, User.age > 18))
User.query.filter(User.is_active == True, User.age > 18) # 隐式 AND
User.query.filter(User.is_active == True).filter(User.age > 18)
# OR
User.query.filter(or_(User.username == 'alice', User.username == 'bob'))
# NOT
User.query.filter(not_(User.is_active))
User.query.filter(~User.is_active)
# 正则表达式(PostgreSQL)
User.query.filter(User.username.op('~')('[a-z]+'))
排序和分页
# 排序
User.query.order_by(User.created_at) # 升序
User.query.order_by(User.created_at.desc()) # 降序
User.query.order_by(User.last_name, User.first_name) # 多字段排序
# 限制数量
User.query.limit(10).all()
User.query.offset(20).limit(10).all()
# 分页
page = request.args.get('page', 1, type=int)
per_page = 20
# 方式一:使用 paginate
pagination = User.query.order_by(User.created_at.desc()).paginate(
page=page,
per_page=per_page,
error_out=False # 页码超出时不报错
)
users = pagination.items # 当前页的数据
total = pagination.total # 总记录数
pages = pagination.pages # 总页数
has_prev = pagination.has_prev # 是否有上一页
has_next = pagination.has_next # 是否有下一页
# 方式二:手动分页
users = User.query.order_by(User.created_at.desc()) \
.offset((page - 1) * per_page) \
.limit(per_page) \
.all()
聚合查询
from sqlalchemy import func
# 计数
count = db.session.query(func.count(User.id)).scalar()
# 求和
total = db.session.query(func.sum(Order.amount)).scalar()
# 平均值
avg = db.session.query(func.avg(Product.price)).scalar()
# 最大/最小值
max_price = db.session.query(func.max(Product.price)).scalar()
min_price = db.session.query(func.min(Product.price)).scalar()
# 分组统计
results = db.session.query(
User.role,
func.count(User.id).label('count')
).group_by(User.role).all()
for role, count in results:
print(f'{role}: {count}')
# 分组过滤(HAVING)
results = db.session.query(
Post.user_id,
func.count(Post.id).label('post_count')
).group_by(Post.user_id).having(func.count(Post.id) > 5).all()
# 去重
unique_cities = db.session.query(User.city.distinct()).all()
更新(Update)
# 方式一:查询后修改
user = User.query.get(1)
user.username = 'new_username'
user.email = 'new@example.com'
db.session.commit()
# 方式二:批量更新
User.query.filter(User.is_active == False).update({
'is_active': True,
'updated_at': datetime.utcnow()
})
db.session.commit()
# 使用 synchronize_session
User.query.filter(User.role == 'guest').update(
{'role': 'member'},
synchronize_session='fetch' # 或 'evaluate', False
)
db.session.commit()
# 原子操作(递增)
User.query.filter_by(id=1).update({
'login_count': User.login_count + 1
})
db.session.commit()
删除(Delete)
# 方式一:查询后删除
user = User.query.get(1)
db.session.delete(user)
db.session.commit()
# 方式二:批量删除
User.query.filter(User.is_active == False).delete()
db.session.commit()
# 级联删除(需要在关系中配置 cascade)
user = User.query.get(1)
db.session.delete(user) # 同时删除用户的所有文章
db.session.commit()
# 软删除(推荐)
class User(db.Model):
    """Fragment showing soft deletion through a nullable timestamp."""
    # NULL means the row is live; a timestamp marks it as deleted
    deleted_at = db.Column(db.DateTime, nullable=True)
    @property
    def is_deleted(self):
        """True once a deletion timestamp has been set."""
        return self.deleted_at is not None
    def soft_delete(self):
        """Mark the row as deleted; committing is left to the caller."""
        self.deleted_at = datetime.utcnow()
# 使用软删除
user = User.query.get(1)
user.soft_delete()
db.session.commit()
# 查询时排除已删除
User.query.filter(User.deleted_at.is_(None)).all()
第五章:关联查询
预加载(Eager Loading)
from sqlalchemy.orm import joinedload, selectinload, subqueryload
# 问题:N+1 查询
users = User.query.all()
for user in users:
print(user.posts) # 每次访问都执行查询
# 解决方案一:joinedload(使用 JOIN)
users = User.query.options(joinedload(User.posts)).all()
# 解决方案二:selectinload(使用 IN)
users = User.query.options(selectinload(User.posts)).all()
# 解决方案三:subqueryload(使用子查询)
users = User.query.options(subqueryload(User.posts)).all()
# 嵌套预加载
users = User.query.options(
joinedload(User.posts).joinedload(Post.comments)
).all()
# 多个关系预加载
users = User.query.options(
selectinload(User.posts),
selectinload(User.comments)
).all()
JOIN 查询
from sqlalchemy import join
# 内连接
results = db.session.query(User, Post).join(Post).all()
# 指定连接条件
results = db.session.query(User, Post).join(
Post, User.id == Post.user_id
).all()
# 左外连接
results = db.session.query(User, Post).outerjoin(Post).all()
# 多表连接
results = db.session.query(User, Post, Comment).join(
Post, User.id == Post.user_id
).join(
Comment, Post.id == Comment.post_id
).all()
# 使用关系名连接
results = db.session.query(User).join(User.posts).filter(
Post.status == 'published'
).all()
# 聚合 + JOIN
results = db.session.query(
User.username,
func.count(Post.id).label('post_count')
).outerjoin(Post).group_by(User.id).all()
子查询
from sqlalchemy import select
# 标量子查询
subq = db.session.query(
func.count(Post.id)
).filter(Post.user_id == User.id).correlate(User).scalar_subquery()
users = db.session.query(User, subq.label('post_count')).all()
# 子查询作为 FROM
subq = db.session.query(
Post.user_id,
func.count(Post.id).label('post_count')
).group_by(Post.user_id).subquery()
results = db.session.query(User, subq.c.post_count).join(
subq, User.id == subq.c.user_id
).all()
# EXISTS 子查询
from sqlalchemy import exists
# Users that have at least one post. The EXISTS clause is placed directly in
# the filter so it sits immediately inside the SELECT on users and is
# auto-correlated to it. Wrapping it in session.query(...).scalar_subquery()
# adds an extra SELECT level with no FROM, which breaks that correlation.
has_posts = exists().where(Post.user_id == User.id)
users_with_posts = User.query.filter(has_posts).all()
第六章:数据库迁移
Flask-Migrate 安装配置
pip install flask-migrate
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///app.db'
db = SQLAlchemy(app)
migrate = Migrate(app, db)
迁移命令
# 初始化迁移仓库
flask db init
# 生成迁移脚本
flask db migrate -m "Initial migration"
# 应用迁移
flask db upgrade
# 回滚迁移
flask db downgrade
# 查看迁移历史
flask db history
# 查看当前版本
flask db current
# 显示待应用的迁移
flask db show
迁移脚本示例
# migrations/versions/xxx_add_user_profile.py
"""add user profile
Revision ID: abc123
Revises: def456
Create Date: 2024-01-01 12:00:00
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers
revision = 'abc123'
down_revision = 'def456'
branch_labels = None
depends_on = None
def upgrade():
    """Apply the revision: extend ``users`` and create ``profiles``."""
    # Add columns
    op.add_column('users', sa.Column('bio', sa.Text(), nullable=True))
    op.add_column('users', sa.Column('avatar', sa.String(200), nullable=True))
    # Create a unique index on users.email
    op.create_index('ix_users_email', 'users', ['email'], unique=True)
    # Create the profiles table
    op.create_table(
        'profiles',
        sa.Column('id', sa.Integer(), primary_key=True),
        sa.Column('user_id', sa.Integer(), sa.ForeignKey('users.id')),
        sa.Column('website', sa.String(200)),
    )
def downgrade():
    """Revert the revision by undoing upgrade() in reverse order."""
    op.drop_table('profiles')
    op.drop_index('ix_users_email', 'users')
    op.drop_column('users', 'avatar')
    op.drop_column('users', 'bio')
数据迁移
def upgrade():
    """Add ``users.full_name``, fill it from the name parts, drop the parts."""
    # Schema change
    op.add_column('users', sa.Column('full_name', sa.String(100)))
    # Data migration: a lightweight table definition, so the migration does
    # not import the application's models
    from sqlalchemy.sql import table, column
    users = table('users',
        column('id', sa.Integer),
        column('first_name', sa.String),
        column('last_name', sa.String),
        column('full_name', sa.String)
    )
    connection = op.get_bind()
    # NOTE(review): if either name part is NULL the concatenation presumably
    # yields NULL on most databases - confirm whether that is acceptable.
    connection.execute(
        users.update().values(
            full_name=users.c.first_name + ' ' + users.c.last_name
        )
    )
    # Drop the old columns
    op.drop_column('users', 'first_name')
    op.drop_column('users', 'last_name')
第七章:事务管理
基础事务
# Both objects are committed together; any failure rolls the session back.
try:
    user = User(username='alice', email='alice@example.com')
    db.session.add(user)
    post = Post(title='Hello', author=user)
    db.session.add(post)
    db.session.commit()
except Exception:
    db.session.rollback()
    # Bare raise re-raises the active exception with its original traceback
    raise
嵌套事务(保存点)
# Savepoints are created with Session.begin_nested(); sqlalchemy has no
# importable "savepoint" name. Used as a context manager, the savepoint is
# released on success and rolled back when the block raises.
from sqlalchemy.exc import SQLAlchemyError

try:
    # Outer savepoint
    with db.session.begin_nested():
        user = User(username='alice')
        db.session.add(user)
        # Inner savepoint: a database failure here undoes only the post
        try:
            with db.session.begin_nested():
                # begin_nested() flushes pending objects first, so user.id
                # is populated by the time it is read here
                post = Post(title='Hello', user_id=user.id)
                db.session.add(post)
        except SQLAlchemyError:
            # Deliberately best-effort: the inner savepoint has already been
            # rolled back, and the outer transaction carries on without it
            pass
    # Commit the outer transaction
    db.session.commit()
except Exception:
    db.session.rollback()
    raise
上下文管理器
from contextlib import contextmanager
@contextmanager
def transaction():
    """Yield the shared session; commit on success, roll back on error.

    The commit stays inside the ``try`` so that a failing commit is rolled
    back as well. Any exception is re-raised after the rollback.
    """
    session = db.session
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
# 使用
with transaction():
user = User(username='alice')
db.session.add(user)
# 自动提交或回滚
并发控制
# 乐观锁(使用版本号)
class Product(db.Model):
    """Product row carrying a version counter for optimistic locking."""
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100))
    quantity = db.Column(db.Integer)
    # Version counter, registered below as the mapper's version_id_col
    version = db.Column(db.Integer, default=1)
    __mapper_args__ = {
        'version_id_col': version
    }
# StaleDataError must be imported before it can be caught
from sqlalchemy.orm.exc import StaleDataError

# The version column is checked automatically when the UPDATE is flushed
product = db.session.get(Product, 1)
product.quantity -= 1
try:
    db.session.commit()
except StaleDataError:
    db.session.rollback()
    # Handle the concurrent-modification conflict here (e.g. reload and retry)
# 悲观锁
product = Product.query.filter_by(id=1).with_for_update().first()
product.quantity -= 1
db.session.commit()
第八章:性能优化
查询优化
# 1. 只查询需要的列
users = db.session.query(User.id, User.username).all()
# 2. 使用 defer 延迟加载大字段
from sqlalchemy.orm import defer
users = User.query.options(defer(User.bio)).all()
# 3. 使用 load_only 只加载指定字段
from sqlalchemy.orm import load_only
users = User.query.options(load_only(User.id, User.username)).all()
# 4. 批量操作使用 bulk_insert_mappings
db.session.bulk_insert_mappings(User, [
{'username': 'user1', 'email': 'user1@example.com'},
{'username': 'user2', 'email': 'user2@example.com'},
])
# 5. 批量更新
db.session.bulk_update_mappings(User, [
{'id': 1, 'username': 'new_name1'},
{'id': 2, 'username': 'new_name2'},
])
# 6. 使用 yield_per 处理大量数据
for user in User.query.yield_per(100):
process(user)
# 7. 避免在循环中查询
# 不好
for post_id in post_ids:
post = Post.query.get(post_id)
# 好
posts = Post.query.filter(Post.id.in_(post_ids)).all()
索引优化
class User(db.Model):
    """User model illustrating single-column and composite indexes."""
    __tablename__ = 'users'
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, index=True)
    email = db.Column(db.String(120), unique=True, index=True)
    # The composite index below refers to this column by name, so it has to
    # be defined on the model or the index cannot be created
    status = db.Column(db.String(20))
    created_at = db.Column(db.DateTime, index=True)
    # Composite index
    __table_args__ = (
        db.Index('idx_user_status_created', 'status', 'created_at'),
    )
连接池配置
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
'pool_size': 10, # 连接池大小
'pool_recycle': 3600, # 连接回收时间(秒)
'pool_pre_ping': True, # 使用前检查连接
'max_overflow': 20, # 超出池大小的最大连接数
}
查询缓存
from flask_caching import Cache
# 'SimpleCache' is the current name of the in-memory backend; the older
# 'simple' alias is deprecated in recent Flask-Caching releases.
cache = Cache(app, config={'CACHE_TYPE': 'SimpleCache'})
@cache.memoize(timeout=300)
def get_user_by_id(user_id):
    """Return the User with primary key ``user_id``, or None if absent.

    Results are memoized per ``user_id`` with timeout=300. Uses
    ``Session.get``, which replaces the legacy ``Query.get``.
    """
    return db.session.get(User, user_id)
@cache.cached(timeout=60, key_prefix='all_users')
def get_all_users():
    """Return every User row; cached under 'all_users' with timeout=60."""
    all_users = User.query.all()
    return all_users
# 清除缓存
cache.delete_memoized(get_user_by_id, user_id)
cache.delete('all_users')
第九章:模型最佳实践
基础模型类
from datetime import datetime
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
class BaseModel(db.Model):
    """Abstract base model: primary key, timestamps and CRUD helpers."""

    # Abstract: no table is created for this class itself
    __abstract__ = True

    id = db.Column(db.Integer, primary_key=True)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    def save(self):
        """Add the object to the session and commit.

        Returns:
            The object itself, so the call can be chained.

        Raises:
            Exception: Whatever the commit raised. The session is rolled
                back first so that it stays usable.
        """
        db.session.add(self)
        try:
            db.session.commit()
        except Exception:
            db.session.rollback()
            raise
        return self

    def delete(self):
        """Delete the object and commit, rolling back if the commit fails."""
        db.session.delete(self)
        try:
            db.session.commit()
        except Exception:
            db.session.rollback()
            raise

    @classmethod
    def get_by_id(cls, id):
        """Return the row with primary key ``id``, or None if absent."""
        # Session.get replaces the legacy Query.get
        return db.session.get(cls, id)

    @classmethod
    def get_or_404(cls, id):
        """Return the row with primary key ``id`` or abort with a 404."""
        return cls.query.get_or_404(id)

    def to_dict(self):
        """Return a dict mapping each column name to its current value.

        Values are read through the mapped attribute key, so a column whose
        database name differs from its attribute name, such as
        ``author_name = db.Column('author', ...)``, is still handled.
        """
        mapper = self.__mapper__
        return {
            column.name: getattr(self, mapper.get_property_by_column(column).key)
            for column in self.__table__.columns
        }
class SoftDeleteMixin:
    """Mixin adding soft deletion through a nullable timestamp."""
    # NULL means the row is live; a timestamp marks it as deleted
    deleted_at = db.Column(db.DateTime, nullable=True)
    def soft_delete(self):
        """Stamp the row as deleted and commit immediately."""
        self.deleted_at = datetime.utcnow()
        db.session.commit()
    def restore(self):
        """Clear the deletion timestamp and commit immediately."""
        self.deleted_at = None
        db.session.commit()
    @classmethod
    def query_active(cls):
        """Return a query restricted to rows that are not soft-deleted."""
        return cls.query.filter(cls.deleted_at.is_(None))
使用示例
class User(BaseModel, SoftDeleteMixin):
    """Concrete user model built on BaseModel and SoftDeleteMixin."""

    __tablename__ = 'users'

    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)
    password_hash = db.Column(db.String(256))
    role = db.Column(db.String(20), default='user')

    # One-to-many: the backref adds Post.author
    posts = db.relationship('Post', backref='author', lazy='dynamic')

    def __repr__(self):
        return f'<User {self.username}>'

    def to_dict(self, include_posts=False):
        """Return a serialisable dict of the user's public fields.

        Args:
            include_posts: When true, add a ``posts`` list built from each
                post's own ``to_dict()``.

        Returns:
            A dict with id, username, email, role and created_at.
            ``created_at`` is an ISO-8601 string, or None while the column
            default has not been applied yet (an unsaved instance).
        """
        # The column default is only applied on flush, so guard against None
        created = self.created_at.isoformat() if self.created_at is not None else None
        data = {
            'id': self.id,
            'username': self.username,
            'email': self.email,
            'role': self.role,
            'created_at': created,
        }
        if include_posts:
            data['posts'] = [post.to_dict() for post in self.posts]
        return data
# 使用
user = User(username='alice', email='alice@example.com').save()
user.soft_delete()
active_users = User.query_active().all()
总结
本章详细介绍了 Flask 数据库操作:
- Flask-SQLAlchemy 配置:数据库连接、工厂模式
- 模型定义:字段类型、选项、约束
- 关系定义:一对多、一对一、多对多、自引用
- CRUD 操作:创建、查询、更新、删除
- 高级查询:过滤、排序、分页、聚合
- 关联查询:预加载、JOIN、子查询
- 数据库迁移:Flask-Migrate 使用
- 事务管理:基础事务、保存点、并发控制
- 性能优化:查询优化、索引、连接池、缓存
下一章我们将学习用户认证与安全。