【问题标题】:Bulk insert / Insert many with Play Framework, ReactiveMongo使用 Play 框架、ReactiveMongo 批量插入/插入多个
【发布时间】:2016-07-04 15:09:31
【问题描述】:

我正在使用 Play Framework 2.5.0 和 ReactiveMongo 构建一个应用程序,我花了很多时间,卡住了,在大多数网络语言中很容易做到的事情上。

那个东西是一次插入许多文档。 为此,我必须使用 ReactiveMongo 函数bulkInsert

我发现 this google group 有一个非常简单的例子,但是它是从 2013 年开始的,现在签名改变了

来自

def bulkInsert[T](enumerator: Enumerator[T]) 

def bulkInsert(documents: Stream[P.Document], ordered: Boolean, writeConcern: WriteConcern)(implicit ec: ExecutionContext): Future[MultiBulkWriteResult]

所以在这里我尝试以这个例子为例,并找到一种将 Enumerator 转换为 Stream 的方法(没有找到任何方法):

val schemasDocs: Seq[JsObject] = {
  jsonSchemas.fields.map {
    case (field, value) => Json.obj(field -> value)
  }
}
val enumerator = Enumerator.enumerate(schemasDocs)
val schemasStream = Source.fromPublisher(Streams.enumeratorToPublisher(enumerator)) // my attempt to turn enumerator into a Stream
val schemasInsert = {
  getCollection("schemas").flatMap(
    _.bulkInsert(schemasStream, true)
  )
}

现在我发现自己潜入了 Akka、ReactiveMongo 和 Play API 以尝试从 JsObjects 的 Seq 创建一个 JsObjects 流。

然后我尝试了另一种方法:来自 ReactiveMongo 网站的 example

val bulkDocs = schemasDocs.map(implicitly[collection.ImplicitlyDocumentProducer](_))
collection.bulkInsert(ordered=true)(bulkDocs: _*)

给我一​​个难以调试的错误:

type mismatch; found : Seq[reactivemongo.play.json.collection.JSONCollection#ImplicitlyDocumentProducer] required: Seq[x$48.ImplicitlyDocumentProducer]

我宁愿不使用 Streams 而使用第二种解决方案,因为我不喜欢在我的代码中包含我不理解的东西..

【问题讨论】:

  • 请查看examples
  • 我仍然遇到同样的错误:类型不匹配;发现:Seq[reactivemongo.play.json.collection.JSONCollection#ImplicitlyDocumentProducer] 需要:Seq[x$12.ImplicitlyDocumentProducer],我真的不明白
  • 您的代码不够完整,无法理解
  • 我还遇到了 ImplicitlyDocumentProducer 的“类型不匹配”问题,我通过使用隐式 Json 格式化程序添加相应的对象来解决这个问题。 object Person {implicit val formatter = Json.format[FuelStation]}
  • 对于 json implicit val formatter = Json.format[FuelStation] 或对于 bson : implicit val writer = Macros.writer[Person]

标签: mongodb scala playframework reactivemongo play-reactivemongo


【解决方案1】:

我刚刚找到了如何处理 bulkInsert。有一个例子

build.sbt

...
libraryDependencies ++= Seq(
  "org.reactivemongo" %% "play2-reactivemongo" % "0.11.14"
)
...

plugins.sbt

addSbtPlugin("com.typesafe.play" % "sbt-plugin" % "2.5.12")

CxTransactionsRepository.scala

package cx.repository

import cx.model.CxTransactionEntity
import play.modules.reactivemongo.ReactiveMongoApi
import reactivemongo.play.json.collection.JSONCollection

import scala.concurrent.{ExecutionContext, Future}

class CxTransactionsRepository @Inject()(val reactiveMongoApi: ReactiveMongoApi)(implicit ec: ExecutionContext){

  private val cxTransactionsCollectionFuture: Future[JSONCollection] = reactiveMongoApi.database.map(_.collection[JSONCollection]("cxTransactions"))

  def bulkInsert(seq: Seq[CxTransactionEntity]): Future[Int] = {
    for {
      transactions <- cxTransactionsCollectionFuture
      writeResult <- transactions.bulkInsert(ordered = false)(seq.map(implicitly[transactions.ImplicitlyDocumentProducer](_)): _*)
    } yield {
      writeResult.n
    }
  }

}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-05-12
    • 1970-01-01
    相关资源
    最近更新 更多