【问题标题】:Consuming events evenly using Flink-Kafka connector使用 Flink-Kafka 连接器均匀消费事件
【发布时间】:2020-10-05 19:47:15
【问题描述】:

我正在使用 Flink 处理来自 Kafka 的流数据。流程非常基础,从 Kafka 消费,数据丰富,然后下沉到 FS。

在我的例子中,分区的数量大于 Flink 的并行度。我注意到 Flink 不会从所有分区均匀消耗。

偶尔会在一些 Kafka 分区中创建滞后。 重新启动应用程序有助于 Flink “重新平衡”消耗并快速关闭滞后。但是,一段时间后,我看到其他分区出现滞后等情况。

看到这种行为,我尝试按照 Flink 文档中的建议使用 rebalance() 重新平衡消耗率:

“分区元素循环,为每个分区创建相等的负载。对于存在数据倾斜的性能优化很有用。”

dataStream.rebalance();

代码更改很小,只需将 rebalance() 添加到数据流源即可。 使用 rebalance() 运行应用程序会导致 Flink 出现非常奇怪的行为:

我将并行度设置为260并提交了一个作业,但是由于某种原因,作业管理器将槽数乘以4。查看执行计划图,我意识到现在所有数据都被260个核心消耗了,然后它被发送到 3 个接收器(希望是均匀的)。由于资源不足,作业失败。

由于我想使用 260 个内核,我尝试再次提交作业,这次的并行度为 65 (=260/4)。 作业运行良好,但处理率低。在 Web UI 中,我发现槽的总数不等于可用任务槽 + 正在运行的任务。但是,如果我将 rtbJsonRequest(我提交的作业)称为具有 65 (=260/4) 个任务槽的作业,而不是它所写的 260,它等于。

长话短说,我正在尝试找到一种方法来平衡 Kafka 分区的消耗。根据 Flink 文档 rebalance() 是我需要的,但显然我用错了。

添加更多输入。主题有520个partition,并行度为260(每个core有2个partition)。

我可以清楚地看到,很少有分区的消耗率非常低:

【问题讨论】:

  • 分区本身是否平衡? IE。分区之间的数据如何拆分?
  • 是的,分区的传入速率看起来正常且均匀。

标签: apache-flink flink-streaming


【解决方案1】:

在源之后插入重新平衡不会平衡源本身,而是会通过在作业图中插入循环网络洗牌来平衡随后的输入。这样做最多可以平衡接收器上的负载,这对您的问题没有帮助。

您总共消耗了多少个 Kafka 分区?您正在使用主题或分区发现吗?重新启动作业有帮助似乎很奇怪。

【讨论】:

  • 我有 500 个分区。似乎默认情况下,分区发现被禁用(不知道)-所以我没有使用它。我添加了 props.setProperty("flink.partition-discovery.interval-millis", "60000");跳起来会有所帮助
  • 好吧,它没有帮助。我很乐意听到其他建议。
  • 您是否尝试过在整个作业中使用 250 的并行度?这似乎是显而易见的答案,因为它会为每个任务槽提供恰好 2 个 kafka 分区。使用 260,您将获得 240,每个 2,以及 20 和 1,这似乎没有多大帮助。
  • 我从 1 比 1 的比例开始(260 个核心对 260 个分区),但没有帮助。即使我将并行度级别更改为 250,我也会看到相同的行为。
【解决方案2】:

我发现我的 2 个 Flink 任务管理器与其他工作人员相比处理速度非常低。

正如您在下面的屏幕截图中所见,每秒少于 5K 的事件,而其他处理至少 37K:

这真的帮助我理解我遇到的是环境问题,而不是 Flink 问题。 就我而言,安装 CPU 控制器并重新启动机器即可解决问题。

在这个过程中我学到了一个很重要的东西,默认情况下 Flink 不会发现 Kafka 分区。如果您想添加它,只需添加到您的属性:

"flink.partition-discovery.interval-millis", "time_interval"

【讨论】:

  • 啊哈!感谢您的跟进。
  • Flink 仅当您计划在作业启动后在 Kafka 主题中动态创建更多分区时才需要分区发现。大多数时候它是不需要的。
【解决方案3】:
Properties properties = new Properties();
properties.setProperty("group.id", consumerGroup);
properties.setProperty("auto.offset.reset", autoOffsetReset);
properties.setProperty("bootstrap.servers", kafkaBootstrapServers);
properties.setProperty(
    "flink.partition-discovery.interval-millis", "30000");

我在属性中添加了分区发现,作业抛出 NPE。这是设置分区发现属性的正确方法吗?

java.lang.NullPointerException: null
        at org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.getAllPartitionsForTopics(Kafka09PartitionDiscoverer.java:77)

【讨论】:

  • 它适用于我,但是,我使用的是 Kafka 2 而不是 09,并且分区管理在 2 和 09 之间是不同的。在 2 中,它由 zk 在 09 中的经纪人管理。我想这就是您遇到问题的原因。
猜你喜欢
  • 1970-01-01
  • 2018-10-01
  • 2017-08-18
  • 2018-10-15
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-11-21
  • 1970-01-01
相关资源
最近更新 更多