【问题标题】:How to select between a future and stream in Rust?如何在 Rust 中的未来和流之间进行选择?
【发布时间】:2020-09-08 22:06:51
【问题描述】:

我刚刚开始在 Rust 中试验 futures/tokio。我可以只用期货或只用流来做非常基本的事情。我想知道如何在未来和流之间进行选择。

如何扩展 tokio 文档中的玩具问题以使用 tokio_timer::Timer 执行定时 HTTPS 请求?

extern crate futures; // v0.1 (old)
extern crate native_tls;
extern crate tokio_core;
extern crate tokio_io;
extern crate tokio_tls;

use std::io;
use std::net::ToSocketAddrs;

use futures::Future;
use native_tls::TlsConnector;
use tokio_core::net::TcpStream;
use tokio_core::reactor::Core;
use tokio_tls::TlsConnectorExt;

fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    let addr = "www.rust-lang.org:443".to_socket_addrs().unwrap().next().unwrap();

    let cx = TlsConnector::builder().unwrap().build().unwrap();
    let socket = TcpStream::connect(&addr, &handle);

    let tls_handshake = socket.and_then(|socket| {
        let tls = cx.connect_async("www.rust-lang.org", socket);
        tls.map_err(|e| {
            io::Error::new(io::ErrorKind::Other, e)
        })
    });
    let request = tls_handshake.and_then(|socket| {
        tokio_io::io::write_all(socket, "\
            GET / HTTP/1.0\r\n\
            Host: www.rust-lang.org\r\n\
            \r\n\
        ".as_bytes())
    });
    let response = request.and_then(|(socket, _request)| {
        tokio_io::io::read_to_end(socket, Vec::new())
    });

    let (_socket, data) = core.run(response).unwrap();
    println!("{}", String::from_utf8_lossy(&data));
}

【问题讨论】:

  • 你的意思是你想要连接超时吗?

标签: stream rust future rust-tokio


【解决方案1】:

您可以使用FutureExt::into_streamFuture 转换为Stream,然后在两个流上进行选择:

use futures::prelude::*; // 0.3.1

fn select_stream_or_future_as_stream<S, F>(stream: S, future: F) -> impl Stream<Item = S::Item>
where
    S: Stream,
    F: Future<Output = S::Item>,
{
    stream::select(future.into_stream(), stream)
}

另见:

【讨论】:

    【解决方案2】:

    我改编示例代码的方式如下,可能对初学者有用。

    let timer = tokio_timer::Timer::default();
        // Error out when timeout is reached
        let timeout = timer.sleep(time::Duration::from_millis(950)).then(|_| {
            future::err(io::Error::new(io::ErrorKind::Other, "Timeout"))
        });
    
        let handle = core.handle();
    
        // this returns IoFuture = BoxFuture<T, io::Error>;
        let addresses = tokio_dns::CpuPoolResolver::new(1 as usize).resolve("www.google.cz");
        let socket = addresses.and_then(|all_addresses| {
            let mut ipv4_addresses =  all_addresses.iter().filter(|x| is_ipv4(**x));
            let addr = ipv4_addresses.next().unwrap();
            let sock = TcpStream::connect(&SocketAddr::new(*addr, 443), &handle);
            sock.map_err(|e| {
                println!("{:?}", e);
                io::Error::new(io::ErrorKind::Other, e)
            })
        });
    
        let tls_handshake = socket.and_then(|socket| {
            println!("Got socket");
            let cx = TlsConnector::builder().unwrap().build().unwrap();
            let tls = cx.connect_async("www.google.cz", socket);
            tls.map_err(|e| {
                println!("{:?}", e);
                io::Error::new(io::ErrorKind::Other, e)
            })
        });
    
        let request = tls_handshake.and_then(|socket| {
            println!("SSL Handshake Successful");
            let write_all = tokio_io::io::write_all(socket, "\
                GET / HTTP/1.0\r\n\
                Host: www.google.cz\r\n\
                \r\n\
            ".as_bytes());
            println!("Wrote to socket");
            write_all.map_err(|e| {
                println!("{:?}", e);
                io::Error::new(io::ErrorKind::Other, e)
            })
        });
    
        let response = request.and_then(|(socket, _request)| {
            let read_till_end = tokio_io::io::read_to_end(socket, Vec::new());
            println!("Read till end of socket");
            read_till_end
        });
    
        let waiter = response.select(timeout).map(|(win, _)| {
            let (_socket, data) = win;
            data
        });
    
        let result = core.run(waiter);
    

    【讨论】:

      猜你喜欢
      • 2023-03-25
      • 2012-11-27
      • 2011-01-24
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多