【发布时间】:2019-06-20 11:35:45
【问题描述】:
我有一个 flink 作业,它正在从 Kafka 读取数据,执行某些聚合并将结果写入弹性搜索索引。我看到源上的背压很高。高背压导致从 Kafka 读取数据缓慢,我看到数据在网络堆栈中排队(netstat RecvQ 显示数万字节的数据卡在源 kafka 连接中,数据最终被读取)这反过来导致延迟后要沉入弹性搜索的数据,并且这种延迟不断增加。
源每分钟产生约 17,500 条记录,Flink 作业为每个传入记录分配(事件)时间戳,执行 12 种不同类型的 keyBy,在 1 分钟的滚动窗口中匹配事件,在此键控窗口流上执行聚合操作最后将结果写入12个不同的elasticsearch索引(每次写入都是一次插入)。
问题在于写入 elasticsearch 的数据开始滞后,因此仪表板结果(基于 elasticsearch 构建)不再是实时的。我的理解是,这是由于背压的增加而发生的。不知道如何解决这个问题。集群本身是一个基于 VM 的单节点独立集群,具有 64GB RAM(任务管理器配置为使用 20GB)和 16 个 vCPU。没有证据(从 htop 中看到)表明 CPU 或内存受到限制。只有一个任务管理器,这是这个集群上唯一的 flink 作业。
我不确定这个问题是由于集群中的某些本地资源问题还是由于写入 elasticsearch 速度慢。我已将 setBulkFlushMaxActions 设置为 1(正如我在任何地方看到的所有代码示例中所做的那样),我还需要设置 setBulkFlushInterval 和/或 setBulkFlushMaxSizeinMB 吗?
我已经查看了https://www.da-platform.com/flink-forward-berlin/resources/improving-throughput-and-latency-with-flinks-network-stack,但还没有尝试过幻灯片 19 中列出的调整选项,不确定为这些参数设置什么值。
最后,在 IntelliJ IDE 中运行相同的作业时,我认为我没有看到相同的问题。
我将排除所有聚合,然后将它们一一添加回来,看看是否存在触发此问题的特定聚合?
任何具体的指针将不胜感激,也将尝试 setBulkFlushInterval 和 setBulkFlushMaxSizeinMB。
更新 1,2019 年 1 月 29 日 似乎两个节点都以非常高的堆使用率运行,因此 GC 不断运行以尝试清理 JVM 中的空间。将物理内存从 16 GB 增加到 32 GB,然后重新启动节点。这应该有望解决问题,将在另一个 24 小时内知道。
【问题讨论】:
标签: elasticsearch apache-kafka apache-flink flink-streaming