간단하고 일반적인 과매도 논리 코드를 작성하면 여러 사용자가 동시에 동일한 데이터 조각을 조작하여 발생하는 문제를 탐색할 수 있습니다.
Redis에 데이터 정보를 저장하고, 해당 인터페이스를 요청하고, 제품 수량 정보를 얻습니다.
제품 수량 정보가 0보다 크면 1을 빼고 Redis에 다시 저장합니다.
코드를 실행하여 테스트합니다. 문제.
/** * Redis数据库操作,超卖问题模拟 * @author * */ @RestController public class RedisController { // 引入String类型redis操作模板 @Autowired private StringRedisTemplate stringRedisTemplate; // 测试数据设置接口 @RequestMapping("/setStock") public String setStock() { stringRedisTemplate.opsForValue().set("stock", "100"); return "ok"; } // 模拟商品超卖代码 @RequestMapping("/deductStock") public String deductStock() { // 获取Redis数据库中的商品数量 Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // 减库存 if(stock > 0) { int realStock = stock -1; stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock)); System.out.println("商品扣减成功,剩余商品:"+realStock); }else { System.out.println("库存不足....."); } return "end"; } }
단일 애플리케이션 모드에서는 스트레스 테스트를 위해jmeter
를 사용하세요.jmeter
压测。
测试结果:
每个请求相当于一个线程,当几个线程同时拿到数据时,线程A拿到库存为84,这个时候线程B也进入程序,并且抢占了CPU,访问库存为84,最后两个线程都对库存减一,导致最后修改为83,实际上多卖出去了一件
既然线程和线程之间,数据处理不一致,能否使用synchronized
加锁测试?
依旧还是先测试单服务器
// 模拟商品超卖代码, // 设置synchronized同步锁 @RequestMapping("/deductStock1") public String deductStock1() { synchronized (this) { // 获取Redis数据库中的商品数量 Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // 减库存 if(stock > 0) { int realStock = stock -1; stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock)); System.out.println("商品扣减成功,剩余商品:"+realStock); }else { System.out.println("库存不足....."); } } return "end"; }
数量100
重新压测,得到的日志信息如下所示:
在单机模式下,添加synchronized关键字,的确能够避免商品的超卖现象!
但是在分布式微服务中,针对该服务设置了集群,synchronized依旧还能保证数据的正确性吗?
假设多个请求,被注册中心负载均衡,每个微服务中的该处理接口,都添加有synchronized,
依然会出现类似的超卖
问题:
synchronized
只是针对单一服务器
的JVM
进行加锁
,但是分布式是很多个不同的服务器,导致两个线程或多个在不同服务器上共同对商品数量信息做了操作!
在Redis中存在一条命令setnx (set if not exists)
setnx key value
如果不存在key,则可以设置成功;否则设置失败。
修改处理接口,增加key
// 模拟商品超卖代码 @RequestMapping("/deductStock2") public String deductStock2() { // 创建一个key,保存至redis String key = "lock"; // setnx // 由于redis是一个单线程,执行命令采取“队列”形式排队! // 优先进入队列的命令先执行,由于是setnx,第一个执行后,其他操作执行失败。 boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "this is lock"); // 当不存在key时,可以设置成功,回执true;如果存在key,则无法设置,返回false if (!result) { // 前端监测,redis中存在,则不能让这个抢购操作执行,予以提示! return "err"; } // 获取Redis数据库中的商品数量 Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // 减库存 if(stock > 0) { int realStock = stock -1; stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock)); System.out.println("商品扣减成功,剩余商品:"+realStock); }else { System.out.println("库存不足....."); } // 程序执行完成,则删除这个key stringRedisTemplate.delete(key); return "end"; }
1、请求进入接口中,如果redis中不存在key,则会新建一个setnx;如果存在,则不会新建,同时返回错误编码,不会继续执行抢购逻辑。
2、当创建成功后,执行抢购逻辑。
3、抢购逻辑执行完成后,删除数据库中对应的setnx
的key
。让其他请求能够设置并操作。
这种逻辑来说比之前单一使用syn
合理的多,但是如果执行抢购操作中出现了异常,导致这个key
无法被删除
。以至于其他处理请求,一直无法拿到key
,程序逻辑死锁!
可以采取try … finally进行操作
/** * 模拟商品超卖代码 设置 * * @return */ @RequestMapping("/deductStock3") public String deductStock3() { // 创建一个key,保存至redis String key = "lock"; // setnx // 由于redis是一个单线程,执行命令采取队列形式排队!优先进入队列的命令先执行,由于是setnx,第一个执行后,其他操作执行失败 boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "this is lock"); // 当不存在key时,可以设置成功,回执true;如果存在key,则无法设置,返回false if (!result) { // 前端监测,redis中存在,则不能让这个抢购操作执行,予以提示! return "err"; } try { // 获取Redis数据库中的商品数量 Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // 减库存 if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock)); System.out.println("商品扣减成功,剩余商品:" + realStock); } else { System.out.println("库存不足....."); } } finally { // 程序执行完成,则删除这个key // 放置于finally中,保证即使上述逻辑出问题,也能del掉 stringRedisTemplate.delete(key); } return "end"; }
这个逻辑相比上面其他的逻辑来说,显得更加的严谨。
但是,如果一套服务器,因为断电、系统崩溃等原因出现宕机
,导致本该执行finally
中的语句未成功执行完成!!同样出现key一直存在
,导致死锁
!
在设置成功setnx
后,以及抢购代码逻辑执行前,增加key的限时。
/** * 模拟商品超卖代码 设置setnx保证分布式环境下,数据处理安全行问题;
* 但如果某个代码段执行异常,导致key无法清理,出现死锁,添加try...finally;
* 如果某个服务因某些问题导致释放key不能执行,导致死锁,此时解决思路为:增加key的有效时间;
* 为了保证设置key的值和设置key的有效时间,两条命令构成同一条原子命令,将下列逻辑换成其他代码。 * * @return */ @RequestMapping("/deductStock4") public String deductStock4() { // 创建一个key,保存至redis String key = "lock"; // setnx // 由于redis是一个单线程,执行命令采取队列形式排队!优先进入队列的命令先执行,由于是setnx,第一个执行后,其他操作执行失败 //boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "this is lock"); //让设置key和设置key的有效时间都可以同时执行 boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "this is lock", 10, TimeUnit.SECONDS); // 当不存在key时,可以设置成功,回执true;如果存在key,则无法设置,返回false if (!result) { // 前端监测,redis中存在,则不能让这个抢购操作执行,予以提示! return "err"; } // 设置key有效时间 //stringRedisTemplate.expire(key, 10, TimeUnit.SECONDS); try { // 获取Redis数据库中的商品数量 Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // 减库存 if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock)); System.out.println("商品扣减成功,剩余商品:" + realStock); } else { System.out.println("库存不足....."); } } finally { // 程序执行完成,则删除这个key // 放置于finally中,保证即使上述逻辑出问题,也能del掉 stringRedisTemplate.delete(key); } return "end"; }
但是上述代码的逻辑中依旧会有问题:
如果处理逻辑中,出现
超时
테스트 결과:
동기화 설정단일 서버를 먼저 테스트하세요
각 요청은 스레드와 동일합니다. 여러 스레드가 동시에 데이터를 가져오면 스레드 A는 인벤토리를 84로 가져옵니다. 이때 스레드 B도 프로그램에 들어가 CPU를 점유하여 84로 인벤토리에 액세스합니다. 결국 두 스레드 모두 인벤토리를 1만큼 감소시킵니다. 하나, 결과적으로 83으로 최종 수정되었습니다. 실제로 하나 더 판매되었습니다
스레드 간에 데이터 처리가 일치하지 않기 때문에synchronized
를 사용하여 테스트를 잠글 수 있나요?Quantity/** * 模拟商品超卖代码
* 解决`deductStock6`中,key形同虚设的问题。 * * @return */ @RequestMapping("/deductStock5") public String deductStock5() { // 创建一个key,保存至redis String key = "lock"; String lock_value = UUID.randomUUID().toString(); // setnx //让设置key和设置key的有效时间都可以同时执行 boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, lock_value, 10, TimeUnit.SECONDS); // 当不存在key时,可以设置成功,回执true;如果存在key,则无法设置,返回false if (!result) { // 前端监测,redis中存在,则不能让这个抢购操作执行,予以提示! return "err"; } try { // 获取Redis数据库中的商品数量 Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // 减库存 if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock)); System.out.println("商品扣减成功,剩余商品:" + realStock); } else { System.out.println("库存不足....."); } } finally { // 程序执行完成,则删除这个key // 放置于finally中,保证即使上述逻辑出问题,也能del掉 // 判断redis中该数据是否是这个接口处理时的设置的,如果是则删除 if(lock_value.equalsIgnoreCase(stringRedisTemplate.opsForValue().get(key))) { stringRedisTemplate.delete(key); } } return "end"; }로그인 후 복사로그인 후 복사100
Re-stress 테스트에서 얻은 로그 정보는 다음과 같습니다. 독립형 모드에서 동기화 키워드를 추가하면 실제로 제품의 과매도 현상을 피할 수 있습니다! 하지만 분산형 마이크로서비스에서 클러스터가 서비스에 대해 설정된 경우 동기화가 여전히 데이터의 정확성을 보장할 수 있나요? 등록 센터에서 여러 요청의 로드 밸런싱을 가정하면 각 마이크로서비스의 처리 인터페이스가 동기화되어 추가됩니다. 유사한과매도
문제는 계속 발생합니다.동기화
만단일 서버
의JVM
에 대해잠금
되었지만 여러 서버에 분산되어 2개 이상의 스레드가 발생했습니다. 제품 수량 정보가 공동으로 운영되었습니다. 다른 서버에서!
Redis는 분산 잠금을 구현합니다. Redis에는setnx(존재하지 않는 경우 설정)
setnx 키 값키가 존재하지 않는 경우 다음과 같은 명령이 있습니다. 성공적으로 설정되었습니다. 그렇지 않으면 설정이 실패합니다. 처리 인터페이스 수정 및 키 추가1. 요청이 인터페이스에 들어갈 때 키가 redis에 없으면 새 setnx가 생성되고, 키가 있으면 생성되지 않습니다. 오류 코드가 반환되고 긴급 구매 로직이 계속 실행되지 않습니다. 2. 생성이 성공한 후 스냅업 로직을 실행합니다. 3. 긴급 구매 로직이 실행된 후 데이터베이스에서 해당@Component public class RedisLock { private final Logger log = LoggerFactory.getLogger(this.getClass()); private final long acquireTimeout = 10*1000; // 获取锁之前的超时时间(获取锁的等待重试时间) private final int timeOut = 20; // 获取锁之后的超时时间(防止死锁) @Autowired private StringRedisTemplate stringRedisTemplate; // 引入String类型redis操作模板 /** * 获取分布式锁 * @return 锁标识 */ public boolean getRedisLock(String lockName,String lockValue) { // 1.计算获取锁的时间 Long endTime = System.currentTimeMillis() + acquireTimeout; // 2.尝试获取锁 while (System.currentTimeMillis() < endTime) { //3. 获取锁成功就设置过期时间 让设置key和设置key的有效时间都可以同时执行 boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockName, lockValue, timeOut, TimeUnit.SECONDS); if (result) { return true; } } return false; } /** * 释放分布式锁 * @param lockName 锁名称 * @param lockValue 锁值 */ public void unRedisLock(String lockName,String lockValue) { if(lockValue.equalsIgnoreCase(stringRedisTemplate.opsForValue().get(lockName))) { stringRedisTemplate.delete(lockName); } } }로그인 후 복사로그인 후 복사setnx
의key
를 삭제합니다. 다른 요청을 설정하고 조치를 취할 수 있도록 합니다. 이 논리는 이전에syn
을 단독으로 사용하는 것보다 훨씬 합리적입니다. 그러나 긴급 구매 작업 중에 예외가 발생하면키
는로 삭제할 수 없습니다.
. 결과적으로 다른 처리 요청이키
를 얻을 수 없으며 프로그램 논리가 교착 상태에 빠졌습니다! try...finally를 사용하여이 논리는 위의 다른 논리보다 더 엄격합니다. 그러나 정전, 시스템 충돌 등으로 인해 서버 집합이@RestController public class RedisController { // 引入String类型redis操作模板 @Autowired private StringRedisTemplate stringRedisTemplate; @Autowired private RedisLock redisLock; @RequestMapping("/setStock") public String setStock() { stringRedisTemplate.opsForValue().set("stock", "100"); return "ok"; } @RequestMapping("/deductStock") public String deductStock() { // 创建一个key,保存至redis String key = "lock"; String lock_value = UUID.randomUUID().toString(); try { boolean redisLock = this.redisLock.getRedisLock(key, lock_value);//获取锁 if (redisLock) { // 获取Redis数据库中的商品数量 Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // 减库存 if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock)); System.out.println("商品扣减成功,剩余商品:" + realStock); } else { System.out.println("库存不足....."); } } } finally { redisLock.unRedisLock(key,lock_value); //释放锁 } return "end"; } }로그인 후 복사로그인 후 복사다운타임
되는 경우 실행해야 할finally
의 명령문이 성공적으로 실행되지 않습니다! ! 또한키가 항상 존재
하여교착 상태
가 발생하는 것으로 보입니다! 타임아웃을 통해 위의 문제를 해결하세요setnx
설정을 성공적으로 마친 후 긴급 구매 코드 로직이 실행되기 전에 키의 제한 시간을 늘려주세요. rrreee하지만 위 코드의 로직에는 여전히 문제가 있습니다. 처리 로직에timeout
문제가 있는 경우. 로직이 실행되어 설정된 키 유효 시간을 초과하면 이때 어떤 문제가 발생하나요? 위 그림에서 문제를 명확하게 찾을 수 있습니다. 요청 실행 시간이 키의 유효 시간을 초과하는 경우. 새 요청이 실행되면 반드시 키를 가져오고 시간을 설정할 수 있습니다. 이때 Redis에 저장된 키는 요청 1의 키가 아니라 다른 요청에 의해 설정됩니다. 요청 1 실행이 완료되면 여기서 키가 삭제됩니다. 삭제되는 것은 다른 요청에 의해 설정된 키입니다!
依然出现了key形同虚设
的问题!如果失效一直存在,超卖问题依旧不会解决。
既然出现key形同虚设的现象,是否可以增加条件,当finally中需要执行删除操作时,获取数据判断值是否是该请求中对应的,如果是则删除,不是则不管!
修改上述代码如下所示:
/** * 模拟商品超卖代码
* 解决`deductStock6`中,key形同虚设的问题。 * * @return */ @RequestMapping("/deductStock5") public String deductStock5() { // 创建一个key,保存至redis String key = "lock"; String lock_value = UUID.randomUUID().toString(); // setnx //让设置key和设置key的有效时间都可以同时执行 boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, lock_value, 10, TimeUnit.SECONDS); // 当不存在key时,可以设置成功,回执true;如果存在key,则无法设置,返回false if (!result) { // 前端监测,redis中存在,则不能让这个抢购操作执行,予以提示! return "err"; } try { // 获取Redis数据库中的商品数量 Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // 减库存 if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock)); System.out.println("商品扣减成功,剩余商品:" + realStock); } else { System.out.println("库存不足....."); } } finally { // 程序执行完成,则删除这个key // 放置于finally中,保证即使上述逻辑出问题,也能del掉 // 判断redis中该数据是否是这个接口处理时的设置的,如果是则删除 if(lock_value.equalsIgnoreCase(stringRedisTemplate.opsForValue().get(key))) { stringRedisTemplate.delete(key); } } return "end"; }
由于获得锁的线程必须执行完减库存逻辑才能释放锁,所以在此期间所有其他的线程都会由于没获得锁,而直接结束程序,导致有很多库存根本没有卖出去,所以这里应该可以优化,让没获得锁的线程等待,或者循环检查锁
我们将锁封装到一个实体类中,然后加入两个方法,加锁和解锁
@Component public class RedisLock { private final Logger log = LoggerFactory.getLogger(this.getClass()); private final long acquireTimeout = 10*1000; // 获取锁之前的超时时间(获取锁的等待重试时间) private final int timeOut = 20; // 获取锁之后的超时时间(防止死锁) @Autowired private StringRedisTemplate stringRedisTemplate; // 引入String类型redis操作模板 /** * 获取分布式锁 * @return 锁标识 */ public boolean getRedisLock(String lockName,String lockValue) { // 1.计算获取锁的时间 Long endTime = System.currentTimeMillis() + acquireTimeout; // 2.尝试获取锁 while (System.currentTimeMillis() < endTime) { //3. 获取锁成功就设置过期时间 让设置key和设置key的有效时间都可以同时执行 boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockName, lockValue, timeOut, TimeUnit.SECONDS); if (result) { return true; } } return false; } /** * 释放分布式锁 * @param lockName 锁名称 * @param lockValue 锁值 */ public void unRedisLock(String lockName,String lockValue) { if(lockValue.equalsIgnoreCase(stringRedisTemplate.opsForValue().get(lockName))) { stringRedisTemplate.delete(lockName); } } }
@RestController public class RedisController { // 引入String类型redis操作模板 @Autowired private StringRedisTemplate stringRedisTemplate; @Autowired private RedisLock redisLock; @RequestMapping("/setStock") public String setStock() { stringRedisTemplate.opsForValue().set("stock", "100"); return "ok"; } @RequestMapping("/deductStock") public String deductStock() { // 创建一个key,保存至redis String key = "lock"; String lock_value = UUID.randomUUID().toString(); try { boolean redisLock = this.redisLock.getRedisLock(key, lock_value);//获取锁 if (redisLock) { // 获取Redis数据库中的商品数量 Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // 减库存 if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock)); System.out.println("商品扣减成功,剩余商品:" + realStock); } else { System.out.println("库存不足....."); } } } finally { redisLock.unRedisLock(key,lock_value); //释放锁 } return "end"; } }
可以看到失败的线程不会直接结束,而是会尝试重试,一直到重试结束时间,才会结束
实际上这个最终版依然存在3个问题
1、在finally流程中,由于是先判断在处理。如果判断条件结束后,获取到的结果为true。但是在执行del操作前,此时jvm在执行GC操作(为了保证GC操作获取GC roots根完全,会暂停java程序),导致程序暂停。在GC操作完成并恢复后,执行del操作时,当前被加锁的key是否仍然存在?
2、问题如图所示
위 내용은 Springboot가 Redis를 통합하여 과잉 판매 문제를 해결하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!