【问题标题】:Test groupBy with akka-stream-testkit使用 akka-stream-testkit 测试 groupBy
【发布时间】:2017-10-13 12:31:28
【问题描述】:

我无法让以下代码工作,它可以编译,但我收到错误 Expected OnNext(_), yet no element signaled during 3 seconds

为什么会出现这个错误,我需要做什么来测试类型 下面的流量?

class GeneralTests extends FunSuite {

  implicit val system = ActorSystem("Test-System")
  implicit val materializer = ActorMaterializer()

  case class Wid(id: Int, v: String)

  test("Substream with folds") {
    val (pub, sub) = TestSource.probe[Wid]
    .groupBy(Int.MaxValue, _.id)
      .fold("")((a: String, b: Wid) => a + b.v)
      .grouped(2)
    .mergeSubstreams
    .toMat(TestSink.probe[Seq[String]])(Keep.both)
    .run()

    sub.request(5)
    pub.sendNext(Wid(1,"1"))
    pub.sendNext(Wid(2,"2"))
    sub.expectNext()
    pub.sendNext(Wid(3,"3"))
    pub.sendNext(Wid(4,"4"))
    pub.sendNext(Wid(5,"5"))
    sub.expectNext()
    sub.expectNext()
  }
}

更新:解释我尝试构建的真实流: 我的用例是一个连续(永无止境)的元素流,我需要将其拆分为子流,然后我可以在给定每个传入消息的情况下并行处理这些子流。这就是我现在拥有的,但 groupBy 很棘手。例如有没有办法清理子流?由于这是一个永无止境的流,因此可能会积累大量需要清理的子流。

class GeneralTests extends UnitSpec {

  implicit val system = ActorSystem("Test-System")
  implicit val materializer = ActorMaterializer()

  case class Wid(id: Int, v: String)

  val flow = Flow[Wid]
    .map { s ⇒ println(Thread.currentThread().getName() + " ASYNC " + s); s }
    .scan("")((a: String, b: Wid) => a + b.v)
    .sliding(2, step = 1)

  test("Parallel group-by with state") {
    val (pub, sub) = TestSource.probe[Wid]
    .map { s ⇒ println(Thread.currentThread().getName() + " BEFORE " + s); s }
    .groupBy(Int.MaxValue, _.id)
      .via(flow).async
    .mergeSubstreams
    .map { s ⇒ println(Thread.currentThread().getName() + " AFTER " + s); s }
    .toMat(TestSink.probe[Seq[String]])(Keep.both)
    .run()

sub.request(n = 4)
pub.sendNext(Wid(1,"1"))
println(sub.requestNext())
pub.sendNext(Wid(2,"2"))
println(sub.requestNext())
pub.sendNext(Wid(1,"3"))
println(sub.requestNext())
pub.sendNext(Wid(2,"4"))
println(sub.requestNext())
  }
}

但是在某些地方,异步操作在输出中显示的同一线程 (6) 上运行。

Test-System-akka.actor.default-dispatcher-3 BEFORE Wid(1,1)
Test-System-akka.actor.default-dispatcher-4 ASYNC Wid(1,1)
Test-System-akka.actor.default-dispatcher-3 AFTER Vector(, 1)
Vector(, 1)
Test-System-akka.actor.default-dispatcher-4 BEFORE Wid(2,2)
Test-System-akka.actor.default-dispatcher-2 ASYNC Wid(2,2)
Test-System-akka.actor.default-dispatcher-2 AFTER Vector(, 2)
Vector(, 2)
Test-System-akka.actor.default-dispatcher-6 BEFORE Wid(1,3)
Test-System-akka.actor.default-dispatcher-6 ASYNC Wid(1,3)
Test-System-akka.actor.default-dispatcher-6 AFTER Vector(1, 13)
Vector(1, 13)
Test-System-akka.actor.default-dispatcher-6 BEFORE Wid(2,4)
Test-System-akka.actor.default-dispatcher-2 ASYNC Wid(2,4)
Test-System-akka.actor.default-dispatcher-6 AFTER Vector(2, 24)
Vector(2, 24)

【问题讨论】:

  • 我不确定我是否理解这段代码的用途。由于流首先由 _.id 分组,在发出前 2 个 Wid 对象(id 为 1 和 2)之后,还没有发出任何内容,因为我们不知道在流传递到fold。因此,从这段代码中,我确实不希望在发送前 2 条消息后在订阅者中收到消息。您能否详细说明您希望发生的事情?
  • @Arnout Engelen 这个例子现在更新了一个更好的例子,有没有cmets?

标签: scala unit-testing akka akka-stream


【解决方案1】:

要让fold 阶段发出其最终值,发布者(即上游)需要完成。

有效的序列如下所示:

sub.request(5)
pub.sendNext(Wid(1,"1"))
pub.sendNext(Wid(2,"2"))
pub.sendNext(Wid(3,"3"))
pub.sendNext(Wid(4,"4"))
pub.sendNext(Wid(5,"5"))
pub.sendComplete()
sub.expectNext()
sub.expectNext()
sub.expectNext()
sub.expectNext()
sub.expectNext()
sub.expectComplete()

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-10-25
    • 2018-05-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多