【问题标题】:Saving Spark Java RDD such that each RDD value is saved into separate files in separate folders保存 Spark Java RDD,以便将每个 RDD 值保存到单独文件夹中的单独文件中
【发布时间】:2018-09-14 22:51:05
【问题描述】:

我正在使用 Spark 2.3 和 Java 1.8

我有一个 CSV 记录的 RDD 说:

JavaRDD<CsvRecordsPerApp> csvRecordsRdd

这里每个CsvRecordsPerApp都有多个值:

class CsvRecordsPerApp implements Serializable {
    String customerName;
    String supplierName;
    String otherFieldName;
} 

我想将它保存在多个文件夹中,这样每个 RDD 都会保存到 3 个单独的文件夹中,例如

- customerNames\part-0000
- customerNames\part-0001
...
- supplierNames\part-0000
- supplierNames\part-0001
...

- otherFieldNames\part-0000
- otherFieldNames\part-0001
...

但是当我在下面执行时,它会将所有输出文件保存在单个文件中:

JavaRDD<CsvRecordsPerApp> csvRecordsRdd = ...
csvRecordsRdd.saveAsTextFile("file-name");

喜欢:

file-name/0000
file-name/0001
..

我尝试将 csvRecordsRdd 映射到不同的值并保存 3 次,如下所示:

JavaRDD<String> customerNameRdd = csvRecordsRdd.map(csv -> csv.getCustomerName());
customerNameRdd.saveAsTextFile("customerNames");

JavaRDD<String> supplierNameRdd = csvRecordsRdd.map(csv -> csv.getSupplierName());
supplierNameRdd.saveAsTextFile("supplierNames");

JavaRDD<String> otherFieldNameRdd = csvRecordsRdd.map(csv -> csv.getOtherFieldName());
otherFieldNameRdd.saveAsTextFile("otherFieldName");

这里的问题是它重新计算 RDD 3 次,我有三个条目!!

然后为了停止重新计算,我尝试了下面的缓存,但它不起作用,仍然计算了 3 次:

csvRecordsRdd.persist(StorageLevel.MEMORY_AND_DISK()); or csvRecordsRdd.cache();

我正在寻找解决问题的方法

【问题讨论】:

  • 能不能把这3个RDD放到一个case类中,然后把csvRecordsRDD映射到case类?这应该将其限制为一次与 3 次。(警告:我自己是 spark/scala 新手。)

标签: java file apache-spark rdd


【解决方案1】:

这里是缓存工作的解决方案(对不起,我忘了早点更新)。

因为我将 spart-submit 驱动程序执行程序内存从 1 gb(默认)更改为 20 gb 左右(取决于您的系统可用性,例如在我的桌面上,我将其增加到 5 gb,但在 EMR 上我将其增加到 20 gb 或更多)。

我认为这只是一种解决方法,因为它会缓存对象。缓存有一个限制,因此对于更大的数据它可能会失败,并且肯定需要更大的 m/c。

所以,请提出更多更好的解决方案。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-09-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多