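You don't need std::async¹.
But from the small snippet you showed, I can guess² that your bug is returning a raw char const*. Chances are good that it points to data inside the pinger which is, obviously, no longer valid by the time the future completes (the pinger will have gone out of scope).
A typical way for this to happen is storing the output in a std::string member and returning it from get() using .c_str().
One reason it would "work" for unreachable targets is if get() simply returns a string literal, as in return "unreachable". A literal has static storage duration, so it does not suffer from the lifetime problem described above.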
Ditching the crystal ball
So, imagining a proper way to return the results:
Live On Wandbox³
#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>
#include <sstream>
#include <string>
namespace asio = boost::asio;
#include "icmp_header.hpp"
#include "ipv4_header.hpp"
using asio::steady_timer;
using asio::ip::icmp;
namespace chrono = asio::chrono;
class pinger {
public:
pinger(asio::io_context& io_context, const char* destination)
: resolver_(io_context), socket_(io_context, icmp::v4()),
timer_(io_context), sequence_number_(0), num_replies_(0) {
destination_ = *resolver_.resolve(icmp::v4(), destination, "").begin();
start_send();
start_receive();
}
std::string get() { auto r = _output.str(); _output.str(""); return r; }
private:
void start_send() {
std::string body("\"Hello!\" from Asio ping.");
// Create an ICMP header for an echo request.
icmp_header echo_request;
echo_request.type(icmp_header::echo_request);
echo_request.code(0);
echo_request.identifier(get_identifier());
echo_request.sequence_number(++sequence_number_);
compute_checksum(echo_request, body.begin(), body.end());
// Encode the request packet.
asio::streambuf request_buffer;
std::ostream os(&request_buffer);
os << echo_request << body;
// Send the request.
time_sent_ = steady_timer::clock_type::now();
socket_.send_to(request_buffer.data(), destination_);
// Wait up to five seconds for a reply.
num_replies_ = 0;
timer_.expires_at(time_sent_ + chrono::seconds(5));
timer_.async_wait(boost::bind(&pinger::handle_timeout, this));
}
void handle_timeout() {
if (num_replies_ == 0)
_output << "Request timed out";
//// Requests must be sent no less than one second apart.
//timer_.expires_at(time_sent_ + chrono::seconds(1));
//timer_.async_wait(boost::bind(&pinger::start_send, this));
}
void start_receive() {
// Discard any data already in the buffer.
reply_buffer_.consume(reply_buffer_.size());
// Wait for a reply. We prepare the buffer to receive up to 64KB.
socket_.async_receive(reply_buffer_.prepare(65536),
boost::bind(&pinger::handle_receive, this,
boost::placeholders::_2));
}
void handle_receive(std::size_t length) {
// The actual number of bytes received is committed to the buffer so
// that we can extract it using a std::istream object.
reply_buffer_.commit(length);
// Decode the reply packet.
std::istream is(&reply_buffer_);
ipv4_header ipv4_hdr;
icmp_header icmp_hdr;
is >> ipv4_hdr >> icmp_hdr;
// We can receive all ICMP packets received by the host, so we need to
// filter out only the echo replies that match our identifier and
// expected sequence number.
if (is && icmp_hdr.type() == icmp_header::echo_reply &&
icmp_hdr.identifier() == get_identifier() &&
icmp_hdr.sequence_number() == sequence_number_) {
// If this is the first reply, interrupt the five second timeout.
if (num_replies_++ == 0)
timer_.cancel();
// Print out some information about the reply packet.
chrono::steady_clock::time_point now = chrono::steady_clock::now();
chrono::steady_clock::duration elapsed = now - time_sent_;
_output
<< length - ipv4_hdr.header_length() << " bytes from "
<< ipv4_hdr.source_address()
<< ": icmp_seq=" << icmp_hdr.sequence_number()
<< ", ttl=" << ipv4_hdr.time_to_live() << ", time="
<< chrono::duration_cast<chrono::milliseconds>(elapsed).count();
}
//start_receive();
}
static unsigned short get_identifier() {
#if defined(BOOST_ASIO_WINDOWS)
return static_cast<unsigned short>(::GetCurrentProcessId());
#else
return static_cast<unsigned short>(::getpid());
#endif
}
std::ostringstream _output;
icmp::resolver resolver_;
icmp::endpoint destination_;
icmp::socket socket_;
steady_timer timer_;
unsigned short sequence_number_;
chrono::steady_clock::time_point time_sent_;
asio::streambuf reply_buffer_;
std::size_t num_replies_;
};
std::string ping1(const char* destination) {
asio::io_context io_context;
pinger p(io_context, destination);
io_context.run();
return p.get();
}
#include <future>
#include <iostream>
#include <list>
#include <vector>
int main(int argc, char** argv) {
std::list<std::future<std::string> > futures;
for (char const* arg : std::vector(argv+1, argv+argc)) {
futures.push_back(std::async(std::launch::async, ping1, arg));
}
for (auto& f : futures) {
std::cout << f.get() << std::endl;
}
}
As you can see, I made the list of destinations command-line arguments. So when I run it like this:
sudo ./sotest 127.0.0.{1..100} |& sort | uniq -c
I get this output:
1 32 bytes from 127.0.0.12: icmp_seq=1, ttl=64, time=0
1 32 bytes from 127.0.0.16: icmp_seq=1, ttl=64, time=0
7 32 bytes from 127.0.0.44: icmp_seq=1, ttl=64, time=0
1 32 bytes from 127.0.0.77: icmp_seq=1, ttl=64, time=1
1 32 bytes from 127.0.0.82: icmp_seq=1, ttl=64, time=1
1 32 bytes from 127.0.0.9: icmp_seq=1, ttl=64, time=0
88 Request timed out
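I'm not actually sure why so many of them time out, but the point is that the code is now correct: it runs to completion cleanly under UBSan/ASan. See below, though, for a fix that was discovered later.
Now, let's ditch the futures
Futures can add a lot of overhead, and so does having a separate io_context for every ping. Let's do it all on a single one.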
#include <iostream>
#include <list>
#include <vector>
int main(int argc, char** argv) {
asio::io_context io_context;
std::list<pinger> pingers;
for (char const* arg : std::vector(argv+1, argv+argc)) {
pingers.emplace_back(io_context, arg);
}
io_context.run();
for (auto& p : pingers) {
std::cout << p.get() << std::endl;
}
}
Note that the synchronization point here is io_context.run(), as before, except that it now runs all the pings in one go on the main thread.
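Correcting the cancellation
So, I have now noticed why so many pings were misidentified as unreachable.
The reason is that handle_receive needs to filter out ICMP replies that are not responses to our ping. When that happens, we need to keep calling start_receive() until we get the one we want: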
void start_receive() {
// Discard any data already in the buffer.
reply_buffer_.consume(reply_buffer_.size());
// Wait for a reply. We prepare the buffer to receive up to 64KB.
socket_.async_receive(reply_buffer_.prepare(65536),
boost::bind(&pinger::handle_receive, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
void handle_receive(boost::system::error_code ec, std::size_t length) {
if (ec) {
if (ec == boost::asio::error::operation_aborted) {
_output << "Request timed out";
} else {
_output << "error: " << ec.message();
}
return;
}
// The actual number of bytes received is committed to the buffer so
// that we can extract it using a std::istream object.
reply_buffer_.commit(length);
// Decode the reply packet.
std::istream is(&reply_buffer_);
ipv4_header ipv4_hdr;
icmp_header icmp_hdr;
is >> ipv4_hdr >> icmp_hdr;
// We can receive all ICMP packets received by the host, so we need to
// filter out only the echo replies that match our identifier and
// expected sequence number.
if (is && icmp_hdr.type() == icmp_header::echo_reply &&
icmp_hdr.identifier() == get_identifier() &&
icmp_hdr.sequence_number() == sequence_number_) {
// If this is the first reply, interrupt the five second timeout.
if (num_replies_++ == 0)
timer_.cancel();
// Print out some information about the reply packet.
chrono::steady_clock::time_point now = chrono::steady_clock::now();
chrono::steady_clock::duration elapsed = now - time_sent_;
_output
<< length - ipv4_hdr.header_length() << " bytes from "
<< ipv4_hdr.source_address()
<< ": icmp_seq=" << icmp_hdr.sequence_number()
<< ", ttl=" << ipv4_hdr.time_to_live() << ", time="
<< chrono::duration_cast<chrono::milliseconds>(elapsed).count();
} else start_receive();
}
Now, handle_timeout can be simplified to:
void handle_timeout() {
if (num_replies_ == 0) {
socket_.cancel(); // _output is set in response to error_code
}
}
In fact, we could simplify further and remove num_replies_ altogether, but I'll leave that as an exercise for the reader.
Full demo
Live On Wandbox
#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>
#include <sstream>
#include <string>
namespace asio = boost::asio;
#include "icmp_header.hpp"
#include "ipv4_header.hpp"
using asio::steady_timer;
using asio::ip::icmp;
namespace chrono = asio::chrono;
class pinger {
public:
pinger(asio::io_context& io_context, const char* destination)
: resolver_(io_context), socket_(io_context, icmp::v4()),
timer_(io_context), sequence_number_(0), num_replies_(0) {
destination_ = *resolver_.resolve(icmp::v4(), destination, "").begin();
start_send();
start_receive();
}
std::string get() { auto r = _output.str(); _output.str(""); return r; }
private:
void start_send() {
std::string body("\"Hello!\" from Asio ping.");
// Create an ICMP header for an echo request.
icmp_header echo_request;
echo_request.type(icmp_header::echo_request);
echo_request.code(0);
echo_request.identifier(get_identifier());
echo_request.sequence_number(++sequence_number_);
compute_checksum(echo_request, body.begin(), body.end());
// Encode the request packet.
asio::streambuf request_buffer;
std::ostream os(&request_buffer);
os << echo_request << body;
// Send the request.
time_sent_ = steady_timer::clock_type::now();
socket_.send_to(request_buffer.data(), destination_);
// Wait up to five seconds for a reply.
num_replies_ = 0;
timer_.expires_at(time_sent_ + chrono::seconds(5));
timer_.async_wait(boost::bind(&pinger::handle_timeout, this));
}
void handle_timeout() {
if (num_replies_ == 0) {
socket_.cancel(); // _output is set in response to error_code
}
}
void start_receive() {
// Discard any data already in the buffer.
reply_buffer_.consume(reply_buffer_.size());
// Wait for a reply. We prepare the buffer to receive up to 64KB.
socket_.async_receive(reply_buffer_.prepare(65536),
boost::bind(&pinger::handle_receive, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
void handle_receive(boost::system::error_code ec, std::size_t length) {
if (ec) {
if (ec == boost::asio::error::operation_aborted) {
_output << "Request timed out";
} else {
_output << "error: " << ec.message();
}
return;
}
// The actual number of bytes received is committed to the buffer so
// that we can extract it using a std::istream object.
reply_buffer_.commit(length);
// Decode the reply packet.
std::istream is(&reply_buffer_);
ipv4_header ipv4_hdr;
icmp_header icmp_hdr;
is >> ipv4_hdr >> icmp_hdr;
// We can receive all ICMP packets received by the host, so we need to
// filter out only the echo replies that match our identifier and
// expected sequence number.
if (is && icmp_hdr.type() == icmp_header::echo_reply &&
icmp_hdr.identifier() == get_identifier() &&
icmp_hdr.sequence_number() == sequence_number_) {
// If this is the first reply, interrupt the five second timeout.
if (num_replies_++ == 0)
timer_.cancel();
// Print out some information about the reply packet.
chrono::steady_clock::time_point now = chrono::steady_clock::now();
chrono::steady_clock::duration elapsed = now - time_sent_;
_output
<< length - ipv4_hdr.header_length() << " bytes from "
<< ipv4_hdr.source_address()
<< ": icmp_seq=" << icmp_hdr.sequence_number()
<< ", ttl=" << ipv4_hdr.time_to_live() << ", time="
<< chrono::duration_cast<chrono::milliseconds>(elapsed).count();
} else start_receive();
}
static unsigned short get_identifier() {
#if defined(BOOST_ASIO_WINDOWS)
return static_cast<unsigned short>(::GetCurrentProcessId());
#else
return static_cast<unsigned short>(::getpid());
#endif
}
std::ostringstream _output;
icmp::resolver resolver_;
icmp::endpoint destination_;
icmp::socket socket_;
steady_timer timer_;
unsigned short sequence_number_;
chrono::steady_clock::time_point time_sent_;
asio::streambuf reply_buffer_;
std::size_t num_replies_;
};
#include <iostream>
#include <list>
#include <vector>
int main(int argc, char** argv) {
asio::io_context io_context;
std::list<pinger> pingers;
for (char const* arg : std::vector(argv+1, argv+argc)) {
pingers.emplace_back(io_context, arg);
}
io_context.run();
for (auto& p : pingers) {
std::cout << p.get() << std::endl;
}
}
Now the output of, e.g., time sudo ./sotest 127.0.0.{1..100} 18.0.0.1 is as expected:
32 bytes from 127.0.0.1: icmp_seq=1, ttl=64, time=8
32 bytes from 127.0.0.2: icmp_seq=1, ttl=64, time=8
32 bytes from 127.0.0.3: icmp_seq=1, ttl=64, time=8
32 bytes from 127.0.0.4: icmp_seq=1, ttl=64, time=8
...
32 bytes from 127.0.0.98: icmp_seq=1, ttl=64, time=0
32 bytes from 127.0.0.99: icmp_seq=1, ttl=64, time=0
32 bytes from 127.0.0.100: icmp_seq=1, ttl=64, time=0
Request timed out
¹ In fact, it is rarely, if ever, the right tool.
² Using my crystal ball.
³ Obviously we don't have permission to craft ICMP packets on Wandbox, let alone send them.