使用Redis實作分散式鎖定
redis特性介紹
1、支援豐富的資料類型,如String、List、Map 、Set、ZSet等。
2、支援資料持久化,RDB和AOF兩種方式
3、支援叢集工作模式,分區容錯性強
4、單線程,順序處理指令
5、支援交易
6、支援發布與訂閱
Redis實作分散式鎖定使用了SETNX指令:
SETNX key value
#將key的值設為value ,當且僅當key不存在。
若給定的key已經存在,則SETNX不做任何動作。
SETNX 是『SET if Not eXists』(如果不存在,則 SET)的簡寫。
可用版本:>= 1.0.0時間複雜度:O(1)回傳值:
設定成功,回傳 1 。
設定失敗,回到 0 。
redis> EXISTS job # job 不存在 (integer) 0 redis> SETNX job "programmer" # job 设置成功 (integer) 1 redis> SETNX job "code-farmer" # 尝试覆盖 job ,失败 (integer) 0 redis> GET job # 没有被覆盖 "programmer"
首先,我們需要封裝一個公共的Redis存取工具類別。這類需要注入RedisTemplate實例和ValueOperations實例,使用ValueOperations實例是因為Redis實作的分散式鎖定使用了最簡單的String類型。另外,我們需要封裝3個方法,分別是setIfObsent (String key, String value)、 expire (String key, long timeout, TimeUnit unit) 、delete (String key) ,分別對應Redis的SETNX、expire、del指令。以下是Redis存取工具類別的具體實作:
@Component public class RedisDao { @Autowired private RedisTemplate redisTemplate; @Resource(name="redisTemplate") private ValueOperations<Object, Object> valOpsObj; /** * 如果key不存在,就存储一个key-value,相当于SETNX命令 * @param key 键 * @param value 值,可以为空 * @return */ public boolean setIfObsent (String key, String value) { return valOpsObj.setIfAbsent(key, value); } /** * 为key设置失效时间 * @param key 键 * @param timeout 时间大小 * @param unit 时间单位 */ public boolean expire (String key, long timeout, TimeUnit unit) { return redisTemplate.expire(key, timeout, unit); } /** * 删除key * @param key 键 */ public void delete (String key) { redisTemplate.delete(key); } }
完成了Redis存取工具類別的實現,現在需要考慮的是如何去模擬競爭分散式鎖定。因為Redis本身就是支援分散式叢集的,所以只需要模擬出多執行緒處理業務場景。這裡採用線程池來模擬,以下是測試類別的具體實作:
@RestController @RequestMapping("test") public class TestController { private static final Logger LOG = LoggerFactory.getLogger(TestController.class); //日志对象 @Autowired private RedisDao redisDao; //定义的分布式锁key private static final String LOCK_KEY = "MyTestLock"; @RequestMapping(value={"testRedisLock"}, method=RequestMethod.GET) public void testRedisLock () { ExecutorService executorService = Executors.newFixedThreadPool(5); for (int i = 0; i < 5; i++) { executorService.submit(new Runnable() { @Override public void run() { //获取分布式锁 boolean flag = redisDao.setIfObsent(LOCK_KEY, "lock"); if (flag) { LOG.info(Thread.currentThread().getName() + ":获取Redis分布式锁成功"); //获取锁成功后设置失效时间 redisDao.expire(LOCK_KEY, 2, TimeUnit.SECONDS); try { LOG.info(Thread.currentThread().getName() + ":处理业务开始"); Thread.sleep(1000); //睡眠1000ms模拟处理业务 LOG.info(Thread.currentThread().getName() + ":处理业务结束"); //处理业务完成后删除锁 redisDao.delete(LOCK_KEY); } catch (InterruptedException e) { LOG.error("处理业务异常:", e); } } else { LOG.info(Thread.currentThread().getName() + ":获取Redis分布式锁失败"); } } }); } } }
透過上面這段程式碼,可能會產生以下幾個問題:
執行緒如果取得分散式鎖定失敗,為什麼不嘗試重新取得鎖?
線程取得分散式鎖定成功後,設定了鎖的失效時間,這個失效時間長短如何確定?
執行緒業務處理結束後,為什麼要做刪除鎖定的操作?
針對這幾個疑問,我們可以來討論下。
第一,Redis的SETNX指令,如果key已經存在,則不會做任何操作,所以SETNX實作的分散式鎖定並不是可重入鎖定。當然,也可以自己透過程式碼實現重試n次或直到取得到分散式鎖定為止。但是,這不能保證競爭的公平性,某個執行緒會因為一直等待鎖而阻塞。因此,Redis實作的分散式鎖定更適用於共享資源一寫多讀的場景。
第二,分散式鎖定必須設定失效時間,且失效時間必須大於業務處理所需的時間(確保資料一致性)。所以,在測試階段盡可能準確的預測出業務正常處理所需的時間,設定失效時間是防止因為業務處理流程的某些原因導致死鎖的情況。
第三,業務處理結束,必須要做刪除鎖定的操作。
上面設定分散式鎖和為鎖設定失效時間是透過兩個操作步驟完成的,更合理的方式應該是把設定分散式鎖和為鎖設定失效時間透過一個操作完成。要嘛都成功,要嘛都失敗。實作程式碼如下:
/** * Redis访问工具类 */ @Component public class RedisDao { private static Logger logger = LoggerFactory.getLogger(RedisDao.class); @Autowired private StringRedisTemplate stringRedisTemplate; /** * 设置分布式锁 * @param key 键 * @param value 值 * @param timeout 失效时间 * @return */ public boolean setDistributeLock (String key, String value, long timeout) { RedisConnection connection = null; boolean flag = false; try { //获取一个连接 connection = stringRedisTemplate.getConnectionFactory().getConnection(); //设置分布式锁的同时为锁设置失效时间 connection.set(key.getBytes(), value.getBytes(), Expiration.seconds(timeout), RedisStringCommands.SetOption.SET_IF_ABSENT); flag = true; } catch (Exception e) { logger.error("set automic lock error:", e); } finally { //使用后关闭连接 connection.close(); } return flag; } /** * 查询key的失效时间 * @param key 键 * @param timeUnit 时间单位 * @return */ public long ttl (String key, TimeUnit timeUnit) { return stringRedisTemplate.getExpire(key, timeUnit); } } /** * 单元测试类 */ @RunWith(SpringRunner.class) @SpringBootTest public class Demo1ApplicationTests { private static final Logger LOG = LoggerFactory.getLogger(Demo1ApplicationTests.class); @Autowired private RedisDao redisDao; @Test public void testDistributeLock () { String key = "MyDistributeLock"; //设置分布式锁,失效时间20s boolean result = redisDao.setDistributeLock(key, "1", 20); if (result) { LOG.info("设置分布式锁成功"); long ttl = redisDao.ttl(key, TimeUnit.SECONDS); LOG.info("{}距离失效还有{}s", key, ttl); } } }
執行單元測試類,結果如下:
2019-05-15 13:07:10.827 - 设置分布式锁成功 2019-05-15 13:07:10.838 - MyDistributeLock距离失效还有19s
更多Redis相關知識,請造訪Redis使用教學欄位!
以上是如何使用redis實現分散式鎖的詳細內容。更多資訊請關注PHP中文網其他相關文章!