【问题标题】:Using Project Reactor in a Vert.x application在 Vert.x 应用程序中使用 Project Reactor
【发布时间】:2019-08-10 19:48:00
【问题描述】:

我在 Vert.x 应用程序中使用一个库,它返回 Project Reactor 类型 Mono

我有一个 Verticle,它接收这个响应类型,并打算通过事件总线将内容发送到另一个 Verticle:

import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.Message;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;

public class HelperVerticle extends AbstractVerticle
{
    public static final String ADDRESS = "address_1";

    @Override
    public void start() throws Exception
    {
        vertx.eventBus().consumer(ADDRESS, this::consume);
    }

    private void consume(Message<Object> message)
    {
        Mono.delay(Duration.ofMillis(3000)) 
            .thenReturn("Content of Mono.") // this would come from external library
            .publishOn(Schedulers.fromExecutor(vertx.nettyEventLoopGroup())) // is this needed?
            .subscribe(output ->
            {
                System.out.println("My verticle: " + Thread.currentThread().getName());
                message.reply(output + " " + message.body());
            }, error -> message.fail(1, error.getMessage()));
    }
}

这是正确的方法吗?在将消息发送到事件总线之前,我应该切换到 Vert.x 事件循环线程池吗?一起使用这些库时有什么需要注意的吗?

【问题讨论】:

  • 我看不到verticle在哪里接收到Mono对象。
  • 在这个虚拟示例中,它是在verticle 中创建的。在真正的应用程序中,它将通过方法调用来自库。该方法将从 Verticle 调用。

标签: java reactive-programming vert.x project-reactor reactive-streams


【解决方案1】:

代码对我来说看起来不错,除了你不应该使用 Netty 事件循环组作为执行器,而是使用 Verticle 上下文:

public class HelperVerticle extends AbstractVerticle
{
    public static final String ADDRESS = "address_1";

    private Scheduler scheduler;

    @Override
    public void start() throws Exception
    {
        scheduler = Schedulers.fromExecutor(command -> context.runOnContext(v -> command.run()));
        vertx.eventBus().consumer(ADDRESS, this::consume);
    }

    private void consume(Message<Object> message)
    {
        Mono.delay(Duration.ofMillis(3000)) 
            .thenReturn("Content of Mono.") // this would come from external library
            .publishOn(scheduler)
            .subscribe(output ->
            {
                System.out.println("My verticle: " + Thread.currentThread().getName());
                message.reply(output + " " + message.body());
            }, error -> message.fail(1, error.getMessage()));
    }
}

使用这样的调度程序,您可以确保 Verticle 状态不会被分配给它的事件循环以外的线程修改。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-06-11
    • 1970-01-01
    • 2021-07-30
    • 2019-11-11
    • 2019-09-27
    • 1970-01-01
    相关资源
    最近更新 更多