原创

乐观锁、悲观锁、Redis分布式锁和Zookeeper分布式锁的实现以及流程原理

Service中@Transactional注解和synchronized关键字的问题

问题示例

就先举(编)个例子:

@Transactional(rollbackFor = Exception.class)
public synchronized Example exampleDemo(example e) {
    // ...
}

这段代码存在的问题:原本可能他是想要在执行exampleDemo()方法的时候,保证这个线程是安全的。

为什么我会说这段代码存在问题?

因为@Transactional(rollbackFor = Exception.class)是事务操作,事务的范围比synchronized范围大,当锁释放完之后,才会去提交事务,所以,在释放锁和提交事务之间的过程,可能有其它线程进来,简单来讲,加的这个synchronized就是白加了。

如何解决?

方法1-新加一个方法,去调用加了@Transactional注解的方法

这个不用多说...

方法2-不使用synchronized,改为分布式锁

1.数据库锁实现

1.1数据库乐观锁实现
1.1.1基于字段version记录机制实现

乐观锁通常实现是基于数据库版本(version)的记录机制实现的。

比如:我简单假设一下,方便理解,有一个b_product产品表,这个产品表有个字段为count记录这个产品还有多少库存,当一个用户下单之后,此产品需要减一,如果做的不好的count_num出现负数就GG。所以在并发情况下,我肯定起码要保证count_num不能为负数,且一定要保证这段流程没有任何问题。

乐观锁的实现方式为在此b_product表中加上一个version字段,查询的时候将version也查出来,当更新的时候,version加1。所以在更新之前,需要将version查出来,更新的时候,如果version是相同的,就进行更新,否则就说明在这段期间有其它线程对它进行了操作,就不进行操作,然后就是你自己定义的逻辑处理了。

sql语句:

# 1.将此商品的version查出来
select version from t_product where id = #{id}
# 2.更新商品的信息(version与前面查出的相同才进行更新)
update t_product set count_num = count_num - 1 where id = #{id} and version = #{version}
1.1.2基于时间戳实现

其实和字段version一样的,同样的,需要在表中新增一个字段,字段类型使用时间戳timestamp,假设为update_time,和前面version实现方式一样,也是在提交更新的时候,将第一次查询的时间戳与更新前取到的时间戳进行对比,如果一致,说明没线程在这期间对它进行修改,如果不一致,说明这期间有线程对它进行了修改,就不更新,然后就是你自己定义的逻辑处理了。

1.2数据库悲观锁实现

这个稍微费那么一丢丢时间就是,实现方式就是,我认为别人一定会同时和我一起修改数据,我在操作数据的时候直接把数据锁住,直到我操作完成之后我才释放锁,因为上锁了其它人就不能修改数据了。

sql语句:

# 1.开始事务 (三种方式开始事务,选一种就行了)
begin
begin work
start transaction

# 2.查询出商品信息
# 获取锁
select name, count_num from b_product where id = #{id} for update

update t_product set count_num = count_num - 1 where id = #{id} and version = #{version}

# 3.提交事务(释放锁)
commit
commit work

当然,使用数据库的锁,必须要注意,不要处理不当,产生了死锁。

死锁:例如,如果线程A锁住了记录1并等待记录2,而线程B锁住了记录2并等待记录1,这样两个线程就发生了死锁现象。

2.基于Redis的分布式锁-单节点与集群

单节点

Redis分布式锁的实现流程.png

添加Maven依赖:

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-aop</artifactId>
</dependency>

RedisLock注解:

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface RedisLock {

    /**
     * 锁的过期秒数,默认是5秒
     *
     * @return
     */
    int expire() default 5;

    /**
     * 尝试加锁,最多等待时间
     *
     * @return
     */
    long waitTime() default Long.MIN_VALUE;

    /**
     * 锁的超时时间单位
     *
     * @return
     */
    TimeUnit timeUnit() default TimeUnit.SECONDS;

}

RedisLockAspect进行AOP操作:

@Aspect
@Component
@Slf4j
@AllArgsConstructor
public class RedisLockAspect {
    private final RedisLockHelper redisLockHelper;
    private final JedisUtil jedisUtil;

    @Around("@annotation(com.lzhpo.ano.RedisLock)")
    public Object around(ProceedingJoinPoint joinPoint) {
        log.info("[Redis锁]环绕通知开启...");
        Jedis jedis = jedisUtil.getJedis();
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        Method method = signature.getMethod();

        // 拦截的实体类名
        String className = joinPoint.getTarget().getClass().getName();
        // 拦截的方法名称
        String methodName = joinPoint.getSignature().getName();
        String key = String.format("%s#%s", className, methodName);
        log.info("key(类名#方法名):{}", key);

        RedisLock redisLock = method.getAnnotation(RedisLock.class);
        // 使用UUID作为Value,将Key和Value关联,加锁使用SET key value EX second,释放锁先判断Key和Value是不是关联的,再释放
        String value = UUID.randomUUID().toString();
        try {
            final boolean isLocked = redisLockHelper.lock(jedis, key, value, redisLock.expire(), redisLock.timeUnit());
            log.info("是否被锁住? {}", isLocked);
            if (!isLocked) {
                log.error("获取锁失败");
                throw new RuntimeException("获取锁失败");
            }
            try {
                return joinPoint.proceed();
            } catch (Throwable e) {
                log.error("系统异常:{}", e.getMessage());
                // 打印栈堆信息,方便调试定位异常
                e.printStackTrace();
                // 嵌入自己的代码中可以使用自定义的异常,抛出异常,全局捕获异常返回
                throw new RuntimeException("系统异常:" + e.getMessage());
            }
        } finally {
            boolean unlock = redisLockHelper.unlock(jedis, key, value);
            log.info("是否成功释放锁?" + unlock);

            jedis.close();

            log.info("[Redis锁]环绕通知结束...");
        }
    }

}

加锁释放锁的工具RedisLockHelper

@Component
@Slf4j
public class RedisLockHelper {
    /**
     * 从 Redis 2.6.12 版本开始, SET 命令的行为可以通过一系列参数来修改:
     *
     * EX second :设置键的过期时间为 second 秒。 SET key value EX second 效果等同于 SETEX key second value 。
     * PX millisecond :设置键的过期时间为 millisecond 毫秒。 SET key value PX millisecond 效果等同于 PSETEX key millisecond value 。
     * NX :只在键不存在时,才对键进行设置操作。 SET key value NX 效果等同于 SETNX key value 。
     * XX :只在键已经存在时,才对键进行设置操作。
     *
     * @param key
     * @param value
     * @param timeout
     * @return 加锁是否成功
     */
    public boolean lock(Jedis jedis, String key, String value, int timeout, TimeUnit timeUnit) {
        log.info("[lock]加锁...");
        long seconds = timeUnit.toSeconds(timeout);
        return "OK".equals(jedis.set(key, value, "NX", "EX", seconds));
    }

    /**
     * 使用Lua脚本进行解锁操纵,解锁的时候验证value值
     *
     * 解决的问题:A线程启动,如果业务代码没有执行完毕,Key过期了,此时,B线程启动,
     * 因为A线程的Key已经过期了,所以B线程自然而然也得到了锁,最后B线程释放锁的时候,
     * 其实释放的是A线程的锁,这就导致了不安全的事情可能会发生。
     *
     * 如何解决这个问题?
     * 在加锁的时候,设置Value为UUID或者当前线程的ID当做Value,并在删除的时候判断对应
     * 的Value是不是自己线程的ID。
     *
     * 线程ID的办法:
     * <code>
     * // 加锁:
     * String threadId = Thread.currentThread().getId()
     * set(key,threadId ,30,NX)
     *
     * doSomething.....
     *
     * // 解锁:
     * if(threadId .equals(redisClient.get(key))){
     *     del(key)
     * }
     * </code>
     *
     *
     * @param jedis
     * @param key
     * @param value
     * @return
     */
    public boolean unlock(Jedis jedis, String key, String value) {
        log.info("[unlock]使用Lua脚本释放锁...");
        String luaScript = "if redis.call('get',KEYS[1]) == ARGV[1] then " +
                "return redis.call('del',KEYS[1]) else return 0 end";
        return jedis.eval(luaScript, Collections.singletonList(key), Collections.singletonList(value)).equals(1L);
    }
}

Redis连接工具JedisUtil

@Component
@Slf4j
@AllArgsConstructor
public class JedisUtil {
    private final RedisProperties redisProperties;
    private final ConcurrentHashMap<String, JedisPool> map = new ConcurrentHashMap<>();

    private JedisPool getPool() {
        String key = redisProperties.getHost() + ":" + redisProperties.getHost();
        JedisPool pool;
        if (!map.containsKey(key)) {
            JedisPoolConfig config = new JedisPoolConfig();
            config.setMaxIdle(redisProperties.getMaxIdle());
            config.setMaxWaitMillis(redisProperties.getMaxWait());
            config.setTestOnBorrow(true);
            config.setTestOnReturn(true);

            pool = new JedisPool(config, 
                    redisProperties.getHost(), redisProperties.getPort(),
                    redisProperties.getTimeout(), redisProperties.getPassword(), 
                    redisProperties.getDataBase());
            map.put(key, pool);
        } else {
            pool = map.get(key);
        }
        return pool;
    }

    public Jedis getJedis() {
        Jedis jedis = null;
        int count = 0;
        do {
            try {
                jedis = getPool().getResource();
                count++;
            } catch (Exception e) {
                log.error("get jedis failed ", e);
                if (jedis != null) {
                    jedis.close();
                }
            }
        } while (jedis == null && count < redisProperties.getRetryNum());
        return jedis;
    }
}

配置文件以及配置:

redis:
  config:
    host: localhost
    port: 6379
    password: 123456
    dataBase: 1
    maxIdle: 30
    maxWait: 15000
    timeout: 30000
    retryNum: 5
@Setter
@Getter
@Configuration
@ConfigurationProperties(prefix = "redis.config")
public class RedisProperties {
    private String host;
    private int port;
    private String password;
    private int dataBase;
    private int maxIdle;
    private int maxWait;
    private int timeout;
    private int retryNum;
}

测试使用:

@RestController
@AllArgsConstructor
public class TestController {
    private final Test1Service test1Service;

    @RedisLock
    @RequestMapping("/test1")
    public String test() {
        return test1Service.test1();
    }
}
@Service
@Slf4j
public class Test1Service {

    /**
     * 当获取锁失败,抛出异常的时候,线程也就不会进来这里
     * @return
     */
    public String test1() {
        String str = "I'm Test1Service";
        log.info("Test1Service:{}", str);
        return str;
    }
}
为什么要使用UUID/当前线程ID作为Value?

不使用UUID/当前线程ID作为Value,存在的问题:

A线程启动,如果业务代码没有执行完毕,Key过期了,此时,B线程启动,因为A线程的Key已经过期了,所以B线程自然而然也得到了锁,最后B线程释放锁的时候,其实释放的是A线程的锁,这就导致了不安全的事情可能会发生。

---解决办法1

在加锁的时候,设置Value为UUID或者当前线程的ID当做Value,并在删除的时候判断对应的Value是不是自己线程的ID。也就是上面写的这种方式,请再次看我写的注释,就可以理解了。

    /**
     * 从 Redis 2.6.12 版本开始, SET 命令的行为可以通过一系列参数来修改:
     *
     * EX second :设置键的过期时间为 second 秒。 SET key value EX second 效果等同于 SETEX key second value 。
     * PX millisecond :设置键的过期时间为 millisecond 毫秒。 SET key value PX millisecond 效果等同于 PSETEX key millisecond value 。
     * NX :只在键不存在时,才对键进行设置操作。 SET key value NX 效果等同于 SETNX key value 。
     * XX :只在键已经存在时,才对键进行设置操作。
     *
     * @param key
     * @param value
     * @param timeout
     * @return 加锁是否成功
     */
    public boolean lock(Jedis jedis, String key, String value, int timeout, TimeUnit timeUnit) {
        log.info("[lock]加锁...");
        long seconds = timeUnit.toSeconds(timeout);
        return "OK".equals(jedis.set(key, value, "NX", "EX", seconds));
    }

    /**
     * 使用Lua脚本进行解锁操纵,解锁的时候验证value值
     *
     * 解决的问题:A线程启动,如果业务代码没有执行完毕,Key过期了,此时,B线程启动,
     * 因为A线程的Key已经过期了,所以B线程自然而然也得到了锁,最后B线程释放锁的时候,
     * 其实释放的是A线程的锁,这就导致了不安全的事情可能会发生。
     *
     * 如何解决这个问题?
     * 在加锁的时候,设置Value为UUID或者当前线程的ID当做Value,并在删除的时候判断对应
     * 的Value是不是自己线程的ID。
     *
     * 线程ID的办法:
     * <code>
     * // 加锁:
     * String threadId = Thread.currentThread().getId()
     * set(key,threadId ,30,NX)
     *
     * doSomething.....
     *
     * // 解锁:
     * if(threadId .equals(redisClient.get(key))){
     *     del(key)
     * }
     * </code>
     *
     *
     * @param jedis
     * @param key
     * @param value
     * @return
     */
    public boolean unlock(Jedis jedis, String key, String value) {
        log.info("[unlock]使用Lua脚本释放锁...");
        String luaScript = "if redis.call('get',KEYS[1]) == ARGV[1] then " +
                "return redis.call('del',KEYS[1]) else return 0 end";
        return jedis.eval(luaScript, Collections.singletonList(key), Collections.singletonList(value)).equals(1L);
    }
---解决办法2

在加锁和释放锁的时候,设置一个守护线程,对key进行续命。

//开启守护线程:
final int tmpExpireTime = expireTime;
final String tmpKey = key;
Thread thread = new Thread(new Runnable(){
    @Override
    public void run() {
        for(;;){
            jedis.expire(tmpKey,tmpExpireTime);
            try {
                Thread.sleep(tmpExpireTime);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
});
thread.setDaemon(true);
thread.start();
Redis集群模式-Redis锁之Redlock算法

如果A往Master放入了一把锁,然后再数据同步到Slave之前,Master挂掉,Slave被提拔为Master,这时候Master上面就没有锁了,这样其他进程也可以拿到锁,违法了锁的互斥性。

针对Redis集群架构,redis的作者antirez提出了Redlock算法,来实现集群架构下的分布式锁。

Redlock算法:现在假设有5个Redis主节点(大于3的奇数个),这样基本保证他们不会同时都宕掉,获取锁和释放锁的过程中,客户端会执行以下操作:

  • 1.获取当前Unix时间,以毫秒为单位
  • 2.依次尝试从5个实例,使用相同的key和具有唯一性的value获取锁
    当向Redis请求获取锁时,客户端应该设置一个网络连接和响应超时时间,这个超时时间应该小于锁的失效时间,这样可以避免客户端死等
  • 3.客户端使用当前时间减去开始获取锁时间就得到获取锁使用的时间。当且仅当从半数以上的Redis节点取到锁,并且使用的时间小于锁失效时间时,锁才算获取成功
  • 4.如果取到了锁,key的真正有效时间等于有效时间减去获取锁所使用的时间,这个很重要
  • 5.如果因为某些原因,获取锁失败(没有在半数以上实例取到锁或者取锁时间已经超过了有效时间),客户端应该在所有的Redis实例上进行解锁,无论Redis实例是否加锁成功,因为可能服务端响应消息丢失了但是实际成功了,毕竟多释放一次也不会有问题
马丁博士对Redlock的质疑

假设多节点Redis系统有五个节点A/B/C/D/E和两个客户端C1和C2,如果其中一个Redis节点上的时钟向前跳跃会发生什么?

  • 客户端C1获得了对节点A、B、c的锁定,由于网络问题,法到达节点D和节点E
  • 节点C上的时钟向前跳,导致锁提前过期
  • 客户端C2在节点C、D、E上获得锁定,由于网络问题,无法到达A和B
  • 客户端C1和客户端C2现在都认为他们自己持有锁

最后马丁出了如下的结论:

  • 为了效率而使用分布式锁
    单Redis节点的锁方案就足够了Redlock则是个过重而昂贵的设计
  • 为了正确而使用分布式锁
    Redlock不是建立在异步模型上的一个足够强的算法,它对于系统模型的假设中包含很多危险的成分

参考Demo,更多操作可以看Redlock的源码:

添加Maven依赖:

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.9.0</version>
</dependency>
@Bean
public RedissonClient redissonClient() {
    Config config = new Config();
    config.useClusterServers()
        .setScanInterval(2000) // 集群状态扫描间隔时间,单位是毫秒
        //可以用"rediss://"来启用SSL连接
        .addNodeAddress("redis://127.0.0.1:6377", "redis://127.0.0.1:6378")
        .addNodeAddress("redis://127.0.0.1:6379");
    return Redisson.create(config);
}
@Bean
public RedissonClient redissonClient() {
    Config config = new Config();
    config.useClusterServers()
        .setScanInterval(2000) // 集群状态扫描间隔时间,单位是毫秒
        //可以用"rediss://"来启用SSL连接
        .addNodeAddress("redis://127.0.0.1:6377", "redis://127.0.0.1:6378")
        .addNodeAddress("redis://127.0.0.1:6379");
    return Redisson.create(config);
}
@Autowired
private RedissonClient redissonClient;

public void RLockDemo() {
    RLock redLock = redissonClient.getLock("RED_LOCK_DEMO");
    boolean isLock;
    try {
        isLock = redLock.tryLock();
        System.out.println("isLock?" +isLock);
        // 500ms拿不到锁, 就认为获取锁失败。10000ms即10s是锁失效时间。
        isLock = redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS);
        if (isLock) {
            // TODO:业务逻辑操作...
        }
    } catch (Exception e) {
        log.error("{}", e.getMessage());
    } finally {
        // 解锁
        redLock.unlock();
    }
}

推荐阅读文章:https://redis.io/topics/distlock

3.基于Zookeeper的分布式锁

主要原理就是:在Zookeeper创建一个临时的同步顺序节点,如果创建成功,说明获得了锁,如果创建失败,表示这歌锁已经被当前线程占有了,就对这个节点进行监听,当监听到这个节点被删除之后,就再进行创建临时节点。

说明:临时节点在ZK连接断开的时候会自动删除。

Zookeeper分布式锁的实现流程.png

自定义锁的接口Lock

/**
 * 自定义锁接口
 */
public interface Lock {

    /**
     * 获取锁
     */
    void getLock();

    /**
     * 释放锁
     */
    void unLock();

}

锁的抽象方法:

public abstract class AbstractLock implements Lock {
    @Override
    public void getLock() {
        if (tryLock()) {
            System.out.println("获取到了自定义Lock锁的资源......");
        } else {
            // 没拿到资源,进入等待
            waitLock();
            getLock();
        }
    }

    @Override
    public void unLock() {

    }

    /**
     * 尝试获取锁,如果拿到了锁返回true,没有拿到则返回false
     */
    public abstract boolean tryLock();

    /**
     * 阻塞,等待获取锁
     */
    public abstract void waitLock();
}

定义Zookeeper的连接,写为抽象类,让其锁的具体实现类继承:

public abstract class ZooKeeperAbstractLock extends AbstractLock {

    /** 多个地址使用逗号分隔 */
    private static final String SERVER_ADDR = "192.168.200.109:2181";

    protected ZkClient zkClient = new ZkClient(SERVER_ADDR);

    protected static final String PATH = "/lock";

}

具体锁的实现:

public class ZooKeeperDistrbuteLock2 extends ZooKeeperAbstractLock {

    private CountDownLatch countDownLatch = null;
    /**
     * 当前请求节点的前一个节点
     */
    private String beforePath;
    /**
     * 当前请求的节点
     */
    private String currentPath;

    public ZooKeeperDistrbuteLock2() {
        if (!zkClient.exists(PATH)) {
            // 创建持久节点,保存临时顺序节点
            zkClient.createPersistent(PATH);
        }
    }

    @Override
    public boolean tryLock() {
        // 如果currentPath为空则为第一次尝试拿锁,第一次拿锁赋值currentPath
        if (currentPath == null || currentPath.length() == 0) {
            // 在指定的持久节点下创建临时顺序节点
            currentPath = zkClient.createEphemeralSequential(PATH + "/", "lock");
        }
        // 获取所有临时节点并排序,例如:000044
        List<String> childrenList = zkClient.getChildren(PATH);
        Collections.sort(childrenList);

        if (currentPath.equals(PATH + "/" + childrenList.get(0))) {
            // 如果当前节点在所有节点中排名第一则获取锁成功
            return true;
        } else {
            int wz = Collections.binarySearch(childrenList, currentPath.substring(6));
            beforePath = PATH + "/" + childrenList.get(wz - 1);
        }
        return false;
    }

    @Override
    public void waitLock() {
        // 创建监听
        IZkDataListener iZkDataListener = new IZkDataListener() {
            @Override
            public void handleDataChange(String s, Object o) throws Exception {

            }

            @Override
            public void handleDataDeleted(String s) throws Exception {
                // 释放锁,删除节点时唤醒等待的线程
                if (countDownLatch != null) {
                    countDownLatch.countDown();
                }
            }
        };

        // 注册监听,这里是给排在当前节点前面的节点增加(删除数据的)监听,本质是启动另外一个线程去监听前置节点
        zkClient.subscribeDataChanges(beforePath, iZkDataListener);

        // 前置节点存在时,等待前置节点删除唤醒
        if (zkClient.exists(beforePath)) {
            countDownLatch = new CountDownLatch(1);
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        // 删除对前置节点的监听
        zkClient.unsubscribeDataChanges(beforePath, iZkDataListener);
    }

    /**
     * 释放锁
     */
    @Override
    public void unLock() {
        if (zkClient != null) {
            System.out.println("释放锁资源");
            zkClient.delete(currentPath);
            zkClient.close();
        }
    }
}

测试:

/**
 * Zookeeper实现分布式锁 - 方法2
 *
 * 使用临时顺序节点,临时顺序节点排序,每个临时顺序节点只监听它本身的前一个节点变化,
 * 避免产生“羊群效应”(一旦临时节点删除,释放锁,那么其他在监听这个节点变化的线程,就会去竞争锁,同时访问 ZooKeeper)。
 */
public class AppRun {

    private static class OrderNumGeneratorService implements Runnable {
        private OrderNumGenerator orderNumGenerator = new OrderNumGenerator();

        /** 方法1 */
//        private Lock lock = new ZooKeeperDistrbuteLock();

        /** 方法2 - 优化 */
        private Lock lock = new ZooKeeperDistrbuteLock2();

        @Override
        public void run() {
            lock.getLock();
            try {
                System.out.println(Thread.currentThread().getName() + ", 生成订单编号:"  + orderNumGenerator.getOrderNumber());
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unLock();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("----------生成唯一订单号----------");
        // 使用50个线程来并发测试ZooKeeper实现的分布式锁
        for (int i = 0; i < 50; i++) {
            new Thread(new OrderNumGeneratorService()).start();
        }
    }
}

生成随机订单号:

/**
 * 生成随机订单号
 *
 * @author lzhpo
 */
public class OrderNumGenerator {

    private static long count = 0;

    /**
     * 使用日期加数值拼接成订单号
     */
    public String getOrderNumber() throws Exception {
        String date = DateTimeFormatter.ofPattern("yyyyMMddHHmmss").format(LocalDateTime.now());
        String number = new DecimalFormat("000000").format(count++);
        return date + number;
    }

}
正文到此结束
本文目录