springboot整合curator实现分布式锁
理论篇:
Curator是Netflix开源的⼀套ZooKeeper客户端框架. Netflix在使⽤ZooKeeper的过程中发现ZooKeeper⾃带的客户端太底层, 应⽤⽅在使⽤的时候需要⾃⼰处理很多事情, 于是在它的基础上包装了⼀下, 提供了⼀套更好⽤的客户端框架. Netflix在⽤ZooKeeper的过程中遇到的问题, 我们也遇到了, 所以开始研究⼀下, ⾸先从他在github上的源码, wiki⽂档以及Netflix的技术blog⼊⼿.
看完官⽅的⽂档之后, 发现Curator主要解决了三类问题:
封装ZooKeeper client与ZooKeeper server之间的连接处理;
提供了⼀套Fluent风格的操作API;
提供ZooKeeper各种应⽤场景(recipe, ⽐如共享锁服务, 集领导选举机制)的抽象封装.
Curator列举的ZooKeeper使⽤过程中的⼏个问题
初始化连接的问题: 在client与server之间握⼿建⽴连接的过程中, 如果握⼿失败, 执⾏所有的同步⽅法(⽐如create, getData等)将抛出异常
⾃动恢复(failover)的问题: 当client与⼀台server的连接丢失,并试图去连接另外⼀台server时, client将回到初始连接模式
session过期的问题: 在极端情况下, 出现ZooKeeper session过期, 客户端需要⾃⼰去监听该状态并重新创建ZooKeeper实例 .
对可恢复异常的处理:当在server端创建⼀个有序ZNode, ⽽在将节点名返回给客户端时崩溃, 此时client端抛出可恢复的异常, ⽤户需要⾃⼰捕获这些异常并进⾏重试
使⽤场景的问题:Zookeeper提供了⼀些标准的使⽤场景⽀持, 但是ZooKeeper对这些功能的使⽤说明⽂档很少, ⽽且很容易⽤错. 在⼀些极端场景下如何处理, zk并没有给出详细的⽂档说明. ⽐如共享锁服务, 当服务器端创建临时顺序节点成功, 但是在客户端接收到节点名之前挂掉了,如果不能很好的处理这种情况, 将导致死锁.
Curator主要从以下⼏个⽅⾯降低了zk使⽤的复杂性:
重试机制:提供可插拔的重试机制, 它将给捕获所有可恢复的异常配置⼀个重试策略, 并且内部也提供了⼏种标准的重试策略(⽐如指数补偿). 连接状态监控: Curator初始化之后会⼀直的对zk连接进⾏监听, ⼀旦发现连接状态发⽣变化, 将作出相应的处理.
zk客户端实例管理:Curator对zk客户端到server集连接进⾏管理. 并在需要的情况, 重建zk实例, 保证与zk集的可靠连接
各种使⽤场景⽀持:Curator实现zk⽀持的⼤部分使⽤场景⽀持(甚⾄包括zk⾃⾝不⽀持的场景), 这些实现都遵循了zk的最佳实践, 并考虑了各种极端情况.
Curator通过以上的处理, 让⽤户专注于⾃⾝的业务本⾝, ⽽⽆需花费更多的精⼒在zk本⾝.
实操篇:
CuratorFrameworkFactory类提供了两个⽅法, ⼀个⼯⼚⽅法newClient, ⼀个构建⽅法build. 使⽤⼯⼚⽅法newClient可以创建⼀个默认的实例, ⽽build构建⽅法可以对实例进⾏定制. 当CuratorFramework实例构建完成, 紧接着调⽤start()⽅法, 在应⽤结束的时候, 需要调⽤close()⽅法.  CuratorFramework是线程安全的. 在⼀个应⽤中可以共享同⼀个zk集的CuratorFramework.
核⼼对象CuratorFramework的创建如下:
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
springboot原理pdf
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
client.start();
需要使⽤分布式锁的地⽅,代码如下:
String lockOn= "test";
InterProcessMutex mutex = new InterProcessMutex(curatorFramework,lockOn);
boolean locked =mutex.acquire(0,TimeUnit.SECONDS);
//finally部分
分布式锁常⽤于定时任务,使⽤⾃定义注解,使⽤spring aspect around,在真正的代码执⾏之前尝试获取锁,获取不到直接退出,获取到锁的,执⾏具体业务,代码如下:
@Aspect
public class DistributedLockAspect{
@Pointcut("@annotation(com.**.**.DistributedLock")
public void methodAspect(){};
@Around("methodAspect()")
public Object execute(ProceedingJoinPoint joinPoint) throws Exception{
String lockPath = "/opt/zookeeper/lock";
InterProcessMutex mutex = new InterProcessMutex(cruatorFramework,lockPath);
try{
boolean locked = mutex.acquire(0,TimeUnit.SECONDS);
if(!locked){
return null;
}else{
return joinPoint.proceed();
}
}catch(Exception e){
e.printStackTrace();
}finally{
}
}
}
⾃定义注解:
1 @Target(ElementType.METHOD)
2 @Retention(RetentionPolicy.RUNTIME)
3public @interface DistributedLock{
4    String lockPath();
5 }
注意事项:
1.  CuratorFramework对象建议在应⽤中做单例处理,在具体使⽤处注⼊使⽤, 并在应⽤结束前销毁,代码如下:@Configration
public class CuratorConfigration{
@Bean
public CuratorFramework initCuratorFramework(){
//忽略
// 参照前⾯ CuratorFramework 对象创建部分
}
}
2. 在aspect部分将curatorFramework对象进⾏关闭
@PreDestroy
public void destroy(){
CloseableUtils.closeQuietly(curatorFramework);
}

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。