【Question title】: KSQL left join is not working
【Posted】: 2018-03-16 10:05:54
【Question】:

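I'm new to Stack Overflow, so please bear with me if I've made a mistake in posting this question here.

I looked for an answer but could not find any question on the site about KSQL JOIN, so I'm posting this one. I have tried several ways of running this query, but I keep getting a NullPointerException.

I have two Kafka Avro topics, deal and expense, but the data contains a lot of whitespace. To clean it up, I created the following stream and table with trimmed data: DEAL_STREAM and EXPENSE_TABLE.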

ksql> describe EXPENSE_TABLE;

Result:

Field      | Type
ROWTIME    | BIGINT (system)
ROWKEY     | VARCHAR(STRING) (system)
KSQL_COL_0 | VARCHAR(STRING)
KSQL_COL_1 | VARCHAR(STRING)
KSQL_COL_2 | VARCHAR(STRING)

ksql> describe deal_stream;

Result:

Field      | Type
ROWTIME    | BIGINT (system)
ROWKEY     | VARCHAR(STRING) (system)
KSQL_COL_0 | VARCHAR(STRING)
KSQL_COL_1 | VARCHAR(STRING)
KSQL_COL_2 | VARCHAR(STRING)
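
The `KSQL_COL_n` names in both schemas are what KSQL generates when a `SELECT` expression (such as a `TRIM(...)` call) has no alias. The question does not show the statements that created the trimmed stream and table, so the following is only a sketch of how they might be written with explicit aliases. The source names `deal_raw` and `expense_raw`, the target names, and every column name here are hypothetical.

```sql
-- Hypothetical source stream and column names; the question does not show them.
CREATE STREAM deal_stream_named AS
  SELECT TRIM(deal_id)   AS deal_id,
         TRIM(deal_name) AS deal_name,
         TRIM(deal_type) AS deal_type
  FROM deal_raw;

-- Hypothetical source table and column names.
CREATE TABLE expense_table_named AS
  SELECT TRIM(deal_id)      AS deal_id,
         TRIM(expense_code) AS expense_code,
         TRIM(expense_type) AS expense_type
  FROM expense_raw;
```

With aliases in place, a join condition could refer to `td.deal_id = te.deal_id` instead of the positional `KSQL_COL_0`, which makes the queries and any error messages easier to read. This is a readability change only; nothing in this thread confirms that it affects the exception.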

When I run any of the following queries, I get a NullPointerException. These are the queries I tried:

1:

ksql> CREATE STREAM deal_expense_new AS SELECT td.KSQL_COL_0 , te.KSQL_COL_1 FROM deal_stream td LEFT JOIN expense_table te ON td.KSQL_COL_0 = te.KSQL_COL_0;

2:

ksql> CREATE STREAM deal_expense_new AS SELECT td.KSQL_COL_0 AS KSQL_COL_0 , te.KSQL_COL_1 FROM deal_stream td LEFT JOIN expense_table te ON td.KSQL_COL_0 = te.KSQL_COL_0;

3:

CREATE STREAM deal_expense_trimmed AS SELECT td.KSQL_COL_0 AS KSQL_COL_0 , te.KSQL_COL_1 FROM deal_stream td LEFT JOIN expense_table te ON td.KSQL_COL_0 = te.KSQL_COL_0 where td.KSQL_COL_0 IS NOT NULL;

Error:

> Message Stream created and running
> ksql> Exception in thread "ksql_query_CSAS_DEAL_EXPENSE_NEW-01b2596a-3d2a-4d41-a823-0e345ec727fa-StreamThread-115" java.lang.NullPointerException
>     at io.confluent.ksql.structured.SchemaKStream.lambda$selectKey$3(SchemaKStream.java:248)
>     at org.apache.kafka.streams.kstream.internals.KStreamImpl$2.apply(KStreamImpl.java:159)
>     at org.apache.kafka.streams.kstream.internals.KStreamImpl$2.apply(KStreamImpl.java:156)
>     at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
>     at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
>     at org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:169)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
>     at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
>     at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
>     at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:221)
>     at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
>     at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:422)
>     at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:924)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:804)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:756)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:726)
> Exception in thread "ksql_query_CSAS_DEAL_EXPENSE_NEW-01b2596a-3d2a-4d41-a823-0e345ec727fa-StreamThread-116" java.lang.NullPointerException
>     at io.confluent.ksql.structured.SchemaKStream.lambda$selectKey$3(SchemaKStream.java:248)
>     at org.apache.kafka.streams.kstream.internals.KStreamImpl$2.apply(KStreamImpl.java:159)
>     at org.apache.kafka.streams.kstream.internals.KStreamImpl$2.apply(KStreamImpl.java:156)
>     at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
>     at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
>     at org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:169)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
>     at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
>     at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
>     at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:221)
>     at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
>     at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:422)
>     at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:924)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:804)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:756)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:726)
> Exception in thread "ksql_query_CSAS_DEAL_EXPENSE_NEW-01b2596a-3d2a-4d41-a823-0e345ec727fa-StreamThread-113" java.lang.NullPointerException
>     at io.confluent.ksql.structured.SchemaKStream.lambda$selectKey$3(SchemaKStream.java:248)
>     at org.apache.kafka.streams.kstream.internals.KStreamImpl$2.apply(KStreamImpl.java:159)
>     at org.apache.kafka.streams.kstream.internals.KStreamImpl$2.apply(KStreamImpl.java:156)
>     at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
>     at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
>     at org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:169)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
>     at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
>     at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
>     at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:221)
>     at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
>     at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:422)
>     at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:924)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:804)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:756)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:726)
> Exception in thread "ksql_query_CSAS_DEAL_EXPENSE_NEW-01b2596a-3d2a-4d41-a823-0e345ec727fa-StreamThread-114" java.lang.NullPointerException
>     at io.confluent.ksql.structured.SchemaKStream.lambda$selectKey$3(SchemaKStream.java:248)
>     at org.apache.kafka.streams.kstream.internals.KStreamImpl$2.apply(KStreamImpl.java:159)
>     at org.apache.kafka.streams.kstream.internals.KStreamImpl$2.apply(KStreamImpl.java:156)
>     at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
>     at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
>     at org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:169)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
>     at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
>     at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
>     at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:221)
>     at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
>     at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:422)
>     at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:924)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:804)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:756)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:726)

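【Comments】:

  • Does running just the SELECT (without the CREATE STREAM prefix) work?
  • Can you add your CREATE TABLE and CREATE STREAM statements, along with a sample of the data from each underlying topic?
  • Given that you're getting an exception, you would be best off logging it as an issue in the KSQL GitHub repository: github.com/confluentinc/ksql/issues/new - if it's user error (and it's not clear that it is), then at the very least the error message should be better. If it's a bug, then it needs fixing :)
  • @RobinMoffatt It doesn't work with a plain SELECT either. I also tried filtering out NULL and empty values in the field, and it still doesn't work. I'm using KSQL 0.5, released in February 2018.
  • @RobinMoffatt I've added the issue: github.com/confluentinc/ksql/issues/968#issuecomment-373779244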
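
For reference, the first two checks requested in the comments might look like the following in the KSQL CLI. This is only a sketch: it reuses the stream, table, and column names from the question, the `LIMIT` values are arbitrary, and the thread does not show the output of any of these statements.

```sql
-- Read the topics from the beginning so existing rows are returned.
SET 'auto.offset.reset' = 'earliest';

-- 1. Run the join as a plain query, without the CREATE STREAM ... AS prefix.
SELECT td.KSQL_COL_0, te.KSQL_COL_1
FROM deal_stream td
LEFT JOIN expense_table te ON td.KSQL_COL_0 = te.KSQL_COL_0
LIMIT 5;

-- 2. Sample a few rows from each side of the join.
SELECT * FROM deal_stream LIMIT 5;
SELECT * FROM expense_table LIMIT 5;
```

Sampling each side separately helps show whether the join column is populated in the rows being read, which is useful detail to attach to the GitHub issue.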

Tags: apache-kafka confluent-platform ksqldb


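【Solution 1】:

This bug should be fixed in the latest master. The fix will be included in the next monthly release. Here is the GitHub issue: https://github.com/confluentinc/ksql/issues/521

【Comments】:

  • Hi Rohan, I'm using KSQL 0.5, released in February 2018. The issue you linked was resolved in December 2017, so I assume the fix is already included in the version I'm using.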