Redisson分布式锁避免重复提交任务

· 默认分类

Redisson 分布式锁避免重复提交任务

为什么要引入 分布式锁

在本地锁场景下,已经能够解决本地服务下多线程竞争资源的同步问题,但是在分布式的场景下,服务A对资源x加锁,而服务B不受服务A的限制,若此时服务B也去访问资源B,就会出现数据不一致的问题,导致服务出错。

分布式锁

分布式锁是分布式集群场景下共享的一种锁机制,保证了不同服务下的互斥性,任意时刻,只有一个客户端可以持有锁
特性:

几种实现方式

其中,Redis性能最高,方式有两种,一是利用SetNX(但是有死锁问题),而是Redisson ,也是最常用的。

实现

直接实现

String lockKey = REVIEW_LOCK_PREFIX + taskId;
RLock lock = redissonCilent.getLock(lockKey);

try{
    // 尝试获取锁(最多等待2秒,锁30秒后释放)
    if(lock.tryLock(2, 30, TimeUnit.second){
        // 任务执行逻辑
        // ...
    }else{
        log.info("任务 {} 正在被其他人员处理,跳过此次提交", taskId);
    }
}catch(InterruptedException e){
    Thread.currentThread().interrupt();
    log.info("线程终端,任务 {} 执行失败", taskId);
}catch(Exception e){
    log.info("任务 {} 执行异常", taskId);
}finally{
    if(lock.isHeldByCurrentThread){
        lock.unlock();
        lock.info("已释放锁 {}", lockKey);
    }
}

这里假设任务比较简单,没有加入锁续期机制,如果要续期,使用lock.lock()可以自动实现看门狗的续期机制。

注解式非入侵实现

直接实现分布式锁需要对具体方法进行代码改动,如果我们引入注解来实现,可以实现非入侵的分布式锁实现。

使用方式:

@DistributedLock(key = "#taskId", prefix = "precheck:", leaseTime = 30)
public void handlePrecheck(Long taskId){...}

只需要在方法之前加上自定义的分布式锁注解,使用起来十分轻便。

业务方法(@DistributedLock)
     │
     ▼
DistributedLockAspect(AOP拦截)
     │
     ├─ 解析 SpEL 表达式,得到完整锁 key
     │
     ├─ 调用 IDistributedLock.tryLock(...)
     │
     │       │
     │       ▼
     │   RedissonDistributedLock(封装 Redisson 实现)
     │       │
     │       ├─ 获取 RLock
     │       ├─ 执行 tryLock(...)
     │       ├─ 成功 → new ILock(RLock, this)
     │       ▼
     │   返回 ILock(封装锁引用)
     │
     ├─ 执行业务逻辑 joinPoint.proceed()
     │
     ├─ finally:lock.close()
     │
     │       ▼
     │   ILock.close()
     │       │
     │       └─ 调用 distributedLock.unlock(lock)
     │
     │              │
     │              ▼
     │        RedissonDistributedLock.unlock()
     │              ├─ 校验线程持有锁
     │              └─ 调用 RLock.unlock()
     ▼
   🔓 锁释放

那么如何实现呢?

  1. 定义注解@DistributedLock

    public @interface DistributedLock{
        String key(); 
        String prefix() default "lock:";
        long tryTime() default 30; // 尝试时间,30s
        long leaseTime() default 30; // 释放时间,30s
        TimeUnit unit() default TimeUnit.Seconds; // 时间单位,秒
        boolean fair default false; // 默认非公平锁
    }
  2. ILock 封装锁逻辑(策略模式)

    public class ILock implements AutoCloseable{
        private final Object lock;
        private IDistributedLock distributedLock; // 可以适应不同的分布式锁实现
        
        @Override
        public void close() throws Exception{
            if(lock != null){
                distributedLock.unlock(lock);
            }
        }
    }

    ILock 将锁的逻辑封装,可以通过不同的DistributedLock实现不同的锁(可以是Redis、Zookeeper),这里主要是为了能够实现不同的close,即便是不同的分布式锁,close方法都封装在ILock中,使用时无需关心具体的是什么分布式锁,实现了策略模式
    继承AutoCloseable很关键,使得 try { ILock lock = distributedLock.tryLock(...); } 具备自动释放资源的能力,而不用在finally手写资源释放close。

  3. IDistributedLock 分布式锁操作接口

    public interface IDistributedLock{
        ILock lock(String key, long leaseTime, TimeUnit unit, boolean fair);
        ILock tryLock(String key, long tryTime, long leaseTime, TimeUnit unit, boolean fair) throws Exception;
        void unlock(Object lock);
    }
  4. RedissonDistributedLock 实现 IDistributedLock 接口

    public class RedissonDitributedLock implements IDistributedLock{
        @Resource
        RedissonClient redissonClient;
        
        @Override
        public ILock lock(String key, long leaseTime, TimeUnit unit, boolean fair){
            RLock lock = fair? redissonClient.getFairLock(key) : redissonClient.getLock(key);
            lock.lock(leaseTime, unit);
            return new ILock(lock, this);
        }
        
        @Override
        public ILock tryLock(String key, long tryTime, long leaseTime, TimeUnit unit, boolean fair){
            RLock lock = fair? redissonClient.getFairLock(key) : redissonClient.getLock(key);
            boolean acquired = lock.tryLock(tryTime, leaseTime, unit);
            if(acquired){
                return new ILock(lock, this);
            }
            return null;
        }
        
        @Override
        public void unlock(Object lock){
            if(lock instanceOf RLock){
                RLock rLock = (RLock) lock;
                try{
                    if(rLock.islocked() && rLock.isHeldByCurrentThread){
                        rLock.unlock();
                    }
                }catch(IllegalMonitorStateException e){
                    log.warn("分布式锁释放失败:{}", e.getMessage());
                }
            } 
        }
    }

    5.DistibutedLockAspect 切面

    public class DistributedLockAspect{
        
        @Resource
        private final IDistributedLock distributedLock;
        
        @around(@annoation(distributedLockAnno))
        private Object around(ProceedingJointPoint jointPoint, DistributedLock distributedLockAnno) throws Exception{
            String key = parseKey(jointPoint, distributedLockAnno);
            ILock lock = null;
            try{
                lock = distributedLock.tryLock(
                        key,
                        distributedLockAnno.tryTime(),
                        distributedLockAnno.leaseTime(),
                        distributedLockAnno.unit(),
                        distributedLockAnno.fair()        
                );
                if(lock == null){
                    log.warn("未能获取到锁, {}", key);
                    return null;
                }
                
                Object result = jointPoint.proceed(); // 原本方法执行的结果
            } finally{
                if (lock != null) {
                    lock.close();
                    log.info("🔓 已释放锁:{}", key);
                }
            }
    
        }
        
        private String parseKey(ProceedingJoinPoint joinPoint, DistributedLock anno) {
            String spelKey = anno.key();
            MethodSignature signature = (MethodSignature) joinPoint.getSignature();
            Method method = signature.getMethod();
            String[] paramNames = signature.getParameterNames();
            Object[] args = joinPoint.getArgs();
    
            EvaluationContext context = new StandardEvaluationContext();
            for (int i = 0; i < paramNames.length; i++) {
                context.setVariable(paramNames[i], args[i]);
            }
    
            Expression expression = parser.parseExpression(spelKey);
            Object value = expression.getValue(context);
            return anno.prefix() + Objects.toString(value);
        }
        
    }

整体流程:

┌────────────────────────────┐
│ @DistributedLock(key="#id")│  ← 业务方法
└────────────┬───────────────┘
             │
             ▼
     [DistributedLockAspect]
             │
             ▼
     [IDistributedLock 接口]
             │
             ▼
     [RedissonDistributedLock]
             │
             ▼
     [Redis 实例]