【问题标题】:Some commit offset call is getting fail in kafka一些提交偏移调用在 kafka 中失败
【发布时间】:2017-04-11 06:08:11
【问题描述】:

kafka 服务器和客户端 jar 移至最新库:0.10.0.1

我的消费者和生产者代码使用上述最新的 kafka jar,但仍使用旧的消费者 apis(0.8.2)。

我在调用提交偏移时遇到了消费者方面的问题。

2017-04-10 00:07:14,547 ERROR kafka.consumer.ZookeeperConsumerConnector [groupid1_x.x.x.x-1491594443834-b0e2bce5], Error while committing offsets. java.io.EOFException
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
    at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120)
    at kafka.consumer.ZookeeperConsumerConnector.liftedTree2$1(ZookeeperConsumerConnector.scala:354)
    at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:351)
    at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:331)
    at kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:111)
    at com.zoho.mysqlbackup.kafka.consumer.KafkaHLConsumer.commitOffset(KafkaHLConsumer.java:173)
    at com.zoho.mysqlbackup.kafka.consumer.KafkaHLConsumer.run(KafkaHLConsumer.java:271)

kafka 服务器端配置:

listeners=PLAINTEXT://:9092
log.message.format.version=0.8.2.1
broker.id.generation.enable=false
unclean.leader.election.enable=false

kafka 消费者的以下配置:

auto.commit.enable is overridden to false
auto.offset.reset is overridden to smallest
consumer.timeout.ms is overridden to 100
dual.commit.enabled is overridden to true
fetch.message.max.bytes is overridden to 209715200
group.id is overridden to crm_topic1_hadoop_tables
offsets.storage is overridden to kafka
rebalance.backoff.ms is overridden to 6000
zookeeper.session.timeout.ms is overridden to 23000
zookeeper.sync.time.ms is overridden to 2000

为了创建消费者,我使用下面的 api:

ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(propForHLConsumer));

对于提交调用

consumer.commitOffsets();

在从 kafka 读取消息时,我们使用下面的方法来处理超时

private boolean hasNext(ConsumerIterator<byte[], byte[]> it)
{
    try
    {
        it.hasNext();
        return true;
    }
    catch (ConsumerTimeoutException e)
    {
        return false;
    }
}

这是必需的,因为我们希望仅在从 kafka 接收到特定时间间隔或大小的消息(字节)后才开始处理。

同样的异常,即使在设置之后 dual.commit.enabled = false 消费者.timeout.ms = 1000 其他设置保持旧配置。

更多细节:

使用 0.8.2.1 版本,我从来没有遇到过这样的问题。搬到后 0.10.0.1(客户端和服务器),开始得到这个异常。

在处理/推送到 hadoop 之前,我们正在读取多条消息。 处理/写入 hadoop 部分需要时间(约 5 分钟)。之后 当我们尝试推动这个过程时,我们正在超越异常。这 例外我每第二次提交Offset。还有一段时间(在哪里 commitOffset 在前一次提交的 10 秒内调用)否 第二次提交例外。

供您参考。如果提交偏移失败,那么消费者只是 阅读下一条消息而不返回上一次成功提交 偏移位置。但是如果提交偏移失败并重新启动消费者 进程然后它正在从旧的提交位置读取。

【问题讨论】:

    标签: apache-kafka kafka-consumer-api


    【解决方案1】:

    正如我在问题细节中提到的,我使用的是最新的 kafka jar,但仍在使用旧的消费者客户端:

    kafka.javaapi.consumer.ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(propForHLConsumer));
    

    我通过调用第二次 commitOffset 解决了这个问题。

    实际上是与connections.max.idle.ms有关的问题。 这个属性是用最新的 kafka 引入的(broker=10 分钟,consumer=9 分钟,producer=9 分钟)。

    因此,每当我的老消费者在 10 分钟后调用第二次提交偏移时,我都会遇到异常。

    使用旧的消费者 API,无法设置此属性。 和代理配置我无法更改(由其他团队处理并为其他用户提供相同的代理)...

    在这里,我认为旧的 commitOffset 调用需要另一个连接(迭代器除外),并且当它的理想状态超过 10 分钟时,该连接正在接近。我不太确定。

    如果第一次 commitOffset 调用发生任何失败,那么第二次调用将确保成功。 如果第一次自己成功,那么下一次执行不会有任何问题。无论如何,我们很少调用提交偏移量。

    接下来我将使用最新的 kafka 消费者和生产者 java API 移动我的代码。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2022-10-04
      • 1970-01-01
      • 2020-07-22
      • 2021-03-17
      • 2020-08-08
      • 2017-12-10
      • 2021-11-14
      • 2017-10-16
      相关资源
      最近更新 更多