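【Posted】: 2017-05-18 13:27:50
【Question】:
I am testing a scenario in which I send an event and need to know when the consumer has finished processing it before continuing the flow. In other words, when I fire the event, I need to block the main thread until the consumer finishes processing. I tried this with an RxJava Observable, but I have not managed to block the main thread while it waits for the Observable's result.
My producer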
@Service
public class Producer {

    private MessageChannel output;

    @Autowired
    private Consumer consumer;

    @Autowired
    public Producer(Processor processor) {
        this.output = processor.output();
    }

    public void send(String event) {
        System.out.println("SENDING EVENT...");
        output.send(MessageBuilder.withPayload(event).build());
        //Observable<Boolean> obs = consumer.execute();
        //obs.subscribe();
        //Blocking process
        BlockingObservable.from(consumer.execute()).subscribe();
        //Continue to flow
        System.out.println("EVENT PROCESSED...");
    }
}
My consumer
@Service
public class Consumer {

    @StreamListener(target = Processor.INPUT)
    public void receiver(@Payload String event) {
        System.out.println("EVENT RECEIVED, PROCESSING...");
        try {
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        execute();
    }

    public Observable<Boolean> execute() {
        return Observable.<Boolean>create(emitter -> {
            try {
                System.out.println("EVENT STILL PROCESSING...");
                emitter.onNext(Boolean.TRUE);
            } catch (Exception e) {
                emitter.onError(new RuntimeException("ERROR"));
            }
            emitter.onCompleted();
        });
    }
}
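For reference, in the code above each call to consumer.execute() builds a new Observable. The one the producer blocks on is therefore unrelated to the one created inside receiver(), and it completes as soon as it is subscribed to. Below is a minimal sketch of the behaviour I am after, assuming plain java.util.concurrent instead of RxJava and Spring Cloud Stream. The names BlockingHandoffSketch, SketchConsumer and sendAndWait are hypothetical, a single-thread executor stands in for the message channel, and the 10-second sleep is shortened to 200 ms.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class BlockingHandoffSketch {

    // Hypothetical stand-in for the consumer. It completes a future that the
    // producer created for this event, rather than creating its own signal.
    static class SketchConsumer {
        private final List<String> log;

        SketchConsumer(List<String> log) {
            this.log = log;
        }

        void receive(String event, CompletableFuture<Boolean> done) {
            try {
                log.add("EVENT RECEIVED, PROCESSING...");
                TimeUnit.MILLISECONDS.sleep(200); // simulated work
                done.complete(Boolean.TRUE);      // signal the waiting producer
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                done.completeExceptionally(e);
            }
        }
    }

    // Sends the event on a worker thread and blocks the caller until the
    // consumer has completed the shared future, or the timeout expires.
    static List<String> sendAndWait(String event) {
        List<String> log = new CopyOnWriteArrayList<>();
        SketchConsumer consumer = new SketchConsumer(log);
        // Assumption: a single-thread executor plays the role of the channel.
        ExecutorService channel = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<Boolean> done = new CompletableFuture<>();
            log.add("SENDING EVENT...");
            channel.submit(() -> consumer.receive(event, done));
            done.get(5, TimeUnit.SECONDS); // blocks the calling thread
            log.add("EVENT PROCESSED...");
            return log;
        } catch (Exception e) {
            throw new IllegalStateException("consumer did not finish", e);
        } finally {
            channel.shutdown();
        }
    }

    public static void main(String[] args) {
        sendAndWait("demo").forEach(System.out::println);
    }
}
```

The point of the sketch is that producer and consumer share one completion signal per event. Whether the same handoff can be wired through a real Spring Cloud Stream binding, where producer and consumer may not live in the same process, is exactly what I am unsure about.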
【Discussion】:
Tags: java multithreading rx-java observable spring-cloud-stream