【发布时间】:2020-06-10 00:21:26
【问题描述】:
我正在尝试将 Monix Task 与 mongo-scala-driver 一起使用。我有点理解Error Handling
val mongoClient: Resource[Task, MongoConnection[Task, DomainModel]] =
MongoTypedConnection.create[Task, DomainModel](
"mongodb:...&authMechanism=SCRAM-SHA-1"
)
mongoClient.use { client =>
val changeStream: Task[ChangeStreamObservable[DomainModel]] =
for {
collection <- client.getMongoCollection("myDatabase", "myCollection")
changes <- client.watchCollection(collection)
} yield changes
...
...
...
.as(ExitCode.Success)
}
当没有错误时,这非常有效。我想为此添加错误处理(例如处理不正确的database 和collection 名称)。我基于文档的初步尝试是尝试:
val changeObs: io.Serializable =
Await.result(changeStream
.onErrorHandleWith {
case _: TimeoutException =>
// Oh, we know about timeouts, recover it
Task.now("Recovered!")
case other =>
// We have no idea what happened, raise error!
Task.raiseError(other)
}.runToFuture, 5.seconds)
但这给了我一个io.Serializable。如何在保留ChangeStreamObservable[DomainModel] 的同时进行某种简洁的错误处理?感谢我可以研究的任何模式的指针。
BR
【问题讨论】: