【Question Title】: spring arbitrary messaging tcp socket
【Posted】: 2017-06-05 05:51:22
【Question Description】:

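I am using spring-integration to develop a custom bi-directional TCP socket server.

The server handles request/response tasks, but I am unable to send arbitrary messages to a specific connection ID.

I also know that TcpSendingMessageHandler and TcpReceivingChannelAdapter might be the solution, but I couldn't find any sample code showing how to use them.

Here is my code: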

public class SocketServer {

    private Logger logger = LoggerFactory.getLogger(SocketServer.class);

    @Bean
    public AbstractServerConnectionFactory connectionFactory() {
        return new TcpNetServerConnectionFactory(2025);
    }

    @Bean
    public TcpInboundGateway TcpInboundGateway(AbstractServerConnectionFactory connectionFactory) {
        TcpInboundGateway inGate = new TcpInboundGateway();
        inGate.setConnectionFactory(connectionFactory);
        inGate.setRequestChannelName("directChannel");
        return inGate;
    }

    @Bean
    public DirectChannel directChannel() {
        return new DirectChannel();
    }

    @MessageEndpoint
    public class Echo {

        @Transformer(inputChannel = "directChannel")
        public String process(byte[] input) throws Exception {
            return new String(input).toUpperCase();
        }

    }

    public boolean sendMessage(String connectionId) {
        // TODO: send a message to the given connection
        return false; // placeholder so the class compiles
    }
}

【Question Discussion】:

    Tags: java spring sockets tcp spring-integration


    【Solution 1】:

    Here you go; it should be self-explanatory...

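    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.Socket;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;

    import javax.net.SocketFactory;

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.ApplicationListener;
    import org.springframework.context.ConfigurableApplicationContext;
    import org.springframework.context.annotation.Bean;
    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.integration.annotation.Transformer;
    import org.springframework.integration.ip.IpHeaders;
    import org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter;
    import org.springframework.integration.ip.tcp.TcpSendingMessageHandler;
    import org.springframework.integration.ip.tcp.connection.TcpConnectionCloseEvent;
    import org.springframework.integration.ip.tcp.connection.TcpConnectionEvent;
    import org.springframework.integration.ip.tcp.connection.TcpConnectionOpenEvent;
    import org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory;
    import org.springframework.integration.support.MessageBuilder;
    import org.springframework.messaging.MessageChannel;

    @SpringBootApplication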
    public class So41760289Application {
    
        public static void main(String[] args) throws Exception {
            ConfigurableApplicationContext context = SpringApplication.run(So41760289Application.class, args);
            Socket socket = SocketFactory.getDefault().createSocket("localhost", 12345);
    
            // request/reply
            socket.getOutputStream().write("foo\r\n".getBytes());
            BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            System.out.println(reader.readLine());
    
            // arbitrary send
            @SuppressWarnings("unchecked")
            Set<String> connections = context.getBean(Set.class);
            for (String connection : connections) {
                context.getBean("bar", MessageChannel.class).send(
                        MessageBuilder.withPayload("foo")
                            .setHeader(IpHeaders.CONNECTION_ID, connection)
                            .build());
            }
            System.out.println(reader.readLine());
            reader.close();
            context.close();
        }
    
        @Bean
        public TcpNetServerConnectionFactory cf() {
            return new TcpNetServerConnectionFactory(12345);
        }
    
        @Bean
        public TcpReceivingChannelAdapter receiver() {
            TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
            adapter.setConnectionFactory(cf());
            adapter.setOutputChannelName("foo");
            return adapter;
        }
    
        @Transformer(inputChannel = "foo", outputChannel = "bar")
        public String process(byte[] in) {
            return new String(in).toUpperCase();
        }
    
        @Bean
        @ServiceActivator(inputChannel = "bar")
        public TcpSendingMessageHandler sender() {
            TcpSendingMessageHandler handler = new TcpSendingMessageHandler();
            handler.setConnectionFactory(cf());
            return handler;
        }
    
        @Bean
        public Set<String> connections() {
            return Collections.synchronizedSet(new HashSet<>());
        }
    
        @Bean
        public ApplicationListener<TcpConnectionEvent> listener() {
            return new ApplicationListener<TcpConnectionEvent>() {
    
                @Override
                public void onApplicationEvent(TcpConnectionEvent event) {
                    if (event instanceof TcpConnectionOpenEvent) {
                        connections().add(event.getConnectionId());
                    }
                    else if (event instanceof TcpConnectionCloseEvent) {
                        connections().remove(event.getConnectionId());
                    }
                }
    
            };
        }
    
    }
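
In case the moving parts are hard to see through the annotations, here is a framework-free sketch of the same idea: record each connection ID when it opens, forget it when it closes, and route an outbound message by ID. This is only an illustration, not how Spring Integration is implemented. `ConnectionRegistry`, `onOpen`, `onClose` and the `conn-a`/`conn-b` IDs are hypothetical names, and the CRLF terminator is assumed to match the `\r\n` framing the client above writes.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of "send an arbitrary message to one connection by ID".
class ConnectionRegistry {

    // connection ID -> stream that writes to that client
    private final Map<String, OutputStream> connections = new ConcurrentHashMap<>();

    // Plays the role of the TcpConnectionOpenEvent branch in the listener above.
    void onOpen(String connectionId, OutputStream out) {
        connections.put(connectionId, out);
    }

    // Plays the role of the TcpConnectionCloseEvent branch in the listener above.
    void onClose(String connectionId) {
        connections.remove(connectionId);
    }

    // IDs of the connections that are currently open.
    Set<String> connectionIds() {
        return connections.keySet();
    }

    // Writes payload + CRLF to the chosen connection only.
    // Returns false if the ID is unknown or the write fails.
    boolean sendMessage(String connectionId, String payload) {
        OutputStream out = connections.get(connectionId);
        if (out == null) {
            return false;
        }
        try {
            out.write((payload + "\r\n").getBytes(StandardCharsets.UTF_8));
            out.flush();
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        ConnectionRegistry registry = new ConnectionRegistry();
        // In-memory streams stand in for two client sockets.
        ByteArrayOutputStream clientA = new ByteArrayOutputStream();
        ByteArrayOutputStream clientB = new ByteArrayOutputStream();
        registry.onOpen("conn-a", clientA);
        registry.onOpen("conn-b", clientB);

        registry.sendMessage("conn-a", "FOO"); // only conn-a receives it
        System.out.println("conn-a bytes: " + clientA.size()
                + ", conn-b bytes: " + clientB.size());
    }
}
```

In the Spring Integration version above, the `connections` bean plus the `TcpConnectionEvent` listener do the bookkeeping, and the `IpHeaders.CONNECTION_ID` header on the message sent to `bar` tells the `TcpSendingMessageHandler` which connection to write to.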
    

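    【Discussion】:

    • Could you add a request/response, arbitrary messaging client to the answer?
    • Can you explain further? The main() method does just that.
    • Bean named 'bar' is expected to be of type 'org.springframework.messaging.MessageChannel' but was actually of type 'org.springframework.integration.ip.tcp.TcpSendingMessageHandler'
    • You should ask a new question showing your configuration rather than commenting on an existing answer. It looks like you have 2 bean definitions named bar, but I need to see your configuration.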