【问题标题】:Akka Streams ActorRefSource order of messagesAkka Streams ActorRefSource 消息顺序
【发布时间】:2016-08-31 06:32:59
【问题描述】:

我想通过使用 akka Streams 的 ActorRefSource 来构建一个项目序列。所述源被连续馈送数据。计算完成后,Stream 会被 Poison Pill 终止。

下面的简化示例说明了我的意图:

val source = Source.actorRef[Int](1000, OverflowStrategy.fail)
    .mapMaterializedValue{ ref =>
      for(i <- 1 to 1000) {
        ref ! i
      }

      ref ! PoisonPill
    }

    source.runWith(Sink.seq).foreach(s => println("count: "+s.size))

我期待 Stream 处理所有 1000 个元素,然后由于收到毒丸而终止。不幸的是,Stream 通常会更早终止。示例输出是:

count: 24

在发送毒丸之前等待一段时间,例如1000 毫秒会导致处理所有数字。

任何关于如何确保在收到毒丸之前所有物品都已处理的想法将不胜感激。

【问题讨论】:

    标签: scala akka akka-stream


    【解决方案1】:

    参见the documentation for Source.actorRef:PoisonPill 在终止流之前不会刷新缓冲区。

    【讨论】:

    • 好吧,所以为了确保所有项目在关机前得到处理,我应该传递一个 akka.actor.Status.Succes 的实例来代替?感谢您的澄清!
    猜你喜欢
    • 2018-09-29
    • 1970-01-01
    • 1970-01-01
    • 2021-06-19
    • 1970-01-01
    • 1970-01-01
    • 2018-02-18
    • 2021-07-13
    • 2017-06-24
    相关资源
    最近更新 更多