【发布时间】:2016-03-26 03:29:25
【问题描述】:
我在使用 Boost.Asio 背靠背地发送多个独立的 UDP 缓冲区时遇到了问题。我有一个周期为 1 秒的 asio 定时器,它触发的回调会通过 UDP 发送 2 个独立的数据报结构。每个消息结构都由 std::unique_ptr 持有,因此在异步回调 CADaemon::handle_send 被调用时,它们不应超出作用域(即仍然有效)。
/// Periodic heartbeat tick: re-arms the status timer, then queues the status
/// datagram followed by the heartbeat datagram on the ALBF socket.
///
/// @param rHeartBeatMs  Delay until the next tick; the same value is bound
///                      into the next invocation of this function.
///
/// The payloads are sent directly out of the member-owned messages
/// (mpStatusMessage / mpHeartbeatMessage). async_send_to does not take
/// ownership of the memory it is given, so those objects must stay alive and
/// unmodified until the matching handle_send completion handler has run.
void
CADaemon::heartBeatTimer(
    const milliseconds& rHeartBeatMs)
{
    // Re-arm first so the next tick is scheduled regardless of the sends below.
    mpStatusTimer->expires_from_now(rHeartBeatMs);
    mpStatusTimer->async_wait(boost::bind(
        &CADaemon::heartBeatTimer,
        this, rHeartBeatMs));

    if (mpALBFSocket && mpALBFEndpoint) {
        // First datagram: the raw bytes of the status message.
        mpALBFSocket->async_send_to(
            buffer(mpStatusMessage.get(),
                   sizeof(MemberSystemStatusMessage)),
            *mpALBFEndpoint,
            boost::bind(&CADaemon::handle_send, this,
                        boost::asio::placeholders::error,
                        boost::asio::placeholders::bytes_transferred));

        // must insert delay to prevent buffer overwrites
        // NOTE(review): this blocks the thread that is running this handler
        // for 10 ms. The two sends use distinct source buffers, so the
        // overwrite being worked around may originate on the receiving side
        // rather than here -- confirm before relying on the delay.
        std::this_thread::sleep_for(std::chrono::milliseconds(10));

        // heartbeat messages are also sent to this socket/endpoint
        mpALBFSocket->async_send_to(
            buffer(mpHeartbeatMessage.get(),
                   sizeof(CAServiceHeartbeatMessage)),
            *mpALBFEndpoint,
            boost::bind(&CADaemon::handle_send, this,
                        boost::asio::placeholders::error,
                        boost::asio::placeholders::bytes_transferred));
    }
}
如果我在发送第一条消息和第二条消息之间稍加延迟,接收应用程序就可以正常工作;但是,如果不加延迟、背靠背地发送它们,那么在到达接收应用程序时,第二个缓冲区似乎会覆盖第一个缓冲区。
我做错了什么?
我还尝试使用下面的代码发送多个缓冲区,但是由于它将两个数据报合并为一个长数据报,因此表现更差。
void
CADaemon::heartBeatTimer(
const milliseconds& rHeartBeatMs)
{
mpStatusTimer->expires_from_now(rHeartBeatMs);
mpStatusTimer->async_wait(boost::bind(
&CADaemon::heartBeatTimer,
this, rHeartBeatMs));
if (mpALBFSocket && mpALBFEndpoint) {
std::vector<boost::asio::const_buffer> transmitBuffers;
transmitBuffers.push_back(buffer(
mpStatusMessage.get(),
sizeof(MemberSystemStatusMessage)));
//transmitBuffers.push_back(buffer(
// mpHeartbeatMessage.get(),
// sizeof(CAServiceHeartbeatMessage)));
mpALBFSocket->async_send_to(
transmitBuffers, *mpALBFEndpoint,
boost::bind(&CADaemon::handle_send, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
}
以下是关联头文件中涉及到 ASIO 的类的成员。
// this message is transmitted @1HZ
// (heartBeatTimer sends its raw bytes; handle_send increments its sequence counter)
std::unique_ptr<MemberSystemStatusMessage> mpStatusMessage;
// this message is transmitted @1HZ
// (heartBeatTimer sends its raw bytes to the same socket/endpoint as the status message)
std::unique_ptr<CAServiceHeartbeatMessage> mpHeartbeatMessage;
// this message is received @1HZ
std::unique_ptr<WOperationalSupportMessage> mpOpSupportMessage;
// this message is received @1HZ when valid
std::unique_ptr<MaintenanceOTPMessage> mpOTPMessage;
// Asio io_service, held by shared_ptr.
// NOTE(review): presumably the service the sockets and timer below run on -- confirm.
std::shared_ptr<boost::asio::io_service> mpIOService;
// Socket and destination endpoint used by heartBeatTimer for outgoing datagrams;
// heartBeatTimer sends nothing while either pointer is null.
std::unique_ptr<boost::asio::ip::udp::socket> mpALBFSocket;
std::unique_ptr<boost::asio::ip::udp::endpoint> mpALBFEndpoint;
// NOTE(review): not referenced by the send path shown here; presumably the
// receive-side socket/endpoint -- confirm against the rest of the class.
std::unique_ptr<boost::asio::ip::udp::socket> mpServerSocket;
std::unique_ptr<boost::asio::ip::udp::endpoint> mpServerEndpoint;
// Timer that heartBeatTimer re-arms on every tick to schedule its next call.
std::unique_ptr<boost::asio::steady_timer> mpStatusTimer;
// Raw byte array. NOTE(review): presumably backs incoming datagrams -- confirm.
std::unique_ptr<uint8_t[]> mpReceiveBuffer;
这是回调处理程序
/// Completion handler bound to the async_send_to calls in heartBeatTimer.
///
/// @param error              Result of the send. "No error" and
///                           boost::asio::error::message_size are both
///                           handled on the success path.
/// @param bytes_transferred  Byte count reported for the completed send.
///
/// On the success path this logs BOTH the status and heartbeat messages
/// (whichever send actually completed), logs the byte count, and increments
/// the status message's sequence counter, all under an exclusive lock on
/// gRWMutexGuard. Any other error is logged with its numeric value.
void
CADaemon::handle_send(
    const boost::system::error_code& error,
    std::size_t bytes_transferred)
{
    static auto& gEvtLog = gpLogger->getLoggerRef(
        Logger::LogDest::EventLog);

    // Failure means: an error is set and it is not the tolerated message_size.
    const bool sendFailed =
        error && (error != boost::asio::error::message_size);
    if (sendFailed) {
        LOG_EVT_ERROR(gEvtLog) << "handle_send: asio error code["
                               << error.value() << "]";
        return;
    }

    // Critical section: exclusive (writer) lock held until the function returns.
    boost::unique_lock<boost::shared_mutex> uniqueLock(gRWMutexGuard);
    LOG_EVT_INFO(gEvtLog) << *mpStatusMessage;
    LOG_EVT_INFO(gEvtLog) << *mpHeartbeatMessage;
    LOG_EVT_INFO(gEvtLog) << "Sent " << bytes_transferred << " bytes";
    mpStatusMessage->incrementSequenceCounter();
}
编辑:添加了带有缓冲区损坏的接收 JAVA 应用程序代码
下面是接收端 Java 应用程序中的代码。注意:接收到的数据报的大小从未出错,损坏的只是内容;而且大小似乎总是较长的那个数据报的大小。希望这有助于追查问题。
/**
 * Creates the background task that receives UDP datagrams on
 * {@code mPortNum} and dispatches them by size: a datagram whose length equals
 * the status message size is delivered through
 * {@code setMemberSystemMessage}, one whose length equals the heartbeat
 * message size through {@code setHeartbeatMessage}; anything else is reported
 * as unexpected.
 *
 * <p>Delivery is done directly when already on the FX application thread and
 * via {@code Platform.runLater} otherwise.
 *
 * @return the receive-loop task; its {@code call()} always returns {@code null}
 */
@Override
protected Task<Void> createTask() {
    return new Task<Void>() {
        @Override
        protected Void call() throws Exception {
            updateMessage("Running...");
            // try-with-resources so the socket (and its port) is released when
            // the loop ends or an exception escapes.
            try (DatagramSocket serverSocket = new DatagramSocket(mPortNum)) {
                // allocate space for received datagrams
                byte[] bytes = new byte[1024];
                DatagramPacket packet = new DatagramPacket(bytes, bytes.length);
                while (!isCancelled()) {
                    // Offer the full buffer again on every iteration.
                    packet.setLength(bytes.length);
                    serverSocket.receive(packet);
                    int bytesReceived = packet.getLength();

                    // Give each datagram its own backing array. ByteBuffer.wrap
                    // does not copy, so a message wrapping the shared receive
                    // buffer would have its contents replaced by the next
                    // receive() before a deferred runLater callback reads it.
                    byte[] payload = new byte[bytesReceived];
                    System.arraycopy(packet.getData(), packet.getOffset(),
                            payload, 0, bytesReceived);

                    MemberSystemStatusMessage statusMessage =
                            new MemberSystemStatusMessage();
                    CAServiceHeartbeatMessage heartbeatMessage =
                            new CAServiceHeartbeatMessage();
                    boolean onFxThread = Platform.isFxApplicationThread();

                    if (bytesReceived == statusMessage.size()) {
                        statusMessage.setByteBuffer(ByteBuffer.wrap(payload), 0);
                        if (onFxThread) {
                            setMemberSystemMessage(statusMessage);
                        } else { // update later in FxApplicationThread
                            Platform.runLater(() -> setMemberSystemMessage(statusMessage));
                        }
                    } else if (bytesReceived == heartbeatMessage.size()) {
                        heartbeatMessage.setByteBuffer(ByteBuffer.wrap(payload), 0);
                        if (onFxThread) {
                            setHeartbeatMessage(heartbeatMessage);
                        } else { // update later in FxApplicationThread
                            Platform.runLater(() -> setHeartbeatMessage(heartbeatMessage));
                        }
                    } else {
                        System.out.println("unexpected datagram");
                    }
                }
            } catch (Exception ex) {
                System.out.println(ex.getMessage());
            }
            updateMessage("Cancelled");
            return null;
        }
    };
}
}
【问题讨论】:
标签: c++ asynchronous boost boost-asio