【问题标题】:Kafka producer difference between flush and pollKafka生产者flush和poll的区别
【发布时间】:2019-03-06 11:37:25
【问题描述】:

我们有一个 Kafka 消费者,它将读取消息并执行此操作,然后使用以下脚本再次发布到 Kafka 主题

生产者配置:

{
  "bootstrap.servers": "localhost:9092"
}

我还没有配置任何其他配置像queue.buffering.max.messagesqueue.buffering.max.msbatch.num.messages

我假设这些都将是来自configuration的默认值

queue.buffering.max.messages : 100000
queue.buffering.max.ms : 0
batch.num.messages : 10000

我的理解:当内部队列达到 queue.buffering.max.ms 或 batch.num.messages 时,消息将在单独的线程中发布到 Kafka。在我的配置中 queue.buffering.max.ms 是 0,所以当我调用生产()时,每条消息都会被发布。如果我错了,请纠正我。

我的制作人 sn-p:

def send(topic, message):
    p.produce(topic, json.dumps(message), callback=delivery_callback(err, msg))
    p.flush()

来自this post 我了解到,在每条消息后使用刷新,生产者将成为同步生产者。如果我使用上面的脚本,发布到 Kafka 大约需要 45 毫秒

如果我将 sn-p 更改为

def send(topic, message):
    p.produce(topic, json.dumps(message), callback=delivery_callback(err, msg))
    p.poll(0)

有没有什么性能会提高?你能澄清我的理解吗?

谢谢

【问题讨论】:

    标签: python apache-kafka kafka-producer-api confluent-platform


    【解决方案1】:

    flush()poll() 的区别在客户端的documentation 中有说明。

    对于flush(),它声明:

    等待生产者队列中的所有消息被传递。这是一个 调用 poll() 直到 len() 为零或 可选超时时间已过。

    对于poll()

    轮询生产者以获取事件并调用相应的回调 (如果已注册)。

    send() 之后调用poll() 不会使生产者同步,因为刚刚发送的消息不太可能已经到达代理并且已经将传递报告发送回客户端。

    相反,flush() 将阻塞,直到先前发送的消息已传递(或错误),从而有效地使生产者同步。

    【讨论】:

    • 感谢您的信息。它真的对我有用。时间从 45ms 到 ~0.2ms
    • 这篇文章并没有解释 OP 关于配置的问题。关于这一点,我的理解是:1.(已经对此进行了测试)您最终运行的产品超出了queue.buffering.max.messages 限制。你得到一个例外。 2.(我不确定,这只是我的假设)您最终会通过 batch.num.messages 这个设置运行生产。消息可能在您刷新之前就已传递。
    猜你喜欢
    • 2019-11-23
    • 1970-01-01
    • 1970-01-01
    • 2017-04-28
    • 2018-02-05
    • 1970-01-01
    • 2023-03-26
    • 1970-01-01
    • 2019-02-03
    相关资源
    最近更新 更多