【问题标题】:Lost message from the Kafka Topic来自 Kafka 主题的丢失消息
【发布时间】:2019-11-26 05:47:08
【问题描述】:

ProducerRecord 中尝试时间戳时;我发现了一些奇怪的东西。从生产者那里发送了几条消息后,我运行了 kafka-console-consumer.sh 并验证了这些消息是否在主题中。我拦住了制片人,等了一分钟。当我重新运行 kafka-console-consumer.sh 时,它没有显示我之前生成的消息。我还添加了 producer.flush() 和 producer.close() 但结果还是一样。

现在,当我停止使用时间戳字段时,一切正常,这让我相信带有时间戳的消息有些挑剔。

我使用的是 Kafka_2.11-2.0.0(2018 年 7 月 30 日发布)

以下是示例代码。

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internal.RecordHeaders;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import static java.lang.Thread.sleep;
public class KafkaProducerSample{
    public static void main(String[] args){
        String kafkaHost="sample:port";
        String notificationTopic="test";

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaHost);
        props.put(ProducerConfig.ACKS_CONFIG, 1);
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);

        Producer<String, String> producer = new KafkaProducer(props, new StringSerialize(), new StringSerializer);

        RecordHeaders recordHeaders = new RecordHeader();
        ProducerRecord<String, String> record = new ProducerRecord(notificationTopic, null, 1574443515L, sampleKey, SampleValue);
        producer.send(record);
        sleep(1000);
    }
}

我运行控制台消费者如下

$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap.server KAFKA_HOST:PORT --topic test --from-beginning

#output after running producer
test


#output 5mins after shutting down producer

【问题讨论】:

  • 您的kafka-console-consumer 是如何运行的?你用过--from-beginning标志吗?
  • 更新问题使其更清晰
  • 尝试从分区和特定偏移量消费 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytopic --offset 0 --partition 0
  • @sun007 试过了;不起作用。

标签: java apache-kafka kafka-producer-api


【解决方案1】:

您只异步发送一条记录,而不是确认或刷新缓冲区。

您需要发送更多记录,

producer.send(record).get();

producer.send(record);
producer.flush();

或者(首选),在您的 main 方法中执行 Runtime.addShutdownHook() 以刷新和关闭生产者

【讨论】:

    猜你喜欢
    • 2019-11-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-12-10
    • 2018-10-27
    • 1970-01-01
    • 1970-01-01
    • 2021-07-09
    相关资源
    最近更新 更多