【问题标题】:Spark Streaming + Spark SQLSpark Streaming + Spark SQL
【发布时间】:2016-09-17 03:55:03
【问题描述】:

我正在尝试通过 Spark Streaming 和 Spark SQL 处理日志。主要思想是根据查询需要为“旧”数据转换为 DataFrame 的 Parquet 格式的“压缩”数据集,压缩数据集加载通过以下方式完成:

    SQLContext sqlContext = JavaSQLContextSingleton.getInstance(sc.sc());
    DataFrame compact = null;
    compact = sqlContext.parquetFile("hdfs://auto-ha/tmp/data/logs");

由于未压缩的数据集(我每天压缩数据集)由许多文件组成,我希望将当天的数据放在 DStream 中,以便快速获取这些查询。

我已经尝试过 DataFrame 方法但没有结果......

    DataFrame df = JavaSQLContextSingleton.getInstance(sc.sc()).createDataFrame(lastData, schema);
    df.registerTempTable("lastData");
    JavaDStream SumStream = inputStream.transform(new Function<JavaRDD<Row>, JavaRDD<Object>>() {
        @Override
        public JavaRDD<Object> call(JavaRDD<Row> v1) throws Exception {
            DataFrame df = JavaSQLContextSingleton.getInstance(v1.context()).createDataFrame(v1, schema);
            ......drop old data from lastData table                                
            df.insertInto("lastData");

        }
    });

例如,使用这种方法,如果我在不同的线程中查询临时表,我不会得到任何结果。

我也尝试过使用 RDD 转换方法,更具体地说,我尝试按照 Spark 示例创建一个空 RDD,然后将 DSStream RDD 内容与空 RDD 合并:

  JavaRDD<Row> lastData = sc.emptyRDD();
  JavaDStream SumStream = inputStream.transform(new Function<JavaRDD<Row>, JavaRDD<Object>>() {
        @Override
        public JavaRDD<Object> call(JavaRDD<Row> v1) throws Exception {
            lastData.union(v1).filter(let only recent data....);
        }
    });

这种方法也不起作用,因为我在 lastData 中没有得到任何内容

我可以为此目的使用窗口计算或 updateStateBy 键吗?

有什么建议吗?

感谢您的帮助!

【问题讨论】:

    标签: apache-spark-sql spark-streaming


    【解决方案1】:

    好吧,我终于明白了。

    我使用 updateState 函数,如果像这样的时间戳超过 24 小时,则返回 0。

          final static Function2<List<Long>, Optional<Long>, Optional<Long>> RETAIN_RECENT_DATA
            = (List<Long> values, Optional<Long> state) -> {
                Long newSum = state.or(0L);
                for (Long value : values) {
                    newSum += value;
                }
                //current milis uses UTC
                if (System.currentTimeMillis() - newSum > 86400000L) {
                    return Optional.absent();
                } else {
                    return Optional.of(newSum);
                }
            };
    

    然后在每个批次中,我将 DataFrame 注册为临时表:

    finalsum.foreachRDD((JavaRDD<Row> rdd, Time time) -> {
            if (!rdd.isEmpty()) {
                HiveContext sqlContext1 = JavaSQLContextSingleton.getInstance(rdd.context());
                if (sqlContext1.cacheManager().isCached("alarm_recent")) {
                    sqlContext1.uncacheTable("alarm_recent");
                }
                DataFrame wordsDataFrame = sqlContext1.createDataFrame(rdd, schema);
                wordsDataFrame.registerTempTable("alarm_recent");
    
                wordsDataFrame.cache();//    
                wordsDataFrame.first();
            }
            return null;
        });
    

    【讨论】:

      【解决方案2】:

      您可以在 Spark1.6 中使用 mapwithState。 mapwithState 函数更加高效且易于实现。

      看看this链接。

      mapwithState 支持很酷的功能,例如 State time outinitialRDD,在保持有状态 Dstream 的同时非常方便。

      谢谢 玛纳斯

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2020-03-19
        • 2016-10-09
        • 2016-06-13
        • 2017-12-30
        • 2015-12-18
        • 2018-09-10
        • 1970-01-01
        相关资源
        最近更新 更多