【问题标题】:Converting data into Parquet in Spark在 Spark 中将数据转换为 Parquet
【发布时间】:2017-06-02 13:08:18
【问题描述】:

我在 S3 中有一些旧数据,我想使用 Spark 2 使用 Java API 将其转换为镶木地板格式。

我有所需的 Avro 架构(.avsc 文件)及其使用 Avro 编译器生成的 Java 类,我想使用这些架构以 Parquet 格式存储数据。输入数据不是任何标准格式,但我有一个库可以将旧文件中的每一行转换为 Avro 类。

是否可以将数据读取为JavaRDD<String>,然后使用库将转换应用到 Avro 类,最后以 parquet 格式存储。

类似:

JavaRDD<String> rdd = javaSparkContext.textFile("s3://bucket/path_to_legacy_files");    
JavaRDD<MyAvroClass> converted = rdd.map(line -> customLib.convertToAvro(line));    
converted.saveAsParquet("s3://bucket/destination"); //how do I do this

上面的方法可行吗?稍后我想使用 Hive、Presto 和 Spark 处理转换后的 parquet 数据。

【问题讨论】:

  • 搜索 Spark 峰会新闻。 Steve Loughran (Horton) 关于“对象存储”...
  • @SamsonScharfrichter 没有回答我的问题。我看到的唯一与远程相关的东西是他如何将一些 csv 数据转换为 Parquet。他使用 sparkSession.csv() 调用来加载我无法加载的数据,因为我需要使用自定义反序列化器。
  • 那么,您的实际问题是什么?是关于将自定义 JavaRDD&lt;stuff&gt; 转换为常规 DataFrame 吗?关于将您的自定义内容保存为 Parquet 格式?关于将其保存到 S3 对象存储?关于使用另一个不知道 RDD 是什么的工具来回读您的自定义内容的方法?以上的组合?
  • @SamsonScharfrichter 问题基本上是如何将一些非标准数据转换为镶木地板。我有一个可供我使用的 Spark 2.0 集群、Avro 模式定义和一个 Java 库,可以将记录从传统的非标准格式转换为 Avro 类的实例。代码sn-p只是一个想法,问能不能做这样的事情。

标签: apache-spark avro parquet spark-avro


【解决方案1】:

暂时忽略 S3;这是一个生产细节。您需要从更简单的问题开始“将我格式的本地文件转换为标准文件”。您可以通过针对一小部分数据样本集的单元测试在本地实现这一点。

这在Spark中一般和Hadoop Mapreduce一样:实现InputFormat&lt;K, V&gt;FileInputFormat&lt;K, V&gt;的子类,或者使用Hadoop的org.apache.hadoop.streaming.mapreduce.StreamInputFormat输入格式,实现自己的RecordReader,然后将选项spark.hadoop.stream.recordreader.class设置为您的唱片阅读器的类名(可能是最简单的)。

有很多关于此的文档,以及堆栈溢出问题。源代码树本身中有很多示例。

【讨论】:

    【解决方案2】:

    想通了,基本上是 Steve 提到的方法,只是 Hadoop 输入和输出格式已经存在:

             Job job = new Job();
             ParquetOutputFormat.setWriteSupportClass(job, AvroWriteSupport.class);
             AvroParquetOutputFormat.setSchema(job, MyAvroType.SCHEMA$);
             AvroParquetOutputFormat.setBlockSize(job, 128*1024*1024);
             AvroParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
             AvroParquetOutputFormat.setCompressOutput(job, true);
    
             sparkContext.textFile("s3://bucket/path_to_legacy_files")
                .map(line -> customLib.convertToAvro(line))
                .mapToPair(record -> new Tuple2<Void, MyAvroType>(null, record))
                .saveAsNewAPIHadoopFile(
                    "s3://bucket/destination", 
                    Void.class, 
                    MyAvroType.class,
                    new ParquetOutputFormat<MyAvroType>().getClass(), 
                    job.getConfiguration());
    

    【讨论】:

      猜你喜欢
      • 2020-01-31
      • 2017-11-08
      • 1970-01-01
      • 2023-03-17
      • 1970-01-01
      • 1970-01-01
      • 2017-03-29
      • 1970-01-01
      • 2018-12-15
      相关资源
      最近更新 更多