> 데이터 베이스 > Redis > Redis 분산 잠금 인스턴스 분석

Redis 분산 잠금 인스턴스 분석

WBOY
풀어 주다: 2023-05-31 19:32:49
앞으로
1056명이 탐색했습니다.

분산 잠금 개요

다중 스레드 환경에서 동시에 하나의 스레드에서만 코드 블록에 액세스할 수 있도록 하기 위해 Java에서는 일반적으로 동기화된 구문과 ReetrantLock을 사용하여 이것이 실제로 이루어지도록 할 수 있습니다. 로컬 잠금 방법. 그러나 이제 기업은 분산 아키텍처를 채택하고 있습니다. 분산 환경에서 서로 다른 노드의 스레드가 동시에 실행되도록 하려면 어떻게 해야 할까요? 따라서 분산 시스템 간의 공유 리소스에 대한 상호 배타적인 액세스를 제어하는 ​​방법인 분산 잠금이 도입되었습니다. 분산 시스템에서는 여러 서비스가 여러 컴퓨터에 배포됩니다. 클라이언트의 사용자가 데이터 삽입 요청을 시작할 때 분산 잠금 메커니즘이 보장되지 않으면 해당 여러 컴퓨터의 여러 서비스가 동시 삽입 작업으로 인해 반복될 수 있습니다. 중복 데이터를 허용하지 않는 일부 기업에 문제를 일으킬 수 있는 데이터 삽입. 분산 잠금 메커니즘은 이와 같은 문제를 해결하고 여러 서비스 간의 공유 리소스에 대한 상호 배타적 액세스를 보장하기 위한 것입니다. 한 서비스가 분산 잠금을 확보하고 다른 서비스가 잠금을 획득하지 못하면 후속 작업이 수행되지 않습니다. 일반적인 의미는 아래 그림과 같습니다.

Redis 분산 잠금 인스턴스 분석분산 잠금의 특징

분산 잠금은 일반적으로 다음과 같은 특징을 갖습니다.

    상호 배타성: 동시에 하나의 스레드만이 잠금을 보유할 수 있습니다
  • 재진입: 동일한 노드의 동일한 스레드가 잠금을 획득한 후 다시 잠금을 획득할 수 있습니다.
  • 잠금 시간 초과: 교착 상태를 방지하기 위해 J.U.C의 잠금과 같은 잠금 시간 초과를 지원합니다.
  • 고성능 및 고가용성: 잠금 및 잠금 해제는 효율적이어야 하며 분산 잠금 실패를 방지하기 위해 고가용성을 보장해야 합니다
  • 차단 및 비차단 속성 보유: 시간 내에 차단 상태에서 깨어날 수 있음
  • 분산 구현 방법 locks

우리는 일반적으로 다음과 같은 방법으로 분산 잠금을 구현합니다.

    데이터베이스 기반
  • Redis 기반
  • 주키퍼 기반
  • Redis의 일반적인 분산 잠금 문제

말하기 Redis의 분산 잠금을 사용하는 경우 대부분의 사람들은 다음을 생각할 것입니다: setnx+lua(redis는 Lua 스크립트를 실행할 때 다른 작업이 수행되지 않도록 보장하여 작업의 원자성을 보장합니다) 또는 set를 알고 있습니다. 키 값 px 밀리초 nx code>. 후자 방식의 핵심 구현 명령어는 다음과 같습니다.

- 获取锁(unique_value可以是UUID等)
SET resource_name unique_value NX PX 30000

- 释放锁(lua脚本中,一定要比较value,防止误解锁)
if redis.call("get",KEYS[1]) == ARGV[1] then 
    return redis.call("del",KEYS[1])
else   
    return 0
end
로그인 후 복사

이 구현 방식에는 크게 3가지 포인트가 있습니다. (이 역시 면접 확률이 매우 높다는 점이기도 합니다.) setnx+lua(redis保证执行lua脚本时不执行其他操作,保证操作的原子性),或者知道set key value px milliseconds nx。后一种方式的核心实现命令如下:

<!-- https://mvnrepository.com/artifact/org.redisson/redisson -->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.3.2</version>
</dependency>
로그인 후 복사
로그인 후 복사

这种实现方式有3大要点(也是面试概率非常高的地方):

  1. set命令要用set key value px milliseconds nx

  2. set 명령어는 set 키 값을 사용합니다. px milliseconds nx< /code>;

  3. 값은 고유해야 합니다.

잠금을 해제할 때 값을 확인해야 하며 잠금을 오해할 수 없습니다.

  1. 사실 가장 큰 단점은 이러한 유형의 잠금은 Redis 노드에서 잠겨 있을 때만 작동한다는 것입니다. Redis가 Sentinel을 통해 고가용성을 보장하더라도 마스터 노드가 어떤 이유로 마스터-슬레이브로 전환하면 잠금이 손실됩니다.

  2. Redis 마스터 노드 잠금

  3. 그러나 잠긴 키가 슬레이브 노드에 동기화되지 않았습니다.

  4. 마스터 오류가 발생하고 슬레이브 노드가 마스터 노드로 업그레이드되었습니다. 자물쇠가 분실되었습니다.

Single Point of Failure 문제를 피하기 위해 Redis 작성자 antirez는 분산 환경을 기반으로 한 보다 발전된 분산 잠금 구현 방법인 Redlock을 제안했습니다. Redlock은 Redis의 모든 분산 잠금 구현 중에서 면접관을 최고조로 만들 수 있는 유일한 방법이기도 합니다.

Redis 고급 분산 잠금: Redlock

antirez의 redlock 알고리즘은 대략 다음과 같습니다.

Redis 분산 환경에서는 N개의 Redis 마스터가 있다고 가정합니다. 이러한 노드는 서로 완전히 독립적이며 마스터-슬레이브 복제 또는 기타 클러스터 조정 메커니즘이 없습니다. Redis 단일 인스턴스에서와 동일한 방법을 사용하여 N 인스턴스에 대한 잠금을 획득하고 해제할 것입니다. 이제 우리는 5개의 Redis 마스터 노드가 있다고 가정하고, 이 Redis 인스턴스가 동시에 다운되지 않도록 5개의 서버에서 이러한 Redis 인스턴스를 실행해야 합니다. 잠금을 얻으려면 클라이언트는 다음 작업을 수행해야 합니다.

    현재 Unix 시간을 밀리초 단위로 가져옵니다.
  • 동일한 키와
  • 고유 값

    (예: UUID)을 사용하여 5개 인스턴스에서 순차적으로 잠금을 획득해 보세요. Redis에서 잠금을 요청할 때 클라이언트는 네트워크 연결 및 응답 시간 초과를 설정해야 하며, 이는 잠금 만료 시간보다 작아야 합니다. 예를 들어 잠금 자동 만료 시간 TTL이 10초인 경우 제한 시간은 5~50밀리초 사이여야 합니다. 이렇게 하면 서버 측 Redis가 중단되고 클라이언트가 여전히 응답 결과를 기다리고 있는 상황을 피할 수 있습니다. 서버가 지정된 시간 내에 응답하지 않으면 클라이언트는 가능한 한 빨리 다른 Redis 인스턴스에서 잠금을 얻으려고 시도해야 합니다.

  • 客户端使用当前时间减去开始获取锁时间(步骤1记录的时间)就得到获取锁使用的时间。当且仅当从大多数(N/2+1,这里是3个节点)的Redis节点都取到锁,并且使用的时间小于锁失效时间时,锁才算获取成功

  • 如果取到了锁,key的真正有效时间等于有效时间减去获取锁所使用的时间(步骤3计算的结果)。

  • 如果因为某些原因,获取锁失败(没有在至少N/2+1个Redis实例取到锁或者取锁时间已经超过了有效时间),客户端应该在所有的Redis实例上进行解锁(即便某些Redis实例根本就没有加锁成功,防止某些节点获取到锁但是客户端没有得到响应而导致接下来的一段时间不能被重新获取锁)。

  • 此处不讨论时钟漂移

Redis 분산 잠금 인스턴스 분석

Redlock源码

redisson已经有对redlock算法封装,接下来对其用法进行简单介绍,并对核心源码进行分析(假设5个redis实例)。

1. Redlock依赖

<!-- https://mvnrepository.com/artifact/org.redisson/redisson -->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.3.2</version>
</dependency>
로그인 후 복사
로그인 후 복사

2. Redlock用法

首先,我们来看一下redission封装的redlock算法实现的分布式锁用法,非常简单,跟重入锁(ReentrantLock)有点类似:

Config config = new Config();
config.useSentinelServers().addSentinelAddress("127.0.0.1:6369","127.0.0.1:6379", "127.0.0.1:6389")
        .setMasterName("masterName")
        .setPassword("password").setDatabase(0);
RedissonClient redissonClient = Redisson.create(config);
// 还可以getFairLock(), getReadWriteLock()
RLock redLock = redissonClient.getLock("REDLOCK_KEY");
boolean isLock;
try {
    isLock = redLock.tryLock();
    // 500ms拿不到锁, 就认为获取锁失败。10000ms即10s是锁失效时间。
    isLock = redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS);
    if (isLock) {
        //TODO if get lock success, do something;
    }
} catch (Exception e) {
} finally {
    // 无论如何, 最后都要解锁
    redLock.unlock();
}
로그인 후 복사

3. Redlock唯一ID

实现分布式锁的一个非常重要的点就是set的value要具有唯一性,redisson的value是怎样保证value的唯一性呢?答案是UUID+threadId。入口在redissonClient.getLock(“REDLOCK_KEY”),源码在Redisson.java和RedissonLock.java中:

protected final UUID id = UUID.randomUUID();
String getLockName(long threadId) {
    return id + ":" + threadId;
}
로그인 후 복사

4. Redlock获取锁

获取锁的代码为redLock.tryLock()或者redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS),两者的最终核心源码都是下面这段代码,只不过前者获取锁的默认租约时间(leaseTime)是LOCK_EXPIRATION_INTERVAL_SECONDS,即30s:

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);
    // 获取锁时向5个redis实例发送的命令
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
              // 首先分布式锁的KEY不能存在,如果确实不存在,那么执行hset命令(hset REDLOCK_KEY uuid+threadId 1),并通过pexpire设置失效时间(也是锁的租约时间)
              "if (redis.call(&#39;exists&#39;, KEYS[1]) == 0) then " +
                  "redis.call(&#39;hset&#39;, KEYS[1], ARGV[2], 1); " +
                  "redis.call(&#39;pexpire&#39;, KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              // 如果分布式锁的KEY已经存在,并且value也匹配,表示是当前线程持有的锁,那么重入次数加1,并且设置失效时间
              "if (redis.call(&#39;hexists&#39;, KEYS[1], ARGV[2]) == 1) then " +
                  "redis.call(&#39;hincrby&#39;, KEYS[1], ARGV[2], 1); " +
                  "redis.call(&#39;pexpire&#39;, KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              // 获取分布式锁的KEY的失效时间毫秒数
              "return redis.call(&#39;pttl&#39;, KEYS[1]);",
              // 这三个参数分别对应KEYS[1],ARGV[1]和ARGV[2]
                Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
로그인 후 복사

获取锁的命令中,

  • KEYS[1]就是Collections.singletonList(getName()),表示分布式锁的key,即REDLOCK_KEY;

  • ARGV[1]就是internalLockLeaseTime,即锁的租约时间,默认30s;

  • ARGV[2]就是getLockName(threadId),是获取锁时set的唯一值,即UUID+threadId:

5. Redlock释放锁

释放锁的代码为redLock.unlock(),核心源码如下:

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    // 向5个redis实例都执行如下命令
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            // 如果分布式锁KEY不存在,那么向channel发布一条消息
            "if (redis.call(&#39;exists&#39;, KEYS[1]) == 0) then " +
                "redis.call(&#39;publish&#39;, KEYS[2], ARGV[1]); " +
                "return 1; " +
            "end;" +
            // 如果分布式锁存在,但是value不匹配,表示锁已经被占用,那么直接返回
            "if (redis.call(&#39;hexists&#39;, KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
            // 如果就是当前线程占有分布式锁,那么将重入次数减1
            "local counter = redis.call(&#39;hincrby&#39;, KEYS[1], ARGV[3], -1); " +
            // 重入次数减1后的值如果大于0,表示分布式锁有重入过,那么只设置失效时间,还不能删除
            "if (counter > 0) then " +
                "redis.call(&#39;pexpire&#39;, KEYS[1], ARGV[2]); " +
                "return 0; " +
            "else " +
                // 重入次数减1后的值如果为0,表示分布式锁只获取过1次,那么删除这个KEY,并发布解锁消息
                "redis.call(&#39;del&#39;, KEYS[1]); " +
                "redis.call(&#39;publish&#39;, KEYS[2], ARGV[1]); " +
                "return 1; "+
            "end; " +
            "return nil;",
            // 这5个参数分别对应KEYS[1],KEYS[2],ARGV[1],ARGV[2]和ARGV[3]
            Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));

}
로그인 후 복사

Redis实现的分布式锁轮子

下面利用SpringBoot + Jedis + AOP的组合来实现一个简易的分布式锁。

1. 自定义注解

自定义一个注解,被注解的方法会执行获取分布式锁的逻辑

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface RedisLock {
    /**
     * 业务键
     *
     * @return
     */
    String key();
    /**
     * 锁的过期秒数,默认是5秒
     *
     * @return
     */
    int expire() default 5;

    /**
     * 尝试加锁,最多等待时间
     *
     * @return
     */
    long waitTime() default Long.MIN_VALUE;
    /**
     * 锁的超时时间单位
     *
     * @return
     */
    TimeUnit timeUnit() default TimeUnit.SECONDS;
}
로그인 후 복사

2. AOP拦截器实现

在AOP中我们去执行获取分布式锁和释放分布式锁的逻辑,代码如下:

@Aspect
@Component
public class LockMethodAspect {
    @Autowired
    private RedisLockHelper redisLockHelper;
    @Autowired
    private JedisUtil jedisUtil;
    private Logger logger = LoggerFactory.getLogger(LockMethodAspect.class);

    @Around("@annotation(com.redis.lock.annotation.RedisLock)")
    public Object around(ProceedingJoinPoint joinPoint) {
        Jedis jedis = jedisUtil.getJedis();
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        Method method = signature.getMethod();

        RedisLock redisLock = method.getAnnotation(RedisLock.class);
        String value = UUID.randomUUID().toString();
        String key = redisLock.key();
        try {
            final boolean islock = redisLockHelper.lock(jedis,key, value, redisLock.expire(), redisLock.timeUnit());
            logger.info("isLock : {}",islock);
            if (!islock) {
                logger.error("获取锁失败");
                throw new RuntimeException("获取锁失败");
            }
            try {
                return joinPoint.proceed();
            } catch (Throwable throwable) {
                throw new RuntimeException("系统异常");
            }
        }  finally {
            logger.info("释放锁");
            redisLockHelper.unlock(jedis,key, value);
            jedis.close();
        }
    }
}
로그인 후 복사

3. Redis实现分布式锁核心类

@Component
public class RedisLockHelper {
    private long sleepTime = 100;
    /**
     * 直接使用setnx + expire方式获取分布式锁
     * 非原子性
     *
     * @param key
     * @param value
     * @param timeout
     * @return
     */
    public boolean lock_setnx(Jedis jedis,String key, String value, int timeout) {
        Long result = jedis.setnx(key, value);
        // result = 1时,设置成功,否则设置失败
        if (result == 1L) {
            return jedis.expire(key, timeout) == 1L;
        } else {
            return false;
        }
    }

    /**
     * 使用Lua脚本,脚本中使用setnex+expire命令进行加锁操作
     *
     * @param jedis
     * @param key
     * @param UniqueId
     * @param seconds
     * @return
     */
    public boolean Lock_with_lua(Jedis jedis,String key, String UniqueId, int seconds) {
        String lua_scripts = "if redis.call(&#39;setnx&#39;,KEYS[1],ARGV[1]) == 1 then" +
                "redis.call(&#39;expire&#39;,KEYS[1],ARGV[2]) return 1 else return 0 end";
        List<String> keys = new ArrayList<>();
        List<String> values = new ArrayList<>();
        keys.add(key);
        values.add(UniqueId);
        values.add(String.valueOf(seconds));
        Object result = jedis.eval(lua_scripts, keys, values);
        //判断是否成功
        return result.equals(1L);
    }

    /**
     * 在Redis的2.6.12及以后中,使用 set key value [NX] [EX] 命令
     *
     * @param key
     * @param value
     * @param timeout
     * @return
     */
    public boolean lock(Jedis jedis,String key, String value, int timeout, TimeUnit timeUnit) {
        long seconds = timeUnit.toSeconds(timeout);
        return "OK".equals(jedis.set(key, value, "NX", "EX", seconds));
    }

    /**
     * 自定义获取锁的超时时间
     *
     * @param jedis
     * @param key
     * @param value
     * @param timeout
     * @param waitTime
     * @param timeUnit
     * @return
     * @throws InterruptedException
     */
    public boolean lock_with_waitTime(Jedis jedis,String key, String value, int timeout, long waitTime,TimeUnit timeUnit) throws InterruptedException {
        long seconds = timeUnit.toSeconds(timeout);
        while (waitTime >= 0) {
            String result = jedis.set(key, value, "nx", "ex", seconds);
            if ("OK".equals(result)) {
                return true;
            }
            waitTime -= sleepTime;
            Thread.sleep(sleepTime);
        }
        return false;
    }
    /**
     * 错误的解锁方法—直接删除key
     *
     * @param key
     */
    public void unlock_with_del(Jedis jedis,String key) {
        jedis.del(key);
    }

    /**
     * 使用Lua脚本进行解锁操纵,解锁的时候验证value值
     *
     * @param jedis
     * @param key
     * @param value
     * @return
     */
    public boolean unlock(Jedis jedis,String key,String value) {
        String luaScript = "if redis.call(&#39;get&#39;,KEYS[1]) == ARGV[1] then " +
                "return redis.call(&#39;del&#39;,KEYS[1]) else return 0 end";
        return jedis.eval(luaScript, Collections.singletonList(key), Collections.singletonList(value)).equals(1L);
    }
}
로그인 후 복사

4. Controller层控制

定义一个TestController来测试我们实现的分布式锁

@RestController
public class TestController {
    @RedisLock(key = "redis_lock")
    @GetMapping("/index")
    public String index() {
        return "index";
    }
}
로그인 후 복사

위 내용은 Redis 분산 잠금 인스턴스 분석의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

관련 라벨:
원천:yisu.com
본 웹사이트의 성명
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.
인기 튜토리얼
더>
최신 다운로드
더>
웹 효과
웹사이트 소스 코드
웹사이트 자료
프론트엔드 템플릿