【问题标题】:Kafka Mongodb sink connector - update documentKafka Mongodb 接收器连接器 - 更新文档
【发布时间】:2020-03-23 18:58:21
【问题描述】:

我们一直致力于开发 kafka 生态系统。让我顺其自然

Source(SQLServer) -> Debezium(CDC) -> Kafka Broker -> Kafka Stream(处理、连接等)-> Mongo 连接器 -> Mongo DB

现在我们进入最后一步,我们正在将处理后的数据插入 mongo dB,但现在我们需要更新数据而不是插入。

我们能否从 mongo sink 连接器获得 upsert(插入/更新)功能。至于现在我明白这是不可能的。

【问题讨论】:

  • 您是否尝试过查看该连接器的 github 问题?
  • 是的,我已经尝试并且仍在尝试。
  • 您能说明一下您使用的是哪个 Mongo 接收器连接器吗? (我不知道有任何支持更新或删除),但只是好奇
  • 我正在使用 com.mongodb.kafka.connect.MongoSinkConnector 连接器类
  • 如果你想要更新和/或删除,你必须在这一行周围添加逻辑来相应地处理github.com/mongodb/mongo-kafka/blob/master/src/main/java/com/…

标签: mongodb apache-kafka apache-kafka-connect


【解决方案1】:

请点击提供的链接,它包含有关 kafka mongo 连接器的所有信息。我已经成功实现了 upsert 功能。您只需要仔细阅读此文档即可。

Kafka Connector - Mongodb

【讨论】:

【解决方案2】:

实际上这是一个 upsert,如果 ${uniqueFieldToUpdateOn} 不在 mongo 中,我们要插入,或者如果它存在则更新如下。

根据您的用例更新/替换,有两种主要方法可以对集合中的数据更改进行建模,如下所述:

更新

以下配置状态:

  1. 更新 ${uniqueFieldToUpdateOn} 为您要建模更新的记录所特有的字段。
  2. AllowList(白名单)此字段与PartialValueStrategy 一起使用允许为 id 策略投影自定义值字段。
  3. UpdateOneBusinessKeyTimestampStrategy 意味着只有上面声明的唯一字段引用的一个文档将被更新(最新时间戳获胜)。
"document.id.strategy":"com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy", 
"document.id.strategy.partial.value.projection.list":"${uniqueFieldToUpdateOn}",
"document.id.strategy.partial.value.projection.type":"AllowList",
"writemodel.strategy":"com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneBusinessKeyTimestampStrategy" 

替换

注意这个模型是替换而不是更新,但可能仍然有用

以下配置状态:

  1. 将 ${uniqueFieldToUpdateOn} 替换为您要为其建模的记录所特有的字段。
  2. AllowList(白名单)此字段与PartialValueStrategy 一起使用允许为 id 策略投影自定义值字段。
  3. ReplaceOneBusinessKeyStrategy 表示仅替换上面声明的唯一字段引用的一个文档。
"document.id.strategy":"com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy", 
"document.id.strategy.partial.value.projection.list":"${uniqueFieldToUpdateOn}",
"document.id.strategy.partial.value.projection.type":"AllowList",
"writemodel.strategy":"com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy"

【讨论】:

  • 我使用了建议的相同配置,但我无法成功地使连接器运行失败我收到错误org.apache.kafka.connect.errors.DataException: Could not convert key 456 into a BsonDocument.\n\tat com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.getUnwrapped(LazyBsonDocument.java:157)\n\tat com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.clone(LazyBsonDocument.java:146)\n\tat com.mongodb.kafka.connect.sink.converter.SinkDocument.clone(SinkDocument.java:45)\n\tat 是否有一个示例连接器可以用作参考
  • 你能告诉我你的连接器配置,也许我可以帮助你吗?
  • 因为我不能一次性发布完整的连接器,所以我把它分成部分第一部分是 ** "name":"tag-update2","config":{"connector.class" :"com.mongodb.kafka.connect.MongoSinkConnector","tasks.max":"1","connection.uri":"mongodb://xx.xx.xx:27017","database":"staff" ,"collection":"user","topics":"user_update_new1","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"io.confluent.connect. json.JsonSchemaConverter" **
  • 另一半是 "value.converter.schema.registry.url": "xx.xxx.xx.xx:8081", "document.id.strategy":"com.mongodb.kafka.connect. sink.processor.id.strategy.PartialValueStrategy", "document.id.strategy.partial.value.projection.type":"user_id", "document.id.strategy.partial.value.projection.type":"AllowList" , "writemodel.strategy":"com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy" } } 抱歉无法将其作为一个整体发送
  • 首先你有 document.id.strategy.partial.value.projection.type 两次,你应该有 document.id.strategy.partial.value.projection.list 和“user_id”,也你能告诉我你正在发送的消息的有效负载,包括密钥吗?看起来像转换密钥的问题
猜你喜欢
  • 2019-12-28
  • 2020-04-27
  • 2021-10-24
  • 2019-07-22
  • 2019-06-17
  • 2021-09-18
  • 2020-01-11
  • 2021-01-08
  • 2019-11-03
相关资源
最近更新 更多