Django 缓存与性能优化
2026/3/20 · 大约 7 分钟
Django 缓存与性能优化
一、缓存系统
1.1 缓存配置
# settings.py
# 内存缓存(开发环境)
CACHES = {
'default': {
'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
'LOCATION': 'unique-snowflake',
}
}
# 文件缓存
CACHES = {
'default': {
'BACKEND': 'django.core.cache.backends.filebased.FileBasedCache',
'LOCATION': '/var/tmp/django_cache',
}
}
# Memcached
CACHES = {
'default': {
'BACKEND': 'django.core.cache.backends.memcached.PyMemcacheCache',
'LOCATION': '127.0.0.1:11211',
}
}
# Redis(推荐生产环境)
CACHES = {
'default': {
'BACKEND': 'django.core.cache.backends.redis.RedisCache',
'LOCATION': 'redis://127.0.0.1:6379/1',  # 数据库编号直接写在 URL 中
# 注意:'OPTIONS': {'db': ..., 'CLIENT_CLASS': ...} 是第三方 django-redis 包的写法,
# Django 内置的 redis.RedisCache 不支持这些选项,此处已移除。
},
# 可配置多个缓存
'session': {
'BACKEND': 'django.core.cache.backends.redis.RedisCache',
'LOCATION': 'redis://127.0.0.1:6379/2',
}
}
# 数据库缓存
CACHES = {
'default': {
'BACKEND': 'django.core.cache.backends.db.DatabaseCache',
'LOCATION': 'cache_table',
}
}
# 需要创建缓存表:python manage.py createcachetable
1.2 缓存 API
from django.core.cache import cache, caches
# 基本操作
cache.set('key', 'value', timeout=300) # 设置缓存,5分钟过期
value = cache.get('key') # 获取缓存
value = cache.get('key', 'default') # 带默认值
cache.delete('key') # 删除缓存
cache.clear() # 清空所有缓存
# 批量操作
cache.set_many({'a': 1, 'b': 2, 'c': 3})
values = cache.get_many(['a', 'b', 'c'])
cache.delete_many(['a', 'b', 'c'])
# 原子操作
cache.add('key', 'value') # 只在 key 不存在时设置
cache.incr('counter') # 自增
cache.decr('counter') # 自减
cache.touch('key', 300) # 更新过期时间
# 获取或设置
def get_expensive_data():
    # Placeholder for an expensive computation whose result is worth caching.
    # NOTE(review): illustrative snippet — `computed_value` is undefined here.
    return computed_value
value = cache.get_or_set('key', get_expensive_data, timeout=300)
# 使用不同缓存
session_cache = caches['session']
session_cache.set('user_session', data)
# 永不过期
cache.set('key', 'value', timeout=None)
1.3 视图缓存
from django.views.decorators.cache import cache_page, cache_control
from django.utils.decorators import method_decorator
# 函数视图缓存
@cache_page(60 * 15)  # cache the whole response for 15 minutes
def article_list(request):
    """Render the article list; the entire response is cached per URL."""
    articles = Article.objects.all()
    return render(request, 'articles.html', {'articles': articles})
# 类视图缓存
class ArticleListView(ListView):
    """Class-based article list; caching is applied to dispatch() so every
    HTTP method goes through the cached path."""
    model = Article

    @method_decorator(cache_page(60 * 15))
    def dispatch(self, *args, **kwargs):
        return super().dispatch(*args, **kwargs)
# URL 中配置缓存
from django.views.decorators.cache import cache_page
urlpatterns = [
path('articles/', cache_page(60 * 15)(views.article_list)),
]
# 缓存控制头
@cache_control(private=True, max_age=3600)
def my_view(request):
pass
# 不缓存
from django.views.decorators.cache import never_cache
@never_cache
def sensitive_view(request):
pass
1.4 模板片段缓存
{% load cache %}
<!-- 基本片段缓存 -->
{% cache 500 sidebar %}
<div class="sidebar">
{% for category in categories %}
<a href="{{ category.url }}">{{ category.name }}</a>
{% endfor %}
</div>
{% endcache %}
<!-- 带变量的缓存键 -->
{% cache 300 article_detail article.id %}
<article>
<h1>{{ article.title }}</h1>
<p>{{ article.content }}</p>
</article>
{% endcache %}
<!-- 用户相关缓存 -->
{% cache 600 user_profile request.user.id %}
<div class="profile">{{ request.user.profile.bio }}</div>
{% endcache %}
<!-- 带语言的缓存 -->
{% cache 300 menu LANGUAGE_CODE %}
<nav>
{% for item in menu_items %}
<a href="{{ item.url }}">{{ item.title }}</a>
{% endfor %}
</nav>
{% endcache %}
1.5 低级缓存
from django.core.cache import cache
def get_article_list():
    """Return published articles (author/category pre-joined), cached 15 min."""
    key = 'article_list'
    cached = cache.get(key)
    if cached is not None:
        return cached
    # Cache miss: evaluate the queryset once and store the materialized list.
    qs = Article.objects.filter(status='published').select_related('author', 'category')
    result = list(qs)
    cache.set(key, result, 900)  # 900 s == 15 min
    return result
def get_article_detail(pk):
    """Return one article (author/category/tags pre-fetched) via the cache.

    Returns None when the article does not exist; misses for deleted
    articles are not cached, so the lookup repeats on the next call.
    """
    key = f'article_{pk}'
    hit = cache.get(key)
    if hit is not None:
        return hit
    qs = Article.objects.select_related('author', 'category').prefetch_related('tags')
    try:
        found = qs.get(pk=pk)
    except Article.DoesNotExist:
        return None
    cache.set(key, found, 900)  # 900 s == 15 min
    return found
def invalidate_article_cache(article_id, author_id=None):
    """Invalidate the cache entries affected by a change to one article.

    Args:
        article_id: primary key of the changed article.
        author_id: optional author pk; when provided, that author's
            article-list cache is cleared as well.

    Note: the original version referenced an undefined ``article`` variable
    (``article.author_id``) — only the id is available here, so the author
    key is now built from the explicit ``author_id`` argument.
    """
    keys = [f'article_{article_id}', 'article_list']
    if author_id is not None:
        keys.append(f'author_{author_id}_articles')
    cache.delete_many(keys)
1.6 缓存装饰器
from functools import wraps
from django.core.cache import cache
import hashlib
def cache_result(timeout=300, key_prefix=''):
    """Cache a function's return value in the default Django cache.

    The key is an MD5 digest of the prefix, function name, and stringified
    positional/keyword arguments, so distinct call signatures map to
    distinct cache entries.

    Args:
        timeout: seconds before the cached value expires.
        key_prefix: optional namespace prepended to the key material.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Build a deterministic key from prefix, name, and arguments.
            key_parts = [key_prefix, func.__name__]
            key_parts.extend(str(arg) for arg in args)
            key_parts.extend(f'{k}={v}' for k, v in sorted(kwargs.items()))
            cache_key = hashlib.md5(':'.join(key_parts).encode()).hexdigest()
            # A per-call sentinel distinguishes a cache miss from a cached
            # None, so functions that legitimately return None are not
            # recomputed on every call (the original `is None` check was).
            sentinel = object()
            result = cache.get(cache_key, sentinel)
            if result is sentinel:
                result = func(*args, **kwargs)
                cache.set(cache_key, result, timeout)
            return result
        return wrapper
    return decorator
@cache_result(timeout=600, key_prefix='article')
def get_popular_articles(limit=10):
    """Return the *limit* most-viewed published articles (cached 10 min)."""
    return list(Article.objects.filter(
        status='published'
    ).order_by('-views')[:limit])
# 带缓存失效的装饰器
class CachedProperty:
    """Descriptor caching a zero-argument method's result in the Django cache.

    The key combines the owning class name, the instance pk, and the wrapped
    function name, so (unlike functools.cached_property) the value is shared
    across processes and survives instance re-creation until the timeout.
    """

    def __init__(self, func, timeout=300):
        self.func = func
        self.timeout = timeout

    def __get__(self, obj, objtype=None):
        if obj is None:
            # Accessed on the class itself — return the descriptor.
            return self
        cache_key = f'{obj.__class__.__name__}_{obj.pk}_{self.func.__name__}'
        # Sentinel-based miss detection: a cached None/0/'' is returned as-is
        # instead of being recomputed every access (the original `is None`
        # check re-ran the function for any falsy-as-None result).
        sentinel = object()
        value = cache.get(cache_key, sentinel)
        if value is sentinel:
            value = self.func(obj)
            cache.set(cache_key, value, self.timeout)
        return value
class Article(models.Model):
    @CachedProperty
    def comments_count(self):
        # Number of related comments, cached via CachedProperty
        # (default timeout 300 s); delete the cache key when comments change.
        return self.comments.count()
二、数据库优化
2.1 查询优化
# select_related - 一对一/外键关系
articles = Article.objects.select_related('author', 'category').all()
# prefetch_related - 多对多/反向外键关系
articles = Article.objects.prefetch_related('tags', 'comments').all()
# 组合使用
articles = Article.objects.select_related(
'author', 'category'
).prefetch_related(
'tags',
Prefetch(
'comments',
queryset=Comment.objects.filter(is_approved=True).select_related('author'),
to_attr='approved_comments'
)
)
# 自定义 Prefetch
from django.db.models import Prefetch
categories = Category.objects.prefetch_related(
Prefetch(
'articles',
queryset=Article.objects.filter(status='published').order_by('-created_at')[:5],
to_attr='recent_articles'
)
)
# 只查询需要的字段
articles = Article.objects.only('id', 'title', 'created_at')
articles = Article.objects.defer('content') # 延迟加载
# 使用 values 和 values_list
titles = Article.objects.values_list('title', flat=True)
data = Article.objects.values('title', 'author__username')
2.2 批量操作
# 批量创建
articles = [
Article(title=f'Article {i}', content=f'Content {i}')
for i in range(1000)
]
Article.objects.bulk_create(articles, batch_size=100)
# 批量更新
Article.objects.filter(status='draft').update(status='published')
# bulk_update
articles = list(Article.objects.filter(status='draft'))
for article in articles:
article.status = 'published'
Article.objects.bulk_update(articles, ['status'], batch_size=100)
# 批量删除
Article.objects.filter(created_at__lt=cutoff_date).delete()
# 使用 iterator 处理大数据集
for article in Article.objects.iterator(chunk_size=1000):
process(article)
2.3 索引优化
class Article(models.Model):
    # db_index=True adds a single-column index on title.
    title = models.CharField(max_length=200, db_index=True)
    slug = models.SlugField(unique=True)  # unique=True already implies an index
    status = models.CharField(max_length=20)
    created_at = models.DateTimeField()

    class Meta:
        # NOTE(review): this snippet assumes
        # `from django.db.models import Q` and
        # `from django.db.models.functions import Lower` — confirm the imports.
        indexes = [
            # Single-column index
            models.Index(fields=['status']),
            # Composite index; '-created_at' makes that column descending
            models.Index(fields=['status', '-created_at']),
            # Partial index: only rows where status='published'
            models.Index(
                fields=['title'],
                condition=Q(status='published'),
                name='published_title_idx'
            ),
            # Covering index (PostgreSQL): INCLUDE columns avoid heap lookups
            models.Index(
                fields=['status'],
                include=['title', 'created_at'],
                name='status_covering_idx'
            ),
            # Functional (expression) index on LOWER(title)
            models.Index(
                Lower('title'),
                name='title_lower_idx'
            ),
        ]
# 分析查询
from django.db import connection
with connection.cursor() as cursor:
cursor.execute('EXPLAIN ANALYZE SELECT * FROM articles WHERE status = %s', ['published'])
print(cursor.fetchall())
2.4 查询调试
# 使用 Django Debug Toolbar
INSTALLED_APPS = [
# ...
'debug_toolbar',
]
MIDDLEWARE = [
'debug_toolbar.middleware.DebugToolbarMiddleware',
# ...
]
INTERNAL_IPS = ['127.0.0.1']
# 手动打印查询
from django.db import connection, reset_queries
reset_queries()
articles = list(Article.objects.all())
print(connection.queries)
# 使用 QuerySet.explain()
print(Article.objects.filter(status='published').explain())
# 自定义查询分析器
import time
class QueryLogger:
    """Context manager that captures Django's SQL log and wall-clock time.

    NOTE(review): connection.queries is only populated when DEBUG=True —
    confirm the environment before relying on the counts.
    """

    def __init__(self):
        self.queries = []

    def __enter__(self):
        reset_queries()
        self.start_time = time.time()
        return self

    def __exit__(self, *args):
        self.total_time = time.time() - self.start_time
        self.queries = connection.queries

    def report(self):
        """Print query count, elapsed time, and a prefix of each statement."""
        print(f'Total queries: {len(self.queries)}')
        print(f'Total time: {self.total_time:.3f}s')
        for entry in self.queries:
            print(f" {entry['time']}s: {entry['sql'][:100]}")
with QueryLogger() as logger:
articles = list(Article.objects.select_related('author').all())
logger.report()
三、异步任务
3.1 Celery 配置
# pip install celery redis
# celery.py
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
# settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Shanghai'
# 任务设置
CELERY_TASK_SOFT_TIME_LIMIT = 300 # 5分钟
CELERY_TASK_TIME_LIMIT = 600 # 10分钟
CELERY_TASK_ACKS_LATE = True
CELERY_WORKER_PREFETCH_MULTIPLIER = 1
# __init__.py
from .celery import app as celery_app
__all__ = ['celery_app']
3.2 定义任务
# tasks.py
from celery import shared_task
from django.core.mail import send_mail
from django.template.loader import render_to_string
@shared_task
def send_email_task(subject, message, recipient_list):
    """Send a plain-text email asynchronously via Celery."""
    send_mail(subject, message, 'noreply@example.com', recipient_list)
@shared_task(bind=True, max_retries=3)
def process_article(self, article_id):
    """Generate an excerpt and refresh the search index for one article.

    A missing article is silently ignored; any other error re-schedules the
    task after 60 s, up to max_retries=3.
    """
    try:
        article = Article.objects.get(pk=article_id)
        # Build the excerpt from the body text.
        article.excerpt = generate_excerpt(article.content)
        # Push the updated document to the search backend.
        update_search_index(article)
        article.save()
    except Article.DoesNotExist:
        # Article deleted before the task ran — nothing to do.
        pass
    except Exception as exc:
        # retry() raises internally, scheduling the next attempt.
        self.retry(exc=exc, countdown=60)
@shared_task
def cleanup_old_articles():
    """Delete draft articles untouched for over a year; return a summary.

    NOTE(review): assumes `timezone` and `timedelta` are imported at module
    level — confirm the imports in the real tasks.py.
    """
    cutoff = timezone.now() - timedelta(days=365)
    deleted, _ = Article.objects.filter(
        status='draft',
        updated_at__lt=cutoff
    ).delete()
    return f'Deleted {deleted} articles'
# 调用任务
send_email_task.delay('Subject', 'Message', ['user@example.com'])
process_article.delay(article.id)
# 延迟执行
from datetime import datetime, timedelta  # 下方 eta 示例需要 datetime
send_email_task.apply_async(args=['Subject', 'Message', ['user@example.com']], countdown=60)
send_email_task.apply_async(args=['Subject', 'Message', ['user@example.com']], eta=datetime(2024, 1, 1, 10, 0))
3.3 定时任务
# settings.py
from celery.schedules import crontab
CELERY_BEAT_SCHEDULE = {
'cleanup-old-articles': {
'task': 'blog.tasks.cleanup_old_articles',
'schedule': crontab(hour=3, minute=0), # 每天凌晨3点
},
'generate-statistics': {
'task': 'blog.tasks.generate_statistics',
'schedule': crontab(minute=0, hour='*/1'), # 每小时
},
'send-newsletter': {
'task': 'blog.tasks.send_newsletter',
'schedule': crontab(hour=9, minute=0, day_of_week=1), # 每周一上午9点
},
}
# 启动 beat
# celery -A myproject beat -l info
四、静态文件优化
4.1 静态文件配置
# settings.py
# 开发环境
STATIC_URL = '/static/'
STATICFILES_DIRS = [BASE_DIR / 'static']
# 生产环境
STATIC_ROOT = BASE_DIR / 'staticfiles'
# 静态文件存储
STATICFILES_STORAGE = 'django.contrib.staticfiles.storage.ManifestStaticFilesStorage'
# 或使用 WhiteNoise
MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'whitenoise.middleware.WhiteNoiseMiddleware',
# ...
]
STATICFILES_STORAGE = 'whitenoise.storage.CompressedManifestStaticFilesStorage'
# 收集静态文件
# python manage.py collectstatic
4.2 CDN 配置
# 使用 django-storages + AWS S3
# pip install django-storages boto3
AWS_ACCESS_KEY_ID = 'your-access-key'
AWS_SECRET_ACCESS_KEY = 'your-secret-key'
AWS_STORAGE_BUCKET_NAME = 'your-bucket'
AWS_S3_CUSTOM_DOMAIN = f'{AWS_STORAGE_BUCKET_NAME}.s3.amazonaws.com'
AWS_S3_OBJECT_PARAMETERS = {
'CacheControl': 'max-age=86400',
}
# 静态文件使用 S3
STATICFILES_STORAGE = 'storages.backends.s3boto3.S3Boto3Storage'
STATIC_URL = f'https://{AWS_S3_CUSTOM_DOMAIN}/static/'
# 媒体文件使用 S3
DEFAULT_FILE_STORAGE = 'storages.backends.s3boto3.S3Boto3Storage'
MEDIA_URL = f'https://{AWS_S3_CUSTOM_DOMAIN}/media/'
五、性能监控
5.1 日志配置
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'verbose': {
'format': '{levelname} {asctime} {module} {process:d} {thread:d} {message}',
'style': '{',
},
},
'handlers': {
'file': {
'level': 'WARNING',
'class': 'logging.FileHandler',
'filename': BASE_DIR / 'logs' / 'django.log',
'formatter': 'verbose',
},
'slow_queries': {
'level': 'DEBUG',
'class': 'logging.FileHandler',
'filename': BASE_DIR / 'logs' / 'slow_queries.log',
},
},
'loggers': {
'django': {
'handlers': ['file'],
'level': 'WARNING',
'propagate': True,
},
'django.db.backends': {
'handlers': ['slow_queries'],
'level': 'DEBUG',
},
},
}
5.2 性能中间件
import time
import logging
from django.db import connection
logger = logging.getLogger('performance')
class PerformanceMiddleware:
    """Warn about slow requests (>1 s) and query-heavy requests (>50 queries).

    NOTE(review): connection.queries is only populated when DEBUG=True —
    verify before relying on the query counts in production.
    """

    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        began = time.time()
        queries_before = len(connection.queries)

        response = self.get_response(request)

        elapsed = time.time() - began
        issued = len(connection.queries) - queries_before

        if elapsed > 1.0:
            # Anything over one second is worth flagging.
            logger.warning(
                f'Slow request: {request.path} '
                f'Duration: {elapsed:.2f}s '
                f'Queries: {issued}'
            )
        if issued > 50:
            # Usually a sign of an N+1 query pattern.
            logger.warning(
                f'Too many queries: {request.path} '
                f'Queries: {issued}'
            )
        return response
5.3 APM 集成
# 使用 Sentry
# pip install sentry-sdk
import sentry_sdk
from sentry_sdk.integrations.django import DjangoIntegration
sentry_sdk.init(
dsn="your-sentry-dsn",
integrations=[DjangoIntegration()],
traces_sample_rate=0.1,
send_default_pii=True,
)
# 使用 New Relic
# pip install newrelic
# newrelic.ini 配置
# 启动:NEW_RELIC_CONFIG_FILE=newrelic.ini newrelic-admin run-program gunicorn myproject.wsgi
六、数据库连接池
6.1 配置连接池
# 使用 django-db-connection-pool
# pip install django-db-connection-pool
DATABASES = {
'default': {
'ENGINE': 'dj_db_conn_pool.backends.postgresql',
'NAME': 'mydb',
'USER': 'user',
'PASSWORD': 'password',
'HOST': 'localhost',
'PORT': '5432',
'POOL_OPTIONS': {
'POOL_SIZE': 10,
'MAX_OVERFLOW': 20,
'RECYCLE': 300,
}
}
}
# 或使用 PgBouncer
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.postgresql',
'NAME': 'mydb',
'USER': 'user',
'PASSWORD': 'password',
'HOST': 'localhost',
'PORT': '6432', # PgBouncer 端口
'CONN_MAX_AGE': 0, # PgBouncer 管理连接
}
}
七、其他优化
7.1 压缩响应
# 使用 GZip 中间件
MIDDLEWARE = [
'django.middleware.gzip.GZipMiddleware',
# ...
]
# 或使用 WhiteNoise 压缩
STATICFILES_STORAGE = 'whitenoise.storage.CompressedManifestStaticFilesStorage'
7.2 Session 优化
# 使用缓存存储 Session
SESSION_ENGINE = 'django.contrib.sessions.backends.cache'
SESSION_CACHE_ALIAS = 'default'
# 或使用 Redis
SESSION_ENGINE = 'django.contrib.sessions.backends.cached_db'
7.3 模板优化
# 生产环境启用模板缓存
TEMPLATES = [
{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'OPTIONS': {
'loaders': [
('django.template.loaders.cached.Loader', [
'django.template.loaders.filesystem.Loader',
'django.template.loaders.app_directories.Loader',
]),
],
},
},
]
八、最佳实践清单
8.1 缓存策略
# 1. 使用多级缓存
CACHES = {
'default': {
'BACKEND': 'django.core.cache.backends.redis.RedisCache',
'LOCATION': 'redis://127.0.0.1:6379/0',
},
'local': {
'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
},
}
# 2. 缓存键命名规范
def get_cache_key(prefix, *args):
    """Build a namespaced cache key: '<prefix>:<arg1>_<arg2>_...'."""
    suffix = '_'.join(map(str, args))
    return f'{prefix}:{suffix}'
# 3. 缓存失效策略
def invalidate_article_caches(article):
    """Drop every cache entry affected by a change to *article*."""
    stale = [
        f'article:{article.id}',
        f'article_list:category:{article.category_id}',
        f'author:{article.author_id}:articles',
    ]
    cache.delete_many(stale)
8.2 查询优化清单
# 1. 使用 select_related 和 prefetch_related
# 2. 只查询需要的字段
# 3. 使用数据库索引
# 4. 避免 N+1 查询
# 5. 使用批量操作
# 6. 使用 exists() 代替 count() > 0
# 7. 使用 F() 表达式进行原子更新
# 8. 使用 annotate 代替 Python 计算