【问题标题】:Flink Kinesis Connector not consuming messages from Kinesis Data Streams at full capacityFlink Kinesis 连接器未满负荷使用来自 Kinesis Data Streams 的消息
【发布时间】:2020-03-24 05:42:55
【问题描述】:

我正在测试 Apache Flink(使用 v1.8.2)从 Kinesis Data Stream 读取消息的速度。 Kinesis Data Streams 仅包含一个分片,它包含 40,000 条消息。每条消息的大小小于 5 KB。

尝试使用 TRIM_HORIZON 从最旧的消息中读取流,我希望应用能够快速读取所有消息,因为每个分片可以通过 GetRecords 支持高达每秒 2 MB 的最大总数据读取速率。使用连接器配置 (SHARD_GETRECORDS_MAX=400, SHARD_GETRECORDS_INTERVAL_MILLIS=1000) 应用程序应在几分钟内完成以读取所有消息。但由于某种原因,阅读所有消息需要花费大量时间。

您介意检查一下我的连接器配置有什么问题吗?感谢您的帮助。

    public static DataStream<ObjectNode> createKinesisStream(
            StreamExecutionEnvironment env) throws IOException {
        Properties properties = new Properties();
        properties.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
        properties.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");
        properties.put(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "400");
        properties.put(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "1000");

        properties.put(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "PROFILE");
        properties.put(ConsumerConfigConstants.AWS_PROFILE_NAME, "d");

        return env.addSource(new FlinkKinesisConsumer<>(
                    "stream1", new JsonNodeDeserializationSchema(), properties));
    }

   main() code:
   final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
   env.getConfig().setAutoWatermarkInterval(10000L);

   source = AppConfig.createKinesisStream(env);

   DataStream<ObjectNode> filteredStream = source
                .map(new CustomMap());
I have put a counter in discarding sink, in one fetch it read 27 messages( counter 829-855)

24 Mar 2020 08:11:50,519 INFO  DiscardingSink:15 - 827
24 Mar 2020 08:11:50,519 INFO  DiscardingSink:15 - 828
24 Mar 2020 08:11:51,631 INFO  DiscardingSink:15 - 829
24 Mar 2020 08:11:51,631 INFO  DiscardingSink:15 - 830
.
.
24 Mar 2020 08:11:51,639 INFO  DiscardingSink:15 - 854
24 Mar 2020 08:11:51,639 INFO  DiscardingSink:15 - 855
24 Mar 2020 08:11:52,749 INFO  DiscardingSink:15 - 856

【问题讨论】:

  • 添加更多:不存在其他消费者,并且 Kinesis 流上的 ReadProvisionedThroughputExceeded 为零

标签: apache-flink amazon-kinesis stream-processing


【解决方案1】:

一种可能的解释是,您的管道中的某些东西正在对源施加背压。要仅测量源的容量,您可以将工作简化为:

source.addSink(new DiscardingSink<>());

DiscardingSink 在哪里

public static class DiscardingSink<OUT> implements SinkFunction<OUT> {

    @Override
    public void invoke(OUT value, Context context) throws Exception {
    }
}

【讨论】:

  • 谢谢。 getRecords 仍在获取
  • 如果将间隔缩短到 250 毫秒会怎样?
  • 这没有帮助,大卫。 getRecords 吞吐量低
  • 我建议你在 flink-user 邮件列表中询问这个问题,在那里你可能会找到对使用 kinesis 和 flink 有更多了解的人。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2020-09-26
  • 1970-01-01
  • 2017-02-26
  • 2016-08-20
  • 1970-01-01
  • 2018-03-26
  • 2021-11-01
相关资源
最近更新 更多