Springboot가 Redis를 통합하여 과잉 판매 문제를 해결하는 방법

WBOY
풀어 주다: 2023-05-26 19:52:39
앞으로
1366명이 탐색했습니다.

    과매도에 대한 간단한 코드

    간단하고 일반적인 과매도 논리 코드를 작성하면 여러 사용자가 동시에 동일한 데이터 조각을 조작하여 발생하는 문제를 탐색할 수 있습니다.

    Redis에 데이터 정보를 저장하고, 해당 인터페이스를 요청하고, 제품 수량 정보를 얻습니다.
    제품 수량 정보가 0보다 크면 1을 빼고 Redis에 다시 저장합니다.
    코드를 실행하여 테스트합니다. 문제.

    /** * Redis数据库操作,超卖问题模拟 * @author * */ @RestController public class RedisController { // 引入String类型redis操作模板 @Autowired private StringRedisTemplate stringRedisTemplate; // 测试数据设置接口 @RequestMapping("/setStock") public String setStock() { stringRedisTemplate.opsForValue().set("stock", "100"); return "ok"; } // 模拟商品超卖代码 @RequestMapping("/deductStock") public String deductStock() { // 获取Redis数据库中的商品数量 Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // 减库存 if(stock > 0) { int realStock = stock -1; stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock)); System.out.println("商品扣减成功,剩余商品:"+realStock); }else { System.out.println("库存不足....."); } return "end"; } }
    로그인 후 복사

    과잉 문제

    단일 서버 및 단일 애플리케이션의 경우

    단일 애플리케이션 모드에서는 스트레스 테스트를 위해jmeter를 사용하세요.jmeter压测。

    Springboot가 Redis를 통합하여 과잉 판매 문제를 해결하는 방법

    Springboot가 Redis를 통합하여 과잉 판매 문제를 해결하는 방법

    测试结果:

    Springboot가 Redis를 통합하여 과잉 판매 문제를 해결하는 방법

    每个请求相当于一个线程,当几个线程同时拿到数据时,线程A拿到库存为84,这个时候线程B也进入程序,并且抢占了CPU,访问库存为84,最后两个线程都对库存减一,导致最后修改为83,实际上多卖出去了一件

    既然线程和线程之间,数据处理不一致,能否使用synchronized加锁测试?

    设置synchronized

    依旧还是先测试单服务器

    // 模拟商品超卖代码, // 设置synchronized同步锁 @RequestMapping("/deductStock1") public String deductStock1() { synchronized (this) { // 获取Redis数据库中的商品数量 Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // 减库存 if(stock > 0) { int realStock = stock -1; stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock)); System.out.println("商品扣减成功,剩余商品:"+realStock); }else { System.out.println("库存不足....."); } } return "end"; }
    로그인 후 복사

    数量100

    Springboot가 Redis를 통합하여 과잉 판매 문제를 해결하는 방법

    重新压测,得到的日志信息如下所示:

    Springboot가 Redis를 통합하여 과잉 판매 문제를 해결하는 방법

    在单机模式下,添加synchronized关键字,的确能够避免商品的超卖现象!

    但是在分布式微服务中,针对该服务设置了集群,synchronized依旧还能保证数据的正确性吗?

    假设多个请求,被注册中心负载均衡,每个微服务中的该处理接口,都添加有synchronized,

    Springboot가 Redis를 통합하여 과잉 판매 문제를 해결하는 방법

    依然会出现类似的超卖问题:

    synchronized只是针对单一服务器JVM进行加锁,但是分布式是很多个不同的服务器,导致两个线程或多个在不同服务器上共同对商品数量信息做了操作!


    Redis实现分布式锁

    在Redis中存在一条命令setnx (set if not exists)

    setnx key value
    如果不存在key,则可以设置成功;否则设置失败。

    修改处理接口,增加key

    // 模拟商品超卖代码 @RequestMapping("/deductStock2") public String deductStock2() { // 创建一个key,保存至redis String key = "lock"; // setnx // 由于redis是一个单线程,执行命令采取“队列”形式排队! // 优先进入队列的命令先执行,由于是setnx,第一个执行后,其他操作执行失败。 boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "this is lock"); // 当不存在key时,可以设置成功,回执true;如果存在key,则无法设置,返回false if (!result) { // 前端监测,redis中存在,则不能让这个抢购操作执行,予以提示! return "err"; } // 获取Redis数据库中的商品数量 Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // 减库存 if(stock > 0) { int realStock = stock -1; stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock)); System.out.println("商品扣减成功,剩余商品:"+realStock); }else { System.out.println("库存不足....."); } // 程序执行完成,则删除这个key stringRedisTemplate.delete(key); return "end"; }
    로그인 후 복사

    1、请求进入接口中,如果redis中不存在key,则会新建一个setnx;如果存在,则不会新建,同时返回错误编码,不会继续执行抢购逻辑。
    2、当创建成功后,执行抢购逻辑。
    3、抢购逻辑执行完成后,删除数据库中对应的setnxkey。让其他请求能够设置并操作。

    这种逻辑来说比之前单一使用syn合理的多,但是如果执行抢购操作中出现了异常,导致这个key无法被删除。以至于其他处理请求,一直无法拿到key,程序逻辑死锁!

    可以采取try … finally进行操作

    /** * 模拟商品超卖代码 设置 * * @return */ @RequestMapping("/deductStock3") public String deductStock3() { // 创建一个key,保存至redis String key = "lock"; // setnx // 由于redis是一个单线程,执行命令采取队列形式排队!优先进入队列的命令先执行,由于是setnx,第一个执行后,其他操作执行失败 boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "this is lock"); // 当不存在key时,可以设置成功,回执true;如果存在key,则无法设置,返回false if (!result) { // 前端监测,redis中存在,则不能让这个抢购操作执行,予以提示! return "err"; } try { // 获取Redis数据库中的商品数量 Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // 减库存 if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock)); System.out.println("商品扣减成功,剩余商品:" + realStock); } else { System.out.println("库存不足....."); } } finally { // 程序执行完成,则删除这个key // 放置于finally中,保证即使上述逻辑出问题,也能del掉 stringRedisTemplate.delete(key); } return "end"; }
    로그인 후 복사

    这个逻辑相比上面其他的逻辑来说,显得更加的严谨。

    但是,如果一套服务器,因为断电、系统崩溃等原因出现宕机,导致本该执行finally中的语句未成功执行完成!!同样出现key一直存在,导致死锁

    通过超时间解决上述问题

    在设置成功setnx后,以及抢购代码逻辑执行前,增加key的限时。

    /** * 模拟商品超卖代码 设置setnx保证分布式环境下,数据处理安全行问题;
    * 但如果某个代码段执行异常,导致key无法清理,出现死锁,添加try...finally;
    * 如果某个服务因某些问题导致释放key不能执行,导致死锁,此时解决思路为:增加key的有效时间;
    * 为了保证设置key的值和设置key的有效时间,两条命令构成同一条原子命令,将下列逻辑换成其他代码。 * * @return */ @RequestMapping("/deductStock4") public String deductStock4() { // 创建一个key,保存至redis String key = "lock"; // setnx // 由于redis是一个单线程,执行命令采取队列形式排队!优先进入队列的命令先执行,由于是setnx,第一个执行后,其他操作执行失败 //boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "this is lock"); //让设置key和设置key的有效时间都可以同时执行 boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "this is lock", 10, TimeUnit.SECONDS); // 当不存在key时,可以设置成功,回执true;如果存在key,则无法设置,返回false if (!result) { // 前端监测,redis中存在,则不能让这个抢购操作执行,予以提示! return "err"; } // 设置key有效时间 //stringRedisTemplate.expire(key, 10, TimeUnit.SECONDS); try { // 获取Redis数据库中的商品数量 Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // 减库存 if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock)); System.out.println("商品扣减成功,剩余商品:" + realStock); } else { System.out.println("库存不足....."); } } finally { // 程序执行完成,则删除这个key // 放置于finally中,保证即使上述逻辑出问题,也能del掉 stringRedisTemplate.delete(key); } return "end"; }
    로그인 후 복사

    但是上述代码的逻辑中依旧会有问题:

    如果处理逻辑中,出现超时
    과잉 판매 문제를 해결하기 위해 Springboot가 Redis를 통합하는 방법

    과잉 판매 문제를 해결하기 위해 Springboot가 Redis를 통합하는 방법

    Springboot가 Redis를 통합하여 과잉 판매 문제를 해결하는 방법테스트 결과:

    과잉 판매 문제를 해결하기 위해 Springboot가 Redis를 통합하는 방법


    각 요청은 스레드와 동일합니다. 여러 스레드가 동시에 데이터를 가져오면 스레드 A는 인벤토리를 84로 가져옵니다. 이때 스레드 B도 프로그램에 들어가 CPU를 점유하여 84로 인벤토리에 액세스합니다. 결국 두 스레드 모두 인벤토리를 1만큼 감소시킵니다. 하나, 결과적으로 83으로 최종 수정되었습니다. 실제로 하나 더 판매되었습니다

    스레드 간에 데이터 처리가 일치하지 않기 때문에synchronized를 사용하여 테스트를 잠글 수 있나요?

    동기화 설정단일 서버를 먼저 테스트하세요
    /** * 模拟商品超卖代码 
    * 解决`deductStock6`中,key形同虚设的问题。 * * @return */ @RequestMapping("/deductStock5") public String deductStock5() { // 创建一个key,保存至redis String key = "lock"; String lock_value = UUID.randomUUID().toString(); // setnx //让设置key和设置key的有效时间都可以同时执行 boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, lock_value, 10, TimeUnit.SECONDS); // 当不存在key时,可以设置成功,回执true;如果存在key,则无法设置,返回false if (!result) { // 前端监测,redis中存在,则不能让这个抢购操作执行,予以提示! return "err"; } try { // 获取Redis数据库中的商品数量 Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // 减库存 if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock)); System.out.println("商品扣减成功,剩余商品:" + realStock); } else { System.out.println("库存不足....."); } } finally { // 程序执行完成,则删除这个key // 放置于finally中,保证即使上述逻辑出问题,也能del掉 // 判断redis中该数据是否是这个接口处理时的设置的,如果是则删除 if(lock_value.equalsIgnoreCase(stringRedisTemplate.opsForValue().get(key))) { stringRedisTemplate.delete(key); } } return "end"; }
    로그인 후 복사
    로그인 후 복사
    Quantity 100 과잉 문제를 해결하기 위해 Springboot가 Redis를 통합하는 방법Re-stress 테스트에서 얻은 로그 정보는 다음과 같습니다. Springboot가 Redis를 통합하여 과매도 문제를 해결하는 방법독립형 모드에서 동기화 키워드를 추가하면 실제로 제품의 과매도 현상을 피할 수 있습니다! 하지만 분산형 마이크로서비스에서 클러스터가 서비스에 대해 설정된 경우 동기화가 여전히 데이터의 정확성을 보장할 수 있나요? 등록 센터에서 여러 요청의 로드 밸런싱을 가정하면 각 마이크로서비스의 처리 인터페이스가 동기화되어 추가됩니다. Springboot가 과매도 문제를 해결하기 위해 Redis를 통합하는 방법유사한 과매도문제는 계속 발생합니다. 동기화단일 서버JVM에 대해 잠금되었지만 여러 서버에 분산되어 2개 이상의 스레드가 발생했습니다. 제품 수량 정보가 공동으로 운영되었습니다. 다른 서버에서!
    Redis는 분산 잠금을 구현합니다. Redis에는 setnx(존재하지 않는 경우 설정)setnx 키 값키가 존재하지 않는 경우 다음과 같은 명령이 있습니다. 성공적으로 설정되었습니다. 그렇지 않으면 설정이 실패합니다. 처리 인터페이스 수정 및 키 추가
    @Component public class RedisLock { private final Logger log = LoggerFactory.getLogger(this.getClass()); private final long acquireTimeout = 10*1000; // 获取锁之前的超时时间(获取锁的等待重试时间) private final int timeOut = 20; // 获取锁之后的超时时间(防止死锁) @Autowired private StringRedisTemplate stringRedisTemplate; // 引入String类型redis操作模板 /** * 获取分布式锁 * @return 锁标识 */ public boolean getRedisLock(String lockName,String lockValue) { // 1.计算获取锁的时间 Long endTime = System.currentTimeMillis() + acquireTimeout; // 2.尝试获取锁 while (System.currentTimeMillis() < endTime) { //3. 获取锁成功就设置过期时间 让设置key和设置key的有效时间都可以同时执行 boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockName, lockValue, timeOut, TimeUnit.SECONDS); if (result) { return true; } } return false; } /** * 释放分布式锁 * @param lockName 锁名称 * @param lockValue 锁值 */ public void unRedisLock(String lockName,String lockValue) { if(lockValue.equalsIgnoreCase(stringRedisTemplate.opsForValue().get(lockName))) { stringRedisTemplate.delete(lockName); } } }
    로그인 후 복사
    로그인 후 복사
    1. 요청이 인터페이스에 들어갈 때 키가 redis에 없으면 새 setnx가 생성되고, 키가 있으면 생성되지 않습니다. 오류 코드가 반환되고 긴급 구매 로직이 계속 실행되지 않습니다. 2. 생성이 성공한 후 스냅업 로직을 실행합니다. 3. 긴급 구매 로직이 실행된 후 데이터베이스에서 해당 setnxkey를 삭제합니다. 다른 요청을 설정하고 조치를 취할 수 있도록 합니다. 이 논리는 이전에 syn을 단독으로 사용하는 것보다 훨씬 합리적입니다. 그러나 긴급 구매 작업 중에 예외가 발생하면 로 삭제할 수 없습니다.. 결과적으로 다른 처리 요청이 를 얻을 수 없으며 프로그램 논리가 교착 상태에 빠졌습니다! try...finally를 사용하여
    @RestController public class RedisController { // 引入String类型redis操作模板 @Autowired private StringRedisTemplate stringRedisTemplate; @Autowired private RedisLock redisLock; @RequestMapping("/setStock") public String setStock() { stringRedisTemplate.opsForValue().set("stock", "100"); return "ok"; } @RequestMapping("/deductStock") public String deductStock() { // 创建一个key,保存至redis String key = "lock"; String lock_value = UUID.randomUUID().toString(); try { boolean redisLock = this.redisLock.getRedisLock(key, lock_value);//获取锁 if (redisLock) { // 获取Redis数据库中的商品数量 Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // 减库存 if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock)); System.out.println("商品扣减成功,剩余商品:" + realStock); } else { System.out.println("库存不足....."); } } } finally { redisLock.unRedisLock(key,lock_value); //释放锁 } return "end"; } }
    로그인 후 복사
    로그인 후 복사
    이 논리는 위의 다른 논리보다 더 엄격합니다. 그러나 정전, 시스템 충돌 등으로 인해 서버 집합이 다운타임되는 경우 실행해야 할 finally의 명령문이 성공적으로 실행되지 않습니다! ! 또한 키가 항상 존재하여 교착 상태가 발생하는 것으로 보입니다! 타임아웃을 통해 위의 문제를 해결하세요 setnx설정을 성공적으로 마친 후 긴급 구매 코드 로직이 실행되기 전에 키의 제한 시간을 늘려주세요. rrreee하지만 위 코드의 로직에는 여전히 문제가 있습니다. 처리 로직에 timeout문제가 있는 경우. 로직이 실행되어 설정된 키 유효 시간을 초과하면 이때 어떤 문제가 발생하나요? 위 그림에서 문제를 명확하게 찾을 수 있습니다. 요청 실행 시간이 키의 유효 시간을 초과하는 경우. 새 요청이 실행되면 반드시 키를 가져오고 시간을 설정할 수 있습니다. 이때 Redis에 저장된 키는 요청 1의 키가 아니라 다른 요청에 의해 설정됩니다. 요청 1 실행이 완료되면 여기서 키가 삭제됩니다. 삭제되는 것은 다른 요청에 의해 설정된 키입니다!

    依然出现了key形同虚设的问题!如果失效一直存在,超卖问题依旧不会解决。

    通过key设置值匹配的方式解决形同虚设问题

    既然出现key形同虚设的现象,是否可以增加条件,当finally中需要执行删除操作时,获取数据判断值是否是该请求中对应的,如果是则删除,不是则不管!

    修改上述代码如下所示:

    /** * 模拟商品超卖代码 
    * 解决`deductStock6`中,key形同虚设的问题。 * * @return */ @RequestMapping("/deductStock5") public String deductStock5() { // 创建一个key,保存至redis String key = "lock"; String lock_value = UUID.randomUUID().toString(); // setnx //让设置key和设置key的有效时间都可以同时执行 boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, lock_value, 10, TimeUnit.SECONDS); // 当不存在key时,可以设置成功,回执true;如果存在key,则无法设置,返回false if (!result) { // 前端监测,redis中存在,则不能让这个抢购操作执行,予以提示! return "err"; } try { // 获取Redis数据库中的商品数量 Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // 减库存 if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock)); System.out.println("商品扣减成功,剩余商品:" + realStock); } else { System.out.println("库存不足....."); } } finally { // 程序执行完成,则删除这个key // 放置于finally中,保证即使上述逻辑出问题,也能del掉 // 判断redis中该数据是否是这个接口处理时的设置的,如果是则删除 if(lock_value.equalsIgnoreCase(stringRedisTemplate.opsForValue().get(key))) { stringRedisTemplate.delete(key); } } return "end"; }
    로그인 후 복사
    로그인 후 복사

    由于获得锁的线程必须执行完减库存逻辑才能释放锁,所以在此期间所有其他的线程都会由于没获得锁,而直接结束程序,导致有很多库存根本没有卖出去,所以这里应该可以优化,让没获得锁的线程等待,或者循环检查锁

    Springboot가 Redis를 통합하여 과잉 판매 문제를 해결하는 방법


    最终版

    我们将锁封装到一个实体类中,然后加入两个方法,加锁和解锁

    @Component public class RedisLock { private final Logger log = LoggerFactory.getLogger(this.getClass()); private final long acquireTimeout = 10*1000; // 获取锁之前的超时时间(获取锁的等待重试时间) private final int timeOut = 20; // 获取锁之后的超时时间(防止死锁) @Autowired private StringRedisTemplate stringRedisTemplate; // 引入String类型redis操作模板 /** * 获取分布式锁 * @return 锁标识 */ public boolean getRedisLock(String lockName,String lockValue) { // 1.计算获取锁的时间 Long endTime = System.currentTimeMillis() + acquireTimeout; // 2.尝试获取锁 while (System.currentTimeMillis() < endTime) { //3. 获取锁成功就设置过期时间 让设置key和设置key的有效时间都可以同时执行 boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockName, lockValue, timeOut, TimeUnit.SECONDS); if (result) { return true; } } return false; } /** * 释放分布式锁 * @param lockName 锁名称 * @param lockValue 锁值 */ public void unRedisLock(String lockName,String lockValue) { if(lockValue.equalsIgnoreCase(stringRedisTemplate.opsForValue().get(lockName))) { stringRedisTemplate.delete(lockName); } } }
    로그인 후 복사
    로그인 후 복사
    @RestController public class RedisController { // 引入String类型redis操作模板 @Autowired private StringRedisTemplate stringRedisTemplate; @Autowired private RedisLock redisLock; @RequestMapping("/setStock") public String setStock() { stringRedisTemplate.opsForValue().set("stock", "100"); return "ok"; } @RequestMapping("/deductStock") public String deductStock() { // 创建一个key,保存至redis String key = "lock"; String lock_value = UUID.randomUUID().toString(); try { boolean redisLock = this.redisLock.getRedisLock(key, lock_value);//获取锁 if (redisLock) { // 获取Redis数据库中的商品数量 Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // 减库存 if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock)); System.out.println("商品扣减成功,剩余商品:" + realStock); } else { System.out.println("库存不足....."); } } } finally { redisLock.unRedisLock(key,lock_value); //释放锁 } return "end"; } }
    로그인 후 복사
    로그인 후 복사

    可以看到失败的线程不会直接结束,而是会尝试重试,一直到重试结束时间,才会结束

    Springboot가 Redis를 통합하여 과잉 판매 문제를 해결하는 방법


    实际上这个最终版依然存在3个问题

    1、在finally流程中,由于是先判断在处理。如果判断条件结束后,获取到的结果为true。但是在执行del操作前,此时jvm在执行GC操作(为了保证GC操作获取GC roots根完全,会暂停java程序),导致程序暂停。在GC操作完成并恢复后,执行del操作时,当前被加锁的key是否仍然存在?

    2、问题如图所示

    Springboot가 Redis를 통합하여 과잉 판매 문제를 해결하는 방법

    위 내용은 Springboot가 Redis를 통합하여 과잉 판매 문제를 해결하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

    관련 라벨:
    원천:yisu.com
    본 웹사이트의 성명
    본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.
    최신 이슈
    최신 다운로드
    더>
    웹 효과
    웹사이트 소스 코드
    웹사이트 자료
    프론트엔드 템플릿
    회사 소개 부인 성명 Sitemap
    PHP 중국어 웹사이트:공공복지 온라인 PHP 교육,PHP 학습자의 빠른 성장을 도와주세요!