【问题标题】:Spring Kafka - how to fetch timestamp (event time) when message was producedSpring Kafka - 如何在生成消息时获取时间戳(事件时间)
【发布时间】:2019-08-10 15:14:22
【问题描述】:

我需要在 kafka 消费者应用程序中获取消息生成时的时间戳(事件时间)。我知道 timestampExtractor 可以与 kafka stream 一起使用,但我的要求不同,因为我没有使用流来消费消息。

我的kafka生产者如下:

@Override
public void run(ApplicationArguments args) throws Exception {


    List<String> names = Arrays.asList("priya", "dyser", "Ray", "Mark", "Oman", "Larry");
    List<String> pages = Arrays.asList("blog", "facebook", "instagram", "news", "youtube", "about");
    Runnable runnable = () -> {
        String rPage = pages.get(new Random().nextInt(pages.size()));
        String rName = pages.get(new Random().nextInt(names.size()));
        PageViewEvent pageViewEvent = new PageViewEvent(rName, rPage, Math.random() > .5 ? 10 : 1000);

        Message<PageViewEvent> message =  MessageBuilder
                .withPayload(pageViewEvent).
                setHeader(KafkaHeaders.MESSAGE_KEY, pageViewEvent.getUserId().getBytes())
                        .build();

        try {
            this.pageViewsOut.send(message);
            log.info("sent " + message);
        } catch (Exception e) {
            log.error(e);
        }
    };

Kafka Consumer 使用 Spring kafka @KafkaListener 实现。

@KafkaListener(topics = "test1" , groupId = "json", containerFactory = "kafkaListenerContainerFactory")

    public void receive(@Payload PageViewEvent data,@Headers MessageHeaders headers) {
        LOG.info("Message received");
        LOG.info("received data='{}'", data);
 }

容器工厂配置

   @Bean
   public ConsumerFactory<String,PageViewEvent > priceEventConsumerFactory() {

        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "json");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(PageViewEvent.class));



    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, PageViewEvent> priceEventsKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, PageViewEvent> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(priceEventConsumerFactory());
        return factory;
    }

在我打印时发送消息的生产者给我以下数据:

[payload=PageViewEvent(userId=blog, page=about, duration=10), 标头={id=8ebdad85-e2f7-958f-500e-4560ac0970e5, kafka_messageKey=[B@71975e1a, contentType=application/json, 时间戳=1553041963803}]

这确实有一个生成的时间戳。如何使用 Spring kafka 获取消息生成的时间戳?

【问题讨论】:

  • headers.get(KafkaHeaders.RECEIVED_TIMESTAMP) 中的@KafkaListener 不适合您吗?
  • @Gary Russell 它可以工作......但它不是消息产生的时间戳。据我所知,这是消费者收到它的时间戳。如果我的理解有误,请告诉我。
  • 但是 KafkaHeaders.timestampType 是 CREATE TIME,那么是不是意味着 RECEIVED_TIMESTAMP 是消息的产生时间??

标签: spring-boot apache-kafka apache-kafka-streams spring-kafka


【解决方案1】:

RECEIVED_TIMESTAMP 表示接收到的是来自记录的时间戳,而不是接收到的时间。我们避免将其放在 TIMESTAMP 中以避​​免无意传播到出站消息。

【讨论】:

  • 谢谢@Gary Russell。这意味着这一切都取决于 TimeStampType...什么值会进来。
  • 时间戳是什么?
  • ConsumerRecord.timestamp() - 它可以由生产者或代理设置。还有一个 timestampType 字段描述了时间戳NO_TIMESTAMP_TYPE(-1, "NoTimestampType"), CREATE_TIME(0, "CreateTime"), LOG_APPEND_TIME(1, "LogAppendTime");
【解决方案2】:
You can use something like below:

final Producer<String, String> producer = new KafkaProducer<String, String>(properties);
        long time = System.currentTimeMillis();
        final CountDownLatch countDownLatch = new CountDownLatch(5);
        int count=0;
        try {
            for (long index = time; index < time + 10; index++) {
                String key = null;
                count++;
                if(count<=5)
                    key = "id_"+ Integer.toString(1);
                else
                    key = "id_"+ Integer.toString(2);
                final ProducerRecord<String, String> record =
                        new ProducerRecord<>(TOPIC, key, "B2B Sample Message: " + count);
                producer.send(record, (metadata, exception) -> {
                    long elapsedTime = System.currentTimeMillis() - time;
                    if (metadata != null) {
                        System.out.printf("sent record(key=%s value=%s) " +
                                        "meta(partition=%d, offset=%d) time=%d timestamp=%d\n",
                                record.key(), record.value(), metadata.partition(),
                                metadata.offset(), elapsedTime, metadata.timestamp());
                        System.out.println("Timestamp:: "+metadata.timestamp() );
                    } else {
                        exception.printStackTrace();
                    }
                    countDownLatch.countDown();
                });
            }
            try {
                countDownLatch.await(25, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }finally {
            producer.flush();
            producer.close();
        }

    }

【讨论】:

    猜你喜欢
    • 2020-11-18
    • 1970-01-01
    • 2020-12-21
    • 2019-07-28
    • 2019-07-06
    • 1970-01-01
    • 2021-07-14
    • 1970-01-01
    • 2019-08-18
    相关资源
    最近更新 更多