【问题标题】:Clickhouse not consuming Kafka messages via complex Materialized ViewClickhouse 不通过复杂的物化视图消费 Kafka 消息
【发布时间】:2019-06-08 07:30:21
【问题描述】:

TLDR 摘要:Clickhouse Kafka 引擎,物化视图不适用于复杂的 select 语句。

加长版:

我正在尝试使用 JSONEachRow 通过其 Kafka 引擎将大量 JSON 数据点发送到 Clickhouse。但是物化视图不会正确使用流。 我有一个用 go 编写的 kafka 生产者,它从多个 tcp 流中获取数据并异步写入 kafka 队列。

数据如此流动:

TCP Sources -> Producer -> Kafka -> Clickhouse(Kafka Engine) -> Materialized View -> 目的地表

所有这些都有效,到目前为止一切顺利。

当我提高输入数据的速度(400,000 点/秒)时,我第一次遇到瓶颈,我的生产者无法足够快地写入 kafka 并且连接堆积。所以我希望尝试批量处理数据,但似乎 Clickhouse 无法将 json 数组作为输入 (https://clickhouse.yandex/docs/en/interfaces/formats/)

所以我想到了在源头对数据点进行批处理并在物化视图中转换消息的想法,所以以前我有很多单独的消息:

{ "t": 1547457441651445401,"i": "device_2","c": 20001,"v": 56454654}" }

我现在有一条消息,它是上述的倍数并被字符串化,点之间有换行符。

{"实时":"{\"t\":1547458266855015791,\"i\":\"device_2\",\"c\":20001,\"v\":56454654}\n{\ "t\":1547458266855015791,\"i\":\"device_2\",\"c\":20001,\"v\":56454654}"}

这里的目的是在物化视图的select语句中使用visitParamExtract将字符串解析并转换为多个值。

实体化视图:

CREATE MATERIALIZED VIEW ltdb_mat_view TO default.ltdb AS SELECT 
    visitParamExtractInt(x, 't') AS timestamp, 
    visitParamExtractString(x, 'i') AS device_id, 
    visitParamExtractInt(x, 'v') AS value FROM  (
    SELECT arrayJoin(*) AS x
    FROM 
    (
        SELECT splitByChar('\n', realtimes)
        FROM kafka_stream_realtimes 
    )  )

它似乎在做某事,因为当它运行时,kafka_stream_realtimes 被清除,我无法手动查询它,得到一个错误“DB::Exception: Failed to claim consumer:.”。但数据从未进入决赛桌。

总结:

  • 数据到达 clickhouse,它只是消失了,似乎永远不会 进入决赛桌。
  • 如果我放弃物化视图,我可以看到数据在 kafka_stream_realtimes
  • 如果我将物化视图查询作为 INSERT INTO 语句运行 紧随其后的是选择,它将从流中获取数据到 决赛桌。
  • 我意识到我可能只是将瓶颈推到了 clickhouse 这可能永远不会奏效,但我想通过这个 完整性

为了完整性: kafka_stream_realimes:

CREATE TABLE IF NOT EXISTS kafka_stream_realtimes(realtimes String)
  ENGINE = Kafka('kafka:9092', 'realtimes', 'groupTest', 'JSONEachRow');

ltdb:

CREATE TABLE default.ltdb (timestamp Int64,device_id String,value Int64) ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(toDateTime(round(timestamp/1000000000)))
ORDER BY (device_id, value)
SETTINGS index_granularity=8192;

【问题讨论】:

    标签: apache-kafka materialized-views clickhouse


    【解决方案1】:

    但似乎 Clickhouse 无法将 json 数组作为输入

    似乎动机是在生产者端进行批量提交。为什么不将一堆 JSON 行分组并一次性提交呢? ClickHouse 将接收这些多行消息并为您解析它们。您可能还需要向 Kafka 引擎提供 kafka_row_delimiter 设置,因为大多数 kafka 生产者不会在每条消息的末尾附加行分隔符。

    所以一条消息变成了

    { "t": 1547457441651445401,"i": "device_2","c": 20001,"v": 56454654}
    { "t": 1547457441651445402,"i": "device_2","c": 20001,"v": 56454654}
    { "t": 1547457441651445403,"i": "device_2","c": 20001,"v": 56454654}
    ...
    

    【讨论】:

    • 是的,我将它们组合在一起。它应该如何格式化为点的 json 数组,或者只是带有分隔符的单个点?
    • 不,我的意思是将多行消息提交到kafka
    • 这似乎有效。我认为我通过对消息字符串进行编码而使事情变得过于复杂,而没有意识到 kafka 只会自行处理换行符。谢谢。
    • 虽然它解决了我的用例,但我认为问题仍然存在于为什么我的问题中的物化视图会将流消耗到一些空白中,但我接受你的回答。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-01-28
    • 2022-06-15
    • 1970-01-01
    • 1970-01-01
    • 2016-06-21
    • 2017-09-23
    相关资源
    最近更新 更多