【问题标题】:RxJava - how to achieve a delayed timeoutRxJava - 如何实现延迟超时
【发布时间】:2020-07-15 15:53:34
【问题描述】:

向(比我)更多 RxJava 技术熟练的人提出问题。

我订阅了一个 PublishSubject,它应该在 60 秒 + 5 秒的容差内发出结果消息。如果我没有及时收到,则认为是发送方有问题,我需要中止。

通常我会这样订阅它:

subject
.timeout(65, TimeUnit.SECONDS)
.subscribe(
// onNext
    { msg -> handleSuccess(msg)},
//onError
    { t-> handleError(t) }
)

问题是,主题也可能在这 60 秒内发送更新消息。所以我的 OnNext 看起来像:

{ msg -> when (msg){
      is ResultMessage -> handleSuccess(msg)
      is UpdateMessage -> handleUpdate(msg)    
    }
}

您看到了我的问题,接收 UpdateMessage 会重置我的超时。对我来说理想的方法是,如果我可以设置 5 秒的超时时间,但在它被武装之前有 60 秒的初始延迟。

我知道,我可以在订阅之外设置一个额外的普通计时器,但我希望有一个更优雅的反应式解决方案。

【问题讨论】:

  • 所以您只想等待 65 秒以获取特定类型的消息而不是 UpdateMessage 吗?
  • 必须在 65 秒内收到 ResultMessage,否则会触发超时。 UpdateMessages 是可选的,但如果收到则必须进行处理
  • UpdateMessages 不会重置ResultMessage 的超时时间?
  • 它会,主题发送“消息”类型的对象,可以是 UpdateMessage 或 ResultMessage。每次接收到的发射都会重置超时。我知道我可以在 subect 上订阅两次,检查相应订阅中的任一消息类型,并且只有一个等待 Result 超时工作。我只是在寻找更优雅的解决方案

标签: rx-java


【解决方案1】:

应该能够建立两个不同的订阅和过滤器:

subject
    .filter( /*is update*/ )
    .subscribe( /*...*/ );

subject
    .filter( /*is result*/ )
    .timeout( 65, TimeUnit.SECONDS )
    .subscribe( /*...*/ );

【讨论】:

  • 谢谢,我自己的解决方案与一个订阅相得益彰,但也使用了过滤器
【解决方案2】:

我最终得到的解决方案是在 doOnNext 中进一步处理 UpdateMessages 并将发出的消息过滤到 ResultMessage。这样,超时只会影响 ResultMessage 的发射。

        subject
            .doOnNext{msg ->
                    if (msg is UpdateMessage)
                        handleUpdate(msg)
            }
            .filter{ msg -> msg is ResultMessage }
            .timeout(65, TimeUnit.SECONDS)
            .subscribe(
                    // onNext
                    { msg -> handleSuccess(msg)},
                    //onError
                    { t-> handleError(t) }
            )

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-06-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多