在使用分散式鎖進行互斥資源訪問時候,我們很多方案是採用redis的實現。固然,redis的單節點鎖在極端情況也是有問題的,假設你的業務允許偶爾的失效,使用單節點的redis鎖方案就足夠了,簡單而且效率高。redis鎖失效的情況:
客戶端1從master節點獲取了鎖master宕機了,儲存鎖的key還沒來得及同步到slave節點上slave升級為master客戶端2從新的master上獲取到同一個資源的鎖於是,客戶端1和客戶端2同時持有了同一個資源的鎖,鎖的安全性被打破。如果我們不考慮這種極端情況,需要實現一個基於單節點redis鎖的大致流程:
set cache_key random_seed NX PX 30000
上面這個set命令拆解開就是:
setnx cache_key random_seed expire cache_key 30
雖然這兩組命令執行的效果一樣,但是第二個是非原子性操作,如果執行了setnx成功,但是expire失敗的話,就會造成這個key一直存在了,無法釋放的情況。
redis的作者也指出,在使用單節點redis鎖的時候,設定一個隨機種子作為key的值是很有必要的,保證了一個客戶端釋放的鎖必須是自己所持有的那個鎖。假設獲取鎖時set的不是一個隨機數,而是一個固定值,那麼可能會出現下面的情況:
客戶端1獲取鎖成功客戶端1在某個操作上阻塞了很長時間過期時間到了,鎖自動釋放(但是在客戶端1看來自己還是持有鎖中)客戶端2獲取到了對應同一個資源的鎖客戶端1從阻塞中恢復了,釋放掉自己持有的鎖,也就是釋放掉了客戶端2持有的鎖客戶端2的鎖被客戶端1是否,失去安全性。釋放鎖的操作,很多人直接用del命令,這會有很大的問題,保證不了這個key是被加鎖人鎖刪。這時候需要用到隨機數了。釋放鎖的操作有三步:
if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1])else return 0end
這段指令碼在執行的時候,需要把前面的隨機數作為argv[1] 的值傳進去,把cache_key作為keys[1]的值傳進去。
public class RedisLockHelper { @Resource private R2mClusterClient r2mClusterClient; /** * 類似於setNx的功能,同時設定過期時間為expire毫秒 * * @param key 加鎖key * @param value 確保在加鎖時間內的唯一因子 * @param expire 過期時間的毫秒數 * @return */ private String setLock(String key, String value, long expire) { return this.set(key, value, "NX", "PX", expire); } /** * 刪除指定key value * 如果 r2m中 key 對應的value==value 返回 1 * 如果 r2m中 key 對應的value!=value 返回 0 * * @param key * @return */ private boolean atomDelete(String key, String value) { List<String> values = new ArrayList<>(); values.add(value); String sb = "if redis.call('get',KEYS[1])==ARGV[1] then " + " return redis.call('del',KEYS[1]) " + " else " + " return 0" + " end"; if (this.eval(sb, key, values) == 1) { return true; } return false; } private Long eval(String mobel, String key, List<String> value) { return (Long) this.r2mClusterClient.eval(mobel, key, value); } private String set(String key, String value, String nxxx, String expx, long time) { return this.r2mClusterClient.set(key, value, nxxx, expx, time); }}
r2mClusterClient 就是jedis客戶端的封裝。
作者|程式設計師安安|OSCHINA
最新評論