【问题标题】:Scio/Apache beam, how to map grouped resultsScio/Apache Beam,如何映射分组结果
【发布时间】:2018-09-14 14:49:58
【问题描述】:

我有一个简单的管道,可以在固定窗口中从 pubsub 读取数据,解析消息并按特定属性对它们进行分组。但是,如果我在 groupBy 之后 map 我的函数似乎没有被执行。

我错过了什么吗?

sc.pubsubSubscription[String](s"projects/$project/subscriptions/$subscription")
  .withFixedWindow(Duration.standardSeconds(windowSeconds))
  .map(parseMessage)
  .groupBy(_.ip_address)
  .map(entry => log.info(s"${entry._1} was repeated ${entry._2.size} times"))

【问题讨论】:

  • 你能谈谈你是如何测试这个的吗?您可以使用 Beam 的 TestStream 接口对此进行测试。

标签: apache-beam spotify-scio


【解决方案1】:

我能够使用DirectRunner 和一个从 Pub/Sub 读取的简单管道重现该问题,使用消息的第一个单词作为键,应用 GroupByKey,然后记录条目。似乎 GBK 步骤等待所有数据到达,并且由于它是无限源,因此不会发出任何结果。对我有用的是定义一个带有触发的窗口策略,例如:

object PubSubTest {
  private lazy val log = LoggerFactory.getLogger(this.getClass)

  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    val defaultInputSub = "test_sub"
    val subscription = args.getOrElse("input", defaultInputSub)
    val project = "PROJECT_ID"

    sc.pubsubSubscription[String](s"projects/$project/subscriptions/$subscription")
      // provide window options including triggering
      .withFixedWindows(duration = Duration.standardSeconds(10), options = WindowOptions(
        trigger = Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
          .plusDelayOf(Duration.standardSeconds(2))),
        accumulationMode = AccumulationMode.ACCUMULATING_FIRED_PANES,
        closingBehavior = ClosingBehavior.FIRE_IF_NON_EMPTY,
        allowedLateness = Duration.standardSeconds(0))
      )
      // use first word of the Pub/Sub message as the key
      .keyBy(a => a.split(" ")(0))
      .groupByKey
      .map(entry => log.info(s"${entry._1} was repeated ${entry._2.size} times"))

    val result = sc.close().waitUntilFinish()
  }
}

【讨论】:

  • 太棒了!那行得通,谢谢!我想知道为什么数据流和直接运行器之间的行为不一致
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-11-10
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-07-12
相关资源
最近更新 更多