【问题标题】:How to update SPARK broadcast value on worker node?如何更新工作节点上的 SPARK 广播值?
【发布时间】:2018-01-23 02:59:15
【问题描述】:

我有一个从数据库中获取的广播值。我在驱动主控上定义广播

val stampsBroadcast = ssc.sparkContext.broadcast(stampListMap)

此值 (stampsBroadcast.value) 用于工作节点执行程序。一旦执行者完成任务(向数据库添加新密钥)。我需要更新广播值来添加这个新键。

我尝试使用:

stampsBroadcast.unpersist(false)
ssc.sparkContext.broadcast(NewstampsBroadcastValue)

但我似乎无法在工作节点上使用ssc。如果我在driver master上重新广播,如何从worker节点获取新数据?

【问题讨论】:

    标签: java apache-spark broadcast


    【解决方案1】:

    您不能从工作节点创建广播变量。

    就你而言,基本上你需要Accumulators。在 Driver 上定义累加器。在工作节点上,您可以更新累加器值。同样,您可以在 Driver 上获取更新后的值。

    注意:您无法在工作节点上检索累加器的值。只能从工作节点更新值。

    下面是 spark 文档的示例:

    // creating the accumulator on driver
    scala> val accum = sc.longAccumulator("My Accumulator")
    accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)
    
    // updating the accumulator on worker nodes
    scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
    ...
    10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
    
    // fetaching the value
    scala> accum.value
    res2: Long = 10
    

    【讨论】:

      【解决方案2】:

      首先:将 spark 更新到 spark2.3

      第二:将stampListMap制作成文件流(sparkStream.readStream.textFile('/../my.txt')...load()),文件流可以自行更新内容。如果现在你使用stream join static,你可以在spark2.3+中使用stream join stream。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2017-04-02
        • 2016-05-26
        • 2021-01-05
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2023-04-01
        相关资源
        最近更新 更多