首页 文章详情

C++异步:libunifex中的concepts详解!

云加社区 | 505 2022-07-08 10:00 0 0 0
UniSMS (合一短信)


导语 | 本篇我们将深入libunifex的concepts设计的方方面面,结合libunifex中的各种concepts实现,更加深入地了解整个库的实现。希望对此方面感兴趣的开发者们给予一点经验和帮助。


前言


在前文《C++异步变化:libunifex实现!》中我们对libunifex的整体实现做了概要性的介绍,本篇我们将深入libunifex的concepts设计的方方面面,结合libunifex中的各种concepts实现,更加深入了解整个库的实现。



一、Sender/Receiver机制概述


上一篇中我们也提到过,与普通函数通过return来返回值相比,libunifex中的Sender和Receiver所表达的是这样一种关系:一个作为生产者的Sender对象通过:


  • set_value。


  • set_done。


  • set_error这三个cpo的其中一个来向作为消费者的Receiver对象传递值的。



如上图所示,类同于一个function或者一个lambda通过return value来返回调用结果,Sender其实就是一个异步任务中的function/lambda,只不过对比普通函数来说,Sender的对应的值传递的时机更复杂,同时也存在三个不同的分支来代表不同的异步处理状态。


当然,实际我们要做到为异步任务提供标准接口的同时,我们还需要让我们的异步任务能够自由的被各种通用异步算法组合,同时我们还希望相关的实现是具备lazy性质的。


(一)Structured Concurrency与Lazy Execution


比较多Execution的文章,都会聊到Lazy Execution,但Lazy Execution感觉更多是结果,这种特性更多的是来源于Structured Concurrency本身的设计。


这里我们只给出基本的概念,Structed Concurrency会在专门的一篇文章中进行展开。


我们先来看一下整个execution的概览图:



execution相关的设计可以认为是一种从DSL->Compiler->Execute过程都很完备的小型专用语言。对于一个使用structured concurrency来表达的异步操作来说:


  • DSL(BNF组成)-Concurrency Pipeline::=Sender Factory{ '|' Sender Adapter } '|' Receiver。


  • Compiler-通常情况我们可以将connect()看成是编译过程,借由Compiler Time的特性支持,我们可以通过connect()产生runtime所需的OperationState。


  • Execute-这个阶段就很自然了,OperationState的start() 就是DSL本身执行的入口点,当然,执行结果最后是通过:set_value,set_error,set_done这几个 receiver cpos来传递的。


所以对于lazy evaluate这个特性,因为设计本身就是有明显的DSL->Compiler->Execute过程的,必然就会具备lazy性质了(直到最后一步才执行Execute操作)。


structured concurrency本身的完备设计和系统性,为我们定制,或者更进一步支持异构计算打下了良好的基础。


接下来我们来具体的看一下整个connect-start的细节:



(二)发起一个异步操作


libunifex用的方式其实并不复杂,通过connect()操作,Sender和Receiver会生成一个中间态的OperationState对象,这个对象负责持有异步任务需要的所有状态和封装所有需要的逻辑。


对于pipeline组织的异步任务来说,整个OperationState是一步一步按层组织到组织到一起的,每个OperationState负责自己的状态管理,并负责生成每一步的操作结果,整个connect的过程我们可以简单理解为一个树状的OperationState的组织过程。connect()返回的结果则是这个树的根节点,也是一个OperationState。


到了真正执行OperationState的start()阶段,整个过程就简单了,我们以整个OperationState树的根节点作为入口,依次触发作为子节点OperationState的执行,直到整个异步任务执行完成。


所以从这里我们其实也能很明显的看到,通过connect-start机制,状态的保存和生命周期控制都交由OperationState来负责了,connect的时候产生相应的OperationState,在start()执行结束后,相应的OperationState即可释放了,很多时候我们甚至可以很方便的在栈上分配一个OperationState,在start结束后再释放它,这样也避免了过多的堆分配导致的性能下降。


这种构造,首先是很容易产生lazy evaluation的作用,利用生成的OperationState,我们可以将这个OperationState放在任何需要的地方执行,另外,整个OperationState这个时候也形成了一个类似AST(Abstract Syntax Tree)的存在,我们发现lambda post过程中丢失的节点与节点之间的值类型约束,清晰的上下文关系,因为这颗树的存在都解决了,是不是比大家想的要简单? 其实并不简单了,execution提案发展到这一步,大家看看中间各个版本的差异就能感知到了。只不过一些复杂问题的解决,往往到最后大家回头看,相关的方案总是简洁明了,理解起来相对顺畅的。


相较于lambda post依赖lambda对异步执行的中间状态进行保存的即时方式,connect-start机制中整个异步任务中的状态管理和生命周期控制都借由OperationState的存在具像化了,一方面代码具备了可复用性,另外整个机制的约束性明显也从自由使用,自行保证正确性,向带约束的使用,误用的地方会有明显的compiler time报错转移了,避免大家踩一些不容易发现又容易写错的坑。另外对于lambda post下我们经常使用智能指针来对状态进行保存和传递的方式比,connect-start机制也能允许我们更多的使用性能更高的栈对象来处理异步逻辑。



(三)Object-state的生命周期


通过前面的讲述,不难理解,对于connect-start模型来说,一个OperationState的生命周期,其实是在connect()后,到start()调用set_value,set_done,set_error这三者之间的任何一个的时刻,再已经返回异步调用的结果后,我们都应该保证receiver此时是知道对应的OpreationState已经被销毁,任何对OperationState的操作都是非法的了。


另外需要注意的一点是,OperationState对象应该是不存在move和copy语义的,正常来说,一个OperationState的构建都应该是作为connect()的返回值,作为值类型即时构建并返回的。



(四)一个异步操作的完成


libunifex中要完成一个异步任务,们以connect传入的receiver作为首个参数调用set_value,set_done,set_error这三者中的任何一个,相关的异步任务就执行完成了,三者的区别如下:


  • set_value用来表示一个成功执行的异步任务,同时后续会追加0..N个任务执行成功的返回值。


  • set_error毫无疑问用来表示一个执行失败的异步任务了。


  • set_done比较特殊,用来表示set_value和set_error之外的情况,很多时候是表示相关的结果不再需要了,比如A, B两个节点同时执行,我们只需要其中一个节点的结果,A成功执行,我们希望通过取消机制取消B的执行,这种情况如果B本身是一个可取消的节点,那就可以直接返回set_done了。很多时候你可以将set_done当成无返回值,或者返回值为空的情况。


这里我们需要特别注意一下 set_value(receiver) 和set_done(receiver)之间的区别,前者表达的意思很明确,就是一次成功的无任何返回值的异步调用,而后者则可能表达的是异步任务并未成功执行的情况。



二、Receiver Concept


简单来说,任何能够使用set_value,set_error,set_done这三个receiver cpos中的一个的类型都可以作为一个合法的Receiver。


我们也能将Receiver作为异步任务中的继续点来考虑,通过多层receiver cpos的级联,最终组成了我们的整个异步任务。


libunifex中并没有单独存在的通用receiver concept,只有几类针对不同receiver cpos的receiver concept 定义:


  • value_receiver<Values...>-一个能够接受set_value(receiver,Values...)的receiver concept定义。


  • error_receiver<Error>-一个能够接受set_error(receiver,Error)的receiver concept定义。


  • done_receiver-一个能够接受set_done(receiver)receiver concept定义。


相关的代码如下:


namespace unifex{  // CPOs  inline constexpr unspecified set_value = unspecified;  inline constexpr unspecified set_error = unspecified;  inline constexpr unspecified set_done = unspecified;
template<typename R> concept __receiver_common = std::move_constructible<R> && std::destructible<R>;
template<typename R> concept done_receiver = __receiver_common<R> && requires(R&& r) { set_done((R&&)r); };
template<typename R, typename... Values> concept value_receiver = __receiver_common<R> && requires(R&& r, Values&&... values) { set_value((R&&)r, (Values&&)values...); };
template<typename R, typename Error> concept error_receiver = __receiver_common<R> && requires(R&& r, Error&& error) { set_error((R&&)r, (Error&&)error); };}


这部分代码还是比较好理解的,receiver首先要同时满足可move构造和析构,然后就是done_receiver,value_receiver,error_receiver这个concept的定义了。


小技巧: 注意上面的:inline constexpr unspecified set_value= unspecified;

c++20后,很多cpo的定义都会使用这种方式在文档中出现,这基本都是代表对应的声明是一个cpo对象,因为cpo的声明其实并不简单,直接省略掉具体的cpo实现,会方便我们更好的阅读代码,比如上面的set_value, set_error,set_done这三个receiver cpos。


不同的Sender类型一般有不同的完成通知,所以对于connect()传入的Receiver会有不一样的约束。很多时候我们会根据相关的Sender实现来组合相关的Receiver约束,例如一个Sender如果在不同的分支处理下同时存在set_value,set_done和set_error调用的话,我们需要同时复合三种类型的receiver约束来组合出最终的receiver约束。


Receiver Query


同时,Receiver还有一种特殊的用法,用于作为caller和callee传递上下文信息的存在,比如对于get_stop_token(receiver)这个cpo来说,它可以用来查询当前receiver支持的stop_token类型,从而在相应的sender处调用合适的代码来处理相关的逻辑。



三、Sender Concept


对比明确的Receiver定义,libunifex中并没有一个通用的Sender约束,这是因为我们编译期没法知道Sender是否触发了三个receiver cpos中的一个,libunifex通过一种间接的方式来对Sender进行约束:


namespace unifex{  // Sender CPOs  inline constexpr unspecified connect = unspecified;
// Test whether a given sender and receiver can been connected. template<typename S, typename R> concept sender_to = requires(S&& sender, R&& receiver) { connect((S&&)sender, (R&&)receiver); };}


libunifex通过对connect接受的sender和receiver来间接的约束sender,也就是满足connect()调用的sender即是合法的sender。


虽然libunifex官方想通过加sender_traits和is_sender<T>的方式来改善相关实现,但在泛型加上不定时机两者的作用下,相关的实现不可能太智能, 可能依然还是库作者向如TypedSender对应的类型traits实现,也就是约束是由对应Sender的实现者来手动维护的。



四、TypedSender Concept


TypedSender其实并不是一个标准的concept实现,它仅仅是提供了一种编译期的traits,让我们有能力在编译期查询对应TypedSender能够支持的set_value和set_error的参数类型,使我们可以利用这些信息更好的约束数据在异步节点间的传递,当类型不匹配的情况下,compiler会直接报错。我们来看一个实际的例子:


struct some_typed_sender { template<template<typename...> class Variant,          template<typename...> class Tuple> using value_types = Variant<Tuple<int>,                             Tuple<std::string, int>,                             Tuple<>>;
template<template<typename...> class Variant> using error_types = Variant<std::exception_ptr>;
static constexpr bool sends_done = true; ...};


对于一个TypedSender实现,我们可以通过它内嵌的类型:


  • value_types-编译期获取其支持的set_value参数类型。


  • error_types-编译期获取其支持的set_error参数类型。


  • sends_done-该bool值用于判断对应sender是否支持以set_done调用结束异步操作。


对于上面的例子来说,我们容易知道,对应的TypedSender定义可能会对关联的Receiver调用以下函数重载:


  • set_value(R&&,int)


  • set_value(R&&,std::string,int)


  • set_value(R&&)


  • set_error(R&&,std::exception_ptr)


  • set_done(R&&)


不过我们的代码实现中,一般很少直接使用这些内嵌类型,而是通过sender_traits<Sender>来使用它们。


比如:typename unifex::sender_traits<Sender>::template value_types<std::variant, std::tuple>



五、OperationState Concept


一个OperationState对象包含了一个异步任务的所有内部状态。它同样也不是一个直接的类型约束。


我们知道一个OperationState是由connect(Sender,Receiver)返回,同时一个OperationState也是不能被move和copy的。对于一个OperationState对象,它只支持两种操作:


  • start()


  • 析构


一个OperationState的析构只有在未调用start()前,或者在已经发出任务结束通知的情况下才是合法的。


namespace unifex{  // CPO for starting an async operation  inline constexpr unspecified start = unspecified;
// CPO for an operation-state object. template<typename T> concept operation_state = std::destructible<T> && requires(T& operation) { start(operation); };}


从上面的代码中可以看到, 对于OperationState的约束, 主要就两点:


  • 支持析构


  • 支持start()操作



六、Scheduler Concept


之前的execution概览图中,我们其实也能看到,真正负责异常操作执行的,就是Scheduler和更底层的ExecutionContext。libunifex中的Scheduler其实就是一个轻量的Wrapper,真正负责异步任务执行的是底层的Execution Context实现。对于非异构的实现,这里的Execution Context一般代表一个Cpu线程或者一组Cpu线程(线程池),最简单的情况是相关的任务被投递到一个线程上来执行。


libunifex有两种方式挂接相关的异步任务到Scheduler对象上:


  • 调用schedule(Scheduler) 这个cpo生成一个sender,然后再利用pipeline关联异步操作到这个sender上。


  • via(sender,Scheduler) 和typed_via(sender,Scheduler)这两个sender adapter来中间切换后续任务执行的Execution Context。


schedule()的实现机制并不复杂,Execution Context是cpu线程来举例说明,底层会保证在关联的线程上执行一个参数类型为void的set_value(),这样因为sender和sender adapter以及receiver的组合机制,在没有via()或者typed_via()等特殊节点存在的情况下,整个pipeline的相关内容都会在这个线程上被执行。这样也就间接的通过一个set_value()的调用位置决定了整个pipeline执行所归属的Execution Context。


一个Scheduler Concept的定义如下:


namespace unifex{  // The schedule() CPO  inline constexpr unspecified schedule = {};
// The scheduler concept. template<typename T> concept scheduler = std::is_nothrow_copy_constructible_v<T> && std::is_nothrow_move_constructible_v<T> && std::destructible<T> && std::equality_comparable<T> && requires(const T cs, T s) { schedule(cs); // TODO: Constraint this returns a sender of void. schedule(s); };}


(一)TimeScheduler


TimeScheduler是Scheduler的扩展实现,除了Scheduler本身的支持外,TimeScheduler还支持在指定的时间执行异步任务的特性。扩展的能力包括以下:


  • typename TimeScheduler::time_point


  • now(ts) -> time_point


  • schedule_at(ts,time_point) -> sender_of<void>


  • schedule_after(ts,duration) -> sender_of<void>


其中:

  • now(ts)-用来返回当前时刻点的cpo


  • schedule_at(ts,time_point)-用于产生一个在指定时刻点调度的sender

  • schedule_after(ts,duration)-用于产生一个在duration指定的时间段后调度的sender


另外,通过time_point类型,用户可以按需定制实现自己的Scheduler来支持自定义的时间单位,以适配复杂的业务需要。


具体的TimerScheduler的Concept定义如下:


namespace unifex{  // TimeScheduler CPOs  inline constexpr unspecified now = unspecified;  inline constexpr unspecified schedule_at = unspecified;  inline constexpr unspecified schedule_after = unspecified;
template<typename T> concept time_scheduler = scheduler<T> && requires(const T scheduler) { now(scheduler); schedule_at(scheduler, now(scheduler)); schedule_after(scheduler, now(scheduler) - now(scheduler)); };}



七、其他Concepts实现


(一)StopToken


cancellation机制相关的concept,又是一个比较难讲清的话题,本篇暂时不具体展开了,有时间会单独阐述这部分的具体实现。



(二)ManySender/ManyReceiver Concept


正常来说,一个Sender只会触发一次完成信号,ManySender允许多次触发完成信号,与ManySender对应,也会有ManyReceiver,这部分更多是对Sender/Receiver的一种扩展,本系列的讲述中也不会实际用到它们,这里先直接略过,感兴趣的同学可以自行查阅相关文档。



(三)Stream Concept


可以理解为一种lazy性质的ManySender,当消费者主动调用next()的时候,才会产生值,本系列的讲述中也不会直接使用到,这里直接略过了,感兴趣的读者可以自行查阅相关文档。



八、总结


可以说几个关键性的cpo:


  • receiver cpos:

    set_value()、set_done()、set_error()


  • start()

  • connect()


再加上一系列的Concepts定义:


  • Receiver Concept

  • Sender Concept

  • OperationState Concept

  • Scheduler Concept


  • ...


将整个execution的执行框架勾勒出来了,使整个异步操作有了一个通用的基础框架,而Execution Context则是框架运行的基础,各种Algorithm实现则是细节,后续我们会分别介绍,带大家深入框架运行的基础以及实现的细节,更进一步的了解libunifex的实现。


参考资料:

1.libunifex源码库

2.Madokakaroto-浅谈The C++ Executors

3.Madokakaroto -The C++ Executors设计与演化的简史


 作者简介


沈芳

腾讯后台开发工程师

IEG研发效能部开发人员,毕业于华中科技大学。目前负责CrossEngine Server的开发工作,对GamePlay技术比较感兴趣。



 推荐阅读


万卷共知,一书一页总关情,TVP读书会带你突围阅读迷障!

C++异步变化:libunifex实现!

硬核!Redis知识总结,建议收藏

C++特殊定制:揭秘cpo与tag_invoke!



温馨提示:因公众号平台更改了推送规则,公众号推送的文章文末需要点一下“赞”“在看”,新的文章才会第一时间出现在你的订阅列表里噢~

good-icon 0
favorite-icon 0
收藏
回复数量: 0
    暂无评论~~
    Ctrl+Enter