The challenge now is that if the service crashes and the user reconnects to another instance, I need them to start receiving messages from where they left off.
Isn't that already the default consumer behavior? Couldn't you set a consumer group.id per unique user and get this out of the box?
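I'm picturing something like this per-user configuration. This is just a sketch of the idea: the `chat-user-` naming scheme and the property values are my assumptions, and I've used plain string property keys so it stands alone:

```java
import java.util.Properties;

public class PerUserConsumerConfig {

    // Build consumer properties with a group.id unique to each user, so
    // Kafka tracks committed offsets per user and a reconnecting user
    // resumes from where it left off, on any instance.
    static Properties propsForUser(String userId, String bootstrapServers) {
        Properties props = new Properties();
        props.put("group.id", "chat-user-" + userId);   // one consumer group per user
        props.put("bootstrap.servers", bootstrapServers);
        props.put("auto.offset.reset", "earliest");     // no committed offset yet -> start from the beginning
        props.put("enable.auto.commit", "true");        // or commit manually after delivering to the websocket
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```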
If you're already using lots of Kafka listeners, you'd most likely have to rework all of that code to manage offsets.
That said,
I wrote a class that does something similar for a "global" consumer, where I had to manage the consumer offsets and assign the partitions manually. It should give you an idea of what's involved.
The use case is: create a consumer that pulls all data from all partitions of a topic until it reaches the current end offsets at time t.
So, for example, if you have 3 partitions whose end offsets are 100, 200, and 150, the consumer will read each partition from offset 0 up to N = 100, 200, 150 respectively, and then return the collected results.
Using this as a baseline, you could modify the behavior to start from the consumer group's last commit (with a group.id unique to each user), reading and committing the group's offsets manually. The exit condition could be the end of the websocket session.
One other caveat: when you assign partitions manually like I did, you lose the consumer group's rebalancing behavior.
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Deserializer;

import lombok.extern.slf4j.Slf4j;

/**
 * Pulls an entire Kafka topic and then exits, based on the latest
 * offsets in the topic.
 *
 * @author Fermi-4
 *
 * @param <K> key type
 * @param <T> value type
 */
@Slf4j
public class KafkaRunnerImpl<K, T> implements KafkaRunner<T> {

    final Deserializer<K> _keySerde;
    final Deserializer<T> _valueSerde;
    final String _bootstrapServers;
    final String _topic;
    final int _requestTimeout;
    final int _sampleTimeout;

    private Map<TopicPartition, Long> _endOffsets;

    public KafkaRunnerImpl(Deserializer<K> keySerde, Deserializer<T> valueSerde, String bootstrapServers,
            String topic, int requestTimeout, int sampleTimeout) {
        this._topic = topic;
        this._keySerde = keySerde;
        this._valueSerde = valueSerde;
        this._bootstrapServers = bootstrapServers;
        this._requestTimeout = requestTimeout;
        this._sampleTimeout = sampleTimeout;
    }
    /**
     * https://stackoverflow.com/questions/52837302/strategies-to-ensure-that-each-kafka-consumer-has-a-different-unique-group-id
     *
     * To ensure we get all of the data from the partitions of a topic, we
     * manually assign the partitions to the consumer. There are tradeoffs to
     * doing this, and it is not the intended use of Kafka (see above).
     */
    public Collection<T> pull() {
        try (KafkaConsumer<K, T> consumer = new KafkaConsumer<>(getConsumerProps())) {
            Collection<T> records = new ArrayList<>();

            // look up the partitions for the topic
            List<PartitionInfo> info = consumer.partitionsFor(_topic);

            // manually assign all partitions to this consumer (no group rebalancing)
            consumer.assign(info.stream()
                    .map(partitionInfo -> new TopicPartition(_topic, partitionInfo.partition()))
                    .collect(Collectors.toList()));

            // partitions belonging to this consumer
            Set<TopicPartition> partitions = consumer.assignment();

            // seek to the beginning of the assigned partitions
            consumer.seekToBeginning(partitions);

            // the target end offsets; endOffsets() returns the offset one PAST
            // the last record in each partition
            _endOffsets = consumer.endOffsets(partitions);
            log.info(_endOffsets.toString());

            // poll until every partition has reached its target end offset
            while (comparePartitionOffsets(consumer)) {
                ConsumerRecords<K, T> sample = consumer.poll(Duration.ofMillis(_sampleTimeout));
                for (TopicPartition partition : partitions) {
                    // grab the records only from this partition
                    List<ConsumerRecord<K, T>> recordsForPartition = sample.records(partition);

                    // add a record to the collection ONLY if its offset is
                    // below the target end offset for the current partition
                    // (records written after the snapshot are skipped)
                    for (ConsumerRecord<K, T> consumerRecord : recordsForPartition) {
                        log.debug("record offset: " + consumerRecord.offset() + " partition: " + consumerRecord.partition());
                        if (consumerRecord.offset() < _endOffsets.get(partition)) {
                            records.add(consumerRecord.value());
                        }
                    }
                }
            }
            return records;
        } catch (Exception e) {
            log.error("Failed to pull topic " + _topic, e);
            return Collections.emptyList();
        }
    }
    // returns true while at least one partition has not yet reached its target
    // end offset; position() is the offset of the next record to be fetched,
    // so a partition is drained once position >= end offset
    private boolean comparePartitionOffsets(KafkaConsumer<K, T> consumer) {
        return !_endOffsets.keySet().stream().allMatch(tp -> {
            log.info("consumer: " + consumer.position(tp) + " target: " + _endOffsets.get(tp));
            boolean offsetsReached = consumer.position(tp) >= _endOffsets.get(tp);
            if (offsetsReached) log.info("Target offsets reached for partition: " + tp.partition());
            return offsetsReached;
        });
    }
    private Properties getConsumerProps() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "runner-group");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // client-side timeout for broker requests (not a poll deadline)
        properties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, _requestTimeout);
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, _bootstrapServers);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, _valueSerde.getClass());
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, _keySerde.getClass());
        return properties;
    }
}
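The exit condition in that class hides an easy off-by-one: `endOffsets()` reports the offset one past the last record, so a partition is drained once the consumer's position reaches it, not `endOffset - 1`. A sketch of that check as a pure function, using plain maps (partition number to offset; the names here are mine, not from the class):

```java
import java.util.Map;

public class DrainCheck {

    // True once every partition's position (the offset of the next record
    // to be fetched) has reached its end offset (one past the last record).
    static boolean allDrained(Map<Integer, Long> positions, Map<Integer, Long> endOffsets) {
        return endOffsets.entrySet().stream()
                .allMatch(e -> positions.getOrDefault(e.getKey(), 0L) >= e.getValue());
    }
}
```

With the three-partition example above (end offsets 100, 200, 150), `allDrained` only returns true once the positions reach exactly those values; comparing against `endOffset - 1` would exit one record early.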