【问题标题】:Append streaming dataset to batch dataset in Spark将流数据集附加到 Spark 中的批处理数据集
【发布时间】:2017-02-10 21:55:17
【问题描述】:

我们在 Spark 中有一个用例,我们希望将历史数据从数据库加载到 Spark,并继续向 Spark 添加新的流数据,然后我们可以对整个最新数据集进行分析。

据我所知,Spark SQL 和 Spark Streaming 都不能将历史数据与流数据结合起来。然后我发现了 Spark 2.0 中的 Structured Streaming,它似乎是为这个问题而构建的。但是经过一些实验,我仍然无法弄清楚。这是我的代码:

SparkSession spark = SparkSession
        .builder()
        .config(conf)
        .getOrCreate();

JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

// Load historical data from MongoDB
JavaMongoRDD<Document> mongordd = MongoSpark.load(jsc);


// Create typed dataset with customized schema
JavaRDD<JavaRecordForSingleTick> rdd = mongordd.flatMap(new FlatMapFunction<Document, JavaRecordForSingleTick>() {...});
Dataset<Row> df = spark.sqlContext().createDataFrame(rdd, JavaRecordForSingleTick.class);
Dataset<JavaRecordForSingleTick> df1 = df.as(ExpressionEncoder.javaBean(JavaRecordForSingleTick.class));


// ds listens to a streaming data source
Dataset<Row> ds = spark.readStream()
        .format("socket")
        .option("host", "127.0.0.1")
        .option("port", 11111)
        .load();

// Create the typed dataset with customized schema
Dataset<JavaRecordForSingleTick> ds1 = ds
        .as(Encoders.STRING())
        .flatMap(new FlatMapFunction<String, JavaRecordForSingleTick>() {
    @Override
    public Iterator<JavaRecordForSingleTick> call(String str) throws Exception {
    ...
    }
}, ExpressionEncoder.javaBean(JavaRecordForSingleTick.class));


// ds1 and df1 have the same schema. ds1 gets data from the streaming data source, df1 is the dataset with historical data

ds1 = ds1.union(df1);
StreamingQuery query = ds1.writeStream().format("console").start();
query.awaitTermination();

我收到一个错误“org.apache.spark.sql.AnalysisException:不支持流和批处理 DataFrames/Datasets 之间的联合;”当我联合()两个数据集时。

谁能帮帮我?我是不是走错方向了?

【问题讨论】:

  • Spark 2.0 中的结构化流处于 Alpha 阶段 - 很多东西还不支持。我想知道您是否不能改用有状态流。在有状态流中,您可以使用历史数据引导您的状态,然后以您喜欢的方式附加流数据。有关详细信息,请参阅此Databrick's blogpost
  • @GlennieHellesSindholt 嗨,Glennie,谢谢您的建议。我认为 mapWithState() 最好用于用新的流数据替换/更新当前状态(键值对)。在我的用例中,我的 RDD 不是键值配对的,不需要更新旧数据。使用 mapWithState() 是不是太过分了?
  • 我同意 mapWithState 不是显而易见的选择,如果您没有任何类型的聚合,但是如果您不需要历史数据,为什么要在您的流媒体?

标签: apache-spark apache-spark-sql apache-spark-2.0 spark-structured-streaming


【解决方案1】:

在支持这种类型的功能方面,我不能代表 MongoDB spark 连接器,而且 Google 上似乎没有太多关于它的内容。但是,Spark 数据库生态系统中还有其他数据库可以这样做。我在another answer 中介绍了 Spark 数据库生态系统中的大部分内容。虽然我知道SnappyDataMemSQL 在该列表中,但我不能准确地说出哪个数据库可以轻松地允许您正在寻找的功能类型。但是,您可能需要两者的关系形式的数据。

【讨论】:

    猜你喜欢
    • 2018-04-30
    • 2020-10-09
    • 1970-01-01
    • 2023-04-03
    • 2019-02-12
    • 2016-12-30
    • 2020-05-05
    • 2018-03-14
    • 1970-01-01
    相关资源
    最近更新 更多