✅P158_缓存-分布式锁-分布式锁原理与使用
一、分布式锁演进 - 基本原理
我们可以同时去一个地方“占坑”,如果占到,就执行逻辑。否则就必须等待,直到释放锁。
“占坑”可以去redis,可以去数据库,可以去任何大家都能访问的地方。
等待可以自旋的方式。
set命令
set key value [EX secondes] [PX milliseconds] [NX|XX]
key
:必填,键value
:必填,值[EX secondes]
:过期时间,单位(秒)[PX milliseconds]
:过期时间,毫秒[NX|XX]
:- NX,只有key不存在的时候才会设置key的值
- XX,只有key存在的时候才会设置key的值
二、模拟多个客户端去redis抢占锁
2.1 MobaXterm测试
实现执行一条命令发送到多个服务器,这个功能叫做多执行器(MultiExec)
执行以下命令发送给全部会话
# 统一进入redis控制台
docker exec -it redis redis-cli
# 进行抢占锁
set lock test NX
发现只有3号会话抢到锁了,返回了ok,其它会话都没设置上,返回的是nil,所以set lock test NX
命令原子加锁
2.2 Xshell测试
首先多复制几份虚拟机的会话
执行以下命令发送给全部会话
# 统一进入redis控制台
docker exec -it redis redis-cli
# 进行抢占锁
set lock test NX
发现只有1号会话抢到锁了,返回了ok,其它会话都没设置上,返回的是nil,set lock test NX
命令原子加锁
显示撰写栏
显示撰写栏:查看->撰写->撰写栏
发送所有绘话:下方红色框最左侧鼠标单击。
三、分布式锁演进 - 阶段一
cfmall-product/src/main/java/com/gyz/cfmall/product/service/impl/CategoryServiceImpl.java
问题:
- setnx占好了位,业务代码异常或者程序在页面过程中宕机。没有执行删除锁逻辑,这就造成了死锁
@Override
public Map<String, List<Catelog2Vo>> getCatalogJson() {
//加入缓存逻辑
String catelogJSON = stringRedisTemplate.opsForValue().get("catelogJSON");
if (StringUtils.isEmpty(catelogJSON)) {
//缓存中没有,查询数据库
System.out.println("缓存未命中.....查询数据库");
Map<String, List<Catelog2Vo>> categoryJsonMap = getCatalogJsonFromDb();
}
System.out.println("缓存命中.....直接返回");
Map<String, List<Catelog2Vo>> result = JSON.parseObject(catelogJSON, new TypeReference<Map<String, List<Catelog2Vo>>>() {
});
return result;
}
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDb() {
return getCatalogJsonFromDbWithRedisLock();
}
private Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLock() {
// 去redis占锁
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1111");
if (lock) {
// 加锁成功,执行业务
Map<String, List<Catelog2Vo>> categoryMap = getDataFromDb();
// 业务执行完,需要删除锁,别人就可以来占锁了
stringRedisTemplate.delete("lock");
return categoryMap;
} else {
// 加锁失败,休眠200ms重试
try {
Thread.sleep(200);
} catch (Exception e) {
e.printStackTrace();
}
// 重试,使用自旋的方式,模仿本地sync监听锁
return getCatalogJsonFromDbWithRedisLock();
}
}
解决:
- 设置锁的自动过期,即使没有删除,会自动删除。
stringRedisTemplate.expire("lock",300,TimeUnit.SECONDS);
private Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLock() {
// 去redis占锁
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1111");
if (lock) {
stringRedisTemplate.expire("lock",300,TimeUnit.SECONDS);
// 加锁成功,执行业务
Map<String, List<Catelog2Vo>> categoryMap = getDataFromDb();
// 业务执行完,需要删除锁,别人就可以来占锁了
stringRedisTemplate.delete("lock");
return categoryMap;
} else {
// 加锁失败,休眠200ms重试
try {
Thread.sleep(200);
} catch (Exception e) {
e.printStackTrace();
}
// 重试,使用自旋的方式,模仿本地sync监听锁
return getCatalogJsonFromDbWithRedisLock();
}
}
四、分布式锁演进 - 阶段二
问题:
- setnx设置好,正要去设置过期时间,宕机。又死锁了。
解决:
- 设置过期时间和占位必须是原子的。redis支持使用setnx ex命令
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1111", 300, TimeUnit.SECONDS);
private Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLock() {
// 去redis占锁
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1111", 300, TimeUnit.SECONDS);
if (lock) {
// 加锁成功,执行业务
Map<String, List<Catelog2Vo>> categoryMap = getDataFromDb();
// 业务执行完,需要删除锁,别人就可以来占锁了
stringRedisTemplate.delete("lock");
return categoryMap;
} else {
// 加锁失败,休眠200ms重试
try {
Thread.sleep(200);
} catch (Exception e) {
e.printStackTrace();
}
// 重试,使用自旋的方式,模仿本地sync监听锁
return getCatalogJsonFromDbWithRedisLock();
}
}
五、分布式锁演进 - 阶段三
5.1 问题一
假设一开始A线程抢到了锁,一开始设置的锁过期时间为10秒,执行业务的时候,由于业务较为复杂,执行了30秒,等要去删锁的时候,其实已经过期了,redis里面已经没有了,这还是比较好的情况。
5.2 问题二
最坏的情况是这样,执行业务的代码超时了,花费了30秒,这30秒发生了很多事情
- A线程,在业务执行到第10秒的时候,锁就过期了,redis把锁删除了,此时,外面的线程都在等着抢占锁,结果发现锁可以抢了,直接就去抢锁;
- B线程抢到了锁,又开始执行业务,它执行到第10秒的时候,它的锁也过期了;
- C线程又抢到了锁,又开始执行业务,它执行到第10秒的时候,它的锁也过期了;
- D线程又抢到了锁,由于是同步的过程,此时的A线程已经执行了30秒,也就是把业务执行完了,然后它会手动删除锁,但是在这30秒期间,A、B、C的锁早就因为过期自动被删了;
- 所以它真正删除的是D线程的锁,而D线程还在执行业务,它的锁一旦被删除,又会导致其它线程抢到锁,如此循环下去,就会使这个锁失去作用
5.3 解决
占锁的时候,值指定为uuid,每个人匹配是自己的锁才删除
private Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLock() {
String uuid = UUID.randomUUID().toString();
// 去redis占锁
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 300, TimeUnit.SECONDS);
if (lock) {
// 加锁成功,执行业务
Map<String, List<Catelog2Vo>> categoryMap = getDataFromDb();
String lock1 = stringRedisTemplate.opsForValue().get("lock");
if (uuid.equals(lock1)) {
stringRedisTemplate.delete("lock");
}
return categoryMap;
} else {
// 加锁失败,休眠200ms重试
try {
Thread.sleep(200);
} catch (Exception e) {
e.printStackTrace();
}
// 重试,使用自旋的方式,模仿本地sync监听锁
return getCatalogJsonFromDbWithRedisLock();
}
}
5.4 结论
就算业务超时,锁也会因为我们设置的过期时间,自动释放,别的线程就可以抢到锁,等到真的要手动删除锁的时候,很有可能删除的就是别人的锁。
六、分布式锁演进 - 阶段四
6.1 问题
假如说,业务执行到删除锁这里,由于删除锁的命令,需要到远程服务器拿到redis的数据,再让远程服务器将数据返回,这中间是要花费一定时间的,如果锁在10秒过期,我们的业务已经执行到9.5秒了,彻底取到值要花费0.8秒,我们去服务器取数据,假设花费了0.3秒,让服务器将数据返回给我们,这期间又要花费0.5秒,然而数据才刚走到一半,锁就过期了,然后就被B线程抢到了,又过了0.3秒,数据回到了A线程,A线程判断这个值的确是当时设置的token,然后就将锁删了,殊不知它的锁早就因为过期被自动删了,而它删除的,正是当前B线程的锁,相当于是给误删了。
6.2 解决
删除锁必须保证原子性,使用redis+Lua脚本完成。见 最终形态
6.3 结论
获取值+对比成功删除的操作不是原子操作,导致了上述问题的发生。
七、分布式锁演进 - 阶段五 - 最终形态
7.1 问题
假设我们的业务执行时间超长,我们就需要给锁自动续期。
当然最简单的方法就是给锁设置的时间长一些, 比如说,设置个300秒,哪个业务也不可能让它执行300秒,我们不会等它。
7.2 解决
代码
cfmall-product/src/main/java/com/gyz/cfmall/product/service/impl/CategoryServiceImpl.java
@Override
public Map<String, List<Catelog2Vo>> getCatalogJson() {
//加入缓存逻辑
String catelogJSON = stringRedisTemplate.opsForValue().get("catelogJSON");
if (StringUtils.isEmpty(catelogJSON)) {
//缓存中没有,查询数据库
System.out.println("缓存未命中.....查询数据库");
Map<String, List<Catelog2Vo>> categoryJsonMap = getCatalogJsonFromDb();
}
System.out.println("缓存命中.....直接返回");
Map<String, List<Catelog2Vo>> result = JSON.parseObject(catelogJSON, new TypeReference<Map<String, List<Catelog2Vo>>>() {
});
return result;
}
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDb() {
return getCatalogJsonFromDbWithRedisLock();
}
//分布式锁
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLock() {
// 1、分布式锁,去redis占坑,同时设置过期时间
//每个线程设置随机的UUID,也可以成为token
String token = UUID.randomUUID().toString();
//只有键key不存在的时候才会设置key的值。保证分布式情况下一个锁能进线程
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", token, 300, TimeUnit.SECONDS);
//setIfAbsent()如果返回true代表此线程拿到锁;如果返回false代表没拿到锁,就sleep一会递归重试,一直到某一层获取到锁并层层返回redis或数据库结果。
if (lock) {
// 加锁成功....执行业务【内部会判断一次redis是否有值】
System.out.println("获取分布式锁成功....");
Map<String, List<Catelog2Vo>> dataFromDB;
try {
dataFromDB = getDataFromDb();
} finally {
String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
// 删除锁
Long result = stringRedisTemplate.execute(new DefaultRedisScript<Long>(luaScript, Long.class), Arrays.asList("lock"), token); //把key和value传给lua脚本
}
return dataFromDB;
} else {
System.out.println("获取分布式锁失败....等待重试...");
// 加锁失败....重试
// 休眠200ms重试
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 自旋方式
return getCatalogJsonFromDbWithRedisLock();
}
}
private Map<String, List<Catelog2Vo>> getDataFromDb() {
//得到锁后,我们应该再去缓存查询一次,如果没有才需要继续查询
String catalogJSON = stringRedisTemplate.opsForValue().get("catelogJSON");
if (!StringUtils.isEmpty(catalogJSON)) {
Map<String, List<Catelog2Vo>> result = JSON.parseObject(catalogJSON, new TypeReference<Map<String, List<Catelog2Vo>>>() {
});
return result;
}
System.out.println("查询了数据库.....");
// 一次性获取所有数据
List<CategoryEntity> selectList = baseMapper.selectList(null);
// 1)、所有1级分类
List<CategoryEntity> level1Categorys = getParent_cid(selectList, 0L);
// 2)、封装数据
Map<String, List<Catelog2Vo>> collect = level1Categorys.stream().collect(Collectors.toMap(k -> k.getCatId().toString(), level1 -> {
// 查到当前1级分类的2级分类
List<CategoryEntity> category2level = getParent_cid(selectList, level1.getCatId());
List<Catelog2Vo> catalog2Vos = null;
if (category2level != null) {
catalog2Vos = category2level.stream().map(level12 -> {
// 查询当前2级分类的3级分类
List<CategoryEntity> category3level = getParent_cid(selectList, level12.getCatId());
List<Catelog2Vo.Category3Vo> catalog3Vos = null;
if (category3level != null) {
catalog3Vos = category3level.stream().map(level13 -> {
return new Catelog2Vo.Category3Vo(level12.getCatId().toString(), level13.getCatId().toString(), level13.getName());
}).collect(Collectors.toList());
}
return new Catelog2Vo(level1.getCatId().toString(), catalog3Vos, level12.getCatId().toString(), level12.getName());
}).collect(Collectors.toList());
}
return catalog2Vos;
}));
String newCategoryJson = JSON.toJSONString(collect);
// 设置过期时间,解决缓存雪崩
stringRedisTemplate.opsForValue().set("catelogJSON", newCategoryJson, 1, TimeUnit.DAYS);
return collect;
}
JMeter压测
测试结果
只有8200服务查询了一次数据库
八、总结
分布式锁核心分为两部分
- 使用NX/EX进行原子加锁
- 使用lua脚本进行原子解锁