【发布时间】:2020-03-14 10:20:45
【问题描述】:
我已经定义了一个流
CREATE STREAM QUOTE (quoteId VARCHAR,
counterPartyId VARCHAR)
WITH (KAFKA_TOPIC='quotes',
VALUE_FORMAT='JSON',
KEY='quoteId');
我想汇总到目前为止我得到了多少报价,以及该事件的最后一个报价 ID
CREATE TABLE KQUOTE AS
SELECT Max(CAST(quoteId as INT)) as quoteId,COUNT(*) AS COUNT
FROM QUOTE
GROUP BY 1;
将此表转为 Stream 因为我想知道聚合结果的历史记录。 (好像我必须使用底层主题来创建流。不能直接从表'KQUOTE'创建流)。
CREATE stream KQuoteStream (quoteId VARCHAR,
count INT)
WITH (KAFKA_TOPIC='KQUOTE',
VALUE_FORMAT='JSON',
KEY='quoteId');
我希望上面使用 RAWKEY quoteId,但事实并非如此。正如我们在下面看到的,RAWKEY 始终为 1(因为我们在创建表 kquote 时按常量 1 分组)。
ksql> select * from KQuoteStream;
1574121797111 | 1 | 806 | 20
1574121979291 | 1 | 807 | 21
尝试通过quoteId重新分区流以将RAWKEY更改为quoteId
CREATE stream KQuoteStreamByQuoteId
as
SELECT quoteId, COUNT FROM KQuoteStream PARTITION BY quoteId;
RAMKEY 仍然是常数 1
ksql> select * from KQuoteStreamByQuoteId;
1574121797111 | 1 | 806 | 20
1574121979291 | 1 | 807 | 21
顺便说一句:所有主题都具有与 1 相同的分区,以使事情变得更简单。 有人有什么想法吗?非常感谢 !
【问题讨论】:
标签: apache-kafka ksqldb