【问题标题】:Why is Vert.x worker verticle called from multiple threads concurrently?为什么同时从多个线程调用 Vert.x worker verticle?
【发布时间】:2021-07-04 09:24:33
【问题描述】:

我用 Java (11) 编写的 vertx (4.0.2) 应用程序使用了一些数据量大的 Verticle,这些 Verticle 会导致延迟峰值,因为事件循环会被它们暂时阻塞。出于这个原因,我想将这些verticles部署为worker verticles,这样eventloop和其他verticles就不再被阻塞了。

不幸的是,我的应用程序现在崩溃了,因为 Verticle 内的事件处理是由多个线程同时执行的;(

如果我正确理解了 vertx 文档,则不会发生这种情况:

Vert.x 从不会由多个线程同时执行 Worker Verticle 实例,但可以在不同时间由不同线程执行。

我能够通过一个最小的示例重现该问题:

@Slf4j
public class WorkerTest extends AbstractVerticle {
  private static final String ADDRESS = "address";
  private volatile String currentThread = null;
  private long counter = 0;

  @Override
  public void start(final Promise<Void> startPromise) {
    vertx.eventBus().consumer(ADDRESS, this::handleMessage);
    startPromise.complete();
  }

  private void handleMessage(Message<Object> message) {
    final var _currentThread = this.currentThread;
    final var thisThread = Thread.currentThread().getName();

    if (_currentThread != null) {
      log.error(
          "concurrent callback: current thread={}, this thread={}", _currentThread, thisThread);
      return;
    }

    try {
      this.currentThread = thisThread;
      Thread.sleep(2);
      if (++counter % 100L == 0) {
        log.info("received {} messages (current thread: {})", counter, thisThread);
      }
    } catch (Exception e) {
    } finally {
      this.currentThread = null;
    }
  }

  public static void main(String[] args) {
    final Vertx vertx = Vertx.vertx();

    vertx.deployVerticle(
        new WorkerTest(),
        new DeploymentOptions().setWorker(true),
        result -> {
          if (result.failed()) {
            System.exit(1);
            return;
          }

          for (int i = 0; i < 1000; ++i) {
            vertx.eventBus().send(ADDRESS, "test");
          }
        });
  }
}

执行这会给我很多日志错误,因为handleMessage 是同时从多个线程调用的。如果我将 Verticle 部署为非工作人员,这将按预期工作。

我在这里做错了什么?

【问题讨论】:

  • 您唯一要检查的是不同的线程用于运行您的handleMessage,即在handleMessage(...) 方法线程的不同运行之间发生了变化。这是有效的,并且符合您从 vert.x 文档中取出的文本。剩下的修复是从开始到结束 handle(...) 没有线程切换发生,也没有其他线程同时运行 handleMessage
  • @mohamnag 谢谢你的回答,但我不确定我是否理解正确。据我了解,永远不应该记录错误日志消息,因为进入handle... 的每个线程都会在离开时将this.currentThread 设置为null。但这不是我所看到的,这意味着有多个线程同时执行 `handle...'。
  • 抱歉,第一行看错了。但是,我刚刚尝试了您的代码,但从未得到日志。你确定所有的代码都和你这里的一样吗? PS。为获得最佳结果,在调用 deployVerticle 时不要实例化你自己的 Verticle,要么传递类名,要么使用供应商变体 WorkerTest::new
  • 我刚刚将一个完整的 repro 推送到 github:github.com/eXistence/vertx-workertest 我的同事看到了同样的问题;(将 vertx 升级到 4.0.3 起初似乎解决了这个问题,但这只是因为它们以某种方式改变了调度和所有事件都由同一个线程处理。在更复杂的场景中,它仍然会中断;(
  • 4.0.2 有这个错误,但 4.0.3 解决了这个问题。不确定您的“复杂情况”,但请参阅我的回答以了解哪些工作正常

标签: java multithreading vert.x vertx-verticle


【解决方案1】:

vertx 4.0.2 似乎是您的问题。使用 vertx 4.0.3 和以下代码:


public class WorkerTest extends AbstractVerticle {
    private static final String ADDRESS = "address";

    private volatile boolean handleMessageInExecution = false;

    public static void main(String[] args) {
        final Vertx vertx = Vertx.vertx();

        vertx.deployVerticle(
                WorkerTest::new,
                new DeploymentOptions()
                        .setInstances(2)
                        .setWorkerPoolSize(10)
                        .setWorker(true)
                ,
                result -> {
                    for (int i = 0; i < 100; ++i) {
                        vertx.eventBus().send(ADDRESS, "test " + i);
                    }
                });
    }

    @Override
    public void start(final Promise<Void> startPromise) {
        vertx.eventBus().localConsumer(ADDRESS, this::handleMessage);
        startPromise.complete();
    }

    private void handleMessage(Message<String> message) {
        if (handleMessageInExecution) {
            // this should never happen, since each thread that sets this to true, will also set it to
            // false on exit.
            System.out.println(message.body() + " ERROR");
            return;
        }

        handleMessageInExecution = true; // this thread is now executing handleMessage
        System.out.println(message.body() + " START   " + Thread.currentThread());

        try {
            Thread.sleep(1); // block thread for a moment to simulate heavy load
        } catch (Exception e) {
            // ignore interruption
            e.printStackTrace();
        } finally {
            handleMessageInExecution = false; // we are done executing
            System.out.println(message.body() + " END     " + Thread.currentThread());
        }
    }
}

我们看到了这个输出,这是预期的(每条消息由一个线程处理,它从头到尾运行,没有并发,最多同时有 2 条消息,因为我们有 2 个实例):

test 1 START   Thread[vert.x-worker-thread-2,5,main]
test 0 START   Thread[vert.x-worker-thread-3,5,main]
test 0 END     Thread[vert.x-worker-thread-3,5,main]
test 1 END     Thread[vert.x-worker-thread-2,5,main]
test 2 START   Thread[vert.x-worker-thread-3,5,main]
test 3 START   Thread[vert.x-worker-thread-2,5,main]
test 3 END     Thread[vert.x-worker-thread-2,5,main]
test 2 END     Thread[vert.x-worker-thread-3,5,main]
test 5 START   Thread[vert.x-worker-thread-2,5,main]
test 4 START   Thread[vert.x-worker-thread-3,5,main]
test 4 END     Thread[vert.x-worker-thread-3,5,main]
test 6 START   Thread[vert.x-worker-thread-3,5,main]
test 5 END     Thread[vert.x-worker-thread-2,5,main]
test 7 START   Thread[vert.x-worker-thread-2,5,main]
test 6 END     Thread[vert.x-worker-thread-3,5,main]
test 8 START   Thread[vert.x-worker-thread-3,5,main]
test 7 END     Thread[vert.x-worker-thread-2,5,main]
test 9 START   Thread[vert.x-worker-thread-2,5,main]
test 8 END     Thread[vert.x-worker-thread-3,5,main]
test 10 START   Thread[vert.x-worker-thread-3,5,main]
test 9 END     Thread[vert.x-worker-thread-2,5,main]
test 11 START   Thread[vert.x-worker-thread-2,5,main]
test 10 END     Thread[vert.x-worker-thread-3,5,main]
...

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-07-18
    • 1970-01-01
    • 1970-01-01
    • 2013-02-02
    相关资源
    最近更新 更多