【问题标题】:How to create an observable and observer in RxJava?如何在 RxJava 中创建 observable 和观察者?
【发布时间】:2018-04-24 00:42:22
【问题描述】:

我有一个 void 监听函数来监听服务器推送的数据。我需要创建一个 observable 和观察者,以便我可以使用 onNext、onComplete 和 onError 处理数据。

【问题讨论】:

  • 到目前为止您尝试过什么?我们需要一些代码来帮助您。
  • 这里有一个关于RxJava的教程vogella.com/tutorials/RxJava/article.html,更具体的问题会给你更具体的答案。

标签: java rx-java rx-java2


【解决方案1】:

您可能想考虑使用BehaviourSubject

private final BehaviorSubject<YourImmutableDataClass> mServerObservable = BehaviorSubject.create();

private void update(YourImmutableDataClass next) {
    mServerObservable.onNext(next);
}

public Observable<YourImmutableDataClass> observe() {
    return mServerObservable.distinctUntilChanged();
}

【讨论】:

    【解决方案2】:

    以下是一些粗略的猜测。

    如果您尝试像在不确定的流中一样通过管道,rxjava 1.x 并不能轻松解决背压问题,但 Rxjava2 有一个更好的 Observable.create(..target) ,您可能会在其中使用 listen()实现调用目标的 onnext/onerror/oncomplete。

    当然,当订阅者取消订阅(如果是)时,还有很多代码需要添加,以便可以删除侦听器。但这是一个开始。

    一个可能的解决方案的元素演示:

    小心:这不是防弹代码,侦听器列表不是线程安全的。我只是暂时保持轻松。

    package tests.rxjava2;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Executors;
    import java.util.function.Consumer;
    
    import io.reactivex.Observable;
    import io.reactivex.disposables.Disposable;
    import io.reactivex.schedulers.Schedulers;
    
    public class TestRxjava2Basics {
    
        static void p(Object msg) {
            System.out.println(Thread.currentThread().getName()+"]: "+msg);
        }
    
        static void w(long delay) {
            try {
                Thread.sleep(delay);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        public static void main(String[] args) throws Exception {
            List<Consumer<String>> listeners = new ArrayList<>(); //NOT THREADSAFE!!!!!
            Consumer<String> c1 = s -> p("consumer permanent: "+s);
            listeners.add(c1);
    
            Thread machinegun = new Thread(() -> {
                while(!Thread.interrupted()) {
                    listeners.forEach(c -> c.accept(""+System.currentTimeMillis()));
                    try {
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        break;
                    }
                }
            }, "gun");
    
            machinegun.start();
    
    //      for(int i=0; i<5; i++) {
    //          final int fi = i;
    //          Consumer<String> c = s -> p("consumer adapter "+fi+": "+s);
    //          listeners.add(c);
    //          Thread.sleep(1000);
    //          
    //          listeners.remove(c);
    //          Thread.sleep(1000);
    //      }
    
            //equivalent in RX:
            for(int i=0; i<5; i++) {
                final int fi = i;
                Disposable disp = Observable.create(tgt -> {
                        Consumer<String> c = s -> {
                            p("consumer adapter "+fi+": "+s);
                            tgt.onNext(s);
                        };
                        tgt.setCancellable(() -> {
                            p("cancelling consumer adapter "+fi);
                            listeners.remove(c);
                        });
                        listeners.add(c);
                    })
                    .doOnNext(s -> p("onnext "+fi+": "+s))
                    .subscribe();
    
                Thread.sleep(1000);
    
                disp.dispose();
                Thread.sleep(1000);
            }
    
            machinegun.interrupt();
        }
    
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-06-11
      • 2023-04-10
      • 1970-01-01
      • 1970-01-01
      • 2011-06-18
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多