【问题标题】:Boost ASIO threadpool with a thread timeout使用线程超时提升 ASIO 线程池
【发布时间】:2016-08-18 18:13:20
【问题描述】:

我将 Boost ASIO 库用作线程池,这已被广泛描述。但是,如果线程处理时间超过 1 秒,我想中断每个线程,然后转到线程的下一个发布任务。

我可以使用单独的deadline_timer 轻松实现这一点,如果线程在deadline 之前完成或者如果任务执行时间过长会中断线程,它会被重置。但是我认为这将内置到 ASIO 中。因为有一个任务似乎很自然,网络操作超时。但我在 API 中看不到任何内容,要简洁地做到这一点。

谁能告诉我这个功能是否已经存在?或者我应该按照我描述的方式实现它?

【问题讨论】:

  • 截止时间计时器方法只会在线程未执行处理程序时中断线程。我假设(因为您将其用作线程池)您想要中断已执行超过 1 秒的处理程序。这要复杂得多(ASIO 库中没有直接支持这样做)。
  • 定义“中断”。您的意思是希望在完成处理程序运行时从处理程序本身外部终止它的执行吗?
  • @RichardHodges 是的,我可以在执行工作的线程上调用 interrupt() 并在我的代码中放置一个 boost::this_thread::interruption_point() 来促进这一点。
  • @Alex 在 asio 中并不容易。无法保证哪个线程将执行闭包。最好将上下文引用传递给函子并通过它进行通信。
  • 是的,我现在明白你的意思了。

标签: c++ multithreading boost boost-asio


【解决方案1】:

这是我拼凑起来的一个快速解决方案。

它要求您提交的函数对象接受exec_context 类型的参数。

io_service 中运行的任务可以查询.canceled() 访问器(它是原子的)以确定它是否应该提前取消。

然后它可以抛出异常或返回它想要返回的任何值。

调用者通过submit 函数提交。该函数用上下文对象包装工作函数,并将其返回值和/或异常编组到 std::future 中。

然后调用者可以适当地查询或等待这个未来(或忽略它)。

调用者得到一个句柄对象,它上面有方法cancel()。使用这个句柄,调用者可以取消、查询或等待提交的任务。

希望对您有所帮助。写起来很有趣。

#include <boost/asio.hpp>
#include <iostream>
#include <atomic>
#include <thread>
#include <chrono>
#include <future>
#include <stdexcept>
#include <exception>
#include <utility>
#include <type_traits>


//
// an object to allow the caller to communicate a cancellation request to the
// submitted task
//
struct exec_controller
{
    /// @returns previous cancellation request state;
    bool notify_cancel()
    {
        return _should_cancel.exchange(true);
    }

    bool should_cancel() const {
        return _should_cancel;
    }

private:
    std::atomic<bool> _should_cancel = { false };
};

template<class Ret>
struct exec_state : std::enable_shared_from_this<exec_state<Ret>>
{
    using return_type = Ret;

    bool notify_cancel() {
        return _controller.notify_cancel();
    }

    std::shared_ptr<exec_controller>
    get_controller_ptr() {
        return std::shared_ptr<exec_controller>(this->shared_from_this(),
                                                std::addressof(_controller));
    }

    std::promise<return_type>& promise() { return _promise; }

private:
    std::promise<return_type> _promise;
    exec_controller _controller;
};

struct applyer;

struct exec_context
{
    exec_context(std::shared_ptr<exec_controller> impl)
    : _impl(impl)
    {}

    bool canceled() const {
        return _impl->should_cancel();
    }

private:
    friend applyer;
    std::shared_ptr<exec_controller> _impl;
};

struct applyer
{
    template<class F, class Ret>
    void operator()(F& f, std::shared_ptr<exec_state<Ret>> const& p) const
    {
        try {
            p->promise().set_value(f(exec_context { p->get_controller_ptr() }));
        }
        catch(...) {
            p->promise().set_exception(std::current_exception());
        }
    }

    template<class F>
    void operator()(F& f, std::shared_ptr<exec_state<void>> const& p) const
    {
        try {
            f(exec_context { p->get_controller_ptr() });
            p->promise().set_value();
        }
        catch(...) {
            p->promise().set_exception(std::current_exception());
        }
    }
};

template<class Ret>
struct exec_result
{
    using return_type = Ret;
    exec_result(std::shared_ptr<exec_state<return_type>> p)
    : _impl(p)
    {}

    bool cancel() {
        return _impl->notify_cancel();
    }

    std::future<Ret>& get_future()
    {
        return _future;
    }

private:

    std::shared_ptr<exec_state<return_type>> _impl;
    std::future<return_type> _future { _impl->promise().get_future() };
};


template<class Executor, class F>
auto submit(Executor& exec, F&& f)
{
    using function_type = std::decay_t<F>;
    using result_type = std::result_of_t<function_type(exec_context)>;
    using state_type = exec_state<result_type>;
    auto shared_state = std::make_shared<state_type>();
    exec.post([shared_state, f = std::forward<F>(f)]
              {
                  applyer()(f, shared_state);
              });
    return exec_result<result_type>(std::move(shared_state));
}


int main()
{
    using namespace std::literals;

    boost::asio::io_service ios;
    boost::asio::io_service::strand strand(ios);
    boost::asio::io_service::work work(ios);

    std::thread runner([&] { ios.run(); });
    std::thread runner2([&] { ios.run(); });

    auto func = [](auto context)
    {
        for(int i = 0 ; i < 1000 ; ++i)
        {
            if (context.canceled())
                throw std::runtime_error("canceled");
            std::this_thread::sleep_for(100ms);
        }
    };

    auto handle = submit(strand, func);
    auto handle2 = submit(ios, [](auto context) { return 2 + 2; });
    // cancel the handle, or wait on it as you wish

    std::this_thread::sleep_for(1s);
    handle.cancel();
    handle2.cancel();   // prove that late cancellation is a nop
    try {
        std::cout << "2 + 2 is " << handle2.get_future().get() << std::endl;
    }
    catch(std::exception& e)
    {
        std::cerr << "failed to add 2 + 2 : " << e.what() << std::endl;
    }
    try {
        handle.get_future().get();
        std::cout << "task completed" << std::endl;
    }
    catch(std::exception const& e) {
        std::cout << "task threw exception: " << e.what() << std::endl;
    }

    ios.stop();
    runner.join();
    runner2.join();
}

更新:v2 为课程添加了一些隐私保护,演示了 2 个同时执行的任务。

预期输出:

2 + 2 is 4
task threw exception: canceled

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-08-26
    • 2011-12-18
    相关资源
    最近更新 更多