Posted: 2020-05-26 05:21:10
Question:
I'm trying to inner join a KTable with another KTable.
Tables a and b:
create table a_table(r string, time string) with (Kafka_topic='a', Key='r', Value_format='json');
create table b_table(r string, time string) with (Kafka_topic='b', Key='r', Value_format='json');
Inner join tables a and b on the key r:
create table ab_table as select * from a_table inner join b_table on a_table.r = b_table.r emit changes;
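To reason about how many rows AB_TABLE should receive, here is a minimal Python sketch. It assumes the usual KTable-KTable inner join update semantics: every update to either side looks up the same key on the other side and emits a joined row only when both sides are present. Tombstones (deletes) and record caching are ignored. The class TableTableInnerJoin and its methods are hypothetical, for illustration only; this is not ksqlDB code.

```python
from typing import Dict, List, Tuple

Row = Dict[str, str]


class TableTableInnerJoin:
    """Hypothetical toy model of an inner join between two keyed tables."""

    def __init__(self) -> None:
        self.left: Dict[str, Row] = {}   # latest row per key in a_table
        self.right: Dict[str, Row] = {}  # latest row per key in b_table
        # Every joined row emitted so far, in emission order.
        self.output: List[Tuple[str, Row, Row]] = []

    def update_left(self, key: str, row: Row) -> None:
        # Upsert into the left table, then probe the right table.
        self.left[key] = row
        if key in self.right:
            self.output.append((key, row, self.right[key]))

    def update_right(self, key: str, row: Row) -> None:
        # Upsert into the right table, then probe the left table.
        self.right[key] = row
        if key in self.left:
            self.output.append((key, self.left[key], row))


join = TableTableInnerJoin()
join.update_left("2", {"R": "2", "TIME": "timeA"})   # no match yet, nothing emitted
join.update_right("2", {"R": "2", "TIME": "timeB"})  # match found, one row emitted
print(len(join.output))  # 1
```

Under this model, one insert into each table yields exactly one joined row, no matter how close together the two inserts are; a second AB_TABLE row for the same key would only be expected if one of the source tables received a further update for that key.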
1) Use case: inserting new data slowly
ksql> insert into a_table values('1','1', 'timeA');
--wait 5 second;
ksql> insert into b_table values('1','1', 'timeB');
select * from ab_table emit changes; -- returns 1 row
print AB_TABLE from beginning; -- returns 1 row
2) Use case: inserting new data quickly
ksql> insert into a_table values('2','2', 'timeA');insert into b_table values('2','2', 'timeB');
ksql> print a from beginning;
Key format: KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 5/23/20 4:44:06 PM UTC, key: 2, value: {"R":"2","TIME":"timeA"}
ksql> print b from beginning;
Key format: KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 5/23/20 4:44:06 PM UTC, key: 2, value: {"R":"2","TIME":"timeB"}
select * from ab_table emit changes; -- returns 1 row
print AB_TABLE from beginning; -- returns 2 rows
rowtime: 5/23/20 4:44:06 PM UTC, key: 2, value: {"A_TABLE_ROWTIME":1590252246657,"A_TABLE_ROWKEY":"2","A_TABLE_R":"2","A_TABLE_TIME":"timeA","B_TABLE_ROWTIME":1590252246657,"B_TABLE_ROWKEY":"2","B_TABLE_R":"2","B_TABLE_TIME":"timeB"}
rowtime: 5/23/20 4:44:06 PM UTC, key: 2, value: {"A_TABLE_ROWTIME":1590252246680,"A_TABLE_ROWKEY":"2","A_TABLE_R":"2","A_TABLE_TIME":"timeA","B_TABLE_ROWTIME":1590252246680,"B_TABLE_ROWKEY":"2","B_TABLE_R":"2","B_TABLE_TIME":"timeB"}
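To make the difference between the two AB_TABLE records concrete, here is a small Python check that parses both printed values and lists the fields that differ. The helper differing_fields is hypothetical; the JSON strings are the two records from the PRINT AB_TABLE output of case 2.

```python
import json

# The two AB_TABLE values from case 2, copied from the PRINT output.
FIRST = (
    '{"A_TABLE_ROWTIME":1590252246657,"A_TABLE_ROWKEY":"2","A_TABLE_R":"2",'
    '"A_TABLE_TIME":"timeA","B_TABLE_ROWTIME":1590252246657,"B_TABLE_ROWKEY":"2",'
    '"B_TABLE_R":"2","B_TABLE_TIME":"timeB"}'
)
SECOND = (
    '{"A_TABLE_ROWTIME":1590252246680,"A_TABLE_ROWKEY":"2","A_TABLE_R":"2",'
    '"A_TABLE_TIME":"timeA","B_TABLE_ROWTIME":1590252246680,"B_TABLE_ROWKEY":"2",'
    '"B_TABLE_R":"2","B_TABLE_TIME":"timeB"}'
)


def differing_fields(a: dict, b: dict) -> list:
    """Return the sorted names of fields whose values differ between a and b."""
    return sorted(k for k in a.keys() | b.keys() if a.get(k) != b.get(k))


first, second = json.loads(FIRST), json.loads(SECOND)
print(differing_fields(first, second))  # ['A_TABLE_ROWTIME', 'B_TABLE_ROWTIME']
print(second["A_TABLE_ROWTIME"] - first["A_TABLE_ROWTIME"])  # 23
```

Only the two rowtime fields differ (by 23 ms); the key and all value columns are identical, so both records carry the same join result with different timestamps.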
What the hell? Why do I get two duplicate rows in the topic in the second use case?
Update: information about the topics/tables
name : B_TABLE
Field | Type
-------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
R | VARCHAR(STRING)
TIME | VARCHAR(STRING)
name : A_TABLE
Field | Type
-------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
R | VARCHAR(STRING)
TIME | VARCHAR(STRING)
Name : AB_TABLE
Field | Type
---------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
A_TABLE_ROWTIME | BIGINT
A_TABLE_ROWKEY | VARCHAR(STRING)
A_TABLE_R | VARCHAR(STRING)
A_TABLE_TIME | VARCHAR(STRING)
B_TABLE_ROWTIME | BIGINT
B_TABLE_ROWKEY | VARCHAR(STRING)
B_TABLE_R | VARCHAR(STRING)
B_TABLE_TIME | VARCHAR(STRING)
topic "a" with 1 partitions:
partition 0, leader 1, replicas: 1, isrs: 1
topic "b" with 1 partitions:
partition 0, leader 1, replicas: 1, isrs: 1
topic "AB_TABLE" with 1 partitions:
partition 0, leader 1, replicas: 1, isrs: 1
Discussion:
-
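To understand what is happening, it would help to know what is in the two source tables/topics. Also, you are using PRINT x FROM BEGINNING, which prints from the start of the topic, i.e. including historical rows, and SELECT * FROM x EMIT CHANGES, which emits only new rows, i.e. no history. -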
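The difference between the two commands can be modelled as where the reader starts in the topic. A minimal Python sketch, assuming a push query starts at the latest offset (the ksqlDB default for auto.offset.reset) while PRINT ... FROM BEGINNING starts at offset 0; the function records_seen and the topic contents are hypothetical. In the ksqlDB CLI, running SET 'auto.offset.reset' = 'earliest'; before the SELECT makes the push query read the history as well.

```python
from typing import List


def records_seen(topic: List[str], subscribed_at: int, from_beginning: bool) -> List[str]:
    """Return the records a reader of `topic` gets to see.

    from_beginning=True models PRINT ... FROM BEGINNING: start at offset 0.
    from_beginning=False models a push query with auto.offset.reset=latest:
    start at the end offset that existed when the reader subscribed.
    """
    start = 0 if from_beginning else subscribed_at
    return topic[start:]


# Hypothetical topic holding two records; the reader subscribes after the first.
topic = ["record@0", "record@1"]
print(records_seen(topic, subscribed_at=1, from_beginning=True))   # ['record@0', 'record@1']
print(records_seen(topic, subscribed_at=1, from_beginning=False))  # ['record@1']
```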
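I've added the information about the topics/tables. I ran two equivalent cases, and in the "fast" case I get two rows in the topic. There is no pre-existing data and no history. Anyone can reproduce it: start a clean cluster (no data, no history, nothing) and apply case 1; then start another clean cluster (no data, no history, nothing) and apply case 2. You will see different results.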
-
Hey Padavan, what I'm asking for is the contents of the two source topics after running case 2, i.e. what do PRINT a FROM BEGINNING and PRINT b FROM BEGINNING output? -
@AndrewCoates I've added the output of print a from beginning; and print b from beginning; after inserting the data in the second use case.
Tags: apache-kafka ksqldb