实战项目案例
2026/3/20 · 大约 14 分钟
实战项目案例
第一章:项目规划
项目概述
本案例将构建一个完整的博客系统,涵盖 Flask 开发的核心知识点:
- 用户认证与授权
- RESTful API 设计
- 数据库设计与操作
- 文件上传处理
- 评论系统
- 搜索功能
- 缓存优化
- 后台管理
技术栈
后端框架:Flask 3.x
数据库:PostgreSQL + SQLAlchemy
缓存:Redis
任务队列:Celery
API 文档:Flask-RESTX
认证:Flask-Login + JWT
表单验证:Flask-WTF
前端:Jinja2 模板 + Bootstrap 5
部署:Docker + Nginx + Gunicorn
项目结构
blog/
├── app/
│ ├── __init__.py # 应用工厂
│ ├── extensions.py # 扩展初始化
│ ├── config.py # 配置
│ │
│ ├── auth/ # 认证模块
│ │ ├── __init__.py
│ │ ├── models.py
│ │ ├── views.py
│ │ ├── forms.py
│ │ └── templates/
│ │
│ ├── blog/ # 博客模块
│ │ ├── __init__.py
│ │ ├── models.py
│ │ ├── views.py
│ │ ├── forms.py
│ │ └── templates/
│ │
│ ├── api/ # API 模块
│ │ ├── __init__.py
│ │ ├── auth.py
│ │ ├── posts.py
│ │ ├── comments.py
│ │ └── schemas.py
│ │
│ ├── admin/ # 管理后台
│ │ ├── __init__.py
│ │ └── views.py
│ │
│ ├── templates/ # 公共模板
│ │ ├── base.html
│ │ ├── macros/
│ │ └── errors/
│ │
│ └── static/ # 静态文件
│ ├── css/
│ ├── js/
│ └── uploads/
│
├── migrations/ # 数据库迁移
├── tests/ # 测试
├── docker/ # Docker 配置
├── requirements.txt
├── .env.example
└── run.py
第二章:核心模型设计
用户模型
# app/auth/models.py
from datetime import datetime
from flask_login import UserMixin
from werkzeug.security import generate_password_hash, check_password_hash
from app.extensions import db
class User(UserMixin, db.Model):
    """Account that can sign in, author posts and leave comments."""

    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False, index=True)
    email = db.Column(db.String(120), unique=True, nullable=False, index=True)
    password_hash = db.Column(db.String(256))
    avatar = db.Column(db.String(200), default='default.png')
    bio = db.Column(db.Text)
    website = db.Column(db.String(200))

    # Status flags
    is_active = db.Column(db.Boolean, default=True)
    is_admin = db.Column(db.Boolean, default=False)
    email_verified = db.Column(db.Boolean, default=False)

    # Timestamps (naive UTC)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    last_login = db.Column(db.DateTime)

    # Relationships: deleting a user also deletes their posts and comments
    posts = db.relationship('Post', backref='author', lazy='dynamic',
                            cascade='all, delete-orphan')
    comments = db.relationship('Comment', backref='author', lazy='dynamic',
                               cascade='all, delete-orphan')

    def set_password(self, password):
        """Hash ``password`` and store the hash; the plain text is not kept."""
        self.password_hash = generate_password_hash(password)

    def check_password(self, password):
        """Return True when ``password`` matches the stored hash.

        ``password_hash`` is nullable, so an account that never had a
        password set simply fails the check instead of raising.
        """
        if not self.password_hash:
            return False
        return check_password_hash(self.password_hash, password)

    def get_avatar_url(self, size=100):
        """Return the URL of the uploaded avatar, or a Gravatar fallback.

        Args:
            size: Pixel size requested from Gravatar (ignored for uploads).
        """
        if self.avatar and self.avatar != 'default.png':
            return f'/static/uploads/avatars/{self.avatar}'
        # Gravatar fallback: the hash is taken over the trimmed, lower-cased
        # address.
        import hashlib
        digest = hashlib.md5(self.email.strip().lower().encode()).hexdigest()
        return f'https://www.gravatar.com/avatar/{digest}?s={size}&d=identicon'

    def to_dict(self, include_email=False):
        """Serialize the public profile; the email is included only on request."""
        data = {
            'id': self.id,
            'username': self.username,
            'avatar': self.get_avatar_url(),
            'bio': self.bio,
            'website': self.website,
            'posts_count': self.posts.count(),
            # created_at is filled by the column default at INSERT time, so it
            # is still None on an instance that has not been flushed yet.
            'created_at': self.created_at.isoformat() if self.created_at else None
        }
        if include_email:
            data['email'] = self.email
        return data

    def __repr__(self):
        return f'<User {self.username}>'
class Follow(db.Model):
    """Association row recording that one user follows another."""

    __tablename__ = 'follows'

    # Composite primary key: one row per (follower, followed) pair
    follower_id = db.Column(
        db.Integer, db.ForeignKey('users.id'), primary_key=True
    )
    followed_id = db.Column(
        db.Integer, db.ForeignKey('users.id'), primary_key=True
    )
    created_at = db.Column(db.DateTime, default=datetime.utcnow)

    # Both sides point at User, so each relationship names its own foreign key
    follower = db.relationship(
        'User',
        foreign_keys=[follower_id],
        backref=db.backref('following', lazy='dynamic'),
    )
    followed = db.relationship(
        'User',
        foreign_keys=[followed_id],
        backref=db.backref('followers', lazy='dynamic'),
    )
文章模型
# app/blog/models.py
from datetime import datetime
from slugify import slugify
from app.extensions import db
# Association table for the many-to-many link between posts and tags;
# the (post_id, tag_id) pair forms the composite primary key.
post_tags = db.Table('post_tags',
    db.Column('post_id', db.Integer, db.ForeignKey('posts.id'), primary_key=True),
    db.Column('tag_id', db.Integer, db.ForeignKey('tags.id'), primary_key=True)
)
class Category(db.Model):
    """Named category that posts can be filed under."""

    __tablename__ = 'categories'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(50), unique=True, nullable=False)
    slug = db.Column(db.String(60), unique=True, nullable=False)
    description = db.Column(db.Text)

    posts = db.relationship('Post', backref='category', lazy='dynamic')

    def __init__(self, **kwargs):
        """Build the category, deriving ``slug`` from ``name`` when omitted."""
        super().__init__(**kwargs)
        slug_missing = not self.slug
        if self.name and slug_missing:
            self.slug = slugify(self.name)

    def to_dict(self):
        """Serialize the category together with its post count."""
        payload = {}
        payload['id'] = self.id
        payload['name'] = self.name
        payload['slug'] = self.slug
        payload['description'] = self.description
        payload['posts_count'] = self.posts.count()
        return payload
class Tag(db.Model):
    """Free-form label attached to posts through ``post_tags``."""

    __tablename__ = 'tags'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(50), unique=True, nullable=False)
    slug = db.Column(db.String(60), unique=True, nullable=False)

    def __init__(self, **kwargs):
        """Build the tag, deriving ``slug`` from ``name`` when omitted."""
        super().__init__(**kwargs)
        slug_missing = not self.slug
        if self.name and slug_missing:
            self.slug = slugify(self.name)

    def to_dict(self):
        """Serialize the tag's id, name and slug."""
        payload = {}
        payload['id'] = self.id
        payload['name'] = self.name
        payload['slug'] = self.slug
        return payload
class Post(db.Model):
    """Blog article with publication state, view counter, tags and comments."""

    __tablename__ = 'posts'

    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(200), nullable=False)
    slug = db.Column(db.String(220), unique=True, nullable=False, index=True)
    summary = db.Column(db.Text)
    content = db.Column(db.Text, nullable=False)
    cover_image = db.Column(db.String(200))

    # Status flags
    is_published = db.Column(db.Boolean, default=False)
    is_featured = db.Column(db.Boolean, default=False)
    allow_comments = db.Column(db.Boolean, default=True)

    # Statistics
    views_count = db.Column(db.Integer, default=0)

    # Timestamps (naive UTC)
    created_at = db.Column(db.DateTime, default=datetime.utcnow, index=True)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    published_at = db.Column(db.DateTime)

    # Foreign keys
    author_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
    category_id = db.Column(db.Integer, db.ForeignKey('categories.id'))

    # Relationships: deleting a post also deletes its comments
    tags = db.relationship('Tag', secondary=post_tags, backref='posts')
    comments = db.relationship('Comment', backref='post', lazy='dynamic',
                               cascade='all, delete-orphan')

    def __init__(self, **kwargs):
        """Build the post, generating a unique slug from the title if needed."""
        super().__init__(**kwargs)
        if self.title and not self.slug:
            self.generate_slug()

    def generate_slug(self):
        """Set ``slug`` to a unique, URL-safe form of the title.

        Falls back to ``'post'`` when the title slugifies to an empty string,
        and caps the base at 200 characters so that the ``-N`` suffix still
        fits the 220-character column.
        """
        base_slug = (slugify(self.title) or 'post')[:200].strip('-') or 'post'
        slug = base_slug
        counter = 1
        # The uniqueness probe must not autoflush: depending on cascade
        # settings this half-built post may already be pending in the session,
        # and flushing it without a slug would violate the NOT NULL constraint.
        with db.session.no_autoflush:
            while Post.query.filter_by(slug=slug).first():
                slug = f'{base_slug}-{counter}'
                counter += 1
        self.slug = slug

    def publish(self):
        """Mark the post as published and stamp the publication time."""
        self.is_published = True
        self.published_at = datetime.utcnow()

    def increment_views(self):
        """Add one to the view counter and commit.

        The addition is expressed in SQL so concurrent requests do not
        overwrite each other's increments; COALESCE covers a NULL counter.
        """
        self.views_count = db.func.coalesce(Post.views_count, 0) + 1
        db.session.commit()

    def to_dict(self, include_content=True, include_author=True):
        """Serialize the post.

        Args:
            include_content: Add the full ``content`` body.
            include_author: Add the nested author profile.
        """
        data = {
            'id': self.id,
            'title': self.title,
            'slug': self.slug,
            'summary': self.summary,
            'cover_image': self.cover_image,
            'is_published': self.is_published,
            'is_featured': self.is_featured,
            'views_count': self.views_count,
            'comments_count': self.comments.count(),
            # created_at comes from the column default at INSERT time and is
            # None until the row has been flushed.
            'created_at': self.created_at.isoformat() if self.created_at else None,
            'published_at': self.published_at.isoformat() if self.published_at else None,
            'category': self.category.to_dict() if self.category else None,
            'tags': [tag.to_dict() for tag in self.tags]
        }
        if include_content:
            data['content'] = self.content
        if include_author:
            data['author'] = self.author.to_dict()
        return data

    def __repr__(self):
        return f'<Post {self.title}>'
class Comment(db.Model):
    """Comment on a post; may reply to another comment via ``parent_id``."""

    __tablename__ = 'comments'

    id = db.Column(db.Integer, primary_key=True)
    content = db.Column(db.Text, nullable=False)
    is_approved = db.Column(db.Boolean, default=True)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)

    # Foreign keys
    post_id = db.Column(db.Integer, db.ForeignKey('posts.id'), nullable=False)
    author_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
    parent_id = db.Column(db.Integer, db.ForeignKey('comments.id'))

    # Self-referential reply tree: ``replies`` on the parent, ``parent`` on the child
    replies = db.relationship(
        'Comment',
        backref=db.backref('parent', remote_side=[id]),
        lazy='dynamic',
    )

    def to_dict(self, include_replies=False):
        """Serialize the comment, optionally with its direct replies."""
        payload = {
            'id': self.id,
            'content': self.content,
            'created_at': self.created_at.isoformat(),
            'author': self.author.to_dict(),
            'replies_count': self.replies.count()
        }
        if not include_replies:
            return payload
        payload['replies'] = [reply.to_dict() for reply in self.replies.all()]
        return payload
第三章:应用工厂与扩展
扩展初始化
# app/extensions.py
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from flask_login import LoginManager
from flask_mail import Mail
from flask_caching import Cache
from flask_wtf.csrf import CSRFProtect
# Extension instances are created here without an application; the app
# factory binds each of them later through init_app().
db = SQLAlchemy()
migrate = Migrate()
login_manager = LoginManager()
mail = Mail()
cache = Cache()
csrf = CSRFProtect()
# Configure LoginManager: endpoint to redirect anonymous users to, plus the
# flash message (and its category) shown when a login is required.
login_manager.login_view = 'auth.login'
login_manager.login_message = '请先登录'
login_manager.login_message_category = 'warning'
@login_manager.user_loader
def load_user(user_id):
    """Return the User for the id stored in the session, or None.

    Flask-Login treats None as "no such user", so a malformed id is reported
    that way instead of raising.
    """
    # Imported lazily: the models module imports ``db`` from this module.
    from app.auth.models import User
    try:
        primary_key = int(user_id)
    except (TypeError, ValueError):
        return None
    # Session.get() replaces the legacy Query.get().
    return db.session.get(User, primary_key)
应用工厂
# app/__init__.py
import os
from flask import Flask
from app.extensions import db, migrate, login_manager, mail, cache, csrf
from app.config import config
def create_app(config_name=None):
    """Application factory: build and fully configure a Flask app.

    Args:
        config_name: Key into ``config``. When None, the ``FLASK_CONFIG``
            environment variable is used, falling back to ``'default'``.

    Returns:
        The configured Flask application.
    """
    from datetime import datetime

    if config_name is None:
        # ``or`` also covers FLASK_CONFIG being set to an empty string.
        config_name = os.environ.get('FLASK_CONFIG') or 'default'
    app = Flask(__name__)
    app.config.from_object(config[config_name])
    config[config_name].init_app(app)
    # Initialize extensions
    register_extensions(app)
    # Register blueprints
    register_blueprints(app)
    # Register error handlers
    register_error_handlers(app)
    # Register template filters
    register_template_filters(app)
    # Register CLI commands
    register_commands(app)

    @app.context_processor
    def inject_now():
        # base.html renders ``now().year`` in the footer; without this the
        # name is undefined in the template context.
        return {'now': datetime.utcnow}

    return app
def register_extensions(app):
    """Bind every shared extension instance to ``app``."""
    db.init_app(app)
    # Migrate additionally needs the database handle.
    migrate.init_app(app, db)
    for extension in (login_manager, mail, cache, csrf):
        extension.init_app(app)
def register_blueprints(app):
    """Attach the auth, blog, API and admin blueprints to ``app``."""
    # Imported inside the function so the blueprint modules load only when
    # an application is actually being built.
    from app.auth import auth_bp
    from app.blog import blog_bp
    from app.api import api_bp
    from app.admin import admin_bp

    mounts = (
        (auth_bp, {}),
        (blog_bp, {}),
        (api_bp, {'url_prefix': '/api/v1'}),
        (admin_bp, {'url_prefix': '/admin'}),
    )
    for blueprint, options in mounts:
        app.register_blueprint(blueprint, **options)
def register_error_handlers(app):
    """Install 404/500 handlers that answer JSON under /api/ and HTML elsewhere."""
    from flask import render_template, jsonify, request

    def _is_api_request():
        return request.path.startswith('/api/')

    @app.errorhandler(404)
    def not_found(error):
        status = 404
        if _is_api_request():
            return jsonify({'error': 'Not found'}), status
        return render_template('errors/404.html'), status

    @app.errorhandler(500)
    def internal_error(error):
        status = 500
        # Roll back first so the session is usable while rendering the response.
        db.session.rollback()
        if _is_api_request():
            return jsonify({'error': 'Internal server error'}), status
        return render_template('errors/500.html'), status
def register_template_filters(app):
    """Register the ``time_ago`` and ``markdown`` Jinja filters on ``app``."""
    from datetime import datetime

    @app.template_filter('time_ago')
    def time_ago_filter(dt):
        """Render a naive-UTC datetime as a coarse relative age.

        Returns '' for a missing value and '刚刚' for anything under a minute
        old, including timestamps slightly in the future (clock skew).
        """
        if not dt:
            return ''
        elapsed = datetime.utcnow() - dt
        days = elapsed.days
        if days < 0:
            # A negative timedelta normalizes to days=-1 with a large positive
            # ``seconds``; without this guard it would read as "23 小时前".
            return '刚刚'
        # Thresholds are inclusive so that exactly one year/month/hour/minute
        # is reported in the larger unit.
        if days >= 365:
            return f'{days // 365} 年前'
        if days >= 30:
            return f'{days // 30} 个月前'
        if days > 0:
            return f'{days} 天前'
        seconds = elapsed.seconds
        if seconds >= 3600:
            return f'{seconds // 3600} 小时前'
        if seconds >= 60:
            return f'{seconds // 60} 分钟前'
        return '刚刚'

    @app.template_filter('markdown')
    def markdown_filter(text):
        """Convert Markdown source to HTML; a missing value renders as ''."""
        import markdown
        if not text:
            return ''
        # NOTE(review): the result is raw HTML built from user content;
        # sanitize it before marking it safe in a template.
        return markdown.markdown(text, extensions=['fenced_code', 'tables'])
def register_commands(app):
    """Register the init-db, create-admin and seed CLI commands on ``app``."""
    import click

    # The command docstrings below double as the CLI help text, so they are
    # kept verbatim.
    @app.cli.command()
    @click.option('--drop', is_flag=True, help='Drop all tables first')
    def init_db(drop):
        """初始化数据库"""
        if drop:
            # abort=True stops the command unless the operator confirms.
            click.confirm('This will delete all data. Continue?', abort=True)
            db.drop_all()
            click.echo('Dropped all tables.')
        db.create_all()
        click.echo('Initialized database.')

    @app.cli.command()
    @click.argument('username')
    @click.argument('email')
    @click.password_option()
    def create_admin(username, email, password):
        """创建管理员"""
        from app.auth.models import User

        admin = User(username=username, email=email, is_admin=True)
        admin.set_password(password)
        db.session.add(admin)
        db.session.commit()
        click.echo(f'Admin user {username} created.')

    @app.cli.command()
    def seed():
        """填充测试数据"""
        from tests.factories import UserFactory, PostFactory, CategoryFactory

        CategoryFactory.create_batch(5)
        for author in UserFactory.create_batch(10):
            PostFactory.create_batch(5, author=author)
        click.echo('Test data seeded.')
第四章:认证系统
认证表单
# app/auth/forms.py
from flask_wtf import FlaskForm
from flask_wtf.file import FileField, FileAllowed
from wtforms import StringField, PasswordField, BooleanField, TextAreaField, SubmitField
from wtforms.validators import DataRequired, Email, Length, EqualTo, ValidationError, Optional
from app.auth.models import User
class LoginForm(FlaskForm):
    """Sign-in form: username, password and a remember-me switch."""

    username = StringField(
        '用户名',
        validators=[DataRequired()],
    )
    password = PasswordField(
        '密码',
        validators=[DataRequired()],
    )
    remember = BooleanField('记住我')
    submit = SubmitField('登录')
class RegistrationForm(FlaskForm):
    """Sign-up form that rejects usernames and emails already in use."""

    username = StringField(
        '用户名',
        validators=[
            DataRequired(),
            Length(3, 20, message='用户名长度需要 3-20 个字符'),
        ],
    )
    email = StringField('邮箱', validators=[DataRequired(), Email()])
    password = PasswordField(
        '密码',
        validators=[
            DataRequired(),
            Length(min=8, message='密码至少 8 个字符'),
        ],
    )
    confirm_password = PasswordField(
        '确认密码',
        validators=[
            DataRequired(),
            EqualTo('password', message='两次密码不一致'),
        ],
    )
    submit = SubmitField('注册')

    def validate_username(self, field):
        """Reject a username that already belongs to an account."""
        existing = User.query.filter_by(username=field.data).first()
        if existing:
            raise ValidationError('用户名已存在')

    def validate_email(self, field):
        """Reject an email (compared lower-cased) that is already registered."""
        existing = User.query.filter_by(email=field.data.lower()).first()
        if existing:
            raise ValidationError('邮箱已注册')
class ProfileForm(FlaskForm):
    """Profile editor; uniqueness checks ignore the editing user's own values."""

    username = StringField('用户名', validators=[DataRequired(), Length(3, 20)])
    email = StringField('邮箱', validators=[DataRequired(), Email()])
    bio = TextAreaField('个人简介', validators=[Optional(), Length(max=500)])
    website = StringField('网站', validators=[Optional(), Length(max=200)])
    avatar = FileField(
        '头像',
        validators=[
            Optional(),
            FileAllowed(['jpg', 'png', 'gif'], '只支持图片格式'),
        ],
    )
    submit = SubmitField('保存')

    def __init__(self, user, *args, **kwargs):
        """Remember ``user`` so the validators can skip unchanged values."""
        super().__init__(*args, **kwargs)
        self.user = user

    def validate_username(self, field):
        """Reject a changed username that another account already uses."""
        if field.data == self.user.username:
            return
        if User.query.filter_by(username=field.data).first():
            raise ValidationError('用户名已存在')

    def validate_email(self, field):
        """Reject a changed email that another account already uses."""
        submitted = field.data.lower()
        if submitted == self.user.email:
            return
        if User.query.filter_by(email=field.data.lower()).first():
            raise ValidationError('邮箱已注册')
认证视图
# app/auth/views.py
from flask import Blueprint, render_template, redirect, url_for, flash, request
from flask_login import login_user, logout_user, login_required, current_user
from datetime import datetime
from app.extensions import db
from app.auth.models import User
from app.auth.forms import LoginForm, RegistrationForm, ProfileForm
from app.utils.helpers import save_avatar
# Blueprint for the authentication views; its templates live in the
# package-local "templates" folder.
auth_bp = Blueprint('auth', __name__, template_folder='templates')
@auth_bp.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form and sign the user in on a valid submission."""
    home_url_endpoint = 'blog.index'
    if current_user.is_authenticated:
        return redirect(url_for(home_url_endpoint))

    form = LoginForm()
    if not form.validate_on_submit():
        return render_template('auth/login.html', form=form)

    account = User.query.filter_by(username=form.username.data).first()
    credentials_ok = account is not None and account.check_password(form.password.data)
    if not credentials_ok:
        flash('用户名或密码错误', 'error')
        return redirect(url_for('auth.login'))
    if not account.is_active:
        flash('账户已被禁用', 'error')
        return redirect(url_for('auth.login'))

    login_user(account, remember=form.remember.data)
    account.last_login = datetime.utcnow()
    db.session.commit()

    # Honour ?next= only when it points back at this site.
    destination = request.args.get('next')
    if destination and is_safe_url(destination):
        return redirect(destination)
    flash('登录成功!', 'success')
    return redirect(url_for(home_url_endpoint))
@auth_bp.route('/register', methods=['GET', 'POST'])
def register():
    """Show the sign-up form and create the account on a valid submission."""
    if current_user.is_authenticated:
        return redirect(url_for('blog.index'))

    form = RegistrationForm()
    if not form.validate_on_submit():
        return render_template('auth/register.html', form=form)

    # Emails are stored lower-cased, matching the form's uniqueness check.
    new_user = User(username=form.username.data, email=form.email.data.lower())
    new_user.set_password(form.password.data)
    db.session.add(new_user)
    db.session.commit()
    flash('注册成功,请登录!', 'success')
    return redirect(url_for('auth.login'))
@auth_bp.route('/logout')
@login_required
def logout():
    """Sign the current user out and return to the home page."""
    logout_user()
    flash('已退出登录', 'info')
    home_url = url_for('blog.index')
    return redirect(home_url)
@auth_bp.route('/profile', methods=['GET', 'POST'])
@login_required
def profile():
    """Display and update the signed-in user's profile."""
    form = ProfileForm(current_user)
    if form.validate_on_submit():
        current_user.username = form.username.data
        current_user.email = form.email.data.lower()
        current_user.bio = form.bio.data
        current_user.website = form.website.data
        if form.avatar.data:
            filename = save_avatar(form.avatar.data)
            current_user.avatar = filename
        db.session.commit()
        flash('个人资料已更新', 'success')
        return redirect(url_for('auth.profile'))
    if request.method == 'GET':
        # Prefill only on the initial GET. After a failed POST the submitted
        # values stay in the form so the user can correct them next to the
        # validation errors instead of having the input silently reverted.
        form.username.data = current_user.username
        form.email.data = current_user.email
        form.bio.data = current_user.bio
        form.website.data = current_user.website
    return render_template('auth/profile.html', form=form)
def is_safe_url(target):
    """Return True when ``target`` is safe to redirect to after login.

    A target is safe when, resolved against the current host URL, it stays on
    this host with an http(s) scheme. Forms that Python's URL parser and
    browsers interpret differently are rejected up front.
    """
    from urllib.parse import urlparse, urljoin
    from flask import request

    if not target or target != target.strip():
        return False
    # Browsers treat "\" like "/" and drop control characters, so
    # "/\evil.com" or "/\t/evil.com" would leave the site even though
    # urljoin() sees a local path.
    if '\\' in target or any(ord(ch) < 0x20 or ord(ch) == 0x7f for ch in target):
        return False
    # "///evil.com" parses here as a local path with an empty host, but
    # browsers collapse the extra slashes and navigate to evil.com.
    if target.startswith('//') and not urlparse(target).netloc:
        return False

    ref_url = urlparse(request.host_url)
    test_url = urlparse(urljoin(request.host_url, target))
    return test_url.scheme in ('http', 'https') and ref_url.netloc == test_url.netloc
第五章:博客核心功能
博客视图
# app/blog/views.py
from flask import Blueprint, render_template, redirect, url_for, flash, request, abort
from flask_login import login_required, current_user
from app.extensions import db, cache
from app.blog.models import Post, Category, Tag, Comment
from app.blog.forms import PostForm, CommentForm
# Blueprint for the public blog views; its templates live in the
# package-local "templates" folder.
blog_bp = Blueprint('blog', __name__, template_folder='templates')
def _bypass_index_cache():
    """Return True when the index page must be neither read from nor stored in cache.

    The rendered page embeds the signed-in user's navbar and any pending
    flash messages, so only the plain anonymous page may be shared.
    """
    from flask import session
    # '_flashes' is the session key Flask's flash() stores pending messages under.
    return bool(current_user.is_authenticated) or bool(session.get('_flashes'))


@blog_bp.route('/')
@cache.cached(timeout=60, query_string=True, unless=_bypass_index_cache)
def index():
    """List published posts, filtered by category, tag and/or search text.

    Query parameters: ``page``, ``category`` (slug), ``tag`` (slug), ``q``.
    An unknown category or tag slug yields a 404.
    """
    page = request.args.get('page', 1, type=int)
    per_page = 10
    query = Post.query.filter_by(is_published=True)
    # Category filter
    category_slug = request.args.get('category')
    if category_slug:
        category = Category.query.filter_by(slug=category_slug).first_or_404()
        query = query.filter_by(category=category)
    # Tag filter
    tag_slug = request.args.get('tag')
    if tag_slug:
        tag = Tag.query.filter_by(slug=tag_slug).first_or_404()
        query = query.filter(Post.tags.contains(tag))
    # Full-text-ish search over title and content
    search = request.args.get('q')
    if search:
        query = query.filter(
            db.or_(
                Post.title.ilike(f'%{search}%'),
                Post.content.ilike(f'%{search}%')
            )
        )
    # error_out=False: an out-of-range page renders as an empty list, not a 404.
    pagination = query.order_by(Post.published_at.desc()).paginate(
        page=page, per_page=per_page, error_out=False
    )
    return render_template('blog/index.html',
                           posts=pagination.items,
                           pagination=pagination,
                           categories=Category.query.all())
@blog_bp.route('/post/<slug>')
def post_detail(slug):
    """Render one post with its approved top-level comments and related posts."""
    post = Post.query.filter_by(slug=slug).first_or_404()

    # Drafts are visible to their author only; everyone else gets a 404.
    if not post.is_published:
        if not current_user.is_authenticated or post.author != current_user:
            abort(404)

    # Count this view
    post.increment_views()

    # Empty comment form for the page
    form = CommentForm()

    # Approved top-level comments, oldest first
    top_level = post.comments.filter_by(parent_id=None, is_approved=True)
    comments = top_level.order_by(Comment.created_at.asc()).all()

    # Up to three other published posts from the same category
    related_query = Post.query.filter(
        Post.id != post.id,
        Post.is_published == True,
        Post.category_id == post.category_id
    )
    related_posts = related_query.limit(3).all()

    context = {
        'post': post,
        'form': form,
        'comments': comments,
        'related_posts': related_posts,
    }
    return render_template('blog/post_detail.html', **context)
@blog_bp.route('/post/new', methods=['GET', 'POST'])
@login_required
def create_post():
    """Show the post editor and create a post on a valid submission."""
    form = PostForm()
    if form.validate_on_submit():
        post = Post(
            title=form.title.data,
            content=form.content.data,
            summary=form.summary.data,
            category_id=form.category.data,
            author=current_user
        )
        # Tags arrive as a comma-separated string. dict.fromkeys() drops
        # repeated names while keeping their order, so "a, a" cannot create
        # two Tag rows with the same unique name. An empty field is tolerated.
        tag_names = dict.fromkeys(
            name.strip() for name in (form.tags.data or '').split(',')
        )
        for tag_name in tag_names:
            if not tag_name:
                continue
            tag = Tag.query.filter_by(name=tag_name).first()
            if not tag:
                tag = Tag(name=tag_name)
            post.tags.append(tag)
        # Cover image
        if form.cover_image.data:
            from app.utils.helpers import save_image
            post.cover_image = save_image(form.cover_image.data, 'covers')
        if form.publish.data:
            post.publish()
        db.session.add(post)
        db.session.commit()
        # The index view is cached with @cache.cached, which delete_memoized()
        # does not invalidate; clear the cache so the new post appears.
        cache.clear()
        flash('文章已保存', 'success')
        return redirect(url_for('blog.post_detail', slug=post.slug))
    return render_template('blog/post_form.html', form=form)
@blog_bp.route('/post/<slug>/edit', methods=['GET', 'POST'])
@login_required
def edit_post(slug):
    """Show the editor for an existing post and save valid changes.

    Only the author or an administrator may edit; anyone else gets a 403.
    """
    post = Post.query.filter_by(slug=slug).first_or_404()
    if post.author != current_user and not current_user.is_admin:
        abort(403)
    form = PostForm()
    if form.validate_on_submit():
        post.title = form.title.data
        post.content = form.content.data
        post.summary = form.summary.data
        post.category_id = form.category.data
        # Replace the tag set. dict.fromkeys() drops repeated names while
        # keeping their order, so "a, a" cannot create two Tag rows with the
        # same unique name. An empty field is tolerated.
        post.tags.clear()
        tag_names = dict.fromkeys(
            name.strip() for name in (form.tags.data or '').split(',')
        )
        for tag_name in tag_names:
            if not tag_name:
                continue
            tag = Tag.query.filter_by(name=tag_name).first()
            if not tag:
                tag = Tag(name=tag_name)
            post.tags.append(tag)
        if form.cover_image.data:
            from app.utils.helpers import save_image
            post.cover_image = save_image(form.cover_image.data, 'covers')
        if form.publish.data and not post.is_published:
            post.publish()
        db.session.commit()
        # The index view is cached with @cache.cached, which delete_memoized()
        # does not invalidate; clear the cache so the change is visible.
        cache.clear()
        flash('文章已更新', 'success')
        return redirect(url_for('blog.post_detail', slug=post.slug))
    if request.method == 'GET':
        # Prefill only on the initial GET so a failed POST keeps the author's
        # edits next to the validation errors instead of reverting them.
        form.title.data = post.title
        form.content.data = post.content
        form.summary.data = post.summary
        form.category.data = post.category_id
        form.tags.data = ', '.join([t.name for t in post.tags])
    return render_template('blog/post_form.html', form=form, post=post)
@blog_bp.route('/post/<slug>/comment', methods=['POST'])
@login_required
def add_comment(slug):
    """Attach a comment (optionally a reply) to the post, then return to it."""
    post = Post.query.filter_by(slug=slug).first_or_404()
    if not post.allow_comments:
        flash('该文章不允许评论', 'warning')
        return redirect(url_for('blog.post_detail', slug=slug))
    form = CommentForm()
    if form.validate_on_submit():
        parent_id = form.parent_id.data or None
        if parent_id is not None:
            # A reply must target a comment on this same post; otherwise the
            # thread would be attached to another post, or the insert would
            # fail on the foreign key.
            parent = Comment.query.filter_by(id=parent_id, post_id=post.id).first()
            if parent is None:
                abort(400)
        comment = Comment(
            content=form.content.data,
            post=post,
            author=current_user,
            parent_id=parent_id
        )
        db.session.add(comment)
        db.session.commit()
        flash('评论已发布', 'success')
    else:
        # Tell the user the comment was rejected instead of dropping it silently.
        flash('评论内容无效', 'error')
    return redirect(url_for('blog.post_detail', slug=slug))
@blog_bp.route('/post/<slug>/delete', methods=['POST'])
@login_required
def delete_post(slug):
    """Delete a post; only its author or an administrator may do so (else 403)."""
    post = Post.query.filter_by(slug=slug).first_or_404()
    if post.author != current_user and not current_user.is_admin:
        abort(403)
    db.session.delete(post)
    db.session.commit()
    # The index view is cached with @cache.cached, which delete_memoized()
    # does not invalidate; clear the cache so the deleted post disappears.
    cache.clear()
    flash('文章已删除', 'success')
    return redirect(url_for('blog.index'))
第六章:RESTful API
API 设计
# app/api/__init__.py
from flask import Blueprint
from flask_restx import Api
# Blueprint carrying the REST API.
api_bp = Blueprint('api', __name__)
# Flask-RESTX Api bound to the blueprint; the interactive docs are served
# at "/docs" relative to wherever the blueprint is mounted.
api = Api(
    api_bp,
    version='1.0',
    title='Blog API',
    description='博客系统 RESTful API',
    doc='/docs'
)
# Imported last, after `api` exists — presumably because these modules import
# from this package (circular import). NOTE(review): nothing visible here
# calls api.add_namespace(); confirm the namespaces are attached elsewhere.
from app.api import auth, posts, comments
文章 API
# app/api/posts.py
from flask import request, g
from flask_restx import Namespace, Resource, fields
from app.extensions import db
from app.blog.models import Post, Category, Tag
from app.api.auth import token_required
# Namespace grouping the post endpoints.
ns = Namespace('posts', description='文章相关接口')
# Response schema used to marshal a post (with nested author, category, tags).
post_model = ns.model('Post', {
    'id': fields.Integer(readonly=True),
    'title': fields.String(required=True),
    'slug': fields.String(readonly=True),
    'summary': fields.String(),
    'content': fields.String(required=True),
    'is_published': fields.Boolean(default=False),
    'views_count': fields.Integer(readonly=True),
    'created_at': fields.DateTime(readonly=True),
    'author': fields.Nested(ns.model('Author', {
        'id': fields.Integer(),
        'username': fields.String(),
        'avatar': fields.String()
    })),
    'category': fields.Nested(ns.model('Category', {
        'id': fields.Integer(),
        'name': fields.String(),
        'slug': fields.String()
    })),
    'tags': fields.List(fields.Nested(ns.model('Tag', {
        'id': fields.Integer(),
        'name': fields.String()
    })))
})
# Request schema accepted when creating or updating a post; tags are given
# as a list of tag names.
post_input_model = ns.model('PostInput', {
    'title': fields.String(required=True),
    'content': fields.String(required=True),
    'summary': fields.String(),
    'category_id': fields.Integer(),
    'tags': fields.List(fields.String()),
    'is_published': fields.Boolean(default=False)
})
@ns.route('/')
class PostList(Resource):
    # Collection endpoint: list published posts, create a new post.
    # Method docstrings are kept verbatim because Flask-RESTX publishes them
    # in the generated API documentation.

    @ns.doc('获取文章列表')
    @ns.param('page', '页码', type=int, default=1)
    @ns.param('per_page', '每页数量', type=int, default=20)
    @ns.param('category', '分类 slug')
    @ns.param('tag', '标签 slug')
    @ns.param('q', '搜索关键词')
    def get(self):
        """获取文章列表"""
        page = request.args.get('page', 1, type=int)
        # Cap the page size so a client cannot request an unbounded result set.
        per_page = min(request.args.get('per_page', 20, type=int), 100)
        query = Post.query.filter_by(is_published=True)
        # Category filter (an unknown slug is ignored rather than rejected)
        category = request.args.get('category')
        if category:
            cat = Category.query.filter_by(slug=category).first()
            if cat:
                query = query.filter_by(category=cat)
        # Tag filter (an unknown slug is ignored rather than rejected)
        tag = request.args.get('tag')
        if tag:
            t = Tag.query.filter_by(slug=tag).first()
            if t:
                query = query.filter(Post.tags.contains(t))
        # Search over title and content
        q = request.args.get('q')
        if q:
            query = query.filter(
                db.or_(
                    Post.title.ilike(f'%{q}%'),
                    Post.content.ilike(f'%{q}%')
                )
            )
        pagination = query.order_by(Post.published_at.desc()).paginate(
            page=page, per_page=per_page, error_out=False
        )
        return {
            'posts': [p.to_dict(include_content=False) for p in pagination.items],
            'pagination': {
                'page': pagination.page,
                'per_page': pagination.per_page,
                'total': pagination.total,
                'pages': pagination.pages
            }
        }

    @ns.doc('创建文章')
    # validate=True makes Flask-RESTX reject a payload that lacks the required
    # title/content with a 400 instead of a KeyError (500) below.
    @ns.expect(post_input_model, validate=True)
    @ns.marshal_with(post_model, code=201)
    @token_required
    def post(self):
        """创建新文章"""
        data = request.json
        post = Post(
            title=data['title'],
            content=data['content'],
            summary=data.get('summary'),
            category_id=data.get('category_id'),
            author=g.current_user
        )
        # dict.fromkeys() drops repeated names while keeping their order, so
        # a duplicated name cannot create two Tag rows with the same unique name.
        for tag_name in dict.fromkeys(data.get('tags') or []):
            tag = Tag.query.filter_by(name=tag_name).first()
            if not tag:
                tag = Tag(name=tag_name)
            post.tags.append(tag)
        if data.get('is_published'):
            post.publish()
        db.session.add(post)
        db.session.commit()
        return post.to_dict(), 201
@ns.route('/<slug>')
@ns.param('slug', '文章 slug')
class PostResource(Resource):
    # Item endpoint: read, update or delete one post identified by its slug.
    # Method docstrings are kept verbatim because Flask-RESTX publishes them
    # in the generated API documentation.

    @ns.doc('获取文章详情')
    @ns.marshal_with(post_model)
    def get(self, slug):
        """获取文章详情"""
        post = Post.query.filter_by(slug=slug, is_published=True).first_or_404()
        post.increment_views()
        return post.to_dict()

    @ns.doc('更新文章')
    @ns.expect(post_input_model)
    @ns.marshal_with(post_model)
    @token_required
    def put(self, slug):
        """更新文章"""
        post = Post.query.filter_by(slug=slug).first_or_404()
        if post.author != g.current_user and not g.current_user.is_admin:
            ns.abort(403, '无权操作')
        data = request.json
        # title and content are NOT NULL columns: keep the current value when
        # the client omits the field or sends null/empty.
        post.title = data.get('title') or post.title
        post.content = data.get('content') or post.content
        post.summary = data.get('summary', post.summary)
        post.category_id = data.get('category_id', post.category_id)
        # Replace the tag set only when the client sent one.
        if 'tags' in data:
            post.tags.clear()
            # dict.fromkeys() drops repeated names while keeping their order,
            # so a duplicated name cannot create two Tag rows with the same
            # unique name.
            for tag_name in dict.fromkeys(data['tags'] or []):
                tag = Tag.query.filter_by(name=tag_name).first()
                if not tag:
                    tag = Tag(name=tag_name)
                post.tags.append(tag)
        if data.get('is_published') and not post.is_published:
            post.publish()
        db.session.commit()
        return post.to_dict()

    @ns.doc('删除文章')
    @token_required
    def delete(self, slug):
        """删除文章"""
        post = Post.query.filter_by(slug=slug).first_or_404()
        if post.author != g.current_user and not g.current_user.is_admin:
            ns.abort(403, '无权操作')
        db.session.delete(post)
        db.session.commit()
        return '', 204
第七章:模板系统
基础模板
<!-- app/templates/base.html -->
<!DOCTYPE html>
<html lang="zh-CN">
  <head>
    <meta charset="UTF-8" />
    <meta name="viewport" content="width=device-width, initial-scale=1.0" />
    <title>{% block title %}{% endblock %} - 我的博客</title>
    <link
      href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/css/bootstrap.min.css"
      rel="stylesheet"
    />
    <link
      href="{{ url_for('static', filename='css/style.css') }}"
      rel="stylesheet"
    />
    {% block styles %}{% endblock %}
  </head>
  <body>
    <nav class="navbar navbar-expand-lg navbar-dark bg-dark">
      <div class="container">
        <a class="navbar-brand" href="{{ url_for('blog.index') }}">我的博客</a>
        <button
          class="navbar-toggler"
          type="button"
          data-bs-toggle="collapse"
          data-bs-target="#navbarNav"
        >
          <span class="navbar-toggler-icon"></span>
        </button>
        <div class="collapse navbar-collapse" id="navbarNav">
          <ul class="navbar-nav me-auto">
            <li class="nav-item">
              <a class="nav-link" href="{{ url_for('blog.index') }}">首页</a>
            </li>
            {# `categories` is supplied by the rendering view; when a view does
               not pass it, the loop simply renders nothing. #}
            {% for category in categories %}
            <li class="nav-item">
              <a
                class="nav-link"
                href="{{ url_for('blog.index', category=category.slug) }}"
              >
                {{ category.name }}
              </a>
            </li>
            {% endfor %}
          </ul>
          <form
            class="d-flex me-3"
            action="{{ url_for('blog.index') }}"
            method="get"
          >
            <input
              class="form-control me-2"
              type="search"
              name="q"
              placeholder="搜索..."
            />
            <button class="btn btn-outline-light" type="submit">搜索</button>
          </form>
          <ul class="navbar-nav">
            {% if current_user.is_authenticated %}
            <li class="nav-item dropdown">
              <a
                class="nav-link dropdown-toggle"
                href="#"
                data-bs-toggle="dropdown"
              >
                <img
                  src="{{ current_user.get_avatar_url(32) }}"
                  class="rounded-circle"
                  width="24"
                  alt="{{ current_user.username }}"
                />
                {{ current_user.username }}
              </a>
              <ul class="dropdown-menu dropdown-menu-end">
                <li>
                  <a class="dropdown-item" href="{{ url_for('auth.profile') }}"
                    >个人资料</a
                  >
                </li>
                <li>
                  <a
                    class="dropdown-item"
                    href="{{ url_for('blog.create_post') }}"
                    >写文章</a
                  >
                </li>
                {% if current_user.is_admin %}
                <li><hr class="dropdown-divider" /></li>
                <li>
                  <a class="dropdown-item" href="{{ url_for('admin.index') }}"
                    >管理后台</a
                  >
                </li>
                {% endif %}
                <li><hr class="dropdown-divider" /></li>
                <li>
                  <a class="dropdown-item" href="{{ url_for('auth.logout') }}"
                    >退出</a
                  >
                </li>
              </ul>
            </li>
            {% else %}
            <li class="nav-item">
              <a class="nav-link" href="{{ url_for('auth.login') }}">登录</a>
            </li>
            <li class="nav-item">
              <a class="nav-link" href="{{ url_for('auth.register') }}">注册</a>
            </li>
            {% endif %}
          </ul>
        </div>
      </div>
    </nav>
    <main class="container py-4">
      {% with messages = get_flashed_messages(with_categories=true) %} {% if
      messages %} {% for category, message in messages %}
      {# The views flash with category "error"; Bootstrap's class for that
         style is alert-danger. #}
      <div
        class="alert alert-{{ 'danger' if category == 'error' else category }} alert-dismissible fade show"
      >
        {{ message }}
        <button
          type="button"
          class="btn-close"
          data-bs-dismiss="alert"
        ></button>
      </div>
      {% endfor %} {% endif %} {% endwith %} {% block content %}{% endblock %}
    </main>
    <footer class="bg-dark text-light py-4 mt-5">
      <div class="container text-center">
        {# `now` must be injected by a context processor; guard it so a
           missing helper does not break every page. #}
        <p>© {% if now is defined %}{{ now().year }} {% endif %}我的博客. All rights reserved.</p>
      </div>
    </footer>
    <script src="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/js/bootstrap.bundle.min.js"></script>
    {% block scripts %}{% endblock %}
  </body>
</html>
文章列表模板
<!-- app/templates/blog/index.html -->
{% extends 'base.html' %} {% block title %}首页{% endblock %} {% block content
%}
<div class="row">
  <div class="col-lg-8">
    {% if posts %} {% for post in posts %}
    <article class="card mb-4">
      {% if post.cover_image %}
      <img
        src="{{ url_for('static', filename='uploads/covers/' + post.cover_image) }}"
        class="card-img-top"
        alt="{{ post.title }}"
      />
      {% endif %}
      <div class="card-body">
        <div class="d-flex align-items-center mb-2">
          <img
            src="{{ post.author.get_avatar_url(32) }}"
            class="rounded-circle me-2"
            width="32"
            alt="{{ post.author.username }}"
          />
          <span class="text-muted">{{ post.author.username }}</span>
          <span class="text-muted mx-2">·</span>
          <span class="text-muted">{{ post.published_at | time_ago }}</span>
          {% if post.category %}
          <span class="badge bg-primary ms-2">{{ post.category.name }}</span>
          {% endif %}
        </div>
        <h2 class="card-title">
          <a
            href="{{ url_for('blog.post_detail', slug=post.slug) }}"
            class="text-decoration-none"
          >
            {{ post.title }}
          </a>
        </h2>
        <p class="card-text text-muted">
          {# truncate appends "..." only when the text was actually cut #}
          {{ post.summary or (post.content | truncate(200)) }}
        </p>
        <div class="d-flex justify-content-between align-items-center">
          <div class="tags">
            {% for tag in post.tags %}
            <a
              href="{{ url_for('blog.index', tag=tag.slug) }}"
              class="badge bg-secondary text-decoration-none"
            >
              {{ tag.name }}
            </a>
            {% endfor %}
          </div>
          <div class="stats text-muted">
            <span>{{ post.views_count }} 阅读</span>
            <span class="mx-2">·</span>
            <span>{{ post.comments.count() }} 评论</span>
          </div>
        </div>
      </div>
    </article>
    {% endfor %}
    <!-- 分页 -->
    {% if pagination.pages > 1 %}
    {# Carry the active category/tag/search filters across page links;
       "page" itself is removed because every link sets it explicitly. #}
    {% set filters = request.args.to_dict() %}
    {% set _ = filters.pop('page', None) %}
    <nav>
      <ul class="pagination justify-content-center">
        {% if pagination.has_prev %}
        <li class="page-item">
          <a
            class="page-link"
            href="{{ url_for('blog.index', page=pagination.prev_num, **filters) }}"
            >上一页</a
          >
        </li>
        {% endif %} {% for page in pagination.iter_pages() %} {% if page %}
        <li class="page-item {{ 'active' if page == pagination.page else '' }}">
          <a
            class="page-link"
            href="{{ url_for('blog.index', page=page, **filters) }}"
            >{{ page }}</a
          >
        </li>
        {% else %}
        <li class="page-item disabled"><span class="page-link">...</span></li>
        {% endif %} {% endfor %} {% if pagination.has_next %}
        <li class="page-item">
          <a
            class="page-link"
            href="{{ url_for('blog.index', page=pagination.next_num, **filters) }}"
            >下一页</a
          >
        </li>
        {% endif %}
      </ul>
    </nav>
    {% endif %} {% else %}
    <div class="text-center text-muted py-5">
      <h4>暂无文章</h4>
    </div>
    {% endif %}
  </div>
  <!-- 侧边栏 -->
  <div class="col-lg-4">
    <div class="card mb-4">
      <div class="card-header">分类</div>
      <ul class="list-group list-group-flush">
        {% for category in categories %}
        <li class="list-group-item d-flex justify-content-between">
          <a href="{{ url_for('blog.index', category=category.slug) }}"
            >{{ category.name }}</a
          >
          <span class="badge bg-primary">{{ category.posts.count() }}</span>
        </li>
        {% endfor %}
      </ul>
    </div>
  </div>
</div>
{% endblock %}
第八章:测试与部署
测试配置
# tests/conftest.py
import pytest
from app import create_app, db
from app.auth.models import User
@pytest.fixture
def app():
    """Yield an app built with the 'testing' config and a fresh schema.

    Tables are created before the test and dropped afterwards, all inside
    one application context.
    """
    flask_app = create_app('testing')
    context = flask_app.app_context()
    with context:
        db.create_all()
        yield flask_app
        # Teardown: release the session before dropping the schema.
        db.session.remove()
        db.drop_all()
@pytest.fixture
def client(app):
    """Return a test client bound to the ``app`` fixture."""
    test_client = app.test_client()
    return test_client
@pytest.fixture
def auth_client(app, client):
    """Return ``client`` after creating a test user and posting its login."""
    credentials = {
        'username': 'testuser',
        'password': 'password123'
    }
    with app.app_context():
        account = User(username='testuser', email='test@example.com')
        account.set_password('password123')
        db.session.add(account)
        db.session.commit()
    # The client keeps the session cookie set by this login request.
    client.post('/login', data=credentials)
    return client
Docker 部署配置
# docker-compose.yml
version: "3.8"

services:
  web:
    build: .
    ports:
      - "8000:8000"
    environment:
      # The app factory selects its configuration from FLASK_CONFIG.
      # NOTE(review): assumes app/config.py defines a "production" entry — confirm.
      - FLASK_CONFIG=production
      # Override POSTGRES_PASSWORD in .env; "password" is only the fallback.
      - DATABASE_URL=postgresql://postgres:${POSTGRES_PASSWORD:-password}@db:5432/blog
      - REDIS_URL=redis://redis:6379/0
      - SECRET_KEY=${SECRET_KEY}
    depends_on:
      - db
      - redis
    volumes:
      - uploads:/app/static/uploads
    restart: unless-stopped

  db:
    image: postgres:15
    environment:
      - POSTGRES_DB=blog
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=${POSTGRES_PASSWORD:-password}
    volumes:
      - postgres_data:/var/lib/postgresql/data
    restart: unless-stopped

  redis:
    image: redis:7-alpine
    restart: unless-stopped

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/conf.d/default.conf
      - ./static:/var/www/static
    depends_on:
      - web
    restart: unless-stopped

  celery:
    build: .
    command: celery -A app.celery worker --loglevel=info
    environment:
      # The worker gets the same configuration and secret as the web service.
      - FLASK_CONFIG=production
      - DATABASE_URL=postgresql://postgres:${POSTGRES_PASSWORD:-password}@db:5432/blog
      - REDIS_URL=redis://redis:6379/0
      - SECRET_KEY=${SECRET_KEY}
    depends_on:
      - db
      - redis
    restart: unless-stopped

volumes:
  postgres_data:
  uploads:
总结
本案例通过一个完整的博客系统,综合应用了 Flask 开发的核心知识:
- 项目规划:技术选型、目录结构设计
- 模型设计:用户、文章、评论、分类、标签
- 应用工厂:扩展初始化、蓝图注册
- 认证系统:登录、注册、会话管理
- 博客功能:CRUD、分页、搜索、缓存
- RESTful API:Flask-RESTX、认证、文档
- 模板系统:继承、组件化、响应式
- 测试部署:pytest、Docker、Nginx
通过这个实战项目,你应该已经掌握了 Flask Web 开发的完整流程。继续深入学习和实践,你将成为一名优秀的 Flask 开发者!