【问题标题】:Extend Kafka Connect SinkTask and start consuming from given offsets扩展 Kafka Connect SinkTask 并从给定的偏移量开始消费
【发布时间】:2016-11-20 11:05:40
【问题描述】:

我想扩展 SinkTask 以创建我自己的接收器连接器。

如果我在刷新期间保存了偏移量,并且下次启动接收器连接器时我想从保存的偏移量中继续读取,那么正确的方法是什么?

我尝试使用被覆盖的initialize(SinkTaskContext context)SinkTaskContext 来分配我自己的偏移量:

@Override
public void initialize(SinkTaskContext context) {
  HashMap<TopicPartition, Long> offsetMap = new HashMap<>();
  ...
  context.offset(offsetMap);
}

但这不起作用,因为尚未分配分区。我遇到了异常。

然后我是否应该将上下文(来自initialize())保存到一个全局变量中,然后使用它在方法open(Collection&lt;TopicPartition&gt; partitions)(从SinkTask 覆盖)中分配偏移量,就像我在里面做的一样initialize?例如:

@Override
public void open(Collection<TopicPartition> partitions) {
  HashMap<TopicPartition, Long> offsetMapNew = new HashMap<>();
  for (TopicPartition tp : partitions) // for each partition assigned
  {
     Long offset = myOffsetMap.get(tp.topic() + "-" + tp.partition());
     if (offset == null) { offset = 0l; } // 0 Long
     offsetMapNew.put(tp, offset);
  }
  mySavedTaskContext.offset(offsetMapNew); // sync offsets ?
}

【问题讨论】:

    标签: apache-kafka apache-kafka-connect


    【解决方案1】:

    open() 期间重置偏移量应该是正确的方法,但由于bug 仍未解决,目前无法正确处理。

    目前的解决方法是在put() 中处理重置偏移量。这可能有点违反直觉,但由于您正在管理自己的偏移量,因此如果您愿意,您实际上可以忽略数据。当您收到第一个 put() 电话时,您可以处理加载偏移量并重置它们。所有后续数据都将来自您在重置时指定的偏移量。这就是HDFS connector 当前实现其仅一次交付的方式。 (不幸的是,这是一个很好的例子,说明了如何获得恰好一次但相对复杂的代码。)事实上,由于 HDFS 连接器是 Kafka Connect 中偏移管理功能的驱动因素,因此它不执行重置的事实on rebalance 正是在实现中遗漏了这一点。

    【讨论】:

    • 谢谢你,Ewen。我只想在我们的讨论中分享有关此问题的更多详细信息here
    猜你喜欢
    • 2020-05-17
    • 2019-04-11
    • 2016-10-23
    • 2019-01-18
    • 1970-01-01
    • 1970-01-01
    • 2018-09-25
    • 2021-05-04
    • 2016-02-14
    相关资源
    最近更新 更多