【发布时间】:2020-05-24 03:58:02
【问题描述】:
我有两个要加入的主题,然后查询加入以获取最新结果。我从here 关注了Create a ksqlDB Table from a ksqlDB Stream 上的文档。
这就是我的工作:
CREATE TABLE CATALOGUE_TABLE
(title STRING)
WITH (KAFKA_TOPIC='catalogue-topic-test',
VALUE_FORMAT='AVRO');
CREATE TABLE SCHEDULE_TABLE
(fromInstant STRING,
toInstant STRING)
WITH (KAFKA_TOPIC='schedule-topic-test',
VALUE_FORMAT='AVRO');
请放心,这两个基础主题都有其所有条目的键。然后我像这样加入他们:
CREATE TABLE MYTABLE AS
SELECT c.title, s.fromInstant, s.toInstant
FROM CATALOGUE_TABLE c
INNER JOIN SCHEDULE_TABLE s
ON s.ROWKEY = c.ROWKEY
EMIT CHANGES;
我不确定我最终会得到什么。不管是什么,我都可以运行以下命令:
select * from MYTABLE EMIT CHANGES;
我可以看到上面的所有更新,包括所有重复项。它基本上是一条溪流。现在,如果我运行以下命令:
select * from MYTABLE WHERE ROWKEY='12';
要获得 id=12 的最后更新,我得到:
表“MYTABLE”未实现。有关查询类型的信息,请参阅 https://cnfl.io/queries。如果你...
并且输出的其余部分被截断,所以我看不出它想说什么。我的猜测是我在 MYTABLE 中做错了什么。
我想我错过了一个 groupBy,它应该是负责删除所有具有重复 ID 的条目的部分,但我无法弄清楚我需要在那里放什么以及我是否应该在 MYTABLE 中这样做仅级别,或者是否应该在所有三个表上完成。
【问题讨论】:
标签: apache-kafka confluent-platform ksqldb