【发布时间】:2021-12-27 04:47:37
【问题描述】:
我正在尝试使用 KAFKA TOPIC 在 KSQL 中创建一个表,该表已成功创建。当我尝试从表中选择数据时,出现错误
org.apache.kafka.common.errors.SerializationException: Failed to deserialize key from topic: ah_topics_2639_sales. Can't convert type. sourceType: ObjectNode, requiredType: BIGINT, path: $ Caused by: io.confluent.ksql.serde.json.KsqlJsonDeserializer$CoercionException: Can't convert type. sourceType: ObjectNode, requiredType: BIGINT
主题中的数据:
Key format: JSON or SESSION(KAFKA_INT) or SESSION(KAFKA_STRING) or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING Value format: JSON or KAFKA_STRING rowtime: 2021/11/15 14:03:55.013 Z, key: {"MESG_SEQ_NO":1015}, value: {"MESG_SEQ_NO":1015,"MESSAGENO":"1015","MESSAGECREATIONDATETIME":1554336000000,"OPCO_GLN":"L","OPCO_COUNTRYCODE":"L","STORENO":5809,"SRVPNO":320,"ENDDATETIMETRANSACTION":1554336000000,"DATETIMESENTSTORE":1554336000000,"MESG_ACTION":"I","MESG_IND_COMPLETED":"N","MESG_IND_SENTTOBROKER":"N","MESG_ARTCLE_SALES_SEQ_NO":1015,"NASANUMBER":584623,"LKARNUMBER":null,"AMOUNTCE":1,"AMOUNTWEIGHT":null,"VOLUME":null,"AMOUNTCEDISCOUNT":null,"AMOUNTCEPRICED":null,"AMOUNTWEIGHTDISCOUNT":null,"AMOUNTWEIGHTPRICED":null,"SALEVALUE":null,"DISCOUNTVALUE":null,"PROMOTIONVALUE":null}, partition: 0
表结构
ksql> CREATE TABLE MSG_2639_SALES_TRNS_STORES_TABLE_S( MESG_SEQ_NO INT(4,0) PRIMARY KEY, MESSAGENO STRING, MESSAGECREATIONDATETIME TIMESTAMP,
>OPCO_GLN STRING, OPCO_COUNTRYCODE STRING, STORENO INT(4,0), SRVPNO INT(4,0),
>ENDDATETIMETRANSACTION TIMESTAMP, DATETIMESENTSTORE TIMESTAMP, MESG_ACTION STRING, MESG_IND_COMPLETED STRING, MESG_IND_SENTTOBROKER STRING)
>with (kafka_topic='ah_topics_2639_sales' , key_format='JSON', value_format='JSON');
Message
---------------
Table created
---------------
Selecting Data :
ksql> select * from MSG_2639_SALES_TRNS_STORES_TABLE_S emit changes;
+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+
|MESG_SEQ_NO |MESSAGENO |MESSAGECREAT|OPCO_GLN |OPCO_COUNTRY|STORENO |SRVPNO |ENDDATETIMET|DATETIMESENT|MESG_ACTION |MESG_IND_COM|MESG_IND_SEN|
| | |IONDATETIME | |CODE | | |RANSACTION |STORE | |PLETED |TTOBROKER |
+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+
完成获取任何数据,我在日志中看到错误为
org.apache.kafka.common.errors.SerializationException: Failed to deserialize key from topic: ah_topics_2639_sales. Can't convert type. sourceType: ObjectNode, requiredType: BIGINT, path: $ Caused by: io.confluent.ksql.serde.json.KsqlJsonDeserializer$CoercionException: Can't convert type. sourceType: ObjectNode, requiredType: BIGINT
【问题讨论】:
-
欢迎来到 Stackoverflow。您的问题很难按原样阅读。我建议重新格式化内容以使错误和结构更易于阅读。此外,包括您尝试过的一些内容会有所帮助。
-
欢迎使用帖子旁边的复选标记接受以下答案
标签: apache-kafka ksqldb