【问题标题】:Batch Size in kafka jdbc sink connectorkafka jdbc sink连接器中的批量大小
【发布时间】:2021-03-10 01:39:18
【问题描述】:

我想通过 jdbc sink 批量读取 5000 条记录,为此我在 jdbc sink 配置文件中使用了 batch.size:

name=jdbc-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
batch.size=5000
topics=postgres_users

connection.url=jdbc:postgresql://localhost:34771/postgres?user=foo&password=bar
file=test.sink.txt
auto.create=true

但是当新记录插入源数据库时,batch.size 没有影响,因为记录被插入到数据库中。

如何实现批量插入5000个?

【问题讨论】:

    标签: apache-kafka apache-kafka-connect confluent-platform connector


    【解决方案1】:

    没有直接的解决方案来批量接收记录,但如果它有效,我们会在属性下方尝试调整。我从未尝试过,但我理解 Kafka Sink Connector 只不过是一个消费者来消费来自该主题的消息。

    ma​​x.poll.records:单次调用poll()返回的最大记录数

    consumer.fetch.min.bytes:服务器应为获取请求返回的最小数据量。如果可用数据不足,则请求将在响应请求之前等待累积那么多数据

    fetch.wait.max.ms:代理将在向消费者客户端发送响应之前等待这段时间,除非它有足够的数据来填充响应(fetch.message.max .bytes)

    fetch.min.bytes:代理将等待此数据量填满,然后再将响应发送给消费者客户端。

    【讨论】:

    • 感谢您的回答,它很有帮助,但我使用的是 JDBC 接收器连接器,我认为我们在连接器级别没有这些配置,我不希望这些配置影响其他连接器,所以我不能将这些放在全球范围内
    【解决方案2】:

    为了设置批量大小,您有两个选项:

    • 在 Kafka Connect 实例(独立或分布式)使用的 worker.properties 文件中添加 ma​​x.poll.records=5000
    • 在连接器配置文件(分布式连接器的 JSON 文件)中设置相同的属性。

    对于第二个选项,您必须:

    • 通过在 worker.properties 中添加 connector.client.config.override.policy=All 来启用覆盖连接属性的可能性
    • 在连接器配置中使用 "consumer.override.max.poll.records" : 2000 配置批量大小(注意“consumer.override.”前缀)

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2020-03-21
      • 2020-07-31
      • 2019-06-22
      • 2021-01-22
      • 2021-12-19
      • 2022-09-24
      • 2021-05-05
      • 2019-06-15
      相关资源
      最近更新 更多