Redis 分布式锁实现
2026/3/20大约 17 分钟
Redis 分布式锁实现
一、分布式锁概述
1.1 为什么需要分布式锁
在单机环境中,可以使用 synchronized 或 ReentrantLock 实现线程同步。但在分布式系统中,多个服务实例运行在不同的机器上,本地锁无法跨进程生效。
1.2 分布式锁的特性要求
一个可靠的分布式锁需要具备以下特性:
| 特性 | 说明 |
|---|---|
| 互斥性 | 同一时刻只能有一个客户端持有锁 |
| 不会死锁 | 即使持有锁的客户端崩溃,锁也能被释放 |
| 容错性 | 只要大部分节点正常,就能获取和释放锁 |
| 加锁解锁同一客户端 | 不能释放别人的锁 |
| 可重入性 | 同一客户端可以多次获取同一把锁(可选) |
1.3 分布式锁实现方案对比
| 方案 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| MySQL | 实现简单,可靠 | 性能较差 | 并发不高的场景 |
| Redis | 性能高,实现相对简单 | 主从切换可能丢锁 | 高性能要求场景 |
| ZooKeeper | 可靠性最高 | 性能一般,复杂度高 | 对一致性要求极高的场景 |
| etcd | 高可用,强一致 | 相对小众 | 云原生环境 |
二、Redis 分布式锁基础实现
2.1 最简单的实现
# 加锁
SET lock:resource value NX EX 30
# NX: 只有 key 不存在时才设置
# EX: 设置过期时间,防止死锁
# 解锁
DEL lock:resource
2.2 存在的问题
问题 1:锁被误删
解决方案:使用唯一标识
public class SimpleRedisLock {
private RedisTemplate<String, String> redisTemplate;
private String lockKey;
private String lockValue; // 唯一标识,用于识别锁的所有者
public SimpleRedisLock(RedisTemplate<String, String> redisTemplate, String lockKey) {
this.redisTemplate = redisTemplate;
this.lockKey = lockKey;
// 使用 UUID + 线程 ID 作为唯一标识
this.lockValue = UUID.randomUUID().toString() + ":" + Thread.currentThread().getId();
}
// 加锁
public boolean tryLock(long expireSeconds) {
Boolean result = redisTemplate.opsForValue()
.setIfAbsent(lockKey, lockValue, expireSeconds, TimeUnit.SECONDS);
return Boolean.TRUE.equals(result);
}
// 解锁(存在问题:非原子操作)
public void unlock() {
String value = redisTemplate.opsForValue().get(lockKey);
if (lockValue.equals(value)) {
redisTemplate.delete(lockKey);
}
}
}
问题 2:解锁非原子性
上面的解锁代码存在问题:判断和删除是两个操作,不是原子的。
解决方案:使用 Lua 脚本
public class RedisLockWithLua {
private RedisTemplate<String, String> redisTemplate;
private String lockKey;
private String lockValue;
// Lua 脚本:判断和删除是原子操作
private static final String UNLOCK_SCRIPT =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" return redis.call('del', KEYS[1]) " +
"else " +
" return 0 " +
"end";
public RedisLockWithLua(RedisTemplate<String, String> redisTemplate, String lockKey) {
this.redisTemplate = redisTemplate;
this.lockKey = lockKey;
this.lockValue = UUID.randomUUID().toString() + ":" + Thread.currentThread().getId();
}
public boolean tryLock(long expireSeconds) {
Boolean result = redisTemplate.opsForValue()
.setIfAbsent(lockKey, lockValue, expireSeconds, TimeUnit.SECONDS);
return Boolean.TRUE.equals(result);
}
public boolean unlock() {
DefaultRedisScript<Long> script = new DefaultRedisScript<>();
script.setScriptText(UNLOCK_SCRIPT);
script.setResultType(Long.class);
Long result = redisTemplate.execute(script,
Collections.singletonList(lockKey),
lockValue);
return Long.valueOf(1L).equals(result);
}
}
问题 3:锁过期但业务未完成
三、Redisson 分布式锁
Redisson 是一个功能强大的 Redis 客户端,提供了完善的分布式锁实现,解决了上述所有问题。
3.1 引入依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.23.5</version>
</dependency>
3.2 配置 Redisson
# application.yml
spring:
redis:
host: localhost
port: 6379
password: your_password
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
config.useSingleServer()
.setAddress("redis://localhost:6379")
.setPassword("your_password")
.setDatabase(0);
return Redisson.create(config);
}
}
3.3 基本使用
@Service
public class OrderService {
@Autowired
private RedissonClient redissonClient;
public void createOrder(Long productId, Long userId) {
String lockKey = "lock:order:product:" + productId;
RLock lock = redissonClient.getLock(lockKey);
try {
// 尝试获取锁,等待 10 秒,锁过期时间 30 秒
boolean locked = lock.tryLock(10, 30, TimeUnit.SECONDS);
if (!locked) {
throw new RuntimeException("系统繁忙,请稍后重试");
}
// 执行业务逻辑
doCreateOrder(productId, userId);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("获取锁被中断");
} finally {
// 释放锁(只有锁的持有者才能释放)
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
}
3.4 Redisson 锁的实现原理
加锁过程
-- Redisson 加锁的 Lua 脚本(简化版)
-- KEYS[1] = 锁的 key
-- ARGV[1] = 锁过期时间
-- ARGV[2] = 客户端唯一标识(UUID:ThreadId)
-- 如果锁不存在
if (redis.call('exists', KEYS[1]) == 0) then
-- 创建锁,使用 Hash 结构存储
redis.call('hset', KEYS[1], ARGV[2], 1)
-- 设置过期时间
redis.call('pexpire', KEYS[1], ARGV[1])
return nil
end
-- 如果锁存在,检查是否是当前客户端持有(可重入)
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
-- 重入次数 +1
redis.call('hincrby', KEYS[1], ARGV[2], 1)
-- 重置过期时间
redis.call('pexpire', KEYS[1], ARGV[1])
return nil
end
-- 返回锁的剩余过期时间
return redis.call('pttl', KEYS[1])
锁的数据结构
看门狗机制
解锁过程
-- Redisson 解锁的 Lua 脚本(简化版)
-- KEYS[1] = 锁的 key
-- KEYS[2] = 发布订阅的 channel
-- ARGV[1] = 消息(用于通知等待的线程)
-- ARGV[2] = 锁过期时间
-- ARGV[3] = 客户端唯一标识
-- 如果锁不是当前客户端持有
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
return nil
end
-- 重入次数 -1
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1)
if (counter > 0) then
-- 还有重入,重置过期时间
redis.call('pexpire', KEYS[1], ARGV[2])
return 0
else
-- 完全释放,删除锁
redis.call('del', KEYS[1])
-- 发布消息,通知等待的线程
redis.call('publish', KEYS[2], ARGV[1])
return 1
end
3.5 Redisson 锁的类型
可重入锁(RLock)
RLock lock = redissonClient.getLock("myLock");
// 阻塞式获取锁
lock.lock();
// 尝试获取锁(立即返回)
boolean locked = lock.tryLock();
// 尝试获取锁(等待 10 秒)
boolean locked = lock.tryLock(10, TimeUnit.SECONDS);
// 尝试获取锁(等待 10 秒,锁持有 30 秒后自动释放)
// 注意:指定 leaseTime 后不会启动看门狗
boolean locked = lock.tryLock(10, 30, TimeUnit.SECONDS);
// 释放锁
lock.unlock();
公平锁(Fair Lock)
// 公平锁:按请求顺序获取锁
RLock fairLock = redissonClient.getFairLock("fairLock");
fairLock.lock();
try {
// 业务逻辑
} finally {
fairLock.unlock();
}
读写锁(ReadWriteLock)
RReadWriteLock rwLock = redissonClient.getReadWriteLock("rwLock");
// 读锁(共享锁)
RLock readLock = rwLock.readLock();
readLock.lock();
try {
// 读取数据
} finally {
readLock.unlock();
}
// 写锁(排他锁)
RLock writeLock = rwLock.writeLock();
writeLock.lock();
try {
// 写入数据
} finally {
writeLock.unlock();
}
红锁(RedLock)
用于解决 Redis 主从切换时的锁丢失问题。
// 需要多个独立的 Redis 实例
RLock lock1 = redissonClient1.getLock("lock");
RLock lock2 = redissonClient2.getLock("lock");
RLock lock3 = redissonClient3.getLock("lock");
// 创建红锁
RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
try {
// 在超过半数节点上加锁成功才算成功
boolean locked = redLock.tryLock(10, 30, TimeUnit.SECONDS);
if (locked) {
// 业务逻辑
}
} finally {
redLock.unlock();
}
联锁(MultiLock)
// 同时获取多把锁
RLock lock1 = redissonClient.getLock("lock1");
RLock lock2 = redissonClient.getLock("lock2");
RLock lock3 = redissonClient.getLock("lock3");
RedissonMultiLock multiLock = new RedissonMultiLock(lock1, lock2, lock3);
multiLock.lock();
try {
// 只有获取到所有锁才能执行
} finally {
multiLock.unlock();
}
四、实战场景
4.1 库存扣减
@Service
public class StockService {
@Autowired
private RedissonClient redissonClient;
@Autowired
private StockMapper stockMapper;
public boolean deductStock(Long productId, int quantity) {
String lockKey = "lock:stock:" + productId;
RLock lock = redissonClient.getLock(lockKey);
try {
// 等待 5 秒,锁持有 30 秒
boolean locked = lock.tryLock(5, 30, TimeUnit.SECONDS);
if (!locked) {
return false;
}
// 查询库存
Stock stock = stockMapper.selectByProductId(productId);
if (stock == null || stock.getQuantity() < quantity) {
return false;
}
// 扣减库存
int affected = stockMapper.deductStock(productId, quantity);
return affected > 0;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
}
4.2 防止重复提交
@Aspect
@Component
public class NoRepeatSubmitAspect {
@Autowired
private RedissonClient redissonClient;
@Around("@annotation(noRepeatSubmit)")
public Object around(ProceedingJoinPoint point, NoRepeatSubmit noRepeatSubmit) throws Throwable {
// 获取用户标识
String userId = getCurrentUserId();
// 获取方法签名
String methodSignature = point.getSignature().toLongString();
// 锁的 key
String lockKey = "lock:submit:" + userId + ":" + methodSignature.hashCode();
RLock lock = redissonClient.getLock(lockKey);
try {
// 尝试加锁(不等待)
boolean locked = lock.tryLock(0, noRepeatSubmit.expireSeconds(), TimeUnit.SECONDS);
if (!locked) {
throw new RuntimeException("请勿重复提交");
}
return point.proceed();
} finally {
// 不主动解锁,让锁自然过期
// 这样在 expireSeconds 内无法重复提交
}
}
}
// 使用
@NoRepeatSubmit(expireSeconds = 3)
@PostMapping("/submit")
public Result submit(@RequestBody OrderRequest request) {
return orderService.createOrder(request);
}
4.3 分布式定时任务
@Component
public class ScheduledTask {
@Autowired
private RedissonClient redissonClient;
// 每分钟执行,但集群中只有一个节点执行
@Scheduled(cron = "0 * * * * ?")
public void syncData() {
String lockKey = "lock:task:syncData";
RLock lock = redissonClient.getLock(lockKey);
try {
// 尝试加锁(不等待),锁持有 50 秒
boolean locked = lock.tryLock(0, 50, TimeUnit.SECONDS);
if (!locked) {
log.info("其他节点正在执行任务,跳过");
return;
}
log.info("开始执行数据同步任务");
doSyncData();
log.info("数据同步任务完成");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
}
4.4 缓存更新
@Service
public class ProductCacheService {
@Autowired
private RedissonClient redissonClient;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
public Product getProduct(Long productId) {
String cacheKey = "product:" + productId;
String lockKey = "lock:cache:product:" + productId;
// 1. 查询缓存
Product product = (Product) redisTemplate.opsForValue().get(cacheKey);
if (product != null) {
return product;
}
// 2. 缓存未命中,加锁
RLock lock = redissonClient.getLock(lockKey);
try {
boolean locked = lock.tryLock(5, 10, TimeUnit.SECONDS);
if (locked) {
// 双重检查
product = (Product) redisTemplate.opsForValue().get(cacheKey);
if (product != null) {
return product;
}
// 查询数据库
product = productMapper.selectById(productId);
if (product != null) {
redisTemplate.opsForValue().set(cacheKey, product, 1, TimeUnit.HOURS);
}
return product;
} else {
// 获取锁失败,短暂等待后重试
Thread.sleep(100);
return getProduct(productId);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("获取锁被中断");
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
}
五、注意事项与最佳实践
5.1 常见问题
问题 1:锁过期时间设置
// 错误:设置过短可能导致业务未完成锁就释放了
lock.tryLock(5, 5, TimeUnit.SECONDS);
// 正确:使用看门狗机制(不指定 leaseTime)
lock.tryLock(5, TimeUnit.SECONDS);
// 或者:根据业务预估合理的时间
lock.tryLock(5, 60, TimeUnit.SECONDS);
问题 2:未正确释放锁
// 错误:可能释放别人的锁
public void doSomething() {
RLock lock = redissonClient.getLock("lock");
lock.lock();
try {
// 业务逻辑
} finally {
lock.unlock(); // 如果不是当前线程持有,会抛异常
}
}
// 正确:检查是否是当前线程持有
public void doSomething() {
RLock lock = redissonClient.getLock("lock");
try {
if (lock.tryLock(10, TimeUnit.SECONDS)) {
try {
// 业务逻辑
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
问题 3:Redis 主从切换丢锁
5.2 最佳实践
@Component
public class DistributedLockTemplate {
@Autowired
private RedissonClient redissonClient;
private static final long DEFAULT_WAIT_TIME = 10L;
private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.SECONDS;
/**
* 执行带分布式锁的操作
*
* @param lockKey 锁的 key
* @param supplier 业务逻辑
* @return 业务结果
*/
public <T> T executeWithLock(String lockKey, Supplier<T> supplier) {
return executeWithLock(lockKey, DEFAULT_WAIT_TIME, -1, DEFAULT_TIME_UNIT, supplier);
}
/**
* 执行带分布式锁的操作(自定义参数)
*
* @param lockKey 锁的 key
* @param waitTime 等待时间
* @param leaseTime 锁持有时间(-1 表示使用看门狗)
* @param timeUnit 时间单位
* @param supplier 业务逻辑
* @return 业务结果
*/
public <T> T executeWithLock(String lockKey, long waitTime, long leaseTime,
TimeUnit timeUnit, Supplier<T> supplier) {
RLock lock = redissonClient.getLock(lockKey);
boolean locked = false;
try {
if (leaseTime > 0) {
locked = lock.tryLock(waitTime, leaseTime, timeUnit);
} else {
locked = lock.tryLock(waitTime, timeUnit);
}
if (!locked) {
throw new LockException("获取锁失败: " + lockKey);
}
return supplier.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new LockException("获取锁被中断: " + lockKey, e);
} finally {
if (locked && lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
/**
* 执行带分布式锁的操作(无返回值)
*/
public void executeWithLock(String lockKey, Runnable runnable) {
executeWithLock(lockKey, () -> {
runnable.run();
return null;
});
}
}
// 使用示例
@Service
public class OrderService {
@Autowired
private DistributedLockTemplate lockTemplate;
public Order createOrder(OrderRequest request) {
String lockKey = "lock:order:" + request.getUserId();
return lockTemplate.executeWithLock(lockKey, () -> {
// 业务逻辑
return doCreateOrder(request);
});
}
}
5.3 监控与告警
@Aspect
@Component
public class LockMonitorAspect {
private static final Logger log = LoggerFactory.getLogger(LockMonitorAspect.class);
@Autowired
private MeterRegistry meterRegistry;
@Around("execution(* org.redisson.api.RLock.tryLock(..))")
public Object monitorLock(ProceedingJoinPoint point) throws Throwable {
String lockKey = getLockKey(point);
Timer.Sample sample = Timer.start(meterRegistry);
try {
Object result = point.proceed();
boolean locked = (boolean) result;
// 记录获取锁结果
meterRegistry.counter("redis.lock.acquire",
"key", lockKey,
"result", locked ? "success" : "failed").increment();
if (!locked) {
log.warn("获取锁失败: {}", lockKey);
}
return result;
} catch (Exception e) {
meterRegistry.counter("redis.lock.error",
"key", lockKey,
"error", e.getClass().getSimpleName()).increment();
throw e;
} finally {
sample.stop(meterRegistry.timer("redis.lock.duration", "key", lockKey));
}
}
}
六、总结
分布式锁实现方案
| 方案 | 复杂度 | 性能 | 可靠性 | 推荐场景 |
|---|---|---|---|---|
| SETNX + Lua | 低 | 高 | 中 | 简单场景 |
| Redisson 单机 | 低 | 高 | 中 | 一般生产环境 |
| Redisson + Sentinel | 中 | 高 | 较高 | 高可用要求 |
| RedLock | 高 | 中 | 高 | 强一致性要求 |
最佳实践总结
- 优先使用 Redisson:功能完善,解决了大部分问题
- 合理设置超时时间:根据业务预估,或使用看门狗
- 正确释放锁:检查是否是当前线程持有
- 使用 try-finally:确保锁一定被释放
- 添加监控:及时发现锁竞争和超时问题
- 考虑幂等性:即使锁失效,业务也能正确处理
选择建议
- 一般业务:Redisson 可重入锁
- 读多写少:Redisson 读写锁
- 公平竞争:Redisson 公平锁
- 极高可靠性:RedLock 或 ZooKeeper