【问题标题】:Guava EventBus dispatchingGuava EventBus 调度
【发布时间】:2014-02-22 00:32:16
【问题描述】:

我正在使用 Guava 的 EventBus 启动一些处理并报告结果。这是一个非常简单的可编译示例:

import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;

public class Test {

    public static class InitiateProcessing { }
    public static class ProcessingStarted { }
    public static class ProcessingResults { }
    public static class ProcessingFinished { }

    public static EventBus bus = new EventBus();

    @Subscribe
    public void receiveStartRequest(InitiateProcessing evt) {
        System.out.println("Got processing request - starting processing");
        bus.post(new ProcessingStarted());

        System.out.println("Generating results");
        bus.post(new ProcessingResults());
        System.out.println("Generating more results");
        bus.post(new ProcessingResults());

        bus.post(new ProcessingFinished());
    }

    @Subscribe
    public void processingStarted(ProcessingStarted evt) {
        System.out.println("Processing has started");
    }

    @Subscribe
    public void resultsReceived(ProcessingResults evt) {
        System.out.println("got results");
    }

    @Subscribe
    public void processingComplete(ProcessingFinished evt) {
        System.out.println("Processing has completed");
    }


    public static void main(String[] args) {
        Test t = new Test();
        bus.register(t);
        bus.post(new InitiateProcessing());
    }
}

我使用这些事件作为其他软件组件做出反应以准备此处理的一种方式。例如,他们可能必须在处理之前保存其当前状态并在之后恢复它。

我希望这个程序的输出是:

Got processing request - starting processing
Processing has started
Generating results
got results
Generating more results
got results
Processing has completed

相反,实际的输出是:

Got processing request - starting processing
Generating results
Generating more results
Processing has started
got results
got results
Processing has completed

应该表明处理已经开始的事件实际上发生在实际处理之后(“生成结果”)。

查看源代码后,我明白为什么会这样。这是EventBus 的相关source code

  /**
   * Drain the queue of events to be dispatched. As the queue is being drained,
   * new events may be posted to the end of the queue.
   */
  void dispatchQueuedEvents() {
    // don't dispatch if we're already dispatching, that would allow reentrancy
    // and out-of-order events. Instead, leave the events to be dispatched
    // after the in-progress dispatch is complete.
    if (isDispatching.get()) {
        return;
    }
    // dispatch event (omitted)

发生的事情是因为我已经调度了顶级InitiateProcessing 事件,其余事件只是被推到队列的末尾。我希望它的行为类似于 .NET 事件,在所有处理程序完成之前调用事件不会返回。

我不太明白这种实现的原因。当然,可以保证事件是有序的,但是周围代码的顺序会完全扭曲。

有什么方法可以让总线按照描述的方式运行并产生所需的输出?我确实在 Javadocs 中读到了

EventBus 保证它不会从 多个线程同时进行,除非该方法明确允许 带有@AllowConcurrentEvents 注释。

但我认为这不适用于这里 - 我在单线程应用程序中看到了这个问题。

编辑

这里问题的原因是我来自订阅者的posting。由于事件总线不可重入,因此这些“子帖子”会排队并在第一个处理程序完成后处理。我可以注释掉EventBus 源代码中的if (isDispatching.get()) { return; } 部分,一切都按照我的预期运行——所以真正的问题是我这样做引入了哪些潜在问题?设计师们似乎做出了一个谨慎的决定,不允许重入。

【问题讨论】:

  • 看起来事件总线在它自己的线程中运行。这通常意味着,这些操作是异步执行的,并且(只要它是总线)保证按照其顺序交付,并且与主线程无关
  • @injecteer 它不运行它自己的线程。他们确实有一个AsyncEventBus 允许您指定一个Executor - 但我没有使用它。这都是单线程的。
  • 你可能是对的。虽然我认为,它们确实在一个新线程中运行 :) 您能否通过在每个处理 @Subscribe-d 方法中添加 System.out.println( "curr thread: " + Thread.currentThread().getName() ) 来测试它?
  • 你为什么要从 within 订阅者的 .post() 开始?这里有问题
  • 这正是我看到这个问题的原因。这不是一个有效的用例吗?从概念上讲,事件可以触发其他事件 - 我们一直在假设这没问题的情况下进行操作。

标签: java guava


【解决方案1】:

EventBus 通常按照以下原则运行,即向总线发布事件的代码不应该关心订阅者对事件做了什么或何时,除了尊重事件发布的顺序(在无论如何都是同步事件总线的情况)。

如果您希望在您的方法过程中的特定时间调用特定方法,并且您希望确保这些方法在您的方法继续之前完成(就像您在示例中看到的那样),为什么不直接调用这些方法呢?当您使用事件总线时,您明确地将您的代码与响应给定事件的确切情况分开。这在许多情况下是可取的,也是 EventBus 存在的主要原因,但它似乎并不是您想要的。

【讨论】:

  • “将我的代码与响应给定事件的确切情况分开”我想要的。我希望事件说“为发生的事情做好准备”——我不在乎订阅者必须如何或做什么才能做好准备。他们可能不需要做任何事情。我只是想确信,在发布活动之后,订阅者已经准备好,所以我可以继续采取一些行动。事件总线似乎非常适合这种情况,因为我可以轻松添加和删除可能以不同方式处理事件的组件。发布事件的代码无关紧要。
  • 到目前为止,我从答案中得出的结论是,这种行为是设计使然,事件总线不支持我的用例。我最终修改了总线以允许更适合我的用例的重入。
【解决方案2】:

我尝试总结一下 Guava 的 EventBus 事件传递行为:

如果在 t1 时刻发布了事件 E1,则通知所有订阅者。 如果其中一个订阅者在它的 @Subscribe 方法中发布了一个事件本身(片刻之后),那么“新”事件 E2 将被排入队列并在 之后交付。 Afterwards 在这里意味着:毕竟 t1E1 的所有 @Subscribe 方法都返回了。

将这种“级联”事件发布与广度优先树遍历进行比较。

这似乎是 EventBus 的明确选择设计。

【讨论】:

    【解决方案3】:

    我知道这个问题已经有 4 年了,但我今天遇到了同样的问题。有一个简单的(和违反直觉的)改变来获得你想要的行为。根据https://stackoverflow.com/a/53136251/1296767,您可以将 AsyncEventBus 与 DirectExecutor 一起使用:

    public static EventBus bus = new AsyncEventBus(MoreExecutors.newDirectExecutorService());
    

    使用上述更改运行您的测试代码,结果正是您想要的:

    Got processing request - starting processing
    Processing has started
    Generating results
    got results
    Generating more results
    got results
    Processing has completed
    

    【讨论】:

      【解决方案4】:

      虽然在所有“订阅者”都收到信号之前发布到 EventBus 不会返回..这些订阅者可能还没有开始执行。这意味着当第一个 bus.post 返回时,您将继续下一个帖子,而没有任何介入的订阅者开始处理。

      public void post(Object event) 向所有已注册的人发布事件 订户。该方法会在事件发生后成功返回 已发布给所有订阅者,无论有任何例外 订阅者抛出的。如果没有订阅者被订阅 event 的类,并且 event 还不是 DeadEvent,它将是 包装在 DeadEvent 中并重新发布。

      参数:event - 要发布的事件。

      【讨论】:

      • 这不太对 - 在这种情况下,当第一个帖子返回时,所有事件都已被处理。无论哪种方式 - 这都不能回答我是否可以让公共汽车按预期运行的问题。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2023-04-08
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多