【问题标题】:Sleuth tracing is not working for transactional Kafka producers侦探跟踪不适用于事务性 Kafka 生产者
【发布时间】:2021-04-21 11:11:14
【问题描述】:

目前,我们正在使用事务性 Kafka 生产者。我们注意到的是缺少 Kafka 的跟踪方面,这意味着我们看不到 Kafka 生产者的检测,因此缺少 b3 标头。

查看代码后,我们发现没有为事务生产者调用后处理器,这意味着 TraceProducer 永远不会由 TraceProducerPostProcessor 创建。这有什么原因吗?此外,为事务生产者启用跟踪的解决方法是什么?似乎没有一个地方可以轻松创建跟踪生产者(DefaultKafkaProducerFactory #doCreateTxProducer 是私有的)

附加屏幕截图(DefaultKafkaProducerFactory 类)。在屏幕截图中,您可以看到仅为原始生产者调用后处理器,而不是为事务生产者调用。

您的帮助将不胜感激。

谢谢

DefaultKafkaProducerFactory#createRawProducer

【问题讨论】:

    标签: apache-kafka spring-kafka spring-cloud-sleuth brave


    【解决方案1】:

    ??

    createRawProducer() 被事务性和非事务性生产者调用:

    其他事情正在发生。

    编辑

    问题是侦探用不同的生产者替换了生产者,但工厂丢弃了它并使用了原来的。

    https://github.com/spring-projects/spring-kafka/issues/1778

    EDIT2

    其实,我们在这里丢弃追踪生产者是件好事; Sleuth 还将工厂包装在代理中,并将 CloseSafeProducer 包装在 TracingProducer 中;但我看到交易和非交易生产者的结果相同......

    @SpringBootApplication
    public class So67194702Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So67194702Application.class, args);
        }
    
        @Bean
        public ApplicationRunner runner(ProducerFactory<String, String> pf) {
            return args -> {
                Producer<String, String> prod = pf.createProducer();
                prod.close();
            };
        }
    
    }
    

    close()上设置断点...

    【讨论】:

    • 感谢您的回答。将调试并返回给您。
    • 我想我明白问题所在了;我们调用apply(),但不要用结果替换生产者。我看不出这对任何类型的制作人是如何起作用的。
    【解决方案2】:

    感谢 Gary Russell 的快速回复。 createRawConsumer 有效地为事务性和非事务性消费者调用。

    Sleuth 正在使用 TraceConsumerPostProcessor 将 Kafka 消费者包装到 TracingConsumer 中。由于 ProducerPostProcessor 接口扩展了 Function 接口,我们可以假设可以/应该使用函数的结果,但 DefaultKafkaProducerFactory 的 createRawConsumer 方法正在应用后处理器而不使用返回类型。在这种特定情况下导致问题。

    那么,我们不能修改 createRawConsumer 的实现来分配后处理器的结果吗?如果没有,让后处理器扩展消费者而不是函数不是更好吗?

    通过覆盖 createRawConsumer 方法进行成功​​测试,如下所示

        @Override
        protected Producer<K, V> createRawProducer(Map<String, Object> rawConfigs) {
            Producer<K, V> kafkaProducer = new KafkaProducer<>(rawConfigs, getKeySerializerSupplier().get(), getValueSerializerSupplier().get());
            for (ProducerPostProcessor<K, V> pp : getPostProcessors()) {
                kafkaProducer = pp.apply(kafkaProducer);
            }
            return kafkaProducer;
        }
    

    感谢您的帮助。

    【讨论】:

    • 其实;它更复杂;事实证明,我们丢弃了跟踪生产者是一件好事,因为 Sleuth 还用代理包装了工厂,然后用 TracingProducer 包装了 CloseSafeProducer;但我认为它与事务性和非事务性生产者的工作方式相同;我会更新我的答案。
    • 在生产者方面,似乎只有在工厂调用 createProducer 方法时才会触发 SleuthKafkaAspect,但在我们的例子中,由于事务支持,方法 createNonTransactionalProducer 被调用(KafkaTemplate#getTheProducer) .因此,我们没有带有修复的重复跟踪包装器。
    • 您应该为此向侦探提出问题。作为一种变通方法,您可以为非事务性生产者使用不同的生产者工厂和模板。
    • 非常感谢您的回复。
    猜你喜欢
    • 1970-01-01
    • 2018-02-08
    • 2019-01-15
    • 2021-03-14
    • 2020-07-14
    • 1970-01-01
    • 1970-01-01
    • 2020-12-26
    • 1970-01-01
    相关资源
    最近更新 更多