【问题标题】:Java Reactor: How to produce Flux from stdin?Java Reactor:如何从标准输入生成 Flux?
【发布时间】:2018-03-05 17:04:57
【问题描述】:

我想从标准输入异步读取用户生成的消息。 比如:

Flux.from(stdinPublisher()) 
    .subscribe(msg -> System.out.println("Received: " + msg));

那么如何在这里实现这样的标准输入发布者呢?

【问题讨论】:

  • stdinPublisher方法的源码?
  • 是的。我想订阅和接收用户创建的消息。可能是stdinPublisher 方法实现或其他方法,无论如何。

标签: java project-reactor spring-webflux


【解决方案1】:

这很容易。抱歉打扰了:)

import java.util.Scanner;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;

@Component
@Slf4j
public class StdinProducerExample implements ApplicationRunner {

  @Override
  public void run(ApplicationArguments args) throws Exception {
    Flux
        .create(sink -> {
          Scanner scanner = new Scanner(System.in);
          while (scanner.hasNext()) {
            sink.next(scanner.nextLine());
          }
        })
        .subscribeOn(Schedulers.newSingle("stdin publisher"))
        .subscribe(m -> log.info("User message: {}", m));
    log.info("Started listening stdin");
  }

}

【讨论】:

    【解决方案2】:

    使用 Reactor 生成数据的另一种方法是 Processors

    FluxProcessor sinks 安全地控制多线程生产者,并且可以 由从多个线程生成数据的应用程序使用 同时。例如,您可以创建一个线程安全的序列化 为UnicastProcessor 下沉。多个生产者线程可能 在以下序列化接收器上同时生成数据:

    public class FluxProcessorSample {
    
      public static void main(String[] args) {    
        FluxProcessor<String, String> processor = UnicastProcessor.<String>create().serialize();
        FluxSink<String> sink = processor.sink(FluxSink.OverflowStrategy.BUFFER);
    
        // Print input to STDOUT
        Executors.newSingleThreadScheduledExecutor()
            .execute(() -> processor
                .publishOn(Schedulers.elastic())
                .map(str -> "1>> " + str)
                .subscribe(System.out::println));
    
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
          sink.next(scanner.nextLine());
        }
      }
    }
    

    UnicastProcessor 可以通过使用内部缓冲区来处理背压。权衡是它最多可以有一个Subscriber。如果您在订阅者尚未请求数据时通过它推送任何数量的数据,它会缓冲所有数据。

    其他FluxProcessor implementations 是:

    • DirectProcessor - 可以将信号分派为零或更多 Subscribers。它有不处理背压的限制

    • EmitterProcessor - 可以在兑现时向多个订阅者发出 每个订阅者的背压。当它没有订阅者时, 它仍然可以接受一些数据推送到可配置的 bufferSize

    【讨论】:

      猜你喜欢
      • 2019-11-12
      • 1970-01-01
      • 2021-11-23
      • 2017-12-17
      • 1970-01-01
      • 2012-04-02
      • 2017-06-17
      • 1970-01-01
      • 2019-06-23
      相关资源
      最近更新 更多