【发布时间】:2026-02-02 07:10:01
【问题描述】:
我的目标是使用 Kafka 作为源和 Flink 作为流处理引擎来设置一个高吞吐量的集群。这就是我所做的。
我已经在 master 和 worker 上设置了一个 2 节点集群,配置如下。
掌握 flink-conf.yaml
jobmanager.rpc.address: <MASTER_IP_ADDR> #localhost
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 256
taskmanager.heap.mb: 512
taskmanager.numberOfTaskSlots: 50
parallelism.default: 100
Worker flink-conf.yaml
jobmanager.rpc.address: <MASTER_IP_ADDR> #localhost
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 512 #256
taskmanager.heap.mb: 1024 #512
taskmanager.numberOfTaskSlots: 50
parallelism.default: 100
Master 节点上的slaves 文件如下所示:
<WORKER_IP_ADDR>
localhost
两个节点上的 flink 设置位于同名文件夹中。我通过运行在主服务器上启动集群
bin/start-cluster-streaming.sh
这将启动 Worker 节点上的任务管理器。
我的输入源是 Kafka。这是sn-p。
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> stream =
env.addSource(
new KafkaSource<String>(kafkaUrl,kafkaTopic, new SimpleStringSchema()));
stream.addSink(stringSinkFunction);
env.execute("Kafka stream");
这是我的 Sink 函数
public class MySink implements SinkFunction<String> {
private static final long serialVersionUID = 1L;
public void invoke(String arg0) throws Exception {
processMessage(arg0);
System.out.println("Processed Message");
}
}
这是我的 pom.xml 中的 Flink 依赖项。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-core</artifactId>
<version>0.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>0.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>0.9.0</version>
</dependency>
然后我在master上用这个命令运行打包好的jar
bin/flink run flink-test-jar-with-dependencies.jar
但是,当我将消息插入 Kafka 主题时,我能够仅在主节点上考虑来自我的 Kafka 主题的所有消息(通过我的SinkFunction 实现的调用方法中的调试消息)。
在作业管理器 UI 中,我可以看到 2 个任务管理器,如下所示:
- 为什么工作节点没有收到任务?
- 我是否缺少某些配置?
【问题讨论】:
-
感谢您发布这么好的问题!您如何在主服务器上获取这些调试消息?在 master (JobManager) 上执行用户代码是不可能的,你使用的是 Flink 0.9.0 还是 0.10-SNAPSHOT?你的
stringSinkFunction怎么样? (它只是打印到标准输出吗?) -
@rmetzger,不客气。我已经更新了这个问题。任何帮助将不胜感激。
-
@SudarshanShubakar,从屏幕截图中可以看出,您已经注册了 2 个 TM,每个 TM 有 50 个插槽。这也与您的配置相对应。此外,看起来您的工作执行得很好。在 100 个插槽中的每一个中,都部署了任务
Custom Source -> Stream Sink (x/100)。因此,我想知道什么对您不起作用。会不会是您的主题少于100分区?由于 Flink 在 Kafka 分区和源任务之间创建了映射,因此会有一些任务没有收到任何输入。 -
ok@TillRohrmann 这可能是问题所在。我相信关于 Kafka 主题的分区少于 100 个。让我在更改后报告。
-
好吧@TillRohrmann 评论解决了这个问题。你能把你的评论变成答案吗?
标签: streaming cluster-computing apache-kafka apache-flink