【问题标题】:Delay in Consumer consuming messages in Apache KafkaApache Kafka 中消费者消费消息的延迟
【发布时间】:2014-02-12 11:49:45
【问题描述】:

我正在使用 Kafka 0.8.0 并尝试实现下面提到的场景。

JCA API(充当生产者并向其发送数据)-----> 消费者-----> HBase

我在使用 JCA 客户端获取数据后立即将每条消息发送给消费者。例如,一旦生产者发送消息 no.1 ,我想从消费者那里获取相同的消息并“放入”HBase。但是我的消费者在一些随机的 n 消息之后开始获取消息。我想让生产者和消费者同步,这样他们就可以一起工作了。

我用过:

1 个经纪人

1 个主题

1 个单一生产者和高级消费者

谁能建议我需要做什么才能达到同样的效果?

已编辑:

添加一些相关代码sn-p。

Consumer.java

public class Consumer extends Thread {
    private final ConsumerConnector consumer;
    private final String topic;
    PrintWriter pw = null;
    int t = 0;
    StringDecoder kd = new StringDecoder(null);
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    Map<String, List<KafkaStream<String, Signal>>> consumerMap;
    KafkaStream<String, Signal> stream;
    ConsumerIterator<String, Signal> it;

    public Consumer(String topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());

        this.topic = topic;
        topicCountMap.put(topic, new Integer(1));
        consumerMap = consumer.createMessageStreams(topicCountMap, kd, new Serializer(
                new VerifiableProperties()));
        stream = consumerMap.get(topic).get(0);
        it = stream.iterator();

    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.zkConnect);
        props.put("group.id", KafkaProperties.groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("fetch.size", "1024");

        return new ConsumerConfig(props);

    }

    synchronized public void run() {

        while (it.hasNext()) {
            t = (it.next().message()).getChannelid();
            System.out.println("In Consumer received msg" + t);
        }
    }
}

producer.java

public class Producer {
    public final kafka.javaapi.producer.Producer<String, Signal> producer;
    private final String topic;
    private final Properties props = new Properties();

    public Producer(String topic)
    {
        props.put("serializer.class", "org.bigdata.kafka.Serializer");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "localhost:9092");
        // Use random partitioner. Don't need the key type. Just set it to Integer.
        // The message is of type userdefined Object .
        producer = new kafka.javaapi.producer.Producer<String,Signal(newProducerConfig(props));
        this.topic = topic;
    }
}

KafkaProperties.java

public interface KafkaProperties {
    final static String zkConnect = "127.0.0.1:2181";
    final static String groupId = "group1";
    final static String topic = "test00";
    final static String kafkaServerURL = "localhost";
    final static int kafkaServerPort = 9092;
    final static int kafkaProducerBufferSize = 64 * 1024;
    final static int connectionTimeOut = 100000;
    final static int reconnectInterval = 10000;
    final static String clientId = "SimpleConsumerDemoClient";
}

这就是消费者对于前 10 条消息的行为方式,它不会系统输出消费者收到的消息,但从第 11 条消息开始,它开始正常运行。

     producer sending msg1

     producer sending msg2

     producer sending msg3

     producer sending msg4

     producer sending msg5

     producer sending msg6

     producer sending msg7

     producer sending msg8

     producer sending msg9

     producer sending msg10

     producer sending msg11

     producer sending msg12
     In Consumer received msg12

     producer sending msg13
     In Consumer received msg13

     producer sending msg14
     In Consumer received msg14

     producer sending msg15
     In Consumer received msg15

     producer sending msg16
     In Consumer received msg16

     producer sending msg17
     In Consumer received msg17

     producer sending msg18
     In Consumer received msg18

     producer sending msg19
     In Consumer received msg19

     producer sending msg20
     In Consumer received msg20

     producer sending msg21
     In Consumer received msg21

已编辑:在生产者向消费者发送消息的位置添加侦听器功能。而且我使用的默认生产者配置没有覆盖它

public synchronized void onValueChanged(final MonitorEvent event_) {


    // Get the value from the DBR
    try {
        final DBR dbr = event_.getDBR();

        final String[] val = (String[]) dbr.getValue();

        producer1.producer.send(new KeyedMessage<String, Signal>         
                    (KafkaProperties.topic,new Signal(messageNo)));
        System.out.println("producer sending msg"+messageNo);

        messageNo++;


    } catch (Exception ex) {
        ex.printStackTrace();
    }
}

【问题讨论】:

  • 你能展示你的生产者和消费者代码/配置吗?看起来他们中的一些是批量操作的(事实上,这是件好事)。
  • @Dmitry 添加了代码 sn-p。
  • 消费者似乎没问题(除了属性 fetch.size = 1K - 这意味着消费者无法接收更大的消息,但可能不是我们正在寻找的问题)。可以分享一下生产者的 newProducerConfig() 和 run() 方法的代码吗?
  • @Dmitry 我的生产者类没有线程化。我有一个监听器函数,每当某个值发生变化时都会调用它,在其中我使用 producer.send() 函数向消费者发送消息。无论如何,我将共享监听器函数。

标签: java apache-kafka


【解决方案1】:
  1. 尝试将props.put("request.required.acks", "1") 添加到生产者配置中。默认情况下,生产者不等待确认,并且不保证消息传递。因此,如果您在测试之前启动代理,生产者可能会在代理完全初始化之前开始发送消息,并且前几条消息可能会丢失。

  2. 尝试将props.put("auto.offset.reset", "smallest") 添加到消费者配置中。它等于 kafka-console-consumer.sh 的--from-beginning 选项。如果您的消费者启动晚于生产者,并且 Zookeeper 中没有保存偏移数据,则默认情况下它将开始仅消费新消息(请参阅文档中的 Consumer configs)。

【讨论】:

  • 感谢您的建议。将 props.put("request.required.acks", "1") 添加到生产者,但程序的行为是随机的。我每次用一个新主题运行该程序 5 次。但它给出了 5 次不同的结果。两次生产者和消费者同步,其余时间消费者被延迟。
  • “延迟”是指所有消息都已收到,但不是在发送后立即收到?在您的原始输出中,前几条消息完全丢失了。
  • 在答案中添加了另一个建议。
  • 尝试在退出前调用 consumer.shutdown()。它将同步消耗的偏移量。
  • 嗨@Dmitry,我的问题在这个回复之后得到了解决。我也有一个疑问。是否可以使用固定大小(例如 5 个)的线程池来处理来自任意数量(例如 100 个或更多)主题的消息?
【解决方案2】:

一种可能性是卡夫卡滞后。可能是消费者因分区过多而超载。或者处理每条消息的成本非常高。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2018-06-08
    • 2012-01-28
    • 1970-01-01
    • 2017-09-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多