【问题标题】:Kafka Connect - Failed to flush, timed out while waiting for producer to flush outstanding messagesKafka Connect - 无法刷新,等待生产者刷新未完成的消息时超时
【发布时间】:2019-08-26 12:52:36
【问题描述】:

我正在尝试在 BULK 模式下使用具有以下属性的 Kafka Connect JDBC 源连接器。

connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
timestamp.column.name=timestamp
connection.password=XXXXX
validate.non.null=false
tasks.max=1
producer.buffer.memory=2097152
batch.size=1000
producer.enable.idempotence=true
offset.flush.timeout.ms=300000
table.types=TABLE,VIEW
table.whitelist=materials
offset.flush.interval.ms=5000
mode=bulk
topic.prefix=mysql-
connection.user=kafka_connect_user
poll.interval.ms=200000
connection.url=jdbc:mysql://<DBNAME>
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter

我收到以下关于提交偏移量的错误,更改各种参数似乎效果不大。

[2019-04-04 12:42:14,886] INFO WorkerSourceTask{id=SapMaterialsConnector-0} flushing 4064 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2019-04-04 12:42:19,886] ERROR WorkerSourceTask{id=SapMaterialsConnector-0} Failed to flush, timed out while waiting for producer to flush outstanding 712 messages (org.apache.kafka.connect.runtime.WorkerSourceTask)

【问题讨论】:

  • 您如何运行 Confluent Platform?在你的笔记本电脑上?您看到的消息表明向代理发送消息存在问题。例如,如果它被重载。
  • Kafka 是在 Azure 上的 HDInsight 上作为托管服务运行的。集群有三个代理。 “批量”数据的大小小于 20 MB。有什么好的指南可以检查集群是否过载?

标签: apache-kafka apache-kafka-connect kafka-producer-api


【解决方案1】:

该错误表示有大量消息缓冲,在达到超时之前无法刷新。


要解决这个问题,你可以

  • 在您的 Kafka Connect Worker Configs 中增加 offset.flush.timeout.ms 配置参数
  • 或者您可以通过减少 Kafka Connect Worker 配置中的 producer.buffer.memory 来减少缓冲的数据量。当您有相当大的消息时,这将成为最佳选择。

【讨论】:

  • producer.buffer.memory=2097152 offset.flush.timeout.ms=300000 所以2兆缓冲区和5分钟超时,这些真的是不合适的值吗?我不能在 5 分钟内发送 2 兆字节?
  • 我面临同样的问题,我已经尝试了上述答案,但它对我没有帮助。我在这里提到了有关我的问题的完整详细信息:github.com/confluentinc/kafka-connect-jdbc/issues/… 请提供任何 cmets
  • 你知道这个属性是如何为 kafka connect worker 调用的吗?我们的融合云实例在“producer.override.offset.flush.timeout.ms”和“offset.flush.timeout.ms”下无法识别。
【解决方案2】:

启用security.protocol=SSL 时,请确保 Connect 工作人员和 Connect 生产者有单独的 SSL 参数。 为两者提供 SSL 设置

# Authentication settings for Connect workers
ssl.keystore.location=/var/private/ssl/kafka.worker.keystore.jks
ssl.keystore.password=worker1234
ssl.key.password=worker1234

# Authentication settings for Connect producers used with source connectors
producer.ssl.keystore.location=/var/private/ssl/kafka.source.keystore.jks
producer.ssl.keystore.password=connector1234
producer.ssl.key.password=connector1234

https://docs.confluent.io/5.2.3/connect/security.html#separate-principals

【讨论】:

    【解决方案3】:

    如果您尝试连接融合云,则此错误可能是因为工作人员属性中缺少配置,请确保您添加了生产者和消费者配置。

    consumer.ssl.endpoint.identification.algorithm=https
    consumer.sasl.mechanism=PLAIN
    consumer.request.timeout.ms=20000
    consumer.retry.backoff.ms=500
    consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="API_KEY" password="SECRET";
    consumer.security.protocol=SASL_SSL
    
    producer.ssl.endpoint.identification.algorithm=https
    producer.sasl.mechanism=PLAIN
    producer.request.timeout.ms=20000
    producer.retry.backoff.ms=500
    producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="API_KEY" password="SECRET";
    producer.security.protocol=SASL_SSL
    

    【讨论】:

      【解决方案4】:

      我不知道这是否对某人有帮助。我在使用 Oracle Connector CDC 时遇到了同样的错误,错误是因为该表没有主键。我添加了主键,效果很好。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2018-06-08
        • 1970-01-01
        • 1970-01-01
        • 2014-10-16
        • 1970-01-01
        • 2020-06-04
        相关资源
        最近更新 更多