【问题标题】:Binding an API callback to an RxJava Observable将 API 回调绑定到 RxJava Observable
【发布时间】:2016-03-30 17:58:34
【问题描述】:

我正在尝试制作一个响应式应用程序,该应用程序在单独的线程上侦听网络套接字的价格,并且对如何构建 Observable 感到有些困惑。我拥有的许多接口都受到我正在使用的 API 的限制,因此无法更改。我提炼了我想要做的测试作为下面的测试,但我看不到如何填写 getPriceReactive() 方法的主体,以便订阅者在控制台上打印价格(请参阅代码中的注释)。

public class PriceObservableTest {

   // This interface is defined externally and used by the API
   private interface ITickHandler {
       void priceReceived(double price);
   }

   // Stores the price (currently just one double for illustration)
   private class Tick {
       double price = Double.NaN;
   }

   // Implementation of handler called by API when it receives a price
   private class TickHandler implements ITickHandler {
       private final Tick tick;

       TickHandler() { this.tick = new Tick(); }

       @Override public void priceReceived(double x) { tick.price = x; }
   }

   // This class emulates the API delivering prices from the socket
   private class PriceSource {
      private final Thread thread;

      PriceSource(final ITickHandler handler) {
          thread = new Thread(new Runnable() {
              final Random r = new Random();
              @Override public void run() {
                  while (!Thread.currentThread().isInterrupted()) {
                      try {
                          Thread.sleep(100);
                          handler.priceReceived(r.nextDouble() * 100);
                      } catch (InterruptedException e) {
                          break;
                      }
                  }
                  System.out.println("Price thread closed");
              }
         });
      }

      void subscribe() { thread.start(); }

      void unsubscribe() { thread.interrupt(); }
  }

  @Test
  public void simpleTest() throws Exception {

      final ITickHandler handler = new TickHandler();

      // Simulate some prices received periodically from a socket
      PriceSource prices = new PriceSource(handler);

      Observable<Tick> reactive = getPriceReactive(handler);

      reactive.subscribe(new Subscriber<Tick>() {
          @Override public void onCompleted() { }
          @Override public void onError(Throwable e) { }
          @Override public void onNext(Tick tick) {
              System.out.println("Received price: " + tick.price);
          }});

      // Observe prices for 1 second. The subscriber should print them to console
      prices.subscribe();
      Thread.sleep(1000); 
      prices.unsubscribe();
   }

   // Returns an observable that reacts to price changes
   private Observable<Tick> getPriceReactive(ITickHandler handler) {
       return Observable.create(new Observable.OnSubscribe<Tick>() {
           @Override public void call(Subscriber<? super Tick> subscriber) {

              // How to call subscriber.onNext() whenever
              // priceReceived() is called with a new price?

           }
       });
   }
}

每当 API 调用 priceReceived() 时,就需要以某种方式调用 subscriber.onNext(),但我不太明白如何实现这一点。当然,我可以在TickHandler 中存储对订阅者的引用,但这违背了拥有Observable 的目的,不是吗?

【问题讨论】:

    标签: java system.reactive rx-java observable


    【解决方案1】:

    ITickHandler 实现中转换为Observable。您控制的不是订阅者,而是发布者

    private class TickHandler implements ITickHandler {
       private final Tick tick;
       private final PublishSubject<Tick> priceSubject;
    
      TickHandler() { 
           this.tick = new Tick(); 
           this.priceSubject = PublishSubject.create();
       }
    
       @Override public void priceReceived(double x)
       { 
            tick.price = x; 
            priceSubject.onNext(tick);
       }
    
       public Observable<Tick> priceReceivedObservable()
       {
           return priceSubject.asObservable();   
       }
    }
    

    你可以在你的测试中使用它,比如:

    final ITickHandler handler = new TickHandler();
    PriceSource prices = new PriceSource(handler);
    
    handler.priceReceivedObservable()
           .subscribe(new Subscriber<Tick>() {
              @Override public void onCompleted() { }
              @Override public void onError(Throwable e) { }
              @Override public void onNext(Tick tick) {
                  System.out.println("Received price: " + tick.price);
              }});
    

    我警告你,它没有经过测试,因为我不会做很多 Java :)

    【讨论】:

    • 主题的使用很有趣,但我对它们有点怀疑,因为 rx 文档似乎建议它们只在极少数情况下才需要。实际上,事实证明已经提出了一个非常相似的问题,该解决方案不涉及主题:stackoverflow.com/questions/20552598/…
    • @ScarletPumpernickel 我不会说这个问题或任何答案与您的问题相关(除非您想用自定义事件污染您的代码)。我还要说这是 Subject 的正确用例
    • 我会 +1 主题的使用 - 每当你想用外部事件驱动 Observable 时,它​​们都非常棒,可能并不完全在你的控制之下。 Observable.create() 也可以,但是会比较麻烦,并且需要处理(或阻止)多个订阅。
    • @TassosBassoukos 即使使用Observable.Create,您也必须使用Subject 或事件将API 调用转移到Observable,对吗?
    • 不一定 - 当 create() 中的回调运行时,您可以访问订阅者;您可以在那时手动驾驶它。
    猜你喜欢
    • 1970-01-01
    • 2020-11-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多