According to the official Spring Cloud Gateway documentation, the Gateway works as shown in the figure below:
Clients make requests to Spring Cloud Gateway. If the Gateway Handler Mapping determines that a request matches a route, it is sent to the Gateway Web Handler. This handler runs the request through a filter chain that is specific to the request. The reason the filters are divided by the dotted line is that filters can run logic both before and after the proxy request is sent. All "pre" filter logic is executed. Then the proxy request is made. After the proxy request is made, the "post" filter logic is run.
In other words, a client request is first processed by the Gateway Handler Mapping, which looks up a route matching the request in the routing table, and the request is then handed to the Gateway Web Handler. The Web Handler maintains a filter chain and runs the filters in order. Each filter logically has two execution stages, pre and post.
This article focuses on the route lookup process and, on that basis, explores how to implement a persistent, dynamic routing table.
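The pre and post stages described above can be pictured with a small synchronous model. This is only a sketch in plain Java: the `Filter` and `Chain` types and the "proxy request" step are hypothetical stand-ins, and the real Gateway filters are reactive rather than blocking.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of a pre/post filter chain; none of these types are Spring Cloud Gateway classes. */
final class FilterChainSketch {

    /** A filter receives the chain so that it decides when control moves on. */
    interface Filter {
        void filter(List<String> trace, Chain chain);
    }

    /** Walks the filter list; after the last filter it performs the simulated proxy call. */
    static final class Chain {
        private final List<Filter> filters;
        private final int index;

        Chain(List<Filter> filters, int index) {
            this.filters = filters;
            this.index = index;
        }

        void next(List<String> trace) {
            if (index < filters.size()) {
                filters.get(index).filter(trace, new Chain(filters, index + 1));
            } else {
                trace.add("proxy request");
            }
        }
    }

    /** Builds a filter that records a "pre" step, delegates, then records a "post" step. */
    static Filter named(String name) {
        return (trace, chain) -> {
            trace.add("pre " + name);
            chain.next(trace);
            trace.add("post " + name);
        };
    }

    /** Runs two filters around the simulated proxy call and returns the recorded order. */
    static List<String> run() {
        List<String> trace = new ArrayList<>();
        new Chain(Arrays.asList(named("A"), named("B")), 0).next(trace);
        return trace;
    }

    public static void main(String[] args) {
        // prints [pre A, pre B, proxy request, post B, post A]
        System.out.println(run());
    }
}
```

The post steps run in reverse order because each filter resumes only after the rest of the chain has returned.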
Source code reading
By reading the official source code, we sort out how the gateway works and look for extension points that make a persistent, dynamic routing table possible.
RoutePredicateHandlerMapping
The Gateway Handler Mapping that implements the route lookup logic is the RoutePredicateHandlerMapping class. It is auto-configured in GatewayAutoConfiguration (the class that auto-configures the Gateway's beans). Lines 260-266 of the source code are as follows:
    @Bean
    public RoutePredicateHandlerMapping routePredicateHandlerMapping(
            FilteringWebHandler webHandler, RouteLocator routeLocator,
            GlobalCorsProperties globalCorsProperties, Environment environment) {
        return new RoutePredicateHandlerMapping(webHandler, routeLocator,
                globalCorsProperties, environment);
    }
First, note that this bean is assembled unconditionally, so no extension point is left here (a previous article worked around this with a special approach). The injection of two beans deserves attention:
- FilteringWebHandler: creates the filter chain. It loads the global filters, converts them into gateway filters, combines the two sets, and executes the chain. This article does not cover this part.
- RouteLocator: has multiple implementation classes and is the focus of this article.
The lookup itself happens in two methods of RoutePredicateHandlerMapping, getHandlerInternal and lookupRoute:

    @Override
    protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
        // ... preceding code omitted
        // lookupRoute finds the matching route
        return lookupRoute(exchange)
                // Record the route that was found in the ServerWebExchange attributes,
                // then return the FilteringWebHandler
                .flatMap((Function<Route, Mono<?>>) r -> {
                    exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
                    if (logger.isDebugEnabled()) {
                        logger.debug(
                                "Mapping [" + getExchangeDesc(exchange) + "] to " + r);
                    }
                    // Later stages read the route from the attributes,
                    // fetch its filters, and run them
                    exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
                    return Mono.just(webHandler);
                    // ... subsequent code omitted

    protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
        // this.routeLocator.getRoutes() is the key call. How the RouteLocator
        // implementations provide getRoutes() is analyzed below.
        return this.routeLocator.getRoutes()
                .concatMap(route -> Mono.just(route).filterWhen(r -> {
                    exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
                    // Test the request against the predicate of the current route.
                    // Predicates are not the focus of this article.
                    return r.getPredicate().apply(exchange);
                })
                // ... subsequent code omitted
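As a rough mental model of lookupRoute, the sketch below walks the routes in order and returns the first one whose predicate accepts the request. It is plain Java with a hypothetical `Route` type keyed on the request path. The first-match behavior is an assumption about the part of the method omitted above, and the real implementation is reactive.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

/** Toy model of route lookup; Route here is a stand-in, not the Gateway class. */
final class RouteLookupSketch {

    /** A route id plus a predicate over the request path. */
    static final class Route {
        final String id;
        final Predicate<String> predicate;

        Route(String id, Predicate<String> predicate) {
            this.id = id;
            this.predicate = predicate;
        }
    }

    /** Returns the first route, in list order, whose predicate accepts the path. */
    static Optional<Route> lookupRoute(List<Route> routes, String path) {
        return routes.stream()
                .filter(route -> route.predicate.test(path))
                .findFirst();
    }

    public static void main(String[] args) {
        List<Route> routes = Arrays.asList(
                new Route("red", p -> p.startsWith("/red/")),
                new Route("catch_all", p -> true));
        // prints red
        System.out.println(lookupRoute(routes, "/red/1").map(r -> r.id).orElse("none"));
    }
}
```

In this model the order of the route list decides which of several matching routes is selected.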
RouteLocator implementation class
- CompositeRouteLocator: merges other RouteLocators into one.
- CachingRouteLocator: a RouteLocator with a cache and an event-based cache refresh mechanism.
- RouteDefinitionRouteLocator: a RouteLocator that converts RouteDefinition objects into Route objects. The conversion details are not covered in this article. This class plays an important bridging role.
    // Conversion method
    @Override
    public Flux<Route> getRoutes() {
        // Note the call to getRouteDefinitions() here
        Flux<Route> routes = this.routeDefinitionLocator.getRouteDefinitions()
                .map(this::convertToRoute);
        // ... the rest is omitted
Now let's look at how these locators are auto-configured in GatewayAutoConfiguration, lines 223-240 of the source code:
    @Bean
    public RouteLocator routeDefinitionRouteLocator(GatewayProperties properties,
            // All filter factories (global filters excluded), used to build a Route from a RouteDefinition
            List<GatewayFilterFactory> gatewayFilters,
            // All predicate factories, used to build a Route from a RouteDefinition
            List<RoutePredicateFactory> predicates,
            // The RouteDefinition loader injected here is the focus of what follows
            RouteDefinitionLocator routeDefinitionLocator,
            ConfigurationService configurationService) {
        return new RouteDefinitionRouteLocator(routeDefinitionLocator, predicates,
                gatewayFilters, properties, configurationService);
    }

    @Bean
    @Primary
    @ConditionalOnMissingBean(name = "cachedCompositeRouteLocator")
    public RouteLocator cachedCompositeRouteLocator(List<RouteLocator> routeLocators) {
        return new CachingRouteLocator(
                new CompositeRouteLocator(Flux.fromIterable(routeLocators)));
    }
GatewayAutoConfiguration does not assemble CompositeRouteLocator directly. It nests it inside CachingRouteLocator. The List<RouteLocator> parameter of cachedCompositeRouteLocator means that every RouteLocator bean is collected into the composite. The cachedCompositeRouteLocator bean itself is not part of that list, because Spring leaves the bean being created out of its own collection injection, and the @ConditionalOnMissingBean(name = "cachedCompositeRouteLocator") condition lets an application substitute its own bean of that name. By default, therefore, only routeDefinitionRouteLocator is injected into cachedCompositeRouteLocator, and developers can extend the set by registering additional RouteLocator beans.
CachingRouteLocator is not covered in detail here. The key point is that after creating a route you need to publish a RefreshRoutesEvent. The locator listens for this event and refreshes its cached routes.
Next, the constructor of RouteDefinitionRouteLocator:
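The caching behavior can be illustrated with a minimal plain-Java model. `CachingLocatorSketch`, its `Supplier` delegate, and the `onRefresh` method are hypothetical names standing in for the locator, the wrapped locator, and the RefreshRoutesEvent listener. The sketch only shows why a newly stored route stays invisible until a refresh is triggered.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

/** Toy model of a caching locator; route ids are plain strings here. */
final class CachingLocatorSketch {
    private final Supplier<List<String>> delegate;
    private volatile List<String> cached;

    CachingLocatorSketch(Supplier<List<String>> delegate) {
        this.delegate = delegate;
        this.cached = new ArrayList<>(delegate.get());
    }

    /** Serves routes from the cache without consulting the delegate. */
    List<String> getRoutes() {
        return cached;
    }

    /** Plays the role of the refresh event listener: reload from the delegate. */
    void onRefresh() {
        cached = new ArrayList<>(delegate.get());
    }

    public static void main(String[] args) {
        List<String> store = new ArrayList<>(Arrays.asList("route-1"));
        CachingLocatorSketch locator = new CachingLocatorSketch(() -> store);
        store.add("route-2");
        System.out.println(locator.getRoutes()); // prints [route-1]
        locator.onRefresh();
        System.out.println(locator.getRoutes()); // prints [route-1, route-2]
    }
}
```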
    public RouteDefinitionRouteLocator(RouteDefinitionLocator routeDefinitionLocator,
            List<RoutePredicateFactory> predicates,
            List<GatewayFilterFactory> gatewayFilterFactories,
            GatewayProperties gatewayProperties,
            ConfigurationService configurationService) {
        this.routeDefinitionLocator = routeDefinitionLocator;
        this.configurationService = configurationService;
        // Initialize the predicate factories. As with the filter factories below,
        // the "RoutePredicateFactory" suffix is stripped from the name.
        initFactories(predicates);
        gatewayFilterFactories.forEach(
                // factory.name() returns the filter factory name, obtained by
                // stripping the trailing "GatewayFilterFactory". This is why a
                // custom filter factory name should end with GatewayFilterFactory.
                factory -> this.gatewayFilterFactories.put(factory.name(), factory));
        this.gatewayProperties = gatewayProperties;
    }
To tie this to the code block above, suppose we define the following route when using Spring Cloud Gateway:
    spring:
      cloud:
        gateway:
          routes:
          - id: path_route
            uri: https://example.org
            predicates:
            - Path=/red/{segment},/blue/{segment}
            filters:
            - AddRequestHeader=X-Request-red, blue
Path corresponds to PathRoutePredicateFactory, and AddRequestHeader corresponds to AddRequestHeaderGatewayFilterFactory.
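The naming convention can be sketched as simple string handling. The helper names below are hypothetical and the logic is a simplified model of the suffix stripping described in the constructor comments, not the Gateway's own utility code.

```java
/** Simplified model of how configuration names map to factory class names. */
final class FactoryNameSketch {

    /** Derives the short filter name from a filter factory class name. */
    static String filterName(String simpleClassName) {
        return simpleClassName.replace("GatewayFilterFactory", "");
    }

    /** Derives the short predicate name from a predicate factory class name. */
    static String predicateName(String simpleClassName) {
        return simpleClassName.replace("RoutePredicateFactory", "");
    }

    /** Splits a shortcut entry such as "Path=/red/{segment}" into name and raw arguments. */
    static String[] splitShortcut(String entry) {
        int eq = entry.indexOf('=');
        if (eq < 0) {
            return new String[] {entry, ""};
        }
        return new String[] {entry.substring(0, eq), entry.substring(eq + 1)};
    }

    public static void main(String[] args) {
        // prints AddRequestHeader
        System.out.println(filterName("AddRequestHeaderGatewayFilterFactory"));
        // prints Path
        System.out.println(splitShortcut("Path=/red/{segment},/blue/{segment}")[0]);
    }
}
```

The name on the left of the `=` in the YAML entry is the key under which the matching factory was registered.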
RouteDefinitionLocator implementation class
When RouteDefinitionRouteLocator is assembled, a bean named routeDefinitionLocator is injected. Back in GatewayAutoConfiguration, we can see how this RouteDefinitionLocator is assembled, at lines 208-214 of the source code:
    @Bean
    @Primary
    public RouteDefinitionLocator routeDefinitionLocator(
            List<RouteDefinitionLocator> routeDefinitionLocators) {
        return new CompositeRouteDefinitionLocator(
                Flux.fromIterable(routeDefinitionLocators));
    }
Here a CompositeRouteDefinitionLocator combines and wraps all RouteDefinitionLocator implementations, each of which provides its own getRouteDefinitions() method.
- PropertiesRouteDefinitionLocator: a route loader based on the configuration file. It does not support dynamic reloading after the configuration file is modified.
- DiscoveryClientRouteDefinitionLocator: a route loader based on service discovery.
GatewayDiscoveryClientAutoConfiguration auto-configures this locator together with its supporting filters and predicates. The locator is disabled by default and must be enabled in the configuration file.
    # Enable this locator
    spring.cloud.gateway.discovery.locator.enabled=true
    # When enabled, reactive mode is the default. Set this to false to switch to blocking mode
    spring.cloud.discovery.reactive.enabled=false
- CachingRouteDefinitionLocator: browsing the source code shows that this loader is not actually assembled, possibly because caching is already handled at the RouteLocator level.
- InMemoryRouteDefinitionRepository: a memory-based route loader. Routes can be managed through the management endpoints provided by Spring Cloud Gateway, but because the implementation is memory-based, they are not persistent.
Its assembly code is in GatewayAutoConfiguration, lines 202-206 of the source code:
    @Bean
    // Takes effect only when no other RouteDefinitionRepository bean exists.
    // This is the extension point for dynamic routing with persistence.
    @ConditionalOnMissingBean(RouteDefinitionRepository.class)
    public InMemoryRouteDefinitionRepository inMemoryRouteDefinitionRepository() {
        return new InMemoryRouteDefinitionRepository();
    }
Management Endpoints:
- CompositeRouteDefinitionLocator: a combination of the other RouteDefinitionLocators. Its assembly code is in GatewayAutoConfiguration, lines 208-214 of the source code:
    @Bean
    @Primary
    public RouteDefinitionLocator routeDefinitionLocator(
            List<RouteDefinitionLocator> routeDefinitionLocators) {
        return new CompositeRouteDefinitionLocator(
                Flux.fromIterable(routeDefinitionLocators));
    }
As shown earlier, the RouteDefinitionLocator injected into RouteDefinitionRouteLocator is the CompositeRouteDefinitionLocator, which is assembled from all injected RouteDefinitionLocator implementations, including the RouteDefinitionRepository implementation (InMemoryRouteDefinitionRepository by default).
The conclusion is clear: to implement dynamic routing with persistent storage, we only need to implement a RouteDefinitionRepository backed by a database (or other persistent storage), using InMemoryRouteDefinitionRepository as a reference.
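The composition described above can be modeled in a few lines of plain Java. In this sketch each locator is a hypothetical `Supplier` of route ids, and the composite simply concatenates what every locator returns. Replacing the in-memory repository with a persistent one then amounts to supplying a different element of the list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

/** Toy model of a composite definition locator; route definitions are plain strings here. */
final class CompositeLocatorSketch {

    /** Concatenates the definitions of all locators, in list order. */
    static List<String> getRouteDefinitions(List<Supplier<List<String>>> locators) {
        List<String> all = new ArrayList<>();
        for (Supplier<List<String>> locator : locators) {
            all.addAll(locator.get());
        }
        return all;
    }

    public static void main(String[] args) {
        List<Supplier<List<String>>> locators = new ArrayList<>();
        // Stand-in for the configuration-file locator
        locators.add(() -> Arrays.asList("from-config"));
        // Stand-in for a repository, which could be in-memory or database-backed
        locators.add(() -> Arrays.asList("from-repository"));
        // prints [from-config, from-repository]
        System.out.println(getRouteDefinitions(locators));
    }
}
```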
Dynamic routing based on MongoDB
The above is summarized as follows:
- Starting from RoutePredicateHandlerMapping, the RouteLocator injected into it is CachingRouteLocator;
- CachingRouteLocator wraps CompositeRouteLocator;
- By default, the only RouteLocator combined by CompositeRouteLocator is RouteDefinitionRouteLocator;
- RouteDefinitionRouteLocator converts RouteDefinition objects into Route objects and is injected with a bean named routeDefinitionLocator, namely CompositeRouteDefinitionLocator;
- CompositeRouteDefinitionLocator combines all RouteDefinitionLocators, including one RouteDefinitionRepository implementation, InMemoryRouteDefinitionRepository;
- InMemoryRouteDefinitionRepository is assembled conditionally and takes effect only when there is no other RouteDefinitionRepository bean;
- Using InMemoryRouteDefinitionRepository as a reference, we implement a database-backed route store.
For the complete code, see: scg-dynamic-route
Main code snippets
    // MongoRouteDefinitionRepository.java
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.cloud.gateway.event.RefreshRoutesEvent;
    import org.springframework.cloud.gateway.route.RouteDefinition;
    import org.springframework.cloud.gateway.route.RouteDefinitionRepository;
    import org.springframework.context.ApplicationEventPublisher;
    import org.springframework.context.ApplicationEventPublisherAware;
    import org.springframework.stereotype.Component;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    @Slf4j
    @Component
    public class MongoRouteDefinitionRepository
            implements RouteDefinitionRepository, ApplicationEventPublisherAware {

        private static final String CACHE_KEY = "routes";

        private ApplicationEventPublisher eventPublisher;
        private Map<String, RouteDefinition> cache = new ConcurrentHashMap<>();
        private final RouteRepositoryOperations repositoryOperation;

        public MongoRouteDefinitionRepository(RouteRepositoryOperations repositoryOperation) {
            this.repositoryOperation = repositoryOperation;
        }

        // Routes are always served from the in-memory cache
        @Override
        public Flux<RouteDefinition> getRouteDefinitions() {
            return Flux.fromIterable(cache.values());
        }

        // Persist the route first, then add it to the cache
        @Override
        public Mono<Void> save(Mono<RouteDefinition> route) {
            return route.flatMap(
                    r -> repositoryOperation.save(MongoRouteDefinition.from(r))
                            .log()
                            .doOnNext(this::addCache)
                            .then(Mono.empty())
            );
        }

        // Remove the route from the cache, then delete it from the database
        @Override
        public Mono<Void> delete(Mono<String> routeId) {
            return repositoryOperation.findById(routeId)
                    .log()
                    .map(RouteDefinition::getId)
                    .doOnNext(this::removeCache)
                    .flatMap(repositoryOperation::deleteById);
        }

        @Override
        public void setApplicationEventPublisher(ApplicationEventPublisher eventPublisher) {
            this.eventPublisher = eventPublisher;
        }

        /**
         * Adds the specified route to the cache.
         * <p>
         * To load routes in real time, call this method after detecting data changes
         * through a MongoDB change stream.
         */
        public void addCache(RouteDefinition route) {
            // put rather than putIfAbsent, so that an updated route replaces the cached entry
            this.cache.put(route.getId(), route);
            this.publishEvent();
        }

        /**
         * Deletes the specified route from the cache.
         * <p>
         * To load routes in real time, call this method after detecting data changes
         * through a MongoDB change stream.
         */
        public void removeCache(String routeId) {
            if (this.cache.remove(routeId) != null) {
                this.publishEvent();
            }
        }

        // Tells the gateway to rebuild its cached routes
        void publishEvent() {
            eventPublisher.publishEvent(new RefreshRoutesEvent(this));
        }

        RouteRepositoryOperations getRepositoryOperation() {
            return repositoryOperation;
        }

        Map<String, RouteDefinition> getCache() {
            return cache;
        }

        void setCache(Map<String, RouteDefinition> cache) {
            this.cache = cache;
        }
    }
    // RouteRefresher.java
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
    import org.springframework.cloud.gateway.route.RouteDefinition;
    import org.springframework.data.domain.PageRequest;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;

    @Slf4j
    @Component
    @ConditionalOnProperty(value = "route.schedule.enabled", havingValue = "true", matchIfMissing = true)
    public class RouteRefresher {

        private final MongoRouteDefinitionRepository repository;

        public RouteRefresher(MongoRouteDefinitionRepository repository) {
            this.repository = repository;
        }

        /**
         * Reloads the cache at regular intervals (first run after 10 seconds, then hourly).
         */
        @Scheduled(initialDelay = 10000, fixedDelay = 60 * 60 * 1000)
        private void refresh() {
            RouteRepositoryOperations operation = repository.getRepositoryOperation();

            int page = 0;
            int pageSize = 1000;
            int total = Math.toIntExact(operation.count().blockOptional().orElse(0L));

            Map<String, RouteDefinition> oldCache = repository.getCache();
            Map<String, RouteDefinition> newCache = new ConcurrentHashMap<>(total);
            int oldTotal = oldCache.size();
            if (oldTotal < 1) {
                // First load: expose the new map immediately so routes appear as they are read
                repository.setCache(newCache);
            }

            // Read the routes page by page into the new map
            while (page * pageSize < total) {
                operation.findAll(PageRequest.of(page++, pageSize))
                        .doOnNext(route -> newCache.putIfAbsent(route.getId(), route))
                        .blockLast();
                log.info("Current size of the dynamic routing table: {}, size of the new routing table so far: {}",
                        oldTotal, newCache.size());
            }

            // Swap in the new map and notify the gateway
            repository.setCache(newCache);
            log.info("The new routing table has been loaded. Current size: {}", newCache.size());
            repository.publishEvent();
        }
    }
    // RouteChangeStreamHandler.java
    import java.util.Optional;

    import com.mongodb.client.model.changestream.OperationType;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
    import org.springframework.data.mongodb.core.ChangeStreamEvent;
    import org.springframework.data.mongodb.core.ChangeStreamOptions;
    import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
    import org.springframework.data.mongodb.core.aggregation.Aggregation;
    import org.springframework.data.mongodb.core.mapping.Document;
    import org.springframework.data.mongodb.core.query.Criteria;
    import org.springframework.stereotype.Component;
    import reactor.core.publisher.Flux;

    @Component
    @ConditionalOnProperty(value = "changeStream.enabled", havingValue = "true", matchIfMissing = true)
    public class RouteChangeStreamHandler implements CommandLineRunner {

        private final ReactiveMongoTemplate mongoTemplate;
        private final MongoRouteDefinitionRepository routeRepository;

        public RouteChangeStreamHandler(
                MongoRouteDefinitionRepository routeRepository,
                ReactiveMongoTemplate mongoTemplate) {
            this.routeRepository = routeRepository;
            this.mongoTemplate = mongoTemplate;
        }

        @Override
        public void run(String... args) {
            // Monitor on a dedicated thread, because startMonitor blocks
            new Thread(this::startMonitor, "ChangeStream-Monitor-routes").start();
        }

        public void startMonitor() {
            // Only react to operations that change the set of routes
            Aggregation aggregation = Aggregation.newAggregation(Aggregation
                    .match(Criteria.where("operationType").in("insert", "delete", "update", "replace")));

            ChangeStreamOptions options = ChangeStreamOptions.builder()
                    .filter(aggregation)
                    .returnFullDocumentOnUpdate()
                    .build();

            String collectionName = MongoRouteDefinition.class.getAnnotation(Document.class).value();
            Flux<ChangeStreamEvent<MongoRouteDefinition>> changeStream = mongoTemplate
                    .changeStream(collectionName, options, MongoRouteDefinition.class);

            changeStream
                    .log()
                    .doOnNext(e -> {
                        if (OperationType.INSERT == e.getOperationType()
                                || OperationType.UPDATE == e.getOperationType()
                                || OperationType.REPLACE == e.getOperationType()) {
                            Optional.ofNullable(e.getBody()).ifPresent(routeRepository::addCache);
                        } else if (OperationType.DELETE == e.getOperationType()) {
                            getId(e).ifPresent(routeRepository::removeCache);
                        }
                    }).blockLast();
        }

        // Extracts the document id from the raw change event
        private Optional<String> getId(ChangeStreamEvent<MongoRouteDefinition> e) {
            return Optional.ofNullable(e.getRaw())
                    .flatMap(raw -> Optional.ofNullable(raw.getDocumentKey()))
                    .flatMap(docKey -> Optional.ofNullable(docKey.getObjectId("_id")))
                    .flatMap(bson -> Optional.of(bson.getValue().toHexString()));
        }
    }
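    // MongoRouteDefinition.java
    import org.springframework.beans.BeanUtils;
    import org.springframework.cloud.gateway.route.RouteDefinition;
    import org.springframework.data.mongodb.core.mapping.Document;

    @Document("gwRoutes")
    public class MongoRouteDefinition extends RouteDefinition {

        // Copies all properties of a plain RouteDefinition into the MongoDB entity
        public static MongoRouteDefinition from(RouteDefinition route) {
            MongoRouteDefinition newRoute = new MongoRouteDefinition();
            BeanUtils.copyProperties(route, newRoute);
            return newRoute;
        }
    }

    // RouteRepositoryOperations.java
    import org.springframework.data.domain.Pageable;
    import org.springframework.data.mongodb.repository.Query;
    import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
    import reactor.core.publisher.Flux;

    public interface RouteRepositoryOperations
            extends ReactiveMongoRepository<MongoRouteDefinition, String> {

        /**
         * Paged query, sorted by id.
         *
         * @param pageable paging parameters
         * @return the routes on the requested page
         */
        @Query(value = "{}", sort = "{_id:1}")
        Flux<MongoRouteDefinition> findAll(Pageable pageable);
    }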
Original link: https://xie.infoq.cn/article/0ae4f61ce6c67a651d94678a8