【问题标题】:kafka -> flink - performance issueskafka -> flink - 性能问题
【发布时间】:2016-02-27 13:14:12
【问题描述】:

我正在研究一些每秒生成约 30K 消息的 kafka 主题。我有一个 flink 拓扑设置来读取其中一个,聚合一点(5 秒窗口),然后(最终)写入数据库。

当我运行拓扑并删除除读取 -> 聚合步骤之外的所有内容时,每分钟我只能收到约 30K 条消息。没有任何地方可以发生背压。

我做错了什么?


编辑:

  1. 我无法更改有关主题空间的任何内容。每个主题都有一个分区,并且有数百个。
  2. 每条消息都是一个压缩的 thrift 对象,平均大小为 2-3Kb

看来我只能获得 ~1.5 MB/s。不接近提到的 100MB/s。

当前代码路径:

DataStream<byte[]> dataStream4 = env.addSource(new FlinkKafkaConsumer081<>("data_4", new RawSchema(), parameterTool.getProperties())).setParallelism(1);  
DataStream<Tuple4<Long, Long, Integer, String>> ds4 = dataStream4.rebalance().flatMap(new mapper2("data_4")).setParallelism(4);

public class mapper2 implements FlatMapFunction<byte[], Tuple4<Long, Long, Integer, String>> {
    private String mapId;
    public mapper2(String mapId) {
        this.mapId = mapId;
    }

    @Override
    public void flatMap(byte[] bytes, Collector<Tuple4<Long, Long, Integer, String>> collector) throws Exception {
        TimeData timeData = (TimeData)ts_thriftDecoder.fromBytes(bytes);
        Tuple4 tuple4 = new Tuple4<Long, Long, Integer, String>();
        tuple4.f0 = timeData.getId();
        tuple4.f1 = timeData.getOtherId();
        tuple4.f2 = timeData.getSections().size();
        tuple4.f3 = mapId;

        collector.collect(tuple4);
    }
}

【问题讨论】:

  • 您从哪里获得约 30K msg/sec 的数据?来自你自己的测试?如果你在某处读到过,能否提供链接?
  • 我有生产者代码,可以从那里读取吞吐量。
  • 所以你的生产者以 ~30K msg/sec 的速度生产,但你的消费者只以 ~30K msg/min 的速度消费?
  • 是的。这就是我所看到的。我知道一旦数据进入 flink,它就会非常迅速。我有另一个将> 6M记录/分钟推入cassandra的拓扑。我想知道 kafka 消费者 FlinkKafkaConsumer081 是否存在一些固有的瓶颈。请注意,此主题只有一个分区。这会是个问题吗?
  • 好的。这就说得通了。您可能想要编辑您的问题,以更清楚地说明这些事情,和/或您根据我的问题发现的任何其他事实您的调试。

标签: java apache-kafka apache-flink flink-streaming


【解决方案1】:

从代码中,我看到两个可能导致性能问题的潜在组件:

  • FlinkKafka 消费者
  • Thrift 反序列化器

为了了解瓶颈在哪里,我会先测量 Flink 从 Kafka 主题读取的原始读取性能。

因此,您可以在集群上运行以下代码吗?

public class RawKafka {

private static final Logger LOG = LoggerFactory.getLogger(RawKafka.class);

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    ParameterTool parameterTool = ParameterTool.fromArgs(args);
    DataStream<byte[]> dataStream4 = env.addSource(new FlinkKafkaConsumer081<>("data_4", new RawSchema(), parameterTool.getProperties())).setParallelism(1);

    dataStream4.flatMap(new FlatMapFunction<byte[], Integer>() {
        long received = 0;
        long logfreq = 50000;
        long lastLog = -1;
        long lastElements = 0;

        @Override
        public void flatMap(byte[] element, Collector<Integer> collector) throws Exception {
            received++;
            if (received % logfreq == 0) {
                // throughput over entire time
                long now = System.currentTimeMillis();

                // throughput for the last "logfreq" elements
                if(lastLog == -1) {
                    // init (the first)
                    lastLog = now;
                    lastElements = received;
                } else {
                    long timeDiff = now - lastLog;
                    long elementDiff = received - lastElements;
                    double ex = (1000/(double)timeDiff);
                    LOG.info("During the last {} ms, we received {} elements. That's {} elements/second/core. GB received {}",
                            timeDiff, elementDiff, elementDiff*ex, (received * 2500) / 1024 / 1024 / 1024);
                    // reinit
                    lastLog = now;
                    lastElements = received;
                }
            }
        }
    });

    env.execute("Raw kafka throughput");
}
}

此代码测量来自 Kafka 的 50k 个元素之间的时间,并记录从 Kafka 读取的元素数量。 在我的本地机器上,我的吞吐量约为 330k 个元素/核心/秒:

16:09:34,028 INFO  RawKafka                                                      - During the last 88 ms, we received 30000 elements. That's 340909.0909090909 elements/second/core. GB received 0
16:09:34,028 INFO  RawKafka                                                      - During the last 86 ms, we received 30000 elements. That's 348837.20930232556 elements/second/core. GB received 0
16:09:34,028 INFO  RawKafka                                                      - During the last 85 ms, we received 30000 elements. That's 352941.17647058825 elements/second/core. GB received 0
16:09:34,028 INFO  RawKafka                                                      - During the last 88 ms, we received 30000 elements. That's 340909.0909090909 elements/second/core. GB received 0
16:09:34,030 INFO  RawKafka                                                      - During the last 90 ms, we received 30000 elements. That's 333333.3333333333 elements/second/core. GB received 0
16:09:34,030 INFO  RawKafka                                                      - During the last 91 ms, we received 30000 elements. That's 329670.3296703297 elements/second/core. GB received 0
16:09:34,030 INFO  RawKafka                                                      - During the last 85 ms, we received 30000 elements. That's 352941.17647058825 elements/second/core. GB received 0

我真的很想知道您从 Kafka 中读取的吞吐量达到了多少。

【讨论】:

  • 我必须为整个问题道歉。根据您工具的输出,我回去检查了我们在生产者处测量“流量”的方式。我们测量的内容及其报告的值存在一些语义问题。总而言之:我每秒可以阅读约 5K 条消息,我怀疑它会更高。
  • 没问题。如果您需要进一步的建议来提高性能,请告诉我。
  • @rmetzger 感谢您的工具!使用它时,我发现我在没有特殊序列化程序的情况下以 70 MB/s 的速度消耗(使用 SimpleStringSchema)。最高可达 ~35K msg/s。我想使用 Avro/Byte[] 优化我的 msg/s。你能指定我如何开始吗?
  • 你想优化什么?
  • @rmetzger 我目前的吞吐量是 ~35K msg/s,有什么办法可以增加它,就像 ~352941 elements/second/core 在你的情况下一样
【解决方案2】:

我从未使用过 Flink 或它的 KafkaConsumer,但我有在 Storm 环境中使用 Kafka 的经验。以下是我的一些想法。确定 Kafka 速度的方式有很多变数。这里有一些需要考虑和调查的事情,当你有这些问题时,请在你的问题中添加更多细节。

  • 添加更多分区应该会增加吞吐量。所以是的,添加更多分区和消费者应该会看到性能的线性增长。
  • Kafka 吞吐量与消息大小有关。因此,如果您有大量消息,吞吐量会相应受到影响。
  • 您是否有任何证据支持您对 Kafka Consumer 应该更快的期望?虽然我同意 30K msg/min 真的很慢,但你有证据支持你的期望吗?比如使用 FlinkKafkaConsumer(类似于 this)进行一般的速度测试,或者使用普通的 Kafka 消费者来查看消费速度,然后与 Flink 的消费者进行比较?

它消耗缓慢可能有很多原因,我试图强调一些与 Kafka 相关的一般内容。我敢肯定,你可能在 Flink 中可以做一些我不知道的事情来加速消费,因为我从未使用过它。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-01-08
    • 1970-01-01
    • 2021-10-12
    • 2014-08-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-02-14
    相关资源
    最近更新 更多