目前實(shí)現(xiàn)分布式鎖的方式主要有數(shù)據(jù)庫,、Redis和Zookeeper三種,,本文主要闡述利用Redis的相關(guān)命令來實(shí)現(xiàn)分布式鎖。
相關(guān)Redis命令
SETNX
如果當(dāng)前中沒有值,,則將其設(shè)置為并返回1,,否則返回0。
EXPIRE
將設(shè)置為秒后自動(dòng)過期,。
GETSET
將的值設(shè)置為,并返回其原來的舊值,。如果原來沒有舊值,,則返回nil。
EVAL與EVALSHA
Redis 2.6之后支持的功能,,可以將一段lua腳本發(fā)送到Redis服務(wù)器運(yùn)行,。
起——分布式鎖初探
利用SETNX命令的原子性,我們可以簡(jiǎn)單的實(shí)現(xiàn)一個(gè)初步的分布式鎖(這里原理就不詳述了,,直接上偽代碼):
boolean tryLock(String key, int lockSeconds) { if (SETNX key '1' == 1) { EXPIRE key lockSeconds return true } else { return false } } boolean unlock(String key) { DEL key }
tryLock是一個(gè)非阻塞的分布式鎖方法,,在獲得鎖失敗后會(huì)立即返回。如果需要一個(gè)阻塞式的鎖方法,,可以將tryLock方法包裝為輪詢(以一定的時(shí)間間隔來輪詢,,這很重要,否則Redis會(huì)吃不消?。?。
此種方法看似沒有什么問題,但其實(shí)則有一個(gè)漏洞:在加鎖的過程中,,客戶端順序的向Redis服務(wù)器發(fā)送了SETNX和EXPIRE命令,,那么假設(shè)在SETNX命令執(zhí)行完成之后,在EXPIRE命令發(fā)出去之前客戶端發(fā)生崩潰(或客戶端與Redis服務(wù)器的網(wǎng)絡(luò)連接突然斷掉),,導(dǎo)致EXPIRE命令沒有得到執(zhí)行,,其他客戶端將會(huì)發(fā)生永久死鎖!
承——分布式鎖的改進(jìn)
更新:此方法解鎖存在漏洞,,具體見最文后的追加內(nèi)容,。
為解決上面提出的問題,可以在加鎖時(shí)在key中存儲(chǔ)這個(gè)鎖過期的時(shí)間(當(dāng)前客戶端時(shí)間戳+鎖時(shí)間),,然后在獲取鎖失敗時(shí),,取出value與當(dāng)前客戶端時(shí)間進(jìn)行比較,如果確定是已經(jīng)過期的鎖,,則可以確認(rèn)發(fā)生了上面描述的錯(cuò)誤情況,,此時(shí)可以使用DEL清掉這個(gè)key,然后再重新嘗試去獲得這個(gè)鎖,??梢詥??當(dāng)然不可以!如果沒辦法保證DEL操作和下次SETNX操作之間的原子性,,則還是會(huì)產(chǎn)生一個(gè)競(jìng)態(tài)條件,,比如這樣:
C1 DEL key C1 SETNX key C2 DEL key C2 SETNX key
當(dāng)Redis服務(wù)器收到這樣的指令序列時(shí),C1和C2的SETNX都同時(shí)返回了1,,此時(shí)C1和C2都認(rèn)為自己拿到了鎖,,這種情況明顯是不符合預(yù)期的。
為解決這個(gè)問題,,Redis的GETSET命令就派上用場(chǎng)了,。客戶端可以使用GETSET命令去設(shè)置自己的過期時(shí)間,,然后得到的返回值與之前GET到的返回值進(jìn)行比較,,如果不同,則表示這個(gè)過期的鎖被其他客戶端搶占了(此時(shí)GETSET命令其實(shí)已經(jīng)生效,,也就是說key中的過期時(shí)間已經(jīng)被修改,,不過此誤差很小,可以忽略不計(jì)),。
根據(jù)上面的分析思路,,可以得出一個(gè)改進(jìn)后的分布式鎖,這里直接給出Java的實(shí)現(xiàn)代碼:
public class RedisLock { private static final Logger logger = LoggerFactory.getLogger(RedisLock.class); private final StringRedisTemplate stringRedisTemplate; private final byte[] lockKey; public RedisLock(StringRedisTemplate stringRedisTemplate, String lockKey) { this.stringRedisTemplate = stringRedisTemplate; this.lockKey = lockKey.getBytes(); } private boolean tryLock(RedisConnection conn, int lockSeconds) throws Exception { long nowTime = System.currentTimeMillis(); long expireTime = nowTime + lockSeconds * 1000 + 1000; // 容忍不同服務(wù)器時(shí)間有1秒內(nèi)的誤差 if (conn.setNX(lockKey, longToBytes(expireTime))) { conn.expire(lockKey, lockSeconds); return true; } else { byte[] oldValue = conn.get(lockKey); if (oldValue != null && bytesToLong(oldValue) < nowTime) { // 這個(gè)鎖已經(jīng)過期了,,可以獲得它 // PS: 如果setNX和expire之間客戶端發(fā)生崩潰,,可能會(huì)出現(xiàn)這樣的情況 byte[] oldValue2 = conn.getSet(lockKey, longToBytes(expireTime)); if (Arrays.equals(oldValue, oldValue2)) { // 獲得了鎖 conn.expire(lockKey, lockSeconds); return true; } else { // 被別人搶占了鎖(此時(shí)已經(jīng)修改了lockKey中的值,不過誤差很小可以忽略) return false; } } } return false; } /** * 嘗試獲得鎖,,成功返回true,,如果失敗或異常立即返回false * * @param lockSeconds 加鎖的時(shí)間(秒),超過這個(gè)時(shí)間后鎖會(huì)自動(dòng)釋放 */ public boolean tryLock(final int lockSeconds) { return stringRedisTemplate.execute(new RedisCallback() { @Override public Boolean doInRedis(RedisConnection conn) throws DataAccessException { try { return tryLock(conn, lockSeconds); } catch (Exception e) { logger.error('tryLock Error', e); return false; } } }); } /** * 輪詢的方式去獲得鎖,,成功返回true,,超過輪詢次數(shù)或異常返回false * * @param lockSeconds 加鎖的時(shí)間(秒),超過這個(gè)時(shí)間后鎖會(huì)自動(dòng)釋放 * @param tryIntervalMillis 輪詢的時(shí)間間隔(毫秒) * @param maxTryCount 最大的輪詢次數(shù) */ public boolean tryLock(final int lockSeconds, final long tryIntervalMillis, final int maxTryCount) { return stringRedisTemplate.execute(new RedisCallback() { @Override public Boolean doInRedis(RedisConnection conn) throws DataAccessException { int tryCount = 0; while (true) { if (++tryCount >= maxTryCount) { // 獲取鎖超時(shí) return false; } try { if (tryLock(conn, lockSeconds)) { return true; } } catch (Exception e) { logger.error('tryLock Error', e); return false; } try { Thread.sleep(tryIntervalMillis); } catch (InterruptedException e) { logger.error('tryLock interrupted', e); return false; } } } }); } /** * 如果加鎖后的操作比較耗時(shí),,調(diào)用方其實(shí)可以在unlock前根據(jù)時(shí)間判斷下鎖是否已經(jīng)過期 * 如果已經(jīng)過期可以不用調(diào)用,,減少一次請(qǐng)求 */ public void unlock() { stringRedisTemplate.delete(new String(lockKey)); } public byte[] longToBytes(long value) { ByteBuffer buffer = ByteBuffer.allocate(Long.SIZE / Byte.SIZE); buffer.putLong(value); return buffer.array(); } public long bytesToLong(byte[] bytes) { if (bytes.length != Long.SIZE / Byte.SIZE) { throw new IllegalArgumentException('wrong length of bytes!'); } return ByteBuffer.wrap(bytes).getLong(); } }
轉(zhuǎn)——分布式鎖的優(yōu)化
更新:此方法解鎖存在漏洞,具體見本后最后的追加內(nèi)容,。
以上的分布式鎖實(shí)現(xiàn)邏輯已經(jīng)較為復(fù)雜,,涉及到了較多的Redis命令,并使得每一次嘗試加鎖的過程都會(huì)有至少2次的Redis命令執(zhí)行,,這也就意味著至少兩次與Redis服務(wù)器的網(wǎng)絡(luò)通信,。而添加后面復(fù)雜邏輯的原因只是因?yàn)镾ETNX與EXPIRE這兩條命令執(zhí)行的原子性無法得到保證。(有些同學(xué)會(huì)提到Redis的pipeline特性,,此處明顯不適用,,因?yàn)榈诙l指令的執(zhí)行以來與第一條執(zhí)行的結(jié)果,,pipeline無法實(shí)現(xiàn))
另外,上面的分布式鎖還有一個(gè)問題,,那就是服務(wù)器之間時(shí)間同步的問題,。在分布式場(chǎng)景中,多臺(tái)服務(wù)器之間的時(shí)間做到同步是非常困難的,,所以在代碼中我加了1秒的時(shí)間容錯(cuò),,但依賴服務(wù)器時(shí)間的同步還是可能會(huì)不靠譜的。
從Redis 2.6開始,,客戶端可以直接向Redis服務(wù)器提交Lua腳本,,也就是說可以直接在Redis服務(wù)器來執(zhí)行一些較復(fù)雜的邏輯,而此腳本的提交對(duì)于客戶端來說是相對(duì)原子性的,。這恰好解決了我們的問題,!
我們可以用一個(gè)這樣的lua腳本來描述加鎖的邏輯(關(guān)于腳本的提交命令和Redis的相關(guān)規(guī)則可以看https:///commands/eval):
if (redis.call('setnx', KEYS[1], ARGV[1]) == 1) then redis.call('expire', KEYS[1], tonumber(ARGV[2])) return true else return false end
注意:此腳本中命令的執(zhí)行并不是嚴(yán)格意義上的原子性,,如果其中第二條指令EXPIRE執(zhí)行失敗,,整個(gè)腳本執(zhí)行會(huì)返回錯(cuò)誤,但是第一條指令SETNX仍然是已經(jīng)生效的,!不過此種情況基本可以認(rèn)為是Redis服務(wù)器已經(jīng)崩潰(除非是開發(fā)階段就可以排除的參數(shù)錯(cuò)誤之類的問題),,那么鎖的安全性就已經(jīng)不是這里可以關(guān)注的點(diǎn)了。這里認(rèn)為對(duì)客戶端來說是相對(duì)原子性的就足夠了,。
這個(gè)簡(jiǎn)單的腳本在Redis服務(wù)器得到執(zhí)行,,并返回是否得到鎖。因?yàn)槟_本的提交執(zhí)行只有一條Redis命令,,就避免了上面所說的客戶端異常問題,。
使用腳本優(yōu)化了鎖的邏輯和性能,這里給出最終的Java實(shí)現(xiàn)代碼:
public class RedisLock { private static final Logger logger = LoggerFactory.getLogger(RedisLock.class); private final StringRedisTemplate stringRedisTemplate; private final String lockKey; private final List keys; /** * 使用腳本在redis服務(wù)器執(zhí)行這個(gè)邏輯可以在一定程度上保證此操作的原子性 * (即不會(huì)發(fā)生客戶端在執(zhí)行setNX和expire命令之間,,發(fā)生崩潰或失去與服務(wù)器的連接導(dǎo)致expire沒有得到執(zhí)行,,發(fā)生永久死鎖) * * 除非腳本在redis服務(wù)器執(zhí)行時(shí)redis服務(wù)器發(fā)生崩潰,不過此種情況鎖也會(huì)失效 */ private static final RedisScript SETNX_AND_EXPIRE_SCRIPT; static { StringBuilder sb = new StringBuilder(); sb.append('if (redis.call('setnx', KEYS[1], ARGV[1]) == 1) then\n'); sb.append('\tredis.call('expire', KEYS[1], tonumber(ARGV[2]))\n'); sb.append('\treturn true\n'); sb.append('else\n'); sb.append('\treturn false\n'); sb.append('end'); SETNX_AND_EXPIRE_SCRIPT = new RedisScriptImpl(sb.toString(), Boolean.class); } public RedisLock(StringRedisTemplate stringRedisTemplate, String lockKey) { this.stringRedisTemplate = stringRedisTemplate; this.lockKey = lockKey; this.keys = Collections.singletonList(lockKey); } private boolean doTryLock(int lockSeconds) throws Exception { return stringRedisTemplate.execute(SETNX_AND_EXPIRE_SCRIPT, keys, '1', String.valueOf(lockSeconds)); } /** * 嘗試獲得鎖,,成功返回true,,如果失敗立即返回false * * @param lockSeconds 加鎖的時(shí)間(秒),超過這個(gè)時(shí)間后鎖會(huì)自動(dòng)釋放 */ public boolean tryLock(int lockSeconds) { try { return doTryLock(lockSeconds); } catch (Exception e) { logger.error('tryLock Error', e); return false; } } /** * 輪詢的方式去獲得鎖,,成功返回true,,超過輪詢次數(shù)或異常返回false * * @param lockSeconds 加鎖的時(shí)間(秒),超過這個(gè)時(shí)間后鎖會(huì)自動(dòng)釋放 * @param tryIntervalMillis 輪詢的時(shí)間間隔(毫秒) * @param maxTryCount 最大的輪詢次數(shù) */ public boolean tryLock(final int lockSeconds, final long tryIntervalMillis, final int maxTryCount) { int tryCount = 0; while (true) { if (++tryCount >= maxTryCount) { // 獲取鎖超時(shí) return false; } try { if (doTryLock(lockSeconds)) { return true; } } catch (Exception e) { logger.error('tryLock Error', e); return false; } try { Thread.sleep(tryIntervalMillis); } catch (InterruptedException e) { logger.error('tryLock interrupted', e); return false; } } } /** * 如果加鎖后的操作比較耗時(shí),,調(diào)用方其實(shí)可以在unlock前根據(jù)時(shí)間判斷下鎖是否已經(jīng)過期 * 如果已經(jīng)過期可以不用調(diào)用,,減少一次請(qǐng)求 */ public void unlock() { stringRedisTemplate.delete(lockKey); } private static class RedisScriptImpl implements RedisScript { private final String script; private final String sha1; private final Class resultType; public RedisScriptImpl(String script, Class resultType) { this.script = script; this.sha1 = DigestUtils.sha1DigestAsHex(script); this.resultType = resultType; } @Override public String getSha1() { return sha1; } @Override public Class getResultType() { return resultType; } @Override public String getScriptAsString() { return script; } } }
合——小節(jié)
最后,此文內(nèi)容只是筆者自己學(xué)習(xí)折騰出來的結(jié)果,,如果還有什么筆者沒有考慮到的bug存在,,還請(qǐng)不吝指出,,大家一起學(xué)習(xí)進(jìn)步~
追——解鎖漏洞(更新)
經(jīng)過慎重考慮,發(fā)現(xiàn)以上實(shí)現(xiàn)的分布式鎖有一個(gè)較為嚴(yán)重的解鎖漏洞:因?yàn)榻怄i操作只是做了簡(jiǎn)單的DEL KEY,,如果某客戶端在獲得鎖后執(zhí)行業(yè)務(wù)的時(shí)間超過了鎖的過期時(shí)間,,則最后的解鎖操作會(huì)誤解掉其他客戶端的操作。
為解決此問題,,我們?cè)趧?chuàng)建RedisLock對(duì)象時(shí)用本機(jī)時(shí)間戳和UUID來創(chuàng)建一個(gè)絕對(duì)唯一的lockValue,,然后在加鎖時(shí)存入此值,并在解鎖前用GET取出值進(jìn)行比較,,如果匹配才做DEL,。這里依然需要用LUA腳本保證整個(gè)解鎖過程的原子性。
這里給出修復(fù)此漏洞并做了一些小優(yōu)化之后的代碼:
import java.util.Collections; import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.script.DigestUtils; import org.springframework.data.redis.core.script.RedisScript; /** * Created On 10/24 2017 * Redis實(shí)現(xiàn)的分布式鎖(不可重入) * 此對(duì)象非線程安全,,使用時(shí)務(wù)必注意 */ public class RedisLock { private static final Logger logger = LoggerFactory.getLogger(RedisLock.class); private final StringRedisTemplate stringRedisTemplate; private final String lockKey; private final String lockValue; private boolean locked = false; /** * 使用腳本在redis服務(wù)器執(zhí)行這個(gè)邏輯可以在一定程度上保證此操作的原子性 * (即不會(huì)發(fā)生客戶端在執(zhí)行setNX和expire命令之間,,發(fā)生崩潰或失去與服務(wù)器的連接導(dǎo)致expire沒有得到執(zhí)行,發(fā)生永久死鎖) * * 除非腳本在redis服務(wù)器執(zhí)行時(shí)redis服務(wù)器發(fā)生崩潰,,不過此種情況鎖也會(huì)失效 */ private static final RedisScript SETNX_AND_EXPIRE_SCRIPT; static { StringBuilder sb = new StringBuilder(); sb.append('if (redis.call('setnx', KEYS[1], ARGV[1]) == 1) then\n'); sb.append('\tredis.call('expire', KEYS[1], tonumber(ARGV[2]))\n'); sb.append('\treturn true\n'); sb.append('else\n'); sb.append('\treturn false\n'); sb.append('end'); SETNX_AND_EXPIRE_SCRIPT = new RedisScriptImpl(sb.toString(), Boolean.class); } private static final RedisScript DEL_IF_GET_EQUALS; static { StringBuilder sb = new StringBuilder(); sb.append('if (redis.call('get', KEYS[1]) == ARGV[1]) then\n'); sb.append('\tredis.call('del', KEYS[1])\n'); sb.append('\treturn true\n'); sb.append('else\n'); sb.append('\treturn false\n'); sb.append('end'); DEL_IF_GET_EQUALS = new RedisScriptImpl(sb.toString(), Boolean.class); } public RedisLock(StringRedisTemplate stringRedisTemplate, String lockKey) { this.stringRedisTemplate = stringRedisTemplate; this.lockKey = lockKey; this.lockValue = UUID.randomUUID().toString() + '.' + System.currentTimeMillis(); } private boolean doTryLock(int lockSeconds) throws Exception { if (locked) { throw new IllegalStateException('already locked!'); } locked = stringRedisTemplate.execute(SETNX_AND_EXPIRE_SCRIPT, Collections.singletonList(lockKey), lockValue, String.valueOf(lockSeconds)); return locked; } /** * 嘗試獲得鎖,,成功返回true,如果失敗立即返回false * * @param lockSeconds 加鎖的時(shí)間(秒),,超過這個(gè)時(shí)間后鎖會(huì)自動(dòng)釋放 */ public boolean tryLock(int lockSeconds) { try { return doTryLock(lockSeconds); } catch (Exception e) { logger.error('tryLock Error', e); return false; } } /** * 輪詢的方式去獲得鎖,,成功返回true,超過輪詢次數(shù)或異常返回false * * @param lockSeconds 加鎖的時(shí)間(秒),,超過這個(gè)時(shí)間后鎖會(huì)自動(dòng)釋放 * @param tryIntervalMillis 輪詢的時(shí)間間隔(毫秒) * @param maxTryCount 最大的輪詢次數(shù) */ public boolean tryLock(final int lockSeconds, final long tryIntervalMillis, final int maxTryCount) { int tryCount = 0; while (true) { if (++tryCount >= maxTryCount) { // 獲取鎖超時(shí) return false; } try { if (doTryLock(lockSeconds)) { return true; } } catch (Exception e) { logger.error('tryLock Error', e); return false; } try { Thread.sleep(tryIntervalMillis); } catch (InterruptedException e) { logger.error('tryLock interrupted', e); return false; } } } /** * 解鎖操作 */ public void unlock() { if (!locked) { throw new IllegalStateException('not locked yet!'); } locked = false; // 忽略結(jié)果 stringRedisTemplate.execute(DEL_IF_GET_EQUALS, Collections.singletonList(lockKey), lockValue); } private static class RedisScriptImpl implements RedisScript { private final String script; private final String sha1; private final Class resultType; public RedisScriptImpl(String script, Class resultType) { this.script = script; this.sha1 = DigestUtils.sha1DigestAsHex(script); this.resultType = resultType; } @Override public String getSha1() { return sha1; } @Override public Class getResultType() { return resultType; } @Override public String getScriptAsString() { return script; } } }
出處:http:///2017/10/25/redis-distributed-lock/
|