Ribbon source code analysis

Ribbon is Netflix's open source client-side load balancing project and is usually used together with Eureka. To cut down on distractions and focus on Ribbon itself, this article leaves Eureka out.

The previous article covered RestTemplate source code analysis. This article uses the @LoadBalanced annotation to give RestTemplate load balancing capability.

1, Simple example

First, add the Ribbon dependency:

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-ribbon</artifactId>
            <version>2.1.4.RELEASE</version>
        </dependency>

Then annotate the RestTemplate bean with @LoadBalanced:

@Configuration
public class RestTemplateConfig {

    @Bean
    @LoadBalanced
    public RestTemplate getRestTemplate() {
        return new RestTemplate();
    }

}

Next comes the configuration of the calling project, which listens on port 8080. demo is the name of the service accessed through the load balancer.

The two instances behind that name, demo1 and demo2, will be started on ports 8081 and 8082. Both use demo as their application name.

server.port=8080
demo.ribbon.listOfServers=http://localhost:8081,http://localhost:8082

Directly access the demo service in the Controller

@RestController
public class Controller {

    @Autowired
    RestTemplate restTemplate;

    @RequestMapping("/test")
    public String test() {
        String result = restTemplate.getForObject("http://demo/test", String.class);
        System.out.println(result);
        return result;
    }
}

This is the controller of the demo1 project. The controller of demo2 is identical except that it returns 8082.

@RestController
public class Controller {

    @RequestMapping("/test")
    public String test() {
        return "8081";
    }
}

This is the configuration of the demo1 project. For demo2 the port is 8082 and the name is still demo.

spring.application.name=demo
server.port=8081

Now visit localhost:8080/test.

8081 and 8082 are returned in turn, which shows that the RestTemplate now load balances.

When we shut down demo2 and call the endpoint several times, the responses alternate between 8081 and a 500 error.

PS: remember this behavior. The source code below explains why it happens.

2, @LoadBalanced annotation

LoadBalanced is a composed annotation. Its source is:

/**
 * Annotation to mark a RestTemplate bean to be configured to use a LoadBalancerClient.
 * @author Spencer Gibb
 */
@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {

}

From the Javadoc at the top we can see that a RestTemplate marked with @LoadBalanced will later be configured to use a LoadBalancerClient.

In IntelliJ IDEA, press Shift twice to open the global search and find this interface.

public interface LoadBalancerClient extends ServiceInstanceChooser

There is only one abstract method in the ServiceInstanceChooser interface:

public interface ServiceInstanceChooser {
	ServiceInstance choose(String serviceId);
}

This method selects a service instance from the load balancer based on the service id passed in. The service instance is encapsulated by ServiceInstance.

ServiceInstance exposes the service id, host, port, URI and metadata of the instance, and whether it uses HTTPS.

LoadBalancerClient has the following abstract methods:

public interface LoadBalancerClient extends ServiceInstanceChooser {

    //Pick a service instance from the load balancer and request it
	<T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;
	<T> T execute(String serviceId, ServiceInstance serviceInstance,
			LoadBalancerRequest<T> request) throws IOException;

    //Reconstruct the url, that is, convert the service name into the form of ip:port, and the instance parameter determines which ip port to use
	URI reconstructURI(ServiceInstance instance, URI original);

}
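To make the comment on reconstructURI concrete, here is a minimal sketch of the idea using only java.net.URI. It is not the Ribbon implementation: the class name UriReconstructionSketch and the method reconstruct are made up for illustration, and the host and port are passed in directly instead of coming from a ServiceInstance.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Illustrative sketch only, not Ribbon code. All names here are hypothetical.
class UriReconstructionSketch {

    // Swaps the logical service name for a concrete host and port,
    // keeping the scheme, path, query and fragment of the original URI.
    static URI reconstruct(String host, int port, URI original) {
        try {
            return new URI(original.getScheme(), original.getUserInfo(), host, port,
                    original.getPath(), original.getQuery(), original.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Cannot rebuild " + original, e);
        }
    }

    public static void main(String[] args) {
        URI original = URI.create("http://demo/test?x=1");
        // prints http://localhost:8081/test?x=1
        System.out.println(reconstruct("localhost", 8081, original));
    }
}
```

In the example from section 1, the logical address http://demo/test would be turned into an address on localhost:8081 or localhost:8082, depending on which instance was chosen.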

Alongside the @LoadBalanced annotation, LoadBalancerClient and ServiceInstanceChooser, we also find LoadBalancerAutoConfiguration, which is the auto-configuration class for the load balancer.

3, LoadBalancerAutoConfiguration auto configuration class analysis

The source code of LoadBalancerAutoConfiguration is as follows:

@Configuration
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerRetryProperties.class)
public class LoadBalancerAutoConfiguration {

	@LoadBalanced
	@Autowired(required = false)
	private List<RestTemplate> restTemplates = Collections.emptyList();

	@Autowired(required = false)
	private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList();

	@Bean
	public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
			final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
		return () -> restTemplateCustomizers.ifAvailable(customizers -> {
			for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
				for (RestTemplateCustomizer customizer : customizers) {
					customizer.customize(restTemplate);
				}
			}
		});
	}

	@Bean
	@ConditionalOnMissingBean
	public LoadBalancerRequestFactory loadBalancerRequestFactory(
			LoadBalancerClient loadBalancerClient) {
		return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers);
	}

	@Configuration
	@ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
	static class LoadBalancerInterceptorConfig {

		@Bean
		public LoadBalancerInterceptor ribbonInterceptor(
				LoadBalancerClient loadBalancerClient,
				LoadBalancerRequestFactory requestFactory) {
			return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
		}

		@Bean
		@ConditionalOnMissingBean
		public RestTemplateCustomizer restTemplateCustomizer(
				final LoadBalancerInterceptor loadBalancerInterceptor) {
			return restTemplate -> {
				List<ClientHttpRequestInterceptor> list = new ArrayList<>(
						restTemplate.getInterceptors());
				list.add(loadBalancerInterceptor);
				restTemplate.setInterceptors(list);
			};
		}

	}


	@Configuration
	@ConditionalOnClass(RetryTemplate.class)
	public static class RetryAutoConfiguration {
      ....
	}

	@Configuration
	@ConditionalOnClass(RetryTemplate.class)
	public static class RetryInterceptorAutoConfiguration {
      ...
	}

}

@Configuration marks this class as a configuration class.

@ConditionalOnClass(RestTemplate.class)

@ConditionalOnBean(LoadBalancerClient.class)

These two annotations mean that this auto-configuration only takes effect when the RestTemplate class can be loaded and a bean implementing LoadBalancerClient exists in the container.

RestTemplate was registered in the container at the beginning, and the Netflix Ribbon dependency we added provides the LoadBalancerClient implementation, RibbonLoadBalancerClient. We will come back to this class later.

	@LoadBalanced
	@Autowired(required = false)
	private List<RestTemplate> restTemplates = Collections.emptyList();

Combining @Autowired with @LoadBalanced makes the container inject only the RestTemplate beans annotated with @LoadBalanced into the restTemplates collection. This works because @LoadBalanced is itself meta-annotated with @Qualifier.
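The filtering effect can be imitated with plain reflection. The sketch below is only an analogy and does not use Spring: the @Marked annotation stands in for @LoadBalanced, Template stands in for RestTemplate, and Config plays the role of a configuration class. All of these names are hypothetical.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Analogy only: imitates qualifier-style filtering without Spring.
class QualifierSketch {

    // Hypothetical marker playing the role of @LoadBalanced.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Marked { }

    // Hypothetical stand-in for RestTemplate.
    static final class Template {
        final String name;
        Template(String name) { this.name = name; }
    }

    // Hypothetical configuration class: only one factory method is marked.
    static final class Config {
        @Marked
        public Template balanced() { return new Template("balanced"); }

        public Template plain() { return new Template("plain"); }
    }

    // Collects only the templates whose factory method carries the marker.
    static List<String> markedTemplateNames() {
        Config config = new Config();
        List<String> names = new ArrayList<>();
        for (Method method : Config.class.getDeclaredMethods()) {
            if (method.getReturnType() == Template.class
                    && method.isAnnotationPresent(Marked.class)) {
                try {
                    names.add(((Template) method.invoke(config)).name);
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(markedTemplateNames()); // prints [balanced]
    }
}
```

A RestTemplate bean declared without @LoadBalanced is therefore left alone, which is useful when the same application also needs a plain RestTemplate for calling fixed addresses.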

We omit the RetryAutoConfiguration and RetryInterceptorAutoConfiguration code because @ConditionalOnClass(RetryTemplate.class) requires the RetryTemplate class to be on the classpath, and this project does not include it.

The static class LoadBalancerInterceptorConfig does two things:

(1) Register the LoadBalancerInterceptor in the container

		@Bean
		public LoadBalancerInterceptor ribbonInterceptor(
				LoadBalancerClient loadBalancerClient,
				LoadBalancerRequestFactory requestFactory) {
			return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
		}

(2) Register a RestTemplateCustomizer, written as a lambda to keep the code short. It copies the interceptor list of the given restTemplate, appends the loadBalancerInterceptor to the copy, and stores the new list with setInterceptors(list)

		@Bean
		@ConditionalOnMissingBean
		public RestTemplateCustomizer restTemplateCustomizer(
				final LoadBalancerInterceptor loadBalancerInterceptor) {
			return restTemplate -> {
				List<ClientHttpRequestInterceptor> list = new ArrayList<>(
						restTemplate.getInterceptors());
				list.add(loadBalancerInterceptor);
				restTemplate.setInterceptors(list);
			};
		}

The loadBalancedRestTemplateInitializerDeprecated method takes all RestTemplateCustomizer beans as its parameter (the RestTemplateCustomizer shown above was just registered with @Bean). It returns a SmartInitializingSingleton, whose callback runs once the regular singleton beans have been instantiated.

The callback iterates over every RestTemplate annotated with @LoadBalanced and calls customize on each one, which adds the LoadBalancerInterceptor to that RestTemplate.

	public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
			final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
		return () -> restTemplateCustomizers.ifAvailable(customizers -> {
			for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
				for (RestTemplateCustomizer customizer : customizers) {
					customizer.customize(restTemplate);
				}
			}
		});
	}
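The two pieces above (the customizer lambda and the initializer loop) can be sketched without Spring as follows. This is a simplified model under stated assumptions: Template is a hypothetical stand-in for RestTemplate, interceptors are represented by plain strings, and a Consumer plays the role of RestTemplateCustomizer.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

// Simplified model of "customizers applied to every marked template". Names are hypothetical.
class CustomizerSketch {

    // Stand-in for RestTemplate: it only holds a list of interceptor names.
    static final class Template {
        private List<String> interceptors = new ArrayList<>();

        List<String> getInterceptors() {
            return Collections.unmodifiableList(interceptors);
        }

        void setInterceptors(List<String> list) {
            this.interceptors = new ArrayList<>(list);
        }
    }

    // Mirrors the customizer lambda: copy the list, append, set it back.
    static Consumer<Template> appendInterceptor(String interceptor) {
        return template -> {
            List<String> list = new ArrayList<>(template.getInterceptors());
            list.add(interceptor);
            template.setInterceptors(list);
        };
    }

    // Mirrors the initializer: every customizer is applied to every template.
    static void customizeAll(List<Template> templates, List<Consumer<Template>> customizers) {
        for (Template template : templates) {
            for (Consumer<Template> customizer : customizers) {
                customizer.accept(template);
            }
        }
    }

    // Runs the model once and returns the resulting interceptor list.
    static List<String> demo() {
        Template template = new Template();
        template.setInterceptors(List.of("logging"));
        customizeAll(List.of(template), List.of(appendInterceptor("loadBalancer")));
        return template.getInterceptors();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [logging, loadBalancer]
    }
}
```

Copying the list before appending keeps any interceptors that were already configured on the template.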

What exactly does the LoadBalancerInterceptor do internally?

4, LoadBalancerInterceptor interceptor

public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {

	private LoadBalancerClient loadBalancer;

	private LoadBalancerRequestFactory requestFactory;

	public LoadBalancerInterceptor(LoadBalancerClient loadBalancer,
			LoadBalancerRequestFactory requestFactory) {
		this.loadBalancer = loadBalancer;
		this.requestFactory = requestFactory;
	}

	public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
		// for backwards compatibility
		this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
	}

	@Override
	public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
			final ClientHttpRequestExecution execution) throws IOException {
        //Get the address to access, that is, in the example http://demo/test
		final URI originalUri = request.getURI();
        //Get the service name, i.e. demo
		String serviceName = originalUri.getHost();
		Assert.state(serviceName != null,
				"Request URI does not contain a valid hostname: " + originalUri);
		return this.loadBalancer.execute(serviceName,
				this.requestFactory.createRequest(request, body, execution));
	}

}
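The service name is simply the host part of the URI, which also explains the Assert.state check. A small sketch with java.net.URI (the class ServiceNameSketch and the service name demo_service are made up for illustration) shows that a name containing an underscore is not parsed as a host, so getHost() returns null:

```java
import java.net.URI;

// Illustrative sketch: how the host part of a URI is used as the service name.
class ServiceNameSketch {

    // Returns the host of the URL, or null when java.net.URI cannot parse one.
    static String serviceName(String url) {
        return URI.create(url).getHost();
    }

    public static void main(String[] args) {
        System.out.println(serviceName("http://demo/test"));         // prints demo
        System.out.println(serviceName("http://demo_service/test")); // prints null
    }
}
```

With a name like the second one, the interceptor would fail with the "Request URI does not contain a valid hostname" message, so service names are best restricted to letters, digits and hyphens.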

Follow the call into loadBalancer.execute. Here loadBalancer is RibbonLoadBalancerClient, the only LoadBalancerClient implementation in this project.

	public <T> T execute(String serviceId, LoadBalancerRequest<T> request)
			throws IOException {
		return execute(serviceId, request, null);
	}

Continue to the execute method of RibbonLoadBalancerClient

	public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
			throws IOException {
		ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
		Server server = getServer(loadBalancer, hint);
		if (server == null) {
			throw new IllegalStateException("No instances available for " + serviceId);
		}
		RibbonServer ribbonServer = new RibbonServer(serviceId, server,
				isSecure(server, serviceId),
				serverIntrospector(serviceId).getMetadata(server));

		return execute(serviceId, ribbonServer, request);
	}

ILoadBalancer loadBalancer = getLoadBalancer(serviceId);

As the name suggests, this obtains the load balancer, that is, an implementation of the ILoadBalancer interface.

Digging into getLoadBalancer shows that it looks up the ILoadBalancer implementation in the container.

[Class diagram: ILoadBalancer and related classes]

Which implementation class is it?

The ILoadBalancer implementation is registered in the configuration class RibbonClientConfiguration:

	@Bean
	@ConditionalOnMissingBean
	public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
			ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
			IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
		if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
			return this.propertiesFactory.get(ILoadBalancer.class, config, name);
		}
		return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
				serverListFilter, serverListUpdater);
	}

It is the ZoneAwareLoadBalancer class!

5, How ZoneAwareLoadBalancer is constructed

Constructing this class requires the following parameters (all of them are registered in the container beforehand in RibbonClientConfiguration):

(1) IClientConfig config is the configuration of the client load balancer, including the default read timeout (5s), connection timeout (2s) and maximum connections (200). The default implementation class is DefaultClientConfigImpl.

(2) ServerList<Server> serverList supplies the service list. Here the service addresses are written in the configuration file, so the actual type of serverList is ConfigurationBasedServerList, which reads the address list from the configuration file.

(3) ServerListFilter<Server> serverListFilter is the service list filter. The default type is ZonePreferenceServerListFilter, which filters out services that are not in the same zone as the client. We have not configured any zones, so this filter has little effect in this example.

(4) IRule rule is the load balancing policy interface. Common policies include RoundRobinRule and RandomRule. The default is ZoneAvoidanceRule, which filters by zone and then picks servers in round-robin order. In this example the effect is plain round robin, so the endpoint returns 8081 and 8082 in turn.

[Class diagram: IRule and its implementation classes]

(5) IPing ping is the interface that judges whether a service instance is alive. Common implementations include DummyPing (always returns true, so every instance is considered alive) and PingUrl (requests a URL to check whether the instance is alive). The default is DummyPing, so even after demo2 is shut down, Ribbon keeps selecting it. This is why the calls in section 1 alternated between 8081 and a 500 error.

[Class diagram: IPing and its implementation classes]

(6) ServerListUpdater serverListUpdater updates the service list. The default implementation is PollingServerListUpdater, which starts a ScheduledThreadPoolExecutor and periodically refreshes the server list from the ServerList. (Readers who are not familiar with thread pools can refer to my article Talk about thread pool.)
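The interaction between the rule in (4) and the ping in (5) can be sketched in a few lines. This is a deliberately simplified model, not Ribbon's code: RoundRobinSketch is a hypothetical class, the liveness check is a Predicate evaluated on every call (Ribbon runs its ping in the background instead), and servers are plain strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

// Simplified model of round-robin selection combined with a liveness check.
class RoundRobinSketch {

    private final List<String> servers;
    private final Predicate<String> ping;
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobinSketch(List<String> servers, Predicate<String> ping) {
        this.servers = servers;
        this.ping = ping;
    }

    // Picks the next server in rotation among those the ping reports as alive,
    // or null when no server is considered alive.
    String choose() {
        List<String> alive = new ArrayList<>();
        for (String server : servers) {
            if (ping.test(server)) {
                alive.add(server);
            }
        }
        if (alive.isEmpty()) {
            return null;
        }
        return alive.get(Math.floorMod(counter.getAndIncrement(), alive.size()));
    }

    // Calls choose() the given number of times and collects the results.
    static List<String> chooseTimes(RoundRobinSketch balancer, int times) {
        List<String> chosen = new ArrayList<>();
        for (int i = 0; i < times; i++) {
            chosen.add(balancer.choose());
        }
        return chosen;
    }

    public static void main(String[] args) {
        List<String> servers = List.of("localhost:8081", "localhost:8082");

        // DummyPing-like check: everything is "alive", so 8082 is still chosen
        // even if the real process behind it has been shut down.
        System.out.println(chooseTimes(new RoundRobinSketch(servers, s -> true), 4));
        // prints [localhost:8081, localhost:8082, localhost:8081, localhost:8082]

        // A check that knows 8082 is down: only 8081 is ever chosen.
        System.out.println(chooseTimes(
                new RoundRobinSketch(servers, s -> !s.endsWith(":8082")), 4));
        // prints [localhost:8081, localhost:8081, localhost:8081, localhost:8081]
    }
}
```

The first case corresponds to what the example in section 1 shows after demo2 is closed: every second request is routed to a dead instance and fails.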

6, execute method

Here is the execute method of RibbonLoadBalancerClient again:

	public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
			throws IOException {
		ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
		Server server = getServer(loadBalancer, hint);
		if (server == null) {
			throw new IllegalStateException("No instances available for " + serviceId);
		}
		RibbonServer ribbonServer = new RibbonServer(serviceId, server,
				isSecure(server, serviceId),
				serverIntrospector(serviceId).getMetadata(server));

		return execute(serviceId, ribbonServer, request);
	}

From the above, we can see that:

(1) getLoadBalancer(serviceId): when the first request arrives, beans such as IClientConfig (client configuration), ServerList<Server> (service list loaded from the configuration file), IRule (load balancing policy) and IPing (liveness check policy) are created. This is lazy loading.

(2) getServer(loadBalancer, hint): selects a suitable service instance from the service list using the load balancing policy and liveness check policy above (the detailed code is in the chooseServer method of ZoneAwareLoadBalancer). The Server object contains the ip, port, protocol and other information.
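The lazy loading described in (1) follows a common "create on first use, then cache per key" pattern. The sketch below shows only that pattern. It is not how Ribbon stores its beans: LazyContextSketch is a hypothetical class and a plain Object stands in for the per-service set of beans.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of per-service lazy creation with caching. Names are hypothetical.
class LazyContextSketch {

    private final Map<String, Object> contexts = new ConcurrentHashMap<>();
    private final AtomicInteger created = new AtomicInteger();

    // Creates the per-service object on the first request for that service id
    // and returns the cached one afterwards.
    Object getLoadBalancer(String serviceId) {
        return contexts.computeIfAbsent(serviceId, id -> {
            created.incrementAndGet();
            return new Object(); // stands in for config, server list, rule, ping...
        });
    }

    int createdCount() {
        return created.get();
    }

    // Three lookups for "demo" and one for "other" create only two objects.
    static int demo() {
        LazyContextSketch sketch = new LazyContextSketch();
        sketch.getLoadBalancer("demo");
        sketch.getLoadBalancer("demo");
        sketch.getLoadBalancer("demo");
        sketch.getLoadBalancer("other");
        return sketch.createdCount();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 2
    }
}
```

One consequence of this pattern is that the first request to a given service pays the creation cost, while later requests reuse the cached objects.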

Step into the execute(serviceId, ribbonServer, request) method.

The core code is the call to apply:

T returnVal = request.apply(serviceInstance);

From here on, the code has little to do with Ribbon.

The interceptor that Ribbon added to the RestTemplate has done its job. Control goes back to the execute method inside InterceptingClientHttpRequest, which moves on to the intercept method of the next interceptor, if there is one.

If there are no other interceptors, the normal RestTemplate execution flow follows. By this point the request carries the address chosen by the load balancer, and it is sent directly using the wrapped HttpURLConnection.

7, How does RestTemplate use this interceptor?

The previous article, RestTemplate source code analysis, did not use any interceptors.

To see how interceptors come into play, start with doExecute, the main method of RestTemplate:

	protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback,
			@Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {
 
		Assert.notNull(url, "URI is required");
		Assert.notNull(method, "HttpMethod is required");
		ClientHttpResponse response = null;
		try {
            //Create the ClientHttpRequest
			ClientHttpRequest request = createRequest(url, method);
			if (requestCallback != null) {
                //Execute request callback
				requestCallback.doWithRequest(request);
			}
            //Execute the request and obtain the response
			response = request.execute();
            //Processing response results
			handleResponse(url, method, response);
            //Use the response extractor to extract data and return predefined java objects, such as String in the example
			return (responseExtractor != null ? responseExtractor.extractData(response) : null);
		}
		catch (IOException ex) {
			String resource = url.toString();
			String query = url.getRawQuery();
			resource = (query != null ? resource.substring(0, resource.indexOf('?')) : resource);
			throw new ResourceAccessException("I/O error on " + method.name() +
					" request for \"" + resource + "\": " + ex.getMessage(), ex);
		}
		finally {
			if (response != null) {
				response.close();
			}
		}
	}

Step into ClientHttpRequest request = createRequest(url, method).

This calls the createRequest method in HttpAccessor:

    protected ClientHttpRequest createRequest(URI url, HttpMethod method) throws IOException {
        ClientHttpRequest request = this.getRequestFactory().createRequest(url, method);
        this.initialize(request);
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("HTTP " + method.name() + " " + url);
        }
 
        return request;
    }

The getRequestFactory method is overridden by InterceptingHttpAccessor

	public ClientHttpRequestFactory getRequestFactory() {
		List<ClientHttpRequestInterceptor> interceptors = getInterceptors();
		if (!CollectionUtils.isEmpty(interceptors)) {
			ClientHttpRequestFactory factory = this.interceptingRequestFactory;
			if (factory == null) {
				factory = new InterceptingClientHttpRequestFactory(super.getRequestFactory(), interceptors);
				this.interceptingRequestFactory = factory;
			}
			return factory;
		}
		else {
			return super.getRequestFactory();
		}
	}

Because the list returned by getInterceptors is not empty this time, the request factory obtained is an InterceptingClientHttpRequestFactory rather than the SimpleClientHttpRequestFactory used when there are no interceptors.

The createRequest method of InterceptingClientHttpRequestFactory creates an InterceptingClientHttpRequest instead of the default SimpleBufferingClientHttpRequest.

[Class diagram: InterceptingClientHttpRequest and SimpleBufferingClientHttpRequest]

Next, go to the response = request.execute() call in the main flow of RestTemplate.

As with SimpleBufferingClientHttpRequest, this call enters the parent class AbstractClientHttpRequest:

	public final ClientHttpResponse execute() throws IOException {
		assertNotExecuted();
		ClientHttpResponse result = executeInternal(this.headers);
		this.executed = true;
		return result;
	}

As with SimpleBufferingClientHttpRequest, we then enter the executeInternal method, which is located in AbstractBufferingClientHttpRequest:

	protected ClientHttpResponse executeInternal(HttpHeaders headers) throws IOException {
		byte[] bytes = this.bufferedOutput.toByteArray();
		if (headers.getContentLength() < 0) {
			headers.setContentLength(bytes.length);
		}
		ClientHttpResponse result = executeInternal(headers, bytes);
		this.bufferedOutput = new ByteArrayOutputStream(0);
		return result;
	}

The core is the executeInternal(headers, bytes) method, which is located in InterceptingClientHttpRequest

	protected final ClientHttpResponse executeInternal(HttpHeaders headers, byte[] bufferedOutput) throws IOException {
		InterceptingRequestExecution requestExecution = new InterceptingRequestExecution();
		return requestExecution.execute(this, bufferedOutput);
	}

Enter the execute method of InterceptingRequestExecution (a private inner class of InterceptingClientHttpRequest):

		public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
			if (this.iterator.hasNext()) {
				ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
				return nextInterceptor.intercept(request, body, this);
			}
			else {
				HttpMethod method = request.getMethod();
				Assert.state(method != null, "No standard HTTP method");
				ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
				request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
				if (body.length > 0) {
					if (delegate instanceof StreamingHttpOutputMessage) {
						StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
						streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
					}
					else {
						StreamUtils.copy(body, delegate.getBody());
					}
				}
				return delegate.execute();
			}
		}

this.iterator is the iterator over the configured interceptors. If an interceptor remains, its intercept method runs first, and here that interceptor is the LoadBalancerInterceptor. From this point the flow is the one described in section 4.

While the LoadBalancerInterceptor runs, request.apply calls back into this execute method. No interceptors are left, so the else branch is taken. The delegate created there is a SimpleBufferingClientHttpRequest, a class we already know, and from here on the flow is the same as in the case without interceptors.
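The iterator-driven chain is easier to see in a stripped-down model. The sketch below keeps only the control flow of the execute method shown above. The types Execution, Interceptor and Chain are hypothetical simplifications, the request is reduced to a URI string, and the load-balancing interceptor is hard-coded to pick localhost:8081.

```java
import java.util.Iterator;
import java.util.List;

// Stripped-down model of an iterator-based interceptor chain. Names are hypothetical.
class InterceptorChainSketch {

    interface Execution {
        String execute(String uri);
    }

    interface Interceptor {
        String intercept(String uri, Execution execution);
    }

    // Each call to execute() hands the request to the next interceptor;
    // when none is left, the request is "sent" for real.
    static final class Chain implements Execution {
        private final Iterator<Interceptor> iterator;

        Chain(List<Interceptor> interceptors) {
            this.iterator = interceptors.iterator();
        }

        @Override
        public String execute(String uri) {
            if (iterator.hasNext()) {
                return iterator.next().intercept(uri, this);
            }
            return "response from " + uri; // stands in for delegate.execute()
        }
    }

    static String run(String uri) {
        // Rewrites the logical service name, then calls back into the chain.
        Interceptor loadBalancer =
                (u, execution) -> execution.execute(u.replace("//demo/", "//localhost:8081/"));
        return new Chain(List.of(loadBalancer)).execute(uri);
    }

    public static void main(String[] args) {
        System.out.println(run("http://demo/test"));
        // prints response from http://localhost:8081/test
    }
}
```

Because each interceptor receives the chain itself as the execution argument, an interceptor decides whether and with which request the rest of the chain continues.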

To follow the rest of the flow, see the earlier article, RestTemplate source code analysis.

This completes the walkthrough of how RestTemplate uses the load balancing capability provided by Ribbon.

8, Ribbon's general process summary

(1) Ribbon's auto-configuration class collects all RestTemplate instances annotated with @LoadBalanced

(2) It adds the LoadBalancerInterceptor to the interceptor list of each of these RestTemplate instances

(3) Before sending a request, RestTemplate runs the intercept method of each interceptor

(4) In the intercept method of LoadBalancerInterceptor, on the first request the service instance list is read from the configuration file, and the load balancing policy, liveness check policy and service list update policy are created

(5) The intercept method then selects a service instance according to these policies

(6) Once the service instance has been chosen, RestTemplate sends the request directly to it

Keywords: Java

Added by gilmourt on Tue, 28 Dec 2021 16:51:58 +0200