Intercepting the request body in Spring Cloud Gateway

When you route traffic through Spring Cloud Gateway, you may need to cache the JSON body or form-urlencoded data of a request.
Spring Cloud Gateway is built on WebFlux and follows a reactive design, so the traditional blocking programming style used with Zuul does not carry over to Reactor streams when you migrate.
Many body-caching examples can be found online, but they showed various bugs in testing. To cache the body correctly, the caching has to be designed as part of the whole reactive pipeline.

The following is a concrete implementation that caches JSON body data or form-urlencoded data. It has been tested against a range of requirements and avoids the problems found in other caching approaches published online.

Define a GatewayContext class to hold the data cached from the request:

import org.springframework.util.MultiValueMap;



public class GatewayContext {

    public static final String CACHE_GATEWAY_CONTEXT = "cacheGatewayContext";

    /**
     * cache json body
     */
    private String cacheBody;
    /**
     * cache formdata
     */
    private MultiValueMap<String, String> formData;
    /**
     * cache request path
     */
    private String path;


    public String getCacheBody() {
        return cacheBody;
    }

    public void setCacheBody(String cacheBody) {
        this.cacheBody = cacheBody;
    }

    public MultiValueMap<String, String> getFormData() {
        return formData;
    }

    public void setFormData(MultiValueMap<String, String> formData) {
        this.formData = formData;
    }

    public String getPath() {
        return path;
    }

    public void setPath(String path) {
        this.path = path;
    }
}
  • 1. This example only supports caching the following three MediaTypes:
    • APPLICATION_JSON--JSON data
    • APPLICATION_JSON_UTF8--JSON data
    • APPLICATION_FORM_URLENCODED--form data
  • 2. Lessons learned:
    • The body cannot be cached by reading it directly (synchronously) inside the filter. It has to be cached inside the asynchronous pipeline, following the reactive processing model. Because the body can only be read once, the request and exchange must be re-wrapped after reading so that the request is still forwarded to the downstream service correctly.
    • Form data can also only be read once, so the request and exchange must be re-wrapped after reading it as well. Note that if the form data content is modified, the Content-Length header must be recomputed so that it matches the size of the data actually sent.
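The Content-Length point above can be illustrated without any Spring classes. The following is a minimal sketch, not the filter's actual code: the class name FormBodyLength and its methods are hypothetical, and UTF-8 is assumed as the charset. It re-encodes already-decoded form fields with URLEncoder and counts the resulting bytes. Because URLEncoder writes a space as +, a client that sent %20 ends up with a re-encoded body of a different size, which is why the header has to be recomputed from the rewritten bytes.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper for illustration; not part of the gateway filter.
public class FormBodyLength {

    /** Re-encodes decoded form fields as an x-www-form-urlencoded body (UTF-8 assumed). */
    public static String encodeForm(Map<String, List<String>> form) {
        StringJoiner body = new StringJoiner("&");
        try {
            for (Map.Entry<String, List<String>> entry : form.entrySet()) {
                String key = URLEncoder.encode(entry.getKey(), "UTF-8");
                for (String value : entry.getValue()) {
                    body.add(key + "=" + URLEncoder.encode(value, "UTF-8"));
                }
            }
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is available on every JVM, so this is not expected to happen.
            throw new IllegalStateException(e);
        }
        return body.toString();
    }

    /** Content-Length is the number of bytes sent, not the number of characters. */
    public static int contentLength(String body) {
        return body.getBytes(StandardCharsets.UTF_8).length;
    }

    public static void main(String[] args) {
        // What a client might have sent: the space is encoded as %20.
        String original = "name=a%20b&tag=x&tag=%C3%A9";

        // The same fields after they have been decoded into a map.
        Map<String, List<String>> form = new LinkedHashMap<>();
        form.put("name", Arrays.asList("a b"));
        form.put("tag", Arrays.asList("x", "\u00e9"));

        String rewritten = encodeForm(form);
        System.out.println(original + " -> " + contentLength(original) + " bytes");
        System.out.println(rewritten + " -> " + contentLength(rewritten) + " bytes");
    }
}
```

In this example the original body is 27 bytes and the rewritten one is 25 bytes, although both carry the same fields. Forwarding the rewritten body with the original Content-Length would leave the downstream service waiting for bytes that never arrive.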
package com.weiresearch.idss.weiark.gateway.LogReader;

import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.io.ByteArrayResource;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.stereotype.Component;
import org.springframework.util.MultiValueMap;
import org.springframework.web.reactive.function.server.HandlerStrategies;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.server.ServerWebExchange;

import io.netty.buffer.ByteBufAllocator;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// https://segmentfault.com/a/1190000017898354

@Component
public class GatewayContextFilter
    implements GlobalFilter {

    /**
     * default HttpMessageReader
     */
    private static final List<HttpMessageReader<?>> messageReaders =
        HandlerStrategies.withDefaults().messageReaders();

    private Logger log = LoggerFactory.getLogger(GatewayContextFilter.class);

    @Override
    public Mono<Void> filter(
        ServerWebExchange exchange,
        GatewayFilterChain chain) {
        /**
         * save request path and serviceId into gateway context
         */
        ServerHttpRequest request = exchange.getRequest();
        String path = request.getPath().pathWithinApplication().value();
        GatewayContext gatewayContext = new GatewayContext();
        gatewayContext.setPath(path);
        /**
         * save gateway context into exchange
         */
        exchange.getAttributes().put(GatewayContext.CACHE_GATEWAY_CONTEXT,
            gatewayContext);
        HttpHeaders headers = request.getHeaders();
        MediaType contentType = headers.getContentType();
        log.info("start-------------------------------------------------");
        log.info("HttpMethod:{},Url:{}", request.getMethod(),
            request.getURI().getRawPath());
        if (request.getMethod() == HttpMethod.GET) {
            log.info("end-------------------------------------------------");
        }
        if (request.getMethod() == HttpMethod.POST) {
            if (MediaType.APPLICATION_JSON.equals(contentType)
                || MediaType.APPLICATION_JSON_UTF8.equals(contentType)) {
                return readBody(exchange, chain, gatewayContext);
            }
            if (MediaType.APPLICATION_FORM_URLENCODED.equals(contentType)) {
                return readFormData(exchange, chain, gatewayContext);
            }
            // Other content types (e.g. multipart) are not cached. Fall through
            // to the default chain call instead of returning a null Mono.
        }
        /* log.debug(
            "[GatewayContext]ContentType:{},Gateway context is set with {}",
            contentType, gatewayContext);*/
        return chain.filter(exchange);

    }

    /**
     * ReadFormData
     * 
     * @param exchange
     * @param chain
     * @return
     */
    private Mono<Void> readFormData(
        ServerWebExchange exchange,
        GatewayFilterChain chain,
        GatewayContext gatewayContext) {
        final ServerHttpRequest request = exchange.getRequest();
        HttpHeaders headers = request.getHeaders();

        return exchange.getFormData()
            .doOnNext(multiValueMap -> {
                gatewayContext.setFormData(multiValueMap);
                log.info("Post x-www-form-urlencoded:{}",
                    multiValueMap);
                log.info(
                    "end-------------------------------------------------");
            })
            .then(Mono.defer(() -> {
                Charset charset = headers.getContentType().getCharset();
                charset = charset == null ? StandardCharsets.UTF_8 : charset;
                String charsetName = charset.name();
                MultiValueMap<String, String> formData =
                    gatewayContext.getFormData();
                /**
                 * formData is empty just return
                 */
                if (null == formData || formData.isEmpty()) {
                    return chain.filter(exchange);
                }
                StringBuilder formDataBodyBuilder = new StringBuilder();
                String entryKey;
                List<String> entryValue;
                try {
                    /**
                     * repackage form data
                     */
                    for (Map.Entry<String, List<String>> entry : formData
                        .entrySet()) {
                        // getFormData() returns decoded names and values, so
                        // both have to be encoded again when rebuilding the body
                        entryKey =
                            URLEncoder.encode(entry.getKey(), charsetName);
                        entryValue = entry.getValue();
                        for (String value : entryValue) {
                            formDataBodyBuilder.append(entryKey).append("=")
                                .append(
                                    URLEncoder.encode(value, charsetName))
                                .append("&");
                        }
                    }
                } catch (UnsupportedEncodingException e) {
                    // ignore URLEncode Exception
                }
                /**
                 * substring with the last char '&'
                 */
                String formDataBodyString = "";
                if (formDataBodyBuilder.length() > 0) {
                    formDataBodyString = formDataBodyBuilder.substring(0,
                        formDataBodyBuilder.length() - 1);
                }
                /**
                 * get data bytes
                 */
                byte[] bodyBytes = formDataBodyString.getBytes(charset);
                int contentLength = bodyBytes.length;
                ServerHttpRequestDecorator decorator =
                    new ServerHttpRequestDecorator(
                        request) {
                        /**
                         * change content-length
                         * 
                         * @return
                         */
                        @Override
                        public HttpHeaders getHeaders() {
                            HttpHeaders httpHeaders = new HttpHeaders();
                            httpHeaders.putAll(super.getHeaders());
                            if (contentLength > 0) {
                                httpHeaders.setContentLength(contentLength);
                            } else {
                                httpHeaders.set(HttpHeaders.TRANSFER_ENCODING,
                                    "chunked");
                            }
                            return httpHeaders;
                        }

                        /**
                         * read bytes to Flux<Databuffer>
                         * 
                         * @return
                         */
                        @Override
                        public Flux<DataBuffer> getBody() {
                            return DataBufferUtils
                                .read(new ByteArrayResource(bodyBytes),
                                    new NettyDataBufferFactory(
                                        ByteBufAllocator.DEFAULT),
                                    contentLength);
                        }
                    };
                ServerWebExchange mutateExchange =
                    exchange.mutate().request(decorator).build();
                /*   log.info("[GatewayContext]Rewrite Form Data :{}",
                       formDataBodyString);*/

                return chain.filter(mutateExchange);
            }));
    }

    /**
     * ReadJsonBody
     * 
     * @param exchange
     * @param chain
     * @return
     */
    private Mono<Void> readBody(
        ServerWebExchange exchange,
        GatewayFilterChain chain,
        GatewayContext gatewayContext) {
        /**
         * join the body
         */
        return DataBufferUtils.join(exchange.getRequest().getBody())
            .flatMap(dataBuffer -> {
                /*
                 * read the body Flux<DataBuffer>, and release the buffer
                 * //TODO when SpringCloudGateway Version Release To G.SR2,this can be update with the new version's feature
                 * see PR https://github.com/spring-cloud/spring-cloud-gateway/pull/1095
                 */
                byte[] bytes = new byte[dataBuffer.readableByteCount()];
                dataBuffer.read(bytes);
                DataBufferUtils.release(dataBuffer);
                Flux<DataBuffer> cachedFlux = Flux.defer(() -> {
                    DataBuffer buffer =
                        exchange.getResponse().bufferFactory().wrap(bytes);
                    DataBufferUtils.retain(buffer);
                    return Mono.just(buffer);
                });
                /**
                 * repackage ServerHttpRequest
                 */
                ServerHttpRequest mutatedRequest =
                    new ServerHttpRequestDecorator(exchange.getRequest()) {
                        @Override
                        public Flux<DataBuffer> getBody() {
                            return cachedFlux;
                        }
                    };
                /**
                 * mutate exchange with new ServerHttpRequest
                 */
                ServerWebExchange mutatedExchange =
                    exchange.mutate().request(mutatedRequest).build();
                /**
                 * read body string with default messageReaders
                 */
                return ServerRequest.create(mutatedExchange, messageReaders)
                    .bodyToMono(String.class)
                    .doOnNext(objectValue -> {
                        log.info("PostBody:{}", objectValue);
                        log.info(
                            "end-------------------------------------------------");
                        gatewayContext.setCacheBody(objectValue);
                        /*  log.debug("[GatewayContext]Read JsonBody:{}",
                              objectValue);*/
                    }).then(chain.filter(mutatedExchange));
            });
    }

}

In subsequent filters, you can get the GatewayContext directly from the ServerWebExchange and read the cached data from it. If you need to cache other data, add fields to GatewayContext as needed.

GatewayContext gatewayContext = exchange.getAttribute(GatewayContext.CACHE_GATEWAY_CONTEXT);
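
Note that in the filter above cacheBody is only set for JSON POST requests, so it stays null for GET requests and form posts, and a downstream filter should handle the missing case. The sketch below shows one possible way to do that, under stated assumptions: GatewayContextLookup and cachedBody are hypothetical names, and the nested GatewayContext is a stripped-down stand-in for the class defined earlier, included only so the example compiles on its own. It works on a plain Map<String, Object>, which is the type returned by exchange.getAttributes().

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper for illustration; not part of the original filter.
public class GatewayContextLookup {

    /** Stand-in for the article's GatewayContext, reduced to the JSON body field. */
    public static class GatewayContext {
        public static final String CACHE_GATEWAY_CONTEXT = "cacheGatewayContext";
        private String cacheBody;

        public String getCacheBody() {
            return cacheBody;
        }

        public void setCacheBody(String cacheBody) {
            this.cacheBody = cacheBody;
        }
    }

    /**
     * Returns the cached JSON body, or an empty Optional when no context was
     * stored or when no body was cached for this request.
     */
    public static Optional<String> cachedBody(Map<String, Object> attributes) {
        Object value = attributes.get(GatewayContext.CACHE_GATEWAY_CONTEXT);
        if (!(value instanceof GatewayContext)) {
            return Optional.empty();
        }
        return Optional.ofNullable(((GatewayContext) value).getCacheBody());
    }

    public static void main(String[] args) {
        // A plain map stands in for exchange.getAttributes() here.
        Map<String, Object> attributes = new HashMap<>();
        GatewayContext context = new GatewayContext();
        context.setCacheBody("{\"id\":1}");
        attributes.put(GatewayContext.CACHE_GATEWAY_CONTEXT, context);

        System.out.println(cachedBody(attributes).orElse("<no cached body>"));
    }
}
```

In a real filter this would be called as GatewayContextLookup.cachedBody(exchange.getAttributes()), with the GatewayContext class from this article in place of the stand-in. The downstream filter also has to run after GatewayContextFilter for the attribute to be populated.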

Reference: https://segmentfault.com/a/1190000017898354


Added by Hagaroo on Wed, 31 Jul 2019 18:04:07 +0300