[Nacos] Source Code Analysis: Heartbeat Detection Between Server Cluster Nodes in the AP Architecture

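How does a Nacos server know which nodes are in the cluster when it starts? When a new node joins or an existing node goes offline, the other nodes find out through health checks. How often do these health checks run? How does a node's state change, and what does a state change trigger?

How does a Nacos server know which nodes are in the cluster at startup?

When configuring a cluster, the IP and port of each node are listed in the configuration file cluster.conf. The Nacos server reads and parses this file at startup. The parsing process is walked through below.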

com.alibaba.nacos.core.cluster.ServerMemberManager#ServerMemberManager

public ServerMemberManager(ServletContext servletContext) throws Exception {
	this.serverList = new ConcurrentSkipListMap<>();
	EnvUtil.setContextPath(servletContext.getContextPath());

	init();
}

protected void init() throws NacosException {
	Loggers.CORE.info("Nacos-related cluster resource initialization");
	this.port = EnvUtil.getProperty("server.port", Integer.class, 8848);
	this.localAddress = InetUtils.getSelfIP() + ":" + port;
	this.self = MemberUtil.singleParse(this.localAddress);
	this.self.setExtendVal(MemberMetaDataConstants.VERSION, VersionUtils.version);
	serverList.put(self.getAddress(), self);

	// register NodeChangeEvent publisher to NotifyManager
	// (registers the MembersChangeEvent publisher)
	registerClusterEvent();

	// Initializes the lookup mode
	// (loads the initial cluster member list)
	initAndStartLookup();

	if (serverList.isEmpty()) {
		throw new NacosException(NacosException.SERVER_ERROR, "cannot get serverlist, so exit.");
	}

	Loggers.CORE.info("The cluster resource is initialized");
}

ServerMemberManager#registerClusterEvent

Registers the publisher for MembersChangeEvent.

Subscribes to IPChangeEvent.

com.alibaba.nacos.core.cluster.ServerMemberManager#registerClusterEvent

private void registerClusterEvent() {
	// Register node change events
	NotifyCenter.registerToPublisher(MembersChangeEvent.class,
			EnvUtil.getProperty("nacos.member-change-event.queue.size", Integer.class, 128));

	// The address information of this node needs to be dynamically modified
	// when registering the IP change of this node
	NotifyCenter.registerSubscriber(new Subscriber<InetUtils.IPChangeEvent>() {
		@Override
		public void onEvent(InetUtils.IPChangeEvent event) {
			String newAddress = event.getNewIP() + ":" + port;
			ServerMemberManager.this.localAddress = newAddress;
			EnvUtil.setLocalAddress(localAddress);

			Member self = ServerMemberManager.this.self;
			self.setIp(event.getNewIP());

			String oldAddress = event.getOldIP() + ":" + port;
			// Re-key this node in the server list under its new address
			ServerMemberManager.this.serverList.remove(oldAddress);
			ServerMemberManager.this.serverList.put(newAddress, self);

			ServerMemberManager.this.memberAddressInfos.remove(oldAddress);
			ServerMemberManager.this.memberAddressInfos.add(newAddress);
		}

		@Override
		public Class<? extends Event> subscribeType() {
			return InetUtils.IPChangeEvent.class;
		}
	});
}

ServerMemberManager#initAndStartLookup

com.alibaba.nacos.core.cluster.ServerMemberManager#initAndStartLookup

private void initAndStartLookup() throws NacosException {
	this.lookup = LookupFactory.createLookUp(this);
	/** @see com.alibaba.nacos.core.cluster.lookup.FileConfigMemberLookup#start() */
	this.lookup.start();
}

FileConfigMemberLookup#start

com.alibaba.nacos.core.cluster.lookup.FileConfigMemberLookup#start

public void start() throws NacosException {
	if (start.compareAndSet(false, true)) {
		// Read the cluster.conf file
		readClusterConfFromDisk();

		// Use the inotify mechanism to monitor file changes and automatically
		// trigger the reading of cluster.conf
		try {
			// Watch the configuration directory for file changes
			WatchFileCenter.registerWatcher(EnvUtil.getConfPath(), watcher);
		} catch (Throwable e) {
			Loggers.CLUSTER.error("An exception occurred in the launch file monitor : {}", e.getMessage());
		}
	}
}

FileConfigMemberLookup#readClusterConfFromDisk

com.alibaba.nacos.core.cluster.lookup.FileConfigMemberLookup#readClusterConfFromDisk

private void readClusterConfFromDisk() {
	Collection<Member> tmpMembers = new ArrayList<>();
	try {
		List<String> tmp = EnvUtil.readClusterConf();
		// Parse the lines read from cluster.conf into Member objects
		tmpMembers = MemberUtil.readServerConf(tmp);
	} catch (Throwable e) {
		Loggers.CLUSTER
			.error("nacos-XXXX [serverlist] failed to get serverlist from disk!, error : {}", e.getMessage());
	}

	// Hand over the parsed members; a MembersChangeEvent is published if the list changed
	afterLookup(tmpMembers);
}
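
The body of MemberUtil.readServerConf is not shown above. As a rough idea of what turning cluster.conf lines into member addresses involves, here is a minimal sketch. The class ClusterConfSketch and its parse method are hypothetical names, and the rules it applies (text after '#' is a comment, a missing port falls back to 8848, IPv4 or host names only) are assumptions made for illustration, not the Nacos implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; not the Nacos implementation of MemberUtil.readServerConf.
final class ClusterConfSketch {

    // Assumption: same default as the server.port fallback shown in init().
    static final int DEFAULT_PORT = 8848;

    /** Turns raw cluster.conf lines into normalized "ip:port" addresses. */
    static List<String> parse(List<String> lines) {
        List<String> addresses = new ArrayList<>();
        for (String raw : lines) {
            // Assumption: everything after '#' is a comment.
            int hash = raw.indexOf('#');
            String line = (hash >= 0 ? raw.substring(0, hash) : raw).trim();
            if (line.isEmpty()) {
                continue; // skip blank and comment-only lines
            }
            // Assumption: IPv4 or host name only; a missing port gets the default.
            addresses.add(line.contains(":") ? line : line + ":" + DEFAULT_PORT);
        }
        return addresses;
    }

    public static void main(String[] args) {
        List<String> conf = List.of("# cluster members", "192.168.0.1:8848", "192.168.0.2", "");
        System.out.println(parse(conf));
    }
}
```

Each resulting address corresponds to one Member that is later passed to afterLookup.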

AbstractMemberLookup#afterLookup

com.alibaba.nacos.core.cluster.AbstractMemberLookup#afterLookup

public void afterLookup(Collection<Member> members) {
	this.memberManager.memberChange(members);
}

ServerMemberManager#memberChange

com.alibaba.nacos.core.cluster.ServerMemberManager#memberChange

synchronized boolean memberChange(Collection<Member> members) {
	if (members == null || members.isEmpty()) {
		return false;
	}

	// Check whether this node is in the member list
	boolean isContainSelfIp = members.stream()
		.anyMatch(ipPortTmp -> Objects.equals(localAddress, ipPortTmp.getAddress()));

	if (isContainSelfIp) {
		isInIpList = true;
	} else {
		isInIpList = false;
		// This node is not in the list, so add it
		members.add(this.self);
		Loggers.CLUSTER.warn("[serverlist] self ip {} not in serverlist {}", self, members);
	}

	// If the number of old and new clusters is different, the cluster information
	// must have changed; if the number of clusters is the same, then compare whether
	// there is a difference; if there is a difference, then the cluster node changes
	// are involved and all recipients need to be notified of the node change event

	// Determine whether the cluster membership has changed
	boolean hasChange = members.size() != serverList.size();
	ConcurrentSkipListMap<String, Member> tmpMap = new ConcurrentSkipListMap<>();
	Set<String> tmpAddressInfo = new ConcurrentHashSet<>();
	for (Member member : members) {
		final String address = member.getAddress();

		if (!serverList.containsKey(address)) {
			hasChange = true;
			// If the cluster information in cluster.conf or address-server has been changed,
			// while the corresponding nacos-server has not been started yet, the member's state
			// should be set to DOWN. If the corresponding nacos-server has been started, the
			// member's state will be set to UP after detection in a few seconds.
			member.setState(NodeState.DOWN);
		} else {
			//fix issue # 4925
			member.setState(serverList.get(address).getState());
		}

		// Ensure that the node is created only once
		tmpMap.put(address, member);
		if (NodeState.UP.equals(member.getState())) {
			tmpAddressInfo.add(address);
		}
	}

	serverList = tmpMap;
	memberAddressInfos = tmpAddressInfo;

	Collection<Member> finalMembers = allMembers();

	Loggers.CLUSTER.warn("[serverlist] updated to : {}", finalMembers);

	// Persist the current cluster node information to cluster.conf
	// <important> need to put the event publication into a synchronized block to ensure
	// that the event publication is sequential
	if (hasChange) {
		MemberUtil.syncToFile(finalMembers);
		// Publish a MembersChangeEvent
		Event event = MembersChangeEvent.builder().members(finalMembers).build();
		NotifyCenter.publishEvent(event);
	}

	return hasChange;
}
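
The change-detection rule in memberChange can be isolated into a small sketch: a size difference means a change, an unknown address means a change and starts in DOWN, and a known address keeps its previous state. MemberChangeSketch is a hypothetical class that tracks states as plain strings; it leaves out the self-node handling, the file sync, and the event publication.

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the comparison logic only; not the Nacos class.
final class MemberChangeSketch {

    /** Current view: address -> state ("UP" or "DOWN" in this sketch). */
    private Map<String, String> serverList = new TreeMap<>();

    synchronized boolean memberChange(Collection<String> addresses) {
        if (addresses == null || addresses.isEmpty()) {
            return false;
        }
        // A different member count always counts as a change.
        boolean hasChange = addresses.size() != serverList.size();
        Map<String, String> next = new TreeMap<>();
        for (String address : addresses) {
            String knownState = serverList.get(address);
            if (knownState == null) {
                // New address: treat as DOWN until a heartbeat succeeds.
                hasChange = true;
                next.put(address, "DOWN");
            } else {
                // Known address: carry the previous state over.
                next.put(address, knownState);
            }
        }
        serverList = next;
        return hasChange;
    }

    synchronized void markUp(String address) {
        serverList.replace(address, "UP");
    }

    synchronized String stateOf(String address) {
        return serverList.get(address);
    }

    public static void main(String[] args) {
        MemberChangeSketch manager = new MemberChangeSketch();
        System.out.println(manager.memberChange(List.of("10.0.0.1:8848", "10.0.0.2:8848")));
        manager.markUp("10.0.0.1:8848");
        System.out.println(manager.memberChange(List.of("10.0.0.1:8848", "10.0.0.3:8848")));
        System.out.println(manager.stateOf("10.0.0.1:8848") + " " + manager.stateOf("10.0.0.3:8848"));
    }
}
```

Carrying the old state over for known addresses is what keeps a reload of cluster.conf from resetting healthy nodes.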

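How do cluster nodes maintain heartbeats?

ServerMemberManager listens for the WebServerInitializedEvent that Spring Boot publishes during startup, and then starts MemberInfoReportTask, the health check task that runs between cluster nodes.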

com.alibaba.nacos.core.cluster.ServerMemberManager#onApplicationEvent

public void onApplicationEvent(WebServerInitializedEvent event) {
	getSelf().setState(NodeState.UP);
	if (!EnvUtil.getStandaloneMode()) {
		// Schedule the heartbeat task between server nodes (first run after 5 s)
		GlobalExecutor.scheduleByCommon(this.infoReportTask, 5_000L);
	}
	EnvUtil.setPort(event.getWebServer().getPort());
	EnvUtil.setLocalAddress(this.localAddress);
	Loggers.CLUSTER.info("This node is ready to provide external services");
}

The execution of MemberInfoReportTask is analyzed below.

Task#run

MemberInfoReportTask extends Task, and Task implements the Runnable interface. Task#run calls executeBody(), which the subclass provides.

com.alibaba.nacos.core.cluster.Task#run

    public void run() {
        if (shutdown) {
            return;
        }
        try {
            executeBody();
        } catch (Throwable t) {
            Loggers.CORE.error("this task execute has error : {}", ExceptionUtil.getStackTrace(t));
        } finally {
            if (!shutdown) {
                after();
            }
        }
    }
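
The shape of Task#run is a template method: run() fixes the control flow, and subclasses supply executeBody() and, optionally, after(). The sketch below uses a hypothetical TaskSketch class to show one consequence of the try/finally: after() still runs when executeBody() throws, so a failed run does not prevent whatever follow-up after() performs. What the real after() does is not shown in this article, so the sketch only counts calls.

```java
// Hypothetical sketch of the Task template; not the Nacos class.
abstract class TaskSketch implements Runnable {

    protected volatile boolean shutdown = false;

    /** The actual work, supplied by the subclass. */
    protected abstract void executeBody();

    /** Hook invoked after every run unless the task was shut down. */
    protected void after() {
    }

    public void shutdown() {
        shutdown = true;
    }

    @Override
    public final void run() {
        if (shutdown) {
            return;
        }
        try {
            executeBody();
        } catch (Throwable t) {
            // A failure is logged and swallowed so that after() can still run.
            System.err.println("task failed: " + t);
        } finally {
            if (!shutdown) {
                after();
            }
        }
    }

    public static void main(String[] args) {
        int[] afterCalls = {0};
        TaskSketch failing = new TaskSketch() {
            @Override
            protected void executeBody() {
                throw new IllegalStateException("boom");
            }

            @Override
            protected void after() {
                afterCalls[0]++;
            }
        };
        failing.run();
        System.out.println("after() calls: " + afterCalls[0]);
    }
}
```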

MemberInfoReportTask#executeBody

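Each run picks the next node in the cluster in round-robin order (using cursor) and sends it a heartbeat packet, so over successive runs every other node is reached.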

com.alibaba.nacos.core.cluster.ServerMemberManager.MemberInfoReportTask#executeBody

protected void executeBody() {
	// All members except this node
	List<Member> members = ServerMemberManager.this.allMembersWithoutSelf();

	if (members.isEmpty()) {
		return;
	}

	// Advance the cursor by one on every run (round-robin)
	this.cursor = (this.cursor + 1) % members.size();
	Member target = members.get(cursor);

	Loggers.CLUSTER.debug("report the metadata to the node : {}", target.getAddress());

	// /nacos/v1/core/cluster/report
	final String url = HttpUtils
		.buildUrl(false, target.getAddress(), EnvUtil.getContextPath(), Commons.NACOS_CORE_CONTEXT,
				"/cluster/report");

	try {
		Header header = Header.newInstance().addParam(Constants.NACOS_SERVER_HEADER, VersionUtils.version);
		AuthHeaderUtil.addIdentityToHeader(header);
		asyncRestTemplate
			.post(url, header,
				Query.EMPTY, getSelf(), reference.getType(), new Callback<String>() {
					@Override
					public void onReceive(RestResult<String> result) {
						if (result.getCode() == HttpStatus.NOT_IMPLEMENTED.value()
							|| result.getCode() == HttpStatus.NOT_FOUND.value()) {
							Loggers.CLUSTER
								.warn("{} version is too low, it is recommended to upgrade the version : {}",
									target, VersionUtils.version);
							return;
						}
						if (result.ok()) {
							// Success
							MemberUtil.onSuccess(ServerMemberManager.this, target);
						} else {
							Loggers.CLUSTER
								.warn("failed to report new info to target node : {}, result : {}",
									target.getAddress(), result);
							// Failure
							MemberUtil.onFail(ServerMemberManager.this, target);
						}
					}

					@Override
					public void onError(Throwable throwable) {
						Loggers.CLUSTER
							.error("failed to report new info to target node : {}, error : {}",
								target.getAddress(),
								ExceptionUtil.getAllExceptionMsg(throwable));
						// Failure
						MemberUtil.onFail(ServerMemberManager.this, target, throwable);
					}

					@Override
					public void onCancel() {
					}
				});
	} catch (Throwable ex) {
		Loggers.CLUSTER.error("failed to report new info to target node : {}, error : {}", target.getAddress(),
				ExceptionUtil.getAllExceptionMsg(ex));
	}
}
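
The cursor arithmetic in executeBody is easy to misread as a loop over all members, so here is the selection step on its own. RoundRobinSketch is a hypothetical class; it assumes the cursor starts at 0, which means the first target is the member at index 1 rather than index 0.

```java
import java.util.List;

// Hypothetical sketch of the target selection in executeBody; not the Nacos class.
final class RoundRobinSketch {

    // Assumption: the cursor starts at 0.
    private int cursor = 0;

    /** Returns the single target for this run, or null if there are no other members. */
    String next(List<String> membersWithoutSelf) {
        if (membersWithoutSelf.isEmpty()) {
            return null;
        }
        // Same arithmetic as executeBody: advance by one and wrap around.
        cursor = (cursor + 1) % membersWithoutSelf.size();
        return membersWithoutSelf.get(cursor);
    }

    public static void main(String[] args) {
        RoundRobinSketch picker = new RoundRobinSketch();
        List<String> others = List.of("10.0.0.2:8848", "10.0.0.3:8848", "10.0.0.4:8848");
        for (int run = 1; run <= 4; run++) {
            System.out.println("run " + run + " -> " + picker.next(others));
        }
    }
}
```

Because only one node is contacted per run, the time it takes to probe every peer grows with the cluster size.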

MemberUtil#onSuccess

com.alibaba.nacos.core.cluster.MemberUtil#onSuccess

public static void onSuccess(final ServerMemberManager manager, final Member member) {
	final NodeState old = member.getState();
	manager.getMemberAddressInfos().add(member.getAddress());
	// Set the node state to UP and reset the failure counter
	member.setState(NodeState.UP);
	member.setFailAccessCnt(0);
	if (!Objects.equals(old, member.getState())) {
		// The state changed, so publish a MembersChangeEvent
		manager.notifyMemberChange();
	}
}

MemberUtil#onFail

com.alibaba.nacos.core.cluster.MemberUtil#onFail(com.alibaba.nacos.core.cluster.ServerMemberManager, com.alibaba.nacos.core.cluster.Member, java.lang.Throwable)

public static void onFail(final ServerMemberManager manager, final Member member, Throwable ex) {
	manager.getMemberAddressInfos().remove(member.getAddress());
	final NodeState old = member.getState();
	// Set the node state to SUSPICIOUS
	member.setState(NodeState.SUSPICIOUS);
	member.setFailAccessCnt(member.getFailAccessCnt() + 1);
	int maxFailAccessCnt = EnvUtil.getProperty("nacos.core.member.fail-access-cnt", Integer.class, 3);

	// If the number of consecutive failures to access the target node reaches
	// a maximum, or the link request is rejected, the state is directly down
	if (member.getFailAccessCnt() > maxFailAccessCnt || StringUtils
		.containsIgnoreCase(ex.getMessage(), TARGET_MEMBER_CONNECT_REFUSE_ERRMSG)) {
		// More failures than the configured maximum (default 3), or connection refused: set the state to DOWN
		member.setState(NodeState.DOWN);
	}
	if (!Objects.equals(old, member.getState())) {
		// The state changed, so publish a MembersChangeEvent
		manager.notifyMemberChange();
	}
}
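
Taken together, onSuccess and onFail form a small state machine: UP moves to SUSPICIOUS on the first failure, to DOWN once the failure count exceeds the maximum or the connection is refused, and back to UP on any success. The sketch below models just those transitions. FailureDetectorSketch is a hypothetical class, the threshold is hard-coded to the default of 3 shown above, and the boolean return value stands in for the point where Nacos would call notifyMemberChange().

```java
// Hypothetical sketch of the per-member state transitions; not the Nacos classes.
final class FailureDetectorSketch {

    enum State { UP, SUSPICIOUS, DOWN }

    // Assumption: fixed at the default of nacos.core.member.fail-access-cnt.
    static final int MAX_FAIL_ACCESS_CNT = 3;

    private State state = State.UP;
    private int failAccessCnt = 0;

    /** Returns true if the state changed, i.e. where an event would be published. */
    boolean onSuccess() {
        State old = state;
        state = State.UP;
        failAccessCnt = 0;
        return old != state;
    }

    /** Returns true if the state changed, i.e. where an event would be published. */
    boolean onFail(boolean connectionRefused) {
        State old = state;
        state = State.SUSPICIOUS;
        failAccessCnt++;
        // Strictly greater than the maximum, so DOWN is reached on the 4th consecutive failure.
        if (failAccessCnt > MAX_FAIL_ACCESS_CNT || connectionRefused) {
            state = State.DOWN;
        }
        return old != state;
    }

    State state() {
        return state;
    }

    public static void main(String[] args) {
        FailureDetectorSketch peer = new FailureDetectorSketch();
        for (int i = 1; i <= 4; i++) {
            boolean changed = peer.onFail(false);
            System.out.println("failure " + i + ": " + peer.state() + ", changed=" + changed);
        }
        System.out.println("success: changed=" + peer.onSuccess() + ", state=" + peer.state());
    }
}
```

Note that the comparison is `>` rather than `>=`, so with the default setting a node that times out (rather than refusing the connection) stays SUSPICIOUS for three failures and only becomes DOWN on the fourth.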

How a node handles a heartbeat packet

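Each node in the cluster periodically sends heartbeat packets to the other nodes (the task is first scheduled with a 5 s delay, as shown in onApplicationEvent). The following looks at how a node handles a heartbeat packet once it receives one.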

Request endpoint: /nacos/v1/core/cluster/report.

com.alibaba.nacos.core.controller.NacosClusterController#report

@PostMapping(value = {"/report"})
public RestResult<String> report(@RequestBody Member node) {
	// Entry point for heartbeat packets sent by other nodes
	if (!node.check()) {
		return RestResultUtils.failedWithMsg(400, "Node information is illegal");
	}
	LoggerUtils.printIfDebugEnabled(Loggers.CLUSTER, "node state report, receive info : {}", node);
	node.setState(NodeState.UP);
	node.setFailAccessCnt(0);

	// Update the state of the reporting node
	boolean result = memberManager.update(node);

	return RestResultUtils.success(Boolean.toString(result));
}

com.alibaba.nacos.core.cluster.ServerMemberManager#update

public boolean update(Member newMember) {
	Loggers.CLUSTER.debug("member information update : {}", newMember);

	String address = newMember.getAddress();
	if (!serverList.containsKey(address)) {
		// Unknown address: reports from nodes outside the member list are ignored
		return false;
	}

	serverList.computeIfPresent(address, (s, member) -> {
		if (NodeState.DOWN.equals(newMember.getState())) {
			memberAddressInfos.remove(newMember.getAddress());
		}
		boolean isPublishChangeEvent = MemberUtil.isBasicInfoChanged(newMember, member);
		newMember.setExtendVal(MemberMetaDataConstants.LAST_REFRESH_TIME, System.currentTimeMillis());
		MemberUtil.copy(newMember, member);
		if (isPublishChangeEvent) {
			// member basic data changes and all listeners need to be notified
			// (publishes a MembersChangeEvent)
			notifyMemberChange();
		}
		return member;
	});

	return true;
}
