dubbo系列之注册中心

dubbo系列之注册中心StringBuilder buf = new StringBuilder(); Map<St

dubbo注册中心模块对于主流程来说不是必要的,可以看做是线上服务信息的配置中心,提供的核心功能是为consumer提供provider配置信息,为服务治理中心提供所有provider和consumer的信息,便于管理等。

既然是服务信息的中心,那么从模块功能角度,这个服务注册中心应该满足那些特性,怎么实现这些功能呢?

  • 易扩展:面向接口,面向SPI编程,支持通过服务URL通过SPI进行扩展,常规操作通过默认操作实现。

如:服务注册及订阅等功能,默认实现的都是比实际操作更抽象的一层且通用的功能,而具体与注册中心交互的功能则预留给扩展类实现,通过super的增量方式进行扩展。

默认实现的功能:服务注册、失败重试等策略,由FailbackRegistry默认实现;

从本地文件读取服务配置信息以及保存文件服务(默认异步保存文件,每次更新) 更新文件时通过版本号作为新旧校验,由AbstractRegistry默认实现;

private void saveProperties(URL url) {
        if (file == null) {
            return;
        }

        try {
            StringBuilder buf = new StringBuilder();
            Map<String, List<URL>> categoryNotified = notified.get(url);
            if (categoryNotified != null) {
                for (List<URL> us : categoryNotified.values()) {
                    for (URL u : us) {
                        if (buf.length() > 0) {
                            buf.append(URL_SEPARATOR);
                        }
                        buf.append(u.toFullString());
                    }
                }
            }
            properties.setProperty(url.getServiceKey(), buf.toString());
            long version = lastCacheChanged.incrementAndGet();
            if (syncSaveFile) {
                doSaveProperties(version);
            } else {
                registryCacheExecutor.execute(new SaveProperties(version));
            }
        } catch (Throwable t) {
            logger.warn(t.getMessage(), t);
        }
    }

保证接口或父类逻辑一定执行的方式是,在子类中扩展时使用super()方式进行扩展,要求继承的方法是增量式更新;

public FailbackRegistry(URL url) {
        super(url);
        this.retryPeriod = url.getParameter(REGISTRY_RETRY_PERIOD_KEY, DEFAULT_REGISTRY_RETRY_PERIOD);
        // since the retry task will not be very much. 128 ticks is enough.
        retryTimer = new HashedWheelTimer(new NamedThreadFactory("DubboRegistryRetryTimer", true), retryPeriod, TimeUnit.MILLISECONDS, 128);
    }

扩展给外部的部分:实际注册、注销、订阅、取关等具体的实际操作留给外部做扩展。

  • 高性能:优先读取本地配置文件服务信息,启动时全量拉去服务列表(有注册的服务全量列表),根据订阅的服务,通过注册中心通知变更,由客户端接收消息后主动从注册中心拉去服务,

待确认:本地配置文件什么时候更新?–provider有更新通知时

/**
     * Notify changes from the Provider side.
     *
     * @param url      consumer side url
     * @param listener listener
     * @param urls     provider latest urls
     */
    protected void notify(URL url, NotifyListener listener, List<URL> urls) {
        if (url == null) {
            throw new IllegalArgumentException("notify url == null");
        }
        if (listener == null) {
            throw new IllegalArgumentException("notify listener == null");
        }
        if ((CollectionUtils.isEmpty(urls))
                && !ANY_VALUE.equals(url.getServiceInterface())) {
            logger.warn("Ignore empty notify urls for subscribe url " + url);
            return;
        }
        if (logger.isInfoEnabled()) {
            logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
        }
        // keep every provider's category.
        Map<String, List<URL>> result = new HashMap<>();
        for (URL u : urls) {
            if (UrlUtils.isMatch(url, u)) {
                String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
                List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
                categoryList.add(u);
            }
        }
        if (result.size() == 0) {
            return;
        }
        Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
        for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
            String category = entry.getKey();
            List<URL> categoryList = entry.getValue();
            categoryNotified.put(category, categoryList);
            listener.notify(categoryList);
            // We will update our cache file after each notification.
            // When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL.
            saveProperties(url);
        }
    }

注册中心异常处理及重试机制

protected void addFailedSubscribed(URL url, NotifyListener listener) {
        Holder h = new Holder(url, listener);
        FailedSubscribedTask oldOne = failedSubscribed.get(h);
        if (oldOne != null) {
            return;
        }
        FailedSubscribedTask newTask = new FailedSubscribedTask(url, this, listener);
        oldOne = failedSubscribed.putIfAbsent(h, newTask);
        if (oldOne == null) {
            // never has a retry task. then start a new task for retry.
            retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS);
        }
    }

异常重试机制,本地服务文件的更新加锁机制以及版本号维护,zk依赖于zk集群的最终一致性特性,redis依赖定时任务重试补偿机制

服务信息序列化:定义统一的服务格式。

如果使用的注册中心不一样,一个zookeeper一个redis,还能进行通讯吗?

如果两个注册中心的配置信息做协议转换,可以进行通讯。

  • 注册中心解决的本质问题是什么?有哪些借鉴意义?
dubbo系列之注册中心

注册中心本质是解决consumer和provider数据消息同步的问题。

  1. 关于消息同步的设计:

目前消息同步主要是以拉和推两种方式,因为注册中心面对的是所有consumer和provider,而consumer在初次启动时不对外提供服务且所需关注的数据只是注册中心中一部分数据,所以采用客户端拉取数据的方式同步数据,为了更高性能且根据服务ip一般不会轻易变化的特点,采用客户端缓存文件方式进行优化读取配置服务性;根据订阅provider变更消息,重新拉取注册中心的数据,并同步到本地配置文件中。

2.关于架构设计:

注册中心模块大量采用了增量扩展的方式,将注册中心通用功能进行分层次的抽象设计,只对外提供具体操作的接口,这样充分保证了整个模块功能的统一和扩展性。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/4284.html

(0)
编程小号编程小号
上一篇 2022-12-26
下一篇 2023-08-05

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注