【问题标题】:How to correlate log events in distributed Vertx system如何关联分布式 Vertx 系统中的日志事件
【发布时间】:2026-01-08 02:50:01
【问题描述】:

在vertx的multiple模块中做日志时,我们应该能够关联单个请求的所有日志是一个基本要求。

因为 vertx 是异步的,所以保存 logid、conversationid、eventid 的最佳位置是什么。

我们可以实施的任何解决方案或模式?

【问题讨论】:

  • 我打算在使用 vertx 框架的其余应用程序中使用 slf4j MDC。我在这里读到的是,你不应该在 vertx 中使用 MDC,因为 MDC 是线程特定的,而 vertx 是线程不可知的。但是我在从一个verticles调用的服务开始时设置MDC变量,并在服务结束时调用MDC.clear()。我认为它将在一个线程内 - 一个事件的端到端事件处理它在本地工作正常,但我担心它在生产环境中是否准确,每分钟都有数千条消息。请有人指导。

标签: logging distributed vert.x


【解决方案1】:

在基于线程的系统中,当前上下文由当前线程持有,因此 MDC 或任何 ThreadLocal 都可以。

在基于 Actor 的系统(例如 Vertx)中,您的上下文就是消息,因此您必须为您发送的每条消息添加一个关联 ID。

对于任何处理程序/回调,您必须将其作为方法参数传递或引用最终方法变量。

要通过事件总线发送消息,您可以将有效负载包装在 JsonObject 中,并将关联 ID 添加到包装器对象中

vertx.eventBus().send("someAddr", 
  new JsonObject().put("correlationId", "someId")
                  .put("payload", yourPayload));

或者您可以使用 DeliveryOption 将关联 ID 作为标头添加

//send
vertx.eventBus().send("someAddr", "someMsg", 
            new DeliveryOptions().addHeader("correlationId", "someId"));

//receive    
vertx.eventBus().consumer("someAddr", msg -> {
        String correlationId = msg.headers().get("correlationId");
        ...
    });

还有更复杂的选项可能,例如在事件总线上使用拦截器,Emanuel Idi 用它来实现对 Vert.x 的 Zipkin 支持,https://github.com/emmanuelidi/vertx-zipkin,但我不确定此集成的当前状态.

【讨论】:

  • zipkin 支持实现为我提供了让拦截器方法正常工作所需的指针,最终代码非常少。
【解决方案2】:

令人惊讶的是,关于这个问题的好的答案很少发表,这很奇怪,考虑到它是多么容易。

假设您在收到请求或消息时在 MDC 上下文中设置了 correlationId,我发现传播它的最简单方法是使用拦截器在上下文之间传递值:

vertx.eventBus()
        .addInboundInterceptor(deliveryContext -> {
            MultiMap headers = deliveryContext.message().headers();
            if (headers.contains("correlationId")) {
                MDC.put("correlationId", headers.get("correlationId"));
                deliveryContext.next();
            }
        })
        .addOutboundInterceptor(deliveryContext -> {
            deliveryContext.message().headers().add("correlationId", MDC.get("correlationId"));
            deliveryContext.next();
        });

【讨论】:

  • 即使您的 Verticle 不在同一个 jvm 中运行,这也应该可以工作。
【解决方案3】:

如果多个模块是指在同一个 Vertx 实例上运行多个 Verticle,您应该能够使用普通的日志库,例如 SLF4J、Log4J、JUL 等。然后您可以将日志保存在您选择的目录中,例如/var/logs/appName.

但是,如果您的意思是如何关联多个 Vertx 实例之间的日志,那么我建议您研究 GrayLog 或类似的分布式/集中式日志记录应用程序。如果您对每个请求使用唯一 ID,则可以将其传递并在日志中使用它。或者根据您的授权系统,如果您对每个请求使用唯一令牌,则可以记录这些令牌。集中式日志记录系统可用于根据该信息汇总和过滤日志。

【讨论】:

  • 我们已经有了 splunk,用于索引和维护日志。我们为每个请求使用唯一 ID,因此要在日志中搜索特定请求,每个日志语句都应该有权访问唯一的请求 ID。由于 vertx 是异步的并且可能会创建多个线程,因此目前唯一的解决方案是将唯一的请求 id 传递到我们想要记录的代码中的所有位置。我的问题是任何其他开箱即用的解决方案或更好的方法?
【解决方案4】:

Clive Evans 提供的拦截器示例效果很好。我添加了一个更详细的示例,展示了它是如何工作的:

import io.vertx.core.AbstractVerticle;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.MultiMap;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import java.time.Duration;
import java.util.UUID;

public class PublisherSubscriberInterceptor {

  private static final Logger LOG = LoggerFactory.getLogger(PublisherSubscriberInterceptor.class);
  public static final String ADRESS = "sender.address";


  public static void main(String[] args) {
    Vertx vertx = Vertx.vertx();
    createInterceptors(vertx);

    vertx.deployVerticle(new Publisher());
    vertx.deployVerticle(new Subscriber1());

    //For our example lets deploy subscriber2 2 times.
    vertx.deployVerticle(Subscriber2.class.getName(), new DeploymentOptions().setInstances(2));
  }

  private static void createInterceptors(Vertx vertx) {
    vertx.eventBus()
      .addInboundInterceptor(deliveryContext -> {
        MultiMap headers = deliveryContext.message().headers();
        if (headers.contains("myId")) {
          MDC.put("myId", headers.get("myId"));
          deliveryContext.next();
        }
      })
      .addOutboundInterceptor(deliveryContext -> {
        deliveryContext.message().headers().add("myId", MDC.get("myId"));
        deliveryContext.next();
      });
  }

  public static class Publisher extends AbstractVerticle {

    @Override
    public void start(Promise<Void> startPromise) throws Exception {
      startPromise.complete();
      vertx.setPeriodic(Duration.ofSeconds(5).toMillis(), id -> {
        MDC.put("myId", UUID.randomUUID().toString());
      vertx.eventBus().publish(Publish.class.getName(), "A message for all");
      });
    }
  }

  public static class Subscriber1 extends AbstractVerticle {
    private static final Logger LOG = LoggerFactory.getLogger(Subscriber1.class);

    @Override
    public void start(Promise<Void> startPromise) throws Exception {
      startPromise.complete();
      vertx.eventBus().consumer(Publish.class.getName(), message-> {
        LOG.debug("Subscriber1 Received: {}", message.body());
      });
    }
  }

  public static class Subscriber2 extends AbstractVerticle {
    private static final Logger LOG = LoggerFactory.getLogger(Subscriber2.class);

    @Override
    public void start(Promise<Void> startPromise) throws Exception {
      startPromise.complete();
      vertx.eventBus().consumer(Publish.class.getName(), message-> {
        LOG.debug("Subscriber2 Received: {}", message.body());
      });
    }
  }
}

您可以查看发布 2 条消息的日志示例:

    13:37:14.315 [vert.x-eventloop-thread-3][myId=a2f0584c-9d4e-48a8-a724-a24ea12f7d80] DEBUG o.s.v.l.PublishSubscribeInterceptor$Subscriber2 - Subscriber2 Received: A message for all
    13:37:14.315 [vert.x-eventloop-thread-1][myId=a2f0584c-9d4e-48a8-a724-a24ea12f7d80] DEBUG o.s.v.l.PublishSubscribeInterceptor$Subscriber1 - Subscriber1 Received: A message for all
    13:37:14.315 [vert.x-eventloop-thread-4][myId=a2f0584c-9d4e-48a8-a724-a24ea12f7d80] DEBUG o.s.v.l.PublishSubscribeInterceptor$Subscriber2 - Subscriber2 Received: A message for all
    13:37:19.295 [vert.x-eventloop-thread-1][myId=63b5839e-3b0b-43a5-b379-92bd1466b870] DEBUG o.s.v.l.PublishSubscribeInterceptor$Subscriber1 - Subscriber1 Received: A message for all
    13:37:19.295 [vert.x-eventloop-thread-3][myId=63b5839e-3b0b-43a5-b379-92bd1466b870] DEBUG o.s.v.l.PublishSubscribeInterceptor$Subscriber2 - Subscriber2 Received: A message for all
    13:37:19.295 [vert.x-eventloop-thread-4][myId=63b5839e-3b0b-43a5-b379-92bd1466b870] DEBUG o.s.v.l.PublishSubscribeInterceptor$Subscriber2 - Subscriber2 Received: A message for all

【讨论】:

    【解决方案5】:

    使用vertx-syncThreadLocal 作为关联ID。 (即“FiberLocal”)。非常适合我。

    【讨论】: