【问题标题】:Akka Timeout exception but messages actually sentAkka 超时异常,但消息实际发送
【发布时间】:2020-06-16 07:04:49
【问题描述】:

我正在使用具有以下技术的 Scala 2.13 堆栈:

  • 玩!框架 2.8
  • akka 键入 2.6.3
  • alpakka kafka 2.0.3

Akka-stream 作业从 Kafka 读取事件,要求 Actor 进行计算,并根据给定的响应,将新事件生成回 Kafka。

问题是使用 ask 模式发送的消息似乎被 QuestionActor(如下)仅当其邮箱收集到至少两条消息并且每条消息仅接收一条时才使用

奇怪的行为是:

t1

ref ? Question("tr1", 1, None, actorRef)
> AskTimeoutException(tr1)

t2

ref ? Question("tr2", 1, None, actorRef)
> [INFO] - Question request for tr1-1. Processing.
> AskTimeoutException(tr2)

t3

ref ? Question("tr3", 1, None, actorRef)
> [INFO] - Question request for tr2-1. Processing.
> AskTimeoutException(tr3)

然后我试图了解我为什么会观察到这种行为以及我做错了什么

akka-stream Kafka 管道是:

Consumer
  .plainSource(consumerSettings, subscription)
  .map(DeserializeEvents.fromService)
  .filter(_.eventType == classOf[Item].getName)
  .via(askFlowExplicit)
  .withAttributes(ActorAttributes.supervisionStrategy(decider()))
  .map(
    response =>
      new ProducerRecord[String, OutputItem](
        topics,
        OutputItem(response.getClass.getName, response)
      )
  )
  .log("Kafka Pipeline")
  .runWith(Producer.plainSink(producerSettings))

决策者是一种监督策略,在SerialisationTimeout 异常时恢复工作; askFlowExplicit 向外部参与者声明了一个询问请求,并且 - 在此 - 我遇到了我的问题。

val askFlowExplicit =
  ActorFlow.ask[OutputItem, Question, Answer](askTarget) {
    case (envelope, replyTo) =>
      val item = Serdes.deserialize[Item](envelope.payload)
      Question(item.trID, item.id, item.user, replyTo)
  }

管道在 Play 上启动!应用程序引导

@Singleton
class ApplicationStart @Inject()(
    configuration: Configuration,
    questionActor: ActorRef[QuestionActor.Question]
) {
  private implicit val logger = Logger.apply(getClass)
  implicit val mat            = context
  AlpakkaPipeline.run(configuration, questionActor)
}

actor 是属于同一个actor 系统的简单类型actor,并且 - 现在 - 它只是将来自流的请求转发到另一个服务。

class QuestionActor(
    configuration: Configuration,
    context: ActorContext[Question],
    itemService: ItemService
) extends AbstractBehavior[Question](context) {
  import QuestionActor._

  implicit val ec: ExecutionContextExecutor = context.executionContext
  private implicit val timeout: Timeout = ...

  override def onMessage(msg: Question): Behavior[Question] = Behaviors.receive[Question] {
    case (context, Question(trID, id, user, sender)) =>
      log.info(s"Question request for ${msg.trID}-${msg.id}. Processing.")
        itemService
          .action(id, user)
          .onComplete {
            case Success(result) if result.isEmpty =>
              log.info("Action executed")
              msg.replyTo ! NothingHappened(trID, id)
            case Failure(e) =>
              log.error("Action failed.", e)
              msg.replyTo ! FailedAction(trID, id, user, e.getMessage)
          }
      Behaviors.same
  }
}

object QuestionActor {
  final case class Question(
      trID: String,
      id: Int,
      user: Option[UUID],
      replyTo: ActorRef[Answer]
  )

  def apply(itemService: ItemService, configuration: Configuration): Behavior[Question] =
    Behaviors.setup { context =>
      context.setLoggerName(classOf[QuestionActor])
      implicit val log: Logger = context.log
      new QuestionActor(configuration, context)
    }
}

它是使用运行时 DI 和 Play 构建的!

class BootstrapModule(environment: Environment, configuration: Configuration)
    extends AbstractModule
    with AkkaGuiceSupport {

  override def configure(): Unit = {
    bind(new TypeLiteral[ActorRef[CloneWithSender]]() {})
      .toProvider(classOf[QuestionActorProvider])
      .asEagerSingleton()
    bind(classOf[ApplicationStart]).asEagerSingleton()
  }
}

private class Question @Inject()(
    actorSystem: ActorSystem,
    itemService: ItemService,
    configuration: Configuration
) extends Provider[ActorRef[Question]] {
  def get(): ActorRef[Question] = {
    val behavior = QuestionActor(itemService, configuration)
    actorSystem.spawn(behavior, "question-actor")
  }
}

我尝试了什么

  • 将调度程序更改为QuestionActor
  • 将邮箱更改为QuestionActor
  • QuestionActor 内运行管道
  • 从actor构造函数发送相同的消息(到self),观察到相同的行为:另一条消息将触发actor消耗前者,后者请求超时。

我没有做的事

  • 将调度程序更改为 Akka 流管道

现在在我看来这是一个线程问题,但我不知道从哪里开始。 非常感谢任何帮助。提前谢谢你。

【问题讨论】:

  • 您能否通过仅向参与者发送消息而不将其集成到流程中来重现问题?
  • 很难看出问题出在哪里。但是你定义了太多的上下文。尽量不要extends AbstractBehavior[Question](context),避免将context作为构造函数参数传递,只使用Behaviors.setup + receiveMessage
  • 嗨,Ivan,是的,我尝试从 actor 构造函数中向 self 发送相同的消息,我可以观察到相同的行为。 (即使我不使用询问)。我更新了问题。
  • 会尽量避免AbstractBehavior
  • 尝试用sender ! NothingHappened(trID, id)替换msg.replyTo ! NothingHappened(trID, id)

标签: scala playframework akka akka-stream alpakka


【解决方案1】:

问题是您正在组合提供onMessageAbstractBehavior 并在那里定义一个新的Behaviors.receive[Question] 行为。您必须使用其中之一。

删除Behaviors.receive如下

  override def onMessage(msg: Question): Behavior[Question] = {
      log.info(s"Question request for ${msg.trID}-${msg.id}. Processing.")
        itemService
          .action(id, user)
          .onComplete {
            case Success(result) if result.isEmpty =>
              log.info("Action executed")
              msg.replyTo ! NothingHappened(trID, id)
            case Failure(e) =>
              log.error("Action failed.", e)
              msg.replyTo ! FailedAction(trID, id, user, e.getMessage)
          }
      Behaviors.same
  }
}

AbstractBehavior.onMessage 是一种行为的实现。因此,您通过方法参数收到一条消息,您应该处理它并在您的情况下返回一个新的BehaviourBehaviours.same

但不是处理消息,而是使用Behaviors.receive 创建一个新的Behaviour 并将Future 的回调注册到原始的第一条消息。因此,当第二条消息到达时,您会看到日志语句,这会触发新的行为。

如果你想使用 FP 风格定义,你必须坚持使用Behaviors.xxx 辅助方法。如果您选择 OOP 样式,则扩展 AbstractBehavior。但你不应该两者都做。

【讨论】:

  • 是的,似乎可行:我通过在 Behaviors.setup 中定义行为,转移到了 FP 风格的演员实例。但我无法完全理解:为什么使用onMessage 定义行为会产生问题?
  • 相当整洁!再次感谢您!