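[Question title]: Spring Integration TCP/IP close connection problem
[Posted]: 2019-04-02 20:45:35
[Question description]:

I am using Spring Integration as the gateway module of a full-duplex communication system. The flow is: client app <-> spring-integration-ip-module (siid) <-> server app. The problem is that when the client app closes, siid does not close its connection to the server app. Here is my code: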

// siid connects to the client

@Bean
public TcpNetServerConnectionFactory server(){
    TcpNetServerConnectionFactory server=new TcpNetServerConnectionFactory(1234);
    server.setMapper(new TcpSerMapper()); // use 'mapper' attribute in XML
    MySerializer mySeri=new MySerializer();
    server.setDeserializer(mySeri);
    server.setSerializer(mySeri);
    return server;
    }

// inboundGateway, with inChannel as the requestChannel
@Bean
public TcpInboundGateway inGate(){
    TcpInboundGateway inGate=new TcpInboundGateway();
    inGate.setConnectionFactory(server());
    inGate.setRequestChannelName("inChannel");
    inGate.setReplyChannelName("outputChannel");
    return inGate;
    }

// serviceActivator that takes inChannel's payload and sends it through a gateway.
@ServiceActivator(inputChannel = "inChannel")
public  byte[]doClientForward(Message<?> msg){
    byte[]msgPayload=(byte[])(msg.getPayload());
    byte[]sendResult=null;
    ToTCP toTcp=(ToTCP)contextBean.get("toTcpBean"); // ToTCP is a gateway
    sendResult=toTcp.sends((msgPayload),"localhost",7779);
    QueueChannel outputChannel=(QueueChannel)contextBean.get("outputChannel");
    return sendResult;
    }

public static class DynamicSerSeri extends AbstractPooledBufferByteArraySerializer {
protected byte[] doDeserialize(InputStream inputStream, byte[] buffer) throws IOException {
    byte[] bytes = this.copyBuffer(inputStream, buffer);
    return bytes;
}

public void serialize(byte[] object, OutputStream outputStream) throws IOException {
    outputStream.write(object);
}

public byte[] copyBuffer(InputStream inputStream, byte[] buffer) throws IOException {

    int n = 0;
    int bite = 0;
    try {
        while (true) {
            bite = inputStream.read(); // blocked here
            this.setMaxMessageSize(inputStream.available() + 1);
            buffer = new byte[inputStream.available() + 1];
            if (bite < 0 && n == 0) {
                throw new SoftEndOfStreamException("Stream closed between payloads");
            }
            checkClosure(bite);
            buffer[n++] = (byte) bite;
            if (bite == -1) {
                break;

            }
            if (n == this.maxMessageSize) {
                break;
            }
        }
        return buffer;
    } catch (SoftEndOfStreamException e) {
        throw e; // I am stuck here: when the client closes, cf does not receive this exception and does not send a close signal to the server side
    } catch (IOException e) {
        publishEvent(e, buffer, n);
        throw e;
    } catch (RuntimeException e) {
        publishEvent(e, buffer, n);
        throw e;
    }
}

}


@MessagingGateway()
public interface ToTCP {
@Gateway(requestChannel = "toTcp.input", replyChannel = "outputChannel")
public byte[] sends(byte[] data, @Header("host") String host, @Header("port") int port);
}

@Bean
public IntegrationFlow toTcp() {
    return f -> f.route(new ClientTcpRouter());
}

// I am not sure I understand IntegrationFlowContext, but it works
public static class ClientTcpRouter extends AbstractMessageRouter {
@Autowired
private IntegrationFlowContext flowContext;

@Override
protected synchronized Collection<MessageChannel> determineTargetChannels(Message<?> message) {
    // connection to server side.
    TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory(host, port); // ?? this connection factory is not closed when inGate's connection factory throws SoftEndOfStreamException
    TcpOutboundGateway handler = new TcpOutboundGateway();
    handler.setConnectionFactory(cf);
    cf.setDeserializer(new DynamicSerSeri());
    cf.setSerializer(new DynamicSerSeri());
    IntegrationFlow flow = f -> f.handle(handler);
    IntegrationFlowContext.IntegrationFlowRegistration flowRegistration =
            this.flowContext.registration(flow)
                    .addBean(cf)
                    .id(hostPort + ".flow")
                    .register();
    MessageChannel inputChannel = flowRegistration.getInputChannel();
    this.subFlows.put(hostPort, inputChannel);
    return inputChannel;
}
}

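The TcpInboundGateway accepts the connection from the client and forwards messages to the request channel (inChannel in the code above). I use a serviceActivator to take that channel's payload and send it to the server side through a TcpOutboundGateway, which has its own connection factory to the server side. When the client closes its connection to the spring-integration-ip-module, the TcpInboundGateway gets a SoftEndOfStreamException, but I don't know how to close the TcpOutboundGateway's connection to the server side.

[Discussion]:

    Tags: spring-integration spring-integration-ip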


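    [Solution 1]:

    Use an ApplicationListener bean or an @EventListener method to listen for TCP events.

    When you first open an outbound connection, you will get a TcpConnectionOpenEvent. By default it is published on (and will be received on) the calling thread. You can correlate the outbound connection ID with the inbound one.

    Listen for the TcpConnectionCloseEvent from the inbound connection factory; you can then close the outbound connection using its connectionId.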

    outboundFactory.closeConnection(connectionId);
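
The bookkeeping behind that correlation can be sketched in plain Java. This is only an illustration under stated assumptions: `ConnectionCorrelator`, `onOutboundOpened` and `onInboundClosed` are hypothetical names, not Spring Integration API. In a real application the two methods would be called from event listeners for `TcpConnectionOpenEvent` and `TcpConnectionCloseEvent`, and the closer callback would delegate to `outboundFactory.closeConnection(connectionId)`. How the listener learns the current inbound ID (for example from the inbound message's connection ID header, kept on the calling thread) is assumed and not shown.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/**
 * Hypothetical helper (not part of Spring Integration): remembers which
 * outbound connection was opened on behalf of which inbound connection.
 */
final class ConnectionCorrelator {

    // inbound connectionId -> outbound connectionId
    private final Map<String, String> outboundByInbound = new ConcurrentHashMap<>();

    // Receives the outbound connectionId to close. Assumption: in a real
    // application this wraps outboundFactory.closeConnection(connectionId).
    private final Consumer<String> outboundCloser;

    ConnectionCorrelator(Consumer<String> outboundCloser) {
        this.outboundCloser = outboundCloser;
    }

    /** Record the pairing when the outbound open event arrives. */
    void onOutboundOpened(String inboundId, String outboundId) {
        outboundByInbound.put(inboundId, outboundId);
    }

    /**
     * Close the paired outbound connection when the inbound close event arrives.
     * Returns true if a pairing existed and the closer was invoked.
     */
    boolean onInboundClosed(String inboundId) {
        String outboundId = outboundByInbound.remove(inboundId);
        if (outboundId == null) {
            return false; // nothing was opened for this inbound connection
        }
        outboundCloser.accept(outboundId);
        return true;
    }

    public static void main(String[] args) {
        // Stand-in closer that only logs; the IDs below are made up.
        ConnectionCorrelator correlator =
                new ConnectionCorrelator(id -> System.out.println("closing outbound " + id));
        correlator.onOutboundOpened("in-1", "out-1");
        correlator.onInboundClosed("in-1");
    }
}
```

Removing the entry on close keeps the map from growing as clients come and go, and a second close event for the same inbound ID becomes a harmless no-op.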
    

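    EDIT

    Since you are using a TcpNetServerConnectionFactory, you can use a ThreadAffinityClientConnectionFactory, which will automatically associate the outgoing connection with the incoming connection.

    When you get the event for the incoming connection closing, it will be on the same thread, so you can simply call releaseConnection() on that thread and the outgoing connection will be closed.

    Here is an example: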

    @SpringBootApplication
    public class So55207274Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So55207274Application.class, args);
        }
    
        @Bean
        public IntegrationFlow flow() {
            return IntegrationFlows.from(Tcp.inboundGateway(server()))
                    .log()
                    .handle(Tcp.outboundGateway(threadBoundClient()))
                    .get();
        }
    
        @Bean
        public TcpNetServerConnectionFactory server() {
            return new TcpNetServerConnectionFactory(1234);
        }
    
        @Bean
        public ThreadAffinityClientConnectionFactory threadBoundClient() {
            return new ThreadAffinityClientConnectionFactory(client());
        }
    
        public TcpNetClientConnectionFactory client() {
            TcpNetClientConnectionFactory client = new TcpNetClientConnectionFactory("localhost", 1235);
            client.setSingleUse(true);
            return client;
        }
    
        @EventListener
        public void listen(TcpConnectionCloseEvent event) {
            if (event.getConnectionFactoryName().equals("server")) {
                try {
                    threadBoundClient().releaseConnection();
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }
            System.out.println(event);
        }
    
        // Test server
    
        @Bean
        public IntegrationFlow test() {
            return IntegrationFlows.from(Tcp.inboundGateway(Tcp.netServer(1235)))
                    .transform(Transformers.objectToString())
                    .<String, String>transform(p -> p.toUpperCase())
                    .get();
        }
    
    }
    

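    [Comments]:

    • Is the outbound connection opened in the @Gateway? Can you tell me how the event is published? In my code, I think the outbound connection is opened during registration in the "createNewSubflow" method.
    • See the edit to my answer for a simpler solution.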