【问题标题】:FS2 join Cannot prove that Seq[fs2.Stream[cats.effect.IO,Int]] <:< fs2.Stream[cats.effect.IO,O2]FS2 join 不能证明 Seq[fs2.Stream[cats.effect.IO,Int]] <:< fs2.Stream[cats.effect.IO,O2]
【发布时间】:2018-01-03 18:40:53
【问题描述】:

我正在尝试使用 fs2 流 0.10.0-M9 和 doobie 版本 0.5.0-M9 从 http 调用中获取一系列对象,然后将其插入到 postgres 数据库中,但我遇到了问题构造此代码,得到以下错误:

错误:(49, 12) 无法证明 Seq[fs2.Stream[cats.effect.IO,Int]] <: fs2.stream .join>

一旦对 Web 服务的调用返回,我想要做的是同时运行插入语句。代码如下:

fetchProducts(date).map{items  =>
        items.map( i =>
          Stream.eval(upsertProductIO(i).transact(xa))
        )
      }
      .join(100)
      .run
      .unsafeRunSync()

//rest call
def fetchProducts(changeDate: String): Stream[IO, Seq[Product]] = {
//rest call here
}

//DAO code
def upsertProductIO(p: Product): ConnectionIO[Int] = {
  upsertProduct(p).run
}

【问题讨论】:

  • 何塞,您能不能说一下您使用的是哪些版本的库?版本之间似乎存在重大差异。
  • 添加版本 fs2 0.10.0-M9 和 doobie 0.5.0-M9

标签: scala scala-cats scalaz-stream fs2


【解决方案1】:

问题是你有一个Seq[Stream[IO, Seq[Product]],你想要的是一个Stream[IO, Seq[Product]],你可以用Foldable类型的类实例来做:

import cats.implicits._
import scala.concurrent.ExecutionContext.Implicits.global

fetchProducts(date).map { items =>
   items.map(i => Stream.eval(upsertProductIO(i).transact(xa)))
}.map(seqOfStream.toList.sequence)
 .join(100)
 .run
 .unsafeRunSync()

【讨论】:

  • 谢谢。我按照您的建议修改了代码,但出现了类似的错误:fetchProducts("2017-12-10T06:17:37:557-0400").map { items =&gt; items.map(i =&gt; Stream.eval(upsertProductIO(i).transact(ProductDao.xa))) }.map(is =&gt; is.toList.sequence) .join(100) .run .unsafeRunSync() Error:(38, 27) 无法证明 fs2.Stream[cats.effect.IO,Int] <: g> is.toList.sequence)
  • @JoseH.Martinez,你使用什么版本的 Scala 编译器? Yuval 的代码使用 scalaVersion := "2.11.11"scalacOptions += "-Ypartial-unification" 为我编译,并且在没有 -Ypartial-unification 的情况下失败,并出现与您的错误非常相似的错误。
  • 我在 scalaVersion := "2.12.4"
  • 你试过用 sbt 编译吗?
  • @JoseH.Martinez ?
猜你喜欢
  • 2020-08-09
  • 2021-10-19
  • 1970-01-01
  • 2020-11-03
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-11-28
  • 2019-06-14
相关资源
最近更新 更多