【问题标题】:closing socket connection in single-use=false spring integration TCP Server在单次使用中关闭套接字连接=false spring 集成 TCP Server
【发布时间】:2023-04-05 08:00:01
【问题描述】:

我知道 spring 集成有 TcpInboundGateway 和 ByteArrayStxEtxSerializer 来处理来自 TCP 端口的数据。

ByteArrayStxEtxSerializer 如果 TCP 服务器需要读取从客户端发送的所有数据然后处理它,则效果很好。 (请求和响应模型)我使用 single-use=false 以便可以在同一个连接中处理多个请求。

例如,如果客户端发送 0x02AAPL0x03,那么服务器可以发送 AAPL 价格。

如果客户端发送 0x02AAPL0x030x02GOOG0x03,我的 TCP 服务器正在工作。它发送 AAPL 价格和 GOOG 价格。

有时客户端可以发送 EOT (0x04)。如果客户端发送EOT,我想关闭socket连接。

例如:客户端请求可以是0x02AAPL0x030x02GOOG0x03 0x020x040x03。注意 EOT 出现在最后一个数据包中。

我知道可以自定义 ByteArrayStxEtxSerializer 解串器来读取客户端发送的字节。

解串器是关闭套接字连接的好地方吗?如果没有,应该如何通知spring集成框架关闭socket连接?

请帮忙。

这是我的弹簧配置:

<int-ip:tcp-connection-factory id="crLfServer"
        type="server"
        port="${availableServerSocket}"
        single-use="false"
        so-timeout="10000"
        using-nio="false" 
        serializer="connectionSerializeDeserialize"
        deserializer="connectionSerializeDeserialize"
        so-linger="2000"/>

    <bean id="connectionSerializeDeserialize" class="org.springframework.integration.ip.tcp.serializer.ByteArrayStxEtxSerializer"/>

    <int-ip:tcp-inbound-gateway id="gatewayCrLf"
        connection-factory="crLfServer"
        request-channel="serverBytes2StringChannel"
        error-channel="errorChannel"
        reply-timeout="10000"/> <!-- reply-timeout works on inbound-gateway -->

    <int:channel id="toSA" />

    <int:service-activator input-channel="toSA"
        ref="myService"
        method="prepare"/>

    <int:object-to-string-transformer id="serverBytes2String"
        input-channel="serverBytes2StringChannel"
        output-channel="toSA"/>

    <int:transformer id="errorHandler"
        input-channel="errorChannel"
        expression="payload.failedMessage.payload + ':' + payload.cause.message"/>

更新: 添加 throw new SoftEndOfStreamException("Stream closed") 以关闭序列化程序中的流,我可以在 EventListener 中看到 CLOSED 日志条目。当服务器关闭连接时,我希望在客户端接收 java.io.InputStream.read() 作为 -1。但是客户端正在接收

java.net.SocketTimeoutException: Read timed out
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.read(SocketInputStream.java:129)
    at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:264)
    at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:306)
    at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:158)
    at sun.nio.cs.StreamDecoder.read0(StreamDecoder.java:107)
    at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:93)
    at java.io.InputStreamReader.read(InputStreamReader.java:151)

还有什么可以关闭服务器端的连接并将其传播到客户端吗?

感谢您的帮助。

谢谢

【问题讨论】:

    标签: java sockets tcp spring-integration


    【解决方案1】:

    反序列化器无法访问套接字,只能访问输入流;关闭它可能会起作用,但您可能会在日志中得到很多噪音。

    最好的解决办法是抛出一个SoftEndOfStreamException;这表示应该关闭套接字并清理所有内容。

    编辑

    添加侦听器以检测/记录关闭...

    @SpringBootApplication
    public class So40471456Application {
    
        public static void main(String[] args) throws Exception {
            ConfigurableApplicationContext context = SpringApplication.run(So40471456Application.class, args);
            Socket socket = SocketFactory.getDefault().createSocket("localhost", 1234);
            socket.getOutputStream().write("foo\r\n".getBytes());
            socket.close();
            Thread.sleep(10000);
            context.close();
        }
    
        @Bean
        public EventListener eventListener() {
            return new EventListener();
        }
    
        @Bean
        public TcpNetServerConnectionFactory server() {
            return new TcpNetServerConnectionFactory(1234);
        }
    
        @Bean
        public TcpReceivingChannelAdapter inbound() {
            TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
            adapter.setConnectionFactory(server());
            adapter.setOutputChannelName("foo");
            return adapter;
        }
    
        @ServiceActivator(inputChannel = "foo")
        public void syso(byte[] in) {
            System.out.println(new String(in));
        }
    
        public static class EventListener implements ApplicationListener<TcpConnectionCloseEvent> {
    
            private final Log logger = LogFactory.getLog(getClass());
    
            @Override
            public void onApplicationEvent(TcpConnectionCloseEvent event) {
                logger.info(event);
            }
    
        }
    
    }
    

    使用 XML,只需为您的侦听器类添加 &lt;bean/&gt;

    结果:

    foo
    2016-11-07 16:52:04.133  INFO 29536 --- [pool-1-thread-2] c.e.So40471456Application$EventListener  : TcpConnectionCloseEvent 
    [source=org.springframework.integration.ip.tcp.connection.TcpNetConnection@118a7548], 
    [factory=server, connectionId=localhost:50347:1234:b9fcfaa9-e92c-487f-be59-1ed7ebd9312e] 
    **CLOSED**
    

    EDIT2

    对我来说它按预期工作......

    @SpringBootApplication
    public class So40471456Application {
    
        public static void main(String[] args) throws Exception {
            ConfigurableApplicationContext context = SpringApplication.run(So40471456Application.class, args);
            Socket socket = SocketFactory.getDefault().createSocket("localhost", 1234);
            socket.getOutputStream().write("foo\r\n".getBytes());
            try {
                System.out.println("\n\n\n" + socket.getInputStream().read() + "\n\n\n");
                context.getBean(EventListener.class).latch.await(10, TimeUnit.SECONDS);
            }
            finally {
                socket.close();
                context.close();
            }
        }
    
        @Bean
        public EventListener eventListener() {
            return new EventListener();
        }
    
        @Bean
        public TcpNetServerConnectionFactory server() {
            TcpNetServerConnectionFactory server = new TcpNetServerConnectionFactory(1234);
            server.setDeserializer(is -> {
                throw new SoftEndOfStreamException();
            });
            return server;
        }
    
        @Bean
        public TcpReceivingChannelAdapter inbound() {
            TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
            adapter.setConnectionFactory(server());
            adapter.setOutputChannelName("foo");
            return adapter;
        }
    
        public static class EventListener implements ApplicationListener<TcpConnectionCloseEvent> {
    
            private final Log logger = LogFactory.getLog(getClass());
    
            private final CountDownLatch latch = new CountDownLatch(1);
    
            @Override
            public void onApplicationEvent(TcpConnectionCloseEvent event) {
                logger.info(event);
                latch.countDown();
            }
    
        }
    
    }
    

    结果:

    2016-11-08 08:27:25.964  INFO 86147 --- [           main] com.example2.So40471456Application       : Started So40471456Application in 1.195 seconds (JVM running for 1.764)
    
    
    
    -1
    
    
    
    2016-11-08 08:27:25.972  INFO 86147 --- [pool-1-thread-2] c.e.So40471456Application$EventListener  : TcpConnectionCloseEvent [source=org.springframework.integration.ip.tcp.connection.TcpNetConnection@fee3774], [factory=server, connectionId=localhost:54984:1234:f79a6826-0336-4823-8844-67054903a094] **CLOSED**
    

    【讨论】:

    • 听起来不错。框架在关闭连接时是否记录?想在日志中查看它何时关闭(最好使用 threadId。)
    • SoftEndOfStreamException 没有日志,因为它被视为“正常”关闭。但是,您可以添加一个ApplicationListener - 它将在调用您的反序列化器的同一线程上调用。我用一个例子编辑了我的答案。
    • ApplicationListener 非常有用。非常感谢。
    • Gary,请查看我关于客户端关闭连接问题的更新。您能否建议是否还有其他工作要做?
    猜你喜欢
    • 2011-06-22
    • 1970-01-01
    • 2017-10-02
    • 2013-04-29
    • 1970-01-01
    • 2018-12-12
    • 2020-05-26
    • 2020-11-30
    • 2011-01-26
    相关资源
    最近更新 更多