【问题标题】:Spring kafka producer threads keep increasingSpring kafka生产者线程不断增加
【发布时间】:2020-05-12 16:13:17
【问题描述】:

我们正在使用带有 Spring 的 Kafka,我们目前正在对应用程序进行一些负载测试。在开始负载测试的几分钟内,Tomcat 停止响应,在分析线程转储时,我看到相当多的 Kafka 生产者线程,并假设这可能是应用程序挂起的原因。线程数非常高,即在几分钟就有 200 多个 Kafka 生产者线程。有什么办法可以关闭这些生产者线程。下面给出的是我的 Spring Kafka 生产者配置。

编辑: 在我们的应用程序中,我们有一个事件发布/订阅,我正在使用 Kafka 发布事件。 分区数:15,并发:5

@Bean
public ProducerFactory<String, Object> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    configProps.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, KafkaCustomPartitioner.class);
    configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    configProps.put(ProducerConfig.LINGER_MS_CONFIG, 200);


    DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>(configProps);
    factory.setTransactionIdPrefix(serverId+"-tx-");
    // factory.setProducerPerConsumerPartition(false);
    return factory;
}

public ConsumerFactory<String, Object> consumerFactory(String groupId) {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,"read_committed");
    props.put(ConsumerConfig.GROUP_ID_CONFIG,"custom-group-id");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,60000);
    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,5000);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,20);
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,600000);
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "org.xxx.xxx.xxx");
    return new DefaultKafkaConsumerFactory<>(props);
}


@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> customKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    //factory.setConcurrency(eventTopicConcurrency);
    factory.getContainerProperties().setAckOnError(false);
    factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
    factory.setErrorHandler(new SeekToCurrentErrorHandler());
    factory.setConsumerFactory(consumerFactory("custom-group-id"));

    return factory;
}

以下是我的发布者和订阅者代码

@Override
public void publish(Event event) {
    //try {
        DomainEvent event = event.getDomainEvent();
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topicName,
                event.getMainDocumentId() != null ? event.getMainDocumentId() : null, event);

        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {

            @Override
            public void onSuccess(SendResult<String, Object> result) {
                if(LOGGER.isDebugEnabled())
                    LOGGER.debug("Published event {} : {}",event.getEventName(), event.getEventId());
            }

            @Override
            public void onFailure(Throwable ex) {
                LOGGER.error("Failed to publish event {} : {} ", event.getEventName(), event.getEventId());
                throw new RuntimeException(ex);
            }
        });
    }

监听器:我们有不止一个事件订阅者,所以当我们从 Kafka 接收到一个事件时,我们会为每个订阅者生成新线程来处理该事件,当所有订阅者都完成处理时,我们会提交偏移量。

@KafkaListener(topics = "${kafka.event.topic.name}-#{ClusterConfigSplitter.toClusterId('${cluster.info}')}", concurrency="${kafka.event.topic.concurrency}", clientIdPrefix="${web.server.id}-event-consumer", containerFactory = "customKafkaListenerContainerFactory")
public void eventTopicListener(Event event, Acknowledgment ack)
        throws InterruptedException, ClassNotFoundException, IOException {

    if(LOGGER.isDebugEnabled())
        LOGGER.debug("Received event {} : {}", event.getDomainEvent().getEventName(), event.getDomainEvent().getEventId());

    DomainEvent domainEvent = event.getDomainEvent();

    List<EventSubscriber> subcribers = new ArrayList<>();
    for (String failedSubscriber : event.getSubscribersToRetry()) {
        subcribers.add(eventSubcribers.get(failedSubscriber));
    }

    CountDownLatch connectionLatch = new CountDownLatch(subcribers.size());

    List<String> failedSubscribers = new ArrayList<>();

    for (EventSubscriber subscriber : subcribers) {

        taskExecutor.execute(new Runnable() {
            @Override
            public void run() {
                tenantContext.setTenant(domainEvent.getTenantId());
                DefaultTransactionDefinition def = new DefaultTransactionDefinition();
                def.setName(domainEvent.getEventId() + "-" + subscriber.getClass().getName());
                def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);

                TransactionStatus status = txManager.getTransaction(def);

                try {
                    subscriber.handle(domainEvent);
                    txManager.commit(status);
                } catch (Exception ex) {
                    LOGGER.error("Processing event {} : {} failed for {} - {}", domainEvent.getEventName(), domainEvent.getEventId(), ex);

                    txManager.rollback(status);
                    failedSubscribers.add(subscriber.getClass().getName());
                }

                connectionLatch.countDown();

                if(LOGGER.isDebugEnabled())
                    LOGGER.debug("Processed event {} : {} by {} ", domainEvent.getEventName(), domainEvent.getEventId(), subscriber.getClass().getName());
            }
        });

    }

    connectionLatch.await();

    ack.acknowledge();

    if(failedSubscribers.size()>0) {

        eventPersistenceService.eventFailed(domainEvent, failedSubscribers, event.getRetryCount()+1);

    }




}

事务管理器

    @Bean
@Primary
public PlatformTransactionManager transactionManager(EntityManagerFactory factory,@Qualifier("common-factory") EntityManagerFactory commonFactory, ProducerFactory producerFactory){

    JpaTransactionManager transactionManager = new JpaTransactionManager();
    transactionManager.setEntityManagerFactory(factory);

    JpaTransactionManager commonTransactionManager = new JpaTransactionManager();
    commonTransactionManager.setEntityManagerFactory(commonFactory);

    KafkaTransactionManager kafkaTransactionManager= new KafkaTransactionManager(producerFactory);

    return new ChainedKafkaTransactionManager(kafkaTransactionManager,commonTransactionManager,transactionManager);

}

【问题讨论】:

    标签: java tomcat apache-kafka spring-kafka


    【解决方案1】:

    我将写一个更完整的答案来帮助其他可能发现这个问题的人。

    使用事务时,默认情况下,我们必须为每个group/topic/partition组合创建一个新的生产者(假设事务是由消费者线程启动的);这是为了在发生再平衡时可以适当地隔离生产者。

    2.5 kafka-clients 有一个改进的算法来改善这种情况,我们不再需要所有这些生产者。

    但是,代理必须升级到 2.5.0 才能使用此功能。

    即将发布的 2.5.0.RELEASE(明天到期)允许将这种新的线程模型用于事务生产者。

    release candidate 可用于测试。

    有关新功能的文档是here

    但是,您已禁用创建提供适当生产者防护的生产者。

    factory.setProducerPerConsumerPartition(false);
    

    所以在这种情况下,您应该看到生产者被缓存了;拥有这么多生产者是不寻常的,除非您的侦听器容器具有巨大的并发性并且生产量非常大。

    生产者工厂目前不支持限制缓存大小。

    也许您可以编辑您的问题,以更多地解释您的应用程序正在做什么并显示更多代码/配置。

    【讨论】:

    • 我已经添加了更多代码,请看看它是否有任何想法
    • 对不起,设置factory.setProducerPerConsumerPartition(false);后我没有运行负载测试,观察是默认的,仍然200+看起来很大,明天再检查线程数设置producerPerConsumerPartition = false
    • 有多少订阅者?鉴于这种架构,如果订阅者数量很高,我不会感到惊讶最终会有很多生产者。我假设publish() 是从subscriber.handle() 调用的。或许可以考虑使用订阅者池来限制您需要的并发生产者数量。
    • 对不起,我在你的第二条评论之前发布了;但是,鉴于您在新线程(而不是容器线程)上运行生产者,我怀疑生产者工厂设置会有所不同,因为您使用的是缓存中的生产者而不是容器将启动的生产者(假设您有将 CKTM 注入容器中)。但是,使用事务容器是没有意义的,因为您的生产者没有参与该事务。
    • 我无法理解您评论的第一部分,您能否详细说明 - “但是,鉴于您在新线程(而不是容器线程)上运行生产者,我怀疑生产者工厂设置会有所不同,因为您使用的是缓存中的生产者,而不是容器将启动的生产者”。还有为什么说生产者不参与交易,调用发布方法的方法在@Transactional下
    猜你喜欢
    • 2019-06-14
    • 1970-01-01
    • 2023-02-21
    • 2018-11-16
    • 2019-05-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-05-19
    相关资源
    最近更新 更多