【问题标题】:Having a Kafka Consumer read a single message at a time让 Kafka 消费者一次读取一条消息
【发布时间】:2015-11-10 00:34:35
【问题描述】:

我们设置了 Kafka,以便能够由多个服务器并行处理消息。但是每条消息只能被处理一次(并且只能由一个服务器)。我们已经启动并运行了它,并且运行良好。

现在,我们面临的问题是 Kafka 消费者批量读取消息以获得最大效率。如果/当处理失败、服务器关闭或其他情况时,这会导致问题,因为那时我们会丢失即将处理的数据。

有没有办法让消费者一次只阅读消息,让 Kafka 保留未处理的消息?就像是;消费者拉出一条消息 -> 处理 -> 完成后提交偏移量,重复。使用 Kafka 是否可行?有什么想法/想法吗?

谢谢!

【问题讨论】:

    标签: apache-kafka


    【解决方案1】:

    您提到只有一个处理,但您担心丢失数据。我假设您只是担心其中一台服务器出现故障时的边缘情况?你会丢失数据吗?

    我认为没有办法一次完成一条消息。查看consumer configurations,似乎只有一个选项可以设置消费者可以从 Kafka 获取的最大字节数,而不是消息数。

    fetch.message.max.bytes
    

    但是如果您担心完全丢失数据,如果您从不提交偏移量,Kafka 不会将其标记为已提交并且不会丢失。 阅读关于delivery semantics的Kafka文档,

    Kafka 在默认情况下有效地保证了至少一次交付,并且 允许用户通过禁用最多执行一次交付 重试生产者并在处理之前提交其偏移量 一批消息。一次性交付需要与 目标存储系统,但 Kafka 提供了偏移量 使实现这一点变得简单。

    因此,Kafka 默认不会启用精确一次处理。它要求您在将处理的输出写入存储时存储偏移量。

    但这可以通过简单地让 消费者将其偏移量存储在与其输出相同的位置......作为一个例子, 我们在 HDFS 中填充数据的 Hadoop ETL 将其偏移量存储在 HDFS 中 使用它读取的数据,以便保证数据和 偏移量要么都更新,要么都不更新。

    希望对你有帮助。

    【讨论】:

    • 是的,它似乎并不真正支持。不过谢谢回复! : /
    • 不客气。此外,如果您发现我们的某个答案回答了您的问题或有帮助,我们将不胜感激接受的答案和/或投票。
    【解决方案2】:

    这取决于您要使用的客户端。对于 C++ 和 python,每次都可以消费 ONE 消息。

    对于 python,我使用了https://github.com/mumrah/kafka-python。以下代码每次可以消费一条消息:

    message = self.__consumer.get_message(block=False, timeout=self.IterTimeout, get_partition_info=True )
    

    __consumer 是 SimpleConsumer 的对象。

    在这里查看我的问题和答案:How to stop Python Kafka Consumer in program?

    对于 C++,我使用的是https://github.com/edenhill/librdkafka。以下代码每次可以消费一条消息。

    214         while( m_bRunning )
    215         {
    216                 // Start to read messages from the local queue.
    217                 RdKafka::Message *msg = m_consumer->consume(m_topic, m_partition, 1000);
    218                 msg_consume(msg);
    219                 delete msg;
    220                 m_consumer->poll(0);
    221         }
    

    m_consumer 是指向 C++ Consumer 对象 (C++ API) 的指针。

    希望对您有所帮助。

    【讨论】:

    • 你的意思是你一个接一个地消费消息,还是它实际上一次从Kafka中提取一条消息?因为有很大的不同。我们想要的是能够拉取一条消息,完成后提交该偏移量,然后再次拉取,依此类推。
    • 消费者默认自动提交。您可以设置提交频率。在 python 中,commit_energy_n 默认为 100。顺便说一下,你需要设置group_id。每条消息将只被组中的一个消费者消费。
    • 您可以使用 api 通过程序进行提交。一条消息被提交并在消费后偏移移动。你总是需要设置起始偏移量。
    • 我认为你需要的是关于组和开始偏移。 Apache Kafka 文档很有帮助。
    • 顺便说一下,根据我的经验,当网络问题发生时,例如服务器宕机,客户端消费者可以处理这些问题。但是每次服务器重新启动时,python 生产者可能会产生两次消息。您需要额外的错误处理逻辑。希望这有帮助
    【解决方案3】:

    您可以尝试将 max.poll.records 设置为 1。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2022-06-13
      • 2017-11-09
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-12-01
      • 2017-10-19
      • 2017-01-24
      相关资源
      最近更新 更多