【问题标题】:flink: handling backpressure (source: kafka, sink: elasticsearch)flink:处理背压(来源:kafka,接收器:elasticsearch)
【发布时间】:2019-06-20 11:35:45
【问题描述】:

我有一个 flink 作业,它正在从 Kafka 读取数据,执行某些聚合并将结果写入弹性搜索索引。我看到源上的背压很高。高背压导致从 Kafka 读取数据缓慢,我看到数据在网络堆栈中排队(netstat RecvQ 显示数万字节的数据卡在源 kafka 连接中,数据最终被读取)这反过来导致延迟后要沉入弹性搜索的数据,并且这种延迟不断增加。

源每分钟产生约 17,500 条记录,Flink 作业为每个传入记录分配(事件)时间戳,执行 12 种不同类型的 keyBy,在 1 分钟的滚动窗口中匹配事件,在此键控窗口流上执行聚合操作最后将结果写入12个不同的elasticsearch索引(每次写入都是一次插入)。

问题在于写入 elasticsearch 的数据开始滞后,因此仪表板结果(基于 elasticsearch 构建)不再是实时的。我的理解是,这是由于背压的增加而发生的。不知道如何解决这个问题。集群本身是一个基于 VM 的单节点独立集群,具有 64GB RAM(任务管理器配置为使用 20GB)和 16 个 vCPU。没有证据(从 htop 中看到)表明 CPU 或内存受到限制。只有一个任务管理器,这是这个集群上唯一的 flink 作业。

我不确定这个问题是由于集群中的某些本地资源问题还是由于写入 elasticsearch 速度慢。我已将 setBulkFlushMaxActions 设置为 1(正如我在任何地方看到的所有代码示例中所做的那样),我还需要设置 setBulkFlushInterval 和/或 setBulkFlushMaxSizeinMB 吗?

我已经查看了https://www.da-platform.com/flink-forward-berlin/resources/improving-throughput-and-latency-with-flinks-network-stack,但还没有尝试过幻灯片 19 中列出的调整选项,不确定为这些参数设置什么值。

最后,在 IntelliJ IDE 中运行相同的作业时,我认为我没有看到相同的问题。

我将排除所有聚合,然后将它们一一添加回来,看看是否存在触发此问题的特定聚合?

任何具体的指针将不胜感激,也将尝试 setBulkFlushInterval 和 setBulkFlushMaxSizeinMB。

更新 1,2019 年 1 月 29 日 似乎两个节点都以非常高的堆使用率运行,因此 GC 不断运行以尝试清理 JVM 中的空间。将物理内存从 16 GB 增加到 32 GB,然后重新启动节点。这应该有望解决问题,将在另一个 24 小时内知道。

【问题讨论】:

    标签: elasticsearch apache-kafka apache-flink flink-streaming


    【解决方案1】:

    通常在这种情况下,问题出在与外部数据存储的连接上——要么带宽不足,要么对每条记录进行同步写入,而不是批量写入。

    验证 elasticsearch 接收器是否是问题(而不是网络堆栈配置)的一种简单方法是将其替换为丢弃接收器(一个什么都不做的接收器),看看这是否解决了问题.类似的东西

    public static class NullSink<OUT> implements SinkFunction<OUT> {
        @Override
        public void invoke(OUT value, Context context) throws Exception {
        }
    }
    

    更新:

    问题在于您已将 bulk.flush.max.actions 设置为 1,从而阻止了与 elasticsearch 服务器的连接中的任何缓冲。

    【讨论】:

    • 好的,这似乎是问题所在。我没有使用 NullSink 而是一个类似的想法,我删除了 elasticsearch sink 并将结果打印到控制台,并且效果很好。我如何确保写入是批处理的?正如您所说的“为每条记录同步写入,而不是批量写入”,我该如何验证呢?我是否需要将 setBulkFlushMaxSizeinMB 设置为更大的数字(不确定默认值是多少)?
    • 我对 Elasticsearch 接收器不是很熟悉,但我会尝试使用 bulk processor settings
    • 通过将 bulk.flush.max.actions 设置为 1,您有效地禁用了缓冲。这应该大得多。
    • 我的想法是一样的,但是将最大操作设置为 1000 并没有帮助,我仍在尝试。同时,我还联系了 elasticsearch 支持,询问他们应该将这些值设置为什么。当我发现更多信息时会更新。
    • 默认情况下,每个检查点也会导致刷新和确认,因此过于频繁的检查点也会导致到 ES 服务器的频繁往返。
    【解决方案2】:

    通过增加(加倍)elasticsearch 集群节点上的 RAM 并将索引刷新间隔(在所有 elasticsearch 索引上)设置为 30 秒(默认为 1 秒),问题得到了解决。进行这些更改后,flink 中的背压报告为 ok,没有数据延迟,一切看起来都很好。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-09-09
      • 2017-12-28
      • 1970-01-01
      • 1970-01-01
      • 2021-08-16
      • 2018-09-29
      • 2021-01-08
      • 2021-04-15
      相关资源
      最近更新 更多