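Getting request/response logging right with Spring's reactive WebClient is genuinely hard.
I have the following requirements:
- Log the request and the response, bodies included, each in a single log statement (if you scroll through hundreds of log entries in AWS CloudWatch, having everything in one statement is far more convenient)
- Filter sensitive data such as personal or financial data out of the logs to comply with GDPR and PCI
Wiretapping Netty or using custom Jackson en-/decoders is therefore not an option.
Here is my take on the problem (again based on Stanislav's excellent answer).
(The following code uses Lombok annotation processing, which you probably want to use as well if you don't already. Otherwise it should be easy to de-lombok.)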
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.client.reactive.ClientHttpRequest;
import org.springframework.http.client.reactive.ClientHttpRequestDecorator;
import org.springframework.lang.NonNull;
import org.springframework.util.StopWatch;
import org.springframework.web.reactive.function.BodyInserter;
import org.springframework.web.reactive.function.client.ClientRequest;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.ExchangeFunction;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicBoolean;

import static java.lang.Math.min;
import static java.util.UUID.randomUUID;
import static net.logstash.logback.argument.StructuredArguments.v;

@Slf4j
@RequiredArgsConstructor
public class RequestLoggingFilterFunction implements ExchangeFilterFunction {

    private static final int MAX_BYTES_LOGGED = 4_096;

    private final String externalSystem;

    @Override
    @NonNull
    public Mono<ClientResponse> filter(@NonNull ClientRequest request, @NonNull ExchangeFunction next) {
        // Skip all capturing overhead when debug logging is off.
        if (!log.isDebugEnabled()) {
            return next.exchange(request);
        }

        var clientRequestId = randomUUID().toString();

        // Guards that make sure each side of the exchange is logged at most once.
        var requestLogged = new AtomicBoolean(false);
        var responseLogged = new AtomicBoolean(false);

        var capturedRequestBody = new StringBuilder();
        var capturedResponseBody = new StringBuilder();

        var stopWatch = new StopWatch();
        stopWatch.start();

        return next
                // Wrap the original body inserter so the request body can be captured while it is written.
                .exchange(ClientRequest.from(request).body(new BodyInserter<>() {

                    @Override
                    @NonNull
                    public Mono<Void> insert(@NonNull ClientHttpRequest req, @NonNull Context context) {
                        return request.body().insert(new ClientHttpRequestDecorator(req) {

                            @Override
                            @NonNull
                            public Mono<Void> writeWith(@NonNull Publisher<? extends DataBuffer> body) {
                                return super.writeWith(Flux.from(body).doOnNext(data -> capturedRequestBody.append(extractBytes(data)))); // number of bytes appended is maxed in real code
                            }

                        }, context);
                    }
                }).build())
                // The request is logged once the response arrives, so the captured request body is complete.
                .doOnNext(response -> {
                            if (!requestLogged.getAndSet(true)) {
                                log.debug("| >>---> Outgoing {} request [{}]\n{} {}\n{}\n\n{}\n",
                                        v("externalSystem", externalSystem),
                                        v("clientRequestId", clientRequestId),
                                        v("clientRequestMethod", request.method()),
                                        v("clientRequestUrl", request.url()),
                                        v("clientRequestHeaders", request.headers()), // filtered in real code
                                        v("clientRequestBody", capturedRequestBody.toString()) // filtered in real code
                                );
                            }
                        }
                )
                // The exchange itself failed (e.g. connection error): log the request together with the error.
                .doOnError(error -> {
                    if (!requestLogged.getAndSet(true)) {
                        log.debug("| >>---> Outgoing {} request [{}]\n{} {}\n{}\n\nError: {}\n",
                                v("externalSystem", externalSystem),
                                v("clientRequestId", clientRequestId),
                                v("clientRequestMethod", request.method()),
                                v("clientRequestUrl", request.url()),
                                v("clientRequestHeaders", request.headers()), // filtered in real code
                                error.getMessage()
                        );
                    }
                })
                // Decorate the response body so it is captured while the caller consumes it.
                .map(response -> response.mutate().body(transformer -> transformer
                                .doOnNext(body -> capturedResponseBody.append(extractBytes(body))) // number of bytes appended is maxed in real code
                                .doOnTerminate(() -> {
                                    if (stopWatch.isRunning()) {
                                        stopWatch.stop();
                                    }
                                })
                                .doOnComplete(() -> {
                                    if (!responseLogged.getAndSet(true)) {
                                        log.debug("| <---<< Response for outgoing {} request [{}] after {}ms\n{} {}\n{}\n\n{}\n",
                                                v("externalSystem", externalSystem),
                                                v("clientRequestId", clientRequestId),
                                                v("clientRequestExecutionTimeInMillis", stopWatch.getTotalTimeMillis()),
                                                v("clientResponseStatusCode", response.statusCode().value()),
                                                v("clientResponseStatusPhrase", response.statusCode().getReasonPhrase()),
                                                v("clientResponseHeaders", response.headers()), // filtered in real code
                                                v("clientResponseBody", capturedResponseBody.toString()) // filtered in real code
                                        );
                                    }
                                })
                                .doOnError(error -> {
                                            if (!responseLogged.getAndSet(true)) {
                                                log.debug("| <---<< Error parsing response for outgoing {} request [{}] after {}ms\n{}",
                                                        v("externalSystem", externalSystem),
                                                        v("clientRequestId", clientRequestId),
                                                        v("clientRequestExecutionTimeInMillis", stopWatch.getTotalTimeMillis()),
                                                        v("clientErrorMessage", error.getMessage())
                                                );
                                            }
                                        }
                                )
                        ).build()
                );
    }

    // Copies up to MAX_BYTES_LOGGED bytes out of the buffer, then restores the read position
    // so the real consumer still sees the complete data.
    private static String extractBytes(DataBuffer data) {
        int currentReadPosition = data.readPosition();
        var numberOfBytesLogged = min(data.readableByteCount(), MAX_BYTES_LOGGED);
        var bytes = new byte[numberOfBytesLogged];
        data.read(bytes, 0, numberOfBytesLogged);
        data.readPosition(currentReadPosition);
        // Decode with an explicit charset instead of relying on the platform default.
        return new String(bytes, StandardCharsets.UTF_8);
    }

}
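The comments in the filter mention that the number of bytes appended is capped in real code. A minimal sketch of one way that cap could look is shown here, assuming a small helper that collects raw bytes and decodes them only once at the end. The class name `BoundedBodyCapture` and the `... [truncated]` marker are hypothetical and not part of the original answer.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: collects body chunks but never stores more than maxBytes in total. */
final class BoundedBodyCapture {

    private final byte[] buffer;
    private int length;          // number of bytes stored so far
    private boolean truncated;   // set once any byte had to be dropped

    BoundedBodyCapture(int maxBytes) {
        this.buffer = new byte[maxBytes];
    }

    /** Copies as much of the chunk as still fits; the remainder is dropped and flagged. */
    synchronized void append(byte[] chunk) {
        int remaining = buffer.length - length;
        int toCopy = Math.min(remaining, chunk.length);
        System.arraycopy(chunk, 0, buffer, length, toCopy);
        length += toCopy;
        if (toCopy < chunk.length) {
            truncated = true;
        }
    }

    /** Decodes the captured bytes as UTF-8 and marks the result if anything was cut off. */
    synchronized String asString() {
        String text = new String(buffer, 0, length, StandardCharsets.UTF_8);
        return truncated ? text + "... [truncated]" : text;
    }
}
```

In the filter, such an object could take the place of the `StringBuilder` instances, with `extractBytes` returning the raw `byte[]` instead of a `String`. Decoding once at the end also avoids garbling multi-byte characters that happen to be split across two chunks, although a character cut by the cap itself can still be damaged.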
Log entries for a successful exchange look like this:
2021-12-07 17:14:04.029 DEBUG --- [ctor-http-nio-3] RequestLoggingFilterFunction : | >>---> Outgoing SnakeOil request [6abd0170-d682-4ca6-806c-bbb3559998e8]
POST https://localhost:8101/snake-oil/oauth/token
Content-Type: application/x-www-form-urlencoded

grant_type=client_credentials&client_id=*****&client_secret=*****

2021-12-07 17:14:04.037 DEBUG --- [ctor-http-nio-3] RequestLoggingFilterFunction : | <---<< Response for outgoing SnakeOil request [6abd0170-d682-4ca6-806c-bbb3559998e8] after 126ms
200 OK
Content-Type: application/json
Vary: [Accept-Encoding, User-Agent]
Transfer-Encoding: chunked

{"access_token":"*****","expires_in":"3600","token_type":"BearerToken"}
Error conditions are, of course, handled gracefully as well.
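The filter code marks headers and bodies as "filtered in real code" without showing the filtering itself. As a rough sketch of what body filtering might look like, the following masks the values of a few sensitive keys in form-encoded and simple JSON bodies. The class name `SensitiveDataMasker`, the key list, and the regular expressions are assumptions for illustration only; a real implementation would need a key list and parsing strategy suited to the payloads involved.

```java
import java.util.regex.Pattern;

/** Hypothetical helper: masks values of known sensitive keys before a body is logged. */
final class SensitiveDataMasker {

    // Assumed key names; adjust to whatever counts as sensitive in your payloads.
    private static final String KEYS = "client_id|client_secret|access_token|password";

    // Form-encoded pairs: key=value, where the value runs up to the next '&' or whitespace.
    private static final Pattern FORM =
            Pattern.compile("(?<![\\w-])(" + KEYS + ")=[^&\\s]*");

    // Simple JSON string members: "key":"value" (values with escaped quotes are not handled).
    private static final Pattern JSON =
            Pattern.compile("\"(" + KEYS + ")\"\\s*:\\s*\"[^\"]*\"");

    private SensitiveDataMasker() {
    }

    /** Returns the body with the values of all sensitive keys replaced by asterisks. */
    static String mask(String body) {
        String masked = FORM.matcher(body).replaceAll("$1=*****");
        return JSON.matcher(masked).replaceAll("\"$1\":\"*****\"");
    }
}
```

With something like this in place, the captured body would be passed through `SensitiveDataMasker.mask(...)` just before it is handed to `log.debug`. Regex-based masking is only a heuristic: for strict GDPR or PCI requirements, parsing the payload and masking by field is likely to be more reliable.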