【问题标题】:Kafka KSQL Re partition and rekey problemKafka KSQL Re partition and rekey问题
【发布时间】:2020-03-14 10:20:45
【问题描述】:

我已经定义了一个流

CREATE STREAM QUOTE (quoteId VARCHAR,
                      counterPartyId VARCHAR)
        WITH (KAFKA_TOPIC='quotes',
              VALUE_FORMAT='JSON',
              KEY='quoteId');

我想汇总到目前为止我得到了多少报价,以及该事件的最后一个报价 ID

CREATE TABLE KQUOTE AS
    SELECT Max(CAST(quoteId as INT)) as quoteId,COUNT(*) AS COUNT
        FROM QUOTE
        GROUP BY 1;

将此表转为 Stream 因为我想知道聚合结果的历史记录。 (好像我必须使用底层主题来创建流。不能直接从表'KQUOTE'创建流)。

CREATE stream KQuoteStream (quoteId VARCHAR,
                      count INT)
        WITH (KAFKA_TOPIC='KQUOTE',
              VALUE_FORMAT='JSON',
              KEY='quoteId');

我希望上面使用 RAWKEY quoteId,但事实并非如此。正如我们在下面看到的,RAWKEY 始终为 1(因为我们在创建表 kquote 时按常量 1 分组)。

ksql> select * from KQuoteStream;
1574121797111 | 1 | 806 | 20
1574121979291 | 1 | 807 | 21

尝试通过quoteId重新分区流以将RAWKEY更改为quoteId

CREATE stream KQuoteStreamByQuoteId
        as
    SELECT quoteId, COUNT FROM KQuoteStream PARTITION BY quoteId;

RAMKEY 仍然是常数 1

ksql> select * from KQuoteStreamByQuoteId;
1574121797111 | 1 | 806 | 20
1574121979291 | 1 | 807 | 21

顺便说一句:所有主题都具有与 1 相同的分区,以使事情变得更简单。 有人有什么想法吗?非常感谢 !

【问题讨论】:

    标签: apache-kafka ksqldb


    【解决方案1】:

    这绝对是您发现的一个有趣的错误!

    这里的诀窍是要了解WITH(KEY='quoteId') 实际上并没有 任何事情,这是对ksqlDB 的一个提示,即关键字段恰好也存在于quoteId 的值中。然后,当您PARTITION BY quoteId 时,它认为您正在按行键进行分区,所以它什么也不做!我同意这种行为非常不直观,这就是为什么我们计划删除 WITH(KEY=...) 功能以支持更直观的功能(待定)。

    与此同时,解决方法应该是在创建KQuoteStream 时指定密钥,这样KSQL 就不会优化掉重新分区。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2019-05-12
      • 2023-03-04
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-08-17
      • 2019-04-28
      • 2020-11-04
      相关资源
      最近更新 更多