【问题标题】:Flink, Kafka and JDBC sinkFlink、Kafka 和 JDBC 接收器
【发布时间】:2021-09-09 18:09:21
【问题描述】:

我有一个 Flink 1.11 作业,它使用来自 Kafka 主题的消息,键入它们,过滤它们(keyBy 后跟自定义 ProcessFunction),并通过 JDBC 接收器将它们保存到数据库中(如此处所述:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/jdbc.html

Kafka 消费者使用以下选项进行初始化:

properties.setProperty("auto.offset.reset", "earliest")
kafkaConsumer = new FlinkKafkaConsumer(topic, deserializer, properties)
kafkaConsumer.setStartFromGroupOffsets()
kafkaConsumer.setCommitOffsetsOnCheckpoints(true)

在集群上启用了检查点。

我想要实现的是保证将所有过滤后的数据保存到数据库中,即使数据库停机了,比如说,6 小时,或者在保存到数据库时出现编程错误,并且工作需要更新、重新部署和重新启动。

为此,Kafka 偏移量的任何检查点都应该意味着

  1. 从 Kafka 读取的数据处于 Flink operator 状态,等待过滤/传递到 sink,并将作为 Flink operator checkpointing 的一部分进行检查点,或者
  2. 从 Kafka 读取的数据已经提交到数据库中。

在查看 JdbcSink 的实现时,我发现它并没有真正保留任何将被检查点/恢复的内部状态 - 相反,它的检查点是对数据库的写出。现在,如果在检查点期间此写入失败,并且 Kafka 偏移量确实被保存,我将处于“丢失”数据的情况 - 来自 Kafka 的后续读取将从已提交的偏移量和任何数据在运行时恢复 db write failed 现在不再从 Kafka 读取,也不再在 db 中。

所以有没有办法在完整管道(Kafka -> Flink -> DB)无法执行时停止推进 Kafka 偏移量 - 或者这里的解决方案(在 1.13 之前的世界中)可能是创建我自己的实现GenericJdbcSinkFunction 将保持一些 ValueState 直到数据库写入成功?

【问题讨论】:

  • 不确定这对保存到数据库有什么帮助?无论我是按窗口分组还是使用窗口,最后我都需要将结果窗口保存到数据库中,这就是我的问题所在。
  • @kozyr Flink 1.13 带来了对 JDBC connector 的仅一次支持(目前 MySQL 不支持)。这意味着,如果您使用仅支持一次的 Kafka 和 JDBC,则在检查点期间提交的偏移量应该被中止,以防其中一个操作员失败。 More on that here
  • @YuvalItzchakov 不幸的是,我使用的是 1.11,因为我使用 Kinesis Data Analytics 运行 Flink,最新版本是 1.11。
  • 我不太明白你的问题:你是否使用检查点? setCommitOffsetsOnCheckpoints 暗示如此,但您在问题中明确声明您没有使用检查点。

标签: jdbc apache-kafka apache-flink flink-streaming


【解决方案1】:

我可以看到 3 个选项:

  1. 在您的 Flink 版本中试用 JDBC 1.13 连接器。它很有可能会奏效。
  2. 如果不能立即生效,请检查是否可以将其反向移植到 1.11。不应该有太多变化。
  3. 通过扩展TwoPhaseCommitSinkFunction 或使用CheckpointedFunctionCheckpointListener 实现您自己的SinkFunction,编写您自己的两阶段提交接收器。基本上,您在成功的检查点后创建一个新事务并使用notifyCheckpointCompleted 提交它。

【讨论】:

    猜你喜欢
    • 2021-05-07
    • 2019-06-17
    • 2020-01-11
    • 2020-05-13
    • 2019-11-15
    • 2021-04-15
    • 2018-02-06
    • 2019-06-20
    • 2021-03-01
    相关资源
    最近更新 更多