Posted: 2017-06-23 22:36:08
Question:
I'm new to Spark and I'm getting an error when mapping a DataFrame.
I have a DStream that I want to transform with a SQL DataFrame in order to filter the data. The code looks like this:
val textDStream = ssc.textFileStream(inputPath)
val activityStream = textDStream.transform(input => {
  input.flatMap { line =>
    val record = line.split("\\t")
    Some(Activity(record(0).toLong / MS_IN_HOUR * MS_IN_HOUR, record(1), record(2), record(3), record(4), record(5), record(6)))
  }
})
activityStream.transform(rdd => {
  val df = rdd.toDF()
  df.registerTempTable("activity")
  val activityByProduct = sqlContext.sql("""SELECT
    product,
    timestamp_hour,
    sum(case when action = 'purchase' then 1 else 0 end) as purchase_count,
    sum(case when action = 'add_to_cart' then 1 else 0 end) as add_to_cart_count,
    sum(case when action = 'page_view' then 1 else 0 end) as page_view_count
    from activity
    group by product, timestamp_hour""")
  activityByProduct
    .map { r => ((r.getString(0), r.getLong(1)),
      ActivityByProduct(r.getString(0), r.getLong(1), r.getLong(2), r.getLong(3), r.getLong(4))
    )}
}).print()
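For context, "missing parameter type" usually means the compiler has no single expected function type for a bare lambda, which happens when the target method is overloaded (in Spark 2.x, Dataset.map has both a Scala-function overload and a Java MapFunction overload). A minimal plain-Scala sketch of the same failure mode, using a hypothetical overloaded method `transform` (not Spark's):

```scala
// Minimal sketch (plain Scala, no Spark) of the "missing parameter type"
// failure mode: `transform` below is overloaded, so the compiler cannot
// pick an expected function type for a bare lambda and cannot infer the
// parameter's type.
object MissingParamTypeDemo {
  // Two hypothetical overloads, standing in for Dataset.map's
  // Scala-function and Java MapFunction variants.
  def transform[U](f: Int => U): List[U]    = List(1, 2, 3).map(f)
  def transform[U](f: String => U): List[U] = List("a", "b").map(f)

  def main(args: Array[String]): Unit = {
    // transform { r => r * 2 }             // does not compile: missing parameter type
    val ok = transform { (r: Int) => r * 2 } // explicit type selects an overload
    println(ok) // List(2, 4, 6)
  }
}
```

If that is the cause here, annotating the parameter (`.map { r: Row => ... }`) or mapping over the underlying RDD (whose `map` is not overloaded) typically restores inference.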
The problem is that I get the following error:
Error:(58, 18) missing parameter type    .map { r => ((r.getString(0), r.getLong(1)),
activityByProduct
  .map { r => ((r.getString(0), r.getLong(1)),
    ActivityByProduct(r.getString(0), r.getLong(1), r.getLong(2), r.getLong(3), r.getLong(4))
  )}
I can't see where a type is missing. I have already tried setting the type of r explicitly, but it keeps returning the same error.
What could it be?
Thanks in advance.
Comments:
- What does your .map() return? If it's a tuple (tuple, ?), then you need more parentheses. And what is ActivityByProduct?
- In fact I am returning a tuple of (key, ActivityByProduct), where key = (product, timestamp_hour) = (r.getString(0), r.getLong(1)). The key is used for a stateful operation.
- I have changed it to activityByProduct.map { r => { val key = (r.getString(0), r.getLong(1)); val activity = ActivityByProduct(r.getString(0), r.getLong(1), r.getLong(2), r.getLong(3), r.getLong(4)); ActivityFull(key, activity) } } and it returns the same error.
- ActivityByProduct is a case class.
- Have you tried: .map[((String,Long),ActivityByProduct)] { r => ... }?
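An untested sketch of another common workaround, assuming Spark 2.x (where sqlContext.sql returns a Dataset[Row] whose map is overloaded and also needs an Encoder): dropping to the underlying RDD, whose map takes a single function argument, lets the type of r be inferred as Row. ActivityByProduct and the column positions are taken from the question.

```scala
// Untested sketch, assuming Spark 2.x. Dataset.map is overloaded
// (Scala function + Java MapFunction), so a bare lambda's parameter
// type cannot be inferred; RDD.map is not overloaded, so it can.
activityByProduct
  .rdd // Dataset[Row] -> RDD[Row]
  .map { r => ((r.getString(0), r.getLong(1)),
    ActivityByProduct(r.getString(0), r.getLong(1),
      r.getLong(2), r.getLong(3), r.getLong(4))
  )}
```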
Tags: scala, apache-spark