【问题标题】:kafka connect - jdbc sink sql exceptionkafka connect - jdbc sink sql异常
【发布时间】:2021-09-09 23:00:05
【问题描述】:

我正在使用 confluent 社区版进行简单设置,该设置包括一个调用 Kafka 休息代理的休息客户端,然后使用提供的 jdbc 接收器连接器将该数据推送到一个 oracle 数据库中。

我注意到如果存在 sql 异常,例如如果实际数据的长度大于实际数据的长度(定义的列长度),则任务停止,如果我重新启动它,它会尝试插入错误的条目它停止了。它不会插入其他条目。

难道不是我可以记录错误条目并让任务继续插入其他数据的方法吗?

【问题讨论】:

  • 请阅读minimal reproducible example 并相应地增强您的问题。不要解释你的代码应该做什么,在你的问题中发布代码的重要部分。如果您的代码会按照您的意愿执行,那么您就不会在这里发布...
  • 没有写代码。 JDBC 接收器连接器是一个提供的连接器。我要问的是有没有可以在 confluent kafka 中完成的配置,即使出现异常也可以继续插入记录?

标签: apache-kafka-connect confluent-platform


【解决方案1】:

接收器目前无法忽略不良记录,但您可以使用kafka-consumer-groups 工具手动跳过它们:

kafka-consumer-groups \
    --bootstrap-server kafka:29092 \
    --group connect-sink_postgres_foo_00 \
    --reset-offsets \
    --topic foo \
    --to-offset 2 \
    --execute

欲了解更多信息see here

【讨论】:

  • 我在看你的文章confluent.io/blog/…,读完后我期待这个errors.tolerance 会解决这个问题。那么目前所有的 sink 连接器都不能忽略不良记录吗? @罗宾莫法特
  • 请参阅 Bartosz Wardziński 的回答 - 该设置仅适用于管道的某些阶段存在问题的情况。如果它在交付点,则由连接器来处理它,而 JDBC 接收器目前没有。
  • 哦,好的,明白了。我面临的具体问题是带有 kafka v1.1.1 的 s3 sink 连接器。收到格式错误的 json 时,它会停止连接器,我需要按照您的建议进行手动跳过。有没有办法升级到 kafka 2.0 并使用 errors.tolerance:all 来解决这个问题?或者您知道 s3 sink 连接器是否有类似弹性搜索 sink 连接器的功能?
【解决方案2】:

用于 Sink 连接器的 Kafka Connect 框架只能在以下过程中抛出异常时跳过有问题的记录: - 转换键或值 (Converter:toConnectData(...)) - 转换 (Transformation::apply)

为此,您可以使用 errors.tolerance 属性:

"errors.tolerance": "all"

还有一些附加属性,用于打印有关错误的详细信息:errors.log.enableerrors.log.include.messages。 原答案:Apache Kafka JDBC Connector - SerializationException: Unknown magic byte

如果在传递消息期间抛出异常,Sink 任务将被终止。 如果您需要处理与外部系统的通信错误(或其他错误),则必须为您的连接器添加支持

Jdbc Connector,当SQLException 被抛出时会重试但不跳过任何记录

重试次数和重试间隔由以下属性管理

  • max.retries 默认值 10
  • retry.backoff.ms 默认 3000

【讨论】:

  • 感谢 cmets。即使使用这些属性,它仍然会停止。 (ERROR WorkerSinkTask{id=test-sink-oracle-jdbc-autoincrement-0} 任务正在被杀死,直到手动重新启动才能恢复。)
  • 是否有解决方案,或者如果我说我需要编写自己的 jdbc 接收器连接器是否有意义?谢谢
  • 这对问题没有帮助,问题是有效数据被传递到接收器连接器,但传入的数据与数据库的预期列长度不匹配,因此连接器将进入错误状态并且无法传递有问题的消息。需要在接收器连接器本身的上游进行转换以检查列长度并引发异常,以便捕获errors.tolerance
【解决方案3】:

目前,具体来说,没有办法阻止这种情况导致接收器连接器出现故障。

但是,还有另一种方法可能值得研究。您可以在连接器上应用单消息转换 (SMT),检查传入列的长度,然后决定要么抛出异常,这将冒泡到 errors.tolerance 配置,或者返回 null,它将过滤完全记录下来。

由于这是一个接收器连接器,SMT 将在将记录传递到连接器之前应用,因此通过转换跳过的记录永远不会进入要同步的任务'd 到数据库中。

【讨论】:

    猜你喜欢
    • 2018-07-08
    • 2020-09-25
    • 2020-09-23
    • 2020-07-31
    • 2020-12-14
    • 2021-12-19
    • 1970-01-01
    • 2019-05-12
    • 1970-01-01
    相关资源
    最近更新 更多