【Question title】: Spring Integration TCP client multiple connections
【Posted】: 2016-11-09 13:05:09
【Question】:

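I use the Spring Integration tcp-outbound-adapter and tcp-inbound-adapter to communicate with a third-party external system over TCP.

The connection factory I use is of type "client" and has single-use="false", because communication with the external system takes the form of a session of several dozen requests and replies. The external system expects me to open a new TCP connection for each session.

Is there any way to do this with Spring Integration?

My code already uses SI successfully for one such session. But I want my system to open several such connections so that I can handle several concurrent sessions. Currently, if I send a message for a new session to the inbound adapter, it uses the same TCP connection.

Please help.

Update:

When using the ThreadAffinity solution that Gary gave here, we get this exception when we run more than 4 concurrent requests. Any idea why?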

11:08:02.083  [pool-1-thread-2] 193.xxx.yyy.zz:443:55729:46c71372-5933-4707-a27b-93cc4bf78c59 Message sent GenericMessage [payload=byte[326], headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@2fb866, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@2fb866, ip_tcp_remotePort=55718, ip_connectionId=127.0.0.1:55718:4444:7f71ce96-eaac-4b21-8b2c-bf736102f818, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=2dc3e330-d703-8a61-c46c-012233cadf6f, ip_hostname=127.0.0.1, timestamp=1481706480700}]
11:08:12.093  [pool-1-thread-2] Remote Timeout on 193.xxx.yyy.zz:443:55729:46c71372-5933-4707-a27b-93cc4bf78c59
11:08:12.093  [pool-1-thread-2] Tcp Gateway exception
org.springframework.integration.MessageTimeoutException: Timed out waiting for response
            at org.springframework.integration.ip.tcp.TcpOutboundGateway.handleRequestMessage(TcpOutboundGateway.java:146)
            at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
            at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
            at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
            at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
            at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
            at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
            at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423)
            at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
            at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
            at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
            at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
            at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:292)
            at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:212)
            at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:129)
            at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
            at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
            at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
            at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
            at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
            at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
            at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423)
            at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
            at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:150)
            at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:45)
            at org.springframework.messaging.core.AbstractMessagingTemplate.sendAndReceive(AbstractMessagingTemplate.java:42)
            at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:97)
            at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:441)
            at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceiveMessage(MessagingGatewaySupport.java:409)
            at org.springframework.integration.ip.tcp.TcpInboundGateway.doOnMessage(TcpInboundGateway.java:120)
            at org.springframework.integration.ip.tcp.TcpInboundGateway.onMessage(TcpInboundGateway.java:98)
            at org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorSupport.onMessage(TcpConnectionInterceptorSupport.java:159)
            at org.springframework.integration.ip.tcp.connection.TcpNetConnection.run(TcpNetConnection.java:182)
            at org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorSupport.run(TcpConnectionInterceptorSupport.java:111)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)

【Comments】:

    Tags: tcp spring-integration


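    【Solution 1】:

    That depends on what constitutes a "session". If all requests from a client session run on a single thread, you can write a simple wrapper for the connection factory that stores the connection in a ThreadLocal. You would need some mechanism to call the factory wrapper after the last request, to close the connection and remove it from the ThreadLocal.

    If the session's requests can occur on multiple threads, it is a bit more complicated, but you can still do it with a ThreadLocal that maps to a connection instance.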
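The multi-threaded case can be read in more than one way. One possible reading is sketched below, assuming each thread is told which session it is serving before it sends. `SessionAffinityRegistry`, `bind`, `unbind` and the `opener` supplier are hypothetical names for illustration, not Spring Integration API, and `AutoCloseable` stands in for the real connection type.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/**
 * Hypothetical sketch, not a Spring Integration class: a ThreadLocal names the
 * session the current thread is serving, and a shared map holds one connection
 * per session, so several threads can share a session's connection.
 */
class SessionAffinityRegistry {

    private final ThreadLocal<String> currentSession = new ThreadLocal<>();
    private final Map<String, AutoCloseable> connections = new ConcurrentHashMap<>();
    private final Supplier<AutoCloseable> opener;

    SessionAffinityRegistry(Supplier<AutoCloseable> opener) {
        this.opener = opener;
    }

    /** Marks the calling thread as working on the given session. */
    void bind(String sessionId) {
        currentSession.set(sessionId);
    }

    /** Clears the calling thread's session marker (important on pooled threads). */
    void unbind() {
        currentSession.remove();
    }

    /** Returns the bound session's connection, opening one on first use. */
    AutoCloseable obtainConnection() {
        String sessionId = currentSession.get();
        if (sessionId == null) {
            throw new IllegalStateException("No session bound to this thread");
        }
        return connections.computeIfAbsent(sessionId, id -> opener.get());
    }

    /** Closes and forgets the session's connection; call after the last request. */
    void release(String sessionId) {
        AutoCloseable connection = connections.remove(sessionId);
        if (connection != null) {
            try {
                connection.close();
            } catch (Exception e) {
                throw new IllegalStateException("Failed to close connection for " + sessionId, e);
            }
        }
    }
}
```

In a real wrapper, `obtainConnection()` would delegate to the underlying client connection factory, and `release` would be called once the session's final reply has arrived.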

    EDIT

    Here is an example...

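    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.ConfigurableApplicationContext;
    import org.springframework.context.annotation.Bean;
    import org.springframework.integration.core.MessagingTemplate;
    import org.springframework.integration.dsl.IntegrationFlow;
    import org.springframework.integration.dsl.IntegrationFlows;
    // Transformers is in ...dsl.support with the 1.x Java DSL and in ...dsl from Spring Integration 5; adjust to your version.
    import org.springframework.integration.dsl.support.Transformers;
    import org.springframework.integration.ip.tcp.TcpInboundGateway;
    import org.springframework.integration.ip.tcp.TcpOutboundGateway;
    import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory;
    import org.springframework.integration.ip.tcp.connection.TcpConnectionSupport;
    import org.springframework.integration.ip.tcp.connection.TcpListener;
    import org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory;
    import org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageChannel;
    
    @SpringBootApplication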
    public class So40507731Application {
    
        public static void main(String[] args) throws Exception {
            ConfigurableApplicationContext context = SpringApplication.run(So40507731Application.class, args);
            MessageChannel channel = context.getBean("clientFlow.input", MessageChannel.class);
            MessagingTemplate template = new MessagingTemplate(channel);
            ThreadAffinityClientConnectionFactory affinityCF = context.getBean(ThreadAffinityClientConnectionFactory.class);
            ExecutorService exec = Executors.newCachedThreadPool();
            CountDownLatch latch = new CountDownLatch(2);
            exec.execute(() -> {
                String result = new String(template.convertSendAndReceive("foo", byte[].class));
                System.out.println(Thread.currentThread().getName() + " " + result);
                result = new String(template.convertSendAndReceive("foo", byte[].class));
                System.out.println(Thread.currentThread().getName() + " " + result);
                affinityCF.release();
                latch.countDown();
            });
            exec.execute(() -> {
                String result = new String(template.convertSendAndReceive("foo", byte[].class));
                System.out.println(Thread.currentThread().getName() + " " + result);
                result = new String(template.convertSendAndReceive("foo", byte[].class));
                System.out.println(Thread.currentThread().getName() + " " + result);
                affinityCF.release();
                latch.countDown();
            });
            latch.await(10, TimeUnit.SECONDS);
            context.close();
            exec.shutdownNow();
        }
    
        @Bean
        public TcpNetClientConnectionFactory delegateCF() {
            TcpNetClientConnectionFactory clientCF = new TcpNetClientConnectionFactory("localhost", 1234);
            clientCF.setSingleUse(true); // so each thread gets its own connection
            return clientCF;
        }
    
        @Bean
        public ThreadAffinityClientConnectionFactory affinityCF() {
            return new ThreadAffinityClientConnectionFactory(delegateCF());
        }
    
        @Bean
        public TcpOutboundGateway outGate() {
            TcpOutboundGateway outGate = new TcpOutboundGateway();
            outGate.setConnectionFactory(affinityCF());
            return outGate;
        }
    
        @Bean
        public IntegrationFlow clientFlow() {
            return f -> f.handle(outGate());
        }
    
        @Bean
        public TcpNetServerConnectionFactory serverCF() {
            return new TcpNetServerConnectionFactory(1234);
        }
    
        @Bean
        public TcpInboundGateway inGate() {
            TcpInboundGateway inGate = new TcpInboundGateway();
            inGate.setConnectionFactory(serverCF());
            return inGate;
        }
    
        @Bean
        public IntegrationFlow serverFlow() {
            return IntegrationFlows.from(inGate())
                    .transform(Transformers.objectToString())
                    .transform("headers['ip_connectionId'] + ' ' + payload")
                    .get();
        }
    
        public static class ThreadAffinityClientConnectionFactory extends AbstractClientConnectionFactory
                implements TcpListener {
    
            private final AbstractClientConnectionFactory delegate;
    
            private final ThreadLocal<TcpConnectionSupport> connection = new ThreadLocal<>();
    
            public ThreadAffinityClientConnectionFactory(AbstractClientConnectionFactory delegate) {
                super("", 0);
                delegate.registerListener(this);
                this.delegate = delegate;
            }
    
            // Reuse this thread's connection, or ask the delegate for a new one if none is open.
            @Override
            protected TcpConnectionSupport obtainConnection() throws Exception {
                TcpConnectionSupport tcpConnection = this.connection.get();
                if (tcpConnection == null || !tcpConnection.isOpen()) {
                    tcpConnection = this.delegate.getConnection();
                    this.connection.set(tcpConnection);
                }
                return tcpConnection;
            }
    
            // Close this thread's connection and clear the ThreadLocal; call after the session's last request.
            public void release() {
                TcpConnectionSupport connection = this.connection.get();
                if (connection != null) {
                    connection.close();
                    this.connection.remove();
                }
            }
    
            @Override
            public void start() {
                this.delegate.start();
                setActive(true);
                super.start();
            }
    
            @Override
            public void stop() {
                this.delegate.stop();
                setActive(false);
                super.stop();
            }
    
            @Override
            public boolean onMessage(Message<?> message) {
                return getListener().onMessage(message);
            }
    
        }
    
    }
    

    Result:

    pool-2-thread-2 localhost:64559:1234:3d898822-ea91-421d-97f2-5f9620b9d369 foo
    pool-2-thread-1 localhost:64560:1234:227f8a9f-1461-41bf-943c-68a56f708b0c foo
    pool-2-thread-2 localhost:64559:1234:3d898822-ea91-421d-97f2-5f9620b9d369 foo
    pool-2-thread-1 localhost:64560:1234:227f8a9f-1461-41bf-943c-68a56f708b0c foo
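
In the example, `release()` is reached only when both requests succeed; if a request throws (for instance on a reply timeout), the call is skipped and the connection is never closed. One way to guard against that is a try/finally helper, sketched here under the assumption that the session body can be passed in as a lambda. `SessionRunner` and `Releasable` are hypothetical names; `Releasable` merely stands in for the wrapper's `release()` method.

```java
import java.util.function.Supplier;

/** Hypothetical helper: always releases the thread's connection when a session ends. */
final class SessionRunner {

    /** Stand-in for the release() method of the connection factory wrapper. */
    interface Releasable {
        void release();
    }

    private SessionRunner() {
    }

    /** Runs the session body, then releases the connection even if the body throws. */
    static <T> T runSession(Releasable factory, Supplier<T> session) {
        try {
            return session.get();
        } finally {
            factory.release();
        }
    }
}
```

With the answer's classes, each task body could then be wrapped as `SessionRunner.runSession(affinityCF::release, () -> ...)`, keeping the two `convertSendAndReceive` calls inside the lambda.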
    

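    【Discussion】:

    • Thanks for the quick answer. Isn't this a common use case that SI already has a ready-made solution for?
    • I assume the wrapper would be something like CachingClientConnectionFactory? But how do I configure int-ip:tcp-connection-factory to return the wrapper class?
    • Could you please explain in more detail how to do this? I tried several possibilities, but none of them worked. The logic for sharing a single connection (when singleUse="false") seems to be spread across many classes. TIA for the help.
    • It hasn't been asked for before; I opened a JIRA issue. It will use a technique similar to the caching CF, but with much less complexity, because it just delegates and doesn't have to wrap the connection itself. I edited my answer.
    • Thanks for the code example. It was very helpful. However, you used singleUse=true, and my use case is different: I have several TCP clients (in the same JVM), all connecting to the same TCP server (the external system), and each client has a "conversation" of 20-150 messages in both directions. So I have to use singleUse=false and 2 channel adapters. I extended your code example, but had to copy-paste ConnectionFactoryFactoryBean into my own code in order to create my own ConnectionFactory, because some of its methods cannot be extended.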