【发布时间】:2016-11-20 11:05:40
【问题描述】:
我想扩展 SinkTask 以创建我自己的接收器连接器。
如果我在刷新期间保存了偏移量,并且下次启动接收器连接器时我想从保存的偏移量中继续读取,那么正确的方法是什么?
我尝试使用被覆盖的initialize(SinkTaskContext context) 的SinkTaskContext 来分配我自己的偏移量:
@Override
public void initialize(SinkTaskContext context) {
HashMap<TopicPartition, Long> offsetMap = new HashMap<>();
...
context.offset(offsetMap);
}
但这不起作用,因为尚未分配分区。我遇到了异常。
然后我是否应该将上下文(来自initialize())保存到一个全局变量中,然后使用它在方法open(Collection<TopicPartition> partitions)(从SinkTask 覆盖)中分配偏移量,就像我在里面做的一样initialize?例如:
@Override
public void open(Collection<TopicPartition> partitions) {
HashMap<TopicPartition, Long> offsetMapNew = new HashMap<>();
for (TopicPartition tp : partitions) // for each partition assigned
{
Long offset = myOffsetMap.get(tp.topic() + "-" + tp.partition());
if (offset == null) { offset = 0l; } // 0 Long
offsetMapNew.put(tp, offset);
}
mySavedTaskContext.offset(offsetMapNew); // sync offsets ?
}
【问题讨论】:
标签: apache-kafka apache-kafka-connect