【问题标题】:Spark - Read file in byte format as InputStreamSpark - 以字节格式读取文件作为 InputStream
【发布时间】:2015-11-04 06:03:22
【问题描述】:

如何以字节数组格式读取多个文件作为 spark 作业中的输入流?

Path pt = new Path(umfPaths);           
FileSystem fs = FileSystem.get(jsc.hadoopConfiguration());
fs.open(pt);

.. 原因是我有内容为字节格式的输入文件。然后将输入文件拆分为块长度为 64 MB 的多个文件并存储在 HDFS 中。我必须使用 Apache spark 并行处理文件。 req 是读取一个完整的 64MB 块作为单个文件并处理它。通过编写自定义记录读取器或使用 FileSystem API(使用 InputStream)来读取每个文件是否有效?

【问题讨论】:

  • 如果我可以问,这样做的目的是什么? PS:您似乎对您提出的旧问题有待接受的答案,请您这样做并投票给他们,因为它们似乎已经解决了。
  • 嗨,我不能投票,但我将它们标记为已解决。
  • 没问题。你能回答我问你的问题吗? (做你正在尝试的事情的目的是什么?)
  • 我刚刚用更多细节编辑了我的问题。如果您需要更多详细信息,请告诉我
  • 但是spark会负责分区,你为什么要折磨自己? (您当然也可以在 spark 上设置分区大小)您只需要在使用 sc.newhadoopapifile(...) 读取时指定输入格式

标签: java apache-spark inputstream rdd


【解决方案1】:

我解决了在 SparkContext 中使用 API newHadoopApiFile 的问题。 我写了一个 CustomInputFormat 类,它会做 InputFormat 的事情,并会返回一个 POJO 对象。

JavaPairRDD> baseRDD = sc.newAPIHadoopFile(args[2], InputFormat.class, NullWritable.class, ArrayList.class, conf);

然后忽略 Key 并创建一个 RDD 的 Values 。

JavaRDD> mapLines1 = baseRDD.values();

然后做了上面RDD的FlatMap。

在 InputFormat 类中,我扩展了 FileInputFormat 并将 isSplittable 覆盖为 false 以作为单个文件读取。

public class InputFormat extends  FileInputFormat {    
 public
 RecordReader<NullWritable, ArrayList<Record>> 
  createRecordReader(InputSplit split, TaskAttemptContext context)
 throws IOException, InterruptedException{  //Logic Goes here  }

      @Override
          protected boolean isSplitable(JobContext context, Path file) {             return false;     
} 
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-07-31
    • 1970-01-01
    • 1970-01-01
    • 2023-03-30
    • 2013-08-14
    • 2017-05-19
    • 2021-06-13
    • 1970-01-01
    相关资源
    最近更新 更多