【发布时间】:2021-09-09 18:09:21
【问题描述】:
我有一个 Flink 1.11 作业,它使用来自 Kafka 主题的消息,键入它们,过滤它们(keyBy 后跟自定义 ProcessFunction),并通过 JDBC 接收器将它们保存到数据库中(如此处所述:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/jdbc.html)
Kafka 消费者使用以下选项进行初始化:
properties.setProperty("auto.offset.reset", "earliest")
kafkaConsumer = new FlinkKafkaConsumer(topic, deserializer, properties)
kafkaConsumer.setStartFromGroupOffsets()
kafkaConsumer.setCommitOffsetsOnCheckpoints(true)
在集群上启用了检查点。
我想要实现的是保证将所有过滤后的数据保存到数据库中,即使数据库停机了,比如说,6 小时,或者在保存到数据库时出现编程错误,并且工作需要更新、重新部署和重新启动。
为此,Kafka 偏移量的任何检查点都应该意味着
- 从 Kafka 读取的数据处于 Flink operator 状态,等待过滤/传递到 sink,并将作为 Flink operator checkpointing 的一部分进行检查点,或者
- 从 Kafka 读取的数据已经提交到数据库中。
在查看 JdbcSink 的实现时,我发现它并没有真正保留任何将被检查点/恢复的内部状态 - 相反,它的检查点是对数据库的写出。现在,如果在检查点期间此写入失败,并且 Kafka 偏移量确实被保存,我将处于“丢失”数据的情况 - 来自 Kafka 的后续读取将从已提交的偏移量和任何数据在运行时恢复 db write failed 现在不再从 Kafka 读取,也不再在 db 中。
所以有没有办法在完整管道(Kafka -> Flink -> DB)无法执行时停止推进 Kafka 偏移量 - 或者这里的解决方案(在 1.13 之前的世界中)可能是创建我自己的实现GenericJdbcSinkFunction 将保持一些 ValueState 直到数据库写入成功?
【问题讨论】:
-
不确定这对保存到数据库有什么帮助?无论我是按窗口分组还是使用窗口,最后我都需要将结果窗口保存到数据库中,这就是我的问题所在。
-
@kozyr Flink 1.13 带来了对 JDBC connector 的仅一次支持(目前 MySQL 不支持)。这意味着,如果您使用仅支持一次的 Kafka 和 JDBC,则在检查点期间提交的偏移量应该被中止,以防其中一个操作员失败。 More on that here
-
@YuvalItzchakov 不幸的是,我使用的是 1.11,因为我使用 Kinesis Data Analytics 运行 Flink,最新版本是 1.11。
-
我不太明白你的问题:你是否使用检查点?
setCommitOffsetsOnCheckpoints暗示如此,但您在问题中明确声明您没有使用检查点。
标签: jdbc apache-kafka apache-flink flink-streaming