【问题标题】:Sending messages to a socket port and receiving response with Spring Integration使用 Spring Integration 向套接字端口发送消息并接收响应
【发布时间】:2018-03-01 09:49:34
【问题描述】:

现状

  • 在 linux 服务器上,几个 jar 使用套接字运行 - 侦听和响应字符串消息
  • 在 wildfly 应用服务器中运行的新战争正在将请求委托给这些套接字
  • WAR 使用的是 spring,尤其是 spring 与注释的集成

我有一个配置类持有服务@Configuration / @EnableIntegration /@IntegrationComponentScan

我已经创建了一个消息传递网关

@MessagingGateway(defaultRequestChannel = "testGateway")
public interface TestGateway{
    public Future<String> sendMessage(String in);
}

应用程序应该作为客户端发送和接收请求。我创建了一个空事件处理程序,因为应用程序应该只发送字符串并等待答案

@Bean
public MessageChannel testChannel() {
    return new DirectChannel();
}

@Bean
@ServiceActivator(inputChannel = "testGateway")
public MessageHandler testGate() {
    final TcpOutboundGateway gate = new TcpOutboundGateway();
    gate.setConnectionFactory(connectionFactory());
    gate.setReplyChannel(docServerChannel());
    return gate;
}

@Bean
public AbstractClientConnectionFactory connectionFactory() {
    final AbstractClientConnectionFactory connectionFactory = new TcpNetClientConnectionFactory("localhost", 5959);
    connectionFactory.setSoTimeout(300000);
    connectionFactory.setApplicationEventPublisher(new NullEventPublisher());
    connectionFactory.setSerializer(new DefaultSerializer());
    connectionFactory.setDeserializer(new DefaultDeserializer());
    return connectionFactory;
}

消息在接收数据时应转换为字符串,在发送时应转换为字节

@MessageEndpoint
public static class TestMessage {

    @Transformer(inputChannel = "testChannel")
    public String convert(final byte[] bytes) {
        return new String(bytes);
    }

    @Transformer(inputChannel = "testGateway")
    public String convertResult(final byte[] bytes) {
        return new String(bytes);
    }

}

应用程序已部署,但响应总是超时。套接字正在运行。我只想要一个简单的直接双向连接:WAR JAR.

有人可以帮助或给我一个提示吗?

-----UPDATE-1----------

套接字正在接收消息,但由于发送消息后套接字已关闭,因此无法读取响应。

-----UPDATE-2----------

  • 这是一个错字。系统返回一个MessageHandler
  • 我已将工厂添加为 Spring 管理的 bean
  • 我已将“\r\n”添加到旧代码中
  • 应用程序仍在抱怨“等待响应超时”

旧版服务器正在打开服务器套接字并向套接字发送消息

final OutputStream os = serverSocket.getOutputStream();
final PrintWriter pw = new PrintWriter(os, true);
final BufferedReader br = new BufferedReader(new InputStreamReader(serverSocket.getInputStream()));
final String incoming = br.readLine();
final String response= "ok\r\n";
pw.println(response);
pw.flush();
Thread.sleep(5000);
pw.close();
serverSocket.close();

-----UPDATE-3----------

Spring 的 TcpOutboundGateway 没有响应

        connection.send(requestMessage);
        Message<?> replyMessage = reply.getReply();
        if (replyMessage == null) {

【问题讨论】:

    标签: java spring tcp spring-integration


    【解决方案1】:

    连接工厂需要是@Bean,以便Spring可以管理它。

    public TcpInboundGateway testGate() {
        final AbstractClientConnectionFactory connectionFactory = new TcpNetClientConnectionFactory("localhost", 5959); // already running socket
        connectionFactory.setApplicationEventPublisher(new NullEventPublisher());
        final TcpOutboundGateway gate = new TcpOutboundGateway();
        gate.setConnectionFactory(connectionFactory);
        gate.setOutputChannelName("testChannel");
        return gate;
    }
    

    这不会编译;返回类型与您返回的内容不匹配。

    假设这只是一个错字,并且 bean 实际上是一个出站网关,使用此配置,回复必须以 \r\n (CRLF) 终止。

    the documentation;向下滚动到...

    TCP 是一种流协议;这意味着必须为通过 TCP 传输的数据提供某种结构,以便接收方可以将数据划分为离散的消息。连接工厂配置为使用(反)序列化程序在消息有效负载和通过 TCP 发送的位之间进行转换。这是通过分别为入站和出站消息提供反序列化器和序列化器来实现的。提供了许多标准(反)序列化程序。

    ...并阅读有关标准反序列化程序的信息。根据您的配置,标准解串器正在等待终止 \r\n (CRLF)。

    服务器代码是做什么的?

    编辑

    @SpringBootApplication
    public class So49046888Application {
    
        public static void main(String[] args) throws Exception {
            ConfigurableApplicationContext ctx = SpringApplication.run(So49046888Application.class, args);
            String reply = ctx.getBean(TestGateway.class).sendMessage("foo").get();
            System.out.println(reply);
            Thread.sleep(10_000);
            ctx.close();
        }
    
        @Bean
        public ServerSocket serverSocket() throws IOException {
            return ServerSocketFactory.getDefault().createServerSocket(5959);
        }
    
        @Bean
        public ApplicationRunner runner(TaskExecutor exec) {
            return args -> {
                exec.execute(() -> {
                    try {
                        while (true) {
                            Socket socket = serverSocket().accept();
                            final OutputStream os = socket.getOutputStream();
                            final PrintWriter pw = new PrintWriter(os, true);
                            final BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                            final String incoming = br.readLine();
                            System.out.println(incoming);
                            final String response= "ok\r\n";
                            pw.print(response);
                            pw.flush();
                            Thread.sleep(5000);
                            pw.close();
                            socket.close();
                        }
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                    }
                });
            };
        }
    
        @Bean
        public TaskExecutor exec() {
            return new ThreadPoolTaskExecutor();
        }
    
        @Bean
        @ServiceActivator(inputChannel = "testGateway")
        public MessageHandler testGate() {
            final TcpOutboundGateway gate = new TcpOutboundGateway();
            gate.setConnectionFactory(connectionFactory());
            gate.setReplyChannelName("toString");
            gate.setRemoteTimeout(60_000);
            return gate;
        }
    
        @Transformer(inputChannel = "toString")
        public String transform(byte[] bytes) {
            return new String(bytes);
        }
    
        @Bean
        public AbstractClientConnectionFactory connectionFactory() {
            final AbstractClientConnectionFactory connectionFactory = new TcpNetClientConnectionFactory("localhost", 5959);
            connectionFactory.setSoTimeout(300000);
            return connectionFactory;
        }
    
        @MessagingGateway(defaultRequestChannel = "testGateway")
        public static interface TestGateway {
            public Future<String> sendMessage(String in);
        }
    
    }
    

    【讨论】:

    • 编辑了我的问题以提供更多详细信息。
    • 我建议你为org.springframework.integration开启DEBUG级别的日志;你会得到很多有助于追踪事情的信息。
    • 调试模式没有任何新功能:TcpNetConnectio 读取异常 localhost:5959:55274:d08b8ffe-f2a9-465f-9165-956db23adbdb SocketException:Socket closed
    • 您现在使用的是 Java 序列化而不是默认的 CRLF 序列化 - 我用一个工作示例编辑了我的答案。
    • 您还应该在服务器上使用pw.print() 而不是println() 以避免额外的\n。如果要使用println,请使用ByteArrayLfSerializer 并删除\r\n
    猜你喜欢
    • 1970-01-01
    • 2019-05-07
    • 1970-01-01
    • 1970-01-01
    • 2019-08-25
    • 2020-07-10
    • 1970-01-01
    • 2019-09-09
    • 2019-07-04
    相关资源
    最近更新 更多