【发布时间】:2016-08-02 14:17:00
【问题描述】:
我们有一个流数据,我在 HBase 表中有一些主信息。对于每一行,我都需要查找 HBase 主表并获取一些配置文件信息。我的代码是这样的
val con = new setContext(hadoopHome,sparkMaster)
val l_sparkcontext = con.getSparkContext
val l_hivecontext = con.getHiveContext
val topicname = "events"
val ssc = new StreamingContext(l_sparkcontext, Seconds(30))
val eventsStream = KafkaUtils.createStream(ssc,"xxx.xxx.142.xxx:2181","receive_rest_events",Map(topicname.toString -> 10))
println("Kafka Stream for receiving Events.." )
val profile_data = l_hivecontext.sql("select gender, income, age, riid from hbase_customer_profile")
profile_data.foreach(println)
val tabBC = l_sparkcontext.broadcast(profile_data)
eventsStream.foreachRDD(rdd => {
rdd.foreach(record => {
val subs_profile_rows = tabBC.value
val Rows = record._2.split(rowDelim)
Rows.foreach(row => {
val values = row.split(colDelim)
val riid = values(1).toInt
val cond = "riid = " + riid
println("Condition : ", cond)
val enriched_events = subs_profile_rows.filter(cond)
}) // End of Rows
}) // End of RDD
}) // End of Events Stream
不幸的是,我总是在过滤器上点击 NPE。我在这里遵循了几个问题和答案来跨工作节点广播值,但没有任何帮助。有人可以帮忙吗。
问候
巴拉
【问题讨论】:
-
检查是否使用了无法序列化的值。
-
我不确定 profile_data 是否应该在 foreach 中创建,那是不可序列化的。
标签: scala foreach spark-streaming