【问题标题】:java grpc server for a bi-directional stream that connect activemq to push & get message from activemqjava grpc服务器,用于连接activemq以从activemq推送和获取消息的双向流
【发布时间】:2021-11-27 15:53:06
【问题描述】:

我正在尝试为连接 activemq 以从 activemq 推送和获取消息的双向流编写一个 java grpc 服务器

我能够在服务器上实例化一个 Stream 观察者。然而,问题是一旦我从 grpc 客户端获取数据,调用服务器 StreamObserver 上的 onNext 函数。我正在尝试使用 JMSTemplate 在 activemq 中推送消息。我总是将 jmsTemplate 设为 null。

我正在如下配置 JMSTemplate 来创建 JMSTemplate,

@Configuration
@EnableJms
public class JmsConfig {

    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(
            ConnectionFactory connectionFactory) {

        DefaultJmsListenerContainerFactory factory
                = new DefaultJmsListenerContainerFactory();

        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrency("5-10");

        return factory;
    }
}

application.yml

spring:
  application-name: bidirectional-server
  activemq:
    broker-url: tcp://localhost:61616
    user: admin
    password: admin
    packages.trust-all: true

HelloService.java

@GrpcService

public class HelloService extends HelloServiceGrpc.HelloServiceImplBase {

    /**
     * @param responseObserver
     */
    @Override
    public StreamObserver<HelloEnvelope> transfer(StreamObserver<HelloEnvelope> responseObserver) {
        return new TransferHelloRequestStreaming(responseObserver);
    }
}

TransferHelloRequestStreaming.java

@Slf4j
@Service
public class TransferHelloRequestStreaming implements StreamObserver<HelloEnvelope> {


    private  StreamObserver<HelloEnvelope> transferHelloResponseStreamObserver;

    private HelloEnvelope transferHelloResponse = null;
    public TransferMessageRequestStreaming() {
    }
    public TransferHelloRequestStreaming(StreamObserver<HelloEnvelope> transferHelloResponseStreamObserver) {
        this.transferHelloResponseStreamObserver = transferHelloResponseStreamObserver;
    }

    @Override
    public void onNext(HelloEnvelope transferHelloRequest) {

   
       new HelloProducer().sendTo("input",transferHelloRequest);

  
    }

    @Override
    public void onError(Throwable throwable) {

    }

    @Override
    public void onCompleted() {
     this.transferHelloResponseStreamObserver.onCompleted();
    }

}

HelloProducer.java

@Service
@Slf4j
public class HelloProducer {

  @Autowired
  JmsTemplate jmsTemplate;

  public void sendTo(String destination, MessageEnvelope transferHelloRequest) {
 
    jmsTemplate.convertAndSend(destination, transferHelloRequest);

  }
}

HelloService.proto

syntax = "proto3";

import "google/protobuf/any.proto";
option java_multiple_files = true;
option java_outer_classname = "HelloProto";
package com.server.grpcserver;

service HelloService {
  
    rpc transfer(stream HelloEnvelope) returns
        (stream HelloEnvelope) {
    }
}

message HelloEnvelope {
    map<string, google.protobuf.Any> header =1;
    bytes body =2;
}

在 HelloProducer.java 中,我总是将 jmsTemplate 设为 null。一旦我在 onNext() 中从 grpc 客户端获取数据,我想将数据推送到 activeMQ 中。

【问题讨论】:

  • 我正在收藏这个问题!在某些时候似乎是一个非常好的工具。 (通过下面的 ~gary-russel 进行修复)。

标签: java spring-jms grpc-java protobuf-java jmstemplate


【解决方案1】:

new HelloProducer()

您还需要 @AutowireHelloProducer - 您正在创建一个新实例(不是由 Spring 管理的,因此它不会自动连接)。

【讨论】:

  • 我已经自动装配了 HelloProducer 然后 HelloProducer 变为 null 。
  • 您对TransferHelloRequestStreaming 也是如此。它也必须是 Spring 托管的 bean。
猜你喜欢
  • 2012-12-18
  • 2015-08-03
  • 1970-01-01
  • 2015-01-01
  • 1970-01-01
  • 1970-01-01
  • 2011-07-24
  • 1970-01-01
  • 2019-11-12
相关资源
最近更新 更多