WebFlux parsing request body in WebFilter

651 Views Asked by At

What is the right way to take the value from requestBody and put it in the Header?

  @Override
  public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
    return exchange.getRequest()
        .getBody()
        .map(dataBuffer -> dataBuffer.asInputStream(true))
        .reduce(SequenceInputStream::new)
        .map(inputStream -> new BufferedReader(new InputStreamReader(inputStream)).lines()
            .collect(Collectors.joining("\n")))
        .map(this::getRequestRqUid)
        .flatMap(rqUid -> chain.filter(exchange)
            .subscriberContext(ctx -> ctx.put(RQUID_HEADER, rqUid)));
  }

The problem is that I open Flux DataBuffer

How to properly close it or use a copy with DefaultDataBufferFactory?

1

There are 1 best solutions below

0
On BEST ANSWER

It turned out to implement as a basis took ServerWebExchangeUtils from spring-cloud-gateway-server

@Order
@Slf4j
@Component
@RequiredArgsConstructor
public class PrepareHeaderWebFilter implements WebFilter {

  private static final byte[] EMPTY_BYTES = {};
  public static final String CACHED_REQUEST_BODY_ATTR = "cachedRequestBody";
  public static final String CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR = "cachedServerHttpRequestDecorator";
  private static final List<HttpMessageReader<?>> MESSAGE_READERS = HandlerStrategies.withDefaults()
      .messageReaders();

  @Override
  public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
    return cacheRequestBody(exchange, serverHttpRequest -> {
      var exchangeMutate = exchange.mutate()
          .request(serverHttpRequest)
          .build();
      var serverRequest = ServerRequest.create(exchangeMutate, MESSAGE_READERS);

      return serverRequest.bodyToMono(BaseRequest.class)
          .map(baseRequest -> {
            exchange.getAttributes()
                .put(BaseRequest.Fields.rqUID, baseRequest.getRqUID());
            return baseRequest;
          })
          .then(removeCacheAndChain(exchange, chain));
    });
  }

  private static Mono<Void> cacheRequestBody(ServerWebExchange exchange, Function<ServerHttpRequest, Mono<Void>> function) {
    ServerHttpResponse response = exchange.getResponse();
    DataBufferFactory factory = response.bufferFactory();
    return DataBufferUtils.join(exchange.getRequest()
                                    .getBody())
        .defaultIfEmpty(factory.wrap(EMPTY_BYTES))
        .map(dataBuffer -> decorate(exchange, dataBuffer))
        .switchIfEmpty(Mono.just(exchange.getRequest()))
        .flatMap(function);
  }

  private static Mono<Void> removeCacheAndChain(ServerWebExchange exchange, WebFilterChain chain) {
    ServerHttpRequest cachedRequest = exchange.getAttribute(CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR);
    Assert.notNull(cachedRequest, "cache request shouldn't be null");
    exchange.getAttributes()
        .remove(CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR);
    return chain.filter(exchange.mutate()
                            .request(cachedRequest)
                            .build());
  }

  private static ServerHttpRequest decorate(ServerWebExchange exchange, DataBuffer dataBuffer) {
    if (dataBuffer.readableByteCount() > 0) {
      if (log.isTraceEnabled()) {
        log.trace("retaining body in exchange attribute");
      }
      exchange.getAttributes()
          .put(CACHED_REQUEST_BODY_ATTR, dataBuffer);
    }

    ServerHttpRequest decorator = new ServerHttpRequestDecorator(exchange.getRequest()) {
      @Override
      public Flux<DataBuffer> getBody() {
        return Mono.fromSupplier(() -> {
              if (exchange.getAttributeOrDefault(CACHED_REQUEST_BODY_ATTR, null) == null) {
                // probably == downstream closed or no body
                return null;
              }
              if (dataBuffer instanceof NettyDataBuffer) {
                NettyDataBuffer pdb = (NettyDataBuffer) dataBuffer;
                return pdb.factory()
                    .wrap(pdb.getNativeBuffer()
                              .retainedSlice());
              } else if (dataBuffer instanceof DefaultDataBuffer) {
                DefaultDataBuffer ddf = (DefaultDataBuffer) dataBuffer;
                return ddf.factory()
                    .wrap(Unpooled.wrappedBuffer(ddf.getNativeBuffer())
                              .nioBuffer());
              } else {
                throw new IllegalArgumentException("Unable to handle DataBuffer of type " + dataBuffer.getClass());
              }
            })
            .flux();
      }
    };

    exchange.getAttributes()
        .put(CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR, decorator);

    return decorator;
  }
}