【问题标题】:Flink - Convert Avro datastream to tableFlink - 将 Avro 数据流转换为表
【发布时间】:2021-04-06 04:43:05
【问题描述】:

我在 Kafka 中有 Avro 格式的消息。这些必须转换为表并使用 SQL 选择,然后转换为流,最后下沉。 有多个具有不同 Avro 模式的 Kafka 主题,因此需要动态表。

这是我正在使用的代码

StreamExecutionEnvironment env = ...;
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

FlinkKafkaConsumer<MyAvroClass> kafkaConsumer = ...;
var kafkaInputStream = env.addSource(kafkaConsumer, "kafkaInput");

Table table = tableEnv.fromDataStream(kafkaInputStream);
tableEnv.executeSql("DESCRIBE " + table).print();
...

MyAvroClass 是 Avro 类,它扩展了 SpecificRecordBase 并包含一个数组。
这个类的代码。

public class MyAvroClass extends SpecificRecordBase implements SpecificRecord {
  // avro fields
  private String event_id;
  private User user;
  private List<Item> items; 
  
  // getter, setters, constructors, builders, ...
}

我无法访问items 字段的元素。当我打印表格描述时,我看到项目的类型是 ANY

+------------+-------------------------------------------------------------+------+-----+--------+-----------+
|       name |                                                        type | null | key | extras | watermark |
+------------+-------------------------------------------------------------+------+-----+--------+-----------+
|   event_id |                                                      STRING | true |     |        |           |
|      items |                        LEGACY('RAW', 'ANY<java.util.List>') | true |     |        |           |
|       user |  LEGACY('STRUCTURED_TYPE', 'POJO<com.company.events.User>') | true |     |        |           |
+------------+-------------------------------------------------------------+------+-----+--------+-----------+  

如何将其转换为可以查询 from 项目的类型? 提前致谢

【问题讨论】:

  • 您找到解决方案了吗?

标签: java apache-flink flink-streaming flink-sql


【解决方案1】:

我目前正在为此目的使用此方法。这个想法是SpecificRecord -(by AvroSerializationSchema)-&gt; binary -(by AvroRowDeserializationSchema)-&gt; Row。请注意,您需要按照FLINK-23885 的评论中的建议,使用.returns 来表示.map 的输出类型。

public static <T extends SpecificRecord> Table toTable(StreamTableEnvironment tEnv,
                                                       DataStream<T> dataStream,
                                                       Class<T> cls) {
  RichMapFunction<T, Row> avroSpecific2RowConverter = new RichMapFunction<>() {
    private transient AvroSerializationSchema<T> avro2bin = null;
    private transient AvroRowDeserializationSchema bin2row = null;

    @Override
    public void open(Configuration parameters) throws Exception {
      avro2bin = AvroSerializationSchema.forSpecific(cls);
      bin2row = new AvroRowDeserializationSchema(cls);
    }

    @Override
    public Row map(T value) throws Exception {
      byte[] bytes = avro2bin.serialize(value);
      Row row = bin2row.deserialize(bytes);
      return row;
    }
  };

  SingleOutputStreamOperator<Row> rows = dataStream.map(avroSpecific2RowConverter)
    // https://issues.apache.org/jira/browse/FLINK-23885
    .returns(AvroSchemaConverter.convertToTypeInfo(cls));

  return tEnv.fromDataStream(rows);
}

【讨论】:

  • 您的答案可以通过额外的支持信息得到改进。请edit 添加更多详细信息,例如引用或文档,以便其他人可以确认您的答案是正确的。你可以找到更多关于如何写好答案的信息in the help center
猜你喜欢
  • 1970-01-01
  • 2016-12-06
  • 1970-01-01
  • 1970-01-01
  • 2015-11-18
  • 2013-07-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多