【Posted】: 2022-06-17 02:42:28
【Question】:
I want to process messages from Kafka and commit their offsets, and I want the Flink job to finish once all messages have been consumed and processed, so that the TaskManager and heartbeat processes shut down.
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
// Note: "commit.offsets.on.checkpoint" only takes effect if checkpointing is enabled.
environment.enableCheckpointing(10_000);

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers(address)
        .setTopics(inputTopic)
        .setGroupId(consumerGroup)
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .setProperty("enable.auto.commit", "true")
        .setProperty("commit.offsets.on.checkpoint", "true")
        .build();

DataStream<String> stream = environment.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
ObjectMapper mapper = new ObjectMapper();
stream.map((value) -> {
    // ... (rest of the snippet was cut off in the original question)
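If the goal is for the job to terminate by itself once the existing backlog is drained, the Kafka connector's bounded mode is one option. Below is a minimal sketch, assuming the same `address`, `inputTopic`, and `consumerGroup` variables as in the snippet above: `setBounded(OffsetsInitializer.latest())` tells the source to stop at whatever offsets are the latest when the job starts, after which the source signals end-of-input, the job finishes, and the TaskManager slots are released.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BoundedKafkaJob {
    public static void main(String[] args) throws Exception {
        // Assumed placeholders, matching the variables in the question.
        String address = "localhost:9092";
        String inputTopic = "input-topic";
        String consumerGroup = "my-group";

        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> boundedSource = KafkaSource.<String>builder()
                .setBootstrapServers(address)
                .setTopics(inputTopic)
                .setGroupId(consumerGroup)
                .setStartingOffsets(OffsetsInitializer.earliest())
                // Read only up to the offsets that are the latest at startup;
                // the source then finishes instead of waiting for new records.
                .setBounded(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> stream =
                environment.fromSource(boundedSource, WatermarkStrategy.noWatermarks(), "Kafka Source");
        stream.print();

        // With a bounded source, execute() returns once all splits are exhausted.
        environment.execute("bounded-kafka-job");
    }
}
```

With bounded mode the same pipeline code runs unchanged; only the source definition differs, and `environment.execute()` returns when the stopping offsets are reached rather than blocking forever.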
【Discussion】:
-
Please provide enough code so that others can better understand or reproduce the problem.
Tags: java apache-kafka apache-flink