Redisson 分布式锁避免重复提交任务
为什么要引入 分布式锁 ?
在本地锁场景下,已经能够解决本地服务下多线程竞争资源的同步问题,但是在分布式的场景下,服务A对资源x加锁,而服务B不受服务A的限制,若此时服务B也去访问资源B,就会出现数据不一致的问题,导致服务出错。
分布式锁
分布式锁是分布式集群场景下共享的一种锁机制,保证了不同服务下的互斥性,任意时刻,只有一个客户端可以持有锁。
特性:
- 互斥性:同一时刻,只有一个客户端可以持有锁,不同线程之间具有互斥性,即便是跨服务。
- 超时机制:正常情况下,持有锁的客户端执行完成之后释放锁,但是如果执行时发生异常,导致锁无法正确释放,就会造成死锁。
为了防止死锁,需要设置超时机制,超时自动释放。 - 自动续期:超时机制会带来一个问题,如果持有锁的客户端在时间内无法完成任务,会导致任务还没完成就释放锁,这与我们的初衷是违背的(我们引入超时机制并不是为了阻碍正常的任务完成,而是为了解决潜在服务异常造成的死锁问题),因此,需要引入自动续期机制。
开启一个监听任务,监听线程,如果客户端还存活就延长超时时间,直到任务完成或者是任务异常。
几种实现方式
- 数据库
- Zookeeper
- Redis
其中,Redis性能最高,方式有两种,一是利用SetNX(但是有死锁问题),而是Redisson ,也是最常用的。
实现
直接实现
String lockKey = REVIEW_LOCK_PREFIX + taskId;
RLock lock = redissonCilent.getLock(lockKey);
try{
// 尝试获取锁(最多等待2秒,锁30秒后释放)
if(lock.tryLock(2, 30, TimeUnit.second){
// 任务执行逻辑
// ...
}else{
log.info("任务 {} 正在被其他人员处理,跳过此次提交", taskId);
}
}catch(InterruptedException e){
Thread.currentThread().interrupt();
log.info("线程终端,任务 {} 执行失败", taskId);
}catch(Exception e){
log.info("任务 {} 执行异常", taskId);
}finally{
if(lock.isHeldByCurrentThread){
lock.unlock();
lock.info("已释放锁 {}", lockKey);
}
}这里假设任务比较简单,没有加入锁续期机制,如果要续期,使用lock.lock()可以自动实现看门狗的续期机制。
注解式非入侵实现
直接实现分布式锁需要对具体方法进行代码改动,如果我们引入注解来实现,可以实现非入侵的分布式锁实现。
使用方式:
@DistributedLock(key = "#taskId", prefix = "precheck:", leaseTime = 30)
public void handlePrecheck(Long taskId){...}只需要在方法之前加上自定义的分布式锁注解,使用起来十分轻便。
业务方法(@DistributedLock)
│
▼
DistributedLockAspect(AOP拦截)
│
├─ 解析 SpEL 表达式,得到完整锁 key
│
├─ 调用 IDistributedLock.tryLock(...)
│
│ │
│ ▼
│ RedissonDistributedLock(封装 Redisson 实现)
│ │
│ ├─ 获取 RLock
│ ├─ 执行 tryLock(...)
│ ├─ 成功 → new ILock(RLock, this)
│ ▼
│ 返回 ILock(封装锁引用)
│
├─ 执行业务逻辑 joinPoint.proceed()
│
├─ finally:lock.close()
│
│ ▼
│ ILock.close()
│ │
│ └─ 调用 distributedLock.unlock(lock)
│
│ │
│ ▼
│ RedissonDistributedLock.unlock()
│ ├─ 校验线程持有锁
│ └─ 调用 RLock.unlock()
▼
🔓 锁释放那么如何实现呢?
定义注解@DistributedLock
public @interface DistributedLock{ String key(); String prefix() default "lock:"; long tryTime() default 30; // 尝试时间,30s long leaseTime() default 30; // 释放时间,30s TimeUnit unit() default TimeUnit.Seconds; // 时间单位,秒 boolean fair default false; // 默认非公平锁 }ILock 封装锁逻辑(策略模式)
public class ILock implements AutoCloseable{ private final Object lock; private IDistributedLock distributedLock; // 可以适应不同的分布式锁实现 @Override public void close() throws Exception{ if(lock != null){ distributedLock.unlock(lock); } } }ILock 将锁的逻辑封装,可以通过不同的DistributedLock实现不同的锁(可以是Redis、Zookeeper),这里主要是为了能够实现不同的close,即便是不同的分布式锁,close方法都封装在ILock中,使用时无需关心具体的是什么分布式锁,实现了
策略模式。
继承AutoCloseable很关键,使得try { ILock lock = distributedLock.tryLock(...); }具备自动释放资源的能力,而不用在finally手写资源释放close。IDistributedLock 分布式锁操作接口
public interface IDistributedLock{ ILock lock(String key, long leaseTime, TimeUnit unit, boolean fair); ILock tryLock(String key, long tryTime, long leaseTime, TimeUnit unit, boolean fair) throws Exception; void unlock(Object lock); }RedissonDistributedLock 实现 IDistributedLock 接口
public class RedissonDitributedLock implements IDistributedLock{ @Resource RedissonClient redissonClient; @Override public ILock lock(String key, long leaseTime, TimeUnit unit, boolean fair){ RLock lock = fair? redissonClient.getFairLock(key) : redissonClient.getLock(key); lock.lock(leaseTime, unit); return new ILock(lock, this); } @Override public ILock tryLock(String key, long tryTime, long leaseTime, TimeUnit unit, boolean fair){ RLock lock = fair? redissonClient.getFairLock(key) : redissonClient.getLock(key); boolean acquired = lock.tryLock(tryTime, leaseTime, unit); if(acquired){ return new ILock(lock, this); } return null; } @Override public void unlock(Object lock){ if(lock instanceOf RLock){ RLock rLock = (RLock) lock; try{ if(rLock.islocked() && rLock.isHeldByCurrentThread){ rLock.unlock(); } }catch(IllegalMonitorStateException e){ log.warn("分布式锁释放失败:{}", e.getMessage()); } } } }5.DistibutedLockAspect 切面
public class DistributedLockAspect{ @Resource private final IDistributedLock distributedLock; @around(@annoation(distributedLockAnno)) private Object around(ProceedingJointPoint jointPoint, DistributedLock distributedLockAnno) throws Exception{ String key = parseKey(jointPoint, distributedLockAnno); ILock lock = null; try{ lock = distributedLock.tryLock( key, distributedLockAnno.tryTime(), distributedLockAnno.leaseTime(), distributedLockAnno.unit(), distributedLockAnno.fair() ); if(lock == null){ log.warn("未能获取到锁, {}", key); return null; } Object result = jointPoint.proceed(); // 原本方法执行的结果 } finally{ if (lock != null) { lock.close(); log.info("🔓 已释放锁:{}", key); } } } private String parseKey(ProceedingJoinPoint joinPoint, DistributedLock anno) { String spelKey = anno.key(); MethodSignature signature = (MethodSignature) joinPoint.getSignature(); Method method = signature.getMethod(); String[] paramNames = signature.getParameterNames(); Object[] args = joinPoint.getArgs(); EvaluationContext context = new StandardEvaluationContext(); for (int i = 0; i < paramNames.length; i++) { context.setVariable(paramNames[i], args[i]); } Expression expression = parser.parseExpression(spelKey); Object value = expression.getValue(context); return anno.prefix() + Objects.toString(value); } }
整体流程:
┌────────────────────────────┐
│ @DistributedLock(key="#id")│ ← 业务方法
└────────────┬───────────────┘
│
▼
[DistributedLockAspect]
│
▼
[IDistributedLock 接口]
│
▼
[RedissonDistributedLock]
│
▼
[Redis 实例]