【问题标题】:How to optimize huge spark dataframe SQL query to compare values from spark streaming RDDs?如何优化巨大的火花数据框 SQL 查询来比较火花流 RDD 的值?
【发布时间】:2015-09-13 18:04:36
【问题描述】:

我正在创建一个使用 spark SQL(数据帧)和 spark 流的演示。我不是火花专家,所以我需要一些帮助!

我从数据库加载大约 100 万个对象以触发 Dataframe,并执行 SQL 查询以匹配某些字段以及来自 spark 流的实时数据。

例如,

SELECT *
FROM Person
WHERE Person.name='stream.name' AND Person.age='stream.age' AND ... etc

stream.xxx 是我从 Spark Streaming RDD 中提取的一个 java 字符串。

现在的问题是,对于 100 万行和多列的数据帧,即使 DF 保存在内存中,上面的 SQL 查询也可能需要一些时间来执行。我有一个想法,将 Person 表分解为邮政编码区域(每个数据帧包含来自 1 个区域的 Person)并处理每个数据帧上的每个 spark 流 RDD。这将减少查询时间并使事情变得更快。

我不确定我会如何进行分区。这是一些示例代码。

// Setup Spark Stream with receiver
JavaReceiverInputDStream<String> transaction = jssc.receiverStream(new TransactionStreamReceiver(StorageLevel.MEMORY_AND_DISK()));

DataFrame person1 = //load logic ommitted
DataFrame person2 = //load logic ommitted
DataFrame person3 = //load logic ommitted

// Break up Person table for faster processing
transaction.foreachRDD(new TransactionProcessingFunction(person1, sqlContext, window,1));

transaction.foreachRDD(new TransactionProcessingFunction(person2, sqlContext, window,2));

transaction.foreachRDD(new TransactionProcessingFunction(person3, sqlContext, window,2));

我假设每个工作节点都会处理一个 foreachRDD 方法,但事实并非如此。有什么方法可以分配每个工作人员并行运行每个 foreachRDD?

编辑:TransactionProcessingFunction 类本质上只是一个 forloop,它遍历流数据并执行上面的查询并显示一些结果。

【问题讨论】:

  • 您需要输出中的所有列吗?
  • 我需要大约 6 列,我使用 * 因为我不想全部输入:P
  • 如果您不需要所有列,最好通过具体的方式尽早过滤。
  • 但是你仍然会有 100 万行,这仍然是很多比较。我正在寻找一种方法来并行化 Persons 表的各个部分,spark 应该能够做到这一点,我只是不知道如何。

标签: apache-spark dataframe spark-streaming apache-spark-sql


【解决方案1】:

我将尝试获得死灵法师银牌,指导您to this page 了解如何在流中加载 SQL 表。创建表对象后,您可以repartition() rdd 或简单地让 Spark Environmnet 完成它的工作。

【讨论】:

    猜你喜欢
    • 2020-01-17
    • 2016-06-27
    • 1970-01-01
    • 2018-08-12
    • 1970-01-01
    • 1970-01-01
    • 2020-08-26
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多