【问题标题】:Perpetual tokio TCP stream (client)永久 tokio TCP 流(客户端)
【发布时间】:2020-04-18 09:21:07
【问题描述】:

前言,可以跳到下一节

所以我决定在我的新的相对较小的项目中尝试使用 Rust,因为我喜欢它生成一个单一的可执行文件,该可执行文件很容易在我的基于 ARM 的目标上部署,并且在 RAM 和磁盘空间方面的资源相对较少。我以前没有使用 Rust 的经验,但有很多其他语言的经验,到目前为止我有点失望。似乎对于许多 Rust 库,可能还有 Rust 本身,API 变化如此之快,以至于网上找到的 90% 的示例代码都无法使用最新版本的库(如 tokiotokio-util 等)进行编译。此外,文档通常误导。例如,如果您在 Google 搜索 LinesCodec,它将显示在 tokio_io::codec::LinesCodectokio::codec::LinesCodectokio_codec::LinesCodectokio_util::codec::LinesCodec,这最终似乎是今天可以使用的一个。 FramedRead 等其他事物也有同样的困惑,它在某些版本中具有 and_thenmap 成员函数,但它们似乎在最新版本中不存在。最后,在 SO 上与 Rust 相关的问题和答案的数量远远少于我使用过的其他语言,这使得开始使用 Rust 变得更加困难。我过去 2 天尝试做的事情在大多数编程语言中都相对容易解决,我相信在 Rust 中也一定有一个简单的解决方案,但到目前为止我还没有成功。

问题本身

我需要将 TCP 客户端连接到远程服务器,并在数据传入时无限期地逐行读取和处理。这需要异步完成,因为同一个进程也充当 HTTP 服务器,所以我使用的是tokio

据我了解,比较常见的方法是使用 TcpStream,将其分割到 RX/TX 部分,然后我尝试连接 LinesCodec(与 FramedRead)但我无法在不出现编译错误的情况下将所有这些连接在一起。

[dependencies]
futures = "*"
hyper = "*"
tokio = { version = "*", features = ["full"] }
tokio-util = "0.2.0"
tokio-modbus = { version = "*", features = ["tcp", "server", "tcp-server-unstable"], git = "https://github.com/slowtec/tokio-modbus" }
let stream = TcpStream::connect("172.16.100.10:1001").await.unwrap();
let transport = FramedRead::new(stream, LinesCodec::new()); // need to split?
/* ... what to do next to process incoming data line-by-line ...? */

到目前为止,我提出了这个解决方案,但不确定它有多好

tokio::spawn(async {
    let connection = TcpStream::connect("172.16.100.10:1001").await.unwrap();
    let mut reader = BufReader::new(connection);

    loop {
        let mut line = String::new();
        reader.read_line(&mut line).await.unwrap();
        println!("{}", line);
    }
});

【问题讨论】:

    标签: rust rust-tokio


    【解决方案1】:

    带有 Cargo.toml 的简单应用,例如:

    [dependencies]
    tokio = { version = "0.3", features = ["full"] }
    tokio-util = { version = "0.4", features = ["codec"] }
    

    还有一个 main.rs,例如:

    use tokio::net::{TcpListener, TcpStream };
    use tokio_util::codec::{ Framed, LinesCodec };
    use tokio::stream::StreamExt;
    use std::error::Error;
    use std::net::{IpAddr, Ipv4Addr, SocketAddr};
    
    #[tokio::main]
    async fn main() -> Result<(), Box<dyn Error>> {
        let args: Vec<String> = std::env::args().collect();
    
        if args[1] == "server"
        {
            let local_addr: String = format!("{}{}",":::",args[2]); // app <server | client> <port>
    
            let listener = TcpListener::bind(&local_addr).await?;
    
            while let Ok((socket, peer)) = listener.accept().await {
    
                tokio::spawn(async move {
                    println!("Client Connected from: {}",peer.to_string());
                    let mut client = Framed::new(socket, LinesCodec::new_with_max_length(1024));
            
                    while let Some(Ok(line)) = client.next().await {
                        println!("{}", line);
                    }
                });
            }
        }
        else if args[1] == "client"
        {
            let port = args[2].parse::<u16>().unwrap(); // app client <port>
            let saddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port); 
            let conn = TcpStream::connect(saddr).await?;
    
            let mut server = Framed::new(conn, LinesCodec::new_with_max_length(1024));
    
            while let Some(Ok(line)) = server.next().await {
                println!("{}", line);
            }
        }
    
        Ok({})
    }
    

    作为服务器运行:

    cargo run server 8080
    (in another shell) nc localhost 8080
    

    作为客户端运行:

    (in another shell) nc -l -p 8080
    cargo run client 8080
    

    【讨论】:

      【解决方案2】:

      chat example program 中有一个如何使用 LineCodec 的示例。相关部分是“进程”函数。

      一个较小的示例(服务器,但客户端的原理相同)反转它接收到的每一行并将其回显,最大缓冲区大小为 5000:

      use tokio::net::TcpListener;
      use tokio::stream::StreamExt;
      
      use tokio_util::codec::{Framed, LinesCodec};
      
      extern crate unicode_segmentation;
      use unicode_segmentation::UnicodeSegmentation;
      
      use futures::SinkExt;
      
      async fn talk(sock: tokio::net::TcpStream) {
          let mut lines = Framed::new(sock, LinesCodec::LinesCodec::new_with_max_length(5000));
      
          while let Some(Ok(line)) = lines.next().await {
              let rev = line.graphemes(true).rev().collect::<String>();
              if let Err(_e) = lines.send(rev).await {
                  break;
              }
          }
      }
      
      #[tokio::main]
      async fn main() {
          let addr = "127.0.0.1:6200";
          let mut listener = TcpListener::bind(addr).await.unwrap();
      
          let mut incoming = listener.incoming();
          while let Some(conn) = incoming.next().await {
              match conn {
                  Err(e) => eprintln!("accept failed = {:?}", e),
                  Ok(sock) => {
                      tokio::spawn(talk(sock));
                  }
              }
          }
      }
      

      read_line 解决方法的一个问题是read_line 没有提供限制行长的选项,因此如果您有不受信任的输入,它可能会导致您的程序消耗任意数量的内存。 LineCodec 确实提供了限制行长的选项。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2012-08-11
        • 2012-08-23
        • 1970-01-01
        • 1970-01-01
        • 2011-12-07
        • 2016-12-26
        • 2018-09-08
        • 1970-01-01
        相关资源
        最近更新 更多