前言

前面两篇基本上已经对dubbo的SPI,服务发布,注册等功能进行了分析,那么在消费端是如何发现服务,并进行透明的远程调用的呢?带着这个疑问,走入今天的篇章,Dubbo的服务发现

服务发现的流程

在我们具备的知识体系中,服务的发现应该是会有以下几个步骤的

  • 消费端服务启动时,根据服务接口及注解指定的版本,注册中心信息封装成服务名
  • 通过配置的注册中心,传入服务名远程获取服务提供者的真实地址
  • 为远程服务接口生成代理对象,代理对象保存远程服务的地址和端口
  • 调用接口是通过代理对象的方法,通过远程的ip和端口,封装服务名,方法,方法参数,通过socket发起远程访问

Dubbo中的服务发现

Dubbo与Spring的整合

与服务的发布和注册类似,服务的发现是通过一个ReferenceAnnotationBeanPostProcessor的后置处理器来对bean依赖进行处理的。,这里不再重复注入BeanPostProcessor的相关流程,直接进入到ReferenceAnnotationBeanPostProcessor的构造方法中

  public ReferenceAnnotationBeanPostProcessor() {
        super(DubboReference.class, Reference.class, 	  		             com.alibaba.dubbo.config.annotation.Reference.class);
    }

他像其父类传递了SpringBean的依赖上需要扫描的注解,有如上三种。

父类会将传入的三种注解保存到AbstractAnnotationBeanPostProcessor数组中。该类又继承了InstantiationAwareBeanPostProcessorAdapter类,InstantiationAwareBeanPostProcessorAdapter又实现了InstantiationAwareBeanPostProcessor接口,这个接口中存在一个postProcessPropertyValues方法,该方法会在bean的属性赋值之前被回调,且该方法会返回一个PropertyValues的对象,而该对象是对Spring中Bean的属性的封装。而该方法在AbstractAnnotationBeanPostProcessor类中被重写,所以实际执行的是AbstractAnnotationBeanPostProcessor类中的postProcessPropertyValues方法,进入该方法

@Override
public PropertyValues postProcessPropertyValues(
        PropertyValues pvs, PropertyDescriptor[] pds, Object bean, String beanName) throws BeanCreationException {

    InjectionMetadata metadata = findInjectionMetadata(beanName, bean.getClass(), pvs);
    try {
        metadata.inject(bean, beanName, pvs);
    } catch (BeanCreationException ex) {
        throw ex;
    } catch (Throwable ex) {
        throw new BeanCreationException(beanName, "Injection of @" + getAnnotationType().getSimpleName()
                + " dependencies is failed", ex);
    }
    return pvs;
}
  • 调用findInjectionMetadata(beanName, bean.getClass(), pvs);获取一个InjectionMetadata对象,该对象一些列操作之后会调用一个
  • 调用metadata.inject(bean, beanName, pvs);进行依赖注入。

首先进入findInjectionMetadata方法

private InjectionMetadata findInjectionMetadata(String beanName, Class<?> clazz, PropertyValues pvs) {
    // Fall back to class name as cache key, for backwards compatibility with custom callers.
    String cacheKey = (StringUtils.hasLength(beanName) ? beanName : clazz.getName());
    // Quick check on the concurrent map first, with minimal locking.
    AbstractAnnotationBeanPostProcessor.AnnotatedInjectionMetadata metadata = this.injectionMetadataCache.get(cacheKey);
    if (InjectionMetadata.needsRefresh(metadata, clazz)) {
        synchronized (this.injectionMetadataCache) {
            metadata = this.injectionMetadataCache.get(cacheKey);
            if (InjectionMetadata.needsRefresh(metadata, clazz)) {
                if (metadata != null) {
                    metadata.clear(pvs);
                }
                try {
                    metadata = buildAnnotatedMetadata(clazz);
                    this.injectionMetadataCache.put(cacheKey, metadata);
                } catch (NoClassDefFoundError err) {
                    throw new IllegalStateException("Failed to introspect object class [" + clazz.getName() +
                            "] for annotation metadata: could not find class that it depends on", err);
                }
            }
        }
    }
    return metadata;
}
  • 根据当前bean的beanName或类名作为key到injectionMetadataCache查找是否存在metadata,存在则返回
  • 否则一些列判断,刷新之后,调用buildAnnotatedMetadata(clazz);方法创建一个Metadata
  • 将创建的Metadata放入缓存中

进入到buildAnnotatedMetadata(clazz);方法

private AbstractAnnotationBeanPostProcessor.AnnotatedInjectionMetadata buildAnnotatedMetadata(final Class<?> beanClass) {
    Collection<AbstractAnnotationBeanPostProcessor.AnnotatedFieldElement> fieldElements = findFieldAnnotationMetadata(beanClass);
    Collection<AbstractAnnotationBeanPostProcessor.AnnotatedMethodElement> methodElements = findAnnotatedMethodMetadata(beanClass);
    return new AbstractAnnotationBeanPostProcessor.AnnotatedInjectionMetadata(beanClass, fieldElements, methodElements);
}
  • 调用findFieldAnnotationMetadata(beanClass);方法得到该bean中属性的集合
  • 调用findAnnotatedMethodMetadata(beanClass);方法得到该bean中方法的集合
  • 使用得到的集合和bean的class对象构建一个AnnotatedInjectionMetadata对象

首先看看会得到一些什么样的属性,进入到findFieldAnnotationMetadata(beanClass);方法中

private List<AbstractAnnotationBeanPostProcessor.AnnotatedFieldElement> findFieldAnnotationMetadata(final Class<?> beanClass) {

        final List<AbstractAnnotationBeanPostProcessor.AnnotatedFieldElement> elements = new LinkedList<AbstractAnnotationBeanPostProcessor.AnnotatedFieldElement>();

        ReflectionUtils.doWithFields(beanClass, new ReflectionUtils.FieldCallback() {
            @Override
            public void doWith(Field field) throws IllegalArgumentException, IllegalAccessException {

                for (Class<? extends Annotation> annotationType : getAnnotationTypes()) {

                    AnnotationAttributes attributes = getAnnotationAttributes(field, annotationType, getEnvironment(), true, true);

                    if (attributes != null) {

                        if (Modifier.isStatic(field.getModifiers())) {
                            if (logger.isWarnEnabled()) {
                                logger.warn("@" + annotationType.getName() + " is not supported on static fields: " + field);
                            }
                            return;
                        }

                        elements.add(new AnnotatedFieldElement(field, attributes));
                    }
                }
            }
        });

        return elements;

    }

根据传入的bean的class对象获取bean的所有属性。筛选出属性上标注了初始化processor中传入的过滤注解,它保存在annotationTypes集合中的,通过回调方法,遍历annotationTypes集合,获取匹配到的注解属性的注解上的所有参数。使用注解参数和属性对象构建一个AnnotatedFieldElement类型的对象,存入到集合中,最后返回集合。

buildAnnotatedMetadata方法中的findAnnotatedMethodMetadata对于方法的处理与属性类似,这里就不再叙述,最后在该方法中构建一个使用方法的注解元数据集合与上的注解元数据集合构建一个AnnotatedInjectionMetadata类型的对象,返回。

所以,在findInjectionMetadata返回的metadate对象实际是AnnotatedInjectionMetadata类型的对象。

因为在postProcessPropertyValues方法中调用了metadata.inject(bean, beanName, pvs);代码,所以这里将直接进入到内部类AnnotatedInjectionMetadata,发现它并没有实现inject方法,但是由于它继承了InjectionMetadata类,所以我们可以往上找它的inject方法

public void inject(Object target, @Nullable String beanName, @Nullable PropertyValues pvs) throws Throwable {
    Collection<InjectionMetadata.InjectedElement> checkedElements = this.checkedElements;
    Collection<InjectionMetadata.InjectedElement> elementsToIterate = checkedElements != null ? checkedElements : this.injectedElements;
    InjectionMetadata.InjectedElement element;
    if (!((Collection)elementsToIterate).isEmpty()) {
        for(Iterator var6 = ((Collection)elementsToIterate).iterator(); var6.hasNext(); element.inject(target, beanName, pvs)) {
            element = (InjectionMetadata.InjectedElement)var6.next();
            if (logger.isTraceEnabled()) {
                logger.trace("Processing injected element of bean '" + beanName + "': " + element);
            }
        }
    }

}

该方法会遍历构建对象时传入的InjectedElement类型的集合,该集合保存的是传入bean的每个属性对象及其标注的注解的元信息。然后调用该对象的inject方法

进入到InjectedElementinject方法

 protected void inject(Object bean, String beanName, PropertyValues pvs) throws Throwable {
            Class<?> injectedType = this.field.getType();
            Object injectedObject = AbstractAnnotationBeanPostProcessor.this.getInjectedObject(this.attributes, bean, beanName, injectedType, this);
            ReflectionUtils.makeAccessible(this.field);
            this.field.set(bean, injectedObject);
        }
  • 获取属性的字节码文件
  • 调用AbstractAnnotationBeanPostProcessorgetInjectedObject方法获取到要依赖注入的属性的对象
  • 修改属性的访问权限
  • 为属性调用set方法赋值,完成依赖的注入

这里主要就需要探讨这个被依赖的对象是从何而来,进入到getInjectedObject方法中

protected Object getInjectedObject(AnnotationAttributes attributes, Object bean, String beanName, Class<?> injectedType, InjectedElement injectedElement) throws Exception {
    String cacheKey = this.buildInjectedObjectCacheKey(attributes, bean, beanName, injectedType, injectedElement);
    Object injectedObject = this.injectedObjectsCache.get(cacheKey);
    if (injectedObject == null) {
        injectedObject = this.doGetInjectedBean(attributes, bean, beanName, injectedType, injectedElement);
        this.injectedObjectsCache.putIfAbsent(cacheKey, injectedObject);
    }

    return injectedObject;
}

通过注解属性,属性class对象,bean名称,bean对象等封装成一个缓存key,从缓存中获取依赖注入的对象,保证单例,不存在则调用当前对象的doGetInjectedBean方法。该方法是一个抽象方法,去看它的子类实现,所以就回到了我们最开始注入的ReferenceAnnotationBeanPostProcessor的类中,进入doGetInjectedBean方法

protected Object doGetInjectedBean(AnnotationAttributes attributes, Object bean, String beanName, Class<?> injectedType,
                                   InjectionMetadata.InjectedElement injectedElement) throws Exception {
    /**
     * The name of bean that annotated Dubbo's {@link Service @Service} in local Spring {@link ApplicationContext}
     */
  // 本地服务
    String referencedBeanName = buildReferencedBeanName(attributes, injectedType);

    /**
     * The name of bean that is declared by {@link Reference @Reference} annotation injection
     */
    String referenceBeanName = getReferenceBeanName(attributes, injectedType);

    ReferenceBean referenceBean = buildReferenceBeanIfAbsent(referenceBeanName, attributes, injectedType);

    boolean localServiceBean = isLocalServiceBean(referencedBeanName, referenceBean, attributes);

    registerReferenceBean(referencedBeanName, referenceBean, attributes, localServiceBean, injectedType);

    cacheInjectedReferenceBean(referenceBean, injectedElement);

    return getOrCreateProxy(referencedBeanName, referenceBean, localServiceBean, injectedType);
}
  • 根据注解和依赖的接口的字节码文件,封装一个referencedBeanName,这个名字就是上篇中在服务注册时注册到bean容器中的名字:ServiceBean:com.wangx.dubbo.api.HelloService:2.0.1,这里主要是为了,如果当前的服务提供者在本地进程中时,可以直接调用
  • 根据注解和依赖的接口的字节码文件,封装一个referenceBeanName,这是一个远程调用的名称
  • 如果不存在,创建一个referenceBean
  • 判断是否是本非服务
  • 注册一个referenceBean到spring中
  • 缓存referenceBean和需要依赖注入的属性的injectedElement对象
  • 调用getOrCreateProxy(referencedBeanName, referenceBean, localServiceBean, injectedType);方法创建或返回一个代理对象

首先讨论怎么创建一个ReferenceBean,其实这里就跟服务注册有类似的设计,服务注册和发布时注册到容器中的是一个ServiceBean,这里是ReferenceBean,正好可以形成对应

进入buildReferenceBeanIfAbsent方法

private ReferenceBean buildReferenceBeanIfAbsent(String referenceBeanName, AnnotationAttributes attributes,
                                                 Class<?> referencedType)
        throws Exception {

    ReferenceBean<?> referenceBean = referenceBeanCache.get(referenceBeanName);

    if (referenceBean == null) {
        ReferenceBeanBuilder beanBuilder = ReferenceBeanBuilder
                .create(attributes, applicationContext)
                .interfaceClass(referencedType);
        referenceBean = beanBuilder.build();
        referenceBeanCache.put(referenceBeanName, referenceBean);
    } else if (!referencedType.isAssignableFrom(referenceBean.getInterfaceClass())) {
        throw new IllegalArgumentException("reference bean name " + referenceBeanName + " has been duplicated, but interfaceClass " +
                referenceBean.getInterfaceClass().getName() + " cannot be assigned to " + referencedType.getName());
    }
    return referenceBean;
}

先从缓存中获取,不存在则创建一个referenceBean对象,并对它的属性进行赋值,比如保存注解上的参数,依赖注入的接口的字节码等

回到doGetInjectedBean方法中,调用registerReferenceBean方法将referenceBean注册到容器中。

进入到registerReferenceBean方法

private void registerReferenceBean(String referencedBeanName, ReferenceBean referenceBean,
                                   AnnotationAttributes attributes,
                                   boolean localServiceBean, Class<?> interfaceClass) {

    ConfigurableListableBeanFactory beanFactory = getBeanFactory();

    String beanName = getReferenceBeanName(attributes, interfaceClass);

    if (localServiceBean) {  // If @Service bean is local one
        /**
         * Get  the @Service's BeanDefinition from {@link BeanFactory}
         * Refer to {@link ServiceAnnotationBeanPostProcessor#buildServiceBeanDefinition}
         */
        AbstractBeanDefinition beanDefinition = (AbstractBeanDefinition) beanFactory.getBeanDefinition(referencedBeanName);
        RuntimeBeanReference runtimeBeanReference = (RuntimeBeanReference) beanDefinition.getPropertyValues().get("ref");
        // The name of bean annotated @Service
        String serviceBeanName = runtimeBeanReference.getBeanName();
        // register Alias rather than a new bean name, in order to reduce duplicated beans
        beanFactory.registerAlias(serviceBeanName, beanName);
    } else { // Remote @Service Bean
        if (!beanFactory.containsBean(beanName)) {
            beanFactory.registerSingleton(beanName, referenceBean);
        }
    }
}

得到beanFactory

获取bean名称,如果是本非服务调用,则直接通过上一步骤获取到的referencedBeanName获取到一个BeanDefinition.这里会获取到的是ServiceBean的bean定义,然后在过去定义中的参数中的ref参数,该参数之前已经讲过,是服务提供者的具体实现,然后将该对象注入打bean的容器中,使用别名的方式

如果是远程,则直接将我们referenceBean对象注入到容器中。

接下来进入到getOrCreateProxy方法中,因为这里返回的对象,将是作为依赖注入的对象,所以需要看看,该方法做了些什么事

private Object getOrCreateProxy(String referencedBeanName, ReferenceBean referenceBean, boolean localServiceBean,
                                Class<?> serviceInterfaceType) {
    if (localServiceBean) { // If the local @Service Bean exists, build a proxy of Service
        return newProxyInstance(getClassLoader(), new Class[]{serviceInterfaceType},
                newReferencedBeanInvocationHandler(referencedBeanName));
    } else {
        exportServiceBeanIfNecessary(referencedBeanName); // If the referenced ServiceBean exits, export it immediately
        return 
    }
}
  • 创建一个本地实现的代理对象,该方法会创建一个ReferencedBeanInvocationHandler此方法是Dubbo本地代理的一种实现,主要通过传入的bean名称获取到ioc容器中的ServiceBean,然后调用它的getRef方法获取实际实现,作为Bean,然后执行bean的对应的方法。这里就不详细说明了
  • 如果该方法的serviceBean存在,发布它,这里是发布服务的流程,也不看了
  • 返回referenceBean.get();方法获取到的对象

到这里,有关Spring的部分就讨论完毕了,接下来就计入到dubbo的领域里,看看服务是怎么被发现的,也就是这个代理的对象是怎么被生成的,生成对象时多了什么

Dubbo中的服务发现

接下来主要探讨referenceBean.get();方法返回了什么对象,这个对象哪里来的

进入该方法。

public synchronized T get() {
    if (destroyed) {
        throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
    }
    if (ref == null) {
        init();
    }
    return ref;
}
  • 判断是否已经销毁
  • 判断是否存在ref,存在则直接返回
  • 否则就执行init()方法,基本上可以知道,当ref为null时,会在init进行初始化

进入到init()方法,该方法会组装dubbo客户端的请求参数的map集合,包含了本机的ip等,通过参数map创建一个对象,赋值给到ref.语句如下:

ref = createProxy(map);

为ServiceMetadata赋值,ConsumerModer复制,标识已经初始化,分发事件等。

这里主要只会讨论代理对象是如何创建的,所以进入大createProxy(map);方法

private T createProxy(Map<String, String> map) {
    if (shouldJvmRefer(map)) {
        URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
        invoker = REF_PROTOCOL.refer(interfaceClass, url);
        if (logger.isInfoEnabled()) {
            logger.info("Using injvm service " + interfaceClass.getName());
        }
    } else {
        urls.clear();
        if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
            String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
            if (us != null && us.length > 0) {
                for (String u : us) {
                    URL url = URL.valueOf(u);
                    if (StringUtils.isEmpty(url.getPath())) {
                        url = url.setPath(interfaceName);
                    }
                    if (UrlUtils.isRegistry(url)) {
                        urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                    } else {
                        urls.add(ClusterUtils.mergeUrl(url, map));
                    }
                }
            }
        } else { // assemble URL from register center's configuration
            // if protocols not injvm checkRegistry
            if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
                checkRegistry();
                List<URL> us = ConfigValidationUtils.loadRegistries(this, false);
                if (CollectionUtils.isNotEmpty(us)) {
                    for (URL u : us) {
                        URL monitorUrl = ConfigValidationUtils.loadMonitor(this, u);
                        if (monitorUrl != null) {
                            map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                        }
                        urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                    }
                }
                if (urls.isEmpty()) {
                    throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
                }
            }
        }

        if (urls.size() == 1) {
            invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
        } else {
            List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
            URL registryURL = null;
            for (URL url : urls) {
                invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
                if (UrlUtils.isRegistry(url)) {
                    registryURL = url; // use last registry url
                }
            }
            if (registryURL != null) { // registry url is available
                // for multi-subscription scenario, use 'zone-aware' policy by default
                URL u = registryURL.addParameterIfAbsent(CLUSTER_KEY, ZoneAwareCluster.NAME);
                // The invoker wrap relation would be like: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker
                invoker = CLUSTER.join(new StaticDirectory(u, invokers));
            } else { // not a registry url, must be direct invoke.
                invoker = CLUSTER.join(new StaticDirectory(invokers));
            }
        }
    }

    if (shouldCheck() && !invoker.isAvailable()) {
        invoker.destroy();
        throw new IllegalStateException("Failed to check the status of the service "
                + interfaceName
                + ". No provider available for the service "
                + (group == null ? "" : group + "/")
                + interfaceName +
                (version == null ? "" : ":" + version)
                + " from the url "
                + invoker.getUrl()
                + " to the consumer "
                + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
    }
    if (logger.isInfoEnabled()) {
        logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
    }
    /**
     * @since 2.7.0
     * ServiceData Store
     */
    String metadata = map.get(METADATA_KEY);
    WritableMetadataService metadataService = WritableMetadataService.getExtension(metadata == null ? DEFAULT_METADATA_STORAGE_TYPE : metadata);
    if (metadataService != null) {
        URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
        metadataService.publishServiceDefinition(consumerURL);
    }
    // create service proxy
    return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}
  • 判断是否是injvm的服务,这里不详细做讨论
  • 是否已经在服务端手动指定了服务提供者url.这是dubbo中点对点的方式,这里也不做讨论,主要看的是dubbo如何从注册中心拉取服务的
  • 前面两种情况都排除之后,就是注册中心实现的方式,由于dubbo支持多个注册中心,所以这里会是一个urls
  • 获取到注册中心url之后,如果只有一个,直接生成invoker,否则遍历urls,获取到多个注册中心的url,根据注册中心的url,生成invoker,并将多个invokers构建一个StaticDirectory类型的对象,通过invoker = CLUSTER.join(new StaticDirectory(invokers));构建一个invoker保存起来
  • 保存元数据信息
  • 使用invoker获得一个代理对象,所以不论在服务端还是消费端,都是以invoker贯穿整个dubbo的执行的上下文的

Dubbo中的Directory

在进行进一步分析之前,先对Dubbo中的Directory做一个介绍,它在Dubbo中的类图如下所示

在这里插入图片描述

在dubbo中,Directory是用来保存注册中心或服务提供者的远程信息的

可以看到,在Directory的体系之下,最终存在两类Directory,一类是RegistryDirectory,一类是StaticDirectory,顾名思义,前一类是保存到是服务注册中心获取到的远程提供者的信息,而后一类则是静态的,通过静态配置获取到的远程信息。

因为多个注册中心的远程地址是本地配置的,而远程的消息提供者地址是远程获取的,每个注册中心都可以获取到多个服务提供者地址,所以基于dubbo多注册中心的场景下,可能会出现如下的负载均衡的方式

在这里插入图片描述

该图表达的意思是,在方法被调用时,会负载均衡获取到对应的注册中心,然后从注册中心在进行负载均衡,获取到注册该注册中心的服务提供者,选择一个进行远程调用。

在介绍完完Directory及在服务调用端的负载均衡的方案之后,接下来就继续分析。这里我们选择多个注册中心开始,去了解是如何生成一个invoker的。所以现在主要看如下代码:

List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
    invokers.add();
    if (UrlUtils.isRegistry(url)) {
        registryURL = url; // use last registry url
    }
}
if (registryURL != null) { // registry url is available
    // for multi-subscription scenario, use 'zone-aware' policy by default
    URL u = registryURL.addParameterIfAbsent(CLUSTER_KEY, ZoneAwareCluster.NAME);
    // The invoker wrap relation would be like: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker
    invoker = CLUSTER.join(new StaticDirectory(u, invokers));
} else { // not a registry url, must be direct invoke.
    invoker = CLUSTER.join(new StaticDirectory(invokers));
}
  • 构建一个invokers的list
  • 遍历多个注册中心地址,根据注册中心地址,获取到一个invoker
  • 将invoke添加到invokers
  • 使用invokers构建StaticDirectory,使用CLUSTER.join(new StaticDirectory(u, invokers));生成一个invoker.赋值给到invoker属性。

首先来看第一步,怎么生成一个注册中心的Invoker的.

分析REF_PROTOCOL.refer(interfaceClass, url)这段代码。这里其实已经很熟悉了,这是一个Protocol自适应的扩展点。将会通过传入的url动态获取具体的Protocol。根据前面的经验,这里是一个registry的协议,所以这里会获取到RegistryProtocol。进入到RegistryProtocolrefer的方法

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    url = getRegistryUrl(url);
    Registry registry = registryFactory.getRegistry(url);
    if (RegistryService.class.equals(type)) {
        return proxyFactory.getInvoker((T) registry, type, url);
    }

    // group="a,b" or group="*"
    Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
    String group = qs.get(GROUP_KEY);
    if (group != null && group.length() > 0) {
        if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
            return doRefer(getMergeableCluster(), registry, type, url);
        }
    }
    return doRefer(cluster, registry, type, url);
}
  • 获取真实的注册中心协议地址URL,比如此时使用的注册中心是zookeeper时:zookeeper://localhost:2181/org.apache.dubbo.registry.RegistryService?application=spring-cloud-alibaba-boot-dubbo-consumer&default=true&dubbo=2.0.2&pid=71901&qos.enable=false&refer=application=spring-cloud-alibaba-boot-dubbo-consumer&dubbo=2.0.2&init=false&interface=com.wangx.dubbo.api.HelloService&methods=hello&pid=71901&qos.enable=false&register.ip=192.168.0.3&release=2.7.7&revision=2.0.1&side=consumer&sticky=false&timestamp=1598689366854&version=2.0.1&release=2.7.7&timeout=60000&timestamp=1598689366867
  • 根据url获取Registry,已经可以判断,实际获取到registry是ZookeeperRegistry
  • 判断是否分组,最终调用doRefer方法进行服务发现

进入doRefer方法

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    directory.setRegistry(registry);
    directory.setProtocol(protocol);
    // all attributes of REFER_KEY
    Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
    URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
    if (directory.isShouldRegister()) {
        directory.setRegisteredConsumerUrl(subscribeUrl);
        registry.register(directory.getRegisteredConsumerUrl());
    }
  // 构建路由链
    directory.buildRouterChain(subscribeUrl);
    directory.subscribe(toSubscribeUrl(subscribeUrl));

    Invoker<T> invoker = cluster.join(directory);
    List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
    if (CollectionUtils.isEmpty(listeners)) {
        return invoker;
    }

    RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker, subscribeUrl);
    for (RegistryProtocolListener listener : listeners) {
        listener.onRefer(this, registryInvokerWrapper);
    }
    return registryInvokerWrapper;
}
  • 创建一个RegistryDirectory对象
  • 将注册中心和protocol扩展点传入RegistryDirectory对象中
  • 构建一个订阅端的url
  • 判断是否需要将消费端注册到注册中心
  • 构建一个服务路由链
  • 订阅服务变更
  • 将创建的RegistryDirectory对象传入join方法中,返回一个invoker
  • 如果对监听,则对invoker进行包装,构建一个RegistryInvokerWrapper的对象,唤醒监听
  • 返回包装后的invoker

这里首先研究subscribe方法,根据该方法名称,可以猜测应该是订阅服务提供者变动的方法。

首先进入toSubscribeUrl方法看看该方法做了些什么,该方法只是对url的参数进行了封装。进入到directorysubscribe方法

public void subscribe(URL url) {
    setConsumerUrl(url);
    CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
    serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
    registry.subscribe(url, this);
}
  • 添加监听提醒
  • 构建一个配置的监听器
  • 调用registrysubscribe(url, this)方法

这里我们已经知道了获取到的是ZookeeperRegistry,现在进入到该类,发现该类并没有subscribe(url, this)方法,那么进入其父类FailbackRegistry中。进入subscribe方法

super.subscribe(url, listener);
removeFailedSubscribed(url, listener);
try {
    // Sending a subscription request to the server side
    doSubscribe(url, listener);
} catch (Exception e) {
    Throwable t = e;

    List<URL> urls = getCacheUrls(url);
    if (CollectionUtils.isNotEmpty(urls)) {
        notify(url, listener, urls);
        logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
    } else {
        // If the startup detection is opened, the Exception is thrown directly.
        boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                && url.getParameter(Constants.CHECK_KEY, true);
        boolean skipFailback = t instanceof SkipFailbackWrapperException;
        if (check || skipFailback) {
            if (skipFailback) {
                t = t.getCause();
            }
            throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
        } else {
            logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
        }
    }

    // Record a failed registration request to a failed list, retry regularly
    addFailedSubscribed(url, listener);
}
  • 调用父类的subscribe方法,该方法主要保存传入的listener
  • 移除失败的订阅
  • 调用doSubscribe(url, listener);执行真正的订阅

进入到doSubscribe方法,它是一个抽象方法,所以我们直接进入到ZookeeperRegistry中的doSubscribe方法

public void doSubscribe(final URL url, final NotifyListener listener) {
    try {
        if (ANY_VALUE.equals(url.getServiceInterface())) {
            String root = toRootPath();
            ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
            ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> {
                for (String child : currentChilds) {
                    child = URL.decode(child);
                    if (!anyServices.contains(child)) {
                        anyServices.add(child);
                        subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child,
                                Constants.CHECK_KEY, String.valueOf(false)), k);
                    }
                }
            });
            zkClient.create(root, false);
            List<String> services = zkClient.addChildListener(root, zkListener);
            if (CollectionUtils.isNotEmpty(services)) {
                for (String service : services) {
                    service = URL.decode(service);
                    anyServices.add(service);
                    subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service,
                            Constants.CHECK_KEY, String.valueOf(false)), listener);
                }
            }
        } else {
            List<URL> urls = new ArrayList<>();
            for (String path : toCategoriesPath(url)) {
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
                ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, k, toUrlsWithEmpty(url, parentPath, currentChilds)));
                zkClient.create(path, false);
                List<String> children = zkClient.addChildListener(path, zkListener);
                if (children != null) {
                    urls.addAll(toUrlsWithEmpty(url, path, children));
                }
            }
            notify(url, listener, urls);
        }
    } catch (Throwable e) {
        throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

是否是获取到所有的服务,如果是就获取根路径下的所有节点,并监听root节点。这里显然不是。

直接进入到else中的逻辑

获取zk中的不同类型的path,在zk中会存在providersconsumersroutersconfigurators等四种类型,path格式如下

/dubbo/com.wangx.dubbo.api.HelloService/providers。这就构建成了一个服务在zk中的节点作为一个服务。

遍历四种类型的path,使用监听器监听该path的子节点的变化,根据zk的节点特性,当该路径下的子节点出现变动时,是触发监听,并将变化的子节点回调回来,这里的每个子节点就是服务提供者的节点的url,如下

dubbo://192.168.0.3:20880/com.wangx.dubbo.api.HelloService?anyhost=true&application=spring-cloud-alibaba-boot-dubbo-provider&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=com.wangx.dubbo.api.HelloService&methods=hello&pid=72766&release=2.7.7&revision=2.0.1&side=provider&timestamp=1598693868269&version=2.0.1

解析该url,然后回调方法中调用notify方法

进入到notify方法,此方法还是存在于父类ZookeeperRegistry

protected void notify(URL url, NotifyListener listener, List<URL> urls) {
    if (url == null) {
        throw new IllegalArgumentException("notify url == null");
    }
    if (listener == null) {
        throw new IllegalArgumentException("notify listener == null");
    }
    try {
        doNotify(url, listener, urls);
    } catch (Exception t) {
        // Record a failed registration request to a failed list, retry regularly
        addFailedNotified(url, listener, urls);
        logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
    }
}

该方法接收到了服务端的url,传入的监听器,服务提供者的url列表

做一系列判断,调用doNotify方法进行执行,失败则执行失败的监听,在doNotify中,将调用父类的notify方法,进入到AbstractRegistry中的notify方法

protected void notify(URL url, NotifyListener listener, List<URL> urls) {
    if (url == null) {
        throw new IllegalArgumentException("notify url == null");
    }
    if (listener == null) {
        throw new IllegalArgumentException("notify listener == null");
    }
    if ((CollectionUtils.isEmpty(urls))
            && !ANY_VALUE.equals(url.getServiceInterface())) {
        logger.warn("Ignore empty notify urls for subscribe url " + url);
        return;
    }
    if (logger.isInfoEnabled()) {
        logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
    }
    // keep every provider's category.
    Map<String, List<URL>> result = new HashMap<>();
    for (URL u : urls) {
        if (UrlUtils.isMatch(url, u)) {
            String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
            List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
            categoryList.add(u);
        }
    }
    if (result.size() == 0) {
        return;
    }
    Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
    for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
        String category = entry.getKey();
        List<URL> categoryList = entry.getValue();
        categoryNotified.put(category, categoryList);
        listener.notify(categoryList);
        // We will update our cache file after each notification.
        // When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL.
        saveProperties(url);
    }
}

进行一些列判断,获取不同类型下的url列表,获取不同类型的列表,将url列表传入到传入的监听器的notify中方法进行执行。因为在RegistryDirectory调用registrysubscribe方法时传的是this,而且RegistryDirectory实现了NotifyListener接口,所以它本身就是一个监听器。

调用saveProperties(url);方法,将一个服务提供者url保存到本地,当注册中心不可用时,至少能够返回一个本地缓存的url

进入到RegistryDirectorynotify方法。

public synchronized void notify(List<URL> urls) {
    Map<String, List<URL>> categoryUrls = urls.stream()
            .filter(Objects::nonNull)
            .filter(this::isValidCategory)
            .filter(this::isNotCompatibleFor26x)
            .collect(Collectors.groupingBy(this::judgeCategory));

    List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
    this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);

    List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
    toRouters(routerURLs).ifPresent(this::addRouters);

    // providers
    List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
    /**
     * 3.x added for extend URL address
     */
    ExtensionLoader<AddressListener> addressListenerExtensionLoader = ExtensionLoader.getExtensionLoader(AddressListener.class);
    List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);
    if (supportedListeners != null && !supportedListeners.isEmpty()) {
        for (AddressListener addressListener : supportedListeners) {
            providerURLs = addressListener.notify(providerURLs, getConsumerUrl(),this);
        }
    }
    refreshOverrideAndInvoker(providerURLs);
}

处理注册中心获取到的不同类型的url.这里只关注providers类型的url,因为它是我们节点发布的真正的服务url,存在监听,则通过扩展点获取所有的地址监听去进行唤醒。

然后进入到refreshOverrideAndInvoker方法,刷新或者覆盖Invoker,之前我们说过,Invoker在dubbo中是贯穿整个上下文的,所以我们猜想这里也会将urls转成一个个的invoker

调用refreshInvoker(urls);方法刷新invoker

private void refreshInvoker(List<URL> invokerUrls) {
    Assert.notNull(invokerUrls, "invokerUrls should not be null");

    if (invokerUrls.size() == 1
            && invokerUrls.get(0) != null
            && EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
        this.forbidden = true; // Forbid to access
        this.invokers = Collections.emptyList();
        routerChain.setInvokers(this.invokers);
        destroyAllInvokers(); // Close all invokers
    } else {
        this.forbidden = false; // Allow to access
        Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
        if (invokerUrls == Collections.<URL>emptyList()) {
            invokerUrls = new ArrayList<>();
        }
        if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
            invokerUrls.addAll(this.cachedInvokerUrls);
        } else {
            this.cachedInvokerUrls = new HashSet<>();
            this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
        }
        if (invokerUrls.isEmpty()) {
            return;
        }
        Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map

        /**
         * If the calculation is wrong, it is not processed.
         *
         * 1. The protocol configured by the client is inconsistent with the protocol of the server.
         *    eg: consumer protocol = dubbo, provider only has other protocol services(rest).
         * 2. The registration center is not robust and pushes illegal specification data.
         *
         */
        if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {
            logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls
                    .toString()));
            return;
        }

        List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
        // pre-route and build cache, notice that route cache should build on original Invoker list.
        // toMergeMethodInvokerMap() will wrap some invokers having different groups, those wrapped invokers not should be routed.
        routerChain.setInvokers(newInvokers);
        this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
        this.urlInvokerMap = newUrlInvokerMap;

        try {
            destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
        } catch (Exception e) {
            logger.warn("destroyUnusedInvokers error. ", e);
        }
    }
}

由于时间关系。我们只对主要流程进行分析

  • 获取老的invoker的map
  • 调用toInvokers获取新的invoker的newUrlInvokerMap
  • newUrlInvokerMap赋值给urlInvokerMap这里保存的就是每个服务提供者的信息了。后面进行负载均衡时,将会调用该集合中的具体的invoker进行执行

进入到toInvokers方法中,方法较长,这里就不贴了,一些列遍历,判断之后,最终会走到invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);语句。构建一个invoker,然后以url作为key,invoker为value保存到map中,返回map。

主要分析invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);语句,根据SPI的经验,这里会获取到的是DubboProtocol,因为从注册中心获取到的url就是dubbo协议的,进入该类的refer方法,该类不存在,则进入其父类的refer方法。进入到AbstractProtocol类中的refer方法

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
}
  • 调用protocolBindingRefer(type, url)构建一个invoker
  • 将上一步构建的invoker传入AsyncToSyncInvoker构建一个异步转同步的invoker

直接进入到protocolBindingRefer方法,它是一个抽象方法,所以进入到子类DubboProtocol中的protocolBindingRefer方法中

public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
    optimizeSerialization(url);

    // create rpc invoker.
    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    invokers.add(invoker);

    return invoker;
}
  • 序列化url
  • 调用getClients获取到ExchangeClient数组,这里就是根据配置的连接数,根据从注册中心获取到的url,使用netty的方式创建网络连接,每个服务提供者可能存在多个连接数,这里进行连接复用的优化,减少了在实际调用时重新建立连接的消耗
  • 构建一个DubboInvoker对象。将该对象保存到invokers集合中
  • 返回该对象

网络部分具体就不详细叙述了,分析到这里,我们获取到了当服务执行时最终建立网络连接的invoker。返回的Invoker会经过一些列的封装,最终的层次应该是这样的InvokerDelegate(AsyncToSyncInvoker(DubboInvoker()))。而进过一些列封装之后的信息,最终其实都是保存在RegistryDirectory对象中的。

现在重新回到RegistryProtocol中的doRefer方法中。这里通过Invoker<T> invoker = cluster.join(directory);又会返回一个Invoker,根据学习的扩展点的原理,这里获取到的是FailoverCluster而在该方法中并没有join方法,往父类去找。

进入AbstractCluster中的join方法。

public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
    return buildClusterInterceptors(doJoin(directory), directory.getUrl().getParameter(REFERENCE_INTERCEPTOR_KEY));
}

先调用doJoin方法获取AbstractClusterInvoker类型的Invoker,该方法是一个抽象方法。进入FailoverCluster

@Override
public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException {
    return new FailbackClusterInvoker<>(directory);
}

创建一个FailbackClusterInvoker类型的invoker,并保存了directory,该类的构造方法会调用顶级父类AbstractClusterInvoker的构造方法将url和directory进行保存。并在FailbackClusterInvoker构造方法中会根据url初始化重试次数,重试任务等等配置。其他的Cluster将会根据配置,然后根据url自动选择。回到AbstractCluster类中的join方法。调用buildClusterInterceptors方法,进行一些拦截器的绑定。

private <T> Invoker<T> buildClusterInterceptors(AbstractClusterInvoker<T> clusterInvoker, String key) {
    AbstractClusterInvoker<T> last = clusterInvoker;
    List<ClusterInterceptor> interceptors = ExtensionLoader.getExtensionLoader(ClusterInterceptor.class).getActivateExtension(clusterInvoker.getUrl(), key);

    if (!interceptors.isEmpty()) {
        for (int i = interceptors.size() - 1; i >= 0; i--) {
            final ClusterInterceptor interceptor = interceptors.get(i);
            final AbstractClusterInvoker<T> next = last;
            last = new InterceptorInvokerNode<>(clusterInvoker, interceptor, next);
        }
    }
    return last;
}

根据激活扩展点,获取到所有符合条件拦截器,遍历构建一条拦截器链。返回最后的链式invoker.最后返回到ReferenceBean中的createProxy方法中。根据返回的invoker封装的list。调用PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));语句创建代理对象

根据SPI原理,这里获取到的ProxyFactoryJavassistProxyFactory。进入到JavassistProxyFactorygetProxy(invoker,boolean)方法,在JavassistProxyFactory类中并没有该方法,所以需要去它的父类中找

进入到AbstractProxyFactory中的getProxy(invoker,boolean)方法

`getProxy(invoker, interfaces.toArray(new Class<?>[0]))public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException {
    Set<Class<?>> interfaces = new HashSet<>();

    String config = invoker.getUrl().getParameter(INTERFACES);
    if (config != null && config.length() > 0) {
        String[] types = COMMA_SPLIT_PATTERN.split(config);
        for (String type : types) {
            // TODO can we load successfully for a different classloader?.
            interfaces.add(ReflectUtils.forName(type));
        }
    }

    if (generic) {
        if (!GenericService.class.isAssignableFrom(invoker.getInterface())) {
            interfaces.add(com.alibaba.dubbo.rpc.service.GenericService.class);
        }

        try {
            // find the real interface from url
            String realInterface = invoker.getUrl().getParameter(Constants.INTERFACE);
            interfaces.add(ReflectUtils.forName(realInterface));
        } catch (Throwable e) {
            // ignore
        }
    }

    interfaces.add(invoker.getInterface());
    interfaces.addAll(Arrays.asList(INTERNAL_INTERFACES));

    return getProxy(invoker, interfaces.toArray(new Class<?>[0]));
}

根据invoker中的url获取到接口全类名,通过全类名获取到所有的接口的字节码对象。通过invoker和字节码对象调用getProxy(invoker, interfaces.toArray(new Class<?>[0]));创建代理对象,该方法是一个抽象方法,在JavassistProxyFactory中被实现。

进入到到JavassistProxyFactory中的getProxy(Invoker<T> invoker, Class<?>[] types);方法

public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
    return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}

根据我们对于动态代理的理解,可以知道,在方法执行时,最终将会执行InvokerInvocationHandler类中的invoke方法。

到这里,我们初始化基本上就完成了,已经成功的创建了一个动态代理对象并赋值给到spring bean中的依赖。

dubbo运行时的操作

前面说过,dubbo最终将会执行InvokerInvocationHandler中的invoke方法。所以现在直接进入该方法

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    if (method.getDeclaringClass() == Object.class) {
        return method.invoke(invoker, args);
    }
    String methodName = method.getName();
    Class<?>[] parameterTypes = method.getParameterTypes();
    if (parameterTypes.length == 0) {
        if ("toString".equals(methodName)) {
            return invoker.toString();
        } else if ("$destroy".equals(methodName)) {
            invoker.destroy();
            return null;
        } else if ("hashCode".equals(methodName)) {
            return invoker.hashCode();
        }
    } else if (parameterTypes.length == 1 && "equals".equals(methodName)) {
        return invoker.equals(args[0]);
    }
    RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), args);
    String serviceKey = invoker.getUrl().getServiceKey();
    rpcInvocation.setTargetServiceUniqueName(serviceKey);
  
    if (consumerModel != null) {
        rpcInvocation.put(Constants.CONSUMER_MODEL, consumerModel);
        rpcInvocation.put(Constants.METHOD_MODEL, consumerModel.getMethodModel(method));
    }

    return invoker.invoke(rpcInvocation).recreate();
}

该方法过滤掉object的原生方法。然后将服务的方法名,服务名,调用参数等封装成一个RpcInvocation。最后调用invoker的invoke方法进行执行。根据初始化时创建invoker的步骤,这里最先进入是FailbackClusterInvoker这个invoker.这里可以发现,模板方法在dubbo中被大量的运用,该方法在FailbackClusterInvoker中同样没有,进入到其父类AbstractClusterInvokerinvoke

public Result invoke(final Invocation invocation) throws RpcException {
    checkWhetherDestroyed();

    // binding attachments into invocation.
    Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
    if (contextAttachments != null && contextAttachments.size() != 0) {
        ((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
    }

    List<Invoker<T>> invokers = list(invocation);
    LoadBalance loadbalance = initLoadBalance(invokers, invocation);
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    return doInvoke(invocation, invokers, loadbalance);
}
  • 添加一些附着参数
  • 获取到当前invoker中的directory中的invoker的list
  • 初始化一个负载均衡的算法
  • 最后调用doInvoke方法执行真正的调用

因为我们在创建动态代理对象之前,这里传入的是一个StaticDirectory的对象,所以进入到该对象的list,该对象同样没有list方法,所以进入父类AbstractDirectory的list方法

@Override
public List<Invoker<T>> list(Invocation invocation) throws RpcException {
    if (destroyed) {
        throw new RpcException("Directory already destroyed .url: " + getUrl());
    }

    return doList(invocation);
}

判断是否被销毁,没有销毁,在调用doList(invocation);方法实际选择我们的invoker的list.这里基本已经可以猜测,它是抽象方法,且在其子类被实现,所以``StaticDirectorydoList`方法

protected List<Invoker<T>> doList(Invocation invocation) throws RpcException {
    List<Invoker<T>> finalInvokers = invokers;
    if (routerChain != null) {
        try {
            finalInvokers = routerChain.route(getConsumerUrl(), invocation);
        } catch (Throwable t) {
            logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
        }
    }
    return finalInvokers == null ? Collections.emptyList() : finalInvokers;
}

这里就是直接返回了在创建时传入的invoker的list,即是我们注册中心的invoker

然后AbstractClusterInvokerinvoke方法中。此时已经知道该invoker的list中包含的是注册中心的invoker.所以这里做的是注册中心的负载均衡。然后初始化一个负载均衡器,不指定就获取默认的随机均衡器,都是SPI的运用,这里就不再详细描述。最后调用doInvoke方法,直接进入到FailbackClusterInvokerdoInvoke方法

@Override
protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    Invoker<T> invoker = null;
    try {
        checkInvokers(invokers, invocation);
        invoker = select(loadbalance, invocation, invokers, null);
        return invoker.invoke(invocation);
    } catch (Throwable e) {
        logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "
                + e.getMessage() + ", ", e);
        addFailed(loadbalance, invocation, invokers, invoker);
        return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation); // ignore
    }
}
  • 参数检查
  • 调用负载均衡器选择一个invoker
  • 调用选择到的invoker的invoke方法

这里将会随机选择,然后获取到一个注册中心的invoker,这里获取到还是一个FailbackClusterInvoker类型的对象,但是这里这里的FailbackClusterInvoker对象包含的Directory对象为RegistryDirecory。根据上一步的分析,这里将从RegistryDirecory中获取到指定注册中心中的所有的invokers.负载均衡,获取一个Invoker,根据前面的分析,这里获取到的是InvokerDelegate。而InvokerDelegate中包含的是DubboInvoker,所以直接进入到DubboInvoker的invoke方法

protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    inv.setAttachment(PATH_KEY, getUrl().getPath());
    inv.setAttachment(VERSION_KEY, version);

    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        int timeout = calculateTimeout(invocation, methodName);
        if (isOneway) {
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            return AsyncRpcResult.newDefaultAsyncResult(invocation);
        } else {
            ExecutorService executor = getCallbackExecutor(getUrl(), inv);
            CompletableFuture<AppResponse> appResponseFuture =
                    currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
            // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
            FutureContext.getContext().setCompatibleFuture(appResponseFuture);
            AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
            result.setExecutor(executor);
            return result;
        }
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

这里将会获取初始化时创建的client进行同行,如果允许重试,则使用线程池进行重试,将获取到的结果进行返回。

回到FailbackClusterInvoker的invoker方法,当调用失败时,调用addFailed方法进行处理。

进入addFailed方法。

private void addFailed(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, Invoker<T> lastInvoker) {
    if (failTimer == null) {
        synchronized (this) {
            if (failTimer == null) {
                failTimer = new HashedWheelTimer(
                        new NamedThreadFactory("failback-cluster-timer", true),
                        1,
                        TimeUnit.SECONDS, 32, failbackTasks);
            }
        }
    }
    RetryTimerTask retryTimerTask = new RetryTimerTask(loadbalance, invocation, invokers, lastInvoker, retries, RETRY_FAILED_PERIOD);
    try {
        failTimer.newTimeout(retryTimerTask, RETRY_FAILED_PERIOD, TimeUnit.SECONDS);
    } catch (Throwable e) {
        logger.error("Failback background works error,invocation->" + invocation + ", exception: " + e.getMessage());
    }
}

这里使用时间轮的方式进行服务的重试。

到此dubbo的服务发现就聊完了。

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐