原创

Ribbon源码总结

Ribbon入门

Ribbon是什么?

提供云端负载均衡,有多种负载均衡策略可供选择,可配合服务发现和断路器使用。

如何使用?

1.加入依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-ribbon</artifactId>
</dependency>

2.启动类加上RestTemplate

@Bean
@LoadBalanced
public RestTemplate restTemplate(){
    return new RestTemplate();
}

3.业务层中使用

先注入RestTemplateLoadBalancerClient方法:

@Autowired
private RestTemplate restTemplate;

@Autowired
private LoadBalancerClient loadBalancerClient;
  1. 【推荐】使用RestTemplate
  2. 直接使用Ribbon的API:

Ribbon执行流程图

原生Ribbon原生执行流程

一般SpringCloud Ribbon和Eureka的简单流程

SpringCloud Ribbon和Eureka的详细整体流程

服务列表更新流程

服务列表更新是通过组件ServerListUpdater定时拉取,也是使用PollingServerListUpdater,与原生的区别在于ServerList的不同。springcloud集成eureka后,ServerList组件有变化,会通过EurekaClient从eurekaServer拉取服务列表。 具体流程如下:

IPing组件执行流程

ILoadBalancer的初始化过程中,会启动定时器,定时的检查Server是否还存活。原生ribbon默认的是不执行参照,springcloud集成eureka后,会从EurekaClient中获取服务实例的状态,从而判断是否存活。 流程如下:

源码分析

如何实现Ribbon的负载均衡自动化配置?

我查阅源码,发现LoadBalancerAutoConfiguration为实现客户端负载均衡器的自动化配置类。

package org.springframework.cloud.client.loadbalancer;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingClass;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.ClientHttpRequestInterceptor;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.web.client.RestTemplate;

@Configuration
@ConditionalOnClass({RestTemplate.class})
@ConditionalOnBean({LoadBalancerClient.class})
@EnableConfigurationProperties({LoadBalancerRetryProperties.class})
public class LoadBalancerAutoConfiguration {
    @LoadBalanced
    @Autowired(
        required = false
    )
    private List<RestTemplate> restTemplates = Collections.emptyList();
    @Autowired(
        required = false
    )
    private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList();

    public LoadBalancerAutoConfiguration() {
    }

    @Bean
    public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
        return () -> {
            restTemplateCustomizers.ifAvailable((customizers) -> {
                Iterator var2 = this.restTemplates.iterator();

                while(var2.hasNext()) {
                    RestTemplate restTemplate = (RestTemplate)var2.next();
                    Iterator var4 = customizers.iterator();

                    while(var4.hasNext()) {
                        RestTemplateCustomizer customizer = (RestTemplateCustomizer)var4.next();
                        customizer.customize(restTemplate);
                    }
                }

            });
        };
    }

    @Bean
    @ConditionalOnMissingBean
    public LoadBalancerRequestFactory loadBalancerRequestFactory(LoadBalancerClient loadBalancerClient) {
        return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers);
    }

    @Configuration
    @ConditionalOnClass({RetryTemplate.class})
    public static class RetryInterceptorAutoConfiguration {
        public RetryInterceptorAutoConfiguration() {
        }

        @Bean
        @ConditionalOnMissingBean
        public RetryLoadBalancerInterceptor ribbonInterceptor(LoadBalancerClient loadBalancerClient, LoadBalancerRetryProperties properties, LoadBalancerRequestFactory requestFactory, LoadBalancedRetryFactory loadBalancedRetryFactory) {
            return new RetryLoadBalancerInterceptor(loadBalancerClient, properties, requestFactory, loadBalancedRetryFactory);
        }

        @Bean
        @ConditionalOnMissingBean
        public RestTemplateCustomizer restTemplateCustomizer(final RetryLoadBalancerInterceptor loadBalancerInterceptor) {
            return (restTemplate) -> {
                List<ClientHttpRequestInterceptor> list = new ArrayList(restTemplate.getInterceptors());
                list.add(loadBalancerInterceptor);
                restTemplate.setInterceptors(list);
            };
        }
    }

    @Configuration
    @ConditionalOnClass({RetryTemplate.class})
    public static class RetryAutoConfiguration {
        public RetryAutoConfiguration() {
        }

        @Bean
        @ConditionalOnMissingBean
        public LoadBalancedRetryFactory loadBalancedRetryFactory() {
            return new LoadBalancedRetryFactory() {
            };
        }
    }

    @Configuration
    @ConditionalOnMissingClass({"org.springframework.retry.support.RetryTemplate"})
    static class LoadBalancerInterceptorConfig {
        LoadBalancerInterceptorConfig() {
        }

        @Bean
        public LoadBalancerInterceptor ribbonInterceptor(LoadBalancerClient loadBalancerClient, LoadBalancerRequestFactory requestFactory) {
            return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
        }

        @Bean
        @ConditionalOnMissingBean
        public RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {
            return (restTemplate) -> {
                List<ClientHttpRequestInterceptor> list = new ArrayList(restTemplate.getInterceptors());
                list.add(loadBalancerInterceptor);
                restTemplate.setInterceptors(list);
            };
        }
    }
}

LoadBalancerAutoConfiguration类头上的注解可以知道Ribbon实现的负载均衡自动化配置需要满足下面两个条件:

  • @ConditionalOnClass({RestTemplate.class})RestTemplate类必须存在于当前工程的环境中。
  • @ConditionalOnBean({LoadBalancerClient.class}):在Spring的Bean工程中有必须有LoadBalancerClient的实现Bean。

在该自动化配置类中,主要做了下面三件事:

  • 创建了一个LoadBalancerInterceptor的Bean,用于实现对客户端发起请求时进行拦截,以实现客户端负载均衡。
  • 创建了一个RestTemplateCustomizer的Bean,用于给RestTemplate增加LoadBalancerInterceptor拦截器。
  • 维护了一个被@LoadBalanced注解修饰的RestTemplate对象列表,并在这里进行初始化,通过调用RestTemplateCustomizer的实例来给需要客户端负载均衡的RestTemplate增加LoadBalancerInterceptor拦截器。

LoadBalancerInterceptor拦截器

LoadBalancerInterceptor拦截器是如何将一个普通的RestTemplate变成客户端负载均衡的。

package org.springframework.cloud.client.loadbalancer;

import java.io.IOException;
import java.net.URI;
import org.springframework.http.HttpRequest;
import org.springframework.http.client.ClientHttpRequestExecution;
import org.springframework.http.client.ClientHttpRequestInterceptor;
import org.springframework.http.client.ClientHttpResponse;
import org.springframework.util.Assert;

public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
    private LoadBalancerClient loadBalancer;
    private LoadBalancerRequestFactory requestFactory;

    public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {
        this.loadBalancer = loadBalancer;
        this.requestFactory = requestFactory;
    }

    public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
        this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
    }

    public ClientHttpResponse intercept(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) throws IOException {
        URI originalUri = request.getURI();
        String serviceName = originalUri.getHost();
        Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
        return (ClientHttpResponse)this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
    }
}
RestTemplate源码

传统情况下在java代码里访问restful服务,一般使用ApacheHttpClient。不过此种方法使用起来太过繁琐。spring提供了一种简单便捷的模板类来进行操作,这就是RestTemplate

package org.springframework.web.client;

import java.io.IOException;
import java.lang.reflect.Type;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.RequestEntity;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.ClientHttpRequest;
import org.springframework.http.client.ClientHttpRequestFactory;
import org.springframework.http.client.ClientHttpResponse;
import org.springframework.http.client.support.InterceptingHttpAccessor;
import org.springframework.http.converter.ByteArrayHttpMessageConverter;
import org.springframework.http.converter.GenericHttpMessageConverter;
import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.http.converter.ResourceHttpMessageConverter;
import org.springframework.http.converter.StringHttpMessageConverter;
import org.springframework.http.converter.cbor.MappingJackson2CborHttpMessageConverter;
import org.springframework.http.converter.feed.AtomFeedHttpMessageConverter;
import org.springframework.http.converter.feed.RssChannelHttpMessageConverter;
import org.springframework.http.converter.json.GsonHttpMessageConverter;
import org.springframework.http.converter.json.JsonbHttpMessageConverter;
import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter;
import org.springframework.http.converter.smile.MappingJackson2SmileHttpMessageConverter;
import org.springframework.http.converter.support.AllEncompassingFormHttpMessageConverter;
import org.springframework.http.converter.xml.Jaxb2RootElementHttpMessageConverter;
import org.springframework.http.converter.xml.MappingJackson2XmlHttpMessageConverter;
import org.springframework.http.converter.xml.SourceHttpMessageConverter;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.web.util.DefaultUriBuilderFactory;
import org.springframework.web.util.DefaultUriBuilderFactory.EncodingMode;
import org.springframework.web.util.UriTemplateHandler;

/**
 * Synchronous client to perform HTTP requests, exposing a simple, template
 * method API over underlying HTTP client libraries such as the JDK
 * {@code HttpURLConnection}, Apache HttpComponents, and others.
 *
 * <p>The RestTemplate offers templates for common scenarios by HTTP method, in
 * addition to the generalized {@code exchange} and {@code execute} methods that
 * support of less frequent cases.
 *
 * <p><strong>NOTE:</strong> As of 5.0, the non-blocking, reactive
 * {@code org.springframework.web.reactive.client.WebClient} offers a
 * modern alternative to the {@code RestTemplate} with efficient support for
 * both sync and async, as well as streaming scenarios. The {@code RestTemplate}
 * will be deprecated in a future version and will not have major new features
 * added going forward. See the WebClient section of the Spring Framework reference
 * documentation for more details and example code.
 *
 * @author Arjen Poutsma
 * @author Brian Clozel
 * @author Roy Clarkson
 * @author Juergen Hoeller
 * @since 3.0
 * @see HttpMessageConverter
 * @see RequestCallback
 * @see ResponseExtractor
 * @see ResponseErrorHandler
 */
public class RestTemplate extends InterceptingHttpAccessor implements RestOperations {

    private static boolean romePresent;

    private static final boolean jaxb2Present;

    private static final boolean jackson2Present;

    private static final boolean jackson2XmlPresent;

    private static final boolean jackson2SmilePresent;

    private static final boolean jackson2CborPresent;

    private static final boolean gsonPresent;

    private static final boolean jsonbPresent;

    static {
        ClassLoader classLoader = RestTemplate.class.getClassLoader();
        romePresent = ClassUtils.isPresent("com.rometools.rome.feed.WireFeed", classLoader);
        jaxb2Present = ClassUtils.isPresent("javax.xml.bind.Binder", classLoader);
        jackson2Present =
                ClassUtils.isPresent("com.fasterxml.jackson.databind.ObjectMapper", classLoader) &&
                        ClassUtils.isPresent("com.fasterxml.jackson.core.JsonGenerator", classLoader);
        jackson2XmlPresent = ClassUtils.isPresent("com.fasterxml.jackson.dataformat.xml.XmlMapper", classLoader);
        jackson2SmilePresent = ClassUtils.isPresent("com.fasterxml.jackson.dataformat.smile.SmileFactory", classLoader);
        jackson2CborPresent = ClassUtils.isPresent("com.fasterxml.jackson.dataformat.cbor.CBORFactory", classLoader);
        gsonPresent = ClassUtils.isPresent("com.google.gson.Gson", classLoader);
        jsonbPresent = ClassUtils.isPresent("javax.json.bind.Jsonb", classLoader);
    }


    private final List<HttpMessageConverter<?>> messageConverters = new ArrayList<>();

    private ResponseErrorHandler errorHandler = new DefaultResponseErrorHandler();

    private UriTemplateHandler uriTemplateHandler;

    private final ResponseExtractor<HttpHeaders> headersExtractor = new HeadersExtractor();


    /**
     * Create a new instance of the {@link RestTemplate} using default settings.
     * Default {@link HttpMessageConverter HttpMessageConverters} are initialized.
     */
    public RestTemplate() {
        this.messageConverters.add(new ByteArrayHttpMessageConverter());
        this.messageConverters.add(new StringHttpMessageConverter());
        this.messageConverters.add(new ResourceHttpMessageConverter(false));
        try {
            this.messageConverters.add(new SourceHttpMessageConverter<>());
        }
        catch (Error err) {
            // Ignore when no TransformerFactory implementation is available
        }
        this.messageConverters.add(new AllEncompassingFormHttpMessageConverter());

        if (romePresent) {
            this.messageConverters.add(new AtomFeedHttpMessageConverter());
            this.messageConverters.add(new RssChannelHttpMessageConverter());
        }

        if (jackson2XmlPresent) {
            this.messageConverters.add(new MappingJackson2XmlHttpMessageConverter());
        }
        else if (jaxb2Present) {
            this.messageConverters.add(new Jaxb2RootElementHttpMessageConverter());
        }

        if (jackson2Present) {
            this.messageConverters.add(new MappingJackson2HttpMessageConverter());
        }
        else if (gsonPresent) {
            this.messageConverters.add(new GsonHttpMessageConverter());
        }
        else if (jsonbPresent) {
            this.messageConverters.add(new JsonbHttpMessageConverter());
        }

        if (jackson2SmilePresent) {
            this.messageConverters.add(new MappingJackson2SmileHttpMessageConverter());
        }
        if (jackson2CborPresent) {
            this.messageConverters.add(new MappingJackson2CborHttpMessageConverter());
        }

        this.uriTemplateHandler = initUriTemplateHandler();
    }

    /**
     * Create a new instance of the {@link RestTemplate} based on the given {@link ClientHttpRequestFactory}.
     * @param requestFactory the HTTP request factory to use
     * @see org.springframework.http.client.SimpleClientHttpRequestFactory
     * @see org.springframework.http.client.HttpComponentsClientHttpRequestFactory
     */
    public RestTemplate(ClientHttpRequestFactory requestFactory) {
        this();
        setRequestFactory(requestFactory);
    }

    /**
     * Create a new instance of the {@link RestTemplate} using the given list of
     * {@link HttpMessageConverter} to use.
     * @param messageConverters the list of {@link HttpMessageConverter} to use
     * @since 3.2.7
     */
    public RestTemplate(List<HttpMessageConverter<?>> messageConverters) {
        Assert.notEmpty(messageConverters, "At least one HttpMessageConverter required");
        this.messageConverters.addAll(messageConverters);
        this.uriTemplateHandler = initUriTemplateHandler();
    }

    private static DefaultUriBuilderFactory initUriTemplateHandler() {
        DefaultUriBuilderFactory uriFactory = new DefaultUriBuilderFactory();
        uriFactory.setEncodingMode(EncodingMode.URI_COMPONENT);  // for backwards compatibility..
        return uriFactory;
    }


    /**
     * Set the message body converters to use.
     * <p>These converters are used to convert from and to HTTP requests and responses.
     */
    public void setMessageConverters(List<HttpMessageConverter<?>> messageConverters) {
        Assert.notEmpty(messageConverters, "At least one HttpMessageConverter required");
        // Take getMessageConverters() List as-is when passed in here
        if (this.messageConverters != messageConverters) {
            this.messageConverters.clear();
            this.messageConverters.addAll(messageConverters);
        }
    }

    /**
     * Return the list of message body converters.
     * <p>The returned {@link List} is active and may get appended to.
     */
    public List<HttpMessageConverter<?>> getMessageConverters() {
        return this.messageConverters;
    }

    /**
     * Set the error handler.
     * <p>By default, RestTemplate uses a {@link DefaultResponseErrorHandler}.
     */
    public void setErrorHandler(ResponseErrorHandler errorHandler) {
        Assert.notNull(errorHandler, "ResponseErrorHandler must not be null");
        this.errorHandler = errorHandler;
    }

    /**
     * Return the error handler.
     */
    public ResponseErrorHandler getErrorHandler() {
        return this.errorHandler;
    }

    /**
     * Configure default URI variable values. This is a shortcut for:
     * <pre class="code">
     * DefaultUriBuilderFactory factory = new DefaultUriBuilderFactory();
     * handler.setDefaultUriVariables(...);
     *
     * RestTemplate restTemplate = new RestTemplate();
     * restTemplate.setUriTemplateHandler(handler);
     * </pre>
     * @param uriVars the default URI variable values
     * @since 4.3
     */
    @SuppressWarnings("deprecation")
    public void setDefaultUriVariables(Map<String, ?> uriVars) {
        if (this.uriTemplateHandler instanceof DefaultUriBuilderFactory) {
            ((DefaultUriBuilderFactory) this.uriTemplateHandler).setDefaultUriVariables(uriVars);
        }
        else if (this.uriTemplateHandler instanceof org.springframework.web.util.AbstractUriTemplateHandler) {
            ((org.springframework.web.util.AbstractUriTemplateHandler) this.uriTemplateHandler)
                    .setDefaultUriVariables(uriVars);
        }
        else {
            throw new IllegalArgumentException(
                    "This property is not supported with the configured UriTemplateHandler.");
        }
    }

    /**
     * Configure a strategy for expanding URI templates.
     * <p>By default, {@link DefaultUriBuilderFactory} is used and for
     * backwards compatibility, the encoding mode is set to
     * {@link EncodingMode#URI_COMPONENT URI_COMPONENT}. As of 5.0.8, prefer
     * using {@link EncodingMode#TEMPLATE_AND_VALUES TEMPLATE_AND_VALUES}.
     * <p><strong>Note:</strong> in 5.0 the switch from
     * {@link org.springframework.web.util.DefaultUriTemplateHandler
     * DefaultUriTemplateHandler} (deprecated in 4.3), as the default to use, to
     * {@link DefaultUriBuilderFactory} brings in a different default for the
     * {@code parsePath} property (switching from false to true).
     * @param handler the URI template handler to use
     */
    public void setUriTemplateHandler(UriTemplateHandler handler) {
        Assert.notNull(handler, "UriTemplateHandler must not be null");
        this.uriTemplateHandler = handler;
    }

    /**
     * Return the configured URI template handler.
     */
    public UriTemplateHandler getUriTemplateHandler() {
        return this.uriTemplateHandler;
    }


    // GET

    @Override
    @Nullable
    public <T> T getForObject(String url, Class<T> responseType, Object... uriVariables) throws RestClientException {
        RequestCallback requestCallback = acceptHeaderRequestCallback(responseType);
        HttpMessageConverterExtractor<T> responseExtractor =
                new HttpMessageConverterExtractor<>(responseType, getMessageConverters(), logger);
        return execute(url, HttpMethod.GET, requestCallback, responseExtractor, uriVariables);
    }

    @Override
    @Nullable
    public <T> T getForObject(String url, Class<T> responseType, Map<String, ?> uriVariables) throws RestClientException {
        RequestCallback requestCallback = acceptHeaderRequestCallback(responseType);
        HttpMessageConverterExtractor<T> responseExtractor =
                new HttpMessageConverterExtractor<>(responseType, getMessageConverters(), logger);
        return execute(url, HttpMethod.GET, requestCallback, responseExtractor, uriVariables);
    }

    @Override
    @Nullable
    public <T> T getForObject(URI url, Class<T> responseType) throws RestClientException {
        RequestCallback requestCallback = acceptHeaderRequestCallback(responseType);
        HttpMessageConverterExtractor<T> responseExtractor =
                new HttpMessageConverterExtractor<>(responseType, getMessageConverters(), logger);
        return execute(url, HttpMethod.GET, requestCallback, responseExtractor);
    }

    @Override
    public <T> ResponseEntity<T> getForEntity(String url, Class<T> responseType, Object... uriVariables)
            throws RestClientException {

        RequestCallback requestCallback = acceptHeaderRequestCallback(responseType);
        ResponseExtractor<ResponseEntity<T>> responseExtractor = responseEntityExtractor(responseType);
        return nonNull(execute(url, HttpMethod.GET, requestCallback, responseExtractor, uriVariables));
    }

    @Override
    public <T> ResponseEntity<T> getForEntity(String url, Class<T> responseType, Map<String, ?> uriVariables)
            throws RestClientException {

        RequestCallback requestCallback = acceptHeaderRequestCallback(responseType);
        ResponseExtractor<ResponseEntity<T>> responseExtractor = responseEntityExtractor(responseType);
        return nonNull(execute(url, HttpMethod.GET, requestCallback, responseExtractor, uriVariables));
    }

    @Override
    public <T> ResponseEntity<T> getForEntity(URI url, Class<T> responseType) throws RestClientException {
        RequestCallback requestCallback = acceptHeaderRequestCallback(responseType);
        ResponseExtractor<ResponseEntity<T>> responseExtractor = responseEntityExtractor(responseType);
        return nonNull(execute(url, HttpMethod.GET, requestCallback, responseExtractor));
    }


    // HEAD

    @Override
    public HttpHeaders headForHeaders(String url, Object... uriVariables) throws RestClientException {
        return nonNull(execute(url, HttpMethod.HEAD, null, headersExtractor(), uriVariables));
    }

    @Override
    public HttpHeaders headForHeaders(String url, Map<String, ?> uriVariables) throws RestClientException {
        return nonNull(execute(url, HttpMethod.HEAD, null, headersExtractor(), uriVariables));
    }

    @Override
    public HttpHeaders headForHeaders(URI url) throws RestClientException {
        return nonNull(execute(url, HttpMethod.HEAD, null, headersExtractor()));
    }


    // POST

    @Override
    @Nullable
    public URI postForLocation(String url, @Nullable Object request, Object... uriVariables)
            throws RestClientException {

        RequestCallback requestCallback = httpEntityCallback(request);
        HttpHeaders headers = execute(url, HttpMethod.POST, requestCallback, headersExtractor(), uriVariables);
        return (headers != null ? headers.getLocation() : null);
    }

    @Override
    @Nullable
    public URI postForLocation(String url, @Nullable Object request, Map<String, ?> uriVariables)
            throws RestClientException {

        RequestCallback requestCallback = httpEntityCallback(request);
        HttpHeaders headers = execute(url, HttpMethod.POST, requestCallback, headersExtractor(), uriVariables);
        return (headers != null ? headers.getLocation() : null);
    }

    @Override
    @Nullable
    public URI postForLocation(URI url, @Nullable Object request) throws RestClientException {
        RequestCallback requestCallback = httpEntityCallback(request);
        HttpHeaders headers = execute(url, HttpMethod.POST, requestCallback, headersExtractor());
        return (headers != null ? headers.getLocation() : null);
    }

    @Override
    @Nullable
    public <T> T postForObject(String url, @Nullable Object request, Class<T> responseType,
            Object... uriVariables) throws RestClientException {

        RequestCallback requestCallback = httpEntityCallback(request, responseType);
        HttpMessageConverterExtractor<T> responseExtractor =
                new HttpMessageConverterExtractor<>(responseType, getMessageConverters(), logger);
        return execute(url, HttpMethod.POST, requestCallback, responseExtractor, uriVariables);
    }

    @Override
    @Nullable
    public <T> T postForObject(String url, @Nullable Object request, Class<T> responseType,
            Map<String, ?> uriVariables) throws RestClientException {

        RequestCallback requestCallback = httpEntityCallback(request, responseType);
        HttpMessageConverterExtractor<T> responseExtractor =
                new HttpMessageConverterExtractor<>(responseType, getMessageConverters(), logger);
        return execute(url, HttpMethod.POST, requestCallback, responseExtractor, uriVariables);
    }

    @Override
    @Nullable
    public <T> T postForObject(URI url, @Nullable Object request, Class<T> responseType)
            throws RestClientException {

        RequestCallback requestCallback = httpEntityCallback(request, responseType);
        HttpMessageConverterExtractor<T> responseExtractor =
                new HttpMessageConverterExtractor<>(responseType, getMessageConverters());
        return execute(url, HttpMethod.POST, requestCallback, responseExtractor);
    }

    @Override
    public <T> ResponseEntity<T> postForEntity(String url, @Nullable Object request,
            Class<T> responseType, Object... uriVariables) throws RestClientException {

        RequestCallback requestCallback = httpEntityCallback(request, responseType);
        ResponseExtractor<ResponseEntity<T>> responseExtractor = responseEntityExtractor(responseType);
        return nonNull(execute(url, HttpMethod.POST, requestCallback, responseExtractor, uriVariables));
    }

    @Override
    public <T> ResponseEntity<T> postForEntity(String url, @Nullable Object request,
            Class<T> responseType, Map<String, ?> uriVariables) throws RestClientException {

        RequestCallback requestCallback = httpEntityCallback(request, responseType);
        ResponseExtractor<ResponseEntity<T>> responseExtractor = responseEntityExtractor(responseType);
        return nonNull(execute(url, HttpMethod.POST, requestCallback, responseExtractor, uriVariables));
    }

    @Override
    public <T> ResponseEntity<T> postForEntity(URI url, @Nullable Object request, Class<T> responseType)
            throws RestClientException {

        RequestCallback requestCallback = httpEntityCallback(request, responseType);
        ResponseExtractor<ResponseEntity<T>> responseExtractor = responseEntityExtractor(responseType);
        return nonNull(execute(url, HttpMethod.POST, requestCallback, responseExtractor));
    }


    // PUT

    @Override
    public void put(String url, @Nullable Object request, Object... uriVariables)
            throws RestClientException {

        RequestCallback requestCallback = httpEntityCallback(request);
        execute(url, HttpMethod.PUT, requestCallback, null, uriVariables);
    }

    @Override
    public void put(String url, @Nullable Object request, Map<String, ?> uriVariables)
            throws RestClientException {

        RequestCallback requestCallback = httpEntityCallback(request);
        execute(url, HttpMethod.PUT, requestCallback, null, uriVariables);
    }

    @Override
    public void put(URI url, @Nullable Object request) throws RestClientException {
        RequestCallback requestCallback = httpEntityCallback(request);
        execute(url, HttpMethod.PUT, requestCallback, null);
    }


    // PATCH

    @Override
    @Nullable
    public <T> T patchForObject(String url, @Nullable Object request, Class<T> responseType,
            Object... uriVariables) throws RestClientException {

        RequestCallback requestCallback = httpEntityCallback(request, responseType);
        HttpMessageConverterExtractor<T> responseExtractor =
                new HttpMessageConverterExtractor<>(responseType, getMessageConverters(), logger);
        return execute(url, HttpMethod.PATCH, requestCallback, responseExtractor, uriVariables);
    }

    @Override
    @Nullable
    public <T> T patchForObject(String url, @Nullable Object request, Class<T> responseType,
            Map<String, ?> uriVariables) throws RestClientException {

        RequestCallback requestCallback = httpEntityCallback(request, responseType);
        HttpMessageConverterExtractor<T> responseExtractor =
                new HttpMessageConverterExtractor<>(responseType, getMessageConverters(), logger);
        return execute(url, HttpMethod.PATCH, requestCallback, responseExtractor, uriVariables);
    }

    @Override
    @Nullable
    public <T> T patchForObject(URI url, @Nullable Object request, Class<T> responseType)
            throws RestClientException {

        RequestCallback requestCallback = httpEntityCallback(request, responseType);
        HttpMessageConverterExtractor<T> responseExtractor =
                new HttpMessageConverterExtractor<>(responseType, getMessageConverters());
        return execute(url, HttpMethod.PATCH, requestCallback, responseExtractor);
    }


    // DELETE

    @Override
    public void delete(String url, Object... uriVariables) throws RestClientException {
        execute(url, HttpMethod.DELETE, null, null, uriVariables);
    }

    @Override
    public void delete(String url, Map<String, ?> uriVariables) throws RestClientException {
        execute(url, HttpMethod.DELETE, null, null, uriVariables);
    }

    @Override
    public void delete(URI url) throws RestClientException {
        execute(url, HttpMethod.DELETE, null, null);
    }


    // OPTIONS

    @Override
    public Set<HttpMethod> optionsForAllow(String url, Object... uriVariables) throws RestClientException {
        ResponseExtractor<HttpHeaders> headersExtractor = headersExtractor();
        HttpHeaders headers = execute(url, HttpMethod.OPTIONS, null, headersExtractor, uriVariables);
        return (headers != null ? headers.getAllow() : Collections.emptySet());
    }

    @Override
    public Set<HttpMethod> optionsForAllow(String url, Map<String, ?> uriVariables) throws RestClientException {
        ResponseExtractor<HttpHeaders> headersExtractor = headersExtractor();
        HttpHeaders headers = execute(url, HttpMethod.OPTIONS, null, headersExtractor, uriVariables);
        return (headers != null ? headers.getAllow() : Collections.emptySet());
    }

    @Override
    public Set<HttpMethod> optionsForAllow(URI url) throws RestClientException {
        ResponseExtractor<HttpHeaders> headersExtractor = headersExtractor();
        HttpHeaders headers = execute(url, HttpMethod.OPTIONS, null, headersExtractor);
        return (headers != null ? headers.getAllow() : Collections.emptySet());
    }


    // exchange

    @Override
    public <T> ResponseEntity<T> exchange(String url, HttpMethod method,
            @Nullable HttpEntity<?> requestEntity, Class<T> responseType, Object... uriVariables)
            throws RestClientException {

        RequestCallback requestCallback = httpEntityCallback(requestEntity, responseType);
        ResponseExtractor<ResponseEntity<T>> responseExtractor = responseEntityExtractor(responseType);
        return nonNull(execute(url, method, requestCallback, responseExtractor, uriVariables));
    }

    @Override
    public <T> ResponseEntity<T> exchange(String url, HttpMethod method,
            @Nullable HttpEntity<?> requestEntity, Class<T> responseType, Map<String, ?> uriVariables)
            throws RestClientException {

        RequestCallback requestCallback = httpEntityCallback(requestEntity, responseType);
        ResponseExtractor<ResponseEntity<T>> responseExtractor = responseEntityExtractor(responseType);
        return nonNull(execute(url, method, requestCallback, responseExtractor, uriVariables));
    }

    @Override
    public <T> ResponseEntity<T> exchange(URI url, HttpMethod method, @Nullable HttpEntity<?> requestEntity,
            Class<T> responseType) throws RestClientException {

        RequestCallback requestCallback = httpEntityCallback(requestEntity, responseType);
        ResponseExtractor<ResponseEntity<T>> responseExtractor = responseEntityExtractor(responseType);
        return nonNull(execute(url, method, requestCallback, responseExtractor));
    }

    @Override
    public <T> ResponseEntity<T> exchange(String url, HttpMethod method, @Nullable HttpEntity<?> requestEntity,
            ParameterizedTypeReference<T> responseType, Object... uriVariables) throws RestClientException {

        Type type = responseType.getType();
        RequestCallback requestCallback = httpEntityCallback(requestEntity, type);
        ResponseExtractor<ResponseEntity<T>> responseExtractor = responseEntityExtractor(type);
        return nonNull(execute(url, method, requestCallback, responseExtractor, uriVariables));
    }

    @Override
    public <T> ResponseEntity<T> exchange(String url, HttpMethod method, @Nullable HttpEntity<?> requestEntity,
            ParameterizedTypeReference<T> responseType, Map<String, ?> uriVariables) throws RestClientException {

        Type type = responseType.getType();
        RequestCallback requestCallback = httpEntityCallback(requestEntity, type);
        ResponseExtractor<ResponseEntity<T>> responseExtractor = responseEntityExtractor(type);
        return nonNull(execute(url, method, requestCallback, responseExtractor, uriVariables));
    }

    @Override
    public <T> ResponseEntity<T> exchange(URI url, HttpMethod method, @Nullable HttpEntity<?> requestEntity,
            ParameterizedTypeReference<T> responseType) throws RestClientException {

        Type type = responseType.getType();
        RequestCallback requestCallback = httpEntityCallback(requestEntity, type);
        ResponseExtractor<ResponseEntity<T>> responseExtractor = responseEntityExtractor(type);
        return nonNull(execute(url, method, requestCallback, responseExtractor));
    }

    @Override
    public <T> ResponseEntity<T> exchange(RequestEntity<?> requestEntity, Class<T> responseType)
            throws RestClientException {

        RequestCallback requestCallback = httpEntityCallback(requestEntity, responseType);
        ResponseExtractor<ResponseEntity<T>> responseExtractor = responseEntityExtractor(responseType);
        return nonNull(doExecute(requestEntity.getUrl(), requestEntity.getMethod(), requestCallback, responseExtractor));
    }

    @Override
    public <T> ResponseEntity<T> exchange(RequestEntity<?> requestEntity, ParameterizedTypeReference<T> responseType)
            throws RestClientException {

        Type type = responseType.getType();
        RequestCallback requestCallback = httpEntityCallback(requestEntity, type);
        ResponseExtractor<ResponseEntity<T>> responseExtractor = responseEntityExtractor(type);
        return nonNull(doExecute(requestEntity.getUrl(), requestEntity.getMethod(), requestCallback, responseExtractor));
    }


    // General execution

    /**
     * {@inheritDoc}
     * <p>To provide a {@code RequestCallback} or {@code ResponseExtractor} only,
     * but not both, consider using:
     * <ul>
     * <li>{@link #acceptHeaderRequestCallback(Class)}
     * <li>{@link #httpEntityCallback(Object)}
     * <li>{@link #httpEntityCallback(Object, Type)}
     * <li>{@link #responseEntityExtractor(Type)}
     * </ul>
     */
    @Override
    @Nullable
    public <T> T execute(String url, HttpMethod method, @Nullable RequestCallback requestCallback,
            @Nullable ResponseExtractor<T> responseExtractor, Object... uriVariables) throws RestClientException {

        URI expanded = getUriTemplateHandler().expand(url, uriVariables);
        return doExecute(expanded, method, requestCallback, responseExtractor);
    }

    /**
     * {@inheritDoc}
     * <p>To provide a {@code RequestCallback} or {@code ResponseExtractor} only,
     * but not both, consider using:
     * <ul>
     * <li>{@link #acceptHeaderRequestCallback(Class)}
     * <li>{@link #httpEntityCallback(Object)}
     * <li>{@link #httpEntityCallback(Object, Type)}
     * <li>{@link #responseEntityExtractor(Type)}
     * </ul>
     */
    @Override
    @Nullable
    public <T> T execute(String url, HttpMethod method, @Nullable RequestCallback requestCallback,
            @Nullable ResponseExtractor<T> responseExtractor, Map<String, ?> uriVariables)
            throws RestClientException {

        URI expanded = getUriTemplateHandler().expand(url, uriVariables);
        return doExecute(expanded, method, requestCallback, responseExtractor);
    }

    /**
     * {@inheritDoc}
     * <p>To provide a {@code RequestCallback} or {@code ResponseExtractor} only,
     * but not both, consider using:
     * <ul>
     * <li>{@link #acceptHeaderRequestCallback(Class)}
     * <li>{@link #httpEntityCallback(Object)}
     * <li>{@link #httpEntityCallback(Object, Type)}
     * <li>{@link #responseEntityExtractor(Type)}
     * </ul>
     */
    @Override
    @Nullable
    public <T> T execute(URI url, HttpMethod method, @Nullable RequestCallback requestCallback,
            @Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {

        return doExecute(url, method, requestCallback, responseExtractor);
    }

    /**
     * Execute the given method on the provided URI.
     * <p>The {@link ClientHttpRequest} is processed using the {@link RequestCallback};
     * the response with the {@link ResponseExtractor}.
     * @param url the fully-expanded URL to connect to
     * @param method the HTTP method to execute (GET, POST, etc.)
     * @param requestCallback object that prepares the request (can be {@code null})
     * @param responseExtractor object that extracts the return value from the response (can be {@code null})
     * @return an arbitrary object, as returned by the {@link ResponseExtractor}
     */
    @Nullable
    protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback,
            @Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {

        Assert.notNull(url, "URI is required");
        Assert.notNull(method, "HttpMethod is required");
        ClientHttpResponse response = null;
        try {
            ClientHttpRequest request = createRequest(url, method);
            if (requestCallback != null) {
                requestCallback.doWithRequest(request);
            }
            response = request.execute();
            handleResponse(url, method, response);
            return (responseExtractor != null ? responseExtractor.extractData(response) : null);
        }
        catch (IOException ex) {
            String resource = url.toString();
            String query = url.getRawQuery();
            resource = (query != null ? resource.substring(0, resource.indexOf('?')) : resource);
            throw new ResourceAccessException("I/O error on " + method.name() +
                    " request for \"" + resource + "\": " + ex.getMessage(), ex);
        }
        finally {
            if (response != null) {
                response.close();
            }
        }
    }

    /**
     * Handle the given response, performing appropriate logging and
     * invoking the {@link ResponseErrorHandler} if necessary.
     * <p>Can be overridden in subclasses.
     * @param url the fully-expanded URL to connect to
     * @param method the HTTP method to execute (GET, POST, etc.)
     * @param response the resulting {@link ClientHttpResponse}
     * @throws IOException if propagated from {@link ResponseErrorHandler}
     * @since 4.1.6
     * @see #setErrorHandler
     */
    protected void handleResponse(URI url, HttpMethod method, ClientHttpResponse response) throws IOException {
        ResponseErrorHandler errorHandler = getErrorHandler();
        boolean hasError = errorHandler.hasError(response);
        if (logger.isDebugEnabled()) {
            try {
                int code = response.getRawStatusCode();
                HttpStatus status = HttpStatus.resolve(code);
                logger.debug("Response " + (status != null ? status : code));
            }
            catch (IOException ex) {
                // ignore
            }
        }
        if (hasError) {
            errorHandler.handleError(url, method, response);
        }
    }

    /**
     * Return a {@code RequestCallback} that sets the request {@code Accept}
     * header based on the given response type, cross-checked against the
     * configured message converters.
     */
    public <T> RequestCallback acceptHeaderRequestCallback(Class<T> responseType) {
        return new AcceptHeaderRequestCallback(responseType);
    }

    /**
     * Return a {@code RequestCallback} implementation that writes the given
     * object to the request stream.
     */
    public <T> RequestCallback httpEntityCallback(@Nullable Object requestBody) {
        return new HttpEntityRequestCallback(requestBody);
    }

    /**
     * Return a {@code RequestCallback} implementation that:
     * <ol>
     * <li>Sets the request {@code Accept} header based on the given response
     * type, cross-checked against the configured message converters.
     * <li>Writes the given object to the request stream.
     * </ol>
     */
    public <T> RequestCallback httpEntityCallback(@Nullable Object requestBody, Type responseType) {
        return new HttpEntityRequestCallback(requestBody, responseType);
    }

    /**
     * Return a {@code ResponseExtractor} that prepares a {@link ResponseEntity}.
     */
    public <T> ResponseExtractor<ResponseEntity<T>> responseEntityExtractor(Type responseType) {
        return new ResponseEntityResponseExtractor<>(responseType);
    }

    /**
     * Return a response extractor for {@link HttpHeaders}.
     */
    protected ResponseExtractor<HttpHeaders> headersExtractor() {
        return this.headersExtractor;
    }

    private static <T> T nonNull(@Nullable T result) {
        Assert.state(result != null, "No result");
        return result;
    }


    /**
     * Request callback implementation that prepares the request's accept headers.
     */
    private class AcceptHeaderRequestCallback implements RequestCallback {

        @Nullable
        private final Type responseType;

        public AcceptHeaderRequestCallback(@Nullable Type responseType) {
            this.responseType = responseType;
        }

        @Override
        public void doWithRequest(ClientHttpRequest request) throws IOException {
            if (this.responseType != null) {
                List<MediaType> allSupportedMediaTypes = getMessageConverters().stream()
                        .filter(converter -> canReadResponse(this.responseType, converter))
                        .flatMap(this::getSupportedMediaTypes)
                        .distinct()
                        .sorted(MediaType.SPECIFICITY_COMPARATOR)
                        .collect(Collectors.toList());
                if (logger.isDebugEnabled()) {
                    logger.debug("Accept=" + allSupportedMediaTypes);
                }
                request.getHeaders().setAccept(allSupportedMediaTypes);
            }
        }

        private boolean canReadResponse(Type responseType, HttpMessageConverter<?> converter) {
            Class<?> responseClass = (responseType instanceof Class ? (Class<?>) responseType : null);
            if (responseClass != null) {
                return converter.canRead(responseClass, null);
            }
            else if (converter instanceof GenericHttpMessageConverter) {
                GenericHttpMessageConverter<?> genericConverter = (GenericHttpMessageConverter<?>) converter;
                return genericConverter.canRead(responseType, null, null);
            }
            return false;
        }

        private Stream<MediaType> getSupportedMediaTypes(HttpMessageConverter<?> messageConverter) {
            return messageConverter.getSupportedMediaTypes()
                    .stream()
                    .map(mediaType -> {
                        if (mediaType.getCharset() != null) {
                            return new MediaType(mediaType.getType(), mediaType.getSubtype());
                        }
                        return mediaType;
                    });
        }
    }


    /**
     * Request callback implementation that writes the given object to the request stream.
     */
    private class HttpEntityRequestCallback extends AcceptHeaderRequestCallback {

        private final HttpEntity<?> requestEntity;

        public HttpEntityRequestCallback(@Nullable Object requestBody) {
            this(requestBody, null);
        }

        public HttpEntityRequestCallback(@Nullable Object requestBody, @Nullable Type responseType) {
            super(responseType);
            if (requestBody instanceof HttpEntity) {
                this.requestEntity = (HttpEntity<?>) requestBody;
            }
            else if (requestBody != null) {
                this.requestEntity = new HttpEntity<>(requestBody);
            }
            else {
                this.requestEntity = HttpEntity.EMPTY;
            }
        }

        @Override
        @SuppressWarnings("unchecked")
        public void doWithRequest(ClientHttpRequest httpRequest) throws IOException {
            super.doWithRequest(httpRequest);
            Object requestBody = this.requestEntity.getBody();
            if (requestBody == null) {
                HttpHeaders httpHeaders = httpRequest.getHeaders();
                HttpHeaders requestHeaders = this.requestEntity.getHeaders();
                if (!requestHeaders.isEmpty()) {
                    requestHeaders.forEach((key, values) -> httpHeaders.put(key, new LinkedList<>(values)));
                }
                if (httpHeaders.getContentLength() < 0) {
                    httpHeaders.setContentLength(0L);
                }
            }
            else {
                Class<?> requestBodyClass = requestBody.getClass();
                Type requestBodyType = (this.requestEntity instanceof RequestEntity ?
                        ((RequestEntity<?>)this.requestEntity).getType() : requestBodyClass);
                HttpHeaders httpHeaders = httpRequest.getHeaders();
                HttpHeaders requestHeaders = this.requestEntity.getHeaders();
                MediaType requestContentType = requestHeaders.getContentType();
                for (HttpMessageConverter<?> messageConverter : getMessageConverters()) {
                    if (messageConverter instanceof GenericHttpMessageConverter) {
                        GenericHttpMessageConverter<Object> genericConverter =
                                (GenericHttpMessageConverter<Object>) messageConverter;
                        if (genericConverter.canWrite(requestBodyType, requestBodyClass, requestContentType)) {
                            if (!requestHeaders.isEmpty()) {
                                requestHeaders.forEach((key, values) -> httpHeaders.put(key, new LinkedList<>(values)));
                            }
                            logBody(requestBody, requestContentType, genericConverter);
                            genericConverter.write(requestBody, requestBodyType, requestContentType, httpRequest);
                            return;
                        }
                    }
                    else if (messageConverter.canWrite(requestBodyClass, requestContentType)) {
                        if (!requestHeaders.isEmpty()) {
                            requestHeaders.forEach((key, values) -> httpHeaders.put(key, new LinkedList<>(values)));
                        }
                        logBody(requestBody, requestContentType, messageConverter);
                        ((HttpMessageConverter<Object>) messageConverter).write(
                                requestBody, requestContentType, httpRequest);
                        return;
                    }
                }
                String message = "No HttpMessageConverter for " + requestBodyClass.getName();
                if (requestContentType != null) {
                    message += " and content type \"" + requestContentType + "\"";
                }
                throw new RestClientException(message);
            }
        }

        private void logBody(Object body, @Nullable MediaType mediaType, HttpMessageConverter<?> converter) {
            if (logger.isDebugEnabled()) {
                if (mediaType != null) {
                    logger.debug("Writing [" + body + "] as \"" + mediaType + "\"");
                }
                else {
                    logger.debug("Writing [" + body + "] with " + converter.getClass().getName());
                }
            }
        }
    }


    /**
     * Response extractor for {@link HttpEntity}.
     */
    private class ResponseEntityResponseExtractor<T> implements ResponseExtractor<ResponseEntity<T>> {

        @Nullable
        private final HttpMessageConverterExtractor<T> delegate;

        public ResponseEntityResponseExtractor(@Nullable Type responseType) {
            if (responseType != null && Void.class != responseType) {
                this.delegate = new HttpMessageConverterExtractor<>(responseType, getMessageConverters(), logger);
            }
            else {
                this.delegate = null;
            }
        }

        @Override
        public ResponseEntity<T> extractData(ClientHttpResponse response) throws IOException {
            if (this.delegate != null) {
                T body = this.delegate.extractData(response);
                return ResponseEntity.status(response.getRawStatusCode()).headers(response.getHeaders()).body(body);
            }
            else {
                return ResponseEntity.status(response.getRawStatusCode()).headers(response.getHeaders()).build();
            }
        }
    }


    /**
     * Response extractor that extracts the response {@link HttpHeaders}.
     */
    private static class HeadersExtractor implements ResponseExtractor<HttpHeaders> {

        @Override
        public HttpHeaders extractData(ClientHttpResponse response) {
            return response.getHeaders();
        }
    }

}

ClientHttpRequestInterceptor源码

主要是解析请求。

@FunctionalInterface
public interface ClientHttpRequestInterceptor {
    ClientHttpResponse intercept(HttpRequest var1, byte[] var2, ClientHttpRequestExecution var3) throws IOException;
}

拦截器注入LoadBalancerClientLoadBalancerRequestFactory,当一个被@LoadBalanced注解修饰的RestTemplate对象向外发起HTTP请求时,会被LoadBalancerInterceptor类的intercept函数所拦截。由于我们在使用RestTemplate时候采用了服务名作为host,所以直接从HttpRequest的URI对象中通过getHost()就可以拿到服务名,然后调用execute函数去根据服务名来选择实例并发起实际的请求。

LoadBalancerClient接口源码

package org.springframework.cloud.client.loadbalancer;

import java.io.IOException;
import java.net.URI;

import org.springframework.cloud.client.ServiceInstance;

/**
 * 客户端负载均衡器
 */
public interface LoadBalancerClient extends ServiceInstanceChooser {

    /**
     * 使用指定的LoadBalancer中的ServiceInstance执行请求服务。
     */
    <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;

    /**
     * 使用指定的LoadBalancer中的ServiceInstance执行请求服务。
     */
    <T> T execute(String serviceId, ServiceInstance serviceInstance,
            LoadBalancerRequest<T> request) throws IOException;

    /**
     * 创建一个适当的URI,该URI具有供系统使用的真实主机和端口。
     * 一些系统使用具有逻辑服务名称的URI作为主机,例如
     * http://myservice/path/to/service。这将替换为服务名称。
     * host:来自ServiceInstance的端口。
     */
    URI reconstructURI(ServiceInstance instance, URI original);

}

LoadBalancerClient接口实现类RibbonLoadBalancerClient源码

package org.springframework.cloud.netflix.ribbon;

import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.Map;

import com.netflix.client.config.IClientConfig;
import com.netflix.loadbalancer.ILoadBalancer;
import com.netflix.loadbalancer.Server;

import org.springframework.cloud.client.DefaultServiceInstance;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.LoadBalancerClient;
import org.springframework.cloud.client.loadbalancer.LoadBalancerRequest;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;

import static org.springframework.cloud.netflix.ribbon.RibbonUtils.updateToSecureConnectionIfNeeded;

/**
 * 客户端负载均衡器LoadBalancerClient的接口的实现类
 */
public class RibbonLoadBalancerClient implements LoadBalancerClient {

    private SpringClientFactory clientFactory;

    public RibbonLoadBalancerClient(SpringClientFactory clientFactory) {
        this.clientFactory = clientFactory;
    }

    @Override
    public URI reconstructURI(ServiceInstance instance, URI original) {
        Assert.notNull(instance, "instance can not be null");
        String serviceId = instance.getServiceId();
        RibbonLoadBalancerContext context = this.clientFactory
                .getLoadBalancerContext(serviceId);

        URI uri;
        Server server;
        if (instance instanceof RibbonServer) {
            RibbonServer ribbonServer = (RibbonServer) instance;
            server = ribbonServer.getServer();
            uri = updateToSecureConnectionIfNeeded(original, ribbonServer);
        }
        else {
            server = new Server(instance.getScheme(), instance.getHost(),
                    instance.getPort());
            IClientConfig clientConfig = clientFactory.getClientConfig(serviceId);
            ServerIntrospector serverIntrospector = serverIntrospector(serviceId);
            uri = updateToSecureConnectionIfNeeded(original, clientConfig,
                    serverIntrospector, server);
        }
        return context.reconstructURIWithServer(server, uri);
    }

    @Override
    public ServiceInstance choose(String serviceId) {
        return choose(serviceId, null);
    }

    /**
     * New: Select a server using a 'key'.
     */
    public ServiceInstance choose(String serviceId, Object hint) {
        Server server = getServer(getLoadBalancer(serviceId), hint);
        if (server == null) {
            return null;
        }
        return new RibbonServer(serviceId, server, isSecure(server, serviceId),
                serverIntrospector(serviceId).getMetadata(server));
    }

    @Override
    public <T> T execute(String serviceId, LoadBalancerRequest<T> request)
            throws IOException {
        return execute(serviceId, request, null);
    }

    /**
     * New: Execute a request by selecting server using a 'key'. The hint will have to be
     * the last parameter to not mess with the `execute(serviceId, ServiceInstance,
     * request)` method. This somewhat breaks the fluent coding style when using a lambda
     * to define the LoadBalancerRequest.
     */
    public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
            throws IOException {
        ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
        Server server = getServer(loadBalancer, hint);
        if (server == null) {
            throw new IllegalStateException("No instances available for " + serviceId);
        }
        RibbonServer ribbonServer = new RibbonServer(serviceId, server,
                isSecure(server, serviceId),
                serverIntrospector(serviceId).getMetadata(server));

        return execute(serviceId, ribbonServer, request);
    }

    @Override
    public <T> T execute(String serviceId, ServiceInstance serviceInstance,
            LoadBalancerRequest<T> request) throws IOException {
        Server server = null;
        if (serviceInstance instanceof RibbonServer) {
            server = ((RibbonServer) serviceInstance).getServer();
        }
        if (server == null) {
            throw new IllegalStateException("No instances available for " + serviceId);
        }

        RibbonLoadBalancerContext context = this.clientFactory
                .getLoadBalancerContext(serviceId);
        RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);

        try {
            T returnVal = request.apply(serviceInstance);
            statsRecorder.recordStats(returnVal);
            return returnVal;
        }
        // catch IOException and rethrow so RestTemplate behaves correctly
        catch (IOException ex) {
            statsRecorder.recordStats(ex);
            throw ex;
        }
        catch (Exception ex) {
            statsRecorder.recordStats(ex);
            ReflectionUtils.rethrowRuntimeException(ex);
        }
        return null;
    }

    private ServerIntrospector serverIntrospector(String serviceId) {
        ServerIntrospector serverIntrospector = this.clientFactory.getInstance(serviceId,
                ServerIntrospector.class);
        if (serverIntrospector == null) {
            serverIntrospector = new DefaultServerIntrospector();
        }
        return serverIntrospector;
    }

    private boolean isSecure(Server server, String serviceId) {
        IClientConfig config = this.clientFactory.getClientConfig(serviceId);
        ServerIntrospector serverIntrospector = serverIntrospector(serviceId);
        return RibbonUtils.isSecure(config, serverIntrospector, server);
    }

    // Note: This method could be removed?
    protected Server getServer(String serviceId) {
        return getServer(getLoadBalancer(serviceId), null);
    }

    protected Server getServer(ILoadBalancer loadBalancer) {
        return getServer(loadBalancer, null);
    }

    protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
        if (loadBalancer == null) {
            return null;
        }
        // Use 'default' on a null hint, or just pass it on?
        return loadBalancer.chooseServer(hint != null ? hint : "default");
    }

    protected ILoadBalancer getLoadBalancer(String serviceId) {
        return this.clientFactory.getLoadBalancer(serviceId);
    }

    /**
     * Ribbon-server-specific {@link ServiceInstance} implementation.
     */
    public static class RibbonServer implements ServiceInstance {

        private final String serviceId;

        private final Server server;

        private final boolean secure;

        private Map<String, String> metadata;

        public RibbonServer(String serviceId, Server server) {
            this(serviceId, server, false, Collections.emptyMap());
        }

        public RibbonServer(String serviceId, Server server, boolean secure,
                Map<String, String> metadata) {
            this.serviceId = serviceId;
            this.server = server;
            this.secure = secure;
            this.metadata = metadata;
        }

        @Override
        public String getInstanceId() {
            return this.server.getId();
        }

        @Override
        public String getServiceId() {
            return this.serviceId;
        }

        @Override
        public String getHost() {
            return this.server.getHost();
        }

        @Override
        public int getPort() {
            return this.server.getPort();
        }

        @Override
        public boolean isSecure() {
            return this.secure;
        }

        @Override
        public URI getUri() {
            return DefaultServiceInstance.getUri(this);
        }

        @Override
        public Map<String, String> getMetadata() {
            return this.metadata;
        }

        public Server getServer() {
            return this.server;
        }

        @Override
        public String getScheme() {
            return this.server.getScheme();
        }

        @Override
        public String toString() {
            final StringBuilder sb = new StringBuilder("RibbonServer{");
            sb.append("serviceId='").append(serviceId).append('\'');
            sb.append(", server=").append(server);
            sb.append(", secure=").append(secure);
            sb.append(", metadata=").append(metadata);
            sb.append('}');
            return sb.toString();
        }

    }

}

ServiceInstanceChooser源码

package org.springframework.cloud.client.loadbalancer;

import org.springframework.cloud.client.ServiceInstance;

/**
 * 负载平衡器选择服务器发送请求的类实现接口。
 */
public interface ServiceInstanceChooser {

    /**
     * 从LoadBalancer中为指定的服务选择ServiceInstance。
     */
    ServiceInstance choose(String serviceId);

}

Ctrl+Alt+b之后查看ServiceInstanceChooser的使用者:

ServiceInstance接口源码

/**
 * Represents an instance of a service in a discovery system.
 *
 * @author Spencer Gibb
 * @author Tim Ysewyn
 */
public interface ServiceInstance {

    /**
     * @return The unique instance ID as registered.
     */
    default String getInstanceId() {
        return null;
    }

    /**
     * 已注册服务实例的实例ID。
     * @return The service ID as registered.
     */
    String getServiceId();

    /**
     * 已注册服务实例的主机名。
     * @return The hostname of the registered service instance.
     */
    String getHost();

    /**
     * 已注册服务实例的端口。
     * @return The port of the registered service instance.
     */
    int getPort();

    /**
     * 已注册服务实例的端口是否使用HTTPS
     * @return Whether the port of the registered service instance uses HTTPS.
     */
    boolean isSecure();

    /**
     * 获取URI地址
     * @return The service URI address.
     */
    URI getUri();

    /**
     * @return The key / value pair metadata associated with the service instance.
     */
    Map<String, String> getMetadata();

    /**
     * @return The scheme of the service instance.
     */
    default String getScheme() {
        return null;
    }

}

接口ServiceInstance源码以及实现类

package org.springframework.cloud.client;

import java.net.URI;
import java.util.Map;

public interface ServiceInstance {
    default String getInstanceId() {
        return null;
    }

    //获取注册在Eureka的服务名称
    String getServiceId();

    //获取主机地址
    String getHost();

    //获取端口
    int getPort();

    //是否安全
    boolean isSecure();

    //获取uri地址
    URI getUri();

    //元数据
    Map<String, String> getMetadata();

    default String getScheme() {
        return null;
    }
}

测试获得的数据:

ServiceInstance在Ribbon中的实现源码
public static class RibbonServer implements ServiceInstance {

    private final String serviceId;

    private final Server server;

    private final boolean secure;

    private Map<String, String> metadata;

    /**
         * 控制器(serviceId、server)
         * @param serviceId
         * @param server
         */
    public RibbonServer(String serviceId, Server server) {
        this(serviceId, server, false, Collections.emptyMap());
    }

    /**
         * 控制器(serviceId、server、secure、metadata)
         * @param serviceId
         * @param server
         * @param secure
         * @param metadata
         */
    public RibbonServer(String serviceId, Server server, boolean secure,
                        Map<String, String> metadata) {
        this.serviceId = serviceId;
        this.server = server;
        this.secure = secure;
        this.metadata = metadata;
    }

    /**
         * 获取实例ID
         * @return
         */
    @Override
    public String getInstanceId() {
        return this.server.getId();
    }

    /**
         * 获取服务ID
         * @return
         */
    @Override
    public String getServiceId() {
        return this.serviceId;
    }

    /**
         * 获取主机地址
         * @return
         */
    @Override
    public String getHost() {
        return this.server.getHost();
    }

    /**
         * 获取端口
         * @return
         */
    @Override
    public int getPort() {
        return this.server.getPort();
    }

    /**
         * 是否是安全的
         * @return
         */
    @Override
    public boolean isSecure() {
        return this.secure;
    }

    /**
         * 获取URI
         * @return
         */
    @Override
    public URI getUri() {
        return DefaultServiceInstance.getUri(this);
    }

    /**
         * 获取元数据
         * @return
         */
    @Override
    public Map<String, String> getMetadata() {
        return this.metadata;
    }

    /**
         * Server中包含了很多信息,例如:端口、主机、时区时间...
         * @return
         */
    public Server getServer() {
        return this.server;
    }

    @Override
    public String getScheme() {
        return this.server.getScheme();
    }

    /**
         * toString
         * @return
         */
    @Override
    public String toString() {
        final StringBuilder sb = new StringBuilder("RibbonServer{");
        sb.append("serviceId='").append(serviceId).append('\'');
        sb.append(", server=").append(server);
        sb.append(", secure=").append(secure);
        sb.append(", metadata=").append(metadata);
        sb.append('}');
        return sb.toString();
    }

}
ServiceInstance的一般默认实现源码(非Ribbon)
package org.springframework.cloud.client;

import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

public class DefaultServiceInstance implements ServiceInstance {
    private final String instanceId;
    private final String serviceId;
    private final String host;
    private final int port;
    private final boolean secure;
    private final Map<String, String> metadata;

    public DefaultServiceInstance(String instanceId, String serviceId, String host, int port, boolean secure, Map<String, String> metadata) {
        this.instanceId = instanceId;
        this.serviceId = serviceId;
        this.host = host;
        this.port = port;
        this.secure = secure;
        this.metadata = metadata;
    }

    public DefaultServiceInstance(String instanceId, String serviceId, String host, int port, boolean secure) {
        this(instanceId, serviceId, host, port, secure, new LinkedHashMap());
    }

    /** @deprecated */
    @Deprecated
    public DefaultServiceInstance(String serviceId, String host, int port, boolean secure, Map<String, String> metadata) {
        this((String)null, serviceId, host, port, secure, metadata);
    }

    /** @deprecated */
    @Deprecated
    public DefaultServiceInstance(String serviceId, String host, int port, boolean secure) {
        this(serviceId, host, port, secure, new LinkedHashMap());
    }

    public static URI getUri(ServiceInstance instance) {
        String scheme = instance.isSecure() ? "https" : "http";
        String uri = String.format("%s://%s:%s", scheme, instance.getHost(), instance.getPort());
        return URI.create(uri);
    }

    public URI getUri() {
        return getUri(this);
    }

    public Map<String, String> getMetadata() {
        return this.metadata;
    }

    public String getInstanceId() {
        return this.instanceId;
    }

    public String getServiceId() {
        return this.serviceId;
    }

    public String getHost() {
        return this.host;
    }

    public int getPort() {
        return this.port;
    }

    public boolean isSecure() {
        return this.secure;
    }

    public String toString() {
        return "DefaultServiceInstance{instanceId='" + this.instanceId + '\'' + ", serviceId='" + this.serviceId + '\'' + ", host='" + this.host + '\'' + ", port=" + this.port + ", secure=" + this.secure + ", metadata=" + this.metadata + '}';
    }

    public boolean equals(Object o) {
        if (this == o) {
            return true;
        } else if (o != null && this.getClass() == o.getClass()) {
            DefaultServiceInstance that = (DefaultServiceInstance)o;
            return this.port == that.port && this.secure == that.secure && Objects.equals(this.instanceId, that.instanceId) && Objects.equals(this.serviceId, that.serviceId) && Objects.equals(this.host, that.host) && Objects.equals(this.metadata, that.metadata);
        } else {
            return false;
        }
    }

    public int hashCode() {
        return Objects.hash(new Object[]{this.instanceId, this.serviceId, this.host, this.port, this.secure, this.metadata});
    }
}

Ribbon的11种负载均衡策略

Ctrl + Alt + b查看IRule的实现类,也就是Ribbon的负载均衡策略,默认是轮询分发RoundRobinRule

1.AbstractLoadBalancerRule

提供用于设置和获取负载平衡器的默认实现。

2.AvailabilityFilteringRule

负载均衡器规则,过滤掉服务器。

3.BestAvailableRule

一种用“跳闸”断路器跳过服务器并选择具有最低并发请求的服务器。

4.ClientConfigEnabledRoundRobinRule

类中定义的RoundRobinRule类loadbalance包。

5.PredicateBasedRule

将服务器过滤逻辑委托给{@link AbstractServerPredicate}实例的规则。过滤后,服务器将以循环方式从过滤列表中返回。

6.RandomRule

一种随机分配现有流量的负载平衡策略服务器。

源码:

package com.netflix.loadbalancer;

import com.netflix.client.config.IClientConfig;

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

/**
 * A loadbalacing strategy that randomly distributes traffic amongst existing
 * servers.
 * 
 * @author stonse
 * 
 */
public class RandomRule extends AbstractLoadBalancerRule {

    /**
     * Randomly choose from all living servers
     */
    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE")
    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            return null;
        }
        Server server = null;

        while (server == null) {
            if (Thread.interrupted()) {
                return null;
            }
            List<Server> upList = lb.getReachableServers();
            List<Server> allList = lb.getAllServers();

            int serverCount = allList.size();
            if (serverCount == 0) {
                /*
                 * No servers. End regardless of pass, because subsequent passes
                 * only get more restrictive.
                 */
                return null;
            }

            int index = chooseRandomInt(serverCount);
            server = upList.get(index);

            if (server == null) {
                /*
                 * The only time this should happen is if the server list were
                 * somehow trimmed. This is a transient condition. Retry after
                 * yielding.
                 */
                Thread.yield();
                continue;
            }

            if (server.isAlive()) {
                return (server);
            }

            // Shouldn't actually happen.. but must be transient or a bug.
            server = null;
            Thread.yield();
        }

        return server;

    }

    protected int chooseRandomInt(int serverCount) {
        return ThreadLocalRandom.current().nextInt(serverCount);
    }

    @Override
    public Server choose(Object key) {
        return choose(getLoadBalancer(), key);
    }

    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
        // TODO Auto-generated method stub

    }
}

7.ResponseTimeWeightedRule

使用平均/百分位数响应时间的规则为每个服务器分配动态“权重”,然后在加权循环。

8.RetryRule

考虑到{@link IRule}可以级联,这个{@link RetryRule}类允许向现有规则添加重试逻辑。

9.RoundRobinRule

Ribbon默认的负载均衡策略,轮询分发。

源码:

package com.netflix.loadbalancer;

import com.netflix.client.config.IClientConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * The most well known and basic load balancing strategy, i.e. Round Robin Rule.
 *
 * @author stonse
 * @author Nikos Michalakis <nikos@netflix.com>
 *
 */
public class RoundRobinRule extends AbstractLoadBalancerRule {

    private AtomicInteger nextServerCyclicCounter;
    private static final boolean AVAILABLE_ONLY_SERVERS = true;
    private static final boolean ALL_SERVERS = false;

    private static Logger log = LoggerFactory.getLogger(RoundRobinRule.class);

    public RoundRobinRule() {
        nextServerCyclicCounter = new AtomicInteger(0);
    }

    public RoundRobinRule(ILoadBalancer lb) {
        this();
        setLoadBalancer(lb);
    }

    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            log.warn("no load balancer");
            return null;
        }

        Server server = null;
        int count = 0;
        while (server == null && count++ < 10) {
            List<Server> reachableServers = lb.getReachableServers();
            List<Server> allServers = lb.getAllServers();
            int upCount = reachableServers.size();
            int serverCount = allServers.size();

            if ((upCount == 0) || (serverCount == 0)) {
                log.warn("No up servers available from load balancer: " + lb);
                return null;
            }

            int nextServerIndex = incrementAndGetModulo(serverCount);
            server = allServers.get(nextServerIndex);

            if (server == null) {
                /* Transient. */
                Thread.yield();
                continue;
            }

            if (server.isAlive() && (server.isReadyToServe())) {
                return (server);
            }

            // Next.
            server = null;
        }

        if (count >= 10) {
            log.warn("No available alive servers after 10 tries from load balancer: "
                    + lb);
        }
        return server;
    }

    /**
     * Inspired by the implementation of {@link AtomicInteger#incrementAndGet()}.
     *
     * @param modulo The modulo to bound the value of the counter.
     * @return The next value.
     */
    private int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = nextServerCyclicCounter.get();
            int next = (current + 1) % modulo;
            if (nextServerCyclicCounter.compareAndSet(current, next))
                return next;
        }
    }

    @Override
    public Server choose(Object key) {
        return choose(getLoadBalancer(), key);
    }

    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
    }
}

10.ZoneAvoidanceRule

根据区域筛选服务器的规则。

源码:

package com.netflix.loadbalancer;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;

import com.netflix.client.config.IClientConfig;

/**
 * A rule that uses the a {@link CompositePredicate} to filter servers based on zone and availability. The primary predicate is composed of
 * a {@link ZoneAvoidancePredicate} and {@link AvailabilityPredicate}, with the fallbacks to {@link AvailabilityPredicate}
 * and an "always true" predicate returned from {@link AbstractServerPredicate#alwaysTrue()} 
 * 
 * @author awang
 *
 */
public class ZoneAvoidanceRule extends PredicateBasedRule {

    private static final Random random = new Random();

    private CompositePredicate compositePredicate;

    public ZoneAvoidanceRule() {
        super();
        ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this);
        AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this);
        compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate);
    }

    private CompositePredicate createCompositePredicate(ZoneAvoidancePredicate p1, AvailabilityPredicate p2) {
        return CompositePredicate.withPredicates(p1, p2)
                             .addFallbackPredicate(p2)
                             .addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
                             .build();

    }


    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
        ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this, clientConfig);
        AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this, clientConfig);
        compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate);
    }

    static Map<String, ZoneSnapshot> createSnapshot(LoadBalancerStats lbStats) {
        Map<String, ZoneSnapshot> map = new HashMap<String, ZoneSnapshot>();
        for (String zone : lbStats.getAvailableZones()) {
            ZoneSnapshot snapshot = lbStats.getZoneSnapshot(zone);
            map.put(zone, snapshot);
        }
        return map;
    }

    static String randomChooseZone(Map<String, ZoneSnapshot> snapshot,
            Set<String> chooseFrom) {
        if (chooseFrom == null || chooseFrom.size() == 0) {
            return null;
        }
        String selectedZone = chooseFrom.iterator().next();
        if (chooseFrom.size() == 1) {
            return selectedZone;
        }
        int totalServerCount = 0;
        for (String zone : chooseFrom) {
            totalServerCount += snapshot.get(zone).getInstanceCount();
        }
        int index = random.nextInt(totalServerCount) + 1;
        int sum = 0;
        for (String zone : chooseFrom) {
            sum += snapshot.get(zone).getInstanceCount();
            if (index <= sum) {
                selectedZone = zone;
                break;
            }
        }
        return selectedZone;
    }

    public static Set<String> getAvailableZones(
            Map<String, ZoneSnapshot> snapshot, double triggeringLoad,
            double triggeringBlackoutPercentage) {
        if (snapshot.isEmpty()) {
            return null;
        }
        Set<String> availableZones = new HashSet<String>(snapshot.keySet());
        if (availableZones.size() == 1) {
            return availableZones;
        }
        Set<String> worstZones = new HashSet<String>();
        double maxLoadPerServer = 0;
        boolean limitedZoneAvailability = false;

        for (Map.Entry<String, ZoneSnapshot> zoneEntry : snapshot.entrySet()) {
            String zone = zoneEntry.getKey();
            ZoneSnapshot zoneSnapshot = zoneEntry.getValue();
            int instanceCount = zoneSnapshot.getInstanceCount();
            if (instanceCount == 0) {
                availableZones.remove(zone);
                limitedZoneAvailability = true;
            } else {
                double loadPerServer = zoneSnapshot.getLoadPerServer();
                if (((double) zoneSnapshot.getCircuitTrippedCount())
                        / instanceCount >= triggeringBlackoutPercentage
                        || loadPerServer < 0) {
                    availableZones.remove(zone);
                    limitedZoneAvailability = true;
                } else {
                    if (Math.abs(loadPerServer - maxLoadPerServer) < 0.000001d) {
                        // they are the same considering double calculation
                        // round error
                        worstZones.add(zone);
                    } else if (loadPerServer > maxLoadPerServer) {
                        maxLoadPerServer = loadPerServer;
                        worstZones.clear();
                        worstZones.add(zone);
                    }
                }
            }
        }

        if (maxLoadPerServer < triggeringLoad && !limitedZoneAvailability) {
            // zone override is not needed here
            return availableZones;
        }
        String zoneToAvoid = randomChooseZone(snapshot, worstZones);
        if (zoneToAvoid != null) {
            availableZones.remove(zoneToAvoid);
        }
        return availableZones;

    }

    public static Set<String> getAvailableZones(LoadBalancerStats lbStats,
            double triggeringLoad, double triggeringBlackoutPercentage) {
        if (lbStats == null) {
            return null;
        }
        Map<String, ZoneSnapshot> snapshot = createSnapshot(lbStats);
        return getAvailableZones(snapshot, triggeringLoad,
                triggeringBlackoutPercentage);
    }

    @Override
    public AbstractServerPredicate getPredicate() {
        return compositePredicate;
    }    
}

11.WeightedResponseTimeRule

使用平均/百分位数响应时间的规则为每个服务器分配动态“权重”,然后在“加权循环”。

建议:如果部分机器配置强,则可以改为 WeightedResponseTimeRule。

如何更改默认的Ribbon负载均衡策略

官方文档参考:

https://cloud.spring.io/spring-cloud-static/Finchley.RELEASE/single/spring-cloud.html#_customizing_the_ribbon_client_by_setting_properties

To set the IRule for a service name called users, you could set the following properties:

application.yml.

users:
  ribbon:
    NIWSServerListClassName: com.netflix.loadbalancer.ConfigurationBasedServerList
    NFLoadBalancerRuleClassName: com.netflix.loadbalancer.WeightedResponseTimeRule

解读:

users就是你的服务名称,需要调整负载均衡策略的服务名称。

配置示例:

# 自定义Ribbon负载均衡策略
product-service-01:
  ribbon:
    # ConfigurationBasedServerList该属性是在调用时加载该配置信息用的,这种方式是通过配置文件读取的方式,ribbon还提供了其它几种获取服务列表的方式。参考:https://www.jianshu.com/p/cad86cea0ef7
    # NIWSServerListClassName是ribbon的其中一个配置属性,全部属性在CommonClientConfigKey类中,大约有七十多个。
    #NIWSServerListClassName: com.netflix.loadbalancer.ConfigurationBasedServerList
    # 使用哪种负载均衡策略,默认是com.netflix.loadbalancer.RoundRobinRule,轮询分发
    NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RoundRobinRule

Ribbon负载均衡器

解析ILoadBalancer
模块图

Ribbon的核心是基于LoadBalancer负载均衡器来实现的,Ribbon通过ILoadBalancer接口对外提供统一的选择服务器(Server)的功能基础接口ILoadBalancer主要提供一下方法:

public interface ILoadBalancer {
    //向负载均衡器中维护的实例列表中,增加服务实例
    public void addServers(List<Server> newServers);
    //通过某种策略,从负载均衡器中挑选出具体的服务实例
    public Server chooseServer(Object key);
    //用来通知和标识负载均衡器中,某个具体的实例已经停止服务
    public void markServerDown(Server server);
    //获取当前服务的实例列表
    public List<Server> getReachableServers();
    //获取所有的服务实例列表,包括正常服务和停止服务的实例
    public List<Server> getAllServers();
}
ILoadBalancer源码
package com.netflix.loadbalancer;

import java.util.List;

/**
 * Interface that defines the operations for a software loadbalancer. A typical
 * loadbalancer minimally need a set of servers to loadbalance for, a method to
 * mark a particular server to be out of rotation and a call that will choose a
 * server from the existing list of server.
 * 
 * @author stonse
 * 
 */
public interface ILoadBalancer {

    /**
     * Initial list of servers.
     * This API also serves to add additional ones at a later time
     * The same logical server (host:port) could essentially be added multiple times
     * (helpful in cases where you want to give more "weightage" perhaps ..)
     * 
     * @param newServers new servers to add
     */
    public void addServers(List<Server> newServers);

    /**
     * Choose a server from load balancer.
     * 
     * @param key An object that the load balancer may use to determine which server to return. null if 
     *         the load balancer does not use this parameter.
     * @return server chosen
     */
    public Server chooseServer(Object key);

    /**
     * To be called by the clients of the load balancer to notify that a Server is down
     * else, the LB will think its still Alive until the next Ping cycle - potentially
     * (assuming that the LB Impl does a ping)
     * 
     * @param server Server to mark as down
     */
    public void markServerDown(Server server);

    /**
     * @deprecated 2016-01-20 This method is deprecated in favor of the
     * cleaner {@link #getReachableServers} (equivalent to availableOnly=true)
     * and {@link #getAllServers} API (equivalent to availableOnly=false).
     *
     * Get the current list of servers.
     *
     * @param availableOnly if true, only live and available servers should be returned
     */
    @Deprecated
    public List<Server> getServerList(boolean availableOnly);

    /**
     * @return Only the servers that are up and reachable.
     */
    public List<Server> getReachableServers();

    /**
     * @return All known servers, both reachable and unreachable.
     */
    public List<Server> getAllServers();
}
  • addServers:向负载均衡器中维护的实例列表增加服务实例。
  • chooseServer:通过某种策略,从负载均衡器中挑选出一个具体的服务实例。
  • markServerDown:用来通知和标识负载均衡器中某个具体实例已经停止服务,不然负载均衡器在下一次获取服务实例清单前都会认为服务实例均是正常服务的。
  • getReachableServers:获取当前正常服务的实例列表。
  • getAllServers:获取所有已知的服务实例列表,包括正常服务和停止服务的实例。

在该接口定义中涉及到的Server对象定义的是一个传统的服务端节点,在该类中存储了服务端节点的一些元数据信息,包括:host、port以及一些部署信息等。

PingUrl源码

PingUrl类主要是对拼接请求地址的时候,做一些业务上的处理。

package com.netflix.loadbalancer;

import java.io.IOException;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PingUrl implements IPing {
    private static final Logger LOGGER = LoggerFactory.getLogger(PingUrl.class);
    String pingAppendString = "";
    boolean isSecure = false;
    String expectedContent = null;

    public PingUrl() {
    }

    public PingUrl(boolean isSecure, String pingAppendString) {
        this.isSecure = isSecure;
        this.pingAppendString = pingAppendString != null ? pingAppendString : "";
    }

    public void setPingAppendString(String pingAppendString) {
        this.pingAppendString = pingAppendString != null ? pingAppendString : "";
    }

    public String getPingAppendString() {
        return this.pingAppendString;
    }

    public boolean isSecure() {
        return this.isSecure;
    }

    public void setSecure(boolean isSecure) {
        this.isSecure = isSecure;
    }

    public String getExpectedContent() {
        return this.expectedContent;
    }

    public void setExpectedContent(String expectedContent) {
        this.expectedContent = expectedContent;
    }

    public boolean isAlive(Server server) {
        String urlStr = "";
        if (this.isSecure) {
            urlStr = "https://";
        } else {
            urlStr = "http://";
        }

        urlStr = urlStr + server.getId();
        urlStr = urlStr + this.getPingAppendString();
        boolean isAlive = false;
        HttpClient httpClient = new DefaultHttpClient();
        HttpUriRequest getRequest = new HttpGet(urlStr);
        String content = null;

        try {
            HttpResponse response = httpClient.execute(getRequest);
            content = EntityUtils.toString(response.getEntity());
            isAlive = response.getStatusLine().getStatusCode() == 200;
            if (this.getExpectedContent() != null) {
                LOGGER.debug("content:" + content);
                if (content == null) {
                    isAlive = false;
                } else if (content.equals(this.getExpectedContent())) {
                    isAlive = true;
                } else {
                    isAlive = false;
                }
            }
        } catch (IOException var11) {
            var11.printStackTrace();
        } finally {
            getRequest.abort();
        }

        return isAlive;
    }

    public static void main(String[] args) {
        PingUrl p = new PingUrl(false, "/cs/hostRunning");
        p.setExpectedContent("true");
        Server s = new Server("ec2-75-101-231-85.compute-1.amazonaws.com", 7101);
        boolean isAlive = p.isAlive(s);
        System.out.println("isAlive:" + isAlive);
    }
}
ServerList源码

ServerList类:获取服务器列表的接口。

package com.netflix.loadbalancer;

import java.util.List;

/**
 * Interface that defines the methods sed to obtain the List of Servers
 * @author stonse
 *
 * @param <T>
 */
public interface ServerList<T extends Server> {

    //获取服务器列表
    public List<T> getInitialListOfServers();

    //返回更新的服务器列表。这称为负载均衡器的Ping周期,每隔30秒(可以配置)。
    public List<T> getUpdatedListOfServers();   

}
ILoadBalancer的实现类

1.AbstractLoadBalancer

AbstractLoadBalancerILoadBalancer接口的抽象实现。在该抽象类中定义了一个关于服务实例的分组枚举类ServerGroup,它包含了三种不同类型:ALL-所有服务实例、STATUS_UP-正常服务的实例、STATUS_NOT_UP-停止服务的实例;实现了一个chooseServer()函数,该函数通过调用接口中的chooseServer(Object key)实现,其中参数key为null,表示在选择具体服务实例时忽略key的条件判断;最后还定义了两个抽象函数,getServerList(ServerGroup serverGroup)定义了根据分组类型来获取不同的服务实例列表,getLoadBalancerStats()定义了获取LoadBalancerStats对象的方法,LoadBalancerStats对象被用来存储负载均衡器中各个服务实例当前的属性和统计信息,这些信息非常有用,我们可以利用这些信息来观察负载均衡器的运行情况,同时这些信息也是用来制定负载均衡策略的重要依据。

AbstractLoadBalancer源码:

public abstract class AbstractLoadBalancer implements ILoadBalancer {

    public enum ServerGroup{
        ALL,
        STATUS_UP,
        STATUS_NOT_UP        
    }

    /**
     * delegate to {@link #chooseServer(Object)} with parameter null.
     */
    public Server chooseServer() {
        return chooseServer(null);
    }


    /**
     * List of servers that this Loadbalancer knows about
     * 
     * @param serverGroup Servers grouped by status, e.g., {@link ServerGroup#STATUS_UP}
     */
    public abstract List<Server> getServerList(ServerGroup serverGroup);

    /**
     * Obtain LoadBalancer related Statistics
     */
    public abstract LoadBalancerStats getLoadBalancerStats();    
}
2.BaseLoadBalancer

BaseLoadBalancer类是Ribbon负载均衡器的基础实现类,在该类中定义很多关于均衡负载器相关的基础内容:

  • 定义并维护了两个存储服务实例Server对象的列表。一个用于存储所有服务实例的清单,一个用于存储正常服务的实例清单。

    @Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
    protected volatile List<Server> allServerList = Collections
            .synchronizedList(new ArrayList<Server>());
    @Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)
    protected volatile List<Server> upServerList = Collections
            .synchronizedList(new ArrayList<Server>());
    
  • 定义了之前我们提到的用来存储负载均衡器各服务实例属性和统计信息的LoadBalancerStats对象。

  • 定义了检查服务实例是否正常服务的IPing对象,在BaseLoadBalancer中默认为null,需要在构造时注入它的具体实现。

  • 定义了检查服务实例操作的执行策略对象IPingStrategy,在BaseLoadBalancer中默认使用了该类中定义的静态内部类SerialPingStrategy实现。根据源码,我们可以看到该策略采用线性遍历ping服务实例的方式实现检查。该策略在当我们实现的IPing速度不理想,或是Server列表过大时,可能变的不是很为理想,这时候我们需要通过实现IPingStrategy接口并实现pingServers(IPing ping, Server[] servers)函数去扩展ping的执行策略。

    private static class SerialPingStrategy implements IPingStrategy {
        @Override
        public boolean[] pingServers(IPing ping, Server[] servers) {
            int numCandidates = servers.length;
            boolean[] results = new boolean[numCandidates];
    
            if (logger.isDebugEnabled()) {
                logger.debug("LoadBalancer:  PingTask executing ["
                             + numCandidates + "] servers configured");
            }
    
            for (int i = 0; i < numCandidates; i++) {
                results[i] = false;
                try {
                    if (ping != null) {
                        results[i] = ping.isAlive(servers[i]);
                    }
                } catch (Throwable t) {
                    logger.error("Exception while pinging Server:"
                                 + servers[i], t);
                }
            }
            return results;
        }
    }
    
  • 定义了负载均衡的处理规则IRule对象,从BaseLoadBalancerchooseServer(Object key)的实现源码,我们可以知道负载均衡器实际进行服务实例选择任务是委托给了IRule实例中的choose函数来实现。而在这里,默认初始化了RoundRobinRuleIRule的实现对象。RoundRobinRule实现了最基本且常用的线性负载均衡规则。

    public Server chooseServer(Object key) {
        if (counter == null) {
            counter = createCounter();
        }
        counter.increment();
        if (rule == null) {
            return null;
        } else {
            try {
                return rule.choose(key);
            } catch (Throwable t) {
                return null;
            }
        }
    }
    
  • 启动ping任务:在BaseLoadBalancer的默认构造函数中,会直接启动一个用于定时检查Server是否健康的任务。该任务默认的执行间隔为:10秒。

  • 实现了ILoadBalancer接口定义的负载均衡器应具备的一系列基本操作:

    • addServers(List newServers):向负载均衡器中增加新的服务实例列表,该实现将原本已经维护着的所有服务实例清单allServerList和新传入的服务实例清单newServers都加入到newList中,然后通过调用setServersList函数对newList进行处理,在BaseLoadBalancer中实现的时候会使用新的列表覆盖旧的列表。而之后介绍的几个扩展实现类对于服务实例清单更新的优化都是对setServersList函数的重写来实现的。

      public void addServers(List<Server> newServers) {
          if (newServers != null && newServers.size() > 0) {
              try {
                  ArrayList<Server> newList = new ArrayList<Server>();
                  newList.addAll(allServerList);
                  newList.addAll(newServers);
                  setServersList(newList);
              } catch (Exception e) {
                  logger.error("Exception while adding Servers", e);
              }
          }
      }
      
    • chooseServer(Object key):挑选一个具体的服务实例。

    • markServerDown(Server server):标记某个服务实例暂停服务。

      public void markServerDown(Server server) {
          if (server == null) {
              return;
          }
          if (!server.isAlive()) {
              return;
          }
          logger.error("LoadBalancer:  markServerDown called on ["
                  + server.getId() + "]");
          server.setAlive(false);
          notifyServerStatusChangeListener(singleton(server));
      }
      
    • getReachableServers():获取可用的服务实例列表。由于BaseLoadBalancer中单独维护了一个正常服务的实例清单,所以直接返回即可。

      public List<Server> getReachableServers() {
          return Collections.unmodifiableList(upServerList);
      }
      
    • getAllServers():获取所有的服务实例列表。由于BaseLoadBalancer中单独维护了一个所有服务的实例清单,所以也直接返回它即可。

      public List<Server> getAllServers() {
          return Collections.unmodifiableList(allServerList);
      }
      

BaseLoadBalancer源码:

package com.netflix.loadbalancer;

import static java.util.Collections.singleton;

import com.google.common.collect.ImmutableList;
import com.netflix.client.ClientFactory;
import com.netflix.client.IClientConfigAware;
import com.netflix.client.PrimeConnections;
import com.netflix.client.config.CommonClientConfigKey;
import com.netflix.client.config.DefaultClientConfigImpl;
import com.netflix.client.config.IClientConfig;
import com.netflix.servo.annotations.DataSourceType;
import com.netflix.servo.annotations.Monitor;
import com.netflix.servo.monitor.Counter;
import com.netflix.servo.monitor.Monitors;
import com.netflix.util.concurrent.ShutdownEnabledTimer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * A basic implementation of the load balancer where an arbitrary list of
 * servers can be set as the server pool. A ping can be set to determine the
 * liveness of a server. Internally, this class maintains an "all" server list
 * and an "up" server list and use them depending on what the caller asks for.
 * 
 * @author stonse
 * 
 */
public class BaseLoadBalancer extends AbstractLoadBalancer implements
        PrimeConnections.PrimeConnectionListener, IClientConfigAware {

    private static Logger logger = LoggerFactory
            .getLogger(BaseLoadBalancer.class);
    private final static IRule DEFAULT_RULE = new RoundRobinRule();
    private final static SerialPingStrategy DEFAULT_PING_STRATEGY = new SerialPingStrategy();
    private static final String DEFAULT_NAME = "default";
    private static final String PREFIX = "LoadBalancer_";

    protected IRule rule = DEFAULT_RULE;

    protected IPingStrategy pingStrategy = DEFAULT_PING_STRATEGY;

    protected IPing ping = null;

    @Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
    protected volatile List<Server> allServerList = Collections
            .synchronizedList(new ArrayList<Server>());
    @Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)
    protected volatile List<Server> upServerList = Collections
            .synchronizedList(new ArrayList<Server>());

    protected ReadWriteLock allServerLock = new ReentrantReadWriteLock();
    protected ReadWriteLock upServerLock = new ReentrantReadWriteLock();

    protected String name = DEFAULT_NAME;

    protected Timer lbTimer = null;
    protected int pingIntervalSeconds = 10;
    protected int maxTotalPingTimeSeconds = 5;
    protected Comparator<Server> serverComparator = new ServerComparator();

    protected AtomicBoolean pingInProgress = new AtomicBoolean(false);

    protected LoadBalancerStats lbStats;

    private volatile Counter counter = Monitors.newCounter("LoadBalancer_ChooseServer");

    private PrimeConnections primeConnections;

    private volatile boolean enablePrimingConnections = false;

    private IClientConfig config;

    private List<ServerListChangeListener> changeListeners = new CopyOnWriteArrayList<ServerListChangeListener>();

    private List<ServerStatusChangeListener> serverStatusListeners = new CopyOnWriteArrayList<ServerStatusChangeListener>();

    /**
     * Default constructor which sets name as "default", sets null ping, and
     * {@link RoundRobinRule} as the rule.
     * <p>
     * This constructor is mainly used by {@link ClientFactory}. Calling this
     * constructor must be followed by calling {@link #init()} or
     * {@link #initWithNiwsConfig(IClientConfig)} to complete initialization.
     * This constructor is provided for reflection. When constructing
     * programatically, it is recommended to use other constructors.
     */
    public BaseLoadBalancer() {
        this.name = DEFAULT_NAME;
        this.ping = null;
        setRule(DEFAULT_RULE);
        setupPingTask();
        lbStats = new LoadBalancerStats(DEFAULT_NAME);
    }

    public BaseLoadBalancer(String lbName, IRule rule, LoadBalancerStats lbStats) {
        this(lbName, rule, lbStats, null);
    }

    public BaseLoadBalancer(IPing ping, IRule rule) {
        this(DEFAULT_NAME, rule, new LoadBalancerStats(DEFAULT_NAME), ping);
    }

    public BaseLoadBalancer(IPing ping, IRule rule, IPingStrategy pingStrategy) {
        this(DEFAULT_NAME, rule, new LoadBalancerStats(DEFAULT_NAME), ping, pingStrategy);
    }

    public BaseLoadBalancer(String name, IRule rule, LoadBalancerStats stats,
            IPing ping) {
        this(name, rule, stats, ping, DEFAULT_PING_STRATEGY);
    }

    public BaseLoadBalancer(String name, IRule rule, LoadBalancerStats stats,
            IPing ping, IPingStrategy pingStrategy) {

        logger.debug("LoadBalancer [{}]:  initialized", name);

        this.name = name;
        this.ping = ping;
        this.pingStrategy = pingStrategy;
        setRule(rule);
        setupPingTask();
        lbStats = stats;
        init();
    }

    public BaseLoadBalancer(IClientConfig config) {
        initWithNiwsConfig(config);
    }

    public BaseLoadBalancer(IClientConfig config, IRule rule, IPing ping) {
        initWithConfig(config, rule, ping, createLoadBalancerStatsFromConfig(config));
    }

    void initWithConfig(IClientConfig clientConfig, IRule rule, IPing ping) {
        initWithConfig(clientConfig, rule, ping, createLoadBalancerStatsFromConfig(config));
    }

    void initWithConfig(IClientConfig clientConfig, IRule rule, IPing ping, LoadBalancerStats stats) {
        this.config = clientConfig;
        String clientName = clientConfig.getClientName();
        this.name = clientName;
        int pingIntervalTime = Integer.parseInt(""
                + clientConfig.getProperty(
                        CommonClientConfigKey.NFLoadBalancerPingInterval,
                        Integer.parseInt("30")));
        int maxTotalPingTime = Integer.parseInt(""
                + clientConfig.getProperty(
                        CommonClientConfigKey.NFLoadBalancerMaxTotalPingTime,
                        Integer.parseInt("2")));

        setPingInterval(pingIntervalTime);
        setMaxTotalPingTime(maxTotalPingTime);

        // cross associate with each other
        // i.e. Rule,Ping meet your container LB
        // LB, these are your Ping and Rule guys ...
        setRule(rule);
        setPing(ping);

        setLoadBalancerStats(stats);
        rule.setLoadBalancer(this);
        if (ping instanceof AbstractLoadBalancerPing) {
            ((AbstractLoadBalancerPing) ping).setLoadBalancer(this);
        }
        logger.info("Client: {} instantiated a LoadBalancer: {}", name, this);
        boolean enablePrimeConnections = clientConfig.get(
                CommonClientConfigKey.EnablePrimeConnections, DefaultClientConfigImpl.DEFAULT_ENABLE_PRIME_CONNECTIONS);

        if (enablePrimeConnections) {
            this.setEnablePrimingConnections(true);
            PrimeConnections primeConnections = new PrimeConnections(
                    this.getName(), clientConfig);
            this.setPrimeConnections(primeConnections);
        }
        init();

    }

    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
        String ruleClassName = (String) clientConfig
                .getProperty(CommonClientConfigKey.NFLoadBalancerRuleClassName);
        String pingClassName = (String) clientConfig
                .getProperty(CommonClientConfigKey.NFLoadBalancerPingClassName);
        IRule rule;
        IPing ping;
        LoadBalancerStats stats;
        try {
            rule = (IRule) ClientFactory.instantiateInstanceWithClientConfig(
                    ruleClassName, clientConfig);
            ping = (IPing) ClientFactory.instantiateInstanceWithClientConfig(
                    pingClassName, clientConfig);
            stats = createLoadBalancerStatsFromConfig(clientConfig);
        } catch (Exception e) {
            throw new RuntimeException("Error initializing load balancer", e);
        }
        initWithConfig(clientConfig, rule, ping, stats);
    }

    private LoadBalancerStats createLoadBalancerStatsFromConfig(IClientConfig clientConfig) {
        String loadBalancerStatsClassName = clientConfig
                .get(CommonClientConfigKey.NFLoadBalancerStatsClassName, LoadBalancerStats.class.getName());
        try {
            return (LoadBalancerStats) ClientFactory.instantiateInstanceWithClientConfig(
                    loadBalancerStatsClassName, clientConfig);
        } catch (Exception e) {
            logger.warn("Error initializing configured LoadBalancerStats class - " + String.valueOf(loadBalancerStatsClassName)
                    + ". Falling-back to a new LoadBalancerStats instance instead.", e);
            return new LoadBalancerStats(clientConfig.getClientName());
        }
    }

    public void addServerListChangeListener(ServerListChangeListener listener) {
        changeListeners.add(listener);
    }

    public void removeServerListChangeListener(ServerListChangeListener listener) {
        changeListeners.remove(listener);
    }

    public void addServerStatusChangeListener(ServerStatusChangeListener listener) {
        serverStatusListeners.add(listener);
    }

    public void removeServerStatusChangeListener(ServerStatusChangeListener listener) {
        serverStatusListeners.remove(listener);
    }

    public IClientConfig getClientConfig() {
        return config;
    }

    private boolean canSkipPing() {
        if (ping == null
                || ping.getClass().getName().equals(DummyPing.class.getName())) {
            // default ping, no need to set up timer
            return true;
        } else {
            return false;
        }
    }

    void setupPingTask() {
        if (canSkipPing()) {
            return;
        }
        if (lbTimer != null) {
            lbTimer.cancel();
        }
        lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
                true);
        lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
        forceQuickPing();
    }

    /**
     * Set the name for the load balancer. This should not be called since name
     * should be immutable after initialization. Calling this method does not
     * guarantee that all other data structures that depend on this name will be
     * changed accordingly.
     */
    void setName(String name) {
        // and register
        this.name = name;
        if (lbStats == null) {
            lbStats = new LoadBalancerStats(name);
        } else {
            lbStats.setName(name);
        }
    }

    public String getName() {
        return name;
    }

    @Override
    public LoadBalancerStats getLoadBalancerStats() {
        return lbStats;
    }

    public void setLoadBalancerStats(LoadBalancerStats lbStats) {
        this.lbStats = lbStats;
    }

    public Lock lockAllServerList(boolean write) {
        Lock aproposLock = write ? allServerLock.writeLock() : allServerLock
                .readLock();
        aproposLock.lock();
        return aproposLock;
    }

    public Lock lockUpServerList(boolean write) {
        Lock aproposLock = write ? upServerLock.writeLock() : upServerLock
                .readLock();
        aproposLock.lock();
        return aproposLock;
    }

    public void setPingInterval(int pingIntervalSeconds) {
        if (pingIntervalSeconds < 1) {
            return;
        }

        this.pingIntervalSeconds = pingIntervalSeconds;
        if (logger.isDebugEnabled()) {
            logger.debug("LoadBalancer [{}]:  pingIntervalSeconds set to {}",
                name, this.pingIntervalSeconds);
        }
        setupPingTask(); // since ping data changed
    }

    public int getPingInterval() {
        return pingIntervalSeconds;
    }

    /*
     * Maximum time allowed for the ping cycle
     */
    public void setMaxTotalPingTime(int maxTotalPingTimeSeconds) {
        if (maxTotalPingTimeSeconds < 1) {
            return;
        }
        this.maxTotalPingTimeSeconds = maxTotalPingTimeSeconds;
        logger.debug("LoadBalancer [{}]: maxTotalPingTime set to {}", name, this.maxTotalPingTimeSeconds);

    }

    public int getMaxTotalPingTime() {
        return maxTotalPingTimeSeconds;
    }

    public IPing getPing() {
        return ping;
    }

    public IRule getRule() {
        return rule;
    }

    public boolean isPingInProgress() {
        return pingInProgress.get();
    }

    /* Specify the object which is used to send pings. */

    public void setPing(IPing ping) {
        if (ping != null) {
            if (!ping.equals(this.ping)) {
                this.ping = ping;
                setupPingTask(); // since ping data changed
            }
        } else {
            this.ping = null;
            // cancel the timer task
            lbTimer.cancel();
        }
    }

    /* Ignore null rules */

    public void setRule(IRule rule) {
        if (rule != null) {
            this.rule = rule;
        } else {
            /* default rule */
            this.rule = new RoundRobinRule();
        }
        if (this.rule.getLoadBalancer() != this) {
            this.rule.setLoadBalancer(this);
        }
    }

    /**
     * get the count of servers.
     * 
     * @param onlyAvailable
     *            if true, return only up servers.
     */
    public int getServerCount(boolean onlyAvailable) {
        if (onlyAvailable) {
            return upServerList.size();
        } else {
            return allServerList.size();
        }
    }

    /**
     * Add a server to the 'allServer' list; does not verify uniqueness, so you
     * could give a server a greater share by adding it more than once.
     */
    public void addServer(Server newServer) {
        if (newServer != null) {
            try {
                ArrayList<Server> newList = new ArrayList<Server>();

                newList.addAll(allServerList);
                newList.add(newServer);
                setServersList(newList);
            } catch (Exception e) {
                logger.error("LoadBalancer [{}]: Error adding newServer {}", name, newServer.getHost(), e);
            }
        }
    }

    /**
     * Add a list of servers to the 'allServer' list; does not verify
     * uniqueness, so you could give a server a greater share by adding it more
     * than once
     */
    @Override
    public void addServers(List<Server> newServers) {
        if (newServers != null && newServers.size() > 0) {
            try {
                ArrayList<Server> newList = new ArrayList<Server>();
                newList.addAll(allServerList);
                newList.addAll(newServers);
                setServersList(newList);
            } catch (Exception e) {
                logger.error("LoadBalancer [{}]: Exception while adding Servers", name, e);
            }
        }
    }

    /*
     * Add a list of servers to the 'allServer' list; does not verify
     * uniqueness, so you could give a server a greater share by adding it more
     * than once USED by Test Cases only for legacy reason. DO NOT USE!!
     */
    void addServers(Object[] newServers) {
        if ((newServers != null) && (newServers.length > 0)) {

            try {
                ArrayList<Server> newList = new ArrayList<Server>();
                newList.addAll(allServerList);

                for (Object server : newServers) {
                    if (server != null) {
                        if (server instanceof String) {
                            server = new Server((String) server);
                        }
                        if (server instanceof Server) {
                            newList.add((Server) server);
                        }
                    }
                }
                setServersList(newList);
            } catch (Exception e) {
                logger.error("LoadBalancer [{}]: Exception while adding Servers", name, e);
            }
        }
    }

    /**
     * Set the list of servers used as the server pool. This overrides existing
     * server list.
     */
    public void setServersList(List lsrv) {
        Lock writeLock = allServerLock.writeLock();
        logger.debug("LoadBalancer [{}]: clearing server list (SET op)", name);

        ArrayList<Server> newServers = new ArrayList<Server>();
        writeLock.lock();
        try {
            ArrayList<Server> allServers = new ArrayList<Server>();
            for (Object server : lsrv) {
                if (server == null) {
                    continue;
                }

                if (server instanceof String) {
                    server = new Server((String) server);
                }

                if (server instanceof Server) {
                    logger.debug("LoadBalancer [{}]:  addServer [{}]", name, ((Server) server).getId());
                    allServers.add((Server) server);
                } else {
                    throw new IllegalArgumentException(
                            "Type String or Server expected, instead found:"
                                    + server.getClass());
                }

            }
            boolean listChanged = false;
            if (!allServerList.equals(allServers)) {
                listChanged = true;
                if (changeListeners != null && changeListeners.size() > 0) {
                   List<Server> oldList = ImmutableList.copyOf(allServerList);
                   List<Server> newList = ImmutableList.copyOf(allServers);                   
                   for (ServerListChangeListener l: changeListeners) {
                       try {
                           l.serverListChanged(oldList, newList);
                       } catch (Exception e) {
                           logger.error("LoadBalancer [{}]: Error invoking server list change listener", name, e);
                       }
                   }
                }
            }
            if (isEnablePrimingConnections()) {
                for (Server server : allServers) {
                    if (!allServerList.contains(server)) {
                        server.setReadyToServe(false);
                        newServers.add((Server) server);
                    }
                }
                if (primeConnections != null) {
                    primeConnections.primeConnectionsAsync(newServers, this);
                }
            }
            // This will reset readyToServe flag to true on all servers
            // regardless whether
            // previous priming connections are success or not
            allServerList = allServers;
            if (canSkipPing()) {
                for (Server s : allServerList) {
                    s.setAlive(true);
                }
                upServerList = allServerList;
            } else if (listChanged) {
                forceQuickPing();
            }
        } finally {
            writeLock.unlock();
        }
    }

    /* List in string form. SETS, does not add. */
    void setServers(String srvString) {
        if (srvString != null) {

            try {
                String[] serverArr = srvString.split(",");
                ArrayList<Server> newList = new ArrayList<Server>();

                for (String serverString : serverArr) {
                    if (serverString != null) {
                        serverString = serverString.trim();
                        if (serverString.length() > 0) {
                            Server svr = new Server(serverString);
                            newList.add(svr);
                        }
                    }
                }
                setServersList(newList);
            } catch (Exception e) {
                logger.error("LoadBalancer [{}]: Exception while adding Servers", name, e);
            }
        }
    }

    /**
     * return the server
     * 
     * @param index
     * @param availableOnly
     */
    public Server getServerByIndex(int index, boolean availableOnly) {
        try {
            return (availableOnly ? upServerList.get(index) : allServerList
                    .get(index));
        } catch (Exception e) {
            return null;
        }
    }

    @Override
    public List<Server> getServerList(boolean availableOnly) {
        return (availableOnly ? getReachableServers() : getAllServers());
    }

    @Override
    public List<Server> getReachableServers() {
        return Collections.unmodifiableList(upServerList);
    }

    @Override
    public List<Server> getAllServers() {
        return Collections.unmodifiableList(allServerList);
    }

    @Override
    public List<Server> getServerList(ServerGroup serverGroup) {
        switch (serverGroup) {
        case ALL:
            return allServerList;
        case STATUS_UP:
            return upServerList;
        case STATUS_NOT_UP:
            ArrayList<Server> notAvailableServers = new ArrayList<Server>(
                    allServerList);
            ArrayList<Server> upServers = new ArrayList<Server>(upServerList);
            notAvailableServers.removeAll(upServers);
            return notAvailableServers;
        }
        return new ArrayList<Server>();
    }

    public void cancelPingTask() {
        if (lbTimer != null) {
            lbTimer.cancel();
        }
    }

    /**
     * TimerTask that keeps runs every X seconds to check the status of each
     * server/node in the Server List
     * 
     * @author stonse
     * 
     */
    class PingTask extends TimerTask {
        public void run() {
            try {
                new Pinger(pingStrategy).runPinger();
            } catch (Exception e) {
                logger.error("LoadBalancer [{}]: Error pinging", name, e);
            }
        }
    }

    /**
     * Class that contains the mechanism to "ping" all the instances
     * 
     * @author stonse
     *
     */
    class Pinger {

        private final IPingStrategy pingerStrategy;

        public Pinger(IPingStrategy pingerStrategy) {
            this.pingerStrategy = pingerStrategy;
        }

        public void runPinger() throws Exception {
            if (!pingInProgress.compareAndSet(false, true)) { 
                return; // Ping in progress - nothing to do
            }

            // we are "in" - we get to Ping

            Server[] allServers = null;
            boolean[] results = null;

            Lock allLock = null;
            Lock upLock = null;

            try {
                /*
                 * The readLock should be free unless an addServer operation is
                 * going on...
                 */
                allLock = allServerLock.readLock();
                allLock.lock();
                allServers = allServerList.toArray(new Server[allServerList.size()]);
                allLock.unlock();

                int numCandidates = allServers.length;
                results = pingerStrategy.pingServers(ping, allServers);

                final List<Server> newUpList = new ArrayList<Server>();
                final List<Server> changedServers = new ArrayList<Server>();

                for (int i = 0; i < numCandidates; i++) {
                    boolean isAlive = results[i];
                    Server svr = allServers[i];
                    boolean oldIsAlive = svr.isAlive();

                    svr.setAlive(isAlive);

                    if (oldIsAlive != isAlive) {
                        changedServers.add(svr);
                        logger.debug("LoadBalancer [{}]:  Server [{}] status changed to {}", 
                            name, svr.getId(), (isAlive ? "ALIVE" : "DEAD"));
                    }

                    if (isAlive) {
                        newUpList.add(svr);
                    }
                }
                upLock = upServerLock.writeLock();
                upLock.lock();
                upServerList = newUpList;
                upLock.unlock();

                notifyServerStatusChangeListener(changedServers);
            } finally {
                pingInProgress.set(false);
            }
        }
    }

    private void notifyServerStatusChangeListener(final Collection<Server> changedServers) {
        if (changedServers != null && !changedServers.isEmpty() && !serverStatusListeners.isEmpty()) {
            for (ServerStatusChangeListener listener : serverStatusListeners) {
                try {
                    listener.serverStatusChanged(changedServers);
                } catch (Exception e) {
                    logger.error("LoadBalancer [{}]: Error invoking server status change listener", name, e);
                }
            }
        }
    }

    private final Counter createCounter() {
        return Monitors.newCounter("LoadBalancer_ChooseServer");
    }

    /*
     * Get the alive server dedicated to key
     * 
     * @return the dedicated server
     */
    public Server chooseServer(Object key) {
        if (counter == null) {
            counter = createCounter();
        }
        counter.increment();
        if (rule == null) {
            return null;
        } else {
            try {
                return rule.choose(key);
            } catch (Exception e) {
                logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
                return null;
            }
        }
    }

    /* Returns either null, or "server:port/servlet" */
    public String choose(Object key) {
        if (rule == null) {
            return null;
        } else {
            try {
                Server svr = rule.choose(key);
                return ((svr == null) ? null : svr.getId());
            } catch (Exception e) {
                logger.warn("LoadBalancer [{}]:  Error choosing server", name, e);
                return null;
            }
        }
    }

    public void markServerDown(Server server) {
        if (server == null || !server.isAlive()) {
            return;
        }

        logger.error("LoadBalancer [{}]:  markServerDown called on [{}]", name, server.getId());
        server.setAlive(false);
        // forceQuickPing();

        notifyServerStatusChangeListener(singleton(server));
    }

    public void markServerDown(String id) {
        boolean triggered = false;

        id = Server.normalizeId(id);

        if (id == null) {
            return;
        }

        Lock writeLock = upServerLock.writeLock();
        writeLock.lock();
        try {
            final List<Server> changedServers = new ArrayList<Server>();

            for (Server svr : upServerList) {
                if (svr.isAlive() && (svr.getId().equals(id))) {
                    triggered = true;
                    svr.setAlive(false);
                    changedServers.add(svr);
                }
            }

            if (triggered) {
                logger.error("LoadBalancer [{}]:  markServerDown called for server [{}]", name, id);
                notifyServerStatusChangeListener(changedServers);
            }

        } finally {
            writeLock.unlock();
        }
    }

    /*
     * Force an immediate ping, if we're not currently pinging and don't have a
     * quick-ping already scheduled.
     */
    public void forceQuickPing() {
        if (canSkipPing()) {
            return;
        }
        logger.debug("LoadBalancer [{}]:  forceQuickPing invoking", name);

        try {
            new Pinger(pingStrategy).runPinger();
        } catch (Exception e) {
            logger.error("LoadBalancer [{}]: Error running forceQuickPing()", name, e);
        }
    }

    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append("{NFLoadBalancer:name=").append(this.getName())
                .append(",current list of Servers=").append(this.allServerList)
                .append(",Load balancer stats=")
                .append(this.lbStats.toString()).append("}");
        return sb.toString();
    }

    /**
     * Register with monitors and start priming connections if it is set.
     */
    protected void init() {
        Monitors.registerObject("LoadBalancer_" + name, this);
        // register the rule as it contains metric for available servers count
        Monitors.registerObject("Rule_" + name, this.getRule());
        if (enablePrimingConnections && primeConnections != null) {
            primeConnections.primeConnections(getReachableServers());
        }
    }

    public final PrimeConnections getPrimeConnections() {
        return primeConnections;
    }

    public final void setPrimeConnections(PrimeConnections primeConnections) {
        this.primeConnections = primeConnections;
    }

    @Override
    public void primeCompleted(Server s, Throwable lastException) {
        s.setReadyToServe(true);
    }

    public boolean isEnablePrimingConnections() {
        return enablePrimingConnections;
    }

    public final void setEnablePrimingConnections(
            boolean enablePrimingConnections) {
        this.enablePrimingConnections = enablePrimingConnections;
    }

    public void shutdown() {
        cancelPingTask();
        if (primeConnections != null) {
            primeConnections.shutdown();
        }
        Monitors.unregisterObject("LoadBalancer_" + name, this);
        Monitors.unregisterObject("Rule_" + name, this.getRule());
    }

    /**
     * Default implementation for <c>IPingStrategy</c>, performs ping
     * serially, which may not be desirable, if your <c>IPing</c>
     * implementation is slow, or you have large number of servers.
     */
    private static class SerialPingStrategy implements IPingStrategy {

        @Override
        public boolean[] pingServers(IPing ping, Server[] servers) {
            int numCandidates = servers.length;
            boolean[] results = new boolean[numCandidates];

            logger.debug("LoadBalancer:  PingTask executing [{}] servers configured", numCandidates);

            for (int i = 0; i < numCandidates; i++) {
                results[i] = false; /* Default answer is DEAD. */
                try {
                    // NOTE: IFF we were doing a real ping
                    // assuming we had a large set of servers (say 15)
                    // the logic below will run them serially
                    // hence taking 15 times the amount of time it takes
                    // to ping each server
                    // A better method would be to put this in an executor
                    // pool
                    // But, at the time of this writing, we dont REALLY
                    // use a Real Ping (its mostly in memory eureka call)
                    // hence we can afford to simplify this design and run
                    // this
                    // serially
                    if (ping != null) {
                        results[i] = ping.isAlive(servers[i]);
                    }
                } catch (Exception e) {
                    logger.error("Exception while pinging Server: '{}'", servers[i], e);
                }
            }
            return results;
        }
    }
}
3.DynamicServerListLoadBalancer

DynamicServerListLoadBalancer类继承于BaseLoadBalancer类,它是对基础负载均衡器的扩展。在该负载均衡器中,实现了服务实例清单的在运行期的动态更新能力;同时,它还具备了对服务实例清单的过滤功能,也就是说我们可以通过过滤器来选择性的获取一批服务实例清单。

DynamicServerListLoadBalancer源码:

package com.netflix.loadbalancer;

import com.google.common.annotations.VisibleForTesting;
import com.netflix.client.ClientFactory;
import com.netflix.client.config.CommonClientConfigKey;
import com.netflix.client.config.DefaultClientConfigImpl;
import com.netflix.client.config.IClientConfig;
import com.netflix.servo.annotations.DataSourceType;
import com.netflix.servo.annotations.Monitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * A LoadBalancer that has the capabilities to obtain the candidate list of
 * servers using a dynamic source. i.e. The list of servers can potentially be
 * changed at Runtime. It also contains facilities wherein the list of servers
 * can be passed through a Filter criteria to filter out servers that do not
 * meet the desired criteria.
 * 
 * @author stonse
 * 
 */
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
    private static final Logger LOGGER = LoggerFactory.getLogger(DynamicServerListLoadBalancer.class);

    boolean isSecure = false;
    boolean useTunnel = false;

    // to keep track of modification of server lists
    protected AtomicBoolean serverListUpdateInProgress = new AtomicBoolean(false);

    volatile ServerList<T> serverListImpl;

    volatile ServerListFilter<T> filter;

    protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
        @Override
        public void doUpdate() {
            updateListOfServers();
        }
    };

    protected volatile ServerListUpdater serverListUpdater;

    public DynamicServerListLoadBalancer() {
        super();
    }

    @Deprecated
    public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping, 
            ServerList<T> serverList, ServerListFilter<T> filter) {
        this(
                clientConfig,
                rule,
                ping,
                serverList,
                filter,
                new PollingServerListUpdater()
        );
    }

    public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
                                         ServerList<T> serverList, ServerListFilter<T> filter,
                                         ServerListUpdater serverListUpdater) {
        super(clientConfig, rule, ping);
        this.serverListImpl = serverList;
        this.filter = filter;
        this.serverListUpdater = serverListUpdater;
        if (filter instanceof AbstractServerListFilter) {
            ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
        }
        restOfInit(clientConfig);
    }

    public DynamicServerListLoadBalancer(IClientConfig clientConfig) {
        initWithNiwsConfig(clientConfig);
    }

    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
        try {
            super.initWithNiwsConfig(clientConfig);
            String niwsServerListClassName = clientConfig.getPropertyAsString(
                    CommonClientConfigKey.NIWSServerListClassName,
                    DefaultClientConfigImpl.DEFAULT_SEVER_LIST_CLASS);

            ServerList<T> niwsServerListImpl = (ServerList<T>) ClientFactory
                    .instantiateInstanceWithClientConfig(niwsServerListClassName, clientConfig);
            this.serverListImpl = niwsServerListImpl;

            if (niwsServerListImpl instanceof AbstractServerList) {
                AbstractServerListFilter<T> niwsFilter = ((AbstractServerList) niwsServerListImpl)
                        .getFilterImpl(clientConfig);
                niwsFilter.setLoadBalancerStats(getLoadBalancerStats());
                this.filter = niwsFilter;
            }

            String serverListUpdaterClassName = clientConfig.getPropertyAsString(
                    CommonClientConfigKey.ServerListUpdaterClassName,
                    DefaultClientConfigImpl.DEFAULT_SERVER_LIST_UPDATER_CLASS
            );

            this.serverListUpdater = (ServerListUpdater) ClientFactory
                    .instantiateInstanceWithClientConfig(serverListUpdaterClassName, clientConfig);

            restOfInit(clientConfig);
        } catch (Exception e) {
            throw new RuntimeException(
                    "Exception while initializing NIWSDiscoveryLoadBalancer:"
                            + clientConfig.getClientName()
                            + ", niwsClientConfig:" + clientConfig, e);
        }
    }

    void restOfInit(IClientConfig clientConfig) {
        boolean primeConnection = this.isEnablePrimingConnections();
        // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
        this.setEnablePrimingConnections(false);
        enableAndInitLearnNewServersFeature();

        updateListOfServers();
        if (primeConnection && this.getPrimeConnections() != null) {
            this.getPrimeConnections()
                    .primeConnections(getReachableServers());
        }
        this.setEnablePrimingConnections(primeConnection);
        LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
    }


    @Override
    public void setServersList(List lsrv) {
        super.setServersList(lsrv);
        List<T> serverList = (List<T>) lsrv;
        Map<String, List<Server>> serversInZones = new HashMap<String, List<Server>>();
        for (Server server : serverList) {
            // make sure ServerStats is created to avoid creating them on hot
            // path
            getLoadBalancerStats().getSingleServerStat(server);
            String zone = server.getZone();
            if (zone != null) {
                zone = zone.toLowerCase();
                List<Server> servers = serversInZones.get(zone);
                if (servers == null) {
                    servers = new ArrayList<Server>();
                    serversInZones.put(zone, servers);
                }
                servers.add(server);
            }
        }
        setServerListForZones(serversInZones);
    }

    protected void setServerListForZones(
            Map<String, List<Server>> zoneServersMap) {
        LOGGER.debug("Setting server list for zones: {}", zoneServersMap);
        getLoadBalancerStats().updateZoneServerMapping(zoneServersMap);
    }

    public ServerList<T> getServerListImpl() {
        return serverListImpl;
    }

    public void setServerListImpl(ServerList<T> niwsServerList) {
        this.serverListImpl = niwsServerList;
    }

    public ServerListFilter<T> getFilter() {
        return filter;
    }

    public void setFilter(ServerListFilter<T> filter) {
        this.filter = filter;
    }

    public ServerListUpdater getServerListUpdater() {
        return serverListUpdater;
    }

    public void setServerListUpdater(ServerListUpdater serverListUpdater) {
        this.serverListUpdater = serverListUpdater;
    }

    @Override
    /**
     * Makes no sense to ping an inmemory disc client
     * 
     */
    public void forceQuickPing() {
        // no-op
    }

    /**
     * Feature that lets us add new instances (from AMIs) to the list of
     * existing servers that the LB will use Call this method if you want this
     * feature enabled
     */
    public void enableAndInitLearnNewServersFeature() {
        LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
        serverListUpdater.start(updateAction);
    }

    private String getIdentifier() {
        return this.getClientConfig().getClientName();
    }

    public void stopServerListRefreshing() {
        if (serverListUpdater != null) {
            serverListUpdater.stop();
        }
    }

    @VisibleForTesting
    public void updateListOfServers() {
        List<T> servers = new ArrayList<T>();
        if (serverListImpl != null) {
            servers = serverListImpl.getUpdatedListOfServers();
            LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                    getIdentifier(), servers);

            if (filter != null) {
                servers = filter.getFilteredListOfServers(servers);
                LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                        getIdentifier(), servers);
            }
        }
        updateAllServerList(servers);
    }

    /**
     * Update the AllServer list in the LoadBalancer if necessary and enabled
     * 
     * @param ls
     */
    protected void updateAllServerList(List<T> ls) {
        // other threads might be doing this - in which case, we pass
        if (serverListUpdateInProgress.compareAndSet(false, true)) {
            try {
                for (T s : ls) {
                    s.setAlive(true); // set so that clients can start using these
                                      // servers right away instead
                                      // of having to wait out the ping cycle.
                }
                setServersList(ls);
                super.forceQuickPing();
            } finally {
                serverListUpdateInProgress.set(false);
            }
        }
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder("DynamicServerListLoadBalancer:");
        sb.append(super.toString());
        sb.append("ServerList:" + String.valueOf(serverListImpl));
        return sb.toString();
    }

    @Override 
    public void shutdown() {
        super.shutdown();
        stopServerListRefreshing();
    }


    @Monitor(name="LastUpdated", type=DataSourceType.INFORMATIONAL)
    public String getLastUpdate() {
        return serverListUpdater.getLastUpdate();
    }

    @Monitor(name="DurationSinceLastUpdateMs", type= DataSourceType.GAUGE)
    public long getDurationSinceLastUpdateMs() {
        return serverListUpdater.getDurationSinceLastUpdateMs();
    }

    @Monitor(name="NumUpdateCyclesMissed", type=DataSourceType.GAUGE)
    public int getNumberMissedCycles() {
        return serverListUpdater.getNumberMissedCycles();
    }

    @Monitor(name="NumThreads", type=DataSourceType.GAUGE)
    public int getCoreThreads() {
        return serverListUpdater.getCoreThreads();
    }
}
4.NoOpLoadBalancer

不做任何处理。

package com.netflix.loadbalancer;

import java.util.Collections;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A noOp Loadbalancer
 * i.e. doesnt do anything "loadbalancer like"
 * 
 * @author stonse
 *
 */
public class NoOpLoadBalancer extends AbstractLoadBalancer {

    static final Logger  logger = LoggerFactory.getLogger(NoOpLoadBalancer.class);


    @Override
    public void addServers(List<Server> newServers) {
        logger.info("addServers to NoOpLoadBalancer ignored");
    }

    @Override
    public Server chooseServer(Object key) {       
        return null;
    }

    @Override
    public LoadBalancerStats getLoadBalancerStats() {        
        return null;
    }


    @Override
    public List<Server> getServerList(ServerGroup serverGroup) {     
        return Collections.emptyList();
    }

    @Override
    public void markServerDown(Server server) {
        logger.info("markServerDown to NoOpLoadBalancer ignored");
    }

    @Override
    public List<Server> getServerList(boolean availableOnly) {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public List<Server> getReachableServers() {
        return null;

    }

    @Override
    public List<Server> getAllServers() {
        return null;
    }
}
5.ZoneAwareLoadBalancer
  • 选择服务器时可以避免整个区域的负载均衡器。
  • 用于衡量区域条件的关键指标是平均活动请求,
    每个区域的每个休息客户端聚合。它是
    区域中的未完成请求总数除以可用目标实例的数量(不包括断路器跳闸实例)。当在坏区域上缓慢发生超时时,此度量标准非常有效。
  • LoadBalancer将计算并检查所有可用区域的区域统计信息。如果任何区域的平均活动请求已达到配置的阈值,则将从活动服务器列表中删除此区域。如果多个区域已达到阈值,则将删除每个服务器具有最多活动请求的区域。
    一旦最差区域被丢弃,将在其余区域中选择一个区域,其概率与其实例数量成比例。
    将使用给定规则从所选区域返回服务器(规则是负载平衡策略,例如{@link AvailabilityFilteringRule})
    对于每个请求,将重复上述步骤。也就是说,每个区域相关的负载平衡决策是实时进行的,其中包含帮助选择的最新统计数据。

ZoneAwareLoadBalancer源码:

package com.netflix.loadbalancer;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.netflix.client.ClientFactory;
import com.netflix.client.config.IClientConfig;
import com.netflix.config.DynamicBooleanProperty;
import com.netflix.config.DynamicDoubleProperty;
import com.netflix.config.DynamicPropertyFactory;

/**
 * Load balancer that can avoid a zone as a whole when choosing server. 
 *<p>
 * The key metric used to measure the zone condition is Average Active Requests,
which is aggregated per rest client per zone. It is the
total outstanding requests in a zone divided by number of available targeted instances (excluding circuit breaker tripped instances).
This metric is very effective when timeout occurs slowly on a bad zone.
<p>
The  LoadBalancer will calculate and examine zone stats of all available zones. If the Average Active Requests for any zone has reached a configured threshold, this zone will be dropped from the active server list. In case more than one zone has reached the threshold, the zone with the most active requests per server will be dropped.
Once the the worst zone is dropped, a zone will be chosen among the rest with the probability proportional to its number of instances.
A server will be returned from the chosen zone with a given Rule (A Rule is a load balancing strategy, for example {@link AvailabilityFilteringRule})
For each request, the steps above will be repeated. That is to say, each zone related load balancing decisions are made at real time with the up-to-date statistics aiding the choice.

 * @author awang
 *
 * @param <T>
 */
public class ZoneAwareLoadBalancer<T extends Server> extends DynamicServerListLoadBalancer<T> {

    private ConcurrentHashMap<String, BaseLoadBalancer> balancers = new ConcurrentHashMap<String, BaseLoadBalancer>();

    private static final Logger logger = LoggerFactory.getLogger(ZoneAwareLoadBalancer.class);

    private volatile DynamicDoubleProperty triggeringLoad;

    private volatile DynamicDoubleProperty triggeringBlackoutPercentage; 

    private static final DynamicBooleanProperty ENABLED = DynamicPropertyFactory.getInstance().getBooleanProperty("ZoneAwareNIWSDiscoveryLoadBalancer.enabled", true);

    void setUpServerList(List<Server> upServerList) {
        this.upServerList = upServerList;
    }

    public ZoneAwareLoadBalancer() {
        super();
    }

    @Deprecated
    public ZoneAwareLoadBalancer(IClientConfig clientConfig, IRule rule,
            IPing ping, ServerList<T> serverList, ServerListFilter<T> filter) {
        super(clientConfig, rule, ping, serverList, filter);
    }

    public ZoneAwareLoadBalancer(IClientConfig clientConfig, IRule rule,
                                 IPing ping, ServerList<T> serverList, ServerListFilter<T> filter,
                                 ServerListUpdater serverListUpdater) {
        super(clientConfig, rule, ping, serverList, filter, serverListUpdater);
    }

    public ZoneAwareLoadBalancer(IClientConfig niwsClientConfig) {
        super(niwsClientConfig);
    }

    @Override
    protected void setServerListForZones(Map<String, List<Server>> zoneServersMap) {
        super.setServerListForZones(zoneServersMap);
        if (balancers == null) {
            balancers = new ConcurrentHashMap<String, BaseLoadBalancer>();
        }
        for (Map.Entry<String, List<Server>> entry: zoneServersMap.entrySet()) {
            String zone = entry.getKey().toLowerCase();
            getLoadBalancer(zone).setServersList(entry.getValue());
        }
        // check if there is any zone that no longer has a server
        // and set the list to empty so that the zone related metrics does not
        // contain stale data
        for (Map.Entry<String, BaseLoadBalancer> existingLBEntry: balancers.entrySet()) {
            if (!zoneServersMap.keySet().contains(existingLBEntry.getKey())) {
                existingLBEntry.getValue().setServersList(Collections.emptyList());
            }
        }
    }    

    @Override
    public Server chooseServer(Object key) {
        if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
            logger.debug("Zone aware logic disabled or there is only one zone");
            return super.chooseServer(key);
        }
        Server server = null;
        try {
            LoadBalancerStats lbStats = getLoadBalancerStats();
            Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
            logger.debug("Zone snapshots: {}", zoneSnapshot);
            if (triggeringLoad == null) {
                triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
                        "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d);
            }

            if (triggeringBlackoutPercentage == null) {
                triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
                        "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);
            }
            Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
            logger.debug("Available zones: {}", availableZones);
            if (availableZones != null &&  availableZones.size() < zoneSnapshot.keySet().size()) {
                String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
                logger.debug("Zone chosen: {}", zone);
                if (zone != null) {
                    BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
                    server = zoneLoadBalancer.chooseServer(key);
                }
            }
        } catch (Exception e) {
            logger.error("Error choosing server using zone aware logic for load balancer={}", name, e);
        }
        if (server != null) {
            return server;
        } else {
            logger.debug("Zone avoidance logic is not invoked.");
            return super.chooseServer(key);
        }
    }

    @VisibleForTesting
    BaseLoadBalancer getLoadBalancer(String zone) {
        zone = zone.toLowerCase();
        BaseLoadBalancer loadBalancer = balancers.get(zone);
        if (loadBalancer == null) {
            // We need to create rule object for load balancer for each zone
            IRule rule = cloneRule(this.getRule());
            loadBalancer = new BaseLoadBalancer(this.getName() + "_" + zone, rule, this.getLoadBalancerStats());
            BaseLoadBalancer prev = balancers.putIfAbsent(zone, loadBalancer);
            if (prev != null) {
                loadBalancer = prev;
            }
        } 
        return loadBalancer;        
    }

    private IRule cloneRule(IRule toClone) {
        IRule rule;
        if (toClone == null) {
            rule = new AvailabilityFilteringRule();
        } else {
            String ruleClass = toClone.getClass().getName();                
            try {
                rule = (IRule) ClientFactory.instantiateInstanceWithClientConfig(ruleClass, this.getClientConfig());
            } catch (Exception e) {
                throw new RuntimeException("Unexpected exception creating rule for ZoneAwareLoadBalancer", e);
            }
        }
        return rule;
    }


    @Override
    public void setRule(IRule rule) {
        super.setRule(rule);
        if (balancers != null) {
            for (String zone: balancers.keySet()) {
                balancers.get(zone).setRule(cloneRule(rule));
            }
        }
    }
}

Ribbon的Ping机制

在负载均衡器中,提供了 Ping 机制,每隔一段时间,会去 Ping 服务器,判断服务器是否存活,该工作由 com.netflix.loadbalancer.IPing 接口的实现类负责,如果单独使用 Ribbon 默认情况下不会激活 Ping 机制,默认的实现类为 DummyPing。

IPing接口源码

如果单独使用Ribbon(非SpringCloud Ribbon),没有注册中心Eureka,就需要手动指定服务器列表(商品服务product-service):

product-service-01:
  ribbon:
    listOfServers: localhost:8001,localhost:8002

完整配置application.yml

product-service-01:
  ribbon:
      # 配置服务器列表
    listOfServers: localhost:8001,localhost:8002
    # 配置负载均衡规则IRule的实现类
    NFLoadBalancerRuleClassName: com.netflix.loadbalancer.WeightedResponseTimeRule
    # 配置负载均衡实现类
    NFLoadBalancerClassName: com.netflix.loadbalancer.ZoneAwareLoadBalancer
    # 配置IPing的实现类,com.lzhpo.ribbon.MyPing是实现IPing类
    NFLoadBalancerPingClassName: com.lzhpo.ribbon.MyIPing
    # 配置Ping操作的间隔(默认10s)
    NFLoadBalancerPingInterval: 2

禁用Eureka:

ribbon:
  eureka:
   enabled: false
↓↓↓ 打开微信关注我的微信公众号 ↓↓↓

会打篮球的程序猿

领取精品免费学习资料哦~
正文到此结束
本文目录