【发布时间】:2020-07-15 15:53:34
【问题描述】:
向(比我)更多 RxJava 技术熟练的人提出问题。
我订阅了一个 PublishSubject,它应该在 60 秒 + 5 秒的容差内发出结果消息。如果我没有及时收到,则认为是发送方有问题,我需要中止。
通常我会这样订阅它:
subject
.timeout(65, TimeUnit.SECONDS)
.subscribe(
// onNext
{ msg -> handleSuccess(msg)},
//onError
{ t-> handleError(t) }
)
问题是,主题也可能在这 60 秒内发送更新消息。所以我的 OnNext 看起来像:
{ msg -> when (msg){
is ResultMessage -> handleSuccess(msg)
is UpdateMessage -> handleUpdate(msg)
}
}
您看到了我的问题,接收 UpdateMessage 会重置我的超时。对我来说理想的方法是,如果我可以设置 5 秒的超时时间,但在它被武装之前有 60 秒的初始延迟。
我知道,我可以在订阅之外设置一个额外的普通计时器,但我希望有一个更优雅的反应式解决方案。
【问题讨论】:
-
所以您只想等待 65 秒以获取特定类型的消息而不是
UpdateMessage吗? -
必须在 65 秒内收到 ResultMessage,否则会触发超时。 UpdateMessages 是可选的,但如果收到则必须进行处理
-
UpdateMessages不会重置ResultMessage的超时时间? -
它会,主题发送“消息”类型的对象,可以是 UpdateMessage 或 ResultMessage。每次接收到的发射都会重置超时。我知道我可以在 subect 上订阅两次,检查相应订阅中的任一消息类型,并且只有一个等待 Result 超时工作。我只是在寻找更优雅的解决方案
标签: rx-java