【问题标题】:Cannot create stream in Ksql无法在 Ksql 中创建流
【发布时间】:2020-05-14 00:23:54
【问题描述】:

我有如下流,我想从中创建另一个流。我正在尝试以下命令,但出现以下错误。我错过了什么吗?

ksql> create stream down_devices_stream as select * from fakedata119 where deviceProperties['status']='false';
Failed to generate code for SqlPredicate.filterExpression: (FAKEDATA119.DEVICEPROPERTIES['status'] = 'false')schema:org.apache.kafka.connect.data.SchemaBuilder@6e18dbbfisWindowedKey:false
Caused by: Line 1, Column 180: Operator "<=" not allowed on reference operands

ksql> select * from fakedata119;
1529505497087 | null | 19 | visibility sensors | Wed Jun 20 16:38:17 CEST 2018 | {visibility=74, status=true}
1529505498087 | null | 7 | fans | Wed Jun 20 16:38:18 CEST 2018 | {temperature=44, rotationSense=1, status=false, frequency=49}
1529505499088 | null | 28 | air quality monitors | Wed Jun 20 16:38:19 CEST 2018 | {coPpm=257, status=false, Co2Ppm=134}
1529505500089 | null | 4 | fans | Wed Jun 20 16:38:20 CEST 2018 | {temperature=42, rotationSense=1, status=true, frequency=51}
1529505501089 | null | 23 | air quality monitors | Wed Jun 20 16:38:21 CEST 2018 | {coPpm=158, status=true, Co2Ppm=215}

sql> describe fakedata119;

     Field            | Type                                 
    ---------------------------------------------------------
     ROWTIME          | BIGINT           (system)            
     ROWKEY           | VARCHAR(STRING)  (system)            
     DEVICEID         | INTEGER                              
     CATEGORY         | VARCHAR(STRING)                      
     TIMESTAMP        | VARCHAR(STRING)                      
     DEVICEPROPERTIES | MAP[VARCHAR(STRING),VARCHAR(STRING)] 

【问题讨论】:

  • 你使用的是什么版本的 KSQL?
  • 我使用的是版本:4.1.1
  • 我们会在这里尝试并提供帮助,但我可以看到您对 KSQL 有几个问题,也许交互式聊天也会有所帮助 - 这里有一个 Slack 社区,有一个专门的#ksql 频道:@ 987654321@

标签: apache-kafka ksqldb


【解决方案1】:

没有看到你的输入数据,我猜它看起来像这样:

{
  "id": "a42",
  "category": "foo",
  "timestamp": "2018-06-21 10:04:57 BST",
  "deviceID": 42,
  "deviceProperties": {
    "status": "false",
    "foo": "bar"
  }
}

如果是这样,您最好使用EXTRACTJSONFIELD 来访问嵌套值并构建谓词。

CREATE STREAM test (Id VARCHAR, category VARCHAR, timeStamp VARCHAR, \
                    deviceID INTEGER, deviceProperties VARCHAR) \
WITH (KAFKA_TOPIC='test_map2', VALUE_FORMAT='JSON');


ksql> SELECT EXTRACTJSONFIELD(DEVICEPROPERTIES,'$.status') AS STATUS FROM fakeData223;
false

ksql> SELECT *  FROM fakeData223 \
      WHERE EXTRACTJSONFIELD(DEVICEPROPERTIES,'$.status')='false';
1529572405759 | null | a42 | foo | 2018-06-21 10:04:57 BST | 42 | {"status":"false","foo":"bar"}

您发现的错误我已在此处记录为要跟踪的错误:https://github.com/confluentinc/ksql/issues/1474

【讨论】:

  • 输入数据看起来完全一样。当我点击:'ksql> SELECT EXTRACTJSONFIELD(DEVICEPROPERTIES,'$.status') AS STATUS FROM fakeData223;'它给出了null。 extractjsonfield 可以与 avro 一起使用吗?
  • 不,它仅适用于 JSON。当前不支持嵌套 Avro,但将在下一个版本 5.0 中提供。
【解决方案2】:

我添加了一个测试来覆盖这个用例:

https://github.com/confluentinc/ksql/pull/1476/files

有趣的是,这会传递给我们的 master 和即将推出的 5.0 分支,但在 4.1 上失败了。

所以...看起来这是您正在使用的版本的问题,但好消息是它已在即将发布的版本中修复。另外,您现在可以使用 Robin 在上面的解决方法。

查询愉快!

安迪

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-10-27
    • 1970-01-01
    • 2018-10-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多