Nacos服务注册的原理
Nacos 服务注册需要具备的能⼒:
服务提供者把⾃⼰的协议地址注册到Nacos server
服务消费者需要从Nacos Server上去查询服务提供者的地址(根据服务名称)
Nacos Server需要感知到服务提供者的上下线的变化
服务消费者需要动态感知到Nacos Server端服务地址的变化
作为注册中⼼所需要的能⼒⼤多如此,我们需要做的是理解各种注册中⼼的独有特性,总结他们的共性。
Nacos的实现原理:
下⾯我们先来了解⼀下 Nacos 注册中⼼的实现原理,通过下⾯这张图来说明。
图中的流程是⼤家所熟悉的,不同的是在Nacos 中,服务注册时在服务端本地会通过轮询注册中⼼集节点地址进⾏服务得注册,在注册中⼼上,即Nacos Server上采⽤了Map保存实例信息,当然配置了持久化的服务会被保存到数据库中,在服务的调⽤⽅,为了保证本地服务实例列表的动态感知,Nacos与其他注册中⼼不同的是,采⽤了
Pull/Push同时运作的⽅式。通过这些我们对Nacos注册中⼼的原理有了⼀定的了解。我们从源码层⾯去验证这些理论知识。
Nacos的源码分析(结合spring-cloud-alibaba +dubbo +nacos 的整合):
服务注册的流程:
在基于Dubbo服务发布的过程中,⾃动装配是⾛的事件监听机制,在 DubboServiceRegistrationNonWebApplicationAutoConfiguration 这个类中,这个类会监听 ApplicationStartedEvent 事件,这个事件是spring boot在2.0新增的,就是当spring boot应⽤启动完成之后会发布这个时间。⽽此时监听到这个事件之后,会触发注册的动作。
@EventListener(ApplicationStartedEvent.class)
public void onApplicationStarted() {
setServerPort();
register();
}
private void register() {
if (registered) {
return;
}
registered = true;
}
this.serviceRegistry。是spring-cloud提供的接⼝实现(org.springframework.cloud.client.serviceregistry.ServiceRegistry).很显然注⼊的实例是: NacosServiceRegistry
然后进⼊到实现类的注册⽅法:
@Override
public void register(Registration registration) {
if (StringUtils.ServiceId())) {
log.warn("No service to register for ");
return;
}
//对应当前应⽤的application.name
String serviceId = ServiceId();
//表⽰nacos上的分组配置
String group = Group();
/
/表⽰服务实例信息
Instance instance = getNacosInstanceFromRegistration(registration);
try {
//通过命名服务进⾏注册
log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
}
catch (Exception e) {
<("nacos registry, {} {},", serviceId,
/
/ rethrow a RuntimeException if the registration is failed.
// issue : github/alibaba/spring-cloud-alibaba/issues/1132
rethrowRuntimeException(e);
}
}
接下去就是开始注册实例,主要做两个动作
1. 如果当前注册的是临时节点,则构建⼼跳信息,通过beat反应堆来构建⼼跳任务
2. 调⽤registerService发起服务注册
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
////是否是临时节点,如果是临时节点,则构建⼼跳信息
if (instance.isEphemeral()) {
BeatInfo beatInfo = new BeatInfo();
beatInfo.GroupedName(serviceName, groupName));
beatInfo.Ip());
beatInfo.Port());
beatInfo.ClusterName());
beatInfo.Weight());
beatInfo.Metadata());
beatInfo.setScheduled(false);
//beatReactor,添加⼼跳信息进⾏处理
beatReactor.GroupedName(serviceName, groupName), beatInfo);
}
//调⽤服务代理类进⾏注册
}
然后调⽤ NamingProxy 的注册⽅法进⾏注册,代码逻辑很简单,构建请求参数,发起请求。
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",
namespaceId, serviceName, instance);
final Map<String, String> params = new HashMap<String, String>(8);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put(CommonParams.GROUP_NAME, groupName);
params.put(CommonParams.CLUSTER_NAME, ClusterName());
params.put("ip", Ip());
params.put("port", String.Port()));
params.put("weight", String.Weight()));
params.put("enable", String.valueOf(instance.isEnabled()));
params.put("healthy", String.valueOf(instance.isHealthy()));
params.put("ephemeral", String.valueOf(instance.isEphemeral()));
params.put("metadata", Metadata()));
reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);
}
往下⾛我们就会发现上⾯提到的,服务在进⾏注册的时候会轮询配置好的注册中⼼的地址:
public String reqAPI(String api, Map<String, String> params, List<String> servers, String method) {
params.put(CommonParams.NAMESPACE_ID, getNamespaceId());
if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(nacosDomain)) {
throw new IllegalArgumentException("no server available");
}
Exception exception = new Exception();
//如果服务地址不为空
if (servers != null && !servers.isEmpty()) {
//随机获取⼀台服务器节点
Random random = new Random(System.currentTimeMillis());
int index = Int(servers.size());
// 遍历服务列表
for (int i = 0; i < servers.size(); i++) {
String server = (index);//获得索引位置的服务节点
try {//调⽤指定服务
return callServer(api, params, server, method);
springboot原理流程} catch (NacosException e) {
exception = e;
("request {} failed.", server, e);
} catch (Exception e) {
exception = e;
("request {} failed.", server, e);
}
//轮询
index = (index + 1) % servers.size();
}
// ..........
}
最后通过 callServer(api, params, server, method) 发起调⽤,这⾥通过 JSK⾃带的 HttpURLConnection 进⾏发起调⽤。我们可以通过断点的⽅式来看到这⾥的请求参数:
期间可能会有多个 GET的请求获取服务列表,是正常的,会发现有如上的⼀个请求,会调⽤ 192.168.200.1:8848/nacos/v1/ns/instance 这个地址。那么接下去就是Nacos Server 接受到服务端的注册请求的处理流程。需要下载Nacos Server 源码,
源码下载可以参考:
Nacos服务端的处理:
服务端提供了⼀个InstanceController类,在这个类中提供了服务注册相关的API,⽽服务端发起注册时,调⽤的接⼝是: [post]: /nacos/v1/ns/instance ,serviceName: 代表客户端的项⽬名称,namespace:
nacos 的namespace。
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
final String serviceName = quired(request, CommonParams.SERVICE_NAME);
final String namespaceId = WebUtils
.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
// 从请求中解析出instance实例
final Instance instance = parseInstance(request);
return "ok";
}
然后调⽤ ServiceManager 进⾏服务的注册
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
//创建⼀个空服务,在Nacos控制台服务列表展⽰的服务信息,实际上是初始化⼀个serviceMap,它是⼀个ConcurrentHashMap集合
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
//从serviceMap中,根据namespaceId和serviceName得到⼀个服务对象
Service service = getService(namespaceId, serviceName);
if (service == null) {
throw new NacosException(NacosException.INVALID_PARAM,
"service not found, namespace: " + namespaceId + ", service: " + serviceName);
}
//调⽤addInstance创建⼀个服务实例
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
在创建空的服务实例的时候我们发现了存储实例的map:
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
throws NacosException {
//从serviceMap中获取服务对象
Service service = getService(namespaceId, serviceName);
if (service == null) {//如果为空。则初始化
Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
service = new Service();
service.setName(serviceName);
service.setNamespaceId(namespaceId);
service.GroupName(serviceName));
// now validate the service. if failed, exception will be thrown
service.setLastModifiedMillis(System.currentTimeMillis());
alculateChecksum();
if (cluster != null) {
cluster.setService(service);
ClusterMap().Name(), cluster);
}
service.validate();
putServiceAndInit(service);
if (!local) {
addOrReplaceService(service);
}
}
在 getService ⽅法中我们发现了Map:
/**
* Map(namespace, Map(group::serviceName, Service)).
*/
private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
通过注释我们可以知道,Nacos是通过不同的 namespace 来维护服务的,⽽每个namespace下有不同的group,不同的group下才有对应的Service ,再通过这个 serviceName 来确定服务实例。 第⼀次进来则会进⼊初始化,初始化完会调⽤ putServiceAndInit
private void putServiceAndInit(Service service) throws NacosException {
putService(service);//把服务信息保存到serviceMap集合
service.init();//建⽴⼼跳检测机制
//实现数据⼀致性监听,ephemeral(标识服务是否为临时服务,默认是持久化的,也就是true)=true表⽰采⽤raft协议,false表⽰采⽤Distro
consistencyService
.listen(KeyBuilder.NamespaceId(), Name(), true), service);
consistencyService
.
listen(KeyBuilder.NamespaceId(), Name(), false), service);
Loggers.SRV_LOG.info("[NEW-SERVICE] {}", Json());
}
获取到服务以后把服务实例添加到集合中,然后基于⼀致性协议进⾏数据的同步。然后调⽤ addInstance
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, ips)
throws NacosException {
// 组装key
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
// 获取刚刚组装的服务
Service service = getService(namespaceId, serviceName);
synchronized (service) {
List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
Instances instances = new Instances();
instances.setInstanceList(instanceList);
// 也就是上⼀步实现监听的类⾥添加注册服务
consistencyService.put(key, instances);
}
}
然后给服务注册⽅发送注册成功的响应。结束服务注册流程。其中细节后续慢慢分析。
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论