【问题标题】:Need help understanding TCP Channels需要帮助了解 TCP 通道
【发布时间】:2022-01-14 19:52:00
【问题描述】:

我有一台正在运行的机器,它通过 TCP 将状态信息发送到您在机器上设置的 IP 地址和端口。如果我在该 IP 地址上使用机器的命令行并运行“nc -l”,我会从机器获取状态数据。我正在尝试构建一个 Java Spring 应用程序来摄取它,但是所有 Java TCP 教程都在谈论设置通道名称和订阅通道?通道是建立在 TCP 之上的东西,而我的机器只是没有使用通道,还是在运行命令行“nc -l”命令时监听了一些默认通道?请帮助我很困惑

编辑 1:添加我无法与 Spring 应用程序集成的首次尝试代码,也无法将数据存储在 Spring JPA 中

public class EchoMultiServer {


    private ServerSocket serverSocket;

    public void start(int port) {
        try {
            serverSocket = new ServerSocket(port);
            while (true)
                new EchoClientHandler(serverSocket.accept()).start();

        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            stop();
        }

    }

    public void stop() {
        try {

            serverSocket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    private static class EchoClientHandler extends Thread {
        private Socket clientSocket;
        private PrintWriter out;
        private BufferedReader in;

        @Autowired
        PowerStationService powerStationService;
//this service connects to the repository to store the data

        public EchoClientHandler(Socket socket) {
            this.clientSocket = socket;
        }

        public JSONObject mapJsonInput(String incomingText){
            try{
                JSONObject json = new JSONObject(incomingText);
                return json;
            } catch (JSONException e){
                System.out.println("JSONException " + e);
                return null;
            }

        }

        public JSONObject run() {
            try {
                out = new PrintWriter(clientSocket.getOutputStream(), true);
                in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
                StringBuilder textBuilder = new StringBuilder();
                int c = 0;
                int leftCaratCount=0;
                int rightCaratCount=0;
                while ((c = in.read()) != -1) {
                    char character = (char) c;
                    textBuilder.append(character);
                    if (character == '{') {
                        leftCaratCount++;
                    } else if (character == '}') {
                        rightCaratCount++;
                        if (rightCaratCount == leftCaratCount) {
                            System.out.println(textBuilder);
                            JSONObject registrationJson = mapJsonInput(textBuilder.toString());
                            System.out.println("we got em");
                            powerStationService.save(new PowerStation(registrationJson.get("D").toString(), registrationJson.get("G").toString(), Integer.parseInt(registrationJson.get("Y").toString()), Integer.parseInt(registrationJson.get("S").toString()), registrationJson.get("C").toString(), registrationJson.get("Z").toString(), registrationJson.get("V").toString()));
                            out.println("000250{\"A\":\"45514\",\"C\":\""+registrationJson.get("Y")+"\",\"E\":\"30,5\",\"G\":\""+registrationJson.get("G")+"\",\"H\":\"0\",\"K\":\"1\",\"M\":\"123456\",\"N\":\"" + System.currentTimeMillis() + "\",\"O\":\"13371\",\"P\":\"" + clientSocket.getLocalAddress().getHostAddress() + "\",\"S\":\"60000\",\"U\":\"\",\"V\":\"\",\"W\":\"https://admin.chargenow.top/cdb-socket-api/v1/socketserver/common\",\"X\":\"0\",\"Y\":\"FJC\",\"Z\":\"\"}");
                            in.close();
                            out.close();
                            clientSocket.close();
                        }
                    }
                }

            } catch (IOException e) {
                System.out.println(e.getMessage());
            }
        }
    }

    public static void main(String[] args) {
        EchoMultiServer server = new EchoMultiServer();
        server.start(13370);
    }

}

编辑 2:此外,我尝试使用 Spring 示例 Github 中的示例来查看它是否可以在我尝试的端口上接收消息。我可以使用 NetCat 查看 ServerOut 消息,但应用程序没有收到回复


@SpringBootApplication
@EnableConfigurationProperties(SampleProperties.class)
public class TcpAsyncBiDirectionalApplication {

    public static void main(String[] args) {
        SpringApplication.run(TcpAsyncBiDirectionalApplication.class, args);
    }
}
@Configuration 
class ServerPeer {

    private final Set<String> clients = ConcurrentHashMap.newKeySet();

    @Bean
    public AbstractServerConnectionFactory server(SampleProperties properties) {
        return Tcp.netServer(properties.getServerPort()).get();
    }

    @Bean
    public IntegrationFlow serverIn(AbstractServerConnectionFactory server) {
        return IntegrationFlows.from(Tcp.inboundAdapter(server))
                .transform(Transformers.objectToString())
                .log(msg -> "received by server: " + msg.getPayload())
                .get();
    }

    @Bean
    public IntegrationFlow serverOut(AbstractServerConnectionFactory server) {
        return IntegrationFlows.fromSupplier(() -> "seed", e -> e.poller(Pollers.fixedDelay(5000)))
                .split(this.clients, "iterator")
                .enrichHeaders(h -> h.headerExpression(IpHeaders.CONNECTION_ID, "payload"))
                .transform(p -> "sent by server Hello from server")
                .handle(Tcp.outboundAdapter(server))
                .get();
    }

    @EventListener
    public void open(TcpConnectionOpenEvent event) {
        if (event.getConnectionFactoryName().equals("server")) {
            this.clients.add(event.getConnectionId());
        }
    }

    @EventListener
    public void close(TcpConnectionCloseEvent event) {
        this.clients.remove(event.getConnectionId());
    }

}

    enter code here
    enter code here

【问题讨论】:

    标签: java spring tcp spring-integration


    【解决方案1】:

    TCP/IP 只是 Spring Integration 中作为通道适配器实现的协议之一。

    查看更多理论信息:https://www.enterpriseintegrationpatterns.com/ChannelAdapter.html

    这里是 Spring Integration 中所有支持的协议的列表:

    https://docs.spring.io/spring-integration/docs/current/reference/html/endpoint-summary.html#spring-integration-endpoints

    这些教程所讨论的是应用程序逻辑内部的消息传递通道。它与外部协议无关,例如您的 TCP/IP。如果您的应用程序将成为该 IP 主机/端口的客户端,或者它将成为打开相应套接字的服务器,您只需要自己了解:

    https://docs.spring.io/spring-integration/docs/current/reference/html/ip.html#ip

    【讨论】:

    • 哇,谢谢,这太棒了!我担心没有人会回应。所以我试图接收的 TCP 流似乎是在没有任何通道适配器的情况下发送的,它只是对该端口上的任何东西开放。我怎样才能让 Spring 摄入呢?当我尝试时,它似乎试图将其限制在频道上。
    • 对不起,我不完全明白发生了什么。从未与nc 合作过。您是否介意分享您迄今为止尝试过的问题?
    • 感谢您的回复,我将添加更多信息以帮助解释它
    • 好的。看起来nc -l 确实成为特定端口上的 TCP/IP 服务器:varonis.com/blog/netcat-commands。因此,您需要使用 Spring Integration 中的 TcpReceivingChannelAdapterTcpNetServerConnectionFactory 来成为您客户的类似服务器。确保使用正确的消息分隔符让您的 Spring 服务器将一个包与另一个包分开!我在回答中提到的 TCP/IP 文档中提供了所有信息。
    • 我认为你真的需要让自己熟悉 EIP:enterpriseintegrationpatterns.com 并了解什么是端点、消息和通道。您确实可以接受来自 TCP 的数据并使用 Spring Integration 将其转发到 JPA。您确实需要与您的客户同意最终发送一些分隔符。否则你将无法正确解析包。计算左括号真的不值得......
    猜你喜欢
    • 1970-01-01
    • 2017-06-19
    • 2016-05-03
    • 1970-01-01
    • 2021-01-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多