【问题标题】:Java: Read JSON from a file, convert to ORC and write to a fileJava:从文件中读取 JSON,转换为 ORC 并写入文件
【发布时间】:2018-02-27 15:26:23
【问题描述】:

我需要自动化 JSON 到 ORC 的转换过程。我几乎可以通过使用 Apache 的 ORC-tools 包到达那里,除了 JsonReader 不处理 Map 类型和 throws an exception。因此,以下工作但不处理 Map 类型。

Path hadoopInputPath = new Path(input);
    try (RecordReader recordReader = new JsonReader(hadoopInputPath, schema, hadoopConf)) { // throws when schema contains Map type
        try (Writer writer = OrcFile.createWriter(new Path(output), OrcFile.writerOptions(hadoopConf).setSchema(schema))) {
            VectorizedRowBatch batch = schema.createRowBatch();
            while (recordReader.nextBatch(batch)) {
                writer.addRowBatch(batch);
            }
        }
    }

因此,我开始研究使用 Hive 类进行 Json 到 ORC 的转换,这有一个额外的优势,即在未来我可以转换为其他格式,例如只需少量代码更改的 AVRO。但是,我不确定使用 Hive 类执行此操作的最佳方法是什么。具体来说,不清楚如何将 HCatRecord 写入文件,如下所示。

    HCatRecordSerDe hCatRecordSerDe = new HCatRecordSerDe();
    SerDeUtils.initializeSerDe(hCatRecordSerDe, conf, tblProps, null);

    OrcSerde orcSerde = new OrcSerde();
    SerDeUtils.initializeSerDe(orcSerde, conf, tblProps, null);

    Writable orcOut = orcSerde.serialize(hCatRecord, hCatRecordSerDe.getObjectInspector());
    assertNotNull(orcOut);

    InputStream input = getClass().getClassLoader().getResourceAsStream("test.json.snappy");
    SnappyCodec compressionCodec = new SnappyCodec();
    try (CompressionInputStream inputStream = compressionCodec.createInputStream(input)) {
        LineReader lineReader = new LineReader(new InputStreamReader(inputStream, Charsets.UTF_8));
        String jsonLine = null;
        while ((jsonLine = lineReader.readLine()) != null) {
            Writable jsonWritable = new Text(jsonLine);
            DefaultHCatRecord hCatRecord = (DefaultHCatRecord) jsonSerDe.deserialize(jsonWritable);
            // TODO: Write ORC to file????
        }
    }

任何关于如何完成上述代码或执行 JSON-to-ORC 的更简单方法的想法将不胜感激。

【问题讨论】:

  • 老实说,我会使用 Spark / Pig / 实际的 HiveQL 来做到这一点
  • Map 不就和普通的 JSON 对象一样吗?因此是 Hive 的结构?
  • cricket_007,这种 JSON 到 ORC 的转换需要作为 Web 服务的一部分来完成,该服务已经接收 JSON 数据并用它做其他事情,例如归档。因此,使用 Spark/Hive 作业进行这种转换对我们来说并不是一个真正的选择(即使我们在其他地方使用它们进行这种转换),因为它也需要将 JSON 数据重新发送到这些作业。
  • 我看不出您不能在 Web 服务器中创建 SparkContext 的任何原因
  • 在我之前的评论中我提到了 Spark,但实际上我们只是使用 Hive 查询来做这种格式转换,所以我不太熟悉如何使用 SparkContext 来做这样的事情。我将使用哪些 Spark java 类进行转换?任何代码示例或指向 JavaDocs 的链接都会非常有帮助。

标签: java json apache hive orc


【解决方案1】:

这是我最终根据 cricket_007 建议使用 Spark 库所做的事情:

Maven 依赖项(有一些排除项以保持 maven-duplicate-finder-plugin 满意):

    <properties>
        <dep.jackson.version>2.7.9</dep.jackson.version>
        <spark.version>2.2.0</spark.version>
        <scala.binary.version>2.11</scala.binary.version>
    </properties>

    <dependency>
        <groupId>com.fasterxml.jackson.module</groupId>
        <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
        <version>${dep.jackson.version}</version>
        <exclusions>
            <exclusion>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
        <exclusions>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>apache-log4j-extras</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
            </exclusion>
            <exclusion>
                <groupId>net.java.dev.jets3t</groupId>
                <artifactId>jets3t</artifactId>
            </exclusion>
            <exclusion>
                <groupId>com.google.code.findbugs</groupId>
                <artifactId>jsr305</artifactId>
            </exclusion>
            <exclusion>
                <groupId>stax</groupId>
                <artifactId>stax-api</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.objenesis</groupId>
                <artifactId>objenesis</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

Java代码概要:

SparkConf sparkConf = new SparkConf()
    .setAppName("Converter Service")
    .setMaster("local[*]");

SparkSession sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate();

// read input data
Dataset<Row> events = sparkSession.read()
    .format("json")
    .schema(inputConfig.getSchema()) // StructType describing input schema
    .load(inputFile.getPath());

// write data out
DataFrameWriter<Row> frameWriter = events
    .selectExpr(
        // useful if you want to change the schema before writing it to ORC, e.g. ["`col1` as `FirstName`", "`col2` as `LastName`"]
        JavaConversions.asScalaBuffer(outputSchema.getColumns()))
    .write()
    .options(ImmutableMap.of("compression", "zlib"))
    .format("orc")
    .save(outputUri.getPath());

希望这可以帮助某人入门。

【讨论】:

  • 我开始使用 Apache ORC 推出自己的转换器,但在编写包含 1 个以上元素的列表时遇到了问题。我找到了这个解决方案,并在几个小时内启动并运行它,甚至无需定义架构。我只是希望缺少定义的架构不会在以后咬我......
猜你喜欢
  • 2021-10-14
  • 2011-12-18
  • 2015-12-19
  • 2020-03-02
  • 2016-07-28
  • 1970-01-01
  • 1970-01-01
  • 2013-12-02
  • 1970-01-01
相关资源
最近更新 更多