【发布时间】:2016-04-20 07:41:57
【问题描述】:
我正在使用boost::asio 构建网络服务,但我不确定线程安全性。
io_service.run() 只在 io_service 工作专用线程中调用一次
另一方面,send_message() 可以由后面提到的第二个 io_service 处理程序中的代码调用,也可以在用户交互时由 mainThread 调用。这就是我变得紧张的原因。
std::deque<message> out_queue;
// send_message will be called by two different threads
void send_message(MsgPtr msg){
while (out_queue->size() >= 20){
Sleep(50);
}
io_service_.post([this, msg]() { deliver(msg); });
}
// from my understanding, deliver will only be called by the thread which called io_service.run()
void deliver(const MsgPtr){
bool write_in_progress = !out_queue.empty();
out_queue.push_back(msg);
if (!write_in_progress)
{
write();
}
}
void write()
{
auto self(shared_from_this());
asio::async_write(socket_,
asio::buffer(out_queue.front().header(),
message::header_length), [this, self](asio::error_code ec, std::size_t/)
{
if (!ec)
{
asio::async_write(socket_,
asio::buffer(out_queue.front().data(),
out_queue.front().paddedPayload_size()),
[this, self](asio::error_code ec, std::size_t /*length*/)
{
if (!ec)
{
out_queue.pop_front();
if (!out_queue.empty())
{
write();
}
}
});
}
});
}
这种情况安全吗?
类似的第二种情况:当网络线程收到一条消息时,它会将它们发布到另一个asio::io_service,该asio::io_service 也由它自己的专用线程运行。此 io_service 使用 std::unordered_map 来存储回调函数等。
std::unordered_map<int, eventSink> eventSinkMap_;
//...
// called by the main thread (GUI), writes a callback function object to the map
int IOReactor::registerEventSink(std::function<void(int, std::shared_ptr<message>)> fn, QObject* window, std::string endpointId){
util::ScopedLock lock(&sync_);
eventSink es;
es.id = generateRandomId();
// ....
std::pair<int, eventSink> eventSinkPair(es.id, es);
eventSinkMap_.insert(eventSinkPair);
return es.id;
}
// called by the second thread, the network service thread when a message was received
void IOReactor::onMessageReceived(std::shared_ptr<message> msg, ConPtr con)
{
reactor_io_service_.post([=](){ handleReceive(msg, con); });
}
// should be called only by the one thread running the reactor_io_service.run()
// read and write access to the map
void IOReactor::handleReceive(std::shared_ptr<message> msg, ConPtr con){
util::ScopedLock lock(&sync_);
auto es = eventSinkMap_.find(msg.requestId);
if (es != eventSinkMap_.end())
{
auto fn = es->second.handler;
auto ctx = es->second.context;
QMetaObject::invokeMethod(ctx, "runInMainThread", Qt::QueuedConnection, Q_ARG(std::function<void(int, std::shared_ptr<msg::IMessage>)>, fn), Q_ARG(int, CallBackResult::SUCCESS), Q_ARG(std::shared_ptr<msg::IMessage>, msg));
eventSinkMap_.erase(es);
}
首先:我什至需要在这里使用锁吗?
Ofc 两种方法都访问地图,但它们访问的不是相同的元素(receiveHandler 无法尝试访问或读取尚未注册/插入到地图中的元素)。那是线程安全的吗?
【问题讨论】:
标签: c++ multithreading boost-asio