【问题标题】:Using Spark Accumulators with Structured Streaming将 Spark 累加器与结构化流结合使用
【发布时间】:2020-05-14 00:28:49
【问题描述】:

在我的结构化流作业中,我在 updateAcrossEvents 方法中更新 Spark Accumulators,但当我尝试在我的 StreamingListener 中打印它们时,它们始终为 0。代码如下:

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
        updateAcrossEvents
      )

编辑:更多信息以更详细地描述问题...

累加器在“updateAcrossEvents”中递增。我有一个 StreamingListener,它在 'onQueryProgress' 方法中写入累加器的值,但在这种方法中,累加器 始终为零!

当我在 updateAcrossEvents 中添加日志语句时,我可以看到这些累加器按预期递增。

这仅在我以“集群”模式运行时发生。在本地模式下它工作正常,这意味着累加器没有正确分配 - 或类似的东西!

注意:我在网上看到很多答案告诉我执行“操作”。这不是一个解决方案。这是一项“有状态结构化流”工作。是的,我也在 SparkContext 中“注册”它们。

【问题讨论】:

    标签: apache-spark spark-streaming spark-structured-streaming


    【解决方案1】:

    您是否尝试在调用操作之前在转换操作(地图)之外打印它?如果是这样,它将为 0,因为 spark 正在使用延迟执行。在您对数据集调用操作之前,不会调用地图操作中的代码。

    df.map{ x => accum.add(x); x }.count
    println(accum.value)
    

    这会奏效。

    【讨论】:

    • 这个累加器被传递给从 StreamingQueryListener 扩展的另一个类。我正在尝试在此侦听器中打印值。
    • @DilTeam 你找到解决方案了吗?
    猜你喜欢
    • 2018-01-24
    • 2017-01-09
    • 2019-03-03
    • 1970-01-01
    • 2017-05-04
    • 2020-01-30
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多