【问题标题】:Consuming nested JSON message from Kafka with ClickHouse使用 ClickHouse 使用来自 Kafka 的嵌套 JSON 消息
【发布时间】:2020-12-11 03:51:12
【问题描述】:

如果是平面 JSON 文档,Clickhouse 绝对可以从 Kafka 读取 JSON 消息。

我们在 Clickhouse 中使用kafka_format = 'JSONEachRow' 表示这一点。

这是我们目前使用它的方式:

CREATE TABLE topic1_kafka
(
    ts Int64,
    event String,
    title String,
    msg String
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1test.intra:9092,kafka2test.intra:9092,kafka3test.intra:9092',
kafka_topic_list = 'topic1', kafka_num_consumers = 1, kafka_group_name = 'ch1', 
kafka_format = 'JSONEachRow'

只要生产者将平面 JSON 发送到 topic1_kafka 就可以了。但并非所有生产者都发送平面 JSON,大多数应用程序会生成这样的嵌套 JSON 文档:

{
  "ts": 1598033988,
  "deviceId": "cf060111-dbe6-4aa8-a2d0-d5aa17f45663",
  "location": [39.920515, 32.853708],
  "stats": {
    "temp": 71.2,
    "total_memory": 32,
    "used_memory": 21.2
  }
}

很遗憾,上面的 JSON 文档与 JSONEachRow 不兼容,因此 ClickHouse 无法将 JSON 文档中的字段映射到表中的列。

有没有办法进行这种映射?

编辑:我们想将嵌套的 json 映射到这样的平面表:

CREATE TABLE topic1
(
    ts Int64,
    deviceId String,
    location_1 Float64,
    location_2 Float64,
    stats_temp Float64,
    stats_total_memory Float64,
    stats_used_memory Float64
) ENGINE = MergeTree()

【问题讨论】:

    标签: json apache-kafka clickhouse


    【解决方案1】:

    看起来曾经的方法是将“原始”数据作为字符串获取,然后在消费者物化视图中使用 JSON functions 处理每一行。

    WITH '{"ts": 1598033988, "deviceId": "cf060111-dbe6-4aa8-a2d0-d5aa17f45663", "location": [39.920515, 32.853708], "stats": { "temp": 71.2, "total_memory": 32, "used_memory": 21.2 }}' AS raw
    SELECT 
      JSONExtractUInt(raw, 'ts') AS ts,
      JSONExtractString(raw, 'deviceId') AS deviceId,
      arrayMap(x -> toFloat32(x), JSONExtractArrayRaw(raw, 'location')) AS location,
      JSONExtract(raw, 'stats', 'Tuple(temp Float64, total_memory Float64, used_memory Float64)') AS stats,
      stats.1 AS temp,
      stats.2 AS total_memory,
      stats.3 AS used_memory;
    
    /*
    ┌─────────ts─┬─deviceId─────────────────────────────┬─location──────────────┬─stats────────────────────────┬─temp─┬─total_memory─┬────────used_memory─┐
    │ 1598033988 │ cf060111-dbe6-4aa8-a2d0-d5aa17f45663 │ [39.920513,32.853706] │ (71.2,32,21.200000000000003) │ 71.2 │           32 │ 21.200000000000003 │
    └────────────┴──────────────────────────────────────┴───────────────────────┴──────────────────────────────┴──────┴──────────────┴────────────────────┘
    */
    

    备注:对于带浮点数的数字,应使用类型 Float64 而不是 Float32(参见相关CH Issue 13962)。


    使用需要更改 JSON 架构的标准数据类型:

    1. stats 表示为 Tuple
    CREATE TABLE test_tuple_field
    (
        ts Int64,
        deviceId String,
        location Array(Float32),
        stats Tuple(Float32, Float32, Float32)
    ) ENGINE = MergeTree()
    ORDER BY ts;
    
    
    INSERT INTO test_tuple_field FORMAT JSONEachRow 
    { "ts": 1598033988, "deviceId": "cf060111-dbe6-4aa8-a2d0-d5aa17f45663", "location": [39.920515, 32.853708], "stats": [71.2, 32, 21.2]};
    
    1. stats 表示为 Nested Structure
    CREATE TABLE test_nested_field
    (
        ts Int64,
        deviceId String,
        location Array(Float32),
        stats Nested (temp Float32, total_memory Float32, used_memory Float32)
    ) ENGINE = MergeTree()
    ORDER BY ts;
    
    
    SET input_format_import_nested_json=1;
    INSERT INTO test_nested_field FORMAT JSONEachRow 
    { "ts": 1598033988, "deviceId": "cf060111-dbe6-4aa8-a2d0-d5aa17f45663", "location": [39.920515, 32.853708], "stats": { "temp": [71.2], "total_memory": [32], "used_memory": [21.2] }};
    

    查看相关答案ClickHouse JSON parse exception: Cannot parse input: expected ',' before

    【讨论】:

    • 使用 Kafka Engine 创建 clickhouse 表时,我应该在 kafka_format 中指定什么格式?
    • 我会尝试测试它的 TSVRaw、TSV、CSV。
    • 问题是,当使用 CSV 时,如果消息包含逗号,这肯定是,然后 clickhouse 认为它是不同的列。因此无法解析消息。同样的事情也适用于 TSV,消息不能包含制表符。
    • 可能是不可能的 ;( ,尝试检查每种格式(完整列表:select * from system.formats
    • virtual columns;在Altinity Kafka FAQ 中提到了 _raw_message - 这可能会有所帮助(如果它存在,在源代码中我看不到它的引用 github.com/ClickHouse/ClickHouse/blob/…)。
    猜你喜欢
    • 2019-04-25
    • 2021-12-04
    • 1970-01-01
    • 2018-03-19
    • 1970-01-01
    • 1970-01-01
    • 2020-02-22
    • 2017-03-01
    • 2020-11-27
    相关资源
    最近更新 更多