【发布时间】:2019-08-10 15:14:22
【问题描述】:
我需要在 kafka 消费者应用程序中获取消息生成时的时间戳(事件时间)。我知道 timestampExtractor 可以与 kafka stream 一起使用,但我的要求不同,因为我没有使用流来消费消息。
我的kafka生产者如下:
@Override
public void run(ApplicationArguments args) throws Exception {
List<String> names = Arrays.asList("priya", "dyser", "Ray", "Mark", "Oman", "Larry");
List<String> pages = Arrays.asList("blog", "facebook", "instagram", "news", "youtube", "about");
Runnable runnable = () -> {
String rPage = pages.get(new Random().nextInt(pages.size()));
String rName = pages.get(new Random().nextInt(names.size()));
PageViewEvent pageViewEvent = new PageViewEvent(rName, rPage, Math.random() > .5 ? 10 : 1000);
Message<PageViewEvent> message = MessageBuilder
.withPayload(pageViewEvent).
setHeader(KafkaHeaders.MESSAGE_KEY, pageViewEvent.getUserId().getBytes())
.build();
try {
this.pageViewsOut.send(message);
log.info("sent " + message);
} catch (Exception e) {
log.error(e);
}
};
Kafka Consumer 使用 Spring kafka @KafkaListener 实现。
@KafkaListener(topics = "test1" , groupId = "json", containerFactory = "kafkaListenerContainerFactory")
public void receive(@Payload PageViewEvent data,@Headers MessageHeaders headers) {
LOG.info("Message received");
LOG.info("received data='{}'", data);
}
容器工厂配置
@Bean
public ConsumerFactory<String,PageViewEvent > priceEventConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "json");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(PageViewEvent.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, PageViewEvent> priceEventsKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, PageViewEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(priceEventConsumerFactory());
return factory;
}
在我打印时发送消息的生产者给我以下数据:
[payload=PageViewEvent(userId=blog, page=about, duration=10), 标头={id=8ebdad85-e2f7-958f-500e-4560ac0970e5, kafka_messageKey=[B@71975e1a, contentType=application/json, 时间戳=1553041963803}]
这确实有一个生成的时间戳。如何使用 Spring kafka 获取消息生成的时间戳?
【问题讨论】:
-
headers.get(KafkaHeaders.RECEIVED_TIMESTAMP)中的@KafkaListener不适合您吗? -
@Gary Russell 它可以工作......但它不是消息产生的时间戳。据我所知,这是消费者收到它的时间戳。如果我的理解有误,请告诉我。
-
但是 KafkaHeaders.timestampType 是 CREATE TIME,那么是不是意味着 RECEIVED_TIMESTAMP 是消息的产生时间??
标签: spring-boot apache-kafka apache-kafka-streams spring-kafka