【问题标题】:Kafka Connect Debezium with ms sql server. Key extract configuration problemKafka 将 Debezium 与 ms sql 服务器连接起来。密钥提取配置问题
【发布时间】:2021-01-25 13:36:51
【问题描述】:

我正在使用 Debezium 和 kafka connect。我需要从下面的有效负载中提取“listid”,并能够将其分配给 kafka 消息键。使用我的连接器文件中的配置,我无法提取该值。感谢您为解决此问题提供的任何帮助。

{
    "before": null,
    "after": {
        "listid": 19,
        "billingid": "0",
        "userid": "test",

连接器属性

key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false


value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

transforms=unwrap,extract
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.extractkey.type==org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractkey.field=listid


key.converter.schemas.enable=false
value.converter.schemas.enable=true

Sooooo 我正在继续实验,并希望分享对问题的更好解释以及我所实验的内容。

跟随 Robin 的博客https://www.confluent.fr/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-3/

1- 从终端运行 kafka-avro 消费者

{"LIST_ID":10058}

价值

{"before":null,"after":{"test.dbo.test.Value":{"LIST_ID":10058,"billingid 
etc...

2- 我正在尝试获取值为 10058 的键

3 - 这就是我的转换配置

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
transforms=createKey,extractInt,
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=LIST_ID
transforms.extractInt.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractInt.field=LIST_ID

我在针对 mssql 服务器的管道中使用 debezium 连接器。

【问题讨论】:

  • 我确实评论了confluent.fr/blog/…。当我运行
  • 我确实评论了confluent.fr/blog/…。我修改了道具以支持文章配置。运行 kafka avro 消费者时,密钥现在显示为 {"_LIST_ID":24892}。有什么建议吗?

标签: apache-kafka avro apache-kafka-connect confluent-platform debezium


【解决方案1】:

看起来像是一个错字(extract vs extractkey):

transforms=unwrap,extractkey
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.extractkey.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractkey.field=listid

【讨论】:

  • 我进行了调整,但我仍然得到 {"LIST_ID":4524} 的键值诗句 4525。感谢您的回复。
  • 尝试设置transforms.extractkey.field=LIST_ID
  • 谢谢亚历克斯。仍然没有运气:-(
  • 请再次显示消息密钥中的确切内容(LIST_ID_LIST_ID
  • 嗨,Alex,我不断收到返回的密钥 {"LIST_ID":1535}。我无法仅提取整数值 1535 :-(
猜你喜欢
  • 2021-06-12
  • 2018-11-22
  • 2019-04-10
  • 2019-01-17
  • 1970-01-01
  • 2019-02-16
  • 2022-01-03
  • 2022-08-22
  • 1970-01-01
相关资源
最近更新 更多