【发布时间】:2019-07-09 19:08:39
【问题描述】:
我必须每天使用一次加载文件并在我的 spark 流中使用它。为此,我正在尝试读取文件并广播它。下面是正在使用的代码。
def loadCustomer(sc: SparkContext, customerFilePath: String) = {
val customerList: Set[String] = if (customerFilePath.isEmpty) Set()
else {
sc.textFile(customerFilePath).collect().toSet
}
customerList
}
...
...
var customerList = loadCustomer(spark.sparkContext, params.customerFilePath)
// Filter by customer regular expression and customerList
val filteredTransactionStream = tranactionStream
.filter(x => IDRegex.pattern.matcher(x.customer).matches()).filter{ case(transactionRecord) => !(customerList.contains(transactionRecord.customer))}
在流式作业连续运行之前,代码可以正常工作。但是当我尝试重新启动作业时出现以下错误。我发现如果我们有检查点,我们就不能使用广播。
java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to scala.collection.SetLike
请告诉我如何解决这个问题。
谢谢
【问题讨论】:
标签: apache-spark spark-streaming