首页 文章详情

C++异步:asio的coroutine实现!

云加社区 | 619 2022-08-25 13:51 0 0 0
UniSMS (合一短信)


导语 | 在c++20标准正式推出后,asio也对c++20 coroutine做了支持,虽然推出时间尚短,有一些不那么尽善尽美的地方,但其中的一些点还是值得我们学习的。asio最新版本的协程实现有两版,本文我们主要以作者在《Why C++20 is the Awesomest Language for Network Programming》中演示的那版来进行内容的展开。我们先从一个实际的使用示例出发,来逐步了解asio coroutine的整体设计。


一、asio协程的简单示例


大部分时候我们使用asio更多的是将它用作一个网络库,但实际上asio本身对通用任务的支持做得也是非常棒的。利用c++11引入的lambda和函数对象,我们的通用任务可以很好的包装成lambda之后post()到某个io_context上,然后在io_context::run()的时候执行。执行流程如下图所示:


asio::awaitable<int> coro_test(asio::io_context& ctx) {  asio::steady_timer timer(ctx);  timer.expires_after(1s);  co_await timer.async_wait(asio::use_awaitable);  co_return 43;}

TEST(THREAD_CONTEXT_TEST, CROUTINE_TEST) { asio::io_context ctx{}; auto wg = asio::make_work_guard(ctx); std::thread tmp_thread([&ctx] { ctx.run(); });
std::allocator<void> alloc; asio::co_spawn(ctx, coro_test(ctx), [](std::exception_ptr ex, int res) { std::cout << "value test:" << res << std::endl; });
std::this_thread::sleep_for(5s);
}


在上述代码中:


  • 我们实现了一个名叫“coro_test”的协程函数。


  • 在其中创建了一个timer,并使用co_await等待timer超时。


  • 使用asio::co_spawn()执行coro_test协程,并设置了一个callback函数来获取这个协程的返回值。


  • 通过阅读示例代码,相信大家肯定有不少疑问,如:


  • 原来接受一个callback的steady_timer::async_wait()为什么可以使用co_await()进行等待?


  • co_spawn()做了什么事情, 让协程可以被asio调度执行?


  • co_return的值是如何传递到最终的callback上的?


  • ...


疑问有不少,我们将从最基础的调度机制出发,来了解asio的整体实现。



二、asio::co_spawn()的实现


co_spawn()有多个重载实现,这些重载主要针对协程是否带返回值,传入的是io_context还是executor等,内部的实现大同小异,不一一讲述了。我们直接以示例代码最终调用到的这个co_spawn()实现来进行展开,co_spawn()的整体执行流程并不复杂:



最终调用的核心代码主要是两处: co_spawn_entry_point(): 完成业务协程到调度器协程的包装awaitable_handler ctor()&&launch(): 构建相关的awaitable_thread并执行(细节后续再展开)。


相关的代码如下:


template <    typename Executor,     typename T,     typename AwaitableExecutor,    typename CompletionToken> requires(    is_executor<Executor>::value    && std::is_convertible<Executor, AwaitableExecutor>::value    && !std::is_void_v<T>    && completion_token_for<CompletionToken, void(std::exception_ptr, T)>  )inline auto co_spawn(    const Executor& ex,    awaitable<T, AwaitableExecutor> a,     CompletionToken&& token){  return async_initiate<CompletionToken, void(std::exception_ptr,T)>(      detail::initiate_co_spawn<AwaitableExecutor>(AwaitableExecutor(ex)),      token, detail::awaitable_as_function<T, AwaitableExecutor>(std::move(a)));}


原始asio实现使用SFINAE完成,这里使用concept做了适当的重构,方便代码具有更好的可读性。感兴趣的读者可以自行翻阅原始实现,可以看到concept的引入对模板编程还是有相当大的简化的,对应代码的可读性也大幅提高。


async_initiate<>()函数的作用我们可以简单理解为根据传入的Initiation类型来自动采用合适的方式执行初始化操作,主要有两种情况:


  • 传入的是一个函数对象,则最终直接调用该函数对象。


  • 传入带intialise()函数的对象,则调用这个initalise()函数。


对于示例,此处传入的是detail::initate_co_spawn<>这个函数对象,则直接调用它的operator()操作符。


template <typename Executor>class initiate_co_spawn{public:  template <typename Handler, typename F>  void operator()(Handler&& handler, F&& f) const{    typedef typename std::invoke_result<F>::type awaitable_type;
auto a = (co_spawn_entry_point)(static_cast<awaitable_type*>(nullptr), ex_, std::forward<F>(f), std::forward<Handler>(handler)); awaitable_handler<executor_type, void>(std::move(a), ex_, proxy_cancel_state.slot(), cancel_state).launch(); }};


核心的代码就两处:


核心代码一


auto a = (co_spawn_entry_point)(static_cast<awaitable_type*>(nullptr),        ex_, std::forward<F>(f), std::forward<Handler>(handler));


这处是使用co_spawn_entry_point()帮助函数把原始的业务协程wrapper到一个新的协程函数,这样就方便做相关的错误处理,以及处理协程与最终callback的串接了。


核心代码二:


awaitable_handler<executor_type, void>(std::move(a), ex_,        proxy_cancel_state.slot(), cancel_state).launch();


这处从新wrapper出来的协程函数构建一个awaitable_thread,并调用launch()执行它,此处的awaitable_handler就是一个awaitable_thread的子类。


搞清楚这两者,我们基本就了解了asio协程调度的机制了,我们接下来具体分析这两者的实现细节。



三、co_spawn_entry_point()的实现


co_spawn_entry_point()的功能是将原始的协程wrapper为一个asio自身实现的协程,有两个版本的重载,分别对应原始协程有无返回值的情况,我们以有返回值的情况进行分析:


template <  typename T,   typename Executor,   typename F,   typename Handler> auto co_spawn_entry_point(    awaitable<T, Executor>*,     Executor ex,     F f,     Handler handler  ) -> awaitable<awaitable_thread_entry_point, Executor> {  auto spawn_work = make_co_spawn_work_guard(ex);  auto handler_work = make_co_spawn_work_guard(      asio::get_associated_executor(handler, ex));
(void) co_await (dispatch)( use_awaitable_t<Executor>{__FILE__, __LINE__, "co_spawn_entry_point"});
(co_await awaitable_thread_has_context_switched{}) = false; std::exception_ptr e = nullptr; bool done = false; try { T t = co_await f();
done = true;
if (co_await awaitable_thread_has_context_switched{}) { (dispatch)(handler_work.get_executor(), [handler = std::move(handler), t = std::move(t)]() mutable { std::move(handler)(std::exception_ptr(), std::move(t)); }); } else { (post)(handler_work.get_executor(), [handler = std::move(handler), t = std::move(t)]() mutable { std::move(handler)(std::exception_ptr(), std::move(t)); }); }
co_return; } catch (...) { if (done) throw;
e = std::current_exception(); }
if (co_await awaitable_thread_has_context_switched{}) { (dispatch)(handler_work.get_executor(), [handler = std::move(handler), e]() mutable { std::move(handler)(e, T()); }); } else { (post)(handler_work.get_executor(), [handler = std::move(handler), e]() mutable { std::move(handler)(e, T()); }); }}


首先是传入参数,我们需要先来区分一下f和handler的区别:


  • F f: 是业务层传入的原始协程函数。


  • Handler handler: 是业务层传入的最终回调函数。


co_spawn_entry_point()函数的功能前面我们也简单提到了,就是将业务传入的协程做二次包装,利用包装出来的新协程完成,整体的代码比较流程化,几个重点环节也很清晰:


  • 对协程的多段任务执行的时机做控制。


  • 对协程返回值的处理。


  • 异常处理。


  • 协程执行完成的业务回调处理。


此处有个用法我们稍微关注一下:


co_await awaitable_thread_has_context_switched{};


类似这种方式,很容易用来实现根据类型来返回我们需要的值的目的,如此处就是返回协程执行上下文有没有出现过切换。这是一个相对特殊的用法,我们在promise_type处(awaitable_frame)重载对特定类型的处理,则可以按需返回我们需要的值,类似的还有获取当前协程executor的:


co_await exectutor_t{};


我们实现一个空的executor_t类并对该类型的await_transform做重载即可完成我们想要的功能,后续内容会展开一个相关的例子,这种是自己实现调度器的时候的一个比较常用的方法。 


另外还有一个特殊的use_awaitable_t\<Executor\>,本节暂不展开,后续的内容会分析它的具体实现,它的作用是用于将常规的callback转换为一个可co_await的对象,callback触发时会自动唤醒协程的继续执行。


先不说具体的实现细节,这种协程的wrapper方式,很适合使用在协程调度器的定制上,asio通过将原始协程wrapper到另外一个协程,很方便的实现了错误处理,返回值处理,额外callback支持这些功能。除此之外,我们也能很容易的想到,利用这种机制,我们很容易实现coroutine group等概念,同时对一组协程进行管理,也能够很方便的实现将新的协程附加到已经存在的group上列队执行等功能,这种方法对于协程管理来说是非常可取的。



四、awiatable_handler的实现


awaitable_handler关联的对象比较多,我们先从c++ coroutine的核心对象来了解一下:



在上图中,我们将asio使用的协程对象(蓝色字标识)与C++的标准对象做了一个关联,从上图我们也能大致的了解到asio中这些相关类:


  • awaitable\<\>: 作为整个协程调度中可co_await的对象。


  • awaitable_frame_base\<\>: 作为promise_type,负责配置asio协程的各种行为,如co_await接受到不同数据类型时的处理->await_transform()各种重载实现。


然后再通过我们前面说的awaitable_handler<>完成协程调度相关的工作,整体的机制就这样串接起来了。我们先来看一下大致的概览图: 



通过awaitable_thread与其上的awatiable_frame stack,asio组织起了一个执行链,注意执行链上的每个awaitable_frame更多的是表达一个执行的先后顺序,不存在执行结果之类的关联。asio的coroutine唤醒机制一般分为两种:


  • callback转换而来的use_awaitable_t,这种比较容易想到,callback发生的时候会唤醒协程继续执行。


  • 利用awaitable_thread<-> awaitable_frame机制,pump的过程中不停唤醒协程继续步进。


两种情况的具体执行我们都来分析一下,这样我们就基本理解了asio的coroutine是如何进行调度的了。


(一)通过callback进行唤醒


这也是asio的一大特色,原有的一系列async_xxxx()的异步回调接口,通过这种机制就能很好的与协程结合起来了,如示例中的:


co_await timer.async_wait(asio::use_awaitable);


callback到co_await的转换魔法,来自上面的asio::use_awaitable,它是一个asio::use_awaitable_t类型的常量:


constexpr use_awaitable_t<> use_awaitable(0, 0, 0);


这部分的实现比较复杂,又关系到了原有的异步callback模式的设计,我们先来了解callback模式的实现,再来了解coroutine 模式下它的实现,从而深入理解asio为什么能够将callback直接转换为co_await,与coroutine无缝的对接。


  • timer async_wait callback模式的实现


在前篇《c++异步:asio的scheduler实现!》中,我们讲到Timer的实现的时候,省略掉了关于async_result<>这部分的逻辑,这里我们重新来展开一下:


asio::steady_timer t(ctx);t.expires_after(1s);t.async_wait([](asio::error_code ec) {   std::cout << "timeout!" << std::endl; });


timer本身对async_result<>的使用很简单,因为我们在此处并不需要提供额外的返回值,所以timer在非coroutine的工作模式下,仅仅只是起到了一个将外界传入的callback派发给最终的dealine_timer_service的作用,当然,看起来整个过程还是比较复杂的:


虽然过程比较复杂,但中间过程我们都可以省略掉,对于timer的callback模式来说,我们仅关注两处的代码就可以了:


代码1(做了一些简化, 去除了无用的返回类型等):


template <    ASIO_COMPLETION_TOKEN_FOR(void(asio::error_code)) WaitToken>auto basic_waitable_timer::async_wait(WaitToken&& token){return async_initiate<WaitToken, void(asio::error_code)>(  initiate_async_wait(this), token);}


代码2:


template <typename WaitHandler>void initiate_async_wait::operator()(WaitHandler&& handler) const{  // If you get an error on the following line it means that your handler  // does not meet the documented type requirements for a WaitHandler.  ASIO_WAIT_HANDLER_CHECK(WaitHandler, handler) type_check;
detail::non_const_lvalue<WaitHandler> handler2(handler); self_->impl_.get_service().async_wait( self_->impl_.get_implementation(), handler2.value, self_->impl_.get_implementation_executor());}


callback模式下用户传入的callback经过多次包装,最终交给了dealine_timer_service负责投递。


  • coroutine模式下对应代码的工作


看完callback模式下对应basic_watiable_timer::async_wait()的工作情况,我们再来看一下它coroutine模式下的工作情况:


asio::awaitable<void> watchdog(asio::io_context& ctx) {  asio::steady_timer timer(ctx);  timer.expires_after(1s);  co_await timer.async_wait(asio::use_awaitable);  co_return;}


对于使用asio::use_awaitable的情况,asio通过partial template specialization,执行的代码发生了变化:



看执行栈来理解还是比较容易的,对于示例代码,执行到:


co_await timer.async_wait(asio::use_awaitable);


时,通过上面的调用栈,系统会自动生成一个handler等待定时器超时的时候唤配,这个handler的作用就是唤醒当前的coroutine-上例中是watchdog,继续往下执行。这样就实现了由callback到co_await语义的转变。asio这版的实现比较晦涩,本身也依赖了像await_transform等机制,也引入了不少噪声很多的代码,我们尽量先理解意图,再去看代码,这样更容易理解一点。框架代码是特化的aync_result<>实现:


template <typename Executor, typename R, typename... Args>class async_result<use_awaitable_t<Executor>, R(Args...)>{public:  using handler_type     = typename detail::awaitable_handler<Executor, typename std::decay<Args>::type...>;  using return_type = typename handler_type::awaitable_type ;
template <typename Initiation, typename... InitArgs> static handler_type* do_init( detail::awaitable_frame_base<Executor>* frame, Initiation& initiation, use_awaitable_t<Executor> u, InitArgs&... args) { (void)u; ASIO_HANDLER_LOCATION((u.file_name_, u.line_, u.function_name_)); handler_type handler(frame->detach_thread()); std::move(initiation)(std::move(handler), std::move(args)...); return nullptr; }
template <typename Initiation, typename... InitArgs> static auto initiate( Initiation initiation, use_awaitable_t<Executor> u, InitArgs... args) -> return_type { co_await [&] (auto* frame) { return do_init(frame, initiation, u, args...); };
for (;;) {} // Never reached.#if defined(_MSC_VER) co_return dummy_return<typename return_type::value_type>();#endif // defined(_MSC_VER) }};


当然,围观上面的代码,我们只会一头雾水,还有奇怪的for(;;){}死循环,并注释上“Never reached”,这是认真的么? 回到前面说的,我们先尝试了解这段代码的作用,如刚才所说,不管如何做,我们要做到callback-> co_await语义的转换,最重要的完成以下目的:


  • 提供某种机制,能够获取到当前正在执行的coroutine,比如上例中的watch_dog()协程。


  • 提供一个函数对象,能够挂接到原来timer的callback位置,将原来自定义的callback变为唤醒coroutine继续往下执行的语义。


理解第一点目的: 第一点其实就是我们看到的死循环上面的部分完成的功能,因为对应的coroutine所有有用的功能都发生在coroutine suspend的时机,并不依赖resume,所以后面的死循环也必然不会跑到了。但这个地方的实现确实很晦涩:


co_await [&] (auto* frame) {  return do_init(frame, initiation, u, args...);};


如上的co_await,会触发promise_type(也就是awaitable_frame_base)对应的await_transform重载:


 template <typename Function>    requires(      std::is_convertible<        typename std::invoke_result<Function, awaitable_frame_base<Executor>*>::type,         awaitable_thread<Executor>*>::value    )   auto await_transform(Function f) {    struct result {      Function function_;      awaitable_frame_base* this_;
bool await_ready() const noexcept { return false; }
void await_suspend(coroutine_handle<void>) noexcept { function_(this_); }
void await_resume() const noexcept {} };
return result{std::move(f), this}; }


利用await_transofrm绕了一圈之后,我们通过struct result这个awaitable对象完成了向do_init()传递当前正在执行的coroutine的目的,这样第一点,正确向do_init()传入当前coroutine对象的目的就完成了。


理解第二点目的: 第二点其实我们比较容易想到,通过对象的operator()重载,我们可以比较方便的构造函数对象,asio这部分也是通过这种机制来实现的,当然,asio的awaitable_handler实现还提供了多个版本的重载以适配不一样的参数类型,比如有无error_code之类,实现都大同小异,我们直接来看其中一个版本:


template <typename Executor, typename... Ts>class awaitable_handler  : public awaitable_handler_base<Executor, std::tuple<Ts...>>{public:  using awaitable_handler_base<Executor,    std::tuple<Ts...>>::awaitable_handler_base;
template <typename... Args> void operator()(Args&&... args){ this->frame()->attach_thread(this); this->frame()->return_values(std::forward<Args>(args)...); this->frame()->clear_cancellation_slot(); this->frame()->pop_frame(); this->pump(); }};


这样,callback机制就成功的转换为了co_await挂起等待执行的机制了,虽然代码比较绕,但这种机制本身还是挺有价值的,给我们提供了传统callback模式如何无缝向coroutine模式切换的机制和思路,可以让我们在项目中更平滑的使用异步的callback模式和co_await模式。



(二)通过frame stack进行唤醒


这个就不细述了,主要是thread对stack的管理,对于常规的co_await,大多都会通过await transform转义到往frame stack添加执行项的操作,而最终在awaitable_thread执行pump的时候:


void pump(){    do {      bottom_of_stack_.frame_->top_of_stack_->resume();    } while (bottom_of_stack_.frame_ && bottom_of_stack_.frame_->top_of_stack_);
if (bottom_of_stack_.frame_) { awaitable<awaitable_thread_entry_point, Executor> a( std::move(bottom_of_stack_)); a.frame_->rethrow_exception(); }}


通过while循环,我们容易知道这种情况coroutine存在合法项会被一直进行resume操作,直到整个执行结束。核心的处理逻辑就是上面的pump()的实现部分了,不过我们下一小节还是结合一段实际代码的执行来更深入的了解一下区别于use_awaitable的立即唤醒模式的工作细节。



  • frame stack的管理


以前面我们介绍过的co_spawn_entry_point()为例,对于:


  try  {    T t = co_await f();
done = true;
//something ignore here... co_return; } catch(...) { //something ignore here... }


此处


T t = co_await f();


的执行就是直接通过frame stack来做唤醒管理的,展开此处的完整实现,其实就是asio对父子协程的管理机制,asio此处的实现比较特殊,没有借助全局的调度器,而是通过对象之间的串接完成了父子协程的执行切换和恢复处理。当然,对比全局的调度器实现,对象的串接,要理清相关的逻辑,就显得更麻烦,相关代码也会比较晦涩一点。



  • 子协程的执行


首先f()本身返回的是一个awaitable<>对象(业务侧本身定义的协程,也就是子协程),对其进行co_await操作挂起的时候,实际执行的代码是:


// Support for co_await keyword.template <class U>void awaitable::await_suspend(  detail::coroutine_handle<detail::awaitable_frame<U, Executor>> h){  frame_->push_frame(&h.promise());}


对于本节的示例代码,这里的f其实是业务层传入的协程,然后asio通过一个co_entry_point()对业务协程进行了包装,对于co_entry_point()来说,f()就是一个子协程,所以执行到co_await f()的时候,我们需要对协程的执行点进行转移,我们需要将协程的执行控制权移交到此处的f(),在f()执行完毕后,再继续co_entry_point()的执行。所以此处push_frame实际的作用就是转移控制权到f()的执行直到f()本身执行结束。下面我们来看一下f()结束发生的事情。



  • 子协程的返回


通过coroutine的final_suspend()定制:


  // On final suspension the frame is popped from the top of the stack.auto awaitable_frame_base::final_suspend() noexcept{struct result{  awaitable_frame_base* this_;
bool await_ready() const noexcept{ return false; }
void await_suspend(coroutine_handle<void>) noexcept{ this->this_->pop_frame(); }
void await_resume() const noexcept{ }};
return result{this};}


子协程在执行结束时,会执行final_suspend(),因为我们返回的result对象,执行pop_frame()后,协程会再次被挂起,然后继续执行父协程-co_entry_point(),同时我们注意到:


T t = co_await f();


在业务协程有提供返回值的情况,我们通过:


// Support for co_await keyword.T awaitable::await_resume(){  return awaitable(static_cast<awaitable&&>(*this)).frame_->get();}


获取到了通过promise_type::return_value保存起来的返回值,最终传递到了外部。



  • frame stack部分小结


其实asio这里的实现,跟coroutine本身的理念是有点相背离的,我们通过coroutine,其实有些时候期望的是将更多分离的代码更系统的组织到一起,这样一方面整体代码的组织更线性,也更容易理清执行的思路,像asio此处的实现,虽然确实是做到了父子协程的管理和相关的迁移,但涉及的对象和代码比较多,分散各处,要理清相关的执行逻辑其实并不容易,所以此处我们更多还是理解为主,并不推荐大家使用这种方式来组织协程,这种代码本身的迭代维护就存在较高的成本,如果出现问题,分析起来也是一种比较麻烦的事情。



五、awaitable的“&&”和“||”操作


这两种操作内部都是通过parallel_group来实现的,我们简单来看一下“&&”和“||”的相关的实现,相关的实现都有几个重载,我们简单看其中一个,重载版本的实现基本都大同小异。


(一)“&&”实现


/// Wait for both operations to succeed./** * If one operations fails, the other is cancelled as the AND-condition can no * longer be satisfied. */template <typename... T, typename U, typename Executor>auto operator&&(    awaitable<std::tuple<T...>, Executor> t,     awaitable<U, Executor> u  ) -> awaitable<std::tuple<T..., U>, Executor>{  auto ex = co_await this_coro::executor;
auto [order, ex0, r0, ex1, r1] = co_await make_parallel_group( co_spawn(ex, detail::awaitable_wrap(std::move(t)), deferred), co_spawn(ex, detail::awaitable_wrap(std::move(u)), deferred) ).async_wait( wait_for_one_error(), use_awaitable_t<Executor>{} );
if (ex0 && ex1) throw multiple_exceptions(ex0); if (ex0) std::rethrow_exception(ex0); if (ex1) std::rethrow_exception(ex1); co_return std::tuple_cat( std::move(detail::awaitable_unwrap<std::tuple<T...>>(r0)), std::make_tuple(std::move(detail::awaitable_unwrap<U>(r1))));}


作用注释写的也比较清楚了,两个子协程同时执行成功则返回成功后的tuple值,任何一个执行失败则直接返回错误。



(二)“||”实现


/// Wait for one operation to succeed./** * If one operations succeeds, the other is cancelled as the OR-condition is * already satisfied. */template <typename Executor>auto operator||(    awaitable<void, Executor> t,     awaitable<void, Executor> u  ) -> awaitable<std::variant<std::monostate, std::monostate>, Executor>{  auto ex = co_await this_coro::executor;
auto [order, ex0, ex1] = co_await make_parallel_group( co_spawn(ex, std::move(t), deferred), co_spawn(ex, std::move(u), deferred) ).async_wait( wait_for_one_success(), use_awaitable_t<Executor>{} );
if (order[0] == 0) { if (!ex0) co_return std::variant<std::monostate, std::monostate>{ std::in_place_index<0>}; if (!ex1) co_return std::variant<std::monostate, std::monostate>{ std::in_place_index<1>}; throw multiple_exceptions(ex0); } else { if (!ex1) co_return std::variant<std::monostate, std::monostate>{ std::in_place_index<1>}; if (!ex0) co_return std::variant<std::monostate, std::monostate>{ std::in_place_index<0>}; throw multiple_exceptions(ex1); }}


两者中的一个执行成功则直接返回成功,另外一个则直接被取消(视具体的cancel语义,大部分情况其实并不能安全取消,只是后者的执行结果会被忽略掉)。



(三)parallel_group


通过阅读&&”“||”的实现代码,我们容易发现,两者其实内部都是通过parallel_group来实现的,区别是async_wait()时的条件不一样,对于 &&”操作条件是wait_for_one_error(),而“||”则是 wait_for_one_success。另外对返回值的处理也是有区别的,&&”返回的是一个tuple,“||”返回的则是一个varaint。parallel_group的实现机制本身也是进行了大量的wrapper操作,最后实际被执行到的parallel_group_completion_handler,这些代码都有多层包装的问题,感兴趣的同学可以自己断一下相关的代码详细了解执行过程,这里就不详细展开了,这部分代码与awaitable实现有同样的问题,包装得都比较重,理解相关的逻辑会相对费力。



(四)简单的示例


asio::awaitable<int> timer_delay1(asio::io_context& ctx) {  asio::steady_timer timer(ctx);  timer.expires_after(1s);
co_await timer.async_wait(asio::use_awaitable); co_return 1;}
asio::awaitable<int> timer_delay2(asio::io_context& ctx) { asio::steady_timer timer(ctx); timer.expires_after(2s);
co_await timer.async_wait(asio::use_awaitable); co_return 2;}
asio::awaitable<int> watchdog2(asio::io_context& ctx) { using namespace asio::experimental::awaitable_operators; std::tuple<int, int> mm = co_await (timer_delay2(ctx) && timer_delay1(ctx)); co_return 43;}
TEST(THREAD_CONTEXT_TEST, CROUTINE_TEST) { asio::co_spawn(ctx, watchdog2(ctx), [](std::exception_ptr ex, int res) { std::cout << "abcd:" << res << std::endl; });}



(五)小结


&&”“||”的存在有效的弥补了asio在缺乏调度器的情况下对一组协程的状态进行管理的需要。但相关的实现我们也比较容易发现,整体包装比较重,但在asio协程的设计下面,抛开复杂度和性能,&&”“||”的存在确实能解决业务里面很多组合任务的诉求,比如通用的超时机制,我们很容易想到利用“||”组合一个timer的co_wait来实现,会让相关的实现有更好的泛用性,而不必每类任务的超时都单独去实现。



六、新版coro<>实现


最新版的asio已经在尝试改善coroutine相关的实现了,作者应该也已经意识到了原有awaitable版本复杂度较高的问题,重新实现了一版移除了复杂的thread&& frame的管理机制,使用更简单的方式来实现的版本,不过这部分还是experimental实现,这里暂时就不详细展开了,仅给出一个示例代码,有兴趣的同学可以自行深扒,结合原有awaitable的理解,这部分应该也是比较好掌握的。


asio::experimental::coro<void(), int> watchdog(asio::io_context& ctx) {  asio::steady_timer timer(ctx);
timer.expires_after(1s);
co_await timer.async_wait(asio::experimental::use_coro); co_return 42;}
asio::experimental::co_spawn(watchdog(ctx), [](std::exception_ptr ex, int res) { std::cout << "abc:" << res << std::endl; });


从用法上来说,业务层的使用接口变化不大,但细扒代码会发现整体机制会比原来的版本简洁不少。不细扒这部分的另外一个原因是笔者更多的是照顾游戏业务,游戏业务很多情况下都是基于心跳(frame)来驱动的,coroutine唤醒的情况也会更复杂一些,比如await_next_frame等语义,另外对父子协程的管理也会更复杂一些,所以我们更多还是会依赖有调度器存在的情况对系统中的协程做统一的调度和管理,也方便加入更多适合游戏的定制性调度行为, 这部分在以后的文章里再来细扒了。



七、总结


通过对整体asio coroutine实现的了解,我们能看到,asio更多还是基于原来的async callback模式兼容的目的去思考整个协程调度的实现,中间的wrapper过程会特别的多,层层包裹,以及一些代码利用一些不那么容易理解的机制来实现想要的功能,导致最终的代码实现对比原来的实现来说,有一定的落差,可读性,可维护性,整体性都会有一些折扣。


同时,我们也能看到,asio尝试提供了async callback到co_await平滑过渡的机制,另外awaitable本身支持&&”“||”扩展语义,也为业务提供了更灵活更可控的异步控制机制,这些还是很值得研究和学习的。


除了coroutine之外,新版的asio也尝试实现了一版go channel like的异步支持,感兴趣的同学可以自行翻阅相关实现。


参考资料:

1.asio官网

2.Talking Async Ep1: Why C++20 is the Awesomest Language for Network Programming

3.Talking Async Ep2: Cancellation in depth



 作者简介


沈芳

腾讯后台开发工程师

IEG研发效能部开发人员,毕业于华中科技大学。目前负责CrossEngine Server的开发工作,对GamePlay技术比较感兴趣。



 推荐阅读


c++异步:asio的scheduler实现!

C++异步:libunifex中的concepts详解!

C++异步变化:libunifex实现!

Go组件:context学习笔记!



good-icon 0
favorite-icon 0
收藏
回复数量: 0
    暂无评论~~
    Ctrl+Enter