【发布时间】:2016-09-17 03:55:03
【问题描述】:
我正在尝试通过 Spark Streaming 和 Spark SQL 处理日志。主要思想是根据查询需要为“旧”数据转换为 DataFrame 的 Parquet 格式的“压缩”数据集,压缩数据集加载通过以下方式完成:
SQLContext sqlContext = JavaSQLContextSingleton.getInstance(sc.sc());
DataFrame compact = null;
compact = sqlContext.parquetFile("hdfs://auto-ha/tmp/data/logs");
由于未压缩的数据集(我每天压缩数据集)由许多文件组成,我希望将当天的数据放在 DStream 中,以便快速获取这些查询。
我已经尝试过 DataFrame 方法但没有结果......
DataFrame df = JavaSQLContextSingleton.getInstance(sc.sc()).createDataFrame(lastData, schema);
df.registerTempTable("lastData");
JavaDStream SumStream = inputStream.transform(new Function<JavaRDD<Row>, JavaRDD<Object>>() {
@Override
public JavaRDD<Object> call(JavaRDD<Row> v1) throws Exception {
DataFrame df = JavaSQLContextSingleton.getInstance(v1.context()).createDataFrame(v1, schema);
......drop old data from lastData table
df.insertInto("lastData");
}
});
例如,使用这种方法,如果我在不同的线程中查询临时表,我不会得到任何结果。
我也尝试过使用 RDD 转换方法,更具体地说,我尝试按照 Spark 示例创建一个空 RDD,然后将 DSStream RDD 内容与空 RDD 合并:
JavaRDD<Row> lastData = sc.emptyRDD();
JavaDStream SumStream = inputStream.transform(new Function<JavaRDD<Row>, JavaRDD<Object>>() {
@Override
public JavaRDD<Object> call(JavaRDD<Row> v1) throws Exception {
lastData.union(v1).filter(let only recent data....);
}
});
这种方法也不起作用,因为我在 lastData 中没有得到任何内容
我可以为此目的使用窗口计算或 updateStateBy 键吗?
有什么建议吗?
感谢您的帮助!
【问题讨论】:
标签: apache-spark-sql spark-streaming