【问题标题】:My tcp client using spring integration not able to get response我的 tcp 客户端使用 spring 集成无法获得响应
【发布时间】:2017-06-04 17:57:19
【问题描述】:

我已经使用 spring 集成创建了 tcp 客户端,我能够接收我发送消息的响应。但是当我使用 localDateTime.now() 记录时间时,我无法接收到 send message 的响应。我知道这可以使用时间设置使线程等待来解决。因为我是弹簧集成的新手所以请帮助我如何做到这一点。

@Configuration
@ComponentScan
@EnableAutoConfiguration
public class Test
{

    protected final Log logger = LogFactory.getLog(this.getClass());

    // **************** Client **********************************************
    @Bean
    public MessageChannel replyChannel()
    {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel sendChannel()
    {
        MessageChannel directChannel = new DirectChannel();
        return directChannel;
    }

    @EnableIntegration
    @IntegrationComponentScan
    @Configuration
    public static class config
    {
        @MessagingGateway(defaultRequestChannel = "sendChannel", defaultReplyChannel = "replyChannel")
        public interface Gateway
        {

            String Send(String in);

        }
    }

    @Bean
    AbstractClientConnectionFactory tcpNetClientConnectionFactory()
    {
        AbstractClientConnectionFactory tcpNetClientConnectionFactory = new TcpNetClientConnectionFactory("localhost",
                9999);
        tcpNetClientConnectionFactory.setSerializer(new UCCXImprovedSerializer());
        tcpNetClientConnectionFactory.setDeserializer(new UCCXImprovedSerializer());
        tcpNetClientConnectionFactory.setSingleUse(true);

        tcpNetClientConnectionFactory.setMapper(new TcpMessageMapper());
        return tcpNetClientConnectionFactory;
    }

    @Bean
    @ServiceActivator(inputChannel = "sendChannel")
    TcpOutboundGateway tcpOutboundGateway()
    {
        TcpOutboundGateway tcpOutboundGateway = new TcpOutboundGateway();
        tcpOutboundGateway.setConnectionFactory(tcpNetClientConnectionFactory());
        tcpOutboundGateway.setReplyChannel(replyChannel());
        return tcpOutboundGateway;
    }

    public static void main(String args[])
    {
        // new LegaServer();
        ConfigurableApplicationContext applicationContext = SpringApplication.run(Test.class, args);
        String temp = applicationContext.getBean(Gateway.class).Send("kksingh");
        System.out.println(LocalDateTime.now()+"output" + temp);

        applicationContext.stop();

    }
}

我的自定义序列化器和反序列化器 UCCXImrovedSerializerclass 根据@Garry更新后

 public class UCCXImprovedSerializer implements Serializer<String>, Deserializer<String>
{
     @Override
    public String deserialize(InputStream initialStream) throws IOException
    {

        System.out.println("deserialzier called");
        StringBuilder sb = new StringBuilder();
        try (BufferedReader rdr = new BufferedReader(new InputStreamReader(initialStream)))
        {
            for (int c; (c = rdr.read()) != -1;)
            {
                sb.append((char) c);

            }
        }
        return sb.toString();
    }

    @Override
    public void serialize(String msg, OutputStream os) throws IOException
    {
        System.out.println(msg + "---serialize---" + Thread.currentThread().getName() + "");
        os.write(msg.getBytes());
    }
   }

我的服务器在端口 9999 代码

   try
        {
            clientSocket = echoServer.accept();
            System.out.println("client connection established..");
            is = new DataInputStream(clientSocket.getInputStream());
            os = new PrintStream(clientSocket.getOutputStream());
            String tempString = "kksingh";
            byte[] tempStringByte = tempString.getBytes();
            byte[] temp = new byte[tempString.getBytes().length];
            while (true)
            {
                is.read(temp);
                System.out.println(new String(temp) + "--received msg is--- " + LocalDateTime.now());
                System.out.println(LocalDateTime.now() + "sending value");
                os.write(tempStringByte);
                break;
            }
        } catch (IOException e)
        {
            System.out.println(e);
        }
    }

我的 tcp 客户端日志文件

2017-06-04 23:10:14.771  INFO 15568 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started org.springframework.integration.endpoint.EventDrivenConsumer@1f12e153
kksingh---serialize---main
pool-1-thread-1---deserialize----
pool-1-thread-1---deserialize----
pool-1-thread-1---deserialize----
pool-1-thread-1---deserialize----
2017-06-04 23:10:14.812 ERROR 15568 --- [pool-1-thread-1] o.s.i.ip.tcp.TcpOutboundGateway          : Cannot correlate response - no pending reply for localhost:9999:57622:bc98ee29-8957-47bd-bd8a-f734c3ec3f9d
2017-06-04T23:10:14.809output
2017-06-04 23:10:14.821  INFO 15568 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 0

我的服务器端日志文件

client connection established..
kksingh--received msg is--- 2017-06-04T23:10:14.899
2017-06-04T23:10:14.899sending value

当我从服务器和 tcpclient 中删除 localdatetime.now() 时,我能够得到作为 outputkksingh 的响应

o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2017-06-05 12:46:32.494  INFO 29076 --- [           main] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.errorChannel' has 1 subscriber(s).
2017-06-05 12:46:32.495  INFO 29076 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started _org.springframework.integration.errorLogger
2017-06-05 12:46:32.746  INFO 29076 --- [           main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 8080 (http)
2017-06-05 12:46:32.753  INFO 29076 --- [           main] o.s.i.samples.tcpclientserver.Test       : Started Test in 2.422 seconds (JVM running for 2.716)
2017-06-05 12:46:32.761  INFO 29076 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {bridge:null} as a subscriber to the 'replyChannel' channel
2017-06-05 12:46:32.762  INFO 29076 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'application.replyChannel' has 1 subscriber(s).
2017-06-05 12:46:32.763  INFO 29076 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started org.springframework.integration.endpoint.EventDrivenConsumer@1f12e153
kksingh---serialize---main
pool-1-thread-1---deserialize----kksingh
outputkksingh
2017-06-05 12:46:32.837  INFO 29076 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 0
2017-06-05 12:46:32.839  INFO 29076 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Removing {bridge:null} as a subscriber to the 'replyChannel' channel
2017-06-05 12:46:32.839  INFO 29076 --- [   

【问题讨论】:

    标签: spring tcp spring-integration


    【解决方案1】:

    您的反序列化器正在反序列化多个数据包...

    pool-1-thread-1---deserialize----
    pool-1-thread-1---deserialize----
    pool-1-thread-1---deserialize----
    pool-1-thread-1---deserialize----
    

    产生4条回复消息;网关只能处理一个回复,这就是您看到错误消息的原因。

    您的反序列化器需要比仅捕获“可用”字节更智能。您需要消息中的某些内容来指示数据的结束(或关闭套接字以指示结束)。

    【讨论】:

    • 在我的用例中,消息末尾没有任何字符来标记流结束。所以我关闭了套接字。但我仍然得到同样的东西。如果你能举例说明我如何让我的反序列化器变得更聪明。
    • 当我从服务器和客户端删除 localdatetime.now() 时,我可以看到客户端的响应。日志在帖子中更新
    • 它可能会影响时间 - 你不能假设所有数据都会在一次读取中到达 - 你必须继续阅读直到收到-1(表示流结束)。跨度>
    • @Garry 我还有另一个与此相关的问题。如果我的服务器保持会话。那么我希望我的客户端在收到服务器的回复后不要关闭连接。或者在向服务器发送新消息后不应该启动新连接。如果您可以建议配置更改或提供相同的链接
    • tcpNetClientConnectionFactory.setSingleUse(false);,但您需要反序列化器中的逻辑才能知道每条消息的结尾是什么。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-09-14
    • 2018-08-01
    • 1970-01-01
    • 2017-05-03
    • 2015-10-11
    • 2016-07-26
    相关资源
    最近更新 更多