【问题标题】:Creating a KSQL Stream: How to extract value from complex json创建 KSQL 流:如何从复杂的 json 中提取值
【发布时间】:2018-10-28 18:42:29
【问题描述】:

我正在尝试在 Apache/KAFKA KSQL 中创建流 主题包含(有点复杂的 JSON)

{
  "agreement_id": "dd8afdbe-59cf-4272-b640-b14a24d8234c",
  "created_at": "2018-02-17 16:00:00.000Z",
  "id": "6db276a8-2efe-4495-9908-4d3fc4cc16fa",
  "event_type": "data",
  "total_charged_amount": {
    "tax_free_amount": null,
    "tax_amounts": [],
    "tax_included_amount": {
      "amount": 0.0241,
      "currency": "EUR"
    }
  }
  "used_service_units": [
    {
      "amount": 2412739,
      "currency": null,
      "unit_of_measure": "bytes"
    }
  ]
}

现在,对于像 event_type 和 created_at 这样简单的东西,创建流很容易。应该是这样的

CREATE STREAM tstream (event_type varchar, created_at varchar) WITH (kafka_topic='usage_events', value_format='json');

但现在我需要访问 used_service_units.... 我想在上面的JSON中提取“金额”

我该怎么做?

CREATE STREAM usage (event_type varchar,create_at varchar, used_service_units[0].amount int) WITH (kafka_topic='usage_events', value_format='json');

结果

line 1:78: mismatched input '[' expecting {'ADD', 'APPROXIMATE', ...

如果我改为创建这样的流

CREATE STREAM usage (event_type varchar,create_at varchar, used_service_units varchar) WITH (kafka_topic='usage_events', value_format='json');

然后像这样在流上执行 SQL SELECT

SELECT EXTRACTJSONFIELD(used_service_units,'$.amount') FROM usage;
SELECT EXTRACTJSONFIELD(used_service_units[0],'$.amount') FROM usage;
SELECT EXTRACTJSONFIELD(used_service_units,'$[0].amount') FROM usage;

这些替代方案都不起作用...

这个给了我

SELECT EXTRACTJSONFIELD(used_service_units[0],'$.amount') FROM usage;'

Code generation failed for SelectValueMapper

【问题讨论】:

    标签: json apache-kafka ksqldb


    【解决方案1】:

    似乎解决此问题的一种方法是将列数据类型设为数组 即

    CREATE STREAM usage (event_type varchar,created_at varchar, total_charged_amount varchar, used_service_units array<varchar> ) WITH (kafka_topic='usage_events', value_format='json');
    

    现在我可以做到以下几点:

    SELECT EXTRACTJSONFIELD(used_service_units[0],'$.amount') FROM usage
    

    【讨论】:

    • @RobinMoffatt,这种方法是否记录在某处?在confluent.io/product/ksql 的示例/食谱中找不到它,但这也是我的确切问题,所以也许这是一个更常见的要求。
    • @JochemSchulenklopper 很高兴写下来,但可以先更详细地了解具体问题 - 你能联系我吗? robin@confluent.io。谢谢。
    • @RobinMoffatt,如果 JSON 文档包含更多层、更多字典、数组中的更多项或字典中数组中的字典,那该部分需要发生什么?在我的例子中,我正在解析由公共 SaaS 提供商从 webhook 发送的事件数据,并且 JSON 文档非常平整。我猜想(也许希望)有一些查询语言来表达“多级路径”,比如用于 JSON 的 XPath 的变体,或者针对 KSQL 中的 JSON 文档的类似 CSS 的选择器......但我找不到KSQL 中的文档用于此类事情。 (指向文档的指针同样有价值。)
    • @JochemSchulenklopper 看到一篇关于温和嵌套 JSON confluent.io/stream-processing-cookbook/ksql-recipes/… 的关于融合的文章。它对我来说太简单了,但它可能会对你有所帮助。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2018-10-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多