首页 文章详情

C++异步:structured concurrency实现解析!

云加社区 | 359 2022-07-08 09:59 0 0 0
UniSMS (合一短信)


导语 | 本篇我们将介绍整个libunifex structure concurrency的实现思路,并结合一部分具体的cpo实现,对这部分的实现做深入分析。


前篇《C++异步:libunifex中的concepts详解!》中我们介绍了libunifex作为框架部分的concept设计,本篇我们将在这个基础上,继续介绍整个libunifex structure concurrency的实现思路,并结合一部分具体的cpo实现,对这部分的实现做深入分析。


一、Structured Concurrency


我们以一个简短的示例代码开启本章的内容:


single_thread_context tcontext;int count = 0;schedule(tcontext.get_scheduler())   | then([&] { ++count; })   | sync_wait();


这段代码的表达方式前面我们也介绍过,主要使用了ranges类同的pipeline表达,我们可以简单将这种表达方式看成是C++的一种特殊LINQ实现,一个专有的DSL,当然,作为一个DSL来说,就execution的整体设计而言,它被赋予了一些专有的特性和意义:所以,如果从一个DSL的角度来看execution的结构化 concurrency,我们容易得出类似以下的观点,对于execution的pipeline表达:



  • DSL定义(BNF组成)-首先是范式的组成,如上图所示,业务使用结构化并发表达的时候,整个范式是由Concurrency Pipeline::= Sender Factory { '|' Sender Adapter } '|' Receiver组成的。


  • Compiler-通常情况我们可以将|操作以及connect()加起来成是编译过程, 借由Compiler Time的特性支持, 我们可以通过connect()产生runtime所需的OperationState。


  • Execute-这个阶段就很自然了,OperationState的start()就是DSL本身执行的入口点,当然,执行结果最后是通过:set_value,set_error,set_done这几个receiver cpos来传递的。


本篇中我们将以这种思路结合一些Sender Factory,Send Adapter节点,以及这种结果处理节点的具体实现来展开.sync_wait()。


二、Sender Factory


各种Sender Factory cpos用于产生各类sender,前面我们也介绍过, sender最大的特征就是会触发set_value,set_error,set_done这几个用于结束通知的receiver cpos。此处我们以just() 为例,来看一下一个Sender Factory需要包含的实现内容,在后续文章中我们会再介绍另外一个schedule() cpo的实现。


(一)just实现解析


首先just(values...)的语义, 就是生成一个sender, 该sender可以向后续的节点通过receiver cpos传递values..., 我们具体来看一下libunifex的just()实现, 会比大家想的复杂一些, 这主要还是因为execution实现的整体思路就是在尝试定义一个DSL, 然后这个DSL本身是自恰的, 比如对于just()来说, 必然会包含以下几个部分:


  • sender生产方法-just() cpo本身。


  • just::sender的实现-具体的sender实现。


  • 相关的OperationState-节点可参与异步操作执行,则必然可以通过connect()来产生其OperationState对象,最后对start()作出响应。


我们来分别看一下这三部分的具体代码实现:


  • sender生产方法


constexpr auto just_cpo::operator()(Values&&... values) const {  return _just::sender<Values...>{std::in_place, (Values&&)values...};}


just()的入口定义比较简单,主要是根据输入的变参values...构造一个_just::sender<Values...>{}对象并返回。这就是我们下一节要介绍的sender实现。


bind_back()版的operator()重载主要用于pipeline组织,代码大量雷同,本篇将统一忽略,方便源码的阅读,有兴趣的读者可以自行翻阅相关的实现。


  • just::sender的实现


_just::sender<> 其实真实类型是 _just::_sender<>::type,这个只是libunifex惯用的一种封装方式,具体的实现如下:


template <typename... Values>class just::sender<Values...> { private:  std::tuple<Values...> values_; public:  template </*...*/>  using value_types = Variant<Tuple<Values...>>;
template </*...*/> using error_types = Variant<std::exception_ptr>;
static constexpr bool sends_done = false;
template<typename... Values2> explicit sender(std::in_place_t, Values2&&... values) : values_((Values2 &&) values...) {}
template<typename This, typename Receiver> friend auto tag_invoke(tag_t<connect>, This&& that, Receiver&& r) -> operation<Receiver, Values...> { return {static_cast<This&&>(that).values_, static_cast<Receiver&&>(r)}; }};


这是一个很标准的sender实现,如我们在《C++异步:libunifex中的concepts详解!》中介绍的一样。首先是sender traits需要的类型定义部分,决定了sender可能触发的receiver cpos的参数和类型:


  template </*...*/>  using value_types = Variant<Tuple<Values...>>;
template </*...*/> using error_types = Variant<std::exception_ptr>;
static constexpr bool sends_done = false;


其次是通过tag_invoke定义的connect()实现:


 template<typename This, typename Receiver>  friend auto tag_invoke(tag_t<connect>, This&& that, Receiver&& r)      -> operation<Receiver, Values...> {    return {static_cast<This&&>(that).values_, static_cast<Receiver&&>(r)};  }


此处返回的operation<>也是我们下一节要介绍的just()专用的OperationState实现。


  • 相关的OperationState


template <typename Receiver, typename... Values>struct just::operation<Receiver, Values...>::type {  std::tuple<Values...> values_;  Receiver receiver_;
void start() & noexcept { try { std::apply( [&](Values&&... values) { execution::set_value((Receiver &&) receiver_, (Values &&) values...); }, std::move(values_)); } catch (...) { execution::set_error((Receiver &&) receiver_, std::current_exception()); } }};


抛开绕来绕去的alias name来说, 这个OperationState的实现很简单, 存储了传入的values...和connect()时关联的Receiver, 并且在start()时向存储的Receiver调用set_value()传递存储下来的values...



(二)本章小结


对于一个sender factory类型的cpo来说,我们始终可以将其实现简单的分成以下几部分:


  • sender生产方法-如just()。


  • sender的实现-具体的sender实现。


  • 相关的OperationState-节点可参与异步操作执行,则必然可以通过connect()来产生其OperationState对象,最后对start()作出响应。因为用于产生一个sender,这类节点一般都出现在structured concurrency描述的最左侧,负责作为后续节点的数据来源,如最开始的示例代码中那样。



三、Sender Adapter


首先我们知道Sender Adapter是作为中间节点存在的:


Concurrency Pipeline ::= Sender Factory { '|' Sender Adapter } '|' Receiver


我们先来看一下Sender Adapter语义层面的特征:


  • Sender Adapter是Sender的包装器,接收前置Sender对象后形成新的Sender对象。


  • 新的Sender对象有自己的异步类型定义,同样也通过receiver cpos向后续节点传递异步操作结果。


Sender Adapter其实就像一个filter,它对原始的异步处理结果进行加工,产生新的结果,大致的工作情况如下图所示:



如上图所示,对于一个Sender Adapter定义,至少会包含两个对象:


Internal Receiver-用于接收Previous Sender发送的结果,处理自己的逻辑后再将结果发往后续节点。


Internal Sender-SenderAdapter(Sender,args...)形成一个新的Sender,连接到后续节点。当然,还会有一个用于作为入口的cpo。


我们具体以比较常用的then()的实现来具体看一下libunifex中一个典型的Sender Adapter是如何实现的:



(一)then() cpo


then()节点的作用是从上一个节点中获取异步返回值后,用该返回值作为输入值调用传入then()节点的函数后,将调用结果作为异步操作的结果返回后续节点,简单的图示如下:



对应的代码实现为:


template<typename Sender, typename Func>auto then_cpo::operator()(Sender&& predecessor, Func&& func) const  -> _result_t<Sender, Func> {  return execution::tag_invoke(_fn{}, (Sender&&)predecessor, (Func&&)func);}
template<typename Sender, typename Func>auto then_cpo::operator()(Sender&& predecessor, Func&& func) const -> _result_t<Sender, Func> { return _then::sender<Sender, Func>{(Sender &&) predecessor, (Func &&) func};}};


then()调用的处理区分了传入的Func是否可tag_invoke的判断,我们直接看最通常的情况,传入的是普通函数:


template<typename Sender, typename Func>auto then_cpo::operator()(Sender&& predecessor, Func&& func) const  -> _result_t<Sender, Func> {  return _then::sender<Sender, Func>{(Sender &&) predecessor, (Func &&) func};}


最后返回的是一个_then命名空间下定义的_then::sender<>对象,并且这个对象将前置的Sender对象和传入的func作为构造这个对象的参数。我们来看一下这个sender的具体实现:



(二)then()的Internal Sender实现


template <typename Predecessor, typename Func>struct then::sender<Predecessor, Func>::type {  Predecessor pred_;  Func func_; private:
template <typename... Args> using result = /*unspecified*/;public:
template </*unspecified*/> using value_types = /*unspecified*/;
template </*unspecified*/> using error_types = /*unspecified*/;
static constexpr bool sends_done = sender_traits<Predecessor>::sends_done;
template <typename Receiver> using receiver_t = receiver_t<Receiver, Func>;

template<typename Sender, typename Receiver> friend auto tag_invoke(tag_t<execution::connect>, Sender&& s, Receiver&& r) -> connect_result_t<member_t<Sender, Predecessor>, receiver_t<remove_cvref_t<Receiver>>> { return execution::connect( static_cast<Sender&&>(s).pred_, receiver_t<remove_cvref_t<Receiver>>{ static_cast<Sender&&>(s).func_, static_cast<Receiver&&>(r)}); }};


跟我们前面看到的just()内的sender实现一样,包含了基本的sender types定义,以及sender相关的connect()tag_invoke定义:


  template<typename Sender, typename Receiver>  friend auto tag_invoke(tag_t<execution::connect>, Sender&& s, Receiver&& r)      -> connect_result_t<member_t<Sender, Predecessor>, receiver_t<remove_cvref_t<Receiver>>> {    return execution::connect(      static_cast<Sender&&>(s).pred_,      receiver_t<remove_cvref_t<Receiver>>{        static_cast<Sender&&>(s).func_,        static_cast<Receiver&&>(r)});  }


我们可以看到,对then()的sender进行connect()的时候,真正发生connect()的是我们之前在then(Previous Sender,Func)调用时缓存下来的上一节点,以及新构建出的receiver_t<>对象,这个对象也是Func真正被执行的地方,同时这个对象也保存了后续的Reciver节点,方便向后续节点传递异步执行结果。



(三)then()的Internal Receiver实现


template <typename Receiver, typename Func>struct then::receiver_t<Receiver, Func>::type {  Func func_;  Receiver receiver_;
template <typename... Values> void set_value(Values&&... values) && noexcept { using result_type = std::invoke_result_t<Func, Values...>; if constexpr (std::is_void_v<result_type>) { if constexpr (noexcept(std::invoke( (Func &&) func_, (Values &&) values...))) { std::invoke((Func &&) func_, (Values &&) values...); execution::set_value((Receiver &&) receiver_); } else { try { std::invoke((Func &&) func_, (Values &&) values...); execution::set_value((Receiver &&) receiver_); } catch (...) { execution::set_error((Receiver &&) receiver_, std::current_exception()); } } } else { if constexpr (noexcept(std::invoke( (Func &&) func_, (Values &&) values...))) { execution::set_value( (Receiver &&) receiver_, std::invoke((Func &&) func_, (Values &&) values...)); } else { try { execution::set_value( (Receiver &&) receiver_, std::invoke((Func &&) func_, (Values &&) values...)); } catch (...) { execution::set_error((Receiver &&) receiver_, std::current_exception()); } } } }
template <typename Error> void set_error(Error&& error) && noexcept { execution::set_error((Receiver &&) receiver_, (Error &&) error); }
void set_done() && noexcept { execution::set_done((Receiver &&) receiver_); }};


到receiver的实现这里就很自然了,通过set_value()接受前面的Sender传递过来的结果,将结果作为输入参数调用Func后,再通过set_value()向后续节点传递Func的返回值。



(四)本章小结


对于一个Sender Adapater类型的cpo来说,主要需要完成以下几件事情:


  • 入口cpo(如then())-完成对前置Sender的接收和需要的参数的接收处理,创建一个专用的Internal Sender对象并返回。


  • Internal Sender-存储前置Sender和需要的参数,并实现tag_invoke(tag_t<execution::connect>)用于构建InternalReceiver,并将实际的connect()操作重定向到保存下来的前置Sender和新创建的InternalReceiver上。


  • InternalReceiver-获取前置Sender的异步结果,并在处理自身逻辑后,将最终的结果返回给后续节点。整体上来说可以将这看成一种wrapper机制, set_value是拦截点,在拦截点上插入自身逻辑,最后依然还是通过set_value返回下一步需要的异步执行结果。



四、sync_wait_r()与sync_wait()


libunifex的实现并没有提供一个类似default receiver的节点,但提供了工具节点sync_wait_r()和sync_wait(),当然,除了通过这种方式来处理返回结果,你也可以自行实现一个自己的Receiver来接收异步返回值。本章我们主要介绍sync_wait_r()和sync_wait()的实现,通过这两者,我们也能更深入理解libunifex常规状态下是如何发起一个异步操作执行并接收其返回结果的。


(一)cpo入口


sync_wait():


template<typename Sender>auto sync_wait_cpo::operator()(Sender&& sender) const    -> std::optional</*...*/> {  using Result = /*...*/;  return _sync_wait::_impl<Result>((Sender&&) sender);}


sync_wait_r():


template <typename Result>decltype(auto) sync_wait_r_cpo::operator()(Sender&& sender) const {  using Result2 = non_void_t<wrap_reference_t<decay_rvalue_t<Result>>>;  return _sync_wait::_impl<Result2>((Sender&&) sender);}


两者代码高度相似


  • 输入参数都是Sender。


  • 利用_sync_wait::_impl<>来完成具体的实现。


两者的差异:


  • sync_wait_r<Result>允许业务侧指定返回值的类型,不支持pipeline操作,一般直接以sync_wait_r<Result>(Sender)的方式来使用。


  • sync_wait 直接使用传入的Sender来推导返回值类型,可以作为pipeline的终结节点使用,如just(1)|sync_wait()。


我们接下来看一看sync_wait和sync_wait_r都引用的_sync_wait::_impl的实现:



(二)sync_wait::_impl的实现


auto _impl(Sender&& sender) {  manual_event_loop ctx;  // Store state for the operation on the stack.  auto operation = connect(      (Sender&&)sender,      _sync_wait::receiver_t<Result>{promise, ctx});
start(operation);
ctx.run();
// ... (retsult handling here)}


整体实现比较简洁,我们主要关注几点:


  • _impl()最终的返回值类型为std::optional<Result>。


  • 整个函数的实现完成了前面的们提到的connect()产生OperationState,再执行start()的过程。


  • connect()时与传入的Sender进行连接的Receiver是自定义的_sync_wait::_receiver<T>::type类型。


  • ctx.run()等待最终执行的完成(相关详细分析可参考后续文章)。


  • 根据promise.state_记录的类型对返回值进行处理(正确返回值还是抛异常)。


剩下的就只有_sync_wait::receiver_t<>的实现了,我们接着来看一下这部分的实现:



(三)_sync_wait::receiver_t<>的实现


template <typename T>struct sync_wait::receiver_t {    promise<T>& promise_;    manual_event_loop& ctx_;
template <typename... Values> void set_value(Values&&... values) && noexcept { try { execution::activate_union_member(promise_.value_, (Values&&)values...); promise_.state_ = promise<T>::state::value; } catch (...) { execution::activate_union_member(promise_.exception_, std::current_exception()); promise_.state_ = promise<T>::state::error; } signal_complete(); }
void set_error(std::exception_ptr err) && noexcept { execution::activate_union_member(promise_.exception_, std::move(err)); promise_.state_ = promise<T>::state::error; signal_complete(); }
void set_error(std::error_code ec) && noexcept { std::move(*this).set_error(make_exception_ptr(std::system_error{ec, "sync_wait"})); }
template <typename Error> void set_error(Error&& e) && noexcept { std::move(*this).set_error(make_exception_ptr((Error&&)e)); }
void set_done() && noexcept { promise_.state_ = promise<T>::state::done; signal_complete(); } private: void signal_complete() noexcept { ctx_.stop(); }};


这就是一个很标准的receiver实现,利用set_value,set_error,set_done的重载来完成对前置Sender执行结果的获取,通过前面的代码我们容易知道,如果是无异常的状态,则正常的通过std::optional<>来返回执行结果,否则抛出异常。另外,代码中的signal_complete()用于通知_impl函数中的 ctx.run()返回,最终向用户返回异步操作的结果。



五、总结


本篇我们从libunifex的structured concurrency设计开始,简述了整套execution整套DSL的组织和执行的逻辑,并结合具体的:


  • Sender Factory实现举例-just()。


  • Sender Adapter实现举例-then()。


  • 终结节点-sync_wait()和sync_wait_r()加深大家对execution各类节点实现的理解。


structured concurrency的设计是整个库的核心,理解了它,也能方便我们理解一些基础节点的实现,也为自己定制更多业务化的节点提供良好的基础。这也是为什么execution库也被当成一个库作者向的特性的原因,与其说它是一个异步库,不如说它在尝试定义一套从DSL到执行态都比较完备的c++异步专用语言。当然,后者的学习成本比学习一个库明显会高出比较多。


参考资料:

1.libunifex源码库


 作者简介


沈芳

腾讯后台开发工程师

IEG研发效能部开发人员,毕业于华中科技大学。目前负责CrossEngine Server的开发工作,对GamePlay技术比较感兴趣。



 推荐阅读


C++异步:libunifex中的concepts详解!

图文并茂!带你深度解析Kubernetes

万卷共知,一书一页总关情,TVP读书会带你突围阅读迷障!

C++异步变化:libunifex实现!



温馨提示:因公众号平台更改了推送规则,公众号推送的文章文末需要点一下“赞”“在看”,新的文章才会第一时间出现在你的订阅列表里噢~

good-icon 0
favorite-icon 0
收藏
回复数量: 0
    暂无评论~~
    Ctrl+Enter