【问题标题】:Exactly-once semantics in Flink Kafka ProducerFlink Kafka Producer 中的 Exactly-once 语义
【发布时间】:2019-12-10 00:36:57
【问题描述】:

我正在尝试使用 Kafka Source 和 Sink 测试 Flink 一次性语义:

  1. 运行 flink 应用,简单地将消息从一个主题传输到另一个主题,并行度=1,检查点间隔 20 秒
  2. 每 2 秒使用 Python 脚本生成具有递增整数的消息。
  3. 使用处于 read_committed 隔离级别的控制台使用者读取输出主题。
  4. 手动杀死TaskManager

我希望看到输出主题中的整数单调递增,而不管 TaskManager 的终止和恢复。

但实际上在控制台消费者输出中看到了一些意想不到的东西:

32
33
34
35
36
37
38
39
40
-- TaskManagerKilled
32
34
35
36
40
41
46
31
33
37
38
39
42
43
44
45

看起来像是在输出主题中重放的检查点之间的所有消息。 它应该是正确的行为还是我做错了什么?

已恢复一个快照: Flink UI

我的 Flink 代码:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(1000);
        env.enableCheckpointing(20000, CheckpointingMode.EXACTLY_ONCE);
        env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints-data"));

        Properties producerProperty = new Properties();
        producerProperty.setProperty("bootstrap.servers", ...);
        producerProperty.setProperty("zookeeper.connect", ...);
        producerProperty.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,"10000");
        producerProperty.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"my-transaction");
        producerProperty.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

        Properties consumerProperty = new Properties();
        consumerProperty.setProperty("bootstrap.servers", ...);
        consumerProperty.setProperty("zookeeper.connect", ...);
        consumerProperty.setProperty("group.id", "test2");

        FlinkKafkaConsumer<String> consumer1 = new FlinkKafkaConsumer<String>("stringTopic1", new ComplexStringSchema(), consumerProperty);
        consumer1.assignTimestampsAndWatermarks(new PeriodicAssigner());

        FlinkKafkaProducer<String> producer1 = new FlinkKafkaProducer<String>("test",  new KeyedSerializationSchemaWrapper(new SimpleStringSchema()), producerProperty, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
        producer1.ignoreFailuresAfterTransactionTimeout();
        DataStreamSource<String> s1 = env.addSource(consumer1);
        s1.addSink(producer1);
        env.execute("Test");
    }

【问题讨论】:

  • 你应该提供你的依赖版本,尤其是 Flink 和 Kafka。这项工作的日志也很有用(例如,可能会告诉您您的语义是否因某种原因发生了更改)。

标签: apache-kafka apache-flink


【解决方案1】:

除了将生产者设置为exactly-once语义外,您还需要将消费者配置为仅读取来自kafka的已提交消息。默认情况下,消费者将读取已提交和未提交的消息。将此设置添加到您的消费者应该会让您更接近您想要的行为。

consumerProperties.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

【讨论】:

    【解决方案2】:

    Flink 以可配置的定期间隔生成检查点。当检查点恢复时,Flink 将状态回滚到输入流中最后检查点的位置(不一定与最后处理/消费的位置相同)。有不同的方法来确保精确一次的语义。您可以使用支持一次性语义的生产者(接收器),请参阅:Fault Tolerance Guarantees in Flink sinks

    或者,您可以在您的消费者中支持一次性语义。假设由多个 worker 持久化的唯一整数(并行度 > 1),确保一次性处理的一种方法如下:

    1. 假设当前检查点 id 是 Ckpt N。将所有已处理的整数(大型事件的已处理事件的指纹)存储在 Ckpt N 的状态中。您可以通过让您的消费者实现 ListCheckpointed 接口来存储Ckpt N 中的状态(指纹或整数)。

    2. 一旦 Flink 移动到下一个检查点(Ckpt N+1),过滤掉所有存储在 Ckpt N 状态中的整数,以确保只处理一次。将未过滤的已处理整数(或已处理事件的指纹)存储在 Ckpt N + 1 的状态中(即丢弃 Ckpt N 的状态)。

    您只需要存储在两个检查点之间发生的已处理事件(或在您的情况下为整数)的指纹,并在以后保留新检查点时丢弃。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2019-08-25
      • 1970-01-01
      • 2019-06-23
      • 1970-01-01
      • 2021-01-16
      • 2019-07-18
      • 1970-01-01
      • 2019-08-25
      相关资源
      最近更新 更多