【发布时间】:2015-07-26 06:37:57
【问题描述】:
我有一个 RESTful API,它接收一组 JSON 消息,这些消息将被转换为单独的 Avro 消息,然后发送到 Kafka。在路由内部,我调用了 3 个不同的参与者:1)一个参与者出去并从磁盘检索 Avro 模式 2)然后循环遍历 JSON 消息数组并将其与第二个参与者中的 Avro 模式进行比较。如果任何消息未验证,那么我需要将响应返回给 API 的调用者并停止处理。 3) 遍历数组并传递给第三个参与者,该参与者接受 JSON 对象,将其转换为 Avro 消息并发送到 Kafka 主题。
如果其中一位演员出现故障,我遇到的问题是如何停止路线中的处理。我将请求上下文传递给每个参与者并调用它的完整方法,但它似乎并没有立即停止,即使它不应该下一个参与者仍在处理。这是我在路线中所做的代码sn-p:
post {
entity(as[JsObject]) { membersObj =>
requestContext =>
val membersJson = membersObj.fields("messages").convertTo[JsArray].elements
val messageService = actorRefFactory.actorOf(Props(new MessageProcessingServicev2()))
val avroService = actorRefFactory.actorOf(Props(new AvroSchemaService()))
val validationService = actorRefFactory.actorOf(Props(new JSONMessageValidationService()))
implicit val timeout = Timeout(5 seconds)
val future = avroService ? AvroSchema.MemberSchema(requestContext)
val memberSchema:Schema = Await.result(future, timeout.duration).asInstanceOf[Schema]
for (member <- membersJson) validationService ! ValidationService.MemberValidation(member.asJsObject, memberSchema, requestContext)
for (member <- membersJson) (messageService ! MessageProcessingv2.ProcessMember(member.asJsObject, topicName, memberSchema, requestContext))
我浏览了很多关于这个主题的博客/书籍/幻灯片,但不确定最好的方法是什么。我已经使用 Scala/Akka 大约 2 个月了,基本上是自学了我一直需要的部分。因此,对于经验更丰富的 Scala/Akka/Spray 开发人员在这方面的任何见解,我们都非常感激。我的想法是将 3 个演员包装在一个“大师”演员中,并让每个演员都成为该演员的孩子,并尝试像那样接近它。
【问题讨论】: