Redis做分布式锁的常见问题和解决⽅案
redis 做分布式锁的三个核⼼要素:
1、加锁
最简单的命令是setnx,key是锁的唯⼀标识,按业务来决定命名,value为当前线程的线程ID。当⼀个线程执⾏setnx返回1,说明key原本不存在,该线程成功得到了锁,当其他线程执⾏setnx返回0,说明key已经存在,该线程抢锁失败。
2、解锁
当得到锁的线程执⾏完任务,需要释放锁,以便其他线程可以进⼊。释放锁的最简单⽅式是执⾏del指令。
3、锁超时
如果⼀个得到锁的线程在执⾏任务的过程中挂掉,来不及显式地释放锁,这块资源将会永远被锁住,别的线程再也别想进来。所以,setnx 的key必须设置⼀个超时时间,以保证即使没有被显式释放,这把锁也要在⼀定时间后⾃动释放。setnx不⽀持超时参数,所以需要额外的指令,
expire(key, time):
Redis做分布式可能出现的问题:
1、 setnx和expire的⾮原⼦性
2、超时后使⽤del 导致误删其他线程的锁。
A线程持有锁,但是因为任务运⾏耗时较长,锁过期了。B线程获取到锁,B还没执⾏完,但是A执⾏完了,锁被释放掉,误删除。
3、并发的可靠性问题
解决办法:
1、java中jedisCluster客户端,提供
set(final String key, final String value, final String nxxx, final String expx, final long time)
相当于是setnx和expire的组合包装,但是具有原⼦性。
2、基于Redis的分布式锁框架redisson。
Redisson是⼀个企业级的开源Redis Client,也提供了分布式锁的⽀持。
加锁机制
线程去获取锁,获取成功: 执⾏lua脚本,保存数据到redis数据库。
线程去获取锁,获取失败: ⼀直通过while循环尝试获取锁,获取成功后,执⾏lua脚本,保存数据到redis数据库。
watch dog⾃动延期机制
⼯作线程未完成任务,但是到了过期时间,还想延长,可以通过看门狗机制,不断延长锁的过期时间。
Redision实现分布式锁的原理
@Override
public RLock getLock(String name) {
return new CommandExecutor(), name);
}
拿到实例后进⾏锁定
// 获取锁
@Override
public void lock() {
try {
lockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
lockInterruptibly(-1, null);
}
// 可中断的获取锁
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
// 获取当前线程的id
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
// 采⽤不断循环的⽅式获取锁
while (true) {
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
unsubscribe(future, threadId);
}
}
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(leaseTime, unit, threadId));
}
// 异步⽅式获取锁,
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture<Long> ttlRemainingFuture = ConnectionManager
().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLI ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
return;
}
}
Long ttlRemaining = Now();
// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
// ⽤lua 脚本保证Redis事务特性
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = Millis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
⾮常重要的⼀点,Redisson在获取锁的时候,采⽤信号量竞争机制,也就是多个线程获取锁,只有⼀个线程获取到锁,其他的线程会进⼊阻塞状态,防⽌⽆效的轮询⽽浪费资源。所以,接着看订阅scribe函数
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
protected RFuture<RedissonLockEntry> subscribe(long threadId) {
return PUBSUB.subscribe(getEntryName(), getChannelName(), ConnectionManager().getSubscribeService()); }
public RFuture<E> subscribe(final String entryName, final String channelName, final PublishSubscribeService subscribeService) {
// 原⼦类⾃增
final AtomicReference<Runnable> listenerHolder = new AtomicReference<Runnable>();
// 设定信号量
final AsyncSemaphore semaphore = Semaphore(new ChannelName(channelName));
final RPromise<E> newPromise = new RedissonPromise<E>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
());
}
};
Runnable listener = new Runnable() {
@Override
public void run() {
E entry = (entryName);
if (entry != null) {
entry.aquire();
return;
}
E value = createEntry(newPromise);
value.aquire();
E oldValue = entries.putIfAbsent(entryName, value);
if (oldValue != null) {
oldValue.aquire();
return;
}
RedisPubSubListener<Object> listener = createListener(channelName, value);
subscribeService.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);
}
};
semaphore.acquire(listener);
listenerHolder.set(listener);
return newPromise;
}
接着看如何释放锁:
@Override
public void unlock() {
try {
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (Cause();
} else {
throw e;
}
}
}
}
@Override
public RFuture<Void> unlockAsync(final long threadId) {
final RPromise<Void> result = new RedissonPromise<Void>();
RFuture<Boolean> future = unlockInnerAsync(threadId);
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
cancelExpirationRenewal(threadId);
return;
}
Boolean opStatus = Now();
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + threadId);
return;eval是做什么的
}
if (opStatus) {
cancelExpirationRenewal(null);
}
}
});
return result;
}
// lua 脚本做异步处理,释放Redis锁
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论