Spring Cloud Gateway dynamic routing

The official Spring Cloud Gateway documentation describes how the Gateway works with a diagram and the following explanation:

 

 

Clients make requests to Spring Cloud Gateway. If the Gateway Handler Mapping determines that a request matches a route, it is sent to the Gateway Web Handler. This handler runs the request through a filter chain that is specific to the request. The reason the filters are divided by the dotted line is that filters can run logic both before and after the proxy request is sent. All "pre" filter logic is executed. Then the proxy request is made. After the proxy request is made, the "post" filter logic is run.

 

A client request is first handled by the Gateway Handler Mapping, which looks up a matching route in the routing table and then hands the request over to the Gateway Web Handler. The Web Handler maintains a filter chain and runs the filters in order. Logically, each filter has two execution stages: pre and post.
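
The pre and post stages can be illustrated with a small plain-Java sketch. It does not use the Spring Cloud Gateway API; SketchFilter, SketchChain and FilterChainSketch are hypothetical names invented for this illustration, and the "proxy request" is just a string appended to a trace.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical names throughout: this models the idea, not Spring Cloud Gateway's API.
interface SketchFilter {
    void filter(List<String> trace, SketchChain chain);
}

class SketchChain {
    private final List<SketchFilter> filters;
    private final int index;

    SketchChain(List<SketchFilter> filters, int index) {
        this.filters = filters;
        this.index = index;
    }

    // Runs the next filter, or the "proxy request" once every filter has run its pre logic.
    void proceed(List<String> trace) {
        if (index < filters.size()) {
            filters.get(index).filter(trace, new SketchChain(filters, index + 1));
        } else {
            trace.add("proxy request");
        }
    }
}

class FilterChainSketch {
    static SketchFilter named(String name) {
        return (trace, chain) -> {
            trace.add(name + " pre");   // logic before the proxy request
            chain.proceed(trace);
            trace.add(name + " post");  // logic after the proxy request
        };
    }

    static List<String> run() {
        List<String> trace = new ArrayList<>();
        new SketchChain(Arrays.asList(named("A"), named("B")), 0).proceed(trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [A pre, B pre, proxy request, B post, A post]
    }
}
```

The post stages run in reverse order because each filter resumes only after the rest of the chain has returned.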

This article focuses on the route lookup process and, on that basis, explores how to implement a persistent dynamic routing table.

Source code reading

We read the official source code to understand how the gateway works and to find the extension points needed for a persistent dynamic routing table.

RoutePredicateHandlerMapping

The Gateway Handler Mapping that implements route lookup in the Gateway is the RoutePredicateHandlerMapping class. It is auto-configured in GatewayAutoConfiguration (the class that auto-configures the Gateway's beans). Lines 260-266 of the source code are as follows:

@Bean
public RoutePredicateHandlerMapping routePredicateHandlerMapping(
    FilteringWebHandler webHandler, RouteLocator routeLocator,
    GlobalCorsProperties globalCorsProperties, Environment environment) {
  return new RoutePredicateHandlerMapping(webHandler, routeLocator,
      globalCorsProperties, environment);
}

First, note that this bean is registered unconditionally, so no extension point is left here (a previous article extended it with a special technique). We focus on two of the injected beans:

  • FilteringWebHandler: creates the filter chain. It loads the global filters, converts them into gateway filters, combines the two kinds of filters, and executes the resulting chain. This article does not expand on this part;

  • RouteLocator: has multiple implementation classes and is the focus of this article.

 

 

@Override
protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
  // ... code above omitted

  // lookupRoute finds the route that matches the request
  return lookupRoute(exchange)
      // Record the matched route in the ServerWebExchange attributes, then return the FilteringWebHandler
      .flatMap((Function<Route, Mono<?>>) r -> {
        exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
        if (logger.isDebugEnabled()) {
          logger.debug(
              "Mapping [" + getExchangeDesc(exchange) + "] to " + r);
        }
        // Later, the route is read back from the attributes to obtain its filters and run the filter chain
        exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
        return Mono.just(webHandler);

  // ... subsequent code omitted

 

 protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
   // this.routeLocator.getRoutes() is the key call; how the RouteLocator
   // implementations provide getRoutes() is analyzed below
   return this.routeLocator.getRoutes()
       .concatMap(route -> Mono.just(route).filterWhen(r -> {
         exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
         // Test whether the request matches this route's predicate. This is not the focus of this article and is not expanded here
         return r.getPredicate().apply(exchange);
       })
 // ... subsequent code omitted
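
As a rough, non-reactive illustration of this lookup, the sketch below walks the routes in order and keeps the first one whose predicate accepts the request. It assumes that the omitted remainder of lookupRoute takes the first match. SketchRoute and RouteLookupSketch are hypothetical names, and a plain request path stands in for ServerWebExchange.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical, simplified stand-in for a route: an id plus a predicate on the request path.
class SketchRoute {
    final String id;
    final Predicate<String> predicate;

    SketchRoute(String id, Predicate<String> predicate) {
        this.id = id;
        this.predicate = predicate;
    }
}

class RouteLookupSketch {
    // Returns the first route, in list order, whose predicate accepts the path.
    static Optional<SketchRoute> lookupRoute(List<SketchRoute> routes, String path) {
        return routes.stream().filter(r -> r.predicate.test(path)).findFirst();
    }

    static List<SketchRoute> sampleRoutes() {
        return Arrays.asList(
            new SketchRoute("red", p -> p.startsWith("/red/")),
            new SketchRoute("blue", p -> p.startsWith("/blue/")),
            new SketchRoute("catch-all", p -> true));
    }

    public static void main(String[] args) {
        // "/blue/1" is rejected by "red" and accepted by "blue"; "catch-all" is never consulted.
        System.out.println(lookupRoute(sampleRoutes(), "/blue/1").map(r -> r.id).orElse("none"));
    }
}
```

Because the first match wins, the order in which the locator returns routes matters.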

 

RouteLocator implementation class

// RouteDefinitionRouteLocator: converts each RouteDefinition into a Route
@Override
public Flux<Route> getRoutes() {
  // Note the call to getRouteDefinitions() here
  Flux<Route> routes = this.routeDefinitionLocator.getRouteDefinitions()
      .map(this::convertToRoute);
  // ... the rest is omitted

 

Now let's look at how it is auto-configured in GatewayAutoConfiguration, lines 223-240 of the source code:

@Bean
public RouteLocator routeDefinitionRouteLocator(GatewayProperties properties,
  // Inject all filter factories (excluding global filters); they are used to build a Route from a RouteDefinition
  List<GatewayFilterFactory> gatewayFilters,
  // Inject all predicate factories; they are used to build a Route from a RouteDefinition
  List<RoutePredicateFactory> predicates,
  // Inject the RouteDefinitionLocator, which is the focus of what follows
  RouteDefinitionLocator routeDefinitionLocator,
  ConfigurationService configurationService) {
  return new RouteDefinitionRouteLocator(routeDefinitionLocator, predicates,
      gatewayFilters, properties, configurationService);
}

@Bean
@Primary
@ConditionalOnMissingBean(name = "cachedCompositeRouteLocator")
public RouteLocator cachedCompositeRouteLocator(List<RouteLocator> routeLocators) {
  return new CachingRouteLocator(
      new CompositeRouteLocator(Flux.fromIterable(routeLocators)));
}

GatewayAutoConfiguration does not register CompositeRouteLocator directly; it wraps it in a CachingRouteLocator. Line 17 of the code block above shows that all RouteLocator implementations are collected into cachedCompositeRouteLocator (which would also include cachedCompositeRouteLocator itself; line 16 neatly avoids this dependency on itself through a conditional annotation). As a result, by default only routeDefinitionRouteLocator is injected into cachedCompositeRouteLocator, and developers can extend this by adding their own RouteLocator implementations.

 

This article does not expand on CachingRouteLocator. Note that after creating a route you need to publish a RefreshRoutesEvent; the locator listens for this event and refreshes its routes.
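
The interplay between the composite locator, the cache and the refresh event can be sketched in plain Java. This is a simplified model under stated assumptions, not the Gateway's implementation: a locator is reduced to a Supplier of route ids, the refresh event is reduced to a direct onRefresh() call, and CompositeLocatorSketch and CachingLocatorSketch are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical model: a "locator" is just a supplier of route ids.
class CompositeLocatorSketch implements Supplier<List<String>> {
    private final List<Supplier<List<String>>> delegates;

    CompositeLocatorSketch(List<Supplier<List<String>>> delegates) {
        this.delegates = delegates;
    }

    // Concatenates the routes of every delegate, in order.
    @Override
    public List<String> get() {
        List<String> all = new ArrayList<>();
        for (Supplier<List<String>> delegate : delegates) {
            all.addAll(delegate.get());
        }
        return all;
    }
}

class CachingLocatorSketch implements Supplier<List<String>> {
    private final Supplier<List<String>> delegate;
    private volatile List<String> cached;

    CachingLocatorSketch(Supplier<List<String>> delegate) {
        this.delegate = delegate;
        this.cached = delegate.get();
    }

    // Served from the cache; the delegate is not consulted.
    @Override
    public List<String> get() {
        return cached;
    }

    // Stand-in for handling a refresh event: re-read the delegate.
    void onRefresh() {
        this.cached = delegate.get();
    }

    // Demo wiring: one delegate that reads a mutable list, wrapped in composite and cache.
    static CachingLocatorSketch over(List<String> source) {
        Supplier<List<String>> reader = () -> new ArrayList<>(source);
        List<Supplier<List<String>>> delegates = new ArrayList<>();
        delegates.add(reader);
        return new CachingLocatorSketch(new CompositeLocatorSketch(delegates));
    }
}
```

In this model a route added to the source list stays invisible until onRefresh() is called, which is why a route change has to be followed by a refresh event.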

 

Let's look at the constructor of RouteDefinitionRouteLocator:

public RouteDefinitionRouteLocator(RouteDefinitionLocator routeDefinitionLocator,
    List<RoutePredicateFactory> predicates,
    List<GatewayFilterFactory> gatewayFilterFactories,
    GatewayProperties gatewayProperties,
    ConfigurationService configurationService) {
  this.routeDefinitionLocator = routeDefinitionLocator;
  this.configurationService = configurationService;
  // Initialize the predicate factories; as with the filter factories below, the "RoutePredicateFactory" suffix is stripped from the name
  initFactories(predicates);
  gatewayFilterFactories.forEach(
      // factory.name() returns the filter factory's name; the implementation strips the trailing "GatewayFilterFactory"
      // This is why the class name of a custom filter factory should end with GatewayFilterFactory
      factory -> this.gatewayFilterFactories.put(factory.name(), factory));
  this.gatewayProperties = gatewayProperties;
}

With the code above in mind, consider a route defined when using Spring Cloud Gateway, for example:

spring:
  cloud:
    gateway:
      routes:
      - id: path_route
        uri: https://example.org
        predicates:
        - Path=/red/{segment},/blue/{segment}
        filters:
        - AddRequestHeader=X-Request-red, blue

Path corresponds to PathRoutePredicateFactory, and AddRequestHeader corresponds to AddRequestHeaderGatewayFilterFactory.
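
The naming convention can be sketched as a simple suffix removal. The helper below is hypothetical (FactoryNameSketch.shortName is not a Gateway method); it only illustrates how a short configuration name such as Path relates to a factory class name.

```java
class FactoryNameSketch {
    // Hypothetical helper: derive the short configuration name from a factory class name
    // by removing the given suffix, e.g. "GatewayFilterFactory" or "RoutePredicateFactory".
    static String shortName(String simpleClassName, String suffix) {
        return simpleClassName.endsWith(suffix)
            ? simpleClassName.substring(0, simpleClassName.length() - suffix.length())
            : simpleClassName;
    }

    public static void main(String[] args) {
        System.out.println(shortName("PathRoutePredicateFactory", "RoutePredicateFactory"));
        System.out.println(shortName("AddRequestHeaderGatewayFilterFactory", "GatewayFilterFactory"));
    }
}
```

The two calls in main print Path and AddRequestHeader, the names used in the YAML above.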

RouteDefinitionLocator implementation class

 

When RouteDefinitionRouteLocator is assembled, a bean named routeDefinitionLocator is injected. Back in GatewayAutoConfiguration, let's see how this RouteDefinitionLocator is assembled (lines 208-214 of the source code):

@Bean
@Primary
public RouteDefinitionLocator routeDefinitionLocator(
  List<RouteDefinitionLocator> routeDefinitionLocators) {
  return new CompositeRouteDefinitionLocator(Flux.fromIterable(routeDefinitionLocators));
}

Here, CompositeRouteDefinitionLocator combines and wraps all implementations of RouteDefinitionLocator, each of which provides its own getRouteDefinitions() method.

GatewayDiscoveryClientAutoConfiguration auto-configures the discovery-client-based locator (DiscoveryClientRouteDefinitionLocator) together with its supporting filters and predicates. This locator is disabled by default and must be enabled in the configuration file.

# Enable this locator
spring.cloud.gateway.discovery.locator.enabled=true
# Once enabled, it runs in reactive mode by default; set this to false to switch to blocking mode
spring.cloud.discovery.reactive.enabled=false

  • CachingRouteDefinitionLocator: browsing the source code shows that this locator is not actually registered as a bean, possibly because the underlying RouteLocator already has a cache.

  • InMemoryRouteDefinitionRepository: a memory-based route repository. Routes can be managed through the management endpoints provided by Spring Cloud Gateway, but because the implementation is memory-based, they are not persisted.

Its assembly code is in GatewayAutoConfiguration, lines 202-206 of the source code:

@Bean
// Takes effect only when no other RouteDefinitionRepository bean exists.
// This is the extension point through which we can implement persistent dynamic routing
@ConditionalOnMissingBean(RouteDefinitionRepository.class)
public InMemoryRouteDefinitionRepository inMemoryRouteDefinitionRepository() {
  return new InMemoryRouteDefinitionRepository();
}

The management endpoints are covered in the official documentation. Recall how routeDefinitionLocator is assembled:

@Bean
@Primary
public RouteDefinitionLocator routeDefinitionLocator(
    List<RouteDefinitionLocator> routeDefinitionLocators) {
  return new CompositeRouteDefinitionLocator(
      Flux.fromIterable(routeDefinitionLocators));
}

As shown, the RouteDefinitionLocator injected into RouteDefinitionRouteLocator earlier is CompositeRouteDefinitionLocator, which is assembled from all injected RouteDefinitionLocator implementations, including the RouteDefinitionRepository implementation (InMemoryRouteDefinitionRepository by default).

 

It is now clear that to implement dynamic routing with persistent storage, we only need to implement a RouteDefinitionRepository backed by a database (or other persistent storage), using InMemoryRouteDefinitionRepository as a reference.
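
The extension point can be modelled without Spring as "use the custom repository if one is supplied, otherwise fall back to the in-memory default". The sketch below is only an analogy for the conditional bean registration; SketchRouteRepository, InMemorySketchRepository and RepositorySelectionSketch are hypothetical names, and the contract is simplified to blocking calls on route ids and URIs.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified repository contract (the real interface is reactive).
interface SketchRouteRepository {
    void save(String id, String uri);
    void delete(String id);
    Map<String, String> findAll();
}

// Default implementation: routes live only in memory and are lost on restart.
class InMemorySketchRepository implements SketchRouteRepository {
    private final Map<String, String> routes = new LinkedHashMap<>();

    @Override
    public void save(String id, String uri) {
        routes.put(id, uri);
    }

    @Override
    public void delete(String id) {
        routes.remove(id);
    }

    @Override
    public Map<String, String> findAll() {
        return new LinkedHashMap<>(routes);
    }
}

class RepositorySelectionSketch {
    // Mirrors the "only when missing" condition: a custom repository, if supplied,
    // replaces the in-memory default.
    static SketchRouteRepository choose(Optional<SketchRouteRepository> custom) {
        return custom.orElseGet(InMemorySketchRepository::new);
    }

    public static void main(String[] args) {
        SketchRouteRepository repository = choose(Optional.empty());
        repository.save("path_route", "https://example.org");
        System.out.println(repository.findAll());
    }
}
```

A database-backed repository would implement the same contract and be passed to choose, just as a custom RouteDefinitionRepository bean displaces the in-memory one.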

 

Dynamic routing based on MongoDB

The above can be summarized as follows:

  1. Start with RoutePredicateHandlerMapping: the RouteLocator injected into it is CachingRouteLocator;

  2. CachingRouteLocator wraps CompositeRouteLocator;

  3. By default, the only RouteLocator combined by CompositeRouteLocator is RouteDefinitionRouteLocator;

  4. RouteDefinitionRouteLocator converts each RouteDefinition into a Route and is injected with a bean named routeDefinitionLocator, namely CompositeRouteDefinitionLocator;

  5. CompositeRouteDefinitionLocator combines all RouteDefinitionLocator implementations, including one RouteDefinitionRepository implementation, InMemoryRouteDefinitionRepository;

  6. InMemoryRouteDefinitionRepository is registered conditionally and takes effect only when no other RouteDefinitionRepository bean exists;

  7. Using InMemoryRouteDefinitionRepository as a reference, implement a database-backed route store.

 

 

For the complete code, see: scg-dynamic-route

 

Main code snippets

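import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.gateway.event.RefreshRoutesEvent;
import org.springframework.cloud.gateway.route.RouteDefinition;
import org.springframework.cloud.gateway.route.RouteDefinitionRepository;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Slf4j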
@Component
public class MongoRouteDefinitionRepository
    implements RouteDefinitionRepository, ApplicationEventPublisherAware {

  private static final String CACHE_KEY = "routes";

  private ApplicationEventPublisher eventPublisher;

  // volatile: RouteRefresher replaces the whole map from a scheduler thread
  private volatile Map<String, RouteDefinition> cache = new ConcurrentHashMap<>();

  private final RouteRepositoryOperations repositoryOperation;

  public MongoRouteDefinitionRepository(RouteRepositoryOperations repositoryOperation) {
    this.repositoryOperation = repositoryOperation;
  }

  @Override
  public Flux<RouteDefinition> getRouteDefinitions() {
    return Flux.fromIterable(cache.values());
  }

  @Override
  public Mono<Void> save(Mono<RouteDefinition> route) {
    // Persist the route first, then add the saved document to the cache
    return route.flatMap(
        r -> repositoryOperation.save(MongoRouteDefinition.from(r))
            .log()
            .doOnNext(this::addCache)
            .then()
    );
  }

  @Override
  public Mono<Void> delete(Mono<String> routeId) {
    return repositoryOperation.findById(routeId)
        .log()
        .map(RouteDefinition::getId)
        .doOnNext(this::removeCache)
        .flatMap(repositoryOperation::deleteById);
  }

  @Override
  public void setApplicationEventPublisher(ApplicationEventPublisher eventPublisher) {
    this.eventPublisher = eventPublisher;
  }

  /**
   * Adds the specified route to the cache, replacing any cached route with the same id.
   * <p>
   * To load routes in real time, call this method when a data change is observed through a MongoDB change stream.
   */
  public void addCache(RouteDefinition route) {
    // put rather than putIfAbsent, so that updates to an existing route take effect
    this.cache.put(route.getId(), route);
    this.publishEvent();
  }

  /**
   * Deletes the specified route from the cache.
   * <p>
   * To load routes in real time, call this method when a data change is observed through a MongoDB change stream.
   */
  public void removeCache(String routeId) {
    if (this.cache.remove(routeId) != null) {
      this.publishEvent();
    }
  }

  void publishEvent() {
    eventPublisher.publishEvent(new RefreshRoutesEvent(this));
  }

  RouteRepositoryOperations getRepositoryOperation() {
    return repositoryOperation;
  }


  Map<String, RouteDefinition> getCache() {
    return cache;
  }

  void setCache(
      Map<String, RouteDefinition> cache) {
    this.cache = cache;
  }

}
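
Since addCache is called for inserts and for updates alike, it is worth keeping in mind how the two Map write methods treat a key that is already present. The sketch below uses only the JDK; CacheUpdateSketch and the sample URIs are made up for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class CacheUpdateSketch {
    // putIfAbsent keeps the existing value when the key is already cached.
    static String afterPutIfAbsent() {
        Map<String, String> cache = new ConcurrentHashMap<>();
        cache.put("path_route", "https://example.org");
        cache.putIfAbsent("path_route", "https://example.com");
        return cache.get("path_route");
    }

    // put replaces the existing value.
    static String afterPut() {
        Map<String, String> cache = new ConcurrentHashMap<>();
        cache.put("path_route", "https://example.org");
        cache.put("path_route", "https://example.com");
        return cache.get("path_route");
    }

    public static void main(String[] args) {
        System.out.println(afterPutIfAbsent()); // https://example.org
        System.out.println(afterPut());         // https://example.com
    }
}
```

For a route that is modified under an unchanged id, only the put variant makes the new definition visible in the cache.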

 

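import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.gateway.route.RouteDefinition;
import org.springframework.data.domain.PageRequest;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Slf4j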
@Component
@ConditionalOnProperty(value = "route.schedule.enabled", havingValue = "true", matchIfMissing = true)
public class RouteRefresher {

  private final MongoRouteDefinitionRepository repository;

  public RouteRefresher(
      MongoRouteDefinitionRepository repository) {
    this.repository = repository;
  }

  /**
   * Reloads the cache at regular intervals: first after 10 seconds, then one hour after each run completes.
   */
  @Scheduled(initialDelay = 10000, fixedDelay = 60 * 60 * 1000)
  private void refresh() {
    RouteRepositoryOperations operation = repository.getRepositoryOperation();

    int page = 0;
    int pageSize = 1000;
    int total = Math.toIntExact(operation.count().blockOptional().orElse(0L));
    Map<String, RouteDefinition> oldCache = repository.getCache();
    Map<String, RouteDefinition> newCache = new ConcurrentHashMap<>(total);
    int oldTotal = oldCache.size();
    if (oldTotal < 1) {
      // First load: expose the new map right away so that routes become visible as pages are read
      repository.setCache(newCache);
    }
    while (page * pageSize < total) {
      operation.findAll(PageRequest.of(page++, pageSize))
          .doOnNext(route -> newCache.putIfAbsent(route.getId(), route))
          .blockLast();
      log.info("The current total size of dynamic routing table is:{}, The current size of the new routing table is:{}", oldTotal, newCache.size());
    }
    repository.setCache(newCache);
    log.info("The new routing table has been loaded. The current size is:{}", newCache.size());
    repository.publishEvent();
  }
}
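
The reload strategy, reading the table page by page into a fresh map and swapping it in only when complete, can be sketched without MongoDB. In the sketch below a plain list stands in for the collection, findPage stands in for the paged query, and PagedReloadSketch is a hypothetical name.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class PagedReloadSketch {
    // Hypothetical stand-in for a paged query: returns page number `page` of `source`.
    static List<String> findPage(List<String> source, int page, int pageSize) {
        int from = Math.min(page * pageSize, source.size());
        int to = Math.min(from + pageSize, source.size());
        return source.subList(from, to);
    }

    // Builds a complete new map page by page; the caller then swaps it in as a whole.
    // pageSize must be greater than zero.
    static Map<String, String> reload(List<String> source, int pageSize) {
        Map<String, String> fresh = new ConcurrentHashMap<>();
        int page = 0;
        while (page * pageSize < source.size()) {
            for (String id : findPage(source, page++, pageSize)) {
                fresh.putIfAbsent(id, "route:" + id);
            }
        }
        return fresh;
    }

    public static void main(String[] args) {
        // Five ids with a page size of two are read in three pages.
        System.out.println(reload(Arrays.asList("a", "b", "c", "d", "e"), 2).size());
    }
}
```

Building the new map on the side means readers keep seeing the old, complete table until the swap.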

 

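import java.util.Optional;

import com.mongodb.client.model.changestream.OperationType;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.data.mongodb.core.ChangeStreamEvent;
import org.springframework.data.mongodb.core.ChangeStreamOptions;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.mapping.Document;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;

@Component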
@ConditionalOnProperty(value = "changeStream.enabled", havingValue = "true", matchIfMissing = true)
public class RouteChangeStreamHandler implements CommandLineRunner {

  private final ReactiveMongoTemplate mongoTemplate;
  private final MongoRouteDefinitionRepository routeRepository;

  public RouteChangeStreamHandler(
      MongoRouteDefinitionRepository routeRepository, ReactiveMongoTemplate mongoTemplate) {
    this.routeRepository = routeRepository;
    this.mongoTemplate = mongoTemplate;
  }

  @Override
  public void run(String... args) {
    new Thread(this::startMonitor, "ChangeStream-Monitor-routes").start();
  }

  public void startMonitor() {
    Aggregation aggregation = Aggregation.newAggregation(Aggregation
        .match(Criteria.where("operationType").in("insert", "delete", "update", "replace")));

    ChangeStreamOptions options = ChangeStreamOptions.builder()
        .filter(aggregation)
        .returnFullDocumentOnUpdate()
        .build();

    String collectionName = MongoRouteDefinition.class.getAnnotation(Document.class).value();
    Flux<ChangeStreamEvent<MongoRouteDefinition>> changeStream = mongoTemplate
        .changeStream(collectionName, options, MongoRouteDefinition.class);

    changeStream
        .log()
        .doOnNext(e -> {
          if (OperationType.INSERT == e.getOperationType()
              || OperationType.UPDATE == e.getOperationType()
              || OperationType.REPLACE == e.getOperationType()) {
            Optional.ofNullable(e.getBody()).ifPresent(routeRepository::addCache);
          } else if (OperationType.DELETE == e.getOperationType()) {
            getId(e).ifPresent(routeRepository::removeCache);
          }
        }).blockLast();
  }

  // Extracts the hex string of the ObjectId "_id" from the change event's document key, if present
  private Optional<String> getId(ChangeStreamEvent<MongoRouteDefinition> e) {
    return Optional.ofNullable(e.getRaw())
        .map(raw -> raw.getDocumentKey())
        .map(docKey -> docKey.getObjectId("_id"))
        .map(bson -> bson.getValue().toHexString());
  }
}
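
The dispatch on the operation type amounts to an upsert for insert, update and replace events and a removal for delete events. The following plain-Java sketch models only that dispatch; ChangeEventSketch and its Op enum are hypothetical, and route ids and URIs are plain strings.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ChangeEventSketch {
    // Hypothetical subset of change stream operation types.
    enum Op { INSERT, UPDATE, REPLACE, DELETE }

    // Applies one change event to the cache; `uri` is ignored for deletes.
    static void apply(Map<String, String> cache, Op op, String id, String uri) {
        switch (op) {
            case INSERT:
            case UPDATE:
            case REPLACE:
                cache.put(id, uri); // upsert the full document
                break;
            case DELETE:
                cache.remove(id);
                break;
        }
    }

    // Replays a short sequence of events against an empty cache.
    static Map<String, String> demo() {
        Map<String, String> cache = new ConcurrentHashMap<>();
        apply(cache, Op.INSERT, "r1", "https://example.org");
        apply(cache, Op.UPDATE, "r1", "https://example.com");
        apply(cache, Op.INSERT, "r2", "https://example.net");
        apply(cache, Op.DELETE, "r2", null);
        return cache;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // {r1=https://example.com}
    }
}
```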

 

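import org.springframework.beans.BeanUtils;
import org.springframework.cloud.gateway.route.RouteDefinition;
import org.springframework.data.mongodb.core.mapping.Document;

@Document("gwRoutes")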
public class MongoRouteDefinition extends RouteDefinition {

  public static MongoRouteDefinition from(RouteDefinition route) {
    MongoRouteDefinition newRoute = new MongoRouteDefinition();
    BeanUtils.copyProperties(route, newRoute);
    return newRoute;
  }
}

 

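import org.springframework.data.domain.Pageable;
import org.springframework.data.mongodb.repository.Query;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import reactor.core.publisher.Flux;

public interface RouteRepositoryOperations extends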
    ReactiveMongoRepository<MongoRouteDefinition, String> {

  /**
   * Paging query
   *
   * @param pageable paging
   * @return Current page
   */
  @Query(value = "{}", sort = "{_id:1}")
  Flux<MongoRouteDefinition> findAll(Pageable pageable);
}

 

Original link: https://xie.infoq.cn/article/0ae4f61ce6c67a651d94678a8

 

Keywords: Spring source code gateway

Added by antonbrk on Sat, 29 Jan 2022 04:42:52 +0200