【问题标题】:Kafka Streams processing speed slows when empty processor is added添加空处理器时,Kafka Streams 处理速度变慢
【发布时间】:2019-02-05 02:09:23
【问题描述】:

考虑一下这个 Kafka Streams 驱动程序

public class TestDriver {

    private static final String SOURCE = "SOURCE";

    public static void main(String[] args) throws Exception {

        ProtoDeserializer<Message> protoDeserializer = new ProtoDeserializer<>(Message.parser());
        ProtoSerializer<Message> protoSerializer = new ProtoSerializer<>();

        StringDeserializer stringDerializer = new StringDeserializer();
        StringSerializer stringSerializer = new StringSerializer();

        Topology topologyBuilder = new Topology();
        topologyBuilder.addSource(SOURCE, stringDerializer, protoDeserializer, "input-messages")

            .addProcessor(DummyProcessor.NAME, DummyProcessor::new, SOURCE)

            .addSink("MAIN", "output-messages", stringSerializer, protoSerializer, DummyProcessor.NAME)
        ;

        KafkaStreams streams = new KafkaStreams(topologyBuilder, getConfig());
        streams.cleanUp();
        streams.start();

        System.out.println(streams.toString());

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

    }

    private static Properties getConfig() {
        Properties config = new Properties();
        config.put(StreamsConfig.CLIENT_ID_CONFIG, "test.stream-processor");
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test.stream-processor");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092,broker-2:9092,broker-3:9092");
        config.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
        config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 10);
        config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
        return config;
    }
}

问题是,当没有处理器添加到拓扑中时(不包括.addProcessor()),从源到接收器的处理速度很好(意味着我目前产生25k消息/秒并且捕获没有问题上)。

但是,当添加DummyProcessor 时,它会突然处理 3k messages/s max(600k 字节)。

DummyProcessor 基本上什么都不做:

public class DummyProcessor extends AbstractProcessor<String, Message> {

    public static final String NAME = "DUMMY_PROCESSOR";

    public void process(String key, Message originalMessage) {
        context().forward(key, originalMessage);
        context().commit();
    }
}

是否会为 Streams 性能添加单个“空”处理器这样的开销?它的原因是什么? Kafka Streams 是否如此智能,以至于当没有处理器时它不执行 protobuf serde 并且只转发接收到的数据?无论如何要加快速度?

以这样的速度,我需要 x 数千个可用的 CPU 线程才能处理我的所有数据,因为 25k 消息/秒是我拥有的 1% 的数据。听起来很多。

【问题讨论】:

  • 删除context().commit();会提高性能吗?
  • @wardziniak 是的,每秒有 33 万条消息。我虽然提交()对于流工作是必要的(?)。但是,这似乎是瓶颈。

标签: java apache-kafka apache-kafka-streams


【解决方案1】:

问题是由于经常请求commit造成的。

您根本不需要致电ProcessorContext:commit()。 Kafka Streams 基于commit.interval.ms 属性执行提交(默认:30000 ms)。如果恰好一次语义被设置为不同的值。您可以在https://kafka.apache.org/documentation/#streamsconfigs详细了解。

如果在某些用例中您需要更频繁地提交,您可以致电ProcessorContext:commit()。但是您必须记住,该提交不是立即(直接)进行的。它只设置标志以尽快提交。

【讨论】:

  • 啊,我根本没想到。现在很清楚了。可能最让我感到困惑的是,几乎所有教程/视频/博客文章/其他示例在任何地方都调用commit(),所以我认为这是必要的。谢谢!
猜你喜欢
  • 2017-01-07
  • 1970-01-01
  • 2021-11-16
  • 1970-01-01
  • 2018-08-25
  • 1970-01-01
  • 2019-06-02
  • 1970-01-01
  • 2016-06-02
相关资源
最近更新 更多