【问题标题】:Tcp connection with Spring Integration framework与 Spring Integration 框架的 TCP 连接
【发布时间】:2016-06-17 16:33:18
【问题描述】:

我正在尝试创建一个接受入站连接的 Tcp 服务器,并异步向连接的客户端发送消息。 有一个 Tcp 服务器的示例,但是它使用的是请求/响应的网关,不支持异步。

我的目标,

  1. 服务器监听套接字,例如9000
  2. 一个 tcp 客户端连接到 9000
  3. 服务器接受连接并接收消息。 (使用TcpReceivingChannelAdapter?)
  4. 服务器保留连接/套接字并记下 ip_connectId 标头。
  5. 当某些事件或计划任务为客户端生成消息时,它会查找 ip_connectId 并向该客户端发送消息。 (使用TcpSendingMessageHandler?)

从参考文档中,我应该使用协作出站和入站通道适配器。但没有 java 配置示例。我不明白如何使用 java config 执行此操作,尤其是如何以及在何处查找要发送的客户端。

我需要两个频道吗?一个用于入境,一个用于出境? inboundAdapter->fromTcpChannel->consumer producer->outboundAdapter->toTcpChannel

我是否创建ServiceActivator 或端点来充当消费者/生产者? spring 集成是否默认保持连接处于活动状态?当我需要向它发送消息时,只需将 ip_connectId 标头添加到消息中? 我是使用TcpSendingMessageHandler 将消息发送给客户端还是需要实现gateway

在 Gary 的帮助下清理我的代码并再次测试,这是我目前的代码。

@EnableIntegration
@IntegrationComponentScan
@Configuration
public class IntegrationConfig implements
        ApplicationListener<TcpConnectionEvent> {
    @Value("${listen.port:8000}")
    private int port;

    @Bean  //for accepting text message from TCP, putty
    public MessageChannel fromTcp() {
        return new DirectChannel();
    }

    @Bean  //for sending text message to TCP client, outbound
    public MessageChannel toTcp() {
        return new DirectChannel();
    }

    // receive from MVC controller
    @Bean
    public MessageChannel invokeChannel() {
        return new DirectChannel();
    }   

    @Bean  //inbound, it is working, I could read the inbound message while debugging
    public TcpReceivingChannelAdapter in(
            AbstractServerConnectionFactory connectionFactory) {
        TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
        adapter.setOutputChannel(fromTcp());
        adapter.setConnectionFactory(connectionFactory);
        return adapter;
    }

    //transform TCP bytes to string message, working
    @Transformer(inputChannel = "fromTcp", outputChannel = "toCollaborate")
    public String convert(byte[] bytes) {

        return new String(bytes);
    }

    MessageHeaders staticheader;  //save ip_connectinId, use this to collaborate outbound message later, for testing purpose only
    @ServiceActivator(inputChannel = "toCollaborate", outputChannel = "toTcp")
    public Message<String> handleTcpMessage(Message<String> stringMsg) {
        staticheader = stringMsg.getHeaders();
        return stringMsg;
        // save the header, collaborate to output channel
    }

    //collaborate message from REST API invokeChannel to a outbound tcp client, this fail
    @Transformer(inputChannel = "invokeChannel", outputChannel = "toTcp")
    public Message<String> headerBeforeSend(String test) {
        GenericMessage<String> msg = new GenericMessage<String>(
                "from rest api");
        if (staticheader != null) {         
            MessageBuilder
                    .fromMessage(msg)
                    .setHeader("ip_connectionId",
                            staticheader.get("ip_connectionId")).build();
        }
        return msg;
    }

    @ServiceActivator(inputChannel = "toTcp")
    @Bean
    public TcpSendingMessageHandler out(
            AbstractServerConnectionFactory connectionFactory) {
        TcpSendingMessageHandler tcpOutboundAdp = new TcpSendingMessageHandler();
        tcpOutboundAdp.setConnectionFactory(connectionFactory);


        return tcpOutboundAdp;
    }   

    // should need only 1 factory? and keep connectin alive
    // server for in coming connection
    @Bean
    public AbstractServerConnectionFactory serverCF() {
        return new TcpNetServerConnectionFactory(this.port);
    }

    @Override
    public void onApplicationEvent(TcpConnectionEvent tcpEvent) {
        // TODO Auto-generated method stub
        TcpConnection source = (TcpConnection) tcpEvent.getSource();

    }

}
//The MVC controller
@Autowired
    MessageChannel invokeChannel;
    @RequestMapping(value="/invoke")
    public String sayHello()
    {
        //trigger gateway to send a message
        String msg = "hello";
        MessagingTemplate template = new MessagingTemplate();
        template.send(invokeChannel, new GenericMessage<String>(msg));      
        return msg;
    }

测试结果: 1. putty connect ok,发短信 2. SI收到消息ok 3.使用REST APIlocalhost/webappname/rest/invoke发送消息到invokeChannel,ok 4.transformer设置消息头 5. 异常如下

异常 org.springframework.web.util.NestedServletException: 请求 处理失败;嵌套异常是 org.springframework.messaging.MessageHandlingException:找不到 出站套接字 org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:981) org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:860) javax.servlet.http.HttpServlet.service(HttpServlet.java:622) org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:845) javax.servlet.http.HttpServlet.service(HttpServlet.java:729) org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52)

【问题讨论】:

    标签: java sockets tcp spring-integration


    【解决方案1】:

    是的;连接默认保持打开状态;是的,您可以使用@ServiceActivator 来处理请求;是的,您只需设置连接 ID 标头。

    要在 Java 配置中配置出站适配器,请将 @ServiceActivator 添加到处理程序 bean...

    @Bean
    public TcpReceivingChannelAdapter() in() {
    
        ...
        adapter.setOutputChannel(newRequests());
    
    }
    
    
    ...
    
    @ServiceActivator(inputChannel="toClientChannel")
    @Bean
    public TcpSendingMessageHandler out() {
    
        ...
    
    }
    

    【讨论】:

    • 另见参考手册中的@Configuration 示例:docs.spring.io/spring-integration/reference/html/…
    • 加里,我还是不太明白。当我创建TcpSendingMessageHandler bean 时,我应该为其参数分配一个AbstractServerConnectionFactory 对象吗?当有消息输出到toClientChannel 通道时,如何在out() 中获取此消息对象?如何在这个 out() 处理程序中通过 TCP 发送消息?
    • 如何从newRequest()返回频道?我的频道是在 Endpoint 类之外定义的。 @EnableIntegration @IntegrationComponentScan @Configuration public class IntegrationConfig { @MessageEndpoint public static class RelayService{ @Bean public TcpReceivingChannelAdapter() in() { ... adapter.setOutputChannel(newRequests()); } } }
    • 我让它工作了,结果我没有正确设置标题。消息头是不可变的,我必须将build()结果分配给另一个消息对象,然后返回新消息。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-04-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多