服务注册和发现之Eureka原理篇
概念
在传统应⽤组件间调⽤,通过接⼝规范约束来实现的,从⽽实现不同模块间良好协作;但是被拆分成微服务后,每个微服务实例的数量和⽹络地址都可能动态变化,使得原来硬编码的地址极不⽅便,故需要⼀个中⼼化的组件来进⾏服务的登记和管理。
服务注册中⼼:实现服务治理,管理所有的服务信息和状态。
注册中⼼好处:不⽤关⼼服务提供⽅数量、地址等细节。
注册中⼼技术栈:Eureka、Nacos、Consul、Zookeeper等。
服务注册与发现包括两部分:⼀个是服务器端,另⼀个是客户端
Server是⼀个公共服务,为Client提供服务注册和发现的功能,维护注册到⾃⾝的Client的相关信息,同时提供接⼝给Client获取注册表中其他服务的信息,使得动态变化的Client 能够进⾏服务间的相互调⽤。
Client将⾃⼰的服务信息通过⼀定的⽅式登记到Server上,并在正常范围内维护⾃⼰信息⼀致性,⽅便其
他服务发现⾃⼰,同时可以通过Server获取到⾃⼰依赖的其他服务信息,完成服务调⽤,还内置了负载均衡器,⽤来进⾏基本的负载均衡。
Spring Cloud以Eureka作为服务注册中⼼,是⼀个RESTful风格服务,是服务注册和发现的基础组件,它屏蔽了Server和Client的交互细节,使得开发者将精⼒放在业务上。
使⽤
Eureka单节点搭建
服务端
1. 引⼊pom
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
</dependency>
2. l
eureka:
client:
#是否将⾃⼰注册到Eureka Server,默认为true,由于当前就是server,故⽽设置成false,表明该服务不会向eureka注册⾃⼰的信息
register-with-eureka: false
#是否从eureka server获取注册信息,由于单节点,不需要同步其他节点数据,⽤false
fetch-registry: false
#设置服务注册中⼼的URL,⽤于client和server端交流
service-url:
defaultZone: username:pwd@localhost:7901/eureka/
3. 代码
//启动类增加@EnableEurekaServer 标识该服务为注册中⼼服务端
@SpringBootApplication
public class EurekaServerApplication {
public static void main(String[] args) {
SpringApplication.run(EurekaServerApplication.class, args);
}
}
客户端
1. 引⼊pom
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
2. l
#注册中⼼
eureka:
client:
#设置服务注册中⼼的URL
service-url:
defaultZone: username:pwd@localhost:7901/eureka/
Eureka也⽀持⾼可⽤,集环境搭建与单节点环境类似,需要注意客户端配置地址defaultZone时候,尽量写多个地址(写⼀个也⾏,EurekaServer注册表会⾃动同步,避免极端情况数据同步(Eureka是AP模型)不及时)
原理
注册中⼼本质:存储每个客户端的注册信息,EurekaClient从EurekaServer同步获取服务注册列表,通过⼀定的规则选择⼀个服务进⾏调⽤。Eureka架构图如下:
服务提供者:是⼀个Eureka client,向 Eureka Server注册和更细⾃⼰的信息,同时能从Eureka Server注册表中获取到其他服务的信息。
服务注册中⼼:提供服务注册和发现的功能。每个Eureka Client向Eureka Server注册⾃⼰的信息,也可以通过Eureka Server获取到其他服务的信息达到发现和调⽤其他服务的⽬的。
服务消费者:是⼀个Eureka client,通过Eureka Server获取注册的其他服务信息,从⽽到所需要的服务发起远程调⽤。
注册:服务提供者向Eureka Server端注册⾃⾝的元数据以供服务发现。
续约:通过发送⼼跳到Server以维持和更新注册表中服务实例元数据的有效性。在⼀定时长内,Server没有收到Client的⼼跳信息,将默认下线,会把服务实例信息从注册表中删除。
下线:服务提供⽅在关闭时候主动向Server注销服务实例元数据,这时服务提供⽅实例数据将从Server的注册表中删除。
获取注册表:服务消费者Client向Server请求注册表信息,⽤于服务发现,从⽽发起远程调⽤。
源码
Eureka Client
Eureka Client⼯作流程
Eureka Client通过SpringBoot⾃动装配,加载相关类,META-INF/spring.factories如下配置:
EurekaClientAutoConfiguration:Eureka client⾃动配置类,负责client中关键beans的配置和初始化
RibbonEurekaAutoConfiguration:Ribbon负载均衡相关配置
EurekaDiscoveryClientConfiguration:配置⾃动注册和应⽤的健康检查器
Eureka 相关配置类
//EurekaClientConfigBean:封装了Eureka Client和Eureka Server交互所需要的配置信息
//EurekaInstanceConfigBean:封装了EurekaClient⾃⾝服务实例的配置信息,主要⽤于构建InstanceInfo
public void initComponent(EurekaInstanceConfig config) {
throw new RuntimeException("Failed to initialize ApplicationInfoManager", e);
}
}
Eureka客户端comflix.discovery.DiscoveryClient
该类是Eureka Client核⼼类实现了EurekaClient接⼝(EurekaClient继承LookupService接⼝),关键代码如下:
//构造⽅法
DiscoveryClient(ApplicationInfoManager applicationInfoManager,EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) { //初始化⼀些参数线程池等
...
//1.根据fetch-registry配置参数拉取注册表信息
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
fetchRegistryFromBackup();
}
/
/2.根据register-with-eureka配置参数向服务端注册
if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
try {
if (!register() ) {
throw new IllegalStateException("Registration error at startup. Invalid server response.");
}
} catch (Throwable th) {
<("Registration error at startup: {}", th.getMessage());
throw new IllegalStateException(th);
}
}
/
/3.初始化定时任务①⽤于发送⼼跳②⽤于刷新缓存③按需注册事件向server注册
initScheduledTasks();
...
}
拉取注册表信息
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
if (clientConfig.shouldDisableDelta()
|| (!Strings.RegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (RegisteredApplications().size() == 0)
|| (Version() == -1))
{
...
//全量拉取
getAndStoreFullRegistry();
} else {
//增量拉取
getAndUpdateDelta(applications);
}
applications.ReconcileHashCode());
logTotalInstances();
.
..
}
//拉取⽅式有两种:全量和增量
//1.全量拉取
private void getAndStoreFullRegistry() throws Throwable {
long currentUpdateGeneration = ();
logger.info("Getting all instance registry info from the eureka server");
Applications apps = null;
EurekaHttpResponse<Applications> httpResponse = RegistryRefreshSingleVipAddress() == null
? ())
:
RegistryRefreshSingleVipAddress(), ());
if (StatusCode() == StatusCode()) {
apps = Entity();
}
logger.info("The response status is {}", StatusCode());
if (apps == null) {
<("The application is null for some reason. Not storing this information");
//注意:AtomicLong cas进⾏版本管理
} else if (fetchRegistryGenerationpareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
//保留UP状态服务
localRegionApps.set(this.filterAndShuffle(apps));
logger.debug("Got full registry with apps hashcode {}", AppsHashCode());
} else {
logger.warn("Not updating applications as another thread is updating it already");
}
}
//2.增量拉取
private void getAndUpdateDelta(Applications applications) throws Throwable {
long currentUpdateGeneration = ();
Applications delta = null;
EurekaHttpResponse<Applications> httpResponse = ());
if (StatusCode() == StatusCode()) {
delta = Entity();
}
if (delta == null) {
logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
+ "Hence got the full registry.");
getAndStoreFullRegistry();
} else if (fetchRegistryGenerationpareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
logger.debug("Got delta update with apps hashcode {}", AppsHashCode());
String reconcileHashCode = "";
if (Lock()) {
try {
updateDelta(delta);
reconcileHashCode = getReconcileHashCode(applications);
} finally {
fetchRegistryUpdateLock.unlock();
}
} else {
logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
}
// There is a diff in number of instances for some reason
if (!reconcileHashCode.AppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall
}
} else {
logger.warn("Not updating application delta as another thread is updating it already");
logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", AppsHashCode());
boolean register() throws Throwable {
logger.info(PREFIX + "{}: ", appPathIdentifier);
EurekaHttpResponse<Void> httpResponse;
try {
//发送InstanceInfo信息到Server端
httpResponse = ister(instanceInfo);
} catch (Exception e) {
logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, StatusCode());
}
StatusCode() == Status.StatusCode();
}
初始化三个定时任务
private void initScheduledTasks() {
/
/1.根据fetch-registry配置参数开启定时任务刷新client注册表缓存
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = RegistryFetchIntervalSeconds();
int expBackOffBound = CacheRefreshExecutorExponentialBackOffBound();
cacheRefreshTask = new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
);
scheduler.schedule(
cacheRefreshTask,
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
//2.根据register-with-eureka配置参数开启⼼跳监测
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = LeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = HeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
// Heartbeat timer
heartbeatTask = new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
);
scheduler.schedule(
heartbeatTask,
renewalIntervalInSecs, TimeUnit.SECONDS);
//3.1 注册状态改变,在应⽤状态发⽣变化时,刷新服务实例信息,在服务实例信息发⽣改变时向server注册
// InstanceInfo replicator
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
2); // burstSize
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == Status() ||
InstanceStatus.DOWN == PreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
}
};
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
}
//3.2 定时刷新服务实例信息和检查应⽤状态的变化,在服务实例信息发⽣改变的情况下向server重新发起注册
instanceInfoReplicator.InitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
续租
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
//服务端发送⼼跳包
httpResponse = istrationClient.AppName(), Id(), instanceInfo, null); logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, StatusCode());
if (StatusCode() == Status.StatusCode()) {
REREGISTER_COUNTER.increment();
logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, AppName());
long timestamp = instanceInfo.setIsDirtyWithTime();
//若不存在则发送注册
boolean success = register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
StatusCode() == StatusCode();
}
服务下线
@PreDestroy
@Override
public synchronized void shutdown() {
if (isShutdownpareAndSet(false, true)) {
logger.info("Shutting down DiscoveryClient ...");
if (statusChangeListener != null && applicationInfoManager != null) {
applicationInfoManager.Id());
}
//清除定时任务
cancelScheduledTasks();
//取消注册
// If APPINFO was registered
if (applicationInfoManager != null
&& clientConfig.shouldRegisterWithEureka()
&& clientConfig.shouldUnregisterOnShutdown()) {
restful接口调用实例applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
unregister();
}
//关闭与服务端通讯连接
if (eurekaTransport != null) {
eurekaTransport.shutdown();
}
heartbeatStalenessMonitor.shutdown();
registryStalenessMonitor.shutdown();
Monitors.unregisterObject(this);
logger.info("Completed shut down of DiscoveryClient");
}
}
Eureka Server
Server主要有以下⼏个功能:接受服务注册、接受服务⼼跳、服务剔除、服务下线、集同步、获取注册表中服务实例信息
Eureka Server同时也是⼀个Eureka Client,在不禁⽌Eureka Server的客户端⾏为时,它会向它配置⽂件中的其他Eureka Server进⾏拉取注册表、服务注册和发送⼼跳等操作。
Eureka Server通过SpringBoot⾃动装配,加载相关类,META-INF/spring.factories如下配置:
EurekaServerAutoConfiguration:向spring的bean⼯⼚添加eureka-server相关功能的bean
@Configuration(proxyBeanMethods = false)
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration implements WebMvcConfigurer {
}
EurekaServerAutoConfiguration⽣效条件 EurekaServerMarkerConfiguration.Marker所以服务端需要加上@EnableEurekaServer注解
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {
}
Eureka Server初始化类 EurekaServerInitializerConfiguration
public void start() {
new Thread(() -> {
try {
//初始化eureka的上下⽂环境
EurekaServerInitializerConfiguration.this.servletContext);
log.info("Started Eureka Server");
//发布服务端可注册事件通知
publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
EurekaServerInitializerConfiguration.this.running = true;
//发布Eureka Server已启动事件通知
publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
}
catch (Exception ex) {
/
/ Help!
<("Could not initialize Eureka servlet context", ex);
}
}).start();
}
EurekaServerBootstrap:启动类
public void contextInitialized(ServletContext context) {
try {
//初始化eureka环境
initEurekaEnvironment();
//初始化eureka context 其中两个主要功能如下
initEurekaServerContext();
context.setAttribute(Name(), this.serverContext);
}
catch (Throwable e) {
<("Cannot bootstrap eureka server :", e);
throw new RuntimeException("Cannot bootstrap eureka server :", e);
}
}
protected void initEurekaServerContext() throws Exception {
// For backward compatibility
XStream.PRIORITY_VERY_HIGH);
XStream.PRIORITY_VERY_HIGH);
if (isAws(Info())) {
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论