【发布时间】:2019-06-08 07:30:21
【问题描述】:
TLDR 摘要:Clickhouse Kafka 引擎,物化视图不适用于复杂的 select 语句。
加长版:
我正在尝试使用 JSONEachRow 通过其 Kafka 引擎将大量 JSON 数据点发送到 Clickhouse。但是物化视图不会正确使用流。 我有一个用 go 编写的 kafka 生产者,它从多个 tcp 流中获取数据并异步写入 kafka 队列。
数据如此流动:
TCP Sources -> Producer -> Kafka -> Clickhouse(Kafka Engine) -> Materialized View -> 目的地表
所有这些都有效,到目前为止一切顺利。
当我提高输入数据的速度(400,000 点/秒)时,我第一次遇到瓶颈,我的生产者无法足够快地写入 kafka 并且连接堆积。所以我希望尝试批量处理数据,但似乎 Clickhouse 无法将 json 数组作为输入 (https://clickhouse.yandex/docs/en/interfaces/formats/)
所以我想到了在源头对数据点进行批处理并在物化视图中转换消息的想法,所以以前我有很多单独的消息:
{ "t": 1547457441651445401,"i": "device_2","c": 20001,"v": 56454654}" }
我现在有一条消息,它是上述的倍数并被字符串化,点之间有换行符。
{"实时":"{\"t\":1547458266855015791,\"i\":\"device_2\",\"c\":20001,\"v\":56454654}\n{\ "t\":1547458266855015791,\"i\":\"device_2\",\"c\":20001,\"v\":56454654}"}
这里的目的是在物化视图的select语句中使用visitParamExtract将字符串解析并转换为多个值。
实体化视图:
CREATE MATERIALIZED VIEW ltdb_mat_view TO default.ltdb AS SELECT
visitParamExtractInt(x, 't') AS timestamp,
visitParamExtractString(x, 'i') AS device_id,
visitParamExtractInt(x, 'v') AS value FROM (
SELECT arrayJoin(*) AS x
FROM
(
SELECT splitByChar('\n', realtimes)
FROM kafka_stream_realtimes
) )
它似乎在做某事,因为当它运行时,kafka_stream_realtimes 被清除,我无法手动查询它,得到一个错误“DB::Exception: Failed to claim consumer:.”。但数据从未进入决赛桌。
总结:
- 数据到达 clickhouse,它只是消失了,似乎永远不会 进入决赛桌。
- 如果我放弃物化视图,我可以看到数据在 kafka_stream_realtimes
- 如果我将物化视图查询作为 INSERT INTO 语句运行 紧随其后的是选择,它将从流中获取数据到 决赛桌。
- 我意识到我可能只是将瓶颈推到了 clickhouse 这可能永远不会奏效,但我想通过这个 完整性
为了完整性: kafka_stream_realimes:
CREATE TABLE IF NOT EXISTS kafka_stream_realtimes(realtimes String)
ENGINE = Kafka('kafka:9092', 'realtimes', 'groupTest', 'JSONEachRow');
ltdb:
CREATE TABLE default.ltdb (timestamp Int64,device_id String,value Int64) ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(toDateTime(round(timestamp/1000000000)))
ORDER BY (device_id, value)
SETTINGS index_granularity=8192;
【问题讨论】:
标签: apache-kafka materialized-views clickhouse