Heim> Datenbank> Redis> Hauptteil

聊一聊分布式系统下基于Redis的分布式锁

青灯夜游
Freigeben: 2021-10-29 10:52:11
nach vorne
1444 Leute haben es durchsucht

加锁了,还有并发问题?Redis分布式锁你真的了解?下面本篇文章就来给大家聊一聊分布式系统下基于Redis的分布式锁,希望对大家有所帮助!

聊一聊分布式系统下基于Redis的分布式锁

新接手的项目,偶尔会出现账不平的问题。之前的技术老大临走时给的解释是:排查了,没找到原因,之后太忙就没再解决,可能是框架的原因……

既然项目交付到手中,这样的问题是必须要解决的。梳理了所有账务处理逻辑,最终找到了原因:数据库并发操作热点账户导致。就这这个问题,来聊一聊分布式系统下基于Redis的分布式锁。顺便也分解一下问题形成原因及解决方案。【相关推荐:Redis视频教程

原因分析

系统并发量并不高,存在热点账户,但也不至于那么严重。问题的根源在于系统架构设计,人为的制造了并发。场景是这样的:商户批量导入一批数据,系统会进行前置处理,并对账户余额进行增减。

此时,另外一个定时任务,也会对账户进行扫描更新。而且对同一账户的操作分布到各个系统当中,热点账户也就出现了。

针对此问题的解决方案,从架构层面可以考虑将账务系统进行抽离,集中在一个系统中进行处理,所有的数据库事务及执行顺序由账务系统来统筹处理。从技术方面来讲,则可以通过锁机制来对热点账户进行加锁。

本篇文章就针对热点账户基于分布式锁的实现方式进行详细的讲解。

锁的分析

在Java的多线程环境下,通常有几类锁可以使用:

  • JVM内存模型级别的锁,常用的有:synchronized、Lock等;
  • 数据库锁,比如乐观锁,悲观锁等;
  • 分布式锁;

JVM内存级别的锁,可以保证单体服务下线程的安全性,比如多个线程访问/修改一个全局变量。但当系统进行集群部署时,JVM级别的本地锁就无能为力了。

悲观锁与乐观锁

像上述案例中,热点账户就属于分布式系统中的共享资源,我们通常会采用数据库锁分布式锁来进行解决。

数据库锁,又分为乐观锁悲观锁

悲观锁是基于数据库(Mysql的InnoDB)提供的排他锁来实现的。在进行事务操作时,通过select ... for update语句,MySQL会对查询结果集中每行数据都添加排他锁,其他线程对该记录的更新与删除操作都会阻塞。从而达到共享资源的顺序执行(修改);

乐观锁是相对悲观锁而言的,乐观锁假设数据一般情况不会造成冲突,所以在数据进行提交更新的时候,才会正式对数据的冲突与否进行检测。如果冲突则返回给用户异常信息,让用户决定如何去做。乐观锁适用于读多写少的场景,这样可以提高程序的吞吐量。在乐观锁实现时通常会基于记录状态或添加version版本来进行实现。

悲观锁失效场景

项目中使用了悲观锁,但悲观锁却失效了。这也是使用悲观锁时,常见的误区,下面来分析一下。

正常使用悲观锁的流程:

  • 通过select ... for update锁定记录;
  • 计算新余额,修改金额并存储;
  • 执行完成释放锁;

经常犯错的处理流程:

  • 查询账户余额,计算新余额;
  • 通过select ... for update锁定记录;
  • 修改金额并存储;
  • 执行完成释放锁;

错误的流程中,比如A和B服务查询到的余额都是100,A扣减50,B扣减40,然后A锁定记录,更新数据库为50;A释放锁之后,B锁定记录,更新数据库为60。显然,后者把前者的更新给覆盖掉了。解决的方案就是扩大锁的范围,将锁提前到计算新余额之前。

通常悲观锁对数据库的压力是非常大的,在实践中通常会根据场景使用乐观锁或分布式锁等方式来实现。

下面进入正题,讲讲基于Redis的分布式锁实现。

Redis分布式锁实战演习

这里以Spring Boot、Redis、Lua脚本为例来演示分布式锁的实现。为了简化处理,示例中Redis既承担了分布式锁的功能,也承担了数据库的功能。

场景构建

集群环境下,对同一个账户的金额进行操作,基本步骤:

  • 从数据库读取用户金额;
  • 程序修改金额;
  • 再将最新金额存储到数据库;

下面从最初不加锁,不同步处理,逐步推演出最终的分布式锁。

基础集成及类构建

准备一个不加锁处理的基础业务环境。

首先在Spring Boot项目中引入相关依赖:

 org.springframework.boot spring-boot-starter-data-redis   org.springframework.boot spring-boot-starter-web 
Nach dem Login kopieren

账户对应实体类UserAccount:

public class UserAccount { //用户ID private String userId; //账户内金额 private int amount; //添加账户金额 public void addAmount(int amount) { this.amount = this.amount + amount; } // 省略构造方法和getter/setter }
Nach dem Login kopieren

创建一个线程实现类AccountOperationThread:

public class AccountOperationThread implements Runnable { private final static Logger logger = LoggerFactory.getLogger(AccountOperationThread.class); private static final Long RELEASE_SUCCESS = 1L; private String userId; private RedisTemplate redisTemplate; public AccountOperationThread(String userId, RedisTemplate redisTemplate) { this.userId = userId; this.redisTemplate = redisTemplate; } @Override public void run() { noLock(); } /** * 不加锁 */ private void noLock() { try { Random random = new Random(); // 模拟线程进行业务处理 TimeUnit.MILLISECONDS.sleep(random.nextInt(100) + 1); } catch (InterruptedException e) { e.printStackTrace(); } //模拟数据库中获取用户账号 UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId); // 金额+1 userAccount.addAmount(1); logger.info(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount()); //模拟存回数据库 redisTemplate.opsForValue().set(userId, userAccount); } }
Nach dem Login kopieren

其中RedisTemplate的实例化交给了Spring Boot:

@Configuration public class RedisConfig { @Bean public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplate redisTemplate = new RedisTemplate<>(); redisTemplate.setConnectionFactory(redisConnectionFactory); Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class); ObjectMapper objectMapper = new ObjectMapper(); objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(objectMapper); // 设置value的序列化规则和 key的序列化规则 redisTemplate.setValueSerializer(jackson2JsonRedisSerializer); redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.afterPropertiesSet(); return redisTemplate; } }
        
Nach dem Login kopieren

最后,再准备一个TestController来进行触发多线程的运行:

@RestController public class TestController { private final static Logger logger = LoggerFactory.getLogger(TestController.class); private static ExecutorService executorService = Executors.newFixedThreadPool(10); @Autowired private RedisTemplate redisTemplate; @GetMapping("/test") public String test() throws InterruptedException { // 初始化用户user_001到Redis,账户金额为0 redisTemplate.opsForValue().set("user_001", new UserAccount("user_001", 0)); // 开启10个线程进行同步测试,每个线程为账户增加1元 for (int i = 0; i < 10; i++) { logger.info("创建线程i=" + i); executorService.execute(new AccountOperationThread("user_001", redisTemplate)); } // 主线程休眠1秒等待线程跑完 TimeUnit.MILLISECONDS.sleep(1000); // 查询Redis中的user_001账户 UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get("user_001"); logger.info("user id : " + userAccount.getUserId() + " amount : " + userAccount.getAmount()); return "success"; } }
Nach dem Login kopieren

执行上述程序,正常来说10个线程,每个线程加1,结果应该是10。但多执行几次,会发现,结果变化很大,基本上都要比10小。

[pool-1-thread-5] c.s.redis.thread.AccountOperationThread : pool-1-thread-5 : user id : user_001 amount : 1 [pool-1-thread-4] c.s.redis.thread.AccountOperationThread : pool-1-thread-4 : user id : user_001 amount : 1 [pool-1-thread-3] c.s.redis.thread.AccountOperationThread : pool-1-thread-3 : user id : user_001 amount : 1 [pool-1-thread-1] c.s.redis.thread.AccountOperationThread : pool-1-thread-1 : user id : user_001 amount : 1 [pool-1-thread-1] c.s.redis.thread.AccountOperationThread : pool-1-thread-1 : user id : user_001 amount : 2 [pool-1-thread-2] c.s.redis.thread.AccountOperationThread : pool-1-thread-2 : user id : user_001 amount : 2 [pool-1-thread-5] c.s.redis.thread.AccountOperationThread : pool-1-thread-5 : user id : user_001 amount : 2 [pool-1-thread-4] c.s.redis.thread.AccountOperationThread : pool-1-thread-4 : user id : user_001 amount : 3 [pool-1-thread-1] c.s.redis.thread.AccountOperationThread : pool-1-thread-1 : user id : user_001 amount : 4 [pool-1-thread-3] c.s.redis.thread.AccountOperationThread : pool-1-thread-3 : user id : user_001 amount : 5 [nio-8080-exec-1] c.s.redis.controller.TestController : user id : user_001 amount : 5
Nach dem Login kopieren

以上述日志为例,前四个线程都将值改为1,也就是后面三个线程都将前面的修改进行了覆盖,导致最终结果不是10,只有5。这显然是有问题的。

Redis同步锁实现

针对上面的情况,在同一个JVM当中,我们可以通过线程加锁来完成。但在分布式环境下,JVM级别的锁是没办法实现的,这里可以采用Redis同步锁实现。

基本思路:第一个线程进入时,在Redis中进记录,当后续线程过来请求时,判断Redis是否存在该记录,如果存在则说明处于锁定状态,进行等待或返回。如果不存在,则进行后续业务处理。

/** * 1.抢占资源时判断是否被锁。 * 2.如未锁则抢占成功且加锁,否则等待锁释放。 * 3.业务完成后释放锁,让给其它线程。 * 

* 该方案并未解决同步问题,原因:线程获得锁和加锁的过程,并非原子性操作,可能会导致线程A获得锁,还未加锁时,线程B也获得了锁。 */ private void redisLock() { Random random = new Random(); try { TimeUnit.MILLISECONDS.sleep(random.nextInt(1000) + 1); } catch (InterruptedException e) { e.printStackTrace(); } while (true) { Object lock = redisTemplate.opsForValue().get(userId + ":syn"); if (lock == null) { // 获得锁 -> 加锁 -> 跳出循环 logger.info(Thread.currentThread().getName() + ":获得锁"); redisTemplate.opsForValue().set(userId + ":syn", "lock"); break; } try { // 等待500毫秒重试获得锁 TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } try { //模拟数据库中获取用户账号 UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId); if (userAccount != null) { //设置金额 userAccount.addAmount(1); logger.info(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount()); //模拟存回数据库 redisTemplate.opsForValue().set(userId, userAccount); } } finally { //释放锁 redisTemplate.delete(userId + ":syn"); logger.info(Thread.currentThread().getName() + ":释放锁"); } }

Nach dem Login kopieren

在while代码块中,先判断对应用户ID是否在Redis中存在,如果不存在,则进行set加锁,如果存在,则跳出循环继续等待。

上述代码,看起来实现了加锁的功能,但当执行程序时,会发现与未加锁一样,依旧存在并发问题。原因是:获取锁和加锁的操作并不是原子的。比如两个线程发现lock都是null,都进行了加锁,此时并发问题依旧存在。

Redis原子性同步锁

针对上述问题,可将获取锁和加锁的过程原子化处理。基于spring-boot-data-redis提供的原子化API可以实现:

// 该方法使用了redis的指令:SETNX key value // 1.key不存在,设置成功返回value,setIfAbsent返回true; // 2.key存在,则设置失败返回null,setIfAbsent返回false; // 3.原子性操作; Boolean setIfAbsent(K var1, V var2);
Nach dem Login kopieren

上述方法的原子化操作是对Redis的setnx命令的封装,在Redis中setnx的使用如下实例:

redis> SETNX mykey "Hello" (integer) 1 redis> SETNX mykey "World" (integer) 0 redis> GET mykey "Hello"
Nach dem Login kopieren

第一次,设置mykey时,并不存在,则返回1,表示设置成功;第二次设置mykey时,已经存在,则返回0,表示设置失败。再次查询mykey对应的值,会发现依旧是第一次设置的值。也就是说redis的setnx保证了唯一的key只能被一个服务设置成功。

理解了上述API及底层原理,来看看线程中的实现方法代码如下:

/** * 1.原子操作加锁 * 2.竞争线程循环重试获得锁 * 3.业务完成释放锁 */ private void atomicityRedisLock() { //Spring data redis 支持的原子性操作 while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn", "lock")) { try { // 等待100毫秒重试获得锁 TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } logger.info(Thread.currentThread().getName() + ":获得锁"); try { //模拟数据库中获取用户账号 UserAccount userAccount = (UserAccount) redisTemplate.opsForValue().get(userId); if (userAccount != null) { //设置金额 userAccount.addAmount(1); logger.info(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount()); //模拟存回数据库 redisTemplate.opsForValue().set(userId, userAccount); } } finally { //释放锁 redisTemplate.delete(userId + ":syn"); logger.info(Thread.currentThread().getName() + ":释放锁"); } }
Nach dem Login kopieren

再次执行代码,会发现结果正确了,也就是说可以成功的对分布式线程进行了加锁。

Redis分布式锁的死锁

虽然上述代码执行结果没问题,但如果应用异常宕机,没来得及执行finally中释放锁的方法,那么其他线程则永远无法获得这个锁。

此时可采用setIfAbsent的重载方法:

Boolean setIfAbsent(K var1, V var2, long var3, TimeUnit var5);
Nach dem Login kopieren

基于该方法,可以设置锁的过期时间。这样即便获得锁的线程宕机,在Redis中数据过期之后,其他线程可正常获得该锁。

示例代码如下:

private void atomicityAndExRedisLock() { try { //Spring data redis 支持的原子性操作,并设置5秒过期时间 while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn", System.currentTimeMillis() + 5000, 5000, TimeUnit.MILLISECONDS)) { // 等待100毫秒重试获得锁 logger.info(Thread.currentThread().getName() + ":尝试循环获取锁"); TimeUnit.MILLISECONDS.sleep(1000); } logger.info(Thread.currentThread().getName() + ":获得锁--------"); // 应用在这里宕机,进程退出,无法执行 finally; Thread.currentThread().interrupt(); // 业务逻辑... } catch (InterruptedException e) { e.printStackTrace(); } finally { //释放锁 if (!Thread.currentThread().isInterrupted()) { redisTemplate.delete(userId + ":syn"); logger.info(Thread.currentThread().getName() + ":释放锁"); } } }
Nach dem Login kopieren

业务超时及守护线程

上面添加了Redis所的超时时间,看似解决了问题,但又引入了新的问题。

比如,正常情况下线程A在5秒内可正常处理完业务,但偶发会出现超过5秒的情况。如果将超时时间设置为5秒,线程A获得了锁,但业务逻辑处理需要6秒。此时,线程A还在正常业务逻辑,线程B已经获得了锁。当线程A处理完时,有可能将线程B的锁给释放掉。

在上述场景中有两个问题点:

  • 第一,线程A和线程B可能会同时在执行,存在并发问题。
  • 第二,线程A可能会把线程B的锁给释放掉,导致一系列的恶性循环。

当然,可以通过在Redis中设置value值来判断锁是属于线程A还是线程B。但仔细分析会发现,这个问题的本质是因为线程A执行业务逻辑耗时超出了锁超时的时间。

那么就有两个解决方案了:

  • 第一,将超时时间设置的足够长,确保业务代码能够在锁释放之前执行完成;
  • 第二,为锁添加守护线程,为将要过期释放但未释放的锁增加时间;

第一种方式需要全行大多数情况下业务逻辑的耗时,进行超时时间的设定。

第二种方式,可通过如下守护线程的方式来动态增加锁超时时间。

public class DaemonThread implements Runnable { private final static Logger logger = LoggerFactory.getLogger(DaemonThread.class); // 是否需要守护 主线程关闭则结束守护线程 private volatile boolean daemon = true; // 守护锁 private String lockKey; private RedisTemplate redisTemplate; public DaemonThread(String lockKey, RedisTemplate redisTemplate) { this.lockKey = lockKey; this.redisTemplate = redisTemplate; } @Override public void run() { try { while (daemon) { long time = redisTemplate.getExpire(lockKey, TimeUnit.MILLISECONDS); // 剩余有效期小于1秒则续命 if (time < 1000) { logger.info("守护进程: " + Thread.currentThread().getName() + " 延长锁时间 5000 毫秒"); redisTemplate.expire(lockKey, 5000, TimeUnit.MILLISECONDS); } TimeUnit.MILLISECONDS.sleep(300); } logger.info(" 守护进程: " + Thread.currentThread().getName() + "关闭 "); } catch (InterruptedException e) { e.printStackTrace(); } } // 主线程主动调用结束 public void stop() { daemon = false; } }
Nach dem Login kopieren

上述线程每隔300毫秒获取一下Redis中锁的超时时间,如果小于1秒,则延长5秒。当主线程调用关闭时,守护线程也随之关闭。

主线程中相关代码实现:

private void deamonRedisLock() { //守护线程 DaemonThread daemonThread = null; //Spring data redis 支持的原子性操作,并设置5秒过期时间 String uuid = UUID.randomUUID().toString(); String value = Thread.currentThread().getId() + ":" + uuid; try { while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn", value, 5000, TimeUnit.MILLISECONDS)) { // 等待100毫秒重试获得锁 logger.info(Thread.currentThread().getName() + ":尝试循环获取锁"); TimeUnit.MILLISECONDS.sleep(1000); } logger.info(Thread.currentThread().getName() + ":获得锁----"); // 开启守护线程 daemonThread = new DaemonThread(userId + ":syn", redisTemplate); Thread thread = new Thread(daemonThread); thread.start(); // 业务逻辑执行10秒... TimeUnit.MILLISECONDS.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } finally { //释放锁 这里也需要原子操作,今后通过 Redis + Lua 讲 String result = (String) redisTemplate.opsForValue().get(userId + ":syn"); if (value.equals(result)) { redisTemplate.delete(userId + ":syn"); logger.info(Thread.currentThread().getName() + ":释放锁-----"); } //关闭守护线程 if (daemonThread != null) { daemonThread.stop(); } } }
Nach dem Login kopieren

其中在获得锁之后,开启守护线程,在finally中将守护线程关闭。

基于Lua脚本的实现

在上述逻辑中,我们是基于spring-boot-data-redis提供的原子化操作来保证锁判断和执行的原子化的。在非Spring Boot项目中,则可以基于Lua脚本来实现。

首先定义加锁和解锁的Lua脚本及对应的DefaultRedisScript对象,在RedisConfig配置类中添加如下实例化代码:

@Configuration public class RedisConfig { //lock script private static final String LOCK_SCRIPT = " if redis.call('setnx',KEYS[1],ARGV[1]) == 1 " + " then redis.call('expire',KEYS[1],ARGV[2]) " + " return 1 " + " else return 0 end "; private static final String UNLOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call" + "('del', KEYS[1]) else return 0 end"; // ... 省略部分代码 @Bean public DefaultRedisScript lockRedisScript() { DefaultRedisScript defaultRedisScript = new DefaultRedisScript<>(); defaultRedisScript.setResultType(Boolean.class); defaultRedisScript.setScriptText(LOCK_SCRIPT); return defaultRedisScript; } @Bean public DefaultRedisScript unlockRedisScript() { DefaultRedisScript defaultRedisScript = new DefaultRedisScript<>(); defaultRedisScript.setResultType(Long.class); defaultRedisScript.setScriptText(UNLOCK_SCRIPT); return defaultRedisScript; } }
Nach dem Login kopieren

再通过在AccountOperationThread类中新建构造方法,将上述两个对象传入类中(省略此部分演示)。然后,就可以基于RedisTemplate来调用了,改造之后的代码实现如下:

private void deamonRedisLockWithLua() { //守护线程 DaemonThread daemonThread = null; //Spring data redis 支持的原子性操作,并设置5秒过期时间 String uuid = UUID.randomUUID().toString(); String value = Thread.currentThread().getId() + ":" + uuid; try { while (!redisTemplate.execute(lockRedisScript, Collections.singletonList(userId + ":syn"), value, 5)) { // 等待1000毫秒重试获得锁 logger.info(Thread.currentThread().getName() + ":尝试循环获取锁"); TimeUnit.MILLISECONDS.sleep(1000); } logger.info(Thread.currentThread().getName() + ":获得锁----"); // 开启守护线程 daemonThread = new DaemonThread(userId + ":syn", redisTemplate); Thread thread = new Thread(daemonThread); thread.start(); // 业务逻辑执行10秒... TimeUnit.MILLISECONDS.sleep(10000); } catch (InterruptedException e) { logger.error("异常", e); } finally { //使用Lua脚本:先判断是否是自己设置的锁,再执行删除 // key存在,当前值=期望值时,删除key;key存在,当前值!=期望值时,返回0; Long result = redisTemplate.execute(unlockRedisScript, Collections.singletonList(userId + ":syn"), value); logger.info("redis解锁:{}", RELEASE_SUCCESS.equals(result)); if (RELEASE_SUCCESS.equals(result)) { if (daemonThread != null) { //关闭守护线程 daemonThread.stop(); logger.info(Thread.currentThread().getName() + ":释放锁---"); } } } }
Nach dem Login kopieren

其中while循环中加锁和finally中的释放锁都是基于Lua脚本来实现了。

Redis锁的其他因素

除了上述实例,在使用Redis分布式锁时,还可以考虑以下情况及方案。

Redis锁的不可重入

当线程在持有锁的情况下再次请求加锁,如果一个锁支持一个线程多次加锁,那么这个锁就是可重入的。如果一个不可重入锁被再次加锁,由于该锁已经被持有,再次加锁会失败。Redis可通过对锁进行重入计数,加锁时加 1,解锁时减 1,当计数归 0时释放锁。

可重入锁虽然高效但会增加代码的复杂性,这里就不举例说明了。

等待锁释放

有的业务场景,发现被锁则直接返回。但有的场景下,客户端需要等待锁释放然后去抢锁。上述示例就属于后者。针对等待锁释放也有两种方案:

  • 客户端轮训:当未获得锁时,等待一段时间再重新获取,直到成功。上述示例就是基于这种方式实现的。这种方式的缺点也很明显,比较耗费服务器资源,当并发量大时会影响服务器的效率。
  • 使用Redis的订阅发布功能:当获取锁失败时,订阅锁释放消息,获取锁成功后释放时,发送释放消息。

集群中的主备切换和脑裂

在Redis包含主从同步的集群部署方式中,如果主节点挂掉,从节点提升为主节点。如果客户端A在主节点加锁成功,指令还未同步到从节点,此时主节点挂掉,从节点升为主节点,新的主节点中没有锁的数据。这种情况下,客户端B就可能加锁成功,从而出现并发的场景。

当集群发生脑裂时,Redis master节点跟slave 节点和 sentinel 集群处于不同的网络分区。sentinel集群无法感知到master的存在,会将 slave 节点提升为 master 节点,此时就会存在两个不同的 master 节点。从而也会导致并发问题的出现。Redis Cluster集群部署方式同理。

小结

通过生产环境中的一个问题,排查原因,寻找解决方案,到最终对基于Redis分布式的深入研究,这便是学习的过程。

同时,每当面试或被问题如何解决分布式共享资源时,我们会脱口而出”基于Redis实现分布式锁“,但通过本文的学习会发现,Redis分布式锁并不是万能的,而且在使用的过程中还需要注意超时、死锁、误解锁、集群选主/脑裂等问题。

Redis以高性能著称,但在实现分布式锁的过程中还是存在一些问题。因此,基于Redis的分布式锁可以极大的缓解并发问题,但要完全防止并发,还是得从数据库层面入手。

源码地址:https://github.com/secbr/springboot-all/tree/master/springboot-redis-lock

更多编程相关知识,请访问:编程入门!!

Das obige ist der detaillierte Inhalt von聊一聊分布式系统下基于Redis的分布式锁. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Verwandte Etiketten:
Quelle:juejin.cn
Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Neueste Downloads
Mehr>
Web-Effekte
Quellcode der Website
Website-Materialien
Frontend-Vorlage
Über uns Haftungsausschluss Sitemap
Chinesische PHP-Website:Online-PHP-Schulung für das Gemeinwohl,Helfen Sie PHP-Lernenden, sich schnell weiterzuentwickeln!