【发布时间】:2017-12-05 21:13:51
【问题描述】:
我正在尝试使用 Kafka Sink 连接器将数据批量发送到 NOSQL 数据库。我正在关注https://kafka.apache.org/documentation/#connect 文档,并且对必须在何处实现发送记录的逻辑感到困惑。请帮助我了解如何在内部处理记录以及必须使用 Put() 或 Flush() 来批量处理记录。
【问题讨论】:
标签: apache-kafka apache-kafka-connect
我正在尝试使用 Kafka Sink 连接器将数据批量发送到 NOSQL 数据库。我正在关注https://kafka.apache.org/documentation/#connect 文档,并且对必须在何处实现发送记录的逻辑感到困惑。请帮助我了解如何在内部处理记录以及必须使用 Put() 或 Flush() 来批量处理记录。
【问题讨论】:
标签: apache-kafka apache-kafka-connect
当 Kafka Connect 工作程序运行接收器任务时,它将使用分配给该任务的主题分区中的消息。当它这样做时,它通过put(Collection<SinkRecord>) 方法重复地将一批消息传递给接收器任务。只要连接器及其任务正在运行,这将继续。
Kafka Connect 还会定期记录 sink 任务的进度,即每个主题分区上最近处理的消息的偏移量。这称为提交偏移量,这样做是为了如果连接器意外和不干净地停止,Kafka Connect 知道任务应该在每个主题分区中的哪个位置恢复处理消息。但就在 Kafka Connect 将偏移量写入 Kafka 之前,Kafka Connect 工作人员通过flush(...) 方法为接收器连接器提供了在此阶段执行工作的机会。
一个特定的接收器连接器可能不需要做任何事情(如果put(...) 完成了所有工作),或者它可能会利用这个机会将已经通过put(...) 处理的所有消息提交到数据存储。例如Confluent's JDBC sink connector使用事务(可以通过连接器的消费者设置控制其大小)写入通过put(...)方法传递的每批消息,因此flush(...)方法不需要做任何事情.另一方面,Confluent's ElasticSearch sink connector 只是为一系列 put(...) 方法累积所有消息,并且仅在 flush(...) 期间将它们写入 Elasticsearch。
为源连接器和接收器连接器提交偏移的频率由连接器的offset.flush.interval.ms 配置属性控制。默认设置是每 60 秒提交一次偏移量,这种频率足以提高性能并减少开销,但如果连接器任务意外终止,这种频率足以限制潜在的重新处理量。请注意,当连接器正常关闭或遇到异常时,Kafka Connect 将始终有机会提交偏移量。只有当 Kafka Connect 工作人员意外终止时,它才可能没有机会提交偏移量来识别已处理的消息。因此,只有在此类故障后重新启动后,连接器才有可能重新处理它在故障之前所做的一些消息。因为消息至少会被看到一次,所以消息应该是幂等的。在确定此设置的适当值时,请考虑所有这些加上连接器的行为。
查看Confluent documentation for Kafka Connect 以及开源接收器连接器以获取更多示例和详细信息。
【讨论】:
System.exit(0) 停止分配给连接器的任务,这将主要是正常关闭。因此,在这种情况下,即因为调用了System.exit(0),是否会提交偏移量,直到看到异常的地方或直到上一批(即直到当前的一批记录),因为调用了System.exit(0)?