Django 用户认证与权限
2026/3/20 · 大约 11 分钟
Django 用户认证与权限
一、认证系统概述
1.1 Django 认证框架
Django 内置了功能完善的用户认证系统,包括用户管理、权限控制、用户组等功能。
# settings.py
INSTALLED_APPS = [
'django.contrib.auth', # authentication framework
'django.contrib.contenttypes', # content types (the permission system depends on it)
'django.contrib.sessions', # session framework
# ...
]
MIDDLEWARE = [
'django.contrib.sessions.middleware.SessionMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
# ...
]
# Authentication backends
AUTHENTICATION_BACKENDS = [
'django.contrib.auth.backends.ModelBackend',
]
# Password hashing algorithms
# NOTE(review): the Argon2 and BCrypt hashers presumably need extra packages
# installed alongside Django — confirm before copying this list.
PASSWORD_HASHERS = [
'django.contrib.auth.hashers.Argon2PasswordHasher',
'django.contrib.auth.hashers.PBKDF2PasswordHasher',
'django.contrib.auth.hashers.PBKDF2SHA1PasswordHasher',
'django.contrib.auth.hashers.BCryptSHA256PasswordHasher',
]
# Login-related settings
LOGIN_URL = '/accounts/login/'
LOGIN_REDIRECT_URL = '/'
LOGOUT_REDIRECT_URL = '/'
1.2 User 模型
from django.contrib.auth.models import User
# Create a user
user = User.objects.create_user(
username='john',
email='john@example.com',
password='password123',
first_name='John',
last_name='Doe',
)
# Create a superuser
admin = User.objects.create_superuser(
username='admin',
email='admin@example.com',
password='admin123',
)
# User attributes
user.username # username
user.email # e-mail address
user.first_name # given name
user.last_name # family name
user.is_active # whether the account is active
user.is_staff # staff flag (can access the admin)
user.is_superuser # superuser flag
user.date_joined # registration time
user.last_login # time of the last login
# Check a password
user.check_password('password123') # True
# Change the password (set_password is followed by an explicit save)
user.set_password('newpassword')
user.save()
# Get the full / short name
user.get_full_name() # 'John Doe'
user.get_short_name() # 'John'
二、自定义用户模型
2.1 扩展用户模型(OneToOne)
# models.py
from django.db import models
from django.contrib.auth.models import User
from django.db.models.signals import post_save
from django.dispatch import receiver
class Profile(models.Model):
    """Extra profile data stored one-to-one next to a ``User``.

    The row is deleted together with its user (``CASCADE``) and is reachable
    from the user through the ``profile`` reverse accessor.
    """

    user = models.OneToOneField(
        User,
        on_delete=models.CASCADE,
        related_name='profile',
    )
    avatar = models.ImageField('头像', upload_to='avatars/', blank=True)
    bio = models.TextField('个人简介', max_length=500, blank=True)
    phone = models.CharField('手机号', max_length=11, blank=True)
    birth_date = models.DateField('生日', null=True, blank=True)
    location = models.CharField('位置', max_length=100, blank=True)
    website = models.URLField('网站', blank=True)

    class Meta:
        # Singular and plural labels are deliberately the same text.
        verbose_name = verbose_name_plural = '用户资料'

    def __str__(self):
        """Return the owner's username followed by a fixed suffix."""
        username = self.user.username
        return f'{username} 的资料'
# Create a Profile automatically when a User is first saved.
@receiver(post_save, sender=User)
def create_user_profile(sender, instance, created, **kwargs):
    """Create the companion ``Profile`` when ``created`` is true; otherwise do nothing."""
    if not created:
        return
    Profile.objects.create(user=instance)
@receiver(post_save, sender=User)
def save_user_profile(sender, instance, **kwargs):
    """Re-save the user's ``Profile`` every time the ``User`` is saved.

    Users that have no profile row (for example accounts that existed before
    the profile table was introduced) are skipped instead of raising.
    """
    # hasattr() is False when the reverse one-to-one row is missing, because
    # the error Django raises for that lookup is an AttributeError subclass.
    if hasattr(instance, 'profile'):
        instance.profile.save()
2.2 自定义用户模型(AbstractUser)
# accounts/models.py
from django.contrib.auth.models import AbstractUser
from django.db import models
class User(AbstractUser):
    """Custom user model built on ``AbstractUser`` with extra contact fields."""

    email = models.EmailField('邮箱', unique=True)
    phone = models.CharField(
        '手机号',
        max_length=11,
        unique=True,
        blank=True,
        null=True,
    )
    avatar = models.ImageField('头像', upload_to='avatars/', blank=True)
    bio = models.TextField('个人简介', max_length=500, blank=True)

    # Log in with the e-mail address.
    USERNAME_FIELD = 'email'
    REQUIRED_FIELDS = ['username']

    class Meta:
        verbose_name = verbose_name_plural = '用户'
        db_table = 'users'

    def __str__(self):
        """Represent the user by e-mail address."""
        return self.email
# settings.py
AUTH_USER_MODEL = 'accounts.User'
2.3 自定义用户模型(AbstractBaseUser)
# accounts/models.py
from django.contrib.auth.models import AbstractBaseUser, BaseUserManager, PermissionsMixin
from django.db import models
class UserManager(BaseUserManager):
    """Manager that creates users identified by their e-mail address."""

    def create_user(self, email, password=None, **extra_fields):
        """Create, save and return a user.

        The e-mail is passed through ``normalize_email`` and the password is
        stored via ``set_password``. Raises ``ValueError`` when ``email`` is
        empty.
        """
        if not email:
            raise ValueError('邮箱是必填项')
        account = self.model(email=self.normalize_email(email), **extra_fields)
        account.set_password(password)
        account.save(using=self._db)
        return account

    def create_superuser(self, email, password=None, **extra_fields):
        """Create a user with staff, superuser and active flags defaulting to True.

        Raises ``ValueError`` when the caller explicitly passes a value other
        than ``True`` for ``is_staff`` or ``is_superuser``.
        """
        for flag in ('is_staff', 'is_superuser', 'is_active'):
            extra_fields.setdefault(flag, True)

        # Checked in this order so the first offending flag decides the message.
        required_flags = {
            'is_staff': '超级用户必须设置 is_staff=True',
            'is_superuser': '超级用户必须设置 is_superuser=True',
        }
        for flag, message in required_flags.items():
            if extra_fields.get(flag) is not True:
                raise ValueError(message)

        return self.create_user(email, password, **extra_fields)
class User(AbstractBaseUser, PermissionsMixin):
    """Fully custom user model that declares every field itself."""

    email = models.EmailField('邮箱', unique=True)
    username = models.CharField('用户名', max_length=50, unique=True)
    phone = models.CharField(
        '手机号',
        max_length=11,
        unique=True,
        blank=True,
        null=True,
    )
    avatar = models.ImageField('头像', upload_to='avatars/', blank=True)
    is_active = models.BooleanField('激活状态', default=True)
    is_staff = models.BooleanField('员工状态', default=False)
    date_joined = models.DateTimeField('注册时间', auto_now_add=True)

    objects = UserManager()

    # The e-mail address is the login identifier.
    USERNAME_FIELD = 'email'
    REQUIRED_FIELDS = ['username']

    class Meta:
        verbose_name = verbose_name_plural = '用户'

    def get_full_name(self):
        """Return the username; this model stores no separate name fields."""
        return self.username

    def get_short_name(self):
        """Return the username, same value as ``get_full_name``."""
        return self.username
三、用户认证
3.1 登录视图
from django.contrib.auth import authenticate, login, logout
from django.contrib.auth.forms import AuthenticationForm
from django.shortcuts import render, redirect
from django.contrib import messages
def login_view(request):
    """Log a user in with Django's ``AuthenticationForm``.

    Already-authenticated users are redirected to ``home``. On a valid POST
    the user is logged in and redirected to the ``next`` query parameter, but
    only when that target is safe for the current host; otherwise to
    ``home``. GET requests and invalid submissions render the login template.
    """
    # Local import so this block needs no new file-level import.
    from django.utils.http import url_has_allowed_host_and_scheme

    if request.user.is_authenticated:
        return redirect('home')

    if request.method != 'POST':
        return render(request, 'accounts/login.html', {'form': AuthenticationForm()})

    form = AuthenticationForm(request, data=request.POST)
    if not form.is_valid():
        messages.error(request, '用户名或密码错误')
        return render(request, 'accounts/login.html', {'form': form})

    user = form.get_user()
    login(request, user)
    messages.success(request, f'欢迎回来,{user.username}!')

    # ``next`` comes from the query string and is attacker-controllable;
    # redirecting to it unchecked would be an open redirect.
    next_url = request.GET.get('next', '')
    is_safe = url_has_allowed_host_and_scheme(
        next_url,
        allowed_hosts={request.get_host()},
        require_https=request.is_secure(),
    )
    return redirect(next_url if is_safe else 'home')
def logout_view(request):
    """Log the current user out, queue an info message and redirect to ``home``."""
    logout(request)
    messages.info(request, '您已成功登出')
    response = redirect('home')
    return response
3.2 注册视图
from django import forms
from django.contrib.auth import get_user_model
from django.contrib.auth.forms import UserCreationForm
User = get_user_model()
class RegistrationForm(UserCreationForm):
    """Sign-up form that also requires an e-mail address not yet in use."""

    email = forms.EmailField(required=True)

    class Meta:
        model = User
        fields = ['username', 'email', 'password1', 'password2']

    def clean_email(self):
        """Return the cleaned e-mail, rejecting one that is already registered.

        Raises ``forms.ValidationError`` when another user already has the
        address, compared without regard to letter case.
        """
        email = self.cleaned_data.get('email')
        # Case-insensitive match: "A@x.com" must not be accepted next to an
        # existing "a@x.com".
        if email and User.objects.filter(email__iexact=email).exists():
            raise forms.ValidationError('该邮箱已被注册')
        return email
def register_view(request):
    """Register a new, initially inactive user and e-mail a verification link.

    Authenticated visitors are redirected to ``home``. A valid POST saves the
    user with ``is_active=False``, sends the verification e-mail and
    redirects to ``login``; otherwise the registration template is rendered.
    """
    if request.user.is_authenticated:
        return redirect('home')

    if request.method == 'POST':
        form = RegistrationForm(request.POST)
        if form.is_valid():
            user = form.save(commit=False)
            user.is_active = False  # stays inactive until the e-mail is verified
            user.save()
            # send_verification_email is declared as (user, request); the
            # request is what it uses to look up the current site.
            send_verification_email(user, request)
            messages.success(request, '注册成功!请查收验证邮件。')
            return redirect('login')
    else:
        form = RegistrationForm()

    return render(request, 'accounts/register.html', {'form': form})
3.3 密码重置
# urls.py
from django.contrib.auth import views as auth_views
# URL routes wiring Django's built-in password reset and password change views
# to project templates; each success_url is a hard-coded path.
urlpatterns = [
# Password reset
path('password_reset/',
auth_views.PasswordResetView.as_view(
template_name='accounts/password_reset.html',
email_template_name='accounts/password_reset_email.html',
subject_template_name='accounts/password_reset_subject.txt',
success_url='/accounts/password_reset/done/',
),
name='password_reset'),
path('password_reset/done/',
auth_views.PasswordResetDoneView.as_view(
template_name='accounts/password_reset_done.html'
),
name='password_reset_done'),
path('reset/<uidb64>/<token>/',
auth_views.PasswordResetConfirmView.as_view(
template_name='accounts/password_reset_confirm.html',
success_url='/accounts/reset/done/',
),
name='password_reset_confirm'),
path('reset/done/',
auth_views.PasswordResetCompleteView.as_view(
template_name='accounts/password_reset_complete.html'
),
name='password_reset_complete'),
# Password change
path('password_change/',
auth_views.PasswordChangeView.as_view(
template_name='accounts/password_change.html',
success_url='/accounts/password_change/done/',
),
name='password_change'),
path('password_change/done/',
auth_views.PasswordChangeDoneView.as_view(
template_name='accounts/password_change_done.html'
),
name='password_change_done'),
]
3.4 邮箱验证
from django.contrib.auth.tokens import default_token_generator
from django.utils.http import urlsafe_base64_encode, urlsafe_base64_decode
from django.utils.encoding import force_bytes, force_str
from django.core.mail import send_mail
from django.template.loader import render_to_string
from django.contrib.sites.shortcuts import get_current_site
def send_verification_email(user, request):
    """Render the verification e-mail for ``user`` and send it to ``user.email``.

    The template context carries the user, the current site's domain, the
    base64-encoded primary key and a token from ``default_token_generator``.
    """
    site = get_current_site(request)
    context = {
        'user': user,
        'domain': site.domain,
        'uid': urlsafe_base64_encode(force_bytes(user.pk)),
        'token': default_token_generator.make_token(user),
    }
    body = render_to_string('accounts/verification_email.html', context)
    send_mail('验证您的邮箱', body, 'noreply@example.com', [user.email])
def verify_email(request, uidb64, token):
    """Activate the account referenced by a verification link.

    ``uidb64`` is the base64-encoded primary key and ``token`` the value
    produced by ``default_token_generator``. On success the user is marked
    active and redirected to ``login``; any malformed, unknown or expired
    link produces an error message and a redirect to ``home``.
    """
    # Local import so this block needs no new file-level import.
    from django.core.exceptions import ValidationError

    try:
        uid = force_str(urlsafe_base64_decode(uidb64))
        user = User.objects.get(pk=uid)
    except (TypeError, ValueError, OverflowError, ValidationError, User.DoesNotExist):
        # The uid comes straight from the URL: treat every decoding or
        # lookup failure as an invalid link rather than a server error.
        user = None

    if user is None or not default_token_generator.check_token(user, token):
        messages.error(request, '验证链接无效或已过期')
        return redirect('home')

    user.is_active = True
    user.save()
    messages.success(request, '邮箱验证成功!请登录。')
    return redirect('login')
四、认证后端
4.1 自定义认证后端
# accounts/backends.py
from django.contrib.auth.backends import ModelBackend
from django.contrib.auth import get_user_model
User = get_user_model()
class EmailBackend(ModelBackend):
    """Authentication backend that matches the login name against ``email``."""

    def authenticate(self, request, username=None, password=None, **kwargs):
        """Return the user whose e-mail and password match, else ``None``.

        ``username`` carries the e-mail address; an ``email`` keyword is
        accepted as a fallback. Returns ``None`` when credentials are
        missing, the address is unknown or ambiguous, the password is wrong,
        or ``user_can_authenticate`` rejects the account.
        """
        if username is None:
            username = kwargs.get('email')
        if username is None or password is None:
            return None

        try:
            user = User.objects.get(email=username)
        except User.DoesNotExist:
            # Hash the password anyway so an unknown address takes about as
            # long as a wrong password.
            User().set_password(password)
            return None
        except User.MultipleObjectsReturned:
            # Several accounts share the address: refuse rather than guess.
            return None

        if user.check_password(password) and self.user_can_authenticate(user):
            return user
        return None
class PhoneBackend(ModelBackend):
    """Authentication backend for a phone number plus SMS verification code."""

    def authenticate(self, request, phone=None, code=None, **kwargs):
        """Return the user owning ``phone`` when ``code`` verifies, else ``None``."""
        try:
            account = User.objects.get(phone=phone)
        except User.DoesNotExist:
            return None

        # Verify the SMS code first; eligibility is only checked for a valid code.
        if not verify_sms_code(phone, code):
            return None
        if not self.user_can_authenticate(account):
            return None
        return account
class MultipleBackend(ModelBackend):
    """Authentication backend accepting a username, e-mail or phone number."""

    # Fields tried in order; the first one with a matching row wins.
    lookup_fields = ('username', 'email', 'phone')

    def authenticate(self, request, username=None, password=None, **kwargs):
        """Return the user identified by ``username`` if ``password`` matches.

        ``username`` is compared against each of ``lookup_fields`` in turn.
        Returns ``None`` when credentials are missing, nothing matches, a
        lookup is ambiguous, the password is wrong, or
        ``user_can_authenticate`` rejects the account.
        """
        # Without this guard a call that supplies other credentials (so
        # username is None) would query ``phone=None`` and match every user
        # whose phone is NULL.
        if username is None or password is None:
            return None

        user = None
        for field in self.lookup_fields:
            try:
                user = User.objects.get(**{field: username})
            except User.DoesNotExist:
                continue
            except User.MultipleObjectsReturned:
                # Ambiguous identifier: refuse rather than pick one.
                return None
            break

        if user is None:
            return None
        if user.check_password(password) and self.user_can_authenticate(user):
            return user
        return None
# settings.py
AUTHENTICATION_BACKENDS = [
'accounts.backends.MultipleBackend',
'django.contrib.auth.backends.ModelBackend',
]
4.2 第三方认证(OAuth)
# 使用 django-allauth
# pip install django-allauth
# settings.py
INSTALLED_APPS = [
# ...
'django.contrib.sites',
'allauth',
'allauth.account',
'allauth.socialaccount',
'allauth.socialaccount.providers.google',
'allauth.socialaccount.providers.github',
]
SITE_ID = 1
AUTHENTICATION_BACKENDS = [
'django.contrib.auth.backends.ModelBackend',
'allauth.account.auth_backends.AuthenticationBackend',
]
# allauth configuration
# NOTE(review): setting names and required middleware presumably vary between
# django-allauth releases — confirm against the installed version.
ACCOUNT_EMAIL_REQUIRED = True
ACCOUNT_EMAIL_VERIFICATION = 'mandatory'
ACCOUNT_AUTHENTICATION_METHOD = 'email'
ACCOUNT_USERNAME_REQUIRED = False
SOCIALACCOUNT_PROVIDERS = {
'google': {
'SCOPE': ['profile', 'email'],
'AUTH_PARAMS': {'access_type': 'online'},
},
'github': {
'SCOPE': ['user:email'],
},
}
# urls.py
urlpatterns = [
path('accounts/', include('allauth.urls')),
]
五、权限系统
5.1 权限基础
from django.contrib.auth.models import Permission
from django.contrib.contenttypes.models import ContentType
# Fetch the permissions attached to a model.
content_type = ContentType.objects.get_for_model(Article)
permissions = Permission.objects.filter(content_type=content_type)

# Create a custom permission programmatically.
permission = Permission.objects.create(
    content_type=content_type,
    codename='can_publish',
    name='Can publish articles',
)


# Declare custom permissions on the model itself.
class Article(models.Model):
    # ...

    class Meta:
        permissions = [
            ('can_publish', '可以发布文章'),
            ('can_feature', '可以推荐文章'),
            ('can_moderate', '可以审核文章'),
        ]


# Check permissions; each name here is written as "blog.<codename>".
user.has_perm('blog.add_article')
user.has_perm('blog.change_article')
user.has_perm('blog.delete_article')
user.has_perm('blog.view_article')
user.has_perm('blog.can_publish')

# Grant, revoke and clear permissions held directly by the user.
user.user_permissions.add(permission)
user.user_permissions.remove(permission)
user.user_permissions.clear()

# List all permissions, then only those that come from groups.
user.get_all_permissions()
user.get_group_permissions()
5.2 用户组
from django.contrib.auth.models import Group
# Create user groups
editors_group = Group.objects.create(name='Editors')
authors_group = Group.objects.create(name='Authors')
# Grant permissions to a group
# NOTE(review): these lookups filter on codename only; presumably they assume a
# single app defines an article model — confirm, or also filter by content type.
editors_group.permissions.add(
Permission.objects.get(codename='add_article'),
Permission.objects.get(codename='change_article'),
Permission.objects.get(codename='can_publish'),
)
# Add the user to one group and remove the user from another
user.groups.add(editors_group)
user.groups.remove(authors_group)
# Check group membership
user.groups.filter(name='Editors').exists()
# Check a permission through group membership
user.has_perm('blog.can_publish') # checks both the user's and the groups' permissions
5.3 视图权限控制
from django.contrib.auth.decorators import login_required, permission_required, user_passes_test
from django.contrib.auth.mixins import LoginRequiredMixin, PermissionRequiredMixin, UserPassesTestMixin
# Decorators for function-based views
@login_required
def profile(request):
    """Render the profile template; wrapped in ``login_required``."""
    return render(request, 'accounts/profile.html')


@login_required(login_url='/accounts/login/')
def dashboard(request):
    """Render the dashboard template; ``login_required`` is given an explicit login URL."""
    return render(request, 'dashboard.html')


@permission_required('blog.add_article')
def create_article(request):
    """Placeholder view guarded by the ``blog.add_article`` permission."""


@permission_required('blog.add_article', raise_exception=True)
def create_article(request):
    """Same placeholder, with ``raise_exception=True`` passed to the decorator.

    NOTE: this rebinds the name ``create_article`` defined just above.
    """


@permission_required(['blog.add_article', 'blog.change_article'])
def manage_articles(request):
    """Placeholder view guarded by a list of two permissions."""


@user_passes_test(lambda account: account.is_staff)
def staff_only(request):
    """Placeholder view guarded by a test on ``is_staff``."""


def is_editor(user):
    """Return whether ``user`` has a group named ``Editors``."""
    editor_groups = user.groups.filter(name='Editors')
    return editor_groups.exists()


@user_passes_test(is_editor)
def editor_view(request):
    """Placeholder view guarded by ``is_editor``."""
# Mixins for class-based views
class ArticleCreateView(LoginRequiredMixin, CreateView):
    """Create view combined with ``LoginRequiredMixin``."""

    login_url = '/accounts/login/'
    redirect_field_name = 'next'


class ArticlePublishView(PermissionRequiredMixin, UpdateView):
    """Update view guarded by the ``blog.can_publish`` permission."""

    permission_required = 'blog.can_publish'
    # Alternative with a list:
    # permission_required = ['blog.can_publish', 'blog.change_article']


class ArticleDeleteView(UserPassesTestMixin, DeleteView):
    """Delete view guarded by ``test_func``."""

    def test_func(self):
        """Return True for the article's author or a staff user."""
        article = self.get_object()
        current_user = self.request.user
        return current_user == article.author or current_user.is_staff
5.4 对象级权限
# 使用 django-guardian
# pip install django-guardian
# settings.py
INSTALLED_APPS = [
# ...
'guardian',
]
AUTHENTICATION_BACKENDS = [
'django.contrib.auth.backends.ModelBackend',
'guardian.backends.ObjectPermissionBackend',
]
# 使用
from guardian.shortcuts import assign_perm, remove_perm, get_perms, get_objects_for_user
# Assign object-level permissions
article = Article.objects.get(pk=1)
assign_perm('change_article', user, article)
assign_perm('delete_article', user, article)
# Remove an object-level permission
remove_perm('change_article', user, article)
# Check an object-level permission
user.has_perm('change_article', article)
# Fetch the objects the user has the permission for
articles = get_objects_for_user(user, 'blog.change_article')
# Fetch all permissions the user has on the object
perms = get_perms(user, article)
# 视图中使用
from guardian.decorators import permission_required_or_403


@permission_required_or_403('blog.change_article', (Article, 'pk', 'pk'))
def edit_article(request, pk):
    """Load the article with primary key ``pk`` (404 if missing); rest elided."""
    article = get_object_or_404(Article, pk=pk)
    # ...


# Class-based view
from guardian.mixins import PermissionRequiredMixin


class ArticleUpdateView(PermissionRequiredMixin, UpdateView):
    """Update view for ``Article`` using guardian's ``PermissionRequiredMixin``."""

    model = Article
    permission_required = 'blog.change_article'
六、会话管理
6.1 会话配置
# settings.py
# Session engine
SESSION_ENGINE = 'django.contrib.sessions.backends.db' # database (default)
# SESSION_ENGINE = 'django.contrib.sessions.backends.cache' # cache
# SESSION_ENGINE = 'django.contrib.sessions.backends.cached_db' # cache + database
# SESSION_ENGINE = 'django.contrib.sessions.backends.file' # file
# SESSION_ENGINE = 'django.contrib.sessions.backends.signed_cookies' # cookie
# Session expiry
SESSION_COOKIE_AGE = 1209600 # two weeks (seconds)
SESSION_EXPIRE_AT_BROWSER_CLOSE = False
# Session cookie settings
SESSION_COOKIE_NAME = 'sessionid'
SESSION_COOKIE_SECURE = True # HTTPS only
SESSION_COOKIE_HTTPONLY = True
SESSION_COOKIE_SAMESITE = 'Lax'
# Save the session on every request
SESSION_SAVE_EVERY_REQUEST = True
6.2 会话操作
def session_example(request):
    """Walk through the session API: set, read, delete, flush and expiry calls."""
    session = request.session

    # Store values.
    session['user_id'] = 123
    session['cart'] = {'item1': 2, 'item2': 1}

    # Read values back (with a default for a missing key).
    stored_user_id = session.get('user_id')
    stored_cart = session.get('cart', {})

    # Remove a single key.
    del session['user_id']

    # Membership test.
    if 'cart' in session:
        pass

    # Flush the whole session.
    session.flush()

    # Set the expiry.
    session.set_expiry(300)  # 5 minutes
    session.set_expiry(0)  # expire when the browser closes
    session.set_expiry(None)  # fall back to the global setting

    # Read the expiry back.
    session.get_expiry_age()
    session.get_expiry_date()

    # Rotate the session key.
    session.cycle_key()
6.3 强制单设备登录
from django.contrib.sessions.models import Session
from django.utils import timezone
def force_single_session(user):
    """Delete every unexpired session whose stored auth user id is ``user``'s.

    Each unexpired ``Session`` row is decoded and its ``_auth_user_id``
    compared with ``str(user.pk)``.
    """
    owner_id = str(user.pk)
    unexpired = Session.objects.filter(expire_date__gte=timezone.now())
    for record in unexpired:
        if record.get_decoded().get('_auth_user_id') == owner_id:
            record.delete()
# Called at login time
def login_view(request):
    """Log a user in after removing that user's other sessions.

    A valid POST calls ``force_single_session`` before ``login`` and then
    redirects to ``home``. GET requests and invalid submissions render the
    login form, so the view always returns a response.
    """
    if request.method == 'POST':
        form = AuthenticationForm(request, data=request.POST)
        if form.is_valid():
            user = form.get_user()
            force_single_session(user)  # drop the user's other sessions first
            login(request, user)
            return redirect('home')
    else:
        form = AuthenticationForm()
    # Same template and context as the main login view of this project.
    return render(request, 'accounts/login.html', {'form': form})
七、密码策略
7.1 密码验证器
# settings.py
# Password validators, each given by dotted path with optional OPTIONS.
AUTH_PASSWORD_VALIDATORS = [
{
'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
'OPTIONS': {
'user_attributes': ('username', 'email', 'first_name', 'last_name'),
'max_similarity': 0.7,
}
},
{
'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
'OPTIONS': {
'min_length': 8,
}
},
{
'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
},
# Custom validator
{
'NAME': 'accounts.validators.CustomPasswordValidator',
},
]
7.2 自定义密码验证器
# accounts/validators.py
from django.core.exceptions import ValidationError
import re
class CustomPasswordValidator:
    """Password validator enforcing a minimum length and four character classes.

    A password must be at least ``min_length`` characters long and contain an
    upper-case letter, a lower-case letter, a digit and one of the listed
    special characters.
    """

    # (pattern, error message, error code) per required character class,
    # checked in this order so the first missing class decides the error.
    _CHARACTER_RULES = (
        (r'[A-Z]', '密码必须包含至少一个大写字母', 'password_no_upper'),
        (r'[a-z]', '密码必须包含至少一个小写字母', 'password_no_lower'),
        (r'\d', '密码必须包含至少一个数字', 'password_no_digit'),
        (r'[!@#$%^&*(),.?":{}|<>]', '密码必须包含至少一个特殊字符', 'password_no_special'),
    )

    def __init__(self, min_length=8):
        self.min_length = min_length

    def validate(self, password, user=None):
        """Raise ``ValidationError`` for the first rule ``password`` violates.

        Returns ``None`` when every rule passes. ``user`` is accepted but not
        used.
        """
        if len(password) < self.min_length:
            raise ValidationError(
                f'密码至少需要 {self.min_length} 个字符',
                code='password_too_short',
            )
        for pattern, message, code in self._CHARACTER_RULES:
            if not re.search(pattern, password):
                raise ValidationError(message, code=code)

    def get_help_text(self):
        """Return the help text, quoting the configured minimum length."""
        # Previously hard-coded "8", which was wrong for any other min_length.
        return f'密码必须至少包含{self.min_length}个字符,包括大写字母、小写字母、数字和特殊字符'
7.3 密码过期策略
from django.utils import timezone
from datetime import timedelta
class User(AbstractUser):
    """User model that records when the password was last set."""

    password_changed_at = models.DateTimeField('密码修改时间', auto_now_add=True)

    def is_password_expired(self, days=90):
        """Return True when the password is older than ``days`` days.

        A missing ``password_changed_at`` counts as expired.
        """
        changed_at = self.password_changed_at
        if not changed_at:
            return True
        return timezone.now() > changed_at + timedelta(days=days)

    def set_password(self, raw_password):
        """Set the password and stamp ``password_changed_at`` with the current time."""
        super().set_password(raw_password)
        self.password_changed_at = timezone.now()
# Middleware that checks for an expired password
class PasswordExpiryMiddleware:
    """Redirect authenticated users with an expired password to ``password_change``.

    Requests whose path starts with ``/accounts/password_change/`` are let
    through so the change form itself stays reachable.
    """

    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        user = request.user
        # Short-circuits left to right, same evaluation order as nested ifs.
        must_change = (
            user.is_authenticated
            and user.is_password_expired()
            and not request.path.startswith('/accounts/password_change/')
        )
        if must_change:
            return redirect('password_change')
        return self.get_response(request)
八、双因素认证
8.1 TOTP 认证
# 使用 django-otp
# pip install django-otp qrcode
# settings.py
INSTALLED_APPS = [
# ...
'django_otp',
'django_otp.plugins.otp_totp',
'django_otp.plugins.otp_static',
]
MIDDLEWARE = [
# ...
'django_otp.middleware.OTPMiddleware',
]
# 视图
from django_otp.plugins.otp_totp.models import TOTPDevice
import qrcode
from io import BytesIO
import base64
def setup_2fa(request):
    """Set up TOTP two-factor authentication for the current user.

    Fetches (or creates, unconfirmed) the user's ``default`` TOTP device and
    builds a base64-encoded PNG QR code from its ``config_url``. A POST with
    a token the device verifies marks the device confirmed and redirects to
    ``profile``; otherwise the setup page is rendered with the QR code and key.
    """
    device, _created = TOTPDevice.objects.get_or_create(
        user=request.user,
        name='default',
        defaults={'confirmed': False},
    )

    # Render the QR code as an in-memory PNG, then base64 it for the template.
    png = BytesIO()
    qrcode.make(device.config_url).save(png, format='PNG')
    qr_code = base64.b64encode(png.getvalue()).decode()

    if request.method == 'POST':
        if device.verify_token(request.POST.get('token')):
            device.confirmed = True
            device.save()
            messages.success(request, '双因素认证设置成功')
            return redirect('profile')
        messages.error(request, '验证码错误')

    context = {'qr_code': qr_code, 'secret': device.key}
    return render(request, 'accounts/setup_2fa.html', context)
def verify_2fa(request):
    """Check a submitted TOTP token against the user's confirmed devices.

    On a POST with a token that any confirmed device verifies, the session
    key ``2fa_verified`` is set and the user is redirected to ``dashboard``.
    A missing token, no confirmed device or a wrong token queues an error
    message; the verification page is rendered in every other case.
    """
    if request.method == 'POST':
        token = request.POST.get('token')
        # filter() instead of get(): a user with no confirmed device, or with
        # several, must not turn into a server error.
        devices = TOTPDevice.objects.filter(user=request.user, confirmed=True)
        if token and any(device.verify_token(token) for device in devices):
            request.session['2fa_verified'] = True
            return redirect('dashboard')
        messages.error(request, '验证码错误')
    return render(request, 'accounts/verify_2fa.html')
# 装饰器
from django_otp.decorators import otp_required
@otp_required
def sensitive_view(request):
pass
8.2 短信验证码
import random
from django.core.cache import cache
def send_sms_code(phone):
    """Generate a 6-digit code, cache it for 5 minutes and text it to ``phone``.

    Returns ``True`` after the cache write and the ``send_sms`` call.
    """
    # Local import so this block needs no new file-level import.
    import secrets

    # The code is a login credential, so draw it from the OS CSPRNG rather
    # than the predictable ``random`` module.
    code = ''.join(secrets.choice('0123456789') for _ in range(6))
    # Store the code (valid for 5 minutes).
    cache.set(f'sms_code_{phone}', code, 300)
    # Send the text message (calls a third-party API).
    send_sms(phone, f'您的验证码是:{code},5分钟内有效。')
    return True
def verify_sms_code(phone, code):
    """Return True when ``code`` matches the cached code for ``phone``.

    A matching code is deleted from the cache so it cannot be reused. A
    missing or expired cache entry, or a non-string ``code``, returns False.
    """
    # Local import so this block needs no new file-level import.
    import hmac

    cache_key = f'sms_code_{phone}'
    cached_code = cache.get(cache_key)
    if not cached_code or not isinstance(code, str):
        return False
    # Constant-time comparison; bytes are used because compare_digest
    # rejects non-ASCII str arguments.
    if not hmac.compare_digest(str(cached_code).encode(), code.encode()):
        return False
    cache.delete(cache_key)
    return True
def sms_login(request):
    """Log a user in with a phone number and SMS verification code.

    On a POST the code is verified first; a valid code for an active user
    logs that user in and redirects to ``home``. A valid code with no
    matching active user, or an invalid code, queues an error message. The
    SMS login page is rendered in every other case.
    """
    if request.method == 'POST':
        phone = request.POST.get('phone')
        code = request.POST.get('code')
        if verify_sms_code(phone, code):
            # Inactive accounts are treated like unknown ones.
            user = User.objects.filter(phone=phone, is_active=True).first()
            if user is not None:
                # The user did not come from authenticate(), so login() needs
                # an explicit backend when several are configured.
                # NOTE(review): assumes ModelBackend is listed in
                # AUTHENTICATION_BACKENDS, as in this document's settings.
                login(
                    request,
                    user,
                    backend='django.contrib.auth.backends.ModelBackend',
                )
                return redirect('home')
            messages.error(request, '用户不存在')
        else:
            messages.error(request, '验证码错误或已过期')
    return render(request, 'accounts/sms_login.html')
九、API 认证
9.1 Token 认证
# 使用 Django REST framework
from rest_framework.authentication import TokenAuthentication
from rest_framework.authtoken.models import Token
from rest_framework.authtoken.views import ObtainAuthToken
# 创建 Token
token, created = Token.objects.get_or_create(user=user)
# 视图
class CustomAuthToken(ObtainAuthToken):
    """Token login endpoint that also returns the user's id and e-mail."""

    def post(self, request, *args, **kwargs):
        """Validate the credentials and return the token, user id and e-mail.

        Invalid credentials raise through ``is_valid(raise_exception=True)``.
        """
        # Local import: ``Response`` is not among the imports visible here.
        from rest_framework.response import Response

        # Pass the request as context so the serializer can hand it on when
        # it authenticates the credentials.
        serializer = self.serializer_class(
            data=request.data,
            context={'request': request},
        )
        serializer.is_valid(raise_exception=True)
        user = serializer.validated_data['user']
        token, _created = Token.objects.get_or_create(user=user)
        return Response({
            'token': token.key,
            'user_id': user.pk,
            'email': user.email,
        })
# settings.py
REST_FRAMEWORK = {
'DEFAULT_AUTHENTICATION_CLASSES': [
'rest_framework.authentication.TokenAuthentication',
],
}
9.2 JWT 认证
# 使用 djangorestframework-simplejwt
# pip install djangorestframework-simplejwt
# settings.py
from datetime import timedelta
# Use JWT as the default DRF authentication class.
REST_FRAMEWORK = {
'DEFAULT_AUTHENTICATION_CLASSES': [
'rest_framework_simplejwt.authentication.JWTAuthentication',
],
}
# Token lifetimes: 5-minute access tokens, 1-day refresh tokens.
# NOTE(review): BLACKLIST_AFTER_ROTATION presumably needs the simplejwt
# blacklist app installed — confirm.
SIMPLE_JWT = {
'ACCESS_TOKEN_LIFETIME': timedelta(minutes=5),
'REFRESH_TOKEN_LIFETIME': timedelta(days=1),
'ROTATE_REFRESH_TOKENS': True,
'BLACKLIST_AFTER_ROTATION': True,
'ALGORITHM': 'HS256',
'SIGNING_KEY': SECRET_KEY,
'AUTH_HEADER_TYPES': ('Bearer',),
}
# urls.py
from rest_framework_simplejwt.views import TokenObtainPairView, TokenRefreshView
urlpatterns = [
path('api/token/', TokenObtainPairView.as_view(), name='token_obtain_pair'),
path('api/token/refresh/', TokenRefreshView.as_view(), name='token_refresh'),
]
# 自定义 Token
from rest_framework_simplejwt.serializers import TokenObtainPairSerializer
class CustomTokenObtainPairSerializer(TokenObtainPairSerializer):
    """Token serializer that copies three user attributes into the token."""

    @classmethod
    def get_token(cls, user):
        """Return the parent's token with ``username``, ``email`` and ``is_staff`` added."""
        token = super().get_token(user)
        # Custom claims, each copied from the same-named user attribute.
        for claim in ('username', 'email', 'is_staff'):
            token[claim] = getattr(user, claim)
        return token
十、安全最佳实践
10.1 防暴力破解
# 使用 django-axes
# pip install django-axes
# settings.py
INSTALLED_APPS = [
# ...
'axes',
]
MIDDLEWARE = [
# Must come after AuthenticationMiddleware
'axes.middleware.AxesMiddleware',
]
AUTHENTICATION_BACKENDS = [
'axes.backends.AxesStandaloneBackend',
'django.contrib.auth.backends.ModelBackend',
]
# axes configuration
AXES_FAILURE_LIMIT = 5 # number of failed attempts allowed
AXES_COOLOFF_TIME = 1 # cool-off period (hours)
AXES_LOCKOUT_TEMPLATE = 'accounts/lockout.html'
AXES_RESET_ON_SUCCESS = True
AXES_ONLY_USER_FAILURES = True
10.2 安全配置清单
# settings.py
# 密码哈希
PASSWORD_HASHERS = [
'django.contrib.auth.hashers.Argon2PasswordHasher',
'django.contrib.auth.hashers.PBKDF2PasswordHasher',
]
# Session security
# NOTE(review): SESSION_EXPIRE_AT_BROWSER_CLOSE is True here but False in the
# session-configuration section earlier in this document — pick one.
SESSION_COOKIE_SECURE = True
SESSION_COOKIE_HTTPONLY = True
SESSION_COOKIE_SAMESITE = 'Lax'
SESSION_EXPIRE_AT_BROWSER_CLOSE = True
# CSRF
CSRF_COOKIE_SECURE = True
CSRF_COOKIE_HTTPONLY = True
CSRF_COOKIE_SAMESITE = 'Lax'
# HTTPS
SECURE_SSL_REDIRECT = True
SECURE_PROXY_SSL_HEADER = ('HTTP_X_FORWARDED_PROTO', 'https')
# Security headers
# NOTE(review): SECURE_BROWSER_XSS_FILTER may no longer exist in current
# Django releases — confirm against the target version.
SECURE_BROWSER_XSS_FILTER = True
SECURE_CONTENT_TYPE_NOSNIFF = True
X_FRAME_OPTIONS = 'DENY'
# Password policy
AUTH_PASSWORD_VALIDATORS = [
# ... validator configuration
]
10.3 审计日志
from django.contrib.auth.signals import user_logged_in, user_logged_out, user_login_failed
from django.dispatch import receiver
import logging
logger = logging.getLogger('security')
@receiver(user_logged_in)
def log_user_login(sender, request, user, **kwargs):
    """Log the username and client IP at INFO level when a user logs in."""
    ip = get_client_ip(request)
    username = user.username
    logger.info(f'用户登录: {username} from {ip}')
@receiver(user_logged_out)
def log_user_logout(sender, request, user, **kwargs):
    """Log the username and client IP at INFO level when a logout happens.

    ``user`` is ``None`` when the logout was triggered without an
    authenticated user; the name is then logged as ``unknown``.
    """
    ip = get_client_ip(request)
    # Guard against user=None so an anonymous logout does not raise here.
    username = user.username if user is not None else 'unknown'
    logger.info(f'用户登出: {username} from {ip}')
@receiver(user_login_failed)
def log_user_login_failed(sender, credentials, request=None, **kwargs):
    """Log the attempted username and client IP at WARNING level on a failed login.

    ``request`` may be ``None`` when authentication was attempted without
    one; the IP is then logged as ``unknown``.
    """
    # Resolve the IP only when there is a request to read it from.
    ip = get_client_ip(request) if request is not None else 'unknown'
    username = credentials.get('username', 'unknown')
    logger.warning(f'登录失败: {username} from {ip}')
def get_client_ip(request):
    """Return the client IP for ``request``, or ``None`` when it is unknown.

    The first entry of ``HTTP_X_FORWARDED_FOR`` is used when present and
    non-empty (surrounding whitespace removed); otherwise ``REMOTE_ADDR``.
    ``None`` is returned for a ``None`` request.
    """
    if request is None:
        return None
    forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
    if forwarded_for:
        # Entries are comma-separated and may carry spaces around them.
        first_hop = forwarded_for.split(',')[0].strip()
        if first_hop:
            return first_hop
    # NOTE(review): the forwarded header is presumably client-controllable
    # unless a trusted proxy rewrites it — confirm the deployment.
    return request.META.get('REMOTE_ADDR')