【问题标题】:Flink SQL-CLi: bring header recordsFlink SQL-CLi:带头记录
【发布时间】:2023-02-11 01:37:40
【问题描述】:

我是 flink sql cli 的新手,我想从我的 kafka 集群创建一个接收器。

我已经阅读了文档,据我所知,de headers 是一个 map<STRING, BYTE> 类型,通过它们是所有重要信息。

当我使用 de sql-cli 时,我尝试按照以下命令创建一个接收器表:

CREATE TABLE KafkaSink (
`headers` MAP<STRING, BYTES> METADATA     
) WITH (
      'connector' = 'kafka',
      'topic' = 'MyTopic', 
      'properties.bootstrap.servers' ='LocalHost',
      'properties.group.id' = 'MyGroypID',
      'scan.startup.mode' = 'earliest-offset',
      'value.format' = 'json'
    );

但是当我尝试使用 select * from KafkaSink limit 10 读取数据时;它返回空记录

我试过运行类似的查询

select headers.col1 from a limit 10;

而且,我尝试在选择列部分创建具有不同结构的汇表:

...
`headers` STRING
...
...
`headers` MAP<STRING, STRING>
...
...
`headers` ROW(COL1 VARCHAR, COL2 VARCHAR...)
...

但它没有返回任何内容,但是当我从 kafka 集群中获取偏移列时,它给我带来了偏移量但没有标题。

有人可以解释我的错误吗?

我想用 flink sql cli 创建一个 kafka sink

【问题讨论】:

    标签: apache-kafka apache-flink sqlclient


    【解决方案1】:

    好的,正如我所看到的,当我尝试更改为

    'format' = 'debezium-json'
    

    我可以以更好的方式看到 json。 我遵循 json 模式,在我的例子中是

    {
    "data": {...},
    "metadata":{...}
    }
    

    因此,我没有带标题,而是带了我需要的所有列的数据,数据作为字符串,列作为示例 数据.col1, 数据.col2

    为了查看记录,只需使用

    select 
          json_value(data, '$.Col1') as Col1 
    from Table;
    

    有用!

    【讨论】:

      猜你喜欢
      • 2021-05-09
      • 1970-01-01
      • 1970-01-01
      • 2021-10-23
      • 2018-06-26
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多