【问题标题】:Ping(ICMP) multiple destinations parallely Using Boost.asio使用 Boost.asio 并行 Ping(ICMP) 多个目的地
【发布时间】:2020-12-07 18:25:03
【问题描述】:

我已修改 ICMP ping 实现 (https://think-async.com/Asio/asio-1.18.0/src/examples/cpp03/icmp/ping.cpp) 以同时 ping 多个目标,而不是按示例中所示的顺序。我尝试使用 std::thread 和 std::async(以及期货)。

但只有当所有目的地都无法到达时,它才能按预期工作。不能同时进行吗?我在 pinger 类中禁用了对结果/超时的重新 ping

const char* ping(const char* destination)
{
   asio::io_context io_context;
   pinger p(io_context, destination);
   io_context.run();
   return p.get();
}
   
 int main()
{
   std::future<const char*> a1 = std::async(std::launch::async, ping, "10.2.7.196");
   std::future<const char*> a2 = std::async(std::launch::async, ping, "10.2.7.19");
   std::cout<<a1.get()<<std::endl;
   std::cout<<a2.get()<<std::endl; 
}

【问题讨论】:

    标签: c++ multithreading asynchronous networking boost-asio


    【解决方案1】:

    你不需要std::async¹。

    但是从您展示的一小段代码中,我可以猜测²您的错误正在返回原始char const*。他们引用pinger 中的数据的可能性很大,当未来完成时——显然——不再有效(pinger 将超出范围)。

    发生这种情况的典型方法是,如果您将输出存储在 std::string 成员中,并使用 .c_str()get() 返回。

    如果get() 简单地返回像return "unreachable" 这样的字符串文字,它会“工作”于无法访问的目标的一个原因,它不会出现上述的生命周期问题。

    放弃水晶球

    所以,想象一下返回结果的正确方法:

    Live On Wandbox³

    #include <boost/asio.hpp>
    #include <boost/bind/bind.hpp>
    namespace asio = boost::asio;
    
    #include "icmp_header.hpp"
    #include "ipv4_header.hpp"
    
    using asio::steady_timer;
    using asio::ip::icmp;
    namespace chrono = asio::chrono;
    
    class pinger {
      public:
        pinger(asio::io_context& io_context, const char* destination)
                : resolver_(io_context), socket_(io_context, icmp::v4()),
                  timer_(io_context), sequence_number_(0), num_replies_(0) {
            destination_ = *resolver_.resolve(icmp::v4(), destination, "").begin();
    
            start_send();
            start_receive();
        }
    
        std::string get() { auto r = _output.str(); _output.str(""); return r; }
      private:
        void start_send() {
            std::string body("\"Hello!\" from Asio ping.");
    
            // Create an ICMP header for an echo request.
            icmp_header echo_request;
            echo_request.type(icmp_header::echo_request);
            echo_request.code(0);
            echo_request.identifier(get_identifier());
            echo_request.sequence_number(++sequence_number_);
            compute_checksum(echo_request, body.begin(), body.end());
    
            // Encode the request packet.
            asio::streambuf request_buffer;
            std::ostream os(&request_buffer);
            os << echo_request << body;
    
            // Send the request.
            time_sent_ = steady_timer::clock_type::now();
            socket_.send_to(request_buffer.data(), destination_);
    
            // Wait up to five seconds for a reply.
            num_replies_ = 0;
            timer_.expires_at(time_sent_ + chrono::seconds(5));
            timer_.async_wait(boost::bind(&pinger::handle_timeout, this));
        }
    
        void handle_timeout() {
            if (num_replies_ == 0)
                _output << "Request timed out";
    
            //// Requests must be sent no less than one second apart.
            //timer_.expires_at(time_sent_ + chrono::seconds(1));
            //timer_.async_wait(boost::bind(&pinger::start_send, this));
        }
    
        void start_receive() {
            // Discard any data already in the buffer.
            reply_buffer_.consume(reply_buffer_.size());
    
            // Wait for a reply. We prepare the buffer to receive up to 64KB.
            socket_.async_receive(reply_buffer_.prepare(65536),
                                  boost::bind(&pinger::handle_receive, this,
                                              boost::placeholders::_2));
        }
    
        void handle_receive(std::size_t length) {
            // The actual number of bytes received is committed to the buffer so
            // that we can extract it using a std::istream object.
            reply_buffer_.commit(length);
    
            // Decode the reply packet.
            std::istream is(&reply_buffer_);
            ipv4_header ipv4_hdr;
            icmp_header icmp_hdr;
            is >> ipv4_hdr >> icmp_hdr;
    
            // We can receive all ICMP packets received by the host, so we need to
            // filter out only the echo replies that match the our identifier and
            // expected sequence number.
            if (is && icmp_hdr.type() == icmp_header::echo_reply &&
                icmp_hdr.identifier() == get_identifier() &&
                icmp_hdr.sequence_number() == sequence_number_) {
                // If this is the first reply, interrupt the five second timeout.
                if (num_replies_++ == 0)
                    timer_.cancel();
    
                // Print out some information about the reply packet.
                chrono::steady_clock::time_point now = chrono::steady_clock::now();
                chrono::steady_clock::duration elapsed = now - time_sent_;
                _output
                    << length - ipv4_hdr.header_length() << " bytes from "
                    << ipv4_hdr.source_address()
                    << ": icmp_seq=" << icmp_hdr.sequence_number()
                    << ", ttl=" << ipv4_hdr.time_to_live() << ", time="
                    << chrono::duration_cast<chrono::milliseconds>(elapsed).count();
            }
    
            //start_receive();
        }
    
        static unsigned short get_identifier() {
    #if defined(ASIO_WINDOWS)
            return static_cast<unsigned short>(::GetCurrentProcessId());
    #else
            return static_cast<unsigned short>(::getpid());
    #endif
        }
    
        std::ostringstream _output;
    
        icmp::resolver resolver_;
        icmp::endpoint destination_;
        icmp::socket socket_;
        steady_timer timer_;
        unsigned short sequence_number_;
        chrono::steady_clock::time_point time_sent_;
        asio::streambuf reply_buffer_;
        std::size_t num_replies_;
    };
    
    std::string ping1(const char* destination) {
        asio::io_context io_context;
        pinger p(io_context, destination);
        io_context.run();
        return p.get();
    }
    
    #include <list>
    #include <iostream>
    int main(int argc, char** argv) {
        std::list<std::future<std::string> > futures;
        for (char const* arg : std::vector(argv+1, argv+argc)) {
            futures.push_back(std::async(std::launch::async, ping1, arg));
        }
    
        for (auto& f : futures) {
            std::cout << f.get() << std::endl;
        }
    }
    

    如您所见,我制作了目的地命令行参数列表。因此,当我像这样运行它时:

    sudo ./sotest 127.0.0.{1..100} |& sort | uniq -c
    

    我得到这个输出:

      1 32 bytes from 127.0.0.12: icmp_seq=1, ttl=64, time=0
      1 32 bytes from 127.0.0.16: icmp_seq=1, ttl=64, time=0
      7 32 bytes from 127.0.0.44: icmp_seq=1, ttl=64, time=0
      1 32 bytes from 127.0.0.77: icmp_seq=1, ttl=64, time=1
      1 32 bytes from 127.0.0.82: icmp_seq=1, ttl=64, time=1
      1 32 bytes from 127.0.0.9: icmp_seq=1, ttl=64, time=0
     88 Request timed out
    

    我实际上不确定为什么会有这么多超时,但关键是现在的代码正确。此代码运行并完成 UBSan/ASan 清理。不过,请参阅下文了解稍后发现的修复程序

    现在,让我们放弃未来

    未来可能会产生大量开销。事实上,每个 ping 都有一个 io_service。让我们一个人完成。

    #include <list>
    #include <iostream>
    int main(int argc, char** argv) {
        asio::io_context io_context;
    
        std::list<pinger> pingers;
        for (char const* arg : std::vector(argv+1, argv+argc)) {
            pingers.emplace_back(io_context, arg);
        }
    
        io_context.run();
    
        for (auto& p : pingers) {
            std::cout << p.get() << std::endl;
        }
    }
    

    注意这里的同步点是io_context.run(),和以前一样,只是现在它在主线程上一次性运行所有的ping。

    纠正取消

    所以,我现在注意到了为什么这么多 ping 被误认为无法访问。

    原因是因为handle_receive 需要过滤掉不响应我们 ping 的 ICMP 回复,所以如果发生这种情况,我们需要继续 start_receive() 直到我们得到它:

    void start_receive() {
        // Discard any data already in the buffer.
        reply_buffer_.consume(reply_buffer_.size());
    
        // Wait for a reply. We prepare the buffer to receive up to 64KB.
        socket_.async_receive(reply_buffer_.prepare(65536),
                              boost::bind(&pinger::handle_receive, this,
                                 boost::asio::placeholders::error(),
                                 boost::asio::placeholders::bytes_transferred()));
    }
    
    void handle_receive(boost::system::error_code ec, std::size_t length) {
        if (ec) {
            if (ec == boost::asio::error::operation_aborted) {
                _output << "Request timed out";
            } else {
                _output << "error: " << ec.message();
            }
            return;
        }
        // The actual number of bytes received is committed to the buffer so
        // that we can extract it using a std::istream object.
        reply_buffer_.commit(length);
    
        // Decode the reply packet.
        std::istream is(&reply_buffer_);
        ipv4_header ipv4_hdr;
        icmp_header icmp_hdr;
        is >> ipv4_hdr >> icmp_hdr;
    
        // We can receive all ICMP packets received by the host, so we need to
        // filter out only the echo replies that match the our identifier and
        // expected sequence number.
        if (is && icmp_hdr.type() == icmp_header::echo_reply &&
            icmp_hdr.identifier() == get_identifier() &&
            icmp_hdr.sequence_number() == sequence_number_) {
            // If this is the first reply, interrupt the five second timeout.
            if (num_replies_++ == 0)
                timer_.cancel();
    
            // Print out some information about the reply packet.
            chrono::steady_clock::time_point now = chrono::steady_clock::now();
            chrono::steady_clock::duration elapsed = now - time_sent_;
            _output
                << length - ipv4_hdr.header_length() << " bytes from "
                << ipv4_hdr.source_address()
                << ": icmp_seq=" << icmp_hdr.sequence_number()
                << ", ttl=" << ipv4_hdr.time_to_live() << ", time="
                << chrono::duration_cast<chrono::milliseconds>(elapsed).count();
        } else start_receive();
    }
    

    现在,handle_timeout 可以简化为:

    void handle_timeout() {
        if (num_replies_ == 0) {
            socket_.cancel(); // _output is set in response to error_code
        }
    }
    

    事实上,我们可能会简化以完全删除num_replies,但我将把它留给读者作为驱魔

    完整演示

    Live On Wandbox

    #include <boost/asio.hpp>
    #include <boost/bind/bind.hpp>
    namespace asio = boost::asio;
    
    #include "icmp_header.hpp"
    #include "ipv4_header.hpp"
    
    using asio::steady_timer;
    using asio::ip::icmp;
    namespace chrono = asio::chrono;
    
    class pinger {
      public:
        pinger(asio::io_context& io_context, const char* destination)
                : resolver_(io_context), socket_(io_context, icmp::v4()),
                  timer_(io_context), sequence_number_(0), num_replies_(0) {
            destination_ = *resolver_.resolve(icmp::v4(), destination, "").begin();
    
            start_send();
            start_receive();
        }
    
        std::string get() { auto r = _output.str(); _output.str(""); return r; }
      private:
        void start_send() {
            std::string body("\"Hello!\" from Asio ping.");
    
            // Create an ICMP header for an echo request.
            icmp_header echo_request;
            echo_request.type(icmp_header::echo_request);
            echo_request.code(0);
            echo_request.identifier(get_identifier());
            echo_request.sequence_number(++sequence_number_);
            compute_checksum(echo_request, body.begin(), body.end());
    
            // Encode the request packet.
            asio::streambuf request_buffer;
            std::ostream os(&request_buffer);
            os << echo_request << body;
    
            // Send the request.
            time_sent_ = steady_timer::clock_type::now();
            socket_.send_to(request_buffer.data(), destination_);
    
            // Wait up to five seconds for a reply.
            num_replies_ = 0;
            timer_.expires_at(time_sent_ + chrono::seconds(5));
            timer_.async_wait(boost::bind(&pinger::handle_timeout, this));
        }
    
        void handle_timeout() {
            if (num_replies_ == 0) {
                socket_.cancel(); // _output is set in response to error_code
            }
        }
    
        void start_receive() {
            // Discard any data already in the buffer.
            reply_buffer_.consume(reply_buffer_.size());
    
            // Wait for a reply. We prepare the buffer to receive up to 64KB.
            socket_.async_receive(reply_buffer_.prepare(65536),
                                  boost::bind(&pinger::handle_receive, this,
                                     boost::asio::placeholders::error(),
                                     boost::asio::placeholders::bytes_transferred()));
        }
    
        void handle_receive(boost::system::error_code ec, std::size_t length) {
            if (ec) {
                if (ec == boost::asio::error::operation_aborted) {
                    _output << "Request timed out";
                } else {
                    _output << "error: " << ec.message();
                }
                return;
            }
            // The actual number of bytes received is committed to the buffer so
            // that we can extract it using a std::istream object.
            reply_buffer_.commit(length);
    
            // Decode the reply packet.
            std::istream is(&reply_buffer_);
            ipv4_header ipv4_hdr;
            icmp_header icmp_hdr;
            is >> ipv4_hdr >> icmp_hdr;
    
            // We can receive all ICMP packets received by the host, so we need to
            // filter out only the echo replies that match the our identifier and
            // expected sequence number.
            if (is && icmp_hdr.type() == icmp_header::echo_reply &&
                icmp_hdr.identifier() == get_identifier() &&
                icmp_hdr.sequence_number() == sequence_number_) {
                // If this is the first reply, interrupt the five second timeout.
                if (num_replies_++ == 0)
                    timer_.cancel();
    
                // Print out some information about the reply packet.
                chrono::steady_clock::time_point now = chrono::steady_clock::now();
                chrono::steady_clock::duration elapsed = now - time_sent_;
                _output
                    << length - ipv4_hdr.header_length() << " bytes from "
                    << ipv4_hdr.source_address()
                    << ": icmp_seq=" << icmp_hdr.sequence_number()
                    << ", ttl=" << ipv4_hdr.time_to_live() << ", time="
                    << chrono::duration_cast<chrono::milliseconds>(elapsed).count();
            } else start_receive();
        }
    
        static unsigned short get_identifier() {
    #if defined(ASIO_WINDOWS)
            return static_cast<unsigned short>(::GetCurrentProcessId());
    #else
            return static_cast<unsigned short>(::getpid());
    #endif
        }
    
        std::ostringstream _output;
    
        icmp::resolver resolver_;
        icmp::endpoint destination_;
        icmp::socket socket_;
        steady_timer timer_;
        unsigned short sequence_number_;
        chrono::steady_clock::time_point time_sent_;
        asio::streambuf reply_buffer_;
        std::size_t num_replies_;
    };
    
    #include <list>
    #include <iostream>
    int main(int argc, char** argv) {
        asio::io_context io_context;
    
        std::list<pinger> pingers;
        for (char const* arg : std::vector(argv+1, argv+argc)) {
            pingers.emplace_back(io_context, arg);
        }
    
        io_context.run();
    
        for (auto& p : pingers) {
            std::cout << p.get() << std::endl;
        }
    }
    

    现在例如的输出time sudo ./sotest 127.0.0.{1..100} 18.0.0.1 符合预期:

    32 bytes from 127.0.0.1: icmp_seq=1, ttl=64, time=8
    32 bytes from 127.0.0.2: icmp_seq=1, ttl=64, time=8
    32 bytes from 127.0.0.3: icmp_seq=1, ttl=64, time=8
    32 bytes from 127.0.0.4: icmp_seq=1, ttl=64, time=8
    ...
    32 bytes from 127.0.0.98: icmp_seq=1, ttl=64, time=0
    32 bytes from 127.0.0.99: icmp_seq=1, ttl=64, time=0
    32 bytes from 127.0.0.100: icmp_seq=1, ttl=64, time=0
    Request timed out
    

    ¹事实上,这很少/永远不是正确的工具

    ² 使用我的水晶球

    ³ 显然我们无权制作 ICMP 数据包,更不用说在 Wandbox 上发送它们了

    【讨论】:

    • TL;DR 使用 C++ 类型,注意生命周期,避免线程。此外,如果必须,请从最后一个列表中复制/粘贴(因为在此过程中进行了许多小改进)
    • 我尝试了类似的解决方案,但没有成功。现在也行不通。 ping 环回地址后跟 18.0.0.1 对我有用。但是在这种情况下,我得到了响应(其中第二个无法访问)。 sudo ./a.out 10.2.7.194 18.0.0.2 10.2.7.194 中的 32 个字节:icmp_seq=1, ttl=60, time=27 10.2.7.194 中的 32 个字节:icmp_seq=1, ttl=60, time=27跨度>
    • $ sudo ./a.out 10.2.7.194 18.0.0.2 # 不能正常工作 为什么?来自 10.2.7.194 的 32 个字节:icmp_seq=1, ttl=60, time=26 来自 10.2.7.194 的 32 个字节:icmp_seq=1, ttl=60, time=26 $ sudo ./a.out 127.0.0.1 18.0.0.2 # Good 32 bytes from 127.0.0.1: icmp_seq=1, ttl=64, time=0 Request timed out $ sudo ./a.out 10.2.4.190 18.0.0.2 #Good Both are unreachable Request timed out Request timed out
    • 我认为这里的问题是 ICMP 是无连接的 - 就像 UDP 一样,并且您收到同一个 IP 的多个响应。奇怪的是 /says/ from 10.2.7.194: icmp_seq=1 两次。这意味着您实际上也在同一进程中两次发送回显请求。我想你想告诉我确切的代码。为什么不像我那样把它贴在 wandbox 上呢?
    • 我只是复制粘贴了你的代码并用 gcc 编译。只是在 async_receive 中将占位符错误和 bytes_transferred 更改为 _1 和 _2。我没有改变任何其他东西。是否需要清除句柄接收中的任何内容。代码中是否存在概念错误。 (就像允许从一个进程一次 ping 一个目的地)
    猜你喜欢
    • 1970-01-01
    • 2012-02-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-11-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多