【发布时间】:2016-04-17 09:37:59
【问题描述】:
我正在将旧的 Casbah Mongo 驱动程序迁移到新的异步 Scala 驱动程序,我正在尝试在 Akka 流中使用它,但流卡住了。
我有一个定义了 createLogic() 的 GraphStage。代码如下。这与 Casbah 工作得很好,我希望新的 mongo 驱动程序的异步特性非常适合,但是这里会发生什么......
如果我通过此代码流式传输 2 条记录,则第一条记录流过并触发下一步。请参阅下面的输出('HERE IN SEND' 确认它通过了)。第二条记录似乎通过了 BlacklistFilter 中的正确步骤,但 Akka 从未流向 SEND 步骤。
任何想法为什么这不适用于新驱动程序?
object BlacklistFilter {
type FilterShape = FanOutShape2[QM[RenderedExpression], QM[RenderedExpression], QM[Unit]]
}
import BlacklistFilter._
case class BlacklistFilter(facilities: Facilities, helloConfig: HelloConfig)(implicit asys: ActorSystem) extends GraphStage[FilterShape] {
val outPass: Outlet[QM[RenderedExpression]] = Outlet("Pass")
val outFail: Outlet[QM[Unit]] = Outlet("Fail")
val reIn: Inlet[QM[RenderedExpression]] = Inlet("Command")
override val shape: FilterShape = new FanOutShape2(reIn, outPass, outFail)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
override def preStart(): Unit = pull(reIn)
setHandler(reIn, new InHandler {
override def onPush(): Unit = {
val cmd = grab(reIn)
val re: RenderedExpression = cmd.body
val check = re.recipient.contacts(re.media).toString
// NEW NON-BLOCKING CODE
//-------------------------------------
facilities.withMongo(helloConfig.msgDB, helloConfig.blacklistColl) { coll =>
var found: Option[Document] = None
coll.find(Document("_id" -> check)).first().subscribe(
(doc: Document) => {
found = Some(doc)
println("BLACKLIST FAIL! " + check)
emit(outFail, cmd)
// no pull() here as this happens on complete below
},
(e: Throwable) => {
// Log something here!
emit(outFail, cmd)
pull(reIn)
},
() => {
if (found.isEmpty) {
println("BLACKLIST OK. " + check)
emit(outPass, cmd)
}
pull(reIn)
println("Pulled reIn...")
}
)
}
// OLD BLOCKING CASBAH CODE THAT WORKED
//-------------------------------------
// await(facilities.mongoAccess().mongo(helloConfig.msgDB, helloConfig.blacklistColl)(_.findOne(MongoDBObject("_id" -> check)))) match {
// case Some(_) => emit(outFail, cmd)
// case None => emit(outPass, cmd)
// }
// pull(reIn)
}
override def onUpstreamFinish(): Unit = {} // necessary for some reason!
})
setHandler(outPass, eagerTerminateOutput)
setHandler(outFail, eagerTerminateOutput)
}
}
输出:
BLACKLIST OK. jsmith@yahoo.com
Pulled reIn...
HERE IN SEND (TemplateRenderedExpression)!!!
ACK!
BLACKLIST OK. 919-919-9119
Pulled reIn...
您可以从输出中看到第一条记录很好地流向了 SEND/ACK 步骤。第二条记录打印了 BLACKLIST 消息,这意味着它发出 outPass 然后在 reIn 上调用 pull ...但是下游没有任何反应。
任何人都知道为什么这在 Akka Streams 中的工作方式与正常工作的 Casbah 版本不同(显示的代码已注释掉)?
(我可以将 Mongo 调用转换为 Future 并在其上等待,这应该像旧代码一样工作,但这有点违背了异步的全部意义!)
【问题讨论】:
标签: mongodb scala akka-stream