【问题标题】:ksql - creating a stream from a json arrayksql - 从 json 数组创建流
【发布时间】:2018-06-25 23:58:26
【问题描述】:

我的 kafka 主题正在以这种格式推送数据(来自 collectd):

[{"values":[100.000080140372],"dstypes":["derive"],"dsnames":["value"],"time":1529970061.145,"interval":10.000,"host":"k5.orch","plugin":"cpu","plugin_instance":"23","type":"cpu","type_instance":"idle","meta":{"network:received":true}}]

它是数组、整数和浮点数的组合......而且整个东西都在一个 json 数组中。结果,我有一段时间使用 ksql 来处理这些数据。

当我创建一个“默认”流时

create stream cd_temp with (kafka_topic='ctd_test', value_format='json');

我得到这个结果:

ksql> describe cd_temp;

 Field   | Type                      
-------------------------------------
 ROWTIME | BIGINT           (system) 
 ROWKEY  | VARCHAR(STRING)  (system) 
-------------------------------------

任何 select 都会返回 ROWTIME 和 ROWKEY 的 8 位十六进制值。

我花了一些时间试图提取 json 字段无济于事。我担心的是:

ksql> print 'ctd_test' from beginning;
Format:JSON
com.fasterxml.jackson.databind.node.ArrayNode cannot be cast to com.fasterxml.jackson.databind.node.ObjectNode

有没有可能ksql中不能使用这个topic?有没有一种技术可以解包外部数组以获取内部有趣的位?

【问题讨论】:

    标签: apache-kafka collectd ksqldb


    【解决方案1】:

    在撰写本文时(2018 年 6 月),KSQL 无法处理整个内容嵌入顶级数组的 JSON 消息。有一个github issue to track this。我建议在此问题上添加 +1 投票以提高其优先级。

    另外,我注意到您的 create stream 语句没有定义 json 消息的架构。虽然这在这种情况下无济于事,但其他 Json 输入格式需要它,即您创建的语句应该类似于:

    create stream cd_temp (values ARRAY<DOUBLE>, dstypes ARRAY<VARCHAR>, etc) with (kafka_topic='ctd_test', value_format='json');
    

    【讨论】:

    • 无赖。好的-ty。
    猜你喜欢
    • 2018-10-15
    • 1970-01-01
    • 2018-10-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-11-13
    相关资源
    最近更新 更多