【问题标题】:Creating DataFrame in Java based on ByteArrayInputStream基于 ByteArrayInputStream 在 Java 中创建 DataFrame
【发布时间】:2020-10-22 14:49:25
【问题描述】:

我需要将以下转换为 Java 中的 Spark DataFrame,并根据 avro 模式保存结构。然后我会根据这个avro结构写到s3。

GenericRecord r = new GenericData.Record(inAvroSchema);
r.put("id", "1");
r.put("cnt", 111);

Schema enumTest =
        SchemaBuilder.enumeration("name1")
                .namespace("com.name")
                .symbols("s1", "s2");

GenericData.EnumSymbol symbol = new GenericData.EnumSymbol(enumTest, "s1");

r.put("type", symbol);

ByteArrayOutputStream bao = new ByteArrayOutputStream();
GenericDatumWriter<GenericRecord> w = new GenericDatumWriter<>(inAvroSchema);

Encoder e = EncoderFactory.get().jsonEncoder(inAvroSchema, bao);
w.write(r, e);
e.flush();

我可以基于 JSON 结构创建对象

  Object o = reader.read(null, DecoderFactory.get().jsonDecoder(inAvroSchema, new ByteArrayInputStream(bao.toByteArray())));

但也许有什么方法可以基于 ByteArrayInputStream(bao.toByteArray()) 创建 DataFrame?

谢谢

【问题讨论】:

    标签: java dataframe apache-spark bytearrayinputstream


    【解决方案1】:

    不,您必须使用数据源来读取 Avro 数据。 Spark 将 Avro 作为文件从文件系统读取是至关重要的,因为许多优化和功能都依赖于它(例如压缩和分区)。 您必须添加spark-avro(除非您高于2.4)。 请注意,您使用的 EnumType 在 Spark 的 Dataset 中将是 String

    另见:Spark: Read an inputStream instead of File

    或者,您可以考虑使用SparkContext#parallelize 部署一堆任务并通过DatumReader/DatumWriter 显式读取/写入文件。

    【讨论】:

    • 谢谢@andreoss。在这部分代码之前,我计划读取 avro 数据并使用 spark.sql 进行一些聚合。主要思想是应用 avro 模式,然后写下 s3。当我将它写入 s3 时,“avroSchema”选项不起作用,如果我直接从聚合数据帧写入它,则 Enum 值存在问题,因为它被保存为字符串到 s3。我发现只有一种方法可以应用上述脚本中提到的 avroSchema。我可以将它保存到本地的 avro 文件中......但它不适用于 s3....
    • 您可以直接处理文件,仍然使用 Spark 来安排您的工作,但您不会有任何数据集
    猜你喜欢
    • 2016-05-26
    • 2018-10-01
    • 2012-06-28
    • 2016-02-10
    • 2022-01-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多