【问题标题】:Issue with broadcast in spark-streaming checkpoint火花流检查点中的广播问题
【发布时间】:2019-07-09 19:08:39
【问题描述】:

我必须每天使用一次加载文件并在我的 spark 流中使用它。为此,我正在尝试读取文件并广播它。下面是正在使用的代码。

    def loadCustomer(sc: SparkContext, customerFilePath: String) = {
      val customerList: Set[String] = if (customerFilePath.isEmpty) Set()
      else {
        sc.textFile(customerFilePath).collect().toSet
      }
      customerList
    }
    ...
    ...

    var customerList = loadCustomer(spark.sparkContext, params.customerFilePath)

    // Filter by customer regular expression and customerList
    val filteredTransactionStream = tranactionStream
                            .filter(x => IDRegex.pattern.matcher(x.customer).matches()).filter{ case(transactionRecord) => !(customerList.contains(transactionRecord.customer))}

在流式作业连续运行之前,代码可以正常工作。但是当我尝试重新启动作业时出现以下错误。我发现如果我们有检查点,我们就不能使用广播。

java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to scala.collection.SetLike

请告诉我如何解决这个问题。

谢谢

【问题讨论】:

    标签: apache-spark spark-streaming


    【解决方案1】:

    您应该优雅地停止应用程序,否则应用程序将停止并保存一半的数据,当您尝试重新启动时,它可能不会序列化,因为数据不完全可用。

    【讨论】:

    • 感谢您的回复。我尝试创建一个单例对象(具有读取文件并广播它的逻辑)。然后从 executors 本身调用这个单例对象。即使重新启动,流媒体也能正常工作。谢谢
    猜你喜欢
    • 1970-01-01
    • 2018-08-26
    • 2017-02-13
    • 1970-01-01
    • 2016-01-27
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多