【问题标题】:why ADD COLUMN to kafka table is not supported in Clickhouse为什么 Clickhouse 不支持向 kafka 表添加列
【发布时间】:2021-03-28 07:25:55
【问题描述】:

我在 ClickHouse 中向 Kafka 队列添加列时遇到问题。

我已经用命令创建了一个表

CREATE TABLE my_db.my_queue ON CLUSTER my_cluster
(
    `ts` String,
    .... some other columns
)
ENGINE = Kafka()
SETTINGS 
kafka_broker_list = '172.21.0.3:9092', 
kafka_topic_list = 'my_topic', 
kafka_group_name = 'my_group', 
kafka_format = 'JSONEachRow', 
kafka_row_delimiter = '\n', 
kafka_num_consumers = 1, 
kafka_skip_broken_messages = 10;

然后尝试添加一列

ALTER TABLE my_db.my_queue  ON CLUSTER my_cluster ADD COLUMN new_column String;

但出现错误

SQL Error [48]: ClickHouse exception, code: 48, host: 172.21.0.4, port: 8123; Code: 48,
e.displayText() = DB::Exception: There was an error on [clickhouse-server:9000]: Code: 48,
e.displayText() = DB::Exception: Alter of type 'ADD COLUMN' is not supported by storage Kafka
(version 20.11.4.13 (official build)) (version 20.11.4.13 (official build))

我不熟悉 ClickHouse 和任何分析数据库。 所以我想知道为什么不支持它?或者我应该以其他方式添加一列?

【问题讨论】:

    标签: apache-kafka clickhouse


    【解决方案1】:

    从 Kafka 队列中支持具有不同模式的消息的一种方法是存储原始 JSON 消息,如下所示:

    CREATE TABLE my_db.my_queue ON CLUSTER my_cluster
    (
        `message` String
    )
    ENGINE = Kafka()
    SETTINGS 
    kafka_broker_list = '172.21.0.3:9092', 
    kafka_topic_list = 'my_topic', 
    kafka_group_name = 'my_group', 
    kafka_format = 'JSONAsString', 
    kafka_row_delimiter = '\n', 
    kafka_num_consumers = 1, 
    kafka_skip_broken_messages = 10;
    

    JSONAsString 格式将原始 JSON 存储在 message 列中。通过这种方式,您可以从 Kafka 表中通过物化视图和JSON functions 对每个新行进行后处理。

    例如:

    CREATE TABLE my_db.post_processed_data (
      `ts` String,
      `another_column` String
    )
    -- use a proper engine
    Engine=Log;
    
    CREATE MATERIALIZED VIEW my_db.my_queue_mv TO my_db.post_processed_data 
    AS
    SELECT 
        JSONExtractString(message, 'ts') AS ts,
        JSONExtractString(message, 'another_column') AS another_column
    FROM my_db.my_queue;
    

    如果 Kafka 队列的 JSON 模式有任何变化,您可以做出相应的反应,在 post_processed_data 表中执行 ALTER TABLE .. ADD COLUMN .. 并相应地更新物化视图。这样 Kafka 表就会保持原样。

    【讨论】:

      【解决方案2】:

      kafka 引擎不支持。 只需删除表并使用新架构创建即可。

      它不支持alter,因为KafkaEngine的作者不需要它。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2017-02-06
        • 2020-06-19
        • 2021-12-12
        相关资源
        最近更新 更多