【问题标题】:org.apache.kafka.common.errors.SerializationException: Failed to deserialize keyorg.apache.kafka.common.errors.SerializationException:无法反序列化密钥
【发布时间】:2021-12-27 04:47:37
【问题描述】:

我正在尝试使用 KAFKA TOPIC 在 KSQL 中创建一个表,该表已成功创建。当我尝试从表中选择数据时,出现错误

org.apache.kafka.common.errors.SerializationException: Failed to deserialize key from topic: ah_topics_2639_sales. Can't convert type. sourceType: ObjectNode, requiredType: BIGINT, path: $ Caused by: io.confluent.ksql.serde.json.KsqlJsonDeserializer$CoercionException: Can't convert type. sourceType: ObjectNode, requiredType: BIGINT

主题中的数据:

Key format: JSON or SESSION(KAFKA_INT) or SESSION(KAFKA_STRING) or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING Value format: JSON or KAFKA_STRING rowtime: 2021/11/15 14:03:55.013 Z, key: {"MESG_SEQ_NO":1015}, value: {"MESG_SEQ_NO":1015,"MESSAGENO":"1015","MESSAGECREATIONDATETIME":1554336000000,"OPCO_GLN":"L","OPCO_COUNTRYCODE":"L","STORENO":5809,"SRVPNO":320,"ENDDATETIMETRANSACTION":1554336000000,"DATETIMESENTSTORE":1554336000000,"MESG_ACTION":"I","MESG_IND_COMPLETED":"N","MESG_IND_SENTTOBROKER":"N","MESG_ARTCLE_SALES_SEQ_NO":1015,"NASANUMBER":584623,"LKARNUMBER":null,"AMOUNTCE":1,"AMOUNTWEIGHT":null,"VOLUME":null,"AMOUNTCEDISCOUNT":null,"AMOUNTCEPRICED":null,"AMOUNTWEIGHTDISCOUNT":null,"AMOUNTWEIGHTPRICED":null,"SALEVALUE":null,"DISCOUNTVALUE":null,"PROMOTIONVALUE":null}, partition: 0

表结构

ksql> CREATE TABLE MSG_2639_SALES_TRNS_STORES_TABLE_S( MESG_SEQ_NO INT(4,0) PRIMARY KEY, MESSAGENO STRING, MESSAGECREATIONDATETIME TIMESTAMP,
>OPCO_GLN STRING, OPCO_COUNTRYCODE STRING, STORENO INT(4,0), SRVPNO INT(4,0),
>ENDDATETIMETRANSACTION TIMESTAMP, DATETIMESENTSTORE TIMESTAMP, MESG_ACTION STRING, MESG_IND_COMPLETED STRING, MESG_IND_SENTTOBROKER STRING)
>with (kafka_topic='ah_topics_2639_sales' , key_format='JSON',  value_format='JSON');
 Message
---------------
 Table created
---------------


Selecting Data :

ksql> select * from MSG_2639_SALES_TRNS_STORES_TABLE_S emit changes;
+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+
|MESG_SEQ_NO |MESSAGENO   |MESSAGECREAT|OPCO_GLN    |OPCO_COUNTRY|STORENO     |SRVPNO      |ENDDATETIMET|DATETIMESENT|MESG_ACTION |MESG_IND_COM|MESG_IND_SEN|
|            |            |IONDATETIME |            |CODE        |            |            |RANSACTION  |STORE       |            |PLETED      |TTOBROKER   |
+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+

完成获取任何数据,我在日志中看到错误为 org.apache.kafka.common.errors.SerializationException: Failed to deserialize key from topic: ah_topics_2639_sales. Can't convert type. sourceType: ObjectNode, requiredType: BIGINT, path: $ Caused by: io.confluent.ksql.serde.json.KsqlJsonDeserializer$CoercionException: Can't convert type. sourceType: ObjectNode, requiredType: BIGINT

【问题讨论】:

  • 欢迎来到 Stackoverflow。您的问题很难按原样阅读。我建议重新格式化内容以使错误和结构更易于阅读。此外,包括您尝试过的一些内容会有所帮助。
  • 欢迎使用帖子旁边的复选标记接受以下答案

标签: apache-kafka ksqldb


【解决方案1】:

您有MESG_SEQ_NO INT(4,0) PRIMARY KEY,但是key: {"MESG_SEQ_NO":1015} 是一个结构,而不是一个整数(JSON 字段的名称不会自动提取和匹配)

因此,正如错误所说Can't convert type. sourceType: ObjectNode, requiredType: BIGINT

我上次检查过,ksql 中的键理想情况下应该是原始类型而不是结构化对象。如果它是结构化的,您需要明确说明它 - https://docs.ksqldb.io/en/latest/reference/serialization/#deserialization-of-single-keys

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2020-11-26
    • 1970-01-01
    • 2019-10-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多