【问题标题】:How to create an Observable that will marge a stream of Observables after they complete?如何创建一个 Observable,它会在完成后处理 Observable 流?
【发布时间】:2015-01-03 13:06:20
【问题描述】:

我正在使用 Jetty WebSocket Client 从客户端设置 WebSocket 连接。我正在尝试创建一个以 Observable 形式提供事件流的类。

我设法通过编写一个 POJO @WebSocket 类来实现这一点,该类将所有内容发布到 SerializedSubject<SocketEvent, SocketEvent> eventSubject = new SerializedSubject<>(PublishSubject.create()); 并且一切正常。

每次连接中断时如何让它重新连接?

我尝试从Observable.intervalflatMap-ing 开始到Observable<ObservableSocket.SocketEvent> connect(String url),每个连接都返回 Observable。

Observable<Long> reconnectObservable = Observable.interval(1000, TimeUnit.MILLISECONDS);
Observable<ObservableSocket.SocketEvent> composed = reconnectObservable.flatMap(aLong -> {
    try {
        System.out.println("Connect");
        return connect(url);
    } catch (Exception e) {
        System.out.println("Exception: " + e);
        return Observable.just(new ObservableSocket.SocketCloseEvent(999, "Exception: " + e));
    }
});

问题是,它每 1 秒创建另一个连接。如何让 flatMap 等待内部 Observable 完成?

【问题讨论】:

  • 你试过repeat吗?

标签: java rx-java


【解决方案1】:
Observable.range(0, Integer.MAX_VALUE).concatMap(tick -> { ... });

concatMap 维护一个 SerialSubscription,并且一次只会订阅一个发出的 observable,等待每一个终止。范围提供了无限信号(在这种情况下,无限以大约 20 亿结束:P),并且 concatMap 将根据创建的每个内部 observable 进行一次连接。

【讨论】:

    猜你喜欢
    • 2021-08-28
    • 2021-05-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-02-25
    • 1970-01-01
    • 1970-01-01
    • 2020-02-05
    相关资源
    最近更新 更多