Analysis of registered service instances dropping off Nacos
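Recently, in production, an executor of the xxl_job framework (which acts as a Nacos client) was given too little memory. As a result its connection to the Nacos server kept dropping. After each disconnection the client service itself stayed up, but it could no longer register with the Nacos server again.
Given this, I traced the Nacos source code for client registration and heartbeat checking.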
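1: First, go to the Nacos official website and read its introduction to heartbeats.
Then open the Nacos source code (downloaded from GitHub) in IDEA. Search the whole project for /instance/beat to find the method that sends the heartbeat, NamingProxy.sendBeat on the client side: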
```java
public JSONObject sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {

    if (NAMING_LOGGER.isDebugEnabled()) {
        NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());
    }
    Map<String, String> params = new HashMap<String, String>(8);
    String body = StringUtils.EMPTY;
    if (!lightBeatEnabled) {
        // a full beat carries the whole BeatInfo as URL-encoded JSON in the body
        try {
            body = "beat=" + URLEncoder.encode(JSON.toJSONString(beatInfo), "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new NacosException(NacosException.SERVER_ERROR, "encode beatInfo error", e);
        }
    }
    // What is the namespace for? It isolates services (e.g. per environment or tenant);
    // the default namespace is "public".
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
    params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());
    params.put("ip", beatInfo.getIp());
    params.put("port", String.valueOf(beatInfo.getPort()));
    String result = reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/beat", params, body, HttpMethod.PUT);
    return JSON.parseObject(result);
}
```
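The following standalone sketch makes the request shape concrete. It shows what sendBeat assembles: the query parameters and, only when light beats are disabled, a URL-encoded `beat=` body. It does not talk to Nacos. The class and method names are hypothetical, and the literal parameter keys are assumed to mirror the `CommonParams` constants used above.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper mirroring what NamingProxy.sendBeat puts on the wire.
// The literal keys ("namespaceId", "serviceName", "clusterName") are ASSUMED
// to match the CommonParams constants in the Nacos source.
final class BeatRequestSketch {

    private BeatRequestSketch() {
    }

    // Query parameters sent with every beat, light or full.
    static Map<String, String> buildParams(String namespaceId, String serviceName,
                                           String cluster, String ip, int port) {
        Map<String, String> params = new HashMap<>(8);
        params.put("namespaceId", namespaceId);
        params.put("serviceName", serviceName);
        params.put("clusterName", cluster);
        params.put("ip", ip);
        params.put("port", String.valueOf(port));
        return params;
    }

    // A light beat has an empty body, so the server only sees ip/port/cluster
    // from the query string; a full beat carries the URL-encoded BeatInfo JSON.
    static String buildBody(String beatInfoJson, boolean lightBeatEnabled) {
        if (lightBeatEnabled) {
            return "";
        }
        try {
            return "beat=" + URLEncoder.encode(beatInfoJson, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // Every JVM supports UTF-8, so this should never happen.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // "demo-service" and the address are illustrative values only.
        Map<String, String> params = buildParams("public", "demo-service", "DEFAULT", "10.0.0.5", 9999);
        System.out.println(params);
        System.out.println(buildBody("{\"ip\":\"10.0.0.5\",\"port\":9999}", false));
    }
}
```

The light/full distinction matters on the server side: only a full beat gives the server enough data to rebuild an instance it no longer has.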
On the server side, InstanceController.beat handles the client's heartbeat as follows:
```java
@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public JSONObject beat(HttpServletRequest request) throws Exception {

    JSONObject result = new JSONObject();

    result.put("clientBeatInterval", switchDomain.getClientBeatInterval());
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID,
        Constants.DEFAULT_NAMESPACE_ID);
    String clusterName = WebUtils.optional(request, CommonParams.CLUSTER_NAME,
        UtilsAndCommons.DEFAULT_CLUSTER_NAME);
    String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
    int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
    // full BeatInfo JSON; empty when the client sends a light beat
    String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);

    RsInfo clientBeat = null;
    if (StringUtils.isNotBlank(beat)) {
        clientBeat = JSON.parseObject(beat, RsInfo.class);
    }

    if (clientBeat != null) {
        if (StringUtils.isNotBlank(clientBeat.getCluster())) {
            clusterName = clientBeat.getCluster();
        } else {
            // fix #2533
            clientBeat.setCluster(clusterName);
        }
        ip = clientBeat.getIp();
        port = clientBeat.getPort();
    }

    if (Loggers.SRV_LOG.isDebugEnabled()) {
        Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);
    }

    Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);

    if (instance == null) {
        if (clientBeat == null) {
            // light beat for an instance the server no longer knows: report "not found"
            result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
            return result;
        }
        // full beat: rebuild the instance from the beat and register it again
        instance = new Instance();
        instance.setPort(clientBeat.getPort());
        instance.setIp(clientBeat.getIp());
        instance.setWeight(clientBeat.getWeight());
        instance.setMetadata(clientBeat.getMetadata());
        instance.setClusterName(clusterName);
        instance.setServiceName(serviceName);
        instance.setInstanceId(instance.getInstanceId());
        instance.setEphemeral(clientBeat.isEphemeral());

        serviceManager.registerInstance(namespaceId, serviceName, instance);
    }

    Service service = serviceManager.getService(namespaceId, serviceName);

    if (service == null) {
        throw new NacosException(NacosException.SERVER_ERROR,
            "service not found: " + serviceName + "@" + namespaceId);
    }
    if (clientBeat == null) {
        clientBeat = new RsInfo();
        clientBeat.setIp(ip);
        clientBeat.setPort(port);
        clientBeat.setCluster(clusterName);
    }
    service.processClientBeat(clientBeat);

    result.put(CommonParams.CODE, NamingResponseCode.OK);
    result.put("clientBeatInterval", instance.getInstanceHeartBeatInterval());
    result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
    return result;
}
```
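The branch that matters for the problem described at the top is `instance == null`. A beat that carries a full `beat` body lets the server re-register the instance. A light beat for an instance the server no longer knows only gets `RESOURCE_NOT_FOUND`. The sketch below models just that decision with an in-memory map. The registry, the `ip:port` key and the `Outcome` enum are hypothetical simplifications, not Nacos types.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of the instance == null branch in InstanceController.beat.
// The registry map, key format and Outcome enum are HYPOTHETICAL; Nacos uses
// ServiceManager and NamingResponseCode instead.
final class BeatHandlerSketch {

    enum Outcome { RESOURCE_NOT_FOUND, REREGISTERED, REFRESHED }

    // Registered instances keyed by "ip:port"; the value is the last beat time in ms.
    private final Map<String, Long> registry = new HashMap<>();

    Outcome handleBeat(String ip, int port, boolean hasFullBeatBody, long now) {
        String key = ip + ":" + port;
        if (!registry.containsKey(key)) {
            if (!hasFullBeatBody) {
                // Light beat for an unknown instance: nothing to rebuild it from.
                return Outcome.RESOURCE_NOT_FOUND;
            }
            // Full beat: rebuild and register the instance, then record the beat.
            registry.put(key, now);
            return Outcome.REREGISTERED;
        }
        // Known instance: the beat just refreshes its last-beat time.
        registry.put(key, now);
        return Outcome.REFRESHED;
    }

    boolean isRegistered(String ip, int port) {
        return registry.containsKey(ip + ":" + port);
    }

    public static void main(String[] args) {
        BeatHandlerSketch server = new BeatHandlerSketch();
        System.out.println(server.handleBeat("10.0.0.5", 9999, false, 1_000L)); // RESOURCE_NOT_FOUND
        System.out.println(server.handleBeat("10.0.0.5", 9999, true, 2_000L));  // REREGISTERED
        System.out.println(server.handleBeat("10.0.0.5", 9999, false, 3_000L)); // REFRESHED
    }
}
```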
Next, let's follow service.processClientBeat(clientBeat) to see what its logic actually does:
```java
public void processClientBeat(final RsInfo rsInfo) {
    ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
    clientBeatProcessor.setService(this);
    clientBeatProcessor.setRsInfo(rsInfo);
    HealthCheckReactor.scheduleNow(clientBeatProcessor);
}
```
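processClientBeat does no work on the request thread. It wraps the beat in a task and hands it to HealthCheckReactor.scheduleNow. Below is a minimal sketch of that hand-off pattern using a plain `ScheduledExecutorService`. The class name, the single worker thread and the zero-delay scheduling are assumptions for illustration, not a copy of HealthCheckReactor.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

// HYPOTHETICAL stand-in for HealthCheckReactor.scheduleNow: the caller (the
// HTTP handler) returns at once and the beat is processed on a worker thread.
final class ScheduleNowSketch {

    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "client-beat-processor");
                t.setDaemon(true); // do not keep the JVM alive for this demo
                return t;
            });

    // Stands in for Instance.setLastBeat; -1 means "no beat processed yet".
    final AtomicLong lastBeat = new AtomicLong(-1L);

    // Submit the beat with zero delay and return immediately.
    Future<?> scheduleNow(long beatTimeMillis) {
        return executor.schedule(() -> lastBeat.set(beatTimeMillis), 0, TimeUnit.MILLISECONDS);
    }

    // Blocks until the task has run; only the demo and tests need this.
    static void await(Future<?> future) {
        try {
            future.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        }
    }

    void shutdown() {
        executor.shutdown();
    }

    public static void main(String[] args) {
        ScheduleNowSketch reactor = new ScheduleNowSketch();
        Future<?> pending = reactor.scheduleNow(System.currentTimeMillis());
        await(pending);
        System.out.println("last beat processed at " + reactor.lastBeat.get());
        reactor.shutdown();
    }
}
```

Handing the beat off this way keeps the HTTP handler fast. The trade-off is that the health update happens slightly after the response is sent.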
Going one level deeper, into ClientBeatProcessor.run():
```java
@Override
public void run() {
    Service service = this.service;
    if (Loggers.EVT_LOG.isDebugEnabled()) {
        Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
    }
    String ip = rsInfo.getIp();
    String clusterName = rsInfo.getCluster();
    int port = rsInfo.getPort();
    Cluster cluster = service.getClusterMap().get(clusterName);
    // ephemeral instances of this cluster
    List<Instance> instances = cluster.allIPs(true);
    for (Instance instance : instances) {
        if (instance.getIp().equals(ip) && instance.getPort() == port) {
            if (Loggers.EVT_LOG.isDebugEnabled()) {
                Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
            }
            // record the time of this heartbeat
            instance.setLastBeat(System.currentTimeMillis());
            if (!instance.isMarked()) {
                if (!instance.isHealthy()) {
                    // an unhealthy instance that beats again becomes healthy; subscribers are notified
                    instance.setHealthy(true);
                    Loggers.EVT_LOG.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
                        cluster.getService().getName(), ip, port, cluster.getName(), UtilsAndCommons.LOCALHOST_SITE);
                    getPushService().serviceChanged(service);
                }
            }
        }
    }
}
```
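Stripped of logging and push notification, the processor does three things. It matches the instance by ip and port, stamps lastBeat, and flips an unhealthy, unmarked instance back to healthy. Here is a minimal sketch using a hypothetical `SimpleInstance` class (not the Nacos `Instance` type). The boolean return value stands in for the call to `getPushService().serviceChanged(service)`.

```java
import java.util.ArrayList;
import java.util.List;

final class ClientBeatSketch {

    private ClientBeatSketch() {
    }

    // Mirrors the loop in ClientBeatProcessor.run. Returns true when an instance
    // flipped back to healthy, i.e. when Nacos would call serviceChanged(service).
    static boolean processBeat(List<SimpleInstance> instances, String ip, int port, long now) {
        boolean changed = false;
        for (SimpleInstance instance : instances) {
            if (instance.ip.equals(ip) && instance.port == port) {
                instance.lastBeat = now;
                if (!instance.marked && !instance.healthy) {
                    instance.healthy = true;
                    changed = true;
                }
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        List<SimpleInstance> cluster = new ArrayList<>();
        cluster.add(new SimpleInstance("10.0.0.5", 9999, 0L, false, false));
        boolean push = processBeat(cluster, "10.0.0.5", 9999, System.currentTimeMillis());
        System.out.println("healthy=" + cluster.get(0).healthy + ", push=" + push);
    }
}

// HYPOTHETICAL trimmed-down instance; the real Nacos Instance has many more fields.
final class SimpleInstance {
    final String ip;
    final int port;
    long lastBeat;
    boolean healthy;
    boolean marked; // marked instances are skipped by the automatic health updates

    SimpleInstance(String ip, int port, long lastBeat, boolean healthy, boolean marked) {
        this.ip = ip;
        this.port = port;
        this.lastBeat = lastBeat;
        this.healthy = healthy;
        this.marked = marked;
    }
}
```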
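At this point it is clear what the Nacos server does when it receives a beat from a client instance. It mainly does two things: it updates the instance's latest heartbeat time, and it updates the instance's health status. I also checked the related records in the Nacos database tables.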
Now let's look at the logic that removes instances (ClientBeatCheckTask.run() on the server):
```java
@Override
public void run() {
    try {
        // only the server node responsible for this service runs the check
        if (!getDistroMapper().responsible(service.getName())) {
            return;
        }
        if (!getSwitchDomain().isHealthCheckEnabled()) {
            return;
        }
        List<Instance> instances = service.allIPs(true);
        // first set health status of instances:
        for (Instance instance : instances) {
            // no beat within the heartbeat timeout: mark the instance unhealthy
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                if (!instance.isMarked()) {
                    if (instance.isHealthy()) {
                        instance.setHealthy(false);
                        Loggers.EVT_LOG.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                            instance.getIp(), instance.getPort(), instance.getClusterName(), service.getName(),
                            UtilsAndCommons.LOCALHOST_SITE, instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                        getPushService().serviceChanged(service);
                    }
                }
            }
        }
        if (!getGlobalConfig().isExpireInstance()) {
            return;
        }
        // then remove obsolete instances:
        for (Instance instance : instances) {
            if (instance.isMarked()) {
                continue;
            }
            // no beat within the delete timeout: remove the instance from the registry
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                // delete instance
                Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(), JSON.toJSONString(instance));
                deleteIP(instance);
            }
        }
    } catch (Exception e) {
        Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
    }
}
```
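Putting the two loops together gives the timeline for a client that stops beating. The sketch below classifies an instance by the time since its last beat. It assumes the Nacos 1.x defaults of a 15 s heartbeat timeout and a 30 s delete timeout; both can be overridden per instance through metadata. The enum and method names are hypothetical. The sketch ignores the `marked` flag, the `isExpireInstance` switch and the responsibility check.

```java
// HYPOTHETICAL classification mirroring the two loops in ClientBeatCheckTask.run.
final class BeatTimeoutSketch {

    // ASSUMED Nacos 1.x defaults; real values come from each instance's metadata.
    static final long HEART_BEAT_TIMEOUT_MS = 15_000L;
    static final long IP_DELETE_TIMEOUT_MS = 30_000L;

    enum State { HEALTHY, UNHEALTHY, DELETED }

    private BeatTimeoutSketch() {
    }

    static State classify(long now, long lastBeat) {
        long silence = now - lastBeat;
        if (silence > IP_DELETE_TIMEOUT_MS) {
            return State.DELETED;   // second loop: deleteIP(instance)
        }
        if (silence > HEART_BEAT_TIMEOUT_MS) {
            return State.UNHEALTHY; // first loop: setHealthy(false)
        }
        return State.HEALTHY;
    }

    public static void main(String[] args) {
        long lastBeat = 0L;
        for (long t : new long[] {5_000L, 15_000L, 20_000L, 30_000L, 31_000L}) {
            System.out.println(t / 1000 + "s since last beat -> " + classify(t, lastBeat));
        }
    }
}
```

With these numbers, an executor whose beats stop getting through for more than 30 s is removed from the registry. A long GC pause on a small heap is one plausible cause. From then on, a light beat only earns RESOURCE_NOT_FOUND from the beat endpoint above. The instance comes back only through a full beat or a fresh registration.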
