【发布时间】:2023-02-11 01:37:40
【问题描述】:
我是 flink sql cli 的新手,我想从我的 kafka 集群创建一个接收器。
我已经阅读了文档,据我所知,de headers 是一个 map<STRING, BYTE> 类型,通过它们是所有重要信息。
当我使用 de sql-cli 时,我尝试按照以下命令创建一个接收器表:
CREATE TABLE KafkaSink (
`headers` MAP<STRING, BYTES> METADATA
) WITH (
'connector' = 'kafka',
'topic' = 'MyTopic',
'properties.bootstrap.servers' ='LocalHost',
'properties.group.id' = 'MyGroypID',
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'json'
);
但是当我尝试使用 select * from KafkaSink limit 10 读取数据时;它返回空记录
我试过运行类似的查询
select headers.col1 from a limit 10;
而且,我尝试在选择列部分创建具有不同结构的汇表:
...
`headers` STRING
...
...
`headers` MAP<STRING, STRING>
...
...
`headers` ROW(COL1 VARCHAR, COL2 VARCHAR...)
...
但它没有返回任何内容,但是当我从 kafka 集群中获取偏移列时,它给我带来了偏移量但没有标题。
有人可以解释我的错误吗?
我想用 flink sql cli 创建一个 kafka sink
【问题讨论】:
标签: apache-kafka apache-flink sqlclient