concurrencpp brings the power of concurrent tasks to the C++ world, allowing developers to write highly concurrent applications easily and safely by using tasks, executors and coroutines. Using concurrencpp, applications can break down big procedures that need to be processed asynchronously into smaller tasks that run concurrently and work together in a cooperative manner to achieve the wanted result. concurrencpp also allows applications to write parallel algorithms easily by using parallel coroutines.
concurrencpp's main advantages are:

Writing modern concurrency code without having to rely on low-level primitives such as std::thread and std::mutex.
Writing non-blocking, synchronous-looking code easily by using coroutines and the co_await keyword.

concurrencpp is built around the concept of tasks. A task is an asynchronous operation. Tasks offer a higher level of abstraction for concurrent code than the traditional thread-centric approach. Tasks can be chained together, meaning that tasks pass their asynchronous result from one to another, where the result of one task is used as if it were a parameter or an intermediate value of another ongoing task. Tasks allow applications to utilize available hardware resources better and scale much more than using raw threads, since tasks can be suspended, waiting for other tasks to produce results, without blocking underlying OS threads. Tasks bring much more productivity to developers by allowing them to focus more on business logic and less on low-level concepts like thread management and inter-thread synchronization.
While tasks specify what actions have to be executed, executors are worker objects that specify where and how tasks should be executed. Executors spare applications the tedious management of thread pools and task queues, and decouple those concepts from application code by providing a unified API for creating and scheduling tasks.
Tasks communicate with each other using result objects. A result object is an asynchronous pipe that passes the asynchronous result of one task to another ongoing task. Results can be awaited and resolved in a non-blocking manner.
These three concepts - the task, the executor and the associated result - are the building blocks of concurrencpp. Executors run tasks that communicate with each other by sending results through result objects. Tasks, executors and result objects work together symbiotically to produce concurrent code which is fast and clean.
concurrencpp is built around the RAII concept. In order to use tasks and executors, applications create a runtime instance at the beginning of the main function. The runtime is then used to acquire existing executors and register new user-defined executors. Executors are used to create and schedule tasks to run, and they may return a result object that can be used to pass the asynchronous result to another task which acts as its consumer. When the runtime is destroyed, it iterates over every stored executor and calls its shutdown method. Every executor then exits gracefully. Unscheduled tasks are destroyed, and attempts to create new tasks will throw an exception.
#include "concurrencpp/concurrencpp.h"
#include <iostream>

int main() {
    concurrencpp::runtime runtime;
    auto result = runtime.thread_executor()->submit([] {
        std::cout << "hello world" << std::endl;
    });

    result.get();
    return 0;
}

In this basic example, we create a runtime object, then we acquire the thread executor from the runtime. We use submit to pass a lambda as our given callable. This lambda returns void, hence, the executor returns a result<void> object that passes the asynchronous result back to the caller. main calls get which blocks the main thread until the result becomes ready. If no exception was thrown, get returns void. If an exception was thrown, get re-throws it. Asynchronously, thread_executor launches a new thread of execution and runs the given lambda. It implicitly co_returns void and the task is finished. main is then unblocked.
#include "concurrencpp/concurrencpp.h"
#include <iostream>
#include <vector>
#include <algorithm>
#include <ctime>

using namespace concurrencpp;

std::vector<int> make_random_vector() {
    std::vector<int> vec(64 * 1'024);

    std::srand(std::time(nullptr));
    for (auto& i : vec) {
        i = ::rand();
    }

    return vec;
}

result<size_t> count_even(std::shared_ptr<thread_pool_executor> tpe, const std::vector<int>& vector) {
    const auto vector_size = vector.size();
    const auto concurrency_level = tpe->max_concurrency_level();
    const auto chunk_size = vector_size / concurrency_level;

    std::vector<result<size_t>> chunk_count;

    for (auto i = 0; i < concurrency_level; i++) {
        const auto chunk_begin = i * chunk_size;
        const auto chunk_end = chunk_begin + chunk_size;
        auto result = tpe->submit([&vector, chunk_begin, chunk_end]() -> size_t {
            return std::count_if(vector.begin() + chunk_begin, vector.begin() + chunk_end, [](auto i) {
                return i % 2 == 0;
            });
        });

        chunk_count.emplace_back(std::move(result));
    }

    size_t total_count = 0;

    for (auto& result : chunk_count) {
        total_count += co_await result;
    }

    co_return total_count;
}

int main() {
    concurrencpp::runtime runtime;
    const auto vector = make_random_vector();
    auto result = count_even(runtime.thread_pool_executor(), vector);
    const auto total_count = result.get();
    std::cout << "there are " << total_count << " even numbers in the vector" << std::endl;
    return 0;
}

In this example, we start the program by creating a runtime object. We create a vector filled with random numbers, then we acquire the thread_pool_executor from the runtime and call count_even. count_even is a coroutine that spawns more tasks and co_awaits them to finish inside it. max_concurrency_level returns the maximum amount of workers that the executor supports; in the thread pool executor case, the number of workers is calculated from the number of cores. We then partition the array into chunks to match the number of workers and send every chunk to be processed in its own task. Asynchronously, the workers count how many even numbers each chunk contains and co_return the result. count_even sums up every result by pulling the count using co_await, and the final result is then co_returned. The main thread, which was blocked by calling get, is unblocked and the total count is returned. main prints the number of even numbers and the program terminates gracefully.
Every big or complex operation can be broken down into smaller and chainable steps. Tasks are asynchronous operations implementing those computational steps. Tasks can run anywhere with the help of executors. While tasks can be created from regular callables (such as functors and lambdas), tasks are mostly used with coroutines, which allow smooth suspension and resumption. In concurrencpp, the task concept is represented by the concurrencpp::task class. Although the task concept is central to concurrencpp, applications will rarely need to create and manipulate task objects themselves, as the runtime creates and schedules task objects without external help.
concurrencpp allows applications to produce and consume coroutines as the main way of creating tasks. concurrencpp supports both eager and lazy tasks.
Eager tasks start to run the moment they are invoked. This type of execution is recommended when applications need to fire an asynchronous action and consume its result later on (fire and consume later), or when they want to ignore the asynchronous result entirely (fire and forget).
Eager tasks can return either result or null_result. The result return type tells the coroutine to marshal the returned value or the thrown exception back to its consumer (fire and consume later), while the null_result return type tells the coroutine to drop and ignore either of them (fire and forget).
Eager coroutines can start to run synchronously, in the calling thread; this kind of coroutine is called a "regular coroutine". concurrencpp eager coroutines can also start to run in parallel, inside a given executor; this kind of coroutine is called a "parallel coroutine".
Lazy tasks, on the other hand, start to run only when they are co_awaited. This type of task is recommended when the result of the task is meant to be consumed right after creating the task. Lazy tasks are more optimized for the case of immediate consumption, as they do not require special thread synchronization in order to marshal the asynchronous result back to its consumer. The compiler might also optimize away some of the memory allocations needed to form the underlying coroutine promise. It is not possible to fire a lazy task and do something else in the meantime - co_awaiting a lazy coroutine necessarily means suspending the caller coroutine. The caller coroutine is resumed only when the lazy coroutine completes. Lazy tasks can only return lazy_result.
Lazy tasks can be converted to eager tasks by calling lazy_result::run. This method runs the lazy task inline and returns a result object that monitors the newly started task. If developers are unsure which result type to use, they are encouraged to use lazy results, as they can be converted to regular (eager) results if needed.
A function is a concurrencpp coroutine when it returns one of lazy_result, result or null_result, and contains at least one co_await or co_return in its body. Every valid concurrencpp coroutine is a valid task. In the count-even example above, count_even is such a coroutine. We first spawned count_even, which in turn spawned more child tasks on the thread pool executor (which were created from regular callables), and eventually joined those tasks using co_await.
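To make the eager/lazy distinction concrete, here is a minimal, illustrative sketch (the function names lazy_fetch, eager_fetch and consume are made up for this example, not part of the library). Since no executors are involved, all three coroutines run inline in the calling thread:

#include "concurrencpp/concurrencpp.h"
#include <iostream>

// a lazy concurrencpp coroutine: it starts running only when its lazy_result is co_awaited
concurrencpp::lazy_result<int> lazy_fetch() {
    co_return 21;
}

// an eager concurrencpp coroutine: it starts running the moment it is invoked
concurrencpp::result<int> eager_fetch() {
    co_return 21;
}

// itself a concurrencpp coroutine: it returns result<void> and contains co_await
concurrencpp::result<void> consume() {
    auto eager = eager_fetch();           // already running (here: already finished)
    const auto a = co_await lazy_fetch(); // starts the lazy task only now
    const auto b = co_await eager;        // just pulls the eager task's result
    std::cout << a + b << std::endl;
}

int main() {
    consume().get();
    return 0;
}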
A concurrencpp executor is an object that is able to schedule and run tasks. Executors simplify the work of managing resources such as threads, thread pools and task queues by decoupling them away from application code. Executors provide a unified way of scheduling and executing tasks, since they all extend concurrencpp::executor.
executor API

class executor {
/*
Initializes a new executor and gives it a name.
*/
executor (std::string_view name);
/*
Destroys this executor.
*/
virtual ~executor () noexcept = default ;
/*
The name of the executor, used for logging and debugging.
*/
const std::string name;
/*
Schedules a task to run in this executor.
Throws concurrencpp::errors::runtime_shutdown exception if shutdown was called before.
*/
virtual void enqueue (concurrencpp::task task) = 0;
/*
Schedules a range of tasks to run in this executor.
Throws concurrencpp::errors::runtime_shutdown exception if shutdown was called before.
*/
virtual void enqueue (std::span<concurrencpp::task> tasks) = 0;
/*
Returns the maximum count of real OS threads this executor supports.
The actual count of threads this executor is running might be smaller than this number.
returns numeric_limits<int>::max if the executor does not have a limit for OS threads.
*/
virtual int max_concurrency_level () const noexcept = 0;
/*
Returns true if shutdown was called before, false otherwise.
*/
virtual bool shutdown_requested () const noexcept = 0;
/*
Shuts down the executor:
- Tells underlying threads to exit their work loop and joins them.
- Destroys unexecuted coroutines.
- Makes subsequent calls to enqueue, post, submit, bulk_post and
bulk_submit to throw concurrencpp::errors::runtime_shutdown exception.
- Makes shutdown_requested return true.
*/
virtual void shutdown () noexcept = 0;
/*
Turns a callable and its arguments into a task object and
schedules it to run in this executor using enqueue.
Arguments are passed to the task by decaying them first.
Throws errors::runtime_shutdown exception if shutdown has been called before.
*/
template < class callable_type , class ... argument_types>
void post (callable_type&& callable, argument_types&& ... arguments);
/*
Like post, but returns a result object that passes the asynchronous result.
Throws errors::runtime_shutdown exception if shutdown has been called before.
*/
template < class callable_type , class ... argument_types>
result<type> submit (callable_type&& callable, argument_types&& ... arguments);
/*
Turns an array of callables into an array of tasks and
schedules them to run in this executor using enqueue.
Throws errors::runtime_shutdown exception if shutdown has been called before.
*/
template < class callable_type >
void bulk_post (std::span<callable_type> callable_list);
/*
Like bulk_post, but returns an array of result objects that passes the asynchronous results.
Throws errors::runtime_shutdown exception if shutdown has been called before.
*/
template < class callable_type >
std::vector<concurrencpp::result<type>> bulk_submit (std::span<callable_type> callable_list);
};

As mentioned above, concurrencpp provides commonly used executors. These executor types are:
thread pool executor - a general purpose executor that maintains a pool of threads. The thread pool executor is suitable for short, CPU-bound tasks that don't block. Applications are encouraged to use this executor as the default executor for non-blocking tasks. The concurrencpp thread pool provides dynamic thread injection and dynamic work balancing.
background executor - a thread pool executor with a larger pool of threads. Suitable for launching short blocking tasks like file IO and DB queries. Important note: when consuming results returned by this executor via submit and bulk_submit, it is important to switch execution to a CPU-bound executor using resume_on, in order to prevent CPU-bound tasks from being processed inside the background_executor.
Example:
auto result = background_executor.submit([] { /* some blocking action */ });
auto done_result = co_await result.resolve();
co_await resume_on(some_cpu_executor);
auto val = co_await done_result; // runs inside some_cpu_executor

thread executor - an executor that launches each enqueued task to run on a new thread of execution. Threads are not reused. This executor is good for long-running tasks, like objects that run a work loop, or long blocking operations.
worker thread executor - a single thread executor that maintains a single task queue. Suitable when applications want a dedicated thread that executes many related tasks.
manual executor - an executor that does not execute coroutines by itself. Application code can execute previously enqueued tasks by manually invoking its execution methods (a minimal sketch of driving a manual executor appears right after this list).
derivable executor - a base class for user-defined executors. Although inheriting directly from concurrencpp::executor is possible, derivable_executor uses the CRTP pattern, which provides some optimization opportunities for the compiler.
inline executor - mainly used to override the behaviour of other executors. Enqueuing a task is equivalent to invoking it inline.
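As an illustrative sketch of the manual executor (assuming the runtime's make_manual_executor factory; the lambdas are placeholders), application code drives execution by hand:

#include "concurrencpp/concurrencpp.h"
#include <iostream>

int main() {
    concurrencpp::runtime runtime;
    auto executor = runtime.make_manual_executor();

    // enqueue work; a manual executor never runs tasks on its own
    auto r0 = executor->submit([] { return 1; });
    auto r1 = executor->submit([] { return 2; });

    // application code decides when the tasks actually run:
    // each loop_once call executes at most one previously enqueued task
    while (executor->loop_once()) {
    }

    std::cout << r0.get() + r1.get() << std::endl;
    return 0;
}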
The bare mechanism of an executor is encapsulated in its enqueue method. This method enqueues a task for execution and has two overloads: one overload receives a single task object as an argument, and the other receives a span of task objects. The second overload is used to enqueue a batch of tasks. This allows better scheduling heuristics and lower contention.
Applications don't have to rely on enqueue alone: concurrencpp::executor provides an API for scheduling user callables by converting them to task objects behind the scenes. Applications can request executors to return a result object that passes the asynchronous result of the provided callable. This is done by calling executor::submit and executor::bulk_submit. submit gets a callable and returns a result object, while executor::bulk_submit gets a span of callables and returns a vector of result objects, in a similar way to submit. In many cases, applications are not interested in the asynchronous value or exception. In that case, applications can use executor::post and executor::bulk_post to schedule a callable or a span of callables to be executed, while also telling the task to drop any returned value or thrown exception. Not marshaling the asynchronous result is faster than marshaling it, but there is no way of knowing the status or the result of the ongoing task.
post, bulk_post, submit and bulk_submit all use enqueue behind the scenes for the underlying scheduling.
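As a short sketch of the difference (the lambdas and values are placeholders for this example), post schedules fire-and-forget work while submit also hands back a result object:

#include "concurrencpp/concurrencpp.h"
#include <iostream>

int main() {
    concurrencpp::runtime runtime;
    auto tpe = runtime.thread_pool_executor();

    // post: fire and forget - any returned value or thrown exception is dropped.
    // if the runtime shuts down before the task runs, the task is simply destroyed.
    tpe->post([] { std::cout << "side effect only" << std::endl; });

    // submit: fire and consume later - the returned result object passes the value back
    auto result = tpe->submit([] { return 40 + 2; });
    std::cout << result.get() << std::endl;

    return 0;
}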
thread_pool_executor API

In addition to post, submit, bulk_post and bulk_submit, the thread_pool_executor provides these additional methods.
class thread_pool_executor {
/*
Returns the number of milliseconds each thread-pool worker
remains idle (lacks any task to execute) before exiting.
This constant can be set by passing a runtime_options object
to the constructor of the runtime class.
*/
std::chrono::milliseconds max_worker_idle_time () const noexcept ;
};

manual_executor API

In addition to post, submit, bulk_post and bulk_submit, the manual_executor provides these additional methods.
class manual_executor {
/*
Destructor. Equivalent to clear.
*/
~manual_executor () noexcept ;
/*
Returns the number of enqueued tasks at the moment of invocation.
This number can change quickly by the time the application handles it, it should be used as a hint.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
size_t size () const noexcept ;
/*
Queries whether the executor is empty from tasks at the moment of invocation.
This value can change quickly by the time the application handles it, it should be used as a hint.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
bool empty () const noexcept ;
/*
Clears the executor from any enqueued but yet to-be-executed tasks,
and returns the number of cleared tasks.
Tasks enqueued to this executor by (post_)submit method are resumed
and errors::broken_task exception is thrown inside them.
Ongoing tasks that are being executed by loop_once(_XXX) or loop(_XXX) are unaffected.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
size_t clear ();
/*
Tries to execute a single task. If at the moment of invocation the executor
is empty, the method does nothing.
Returns true if a task was executed, false otherwise.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
bool loop_once ();
/*
Tries to execute a single task.
This method returns when either a task was executed or max_waiting_time
(in milliseconds) has reached.
If max_waiting_time is 0, the method is equivalent to loop_once.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
bool loop_once_for (std::chrono::milliseconds max_waiting_time);
/*
Tries to execute a single task.
This method returns when either a task was executed or timeout_time has reached.
If timeout_time has already expired, this method is equivalent to loop_once.
If shutdown is called from another thread, this method
returns and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
template < class clock_type , class duration_type >
bool loop_once_until (std::chrono::time_point<clock_type, duration_type> timeout_time);
/*
Tries to execute max_count enqueued tasks and returns the number of tasks that were executed.
This method does not wait: it returns when the executor
becomes empty from tasks or max_count tasks have been executed.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
size_t loop ( size_t max_count);
/*
Tries to execute max_count tasks.
This method returns when either max_count tasks were executed or a
total amount of max_waiting_time has passed.
If max_waiting_time is 0, the method is equivalent to loop.
Returns the actual amount of tasks that were executed.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
size_t loop_for ( size_t max_count, std::chrono::milliseconds max_waiting_time);
/*
Tries to execute max_count tasks.
This method returns when either max_count tasks were executed or timeout_time has reached.
If timeout_time has already expired, the method is equivalent to loop.
Returns the actual amount of tasks that were executed.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
template < class clock_type , class duration_type >
size_t loop_until ( size_t max_count, std::chrono::time_point<clock_type, duration_type> timeout_time);
/*
Waits for at least one task to be available for execution.
This method should be used as a hint,
as other threads (calling loop, for example) might empty the executor,
before this thread has a chance to do something with the newly enqueued tasks.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
void wait_for_task ();
/*
This method returns when one or more tasks are available for
execution or max_waiting_time has passed.
Returns true if at least one task is available for execution, false otherwise.
This method should be used as a hint, as other threads (calling loop, for example)
might empty the executor, before this thread has a chance to do something
with the newly enqueued tasks.
If shutdown is called from another thread, this method
returns and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
bool wait_for_task_for (std::chrono::milliseconds max_waiting_time);
/*
This method returns when one or more tasks are available for execution or timeout_time has reached.
Returns true if at least one task is available for execution, false otherwise.
This method should be used as a hint,
as other threads (calling loop, for example) might empty the executor,
before this thread has a chance to do something with the newly enqueued tasks.
If shutdown is called from another thread, this method
returns and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
template < class clock_type , class duration_type >
bool wait_for_task_until (std::chrono::time_point<clock_type, duration_type> timeout_time);
/*
This method returns when max_count or more tasks are available for execution.
This method should be used as a hint, as other threads
(calling loop, for example) might empty the executor,
before this thread has a chance to do something with the newly enqueued tasks.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
void wait_for_tasks ( size_t max_count);
/*
This method returns when max_count or more tasks are available for execution
or max_waiting_time (in milliseconds) has passed.
Returns the number of tasks available for execution when the method returns.
This method should be used as a hint, as other
threads (calling loop, for example) might empty the executor,
before this thread has a chance to do something with the newly enqueued tasks.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
size_t wait_for_tasks_for ( size_t count, std::chrono::milliseconds max_waiting_time);
/*
This method returns when max_count or more tasks are available for execution
or timeout_time is reached.
Returns the number of tasks available for execution when the method returns.
This method should be used as a hint, as other threads
(calling loop, for example) might empty the executor,
before this thread has a chance to do something with the newly enqueued tasks.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
template < class clock_type , class duration_type >
size_t wait_for_tasks_until ( size_t count, std::chrono::time_point<clock_type, duration_type> timeout_time);
};

Asynchronous values and exceptions can be consumed using concurrencpp result objects. The result type represents the asynchronous result of an eager task, while lazy_result represents the deferred result of a lazy task.
When a task (eager or lazy) completes, it either returns a valid value or throws an exception. In either case, this asynchronous result is passed to the consumer of the result object.
result objects form asymmetric coroutines - the execution of the caller-coroutine is not bound to the execution of the callee-coroutine, and both coroutines can run independently. Only when the result of the callee-coroutine is consumed might the caller-coroutine be suspended, waiting for the callee to complete. Until then, both coroutines run independently. The callee-coroutine runs whether or not its result is consumed.
lazy_result objects form symmetric coroutines - the execution of the callee-coroutine happens only after the suspension of its caller-coroutine. When a lazy result is awaited, the current coroutine is suspended and the lazy task associated with the lazy result starts to run. After the callee-coroutine completes and produces a result, the caller-coroutine is resumed. If a lazy result is not consumed, its associated lazy task never starts to run.
All result objects are a move-only type, and as such they cannot be used after their content was moved to another result object. In that case, the result object is considered empty, and trying to call any method other than operator bool and operator = will throw an exception.
After the asynchronous result has been pulled out of the result object (for example, by calling get or operator co_await), the result object becomes empty. Emptiness can be tested with operator bool.
Awaiting a result means suspending the current coroutine until the result object is ready. If a valid value was returned from the associated task, it is returned from the result object. If the associated task threw an exception, it is re-thrown. At the moment of awaiting, if the result is already ready, the current coroutine resumes immediately. Otherwise, it is resumed by the thread that sets the asynchronous result or exception.
Resolving a result is similar to awaiting it. The difference is that the co_await expression returns the result object itself, in a non-empty form and in a ready state. The asynchronous result can then be pulled by using get or co_await.
Every result object has a status indicating the state of the asynchronous result. The result status varies from result_status::idle (the asynchronous result or exception hasn't been produced yet), to result_status::value (the associated task terminated gracefully by returning a valid value), to result_status::exception (the associated task terminated by throwing an exception). The status can be queried by calling (lazy_)result::status.
result type

The result type represents the result of an ongoing, asynchronous task, similar to std::future.
Aside from awaiting and resolving result objects, they can also be waited for by calling any of result::wait, result::wait_for, result::wait_until or result::get. Waiting for a result to finish is a blocking operation (in the case that the asynchronous result is not yet ready) and suspends the entire thread of execution waiting for the asynchronous result to become available. Waiting operations are generally discouraged and should only be used for root-level tasks, or in contexts that allow it, like blocking the main thread waiting for the rest of the application to finish gracefully, or when using concurrencpp::blocking_executor or concurrencpp::thread_executor.
Awaiting result objects by using co_await (and by doing so, turning the current function/task into a coroutine as well) is the preferred way of consuming result objects, as it does not block the underlying thread.
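A minimal sketch contrasting the two consumption styles (the consume coroutine and the lambda are made up for this example):

#include "concurrencpp/concurrencpp.h"
#include <iostream>

concurrencpp::result<void> consume(std::shared_ptr<concurrencpp::thread_pool_executor> tpe) {
    auto result = tpe->submit([] { return 42; });

    // preferred: co_await suspends only this coroutine, the underlying thread stays free
    const auto value = co_await result;
    std::cout << "got " << value << std::endl;
}

int main() {
    concurrencpp::runtime runtime;

    // blocking via get() is acceptable here, at the root level of the application
    consume(runtime.thread_pool_executor()).get();
    return 0;
}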
result API

class result {
/*
Creates an empty result that isn't associated with any task.
*/
result () noexcept = default ;
/*
Destroys the result. Associated tasks are not cancelled.
The destructor does not block waiting for the asynchronous result to become ready.
*/
~result () noexcept = default ;
/*
Moves the content of rhs to *this. After this call, rhs is empty.
*/
result (result&& rhs) noexcept = default ;
/*
Moves the content of rhs to *this. After this call, rhs is empty. Returns *this.
*/
result& operator = (result&& rhs) noexcept = default ;
/*
Returns true if this is a non-empty result.
Applications must not use this object if this->operator bool() is false.
*/
explicit operator bool () const noexcept ;
/*
Queries the status of *this.
The returned value is any of result_status::idle, result_status::value or result_status::exception.
Throws errors::empty_result if *this is empty.
*/
result_status status () const ;
/*
Blocks the current thread of execution until this result is ready,
when status() != result_status::idle.
Throws errors::empty_result if *this is empty.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
void wait ();
/*
Blocks until this result is ready or duration has passed. Returns the status
of this result after unblocking.
Throws errors::empty_result if *this is empty.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
template < class duration_unit , class ratio >
result_status wait_for (std::chrono::duration<duration_unit, ratio> duration);
/*
Blocks until this result is ready or timeout_time has reached. Returns the status
of this result after unblocking.
Throws errors::empty_result if *this is empty.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
template < class clock , class duration >
result_status wait_until (std::chrono::time_point<clock, duration> timeout_time);
/*
Blocks the current thread of execution until this result is ready,
when status() != result_status::idle.
If the result is a valid value, it is returned, otherwise, get rethrows the asynchronous exception.
Throws errors::empty_result if *this is empty.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
type get ();
/*
Returns an awaitable used to await this result.
If the result is already ready - the current coroutine resumes
immediately in the calling thread of execution.
If the result is not ready yet, the current coroutine is suspended
and resumed when the asynchronous result is ready,
by the thread which had set the asynchronous value or exception.
In either way, after resuming, if the result is a valid value, it is returned.
Otherwise, operator co_await rethrows the asynchronous exception.
Throws errors::empty_result if *this is empty.
*/
auto operator co_await ();
/*
Returns an awaitable used to resolve this result.
After co_await expression finishes, *this is returned in a non-empty form, in a ready state.
Throws errors::empty_result if *this is empty.
*/
auto resolve ();
};

lazy_result type

A lazy result object represents the deferred result of a lazy task.
lazy_result is responsible for starting the associated lazy task and marshaling its deferred result back to its consumer. When awaited or resolved, the lazy result suspends the current coroutine and starts the associated lazy task. When the associated task completes, its asynchronous value is passed to the caller task, which is then resumed.
Sometimes an API might return a lazy result, but the application needs its associated task to run eagerly (without suspending the caller task). In this case, the lazy task can be converted to an eager task by calling run on its associated lazy result. The associated task then starts to run inline, without suspending the caller task. The original lazy result is emptied, and a valid result object that monitors the newly started task is returned instead.
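A minimal sketch (lazy_answer is a made-up coroutine for this example):

#include "concurrencpp/concurrencpp.h"
#include <iostream>

concurrencpp::lazy_result<int> lazy_answer() {
    co_return 42;
}

int main() {
    // run() resumes the lazy coroutine inline and hands back a regular (eager) result
    // that monitors the now-started task; the original lazy_result is emptied.
    concurrencpp::result<int> eager = lazy_answer().run();

    std::cout << eager.get() << std::endl;
    return 0;
}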
lazy_result API

class lazy_result {
/*
Creates an empty lazy result that isn't associated with any task.
*/
lazy_result () noexcept = default ;
/*
Moves the content of rhs to *this. After this call, rhs is empty.
*/
lazy_result (lazy_result&& rhs) noexcept ;
/*
Destroys the result. If not empty, the destructor destroys the associated task without resuming it.
*/
~lazy_result () noexcept ;
/*
Moves the content of rhs to *this. After this call, rhs is empty. Returns *this.
If *this is not empty, then operator= destroys the associated task without resuming it.
*/
lazy_result& operator =(lazy_result&& rhs) noexcept ;
/*
Returns true if this is a non-empty result.
Applications must not use this object if this->operator bool() is false.
*/
explicit operator bool () const noexcept ;
/*
Queries the status of *this.
The returned value is any of result_status::idle, result_status::value or result_status::exception.
Throws errors::empty_result if *this is empty.
*/
result_status status () const ;
/*
Returns an awaitable used to start the associated task and await this result.
If the result is already ready - the current coroutine resumes immediately
in the calling thread of execution.
If the result is not ready yet, the current coroutine is suspended and
resumed when the asynchronous result is ready,
by the thread which had set the asynchronous value or exception.
In either way, after resuming, if the result is a valid value, it is returned.
Otherwise, operator co_await rethrows the asynchronous exception.
Throws errors::empty_result if *this is empty.
*/
auto operator co_await ();
/*
Returns an awaitable used to start the associated task and resolve this result.
If the result is already ready - the current coroutine resumes immediately
in the calling thread of execution.
If the result is not ready yet, the current coroutine is suspended and resumed
when the asynchronous result is ready, by the thread which
had set the asynchronous value or exception.
After co_await expression finishes, *this is returned in a non-empty form, in a ready state.
Throws errors::empty_result if *this is empty.
*/
auto resolve ();
/*
Runs the associated task inline and returns a result object that monitors the newly started task.
After this call, *this is empty.
Throws errors::empty_result if *this is empty.
Might throw std::bad_alloc if fails to allocate memory.
*/
result<type> run ();
};

Regular eager coroutines start to run synchronously, in the calling thread of execution. Execution might shift to another thread of execution if the coroutine undergoes a rescheduling, for example by awaiting an unready result object inside it. concurrencpp also provides parallel coroutines, which start to run inside a given executor instead of in the invoking thread of execution. This style of scheduling coroutines is especially helpful when writing parallel algorithms, recursive algorithms and concurrent algorithms that use the fork-join model.
Every parallel coroutine must meet the following preconditions:
Returns any of result / null_result.
Gets executor_tag as its first argument.
Gets any of type* / type& / std::shared_ptr<type>, where type is a concrete class of executor, as its second argument.
Contains any of co_await or co_return in its body.

If all of the above applies, the function is a parallel coroutine: concurrencpp will start the coroutine suspended and immediately reschedule it to run in the provided executor. concurrencpp::executor_tag is a dummy placeholder that tells the concurrencpp runtime that this function is not a regular function and needs to start running inside the given executor. If the executor passed to the parallel coroutine is null, the coroutine will not start to run and an std::invalid_argument exception will be thrown synchronously. If all preconditions are met, applications can consume the result of the parallel coroutine by using the returned result object.
In this example, we calculate the 30th member of the Fibonacci sequence in a parallel manner. We start by launching each Fibonacci step in its own parallel coroutine. The first argument is a dummy executor_tag and the second argument is the threadpool executor. Every recursive step invokes a new parallel coroutine that runs in parallel. Each result is co_returned to its parent task and acquired by using co_await.
When we deem the input to be small enough to be calculated synchronously (when curr <= 10), we stop executing each recursive step in its own task and just solve the algorithm synchronously.
#include "concurrencpp/concurrencpp.h"
#include <iostream>

using namespace concurrencpp;

int fibonacci_sync(int i) {
    if (i == 0) {
        return 0;
    }

    if (i == 1) {
        return 1;
    }

    return fibonacci_sync(i - 1) + fibonacci_sync(i - 2);
}

result<int> fibonacci(executor_tag, std::shared_ptr<thread_pool_executor> tpe, const int curr) {
    if (curr <= 10) {
        co_return fibonacci_sync(curr);
    }

    auto fib_1 = fibonacci({}, tpe, curr - 1);
    auto fib_2 = fibonacci({}, tpe, curr - 2);

    co_return co_await fib_1 + co_await fib_2;
}

int main() {
    concurrencpp::runtime runtime;
    auto fibb_30 = fibonacci({}, runtime.thread_pool_executor(), 30).get();
    std::cout << "fibonacci(30) = " << fibb_30 << std::endl;
    return 0;
}

For comparison, this is how the same code is written without using parallel coroutines, relying instead on executor::submit. Since fibonacci returns a result<int>, submitting it via executor::submit yields a result<result<int>>.
#include "concurrencpp/concurrencpp.h"
#include <iostream>

using namespace concurrencpp;

int fibonacci_sync(int i) {
    if (i == 0) {
        return 0;
    }

    if (i == 1) {
        return 1;
    }

    return fibonacci_sync(i - 1) + fibonacci_sync(i - 2);
}

result<int> fibonacci(std::shared_ptr<thread_pool_executor> tpe, const int curr) {
    if (curr <= 10) {
        co_return fibonacci_sync(curr);
    }

    auto fib_1 = tpe->submit(fibonacci, tpe, curr - 1);
    auto fib_2 = tpe->submit(fibonacci, tpe, curr - 2);

    co_return co_await co_await fib_1 +
        co_await co_await fib_2;
}

int main() {
    concurrencpp::runtime runtime;
    auto fibb_30 = fibonacci(runtime.thread_pool_executor(), 30).get();
    std::cout << "fibonacci(30) = " << fibb_30 << std::endl;
    return 0;
}

Result objects are the main way to pass data between tasks in concurrencpp, and we have seen how executors and coroutines produce such objects. Sometimes we want to use the capabilities of result objects with non-task code, for example when working with a third-party library. In this case, we can complete a result object by using a result_promise. result_promise resembles a std::promise object - applications can manually set the asynchronous result or exception and make the associated result object become ready.
Just like result objects, result promises are a move-only type that becomes empty after a move. Similarly, after setting a result or an exception, the result promise becomes empty as well. If a result promise gets out of scope and no result or exception has been set, the result promise destructor sets a concurrencpp::errors::broken_task exception using the set_exception method. Suspended and blocked tasks waiting for the associated result object are then resumed/unblocked.
Result promises can convert callback-style code into async/await-style code: whenever a component requires a callback to marshal the asynchronous result, we can pass a callback that calls set_result or set_exception (depending on the asynchronous result itself) on a result promise, and return the associated result.
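As an illustrative sketch, the callback-based async_download API below is hypothetical (stubbed with a detached thread, not part of concurrencpp); the adapter shows how a result_promise can bridge such an API into an awaitable result:

#include "concurrencpp/concurrencpp.h"
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <utility>

// hypothetical third-party, callback-based API, stubbed here with a detached thread
void async_download(std::string url, std::function<void(std::string)> on_done) {
    std::thread([url = std::move(url), on_done = std::move(on_done)] {
        on_done("content of " + url);
    }).detach();
}

// adapter: turns the callback style into a result object that can be awaited or blocked on
concurrencpp::result<std::string> download(std::string url) {
    // the promise is held in a shared_ptr because std::function requires a copyable callback
    auto promise = std::make_shared<concurrencpp::result_promise<std::string>>();
    auto result = promise->get_result();

    async_download(std::move(url), [promise](std::string body) {
        promise->set_result(std::move(body)); // makes the associated result object ready
    });

    return result;
}

int main() {
    std::cout << download("https://example.com").get() << std::endl;
    return 0;
}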
result_promise API

template<class type>
class result_promise {
/*
Constructs a valid result_promise.
Might throw std::bad_alloc if fails to allocate memory.
*/
result_promise ();
/*
Moves the content of rhs to *this. After this call, rhs is empty.
*/
result_promise (result_promise&& rhs) noexcept ;
/*
Destroys *this, possibly setting an errors::broken_task exception
by calling set_exception if *this is not empty at the time of destruction.
*/
~result_promise () noexcept ;
/*
Moves the content of rhs to *this. After this call, rhs is empty.
*/
result_promise& operator = (result_promise&& rhs) noexcept ;
/*
Returns true if this is a non-empty result-promise.
Applications must not use this object if this->operator bool() is false.
*/
explicit operator bool () const noexcept ;
/*
Sets a value by constructing <<type>> from arguments... in-place.
Makes the associated result object become ready - tasks waiting for it
to become ready are unblocked.
Suspended tasks are resumed inline.
After this call, *this becomes empty.
Throws errors::empty_result_promise exception If *this is empty.
Might throw any exception that the constructor
of type(std::forward<argument_types>(arguments)...) throws.
*/
template < class ... argument_types>
void set_result (argument_types&& ... arguments);
/*
Sets an exception.
Makes the associated result object become ready - tasks waiting for it
to become ready are unblocked.
Suspended tasks are resumed inline.
After this call, *this becomes empty.
Throws errors::empty_result_promise exception If *this is empty.
Throws std::invalid_argument exception if exception_ptr is null.
*/
void set_exception (std::exception_ptr exception_ptr);
/*
A convenience method that invokes a callable with arguments... and calls set_result
with the result of the invocation.
If an exception is thrown, the thrown exception is caught and set instead by calling set_exception.
After this call, *this becomes empty.
Throws errors::empty_result_promise exception If *this is empty.
Might throw any exception that callable(std::forward<argument_types>(arguments)...)
or the contructor of type(type&&) throw.
*/
template < class callable_type , class ... argument_types>
void set_from_function (callable_type&& callable, argument_types&& ... arguments);
/*
Gets the associated result object.
Throws errors::empty_result_promise exception If *this is empty.
Throws errors::result_already_retrieved exception if this method had been called before.
*/
result<type> get_result ();
};

result_promise example:

In this example, a result_promise is used to push data from one thread, while its associated result object is used to pull that data from another thread.
#include "concurrencpp/concurrencpp.h"
#include <iostream>
#include <thread>
#include <chrono>

int main() {
    concurrencpp::result_promise<std::string> promise;
    auto result = promise.get_result();

    std::thread my_3_party_executor([promise = std::move(promise)]() mutable {
        std::this_thread::sleep_for(std::chrono::seconds(1)); // imitate real work
        promise.set_result("hello world");
    });

    auto asynchronous_string = result.get();
    std::cout << "result promise returned string: " << asynchronous_string << std::endl;

    my_3_party_executor.join();
}

In this example, we use std::thread as a third-party executor. This represents a scenario where a non-concurrencpp executor is used as part of the application life-cycle. We extract the result object before moving the promise, and block the main thread until the result becomes ready. Inside my_3_party_executor, we set a result as if we had co_returned it.
A shared result is a special kind of result object that allows multiple consumers to access the asynchronous result, similar to std::shared_future. Different consumers from different threads can call its functions - await, get and resolve - in a thread-safe manner.
Shared results are constructed from regular result objects and, unlike regular result objects, they are both copyable and movable. In that regard, shared_result behaves similarly to std::shared_ptr. If a shared result instance is moved to another instance, the moved-from instance becomes empty, and trying to access it will throw an exception.
In order to support multiple consumers, shared results return a reference to the asynchronous value instead of moving it (as a regular result does). For example, a shared_result<int> returns an int& when get, await etc. are called. If the underlying type of the shared_result is void or a reference type (like int&), it is returned as usual. If the asynchronous result is a thrown exception, it is re-thrown.
Do note that while acquiring the asynchronous result using a shared_result from multiple threads is thread-safe, the actual value might not be. For example, multiple threads can acquire an asynchronous integer by receiving its reference (int&); this does not make the integer itself thread-safe. It is only safe to mutate the asynchronous value if the value is already thread-safe. Alternatively, applications are encouraged to use const types to begin with (like const int) and acquire constant references (like const int&) that prevent mutation.
shared_result API

class shared_result {
/*
Creates an empty shared-result that isn't associated with any task.
*/
shared_result () noexcept = default ;
/*
Destroys the shared-result. Associated tasks are not cancelled.
The destructor does not block waiting for the asynchronous result to become ready.
*/
~shared_result () noexcept = default ;
/*
Converts a regular result object to a shared-result object.
After this call, rhs is empty.
Might throw std::bad_alloc if fails to allocate memory.
*/
shared_result (result<type> rhs);
/*
Copy constructor. Creates a copy of the shared result object that monitors the same task.
*/
shared_result ( const shared_result&) noexcept = default ;
/*
Move constructor. Moves rhs to *this. After this call, rhs is empty.
*/
shared_result (shared_result&& rhs) noexcept = default ;
/*
Copy assignment operator. Copies rhs to *this and monitors the same task that rhs monitors.
*/
shared_result& operator =( const shared_result& rhs) noexcept ;
/*
Move assignment operator. Moves rhs to *this. After this call, rhs is empty.
*/
shared_result& operator =(shared_result&& rhs) noexcept ;
/*
Returns true if this is a non-empty shared-result.
Applications must not use this object if this->operator bool() is false.
*/
explicit operator bool () const noexcept ;
/*
Queries the status of *this.
The return value is any of result_status::idle, result_status::value or result_status::exception.
Throws errors::empty_result if *this is empty.
*/
result_status status () const ;
/*
Blocks the current thread of execution until this shared-result is ready,
when status() != result_status::idle.
Throws errors::empty_result if *this is empty.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
void wait ();
/*
Blocks until this shared-result is ready or duration has passed.
Returns the status of this shared-result after unblocking.
Throws errors::empty_result if *this is empty.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
template < class duration_type , class ratio_type >
result_status wait_for (std::chrono::duration<duration_type, ratio_type> duration);
/*
Blocks until this shared-result is ready or timeout_time has reached.
Returns the status of this result after unblocking.
Throws errors::empty_result if *this is empty.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
template < class clock_type , class duration_type >
result_status wait_until (std::chrono::time_point<clock_type, duration_type> timeout_time);
/*
Blocks the current thread of execution until this shared-result is ready,
when status() != result_status::idle.
If the result is a valid value, a reference to it is returned,
otherwise, get rethrows the asynchronous exception.
Throws errors::empty_result if *this is empty.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
std:: add_lvalue_reference_t <type> get ();
/*
Returns an awaitable used to await this shared-result.
If the shared-result is already ready - the current coroutine resumes
immediately in the calling thread of execution.
If the shared-result is not ready yet, the current coroutine is
suspended and resumed when the asynchronous result is ready,
by the thread which had set the asynchronous value or exception.
In either way, after resuming, if the result is a valid value, a reference to it is returned.
Otherwise, operator co_await rethrows the asynchronous exception.
Throws errors::empty_result if *this is empty.
*/
auto operator co_await ();
/*
Returns an awaitable used to resolve this shared-result.
After co_await expression finishes, *this is returned in a non-empty form, in a ready state.
Throws errors::empty_result if *this is empty.
*/
auto resolve ();
};

shared_result example:

In this example, a result object is converted into a shared_result object, and a reference to the asynchronous int result is acquired by many concurrent consumer tasks.
#include "concurrencpp/concurrencpp.h"
#include <iostream>
#include <thread>
#include <chrono>

concurrencpp::result<void> consume_shared_result(concurrencpp::shared_result<int> shared_result,
                                                 std::shared_ptr<concurrencpp::executor> resume_executor) {
    std::cout << "Awaiting shared_result to have a value" << std::endl;

    const auto& async_value = co_await shared_result;

    co_await concurrencpp::resume_on(resume_executor);

    std::cout << "In thread id " << std::this_thread::get_id() << ", got: " << async_value
              << ", memory address: " << &async_value << std::endl;
}

int main() {
    concurrencpp::runtime runtime;
    auto result = runtime.background_executor()->submit([] {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        return 100;
    });

    concurrencpp::shared_result<int> shared_result(std::move(result));
    concurrencpp::result<void> results[8];

    for (size_t i = 0; i < 8; i++) {
        results[i] = consume_shared_result(shared_result, runtime.thread_pool_executor());
    }

    std::cout << "Main thread waiting for all consumers to finish" << std::endl;

    auto tpe = runtime.thread_pool_executor();
    auto all_consumed = concurrencpp::when_all(tpe, std::begin(results), std::end(results)).run();
    all_consumed.get();

    std::cout << "All consumers are done, exiting" << std::endl;
    return 0;
}

When the runtime object gets out of scope of main, it iterates over every stored executor and calls its shutdown method. Trying to access the timer queue or any executor afterwards will throw an errors::runtime_shutdown exception. When an executor shuts down, it clears its inner task queues, destroying unexecuted task objects. If a task object stores a concurrencpp coroutine, that coroutine is resumed inline and an errors::broken_task exception is thrown inside it. In any case where a runtime_shutdown or broken_task exception is thrown, applications should terminate their current code-flow gracefully as soon as possible. Those exceptions should not be ignored. Both runtime_shutdown and broken_task inherit from the errors::interrupted_task base class, and this type can also be used in a catch clause in order to handle termination in a unified way.
Many concurrencpp asynchronous actions require an instance of an executor as their resume executor. When an asynchronous action (implemented as a coroutine) can complete synchronously, it resumes immediately in the calling thread of execution. If the asynchronous action cannot complete synchronously, it will be resumed, when it completes, inside the given resume executor. For example, the when_any utility function requires an instance of a resume executor as its first argument. when_any returns a lazy_result which becomes ready when at least one of the given results becomes ready. If one of the results is already ready at the moment of calling when_any, the calling coroutine resumes synchronously in the calling thread of execution. If not, the calling coroutine will be resumed inside the given resume executor when at least one of the results completes. Resume executors are important because they make sure coroutines are resumed in the right executor, in cases where it is not clear where a coroutine is supposed to be resumed (for example with when_any and when_all), or in cases where the asynchronous action is processed inside one of the concurrencpp workers, which are only meant to process that specific action and not to run application code.
make_ready_result function

make_ready_result creates a ready result object from the given arguments. Awaiting such a result will cause the current coroutine to resume immediately. get and operator co_await will return the constructed value.
/*
Creates a ready result object by building <<type>> from arguments&&... in-place.
Might throw any exception that the constructor
of type(std::forward<argument_types>(arguments)...) throws.
Might throw std::bad_alloc exception if fails to allocate memory.
*/
template < class type , class ... argument_types>
result<type> make_ready_result (argument_types&& ... arguments);
/*
An overload for void type.
Might throw std::bad_alloc exception if fails to allocate memory.
*/
result<void> make_ready_result();

make_exceptional_result function

make_exceptional_result creates a ready result object from a given exception. Awaiting such a result will cause the current coroutine to resume immediately. get and operator co_await will re-throw the given exception.
/*
Creates a ready result object from an exception pointer.
The returned result object will re-throw exception_ptr when calling get or await.
Throws std::invalid_argument if exception_ptr is null.
Might throw std::bad_alloc exception if fails to allocate memory.
*/
template < class type >
result<type> make_exceptional_result (std::exception_ptr exception_ptr);
/*
Overload. Similar to make_exceptional_result(std::exception_ptr),
but gets an exception object directly.
Might throw any exception that the constructor of exception_type(std::move(exception)) might throw.
Might throw std::bad_alloc exception if fails to allocate memory.
*/
template < class type , class exception_type >
result<type> make_exceptional_result(exception_type exception);

when_all function

when_all is a utility function that creates a lazy result object which becomes ready when all of the input results are completed. Awaiting this lazy result returns all of the input result objects in a ready state.
when_all comes in three flavors - one that accepts a heterogeneous range of result objects, one that accepts a pair of iterators to a range of result objects of the same type, and lastly an overload that accepts no result objects at all. In the case of no input result objects, the function returns a ready result object of an empty tuple.
If one of the passed result objects is empty, an exception is thrown. In this case, the input results are unaffected by the function and can be used again after the exception has been handled. If all input result objects are valid, they are emptied by this function and returned as the output results, in a valid and ready state.
Currently, when_all only accepts result objects.
All overloads accept a resume executor as their first parameter. When awaiting a result returned by when_all, the caller coroutine is resumed by the given resume executor.
/*
Creates a result object that becomes ready when all the input results become ready.
Passed result objects are emptied and returned as a tuple.
Throws std::invalid_argument if any of the passed result objects is empty.
Might throw an std::bad_alloc exception if no memory is available.
*/
template < class ... result_types>
lazy_result<std::tuple< typename std::decay<result_types>::type...>>
when_all (std::shared_ptr<executor_type> resume_executor,
result_types&& ... results);
/*
Overload. Similar to when_all(result_types&& ...) but receives a pair of iterators referencing a range.
Passed result objects are emptied and returned as a vector.
If begin == end, the function returns immediately with an empty vector.
Throws std::invalid_argument if any of the passed result objects is empty.
Might throw an std::bad_alloc exception if no memory is available.
*/
template < class iterator_type >
lazy_result<std::vector< typename std::iterator_traits<iterator_type>::value_type>>
when_all (std::shared_ptr<executor_type> resume_executor,
iterator_type begin, iterator_type end);
/*
Overload. Returns a ready result object that doesn't monitor any asynchronous result.
Might throw an std::bad_alloc exception if no memory is available.
*/
lazy_result<std::tuple<>> when_all(std::shared_ptr<executor_type> resume_executor);

when_any function

when_any is a utility function that creates a lazy result object which becomes ready when at least one of the input results is completed. Awaiting this result returns a helper struct containing all of the input result objects plus the index of the completed task. It could be that, by the time the ready result is consumed, more results have completed asynchronously. Applications can call when_any repeatedly in order to consume ready results as they become available, until all results are consumed.
when_any comes in only two flavors - one that accepts a heterogeneous range of result objects, and one that accepts a pair of iterators to a range of result objects of the same type. Unlike when_all, there is no sense in waiting for at least one task to finish when the range of results is completely empty, hence there is no overload that accepts no arguments. Similarly, the iterator overload throws an exception if the iterators reference an empty range (begin == end).
If one of the passed result objects is empty, an exception is thrown. In any case an exception is thrown, the input result objects are unaffected by the function and can be used again after the exception has been handled. If all input result objects are valid, they are emptied by this function and returned as the output results, in a valid state.
Currently, when_any only accepts result objects.
Both overloads accept a resume executor as their first parameter. When awaiting a result returned by when_any, the caller coroutine is resumed by the given resume executor.
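As an illustrative usage sketch before the formal API listing below (consume_all and consume_any are made up for this example, using the thread pool executor as the resume executor):

#include "concurrencpp/concurrencpp.h"
#include <chrono>
#include <iostream>
#include <thread>
#include <vector>

using namespace concurrencpp;

result<void> consume_all(std::shared_ptr<thread_pool_executor> tpe) {
    std::vector<result<int>> results;
    for (int i = 0; i < 4; i++) {
        results.emplace_back(tpe->submit([i] { return i * i; }));
    }

    // suspend until every input result is ready, resuming inside tpe
    auto ready = co_await when_all(tpe, results.begin(), results.end());

    int sum = 0;
    for (auto& r : ready) {
        sum += co_await r; // every returned result is already in a ready state
    }

    std::cout << "sum of squares: " << sum << std::endl;
}

result<void> consume_any(std::shared_ptr<thread_pool_executor> tpe) {
    auto slow = tpe->submit([] {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        return 1;
    });
    auto fast = tpe->submit([] { return 2; });

    // suspend until at least one result is ready, resuming inside tpe
    auto any = co_await when_any(tpe, std::move(slow), std::move(fast));
    std::cout << "result #" << any.index << " completed first" << std::endl;
}

int main() {
    concurrencpp::runtime runtime;
    auto tpe = runtime.thread_pool_executor();

    consume_all(tpe).get();
    consume_any(tpe).get();
    return 0;
}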
/*
Helper struct returned from when_any.
index is the position of the ready result in results sequence.
results is either an std::tuple or an std::vector of the results that were passed to when_any.
*/
template < class sequence_type >
struct when_any_result {
std:: size_t index;
sequence_type results;
};
/*
Creates a result object that becomes ready when at least one of the input results is ready.
Passed result objects are emptied and returned as a tuple.
Throws std::invalid_argument if any of the passed result objects is empty.
Might throw an std::bad_alloc exception if no memory is available.
*/
template < class ... result_types>
lazy_result<when_any_result<std::tuple<result_types...>>>
when_any (std::shared_ptr<executor_type> resume_executor,
result_types&& ... results);
/*
Overload. Similar to when_any(result_types&& ...) but receives a pair of iterators referencing a range.
Passed result objects are emptied and returned as a vector.
Throws std::invalid_argument if begin == end.
Throws std::invalid_argument if any of the passed result objects is empty.
Might throw an std::bad_alloc exception if no memory is available.
*/
template < class iterator_type >
lazy_result<when_any_result<std::vector< typename std::iterator_traits<iterator_type>::value_type>>>
when_any (std::shared_ptr<executor_type> resume_executor,
iterator_type begin, iterator_type end);

resume_on function

resume_on returns an awaitable that suspends the current coroutine and resumes it inside the given executor. This is an important function that makes sure a coroutine runs in the right executor. For example, an application might schedule a background task using the background_executor and await the returned result object. In this case, the awaiting coroutine will be resumed inside the background executor. A call to resume_on with another, CPU-bound executor makes sure that CPU-bound code does not run on the background executor once the background task completes. If a task is rescheduled to run on another executor using resume_on, but that executor is shut down before it can resume the suspended task, the task is resumed immediately and an errors::broken_task exception is thrown inside it. In this case, applications need to terminate the current code flow gracefully.
/*
Returns an awaitable that suspends the current coroutine and resumes it inside executor.
Might throw any exception that executor_type::enqueue throws.
*/
template < class executor_type >
auto resume_on(std::shared_ptr<executor_type> executor);

concurrencpp also provides timers and timer queues. Timers are objects that define asynchronous actions running on an executor within a well-defined interval of time. There are three types of timers - regular timers, oneshot timers and delay objects.
Regular timers have four properties that define them:
A callable (and possibly its arguments) - the callable that the timer schedules to run.
An executor - the executor on which the callable is scheduled to run.
A due time - the time in milliseconds until the callable is scheduled to run for the first time.
A frequency - the interval in milliseconds between subsequent invocations of the callable.

Like other objects in concurrencpp, timers are a move-only type that can be empty. When a timer is destroyed or timer::cancel is called, the timer cancels its scheduled but not yet executed tasks. Ongoing tasks are unaffected. The timer callable must be thread-safe. It is recommended to set the due time and the frequency of timers to a granularity of 50 milliseconds.
A timer queue is a concurrencpp worker that manages a collection of timers and processes them in just one thread of execution. It is also the agent used to create new timers. When a timer deadline (either its due time or its frequency) is met, the timer queue "fires" the timer by scheduling its callable to run on the associated executor as a task.
Just like executors, timer queues also adhere to the RAII concept. When the runtime object gets out of scope, it shuts down the timer queue, cancelling all pending timers. After the timer queue has been shut down, any subsequent call to make_timer, make_one_shot_timer and make_delay_object will throw an errors::runtime_shutdown exception. Applications must not try to shut down timer queues by themselves.
timer_queue API:

class timer_queue {
/*
Destroys this timer_queue.
*/
~timer_queue () noexcept ;
/*
Shuts down this timer_queue:
Tells the underlying thread of execution to quit and joins it.
Cancels all pending timers.
After this call, invocation of any method besides shutdown
and shutdown_requested will throw an errors::runtime_shutdown.
If shutdown had been called before, this method has no effect.
*/
void shutdown () noexcept ;
/*
Returns true if shutdown had been called before, false otherwise.
*/
bool shutdown_requested () const noexcept ;
/*
Creates a new running timer where *this is the associated timer_queue.
Throws std::invalid_argument if executor is null.
Throws errors::runtime_shutdown if shutdown had been called before.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if the one of the underlying synchronization primitives throws.
*/
template < class callable_type , class ... argumet_types>
timer make_timer (
std::chrono::milliseconds due_time,
std::chrono::milliseconds frequency,
std::shared_ptr<concurrencpp::executor> executor,
callable_type&& callable,
argumet_types&& ... arguments);
/*
Creates a new one-shot timer where *this is the associated timer_queue.
Throws std::invalid_argument if executor is null.
Throws errors::runtime_shutdown if shutdown had been called before.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if the one of the underlying synchronization primitives throws.
*/
template < class callable_type , class ... argumet_types>
timer make_one_shot_timer (
std::chrono::milliseconds due_time,
std::shared_ptr<concurrencpp::executor> executor,
callable_type&& callable,
argumet_types&& ... arguments);
/*
Creates a new delay object where *this is the associated timer_queue.
Throws std::invalid_argument if executor is null.
Throws errors::runtime_shutdown if shutdown had been called before.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if the one of the underlying synchronization primitives throws.
*/
result< void > make_delay_object (
std::chrono::milliseconds due_time,
std::shared_ptr<concurrencpp::executor> executor);
};

timer API:

class timer {
/*
Creates an empty timer.
*/
timer () noexcept = default ;
/*
Cancels the timer, if not empty.
*/
~timer () noexcept ;
/*
Moves the content of rhs to *this.
rhs is empty after this call.
*/
timer (timer&& rhs) noexcept = default ;
/*
Moves the content of rhs to *this.
rhs is empty after this call.
Returns *this.
*/
timer& operator = (timer&& rhs) noexcept ;
/*
Cancels this timer.
After this call, the associated timer_queue will not schedule *this
to run again and *this becomes empty.
Scheduled, but not yet executed tasks are cancelled.
Ongoing tasks are unaffected.
This method has no effect if *this is empty or the associated timer_queue has already expired.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
void cancel ();
/*
Returns the associated executor of this timer.
Throws concurrencpp::errors::empty_timer if *this is empty.
*/
std::shared_ptr<executor> get_executor () const ;
/*
Returns the associated timer_queue of this timer.
Throws concurrencpp::errors::empty_timer if *this is empty.
*/
std::weak_ptr<timer_queue> get_timer_queue () const ;
/*
Returns the due time of this timer.
Throws concurrencpp::errors::empty_timer if *this is empty.
*/
std::chrono::milliseconds get_due_time () const ;
/*
Returns the frequency of this timer.
Throws concurrencpp::errors::empty_timer if *this is empty.
*/
std::chrono::milliseconds get_frequency () const ;
/*
Sets new frequency for this timer.
Callables already scheduled to run at the time of invocation are not affected.
Throws concurrencpp::errors::empty_timer if *this is empty.
*/
void set_frequency (std::chrono::milliseconds new_frequency);
/*
Returns true is *this is not an empty timer, false otherwise.
The timer should not be used if this->operator bool() is false.
*/
explicit operator bool () const noexcept ;
};

In this example, we use the timer queue to create a regular timer. The timer schedules its callable to run after 1.5 seconds, and then fires its callable every 2 seconds. The given callable runs on the threadpool executor.
#include "concurrencpp/concurrencpp.h"
#include <iostream>

using namespace std::chrono_literals;

int main() {
    concurrencpp::runtime runtime;
    std::atomic_size_t counter = 1;
    concurrencpp::timer timer = runtime.timer_queue()->make_timer(
        1500ms,
        2000ms,
        runtime.thread_pool_executor(),
        [&] {
            const auto c = counter.fetch_add(1);
            std::cout << "timer was invoked for the " << c << "th time" << std::endl;
        });

    std::this_thread::sleep_for(12s);
    return 0;
}

A oneshot timer is a one-time timer that has only a due time - after it schedules its callable to run once, it never re-schedules it to run again.
In this example, we create a timer that runs only once - three seconds after its creation, the timer will schedule its callable to run on a new thread of execution (using thread_executor).
#include "concurrencpp/concurrencpp.h"
#include <iostream>

using namespace std::chrono_literals;

int main() {
    concurrencpp::runtime runtime;
    concurrencpp::timer timer = runtime.timer_queue()->make_one_shot_timer(
        3000ms,
        runtime.thread_executor(),
        [&] {
            std::cout << "hello and goodbye" << std::endl;
        });

    std::this_thread::sleep_for(4s);
    return 0;
}

A delay object is a result object that becomes ready when it is co_awaited and its due time has been reached. Applications can co_await this result object in order to delay the current coroutine in a non-blocking way. The current coroutine is resumed by the executor that was passed to make_delay_object.
In this example, we spawn a task (which does not return any result or throw any exception) that delays its loop iterations by co_awaiting a delay object.
#include "concurrencpp/concurrencpp.h"
#include <iostream>

using namespace std::chrono_literals;

concurrencpp::null_result delayed_task(
    std::shared_ptr<concurrencpp::timer_queue> tq,
    std::shared_ptr<concurrencpp::thread_pool_executor> ex) {
    size_t counter = 1;

    while (true) {
        std::cout << "task was invoked " << counter << " times." << std::endl;
        counter++;

        co_await tq->make_delay_object(1500ms, ex);
    }
}

int main() {
    concurrencpp::runtime runtime;
    delayed_task(runtime.timer_queue(), runtime.thread_pool_executor());

    std::this_thread::sleep_for(10s);
    return 0;
}

A generator is a lazy, synchronous coroutine that is able to produce a stream of values to consume. Generators use the co_yield keyword to yield values back to their consumers.
Generators are meant to be used synchronously - they can only use the co_yield keyword and must not use the co_await keyword. A generator keeps producing values as long as the co_yield keyword is called. If the co_return keyword is called (explicitly or implicitly), the generator stops producing values. Similarly, if an exception is thrown, the generator stops producing values and the thrown exception is re-thrown to the consumer of the generator.
Generators are meant to be used inside a range-for loop: a generator implicitly produces two iterators - begin and end - which control the execution of the for loop. Those iterators should not be handled or accessed manually.
When a generator is created, it starts as a lazy task. When its begin method is called, the generator is resumed for the first time and an iterator is returned. The lazy task is resumed repeatedly by calling operator++ on the returned iterator. When the generator finishes execution, either by exiting gracefully or by throwing an exception, the returned iterator compares equal to the end iterator. As mentioned before, this happens behind the scenes through the inner mechanism of the loop and the generator, and should not be invoked directly.
Like other objects in concurrencpp, generators are a move-only type. After a generator has been moved, it is considered empty, and trying to access its inner methods (other than operator bool) will throw an exception. Generator emptiness should not normally occur - it is recommended to consume a generator by creating it inside a for loop, rather than trying to call its methods individually.
generator API

class generator {
/*
Move constructor. After this call, rhs is empty.
*/
generator (generator&& rhs) noexcept ;
/*
Destructor. Invalidates existing iterators.
*/
~generator () noexcept ;
generator ( const generator& rhs) = delete ;
generator& operator =(generator&& rhs) = delete ;
generator& operator =( const generator& rhs) = delete ;
/*
Returns true if this generator is not empty.
Applications must not use this object if this->operator bool() is false.
*/
explicit operator bool () const noexcept ;
/*
Starts running this generator and returns an iterator.
Throws errors::empty_generator if *this is empty.
Re-throws any exception that is thrown inside the generator code.
*/
iterator begin ();
/*
Returns an end iterator.
*/
static generator_end_iterator end () noexcept ;
};
class generator_iterator {
using value_type = std:: remove_reference_t <type>;
using reference = value_type&;
using pointer = value_type*;
using iterator_category = std::input_iterator_tag;
using difference_type = std:: ptrdiff_t ;
/*
Resumes the suspended generator and returns *this.
Re-throws any exception that was thrown inside the generator code.
*/
generator_iterator& operator ++();
/*
Post-increment version of operator++.
*/
void operator ++( int );
/*
Returns the latest value produced by the associated generator.
*/
reference operator *() const noexcept ;
/*
Returns a pointer to the latest value produced by the associated generator.
*/
pointer operator ->() const noexcept ;
/*
Comparison operators.
*/
friend bool operator ==( const generator_iterator& it0, const generator_iterator& it1) noexcept ;
friend bool operator ==( const generator_iterator& it, generator_end_iterator) noexcept ;
friend bool operator ==(generator_end_iterator end_it, const generator_iterator& it) noexcept ;
friend bool operator !=( const generator_iterator& it, generator_end_iterator end_it) noexcept ;
friend bool operator !=(generator_end_iterator end_it, const generator_iterator& it) noexcept ;
};

generator example:

In this example, we write a generator that yields the n-th member of the sequence S(n) = 1 + 2 + 3 + ... + n, where n <= 100:
concurrencpp::generator<int> sequence() {
    int i = 1;
    int sum = 0;
    while (i <= 100) {
        sum += i;
        ++i;
        co_yield sum;
    }
}

int main() {
    for (auto value : sequence()) {
        std::cout << value << std::endl;
    }
    return 0;
}

Regular synchronous locks cannot be used safely inside tasks, for a number of reasons:
Synchronous locks (like std::mutex) are expected to be locked and unlocked in the same thread of execution. Unlocking a synchronous lock in a thread that did not lock it is undefined behavior. Since tasks can be suspended and resumed in any thread of execution, synchronous locks break when used inside tasks.

concurrencpp::async_lock solves these problems by providing an API similar to std::mutex, with the main difference that calls to concurrencpp::async_lock return lazy results that can be safely co_awaited inside tasks. If a task tries to lock an async lock and fails, the task is suspended, and it is resumed when the lock is unlocked and acquired by that task. This allows executors to process a huge amount of tasks waiting to acquire a lock, without expensive context switches and expensive kernel calls.
Similar to how std::mutex works, only one task can acquire an async_lock at any given time, and a read barrier is placed when it is acquired. Releasing the async lock places a write barrier and allows the next task to acquire it, creating a chain of modifiers, each seeing the changes made by the previous modifiers and publishing its own modifications for the next modifier to see.
Like std::mutex, concurrencpp::async_lock is not recursive. Extra care must be taken when acquiring such a lock - the lock must not be acquired again in a task that was spawned by another task which had already acquired it. In such a case, an unavoidable dead-lock will occur. Unlike other objects in concurrencpp, async_lock is neither copyable nor movable.
Like standard locks, concurrencpp::async_lock is meant to be used with scoped wrappers, which leverage the C++ RAII idiom to ensure the lock is always unlocked when the function returns or throws an exception. async_lock::lock returns a lazy result of a scoped wrapper that calls async_lock::unlock on destruction. Raw usage of async_lock::unlock is discouraged. concurrencpp::scoped_async_lock acts as the scoped wrapper and provides an API that is almost identical to std::unique_lock. concurrencpp::scoped_async_lock is movable, but not copyable.
async_lock::lock and scoped_async_lock::lock require a resume executor as their parameter. When these methods are called and the lock can be acquired, it is locked and the current task resumes immediately. If not, the current task is suspended, and it will be resumed inside the given resume executor when the lock is finally acquired.
concurrencpp::scoped_async_lock wraps an async_lock and ensures it is unlocked properly. Like std::unique_lock, in some cases it does not wrap any lock, and in that case it is considered empty. An empty scoped_async_lock can happen when it is default constructed, moved from, or when scoped_async_lock::release is called. An empty scoped-async-lock will not unlock any lock on destruction.
Even if a scoped-async-lock is not empty, it does not necessarily mean that it owns the underlying async lock and will unlock it on destruction. A non-empty, non-owning scoped-async-lock can happen when scoped_async_lock::unlock is called, or when it is constructed using the scoped_async_lock(async_lock&, std::defer_lock_t) constructor.
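For illustration only (the increment coroutine and the global counter are made up for this sketch, and the try_lock/lock combination is just one possible pattern), a deferred wrapper can first try the fast path and fall back to asynchronously acquiring the lock:

#include "concurrencpp/concurrencpp.h"
#include <mutex> // for std::defer_lock

concurrencpp::async_lock lock;
size_t counter = 0;

concurrencpp::result<void> increment(std::shared_ptr<concurrencpp::executor> resume_executor) {
    // wrap the lock without acquiring it yet: non-empty, but non-owning
    concurrencpp::scoped_async_lock guard(lock, std::defer_lock);

    if (!co_await guard.try_lock()) {
        // fast path failed: suspend until the lock is acquired, resuming inside resume_executor
        co_await guard.lock(resume_executor);
    }

    ++counter; // protected; guard unlocks the async_lock when it is destroyed
}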
async_lock API

class async_lock {
/*
Constructs an async lock object.
*/
async_lock () noexcept ;
/*
Destructs an async lock object.
*this is not automatically unlocked at the moment of destruction.
*/
~async_lock () noexcept ;
/*
Asynchronously acquires the async lock.
If *this has already been locked by another non-parent task, the current task will be suspended
and will be resumed when *this is acquired, inside resume_executor.
If *this has not been locked by another task, then *this will be acquired and the current task will be resumed
immediately in the calling thread of execution.
If *this has already been locked by a parent task, then unavoidable dead-lock will occur.
Throws std::invalid_argument if resume_executor is null.
Throws std::system_error if one of the underlying synchronization primitives throws.
*/
lazy_result<scoped_async_lock> lock (std::shared_ptr<executor> resume_executor);
/*
Tries to acquire *this in the calling thread of execution.
Returns true if *this is acquired, false otherwise.
In any case, the current task is resumed immediately in the calling thread of execution.
Throws std::system_error if one of the underlying synchronization primitives throws.
*/
lazy_result< bool > try_lock ();
/*
Releases *this and allows other tasks (including suspended tasks waiting for *this) to acquire it.
Throws std::system_error if *this is not locked at the moment of calling this method.
Throws std::system_error if one of the underlying synchronization primitives throws.
*/
void unlock ();
};

scoped_async_lock API

class scoped_async_lock {
/*
Constructs an async lock wrapper that does not wrap any async lock.
*/
scoped_async_lock () noexcept = default ;
/*
If *this wraps an async_lock and owns it, this method unlocks the wrapped lock.
*/
~scoped_async_lock () noexcept ;
/*
Moves rhs to *this.
After this call, rhs does not wrap any async lock.
*/
scoped_async_lock (scoped_async_lock&& rhs) noexcept ;
/*
Wraps an unlocked lock.
lock must not be in acquired mode when calling this method.
*/
scoped_async_lock (async_lock& lock, std:: defer_lock_t ) noexcept ;
/*
Wraps a locked lock.
lock must be already acquired when calling this method.
*/
scoped_async_lock (async_lock& lock, std:: adopt_lock_t ) noexcept ;
/*
Calls async_lock::lock on the wrapped lock, using resume_executor as a parameter.
Throws std::invalid_argument if resume_executor is null.
Throws std::system_error if *this does not wrap any lock.
Throws std::system_error if wrapped lock is already locked.
Throws any exception async_lock::lock throws.
*/
lazy_result< void > lock (std::shared_ptr<executor> resume_executor);
/*
Calls async_lock::try_lock on the wrapped lock.
Throws std::system_error if *this does not wrap any lock.
Throws std::system_error if wrapped lock is already locked.
Throws any exception async_lock::try_lock throws.
*/
lazy_result< bool > try_lock ();
/*
Calls async_lock::unlock on the wrapped lock.
If *this does not wrap any lock, this method does nothing.
Throws std::system_error if *this wraps a lock and it is not locked.
*/
void unlock ();
/*
Checks whether *this wraps a locked mutex or not.
Returns true if the wrapped lock is in the acquired state, false otherwise.
*/
bool owns_lock () const noexcept ;
/*
Equivalent to owns_lock.
*/
explicit operator bool () const noexcept ;
/*
Swaps the contents of *this and rhs.
*/
void swap (scoped_async_lock& rhs) noexcept ;
/*
Empties *this and returns a pointer to the previously wrapped lock.
After a call to this method, *this doesn't wrap any lock.
The previously wrapped lock is not released,
it must be released by either unlocking it manually through the returned pointer or by
capturing the pointer with another scoped_async_lock which will take ownership over it.
*/
async_lock* release () noexcept ;
/*
Returns a pointer to the wrapped async_lock, or a null pointer if there is no wrapped async_lock.
*/
async_lock* mutex () const noexcept ;
};async_lock example: In this example, we push 10,000,000 integers into a std::vector object concurrently, from different tasks, while using async_lock to ensure that no data race occurs and that the internal state of the vector remains correct.
# include " concurrencpp/concurrencpp.h "
# include < vector >
# include < iostream >
# include < algorithm >
std::vector< size_t > numbers;
concurrencpp::async_lock lock;
concurrencpp::result< void > add_numbers (concurrencpp::executor_tag,
std::shared_ptr<concurrencpp::executor> executor,
size_t begin,
size_t end) {
for ( auto i = begin; i < end; i++) {
concurrencpp::scoped_async_lock raii_wrapper = co_await lock. lock (executor);
numbers. push_back (i);
}
}
int main () {
concurrencpp::runtime runtime;
constexpr size_t range = 10'000'000 ;
constexpr size_t sections = 4 ;
concurrencpp::result< void > results[sections];
for ( size_t i = 0 ; i < 4 ; i++) {
const auto range_start = i * range / sections;
const auto range_end = (i + 1 ) * range / sections;
results[i] = add_numbers ({}, runtime. thread_pool_executor (), range_start, range_end);
}
for ( auto & result : results) {
result. get ();
}
std::cout << " vector size is " << numbers. size () << std::endl;
// make sure the vector state has not been corrupted by unprotected concurrent accesses
std::sort (numbers. begin (), numbers. end ());
for ( size_t i = 0 ; i < range; i++) {
if (numbers[i] != i) {
std::cerr << " vector state is corrupted. " << std::endl;
return - 1 ;
}
}
std::cout << " succeeded pushing range [0 - 10,000,000] concurrently to the vector! " << std::endl;
return 0 ;
}async_condition_variable mimics the standard condition_variable and can be used safely with async_lock. async_condition_variable is used together with async_lock to suspend a task until some shared memory (protected by the lock) has changed. A task that wants to monitor changes to the shared memory locks an instance of async_lock and calls async_condition_variable::await. This atomically unlocks the lock and suspends the current task until some modifier task notifies the condition variable. A modifier task acquires the lock, modifies the shared memory, unlocks the lock and calls either notify_one or notify_all. When the suspended task is resumed (using the resume-executor that was given to await), it locks the lock again, allowing the task to continue seamlessly from its suspension point. Like async_lock, async_condition_variable is neither movable nor copyable - it is created in one place and accessed by multiple tasks.
The async_condition_variable::await overloads require a resume-executor, which will be used to resume the task, and a locked scoped_async_lock. async_condition_variable::await has two overloads - one that accepts a predicate and one that does not. The overload that does not accept a predicate suspends the calling task immediately after being called, until one of the notify_* methods is called. The overload that does accept a predicate works by letting the predicate inspect the shared memory and repeatedly suspending the task until the shared memory reaches its desired state. Schematically, it works like calling
while (!pred()) { // pred() inspects the shared memory and returns true or false
co_await await (resume_executor, lock); // suspend the current task until another task calls `notify_xxx`
}Just like with standard condition variables, applications are encouraged to use the overload that accepts a predicate, as it gives finer-grained control over suspension and resumption. async_condition_variable can be used to write concurrent collections and data structures, such as concurrent queues and channels.
Internally, async_condition_variable holds a suspension queue, into which tasks enqueue themselves while waiting for the condition variable to be notified. When one of the notify_* methods is called, the notifying task dequeues one task or all tasks, depending on the method called. Tasks are dequeued from the suspension queue in FIFO order. For example, if task A calls await, then task B calls await, and then task C calls notify_one, then internally task A will be dequeued and resumed, while task B remains suspended until another call to notify_one or notify_all is made. If both task A and task B are suspended and task C calls notify_all, both tasks will be dequeued and resumed.
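Before the API listing and the queue example below, here is a minimal waiter/notifier sketch of the protocol just described (flag_lock, flag_cv, flag, waiter and notifier are illustrative names):
#include "concurrencpp/concurrencpp.h"

concurrencpp::async_lock flag_lock;
concurrencpp::async_condition_variable flag_cv;
bool flag = false;   // the shared memory protected by flag_lock

concurrencpp::result<void> waiter(std::shared_ptr<concurrencpp::executor> executor) {
    auto guard = co_await flag_lock.lock(executor);
    // atomically unlocks flag_lock and suspends until a notifier sets flag to true;
    // flag_lock is re-acquired before this task is resumed
    co_await flag_cv.await(executor, guard, [] { return flag; });
    // here flag == true and flag_lock is held again
}

concurrencpp::result<void> notifier(std::shared_ptr<concurrencpp::executor> executor) {
    {
        auto guard = co_await flag_lock.lock(executor);
        flag = true;
    }   // unlock before notifying
    flag_cv.notify_all();
}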
async_condition_variable API class async_condition_variable {
/*
Constructor.
*/
async_condition_variable () noexcept ;
/*
Atomically releases lock and suspends the current task by adding it to *this suspension-queue.
Throws std::invalid_argument if resume_executor is null.
Throws std::invalid_argument if lock is not locked at the moment of calling this method.
Might throw std::system_error if the underlying std::mutex throws.
*/
lazy_result< void > await (std::shared_ptr<executor> resume_executor, scoped_async_lock& lock);
/*
Equivalent to:
while (!pred()) {
co_await await(resume_executor, lock);
}
Might throw any exception that await(resume_executor, lock) might throw.
Might throw any exception that pred might throw.
*/
template < class predicate_type >
lazy_result< void > await (std::shared_ptr<executor> resume_executor, scoped_async_lock& lock, predicate_type pred);
/*
Dequeues one task from *this suspension-queue and resumes it, if any available at the moment of calling this method.
The suspended task is resumed by scheduling it to run on the executor given when await was called.
Might throw std::system_error if the underlying std::mutex throws.
*/
void notify_one ();
/*
Dequeues all tasks from *this suspension-queue and resumes them, if any available at the moment of calling this method.
The suspended tasks are resumed by scheduling them to run on the executors given when await was called.
Might throw std::system_error if the underlying std::mutex throws.
*/
void notify_all ();
};async_condition_variable example: In this example, async_lock and async_condition_variable are used together to implement a concurrent queue that can be used to send data (integers, in this example) between tasks. Note that some methods return result while others return lazy_result, showing how eager and lazy tasks can work together.
# include " concurrencpp/concurrencpp.h "
# include < queue >
# include < iostream >
# include < cassert >
# include < stdexcept >
using namespace concurrencpp ;
class concurrent_queue {
private:
async_lock _lock;
async_condition_variable _cv;
std::queue< int > _queue;
bool _abort = false ;
public:
concurrent_queue () = default ;
result< void > shutdown (std::shared_ptr<executor> resume_executor) {
{
auto guard = co_await _lock. lock (resume_executor);
_abort = true ;
}
_cv. notify_all ();
}
lazy_result< void > push (std::shared_ptr<executor> resume_executor, int i) {
{
auto guard = co_await _lock. lock (resume_executor);
_queue. push (i);
}
_cv. notify_one ();
}
lazy_result< int > pop (std::shared_ptr<executor> resume_executor) {
auto guard = co_await _lock. lock (resume_executor);
co_await _cv. await (resume_executor, guard, [ this ] {
return _abort || !_queue. empty ();
});
if (!_queue. empty ()) {
auto result = _queue. front ();
_queue. pop ();
co_return result;
}
assert (_abort);
throw std::runtime_error ( " queue has been shut down. " );
}
};
result< void > producer_loop (executor_tag,
std::shared_ptr<thread_pool_executor> tpe,
concurrent_queue& queue,
int range_start,
int range_end) {
for (; range_start < range_end; ++range_start) {
co_await queue. push (tpe, range_start);
}
}
result< void > consumer_loop (executor_tag, std::shared_ptr<thread_pool_executor> tpe, concurrent_queue& queue) {
try {
while ( true ) {
std::cout << co_await queue. pop (tpe) << std::endl;
}
} catch ( const std:: exception & e) {
std::cerr << e. what () << std::endl;
}
}
int main () {
runtime runtime;
const auto thread_pool_executor = runtime. thread_pool_executor ();
concurrent_queue queue;
result< void > producers[ 4 ];
result< void > consumers[ 4 ];
for ( int i = 0 ; i < 4 ; i++) {
producers[i] = producer_loop ({}, thread_pool_executor, queue, i * 5 , (i + 1 ) * 5 );
}
for ( int i = 0 ; i < 4 ; i++) {
consumers[i] = consumer_loop ({}, thread_pool_executor, queue);
}
for ( int i = 0 ; i < 4 ; i++) {
producers[i]. get ();
}
queue. shutdown (thread_pool_executor). get ();
for ( int i = 0 ; i < 4 ; i++) {
consumers[i]. get ();
}
return 0 ;
}The concurrencpp runtime object is the agent used to acquire, store and create new executors.
The runtime must be created as a value type as soon as the main function starts to run. When the concurrencpp runtime goes out of scope, it iterates over its stored executors and shuts them down one by one by calling executor::shutdown. Each executor then exits its inner work loop, and any subsequent attempt to schedule a new task throws a concurrencpp::errors::runtime_shutdown exception. The runtime also contains the global timer queue used to create timers and delay objects. On destruction, the stored executors destroy unexecuted tasks and wait for ongoing tasks to finish. If an ongoing task tries to use an executor to spawn new tasks or schedule its own continuation, an exception will be thrown. In that case, the ongoing task should quit as soon as possible, allowing its underlying executor to quit. The timer queue is also shut down, cancelling all running timers. With this RAII style of code, no tasks can be processed before the runtime object is created or after the scope it lives in ends, which frees concurrent applications from having to communicate termination messages explicitly. Tasks are free to use executors as long as the runtime object is alive.
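As a small illustration of this behaviour, the following sketch deliberately keeps an executor handle alive past the runtime's scope (an assumption made for the example, not a recommended pattern); scheduling through it after the runtime has been destroyed throws concurrencpp::errors::runtime_shutdown:
#include "concurrencpp/concurrencpp.h"
#include <iostream>
#include <memory>

int main() {
    std::shared_ptr<concurrencpp::thread_pool_executor> executor;
    {
        concurrencpp::runtime runtime;
        executor = runtime.thread_pool_executor();
        // while runtime is alive, tasks can be created and scheduled freely
    }   // runtime goes out of scope: executor::shutdown is called on every stored executor

    try {
        executor->post([] { /* never runs */ });   // scheduling after shutdown
    } catch (const concurrencpp::errors::runtime_shutdown& e) {
        std::cerr << "cannot schedule new tasks: " << e.what() << std::endl;
    }
    return 0;
}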
runtime API class runtime {
/*
Creates a runtime object with default options.
*/
runtime ();
/*
Creates a runtime object with user defined options.
*/
runtime ( const concurrencpp::runtime_options& options);
/*
Destroys this runtime object.
Calls executor::shutdown on each monitored executor.
Calls timer_queue::shutdown on the global timer queue.
*/
~runtime () noexcept ;
/*
Returns this runtime's timer queue, which is used to create new timers.
*/
std::shared_ptr<concurrencpp::timer_queue> timer_queue () const noexcept ;
/*
Returns this runtime concurrencpp::inline_executor
*/
std::shared_ptr<concurrencpp::inline_executor> inline_executor () const noexcept ;
/*
Returns this runtime concurrencpp::thread_pool_executor
*/
std::shared_ptr<concurrencpp::thread_pool_executor> thread_pool_executor () const noexcept ;
/*
Returns this runtime concurrencpp::background_executor
*/
std::shared_ptr<concurrencpp::thread_pool_executor> background_executor () const noexcept ;
/*
Returns this runtime concurrencpp::thread_executor
*/
std::shared_ptr<concurrencpp::thread_executor> thread_executor () const noexcept ;
/*
Creates a new concurrencpp::worker_thread_executor and registers it in this runtime.
Might throw std::bad_alloc or std::system_error if any underlying memory or system resource could not have been acquired.
*/
std::shared_ptr<concurrencpp::worker_thread_executor> make_worker_thread_executor ();
/*
Creates a new concurrencpp::manual_executor and registers it in this runtime.
Might throw std::bad_alloc or std::system_error if any underlying memory or system resource could not have been acquired.
*/
std::shared_ptr<concurrencpp::manual_executor> make_manual_executor ();
/*
Creates a new user defined executor and registers it in this runtime.
executor_type must be a valid concrete class of concurrencpp::executor.
Might throw std::bad_alloc if no memory is available.
Might throw any exception that the constructor of <<executor_type>> might throw.
*/
template < class executor_type , class ... argument_types>
std::shared_ptr<executor_type> make_executor (argument_types&& ... arguments);
/*
returns the version of concurrencpp that the library was built with.
*/
static std::tuple< unsigned int , unsigned int , unsigned int > version () noexcept ;
};In some cases, applications are interested in monitoring thread creation and termination; for example, some memory allocators need to register and unregister threads as they are created and terminated. The concurrencpp runtime allows setting a thread-started callback and a thread-terminated callback. These callbacks are invoked whenever a concurrencpp worker creates a new thread and whenever that thread terminates. The callbacks are always invoked from within the created/terminating thread, so std::this_thread::get_id will always return the relevant thread id. The signature of these callbacks is void callback(std::string_view thread_name). thread_name is a concurrencpp-specific title given to the thread, which can be observed in debuggers that show thread names. Thread names are not guaranteed to be unique and should only be used for logging and debugging.
In order to set a thread-started callback and/or a thread-terminated callback, applications can set the thread_started_callback and/or thread_terminated_callback members of runtime_options, which is then passed to the runtime constructor. Since these callbacks are copied to every concurrencpp worker that might create a thread, they must be copyable.
# include " concurrencpp/concurrencpp.h "
# include < iostream >
# include < thread >
# include < chrono >
int main () {
concurrencpp::runtime_options options;
options. thread_started_callback = [](std::string_view thread_name) {
std::cout << " A new thread is starting to run, name: " << thread_name << " , thread id: " << std::this_thread::get_id ()
<< std::endl;
};
options. thread_terminated_callback = [](std::string_view thread_name) {
std::cout << " A thread is terminating, name: " << thread_name << " , thread id: " << std::this_thread::get_id () << std::endl;
};
concurrencpp::runtime runtime (options);
const auto timer_queue = runtime. timer_queue ();
const auto thread_pool_executor = runtime. thread_pool_executor ();
concurrencpp::timer timer =
timer_queue-> make_timer ( std::chrono::milliseconds ( 100 ), std::chrono::milliseconds ( 500 ), thread_pool_executor, [] {
std::cout << " A timer callable is executing " << std::endl;
});
std::this_thread::sleep_for ( std::chrono::seconds ( 3 ));
return 0 ;
}Possible output:
A new thread is starting to run, name: concurrencpp::timer_queue worker, thread id: 7496
A new thread is starting to run, name: concurrencpp::thread_pool_executor worker, thread id: 21620
A timer callable is executing
A timer callable is executing
A timer callable is executing
A timer callable is executing
A timer callable is executing
A timer callable is executing
A thread is terminating, name: concurrencpp::timer_queue worker, thread id: 7496
A thread is terminating, name: concurrencpp::thread_pool_executor worker, thread id: 21620
Applications can create their own custom executor types by inheriting the derivable_executor class. There are a few points to consider when implementing user-defined executors. The most important one is to remember that executors are used from multiple threads, so implemented methods must be thread-safe.
New executors are created using runtime::make_executor. Applications must not create new executors with plain instantiation (such as std::make_shared or plain new) - only by using runtime::make_executor. Moreover, applications must not try to re-instantiate the built-in concurrencpp executors, such as thread_pool_executor or thread_executor; these executors must only be accessed through the existing instances held by the runtime object.
Another important point is handling shutdown correctly: shutdown, shutdown_requested and enqueue should all monitor the executor state and act accordingly when called:
shutdown should tell the underlying threads to quit and then join them. shutdown might be called multiple times, and the method must handle this scenario by ignoring any subsequent call to shutdown after the first one. Once shutdown has been called, enqueue must throw a concurrencpp::errors::runtime_shutdown exception.
task objects: Implementing an executor is one of the rare cases in which applications need to work directly with the concurrencpp::task class. concurrencpp::task is a std::function-like object, but with a few differences. Like std::function, a task object stores a callable that acts as the asynchronous operation. Unlike std::function, task is a move-only type. On invocation, a task object receives no parameters and returns void. Moreover, each task object can be invoked only once; after the first invocation, the task object becomes empty. Invoking an empty task object is equivalent to invoking an empty lambda ([]{}) and does not throw any exception. Task objects receive their callable as a forwarding reference (type&&, where type is a template parameter), rather than by copy (as std::function does), and the stored callable is constructed in place. This allows task objects to contain move-only callables (such as std::unique_ptr and concurrencpp::result). Task objects use various methods to optimize the usage of the stored type: for example, they apply the small-buffer optimization (SBO) for regular, small callables, and they inline calls to std::coroutine_handle<void> by invoking the handle directly, without virtual dispatch.
task API class task {
/*
Creates an empty task object.
*/
task () noexcept ;
/*
Creates a task object by moving the stored callable of rhs to *this.
If rhs is empty, then *this will also be empty after construction.
After this call, rhs is empty.
*/
task (task&& rhs) noexcept ;
/*
Creates a task object by storing callable in *this.
<<typename std::decay<callable_type>::type>> will be in-place-
constructed inside *this by perfect forwarding callable.
*/
template < class callable_type >
task (callable_type&& callable);
/*
Destroys stored callable, does nothing if empty.
*/
~task () noexcept ;
/*
If *this is empty, does nothing.
Invokes stored callable, and immediately destroys it.
After this call, *this is empty.
May throw any exception that the invoked callable may throw.
*/
void operator ()();
/*
Moves the stored callable of rhs to *this.
If rhs is empty, then *this will also be empty after this call.
If *this already contains a stored callable, operator = destroys it first.
*/
task& operator =(task&& rhs) noexcept ;
/*
If *this is not empty, task::clear destroys the stored callable and empties *this.
If *this is empty, clear does nothing.
*/
void clear () noexcept ;
/*
Returns true if *this stores a callable. false otherwise.
*/
explicit operator bool () const noexcept ;
/*
Returns true if *this stores a callable,
and that stored callable has the same type as <<typename std::decay<callable_type>::type>>
*/
template < class callable_type >
bool contains () const noexcept ;
};When implementing user-defined executors, it is up to the implementation to store task objects (when enqueue is called) and execute them according to the executor's inner mechanism.
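Before the full logging_executor example, here is a minimal sketch of the task semantics described above - move-only, invoked at most once, empty afterwards; the print_callable type is illustrative:
#include "concurrencpp/concurrencpp.h"
#include <cassert>
#include <iostream>
#include <utility>

struct print_callable {
    void operator()() const { std::cout << "executing the stored callable" << std::endl; }
};

int main() {
    concurrencpp::task t {print_callable {}};   // the callable is constructed in place

    assert(static_cast<bool>(t));               // t stores a callable
    assert(t.contains<print_callable>());       // and the stored callable is a print_callable

    concurrencpp::task t2 = std::move(t);       // task is move-only; t is empty after the move
    assert(!static_cast<bool>(t));

    t2();                                       // the first invocation runs the callable ...
    assert(!static_cast<bool>(t2));             // ... and empties the task
    t2();                                       // invoking an empty task is a no-op
    return 0;
}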
In this example, we create an executor that logs actions such as enqueuing a task or executing a task. We implement the executor interface and ask the runtime to create and store an instance of it by calling runtime::make_executor. The rest of the application behaves exactly as if we had used one of the built-in, non user-defined executors.
# include " concurrencpp/concurrencpp.h "
# include < iostream >
# include < queue >
# include < thread >
# include < mutex >
# include < condition_variable >
class logging_executor : public concurrencpp ::derivable_executor<logging_executor> {
private:
mutable std::mutex _lock;
std::queue<concurrencpp::task> _queue;
std::condition_variable _condition;
bool _shutdown_requested;
std::thread _thread;
const std::string _prefix;
void work_loop () {
while ( true ) {
std::unique_lock<std::mutex> lock (_lock);
if (_shutdown_requested) {
return ;
}
if (!_queue. empty ()) {
auto task = std::move (_queue. front ());
_queue. pop ();
lock. unlock ();
std::cout << _prefix << " A task is being executed " << std::endl;
task ();
continue ;
}
_condition. wait (lock, [ this ] {
return !_queue. empty () || _shutdown_requested;
});
}
}
public:
logging_executor (std::string_view prefix) :
derivable_executor<logging_executor>( " logging_executor " ),
_shutdown_requested ( false ),
_prefix (prefix) {
_thread = std::thread ([ this ] {
work_loop ();
});
}
void enqueue (concurrencpp::task task) override {
std::cout << _prefix << " A task is being enqueued! " << std::endl;
std::unique_lock<std::mutex> lock (_lock);
if (_shutdown_requested) {
throw concurrencpp::errors::runtime_shutdown ( " logging executor - executor was shutdown. " );
}
_queue. emplace ( std::move (task));
_condition. notify_one ();
}
void enqueue (std::span<concurrencpp::task> tasks) override {
std::cout << _prefix << tasks. size () << " tasks are being enqueued! " << std::endl;
std::unique_lock<std::mutex> lock (_lock);
if (_shutdown_requested) {
throw concurrencpp::errors::runtime_shutdown ( " logging executor - executor was shutdown. " );
}
for ( auto & task : tasks) {
_queue. emplace ( std::move (task));
}
_condition. notify_one ();
}
int max_concurrency_level () const noexcept override {
return 1 ;
}
bool shutdown_requested () const noexcept override {
std::unique_lock<std::mutex> lock (_lock);
return _shutdown_requested;
}
void shutdown () noexcept override {
std::cout << _prefix << " shutdown requested " << std::endl;
std::unique_lock<std::mutex> lock (_lock);
if (_shutdown_requested) return ; // nothing to do.
_shutdown_requested = true ;
lock. unlock ();
_condition. notify_one ();
_thread. join ();
}
};
int main () {
concurrencpp::runtime runtime;
auto logging_ex = runtime. make_executor <logging_executor>( " Session #1234 " );
for ( size_t i = 0 ; i < 10 ; i++) {
logging_ex-> post ([] {
std::cout << " hello world " << std::endl;
});
}
std::getchar ();
return 0 ;
}$ git clone https://github.com/David-Haim/concurrencpp.git
$ cd concurrencpp
$ cmake -S . -B build/lib
$ cmake --build build/lib --config Release
$ git clone https://github.com/David-Haim/concurrencpp.git
$ cd concurrencpp
$ cmake -S test -B build/test
$ cmake --build build/test
<# for release mode: cmake --build build/test --config Release #>
$ cd build/test
$ ctest . -V -C Debug
<# for release mode: ctest . -V -C Release #>
$ git clone https://github.com/David-Haim/concurrencpp.git
$ cd concurrencpp
$ cmake -DCMAKE_BUILD_TYPE=Release -S . -B build/lib
$ cmake --build build/lib
#optional, install the library: sudo cmake --install build/lib
With Clang and GCC, the tests can also be run with TSAN (thread sanitizer) support.
$ git clone https://github.com/David-Haim/concurrencpp.git
$ cd concurrencpp
$ cmake -S test -B build/test
#for release mode: cmake -DCMAKE_BUILD_TYPE=Release -S test -B build/test
#for TSAN mode: cmake -DCMAKE_BUILD_TYPE=Release -DENABLE_THREAD_SANITIZER=Yes -S test -B build/test
$ cmake --build build/test
$ cd build/test
$ ctest . -V
When compiling on Linux, the library uses libstdc++ by default. If you intend to use libc++ as the standard library implementation, you should specify the CMAKE_TOOLCHAIN_FILE flag as follows:
$ cmake -DCMAKE_TOOLCHAIN_FILE=../cmake/libc++.cmake -DCMAKE_BUILD_TYPE=Release -S . -B build/lib
Alternatively, instead of building and installing the library manually, developers can obtain stable releases of concurrencpp through the vcpkg and Conan package managers:
VCPKG:
$ vcpkg install concurrencpp
Conan:
Concurrencpp comes with a built-in sandbox program that developers can modify and experiment with, without having to install the library or link the compiled library against another codebase. In order to use the sandbox, developers can modify sandbox/main.cpp and compile the application using the following commands:
$ cmake -S sandbox -B build/sandbox
$ cmake --build build/sandbox
<# for release mode: cmake --build build/sandbox --config Release #>
$ ./build/sandbox <# runs the sandbox #>
$ cmake -S sandbox -B build/sandbox
#for release mode: cmake -DCMAKE_BUILD_TYPE=Release -S sandbox -B build/sandbox
$ cmake --build build/sandbox
$ ./build/sandbox #runs the sandbox