【问题标题】:How to lock the thread until the Observable be complete如何锁定线程直到 Observable 完成
【发布时间】:2017-05-18 13:27:50
【问题描述】:

我正在测试一个场景,我想发送一个事件,并观察消费者何时完成处理以继续流程,即当我触发事件时,我需要阻塞该主线程直到结束消费者处理,使用rxJava Observable,我没有成功锁定主线程等待observable结果。

我的制作人

@Service
public class Producer {

    private MessageChannel output;

    @Autowired
    private Consumer consumer;

    @Autowired
    public Producer(Processor processor) {
        this.output = processor.output();
    }

    public void send(String event) {

        System.out.println("SENDING EVENT...");

        output.send(MessageBuilder.withPayload(event).build());

        //Observable<Boolean> obs = consumer.execute();
        //obs.subscribe();

        //Blocking process
        BlockingObservable.from(consumer.execute()).subscribe();

        //Continue to flow
        System.out.println("EVENT PROCESSED...");

    }
}

我的消费者

@Service
public class Consumer {

    @StreamListener(target = Processor.INPUT)
    public void receiver(@Payload String event){

        System.out.println("EVENT RECEIVED, PROCESSING...");
        try {
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        execute();

    }

    public Observable<Boolean> execute() {
        return Observable.<Boolean>create(emitter -> {
            try {
                System.out.println("EVENT STILL PROCESSING...");
                emitter.onNext(Boolean.TRUE);
            } catch (Exception e) {
                emitter.onError(new RuntimeException("ERROR"));
            }
            emitter.onCompleted();
        });
    }
}

【问题讨论】:

    标签: java multithreading rx-java observable spring-cloud-stream


    【解决方案1】:

    您可以使用BlockingObservable.toFuture(consumer.execute()).get() 而不是BlockingObservable.from(consumer.execute()).subscribe() 来阻塞线程。

    【讨论】:

      【解决方案2】:

      需要使用运算符toBlockig(等待消费者消费)+single来获取值。

         @Test
          public void observableEvolveAndReturnToStringValue() {
              assertTrue(Observable.just(10)
                                   .map(String::valueOf)
                                   .toBlocking()
                                   .single()
                                   .equals("10"));
          }
      

      您可以在此处查看更多示例https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/utils/ObservableToBlocking.java

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多