【问题标题】:KSQL KTabke+KTable Join dublicate result anomalyKSQL KTable+KTable Join 重复结果异常
【发布时间】:2020-05-26 05:21:10
【问题描述】:

我尝试内连接 ktable 和 ktable。

ab 表:

 create table a_table(r string, time string) with (Kafka_topic='a', Key='r', Value_format='json');
 create table b_table(r string, time string) with (Kafka_topic='b', Key='r', Value_format='json');

通过r键内联ab表:

create table ab_table as select * from a_table inner join b_table on a_table.r = b_table.r emit changes;

1) 用例。以慢速模式插入新数据

 ksql> insert into a_table values('1','1', 'timeA');
 --wait 5 second;
 ksql> insert into b_table values('1','1', 'timeB');

select * from ab_table emit changes; --return 1 行结果

print AB_TABLE from beginning; --return 1 行结果

2) 用例。以快速模式插入新数据

 ksql> insert into a_table values('2','2', 'timeA');insert into b_table values('2','2', 'timeB');

  ksql>  print a from beginning;
    Key format: KAFKA_STRING
    Value format: JSON or KAFKA_STRING
    rowtime: 5/23/20 4:44:06 PM UTC, key: 2, value: {"R":"2","TIME":"timeA"}

   ksql> print b from beginning;
    Key format: KAFKA_STRING
    Value format: JSON or KAFKA_STRING
    rowtime: 5/23/20 4:44:06 PM UTC, key: 2, value: {"R":"2","TIME":"timeB"}

select * from ab_table emit changes; --return 1 行结果

print AB_TABLE from beginning; --return 2 行结果

rowtime: 5/23/20 4:44:06 PM UTC, key: 2, value: {"A_TABLE_ROWTIME":1590252246657,"A_TABLE_ROWKEY":"2","A_TABLE_R":"2","A_TABLE_TIME":"timeA","B_TABLE_ROWTIME":1590252246657,"B_TABLE_ROWKEY":"2","
B_TABLE_R":"2","B_TABLE_TIME":"timeB"}
rowtime: 5/23/20 4:44:06 PM UTC, key: 2, value: {"A_TABLE_ROWTIME":1590252246680,"A_TABLE_ROWKEY":"2","A_TABLE_R":"2","A_TABLE_TIME":"timeA","B_TABLE_ROWTIME":1590252246680,"B_TABLE_ROWKEY":"2","
B_TABLE_R":"2","B_TABLE_TIME":"timeB"}

什么是地狱?为什么在第二个用例中我有两个重复的主题行?

更新关于主题\表格的信息

    name                 : B_TABLE
     Field   | Type                      
    -------------------------------------
     ROWTIME | BIGINT           (system) 
     ROWKEY  | VARCHAR(STRING)  (system) 
     R       | VARCHAR(STRING)           
     TIME    | VARCHAR(STRING)  

    name                 : A_TABLE
     Field   | Type                      
    -------------------------------------
     ROWTIME | BIGINT           (system) 
     ROWKEY  | VARCHAR(STRING)  (system) 
     R       | VARCHAR(STRING)           
     TIME    | VARCHAR(STRING)  

    Name                 : AB_TABLE
     Field           | Type                      
    ---------------------------------------------
     ROWTIME         | BIGINT           (system) 
     ROWKEY          | VARCHAR(STRING)  (system) 
     A_TABLE_ROWTIME | BIGINT                    
     A_TABLE_ROWKEY  | VARCHAR(STRING)           
     A_TABLE_R       | VARCHAR(STRING)           
     A_TABLE_TIME    | VARCHAR(STRING)           
     B_TABLE_ROWTIME | BIGINT                    
     B_TABLE_ROWKEY  | VARCHAR(STRING)           
     B_TABLE_R       | VARCHAR(STRING)           
     B_TABLE_TIME    | VARCHAR(STRING)     


topic "a" with 1 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1

topic "b" with 1 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1

 topic "AB_TABLE" with 1 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1

【问题讨论】:

  • 要了解发生了什么将有助于了解两个源表/主题中的内容。此外,您正在使用PRINT x FROM BEGINNING,它将从主题的开头打印,即包括历史行,并使用SELECT * FORM x EMIT CHANGES,它将仅发出新行,即没有历史记录。
  • 我添加了关于主题\表格的信息。我做了两个相等的情况,在“快速情况”中,我在主题中得到两行。没有准备日期和历史。任何人都可以重复: up clear cluster , no data , no history no nothing , up clear cluster 后应用 case 1 , no ​​data , no ​​history no nothing 并应用 case 2。你会看到不同的结果
  • 嘿Padavan,我要的是运行案例2后两个源主题的内容。即PRINT a FROM BEGINNINGPRINT b FROM BEGINNING输出什么?
  • @AndrewCoates 从头开始​​添加 print a;从头开始打印 b;插入数据后的第二个用例

标签: apache-kafka ksqldb


【解决方案1】:

弄清楚这里发生了什么。这与缓冲有关。

默认情况下,ksqlDB 缓冲来自两个源表更改日志的输入,即主题ab。 (这种缓冲对于将报告对同一键更改的所有几条消息压缩到单个输出中很有用)。

当同时对两个表进行更新时,缓冲意味着在刷新缓冲时两个表都被填充。由于表-表连接的两边都会产生一个输出,两个输入事件相互匹配,从而导致两个输出到主题AB_TABLE

PRINT AB_TABLE 正确显示了更改日志中的两行。

但是,SELECT * FROM AB_TABLE EMIT CHANGES 也在缓冲输入,这种缓冲将两个更改压缩为单个输出。

可以通过cache.max.bytes.buffering 控制缓冲。例如,您可以使用以下命令关闭缓冲:

-- turn off buffering:
SET 'cache.max.bytes.buffering' = 0;

我在运行上述代码后再次运行了您的示例,AB_TABLE 主题中只有一行。

有人可能会争辩说,无论缓冲是否正确,表-表连接的正确输出都只是单行。毕竟,处理的第一行不应该找到匹配项,而第二行应该。如果你对此有强烈的感觉,那么请在 Github 中提出一个错误。

【讨论】:

  • 哇,谢谢@AndrewCoates。我尝试在 8 小时内解决这个问题......令人惊讶的是,一个代码所做的不同结果......它的错误!我前几天开始学习英语,如果你愿意,可以在 github 中创建严重的错误。
  • @RobinMoffatt 也可以看到。我知道你在融合中工作
  • 这个现有的 Jira 涵盖了根本原因:issues.apache.org/jira/browse/KAFKA-4609 但是,请注意,如果更改日志被具体化,结果将是正确的。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2020-11-04
  • 1970-01-01
  • 2020-01-30
  • 2020-10-17
  • 1970-01-01
  • 2020-08-11
  • 1970-01-01
相关资源
最近更新 更多