【发布时间】:2015-09-13 18:04:36
【问题描述】:
我正在创建一个使用 spark SQL(数据帧)和 spark 流的演示。我不是火花专家,所以我需要一些帮助!
我从数据库加载大约 100 万个对象以触发 Dataframe,并执行 SQL 查询以匹配某些字段以及来自 spark 流的实时数据。
例如,
SELECT *
FROM Person
WHERE Person.name='stream.name' AND Person.age='stream.age' AND ... etc
stream.xxx 是我从 Spark Streaming RDD 中提取的一个 java 字符串。
现在的问题是,对于 100 万行和多列的数据帧,即使 DF 保存在内存中,上面的 SQL 查询也可能需要一些时间来执行。我有一个想法,将 Person 表分解为邮政编码区域(每个数据帧包含来自 1 个区域的 Person)并处理每个数据帧上的每个 spark 流 RDD。这将减少查询时间并使事情变得更快。
我不确定我会如何进行分区。这是一些示例代码。
// Setup Spark Stream with receiver
JavaReceiverInputDStream<String> transaction = jssc.receiverStream(new TransactionStreamReceiver(StorageLevel.MEMORY_AND_DISK()));
DataFrame person1 = //load logic ommitted
DataFrame person2 = //load logic ommitted
DataFrame person3 = //load logic ommitted
// Break up Person table for faster processing
transaction.foreachRDD(new TransactionProcessingFunction(person1, sqlContext, window,1));
transaction.foreachRDD(new TransactionProcessingFunction(person2, sqlContext, window,2));
transaction.foreachRDD(new TransactionProcessingFunction(person3, sqlContext, window,2));
我假设每个工作节点都会处理一个 foreachRDD 方法,但事实并非如此。有什么方法可以分配每个工作人员并行运行每个 foreachRDD?
编辑:TransactionProcessingFunction 类本质上只是一个 forloop,它遍历流数据并执行上面的查询并显示一些结果。
【问题讨论】:
-
您需要输出中的所有列吗?
-
我需要大约 6 列,我使用 * 因为我不想全部输入:P
-
如果您不需要所有列,最好通过具体的方式尽早过滤。
-
但是你仍然会有 100 万行,这仍然是很多比较。我正在寻找一种方法来并行化 Persons 表的各个部分,spark 应该能够做到这一点,我只是不知道如何。
标签: apache-spark dataframe spark-streaming apache-spark-sql