【问题标题】:Put() vs Flush() in Kafka Connector Sink TaskKafka 连接器接收器任务中的 Put() 与 Flush()
【发布时间】:2017-12-05 21:13:51
【问题描述】:

我正在尝试使用 Kafka Sink 连接器将数据批量发送到 NOSQL 数据库。我正在关注https://kafka.apache.org/documentation/#connect 文档,并且对必须在何处实现发送记录的逻辑感到困惑。请帮助我了解如何在内部处理记录以及必须使用 Put() 或 Flush() 来批量处理记录。

【问题讨论】:

    标签: apache-kafka apache-kafka-connect


    【解决方案1】:

    当 Kafka Connect 工作程序运行接收器任务时,它将使用分配给该任务的主题分区中的消息。当它这样做时,它通过put(Collection<SinkRecord>) 方法重复地将一批消息传递给接收器任务。只要连接器及其任务正在运行,这将继续。

    Kafka Connect 还会定期记录 sink 任务的进度,即每个主题分区上最近处理的消息的偏移量。这称为提交偏移量,这样做是为了如果连接器意外和不干净地停止,Kafka Connect 知道任务应该在每个主题分区中的哪个位置恢复处理消息。但就在 Kafka Connect 将偏移量写入 Kafka 之前,Kafka Connect 工作人员通过flush(...) 方法为接收器连接器提供了在此阶段执行工作的机会。

    一个特定的接收器连接器可能不需要做任何事情(如果put(...) 完成了所有工作),或者它可能会利用这个机会将已经通过put(...) 处理的所有消息提交到数据存储。例如Confluent's JDBC sink connector使用事务(可以通过连接器的消费者设置控制其大小)写入通过put(...)方法传递的每批消息,因此flush(...)方法不需要做任何事情.另一方面,Confluent's ElasticSearch sink connector 只是为一系列 put(...) 方法累积所有消息,并且仅在 flush(...) 期间将它们写入 Elasticsearch。

    为源连接器和接收器连接器提交偏移的频率由连接器的offset.flush.interval.ms 配置属性控制。默认设置是每 60 秒提交一次偏移量,这种频率足以提高性能并减少开销,但如果连接器任务意外终止,这种频率足以限制潜在的重新处理量。请注意,当连接器正常关闭或遇到异常时,Kafka Connect 将始终有机会提交偏移量。只有当 Kafka Connect 工作人员意外终止时,它才可能没有机会提交偏移量来识别已处理的消息。因此,只有在此类故障后重新启动后,连接器才有可能重新处理它在故障之前所做的一些消息。因为消息至少会被看到一次,所以消息应该是幂等的。在确定此设置的适当值时,请考虑所有这些加上连接器的行为。

    查看Confluent documentation for Kafka Connect 以及开源接收器连接器以获取更多示例和详细信息。

    【讨论】:

    • 你好@Randall Hauch,上面是一个很好的答案,可以理解 connect 如何提交偏移量。在我工作的应用程序中,如果出现异常,我们会重试多次(可配置值),然后使用System.exit(0) 停止分配给连接器的任务,这将主要是正常关闭。因此,在这种情况下,即因为调用了System.exit(0),是否会提交偏移量,直到看到异常的地方或直到上一批(即直到当前的一批记录),因为调用了System.exit(0)
    猜你喜欢
    • 2016-12-23
    • 2018-03-01
    • 2021-02-26
    • 2019-06-17
    • 2021-09-18
    • 2020-01-11
    • 2021-01-08
    • 2021-03-01
    • 1970-01-01
    相关资源
    最近更新 更多