面试篇-Nacos源码分析
1. 下载Nacos源码并运行
要研究Nacos源码自然不能用打包好的Nacos服务端jar包来运行,需要下载源码自己编译来运行。
1.1 下载Nacos源码
Nacos的GitHub地址:https://github.com/alibaba/nacos
课前资料中已经提供了下载好的1.4.2版本的Nacos源码:

如果需要研究其他版本的同学,也可以自行下载:
大家找到其release页面:https://github.com/alibaba/nacos/tags,找到其中的1.4.2.版本:

点击进入后,下载Source code(zip):

1.2 导入Demo工程
我们的课前资料提供了一个微服务Demo,包含了服务注册、发现等业务。

导入该项目后,查看其项目结构:

结构说明:
cloud-source-demo:项目父目录
cloud-demo:微服务的父工程,管理微服务依赖
order-service:订单微服务,业务中需要访问user-service,是一个服务消费者
user-service:用户微服务,对外暴露根据id查询用户的接口,是一个服务提供者
1.3 导入Nacos源码
将之前下载好的Nacos源码解压到cloud-source-demo项目目录中:

然后,使用IDEA将其作为一个module来导入:
1)选择项目结构选项:

然后点击导入module:

在弹出窗口中,选择nacos源码目录:

然后选择maven模块,finish:

最后,点击OK即可:

导入后的项目结构:

1.4 proto编译
Nacos底层的数据通信会基于protobuf对数据做序列化和反序列化。并将对应的proto文件定义在了consistency这个子模块中:

我们需要先将proto文件编译为对应的Java代码。
1.4.1 什么是protobuf
protobuf的全称是Protocol Buffer,是Google提供的一种数据序列化协议,这是Google官方的定义:
Protocol Buffers 是一种轻便高效的结构化数据存储格式,可以用于结构化数据序列化,很适合做数据存储或 RPC 数据交换格式。它可用于通讯协议、数据存储等领域的语言无关、平台无关、可扩展的序列化结构数据格式。
可以简单理解为,是一种跨语言、跨平台的数据传输格式。与json的功能类似,但是无论是性能,还是数据大小都比json要好很多。
protobuf的之所以可以跨语言,就是因为数据定义的格式为.proto格式,需要基于protoc编译为对应的语言。
1.4.2 安装protoc
Protobuf的GitHub地址:https://github.com/protocolbuffers/protobuf/releases
我们可以下载windows版本的来使用:

另外,课前资料也提供了下载好的安装包:

解压到任意非中文目录下,其中的bin目录中的protoc.exe可以帮助我们编译:

然后将这个bin目录配置到你的环境变量path中,可以参考JDK的配置方式:

1.4.3 编译proto
进入nacos-1.4.2的consistency模块下的src/main目录下:

然后打开cmd窗口,运行下面的两个命令:
如图:

会在nacos的consistency模块中编译出这些java代码:

1.5 运行
nacos服务端的入口是在console模块中的Nacos类:

我们需要让它单机启动:

然后新建一个SpringBootApplication:

然后填写应用信息:

然后运行Nacos这个main函数:

将order-service和user-service服务启动后,可以查看nacos控制台:

2. 服务注册
服务注册到Nacos以后,会保存在一个本地注册表中,其结构如下:

首先最外层是一个Map,结构为:Map<String, Map<String, Service>>:
key:是namespace_id,起到环境隔离的作用。namespace下可以有多个group
value:又是一个
Map<String, Service>,代表分组及组内的服务。一个组内可以有多个服务key:代表group分组,不过作为key时格式是group_name:service_name
value:分组下的某一个服务,例如userservice,用户服务。类型为
Service,内部也包含一个Map<String,Cluster>,一个服务下可以有多个集群key:集群名称
value:
Cluster类型,包含集群的具体信息。一个集群中可能包含多个实例,也就是具体的节点信息,其中包含一个Set<Instance>,就是该集群下的实例的集合Instance:实例信息,包含实例的IP、Port、健康状态、权重等等信息
每一个服务去注册到Nacos时,就会把信息组织并存入这个Map中。
2.1 服务注册接口
Nacos提供了服务注册的API接口,客户端只需要向该接口发送请求,即可实现服务注册。
接口说明: 注册一个实例到Nacos服务。
请求类型:POST
请求路径:/nacos/v1/ns/instance
请求参数:
ip
字符串
是
服务实例IP
port
int
是
服务实例port
namespaceId
字符串
否
命名空间ID
weight
double
否
权重
enabled
boolean
否
是否上线
healthy
boolean
否
是否健康
metadata
字符串
否
扩展信息
clusterName
字符串
否
集群名
serviceName
字符串
是
服务名
groupName
字符串
否
分组名
ephemeral
boolean
否
是否临时实例
错误编码:
400
Bad Request
客户端请求中的语法错误
403
Forbidden
没有权限
404
Not Found
无法找到资源
500
Internal Server Error
服务器内部错误
200
OK
正常
2.2 客户端
首先,我们需要找到服务注册的入口。
2.2.1 NacosServiceRegistryAutoConfiguration
因为Nacos的客户端是基于SpringBoot的自动装配实现的,我们可以在nacos-discovery依赖:
spring-cloud-starter-alibaba-nacos-discovery-2.2.6.RELEASE.jar
这个包中找到Nacos自动装配信息:

可以看到,有很多个自动配置类被加载了,其中跟服务注册有关的就是NacosServiceRegistryAutoConfiguration这个类,我们跟入其中。
可以看到,在NacosServiceRegistryAutoConfiguration这个类中,包含一个跟自动注册有关的Bean:

2.2.2 NacosAutoServiceRegistration
NacosAutoServiceRegistration源码如图:

可以看到在初始化时,其父类AbstractAutoServiceRegistration也被初始化了。
AbstractAutoServiceRegistration如图:

可以看到它实现了ApplicationListener接口,监听Spring容器启动过程中的事件。
在监听到WebServerInitializedEvent(web服务初始化完成)的事件后,执行了bind 方法。

其中的bind方法如下:
其中的start方法流程:
其中最关键的register()方法就是完成服务注册的关键,代码如下:
此处的this.serviceRegistry就是NacosServiceRegistry:

2.2.3 NacosServiceRegistry
NacosServiceRegistry是Spring的ServiceRegistry接口的实现类,而ServiceRegistry接口是服务注册、发现的规约接口,定义了register、deregister等方法的声明。
而NacosServiceRegistry对register的实现如下:
可以看到方法中最终是调用NamingService的registerInstance方法实现注册的。
而NamingService接口的默认实现就是NacosNamingService。
2.2.4 NacosNamingService
NacosNamingService提供了服务注册、订阅等功能。
其中registerInstance就是注册服务实例,源码如下:
最终,由NacosProxy的registerService方法,完成服务注册。
代码如下:
这里提交的信息就是Nacos服务注册接口需要的完整参数,核心参数有:
namespace_id:环境
service_name:服务名称
group_name:组名称
cluster_name:集群名称
ip: 当前实例的ip地址
port: 当前实例的端口
而在NacosNamingService的registerInstance方法中,有一段是与服务心跳有关的代码,我们在后续会继续学习。

2.2.5 客户端注册的流程图
如图:

2.3 服务端
在nacos-console的模块中,会引入nacos-naming这个模块:

模块结构如下:

其中的com.alibaba.nacos.naming.controllers包下就有服务注册、发现等相关的各种接口,其中的服务注册是在InstanceController类中:

2.3.1 InstanceController
进入InstanceController类,可以看到一个register方法,就是服务注册的方法了:
这里,进入到了serviceManager.registerInstance()方法中。
2.3.2 ServiceManager
ServiceManager就是Nacos中管理服务、实例信息的核心API,其中就包含Nacos的服务注册表:

而其中的registerInstance方法就是注册服务实例的方法:
创建好了服务,接下来就要添加实例到服务中:
该方法中对修改服务列表的动作加锁处理,确保线程安全。而在同步代码块中,包含下面几步:
1)先获取要更新的实例列表,
addIpAddresses(service, ephemeral, ips);2)然后将更新后的数据封装到
Instances对象中,后面更新注册表时使用3)最后,调用
consistencyService.put()方法完成Nacos集群的数据同步,保证集群一致性。
注意:在第1步的addIPAddress中,会拷贝旧的实例列表,添加新实例到列表中。在第3步中,完成对实例状态更新后,则会用新列表直接覆盖旧实例列表。而在更新过程中,旧实例列表不受影响,用户依然可以读取。
这样在更新列表状态过程中,无需阻塞用户的读操作,也不会导致用户读取到脏数据,性能比较好。这种方案称为CopyOnWrite方案。
2.3.2.1 更服务列表
我们来看看实例列表的更新,对应的方法是addIpAddresses(service, ephemeral, ips);:
继续进入updateIpAddresses方法:
简单来讲,就是先获取旧的实例列表,然后把新的实例信息与旧的做对比,新的实例就添加,老的实例同步ID。然后返回最新的实例列表。
2.3.5.2 Nacos集群一致性
在完成本地服务列表更新后,Nacos又实现了集群一致性更新,调用的是:
consistencyService.put(key, instances);
这里的ConsistencyService接口,代表集群一致性的接口,有很多中不同实现:

我们进入DelegateConsistencyServiceImpl来看:
其中的mapConsistencyService(key)方法就是选择委托方式的:
默认情况下,所有实例都是临时实例,我们关注DistroConsistencyServiceImpl即可。
2.3.4 DistroConsistencyServiceImpl
我们来看临时实例的一致性实现:DistroConsistencyServiceImpl类的put方法:
这里方法只有两行:
onPut(key, value):其中value就是Instances,要更新的服务信息。这行主要是基于线程池方式,异步的将Service信息写入注册表中(就是那个多重Map)distroProtocol.sync():就是通过Distro协议将数据同步给集群中的其它Nacos节点
我们先看onPut方法
2.3.4.1 更新本地实例列表
1)放入阻塞队列
onPut方法如下:
notifier的类型就是DistroConsistencyServiceImpl.Notifier,内部维护了一个阻塞队列,存放服务列表变更的事件:

addTask时,将任务加入该阻塞队列:
2)Notifier异步更新
同时,notifier还是一个Runnable,通过一个单线程的线程池来不断从阻塞队列中获取任务,执行服务列表的更新。来看下其中的run方法:
来看看handle方法:
3)覆盖实例列表
而在Service的onChange方法中,就可以看到更新实例列表的逻辑了:
updateIPs方法:
在第45行的代码中:clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
就是在更新注册表:
2.3.4.2 集群数据同步
在DistroConsistencyServiceImpl的put方法中分为两步:

其中的onPut方法已经分析过了。
下面的distroProtocol.sync()就是集群同步的逻辑了。
DistroProtocol类的sync方法如下:
其中同步的任务封装为一个DistroDelayTask对象。
交给了distroTaskEngineHolder.getDelayTaskExecuteEngine()执行,这行代码的返回值是:
NacosDelayTaskExecuteEngine,这个类维护了一个线程池,并且接收任务,执行任务。
执行任务的方法为processTasks()方法:
可以看出来基于Distro模式的同步是异步进行的,并且失败时会将任务重新入队并充实,因此不保证同步结果的强一致性,属于AP模式的一致性策略。
2.3.5 服务端流程图

2.4 总结
Nacos的注册表结构是什么样的?
答:Nacos是多级存储模型,最外层通过namespace来实现环境隔离,然后是group分组,分组下就是服务,一个服务有可以分为不同的集群,集群中包含多个实例。因此其注册表结构为一个Map,类型是:
Map<String, Map<String, Service>>,外层key是
namespace_id,内层key是group+serviceName.Service内部维护一个Map,结构是:
Map<String,Cluster>,key是clusterName,值是集群信息Cluster内部维护一个Set集合,元素是Instance类型,代表集群中的多个实例。
Nacos如何保证并发写的安全性?
答:首先,在注册实例时,会对service加锁,不同service之间本身就不存在并发写问题,互不影响。相同service时通过锁来互斥。并且,在更新实例列表时,是基于异步的线程池来完成,而线程池的线程数量为1.
Nacos如何避免并发读写的冲突?
答:Nacos在更新实例列表时,会采用CopyOnWrite技术,首先将Old实例列表拷贝一份,然后更新拷贝的实例列表,再用更新后的实例列表来覆盖旧的实例列表。
Nacos如何应对阿里内部数十万服务的并发写请求?
答:Nacos内部会将服务注册的任务放入阻塞队列,采用线程池异步来完成实例更新,从而提高并发写能力。
3. 服务心跳
Nacos的实例分为临时实例和永久实例两种,可以通过在yaml 文件配置:
临时实例基于心跳方式做健康检测,而永久实例则是由Nacos主动探测实例状态。
其中Nacos提供的心跳的API接口为:
接口描述:发送某个实例的心跳
请求类型:PUT
请求路径:
请求参数:
serviceName
字符串
是
服务名
groupName
字符串
否
分组名
ephemeral
boolean
否
是否临时实例
beat
JSON格式字符串
是
实例心跳内容
错误编码:
400
Bad Request
客户端请求中的语法错误
403
Forbidden
没有权限
404
Not Found
无法找到资源
500
Internal Server Error
服务器内部错误
200
OK
正常
3.1 客户端
在2.2.4.服务注册这一节中,我们说过NacosNamingService这个类实现了服务的注册,同时也实现了服务心跳:
3.1.1 BeatInfo
这里的BeanInfo就包含心跳需要的各种信息:

3.1.2 BeatReactor
而BeatReactor这个类则维护了一个线程池:

当调用BeatReactor的.addBeatInfo(groupedServiceName, beatInfo)方法时,就会执行心跳:
心跳周期的默认值在com.alibaba.nacos.api.common.Constants类中:

可以看到是5秒,默认5秒一次心跳。
3.1.3 BeatTask
心跳的任务封装在BeatTask这个类中,是一个Runnable,其run方法如下:
3.1.5 发送心跳
最终心跳的发送还是通过NamingProxy的sendBeat方法来实现:
3.2 服务端
对于临时实例,服务端代码分两部分:
1)InstanceController提供了一个接口,处理客户端的心跳请求
2)定时检测实例心跳是否按期执行
3.2.1 InstanceController
与服务注册时一样,在nacos-naming模块中的InstanceController类中,定义了一个方法用来处理心跳请求:
最终,在确认心跳请求对应的服务、实例都在的情况下,开始交给Service类处理这次心跳请求。调用了Service的processClientBeat方法
3.2.2 处理心跳请求
查看Service的service.processClientBeat(clientBeat);方法:
可以看到心跳信息被封装到了 ClientBeatProcessor类中,交给了HealthCheckReactor处理,HealthCheckReactor就是对线程池的封装,不用过多查看。
关键的业务逻辑都在ClientBeatProcessor这个类中,它是一个Runnable,其中的run方法如下:
处理心跳请求的核心就是更新心跳实例的最后一次心跳时间,lastBeat,这个会成为判断实例心跳是否过期的关键指标!
3.3.3 心跳异常检测
在服务注册时,一定会创建一个Service对象,而Service中有一个init方法,会在注册时被调用:
其中HealthCheckReactor.scheduleCheck就是执行心跳检测的定时任务:

可以看到,该任务是5000ms执行一次,也就是5秒对实例的心跳状态做一次检测。
此处的ClientBeatCheckTask同样是一个Runnable,其中的run方法为:
其中的超时时间同样是在com.alibaba.nacos.api.common.Constants这个类中:

3.3.4 主动健康检测
对于非临时实例(ephemeral=false),Nacos会采用主动的健康检测,定时向实例发送请求,根据响应来判断实例健康状态。
入口在2.3.2小节的ServiceManager类中的registerInstance方法:

创建空服务时:
创建服务流程:
关键在putServiceAndInit(service)方法中:
进入初始化逻辑:service.init(),这个会进入Service类中:
这里集群的初始化 entry.getValue().init();会进入Cluster类型的init()方法:
这里的HealthCheckReactor.scheduleCheck(checkTask);会开启定时任务,对非临时实例做健康检测。检测逻辑定义在HealthCheckTask这个类中,是一个Runnable,其中的run方法:
健康检测逻辑定义在healthCheckProcessor.process(this);方法中,在HealthCheckProcessor接口中,这个接口也有很多实现,默认是TcpSuperSenseProcessor:

进入TcpSuperSenseProcessor的process方法:
可以看到,所有的健康检测任务都被放入一个阻塞队列,而不是立即执行了。这里又采用了异步执行的策略,可以看到Nacos中大量这样的设计。
而TcpSuperSenseProcessor本身就是一个Runnable,在它的构造函数中会把自己放入线程池中去执行,其run方法如下:
通过processTask来处理健康检测的任务:
任务被封装到了TaskProcessor中去执行了,TaskProcessor是一个Callable,其中的call方法:
3.3 总结
Nacos的健康检测有两种模式:
临时实例:
采用客户端心跳检测模式,心跳周期5秒
心跳间隔超过15秒则标记为不健康
心跳间隔超过30秒则从服务列表删除
永久实例:
采用服务端主动健康检测方式
周期为2000 + 5000毫秒内的随机数
检测异常只会标记为不健康,不会删除
那么为什么Nacos有临时和永久两种实例呢?
以淘宝为例,双十一大促期间,流量会比平常高出很多,此时服务肯定需要增加更多实例来应对高并发,而这些实例在双十一之后就无需继续使用了,采用临时实例比较合适。而对于服务的一些常备实例,则使用永久实例更合适。
与eureka相比,Nacos与Eureka在临时实例上都是基于心跳模式实现,差别不大,主要是心跳周期不同,eureka是30秒,Nacos是5秒。
另外,Nacos支持永久实例,而Eureka不支持,Eureka只提供了心跳模式的健康监测,而没有主动检测功能。
4. 服务发现
Nacos提供了一个根据serviceId查询实例列表的接口:
接口描述:查询服务下的实例列表
请求类型:GET
请求路径:
请求参数:
serviceName
字符串
是
服务名
groupName
字符串
否
分组名
namespaceId
字符串
否
命名空间ID
clusters
字符串,多个集群用逗号分隔
否
集群名称
healthyOnly
boolean
否,默认为false
是否只返回健康实例
错误编码:
400
Bad Request
客户端请求中的语法错误
403
Forbidden
没有权限
404
Not Found
无法找到资源
500
Internal Server Error
服务器内部错误
200
OK
正常
4.1 客户端
4.1.1 定时更新服务列表
4.1.1.1 NacosNamingService
在2.2.4小节中,我们讲到一个类NacosNamingService,这个类不仅仅提供了服务注册功能,同样提供了服务发现的功能。

多个重载的方法最终都会进入一个方法:
4.1.1.2 HostReactor
进入1.1.订阅服务消息,这里是由HostReactor类的getServiceInfo()方法来实现的:
基本逻辑就是先从本地缓存读,根据结果来选择:
如果本地缓存没有,立即去nacos读取,
updateServiceNow(serviceName, clusters)
image-20210923161528710 如果本地缓存有,则开启定时更新功能,并返回缓存结果:
scheduleUpdateIfAbsent(serviceName, clusters)

image-20210923161630575 在UpdateTask中,最终还是调用updateService方法:

image-20210923161752521
不管是立即更新服务列表,还是定时更新服务列表,最终都会执行HostReactor中的updateService()方法:
4.1.1.3 ServerProxy
而ServerProxy的queryList方法如下:
4.1.2 处理服务变更通知
除了定时更新服务列表的功能外,Nacos还支持服务列表变更时的主动推送功能。
在HostReactor类的构造函数中,有非常重要的几个步骤:

基本思路是:
通过PushReceiver监听服务端推送的变更数据
解析数据后,通过NotifyCenter发布服务变更的事件
InstanceChangeNotifier监听变更事件,完成对服务列表的更新
4.1.2.1 PushReceiver
我们先看PushReceiver,这个类会以UDP方式接收Nacos服务端推送的服务变更数据。
先看构造函数:
PushReceiver构造函数中基于线程池来运行任务。这是因为PushReceiver本身也是一个Runnable,其中的run方法业务逻辑如下:
4.1.2.2 HostReactor
通知数据的处理由交给了HostReactor的processServiceJson方法:
4.2 服务端
4.2.1 拉取服务列表接口
在2.3.1小节介绍的InstanceController中,提供了拉取服务列表的接口:
进入doSrvIpxt()方法来获取服务列表:
4.2.2 发布服务变更的UDP通知
在上一节中,InstanceController中的doSrvIpxt()方法中,有这样一行代码:
其实是把消费者的UDP端口、IP等信息封装为一个PushClient对象,存储PushService中。方便以后服务变更后推送消息。
PushService类本身实现了ApplicationListener接口:

这个是事件监听器接口,监听的是ServiceChangeEvent(服务变更事件)。
当服务列表变化时,就会通知我们:

4.3 总结
Nacos的服务发现分为两种模式:
模式一:主动拉取模式,消费者定期主动从Nacos拉取服务列表并缓存起来,再服务调用时优先读取本地缓存中的服务列表。
模式二:订阅模式,消费者订阅Nacos中的服务列表,并基于UDP协议来接收服务变更通知。当Nacos中的服务列表更新时,会发送UDP广播给所有订阅者。
与Eureka相比,Nacos的订阅模式服务状态更新更及时,消费者更容易及时发现服务列表的变化,剔除故障服务。
Last updated
Was this helpful?