【发布时间】:2017-07-31 13:43:13
【问题描述】:
我正在使用 kafka,并且我制作了这样的制作人:
synchronized (obj) {
while (true){
long start = Instant.now().toEpochMilli();
for (int i=0; i< NUM_MSG_SEC ; i++)
{
PriceStreamingData data = PriceStreamingData.newBuilder()
.setUser(getRequest().getUser())
.setSecurity(getRequest().getSecurity())
.setTimestamp(Instant.now().toEpochMilli())
.setPrice(new Random().nextDouble()*200)
.build();
record = new ProducerRecord<>(topic, keyBuilder.build(data),
data);
producer.send(record,new Callback(){
@Override
public void onCompletion(RecordMetadata arg0, Exception arg1) {
counter.incrementAndGet();
if(arg1 != null){
arg1.printStackTrace();
}
}
});
}
long diffCiclo = Instant.now().toEpochMilli() - start;
long diff = Instant.now().toEpochMilli() - startTime;
System.out.println("Number of sent: " + counter.get() +
" Millisecond:" + (diff) + " - NumberOfSent/Diff(K): " + counter.get()/diff );
try {
if(diffCiclo >= 1000){
System.out.println("over 1 second: " + diffCiclo);
}
else {
obj.wait( 1000 - diffCiclo );
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
如您所见,它非常简单,只需创建一条新消息并发送即可。 如果我看到日志:
NumberOfSent/Diff(K)
在最初的 10 秒内,它的表现非常糟糕
30k per second
60 秒后我有
180k per second
为什么?我怎样才能开始已经达到 180k 的过程?
我的 kafka 生产者配置是 Follwing
Async producer ( but also with sync producer the situation dose not change)
ACKS_CONFIG = 0
BATCH_SIZE_CONFIG = 20000
COMPRESSION_TYPE_CONFIG = none
LINGER_MS_CONFIG = 0
最后一个细节:
NUM_MSG_SEC is set to 200000 or bigger number
【问题讨论】:
-
是否有其他东西锁定
obj导致延迟?isRunning()什么时候返回 true? -
nothing else lock on obj,没有别的东西会导致延迟,我认为延迟在某处但不在我的代码中,我的代码很简单,我认为它是围绕kafka配置的东西,我想那,( isRunning 总是正确的)
-
也许在
synchronized (obj)之后添加一条日志语句,以确定您的代码何时实际执行。也许也可以在调试中运行以查看在执行代码之前发生了什么。 -
@AndrewS 我之前已经调试过什么都没做......第一行代码从同步(obj)开始,然后才初始化变量
-
我正在做各种检查,我发现如果我将以下参数放入 jvm -XX:MinMetaspaceFreeRatio=100 -XX:MaxMetaspaceFreeRatio=100 以每秒 180k 的速度运行所需的秒数更少但仍然没有这么快……为什么?
标签: java apache-kafka producer-consumer kafka-producer-api nosql