concurrencppは、C ++の世界に同時タスクの力をもたらし、開発者がタスク、エグゼクティブ、コルーチンを使用して、非常に同時のアプリケーションを簡単かつ安全に記述できるようにします。 Concurrencppアプリケーションを使用することにより、非同期に処理する必要がある大きな手順を分解し、同時に実行し、協同組合で作業して指名手配結果を達成する必要があります。また、concurrencppを使用すると、アプリケーションは並列コルーチンを使用して並列アルゴリズムを簡単に書き込むこともできます。
同時に主な利点は次のとおりです。
std::threadやstd::mutexのような低レベルのプリミティブの代わりに、より高いレベルのプリミティブではなく、より高いレベルのタスクを使用して最新の並行性コードを作成します。co_awaitキーワードを使用して、非ブロッキング、同期のようなコードを簡単に実現します。executor APIthread_pool_executor APImanual_executor APIresultタイプresult APIlazy_resultタイプlazy_result apiresult_promise apiresult_promise例shared_result APIshared_result例make_ready_resultmake_exceptional_resultwhen_allwhen_anyresume_ontimer_queue APItimer APIgenerator APIgenerator例async_lock apiscoped_async_lock apiasync_lock例async_condition_variable apiasync_condition_variable例runtime APItaskオブジェクトtask APIconcurrencppは、コンカレントタスクの概念を中心に構築されています。タスクは非同期操作です。タスクは、従来のスレッド中心のアプローチよりも、同時コードの抽象化のレベルが高いことを提供します。タスクは一緒にチェーンすることができます。つまり、タスクは非同期結果を次のように渡すことを意味します。ここでは、1つのタスクの結果が別の継続的なタスクのパラメーターまたは中間値であるかのように使用されます。タスクは、利用可能なハードウェアリソースをより良く利用し、生のスレッドを使用するよりもはるかにスケーリングすることができます。タスクは一時停止できるため、OS-Threadsの基礎をブロックすることなく、結果を生成する別のタスクを待っています。タスクは、開発者がビジネスロジックにより焦点を合わせ、スレッド管理やスレッド間同期などの低レベルの概念により焦点を合わせることができるようにすることで、はるかに生産性を高めます。
タスクは実行されるアクションを指定しますが、エグゼキューターは、タスクを実行する場所と方法を指定するワーカーオブジェクトです。執行者は、スレッドプールとタスクキューの退屈な管理を余儀なくされます。また、エグゼクターは、タスクを作成およびスケジュールするための統一されたAPIを提供することにより、アプリケーションコードからこれらの概念を切り離します。
タスクは、結果オブジェクトを使用して相互に通信します。結果のオブジェクトは、あるタスクの非同期結果を別の継続的なタスクに渡す非同期パイプです。結果は、非ブロッキング方法で待ち望まれ、解決することができます。
これらの3つの概念 - タスク、執行者、および関連する結果は、concurrencppの構成要素です。執行者は、結果オブジェクトを介して結果を送信することにより、互いに通信するタスクを実行します。タスク、エグゼキューター、および結果のオブジェクトは、共生的に連携して、高速でクリーンな同時コードを作成します。
concurrencppは、RAIIコンセプトを中心に構築されています。タスクとエグゼキューターを使用するために、アプリケーションはmain関数の開始時にruntimeインスタンスを作成します。その後、ランタイムは既存の執行者を獲得し、新しいユーザー定義のエグゼクターを登録するために使用されます。執行者は、実行するタスクを作成およびスケジュールするために使用され、非同期結果を消費者として機能させる別のタスクに渡すために使用できるresultオブジェクトを返す場合があります。ランタイムが破壊されると、すべての保存された執行者を反復し、 shutdown方法を呼び出します。その後、すべての執行者は優雅に終了します。予定外のタスクが破壊され、新しいタスクを作成しようとすると例外が投げかけられます。
# include " concurrencpp/concurrencpp.h "
# include < iostream >
int main () {
concurrencpp::runtime runtime;
auto result = runtime. thread_executor ()-> submit ([] {
std::cout << " hello world " << std::endl;
});
result. get ();
return 0 ;
}この基本的な例では、ランタイムオブジェクトを作成し、ランタイムからスレッドエグゼクタを取得しました。 submit使用して、指定された呼び出し可能なラムダを渡しました。このラムダはvoidを返します。したがって、エグゼキューターは、非同期結果を発信者に戻すresult<void>オブジェクトを返します。 mainコールは、結果の準備が整うまでメインスレッドをブロックしget 。例外がスローされていない場合は、Get Returns void get 。例外がスローされた場合は、再スローしget 。非同期に、 thread_executor新しい実行のスレッドを起動し、与えられたLambdaを実行します。暗黙的にco_return voidとタスクが終了します。 mainはブロックされていません。
# include " concurrencpp/concurrencpp.h "
# include < iostream >
# include < vector >
# include < algorithm >
# include < ctime >
using namespace concurrencpp ;
std::vector< int > make_random_vector () {
std::vector< int > vec ( 64 * 1'024 );
std::srand ( std::time ( nullptr ));
for ( auto & i : vec) {
i = :: rand ();
}
return vec;
}
result< size_t > count_even (std::shared_ptr<thread_pool_executor> tpe, const std::vector< int >& vector) {
const auto vecor_size = vector. size ();
const auto concurrency_level = tpe-> max_concurrency_level ();
const auto chunk_size = vecor_size / concurrency_level;
std::vector<result< size_t >> chunk_count;
for ( auto i = 0 ; i < concurrency_level; i++) {
const auto chunk_begin = i * chunk_size;
const auto chunk_end = chunk_begin + chunk_size;
auto result = tpe-> submit ([&vector, chunk_begin, chunk_end]() -> size_t {
return std::count_if (vector. begin () + chunk_begin, vector. begin () + chunk_end, []( auto i) {
return i % 2 == 0 ;
});
});
chunk_count. emplace_back ( std::move (result));
}
size_t total_count = 0 ;
for ( auto & result : chunk_count) {
total_count += co_await result;
}
co_return total_count;
}
int main () {
concurrencpp::runtime runtime;
const auto vector = make_random_vector ();
auto result = count_even (runtime. thread_pool_executor (), vector);
const auto total_count = result. get ();
std::cout << " there are " << total_count << " even numbers in the vector " << std::endl;
return 0 ;
}この例では、ランタイムオブジェクトを作成してプログラムを開始します。ランダム数で満たされたベクトルを作成し、ランタイムからthread_pool_executorを取得し、 count_evenを呼び出します。 count_evenは、より多くのタスクとco_await生成するコルーチンです。 max_concurrency_level 、執行者がサポートする労働者の最大額を返します。ThreadPoolExextorの場合、労働者の数はコア数から計算されます。次に、アレイを分割して労働者の数に一致し、独自のタスクで処理されるすべてのチャンクを送信します。非同期に、労働者は各チャンクに含まれる偶数の数を数え、結果co_return 。 count_even 、 co_awaitを使用してカウントを引くことですべての結果を合計します。最終結果はco_return edです。 Calling getによってブロックされたメインスレッドはブロックされておらず、合計カウントが返されます。メインは偶数の数を印刷し、プログラムは優雅に終了します。
すべての大きなまたは複雑な操作は、より小さくて連鎖可能なステップに分解できます。タスクは、これらの計算手順を実装する非同期操作です。タスクは、執行者の助けを借りてどこでも実行できます。タスクは通常の呼び出し可能なもの(ファンクションやラムダなど)から作成できますが、タスクは主にコルーチンで使用されているため、滑らかなサスペンションと再開を可能にします。 concurrencppでは、タスクの概念はconcurrencpp::taskクラスで表されます。タスクの概念はConcurRenppの中心ですが、タスクオブジェクトは外部ヘルプなしでランタイムによって作成およびスケジュールされるため、アプリケーションはタスクオブジェクト自体を作成および操作する必要はほとんどありません。
concurrencppを使用すると、アプリケーションがタスクを作成する主な方法としてコルタインを作成および消費することができます。 Concurrencppは、熱心なタスクと怠zyなタスクの両方をサポートしています。
熱心なタスクが呼び出された瞬間に実行され始めます。このタイプの実行は、アプリケーションが非同期アクションを発射し、その結果を後で消費する必要がある場合(後で火災と消費)、または非同期結果(火と忘却)を完全に無視する場合に推奨されます。
熱心なタスクは、 resultまたはnull_resultを返すことができます。 resultリターンタイプは、coroutineに返された値またはスローされた例外(火災と消費)を通過するように指示しますが、 null_resultリターンタイプは、コルーチンにそれらのいずれかをドロップして無視するように指示します(火と忘れ)。
熱心なコルーチンは、発信者スレッドで同期して実行を開始できます。この種のコルーチンは「通常のコルーチン」と呼ばれます。 Concurrencpp Eager Coroutinesは、特定の執行者内で並行して実行を開始することもできます。この種のコルーチンは「パラレルコルーチン」と呼ばれます。
一方、怠zyなタスクは、 co_awaitの場合にのみ実行を開始します。このタイプのタスクは、タスクの結果がタスクを作成した直後に消費されることを意図している場合に推奨されます。延期される怠zyなタスクは、非同期結果を消費者に戻すために特別な糸状化を必要としないため、即時消費の場合にはもう少し最適化されています。コンパイラはまた、基礎となるコルーチンの約束を形成するために必要ないくつかのメモリ割り当てを最適化するかもしれません。怠zyなタスクを発射して他の何かを実行することは不可能です - 怠zyなカリーコルーチンの発射は、必然的に発信者コルーチンの停止を意味します。発信者のコルーチンは、レイジーカリーコルーチンが完了したときにのみ再開されます。怠zyなタスクはlazy_resultのみを返すことができます。
lazy_result::runを呼び出すことにより、怠zyなタスクを熱心なタスクに変換できます。この方法は、怠zyなタスクをインラインで実行し、新しく開始されたタスクを監視するresultオブジェクトを返します。開発者が使用する結果タイプが不明な場合は、必要に応じて定期的な(熱心な)結果に変換できるため、怠zyな結果を使用することをお勧めします。
関数がlazy_result 、 result 、またはnull_resultのいずれかを返し、その本体に少なくとも1つのco_awaitまたはco_returnを含む場合、関数は同時コルーチンです。すべての有効なCONCURRENCPP Coroutineは有効なタスクです。上記のカウントの例では、 count_evenはそのようなコルーチンです。最初にcount_even生み出した後、その内部では、Threadpoolの執行者がより多くの子供のタスク(通常の呼び出しが作成されます)を生み出し、最終的にco_await使用して参加しました。
concurrencppエグゼクティアは、タスクをスケジュールして実行できるオブジェクトです。エグゼクティブは、アプリケーションコードから切り離すことにより、スレッド、スレッドプール、タスクキューなどのリソースを管理する作業を簡素化します。執行者は、タスクをスケジュールおよび実行する統一された方法を提供します。なぜなら、それらはすべてconcurrencpp::executor拡張するためです。
executor API class executor {
/*
Initializes a new executor and gives it a name.
*/
executor (std::string_view name);
/*
Destroys this executor.
*/
virtual ~executor () noexcept = default ;
/*
The name of the executor, used for logging and debugging.
*/
const std::string name;
/*
Schedules a task to run in this executor.
Throws concurrencpp::errors::runtime_shutdown exception if shutdown was called before.
*/
virtual void enqueue (concurrencpp::task task) = 0;
/*
Schedules a range of tasks to run in this executor.
Throws concurrencpp::errors::runtime_shutdown exception if shutdown was called before.
*/
virtual void enqueue (std::span<concurrencpp::task> tasks) = 0;
/*
Returns the maximum count of real OS threads this executor supports.
The actual count of threads this executor is running might be smaller than this number.
returns numeric_limits<int>::max if the executor does not have a limit for OS threads.
*/
virtual int max_concurrency_level () const noexcept = 0;
/*
Returns true if shutdown was called before, false otherwise.
*/
virtual bool shutdown_requested () const noexcept = 0;
/*
Shuts down the executor:
- Tells underlying threads to exit their work loop and joins them.
- Destroys unexecuted coroutines.
- Makes subsequent calls to enqueue, post, submit, bulk_post and
bulk_submit to throw concurrencpp::errors::runtime_shutdown exception.
- Makes shutdown_requested return true.
*/
virtual void shutdown () noexcept = 0;
/*
Turns a callable and its arguments into a task object and
schedules it to run in this executor using enqueue.
Arguments are passed to the task by decaying them first.
Throws errors::runtime_shutdown exception if shutdown has been called before.
*/
template < class callable_type , class ... argument_types>
void post (callable_type&& callable, argument_types&& ... arguments);
/*
Like post, but returns a result object that passes the asynchronous result.
Throws errors::runtime_shutdown exception if shutdown has been called before.
*/
template < class callable_type , class ... argument_types>
result<type> submit (callable_type&& callable, argument_types&& ... arguments);
/*
Turns an array of callables into an array of tasks and
schedules them to run in this executor using enqueue.
Throws errors::runtime_shutdown exception if shutdown has been called before.
*/
template < class callable_type >
void bulk_post (std::span<callable_type> callable_list);
/*
Like bulk_post, but returns an array of result objects that passes the asynchronous results.
Throws errors::runtime_shutdown exception if shutdown has been called before.
*/
template < class callable_type >
std::vector<concurrencpp::result<type>> bulk_submit (std::span<callable_type> callable_list);
};上記のように、Concurrencppは一般的に使用される執行者を提供します。これらのエグゼキュータータイプは次のとおりです。
スレッドプールエグゼキューター- スレッドのプールを維持する汎用執行者。スレッドプールエグゼクタは、ブロックしない短いCPUバインドタスクに適しています。アプリケーションは、非ブロッキングタスクのデフォルトのエグゼクタとしてこのエグゼクティアを使用することをお勧めします。 concurrencppスレッドプールは、動的なスレッドインジェクションと動的作業のバランスを提供します。
バックグラウンドエグゼキューター- スレッドのプールが大きいスレッドプールエグゼキューター。ファイルIOやDBクエリなどの短いブロッキングタスクの起動に適しています。重要な注意:結果を消費する場合、このエグゼクティブはsubmitとbulk_submitを呼び出して返された場合、background_executor内でCPUバウンドタスクが処理されるのを防ぐために、 resume_onを使用して実行をCPUバウンドエグゼキューターに切り替えることが重要です。
例:
auto result = background_executor.submit([] { /* some blocking action */ });
auto done_result = co_await result.resolve();
co_await resume_on (some_cpu_executor);
auto val = co_await done_result; // runs inside some_cpu_executorスレッドエグゼキューター- 各エンキュータスクを起動して、新しい実行のスレッドで実行するエグゼクター。スレッドは再利用されません。この執行者は、作業ループを実行するオブジェクトや長いブロッキング操作など、長期にわたるタスクに適しています。
ワーカースレッドエグゼキューター- 単一のタスクキューを維持する単一のスレッドエグゼキューター。アプリケーションが多くの関連するタスクを実行する専用のスレッドが必要な場合に適しています。
手動執行者- コルーチンを単独で実行しない執行者。アプリケーションコードは、実行方法を手動で呼び出すことにより、以前にEnqueuedタスクを実行できます。
派生可能なエグゼクティア- ユーザー定義のエグゼクティア向けの基本クラス。 concurrencpp::executorから直接継承することは可能ですが、 derivable_executor 、コンパイラに最適化の機会を提供するCRTPパターンを使用します。
インライン執行者- 主に他の執行者の行動をオーバーライドするために使用されます。タスクをインラインで呼び出すのと同等です。
執行者の裸のメカニズムは、そのenqueueメソッドにカプセル化されています。このメソッドは、実行のタスクを除き、2つの過負荷があります。1つは引数として単一のタスクオブジェクトを受信し、もう1つはタスクオブジェクトのスパンを受信します。 2番目のオーバーロードは、タスクのバッチを排除するために使用されます。これにより、ヒューリスティックのスケジュールを改善し、競合を減少させることができます。
アプリケーションは、 enqueueのみに依存する必要はありません。Concurrencpp concurrencpp::executor舞台裏のタスクオブジェクトに変換することにより、ユーザーコールブルをスケジュールするためのAPIを提供します。アプリケーションは、提供された呼び出し可能なものの非同期結果に合格する結果オブジェクトを返すように執行者を要求できます。これはexecutor::submit and executor::bulk_submitを呼び出すことによって行われます。 submit呼び出し可能なものを取得し、結果オブジェクトを返します。 executor::bulk_submit 、呼び出し可能なspanを取得し、結果のvectorを同様の方法でsubmitします。多くの場合、アプリケーションは非同期値や例外に関心がありません。この場合、アプリケーションはexecutor:::post and executor::bulk_postを使用して、実行可能な呼び出し可能なspanをスケジュールすることもできますが、返された値またはスローされた例外をドロップするタスクも指示します。非同期結果を通過しないことは、合格よりも速いですが、進行中のタスクのステータスや結果を知る方法はありません。
post 、 bulk_post 、 submit and bulk_submit基礎となるスケジューリングメカニズムのために、舞台裏のenqueueを使用します。
thread_pool_executor API post 、 bulk_post 、 bulk_submitのsubmitは別として、 thread_pool_executorはこれらの追加方法を提供します。
class thread_pool_executor {
/*
Returns the number of milliseconds each thread-pool worker
remains idle (lacks any task to execute) before exiting.
This constant can be set by passing a runtime_options object
to the constructor of the runtime class.
*/
std::chrono::milliseconds max_worker_idle_time () const noexcept ;
};manual_executor API post 、bulk_post、 bulk_post 、 submit 、 bulk_submit 、 manual_executorこれらの追加方法を提供します。
class manual_executor {
/*
Destructor. Equivalent to clear.
*/
~manual_executor () noexcept ;
/*
Returns the number of enqueued tasks at the moment of invocation.
This number can change quickly by the time the application handles it, it should be used as a hint.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
size_t size () const noexcept ;
/*
Queries whether the executor is empty from tasks at the moment of invocation.
This value can change quickly by the time the application handles it, it should be used as a hint.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
bool empty () const noexcept ;
/*
Clears the executor from any enqueued but yet to-be-executed tasks,
and returns the number of cleared tasks.
Tasks enqueued to this executor by (post_)submit method are resumed
and errors::broken_task exception is thrown inside them.
Ongoing tasks that are being executed by loop_once(_XXX) or loop(_XXX) are uneffected.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
size_t clear ();
/*
Tries to execute a single task. If at the moment of invocation the executor
is empty, the method does nothing.
Returns true if a task was executed, false otherwise.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
bool loop_once ();
/*
Tries to execute a single task.
This method returns when either a task was executed or max_waiting_time
(in milliseconds) has reached.
If max_waiting_time is 0, the method is equivalent to loop_once.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
bool loop_once_for (std::chrono::milliseconds max_waiting_time);
/*
Tries to execute a single task.
This method returns when either a task was executed or timeout_time has reached.
If timeout_time has already expired, this method is equivalent to loop_once.
If shutdown is called from another thread, this method
returns and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
template < class clock_type , class duration_type >
bool loop_once_until (std::chrono::time_point<clock_type, duration_type> timeout_time);
/*
Tries to execute max_count enqueued tasks and returns the number of tasks that were executed.
This method does not wait: it returns when the executor
becomes empty from tasks or max_count tasks have been executed.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
size_t loop ( size_t max_count);
/*
Tries to execute max_count tasks.
This method returns when either max_count tasks were executed or a
total amount of max_waiting_time has passed.
If max_waiting_time is 0, the method is equivalent to loop.
Returns the actual amount of tasks that were executed.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
size_t loop_for ( size_t max_count, std::chrono::milliseconds max_waiting_time);
/*
Tries to execute max_count tasks.
This method returns when either max_count tasks were executed or timeout_time has reached.
If timeout_time has already expired, the method is equivalent to loop.
Returns the actual amount of tasks that were executed.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
template < class clock_type , class duration_type >
size_t loop_until ( size_t max_count, std::chrono::time_point<clock_type, duration_type> timeout_time);
/*
Waits for at least one task to be available for execution.
This method should be used as a hint,
as other threads (calling loop, for example) might empty the executor,
before this thread has a chance to do something with the newly enqueued tasks.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
void wait_for_task ();
/*
This method returns when one or more tasks are available for
execution or max_waiting_time has passed.
Returns true if at at least one task is available for execution, false otherwise.
This method should be used as a hint, as other threads (calling loop, for example)
might empty the executor, before this thread has a chance to do something
with the newly enqueued tasks.
If shutdown is called from another thread, this method
returns and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
bool wait_for_task_for (std::chrono::milliseconds max_waiting_time);
/*
This method returns when one or more tasks are available for execution or timeout_time has reached.
Returns true if at at least one task is available for execution, false otherwise.
This method should be used as a hint,
as other threads (calling loop, for example) might empty the executor,
before this thread has a chance to do something with the newly enqueued tasks.
If shutdown is called from another thread, this method
returns and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
template < class clock_type , class duration_type >
bool wait_for_task_until (std::chrono::time_point<clock_type, duration_type> timeout_time);
/*
This method returns when max_count or more tasks are available for execution.
This method should be used as a hint, as other threads
(calling loop, for example) might empty the executor,
before this thread has a chance to do something with the newly enqueued tasks.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
void wait_for_tasks ( size_t max_count);
/*
This method returns when max_count or more tasks are available for execution
or max_waiting_time (in milliseconds) has passed.
Returns the number of tasks available for execution when the method returns.
This method should be used as a hint, as other
threads (calling loop, for example) might empty the executor,
before this thread has a chance to do something with the newly enqueued tasks.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
size_t wait_for_tasks_for ( size_t count, std::chrono::milliseconds max_waiting_time);
/*
This method returns when max_count or more tasks are available for execution
or timeout_time is reached.
Returns the number of tasks available for execution when the method returns.
This method should be used as a hint, as other threads
(calling loop, for example) might empty the executor,
before this thread has a chance to do something with the newly enqueued tasks.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
template < class clock_type , class duration_type >
size_t wait_for_tasks_until ( size_t count, std::chrono::time_point<clock_type, duration_type> timeout_time);
};非同期値と例外は、concurrencpp結果オブジェクトを使用して消費できます。 resultタイプは熱心なタスクの非同期結果を表し、 lazy_result怠zyなタスクの延期された結果を表します。
タスク(熱心または怠zy)が完了すると、有効な値を返すか、例外をスローします。どちらの場合でも、この非同期結果は結果オブジェクトの消費者に渡されます。
resultオブジェクトは非対称コルーチンを形成します - 発信者コルーチンの実行は、Callee-Coroutineの実行によっては影響を受けません。両方のコルーチンは独立して実行できます。 Callee-Coroutineの結果を消費する場合にのみ、Callee CoroutineはCalleeが完了するのを待って吊り下げられる可能性があります。その時点まで、両方のコルーチンが独立して実行されます。 Callee-Coroutineは、その結果が消費されるかどうかを実行します。
lazy_resultオブジェクトは対称コルーチンを形成します - Callee-Coroutineの実行は、発信者コルーチンの停止後にのみ発生します。怠zyな結果を待つと、現在のコルーチンが中断され、怠zyな結果に関連する怠zyなタスクが実行され始めます。 Callee-Coroutineが完了して結果を生み出した後、発信者コルーチンが再開されます。怠zyな結果が消費されない場合、それに関連する怠zyなタスクが実行され始めません。
すべての結果オブジェクトはムーブのみのタイプであるため、コンテンツが別の結果オブジェクトに移動した後に使用することはできません。この場合、結果オブジェクトは空であると見なされ、 operator boolおよびoperator =以外のメソッドを呼び出そうとする試みが例外をスローします。
非同期結果が結果オブジェクトから引き出された後(たとえば、 get or operator co_awaitを呼び出すことにより)、結果オブジェクトは空になります。空虚はoperator boolでテストできます。
結果を待つということは、結果オブジェクトの準備ができるまで現在のコルーチンを吊り下げることを意味します。関連するタスクから有効な値が返された場合、結果オブジェクトから返されます。関連するタスクが例外をスローする場合、それは再発します。待っている時点で、結果がすでに準備ができている場合、現在のコルーチンはすぐに再開します。それ以外の場合、非同期の結果または例外を設定するスレッドによって再開されます。
結果を解決することは、それを待つことに似ています。違いは、 co_await式が、結果のない形で、結果のオブジェクト自体を、すぐに空の形で返すことです。非同期結果は、 getまたはco_awaitを使用して引くことができます。
すべての結果オブジェクトには、非同期結果の状態を示すステータスがあります。結果ステータスはresult_status::idle (非同期結果または例外がまだ生成されていない)からresult_status::exception result_status::valueまでさまざまです。ステータスは(lazy_)result::statusを呼び出すことで照会できます。
resultタイプresultタイプはstd::futureと同様に、進行中の非同期タスクの結果を表します。
result-objectsを待って解決することは別として、 result::wait 、 result::wait_for 、 result::wait_untilまたはresult::get 。結果が終了するのを待つのはブロッキング操作です(非同期結果の準備が整っていない場合)、非同期結果が待機する実行のスレッド全体を一時停止して利用可能になります。待機操作は一般に落胆し、ルートレベルのタスクまたはそれを許可する文脈でのみ許可されています。これは、アプリケーションの残りの部分が優雅に仕上げるのを待っているメインスレッドをブロックするか、 concurrencpp::blocking_executorまたはconcurrencpp::thread_executorを使用します。
co_awaitを使用して結果オブジェクトを待っています(そして、そうすることで、現在の関数/タスクをコルーチンに変えることもできます)は、基礎となるスレッドをブロックしないため、結果オブジェクトを消費する好ましい方法です。
result API class result {
/*
Creates an empty result that isn't associated with any task.
*/
result () noexcept = default ;
/*
Destroys the result. Associated tasks are not cancelled.
The destructor does not block waiting for the asynchronous result to become ready.
*/
~result () noexcept = default ;
/*
Moves the content of rhs to *this. After this call, rhs is empty.
*/
result (result&& rhs) noexcept = default ;
/*
Moves the content of rhs to *this. After this call, rhs is empty. Returns *this.
*/
result& operator = (result&& rhs) noexcept = default ;
/*
Returns true if this is a non-empty result.
Applications must not use this object if this->operator bool() is false.
*/
explicit operator bool () const noexcept ;
/*
Queries the status of *this.
The returned value is any of result_status::idle, result_status::value or result_status::exception.
Throws errors::empty_result if *this is empty.
*/
result_status status () const ;
/*
Blocks the current thread of execution until this result is ready,
when status() != result_status::idle.
Throws errors::empty_result if *this is empty.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
void wait ();
/*
Blocks until this result is ready or duration has passed. Returns the status
of this result after unblocking.
Throws errors::empty_result if *this is empty.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
template < class duration_unit , class ratio >
result_status wait_for (std::chrono::duration<duration_unit, ratio> duration);
/*
Blocks until this result is ready or timeout_time has reached. Returns the status
of this result after unblocking.
Throws errors::empty_result if *this is empty.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
template < class clock , class duration >
result_status wait_until (std::chrono::time_point<clock, duration> timeout_time);
/*
Blocks the current thread of execution until this result is ready,
when status() != result_status::idle.
If the result is a valid value, it is returned, otherwise, get rethrows the asynchronous exception.
Throws errors::empty_result if *this is empty.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
type get ();
/*
Returns an awaitable used to await this result.
If the result is already ready - the current coroutine resumes
immediately in the calling thread of execution.
If the result is not ready yet, the current coroutine is suspended
and resumed when the asynchronous result is ready,
by the thread which had set the asynchronous value or exception.
In either way, after resuming, if the result is a valid value, it is returned.
Otherwise, operator co_await rethrows the asynchronous exception.
Throws errors::empty_result if *this is empty.
*/
auto operator co_await ();
/*
Returns an awaitable used to resolve this result.
After co_await expression finishes, *this is returned in a non-empty form, in a ready state.
Throws errors::empty_result if *this is empty.
*/
auto resolve ();
};lazy_resultタイプ怠zyな結果オブジェクトは、延期された怠zyなタスクの結果を表します。
lazy_resultは、関連する怠zyなタスクを開始し、その繰延結果を消費者に渡す責任があります。待っているか解決されたとき、怠zyな結果は現在のコルーチンを一時停止し、関連する怠zyなタスクを開始します。関連するタスクが完了すると、その非同期値は発信者タスクに渡され、その後再開されます。
APIが怠zyな結果を返すことがありますが、アプリケーションは、(発信者タスクを一時停止せずに)熱心に実行するために関連するタスクが必要です。この場合、怠zyなタスクは、関連する怠zyな結果でrun呼び出すことにより、熱心なタスクに変換できます。この場合、関連するタスクは、発信者タスクを一時停止せずに、インラインで実行され始めます。元の怠zyな結果は空になり、新しく開始されたタスクを監視する有効なresultオブジェクトが代わりに返されます。
lazy_result api class lazy_result {
/*
Creates an empty lazy result that isn't associated with any task.
*/
lazy_result () noexcept = default ;
/*
Moves the content of rhs to *this. After this call, rhs is empty.
*/
lazy_result (lazy_result&& rhs) noexcept ;
/*
Destroys the result. If not empty, the destructor destroys the associated task without resuming it.
*/
~lazy_result () noexcept ;
/*
Moves the content of rhs to *this. After this call, rhs is empty. Returns *this.
If *this is not empty, then operator= destroys the associated task without resuming it.
*/
lazy_result& operator =(lazy_result&& rhs) noexcept ;
/*
Returns true if this is a non-empty result.
Applications must not use this object if this->operator bool() is false.
*/
explicit operator bool () const noexcept ;
/*
Queries the status of *this.
The returned value is any of result_status::idle, result_status::value or result_status::exception.
Throws errors::empty_result if *this is empty.
*/
result_status status () const ;
/*
Returns an awaitable used to start the associated task and await this result.
If the result is already ready - the current coroutine resumes immediately
in the calling thread of execution.
If the result is not ready yet, the current coroutine is suspended and
resumed when the asynchronous result is ready,
by the thread which had set the asynchronous value or exception.
In either way, after resuming, if the result is a valid value, it is returned.
Otherwise, operator co_await rethrows the asynchronous exception.
Throws errors::empty_result if *this is empty.
*/
auto operator co_await ();
/*
Returns an awaitable used to start the associated task and resolve this result.
If the result is already ready - the current coroutine resumes immediately
in the calling thread of execution.
If the result is not ready yet, the current coroutine is suspended and resumed
when the asynchronous result is ready, by the thread which
had set the asynchronous value or exception.
After co_await expression finishes, *this is returned in a non-empty form, in a ready state.
Throws errors::empty_result if *this is empty.
*/
auto resolve ();
/*
Runs the associated task inline and returns a result object that monitors the newly started task.
After this call, *this is empty.
Throws errors::empty_result if *this is empty.
Might throw std::bad_alloc if fails to allocate memory.
*/
result<type> run ();
};通常の熱心なコルーチンは、実行の呼び出しスレッドで同期して実行され始めます。たとえば、コルーチンが再スケジュールを受ける場合、その内部の準備ができていない結果オブジェクトを待つことにより、実行の別のスレッドに実行される場合があります。 concurrencppは、並列コルーチンも提供します。並列コルーチンは、実行されるスレッドではなく、特定のエグゼクターの内部で実行され始めます。このスタイルのスケジューリングコルーチンは、並列アルゴリズム、再帰アルゴリズム、およびフォークジーンモデルを使用する同時アルゴリズムを作成する場合に特に役立ちます。
すべての並列コルーチンは、次の前提条件を満たす必要があります。
result / null_resultのいずれかを返します。executor_tag最初の引数として取得します。type* / type& / std::shared_ptr<type>のいずれかを取得します。このtype 、2番目の引数として具体的なクラスのexecutorです。co_awaitまたはco_returnが含まれています。上記のすべてが適用される場合、関数は並列コルーチンです。CONCURRENCPPは、Coroutineの中断を開始し、すぐにそれを再スケジュールして、提供されたエグゼキューターで実行します。 concurrencpp::executor_tagこの関数が通常の関数ではないことをconcurrencppランタイムに伝えるためのダミープレースホルダーであり、指定されたエグゼクターの内部で実行を開始する必要があります。エグゼキューターがパラレルコルーチンに渡された場合、コルーチンは実行を開始せず、 std::invalid_argument例外が同期してスローされます。すべての前提条件が満たされている場合、アプリケーションは、返された結果オブジェクトを使用して、並列コルーチンの結果を消費できます。
この例では、フィボナッチ配列の30番目のメンバーを並行方法で計算します。各フィボナッチステップの発売を開始し始めます。最初の引数はダミーexecutor_tagであり、2番目の引数はThreadPool Exectorです。すべての再帰ステップは、並行して実行される新しい並列コルーチンを呼び出します。各結果は、親タスクに対してco_return edであり、 co_awaitを使用して取得されます。
入力を同期的に計算するのに十分なほど小さいと判断した場合( curr <= 10の場合)、各再帰ステップの実行を停止し、アルゴリズムを同期して解決します。
# include " concurrencpp/concurrencpp.h "
# include < iostream >
using namespace concurrencpp ;
int fibonacci_sync ( int i) {
if (i == 0 ) {
return 0 ;
}
if (i == 1 ) {
return 1 ;
}
return fibonacci_sync (i - 1 ) + fibonacci_sync (i - 2 );
}
result< int > fibonacci (executor_tag, std::shared_ptr<thread_pool_executor> tpe, const int curr) {
if (curr <= 10 ) {
co_return fibonacci_sync (curr);
}
auto fib_1 = fibonacci ({}, tpe, curr - 1 );
auto fib_2 = fibonacci ({}, tpe, curr - 2 );
co_return co_await fib_1 + co_await fib_2;
}
int main () {
concurrencpp::runtime runtime;
auto fibb_30 = fibonacci ({}, runtime. thread_pool_executor (), 30 ). get ();
std::cout << " fibonacci(30) = " << fibb_30 << std::endl;
return 0 ;
}比較するために、これは並列コルーチンを使用せずに同じコードがどのように記述されるか、およびexecutor::submit方法です。 fibonacci result<int>を返すため、 executor::submitを介して再帰的に送信するとresult<result<int>> 。
# include " concurrencpp/concurrencpp.h "
# include < iostream >
using namespace concurrencpp ;
int fibonacci_sync ( int i) {
if (i == 0 ) {
return 0 ;
}
if (i == 1 ) {
return 1 ;
}
return fibonacci_sync (i - 1 ) + fibonacci_sync (i - 2 );
}
result< int > fibonacci (std::shared_ptr<thread_pool_executor> tpe, const int curr) {
if (curr <= 10 ) {
co_return fibonacci_sync (curr);
}
auto fib_1 = tpe-> submit (fibonacci, tpe, curr - 1 );
auto fib_2 = tpe-> submit (fibonacci, tpe, curr - 2 );
co_return co_await co_await fib_1 +
co_await co_await fib_2;
}
int main () {
concurrencpp::runtime runtime;
auto fibb_30 = fibonacci (runtime. thread_pool_executor (), 30 ). get ();
std::cout << " fibonacci(30) = " << fibb_30 << std::endl;
return 0 ;
}結果オブジェクトは、concurrencppのタスク間でデータを渡す主な方法であり、エグゼキューターとコルーチンがそのようなオブジェクトをどのように生成するかを見てきました。たとえば、サードパーティライブラリを使用する場合、非タスクを備えた結果オブジェクトの機能を使用する場合があります。この場合、 result_promiseを使用して結果オブジェクトを完了できます。 result_promise std::promiseオブジェクトに似ています - アプリケーションは、非同期結果または例外を手動で設定し、関連するresultオブジェクトを準備することができます。
結果のオブジェクトと同様に、結果のプロモーションは、移動後に空になる移動唯一のタイプです。同様に、結果または例外を設定した後、結果の約束も空になります。結果の範囲が範囲から抜け出し、結果/例外が設定されていない場合、result-promise Destructorはset_exceptionメソッドを使用してconcurrencpp::errors::broken_task例外を設定します。関連する結果オブジェクトを待っている吊り下げ式およびブロックされたタスクは、再開/ブロックされていません。
結果の約束は、コールバックスタイルのコードスタイルをasync/awaitスタイルのコードスタイルに変換できます。コンポーネントが非同期結果を渡すためにコールバックを必要とする場合は、渡された結果の約束でset_resultまたはset_exception (非同期結果自体に応じて)を呼び出し、関連する結果を返します。
result_promise api template < class type >
class result_promise {
/*
Constructs a valid result_promise.
Might throw std::bad_alloc if fails to allocate memory.
*/
result_promise ();
/*
Moves the content of rhs to *this. After this call, rhs is empty.
*/
result_promise (result_promise&& rhs) noexcept ;
/*
Destroys *this, possibly setting an errors::broken_task exception
by calling set_exception if *this is not empty at the time of destruction.
*/
~result_promise () noexcept ;
/*
Moves the content of rhs to *this. After this call, rhs is empty.
*/
result_promise& operator = (result_promise&& rhs) noexcept ;
/*
Returns true if this is a non-empty result-promise.
Applications must not use this object if this->operator bool() is false.
*/
explicit operator bool () const noexcept ;
/*
Sets a value by constructing <<type>> from arguments... in-place.
Makes the associated result object become ready - tasks waiting for it
to become ready are unblocked.
Suspended tasks are resumed inline.
After this call, *this becomes empty.
Throws errors::empty_result_promise exception If *this is empty.
Might throw any exception that the constructor
of type(std::forward<argument_types>(arguments)...) throws.
*/
template < class ... argument_types>
void set_result (argument_types&& ... arguments);
/*
Sets an exception.
Makes the associated result object become ready - tasks waiting for it
to become ready are unblocked.
Suspended tasks are resumed inline.
After this call, *this becomes empty.
Throws errors::empty_result_promise exception If *this is empty.
Throws std::invalid_argument exception if exception_ptr is null.
*/
void set_exception (std::exception_ptr exception_ptr);
/*
A convenience method that invokes a callable with arguments... and calls set_result
with the result of the invocation.
If an exception is thrown, the thrown exception is caught and set instead by calling set_exception.
After this call, *this becomes empty.
Throws errors::empty_result_promise exception If *this is empty.
Might throw any exception that callable(std::forward<argument_types>(arguments)...)
or the contructor of type(type&&) throw.
*/
template < class callable_type , class ... argument_types>
void set_from_function (callable_type&& callable, argument_types&& ... arguments);
/*
Gets the associated result object.
Throws errors::empty_result_promise exception If *this is empty.
Throws errors::result_already_retrieved exception if this method had been called before.
*/
result<type> get_result ();
};result_promise例:この例では、 result_promiseを使用して1つのスレッドからデータをプッシュし、別のスレッドから関連するresultオブジェクトから引き出すことができます。
# include " concurrencpp/concurrencpp.h "
# include < iostream >
int main () {
concurrencpp::result_promise<std::string> promise;
auto result = promise. get_result ();
std::thread my_3_party_executor ([promise = std::move (promise)] () mutable {
std::this_thread::sleep_for ( std::chrono::seconds ( 1 )); // Imitate real work
promise. set_result ( " hello world " );
});
auto asynchronous_string = result. get ();
std::cout << " result promise returned string: " << asynchronous_string << std::endl;
my_3_party_executor. join ();
}この例では、 std::threadサードパーティのエグゼキューターとして使用します。これは、アプリケーションのライフサイクルの一部として非concurrencppエグゼクティアが使用される場合のシナリオを表します。約束を渡す前に結果オブジェクトを抽出し、結果の準備ができるまでメインスレッドをブロックします。 my_3_party_executorでは、 co_return edのように結果を設定します。
共有結果は、複数の消費者がstd::shared_futureと同様に、非同期結果にアクセスできるようにする特別な種類の結果オブジェクトです。さまざまなスレッドからのさまざまな消費者は、 awaitなどのgetをresolveことができます。
共有結果は通常の結果オブジェクトから構築されており、通常の結果オブジェクトとは異なり、コピー可能で可動性があります。そのため、 shared_result std::shared_ptrタイプのように動作します。共有結果インスタンスが別のインスタンスに移動された場合、インスタンスが空になり、アクセスしようとすると例外がスローされます。
複数の消費者をサポートするために、共有結果は、それを移動する代わりに非同期値への参照を返します(通常の結果のように)。たとえば、 shared_result<int>は、 int& when get 、 awaitなどを返します。 shared_resultの根底にあるタイプがvoidまたは参照タイプ( int&など)である場合、それらは通常どおりに返されます。非同期の結果がスローされた発現である場合、それは再び投げかけられます。
複数のスレッドからshared_resultを使用して非同期結果を取得することはスレッドセーフであることに注意してください。実際の値はスレッド安全ではない場合があります。たとえば、複数のスレッドは、その参照( int& )を受信することにより非同期整数を取得できます。整数自体の糸を安全にしません。非同期値がすでにスレッド安全である場合、非同期値を変異させることは大丈夫です。あるいは、アプリケーションは、 constタイプを使用して最初に( const intなど)を使用し、突然変異を防ぐ一定の参照( const int&など)を取得することをお勧めします。
shared_result API class share_result {
/*
Creates an empty shared-result that isn't associated with any task.
*/
shared_result () noexcept = default ;
/*
Destroys the shared-result. Associated tasks are not cancelled.
The destructor does not block waiting for the asynchronous result to become ready.
*/
~shared_result () noexcept = default ;
/*
Converts a regular result object to a shared-result object.
After this call, rhs is empty.
Might throw std::bad_alloc if fails to allocate memory.
*/
shared_result (result<type> rhs);
/*
Copy constructor. Creates a copy of the shared result object that monitors the same task.
*/
shared_result ( const shared_result&) noexcept = default ;
/*
Move constructor. Moves rhs to *this. After this call, rhs is empty.
*/
shared_result (shared_result&& rhs) noexcept = default ;
/*
Copy assignment operator. Copies rhs to *this and monitors the same task that rhs monitors.
*/
shared_result& operator =( const shared_result& rhs) noexcept ;
/*
Move assignment operator. Moves rhs to *this. After this call, rhs is empty.
*/
shared_result& operator =(shared_result&& rhs) noexcept ;
/*
Returns true if this is a non-empty shared-result.
Applications must not use this object if this->operator bool() is false.
*/
explicit operator bool () const noexcept ;
/*
Queries the status of *this.
The return value is any of result_status::idle, result_status::value or result_status::exception.
Throws errors::empty_result if *this is empty.
*/
result_status status () const ;
/*
Blocks the current thread of execution until this shared-result is ready,
when status() != result_status::idle.
Throws errors::empty_result if *this is empty.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
void wait ();
/*
Blocks until this shared-result is ready or duration has passed.
Returns the status of this shared-result after unblocking.
Throws errors::empty_result if *this is empty.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
template < class duration_type , class ratio_type >
result_status wait_for (std::chrono::duration<duration_type, ratio_type> duration);
/*
Blocks until this shared-result is ready or timeout_time has reached.
Returns the status of this result after unblocking.
Throws errors::empty_result if *this is empty.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
template < class clock_type , class duration_type >
result_status wait_until (std::chrono::time_point<clock_type, duration_type> timeout_time);
/*
Blocks the current thread of execution until this shared-result is ready,
when status() != result_status::idle.
If the result is a valid value, a reference to it is returned,
otherwise, get rethrows the asynchronous exception.
Throws errors::empty_result if *this is empty.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
std:: add_lvalue_reference_t <type> get ();
/*
Returns an awaitable used to await this shared-result.
If the shared-result is already ready - the current coroutine resumes
immediately in the calling thread of execution.
If the shared-result is not ready yet, the current coroutine is
suspended and resumed when the asynchronous result is ready,
by the thread which had set the asynchronous value or exception.
In either way, after resuming, if the result is a valid value, a reference to it is returned.
Otherwise, operator co_await rethrows the asynchronous exception.
Throws errors::empty_result if *this is empty.
*/
auto operator co_await ();
/*
Returns an awaitable used to resolve this shared-result.
After co_await expression finishes, *this is returned in a non-empty form, in a ready state.
Throws errors::empty_result if *this is empty.
*/
auto resolve ();
};shared_result例:この例では、 resultオブジェクトはshared_resultオブジェクトに変換され、非同期int結果への参照は、 thread_executorで発生した多くのタスクによって取得されます。
# include " concurrencpp/concurrencpp.h "
# include < iostream >
# include < chrono >
concurrencpp::result< void > consume_shared_result (concurrencpp::shared_result< int > shared_result,
std::shared_ptr<concurrencpp::executor> resume_executor) {
std::cout << " Awaiting shared_result to have a value " << std::endl;
const auto & async_value = co_await shared_result;
concurrencpp::resume_on (resume_executor);
std::cout << " In thread id " << std::this_thread::get_id () << " , got: " << async_value << " , memory address: " << &async_value << std::endl;
}
int main () {
concurrencpp::runtime runtime;
auto result = runtime. background_executor ()-> submit ([] {
std::this_thread::sleep_for ( std::chrono::seconds ( 1 ));
return 100 ;
});
concurrencpp::shared_result< int > shared_result ( std::move (result));
concurrencpp::result< void > results[ 8 ];
for ( size_t i = 0 ; i < 8 ; i++) {
results[i] = consume_shared_result (shared_result, runtime. thread_pool_executor ());
}
std::cout << " Main thread waiting for all consumers to finish " << std::endl;
auto tpe = runtime. thread_pool_executor ();
auto all_consumed = concurrencpp::when_all (tpe, std::begin (results), std::end (results)). run ();
all_consumed. get ();
std::cout << " All consumers are done, exiting " << std::endl;
return 0 ;
}ランタイムオブジェクトがmainの範囲から抜け出すと、保存されている各エグゼキューターを反復し、 shutdown方法を呼び出します。タイマーキューまたは任意のエグゼキューにアクセスしようとするとerrors::runtime_shutdown例外がスローされます。執行者がシャットダウンすると、内部のタスクキューをクリアし、実行されていないtaskオブジェクトを破壊します。タスクオブジェクトがconcurrencpp-coroutineを保存する場合、そのコルーチンはインラインで再開され、 errors::broken_task例外がその内部にスローされます。 runtime_shutdownまたはbroken_task例外がスローされている場合には、アプリケーションはできるだけ早く現在のコードフローを優雅に終了する必要があります。これらの例外は無視されるべきではありません。 runtime_shutdownとbroken_task両方がerrors::interrupted_taskベースクラスから継承されます。このタイプは、統一された方法で終了を処理するためにcatch句でも使用できます。
多くの同時非同期アクションには、履歴書の執行者として執行者のインスタンスが必要です。非同期アクション(コルーチンとして実装)が同期して終了できる場合、実行の呼び出しスレッドですぐに再開されます。非同期アクションが同期して終了できない場合、指定された履歴書実行者内で終了すると再開されます。たとえば、 when_anyユーティリティ関数には、最初の引数として履歴書実行者のインスタンスが必要です。 when_any lazy_resultを返します。結果の1つがwhen_any呼び出す瞬間に既に準備ができている場合、呼び出しコルーチンは、実行の呼び出しスレッドで同期して再開されます。そうでない場合、指定された履歴書執行者の内部で、少なくとも結果が終了すると、呼び出しのコルーチンが再開されます。履歴書執行者は、コルーチンが再開されることになっている場合(たとえば、 when_any and when_allの場合)、または非同期アクションが特定のアクションを処理するためだけに使用され、アプリケーションコードを処理し、アプリケーションコードを処理するために使用される同時労働者のいずれか内で処理される場合に、コルーチンが再開される場合を義務付けているため、執行者が重要です。
make_ready_result関数make_ready_result 、与えられた引数から準備ができた結果オブジェクトを作成します。そのような結果を待つと、現在のコルーチンがすぐに再開されます。 get and operator co_await構築された値を返します。
/*
Creates a ready result object by building <<type>> from arguments&&... in-place.
Might throw any exception that the constructor
of type(std::forward<argument_types>(arguments)...) throws.
Might throw std::bad_alloc exception if fails to allocate memory.
*/
template < class type , class ... argument_types>
result<type> make_ready_result (argument_types&& ... arguments);
/*
An overload for void type.
Might throw std::bad_alloc exception if fails to allocate memory.
*/
result< void > make_ready_result ();make_exceptional_result関数make_exceptional_result特定の例外から準備が整った結果オブジェクトを作成します。そのような結果を待つと、現在のコルーチンがすぐに再開されます。 get and operator co_await指定された例外を再スローします。
/*
Creates a ready result object from an exception pointer.
The returned result object will re-throw exception_ptr when calling get or await.
Throws std::invalid_argument if exception_ptr is null.
Might throw std::bad_alloc exception if fails to allocate memory.
*/
template < class type >
result<type> make_exceptional_result (std::exception_ptr exception_ptr);
/*
Overload. Similar to make_exceptional_result(std::exception_ptr),
but gets an exception object directly.
Might throw any exception that the constructor of exception_type(std::move(exception)) might throw.
Might throw std::bad_alloc exception if fails to allocate memory.
*/
template < class type , class exception_type >
result<type> make_exceptional_result (exception_type exception );when_all関数when_all 、すべての入力結果が完了すると準備ができる怠zyな結果オブジェクトを作成するユーティリティ関数です。この怠zyな結果を待つと、準備が整う準備ができているすべての入力結果オブジェクトが返されます。
when_all関数には3つのフレーバーが付いています。1つは異種の範囲の結果オブジェクトを受け入れ、もう1つは同じタイプの結果オブジェクトの範囲に繰り返しのペアを取得し、最後に結果オブジェクトをまったく受け入れないオーバーロードです。入力結果オブジェクトがない場合 - 関数は、空のタプルの準備ができた結果オブジェクトを返します。
渡された結果オブジェクトの1つが空である場合、例外がスローされます。この場合、入力結果オブジェクトは関数の影響を受けず、例外が処理された後に再び使用できます。すべての入力結果オブジェクトが有効である場合、それらはこの関数によって空になり、出力結果として有効で準備ができた状態で返されます。
現在、 when_all resultオブジェクトのみを受け入れます。
すべてのオーバーロードは、履歴書執行者を最初のパラメーターとして受け入れます。 when_allによって返される結果を待つと、発信者のコルーチンは与えられた履歴書執行者によって再開されます。
/*
Creates a result object that becomes ready when all the input results become ready.
Passed result objects are emptied and returned as a tuple.
Throws std::invalid_argument if any of the passed result objects is empty.
Might throw an std::bad_alloc exception if no memory is available.
*/
template < class ... result_types>
lazy_result<std::tuple< typename std::decay<result_types>::type...>>
when_all (std::shared_ptr<executor_type> resume_executor,
result_types&& ... results);
/*
Overload. Similar to when_all(result_types&& ...) but receives a pair of iterators referencing a range.
Passed result objects are emptied and returned as a vector.
If begin == end, the function returns immediately with an empty vector.
Throws std::invalid_argument if any of the passed result objects is empty.
Might throw an std::bad_alloc exception if no memory is available.
*/
template < class iterator_type >
lazy_result<std::vector< typename std::iterator_traits<iterator_type>::value_type>>
when_all (std::shared_ptr<executor_type> resume_executor,
iterator_type begin, iterator_type end);
/*
Overload. Returns a ready result object that doesn't monitor any asynchronous result.
Might throw an std::bad_alloc exception if no memory is available.
*/
lazy_result<std::tuple<>> when_all (std::shared_ptr<executor_type> resume_executor);when_any関数when_any 、少なくとも1つの入力結果が完了すると準備が整った怠zyな結果オブジェクトを作成するユーティリティ関数です。この結果を待つと、すべての入力結果オブジェクトと完成したタスクのインデックスを含むヘルパー構造体が返されます。準備が整った結果を消費するまでに、他の結果がすでに非同期に完了している可能性があります。アプリケーションは、すべての結果が消費されるまで完了するときにすぐに結果を消費するために、 when_any繰り返し呼び出すことができます。
when_any関数には2つのフレーバーのみが搭載されています。1つは異種の範囲の結果オブジェクトを受け入れ、もう1つは同じタイプの範囲の結果オブジェクトに繰り返しのペアを取得するものです。 when_allとは異なり、結果の範囲が完全に空である場合に終了するために少なくとも1つのタスクを待つことには意味がありません。したがって、引数のない過負荷はありません。また、2つのイテレーターの過負荷は、これらの反復器が空の範囲を参照する場合( begin == end )を参照する場合に例外をスローします。
渡された結果オブジェクトの1つが空である場合、例外がスローされます。いずれにせよ、例外がスローされ、入力結果オブジェクトは関数の影響を受けず、例外を処理した後に再度使用できます。すべての入力結果オブジェクトが有効である場合、それらはこの関数によって空になり、出力の結果として有効な状態で返されます。
現在、 when_any resultオブジェクトのみを受け入れます。
すべてのオーバーロードは、履歴書執行者を最初のパラメーターとして受け入れます。 when_anyによって返される結果を待つと、発信者のコルーチンは与えられた履歴書執行者によって再開されます。
/*
Helper struct returned from when_any.
index is the position of the ready result in results sequence.
results is either an std::tuple or an std::vector of the results that were passed to when_any.
*/
template < class sequence_type >
struct when_any_result {
std:: size_t index;
sequence_type results;
};
/*
Creates a result object that becomes ready when at least one of the input results is ready.
Passed result objects are emptied and returned as a tuple.
Throws std::invalid_argument if any of the passed result objects is empty.
Might throw an std::bad_alloc exception if no memory is available.
*/
template < class ... result_types>
lazy_result<when_any_result<std::tuple<result_types...>>>
when_any (std::shared_ptr<executor_type> resume_executor,
result_types&& ... results);
/*
Overload. Similar to when_any(result_types&& ...) but receives a pair of iterators referencing a range.
Passed result objects are emptied and returned as a vector.
Throws std::invalid_argument if begin == end.
Throws std::invalid_argument if any of the passed result objects is empty.
Might throw an std::bad_alloc exception if no memory is available.
*/
template < class iterator_type >
lazy_result<when_any_result<std::vector< typename std::iterator_traits<iterator_type>::value_type>>>
when_any (std::shared_ptr<executor_type> resume_executor,
iterator_type begin, iterator_type end);resume_on関数resume_on 、現在のコルーチンを一時停止し、与えられたexecutor内でそれを再開する待望を返します。これは、Coroutineが適切な執行者で実行されていることを確認する重要な機能です。たとえば、アプリケーションはbackground_executorを使用してバックグラウンドタスクをスケジュールし、返された結果オブジェクトを待っている場合があります。この場合、待ち望んでいるコルーチンは、バックグラウンドエグゼクタ内で再開されます。別のCPUバウンドエグゼキューターとのresume_onへの呼び出しは、バックグラウンドタスクが完了すると、CPUバインドのコードラインがバックグラウンドエグゼクタで実行されないことを確認します。 resume_onを使用して別のエグゼキュータで実行するようにタスクが再スケジュールされているが、そのエグゼクティが一時停止されたタスクを再開する前にシャットダウンされると、そのタスクはすぐに再開され、 erros::broken_task例外がスローされます。この場合、アプリケーションは非常に優雅に必要です。
/*
Returns an awaitable that suspends the current coroutine and resumes it inside executor.
Might throw any exception that executor_type::enqueue throws.
*/
template < class executor_type >
auto resume_on (std::shared_ptr<executor_type> executor);concurrencppは、タイマーとタイマーキューも提供します。タイマーは、明確に定義された時間内に執行者に実行される非同期アクションを定義するオブジェクトです。通常のタイマー、オンショットタイマー、遅延オブジェクトの3種類のタイマーがあります。
通常のタイマーには、それらを定義する4つのプロパティがあります。
concurrencppの他のオブジェクトと同様に、タイマーは空になる可能性のある動きのみのタイプです。タイマーが破壊されるか、 timer::cancelが呼び出されると、タイマーはスケジュールされているがまだ実行されていないタスクをキャンセルします。進行中のタスクは発生していません。呼び出すタイマーは、スレッドセーフでなければなりません。デュータイムとタイマーの頻度を50ミリ秒の粒度に設定することをお勧めします。
タイマーキューは、タイマーのコレクションを管理し、実行の1つのスレッドでそれらを処理する同時ワーカーです。また、新しいタイマーを作成するために使用されるエージェントでもあります。タイマーの締め切り(タイマーの期日または頻度であろうと)が到達した場合、タイマーキューは、タスクとして関連するエグゼキューターで実行できるように呼び出すことができるスケジュールをスケジュールすることにより、タイマーを「発射」します。
執行者と同様に、タイマーキューもRAIIの概念を遵守します。ランタイムオブジェクトがスコープから抜け出すと、タイマーキューがシャットダウンされ、保留中のすべてのタイマーがキャンセルされます。タイマーキューがシャットダウンされた後、 make_timer 、 make_onshot_timer 、およびmake_delay_objectへのその後の呼び出しはerrors::runtime_shutdown例外をスローします。アプリケーションは、それ自体でタイマーキューをシャットダウンしようとしてはなりません。
timer_queue API: class timer_queue {
/*
Destroys this timer_queue.
*/
~timer_queue () noexcept ;
/*
Shuts down this timer_queue:
Tells the underlying thread of execution to quit and joins it.
Cancels all pending timers.
After this call, invocation of any method besides shutdown
and shutdown_requested will throw an errors::runtime_shutdown.
If shutdown had been called before, this method has no effect.
*/
void shutdown () noexcept ;
/*
Returns true if shutdown had been called before, false otherwise.
*/
bool shutdown_requested () const noexcept ;
/*
Creates a new running timer where *this is the associated timer_queue.
Throws std::invalid_argument if executor is null.
Throws errors::runtime_shutdown if shutdown had been called before.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if the one of the underlying synchronization primitives throws.
*/
template < class callable_type , class ... argumet_types>
timer make_timer (
std::chrono::milliseconds due_time,
std::chrono::milliseconds frequency,
std::shared_ptr<concurrencpp::executor> executor,
callable_type&& callable,
argumet_types&& ... arguments);
/*
Creates a new one-shot timer where *this is the associated timer_queue.
Throws std::invalid_argument if executor is null.
Throws errors::runtime_shutdown if shutdown had been called before.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if the one of the underlying synchronization primitives throws.
*/
template < class callable_type , class ... argumet_types>
timer make_one_shot_timer (
std::chrono::milliseconds due_time,
std::shared_ptr<concurrencpp::executor> executor,
callable_type&& callable,
argumet_types&& ... arguments);
/*
Creates a new delay object where *this is the associated timer_queue.
Throws std::invalid_argument if executor is null.
Throws errors::runtime_shutdown if shutdown had been called before.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if the one of the underlying synchronization primitives throws.
*/
result< void > make_delay_object (
std::chrono::milliseconds due_time,
std::shared_ptr<concurrencpp::executor> executor);
};timer API: class timer {
/*
Creates an empty timer.
*/
timer () noexcept = default ;
/*
Cancels the timer, if not empty.
*/
~timer () noexcept ;
/*
Moves the content of rhs to *this.
rhs is empty after this call.
*/
timer (timer&& rhs) noexcept = default ;
/*
Moves the content of rhs to *this.
rhs is empty after this call.
Returns *this.
*/
timer& operator = (timer&& rhs) noexcept ;
/*
Cancels this timer.
After this call, the associated timer_queue will not schedule *this
to run again and *this becomes empty.
Scheduled, but not yet executed tasks are cancelled.
Ongoing tasks are uneffected.
This method has no effect if *this is empty or the associated timer_queue has already expired.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
void cancel ();
/*
Returns the associated executor of this timer.
Throws concurrencpp::errors::empty_timer is *this is empty.
*/
std::shared_ptr<executor> get_executor () const ;
/*
Returns the associated timer_queue of this timer.
Throws concurrencpp::errors::empty_timer is *this is empty.
*/
std::weak_ptr<timer_queue> get_timer_queue () const ;
/*
Returns the due time of this timer.
Throws concurrencpp::errors::empty_timer is *this is empty.
*/
std::chrono::milliseconds get_due_time () const ;
/*
Returns the frequency of this timer.
Throws concurrencpp::errors::empty_timer is *this is empty.
*/
std::chrono::milliseconds get_frequency () const ;
/*
Sets new frequency for this timer.
Callables already scheduled to run at the time of invocation are not affected.
Throws concurrencpp::errors::empty_timer is *this is empty.
*/
void set_frequency (std::chrono::milliseconds new_frequency);
/*
Returns true is *this is not an empty timer, false otherwise.
The timer should not be used if this->operator bool() is false.
*/
explicit operator bool () const noexcept ;
};この例では、タイマーキューを使用して通常のタイマーを作成します。タイマーは、1.5秒後に実行できるように呼ばれるスケジュールをスケジュールし、2秒ごとに呼び出し可能な発射を行います。指定された呼び出し可能なものは、ThreadPool Exectorで実行されます。
# include " concurrencpp/concurrencpp.h "
# include < iostream >
using namespace std ::chrono_literals ;
int main () {
concurrencpp::runtime runtime;
std:: atomic_size_t counter = 1 ;
concurrencpp::timer timer = runtime. timer_queue ()-> make_timer (
1500ms,
2000ms,
runtime. thread_pool_executor (),
[&] {
const auto c = counter. fetch_add ( 1 );
std::cout << " timer was invoked for the " << c << " th time " << std::endl;
});
std::this_thread::sleep_for (12s);
return 0 ;
}OneShotタイマーは、1回限りのタイムタイマーであるため、再スケジュールを再スケジュールして再び実行できなくなった後、実行可能なスケジュールをスケジュールした後です。
この例では、作成から3秒後に1回だけ実行されるタイマーを作成します。タイマーは、新しい実行スレッド( thread_executorを使用)で実行できるように呼び出します。
# include " concurrencpp/concurrencpp.h "
# include < iostream >
using namespace std ::chrono_literals ;
int main () {
concurrencpp::runtime runtime;
concurrencpp::timer timer = runtime. timer_queue ()-> make_one_shot_timer (
3000ms,
runtime. thread_executor (),
[&] {
std::cout << " hello and goodbye " << std::endl;
});
std::this_thread::sleep_for (4s);
return 0 ;
}遅延オブジェクトは、 co_await edであり、その期限に達すると準備が整うようになる怠zyな結果オブジェクトです。アプリケーションは、この結果オブジェクトco_await 、非ブロッキング方法で現在のコルーチンを遅らせることができます。現在のCoroutineは、 make_delay_objectに渡された執行者によって再開されます。
この例では、遅延オブジェクトでco_await呼び出すことにより、ループで自分自身を遅らせるタスク(結果を返さない、または例外をスローしません)を生成します。
# include " concurrencpp/concurrencpp.h "
# include < iostream >
using namespace std ::chrono_literals ;
concurrencpp::null_result delayed_task (
std::shared_ptr<concurrencpp::timer_queue> tq,
std::shared_ptr<concurrencpp::thread_pool_executor> ex) {
size_t counter = 1 ;
while ( true ) {
std::cout << " task was invoked " << counter << " times. " << std::endl;
counter++;
co_await tq-> make_delay_object (1500ms, ex);
}
}
int main () {
concurrencpp::runtime runtime;
delayed_task (runtime. timer_queue (), runtime. thread_pool_executor ());
std::this_thread::sleep_for (10s);
return 0 ;
}ジェネレーターは、消費する値の流れを生成できる怠zyな同期コルーチンです。ジェネレーターはco_yieldキーワードを使用して、消費者に値を戻します。
ジェネレーターは同期的に使用することを目的としています - co_yieldキーワードのみを使用でき、 co_awaitキーワードを使用してはなりません。 co_yieldキーワードが呼び出されている限り、ジェネレーターは値を生成し続けます。 co_returnキーワードが(明示的または暗黙的に)呼び出された場合、ジェネレーターは値の生成を停止します。同様に、例外がスローされた場合、ジェネレーターは値の生成を停止し、スローされた例外がジェネレーターの消費者に再投げられます。
発電機は、 range-forで使用されるforを目的としています。ジェネレーターはbegin 2つendイテレーターを暗黙的に生成します。これらの反復器は、手動で処理またはアクセスしないでください。
ジェネレーターが作成されると、怠zyなタスクとして始まります。 begin方法が呼び出されると、発電機が初めて再開され、イテレーターが返されます。怠zyなタスクは、返されたイテレーターでoperator++を呼び出すことにより、繰り返し再開されます。返されたイテレーターは、ジェネレーターが優雅に終了するか、例外をスローすることにより、発電機が実行を終了すると、 end反復因子と等しくなります。前述のように、これはループとジェネレーターの内部メカニズムによって舞台裏で起こり、直接呼び出されるべきではありません。
concurrencppの他のオブジェクトと同様に、ジェネレーターはムーブのみのタイプです。発電機が移動した後、それは空であると見なされ、その内部の方法( operator bool以外)にアクセスしようとすると例外がスローされます。ジェネレーターの空虚は一般的に発生するべきではありません - forループでの作成時に発電機を消費することをお勧めします。
generator API class generator {
/*
Move constructor. After this call, rhs is empty.
*/
generator (generator&& rhs) noexcept ;
/*
Destructor. Invalidates existing iterators.
*/
~generator () noexcept ;
generator ( const generator& rhs) = delete ;
generator& operator =(generator&& rhs) = delete ;
generator& operator =( const generator& rhs) = delete ;
/*
Returns true if this generator is not empty.
Applications must not use this object if this->operator bool() is false.
*/
explicit operator bool () const noexcept ;
/*
Starts running this generator and returns an iterator.
Throws errors::empty_generator if *this is empty.
Re-throws any exception that is thrown inside the generator code.
*/
iterator begin ();
/*
Returns an end iterator.
*/
static generator_end_iterator end () noexcept ;
};
class generator_iterator {
using value_type = std:: remove_reference_t <type>;
using reference = value_type&;
using pointer = value_type*;
using iterator_category = std::input_iterator_tag;
using difference_type = std:: ptrdiff_t ;
/*
Resumes the suspended generator and returns *this.
Re-throws any exception that was thrown inside the generator code.
*/
generator_iterator& operator ++();
/*
Post-increment version of operator++.
*/
void operator ++( int );
/*
Returns the latest value produced by the associated generator.
*/
reference operator *() const noexcept ;
/*
Returns a pointer to the latest value produced by the associated generator.
*/
pointer operator ->() const noexcept ;
/*
Comparision operators.
*/
friend bool operator ==( const generator_iterator& it0, const generator_iterator& it1) noexcept ;
friend bool operator ==( const generator_iterator& it, generator_end_iterator) noexcept ;
friend bool operator ==(generator_end_iterator end_it, const generator_iterator& it) noexcept ;
friend bool operator !=( const generator_iterator& it, generator_end_iterator end_it) noexcept ;
friend bool operator !=(generator_end_iterator end_it, const generator_iterator& it) noexcept ;
};generator例:この例では、シーケンスS(n) = 1 + 2 + 3 + ... + nのn番目のメンバーを生成するジェネレーターを書き込みます。ここで、 n <= 100 :
concurrencpp::generator< int > sequence () {
int i = 1 ;
int sum = 0 ;
while (i <= 100 ) {
sum += i;
++i;
co_yield sum;
}
}
int main () {
for ( auto value : sequence ()) {
std::cout << value << std::end;
}
return 0 ;
} いくつかの理由で、通常の同期ロックをタスク内で安全に使用することはできません。
std::mutexなどの同期ロックは、同じ実行スレッドでロックされてロック解除されると予想されます。ロックされていなかったスレッドの同期ロックのロックを解除することは、未定義の動作です。実行のスレッドでタスクを中断して再開できるため、タスク内で使用すると同期ロックが破損します。 concurrencpp::async_lock std::mutexに同様のAPIを提供することでこれらの問題を解決します。concurrencpp concurrencpp::async_lockを呼び出す主な違いは、タスク内で安全にco_awaitedを繰り返します。 1つのタスクがAsync-Lockをロックしようとすると失敗すると、タスクは一時停止され、ロックがロック解除されて停止されたタスクによって獲得されると再開されます。これにより、執行者は、高価なコンテキストスイッチングや高価なカーネルコールなしでロックを取得するのを待っている膨大な量のタスクを処理できます。
std::mutex仕組みと同様に、いつでもasync_lockを取得できるタスクは1つだけであり、取得時に読み取りバリアが配置されます。 Async Lockをリリースすると、書き込み障壁があり、次のタスクがそれを取得できるようにし、他の修飾子が行った変更を確認し、次の修飾子が表示される修正を投稿する一度に1桁のチェーンを作成します。
std::mutexのように、 concurrencpp::async_lock再帰的ではありません。そのようなロックを取得するときは、特に注意を払う必要があります - すでにロックを取得した別のタスクによって生じたタスクでロックを再度取得してはなりません。そのような場合、避けられないデッドロックが発生します。 concurrencppの他のオブジェクトとは異なり、 async_lockはコピー可能でも可動性もありません。
標準ロックと同様に、 concurrencpp::async_lockは、C ++ Raii Idiomを活用して、機能の戻りまたはスローされた例外によりロックが常にロックされていることを確認するスコープラッパーで使用することを目的としています。 async_lock::lock async_lock::unlock on destructionを呼び出すスコープラッパーの怠zyな結果を返します。 async_lock::unlockの生の使用は落胆します。 concurrencpp::scoped_async_lockスコープラッパーとして機能し、 std::unique_lockとほぼ同じAPIを提供します。 concurrencpp::scoped_async_lockは移動可能ですが、コピーできません。
async_lock::lock and scoped_async_lock::lockパラメーターとしてresume-Executorを必要とします。これらのメソッドを呼び出すと、ロックがロックできる場合、ロックされ、現在のタスクがすぐに再開されます。そうでない場合は、現在のタスクが停止され、ロックが最終的に取得されると、指定された履歴書実行者内に再開されます。
concurrencpp::scoped_async_lock async_lockをラップし、適切にロック解除されていることを確認します。 std::unique_lockのように、ロックをラップしない場合があり、この場合は空であると考えられています。空のscoped_async_lock 、デフォルトの構築、移動、またはscoped_async_lock::releaseメソッドが呼び出されたときに発生する可能性があります。空のスコープアジンロックは、破壊のロックのロックを解除しません。
スコープ付きアジンロックが空でない場合でも、基礎となる非同期ロックを所有しており、破壊時にロックを解除することを意味しません。 scoped_async_lock::unlockが呼び出された場合、またはscoped_async_lock(async_lock&, std::defer_lock_t) constructorを使用してscoped-async-lockを構築した場合、Scoped_async_lock :: lockが呼び出された場合、空ではなく所有者の非所有のscoped-asyncロックが発生する可能性があります。
async_lock api class async_lock {
/*
Constructs an async lock object.
*/
async_lock () noexcept ;
/*
Destructs an async lock object.
*this is not automatically unlocked at the moment of destruction.
*/
~async_lock () noexcept ;
/*
Asynchronously acquires the async lock.
If *this has already been locked by another non-parent task, the current task will be suspended
and will be resumed when *this is acquired, inside resume_executor.
If *this has not been locked by another task, then *this will be acquired and the current task will be resumed
immediately in the calling thread of execution.
If *this has already been locked by a parent task, then unavoidable dead-lock will occur.
Throws std::invalid_argument if resume_executor is null.
Throws std::system error if one of the underlying synhchronization primitives throws.
*/
lazy_result<scoped_async_lock> lock (std::shared_ptr<executor> resume_executor);
/*
Tries to acquire *this in the calling thread of execution.
Returns true if *this is acquired, false otherwise.
In any case, the current task is resumed immediately in the calling thread of execution.
Throws std::system error if one of the underlying synhchronization primitives throws.
*/
lazy_result< bool > try_lock ();
/*
Releases *this and allows other tasks (including suspended tasks waiting for *this) to acquire it.
Throws std::system error if *this is not locked at the moment of calling this method.
Throws std::system error if one of the underlying synhchronization primitives throws.
*/
void unlock ();
};scoped_async_lock api class scoped_async_lock {
/*
Constructs an async lock wrapper that does not wrap any async lock.
*/
scoped_async_lock () noexcept = default ;
/*
If *this wraps async_lock, this method releases the wrapped lock.
*/
~scoped_async_lock () noexcept ;
/*
Moves rhs to *this.
After this call, *rhs does not wrap any async lock.
*/
scoped_async_lock (scoped_async_lock&& rhs) noexcept ;
/*
Wrapps unlocked lock.
lock must not be in acquired mode when calling this method.
*/
scoped_async_lock (async_lock& lock, std:: defer_lock_t ) noexcept ;
/*
Wrapps locked lock.
lock must be already acquired when calling this method.
*/
scoped_async_lock (async_lock& lock, std:: adopt_lock_t ) noexcept ;
/*
Calls async_lock::lock on the wrapped locked, using resume_executor as a parameter.
Throws std::invalid_argument if resume_executor is nulll.
Throws std::system_error if *this does not wrap any lock.
Throws std::system_error if wrapped lock is already locked.
Throws any exception async_lock::lock throws.
*/
lazy_result< void > lock (std::shared_ptr<executor> resume_executor);
/*
Calls async_lock::try_lock on the wrapped lock.
Throws std::system_error if *this does not wrap any lock.
Throws std::system_error if wrapped lock is already locked.
Throws any exception async_lock::try_lock throws.
*/
lazy_result< bool > try_lock ();
/*
Calls async_lock::unlock on the wrapped lock.
If *this does not wrap any lock, this method does nothing.
Throws std::system_error if *this wraps a lock and it is not locked.
*/
void unlock ();
/*
Checks whether *this wraps a locked mutex or not.
Returns true if wrapped locked is in acquired state, false otherwise.
*/
bool owns_lock () const noexcept ;
/*
Equivalent to owns_lock.
*/
explicit operator bool () const noexcept ;
/*
Swaps the contents of *this and rhs.
*/
void swap (scoped_async_lock& rhs) noexcept ;
/*
Empties *this and returns a pointer to the previously wrapped lock.
After a call to this method, *this doesn't wrap any lock.
The previously wrapped lock is not released,
it must be released by either unlocking it manually through the returned pointer or by
capturing the pointer with another scoped_async_lock which will take ownerwhip over it.
*/
async_lock* release () noexcept ;
/*
Returns a pointer to the wrapped async_lock, or a null pointer if there is no wrapped async_lock.
*/
async_lock* mutex () const noexcept ;
};async_lock例:この例では、 async_lockを使用してデータレースが発生しないことを確認し、そのベクトルオブジェクトの内部状態の正確性が保存されていることを確認しながら、異なるタスクから10,000,000整数をstd::vectorベクトルオブジェクトにプッシュします。
# include " concurrencpp/concurrencpp.h "
# include < vector >
# include < iostream >
std::vector< size_t > numbers;
concurrencpp::async_lock lock;
concurrencpp::result< void > add_numbers (concurrencpp::executor_tag,
std::shared_ptr<concurrencpp::executor> executor,
size_t begin,
size_t end) {
for ( auto i = begin; i < end; i++) {
concurrencpp::scoped_async_lock raii_wrapper = co_await lock. lock (executor);
numbers. push_back (i);
}
}
int main () {
concurrencpp::runtime runtime;
constexpr size_t range = 10'000'000 ;
constexpr size_t sections = 4 ;
concurrencpp::result< void > results[sections];
for ( size_t i = 0 ; i < 4 ; i++) {
const auto range_start = i * range / sections;
const auto range_end = (i + 1 ) * range / sections;
results[i] = add_numbers ({}, runtime. thread_pool_executor (), range_start, range_end);
}
for ( auto & result : results) {
result. get ();
}
std::cout << " vector size is " << numbers. size () << std::endl;
// make sure the vector state has not been corrupted by unprotected concurrent accesses
std::sort (numbers. begin (), numbers. end ());
for ( size_t i = 0 ; i < range; i++) {
if (numbers[i] != i) {
std::cerr << " vector state is corrupted. " << std::endl;
return - 1 ;
}
}
std::cout << " succeeded pushing range [0 - 10,000,000] concurrently to the vector! " << std::endl;
return 0 ;
}async_condition_variable標準のcondition_variableを模倣し、 async_lockと一緒にタスクで安全に使用できます。 async_condition_variable 、 async_lockで動作し、一部の共有メモリ(ロックで保護)が変更されるまでタスクを一時停止します。共有メモリの変更を監視したいタスクは、 async_lockのインスタンスをロックし、 async_condition_variable::await 。これにより、ロックのロックが原子的に解除され、一部の修飾子タスクが条件変数に通知されるまで現在のタスクを一時停止します。モディファイアタスクはロックを取得し、共有メモリを変更し、ロックのロックを解除し、 notify_oneまたはnotify_allいずれかを呼び出します。中断されたタスクが再開されると( awaitでいる履歴書執行者を使用して)、再びロックをロックし、タスクをサスペンションのポイントからシームレスに継続できるようにします。 async_lockと同様に、 async_condition_variable可動またはコピー可能でもありません。1つの場所で作成され、複数のタスクでアクセスすることを意図しています。
async_condition_variable::await overloadsには、タスクの再開に使用される履歴書実行者とロックされたscoped_async_lock必要です。 async_condition_variable::await 2つの過負荷があります。1つは述語を受け入れ、もう1つは受け入れないものです。述語を受け入れない過負荷はnotify_*への呼び出しによって再開されるまで、呼び出し時にすぐに呼び出しタスクを一時停止します。述語を受け入れる過負荷は、共有メモリを共有メモリを検査させ、共有メモリが必要な状態に到達するまで繰り返しタスクを一時停止させることにより機能します。概略的には、呼び出しのように機能します
while (!pred()) { // pred() inspects the shared memory and returns true or false
co_await await (resume_executor, lock); // suspend the current task until another task calls `notify_xxx`
}標準の条件変数と同様に、アプリケーションは、懸濁液や逆漏れに対するより細かい制御を可能にするため、Predicate-Outerloadを使用することをお勧めします。 async_condition_variable使用して、並行キューやチャネルなどの同時コレクションとデータ構造を作成できます。
内部的には、 async_condition_variableサスペンションキューを保持します。このキューは、条件変数が通知されるのを待っているときに自分自身をエンキューします。 notify_*メソッドのいずれかが呼び出されると、呼び出されたメソッドに応じて、通知タスクDequeuesまたはすべてのタスクのいずれかのタスクのいずれかになります。タスクは、FIFOの方法でサスペンションキューから除去されます。たとえば、タスクAコールがawaitからタスクBの呼び出しがawait場合、タスクCはnotify_oneを呼び出し、内部的にタスクAは削除され、再開されます。タスクBは、 notify_oneまたはnotify_allの別の呼び出しが呼び出されるまで中断されたままになります。タスクAとタスクBが停止され、タスクCがnotify_allを呼び出す場合、両方のタスクが削除され、再開されます。
async_condition_variable api class async_condition_variable {
/*
Constructor.
*/
async_condition_variable () noexcept ;
/*
Atomically releases lock and suspends the current task by adding it to *this suspension-queue.
Throws std::invalid_argument if resume_executor is null.
Throws std::invalid_argument if lock is not locked at the moment of calling this method.
Might throw std::system_error if the underlying std::mutex throws.
*/
lazy_result< void > await (std::shared_ptr<executor> resume_executor, scoped_async_lock& lock);
/*
Equivalent to:
while (!pred()) {
co_await await(resume_executor, lock);
}
Might throw any exception that await(resume_executor, lock) might throw.
Might throw any exception that pred might throw.
*/
template < class predicate_type >
lazy_result< void > await (std::shared_ptr<executor> resume_executor, scoped_async_lock& lock, predicate_type pred);
/*
Dequeues one task from *this suspension-queue and resumes it, if any available at the moment of calling this method.
The suspended task is resumed by scheduling it to run on the executor given when await was called.
Might throw std::system_error if the underlying std::mutex throws.
*/
void notify_one ();
/*
Dequeues all tasks from *this suspension-queue and resumes them, if any available at the moment of calling this method.
The suspended tasks are resumed by scheduling them to run on the executors given when await was called.
Might throw std::system_error if the underlying std::mutex throws.
*/
void notify_all ();
};async_condition_variable例:この例では、 async_lockとasync_condition_variable協力して、タスク間でデータ(この例では整数)を送信するために使用できる同時キューを実装します。いくつかのメソッドはresultを返し、別のメソッドがlazy_resultを返し、熱心なタスクとレイジータスクの両方がどのように連携するかを示していることに注意してください。
# include " concurrencpp/concurrencpp.h "
# include < queue >
# include < iostream >
using namespace concurrencpp ;
class concurrent_queue {
private:
async_lock _lock;
async_condition_variable _cv;
std::queue< int > _queue;
bool _abort = false ;
public:
concurrent_queue () = default ;
result< void > shutdown (std::shared_ptr<executor> resume_executor) {
{
auto guard = co_await _lock. lock (resume_executor);
_abort = true ;
}
_cv. notify_all ();
}
lazy_result< void > push (std::shared_ptr<executor> resume_executor, int i) {
{
auto guard = co_await _lock. lock (resume_executor);
_queue. push (i);
}
_cv. notify_one ();
}
lazy_result< int > pop (std::shared_ptr<executor> resume_executor) {
auto guard = co_await _lock. lock (resume_executor);
co_await _cv. await (resume_executor, guard, [ this ] {
return _abort || !_queue. empty ();
});
if (!_queue. empty ()) {
auto result = _queue. front ();
_queue. pop ();
co_return result;
}
assert (_abort);
throw std::runtime_error ( " queue has been shut down. " );
}
};
result< void > producer_loop (executor_tag,
std::shared_ptr<thread_pool_executor> tpe,
concurrent_queue& queue,
int range_start,
int range_end) {
for (; range_start < range_end; ++range_start) {
co_await queue. push (tpe, range_start);
}
}
result< void > consumer_loop (executor_tag, std::shared_ptr<thread_pool_executor> tpe, concurrent_queue& queue) {
try {
while ( true ) {
std::cout << co_await queue. pop (tpe) << std::endl;
}
} catch ( const std:: exception & e) {
std::cerr << e. what () << std::endl;
}
}
int main () {
runtime runtime;
const auto thread_pool_executor = runtime. thread_pool_executor ();
concurrent_queue queue;
result< void > producers[ 4 ];
result< void > consumers[ 4 ];
for ( int i = 0 ; i < 4 ; i++) {
producers[i] = producer_loop ({}, thread_pool_executor, queue, i * 5 , (i + 1 ) * 5 );
}
for ( int i = 0 ; i < 4 ; i++) {
consumers[i] = consumer_loop ({}, thread_pool_executor, queue);
}
for ( int i = 0 ; i < 4 ; i++) {
producers[i]. get ();
}
queue. shutdown (thread_pool_executor). get ();
for ( int i = 0 ; i < 4 ; i++) {
consumers[i]. get ();
}
return 0 ;
}Concurrencpp Runtimeオブジェクトは、新しいエグゼクティブを取得、保存、作成するために使用されるエージェントです。
ランタイムは、メイン関数の実行が開始されるとすぐに、値タイプとして作成する必要があります。 Concurrencppランタイムが範囲から抜け出すと、執行者を繰り返して繰り返し、 executor::shutdownを呼び出すことで1つずつシャットダウンします。その後、エグゼキューターは内部作業ループを終了し、その後の新しいタスクをスケジュールしようとするとconcurrencpp::runtime_shutdown例外がスローされます。ランタイムには、タイマーの作成とオブジェクトの遅延に使用されるグローバルタイマーキューも含まれています。破壊されると、保存された執行者は不明確なタスクを破壊し、進行中のタスクが終了するのを待ちます。継続的なタスクがエグゼキューターを使用して新しいタスクを生み出したり、独自のタスクの継続をスケジュールしたりしようとする場合 - 例外がスローされます。この場合、進行中のタスクはできるだけ早く辞める必要があり、基礎となる執行者が辞めることができます。タイマーキューもシャットダウンされ、実行中のすべてのタイマーがキャンセルされます。このRAIIスタイルのコードを使用すると、ランタイムオブジェクトを作成する前にタスクを処理することはできません。ランタイムが範囲外に出てから/後には範囲外になります。これにより、解約メッセージを明示的に通信する必要があるため、同時のアプリケーションが解放されます。タスクは、ランタイムオブジェクトが生存している限り、無料の使用執行者です。
runtime API class runtime {
/*
Creates a runtime object with default options.
*/
runtime ();
/*
Creates a runtime object with user defined options.
*/
runtime ( const concurrencpp::runtime_options& options);
/*
Destroys this runtime object.
Calls executor::shutdown on each monitored executor.
Calls timer_queue::shutdown on the global timer queue.
*/
~runtime () noexcept ;
/*
Returns this runtime timer queue used to create new times.
*/
std::shared_ptr<concurrencpp::timer_queue> timer_queue () const noexcept ;
/*
Returns this runtime concurrencpp::inline_executor
*/
std::shared_ptr<concurrencpp::inline_executor> inline_executor () const noexcept ;
/*
Returns this runtime concurrencpp::thread_pool_executor
*/
std::shared_ptr<concurrencpp::thread_pool_executor> thread_pool_executor () const noexcept ;
/*
Returns this runtime concurrencpp::background_executor
*/
std::shared_ptr<concurrencpp::thread_pool_executor> background_executor () const noexcept ;
/*
Returns this runtime concurrencpp::thread_executor
*/
std::shared_ptr<concurrencpp::thread_executor> thread_executor () const noexcept ;
/*
Creates a new concurrencpp::worker_thread_executor and registers it in this runtime.
Might throw std::bad_alloc or std::system_error if any underlying memory or system resource could not have been acquired.
*/
std::shared_ptr<concurrencpp::worker_thread_executor> make_worker_thread_executor ();
/*
Creates a new concurrencpp::manual_executor and registers it in this runtime.
Might throw std::bad_alloc or std::system_error if any underlying memory or system resource could not have been acquired.
*/
std::shared_ptr<concurrencpp::manual_executor> make_manual_executor ();
/*
Creates a new user defined executor and registers it in this runtime.
executor_type must be a valid concrete class of concurrencpp::executor.
Might throw std::bad_alloc if no memory is available.
Might throw any exception that the constructor of <<executor_type>> might throw.
*/
template < class executor_type , class ... argument_types>
std::shared_ptr<executor_type> make_executor (argument_types&& ... arguments);
/*
returns the version of concurrencpp that the library was built with.
*/
static std::tuple< unsigned int , unsigned int , unsigned int > version () noexcept ;
};場合によっては、アプリケーションはスレッドの作成と終了の監視に関心があります。たとえば、一部のメモリアロケーターは、作成と終了時に新しいスレッドを登録および登録解除する必要があります。 concurrencppランタイムを使用すると、スレッド作成コールバックとスレッド終了コールバックを設定できます。これらのコールバックは、CONCURRENCPPワーカーの1人が新しいスレッドを作成し、そのスレッドが終了しているときはいつでも呼び出されます。これらのコールバックは常に作成/終了スレッド内から呼び出されるため、 std::this_thread::get_id常に関連するスレッドIDを返します。これらのコールバックの署名はvoid callback (std::string_view thread_name)です。 thread_name 、スレッドに与えられた同時に特定のタイトルであり、スレッド名を提示するいくつかのデバッガーで観察できます。スレッド名は一意であることが保証されておらず、ロギングとデバッグに使用する必要があります。
スレッド作成コールバックおよび/またはスレッド終端コールバックを設定するために、アプリケーションは、Runtimeコンストラクターに渡されるruntime_optionsのthread_started_callbackおよび/またはthread_terminated_callbackメンバーを設定できます。これらのコールバックは、スレッドを作成する可能性のある各concurrencppワーカーにコピーされるため、それらのコールバックはコピー可能でなければなりません。
# include " concurrencpp/concurrencpp.h "
# include < iostream >
int main () {
concurrencpp::runtime_options options;
options. thread_started_callback = [](std::string_view thread_name) {
std::cout << " A new thread is starting to run, name: " << thread_name << " , thread id: " << std::this_thread::get_id ()
<< std::endl;
};
options. thread_terminated_callback = [](std::string_view thread_name) {
std::cout << " A thread is terminating, name: " << thread_name << " , thread id: " << std::this_thread::get_id () << std::endl;
};
concurrencpp::runtime runtime (options);
const auto timer_queue = runtime. timer_queue ();
const auto thread_pool_executor = runtime. thread_pool_executor ();
concurrencpp::timer timer =
timer_queue-> make_timer ( std::chrono::milliseconds ( 100 ), std::chrono::milliseconds ( 500 ), thread_pool_executor, [] {
std::cout << " A timer callable is executing " << std::endl;
});
std::this_thread::sleep_for ( std::chrono::seconds ( 3 ));
return 0 ;
}可能な出力:
A new thread is starting to run, name: concurrencpp::timer_queue worker, thread id: 7496
A new thread is starting to run, name: concurrencpp::thread_pool_executor worker, thread id: 21620
A timer callable is executing
A timer callable is executing
A timer callable is executing
A timer callable is executing
A timer callable is executing
A timer callable is executing
A thread is terminating, name: concurrencpp::timer_queue worker, thread id: 7496
A thread is terminating, name: concurrencpp::thread_pool_executor worker, thread id: 21620
アプリケーションは、 derivable_executorクラスを継承することにより、独自のカスタムエグゼキュータータイプを作成できます。ユーザー定義の実行者を実装する際に考慮すべきいくつかのポイントがあります。最も重要なことは、エグゼキューターが複数のスレッドから使用されていることを覚えておくことです。そのため、実装された方法はスレッドセーフでなければなりません。
runtime::make_executorを使用して新しいエグゼクティブを作成できます。アプリケーションは、 runtime::make_executor使用することによってのみ、単純なインスタンス化( std::make_sharedまたはplane newなど)を使用して新しいエグゼクティブを作成してはなりません。また、アプリケーションは、 thread_pool_executorやthread_executorなど、組み込みのconcurrencppエグゼクティアを再作成してはならないため、それらのエグゼクティブはランタイムオブジェクトの既存のインスタンスを介してのみアクセスする必要があります。
もう1つの重要なポイントは、シャットダウンを正しく処理することです。 shutdown 、 shutdown_requested 、およびenqueueすべてエグゼキューターの状態を監視し、呼び出されたときにそれに応じて動作する必要があります。
shutdown 、基礎となるスレッドに終了してから参加するように指示するはずです。shutdown複数回呼び出される場合があり、このメソッドは、最初の呼び出し後にその後のshutdownへの呼び出しを無視することにより、このシナリオを処理する必要があります。enqueue 、 shutdownが以前に呼び出された場合は、 concurrencpp::errors::runtime_shutdown例外をスローする必要があります。 taskオブジェクト実行者の実装は、アプリケーションがconcurrencpp::taskクラスを直接操作する必要があるまれなケースの1つです。 concurrencpp::taskは、 std::function yike objectですが、いくつかの違いがあります。 std::functionのように、タスクオブジェクトは非同期操作として機能する呼び出し可能なものを保存します。 std::functionとは異なり、 task移動のみのタイプです。呼び出し時に、タスクオブジェクトはパラメーターを受信しず、 voidを返します。さらに、すべてのタスクオブジェクトは一度だけ呼び出すことができます。最初の呼び出しの後、タスクオブジェクトは空になります。空のタスクオブジェクトを呼び出すことは、空のラムダ( []{} )を呼び出すのと同等であり、例外をスローしません。タスクオブジェクトは、転送リファレンス( type&&ここでtypeはテンプレートパラメーター)として呼び出すことができ、コピー( std::functionなど)ではありません。保存された呼び出し可能な構築は、その場で発生します。これにより、タスクオブジェクトには、移動専用タイプ( std::unique_ptrやconcurrencpp::resultなど)の呼び出しが含まれます。タスクオブジェクトは、さまざまなメソッドを使用して保存されたタイプの使用を最適化しようとします。たとえば、タスクオブジェクトは、通常の小さな呼び出し可能な場合にstd::coroutine_handle<void>へのインラインコールを適用します。
task API class task {
/*
Creates an empty task object.
*/
task () noexcept ;
/*
Creates a task object by moving the stored callable of rhs to *this.
If rhs is empty, then *this will also be empty after construction.
After this call, rhs is empty.
*/
task (task&& rhs) noexcept ;
/*
Creates a task object by storing callable in *this.
<<typename std::decay<callable_type>::type>> will be in-place-
constructed inside *this by perfect forwarding callable.
*/
template < class callable_type >
task (callable_type&& callable);
/*
Destroys stored callable, does nothing if empty.
*/
~task () noexcept ;
/*
If *this is empty, does nothing.
Invokes stored callable, and immediately destroys it.
After this call, *this is empty.
May throw any exception that the invoked callable may throw.
*/
void operator ()();
/*
Moves the stored callable of rhs to *this.
If rhs is empty, then *this will also be empty after this call.
If *this already contains a stored callable, operator = destroys it first.
*/
task& operator =(task&& rhs) noexcept ;
/*
If *this is not empty, task::clear destroys the stored callable and empties *this.
If *this is empty, clear does nothing.
*/
void clear () noexcept ;
/*
Returns true if *this stores a callable. false otherwise.
*/
explicit operator bool () const noexcept ;
/*
Returns true if *this stores a callable,
and that stored callable has the same type as <<typename std::decay<callable_type>::type>>
*/
template < class callable_type >
bool contains () const noexcept ;
};ユーザー定義のエグゼクターを実装する場合、 taskオブジェクトを保存するのは実装次第( enqueueが呼び出された場合)、エグゼクティアの内部機械に従って実行します。
この例では、タスクをエンキューするようなアクションや実行などのアクションを記録するエグゼクターを作成します。 executorインターフェイスを実装し、実行時間を作成して、 runtime::make_executorを呼び出すことでインスタンスを作成および保存することをリクエストします。アプリケーションの残りの部分は、非ユーザー定義の執行者を使用する場合とまったく同じように動作します。
# include " concurrencpp/concurrencpp.h "
# include < iostream >
# include < queue >
# include < thread >
# include < mutex >
# include < condition_variable >
class logging_executor : public concurrencpp ::derivable_executor<logging_executor> {
private:
mutable std::mutex _lock;
std::queue<concurrencpp::task> _queue;
std::condition_variable _condition;
bool _shutdown_requested;
std::thread _thread;
const std::string _prefix;
void work_loop () {
while ( true ) {
std::unique_lock<std::mutex> lock (_lock);
if (_shutdown_requested) {
return ;
}
if (!_queue. empty ()) {
auto task = std::move (_queue. front ());
_queue. pop ();
lock. unlock ();
std::cout << _prefix << " A task is being executed " << std::endl;
task ();
continue ;
}
_condition. wait (lock, [ this ] {
return !_queue. empty () || _shutdown_requested;
});
}
}
public:
logging_executor (std::string_view prefix) :
derivable_executor<logging_executor>( " logging_executor " ),
_shutdown_requested ( false ),
_prefix (prefix) {
_thread = std::thread ([ this ] {
work_loop ();
});
}
void enqueue (concurrencpp::task task) override {
std::cout << _prefix << " A task is being enqueued! " << std::endl;
std::unique_lock<std::mutex> lock (_lock);
if (_shutdown_requested) {
throw concurrencpp::errors::runtime_shutdown ( " logging executor - executor was shutdown. " );
}
_queue. emplace ( std::move (task));
_condition. notify_one ();
}
void enqueue (std::span<concurrencpp::task> tasks) override {
std::cout << _prefix << tasks. size () << " tasks are being enqueued! " << std::endl;
std::unique_lock<std::mutex> lock (_lock);
if (_shutdown_requested) {
throw concurrencpp::errors::runtime_shutdown ( " logging executor - executor was shutdown. " );
}
for ( auto & task : tasks) {
_queue. emplace ( std::move (task));
}
_condition. notify_one ();
}
int max_concurrency_level () const noexcept override {
return 1 ;
}
bool shutdown_requested () const noexcept override {
std::unique_lock<std::mutex> lock (_lock);
return _shutdown_requested;
}
void shutdown () noexcept override {
std::cout << _prefix << " shutdown requested " << std::endl;
std::unique_lock<std::mutex> lock (_lock);
if (_shutdown_requested) return ; // nothing to do.
_shutdown_requested = true ;
lock. unlock ();
_condition. notify_one ();
_thread. join ();
}
};
int main () {
concurrencpp::runtime runtime;
auto logging_ex = runtime. make_executor <logging_executor>( " Session #1234 " );
for ( size_t i = 0 ; i < 10 ; i++) {
logging_ex-> post ([] {
std::cout << " hello world " << std::endl;
});
}
std::getchar ();
return 0 ;
}$ git clone https://github.com/David-Haim/concurrencpp.git
$ cd concurrencpp
$ cmake -S . -B build /lib
$ cmake -- build build /lib --config Release$ git clone https://github.com/David-Haim/concurrencpp.git
$ cd concurrencpp
$ cmake -S test -B build / test
$ cmake -- build build / test
< # for release mode: cmake --build build/test --config Release #>
$ cd build / test
$ ctest . -V -C Debug
< # for release mode: ctest . -V -C Release #> $ git clone https://github.com/David-Haim/concurrencpp.git
$ cd concurrencpp
$ cmake -DCMAKE_BUILD_TYPE=Release -S . -B build /lib
$ cmake -- build build /lib
#optional, install the library: sudo cmake --install build/lib ClangとGCCを使用すると、TSAN(スレッド消毒剤)サポートでテストを実行することもできます。
$ git clone https://github.com/David-Haim/concurrencpp.git
$ cd concurrencpp
$ cmake -S test -B build / test
#for release mode: cmake -DCMAKE_BUILD_TYPE=Release -S test -B build/test
#for TSAN mode: cmake -DCMAKE_BUILD_TYPE=Release -DENABLE_THREAD_SANITIZER=Yes -S test -B build/test
$ cmake -- build build / test
$ cd build / test
$ ctest . -VLinuxをコンパイルするとき、ライブラリはデフォルトでlibstdc++を使用しようとします。 libc++標準ライブラリの実装として使用する場合は、 CMAKE_TOOLCHAIN_FILEフラグを以下に指定する必要があります。
$ cmake -DCMAKE_TOOLCHAIN_FILE=../cmake/libc++.cmake -DCMAKE_BUILD_TYPE=Release -S . -B build /lib代わりに、ライブラリを手動で構築してインストールすることにかかわらず、開発者はVCPKGおよびCONANパッケージマネージャーを介してCONCURRENCPPの安定したリリースを取得できます。
vcpkg:
$ vcpkg install concurrencppコナン:Concurrencpp on Conancenter
concurrencppには、コンパイルされたライブラリを別のコードベースにインストールまたはリンクすることなく、開発者が変更および実験できる組み込みのサンドボックスプログラムが付属しています。サンドボックスで遊ぶために、開発者はsandbox/main.cpp変更し、次のコマンドを使用してアプリケーションをコンパイルできます。
$ cmake -S sandbox -B build /sandbox
$ cmake -- build build /sandbox
< # for release mode: cmake --build build/sandbox --config Release #>
$ ./ build /sandbox < # runs the sandbox> $ cmake -S sandbox -B build /sandbox
#for release mode: cmake -DCMAKE_BUILD_TYPE=Release -S sandbox -B build/sandbox
$ cmake -- build build /sandbox
$ ./ build /sandbox #runs the sandbox