【问题标题】:Why record cannot get deleted using JDBC Sink Connector in Kafka Connect为什么无法在 Kafka Connect 中使用 JDBC Sink Connector 删除记录
【发布时间】:2020-07-26 04:10:21
【问题描述】:

我的接收器属性:

{
  "name": "jdbc-oracle",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "orders",
    "connection.url": "jdbc:oracle:thin:@10.1.2.3:1071/orac",
    "connection.user": "ersin",
    "connection.password": "ersin!",
    "auto.create": "true",
    "delete.enabled": "true",
    "pk.mode": "record_key",
    "pk.fields": "id",
    "insert.mode": "upsert",
    "plugin.path": "/home/ersin/confluent-5.4.1/share/java/",
    "name": "jdbc-oracle"
  },
  "tasks": [
    {
      "connector": "jdbc-oracle",
      "task": 0
    }
  ],
  "type": "sink"
}

我的 connect-avro-distributed.properties

bootstrap.servers=10.0.0.0:9092

group.id=connect-cluster

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://10.0.0.0:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://10.0.0.0:8081

config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses

config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

我这样发送数据:

./bin/kafka-avro-console-producer \
  --broker-list 10.0.0.0:9092 --topic orders \
  --property parse.key="true" \
  --property key.schema='{"type":"record","name":"key_schema","fields":[{"name":"id","type":"int"}]}' \
  --property key.separator="$" \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":["null","int"],"default": null},{"name":"product","type": ["null","string"],"default": null}, {"name":"quantity", "type":  ["null","int"],"default": null}, {"name":"price","type":  ["null","int"],"default": null}]}' \
  --property schema.registry.url=http://10.0.0.0:8081

我可以像这样在 oracle 上插入或更新数据

{"id":3}${"id": {"int":2}, "product": {"string":"Yağız Gülbahar"}, "quantity": {"int":1071}, "price": {"int":1453}}

但是当我把它放在oracle上删除记录时,它不能删除数据,只是将列更新为空

{"id":2}${"id": null, "product": null , "quantity": null , "price": null }

我该如何解决这个问题?

提前致谢

【问题讨论】:

    标签: apache-kafka apache-kafka-connect kafka-producer-api


    【解决方案1】:

    其实你需要制作一个墓碑记录。在 Kafka 中使用 JDBC Sink Connector 删除的工作原理如下:

    连接器可以在使用数据库表时删除数据库表中的行 tombstone 记录,这是一个 Kafka 记录,具有非空键和 一个空值。默认情况下禁用此行为,这意味着任何 墓碑记录会导致连接器出现故障,使其 易于升级 JDBC 连接器并保持先前​​的行为。

    可以使用delete.enabled=true 启用删除,但仅当 pk.mode 设置为 record_key。这是因为从 表要求使用主键作为条件。

    启用删除模式不会影响insert.mode

    另请注意,这种记录只会在delete.retention.ms 毫秒后被删除,目前默认为 24​​ 小时。


    因此,请尝试在您的属性中减少此配置,看看它是否有效。为此,您需要运行以下命令:

     ./bin/kafka-topics.sh \
        --alter \
        --zookeeper localhost:2181 \
        --topic orders \
        --config retention.ms=100 
    

    现在一旦配置完成,您需要做的就是生成一条带有非空键和空值的消息。请注意,由于用户输入被解析为 UTF-8,因此不能使用 Kafka 控制台消费者来生成空记录。 因此,

    {"id":2}${"id": null, "product": null , "quantity": null , "price": null }
    

    不是实际的墓碑消息。

    不过,您可以使用 kafkacat,但它仅适用于 JSON 消息:

    # Produce a tombstone (a "delete" for compacted topics) for key 
    # "abc" by providing an empty message value which -Z interpretes as NULL:
    
    echo "abc:" | kafkacat -b mybroker -t mytopic -Z -K:
    

    但在您的情况下,这不起作用,因为您需要发送 Avro 消息。因此,我建议用你喜欢的语言编写一个非常简单的 Avro Producer,这样你就可以真正发送墓碑消息。

    【讨论】:

    猜你喜欢
    • 2020-06-19
    • 2021-11-20
    • 2022-07-12
    • 1970-01-01
    • 2020-03-21
    • 2021-07-15
    • 2021-12-21
    • 2020-09-23
    • 2020-09-25
    相关资源
    最近更新 更多