【问题标题】:Failed to inject Kafka client into GrpcService Quarkus无法将 Kafka 客户端注入 GrpcService Quarkus
【发布时间】:2022-08-24 03:27:19
【问题描述】:
我正在尝试通过 Grpc 服务接收消息,将其发送到 Kafka Emitter,然后返回一些值。
@Singleton
@GrpcService
public class MessageService implements protobuf.MessageService{
@Inject
@Channel(\"hello-out\")
Emitter<Record<String, GeneratedMessageV3>> emitter;
@Override
public Uni<EnvelopeReply> processMessage(Envelope request) {
return Uni.createFrom().completionStage(
emitter.send(Record.of(request.getKey(), request))
).replaceWith(EnvelopeReply.newBuilder().build());
}
}
在构建期间,我收到下一个错误:
Error injecting org.eclipse.microprofile.reactive.messaging.Emitter<io.smallrye.reactive.messaging.kafka.Record<java.lang.String, com.google.protobuf.GeneratedMessageV3>> com.test.MessageService.emitter
...
Caused by: javax.enterprise.inject.spi.DefinitionException: SRMSG00019: Unable to connect an emitter with the channel `hello-out`
它与 Rest 资源一起正常工作。
标签:
java
apache-kafka
protocol-buffers
grpc
quarkus
【解决方案1】:
无需深入探讨该主题,这是我的解决方案:
您不能将 Kafka Emitter 直接注入 grpc 服务,它会抛出异常。
GrpcService <- Emitter<Record...>
可能的原因(我相信 Quarkus 团队会用正确的解决方案回复较低的 :))是所有 GrpcServices 都是@Singleton 类型,并且它们不能具有延迟初始化的属性,它们需要直接注入一些东西。 Emitter 在后期生成。
通过添加一个包装类,你解决了所有的问题,所以:
GrpcService <- KafkaService <- Emitter<Record...>
@ApplicationScoped
public class KafkaService {
@Inject
@Channel("hello-out")
Emitter<Record<String, GeneratedMessageV3>> emitter;
// Implement this part properly, added just for example
public Emitter<Record<String, GeneratedMessageV3>> getEmitter() {
return emitter;
}
}
...
@Singleton
@GrpcService
public class MessageService implements protobuf.MessageService {
@Inject
KafkaService kafkaService;
@Override
public Uni<EnvelopeReply> processMessage(Envelope request) {
// use metadata if needed
Map<String, String> metadataMap = request.getMetadataMap();
return Uni.createFrom().completionStage(
kafkaService.getEmitter().send(Record.of(request.getKey(), request))
).replaceWith(EnvelopeReply.newBuilder().build());
}
}