Dubbo中服务注册与发现实现原理
我们知道,现在⼀般微服务为了更好的管理都会提供对应的服务注册与服务发现机制,Dubbo作为⼀个RPC框架和微服务组件也提供了服务注册和服务发现机制,接下来我们看看Dubbo是怎么实现的
服务注册
我们还是基于之前
我们知道,在对外发布服务的时候是通过port来进⾏服务的暴露发布的,⽽服务注册的时机也是在这个时
候,port,会调⽤doExportUrls来进⾏服务发布和注册:
private void doExportUrls(){
ServiceRepository repository = ServiceRepository();
ServiceDescriptor serviceDescriptor = isterService(getInterfaceClass());
getUniqueServiceName(),
ref,
serviceDescriptor,
this,
serviceMetadata
);
微服务注册中心有哪些
List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this,true);
for(ProtocolConfig protocolConfig : protocols){
String pathKey = URL.buildKey(getContextPath(protocolConfig)
.map(p -> p +"/"+ path)
.orElse(path), group, version);
/
/ In case user specified path, register service one more time to map it to path.
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
List registryURLs = ConfigValidationUtils.loadRegistries(this, true);
这个⽅法会判断当前是否是有注册中⼼的配置,系统是否配置了如下配置:
<=beijing
那么这时候会重新⽣成基于注册中⼼的URL,会将URL.protocol设置为registry,这个时候暴露服务的URL就改变了,我们接着往下看:
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs){
Map<String, String> map =buildAttributes(protocolConfig);
URL url =buildUrl(protocolConfig, registryURLs, map);
exportUrl(url, registryURLs);
}
private void exportUrl(URL url, List<URL> registryURLs){
String scope = Parameter(SCOPE_KEY);
if(!SCOPE_NONE.equalsIgnoreCase(scope)){
if(!SCOPE_REMOTE.equalsIgnoreCase(scope)){
exportLocal(url);
}
if(!SCOPE_LOCAL.equalsIgnoreCase(scope)){
url =exportRemote(url, registryURLs);
MetadataUtils.publishServiceDefinition(url);
}
}
this.urls.add(url);
}
private URL exportRemote(URL url, List<URL> registryURLs){
if(CollectionUtils.isNotEmpty(registryURLs)){
for(URL registryURL : registryURLs){
if(SERVICE_REGISTRY_PROTOCOL.Protocol())){
url = url.addParameterIfAbsent(SERVICE_NAME_MAPPING_KEY,"true");
}
if(LOCAL_PROTOCOL.Protocol())){
continue;
}
url = url.addParameterIfAbsent(DYNAMIC_KEY, Parameter(DYNAMIC_KEY));                URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL);
if(monitorUrl !=null){
url = url.putAttribute(MONITOR_KEY, monitorUrl);
}
String proxy = Parameter(PROXY_KEY);
if(StringUtils.isNotEmpty(proxy)){
registryURL = registryURL.addParameter(PROXY_KEY, proxy);
}
doExportUrl(registryURL.putAttribute(EXPORT_KEY, url),true);
}
}else{
if(Name().ServiceInterface())){
MetadataUtils.saveMetadataURL(url);
}
doExportUrl(url,true);
}
return url;
}
private void doExportUrl(URL url,boolean withMetaData){
Invoker<?> invoker = Invoker(ref,(Class) interfaceClass, url);
if(withMetaData){
invoker =new DelegateProviderMetaDataInvoker(invoker,this);
}
Exporter<?> exporter = port(invoker);
exporters.add(exporter);
}
可以看到最终通过
来进⾏发布,⽽这⾥的PROTOCOL:
private static final Protocol PROTOCOL = ExtensionLoader(Protocol.class).getAdaptiveExtension();
是Protocol$Adaptive:
public Exporter export(Invoker arg0)throws RpcException {
if(arg0 ==null)throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
Url()==null)throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
URL url = Url();
String extName =( Protocol()==null?"dubbo": Protocol());
if(extName ==null)throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url ("+ String()+") use keys ([protocol])");
Protocol extension =(ExtensionLoader(Protocol.class).g etExtension(extName);
port(arg0);
}
这⾥获取的Protocol则是获取URL中对应的protocol类型,⽽上⾯分析这⾥对应就是的registry,⽽Dubbo中的dubbo-registry中META-INF.dubbo.internal中org.apache.dubbo.rpc.Protocol⽂件中内容如下:
registry=InterfaceCompatibleRegistryProtocol
service-discovery-registry=RegistryProtocol
registry获取到的Protocol为InterfaceCompatibleRegistryProtocol,⽽其暴露服务在其⽗类RegistryProtocol助攻完成:
public<T> Exporter<T>export(final Invoker<T> originInvoker)throws RpcException {
URL registryUrl =getRegistryUrl(originInvoker);
URL providerUrl =getProviderUrl(originInvoker);
final URL overrideSubscribeUrl =getSubscribedOverrideUrl(providerUrl);
final OverrideListener overrideSubscribeListener =new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
providerUrl =overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
final ExporterChangeableWrapper<T> exporter =doLocalExport(originInvoker, providerUrl);
final Registry registry =getRegistry(registryUrl);
final URL registeredProviderUrl =getUrlToRegistry(providerUrl, registryUrl);
boolean register = Parameter(REGISTER_KEY,true);
if(register){
register(registry, registeredProviderUrl);
}
registerStatedUrl(registryUrl, registeredProviderUrl, register);
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
notifyExport(exporter);
return new DestroyableExporter<>(exporter);
}
这⾥⾸先会将服务暴露出去:
final ExporterChangeableWrapper exporter = doLocalExport(originInvoker, providerUrl);
服务暴露成功后,在将服务注册到注册中⼼去:
register(registry, registeredProviderUrl);
private void register(Registry registry, URL registeredProviderUrl){
}
⽽这⾥具体执⾏的`Registry``则通过如下⽅式获得:
Registry registry =getRegistry(registryUrl);
protected Registry getRegistry(final URL registryUrl){
Registry(registryUrl);
}
⽽这⾥的registryFactory是RegistryProtocol⼀个属性,提供了get、set⽅法,我们需要注意的是,这⾥是通过SPI机制来加载RegistryProtocol,在加载RegistryProtocol的时候SPI机制同时会判断其属性有没有set⽅法,如果有set⽅法,会在⽤SPI机制加载属性对应的接⼝实现并注⼊。我们以nacos举例,对应加载的RegistryFactory为NacosRegistryFactory创建的为NacosRegistry.⽽在NacosRegistry通过NacosNamingService将服务信息注册到了Nacos中。
⽽我们来看看注册的时候注册了哪些东西:
public void doRegister(URL url){
final String serviceName =getServiceName(url);
final Instance instance =createInstance(url);
execute(namingService -> isterInstance(serviceName,
getUrl().getGroup(Constants.DEFAULT_GROUP), instance));
}
这⾥获取服务名称逻辑如下:
private String getServiceName(URL url, String category){
return category + SERVICE_NAME_SEPARATOR + ColonSeparatedKey();
}
public String getColonSeparatedKey(){
StringBuilder serviceNameBuilder =new StringBuilder();
serviceNameBuilder.ServiceInterface());
append(serviceNameBuilder, VERSION_KEY,false);
append(serviceNameBuilder, GROUP_KEY,false);
String();
}
可以看到,getServiceName返回的是⼀个多个部分拼接起来的字符串,包含如下部分:
类别,主要有providers,consumers,routers,configurators
接⼝全名称
版本号
分组
这⼏个信息⽤:连接。
⽽Instance创建逻辑如下:
private Instance createInstance(URL url){
// Append default category if absent
String category = Category(DEFAULT_CATEGORY);
URL newURL = url.addParameter(CATEGORY_KEY, category);
newURL = newURL.addParameter(PROTOCOL_KEY, Protocol());
newURL = newURL.addParameter(PATH_KEY, Path());
String ip = Host();
int port = Port();
Instance instance =new Instance();
instance.setIp(ip);
instance.setPort(port);
instance.setMetadata(new HashMap<>(Parameters()));
return instance;
}
然后将上述信息注册到Nacos中,这样就完成了服务端的服务注册。
需要注意的是,如果我们配置register=false的话,那么这⾥是不会注册到注册中⼼的,也就是我们可以在DubboService的属性中配
置register=false这样被注解的服务就不会注册到注册中⼼了。
服务发现
与服务注册类似,服务发现是在fer中实现的,通过之前
的分析,我们发现,消费端启动流程在fer中返回的是⼀个ServiceDiscoveryMigrationInvoker,⽽其触发消费端的注册和消费端的服务订阅则是在interceptInvoker:
protected<T> Invoker<T>interceptInvoker(ClusterInvoker<T> invoker, URL url, URL consumerUrl, URL registryURL){
List<RegistryProtocolListener> listeners =findRegistryProtocolListeners(url);
if(CollectionUtils.isEmpty(listeners)){
return invoker;
}
for(RegistryProtocolListener listener : listeners){
}
return invoker;
}
Dubbo中默认在MigrationRuleListener实现了onRefer的事件处理:
public void onRefer(RegistryProtocol registryProtocol, ClusterInvoker<?> invoker, URL consumerUrl, URL registryURL){
MigrationRuleHandler<?> migrationRuleHandler = handlersputeIfAbsent((MigrationInvoker<?>) invoker, _key ->{ ((MigrationInvoker<?>) invoker).setMigrationRuleListener(this);
return new MigrationRuleHandler<>((MigrationInvoker<?>) invoker, consumerUrl);
});
migrationRuleHandler.doMigrate(rule);
}
⽽这⾥最后会调⽤ServiceDiscoveryInvoker:
public<T> ClusterInvoker<T>getServiceDiscoveryInvoker(Cluster cluster, Registry registry, Class<T> type, URL url){
DynamicDirectory<T> directory =new ServiceDiscoveryRegistryDirectory<>(type, url);
return doCreateInvoker(directory, cluster, registry, type);
}
protected<T> ClusterInvoker<T>doCreateInvoker(DynamicDirectory<T> directory, Cluster cluster, Registry registry, Class<T> type){
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters =new HashMap<String, String>(ConsumerUrl().getParameters());
URL urlToRegistry =new ServiceConfigURL(
<(PROTOCOL_KEY)==null? DUBBO : (PROTOCOL_KEY),
if(directory.isShouldRegister()){
directory.setRegisteredConsumerUrl(urlToRegistry);
}
directory.buildRouterChain(urlToRegistry);
directory.subscribe(toSubscribeUrl(urlToRegistry));
return(ClusterInvoker<T>) cluster.join(directory);
}
可以看到,这⾥有⼀步和服务端⼀样,判断是否需要向注册中⼼注册消费端。
然后通过directory.subscribe去订阅注册中⼼。
public void subscribe(URL url){
setSubscribeUrl(url);
registry.subscribe(url,this);
}
这⾥获取的registry和服务端注册的时候获取的逻辑是⼀样的,

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。