【问题标题】:Failed to inject Kafka client into GrpcService Quarkus无法将 Kafka 客户端注入 GrpcService Quarkus
【发布时间】:2022-08-24 03:27:19
【问题描述】:

我正在尝试通过 Grpc 服务接收消息,将其发送到 Kafka Emitter,然后返回一些值。

@Singleton
@GrpcService
public class MessageService implements protobuf.MessageService{

    @Inject
    @Channel(\"hello-out\")
    Emitter<Record<String, GeneratedMessageV3>> emitter;

    @Override
    public Uni<EnvelopeReply> processMessage(Envelope request) {
        return Uni.createFrom().completionStage(
                emitter.send(Record.of(request.getKey(), request))
        ).replaceWith(EnvelopeReply.newBuilder().build());
    }
}

在构建期间,我收到下一个错误:

 Error injecting org.eclipse.microprofile.reactive.messaging.Emitter<io.smallrye.reactive.messaging.kafka.Record<java.lang.String, com.google.protobuf.GeneratedMessageV3>> com.test.MessageService.emitter
...
Caused by: javax.enterprise.inject.spi.DefinitionException: SRMSG00019: Unable to connect an emitter with the channel `hello-out`

它与 Rest 资源一起正常工作。

    标签: java apache-kafka protocol-buffers grpc quarkus


    【解决方案1】:

    无需深入探讨该主题,这是我的解决方案: 您不能将 Kafka Emitter 直接注入 grpc 服务,它会抛出异常。

    GrpcService <- Emitter<Record...>
    

    可能的原因(我相信 Quarkus 团队会用正确的解决方案回复较低的 :))是所有 GrpcServices 都是@Singleton 类型,并且它们不能具有延迟初始化的属性,它们需要直接注入一些东西。 Emitter 在后期生成。 通过添加一个包装类,你解决了所有的问题,所以: GrpcService <- KafkaService <- Emitter<Record...>

    @ApplicationScoped
    public class KafkaService {
    
        @Inject
        @Channel("hello-out")
        Emitter<Record<String, GeneratedMessageV3>> emitter;
        // Implement this part properly, added just for example
        public Emitter<Record<String, GeneratedMessageV3>> getEmitter() {
            return emitter;
        }
    
    }
    ...
    @Singleton
    @GrpcService
    public class MessageService implements protobuf.MessageService {
    
        @Inject
        KafkaService kafkaService;
    
        @Override
        public Uni<EnvelopeReply> processMessage(Envelope request) {
            // use metadata if needed
            Map<String, String> metadataMap = request.getMetadataMap();
            return Uni.createFrom().completionStage(
                    kafkaService.getEmitter().send(Record.of(request.getKey(), request))
            ).replaceWith(EnvelopeReply.newBuilder().build());
        }
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-05-22
      • 1970-01-01
      • 1970-01-01
      • 2020-02-03
      • 1970-01-01
      • 2020-02-15
      • 1970-01-01
      • 2019-11-04
      相关资源
      最近更新 更多