【问题标题】:How to decode a byte[] of List<Objects> to Dataset<Row> in spark?如何在火花中将 List<Objects> 的字节 [] 解码为 Dataset<Row>?
【发布时间】:2020-05-23 18:41:22
【问题描述】:

我在我的项目中使用 spark-sql-2.3.1v、kafka 和 java8。 我正在尝试将收到的主题字节 [] 转换为 kafka 消费者端的数据集。

这里是详细信息

我有

class Company{
    String companyName;
    Integer companyId;
}

我定义为

public static final StructType companySchema = new StructType(
              .add("companyName", DataTypes.StringType)
              .add("companyId", DataTypes.IntegerType);

但消息定义为

class Message{
    private List<Company> companyList;
    private String messageId;
}

我试图定义为

StructType messageSchema = new StructType()
            .add("companyList", DataTypes.createArrayType(companySchema , false),false)
            .add("messageId", DataTypes.StringType);

我使用序列化将消息作为字节 [] 发送到 kafka 主题。

我在 consumer 成功接收到消息字节 []。 我试图将其转换为数据集?怎么办?

   Dataset<Row> messagesDs = kafkaReceivedStreamDs.select(from_json(col("value").cast("string"), messageSchema ).as("messages")).select("messages.*");

  messagesDs.printSchema();

  root
         |-- companyList: array (nullable = true)
         |    |-- element: struct (containsNull = true)
         |    |    |-- companyName: string (nullable = true)
         |    |    |-- companyId: integer (nullable = true)
         |-- messageId: string (nullable = true)    

Dataset<Row> comapanyListDs = messagesDs.select(explode_outer(col("companyList")));

comapanyListDs.printSchema();

root
 |-- col: struct (nullable = true)
 |    |-- companyName: string (nullable = true)
 |    |-- companyId: integer (nullable = true)



Dataset<Company> comapanyDs = comapanyListDs.as(Encoders.bean(Company.class));

出现错误:

线程“主”org.apache.spark.sql.AnalysisException 中的异常:无法解析给定输入列的“companyName”:[col];

如何获取Dataset记录,如何获取?

【问题讨论】:

    标签: java apache-spark apache-kafka apache-spark-sql spark-structured-streaming


    【解决方案1】:

    你的结构在爆炸时被命名为“col”。

    由于您的 Bean 类没有“col”属性,因此它会因上述错误而失败。

    线程“主”org.apache.spark.sql.AnalysisException 中的异常: 无法解析给定输入列的“公司名称”:[col];

    您可以执行以下选择以将相关列作为普通列: 像这样的:

        Dataset<Row> comapanyListDs = messagesDs.select(explode_outer(col("companyList"))).
    select(col("col.companyName").as("companyName"),col("col.companyId").as("companyId"));
    

    我尚未测试语法,但一旦您从 struct 中获得每一行的普通列,就必须执行下一步。

    【讨论】:

    猜你喜欢
    • 2022-11-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-10-21
    • 1970-01-01
    • 2019-04-29
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多