【问题标题】:How to log request and response bodies in Spring WebFlux如何在 Spring WebFlux 中记录请求和响应主体
【发布时间】:2017-12-27 15:27:13
【问题描述】:

我希望使用 Kotlin 在 Spring WebFlux 上的 REST API 中集中记录请求和响应。到目前为止,我已经尝试过这种方法

@Bean
fun apiRouter() = router {
    (accept(MediaType.APPLICATION_JSON) and "/api").nest {
        "/user".nest {
            GET("/", userHandler::listUsers)
            POST("/{userId}", userHandler::updateUser)
        }
    }
}.filter { request, next ->
    logger.info { "Processing request $request with body ${request.bodyToMono<String>()}" }
    next.handle(request).doOnSuccess { logger.info { "Handling with response $it" } }
}

这里请求方法和路径记录成功,但是body是Mono,那我应该怎么记录呢?是否应该反过来,我必须订阅请求正文 Mono 并将其记录在回调中? 另一个问题是这里的ServerResponse 接口无权访问响应正文。我怎样才能在这里得到它?


我尝试过的另一种方法是使用WebFilter

@Bean
fun loggingFilter(): WebFilter =
        WebFilter { exchange, chain ->
            val request = exchange.request
            logger.info { "Processing request method=${request.method} path=${request.path.pathWithinApplication()} params=[${request.queryParams}] body=[${request.body}]"  }

            val result = chain.filter(exchange)

            logger.info { "Handling with response ${exchange.response}" }

            return@WebFilter result
        }

同样的问题:请求正文是Flux,没有响应正文。

有没有办法访问来自某些过滤器的日志记录的完整请求和响应?我不明白什么?

【问题讨论】:

标签: spring-boot kotlin project-reactor spring-webflux


【解决方案1】:

这或多或少类似于 Spring MVC 中的情况。

在 Spring MVC 中,您可以使用AbstractRequestLoggingFilter 过滤器和ContentCachingRequestWrapper 和/或ContentCachingResponseWrapper。这里有很多权衡:

  • 如果您想访问 servlet 请求属性,您需要实际读取和解析请求正文
  • 记录请求正文意味着缓冲请求正文,这会占用大量内存
  • 如果您想访问响应正文,则需要包装响应并在写入响应正文时对其进行缓冲,以供以后检索

ContentCaching*Wrapper 类在 WebFlux 中不存在,但您可以创建类似的。但请记住这里的其他要点:

  • 在内存中缓冲数据会以某种方式违反反应式堆栈,因为我们正在尝试利用可用资源来提高效率
  • 您不应该篡改实际的数据流并且刷新频率高于/低于预期,否则您可能会破坏流式使用案例
  • 在该级别,您只能访问DataBuffer 实例,它们是(大致)内存效率高的字节数组。这些属于缓冲池,并被回收用于其他交换。如果未正确保留/释放这些内容,则会产生内存泄漏(并且缓冲数据以供以后使用当然适合这种情况)
  • 再次在该级别,它只是字节,您无权访问任何编解码器来解析 HTTP 正文。如果内容一开始就不是人类可读的,我会忘记缓冲内容

您问题的其他答案:

  • 是的,WebFilter 可能是最好的方法
  • 不,您不应该订阅请求正文,否则您将使用处理程序无法读取的数据;您可以在doOn 操作符中flatMap 请求和缓冲数据
  • 包装响应应该可以让您在编写响应主体时访问它;不过不要忘记内存泄漏

【讨论】:

  • 感谢您的详细解答。看起来这样的高级过滤(和日志记录)违背了核心反应意识形态,我应该考虑将日志记录转移到业务级别(至少对于响应)
  • @brian-clozel,“flatMap on the request”是什么意思?你能详细说明一下吗?
  • 你能详细说明retain/release模型吗?我看到它在StringDecoder 中使用过,但不太明白。 PooledDataBuffer 文档在这方面毫无用处。
  • 关于“你为什么要这样做”:我有一个用例,为了可见性目的,我们必须保留每条“消息”(下游和上游的请求/响应)。我可以在我的控制器方法中接受字符串,并自己进行解析,但那是……意思吗?我也可以在持久化之前序列化 POJO,但这只会浪费资源。所以我想,可能有一种方法可以“潜入”WebFlux/Netty 管道,这样我就可以在处理请求体时将其表示在内存中,以实现持久性目的。不知道为什么这会比手动序列化更糟糕。
【解决方案2】:

我没有找到记录请求/响应正文的好方法,但如果您只是对元数据感兴趣,那么您可以按照以下方式进行。

import org.springframework.http.HttpHeaders
import org.springframework.http.HttpStatus
import org.springframework.http.server.reactive.ServerHttpResponse
import org.springframework.stereotype.Component
import org.springframework.web.server.ServerWebExchange
import org.springframework.web.server.WebFilter
import org.springframework.web.server.WebFilterChain
import reactor.core.publisher.Mono

@Component
class LoggingFilter(val requestLogger: RequestLogger, val requestIdFactory: RequestIdFactory) : WebFilter {
    val logger = logger()

    override fun filter(exchange: ServerWebExchange, chain: WebFilterChain): Mono<Void> {
        logger.info(requestLogger.getRequestMessage(exchange))
        val filter = chain.filter(exchange)
        exchange.response.beforeCommit {
            logger.info(requestLogger.getResponseMessage(exchange))
            Mono.empty()
        }
        return filter
    }
}

@Component
class RequestLogger {

    fun getRequestMessage(exchange: ServerWebExchange): String {
        val request = exchange.request
        val method = request.method
        val path = request.uri.path
        val acceptableMediaTypes = request.headers.accept
        val contentType = request.headers.contentType
        return ">>> $method $path ${HttpHeaders.ACCEPT}: $acceptableMediaTypes ${HttpHeaders.CONTENT_TYPE}: $contentType"
    }

    fun getResponseMessage(exchange: ServerWebExchange): String {
        val request = exchange.request
        val response = exchange.response
        val method = request.method
        val path = request.uri.path
        val statusCode = getStatus(response)
        val contentType = response.headers.contentType
        return "<<< $method $path HTTP${statusCode.value()} ${statusCode.reasonPhrase} ${HttpHeaders.CONTENT_TYPE}: $contentType"
    }

    private fun getStatus(response: ServerHttpResponse): HttpStatus =
        try {
            response.statusCode
        } catch (ex: Exception) {
            HttpStatus.CONTINUE
        }
}

【讨论】:

  • 我每次使用这种方法都会获得 http 100 状态(因为response.statusCode 为空)。到目前为止,我还无法弄清楚如何在WebFilter 中正确获取响应的状态代码。有人知道吗?
  • 该死的“var”东西。
【解决方案3】:

我对 Spring WebFlux 很陌生,我不知道如何在 Kotlin 中做到这一点,但应该与在 Java 中使用 WebFilter 相同:

public class PayloadLoggingWebFilter implements WebFilter {

    public static final ByteArrayOutputStream EMPTY_BYTE_ARRAY_OUTPUT_STREAM = new ByteArrayOutputStream(0);

    private final Logger logger;
    private final boolean encodeBytes;

    public PayloadLoggingWebFilter(Logger logger) {
        this(logger, false);
    }

    public PayloadLoggingWebFilter(Logger logger, boolean encodeBytes) {
        this.logger = logger;
        this.encodeBytes = encodeBytes;
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        if (logger.isInfoEnabled()) {
            return chain.filter(decorate(exchange));
        } else {
            return chain.filter(exchange);
        }
    }

    private ServerWebExchange decorate(ServerWebExchange exchange) {
        final ServerHttpRequest decorated = new ServerHttpRequestDecorator(exchange.getRequest()) {

            @Override
            public Flux<DataBuffer> getBody() {

                if (logger.isDebugEnabled()) {
                    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
                    return super.getBody().map(dataBuffer -> {
                        try {
                            Channels.newChannel(baos).write(dataBuffer.asByteBuffer().asReadOnlyBuffer());
                        } catch (IOException e) {
                            logger.error("Unable to log input request due to an error", e);
                        }
                        return dataBuffer;
                    }).doOnComplete(() -> flushLog(baos));

                } else {
                    return super.getBody().doOnComplete(() -> flushLog(EMPTY_BYTE_ARRAY_OUTPUT_STREAM));
                }
            }

        };

        return new ServerWebExchangeDecorator(exchange) {

            @Override
            public ServerHttpRequest getRequest() {
                return decorated;
            }

            private void flushLog(ByteArrayOutputStream baos) {
                ServerHttpRequest request = super.getRequest();
                if (logger.isInfoEnabled()) {
                    StringBuffer data = new StringBuffer();
                    data.append('[').append(request.getMethodValue())
                        .append("] '").append(String.valueOf(request.getURI()))
                        .append("' from ")
                            .append(
                                Optional.ofNullable(request.getRemoteAddress())
                                            .map(addr -> addr.getHostString())
                                        .orElse("null")
                            );
                    if (logger.isDebugEnabled()) {
                        data.append(" with payload [\n");
                        if (encodeBytes) {
                            data.append(new HexBinaryAdapter().marshal(baos.toByteArray()));
                        } else {
                            data.append(baos.toString());
                        }
                        data.append("\n]");
                        logger.debug(data.toString());
                    } else {
                        logger.info(data.toString());
                    }

                }
            }
        };
    }

}

这里有一些测试:github

我认为这就是 Brian Clozel (@brian-clozel) 的意思。

【讨论】:

  • 这与反应式编程背道而驰,您正在缓冲整个内容。当然不是布赖恩说的。
  • 我发现这个例子很有帮助。我可以使用这种机制将请求 JSON 保存在数据库中以维护审计跟踪。
  • @Silvmike 这仅适用于调用 getBody() 的 POST 请求。如果我必须为 GET 请求调用 flushLog 应该怎么做?在这种情况下不会调用 getBody()。
  • 我做了一个 hack,如果请求类型是 GET,我会覆盖 getMethodValue() 并调用 flushLog。
【解决方案4】:

布赖恩说了什么。此外,记录请求/响应主体对于反应式流媒体没有意义。如果您将通过管道的数据想象为流,那么您在任何时候都没有完整的内容除非您缓冲它,这会破坏整个观点。对于小的请求/响应,您可以摆脱缓冲,但为什么要使用反应模型(除了给您的同事留下深刻印象:-))?

我能想到的记录请求/响应的唯一原因是调试,但是对于反应式编程模型,调试方法也必须修改。 Project Reactor 文档有一个很好的调试部分,您可以参考:http://projectreactor.io/docs/core/snapshot/reference/#debugging

【讨论】:

  • 它用于在开发过程中进行调试。没有人在生产中启用调试。我在另一篇文章中详细解释了为什么需要调试stackoverflow.com/questions/47596571/…
  • 这里我填写的场景与调试无关,假设您有一个重试策略配置为在返回 HttpStatus 503/504 时采取行动,在我看来,能够记录这个响应不仅仅是简单的调试事情,它可能会为我们提供有用的信息,说明如果 API 这样做,在给定时刻发生这种情况的原因,所以我希望我能在合理的而不是 500 行代码中找到一种方法经过几天的寻找,我已经看到了,对这个基本的东西真的很沮丧。
【解决方案5】:

您实际上可以为 Netty 和 Reactor-Netty 相关的调试日志启用以查看正在发生的事情的全貌。你可以玩下面的,看看你想要什么,不要什么。这是我能做的最好的了。

reactor.ipc.netty.channel.ChannelOperationsHandler: DEBUG
reactor.ipc.netty.http.server.HttpServer: DEBUG
reactor.ipc.netty.http.client: DEBUG
io.reactivex.netty.protocol.http.client: DEBUG
io.netty.handler: DEBUG
io.netty.handler.proxy.HttpProxyHandler: DEBUG
io.netty.handler.proxy.ProxyHandler: DEBUG
org.springframework.web.reactive.function.client: DEBUG
reactor.ipc.netty.channel: DEBUG

【讨论】:

  • 这是一个用于本地调试的选项,但我们不能在生产实例中使用它,因为它也会暴露标题内容。
【解决方案6】:

假设我们正在处理一个简单的 JSON 或 XML 响应,如果相应记录器的 debug 级别由于某种原因不够,可以在将其转换为对象之前使用字符串表示:

Mono<Response> mono = WebClient.create()
                               .post()
                               .body(Mono.just(request), Request.class)
                               .retrieve()
                               .bodyToMono(String.class)
                               .doOnNext(this::sideEffectWithResponseAsString)
                               .map(this::transformToResponse);

以下是副作用和转化方法:

private void sideEffectWithResponseAsString(String response) { ... }
private Response transformToResponse(String response) { /*use Jackson or JAXB*/ }    

【讨论】:

    【解决方案7】:

    这是我为 java 想出的。

    public class RequestResponseLoggingFilter implements WebFilter {
    
        @Override
        public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
            ServerHttpRequest httpRequest = exchange.getRequest();
            final String httpUrl = httpRequest.getURI().toString();
    
            ServerHttpRequestDecorator loggingServerHttpRequestDecorator = new ServerHttpRequestDecorator(exchange.getRequest()) {
                String requestBody = "";
    
                @Override
                public Flux<DataBuffer> getBody() {
                    return super.getBody().doOnNext(dataBuffer -> {
                        try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
                            Channels.newChannel(byteArrayOutputStream).write(dataBuffer.asByteBuffer().asReadOnlyBuffer());
                            requestBody = IOUtils.toString(byteArrayOutputStream.toByteArray(), "UTF-8");
                            commonLogger.info(LogMessage.builder()
                                    .step(httpUrl)
                                    .message("log incoming http request")
                                    .stringPayload(requestBody)
                                    .build());
                        } catch (IOException e) {
                            commonLogger.error(LogMessage.builder()
                                    .step("log incoming request for " + httpUrl)
                                    .message("fail to log incoming http request")
                                    .errorType("IO exception")
                                    .stringPayload(requestBody)
                                    .build(), e);
                        }
                    });
                }
            };
    
            ServerHttpResponseDecorator loggingServerHttpResponseDecorator = new ServerHttpResponseDecorator(exchange.getResponse()) {
                String responseBody = "";
                @Override
                public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
                    Mono<DataBuffer> buffer = Mono.from(body);
                    return super.writeWith(buffer.doOnNext(dataBuffer -> {
                        try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
                            Channels.newChannel(byteArrayOutputStream).write(dataBuffer.asByteBuffer().asReadOnlyBuffer());
                            responseBody = IOUtils.toString(byteArrayOutputStream.toByteArray(), "UTF-8");
                            commonLogger.info(LogMessage.builder()
                                    .step("log outgoing response for " + httpUrl)
                                    .message("incoming http request")
                                    .stringPayload(responseBody)
                                    .build());
                        } catch (Exception e) {
                            commonLogger.error(LogMessage.builder()
                                    .step("log outgoing response for " + httpUrl)
                                    .message("fail to log http response")
                                    .errorType("IO exception")
                                    .stringPayload(responseBody)
                                    .build(), e);
                        }
                    }));
                }
            };
            return chain.filter(exchange.mutate().request(loggingServerHttpRequestDecorator).response(loggingServerHttpResponseDecorator).build());
        }
    
    }
    

    【讨论】:

      【解决方案8】:

      这是带有完整实现GitHub Repo,用于记录请求和响应主体以及基于 webflux/java 的应用程序的 http 标头...

      【讨论】:

        【解决方案9】:

        如果您使用控制器而不是处理程序,最好的方法是使用 @Log 注释对控制器类进行注释的 aop。仅供参考,这将普通 json 对象作为请求而不是单声道。

        @Target(AnnotationTarget.FUNCTION)
        @Retention(AnnotationRetention.RUNTIME)
        annotation class Log
        
        @Aspect
        @Component
        class LogAspect {
            companion object {
                val log = KLogging().logger
            }
        
            @Around("@annotation(Log)")
            @Throws(Throwable::class)
            fun logAround(joinPoint: ProceedingJoinPoint): Any? {
                val start = System.currentTimeMillis()
                val result = joinPoint.proceed()
                return if (result is Mono<*>) result.doOnSuccess(getConsumer(joinPoint, start)) else result
            }
        
            fun getConsumer(joinPoint: ProceedingJoinPoint, start: Long): Consumer<Any>? {
                return Consumer {
                    var response = ""
                    if (Objects.nonNull(it)) response = it.toString()
                    log.info(
                        "Enter: {}.{}() with argument[s] = {}",
                        joinPoint.signature.declaringTypeName, joinPoint.signature.name,
                        joinPoint.args
                    )
                    log.info(
                        "Exit: {}.{}() had arguments = {}, with result = {}, Execution time = {} ms",
                        joinPoint.signature.declaringTypeName, joinPoint.signature.name,
                        joinPoint.args[0],
                        response, System.currentTimeMillis() - start
                    )
                }
            }
        }
        

        【讨论】:

          【解决方案10】:

          自 Spring Boot 2.2.x 起,Spring Webflux 支持Kotlin coroutines。使用协程,您可以拥有非阻塞调用的优势,而无需处理 Mono 和 Flux 包装的对象。它为ServerRequestServerResponse 添加了扩展,添加了ServerRequest#awaitBody()ServerResponse.BodyBuilder.bodyValueAndAwait(body: Any) 等方法。所以你可以像这样重写你的代码:

          @Bean
          fun apiRouter() = coRouter {
              (accept(MediaType.APPLICATION_JSON) and "/api").nest {
                  "/user".nest {
                      /* the handler methods now use ServerRequest and ServerResponse directly
                       you just need to add suspend before your function declaration:
                       suspend fun listUsers(ServerRequest req, ServerResponse res) */ 
                      GET("/", userHandler::listUsers)
                      POST("/{userId}", userHandler::updateUser)
                  }
              }
          
              // this filter will be applied to all routes built by this coRouter
              filter { request, next ->
                // using non-blocking request.awayBody<T>()
                logger.info("Processing $request with body ${request.awaitBody<String>()}")
                  val res = next(request)
                  logger.info("Handling with Content-Type ${res.headers().contentType} and status code ${res.rawStatusCode()}")
                  res 
              }
          }
          

          为了用coRoutines创建一个WebFilter Bean,我觉得可以用这个CoroutineWebFilter接口(我没测试过,不知道能不能用)。

          【讨论】:

            【解决方案11】:

            我认为这里要做的适当的事情是以异步方式将每个请求的内容写入文件(java.nio)并设置一个间隔来异步读取这些请求正文文件并将它们写入日志一种内存使用感知方式(一次至少一个文件,但一次最多 100 mb),并在记录它们后从磁盘中删除文件。

            【讨论】:

            • 您的答案可以通过额外的支持信息得到改进。请edit 添加更多详细信息,例如引用或文档,以便其他人可以确认您的答案是正确的。你可以找到更多关于如何写好答案的信息in the help center
            猜你喜欢
            • 1970-01-01
            • 1970-01-01
            • 2020-08-25
            • 2021-03-03
            • 2015-10-07
            • 2019-01-10
            • 2019-10-02
            • 1970-01-01
            • 1970-01-01
            相关资源
            最近更新 更多