Introduction to Distro

Distro is an AP protocol: it only guarantees eventual consistency of data, and Nacos uses it for ephemeral services.

Design ideas behind the Distro protocol:

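  1. Each node is responsible for only part of the data and periodically sends checksums of the data it owns to the other nodes to keep the data consistent.
    1. In RPC mode, the node a Client connects to directly is responsible for that Client.
    2. In HTTP mode, the responsible node is determined by hashing the instance's IP:PORT and taking the result modulo the number of nodes.
  2. Every Nacos node is "equal": any node can handle write requests, and it then synchronizes the new data to the other nodes.
    1. Full synchronization: when a Distro node starts, it pulls a full snapshot (the data the remote node is responsible for) from the other nodes, trying them one by one until one succeeds, and updates its local data.
    2. Incremental synchronization:
      • Heartbeat: heartbeats are used to check whether the hash of every Client's data matches on the other nodes; if it does not, the responsible node sends its data again.
      • Real-time push: when a service is registered or deregistered, the Distro node actively pushes the changed data to the other nodes.
  3. Every node handles all read requests on its own and returns locally cached data.
    1. The local cache may lag behind; whenever it is updated, the latest data is pushed to Clients, which achieves eventual consistency.
    2. This improves service availability.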
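The HTTP-mode rule in item 1.2 (hash the instance's IP:PORT, then take it modulo the number of nodes) can be sketched as below. This is a hypothetical illustration, not Nacos's code: the class and method names are made up, and using String.hashCode() over a sorted member list is an assumption. What it shows is that every node, given the same member list, independently arrives at the same owner for a client, so no coordination is needed to decide responsibility.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of HTTP-mode ownership: hash(ip:port) mod N. Not Nacos's implementation.
final class ResponsibilitySketch {
    private ResponsibilitySketch() {
    }

    /** Returns the member responsible for the client identified by "ip:port". */
    static String responsibleServer(String clientIpPort, List<String> members) {
        if (members.isEmpty()) {
            throw new IllegalArgumentException("member list is empty");
        }
        // Assumption: all nodes order the member list the same way, so they agree on the index.
        List<String> sorted = new ArrayList<>(members);
        Collections.sort(sorted);
        // floorMod keeps the index in [0, size) even when hashCode() is negative.
        int index = Math.floorMod(clientIpPort.hashCode(), sorted.size());
        return sorted.get(index);
    }

    /** True if the node "self" owns the client. */
    static boolean isResponsible(String self, String clientIpPort, List<String> members) {
        return self.equals(responsibleServer(clientIpPort, members));
    }
}
```

Because the owner depends on the member count, adding or removing a node can move clients to a different owner; in the real protocol the heartbeat/verify mechanism then brings the new owner's peers back in line.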

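Characteristics:

  1. Distro is an AP protocol and only guarantees eventual consistency.
  2. When Distro nodes verify data with each other, all local data has to be hashed, so Distro is not suited to scenarios with very large amounts of data.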

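Distro initialization

When the DistroProtocol class is initialized, it calls:

  1. startVerifyTask(): a scheduled task that checks with the other nodes every 5 seconds whether each Client's data is consistent, and pushes local data to the other nodes if it finds an inconsistency;
  2. startLoadTask(): pulls data from the other Distro nodes.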

@Component
public class DistroProtocol {
    // Information about every node in the cluster
    private final ServerMemberManager memberManager;

    // Holds the Distro components (transport agents, data storages, data processors)
    private final DistroComponentHolder distroComponentHolder;

    // Runs the Distro tasks
    private final DistroTaskEngineHolder distroTaskEngineHolder;

    // Becomes true once the initial snapshot load has completed (or immediately in standalone mode)
    private volatile boolean isInitialized = false;
    
    public DistroProtocol(ServerMemberManager memberManager, DistroComponentHolder distroComponentHolder,
        DistroTaskEngineHolder distroTaskEngineHolder) {
        this.memberManager = memberManager;
        this.distroComponentHolder = distroComponentHolder;
        this.distroTaskEngineHolder = distroTaskEngineHolder;
        // Start the Distro tasks
        startDistroTask();
    }
    
    private void startDistroTask() {
        if (EnvUtil.getStandaloneMode()) {
            // Standalone mode does not need the Distro protocol
            isInitialized = true;
            return;
        }
        // Scheduled task: every 5 seconds, verify each Client's data with the other nodes
        startVerifyTask();
        // Pull data from the other Distro nodes
        startLoadTask();
    }

}

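During initialization, the current node pulls snapshot data for all services from the other nodes. Once all snapshot data has been pulled, the Distro node has finished initializing, and a callback sets isInitialized to true: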

private void startLoadTask() {
    // Callback invoked when loading finishes
    DistroCallback loadCallback = new DistroCallback() {
        @Override
        public void onSuccess() {
            // The Distro node has finished initializing
            isInitialized = true;
        }
        
        @Override
        public void onFailed(Throwable throwable) {
            isInitialized = false;
        }
    };
    // Pull snapshot data from the other nodes asynchronously
    GlobalExecutor.submitLoadDataTask(
            new DistroLoadDataTask(memberManager, distroComponentHolder, DistroConfig.getInstance(), loadCallback));
}

DistroLoadDataTask is a Runnable, so only its run method matters; the core logic is in load():

@Override
public void run() {
    try {
        load();
        if (!checkCompleted()) {
            // Not every data type has been loaded: resubmit this task and retry after loadDataRetryDelayMillis (30 seconds)
            GlobalExecutor.submitLoadDataTask(this, distroConfig.getLoadDataRetryDelayMillis());
        } else {
            loadCallback.onSuccess();
            Loggers.DISTRO.info("[DISTRO-INIT] load snapshot data success");
        }
    } catch (Exception e) {
        loadCallback.onFailed(e);
        Loggers.DISTRO.error("[DISTRO-INIT] load snapshot data failed. ", e);
    }
}

private void load() throws Exception {
    // If there is no node other than this one, sleep 1 second: the other nodes may not have started yet
    // Note: nodes that join later are synchronized through the heartbeat (verify) mechanism
    while (memberManager.allMembersWithoutSelf().isEmpty()) {
        Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init...");
        TimeUnit.SECONDS.sleep(1);
    }
    // No data types yet: the components in distroComponentHolder have not finished registering
    while (distroComponentHolder.getDataStorageTypes().isEmpty()) {
        Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register...");
        TimeUnit.SECONDS.sleep(1);
    }
    // Load the data of each type that has not been loaded yet
    for (String each : distroComponentHolder.getDataStorageTypes()) {
        if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {
            // Load this type and record whether it succeeded
            loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each));
        }
    }
}
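The retry bookkeeping in run() and load() can be modeled without threads. In this hypothetical sketch (names are illustrative, not Nacos APIs), a Predicate stands in for loadAllDataSnapshotFromRemote, and runOnce() corresponds to one execution of run(); where it returns false, the real task would resubmit itself after the retry delay instead of being called again directly.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical single-threaded model of DistroLoadDataTask's bookkeeping.
final class LoadTaskSketch {
    private final List<String> dataTypes;
    // Stands in for loadAllDataSnapshotFromRemote(type); true means the type was loaded.
    private final Predicate<String> loader;
    private final Map<String, Boolean> loadCompleted = new LinkedHashMap<>();
    private boolean initialized;

    LoadTaskSketch(List<String> dataTypes, Predicate<String> loader) {
        this.dataTypes = dataTypes;
        this.loader = loader;
    }

    /** One execution of run(): load every pending type, then check whether all are done. */
    boolean runOnce() {
        for (String type : dataTypes) {
            // Types that already succeeded are not loaded again.
            if (!loadCompleted.getOrDefault(type, false)) {
                loadCompleted.put(type, loader.test(type));
            }
        }
        boolean completed = dataTypes.stream().allMatch(t -> loadCompleted.getOrDefault(t, false));
        if (completed) {
            initialized = true; // what loadCallback.onSuccess() does
        }
        return completed;
    }

    boolean isInitialized() {
        return initialized;
    }
}
```

Keeping per-type results means a retry only repeats the types that failed, which matches the containsKey/get check in load().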

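The actual synchronization happens in loadAllDataSnapshotFromRemote, which returns as soon as one remote node succeeds:

  1. pull snapshot data from a remote node;
  2. apply the snapshot to local memory;
  3. mark the data as initialized.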
private boolean loadAllDataSnapshotFromRemote(String resourceType) {
    // Transport agent for this data type
    DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
    // Data processor for this data type
    DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
    if (null == transportAgent || null == dataProcessor) {
        Loggers.DISTRO.warn("[DISTRO-INIT] Can't find component for type {}, transportAgent: {}, dataProcessor: {}",
                resourceType, transportAgent, dataProcessor);
        return false;
    }
    // Request the data from each node in turn
    for (Member each : memberManager.allMembersWithoutSelf()) {
        long startTime = System.currentTimeMillis();
        try {
            Loggers.DISTRO.info("[DISTRO-INIT] load snapshot {} from {}", resourceType, each.getAddress());
            /** 
             * Send a request to pull the snapshot data
             *  @see com.alibaba.nacos.naming.consistency.ephemeral.distro.v2.DistroClientTransportAgent#getDatumSnapshot(String)
             */
            DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());
            Loggers.DISTRO.info("[DISTRO-INIT] it took {} ms to load snapshot {} from {} and snapshot size is {}.",
                    System.currentTimeMillis() - startTime, resourceType, each.getAddress(),
                    getDistroDataLength(distroData));
            /**
             * Parse and apply the data
             * @see com.alibaba.nacos.naming.consistency.ephemeral.distro.v2.DistroClientDataProcessor#processSnapshot(DistroData)
             */
            boolean result = dataProcessor.processSnapshot(distroData);
            Loggers.DISTRO
                    .info("[DISTRO-INIT] load snapshot {} from {} result: {}", resourceType, each.getAddress(),
                            result);
            // On success, mark this data type as initialized and stop asking other nodes
            if (result) {
                distroComponentHolder.findDataStorage(resourceType).finishInitial();
                return true;
            }
        } catch (Exception e) {
            // Log and fall through to the next node
            Loggers.DISTRO.error("[DISTRO-INIT] load snapshot {} from {} failed.", resourceType, each.getAddress(), e);
        }
    }
    return false;
}
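The loop above stops at the first peer whose snapshot is fetched and applied, and logs and skips peers that throw. A hypothetical, RPC-free sketch of that control flow, where a Predicate stands in for "getDatumSnapshot + processSnapshot" (the names are illustrative only):

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model of loadAllDataSnapshotFromRemote's "first successful peer wins" loop.
final class SnapshotFromPeersSketch {
    private SnapshotFromPeersSketch() {
    }

    /** fetchAndApply returns true on success and may throw on RPC or processing errors. */
    static boolean loadFromFirstHealthyPeer(List<String> peers, Predicate<String> fetchAndApply) {
        for (String peer : peers) {
            try {
                if (fetchAndApply.test(peer)) {
                    return true; // the remaining peers are not contacted
                }
            } catch (RuntimeException e) {
                // The real code logs the failure and moves on to the next peer.
            }
        }
        return false; // no peer succeeded; the load task retries later
    }
}
```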

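transportAgent.getDatumSnapshot ends up in DistroClientTransportAgent#getDatumSnapshot, which:

  1. checks whether the target node is healthy;
  2. builds a DistroDataRequest with the operation DataOperation.SNAPSHOT and sends it to the target node;
  3. throws a DistroException in every other case (unhealthy target, error response, RPC failure).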
public class DistroClientTransportAgent implements DistroTransportAgent {
    
    // Sends RPC requests to other cluster members
    private final ClusterRpcClientProxy clusterRpcClientProxy;
    
    // Manages Distro node information
    private final ServerMemberManager memberManager;

    public DistroData getDatumSnapshot(String targetServer) {
        // Look up the target node in the member manager
        Member member = memberManager.find(targetServer);
        // Check whether the target server is healthy
        if (checkTargetServerStatusUnhealthy(member)) {
            throw new DistroException(
                    String.format("[DISTRO] Cancel get snapshot caused by target server %s unhealthy", targetServer));
        }
        // Build the request
        DistroDataRequest request = new DistroDataRequest();
        // Set the operation type to DataOperation.SNAPSHOT
        request.setDataOperation(DataOperation.SNAPSHOT);
        try {
            // Send a synchronous RPC request through the RPC proxy
            Response response = clusterRpcClientProxy
                    .sendRequest(member, request, DistroConfig.getInstance().getLoadDataTimeoutMillis());
            if (checkResponse(response)) {
                return ((DistroDataResponse) response).getDistroData();
            } else {
                throw new DistroException(
                        String.format("[DISTRO-FAILED] Get snapshot request to %s failed, code: %d, message: %s",
                                targetServer, response.getErrorCode(), response.getMessage()));
            }
        } catch (NacosException e) {
            throw new DistroException("[DISTRO-FAILED] Get distro snapshot failed! ", e);
        }
    }
}

processSnapshot calls DistroClientDataProcessor#processSnapshot() to handle the pulled snapshot:

  1. deserialize the snapshot into a ClientSyncDatumSnapshot;
  2. create an IpPortBasedClient object, put it into the ClientManager, and start its heartbeat check task;
  3. call upgradeClient to update ServiceManager and ClientServiceIndexesManager, push to subscribers, and so on.
public class DistroClientDataProcessor extends SmartSubscriber implements DistroDataStorage, DistroDataProcessor {
    public static final String TYPE = "Nacos:Naming:v2:ClientData";

    /**
     * @see ClientManagerDelegate 
     * which delegates to ConnectionBasedClientManager, EphemeralIpPortClientManager and PersistentIpPortClientManager
     */
    private final ClientManager clientManager;
    
    private final DistroProtocol distroProtocol;
    
    @Override
    public boolean processSnapshot(DistroData distroData) {
        // Deserialize the DistroData into a ClientSyncDatumSnapshot
        ClientSyncDatumSnapshot snapshot = ApplicationUtils.getBean(Serializer.class)
                .deserialize(distroData.getContent(), ClientSyncDatumSnapshot.class);
        // The snapshot lists the remote node's clients together with their services and instances
        for (ClientSyncData each : snapshot.getClientSyncDataList()) {
            // Handle one client at a time
            handlerClientSyncData(each);
        }
        return true;
    }
    
    // Note: only publisher relationships need to be maintained here, not subscriber relationships
    private void handlerClientSyncData(ClientSyncData clientSyncData) {
        Loggers.DISTRO.info("[Client-Add] Received distro client sync data {}, revision={}",
                clientSyncData.getClientId(),
                clientSyncData.getAttributes().getClientAttribute(ClientConstants.REVISION, 0L));
        // This is synced data, so create a client for it and cache it
        clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());
        Client client = clientManager.getClient(clientSyncData.getClientId());
        // Upgrade this client's service information
        upgradeClient(client, clientSyncData);
    }

}

ClientSyncData contains the remote node's namespace, service and instance information. Note that this data is organized per Client (that is, per publisher):

public class ClientSyncData implements Serializable {
    
    private static final long serialVersionUID = -5141768777704539562L;
    
    private String clientId;
    
    private ClientAttributes attributes;
    
    private List<String> namespaces;
    
    private List<String> groupNames;
    
    private List<String> serviceNames;
    
    private List<InstancePublishInfo> instancePublishInfos;
    
    private BatchInstanceData batchInstanceData;
}

upgradeClient is the core step that applies remote data to the local node:

  1. update the data in ServiceManager;
  2. update ClientServiceIndexesManager;
  3. publish a ServiceEvent.ServiceChangedEvent;
  4. submit a PushDelayTask so that subscribers refresh their instance data.
private void upgradeClient(Client client, ClientSyncData clientSyncData) {
    // Services synced in this round
    Set<Service> syncedService = new HashSet<>();
    // Process batch-registered instances:
    // 0. update the data in ServiceManager
    // 1. update ClientServiceIndexesManager
    // 2. publish a ServiceEvent.ServiceChangedEvent
    // 3. submit a PushDelayTask so that subscribers refresh their instance data
    processBatchInstanceDistroData(syncedService, client, clientSyncData);

    List<String> namespaces = clientSyncData.getNamespaces();
    List<String> groupNames = clientSyncData.getGroupNames();
    List<String> serviceNames = clientSyncData.getServiceNames();
    List<InstancePublishInfo> instances = clientSyncData.getInstancePublishInfos();

    // TODO This loop is similar to the logic in processBatchInstanceDistroData; why is it executed again?
    for (int i = 0; i < namespaces.size(); i++) {
        Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i));
        Service singleton = ServiceManager.getInstance().getSingleton(service);
        // Mark this Service as processed
        syncedService.add(singleton);
        InstancePublishInfo instancePublishInfo = instances.get(i); // TODO How is a Service with multiple instances handled?
        // The instance differs from the one in the local client: treat it as a new service from another node, which must be pushed to subscribers
        if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton))) {
            // Add it to the client
            client.addServiceInstance(singleton, instancePublishInfo);
            // Publish a service registration event on this node
            NotifyCenter.publishEvent(
                    new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId()));
            NotifyCenter.publishEvent(
                    new MetadataEvent.InstanceMetadataEvent(singleton, instancePublishInfo.getMetadataId(), false));
        }
    }
    // Services published by the local client that are not in this sync list are outdated and must be removed
    // TODO Why decide it this way? When two nodes disagree, the remote data wins; could that cause problems?
    for (Service each : client.getAllPublishedService()) {
        // Remove extra Services from the client
        if (!syncedService.contains(each)) {
            client.removeServiceInstance(each);
            NotifyCenter.publishEvent(
                    new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId()));
        }
    }
    client.setRevision(
            clientSyncData.getAttributes().<Integer>getClientAttribute(ClientConstants.REVISION, 0));
}
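Stripped of managers and events, upgradeClient reconciles one client's local publishers against the remote ClientSyncData: add or update whatever the remote side has, drop whatever it no longer has. This hypothetical sketch uses strings for services and instances (an assumption for illustration) and returns the events it would publish instead of calling NotifyCenter:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical simplification of upgradeClient's reconciliation; not Nacos code.
final class UpgradeClientSketch {
    private UpgradeClientSketch() {
    }

    /** Makes localPublishers equal to remotePublishers and returns the events that implies. */
    static List<String> reconcile(Map<String, String> localPublishers, Map<String, String> remotePublishers) {
        List<String> events = new ArrayList<>();
        // 1. Add or update every service the remote side reports.
        for (Map.Entry<String, String> e : remotePublishers.entrySet()) {
            if (!Objects.equals(e.getValue(), localPublishers.get(e.getKey()))) {
                localPublishers.put(e.getKey(), e.getValue());
                events.add("REGISTER " + e.getKey());
            }
        }
        // 2. Collect services the remote side no longer reports, then remove them.
        List<String> stale = new ArrayList<>();
        for (String service : localPublishers.keySet()) {
            if (!remotePublishers.containsKey(service)) {
                stale.add(service);
            }
        }
        for (String service : stale) {
            localPublishers.remove(service);
            events.add("DEREGISTER " + service);
        }
        return events;
    }
}
```

The remote view always wins here, which is exactly the behavior the TODO in the loop above questions.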

Incremental data synchronization

startLoadTask synchronizes the full data set (while Nacos is starting up); while Nacos is running, incremental data is synchronized through published events.

DistroClientDataProcessor extends the SmartSubscriber event-subscriber class and, when it is initialized, subscribes to these events:

@Override
public List<Class<? extends Event>> subscribeTypes() {
    List<Class<? extends Event>> result = new LinkedList<>();
    result.add(ClientEvent.ClientChangedEvent.class);
    result.add(ClientEvent.ClientDisconnectEvent.class);
    result.add(ClientEvent.ClientVerifyFailedEvent.class);
    return result;
}

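When a service is registered, a ClientEvent.ClientChangedEvent is published; when a client goes offline, a ClientEvent.ClientDisconnectEvent is published. Both events are handled by this class in syncToAllServer: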

private void syncToAllServer(ClientEvent event) {
    Client client = event.getClient();
    // Only ephemeral data sync by Distro, persist client should sync by raft.
    // Under Distro, each node only syncs the data of the clients it is responsible for
    // TODO Analyze the isResponsibleClient logic
    if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
        return;
    }
    if (event instanceof ClientEvent.ClientDisconnectEvent) {
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        // Deletion
        distroProtocol.sync(distroKey, DataOperation.DELETE);
    } else if (event instanceof ClientEvent.ClientChangedEvent) {
        // Incremental sync is done per client, i.e. per publisher's data
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        // Change event: incremental data synchronization
        distroProtocol.sync(distroKey, DataOperation.CHANGE);
    }
}

The Distro node then sends RPC requests to the other nodes through distroProtocol.sync, with operation type DELETE or CHANGE respectively:

public void sync(DistroKey distroKey, DataOperation action, long delay) {
    // Queue one delayed task per peer (every member except this node)
    for (Member each : memberManager.allMembersWithoutSelf()) {
        syncToTarget(distroKey, action, each.getAddress(), delay);
    }
}

public void syncToTarget(DistroKey distroKey, DataOperation action, String targetServer, long delay) {
    // The target server is part of the key, so the task for each peer is tracked separately
    DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
            targetServer);
    DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
    /**
     * Submit the DistroDelayTask
     * @see DistroDelayTaskProcessor#process(NacosTask)
     */
    distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
    if (Loggers.DISTRO.isDebugEnabled()) {
        Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, targetServer);
    }
}

The delay task is handled by DistroDelayTaskProcessor#process, which converts it into a DistroSyncDeleteTask or a DistroSyncChangeTask and submits that task:

public boolean process(NacosTask task) {
    if (!(task instanceof DistroDelayTask)) {
        return true;
    }
    DistroDelayTask distroDelayTask = (DistroDelayTask) task;
    DistroKey distroKey = distroDelayTask.getDistroKey();
    switch (distroDelayTask.getAction()) {
        case DELETE:
            DistroSyncDeleteTask syncDeleteTask = new DistroSyncDeleteTask(distroKey, distroComponentHolder);
            distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncDeleteTask);
            return true;
        case CHANGE:
        case ADD:
            DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
            distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
            return true;
        default:
            return false;
    }
}
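The fan-out and dispatch steps can be sketched as follows. The key includes the target server, so a client's change is queued separately for every peer. The rule "a later action for the same key replaces the earlier one" is an assumption made for illustration; the real delay-task engine has its own merge logic. Class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of the delay-task stage: one pending action per (resource, type, target) key.
final class DelayTaskSketch {
    enum DataOperation { ADD, CHANGE, DELETE, VERIFY, SNAPSHOT, QUERY }

    private final Map<String, DataOperation> pending = new LinkedHashMap<>();

    /** Mirrors syncToTarget: the target server is part of the key. Assumption: latest action wins. */
    void addTask(String resourceKey, String type, String targetServer, DataOperation action) {
        pending.put(resourceKey + "|" + type + "|" + targetServer, action);
    }

    /** Mirrors the switch in DistroDelayTaskProcessor#process. */
    static String toExecuteTask(DataOperation action) {
        switch (action) {
            case DELETE:
                return "DistroSyncDeleteTask";
            case CHANGE:
            case ADD:
                return "DistroSyncChangeTask";
            default:
                return null; // process() returns false for other operations
        }
    }

    Map<String, DataOperation> pending() {
        return pending;
    }
}
```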

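For a change, DistroSyncChangeTask fetches the payload from DistroClientDataProcessor#getDistroData, which calls client.generateSyncData() to build the data to be synced and serializes it; the result is put into the request sent to the peer node: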

// DistroClientDataProcessor
public DistroData getDistroData(DistroKey distroKey) {
    Client client = clientManager.getClient(distroKey.getResourceKey());
    if (null == client) {
        return null;
    }
    byte[] data = ApplicationUtils.getBean(Serializer.class).serialize(client.generateSyncData());
    return new DistroData(distroKey, data);
}

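This is how a client generates its own sync data: all of its namespace, service and instance information is collected into a ClientSyncData, which getDistroData then serializes: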

public abstract class AbstractClient implements Client {
    @Override
    public ClientSyncData generateSyncData() {
        List<String> namespaces = new LinkedList<>();
        List<String> groupNames = new LinkedList<>();
        List<String> serviceNames = new LinkedList<>();
    
        List<String> batchNamespaces = new LinkedList<>();
        List<String> batchGroupNames = new LinkedList<>();
        List<String> batchServiceNames = new LinkedList<>();
        
        List<InstancePublishInfo> instances = new LinkedList<>();
        List<BatchInstancePublishInfo> batchInstancePublishInfos = new LinkedList<>();
        BatchInstanceData  batchInstanceData = new BatchInstanceData();
        for (Map.Entry<Service, InstancePublishInfo> entry : publishers.entrySet()) {
            InstancePublishInfo instancePublishInfo = entry.getValue();
            if (instancePublishInfo instanceof BatchInstancePublishInfo) {
                BatchInstancePublishInfo batchInstance = (BatchInstancePublishInfo) instancePublishInfo;
                batchInstancePublishInfos.add(batchInstance);
                buildBatchInstanceData(batchInstanceData, batchNamespaces, batchGroupNames, batchServiceNames, entry);
                batchInstanceData.setBatchInstancePublishInfos(batchInstancePublishInfos);
            } else {
                namespaces.add(entry.getKey().getNamespace());
                groupNames.add(entry.getKey().getGroup());
                serviceNames.add(entry.getKey().getName());
                instances.add(entry.getValue());
            }
        }
        ClientSyncData data = new ClientSyncData(getClientId(), namespaces, groupNames, serviceNames, instances, batchInstanceData);
        data.getAttributes().addClientAttribute(REVISION, getRevision());
        return data;
    }
}
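ClientSyncData stores published services as parallel lists: index i of namespaces, groupNames, serviceNames and instancePublishInfos together describes one service, and upgradeClient reads them back by index. A hypothetical round-trip sketch with simplified string types (batch instances, which travel separately in batchInstanceData, are left out):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the parallel-list layout used by generateSyncData / ClientSyncData.
final class SyncDataLayoutSketch {
    final List<String> namespaces = new ArrayList<>();
    final List<String> groupNames = new ArrayList<>();
    final List<String> serviceNames = new ArrayList<>();
    final List<String> instances = new ArrayList<>();

    /** Encode: each publishers key is [namespace, group, name]; the value is the instance. */
    static SyncDataLayoutSketch encode(Map<List<String>, String> publishers) {
        SyncDataLayoutSketch data = new SyncDataLayoutSketch();
        for (Map.Entry<List<String>, String> e : publishers.entrySet()) {
            data.namespaces.add(e.getKey().get(0));
            data.groupNames.add(e.getKey().get(1));
            data.serviceNames.add(e.getKey().get(2));
            data.instances.add(e.getValue());
        }
        return data;
    }

    /** Decode by zipping index i of every list, as the loop in upgradeClient does. */
    Map<List<String>, String> decode() {
        int n = instances.size();
        if (namespaces.size() != n || groupNames.size() != n || serviceNames.size() != n) {
            throw new IllegalStateException("parallel lists are out of sync");
        }
        Map<List<String>, String> result = new LinkedHashMap<>();
        for (int i = 0; i < n; i++) {
            result.put(Arrays.asList(namespaces.get(i), groupNames.get(i), serviceNames.get(i)), instances.get(i));
        }
        return result;
    }
}
```

The layout only works because every list is appended in the same loop iteration; a length check on decode is a cheap guard against a malformed payload.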

Finally, DistroClientTransportAgent#syncData sends an RPC request to the peer:

public boolean syncData(DistroData data, String targetServer) {
    // The target server is not (or no longer) a cluster member: nothing to do
    if (isNoExistTarget(targetServer)) {
        return true;
    }
    // Build the Distro data request
    DistroDataRequest request = new DistroDataRequest(data, data.getType());
    // Look up the member (IP, port and other attributes)
    Member member = memberManager.find(targetServer);
    // Check whether the target node is healthy
    if (checkTargetServerStatusUnhealthy(member)) {
        Loggers.DISTRO
                .warn("[DISTRO] Cancel distro sync caused by target server {} unhealthy, key: {}", targetServer,
                        data.getDistroKey());
        return false;
    }
    try {
        // Healthy: send the gRPC request
        Response response = clusterRpcClientProxy.sendRequest(member, request);
        // Check whether the response indicates success
        return checkResponse(response);
    } catch (NacosException e) {
        Loggers.DISTRO.error("[DISTRO-FAILED] Sync distro data failed! key: {}", data.getDistroKey(), e);
    }
    return false;
}

The peer node handles the RPC request in DistroDataRequestHandler, which chooses the handling logic based on the request's operation type:

public class DistroDataRequestHandler extends RequestHandler<DistroDataRequest, DistroDataResponse> {
    
    private final DistroProtocol distroProtocol;
    
    @Override
    public DistroDataResponse handle(DistroDataRequest request, RequestMeta meta) throws NacosException {
        try {
            switch (request.getDataOperation()) {
                case VERIFY:
                    // Verify (heartbeat) request, sent every 5 seconds
                    return handleVerify(request.getDistroData(), meta);
                case SNAPSHOT:
                    // Snapshot request: full data synchronization
                    return handleSnapshot();
                case ADD:
                case CHANGE:
                case DELETE:
                    // Incremental data synchronization
                    return handleSyncData(request.getDistroData());
                case QUERY:
                    // Data query request
                    return handleQueryData(request.getDistroData());
                default:
                    // Default (empty) response
                    return new DistroDataResponse();
            }
        } catch (Exception e) {
            Loggers.DISTRO.error("[DISTRO-FAILED] distro handle with exception", e);
            DistroDataResponse result = new DistroDataResponse();
            result.setErrorCode(ResponseCode.FAIL.getCode());
            result.setMessage("handle distro request with exception");
            return result;
        }
    }
    
}

handleSyncData eventually reaches DistroClientDataProcessor#processData:

@Override
public boolean processData(DistroData distroData) {
    switch (distroData.getType()) {
        case ADD:
        case CHANGE:
            // Deserialize the DistroData into a ClientSyncData
            // TODO For where the request is sent from, see com.alibaba.nacos.core.distributed.distro.DistroProtocol.syncToTarget
            ClientSyncData clientSyncData = ApplicationUtils.getBean(Serializer.class)
                    .deserialize(distroData.getContent(), ClientSyncData.class);
            // Handle the client change
            handlerClientSyncData(clientSyncData);
            return true;
        case DELETE:
            String deleteClientId = distroData.getDistroKey().getResourceKey();
            Loggers.DISTRO.info("[Client-Delete] Received distro client sync data {}", deleteClientId);
            // Handle the client going offline (deletion)
            clientManager.clientDisconnected(deleteClientId);
            return true;
        default:
            return false;
    }
}

handlerClientSyncData works like the initialization path, and again the core is upgradeClient:

  1. create an IpPortBasedClient object and put it into clientManager;
  2. update the data in ServiceManager;
  3. update ClientServiceIndexesManager;
  4. publish a ServiceEvent.ServiceChangedEvent;
  5. submit a PushDelayTask so that subscribers refresh their instance data.
// Note: only publisher relationships need to be maintained here, not subscriber relationships
private void handlerClientSyncData(ClientSyncData clientSyncData) {
    Loggers.DISTRO.info("[Client-Add] Received distro client sync data {}, revision={}",
            clientSyncData.getClientId(),
            clientSyncData.getAttributes().getClientAttribute(ClientConstants.REVISION, 0L));
    // This is synced data, so create a client for it and cache it
    clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());
    Client client = clientManager.getClient(clientSyncData.getClientId());
    // Upgrade this client's service information
    upgradeClient(client, clientSyncData);
}

clientDisconnected:

  1. removes the client from the ClientManager;
  2. publishes a ClientEvent.ClientDisconnectEvent, whose handlers
    1. update ClientServiceIndexesManager;
    2. update NamingMetadataManager;
    3. notify subscribers so they update their services.
@Override
public boolean clientDisconnected(String clientId) {
    Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId);
    IpPortBasedClient client = clients.remove(clientId);
    if (null == client) {
        return true;
    }
    NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client, isResponsibleClient(client)));
    client.release();
    return true;
}

This completes the incremental synchronization flow.

Heartbeat mechanism (metadata verification)

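As mentioned above, once DistroProtocol is initialized it starts a verify task that, every 5 seconds, sends the other nodes the state (the revision hash) of each Client this node is responsible for, so the two nodes can check whether their data matches; if it does not, the two nodes resynchronize.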

private void startVerifyTask() {
    // Send verify requests to the other members every 5 seconds
    /**
     * @see DistroVerifyTimedTask#run()
     */
    GlobalExecutor.schedulePartitionDataTimedSync(new DistroVerifyTimedTask(memberManager, distroComponentHolder,
                    distroTaskEngineHolder.getExecuteWorkersManager()),
            DistroConfig.getInstance().getVerifyIntervalMillis()); // interval: 5 seconds
}

DistroVerifyTimedTask is also a Runnable, so only its run method matters:

public class DistroVerifyTimedTask implements Runnable {
    
    // Manages all server nodes
    private final ServerMemberManager serverMemberManager;
    
    // Distro component holder
    private final DistroComponentHolder distroComponentHolder;
    
    // Engine that executes the verify tasks
    private final DistroExecuteTaskExecuteEngine executeTaskExecuteEngine;
    
    @Override
    public void run() {
        try {
            // All server nodes except this one
            List<Member> targetServer = serverMemberManager.allMembersWithoutSelf();
            if (Loggers.DISTRO.isDebugEnabled()) {
                Loggers.DISTRO.debug("server list is: {}", targetServer);
            }
            for (String each : distroComponentHolder.getDataStorageTypes()) {
                /**
                 * Here we only care about the Nacos:Naming:v2:ClientData type
                 * @see DistroVerifyTimedTask#verifyForDataStorage(String, List)
                 */
                verifyForDataStorage(each, targetServer);
            }
        } catch (Exception e) {
            Loggers.DISTRO.error("[DISTRO-FAILED] verify task failed.", e);
        }
    }
    
}

verifyForDataStorage collects the local verify data and sends it to the other nodes:

private void verifyForDataStorage(String type, List<Member> targetServer) {
    // Data storage for this type
    DistroDataStorage dataStorage = distroComponentHolder.findDataStorage(type);
    if (!dataStorage.isFinishInitial()) {
        Loggers.DISTRO.warn("data storage {} has not finished initial step, do not send verify data",
                dataStorage.getClass().getSimpleName());
        return;
    }
    // Data to verify
    List<DistroData> verifyData = dataStorage.getVerifyData();
    if (null == verifyData || verifyData.isEmpty()) {
        return;
    }
    for (Member member : targetServer) {
        DistroTransportAgent agent = distroComponentHolder.findTransportAgent(type);
        if (null == agent) {
            continue;
        }
        // Hand off to the execute engine: one task per member and data type
        /**
         * @see DistroVerifyExecuteTask#run()
         */
        executeTaskExecuteEngine.addTask(member.getAddress() + type,
                new DistroVerifyExecuteTask(agent, verifyData, member.getAddress(), type));
    }
}

getVerifyData shows what Distro actually verifies:

  1. iterate over all clients, i.e. publishers;
  2. use each client's clientId and revision as the current state of its services.
public class DistroClientDataProcessor extends SmartSubscriber implements DistroDataStorage, DistroDataProcessor {
    @Override
    public List<DistroData> getVerifyData() {
        List<DistroData> result = null;
        // Verification is done per Client
        for (String each : clientManager.allClientId()) {
            // Process each client registered on this node
            Client client = clientManager.getClient(each);
            if (null == client || !client.isEphemeral()) {
                // Skip missing or non-ephemeral clients
                continue;
            }
            // Only clients this node is responsible for
            // TODO Analyze this flow
            if (clientManager.isResponsibleClient(client)) {
                // The verify data is the clientId and revision of each client
                // revision is a hash computed over all of the client's services, instances, etc.
                // So verification is done per client
                DistroClientVerifyInfo verifyData = new DistroClientVerifyInfo(client.getClientId(),
                        client.getRevision());
                DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
                DistroData data = new DistroData(distroKey,
                        ApplicationUtils.getBean(Serializer.class).serialize(verifyData));
                data.setType(DataOperation.VERIFY);
                if (result == null) {
                    result = new LinkedList<>();
                }
                result.add(data);
            }
        }
        return result;
    }
}
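A hypothetical model of the filtering in getVerifyData: only ephemeral clients this node is responsible for produce a (clientId, revision) record. The class names are illustrative, a boolean field stands in for clientManager.isResponsibleClient, and unlike the real method (which returns null when nothing qualifies) this sketch returns an empty list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of getVerifyData's filtering; not Nacos code.
final class VerifyDataSketch {
    private VerifyDataSketch() {
    }

    static final class ClientView {
        final String clientId;
        final boolean ephemeral;
        final boolean responsible; // stands in for clientManager.isResponsibleClient(client)
        final long revision;

        ClientView(String clientId, boolean ephemeral, boolean responsible, long revision) {
            this.clientId = clientId;
            this.ephemeral = ephemeral;
            this.responsible = responsible;
            this.revision = revision;
        }
    }

    static final class VerifyInfo {
        final String clientId;
        final long revision;

        VerifyInfo(String clientId, long revision) {
            this.clientId = clientId;
            this.revision = revision;
        }
    }

    /** One record per ephemeral client owned by this node. */
    static List<VerifyInfo> collect(List<ClientView> clients) {
        List<VerifyInfo> result = new ArrayList<>();
        for (ClientView client : clients) {
            if (client == null || !client.ephemeral || !client.responsible) {
                continue; // skipped, as in the real loop
            }
            result.add(new VerifyInfo(client.clientId, client.revision));
        }
        return result;
    }
}
```

Sending only an id and a number per client keeps each heartbeat small; the full data travels only when a revision mismatch is detected.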

In the previous article we covered how revision is computed: a hash over all of the client's state (namespace, group, IP, port, instance, metadata and so on):

// client.recalculateRevision() calls this method to compute the current state of the client's services
public static int hash(Client client) {
    if (!(client instanceof IpPortBasedClient)) {
        return 0;
    }
    return Objects.hash(client.getClientId(),
            client.getAllPublishedService().stream()
                    .map(s -> {
                        InstancePublishInfo ip = client.getInstancePublishInfo(s);
                        double weight = getWeight(ip);
                        Boolean enabled = getEnabled(ip);
                        String cluster = StringUtils.defaultIfBlank(ip.getCluster(), DEFAULT_CLUSTER_NAME);
                        return Objects.hash(
                                s.getNamespace(),
                                s.getGroup(),
                                s.getName(),
                                s.isEphemeral(),
                                ip.getIp(),
                                ip.getPort(),
                                weight,
                                ip.isHealthy(),
                                enabled,
                                cluster,
                                ip.getExtendDatum()
                        );
                    })
                    .collect(Collectors.toSet()));
}
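Two properties of this hash matter for verification: per-service hashes are collected into a Set, so iteration order does not affect the revision, and changing any hashed field (weight, health and so on) changes the revision with high probability. A trimmed-down, hypothetical sketch with only a few of the real fields (services are plain strings here):

```java
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

// Hypothetical, simplified revision hash in the style of the method above.
final class RevisionSketch {
    private RevisionSketch() {
    }

    static final class Instance {
        final String ip;
        final int port;
        final double weight;
        final boolean healthy;

        Instance(String ip, int port, double weight, boolean healthy) {
            this.ip = ip;
            this.port = port;
            this.weight = weight;
            this.healthy = healthy;
        }
    }

    /** Combines the client id with the set of per-service hashes. */
    static int revision(String clientId, Map<String, Instance> publishers) {
        return Objects.hash(clientId,
                publishers.entrySet().stream()
                        .map(e -> Objects.hash(e.getKey(), e.getValue().ip, e.getValue().port,
                                e.getValue().weight, e.getValue().healthy))
                        .collect(Collectors.toSet()));
    }
}
```

Because a Set's hashCode is the sum of its elements' hashes, two nodes holding the same publishers compute the same revision regardless of map type or insertion order. As with any 32-bit hash, collisions are possible, so equal revisions strongly suggest, but do not prove, identical data.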

DistroVerifyExecuteTask eventually calls DistroClientTransportAgent to send the other node a DistroDataRequest with type DataOperation.VERIFY. Whether the peer's verification succeeds or fails, a callback is triggered:

public class DistroClientTransportAgent implements DistroTransportAgent {

    @Override
    public void syncVerifyData(DistroData verifyData, String targetServer, DistroCallback callback) {
        if (isNoExistTarget(targetServer)) {
            callback.onSuccess();
            return;
        }
        DistroDataRequest request = new DistroDataRequest(verifyData, DataOperation.VERIFY);
        Member member = memberManager.find(targetServer);
        if (checkTargetServerStatusUnhealthy(member)) {
            Loggers.DISTRO
                    .warn("[DISTRO] Cancel distro verify caused by target server {} unhealthy, key: {}", targetServer,
                            verifyData.getDistroKey());
            callback.onFailed(null);
            return;
        }
        try {
            DistroVerifyCallbackWrapper wrapper = new DistroVerifyCallbackWrapper(targetServer,
                    verifyData.getDistroKey().getResourceKey(), callback, member);
            clusterRpcClientProxy.asyncRequest(member, request, wrapper);
        } catch (NacosException nacosException) {
            callback.onFailed(nacosException);
        }
    }
}

The peer handles the RPC request in the same DistroDataRequestHandler shown earlier, which dispatches on the request's operation type; VERIFY requests go to handleVerify.

public class DistroDataRequestHandler extends RequestHandler<DistroDataRequest, DistroDataResponse> {
    
    private final DistroProtocol distroProtocol;
    
    @Override
    public DistroDataResponse handle(DistroDataRequest request, RequestMeta meta) throws NacosException {
        try {
            switch (request.getDataOperation()) {
                case VERIFY:
                    // 心跳请求,每隔 5 秒发送一次
                    return handleVerify(request.getDistroData(), meta);
                case SNAPSHOT:
                    // 处理快照请求
                    // 全量数据同步
                    return handleSnapshot();
                case ADD:
                case CHANGE:
                case DELETE:
                    // 增量数据同步
                    return handleSyncData(request.getDistroData());
                case QUERY:
                    // 处理查询数据的请求
                    return handleQueryData(request.getDistroData());
                default:
                    // 默认的返回
                    return new DistroDataResponse();
            }
        } catch (Exception e) {
            Loggers.DISTRO.error("[DISTRO-FAILED] distro handle with exception", e);
            DistroDataResponse result = new DistroDataResponse();
            result.setErrorCode(ResponseCode.FAIL.getCode());
            result.setMessage("handle distro request with exception");
            return result;
        }
    }
    
}
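To make the dispatch concrete, here is a minimal, self-contained sketch. `DataOp`, `SimpleResponse` and `DispatchSketch` are hypothetical names, not Nacos classes, and the handlers only build strings. It keeps two properties of the handler above: ADD, CHANGE and DELETE fall through to a single sync handler, and an exception is turned into a failure response instead of being thrown back to the caller.

```java
// Hypothetical, simplified model of DistroDataRequestHandler's dispatch (not Nacos code).
enum DataOp { VERIFY, SNAPSHOT, ADD, CHANGE, DELETE, QUERY }

class SimpleResponse {
    final boolean success;
    final String message;

    SimpleResponse(boolean success, String message) {
        this.success = success;
        this.message = message;
    }
}

class DispatchSketch {

    // Route one request by its operation; any exception becomes a failure response.
    static SimpleResponse handle(DataOp op, String payload) {
        try {
            switch (op) {
                case VERIFY:
                    return new SimpleResponse(true, "verified " + payload.trim());
                case SNAPSHOT:
                    // Snapshot ignores the payload and returns the full data set
                    return new SimpleResponse(true, "snapshot");
                case ADD:
                case CHANGE:
                case DELETE:
                    // All incremental operations share one handler
                    return new SimpleResponse(true, "synced " + op + " " + payload.trim());
                case QUERY:
                    return new SimpleResponse(true, "query " + payload.trim());
                default:
                    return new SimpleResponse(true, "");
            }
        } catch (RuntimeException e) {
            // Mirrors the catch block above: report the failure inside the response
            return new SimpleResponse(false, "handle distro request with exception");
        }
    }
}
```

For example, `handle(DataOp.DELETE, null)` hits a NullPointerException in the sync branch and comes back with `success == false`, just as the real handler answers with a FAIL response code rather than letting the error escape.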

handleVerify calls distroProtocol.onVerify, which ultimately delegates to ConnectionBasedClientManager.verifyClient to perform the check:

private DistroDataResponse handleVerify(DistroData distroData, RequestMeta meta) {
    DistroDataResponse result = new DistroDataResponse();
    // If onVerify returns false, the RPC response carries an error code
    if (!distroProtocol.onVerify(distroData, meta.getClientIp())) {
        result.setErrorInfo(ResponseCode.FAIL.getCode(), "[DISTRO-FAILED] distro data verify failed");
    }
    return result;
}

public class ConnectionBasedClientManager extends ClientConnectionEventListener implements ClientManager {

    @Override
    public boolean verifyClient(DistroClientVerifyInfo verifyData) {
        ConnectionBasedClient client = clients.get(verifyData.getClientId());
        if (null != client) {
            // remote node of old version will always verify with zero revision
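            // If the remote revision is 0 or equals the local revision, the data is consistent: return true
            if (0 == verifyData.getRevision() || client.getRevision() == verifyData.getRevision()) {
                // Refresh the client's last renew time; for a client synced from another node
                // (a non-native client), this timestamp is what keeps it from being expired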
                client.setLastRenewTime();
                return true;
            } else {
                Loggers.DISTRO.info("[DISTRO-VERIFY-FAILED] ConnectionBasedClient[{}] revision local={}, remote={}",
                        client.getClientId(), client.getRevision(), verifyData.getRevision());
            }
        }
        // Client not found or revision mismatch: verification fails
        return false;
    }


}
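The revision rule in verifyClient is small enough to model directly. The sketch below is a hypothetical in-memory version (`VerifySketch` and `VerifyClient` are illustrative names, and the clock is passed in explicitly). It keeps the two details of the real method: a remote revision of 0, as sent by older nodes, always passes, and only a successful check refreshes lastRenewTime.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical local record of a client: its data revision and last renew time.
class VerifyClient {
    long revision;
    long lastRenewTime;

    VerifyClient(long revision) {
        this.revision = revision;
    }
}

class VerifySketch {
    private final Map<String, VerifyClient> clients = new HashMap<>();

    void put(String clientId, long revision) {
        clients.put(clientId, new VerifyClient(revision));
    }

    // True when the client exists locally and the revisions agree
    // (remote revision 0 is accepted, mirroring the old-version rule).
    boolean verify(String clientId, long remoteRevision, long nowMs) {
        VerifyClient client = clients.get(clientId);
        if (client != null && (remoteRevision == 0 || client.revision == remoteRevision)) {
            client.lastRenewTime = nowMs; // renew only on a successful verify
            return true;
        }
        return false;
    }

    long lastRenewTime(String clientId) {
        return clients.get(clientId).lastRenewTime;
    }
}
```

A mismatched revision leaves lastRenewTime untouched, so a stale copy is never kept alive by a failed check.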

When the response arrives, the callback wrapper is invoked. If the peer reports that verification failed, it publishes a ClientVerifyFailedEvent:

public class DistroClientTransportAgent implements DistroTransportAgent {

    // Created when the verify request is sent; holds targetServer, clientId, distroCallback and member
    private class DistroVerifyCallbackWrapper implements RequestCallBack<Response> {

        @Override
        public void onResponse(Response response) {
            if (checkResponse(response)) {
                NamingTpsMonitor.distroVerifySuccess(member.getAddress(), member.getIp());
                // Success: the callback logs the result
                distroCallback.onSuccess();
            } else {
                // Reached when the peer returned a failure response:
                // verification failed, so publish a ClientVerifyFailedEvent
                Loggers.DISTRO.info("Target {} verify client {} failed, sync new client", targetServer, clientId);
                NotifyCenter.publishEvent(new ClientEvent.ClientVerifyFailedEvent(clientId, targetServer));
                NamingTpsMonitor.distroVerifyFail(member.getAddress(), member.getIp());
                distroCallback.onFailed(null);
            }
        }
    }
}
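The wrapper's job is to turn a failed response into an event instead of repairing the data inline. The following sketch models that with a tiny event bus that delivers events synchronously for simplicity. `MiniBus`, `VerifyFailedEvent`, `VerifyCallback` and `VerifyCallbackWrapperSketch` are hypothetical stand-ins for NotifyCenter, ClientVerifyFailedEvent and the Distro callback, and a boolean replaces checkResponse(response).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical event: which client failed verification, and which peer rejected it.
class VerifyFailedEvent {
    final String clientId;
    final String targetServer;

    VerifyFailedEvent(String clientId, String targetServer) {
        this.clientId = clientId;
        this.targetServer = targetServer;
    }
}

// Synchronous stand-in for NotifyCenter.
class MiniBus {
    private final List<Consumer<VerifyFailedEvent>> listeners = new ArrayList<>();

    void subscribe(Consumer<VerifyFailedEvent> listener) {
        listeners.add(listener);
    }

    void publish(VerifyFailedEvent event) {
        for (Consumer<VerifyFailedEvent> listener : listeners) {
            listener.accept(event);
        }
    }
}

interface VerifyCallback {
    void onSuccess();

    void onFailed(Throwable cause);
}

// Mirrors onResponse: success notifies the callback; failure publishes an event, then notifies.
class VerifyCallbackWrapperSketch {
    private final String targetServer;
    private final String clientId;
    private final VerifyCallback callback;
    private final MiniBus bus;

    VerifyCallbackWrapperSketch(String targetServer, String clientId, VerifyCallback callback, MiniBus bus) {
        this.targetServer = targetServer;
        this.clientId = clientId;
        this.callback = callback;
        this.bus = bus;
    }

    // responseOk stands in for checkResponse(response)
    void onResponse(boolean responseOk) {
        if (responseOk) {
            callback.onSuccess();
        } else {
            bus.publish(new VerifyFailedEvent(clientId, targetServer));
            callback.onFailed(null);
        }
    }
}
```

Decoupling through an event means the code that repairs data (the next step) does not need to know anything about RPC callbacks.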

ClientVerifyFailedEvent is handled in the onEvent method of DistroClientDataProcessor:

@Override
public void onEvent(Event event) {
    if (EnvUtil.getStandaloneMode()) {
        return;
    }
    if (!upgradeJudgement.isUseGrpcFeatures()) {
        return;
    }
    // Local and remote data differ: send this node's data to the remote node
    if (event instanceof ClientEvent.ClientVerifyFailedEvent) {
        syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event);
    } else {
        // ClientEvent.ClientChangedEvent and ClientEvent.ClientDisconnectEvent: sync to all other nodes
        syncToAllServer((ClientEvent) event);
    }
}

// Send this client's local data to the node that failed verification
private void syncToVerifyFailedServer(ClientEvent.ClientVerifyFailedEvent event) {
    Client client = clientManager.getClient(event.getClientId());
    if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
        return;
    }
    // TYPE is "Nacos:Naming:v2:ClientData"
    DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
    // Verify failed data should be sync directly.
    // Send the client data to the remote node as an ADD operation
    distroProtocol.syncToTarget(distroKey, DataOperation.ADD, event.getTargetServer(), 0L);
}
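The guard in syncToVerifyFailedServer decides whether this node should answer a failed verification at all. A minimal sketch under simplifying assumptions: ephemerality and ownership are plain booleans on a hypothetical `SyncClientInfo`, and instead of calling distroProtocol.syncToTarget the hypothetical `VerifyFailedSyncSketch` just records each send in a list.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical view of a local client: is it ephemeral, and is this node responsible for it?
class SyncClientInfo {
    final boolean ephemeral;
    final boolean responsible;

    SyncClientInfo(boolean ephemeral, boolean responsible) {
        this.ephemeral = ephemeral;
        this.responsible = responsible;
    }
}

class VerifyFailedSyncSketch {
    private final Map<String, SyncClientInfo> clients = new HashMap<>();
    // Records each sync as "ADD <clientId> -> <target>" instead of sending it
    final List<String> sent = new ArrayList<>();

    void put(String clientId, boolean ephemeral, boolean responsible) {
        clients.put(clientId, new SyncClientInfo(ephemeral, responsible));
    }

    // Push local data only for ephemeral clients this node owns, and only to the failing node
    void onVerifyFailed(String clientId, String targetServer) {
        SyncClientInfo client = clients.get(clientId);
        if (client == null || !client.ephemeral || !client.responsible) {
            return;
        }
        sent.add("ADD " + clientId + " -> " + targetServer);
    }
}
```

Because only the responsible node replies, a node holding a replica never overwrites the owner's data with its own copy.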

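When the target node receives the ADD request, it processes it in much the same way as the snapshot data described earlier, so the details are not repeated here.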

Registering a Service / Writing New Data

When a Nacos client registers an ephemeral service, by default it sends an InstanceRequest RPC to the Nacos server, where InstanceRequestHandler processes it (the detailed flow is covered in the previous article).


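If the instance is instead registered over HTTP by calling the URL endpoint, the request first passes through a servlet Filter, which determines which node owns the instance's IP:PORT and forwards the request there when it is not the local node: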

public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain)
        throws IOException, ServletException {
        
        try {
            Method method = controllerMethodsCache.getMethod(req);
            
            String path = new URI(req.getRequestURI()).getPath();
            if (method == null) {
                throw new NoSuchMethodException(req.getMethod() + " " + path);
            }
            // Only methods annotated with @CanDistro are subject to Distro routing
            if (!method.isAnnotationPresent(CanDistro.class)) {
                filterChain.doFilter(req, resp);
                return;
            }
            
            // Derive the responsible tag from the request, in the form IP:PORT.
            // In testing it turned out to be the instance's IP:PORT, so ownership is assigned per instance
            String distroTag = distroTagGenerator.getResponsibleTag(req);
            
            // If this node is responsible, continue down the filter chain; otherwise forward the request below
            if (distroMapper.responsible(distroTag)) {
                filterChain.doFilter(req, resp);
                return;
            }
            
            ......
            
            // Compute the Distro node responsible for distroTag
            final String targetServer = distroMapper.mapSrv(distroTag);
            
            List<String> headerList = new ArrayList<>(16);
            Enumeration<String> headers = req.getHeaderNames();
            while (headers.hasMoreElements()) {
                String headerName = headers.nextElement();
                headerList.add(headerName);
                headerList.add(req.getHeader(headerName));
            }
            
            final String body = IoUtils.toString(req.getInputStream(), StandardCharsets.UTF_8.name());
            final Map<String, String> paramsValue = HttpClient.translateParameterMap(req.getParameterMap());
            
            // Forward the request to the responsible node
            RestResult<String> result = HttpClient
                    .request(HTTP_PREFIX + targetServer + req.getRequestURI(), headerList, paramsValue, body,
                            PROXY_CONNECT_TIMEOUT, PROXY_READ_TIMEOUT, StandardCharsets.UTF_8.name(), req.getMethod());
            String data = result.ok() ? result.getData() : result.getMessage();
            try {
                WebUtils.response(resp, data, result.getCode());
            } catch (Exception ignore) {
                Loggers.SRV_LOG.warn("[DISTRO-FILTER] request failed: " + distroMapper.mapSrv(distroTag) + urlString);
            }
            
          .....
}
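Stripped of the servlet plumbing, the filter makes a three-way decision. The sketch below is hypothetical (`FilterRouteSketch`, and the assumed `"http://"` value of HTTP_PREFIX); it takes mapSrv as a function and treats "this node is responsible" as "mapSrv returns the local address", a simplification of the separate responsible() check.

```java
import java.util.function.Function;

class FilterRouteSketch {
    // Assumed value of HTTP_PREFIX
    static final String HTTP_PREFIX = "http://";

    // Returns null when the request is handled locally, otherwise the URL it would be forwarded to
    static String route(boolean canDistro, String distroTag, String localAddress, String requestUri,
            Function<String, String> mapSrv) {
        if (!canDistro) {
            // No @CanDistro annotation: not subject to Distro routing
            return null;
        }
        String owner = mapSrv.apply(distroTag);
        if (owner.equals(localAddress)) {
            // This node owns the instance: continue the filter chain locally
            return null;
        }
        return HTTP_PREFIX + owner + requestUri;
    }
}
```

Headers, query parameters and body are then copied verbatim onto the forwarded request, as in the doFilter excerpt above.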

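distroMapper.responsible hashes the tag, takes the hash modulo servers.size(), and checks whether the resulting slot belongs to the local node, i.e. whether this node is responsible for the instance: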

public boolean responsible(String responsibleTag) {
    final List<String> servers = healthyList;
    
    if (!switchDomain.isDistroEnabled() || EnvUtil.getStandaloneMode()) {
        return true;
    }
    
    if (CollectionUtils.isEmpty(servers)) {
        // means distro config is not ready yet
        return false;
    }

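    // indexOf and lastIndexOf bound the range of slots this node occupies in the list;
    // without duplicate entries the two are equal, so the range is a single slot
    String localAddress = EnvUtil.getLocalAddress();
    int index = servers.indexOf(localAddress);
    int lastIndex = servers.lastIndexOf(localAddress);
    if (lastIndex < 0 || index < 0) {
        // This node is not in the healthy list: handle the request locally
        return true;
    }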

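    // Hash the responsible tag (for a client ID, the part before '#') and take it modulo the node count
    int target = distroHash(responsibleTag) % servers.size();
    // Responsible if the target slot falls within [index, lastIndex]
    return target >= index && target <= lastIndex;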
}
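The ownership rule can be checked in isolation. This hypothetical `ResponsibleSketch` reimplements responsible() over a plain list and omits the switchDomain and standalone checks; distroHash is assumed here to be `Math.abs(tag.hashCode() % Integer.MAX_VALUE)` and should be compared against DistroMapper before relying on it.

```java
import java.util.List;

class ResponsibleSketch {

    // Assumed distroHash: reducing modulo Integer.MAX_VALUE first means Math.abs never sees
    // Integer.MIN_VALUE, so the result is always non-negative
    static int distroHash(String tag) {
        return Math.abs(tag.hashCode() % Integer.MAX_VALUE);
    }

    // True if the slot distroHash(tag) % size falls within this node's [index, lastIndex] range
    static boolean responsible(List<String> healthy, String localAddress, String tag) {
        if (healthy.isEmpty()) {
            return false; // membership not ready yet
        }
        int index = healthy.indexOf(localAddress);
        int lastIndex = healthy.lastIndexOf(localAddress);
        if (index < 0 || lastIndex < 0) {
            return true; // local node not in the healthy list
        }
        int target = distroHash(tag) % healthy.size();
        return target >= index && target <= lastIndex;
    }
}
```

With distinct addresses in the list, exactly one node answers true for any tag, which is what lets the filter send each instance to a single owner.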

If the hash check shows that the local node is not responsible, distroMapper.mapSrv computes which node is responsible for updating the instance:

public String mapSrv(String responsibleTag) {
    // All healthy nodes
    final List<String> servers = healthyList;
    
    if (CollectionUtils.isEmpty(servers) || !switchDomain.isDistroEnabled()) {
        return EnvUtil.getLocalAddress();
    }
    
    try {
        // Pick a server by hash modulo the number of healthy servers
        int index = distroHash(responsibleTag) % servers.size();
        return servers.get(index);
    } catch (Throwable e) {
        Loggers.SRV_LOG
                .warn("[NACOS-DISTRO] distro mapper failed, return localhost: " + EnvUtil.getLocalAddress(), e);
        return EnvUtil.getLocalAddress();
    }
}
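Because mapSrv uses plain modulo hashing, a change in the number of healthy nodes re-homes many instances at once. The hypothetical `MapSrvSketch` below (distroHash assumed to be `Math.abs(tag.hashCode() % Integer.MAX_VALUE)`) makes this visible with one-character tags, whose String.hashCode is just the character code: with nodes n0, n1, n2, tag "a" (97) maps to n1 and tag "b" (98) maps to n2; after n2 leaves, "a" stays on n1 while "b" moves to n0.

```java
import java.util.List;

class MapSrvSketch {

    // Assumed distroHash (hypothetical reimplementation; check against DistroMapper)
    static int distroHash(String tag) {
        return Math.abs(tag.hashCode() % Integer.MAX_VALUE);
    }

    // Owner = healthy.get(hash % size); fall back to the local address when no healthy list is known
    static String mapSrv(List<String> healthy, String localAddress, String tag) {
        if (healthy.isEmpty()) {
            return localAddress;
        }
        return healthy.get(distroHash(tag) % healthy.size());
    }
}
```

A consistent-hashing ring would move fewer tags on a membership change; modulo hashing trades that for a mapping that is trivial to compute and identical on every node that sees the same healthy list.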

Summary

Why does gRPC registration not need to determine the responsible node and forward the request, while HTTP registration does?

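gRPC uses a long-lived connection. Only the node that accepted the connection can communicate with the client directly, and every subsequent request from that client reaches the same node (if the connection drops, the node that accepts the reconnection takes over). The connection's own keep-alive heartbeat also tells the server whether the instances registered over it have expired. The accepting node can therefore manage that client's data itself, with no forwarding needed.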

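HTTP is stateless, so instance expiry has to be detected with explicit heartbeats. Without a persistent connection, successive requests may land on different nodes, so all heartbeats for an instance must be forwarded to the same node for its heartbeat state to stay correct. Other write operations are forwarded to that node for the same reason: when a single node handles every operation on an instance, conflicts are largely avoided and the data stays correct. For HTTP requests, therefore, the cluster routes each instance's operations to one node according to the partitioning rule.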
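The heartbeat argument can be illustrated with a toy simulation: an instance beats every 5 seconds, and each node judges expiry only from the beats it has seen itself, using an assumed 15-second timeout. `HeartbeatView` and `HeartbeatRoutingDemo` are hypothetical names. Spread round-robin over three nodes, node 1 last sees a beat at t = 20 s, so at t = 36 s it would treat a perfectly healthy instance as expired; routing every beat to one owner avoids that.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-node view of the last heartbeat seen for each instance.
class HeartbeatView {
    private final Map<String, Long> lastBeat = new HashMap<>();

    void beat(String instance, long timeMs) {
        lastBeat.put(instance, timeMs);
    }

    // Expired when this node has not seen a beat within timeoutMs
    boolean expired(String instance, long nowMs, long timeoutMs) {
        Long last = lastBeat.get(instance);
        return last == null || nowMs - last > timeoutMs;
    }
}

class HeartbeatRoutingDemo {
    static final long INTERVAL_MS = 5_000;
    static final long TIMEOUT_MS = 15_000; // assumed timeout

    // Sends beats at t = 0, 5 s, ..., 30 s, either round-robin over the nodes or always to node 0
    static HeartbeatView[] simulate(int nodeCount, boolean routeToOwner) {
        HeartbeatView[] nodes = new HeartbeatView[nodeCount];
        for (int n = 0; n < nodeCount; n++) {
            nodes[n] = new HeartbeatView();
        }
        for (int i = 0; i <= 6; i++) {
            int target = routeToOwner ? 0 : i % nodeCount;
            nodes[target].beat("10.0.0.9:8080", i * INTERVAL_MS);
        }
        return nodes;
    }
}
```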

Reading Services

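Every Distro node keeps a full copy of the data: it loads a snapshot from its peers at startup, then stays current through real-time sync and the 5-second verify. A read request is therefore served directly from the local cache. The local copy may briefly lag, but the verify mechanism detects the inconsistency, the responsible node resends its latest data, and the updated data is then pushed to subscribing clients, giving eventual consistency. The detailed flow was covered in last week's session.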
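The read path and its eventual consistency can be sketched with a hypothetical two-node model (`DistroNodeSketch`, `CachedClient`). It assumes the owner's real-time push to the peer was lost: the peer keeps answering reads from its stale cache until one verify round detects the revision mismatch and the owner pushes its copy.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical immutable cache entry: data revision plus the instance list it describes.
class CachedClient {
    final long revision;
    final String instances;

    CachedClient(long revision, String instances) {
        this.revision = revision;
        this.instances = instances;
    }
}

class DistroNodeSketch {
    final Map<String, CachedClient> cache = new HashMap<>();

    // Reads are served from the local cache only; no remote call
    String read(String clientId) {
        CachedClient c = cache.get(clientId);
        return c == null ? null : c.instances;
    }

    // Owner-side write: bump the revision and store locally (the real-time push is omitted)
    void write(String clientId, String instances) {
        CachedClient old = cache.get(clientId);
        long next = old == null ? 1 : old.revision + 1;
        cache.put(clientId, new CachedClient(next, instances));
    }

    // One verify round: on a revision mismatch the owner pushes its copy; returns true if already consistent
    static boolean verifyAndRepair(DistroNodeSketch owner, DistroNodeSketch peer, String clientId) {
        CachedClient mine = owner.cache.get(clientId);
        if (mine == null) {
            return true; // owner has nothing to push
        }
        CachedClient theirs = peer.cache.get(clientId);
        if (theirs != null && theirs.revision == mine.revision) {
            return true;
        }
        peer.cache.put(clientId, mine);
        return false;
    }
}
```

Between a write and the next verify round, the peer returns stale data; this bounded staleness is exactly the AP trade-off Distro makes.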

