【发布时间】:2022-01-28 16:54:44
【问题描述】:
我当前的 Flink 应用程序
- 基于 Flink Stateful Function 3.1.1,从 Kafka 读取消息,处理消息,然后下沉到 Kafka Egress
- 应用程序已按照指南部署在 K8s 上并且运行良好:Stateful Functions Deployment
- 基于标准部署,已开启kubernetes HA
我的目标
我想自动放大/缩小有状态函数。 我也想知道如何创建更多的备用作业管理器
我对 HA 的观察
我尝试在flink-config ConfigMap 中设置kubernetes.jobmanager.replicas:
---
apiVersion: v1
kind: ConfigMap
metadata:
name: flink-config
labels:
app: shadow-fn
data:
flink-conf.yaml: |+
kubernetes.jobmanager.replicas: 7
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
我在 K8s 中没有看到备用作业管理器。
那我直接调整deployment的副本:
apiVersion: apps/v1
kind: Deployment
metadata:
name: statefun-master
spec:
replicas: 7
备用作业经理出现。我查看 pod 日志,leader 选举成功。但是,当我在网络浏览器中访问 UI 时,它会显示:
{"errors":["Service temporarily unavailable due to an ongoing leader election. Please refresh."]}
我的方法有什么问题?
关于缩放的问题
Reactive Mode 正是我所需要的。我试过但失败了,作业管理器有错误消息:
Exception in thread "main" org.apache.flink.configuration.IllegalConfigurationException: Reactive mode is configured for an unsupported cluster type. At the moment, reactive mode is only supported by standalone application clusters (bin/standalone-job.sh).
似乎不应该以这种方式完成有状态函数自动缩放。 那么,进行自动缩放的正确方法是什么?
可能的方法(可能不正确)
经过一番研究,我目前的方向是:
- Job Manger 与自动缩放无关。它与 K8s 上的 HA 有关。我只需要确保 Job Manager 具有正确的故障转移行为
- 我的有状态函数是
Flink remote services,即它们是常规的k8s services。它们可以以KNative service的形式部署,实现自动伸缩。只有当 http 请求来自 Flink 的 worker 时,服务的副本才会增加 - 最重要的部分,Flink 的 worker(或任务管理器) 我还不知道如何进行自动缩放。也许我应该使用
KNative来部署 Flink worker? 如果它不适用于 KNative,也许我应该完全改变 flink 运行时部署。例如,尝试原来的reactive demo。但恐怕 Stateuful 函数不打算那样工作。
最后
我一遍又一遍地阅读 Flink 文档和 Github 示例,但找不到更多信息来执行此操作。感谢任何提示/说明/指南!
【问题讨论】:
标签: apache-flink knative-serving flink-statefun