【问题标题】:multiple back to back boost::asio async_send_to calls cause buffer overun多个背靠背 boost::asio async_send_to 调用导致缓冲区溢出
【发布时间】:2016-03-26 03:29:25
【问题描述】:

我在使用 boost asio 发送多个背靠背单独的 UDP 缓冲区时遇到问题。我有一个 1 秒的 asio 计时器,它触发一个通过 udp 传输 2 个单独的 UDP 数据报结构的回调。这些消息结构中的每一个都是通过 std::unique_ptr 分配的,因此在调用 async CADaemon::handle_send 回调时它们不应超出范围。

void
CADaemon::heartBeatTimer(
    const milliseconds& rHeartBeatMs)
{
    mpStatusTimer->expires_from_now(rHeartBeatMs);
    mpStatusTimer->async_wait(boost::bind(
        &CADaemon::heartBeatTimer,
        this, rHeartBeatMs));
    if (mpALBFSocket && mpALBFEndpoint) {
        mpALBFSocket->async_send_to(
            buffer(mpStatusMessage.get(),
                sizeof(MemberSystemStatusMessage)),
            *mpALBFEndpoint,
            boost::bind(&CADaemon::handle_send, this,
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred));


        // must insert delay to prevent buffer overwrites
        std::this_thread::sleep_for(std::chrono::milliseconds(10);

        // heartbeat messages are also sent to this socket/endpoint
        mpALBFSocket->async_send_to(
            buffer(mpHeartbeatMessage.get(),
                sizeof(CAServiceHeartbeatMessage)),
            *mpALBFEndpoint,
            boost::bind(&CADaemon::handle_send, this,
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred));
    }
}

如果我在发送第一条消息和第二条消息之间稍加延迟,接收应用程序就可以工作,但是,如果我按原样发送它们,第二个缓冲区似乎会在它到达时覆盖第一个缓冲区接收申请。

我做错了什么?

我还尝试使用下面的代码发送多个缓冲区,但是由于它将两个数据报合并为一个长数据报,因此表现更差。

void
CADaemon::heartBeatTimer(
    const milliseconds& rHeartBeatMs)
{
    mpStatusTimer->expires_from_now(rHeartBeatMs);
    mpStatusTimer->async_wait(boost::bind(
        &CADaemon::heartBeatTimer,
        this, rHeartBeatMs));
    if (mpALBFSocket && mpALBFEndpoint) {
        std::vector<boost::asio::const_buffer> transmitBuffers;
        transmitBuffers.push_back(buffer(
            mpStatusMessage.get(), 
            sizeof(MemberSystemStatusMessage)));
        //transmitBuffers.push_back(buffer(
        //    mpHeartbeatMessage.get(), 
        //    sizeof(CAServiceHeartbeatMessage)));
        mpALBFSocket->async_send_to(
            transmitBuffers, *mpALBFEndpoint,
            boost::bind(&CADaemon::handle_send, this,
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred));
    }
}

以下是关联头文件中涉及到 ASIO 的类的成员。

// this message is transmitted @1HZ
std::unique_ptr<MemberSystemStatusMessage> mpStatusMessage;
// this message is transmitted @1HZ
std::unique_ptr<CAServiceHeartbeatMessage> mpHeartbeatMessage;
// this message is received @1HZ
std::unique_ptr<WOperationalSupportMessage> mpOpSupportMessage;
// this message is received @1HZ when valid
std::unique_ptr<MaintenanceOTPMessage> mpOTPMessage;

std::shared_ptr<boost::asio::io_service> mpIOService;
std::unique_ptr<boost::asio::ip::udp::socket> mpALBFSocket;
std::unique_ptr<boost::asio::ip::udp::endpoint> mpALBFEndpoint;
std::unique_ptr<boost::asio::ip::udp::socket> mpServerSocket;
std::unique_ptr<boost::asio::ip::udp::endpoint> mpServerEndpoint;
std::unique_ptr<boost::asio::steady_timer> mpStatusTimer;
std::unique_ptr<uint8_t[]> mpReceiveBuffer;

这是回调处理程序

void
CADaemon::handle_send(
    const boost::system::error_code& error,
    std::size_t bytes_transferred)
{
    static auto& gEvtLog = gpLogger->getLoggerRef(
        Logger::LogDest::EventLog);
    if (!error || (error == boost::asio::error::message_size)) {
        // Critical Section - exclusive write
        boost::unique_lock<boost::shared_mutex> uniqueLock(gRWMutexGuard);
        LOG_EVT_INFO(gEvtLog) << *mpStatusMessage;
        LOG_EVT_INFO(gEvtLog) << *mpHeartbeatMessage;
        LOG_EVT_INFO(gEvtLog) << "Sent " << bytes_transferred << " bytes";
        mpStatusMessage->incrementSequenceCounter();
    } else {
        LOG_EVT_ERROR(gEvtLog) << "handle_send: asio error code["
            << error.value() << "]";
    }
}

编辑:添加了带有缓冲区损坏的接收 JAVA 应用程序代码

下面的代码显示了接收java应用程序中的代码,注意接收到的数据报的大小永远不会损坏,只是内容,大小似乎总是更长的数据报的大小。希望这对帮助追查问题很有用。

    @Override
    protected Task<Void> createTask() {
        return new Task<Void>() {
            @Override
            protected Void call() throws Exception {
                updateMessage("Running...");
                try {
                    DatagramSocket serverSocket = new DatagramSocket(mPortNum);
                    // allocate space for received datagrams
                    byte[] bytes = new byte[1024];
                    DatagramPacket packet = new DatagramPacket(bytes, bytes.length);                    
                    while (!isCancelled()) {                    
                        serverSocket.receive(packet);
                        int bytesReceived = packet.getLength();
                        MemberSystemStatusMessage statusMessage = 
                            new MemberSystemStatusMessage();
                        int statusMessageSize = statusMessage.size();
                        CAServiceHeartbeatMessage heartbeatMessage = 
                            new CAServiceHeartbeatMessage();
                        int heartbeatMessageSize = heartbeatMessage.size();
                        if (Platform.isFxApplicationThread()) {
                            if (bytesReceived == statusMessage.size()) {
                                statusMessage.setByteBuffer(ByteBuffer.wrap(bytes), 0);
                                setMemberSystemMessage(statusMessage);
                            } else if (bytesReceived == heartbeatMessage.size()){
                                heartbeatMessage.setByteBuffer(ByteBuffer.wrap(bytes), 0);
                                setHeartbeatMessage(heartbeatMessage);
                            } else {
                                System.out.println("unexpected datagram");
                            }
                        } else { // update later in FxApplicationThread
                            if (bytesReceived == statusMessage.size()) {
                                statusMessage.setByteBuffer(ByteBuffer.wrap(bytes), 0);
                                Platform.runLater(() -> setMemberSystemMessage(statusMessage));
                            } else if (bytesReceived == heartbeatMessage.size()){
                                heartbeatMessage.setByteBuffer(ByteBuffer.wrap(bytes), 0);
                                Platform.runLater(() -> setHeartbeatMessage(heartbeatMessage));
                            } else {
                                System.out.println("unexpected datagram");
                            }
                        }
                    }
                } catch (Exception ex) {
                    System.out.println(ex.getMessage());
                }
                updateMessage("Cancelled");
                return null;
            } 
        };
    }
}

【问题讨论】:

    标签: c++ asynchronous boost boost-asio


    【解决方案1】:

    只要缓冲区的大小正确并且缓冲区的底层内存在调用处理程序之前保持有效,代码看起来就很好。对于给定的 I/O 对象,可以安全地启动多个非组合异步操作,例如 async_send_to()。不过,这些操作的执行顺序并没有明确说明。

    接收方应用程序有一个共享字节数组,数据报被读取到该数组中。如果接收到两个数据报,并且发生了两次读取操作,那么缓冲区将包含最后读取的数据报的内容。根据所提供的代码,这可能会由于 Runnables 而创建一个竞争条件,该条件将在未来未指定的时间被调用。例如,考虑发送两个数据报的场景,第一个包含系统消息,第二个包含心跳消息。在以下代码中:

    byte[] bytes = new byte[1024];
    DatagramPacket packet = new DatagramPacket(bytes, bytes.length);
    while (...)
    {
      serverSocket.receive(packet);
      int bytesReceived = packet.getLength();
      MemberSystemStatusMessage statusMessage =  ...;
      CAServiceHeartbeatMessage heartbeatMessage =  ...;
      if (bytesReceived == statusMessage.size())
      {
        statusMessage.setByteBuffer(ByteBuffer.wrap(bytes), 0);
        Platform.runLater(() -> setMemberSystemMessage(statusMessage));
      }
      ...
    }
    

    在第一次循环循环之后,bytes 包含一条状态消息,statusMessage 对象引用bytes 缓冲区。 Runnable 已被安排在未来未指定的时间运行。在读取第二个数据报时,bytes 缓冲区包含一个心跳消息。 Runnable 现在运行,将statusMessage 对象传递给setMemberSystemMessage();但是,它的底层缓冲区现在包含一个心跳消息。要解决此问题,请考虑在需要延迟执行时对字节数组进行深度复制:

    if (bytesReceived == statusMessage.size())
    {
      byte[] bytes_copy = Arrays.copyOf(bytes, bytesReceived);
      statusMessage.setByteBuffer(ByteBuffer.wrap(bytes_copy), 0);
      Platform.runLater(() -> setMemberSystemMessage(statusMessage));
    }
    

    或者,可以为每个读取操作使用一个新缓冲区。

    对底层协议的期望也可能存在问题。 UDP 被称为不可靠协议,因为它不向发送者提供有关数据报传递的通知。每个async_send_to() 操作将导致最多传输一个数据报。完成处理程序的状态指示数据是否已写入,如果数据报已被接收,则表示没有状态。即使通过scatter-gather I/O 提供多个缓冲区也是如此。因此,协议允许问题中描述的场景,其中启动了两个async_send_to() 操作,但接收者只收到一个数据报。应用程序协议应该考虑这种行为。例如,不是在错过单个心跳最后期限后报告错误,而是在连续数量的错过的心跳最后期限超过阈值时,接收者可以报告错误。在写入之间添加一个小的延迟不能保证协议的行为。

    【讨论】:

    • 如果 Boost.Asio 要匹配 TR2 提案,那么似乎指定了顺序。我同意该协议允许丢弃数据包,但是,在这种情况下,接收应用程序中的错误也是可能的。
    • @TannerSansbury 感谢您坚持并发现问题实际上是客户,做得好!
    • @TannerSansbury 我想知道你是否可以看一个后续问题stackoverflow.com/questions/36626870/… - 我将我的 java 代码更改为使用 NIO,但我从来没有真正理解为什么我必须在下一条消息之前快速复制字节损坏了现有的接收缓冲区字节。我认为给定一个足够大的接收缓冲区,下一次写入接收缓冲区不会破坏前一个缓冲区
    • @johnco3 原始代码只有一个缓冲区,每个数据报都被读入缓冲区的开头(例如bytes[0])。缓冲区的大小不会影响此行为。缓冲区不表现为流(例如,在先前读取的数据结束时读入缓冲区)。
    【解决方案2】:

    更新:

    正如 Tanner Sansbury 所说,这个答案很可能是错误的。我把它留在这里,以供人们寻找同时对async_send_to 进行多次呼叫是否有效这一问题的答案。答案似乎是“是”。

    原文:

    这段代码的问题是第二次调用async_send_to() 没有等待第一次调用完成。您应该从第一个完成处理程序中对async_send_to() 进行第二次调用,假设没有错误。

    在您的第二个示例中将两个缓冲区合并为一个数据报是预期行为。

    【讨论】:

    • 可以安全地在单个套接字上启动多个 async_send_to() 操作,同时在同一个套接字上存在待处理的 async_send_to() 操作 (demo)。
    • @TannerSansbury 很好的演示,但是我认为做我正在做的事情(你也在演示中做的)是无效的,即一个接一个地发送多个异步写入而不等待每个要先收到。我做了与您类似的事情-它似乎可以工作,但是当我检查它们时缓冲区已损坏,我将在 Windows 上尝试您的演示以查看其行为是否相同(我需要修改缓冲区内容,因为它们不是检查您的演示)。我的结果是通过带有 Boost v1.60 的 Visual Studio 2015
    • @TannerSansbury 很有趣。该演示可能适用于您的平台,但不能证明跨平台的 API 行为。我刚刚花了一些时间试图确定 API 承诺的性质;我的回答很可能是错误的。到目前为止,我发现的唯一语句是 comments.gmane.org/gmane.comp.lib.boost.asio.user/5403,它说“请记住,如果您使用 async_send_to(),则在第一个 async_send_to() 发出信号完成之前,您不能在同一个套接字上再次调用它。”这与我对 ASIO 的使用是一致的。我将快速浏览一下实现...
    • @janm 一般来说,Asio 文档说明了明确的限制,而不是支持的用例。例如,Asio 的文档没有明确说明可以同时挂起async_readasync_write 操作,但它确实指出,当async_read 挂起时,不应启动其他读取操作。我很少使用这个功能,但据我所知,它从 Boost.Asio 1.35 开始支持非 I/O 完成端口解复用器和 Boost.Asio 1.42.0 支持 I/O 完成端口。
    • @TannerSansbury 是的,总的来说是这样。然而,它并不总是(根据我的经验)列出所有明确的限制,因此偶尔会出现这样的发现练习。我注意到网络库提案的修订版 6 确实明确指出允许多个异步操作,正如您所注意到的,Boost.Asio 隐含了允许同时允许 async_readasync_write 操作(我使用的一个功能频繁地)。无论如何,我在async_send_to 上错了;这将导致这里和那里的一些代码修改。
    猜你喜欢
    • 2013-02-10
    • 2018-07-26
    • 2013-05-27
    • 2021-06-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多