【问题标题】:RxJava: blocking until retryRxJava:阻塞直到重试
【发布时间】:2014-08-26 11:38:04
【问题描述】:

我正在尝试学习响应式编程,作为第一个“真正的”应用程序,我选择了一个简单的 IRC 客户端,带有 RxJavaRxNetty

如果第一个服务器失败,我目前正在重试另一台服务器。所以,我有一个Observable<Server>,它由网络的服务器组成。它可以使用.repeat() 进行设置,因此它会无限期地重复服务器。现在,我如何使它成为一个阻塞的,以便一次只使用一个,并且只有在 RxClient::connect 失败或连接超时时才会转到下一个?

虽然其他解决方案是使用域,每次连接时都会将我重定向到不同的服务器并仅使用 .retry(),但我有兴趣以被动方式解决问题。

连接到所有服务器(没有.repeat())工作正常,但这不是我想要的:

servers
    .map(x -> RxNetty.createTcpClient(
        x.getAddress(),
        x.getPort(),
        PipelineConfigurators.stringMessageConfigurator()))
    .flatMap(RxClient::connect)
    .onErrorFlatMap(x -> Observable.empty())
    .subscribe(this::handleConnection);

【问题讨论】:

  • 您是否必须将服务器列表设为Observable<Server>?我认为,最好只列出一个列表,当上一个失败时,您可以从中选择下一个。
  • 不,我只是认为这很有意义。是的,一个清单可能会更好。我实际上已经写了一些东西,但这还不是一个完整的解决方案。进度有点慢,但我希望我能在某个时候回答这个问题。

标签: reactive-programming rx-java


【解决方案1】:

使用concatMap 代替flatMap 怎么样。因此,只有在RxClient::connect 完成或出错时才会使用下一个服务器:

servers
    .map(x -> RxNetty.createTcpClient(
        x.getAddress(),
        x.getPort(),
        PipelineConfigurators.stringMessageConfigurator()))
    .concatMap(RxClient::connect)
    .onErrorFlatMap(x -> Observable.empty())
    .subscribe(this::handleConnection);

【讨论】:

  • 谢谢,看来我忽略了flatMap 的文档,直到最近我才知道concatMap 的存在。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-03-18
  • 1970-01-01
  • 2021-07-30
  • 2015-02-07
相关资源
最近更新 更多