【发布时间】:2018-09-14 22:51:05
【问题描述】:
我正在使用 Spark 2.3 和 Java 1.8
我有一个 CSV 记录的 RDD 说:
JavaRDD<CsvRecordsPerApp> csvRecordsRdd
这里每个CsvRecordsPerApp都有多个值:
class CsvRecordsPerApp implements Serializable {
String customerName;
String supplierName;
String otherFieldName;
}
我想将它保存在多个文件夹中,这样每个 RDD 都会保存到 3 个单独的文件夹中,例如
- customerNames\part-0000
- customerNames\part-0001
...
- supplierNames\part-0000
- supplierNames\part-0001
...
- otherFieldNames\part-0000
- otherFieldNames\part-0001
...
但是当我在下面执行时,它会将所有输出文件保存在单个文件中:
JavaRDD<CsvRecordsPerApp> csvRecordsRdd = ...
csvRecordsRdd.saveAsTextFile("file-name");
喜欢:
file-name/0000
file-name/0001
..
我尝试将 csvRecordsRdd 映射到不同的值并保存 3 次,如下所示:
JavaRDD<String> customerNameRdd = csvRecordsRdd.map(csv -> csv.getCustomerName());
customerNameRdd.saveAsTextFile("customerNames");
JavaRDD<String> supplierNameRdd = csvRecordsRdd.map(csv -> csv.getSupplierName());
supplierNameRdd.saveAsTextFile("supplierNames");
JavaRDD<String> otherFieldNameRdd = csvRecordsRdd.map(csv -> csv.getOtherFieldName());
otherFieldNameRdd.saveAsTextFile("otherFieldName");
这里的问题是它重新计算 RDD 3 次,我有三个条目!!
然后为了停止重新计算,我尝试了下面的缓存,但它不起作用,仍然计算了 3 次:
csvRecordsRdd.persist(StorageLevel.MEMORY_AND_DISK()); or csvRecordsRdd.cache();
我正在寻找解决问题的方法
【问题讨论】:
-
能不能把这3个RDD放到一个case类中,然后把csvRecordsRDD映射到case类?这应该将其限制为一次与 3 次。(警告:我自己是 spark/scala 新手。)
标签: java file apache-spark rdd