【问题标题】:Kafka elasticsearch connector - 'Flush timeout expired with unflushed records:'卡夫卡弹性搜索连接器 - '刷新超时过期未刷新记录:'
【发布时间】:2018-07-14 18:28:36
【问题描述】:

我对 kafka -> elasticsearch 连接器有一个奇怪的问题。第一次开始时一切都很好,我在 elasticsearch 中收到了一个新数据并通过 kibana 仪表板检查了它,但是当我使用相同的生产者应用程序将新数据生成到 kafka 并尝试再次启动连接器时,我没有在弹性搜索中获取任何新数据。 现在我收到这样的错误:

[2018-02-04 21:38:04,987] ERROR WorkerSinkTask{id=log-platform-elastic-0} Commit of offsets threw an unexpected exception for sequence number 14: null (org.apache.kafka.connect.runtime.WorkerSinkTask:233)
org.apache.kafka.connect.errors.ConnectException: Flush timeout expired with unflushed records: 15805

我正在使用下一个命令来运行连接器:

/usr/bin/connect-standalone /etc/schema-registry/connect-avro-standalone.properties log-platform-elastic.properties

connect-avro-standalone.properties

bootstrap.servers=kafka-0.kafka-hs:9093,kafka-1.kafka-hs:9093,kafka-2.kafka-hs:9093
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
# producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
# consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
#rest.host.name=
rest.port=8084
#rest.advertised.host.name=
#rest.advertised.port=
plugin.path=/usr/share/java

log-platform-elastic.properties

name=log-platform-elastic
key.converter=org.apache.kafka.connect.storage.StringConverter
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=member_sync_log, order_history_sync_log # ... and many others
key.ignore=true
connection.url=http://elasticsearch:9200
type.name=log

我检查了与 kafka 代理、elasticsearch 和 schema-registry 的连接(此时,schema-registry 和连接器在同一台主机上),一切都很好。 Kafka 代理在端口 9093 上运行,我可以使用 kafka-avro-console-consumer 从主题中读取数据。 对于这方面的任何帮助,我将不胜感激!

【问题讨论】:

标签: elasticsearch apache-kafka apache-kafka-connect confluent-platform


【解决方案1】:

只需将 flush.timeout.ms 更新为大于 10000(默认为 10 秒)

根据文档:

flush.timeout.ms 用于周期性的超时时间(以毫秒为单位) 刷新,并在等待缓冲区空间可用时 添加记录时完成的请求。如果超过此超时 任务将失败。

类型:long 默认值:10000 重要性:low

See documentation

【讨论】:

    【解决方案2】:

    我们可以优化弹性搜索配置来解决问题。配置参数请参考以下链接

    https://docs.confluent.io/current/connect/kafka-connect-elasticsearch/configuration_options.html

    以下是可以控制消息速率流以最终帮助解决问题的关键参数:

    flush.timeout.ms:增加可能有助于在冲洗时间上提供更多呼吸

    用于定期刷新的超时时间(以毫秒为单位),以及何时 等待缓冲区空间被完成的请求提供为 记录被添加。如果超过此超时,任务将失败。

    max.buffered.records:尝试减少缓冲区记录限制

    每个任务在阻塞前缓冲的最大记录数 接受更多记录。此配置可用于限制 每个任务的内存使用情况

    batch.size:尝试减小批量大小

    写入时要作为批处理的记录数 弹性搜索

    tasks.max:并行线程数(消费者实例)减少或增加。如果带宽无法处理减少任务可能会有所帮助,这将控制 Elastic Search。

    通过调整上述参数解决了我的问题

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2023-03-18
      • 1970-01-01
      • 1970-01-01
      • 2020-08-27
      • 2016-09-03
      • 1970-01-01
      相关资源
      最近更新 更多