✅P158_缓存-分布式锁-分布式锁原理与使用

gong_yz大约 10 分钟谷粒商城

一、分布式锁演进 - 基本原理

Redis官方网址open in new window

我们可以同时去一个地方“占坑”,如果占到,就执行逻辑。否则就必须等待,直到释放锁。

“占坑”可以去redis,可以去数据库,可以去任何大家都能访问的地方。

等待可以自旋的方式。

set命令

set key value [EX secondes] [PX milliseconds] [NX|XX]
  • key:必填,键
  • value:必填,值
  • [EX secondes]:过期时间,单位(秒)
  • [PX milliseconds]:过期时间,毫秒
  • [NX|XX]
    • NX,只有key不存在的时候才会设置key的值
    • XX,只有key存在的时候才会设置key的值

二、模拟多个客户端去redis抢占锁

2.1 MobaXterm测试

实现执行一条命令发送到多个服务器,这个功能叫做多执行器(MultiExec)

执行以下命令发送给全部会话

# 统一进入redis控制台
docker exec -it redis redis-cli

# 进行抢占锁
set lock test NX

发现只有3号会话抢到锁了,返回了ok,其它会话都没设置上,返回的是nil,所以set lock test NX命令原子加锁

2.2 Xshell测试

首先多复制几份虚拟机的会话

执行以下命令发送给全部会话

# 统一进入redis控制台
docker exec -it redis redis-cli

# 进行抢占锁
set lock test NX

发现只有1号会话抢到锁了,返回了ok,其它会话都没设置上,返回的是nil,set lock test NX命令原子加锁

显示撰写栏

显示撰写栏:查看->撰写->撰写栏

发送所有绘话:下方红色框最左侧鼠标单击。


三、分布式锁演进 - 阶段一

cfmall-product/src/main/java/com/gyz/cfmall/product/service/impl/CategoryServiceImpl.java

问题:

  • setnx占好了位,业务代码异常或者程序在页面过程中宕机。没有执行删除锁逻辑,这就造成了死锁
@Override
public Map<String, List<Catelog2Vo>> getCatalogJson() {
    //加入缓存逻辑
    String catelogJSON = stringRedisTemplate.opsForValue().get("catelogJSON");
    if (StringUtils.isEmpty(catelogJSON)) {
        //缓存中没有,查询数据库
        System.out.println("缓存未命中.....查询数据库");
        Map<String, List<Catelog2Vo>> categoryJsonMap = getCatalogJsonFromDb();
    }
    System.out.println("缓存命中.....直接返回");
    Map<String, List<Catelog2Vo>> result = JSON.parseObject(catelogJSON, new TypeReference<Map<String, List<Catelog2Vo>>>() {
    });
    return result;
}
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDb() {
    return getCatalogJsonFromDbWithRedisLock();
}
private Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLock() {
    // 去redis占锁
    Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1111");
    if (lock) {
        // 加锁成功,执行业务
        Map<String, List<Catelog2Vo>> categoryMap = getDataFromDb();

        // 业务执行完,需要删除锁,别人就可以来占锁了
        stringRedisTemplate.delete("lock");
        return categoryMap;
    } else {
        // 加锁失败,休眠200ms重试
        try {
            Thread.sleep(200);
        } catch (Exception e) {
            e.printStackTrace();
        }
        // 重试,使用自旋的方式,模仿本地sync监听锁
        return getCatalogJsonFromDbWithRedisLock();
    }
}

解决:

  • 设置锁的自动过期,即使没有删除,会自动删除。
  • stringRedisTemplate.expire("lock",300,TimeUnit.SECONDS);
private Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLock() {
    // 去redis占锁
    Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1111");
    if (lock) {
        stringRedisTemplate.expire("lock",300,TimeUnit.SECONDS);
        // 加锁成功,执行业务
        Map<String, List<Catelog2Vo>> categoryMap = getDataFromDb();

        // 业务执行完,需要删除锁,别人就可以来占锁了
        stringRedisTemplate.delete("lock");
        return categoryMap;
    } else {
        // 加锁失败,休眠200ms重试
        try {
            Thread.sleep(200);
        } catch (Exception e) {
            e.printStackTrace();
        }
        // 重试,使用自旋的方式,模仿本地sync监听锁
        return getCatalogJsonFromDbWithRedisLock();
    }
}

四、分布式锁演进 - 阶段二

问题:

  • setnx设置好,正要去设置过期时间,宕机。又死锁了。

解决:

  • 设置过期时间和占位必须是原子的。redis支持使用setnx ex命令
  • Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1111", 300, TimeUnit.SECONDS);
private Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLock() {
    // 去redis占锁
    Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1111", 300, TimeUnit.SECONDS);
    if (lock) {
        // 加锁成功,执行业务
        Map<String, List<Catelog2Vo>> categoryMap = getDataFromDb();

        // 业务执行完,需要删除锁,别人就可以来占锁了
        stringRedisTemplate.delete("lock");
        return categoryMap;
    } else {
        // 加锁失败,休眠200ms重试
        try {
            Thread.sleep(200);
        } catch (Exception e) {
            e.printStackTrace();
        }
        // 重试,使用自旋的方式,模仿本地sync监听锁
        return getCatalogJsonFromDbWithRedisLock();
    }
}

五、分布式锁演进 - 阶段三

5.1 问题一

假设一开始A线程抢到了锁,一开始设置的锁过期时间为10秒,执行业务的时候,由于业务较为复杂,执行了30秒,等要去删锁的时候,其实已经过期了,redis里面已经没有了,这还是比较好的情况。

5.2 问题二

最坏的情况是这样,执行业务的代码超时了,花费了30秒,这30秒发生了很多事情

  • A线程,在业务执行到第10秒的时候,锁就过期了,redis把锁删除了,此时,外面的线程都在等着抢占锁,结果发现锁可以抢了,直接就去抢锁;
  • B线程抢到了锁,又开始执行业务,它执行到第10秒的时候,它的锁也过期了;
  • C线程又抢到了锁,又开始执行业务,它执行到第10秒的时候,它的锁也过期了;
  • D线程又抢到了锁,由于是同步的过程,此时的A线程已经执行了30秒,也就是把业务执行完了,然后它会手动删除锁,但是在这30秒期间,A、B、C的锁早就因为过期自动被删了;
  • 所以它真正删除的是D线程的锁,而D线程还在执行业务,它的锁一旦被删除,又会导致其它线程抢到锁,如此循环下去,就会使这个锁失去作用

5.3 解决

占锁的时候,值指定为uuid,每个人匹配是自己的锁才删除

private Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLock() {
    String uuid = UUID.randomUUID().toString();
    // 去redis占锁
    Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 300, TimeUnit.SECONDS);
    if (lock) {
        // 加锁成功,执行业务
        Map<String, List<Catelog2Vo>> categoryMap = getDataFromDb();
        String lock1 = stringRedisTemplate.opsForValue().get("lock");
        if (uuid.equals(lock1)) {
            stringRedisTemplate.delete("lock");
        }
        return categoryMap;
    } else {
        // 加锁失败,休眠200ms重试
        try {
            Thread.sleep(200);
        } catch (Exception e) {
            e.printStackTrace();
        }
        // 重试,使用自旋的方式,模仿本地sync监听锁
        return getCatalogJsonFromDbWithRedisLock();
    }
}

5.4 结论

就算业务超时,锁也会因为我们设置的过期时间,自动释放,别的线程就可以抢到锁,等到真的要手动删除锁的时候,很有可能删除的就是别人的锁。


六、分布式锁演进 - 阶段四

6.1 问题

假如说,业务执行到删除锁这里,由于删除锁的命令,需要到远程服务器拿到redis的数据,再让远程服务器将数据返回,这中间是要花费一定时间的,如果锁在10秒过期,我们的业务已经执行到9.5秒了,彻底取到值要花费0.8秒,我们去服务器取数据,假设花费了0.3秒,让服务器将数据返回给我们,这期间又要花费0.5秒,然而数据才刚走到一半,锁就过期了,然后就被B线程抢到了,又过了0.3秒,数据回到了A线程,A线程判断这个值的确是当时设置的token,然后就将锁删了,殊不知它的锁早就因为过期被自动删了,而它删除的,正是当前B线程的锁,相当于是给误删了。

6.2 解决

删除锁必须保证原子性,使用redis+Lua脚本完成。见 最终形态

6.3 结论

获取值+对比成功删除的操作不是原子操作,导致了上述问题的发生。


七、分布式锁演进 - 阶段五 - 最终形态

7.1 问题

假设我们的业务执行时间超长,我们就需要给锁自动续期。

当然最简单的方法就是给锁设置的时间长一些, 比如说,设置个300秒,哪个业务也不可能让它执行300秒,我们不会等它。

7.2 解决

代码

cfmall-product/src/main/java/com/gyz/cfmall/product/service/impl/CategoryServiceImpl.java

@Override
public Map<String, List<Catelog2Vo>> getCatalogJson() {
    //加入缓存逻辑
    String catelogJSON = stringRedisTemplate.opsForValue().get("catelogJSON");
    if (StringUtils.isEmpty(catelogJSON)) {
        //缓存中没有,查询数据库
        System.out.println("缓存未命中.....查询数据库");
        Map<String, List<Catelog2Vo>> categoryJsonMap = getCatalogJsonFromDb();
    }
    System.out.println("缓存命中.....直接返回");
    Map<String, List<Catelog2Vo>> result = JSON.parseObject(catelogJSON, new TypeReference<Map<String, List<Catelog2Vo>>>() {
    });
    return result;
}
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDb() {
    return getCatalogJsonFromDbWithRedisLock();
}
//分布式锁
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLock() {
    // 1、分布式锁,去redis占坑,同时设置过期时间
    //每个线程设置随机的UUID,也可以成为token
    String token = UUID.randomUUID().toString();
    //只有键key不存在的时候才会设置key的值。保证分布式情况下一个锁能进线程
    Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", token, 300, TimeUnit.SECONDS);
    //setIfAbsent()如果返回true代表此线程拿到锁;如果返回false代表没拿到锁,就sleep一会递归重试,一直到某一层获取到锁并层层返回redis或数据库结果。
    if (lock) {
        // 加锁成功....执行业务【内部会判断一次redis是否有值】
        System.out.println("获取分布式锁成功....");
        Map<String, List<Catelog2Vo>> dataFromDB;
        try {
            dataFromDB = getDataFromDb();
        } finally {
            String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
            // 删除锁
            Long result = stringRedisTemplate.execute(new DefaultRedisScript<Long>(luaScript, Long.class), Arrays.asList("lock"), token);    //把key和value传给lua脚本
        }
        return dataFromDB;
    } else {
        System.out.println("获取分布式锁失败....等待重试...");
        // 加锁失败....重试
        // 休眠200ms重试
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 自旋方式
        return getCatalogJsonFromDbWithRedisLock();
    }
}
private Map<String, List<Catelog2Vo>> getDataFromDb() {
    //得到锁后,我们应该再去缓存查询一次,如果没有才需要继续查询
    String catalogJSON = stringRedisTemplate.opsForValue().get("catelogJSON");
    if (!StringUtils.isEmpty(catalogJSON)) {
        Map<String, List<Catelog2Vo>> result = JSON.parseObject(catalogJSON, new TypeReference<Map<String, List<Catelog2Vo>>>() {
        });
        return result;
    }
    System.out.println("查询了数据库.....");
    // 一次性获取所有数据
    List<CategoryEntity> selectList = baseMapper.selectList(null);
    // 1)、所有1级分类
    List<CategoryEntity> level1Categorys = getParent_cid(selectList, 0L);
    // 2)、封装数据
    Map<String, List<Catelog2Vo>> collect = level1Categorys.stream().collect(Collectors.toMap(k -> k.getCatId().toString(), level1 -> {
        // 查到当前1级分类的2级分类
        List<CategoryEntity> category2level = getParent_cid(selectList, level1.getCatId());
        List<Catelog2Vo> catalog2Vos = null;
        if (category2level != null) {
            catalog2Vos = category2level.stream().map(level12 -> {
                // 查询当前2级分类的3级分类
                List<CategoryEntity> category3level = getParent_cid(selectList, level12.getCatId());
                List<Catelog2Vo.Category3Vo> catalog3Vos = null;
                if (category3level != null) {
                    catalog3Vos = category3level.stream().map(level13 -> {
                        return new Catelog2Vo.Category3Vo(level12.getCatId().toString(), level13.getCatId().toString(), level13.getName());
                    }).collect(Collectors.toList());
                }
                return new Catelog2Vo(level1.getCatId().toString(), catalog3Vos, level12.getCatId().toString(), level12.getName());
            }).collect(Collectors.toList());
        }
        return catalog2Vos;
    }));
    String newCategoryJson = JSON.toJSONString(collect);
    // 设置过期时间,解决缓存雪崩
    stringRedisTemplate.opsForValue().set("catelogJSON", newCategoryJson, 1, TimeUnit.DAYS);
    return collect;
}

JMeter压测

测试结果

只有8200服务查询了一次数据库


八、总结

分布式锁核心分为两部分

  • 使用NX/EX进行原子加锁
  • 使用lua脚本进行原子解锁

参考博客open in new window