【问题标题】:Asynchronous call for every item inside a collection对集合中每个项目的异步调用
【发布时间】:2017-08-31 06:55:53
【问题描述】:

我有一个到目前为止我无法解决的问题 我是 RxKotlin 的新手,所以它可能很容易。请看代码:

    override fun infos(): Stream<Info> =
        client.infoAboutItem(identifier)
                .map {
                    val itemId = it.itemId ?: ""
                    val item = client.itemForId(itemId)
                    ClientInfo(client, it, source, item) as Info
                }
                .let { AccessStream(it) }

stream 是我们自制的集合。 Map 是一种允许您遍历该集合中的每个项目的方法。

这里的问题是

 client.itemForId(itemId)

是一个 http 调用,它返回一个不理想的 Single。

我想在 map 中创建一个异步调用,该调用将返回 Item 而不是 Single,然后将其传递给 ClientInfo。到目前为止,我尝试过的是在地图内使用订阅并使用 blockingGet() 方法,但这会阻塞主线程,即使我在不​​同的线程上观察和订阅

因此,它涉及对集合中的所有内容进行异步调用。

感谢您的帮助

【问题讨论】:

  • 是否需要返回Stream&lt;Info&gt; 或者可以更改?
  • 不幸的是,必须这样做,很多事情都希望返回这个集合
  • 可能是Single&lt;Stream&lt;Info&gt;&gt;Observable&lt;Stream&lt;Info&gt;&gt;
  • 可能吗?为什么你有解决方案?
  • 看看我的回答,也许对你有帮助

标签: java kotlin rx-java rx-kotlin


【解决方案1】:

您可以尝试返回Observable&lt;Stream&lt;Info&gt;&gt;,然后它看起来像:

   override fun infos(): Observable<Stream<Info>> = 
                Observable.from(client.infoAboutItem(identifier))
                        .flatMapSingle {
                            val itemId = it.itemId ?: ""
                            client.itemForId(itemId)
                        }
                        .map { 
                            ClientInfo(client, it, source, item) as Info
                         }
                        .toList()
                        .flatMap {
                            AccessStream(it)
                        }

【讨论】:

  • 我认为ClientInfo()的构造周围的flatMap()应该是map(),因为构造函数返回一个Info而不是Observable&lt;Info&gt;
  • 是的,你是对的!或者可以是带有Observable.just() 的flatMap,但这里的地图更好。谢谢,将编辑
【解决方案2】:

您应该将这些昂贵的操作包装到 observable 中,并使用平面地图将这些数据压缩到 Client Info 中。

我写了一个小样本来炫耀一下。

class SimpleTest {
  val testScheduler = TestScheduler()

  @Test
  fun test() {
    infos().observeOn(Schedulers.immediate())
        .subscribe { logger("Output", it.toString()) }

    testScheduler.advanceTimeBy(10, TimeUnit.MINUTES)
  }

  fun infos(): Single<List<ClientInfo>> {
    return Observable.from(infoAboutItem("some_identifier"))
        .doOnNext { logger("Next", it.toString()) }
        .flatMap { aboutItem ->
          Observable.fromCallable { itemForId(aboutItem.itemId) }
              .subscribeOn(testScheduler)
              .map { ClientInfo(aboutItem = aboutItem, item = it) }
        }
        .doOnNext { logger("Next", it.toString()) }
        .toList()
        .toSingle()
  }

  data class ClientInfo(
      val id: String = UUID.randomUUID().toString(),
      val aboutItem: AboutItem,
      val item: Item
  )

  data class AboutItem(val itemId: String = UUID.randomUUID().toString())
  data class Item(val id: String = UUID.randomUUID().toString())

  fun infoAboutItem(identifier: String): List<AboutItem> {
    return (1..10).map { AboutItem() }
  }

  fun itemForId(itemId: String): Item {
    val sleepTime = Random().nextInt(1000).toLong()
    Thread.sleep(sleepTime)
    return Item()
  }

  fun logger(tag: String, message: String): Unit {
    val formattedDate = Date(Schedulers.immediate().now()).format()
    System.out.println("$tag @ $formattedDate: $message")
  }

  fun Date.format(): String {
    return SimpleDateFormat("HH:mm:ss.SSS", Locale.US).format(this)
  }
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2016-12-28
    • 1970-01-01
    • 1970-01-01
    • 2013-07-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多