【问题标题】:KSQL Join more than two streamsKSQL 加入两个以上的流
【发布时间】:2019-04-04 11:38:15
【问题描述】:

是否可以在 KSQL 中加入两个以上的流/表?

例子:

我有三个流:

CREATE STREAM StreamA (id BIGINT, message VARCHAR) WITH 
(KAFKA_TOPIC='TopicA', VALUE_FORMAT='DELIMITED');
CREATE STREAM StreamB (id BIGINT, aid BIGINT, message VARCHAR) WITH . 
(KAFKA_TOPIC='TopicB', VALUE_FORMAT='DELIMITED');
CREATE STREAM StreamC (id BIGINT, bid BIGINT, message VARCHAR) WITH 
(KAFKA_TOPIC='TopicC', VALUE_FORMAT='DELIMITED');

我尝试通过加入这三个流来创建另一个流:

CREATE STREAM ABCStream AS SELECT * FROM StreamA a JOIN 
StreamB b ON b.aid = a.id JOIN StreamC c WITHIN 1 HOURS ON 
c.bid = b.id; 

我得到以下异常:

mismatched input 'JOIN' expecting ';'  
Caused by: org.antlr.v4.runtime.InputMismatchException

【问题讨论】:

  • 您是否尝试过在 github 问题上搜索多个连接?上次我检查时,至少有两个问题要求此功能,因此尚不支持
  • 感谢@cricket_007。我去看看。
  • 欢迎在github.com/confluentinc/ksql/issues/1891 上对多个加入的功能请求进行投票。

标签: join stream apache-kafka ksqldb


【解决方案1】:

不可以,在最高 v5.0 的 KSQL 中,每个查询只能连接两个。您需要菊花链式查询,如下所示:

中间流:

CREATE STREAM ABStream AS \
   SELECT * \
     FROM StreamA a \
     JOIN StreamB b \
          ON b.aid = a.id;

多连接流

CREATE STREAM ABCStream AS \
   SELECT * \
     FROM ABStream AB \
     JOIN StreamC c \
          WITHIN 1 HOURS \
          ON c.bid = AB.b_id;

【讨论】:

  • 我一直在根据This Post 的答案尝试类似的方法,但遇到了不兼容分区的问题:由于分区数量不匹配,因此无法将 ABSTREAM 与 STREAMC 连接。 ABSTREAM 分区 = 4; STREAMC partitions = 1。请重新分区其中一个,以便分区数匹配。
  • 这似乎有点笨拙,但我通过从 StreamC 创建另一个流来使其工作,该流具有与中间流相同数量的分区。 CREATE STREAM StreamCPartitioned WITH (PARTITIONS=4) AS SELECT * FROM StreamC PARTITION BY id;然后将该流与中间流加入:CREATE STREAM ABCStream AS \ SELECT * \ FROM ABStream AB \ JOIN StreamCPartitioned c \ WITHIN 1 HOURS \ ON c.bid = AB.b_id;
  • 是的,我也被这个咬了。我的源主题(本例中的 A 和 B)各有 1 个分区,KSQL 默认为它创建的主题(本例中的 C)设置 4 个分区,这使得它们无法加入其他源分区(例如 D)。在定义流时,您必须在 KSQL 的 WITH 部分中指定分区数,以确保它们都具有相同的分区数。
猜你喜欢
  • 1970-01-01
  • 2020-11-04
  • 1970-01-01
  • 1970-01-01
  • 2018-10-15
  • 2018-11-03
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多