【问题标题】:Kafka is slow to produce messages in first secondsKafka 在第一秒内生成消息的速度很慢
【发布时间】:2017-07-31 13:43:13
【问题描述】:

我正在使用 kafka,并且我制作了这样的制作人:

synchronized (obj) {

        while (true){

            long start = Instant.now().toEpochMilli();
            for (int i=0; i< NUM_MSG_SEC  ; i++)  
            {

                PriceStreamingData data = PriceStreamingData.newBuilder()
                        .setUser(getRequest().getUser())
                        .setSecurity(getRequest().getSecurity())
                        .setTimestamp(Instant.now().toEpochMilli())
                        .setPrice(new Random().nextDouble()*200)
                        .build();


                record = new ProducerRecord<>(topic, keyBuilder.build(data), 
                        data);



                producer.send(record,new Callback(){
                    @Override
                    public void onCompletion(RecordMetadata arg0, Exception arg1) {
                        counter.incrementAndGet();
                        if(arg1 != null){
                            arg1.printStackTrace();
                        }


                    } 
                });

            }
            long diffCiclo = Instant.now().toEpochMilli() - start;
            long diff = Instant.now().toEpochMilli() - startTime;


            System.out.println("Number of sent: " + counter.get() +  
                    " Millisecond:" + (diff) + " - NumberOfSent/Diff(K): " + counter.get()/diff );

            try {
                if(diffCiclo >= 1000){
                    System.out.println("over 1 second: "  + diffCiclo);

                }
                else {
                    obj.wait( 1000 - diffCiclo );

                }
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }




        }
    }

如您所见,它非常简单,只需创建一条新消息并发送即可。 如果我看到日志:

    NumberOfSent/Diff(K)   

在最初的 10 秒内,它的表现非常糟糕

  30k per second

60 秒后我有

  180k  per second 

为什么?我怎样才能开始已经达到 180k 的过程?

我的 kafka 生产者配置是 Follwing

  Async producer ( but also with sync producer the situation dose not change)
  ACKS_CONFIG = 0
  BATCH_SIZE_CONFIG = 20000 
  COMPRESSION_TYPE_CONFIG = none
  LINGER_MS_CONFIG = 0

最后一个细节:

  NUM_MSG_SEC is set to 200000 or bigger number 

【问题讨论】:

  • 是否有其他东西锁定obj 导致延迟? isRunning() 什么时候返回 true?
  • nothing else lock on obj,没有别的东西会导致延迟,我认为延迟在某处但不在我的代码中,我的代码很简单,我认为它是围绕kafka配置的东西,我想那,( isRunning 总是正确的)
  • 也许在synchronized (obj) 之后添加一条日志语句,以确定您的代码何时实际执行。也许也可以在调试中运行以查看在执行代码之前发生了什么。
  • @AndrewS 我之前已经调试过什么都没做......第一行代码从同步(obj)开始,然后才初始化变量
  • 我正在做各种检查,我发现如果我将以下参数放入 jvm -XX:MinMetaspaceFreeRatio=100 -XX:MaxMetaspaceFreeRatio=100 以每秒 180k 的速度运行所需的秒数更少但仍然没有这么快……为什么?

标签: java apache-kafka producer-consumer kafka-producer-api nosql


【解决方案1】:

我自己找到了解决方案,希望这篇文章对其他人也有用。

问题出现在

 ProducerConfig.BATCH_SIZE_CONFIG 

ProducerConfig.LINGER_MS_CONFIG

我的参数是 20000 和 0,为了解决这个问题,我确实将它们设置为更高的值 200000 和 1000。最后我使用参数启动了 JVM:

-XX:MinMetaspaceFreeRatio=100
-XX:MaxMetaspaceFreeRatio=100

因为我发现将元空间设置为合适的值需要更长的时间。

现在生产者直接从 140k 开始,并且在 1 秒内已经达到 180k。

【讨论】:

    猜你喜欢
    • 2019-05-12
    • 2021-01-21
    • 2021-03-04
    • 1970-01-01
    • 2012-04-15
    • 2017-05-22
    • 2018-10-17
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多