1、问题描述
随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的 Java API 并不能提供分布式锁的能力。为了解决这个问题就需要一种跨 JVM 的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题。
分布式锁主流的实现方案:
- 基于数据库实现分布式锁
- 基于缓存(Redis 等)
- 基于 Zookeeper
我们就基于 Redis 实现分布式锁。
2、分布式锁指令
使用命令
| 1
 | set <key> <value> <nx / xx> <px millisecond / ex second>
 | 
nx和xx 二选一:
- nx:只在键不存在时,才对键进行设置操作
- xx:只在键已经存在时,才对键进行设置操作
px millisecond 和 ex second二选一:
- px millisecond:设置键的过期时间为 millisecond 毫秒
- ex second:设置键的过期时间为 second 秒
注意:Redis 实现分布式锁的指令是 setnx,该指令的功能是:
- 如果插入的 key 没有存在 Redis,则将 key-value 存入 Redis
- 如果插入的 key 已经存在 Redis,则 value 失效,无法重新覆盖原来的 value
这样就实现了分布式锁:key 存在则代表有人操作,其他人无法操作。
3、Java分布式锁流程
- 拿锁
- 业务操作
- 释放锁
properties 配置文件内容
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 
 | server.port=8081
 
 spring.redis.host=192.168.199.27
 
 spring.redis.port=6379
 
 spring.redis.database= 0
 
 spring.redis.timeout=1800000
 
 spring.redis.lettuce.pool.max-active=20
 
 spring.redis.lettuce.pool.max-wait=-1
 
 spring.redis.lettuce.pool.max-idle=5
 
 spring.redis.lettuce.pool.min-idle=0
 
 | 
Redis 核心配置类:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 
 | @EnableCaching@Configuration
 public class RedisConfig extends CachingConfigurerSupport {
 
 @Bean
 public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
 RedisTemplate<String, Object> template = new RedisTemplate<>();
 RedisSerializer<String> redisSerializer = new StringRedisSerializer();
 Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new
 Jackson2JsonRedisSerializer(Object.class);
 ObjectMapper om = new ObjectMapper();
 om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
 om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
 jackson2JsonRedisSerializer.setObjectMapper(om);
 template.setConnectionFactory(factory);
 
 template.setKeySerializer(redisSerializer);
 
 template.setValueSerializer(jackson2JsonRedisSerializer);
 
 template.setHashValueSerializer(jackson2JsonRedisSerializer);
 return template;
 }
 
 @Bean
 public CacheManager cacheManager(RedisConnectionFactory factory) {
 RedisSerializer<String> redisSerializer = new StringRedisSerializer();
 Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
 
 ObjectMapper om = new ObjectMapper();
 om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
 om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
 jackson2JsonRedisSerializer.setObjectMapper(om);
 
 RedisCacheConfiguration config =
 RedisCacheConfiguration.defaultCacheConfig()
 .entryTtl(Duration.ofSeconds(600))
 .serializeKeysWith(RedisSerializationContext.SerializationPair.
 fromSerializer(redisSerializer))
 .serializeValuesWith(RedisSerializationContext.SerializationPair
 .fromSerializer(jackson2JsonRedisSerializer))
 .disableCachingNullValues();
 RedisCacheManager cacheManager = RedisCacheManager.builder(factory)
 .cacheDefaults(config)
 .build();
 return cacheManager;
 }
 }
 
 | 
3.1 代码一(无过期时间)
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 
 | @RestController@RequestMapping("/redisTest")
 public class RedisLocked {
 @Autowired
 private RedisTemplate<String,String> redisTemplate;
 
 @GetMapping("testLock")
 public void testLock(){
 
 Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111");
 
 if(lock){
 Object value = redisTemplate.opsForValue().get("num");
 
 if(StringUtils.isEmpty(value)){
 return;
 }
 
 int num = Integer.parseInt(value + "");
 
 redisTemplate.opsForValue().set("num", String.valueOf(++num));
 
 redisTemplate.delete("lock");
 }else{
 
 try {
 Thread.sleep(100);
 testLock();
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 }
 }
 }
 
 | 
重启 Redis 服务集群,这里利用 ab 网关压力测试:
| 1
 | ab -n 1000 -c 100 http://192.168.1.113:8081/redisTest/testLock
 | 
192.168.1.113 是本机的 IP,此时是 Linux 系统访问本机的 Spring Boot 项目。

查看 redis 中 num 的值:

可能出现的问题:setnx 刚好获取到锁,业务逻辑出现异常 Exception,导致锁无法释放,卡死。
解决:设置过期时间,自动释放锁。
3.2 代码一优化——设置锁的过期时间
设置过期时间有两种方式:
- 首先想到通过 expire 设置过期时间(缺乏原子性:如果在 setnx 和 expire 之间出现异常,锁也无法释放)
- 在 set 时指定过期时间(推荐)

3.3 代码二(无唯一标识)
在代码一的基础上加上超时时间,看第八行代码
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 
 | public class RedisLocked {@Autowired
 private RedisTemplate<String,String> redisTemplate;
 
 @GetMapping("testLock1")
 public void testLock(){
 
 Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111", 2, TimeUnit.SECONDS);
 
 if(lock){
 Object value = redisTemplate.opsForValue().get("num");
 
 if(StringUtils.isEmpty(value)){
 return;
 }
 
 int num = Integer.parseInt(value + "");
 
 redisTemplate.opsForValue().set("num", String.valueOf(++num));
 
 redisTemplate.delete("lock");
 }else{
 
 try {
 Thread.sleep(100);
 testLock();
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 }
 }
 }
 
 | 
出现的问题:如果线程 1 持有锁,但是操作卡顿 3 秒,而锁是 2 秒过期,导致 2 秒后线程 2 拿到锁,当线程 2 拿到锁时,再过 1 秒后线程 1 才释放锁,也就是释放了进程 2 拿的锁。
解决:setnx 获取锁时,设置一个指定的唯一值(例如:uuid);释放前获取这个值,判断是否自己的锁。
3.4 代码二优化——UUID防误删

3.5 代码三(无原子性)
在代码一的基础上,加上了 uuid,看第 23 - 26 行代码
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 
 | public class RedisLocked {@Autowired
 private RedisTemplate<String,String> redisTemplate;
 
 @GetMapping("testLock")
 public void testLocked(){
 String locKey = "lock";
 String uuid = UUID.randomUUID().toString();
 Boolean lock = redisTemplate.opsForValue().setIfAbsent(locKey, uuid, 2, TimeUnit.SECONDS);
 
 if(lock){
 String value = redisTemplate.opsForValue().get("num");
 if(StringUtils.isEmpty(value)){
 return;
 }
 int num = Integer.parseInt(value + "");
 redisTemplate.opsForValue().set("num", String.valueOf(++num));
 
 
 
 
 
 if(uuid.equals(redisTemplate.opsForValue().get(locKey))){
 
 redisTemplate.delete(locKey);
 }
 }else {
 try {
 Thread.sleep(200);
 testLocked();
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 }
 }
 }
 
 | 
遇到的问题:当 uuid 相等时,进入方法里,执行释放锁的那一瞬间之前,锁过期了,那么其他进程拿到了锁,但释放的是其他进程拿的锁。
有时候就是那么巧,虽然 if 判断的时候锁没有过期,但是进入 if 里面的那一瞬间,过期了,导致过期后被其他进程拿到锁,可惜没拿稳,就被释放了。
解决:利用 LUA 脚本实现原子性,即流程没有完全结束(释放锁),不会被其他进程拿到锁。
3.6 代码四(终极版)
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 
 | public class RedisLocked {@Autowired
 private RedisTemplate<String,String> redisTemplate;
 
 @GetMapping("testLock")
 public void testLocked(){
 String locKey = "lock";
 String uuid = UUID.randomUUID().toString();
 Boolean lock = redisTemplate.opsForValue().setIfAbsent(locKey, uuid, 2, TimeUnit.SECONDS);
 
 if(lock){
 String value = redisTemplate.opsForValue().get("num");
 if(StringUtils.isEmpty(value)){
 return;
 }
 int num = Integer.parseInt(value + "");
 redisTemplate.opsForValue().set("num", String.valueOf(++num));
 
 
 
 
 
 
 
 
 
 
 
 
 String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
 
 DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
 redisScript.setScriptText(script);
 
 
 
 redisScript.setResultType(Long.class);
 
 redisTemplate.execute(redisScript, Arrays.asList(locKey), uuid);
 
 }else {
 try {
 Thread.sleep(200);
 testLocked();
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 }
 }
 }
 
 | 
总结
Java 代码总结
- 加锁(setnx 指令)
- 添加过期时间(setnx 指令加时间)
- 添加唯一标识如:uuid(将 uuid 放入 Reids,然后操作时获取 uuid,添加 if 判断)
- 添加原子性,用 LUA 语言实现(第 2、3 步用 LUA 语言编写)
分布式锁总结
为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:
- 互斥性。在任意时刻,只有一个客户端能持有锁
- 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁
- 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了
- 加锁和解锁必须具有原子性