We are using Spring Cloud Gateway to route requests to multiple underlying services. The calls to these services are sequential and can feed into one another (the response from one is used in the request for the next). We have a working solution for making those requests sequentially BEFORE the main request, but AFTER the main request we are having trouble feeding the response of one proxy request into the request of the next.
Our plan for feeding the response from one request into the next is to make the request with a WebClient inside the GatewayFilter and store the response string in the exchange's attribute store. The next proxy request is then given an attribute name from which it can optionally pull its request body. This works well with "pre" filters, because the first proxy request is built, executed, and its response cached before the second request is built and executed, so the chain of attributes works as expected.
The problem comes with "post" filters. In the post proxy, the web client requests are all built before the previous request has finished. As a result, the attribute store never contains the response from the previous request, and the next request does not work as intended because it has no valid request body.
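To make the hand-off concrete, here is a Reactor-only sketch of the attribute chaining we are after. It is not our gateway code: `attributes` stands in for `exchange.attributes`, and `fakeCall`, `step`, and the attribute names are hypothetical placeholders for the WebClient call and the filter parameters. The attribute read is wrapped in `Mono.defer` so that it happens when the step is subscribed rather than when the pipeline is assembled.

```kotlin
import reactor.core.publisher.Mono

// Hypothetical stand-in for exchange.attributes.
val attributes = mutableMapOf<String, Any>()

// Hypothetical stand-in for the WebClient call: echoes the body it was given.
fun fakeCall(name: String, body: String?): Mono<String> =
    Mono.fromCallable { "$name(${body ?: "original"})" }

// One proxy step: optionally read the request body from an attribute,
// make the call, and optionally cache the response under another attribute.
fun step(name: String, readFrom: String?, writeTo: String?): Mono<Void> =
    Mono.defer {
        // Evaluated at subscription time, i.e. after earlier steps have completed.
        val body = readFrom?.let { attributes[it] as? String }
        fakeCall(name, body)
    }
        .doOnNext { response ->
            if (writeTo != null) {
                attributes[writeTo] = response
            }
        }
        .then()

fun main() {
    step("first", null, "firstResponse")
        .then(step("second", "firstResponse", "secondResponse"))
        .block()
    println(attributes["secondResponse"]) // prints second(first(original))
}
```

In this sketch the second step sees the first step's response only because its attribute lookup is deferred until the first step has completed.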
My understanding was that calling chain.filter(exchange).then(Mono.fromRunnable { ... }) would cause the .then logic to execute only after the prior filters had fully completed. This does not seem to be the case. In other filter types (logging, response manipulation, etc.) the post filters execute in the correct order, but when they create a WebClient request they don't seem to.
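The pattern can be reduced to a few lines of plain Reactor, which may help narrow down where the ordering breaks. In this sketch `sideCall` is a hypothetical stand-in for `executeRequest`, and `runWithFromRunnable` and `runWithDefer` are illustrative names, not part of the filter. The first function mirrors the post filter: it creates a Mono inside `Mono.fromRunnable` and does nothing with it. The second returns the inner Mono from `Mono.defer` so that `then` subscribes to it once the upstream completes.

```kotlin
import reactor.core.publisher.Mono
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for executeRequest(): lazy, does nothing until subscribed.
fun sideCall(executed: AtomicBoolean): Mono<String> =
    Mono.fromCallable {
        executed.set(true)
        "response"
    }

// Mirrors the post filter: the inner Mono is created inside a Runnable and dropped.
fun runWithFromRunnable(): Boolean {
    val executed = AtomicBoolean(false)
    Mono.empty<Void>()
        .then(Mono.fromRunnable<Void> { sideCall(executed) })
        .block()
    return executed.get()
}

// Alternative: return the inner Mono so that then() subscribes to it.
fun runWithDefer(): Boolean {
    val executed = AtomicBoolean(false)
    Mono.empty<Void>()
        .then(Mono.defer { sideCall(executed).then() })
        .block()
    return executed.get()
}

fun main() {
    println("fromRunnable ran the call: ${runWithFromRunnable()}") // false
    println("defer ran the call: ${runWithDefer()}")               // true
}
```

In this reduction the Mono built inside `Mono.fromRunnable` is assembled but never subscribed, so its work never runs. Whether that fully explains the behavior in the gateway filter is an assumption, but it suggests returning the request Mono (for example via `Mono.defer`) instead of creating it inside a Runnable.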
Does anyone have any ideas on how this desired behavior might be achievable?
Pre-Proxy Filter Code (Working):
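import org.springframework.cloud.gateway.filter.GatewayFilter
import org.springframework.cloud.gateway.filter.OrderedGatewayFilter
import org.springframework.cloud.gateway.filter.factory.AbstractGatewayFilterFactory
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils
import org.springframework.http.HttpHeaders.CONTENT_LENGTH
import org.springframework.http.HttpMethod
import org.springframework.http.ReactiveHttpOutputMessage
import org.springframework.http.ResponseEntity
import org.springframework.web.reactive.function.BodyInserter
import org.springframework.web.reactive.function.BodyInserters
import org.springframework.web.reactive.function.client.WebClient
import org.springframework.web.server.ServerWebExchange
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

class PreProxyGatewayFilterFactory : AbstractGatewayFilterFactory<PreProxyGatewayFilterFactory.Params>(Params::class.java) {
    override fun apply(params: Params): GatewayFilter {
        return OrderedGatewayFilter(
            { exchange, chain ->
                // Cache the incoming body so both the proxy call and the routed request can read it.
                ServerWebExchangeUtils.cacheRequestBody(exchange) {
                    val cachedExchange = exchange.mutate().request(it).build()
                    // Run the proxy call, store its response, then continue down the chain.
                    executeRequest(cachedExchange, params)
                        .map { response ->
                            val body = response.body.toString()
                            cacheResponse(
                                body,
                                params.cachedResponseBodyAttributeName,
                                cachedExchange
                            )
                        }
                        .flatMap(chain::filter)
                }
            }, params.order)
    }

    // Stores the response under the given attribute name, if one was configured.
    private fun cacheResponse(response: String, attributeName: String?, exchange: ServerWebExchange): ServerWebExchange {
        if (!attributeName.isNullOrBlank()) {
            exchange.attributes[attributeName] = response
        }
        return exchange
    }

    private fun executeRequest(exchange: ServerWebExchange, params: Params): Mono<ResponseEntity<String>> {
        val request = when (exchange.request.method) {
            HttpMethod.PUT -> WebClient.create().put().uri(params.proxyPath).body(createProxyRequestBody(exchange, params.cachedRequestBodyAttributeName))
            HttpMethod.POST -> WebClient.create().post().uri(params.proxyPath).body(createProxyRequestBody(exchange, params.cachedRequestBodyAttributeName))
            HttpMethod.GET -> WebClient.create().get().uri(params.proxyPath)
            HttpMethod.DELETE -> WebClient.create().delete().uri(params.proxyPath)
            else -> throw Exception("Invalid request method passed in to the proxy filter")
        }
        return request.headers { headers ->
            headers.addAll(exchange.request.headers)
            headers.remove(CONTENT_LENGTH)
        }
            // Note: exchange() is deprecated as of Spring Framework 5.3 in favor of exchangeToMono().
            .exchange()
            .flatMap { response ->
                response.toEntity(String::class.java)
            }
    }

    // Uses the cached attribute as the body when present; otherwise forwards the original request body.
    // The string "null" is the sentinel for "no cached body".
    private fun createProxyRequestBody(exchange: ServerWebExchange, attributeName: String?): BodyInserter<out Flux<out Any>, ReactiveHttpOutputMessage> {
        val cachedBody = attributeName?.let { attrName ->
            exchange.getAttributeOrDefault<String>(attrName, "null")
        } ?: "null"
        return if (cachedBody != "null") {
            BodyInserters.fromPublisher(Flux.just(cachedBody), String::class.java)
        } else {
            BodyInserters.fromDataBuffers(exchange.request.body)
        }
    }

    data class Params(
        val proxyPath: String = "",
        val cachedRequestBodyAttributeName: String? = null,
        val cachedResponseBodyAttributeName: String? = null,
        val order: Int = 0
    )
}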
Post-Proxy Filter Code (Not Working):
import org.springframework.cloud.gateway.filter.GatewayFilter
import org.springframework.cloud.gateway.filter.OrderedGatewayFilter
import org.springframework.cloud.gateway.filter.factory.AbstractGatewayFilterFactory
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils
import org.springframework.http.HttpHeaders.CONTENT_LENGTH
import org.springframework.http.HttpMethod
import org.springframework.http.ReactiveHttpOutputMessage
import org.springframework.http.ResponseEntity
import org.springframework.web.reactive.function.BodyInserter
import org.springframework.web.reactive.function.BodyInserters
import org.springframework.web.reactive.function.client.WebClient
import org.springframework.web.server.ServerWebExchange
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

class PostProxyGatewayFilterFactory : AbstractGatewayFilterFactory<PostProxyGatewayFilterFactory.Params>(Params::class.java) {
    override fun apply(params: Params): GatewayFilter {
        return OrderedGatewayFilter(
            { exchange, chain ->
                ServerWebExchangeUtils.cacheRequestBody(exchange) {
                    val cachedExchange = exchange.mutate().request(it).build()
                    // Currently using a cached body does not work in post proxy
                    chain.filter(cachedExchange).then(Mono.fromRunnable {
                        executeRequest(cachedExchange, params)
                            .map { response ->
                                cacheResponse(
                                    response.body.toString(),
                                    params.cachedResponseBodyAttributeName,
                                    cachedExchange
                                )
                            }
                            .flatMap {
                                Mono.empty<Void>()
                            }
                    })
                }
            }, params.order)
    }

    // Stores the response under the given attribute name, if one was configured.
    private fun cacheResponse(response: String, attributeName: String?, exchange: ServerWebExchange): ServerWebExchange {
        if (!attributeName.isNullOrBlank()) {
            exchange.attributes[attributeName] = response
        }
        return exchange
    }

    private fun executeRequest(exchange: ServerWebExchange, params: Params): Mono<ResponseEntity<String>> {
        val request = when (exchange.request.method) {
            HttpMethod.PUT -> WebClient.create().put().uri(params.proxyPath).body(createProxyRequestBody(exchange, params.cachedRequestBodyAttributeName))
            HttpMethod.POST -> WebClient.create().post().uri(params.proxyPath).body(createProxyRequestBody(exchange, params.cachedRequestBodyAttributeName))
            HttpMethod.GET -> WebClient.create().get().uri(params.proxyPath)
            HttpMethod.DELETE -> WebClient.create().delete().uri(params.proxyPath)
            else -> throw Exception("Invalid request method passed in to the proxy filter")
        }
        return request.headers { headers ->
            headers.addAll(exchange.request.headers)
            headers.remove(CONTENT_LENGTH)
        }
            // Note: exchange() is deprecated as of Spring Framework 5.3 in favor of exchangeToMono().
            .exchange()
            .flatMap { response ->
                response.toEntity(String::class.java)
            }
    }

    // Uses the cached attribute as the body when present; otherwise forwards the original request body.
    // The string "null" is the sentinel for "no cached body".
    private fun createProxyRequestBody(exchange: ServerWebExchange, attributeName: String?): BodyInserter<out Flux<out Any>, ReactiveHttpOutputMessage> {
        val cachedBody = attributeName?.let { attrName ->
            exchange.getAttributeOrDefault<String>(attrName, "null")
        } ?: "null"
        return if (cachedBody != "null") {
            BodyInserters.fromPublisher(Flux.just(cachedBody), String::class.java)
        } else {
            BodyInserters.fromDataBuffers(exchange.request.body)
        }
    }

    data class Params(
        val proxyPath: String = "",
        val cachedRequestBodyAttributeName: String? = null,
        val cachedResponseBodyAttributeName: String? = null,
        val order: Int = 0
    )
}
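One more detail that may matter when several of these post filters are chained: the "post" part of a filter runs as the chain unwinds, so post logic executes in the reverse of the "pre" order. The following is a miniature model of that nesting in plain Reactor, not Spring Cloud Gateway's actual chain implementation; `filter`, `runChain`, and the labels are hypothetical names used only for illustration.

```kotlin
import reactor.core.publisher.Mono

// Hypothetical miniature filter: runs its "pre" step, delegates to the rest
// of the chain, then runs its "post" step once the rest has completed.
fun filter(name: String, log: MutableList<String>, next: Mono<Void>): Mono<Void> =
    Mono.defer {
        log += "pre $name"
        next.then(Mono.fromRunnable<Void> { log += "post $name" })
    }

fun runChain(): List<String> {
    val log = mutableListOf<String>()
    // Stand-in for the routed (main) request.
    val route = Mono.fromRunnable<Void> { log += "proxied request" }
    // The filter with order 1 wraps the filter with order 2, which wraps the route.
    filter("order=1", log, filter("order=2", log, route)).block()
    return log
}

fun main() {
    println(runChain())
    // prints [pre order=1, pre order=2, proxied request, post order=2, post order=1]
}
```

If the gateway behaves like this model, a post filter that should consume a cached response needs a lower order value than the post filter that produces it, which is the opposite of the pre-filter setup.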
question from:
https://stackoverflow.com/questions/66050798/spring-cloud-gateway-post-filter-web-client-request