【发布时间】:2017-04-11 06:08:11
【问题描述】:
kafka 服务器和客户端 jar 移至最新库:0.10.0.1
我的消费者和生产者代码使用上述最新的 kafka jar,但仍使用旧的消费者 apis(0.8.2)。
我在调用提交偏移时遇到了消费者方面的问题。
2017-04-10 00:07:14,547 ERROR kafka.consumer.ZookeeperConsumerConnector [groupid1_x.x.x.x-1491594443834-b0e2bce5], Error while committing offsets. java.io.EOFException
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120)
at kafka.consumer.ZookeeperConsumerConnector.liftedTree2$1(ZookeeperConsumerConnector.scala:354)
at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:351)
at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:331)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:111)
at com.zoho.mysqlbackup.kafka.consumer.KafkaHLConsumer.commitOffset(KafkaHLConsumer.java:173)
at com.zoho.mysqlbackup.kafka.consumer.KafkaHLConsumer.run(KafkaHLConsumer.java:271)
kafka 服务器端配置:
listeners=PLAINTEXT://:9092
log.message.format.version=0.8.2.1
broker.id.generation.enable=false
unclean.leader.election.enable=false
kafka 消费者的以下配置:
auto.commit.enable is overridden to false
auto.offset.reset is overridden to smallest
consumer.timeout.ms is overridden to 100
dual.commit.enabled is overridden to true
fetch.message.max.bytes is overridden to 209715200
group.id is overridden to crm_topic1_hadoop_tables
offsets.storage is overridden to kafka
rebalance.backoff.ms is overridden to 6000
zookeeper.session.timeout.ms is overridden to 23000
zookeeper.sync.time.ms is overridden to 2000
为了创建消费者,我使用下面的 api:
ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(propForHLConsumer));
对于提交调用
consumer.commitOffsets();
在从 kafka 读取消息时,我们使用下面的方法来处理超时
private boolean hasNext(ConsumerIterator<byte[], byte[]> it)
{
try
{
it.hasNext();
return true;
}
catch (ConsumerTimeoutException e)
{
return false;
}
}
这是必需的,因为我们希望仅在从 kafka 接收到特定时间间隔或大小的消息(字节)后才开始处理。
同样的异常,即使在设置之后 dual.commit.enabled = false 消费者.timeout.ms = 1000 其他设置保持旧配置。
更多细节:
使用 0.8.2.1 版本,我从来没有遇到过这样的问题。搬到后 0.10.0.1(客户端和服务器),开始得到这个异常。
在处理/推送到 hadoop 之前,我们正在读取多条消息。 处理/写入 hadoop 部分需要时间(约 5 分钟)。之后 当我们尝试推动这个过程时,我们正在超越异常。这 例外我每第二次提交Offset。还有一段时间(在哪里 commitOffset 在前一次提交的 10 秒内调用)否 第二次提交例外。
供您参考。如果提交偏移失败,那么消费者只是 阅读下一条消息而不返回上一次成功提交 偏移位置。但是如果提交偏移失败并重新启动消费者 进程然后它正在从旧的提交位置读取。
【问题讨论】:
标签: apache-kafka kafka-consumer-api