【发布时间】:2019-12-10 00:36:57
【问题描述】:
我正在尝试使用 Kafka Source 和 Sink 测试 Flink 一次性语义:
- 运行 flink 应用,简单地将消息从一个主题传输到另一个主题,并行度=1,检查点间隔 20 秒
- 每 2 秒使用 Python 脚本生成具有递增整数的消息。
- 使用处于 read_committed 隔离级别的控制台使用者读取输出主题。
- 手动杀死TaskManager
我希望看到输出主题中的整数单调递增,而不管 TaskManager 的终止和恢复。
但实际上在控制台消费者输出中看到了一些意想不到的东西:
32
33
34
35
36
37
38
39
40
-- TaskManagerKilled
32
34
35
36
40
41
46
31
33
37
38
39
42
43
44
45
看起来像是在输出主题中重放的检查点之间的所有消息。 它应该是正确的行为还是我做错了什么?
已恢复一个快照: Flink UI
我的 Flink 代码:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(1000);
env.enableCheckpointing(20000, CheckpointingMode.EXACTLY_ONCE);
env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints-data"));
Properties producerProperty = new Properties();
producerProperty.setProperty("bootstrap.servers", ...);
producerProperty.setProperty("zookeeper.connect", ...);
producerProperty.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,"10000");
producerProperty.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"my-transaction");
producerProperty.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
Properties consumerProperty = new Properties();
consumerProperty.setProperty("bootstrap.servers", ...);
consumerProperty.setProperty("zookeeper.connect", ...);
consumerProperty.setProperty("group.id", "test2");
FlinkKafkaConsumer<String> consumer1 = new FlinkKafkaConsumer<String>("stringTopic1", new ComplexStringSchema(), consumerProperty);
consumer1.assignTimestampsAndWatermarks(new PeriodicAssigner());
FlinkKafkaProducer<String> producer1 = new FlinkKafkaProducer<String>("test", new KeyedSerializationSchemaWrapper(new SimpleStringSchema()), producerProperty, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
producer1.ignoreFailuresAfterTransactionTimeout();
DataStreamSource<String> s1 = env.addSource(consumer1);
s1.addSink(producer1);
env.execute("Test");
}
【问题讨论】:
-
你应该提供你的依赖版本,尤其是 Flink 和 Kafka。这项工作的日志也很有用(例如,可能会告诉您您的语义是否因某种原因发生了更改)。