Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
231 views
in Technique[技术] by (71.8m points)

kotlin - Spring Cloud Gateway: Post Filter Web Client Request

We are using Spring Cloud Gateway in order to route requests to multiple underlying services. The calls to these underlying services will be sequential and potentially feed into one another (response from one being used in the request for the next). We have a working solution for when we need to make those requests sequentially BEFORE the main request, but after the main request we are having problems with feeding the response of one proxy request into the request of the next.

The way we have planned on feeding the response from one request to the next is by making the request using a WebClient in the GatewayFilter and storing the response string in the exchange's attribute store. Then during the next proxy request we supply an attribute name to optionally pull the request body from. This works well when using "pre" filters, because the first proxy request is built, executed and response cached before the second request is built and executed, so the chain of attributes works as expected. The problem comes when working with "post" filters. In the post proxy, the web client requests are all built before the subsequent request has finished. So the attribute store never has the response from the previous request, meaning the next request doesn't work as intended because it doesn't have a valid request body.

My understanding was that calling chain.filter(exchange).then(Mono.fromRunnable{ ... }) would cause the .then logic to execute only after the prior filters had fully completed. This does not seem to be the case. In other filter types like logging, response manipulation, etc the post filters execute in the correct order, but when creating a WebClient they don't seem to.

Does anyone have any ideas on how this desired behavior might be achievable?

Pre-Proxy Filter Code(Working):

class PreProxyGatewayFilterFactory: AbstractGatewayFilterFactory<PreProxyGatewayFilterFactory.Params>(Params::class.java) {
    override fun apply(params: Params): GatewayFilter {
        return OrderedGatewayFilter(
            { exchange, chain ->
                ServerWebExchangeUtils.cacheRequestBody(exchange){
                    val cachedExchange = exchange.mutate().request(it).build()
                    executeRequest(cachedExchange, params)
                        .map { response ->
                            val body = response.body.toString()
                            cacheResponse(
                                response.body.toString(),
                                params.cachedResponseBodyAttributeName,
                                cachedExchange
                            )
                        }
                        .flatMap(chain::filter)
                }
            }, params.order)
    }

    private fun cacheResponse(response: String, attributeName: String?, exchange: ServerWebExchange): ServerWebExchange{
        if(!attributeName.isNullOrBlank()){
            exchange.attributes[attributeName] = response
        }
        return exchange
    }

    private fun executeRequest(exchange: ServerWebExchange, params: Params): Mono<ResponseEntity<String>>{
        val request = when(exchange.request.method){
            HttpMethod.PUT -> WebClient.create().put().uri(params.proxyPath).body(createProxyRequestBody(exchange, params.cachedRequestBodyAttributeName))
            HttpMethod.POST -> WebClient.create().post().uri(params.proxyPath).body(createProxyRequestBody(exchange, params.cachedRequestBodyAttributeName))
            HttpMethod.GET -> WebClient.create().get().uri(params.proxyPath)
            HttpMethod.DELETE -> WebClient.create().delete().uri(params.proxyPath)
            else -> throw Exception("Invalid request method passed in to the proxy filter")
        }
        return request.headers { headers ->
            headers.addAll(exchange.request.headers)
            headers.remove(CONTENT_LENGTH)
        }
        .exchange()
        .flatMap{ response ->
            response.toEntity(String::class.java)
        }
    }

    private fun createProxyRequestBody(exchange: ServerWebExchange, attributeName: String?): BodyInserter<out Flux<out Any>, ReactiveHttpOutputMessage> {
        val cachedBody = attributeName?.let { attrName ->
            exchange.getAttributeOrDefault<String>(attrName, "null")
        } ?: "null"
        return if(cachedBody != "null"){
            BodyInserters.fromPublisher(Flux.just(cachedBody), String::class.java)
        } else {
            BodyInserters.fromDataBuffers(exchange.request.body)
        }
    }

    data class Params(
            val proxyPath: String = "",
            val cachedRequestBodyAttributeName: String? = null,
            val cachedResponseBodyAttributeName: String? = null,
            val order: Int = 0
    )
}

Post-Proxy Filter Code (Not Working)

class PostProxyGatewayFilterFactory: AbstractGatewayFilterFactory<PostProxyGatewayFilterFactory.Params>(Params::class.java) {
    override fun apply(params: Params): GatewayFilter {
        return OrderedGatewayFilter(
            { exchange, chain ->
                ServerWebExchangeUtils.cacheRequestBody(exchange){
                    val cachedExchange = exchange.mutate().request(it).build()

                    //Currently using a cached body does not work in post proxy
                    chain.filter(cachedExchange).then( Mono.fromRunnable{
                        executeRequest(cachedExchange, params)
                            .map { response ->
                                cacheResponse(
                                    response.body.toString(),
                                    params.cachedResponseBodyAttributeName,
                                    cachedExchange
                                )
                            }
                            .flatMap {
                                Mono.empty<Void>()
                            }
                    })
                }
            }, params.order)
    }

    private fun cacheResponse(response: String, attributeName: String?, exchange: ServerWebExchange): ServerWebExchange{
        if(!attributeName.isNullOrBlank()){
            exchange.attributes[attributeName] = response
        }
        return exchange
    }

    private fun executeRequest(exchange: ServerWebExchange, params: Params): Mono<ResponseEntity<String>>{
        val request = when(exchange.request.method){
            HttpMethod.PUT -> WebClient.create().put().uri(params.proxyPath).body(createProxyRequestBody(exchange, params.cachedRequestBodyAttributeName))
            HttpMethod.POST -> WebClient.create().post().uri(params.proxyPath).body(createProxyRequestBody(exchange, params.cachedRequestBodyAttributeName))
            HttpMethod.GET -> WebClient.create().get().uri(params.proxyPath)
            HttpMethod.DELETE -> WebClient.create().delete().uri(params.proxyPath)
            else -> throw Exception("Invalid request method passed in to the proxy filter")
        }
        return request.headers { headers ->
            headers.addAll(exchange.request.headers)
            headers.remove(CONTENT_LENGTH)
        }
        .exchange()
        .flatMap{ response ->
            response.toEntity(String::class.java)
        }
    }

    private fun createProxyRequestBody(exchange: ServerWebExchange, attributeName: String?): BodyInserter<out Flux<out Any>, ReactiveHttpOutputMessage> {
        val cachedBody = attributeName?.let { attrName ->
            exchange.getAttributeOrDefault<String>(attrName, "null")
        } ?: "null"
        return if(cachedBody != "null"){
            BodyInserters.fromPublisher(Flux.just(cachedBody), String::class.java)
        } else {
            BodyInserters.fromDataBuffers(exchange.request.body)
        }
    }


    data class Params(
            val proxyPath: String = "",
            val cachedRequestBodyAttributeName: String? = null,
            val cachedResponseBodyAttributeName: String? = null,
            val order: Int = 0
    )
}
question from:https://stackoverflow.com/questions/66050798/spring-cloud-gateway-post-filter-web-client-request

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

Was finally able to get to a working solution for the post filter proxy pulling it's request body from the attributes. It was a relatively straightforward fix that I just couldn't find the answer to. Instead of using chain.filter(exchange).then(Mono.fromRunnable { ...execute proxy request...}) I just needed to use chain.filter(exchange).then(Mono.defer { ...execute proxy request...}).


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...