【问题标题】:Project Reactor, using a Flux sink outside of the creation lambdaProject Reactor,在创建 lambda 之外使用 Flux sink
【发布时间】:2019-09-27 12:41:35
【问题描述】:
  • 当我的服务启动时,我想构建一个简单的管道。
  • 我想隔离 Flux 接收器或处理器,以便发出事件。
  • 事件将来自多个线程,应根据管道的 subscribeOn() 规范进行处理,但一切似乎都在 main 线程上运行。
  • 最好的方法是什么?我在下面附上了我的尝试。
  • (我使用的是 reactor-core v3.2.8.RELEASE。)
import org.junit.jupiter.api.Test;

import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Schedulers;

/**
 * I want to construct my React pipelines during creation,
 * then emit events over the lifetime of my services.
 */
public class React1Test
{
    /**
     * Attempt 1 - use a DirectProcessor and send items to it.
     * Doesn't work though - seems to always run on the main thread.
     */
    @Test
    public void testReact1() throws InterruptedException
    {
        // Create the flux and sink.
        FluxProcessor<String, String> fluxProcessor = DirectProcessor.<String>create().serialize();
        FluxSink<String> sink = fluxProcessor.sink();

        // Create the pipeline.
        fluxProcessor
            .doOnNext(str -> showDebugMsg(str))   // What thread do ops work on?
            .subscribeOn(Schedulers.elastic())
            .subscribe(str -> showDebugMsg(str)); // What thread does subscribe run on?

        // Give the multi-thread pipeline a second.
        Thread.sleep(1000);

        // Time passes ... things happen ...
        // Pass a few messages to the sink, emulating events.
        sink.next("a");
        sink.next("b");
        sink.next("c");

        // It's multi-thread so wait a sec to receive.
        Thread.sleep(1000);
    }

    // Used down below during Flux.create().
    private FluxSink<String> sink2;

    /**
     * Attempt 2 - use Flux.create() and its FluxSink object.
     * Also seems to always run on the main thread.
     */
    @Test
    public void testReact2() throws InterruptedException
    {
        // Create the flux and sink.
        Flux.<String>create(sink -> sink2 = sink)
            .doOnNext(str -> showDebugMsg(str))   // What thread do ops work on?
            .subscribeOn(Schedulers.elastic())
            .subscribe(str -> showDebugMsg(str)); // What thread does subscribe run on?

        // Give the multi-thread pipeline a second.
        Thread.sleep(1000);

        // Pass a few messages to the sink.
        sink2.next("a");
        sink2.next("b");
        sink2.next("c");

        // It's multi-thread so wait a sec to receive.
        Thread.sleep(1000);
    }

    // Show us what thread we're on.
    private static void showDebugMsg(String msg)
    {
        System.out.println(String.format("%s [%s]", msg, Thread.currentThread().getName()));
    }
}

输出总是:

a [main]
a [main]
b [main]
b [main]
c [main]
c [main]

但我期望的是:

a [elastic-1]
a [elastic-1]
b [elastic-2]
b [elastic-2]
c [elastic-3]
c [elastic-3]

提前致谢。

【问题讨论】:

    标签: java project-reactor reactor


    【解决方案1】:

    您看到[main] 是因为您从主线程调用onNext。 您使用的subscribeOn 仅用于订阅(当create 的 lambda 被触发时)。 如果您使用publishOn 而不是subscribeOn,您将看到elastic-* 线程被记录。

    另外,不鼓励使用Processors,将sinkFlux.create和类似的运算符存储为字段。

    【讨论】:

    • 至少subscribe() lambda 不应该在subscribeOn() 调度程序上运行吗?即使我添加了publishOn(),它也会在同一个线程上发布每个项目,例如elastic-1testReact1() 方法也使用处理器。谢谢。
    • 应该并且将会,但是您没有在create 中记录任何内容。
    • “相同的线程”是由于队列耗尽而导致的。如果所有事情都可以在一个线程上完成,为什么它会传播?
    • 您能否提供参考以确认“另外,请考虑使用处理器,不鼓励将接收器存储为字段。” ? IIUC,projectreactor.io/docs/core/release/reference 则相反:“我需要处理器吗?“生成器”运算符可以代替吗?(通常,这些运算符用于桥接非反应性 API,提供概念上相似的“接收器”到处理器,从某种意义上说,它允许您使用数据手动填充序列或终止它)。”。所以使用 Flux.create( sink -> /*store the sink*/ ).publish().connect() 是推荐的方法?
    • 您提供的链接与我所说的完全一致。存储从Flux.create 获得的sink 是不行的,不应该使用/推荐。
    【解决方案2】:
    • 您可以使用parallel()runOn() 而不是subscribeOn()sink.next() 运行多线程。
    • bsideup 也是正确的 - 您可以使用 publishOn() 强制下游操作员在不同的调度程序线程上运行。

    这是我更新的代码:

    import org.junit.jupiter.api.Test;
    
    import reactor.core.publisher.DirectProcessor;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.FluxProcessor;
    import reactor.core.publisher.FluxSink;
    import reactor.core.scheduler.Schedulers;
    
    /**
     * I want to construct my React pipelines during creation,
     * then emit events over the lifetime of my services.
     */
    public class React1Test
    {
        /**
         * Version 1 - use a DirectProcessor to dynamically emit items.
         */
        @Test
        public void testReact1() throws InterruptedException
        {
            // Create the flux and sink.
            FluxProcessor<String, String> fluxProcessor = DirectProcessor.<String>create().serialize();
            FluxSink<String> sink = fluxProcessor.sink();
    
            // Create the pipeline.
            fluxProcessor
                .parallel()
                .runOn(Schedulers.elastic())
                .doOnNext(str -> showDebugMsg(str))   // What thread do ops work on?
                .subscribe(str -> showDebugMsg(str)); // What thread does subscribe run on?
    
            // Give the multi-thread pipeline a second.
            Thread.sleep(1000);
    
            // Time passes ... things happen ...
            // Pass a few messages to the sink, emulating events.
            sink.next("a");
            sink.next("b");
            sink.next("c");
    
            // It's multi-thread so wait a sec to receive.
            Thread.sleep(1000);
        }
    
        // Used down below during Flux.create().
        private FluxSink<String> sink2;
    
        /**
         * Version 2 - use Flux.create() and its FluxSink object.
         */
        @Test
        public void testReact2() throws InterruptedException
        {
            // Create the flux and sink.
            Flux.<String>create(sink -> sink2 = sink)
                .parallel()
                .runOn(Schedulers.elastic())
                .doOnNext(str -> showDebugMsg(str))   // What thread do ops work on?
                .subscribe(str -> showDebugMsg(str)); // What thread does subscribe run on?
    
            // Give the multi-thread pipeline a second.
            Thread.sleep(1000);
    
            // Pass a few messages to the sink.
            sink2.next("a");
            sink2.next("b");
            sink2.next("c");
    
            // It's multi-thread so wait a sec to receive.
            Thread.sleep(1000);
        }
    
        // Show us what thread we're on.
        private static void showDebugMsg(String msg)
        {
            System.out.println(String.format("%s [%s]", msg, Thread.currentThread().getName()));
        }
    }
    

    两个版本都产生所需的多线程输出:

    a [elastic-2]
    b [elastic-3]
    c [elastic-4]
    b [elastic-3]
    a [elastic-2]
    c [elastic-4]
    

    【讨论】:

    • 答案与您的问题无关。你从来没有要求多线程。
    • 你说得对,这就是我没有将其标记为官方答案的原因,也是我提到 bsideup 的完美答案的原因。有两种方法(到目前为止)在 lambda 之外使用接收器 - 使用 .publishOn() 或使用 .parallel().runOn()。如果不清楚,我很抱歉。
    • 虽然parallel + runOn 的组合有效,但它并不适合它的设计。如果您需要将计算转移到另一个线程,publishOn 是应该使用的运算符。
    • parallel + runOn 组合是唯一将每个项目放在池中的单独线程上的版本。 publishOnsubscribeOn 将所有项目的计算移动到一个单独的线程。如果您知道实现此目的的更好方法,请告诉我!
    猜你喜欢
    • 2017-06-17
    • 2020-09-14
    • 2019-01-07
    • 1970-01-01
    • 2021-06-12
    • 2018-10-01
    • 2020-09-25
    • 2020-10-31
    • 1970-01-01
    相关资源
    最近更新 更多