【发布时间】:2021-04-06 04:43:05
【问题描述】:
我在 Kafka 中有 Avro 格式的消息。这些必须转换为表并使用 SQL 选择,然后转换为流,最后下沉。 有多个具有不同 Avro 模式的 Kafka 主题,因此需要动态表。
这是我正在使用的代码
StreamExecutionEnvironment env = ...;
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
FlinkKafkaConsumer<MyAvroClass> kafkaConsumer = ...;
var kafkaInputStream = env.addSource(kafkaConsumer, "kafkaInput");
Table table = tableEnv.fromDataStream(kafkaInputStream);
tableEnv.executeSql("DESCRIBE " + table).print();
...
MyAvroClass 是 Avro 类,它扩展了 SpecificRecordBase 并包含一个数组。
这个类的代码。
public class MyAvroClass extends SpecificRecordBase implements SpecificRecord {
// avro fields
private String event_id;
private User user;
private List<Item> items;
// getter, setters, constructors, builders, ...
}
我无法访问items 字段的元素。当我打印表格描述时,我看到项目的类型是 ANY
+------------+-------------------------------------------------------------+------+-----+--------+-----------+
| name | type | null | key | extras | watermark |
+------------+-------------------------------------------------------------+------+-----+--------+-----------+
| event_id | STRING | true | | | |
| items | LEGACY('RAW', 'ANY<java.util.List>') | true | | | |
| user | LEGACY('STRUCTURED_TYPE', 'POJO<com.company.events.User>') | true | | | |
+------------+-------------------------------------------------------------+------+-----+--------+-----------+
如何将其转换为可以查询 from 项目的类型? 提前致谢
【问题讨论】:
-
您找到解决方案了吗?
标签: java apache-flink flink-streaming flink-sql