【问题标题】:MongoDB Kafka Sink Connector doesn't process the RenameByRegex processorMongoDB Kafka Sink 连接器不处理 RenameByRegex 处理器
【发布时间】:2020-07-26 04:46:08
【问题描述】:

我需要从 Kafka Topic 和 Sink 监听事件到 MongoDB 中的集合。该消息包含一个带有 id 属性的嵌套对象,如上例所示。

{
    "testId": 1,
    "foo": "bar",
    "foos": [{ "id":"aaaaqqqq-rrrrr" }]
}

我正在尝试使用 RegExp 将此嵌套 id 重命名为 _id

{
        "connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
        "topics": "test",
        "connection.uri": "mongodb://mongo:27017",
        "database": "test_db",
        "collection": "test",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy",
        "value.projection.list":"testId",
        "value.projection.type": "whitelist",
        "post.processor.chain": "com.mongodb.kafka.connect.sink.processor.DocumentIdAdder, com.mongodb.kafka.connect.sink.processor.field.renaming.RenameByRegex",
        "field.renamer.regexp": "[{\"regexp\":\"\b(id)\b\", \"pattern\":\"\b(id)\b\",\"replace\":\"_id\"}]"
    }

配置/验证的结果是500 Internal Server Error,带有该消息:

{
    "error_code": 500,
    "message": null
}

我遗漏了什么或者是一个问题?

【问题讨论】:

    标签: mongodb apache-kafka apache-kafka-connect mongodb-kafka-connector


    【解决方案1】:

    我想你想要的只是Kafka Connect Single Message Transform (SMT),更准确地说是ReplaceField

    过滤或重命名结构或映射中的字段。


    以下将id字段名称替换为_id

    "transforms": "RenameField",
    "transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.RenameField.renames": "id:_id"
    

    在您的情况下,在应用上述转换之前,您可能还想Flattenfoos

    "transforms": "flatten",
    "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
    "transforms.flatten.delimiter": "."
    

    最后应用转换来重命名字段:

    "transforms": "RenameField",
    "transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.RenameField.renames": "foos.id:foos._id"
    

    【讨论】:

      猜你喜欢
      • 2021-05-05
      • 2020-05-26
      • 2022-09-24
      • 1970-01-01
      • 2019-10-20
      • 2018-04-20
      • 2018-07-10
      • 2018-12-01
      • 2019-02-17
      相关资源
      最近更新 更多