【问题标题】:Alpakka MongoDB - specify type in MongoSourceAlpakka MongoDB - 在 MongoSource 中指定类型
【发布时间】:2018-08-18 22:17:04
【问题描述】:

我目前正在使用 Akka Streams 和 Alpakka MongoDB connector

是否可以指定MongoSource的类型?

val codecRegistry = fromRegistries(fromProviders(classOf[TodoMongo]), DEFAULT_CODEC_REGISTRY)
  private val todoCollection: MongoCollection[TodoMongo] = mongoDb
    .withCodecRegistry(codecRegistry)
    .getCollection("todo")

我想做这样的事情:

val t: FindObservable[Seq[TodoMongo]] = todoCollection.find()
MongoSource(t) // Stuck here

但我收到以下错误:

Expected Observable[scala.Document], Actual FindObservable[Seq[TodoMongo]].

我找不到关于这部分的正确文档。

【问题讨论】:

标签: mongodb scala akka akka-stream alpakka


【解决方案1】:

这个还没有发布,但是在Alpakka的master分支中,MongoSource.apply带了一个类型参数:

object MongoSource {
  def apply[T](query: Observable[T]): Source[T, NotUsed] =
    Source.fromPublisher(ObservableToPublisher(query))
}

因此,随着 Alpakka 即将发布的 0.18 版本,您将能够执行以下操作:

val source: Source[TodoMongo, NotUsed] = MongoSource[TodoMongo](todoCollection.find())

注意这里的source 假设todoCollection.find() 返回一个Observable[TodoMongo];根据需要调整类型。

与此同时,您可以简单地手动添加上述代码。例如:

package akka.stream.alpakka.mongodb.scaladsl

import akka.NotUsed
import akka.stream.alpakka.mongodb.ObservableToPublisher
import akka.stream.scaladsl.Source
import org.mongodb.scala.Observable

object MyMongoSource {
  def apply[T](query: Observable[T]): Source[T, NotUsed] =
    Source.fromPublisher(ObservableToPublisher(query))
}

注意MyMongoSource 被定义为驻留在akka.stream.alpakka.mongodb.scaladsl 包中(如MongoSource),因为ObservableToPublisher 是包私有类。你可以像使用MongoSource一样使用MyMongoSource

val source: Source[TodoMongo, NotUsed] = MyMongoSource[TodoMongo](todoCollection.find()) 

【讨论】:

  • 很好的答案。正在寻找这样的东西。但是如何使用 MongoSource 的以下 apply 方法手动添加类型呢? def apply(query: Observable[Document]): Source[Document, NotUsed]
  • 与 Alpakka 0.18 完美搭配。
猜你喜欢
  • 2018-08-20
  • 1970-01-01
  • 2020-06-21
  • 2022-07-28
  • 2021-12-09
  • 2010-12-30
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多