Consurrencpp將並發任務的功能帶到C ++世界,使開發人員可以使用任務,執行者和Coroutines輕鬆,安全地編寫高度並發應用程序。通過使用Concurrencpp應用程序,可以分解需要將異步處理成同時運行並以合作方式工作以實現所需結果的較小任務。 Concurrencpp還允許應用程序使用並行的Coroutines輕鬆編寫並行算法。
concurrencpp的主要優勢是:
std::thread and std::mutex之類的低級原始碼。co_await關鍵字,可以輕鬆地實現非阻滯,同步代碼。executor APIthread_pool_executor APImanual_executor APIresult類型result APIlazy_result類型lazy_result apiresult_promise apiresult_promise示例shared_result apishared_result示例make_ready_resultmake_exceptional_resultwhen_allwhen_anyresume_ontimer_queue apitimer APIgenerator APIgenerator示例async_lock apiscoped_async_lock apiasync_lock示例async_condition_variable APIasync_condition_variable示例runtime APItask對象task API共倫普普圍繞並發任務的概念構建。一項任務是異步操作。與傳統中心的方法相比,任務為並發代碼提供了更高的抽像水平。可以將任務束縛在一起,這意味著任務將其異步結果從一個轉移到另一個任務,其中一個任務的結果就好像是另一個正在進行的任務的參數或中間值一樣。任務使應用程序可以更好地利用可用的硬件資源,比使用原始線程更大,因為可以暫停任務,等待另一個任務以產生結果,而不會阻止基礎的OS線程。任務通過允許他們更多地專注於業務邏輯,而更少地放在線程管理和線程間同步等低級概念上,從而為開發人員帶來了更多的生產率。
任務指定必須執行哪些操作,而執行者是指定在何處以及如何執行任務的工作者對象。執行者備用應用程序繁瑣的線程池和任務隊列的管理。執行者還通過提供用於創建和調度任務的統一API將這些概念從應用程序代碼中解脫出來。
任務使用結果對象相互通信。結果對像是一個異步管,將一個任務的異步結果傳遞給另一個正在進行的任務。結果可以等待和以非阻滯方式解決。
這三個概念 - 任務,執行者和相關的結果是concurrencpp的構建塊。執行者通過通過結果對象發送結果來運行與彼此通信的任務。任務,執行者和結果對像在共生上共同起作用,以產生快速和乾淨的並發代碼。
Concurrencpp是圍繞RAII概念而建立的。為了使用任務和執行者,應用程序在main函數開頭創建一個runtime實例。然後,運行時間用於獲取現有執行者並註冊新的用戶定義的執行者。執行者被用來創建和安排運行的任務,他們可能會返回一個可用於將異步result傳遞給另一個用作消費者的任務的結果對象。當運行時破壞時,它會在每個存儲的執行程序上迭代並調用其shutdown方法。然後,每個執行人都優雅地退出。計劃外的任務被破壞了,創建新任務的嘗試將引發例外。
# include " concurrencpp/concurrencpp.h "
# include < iostream >
int main () {
concurrencpp::runtime runtime;
auto result = runtime. thread_executor ()-> submit ([] {
std::cout << " hello world " << std::endl;
});
result. get ();
return 0 ;
}在這個基本示例中,我們創建了一個運行時對象,然後從運行時獲得了線程執行程序。我們使用submit將lambda作為我們給定的可笑。該lambda返回void ,因此,執行者返回一個result<void>對象,該對象將異步結果傳遞回呼叫者。 main呼叫get哪個會阻止主線程,直到結果準備就緒。如果沒有例外, get返回void 。如果拋出了例外, get重新插入。異步, thread_executor啟動了一個新的執行線程並運行給定的lambda。它隱含地co_return void並完成任務。然後, main被封鎖。
# include " concurrencpp/concurrencpp.h "
# include < iostream >
# include < vector >
# include < algorithm >
# include < ctime >
using namespace concurrencpp ;
std::vector< int > make_random_vector () {
std::vector< int > vec ( 64 * 1'024 );
std::srand ( std::time ( nullptr ));
for ( auto & i : vec) {
i = :: rand ();
}
return vec;
}
result< size_t > count_even (std::shared_ptr<thread_pool_executor> tpe, const std::vector< int >& vector) {
const auto vecor_size = vector. size ();
const auto concurrency_level = tpe-> max_concurrency_level ();
const auto chunk_size = vecor_size / concurrency_level;
std::vector<result< size_t >> chunk_count;
for ( auto i = 0 ; i < concurrency_level; i++) {
const auto chunk_begin = i * chunk_size;
const auto chunk_end = chunk_begin + chunk_size;
auto result = tpe-> submit ([&vector, chunk_begin, chunk_end]() -> size_t {
return std::count_if (vector. begin () + chunk_begin, vector. begin () + chunk_end, []( auto i) {
return i % 2 == 0 ;
});
});
chunk_count. emplace_back ( std::move (result));
}
size_t total_count = 0 ;
for ( auto & result : chunk_count) {
total_count += co_await result;
}
co_return total_count;
}
int main () {
concurrencpp::runtime runtime;
const auto vector = make_random_vector ();
auto result = count_even (runtime. thread_pool_executor (), vector);
const auto total_count = result. get ();
std::cout << " there are " << total_count << " even numbers in the vector " << std::endl;
return 0 ;
}在此示例中,我們通過創建運行時對象來啟動程序。我們創建一個充滿隨機數的向量,然後我們從運行時獲取thread_pool_executor並調用count_even 。 count_even是一個coroutine,可以催生更多的任務,並供他們完成內部co_await 。 max_concurrency_level返回執行人支持的最大工人,在ThreadPool Executor案例中,工人數量是根據內核數量計算的。然後,我們對數組進行分區以匹配工人的數量,並發送每個要在其任務中處理的塊。異步地,工人計算每個塊包含多少個數字,並co_return結果。 count_even通過使用co_await提取計數來概括每個結果,然後最終結果是co_return ed。主線程被呼叫get封鎖的主線程沒有阻止,並且返回了總數。 MAIN打印均勻數字的數量,並且該程序優雅地終止。
每個大型或複雜的操作都可以分解為較小且可鏈的步驟。任務是實施這些計算步驟的異步操作。任務可以在執行人的幫助下在任何地方運行。雖然可以通過常規可可(例如函子和lambdas)創建任務,但任務主要用於旋律,這些任務允許平滑的懸架和恢復。在Concurrencpp中,任務概念由concurrencpp::task類表示。儘管任務概念是Concurrenpp的核心,但由於運行時創建和安排任務對象而沒有外部幫助,因此很少需要創建和操縱任務對象。
Concurrencpp允許應用程序生產和消費Coroutines作為創建任務的主要方式。 Concurrencpp支持渴望和懶惰的任務。
急切的任務開始運行他們被調用的那一刻。當應用程序需要發射異步動作並在以後(稍後消防)上消耗其結果時,建議使用這種執行,或者完全忽略異步結果(火與忘記)。
急切的任務可以返回result或null_result 。 result返回類型告訴Coroutine通過返回的值或拋出的異常(稍後火和消耗),而null_result返回類型告訴Coroutine掉落並忽略了任何一個(火和忘記)。
急切的Coroutines可以在呼叫者線程中同步運行。這種共素被稱為“常規珊瑚酸”。 Concurrencpp急切的Coroutines也可以開始並行運行,在給定的執行者內部,這種coroutines稱為“並行coroutines”。
另一方面,懶惰任務僅在co_await ED時開始運行。當任務的結果應在創建任務後立即消費時,建議使用此類任務。延期延遲的懶惰任務對立即消耗的情況進行了更優化,因為它們不需要特殊的線程同步即可將異步結果傳遞回消費者。編譯器還可能優化形成基礎的Coroutine承諾所需的一些內存分配。不可能解僱懶惰的任務並執行其他事情 - 驅動懶惰的coroutine一定意味著暫停來電者coroutine。只有在懶惰的coroutine完成時,呼叫者的coroutine才會恢復。懶惰任務只能返回lazy_result 。
懶惰任務可以通過調用lazy_result::run來轉換為急切的任務。此方法運行懶惰任務內聯,並返回一個result對象,該對象可以監視新啟動的任務。如果不確定開發人員使用哪種結果類型,則鼓勵他們使用懶惰的結果,因為如果需要,可以將其轉換為常規(急切)結果。
當函數返回lazy_result的任何一個, result或null_result ,並且在其體內至少包含一個co_await或co_return時,該函數是confurrencpp coroutine。每個有效的concurrencpp coroutine都是有效的任務。在上面的計數示例中, count_even就是這樣的coroutine。我們首先催生了count_even ,然後在其中threadpool執行者催生了更多的子任務(是從常規可可創建的),最終使用co_await加入了這些任務。
concurrencpp opecutor是能夠安排和運行任務的對象。執行者通過將其脫離應用程序代碼來簡化管理資源(例如線程,線程池和任務隊列)的工作。執行者提供了一種統一的調度和執行任務方式,因為它們都擴展了concurrencpp::executor 。
executor API class executor {
/*
Initializes a new executor and gives it a name.
*/
executor (std::string_view name);
/*
Destroys this executor.
*/
virtual ~executor () noexcept = default ;
/*
The name of the executor, used for logging and debugging.
*/
const std::string name;
/*
Schedules a task to run in this executor.
Throws concurrencpp::errors::runtime_shutdown exception if shutdown was called before.
*/
virtual void enqueue (concurrencpp::task task) = 0;
/*
Schedules a range of tasks to run in this executor.
Throws concurrencpp::errors::runtime_shutdown exception if shutdown was called before.
*/
virtual void enqueue (std::span<concurrencpp::task> tasks) = 0;
/*
Returns the maximum count of real OS threads this executor supports.
The actual count of threads this executor is running might be smaller than this number.
returns numeric_limits<int>::max if the executor does not have a limit for OS threads.
*/
virtual int max_concurrency_level () const noexcept = 0;
/*
Returns true if shutdown was called before, false otherwise.
*/
virtual bool shutdown_requested () const noexcept = 0;
/*
Shuts down the executor:
- Tells underlying threads to exit their work loop and joins them.
- Destroys unexecuted coroutines.
- Makes subsequent calls to enqueue, post, submit, bulk_post and
bulk_submit to throw concurrencpp::errors::runtime_shutdown exception.
- Makes shutdown_requested return true.
*/
virtual void shutdown () noexcept = 0;
/*
Turns a callable and its arguments into a task object and
schedules it to run in this executor using enqueue.
Arguments are passed to the task by decaying them first.
Throws errors::runtime_shutdown exception if shutdown has been called before.
*/
template < class callable_type , class ... argument_types>
void post (callable_type&& callable, argument_types&& ... arguments);
/*
Like post, but returns a result object that passes the asynchronous result.
Throws errors::runtime_shutdown exception if shutdown has been called before.
*/
template < class callable_type , class ... argument_types>
result<type> submit (callable_type&& callable, argument_types&& ... arguments);
/*
Turns an array of callables into an array of tasks and
schedules them to run in this executor using enqueue.
Throws errors::runtime_shutdown exception if shutdown has been called before.
*/
template < class callable_type >
void bulk_post (std::span<callable_type> callable_list);
/*
Like bulk_post, but returns an array of result objects that passes the asynchronous results.
Throws errors::runtime_shutdown exception if shutdown has been called before.
*/
template < class callable_type >
std::vector<concurrencpp::result<type>> bulk_submit (std::span<callable_type> callable_list);
};如上所述,Concurrencpp提供常用的執行者。這些執行人類型是:
線程池執行器- 維護線程池的通用執行器。線程池執行程序適用於不阻止CPU的短任務。鼓勵應用程序使用該執行程序作為非阻止任務的默認執行程序。 ConcurRencpp線程池提供動態的線程注入和動態工作平衡。
背景執行程序- 帶有較大線程池的ThreadPool執行程序。適用於啟動簡短的阻止任務,例如文件IO和DB查詢。重要說明:當消耗結果時,該執行人通過調用submit和bulk_submit返回,使用resume_on將執行切換到cpu結合的執行程序很重要,以防止在後台_executor內部處理CPU綁定的任務。
例子:
auto result = background_executor.submit([] { /* some blocking action */ });
auto done_result = co_await result.resolve();
co_await resume_on (some_cpu_executor);
auto val = co_await done_result; // runs inside some_cpu_executor線程執行程序- 啟動每個重新任務以在新的執行線程上運行的執行程序。線程沒有重複使用。該執行人適合長期運行的任務,例如運行工作循環的對像或長時間的阻止操作。
Worker線程執行程序- 維護單個任務隊列的單個線程執行程序。適用於應用程序需要一個專用線程來執行許多相關任務。
手動執行人- 執行者不單獨執行coroutines。應用程序代碼可以通過手動調用其執行方法來執行先前重演的任務。
可衍生的執行人- 用戶定義的執行者的基類。儘管可以直接從concurrencpp::executor繼承,但derivable_executor使用CRTP模式為編譯器提供了一些優化機會。
內聯執行程序- 主要用於覆蓋其他執行者的行為。起訴一項任務等同於將其串聯調用。
執行人的裸露機制被封裝在其enqueue方法中。此方法引起了執行任務,並具有兩個重載:一個過載作為參數接收單個任務對象,而另一個接收任務對象的跨度。第二個過載用於加入一批任務。這樣可以更好地安排啟發式方法和降低爭論。
應用程序不必單獨依靠enqueue , concurrencpp::executor提供了一個用於安排用戶可可的API,通過將其轉換為幕後任務對象。應用程序可以請求執行者返回通過提供的可challable的異步結果的結果對象。這是通過致電executor::submit and executor::bulk_submit來完成的。 submit獲取可召喚,並返回結果對象。 executor::bulk_submit獲得了一定span的可呼叫,並以類似的方式submit結果對象的vector 。在許多情況下,應用程序對異步值或異常不感興趣。在這種情況下,應用程序可以使用executor:::post and executor::bulk_post安排可召喚或要執行的可呼叫的span ,但也告訴任務丟棄任何返回的值或投擲異常。不傳遞異步結果比通過的速度快,但是我們無法知道正在進行的任務的狀態或結果。
post , bulk_post , submit和bulk_submit在幕後使用基礎調度enqueue 。
thread_pool_executor API除了post , submit , bulk_post和bulk_submit , thread_pool_executor提供了這些其他方法。
class thread_pool_executor {
/*
Returns the number of milliseconds each thread-pool worker
remains idle (lacks any task to execute) before exiting.
This constant can be set by passing a runtime_options object
to the constructor of the runtime class.
*/
std::chrono::milliseconds max_worker_idle_time () const noexcept ;
};manual_executor API除了post , submit , bulk_post和bulk_submit外, manual_executor _executor提供了這些其他方法。
class manual_executor {
/*
Destructor. Equivalent to clear.
*/
~manual_executor () noexcept ;
/*
Returns the number of enqueued tasks at the moment of invocation.
This number can change quickly by the time the application handles it, it should be used as a hint.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
size_t size () const noexcept ;
/*
Queries whether the executor is empty from tasks at the moment of invocation.
This value can change quickly by the time the application handles it, it should be used as a hint.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
bool empty () const noexcept ;
/*
Clears the executor from any enqueued but yet to-be-executed tasks,
and returns the number of cleared tasks.
Tasks enqueued to this executor by (post_)submit method are resumed
and errors::broken_task exception is thrown inside them.
Ongoing tasks that are being executed by loop_once(_XXX) or loop(_XXX) are uneffected.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
size_t clear ();
/*
Tries to execute a single task. If at the moment of invocation the executor
is empty, the method does nothing.
Returns true if a task was executed, false otherwise.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
bool loop_once ();
/*
Tries to execute a single task.
This method returns when either a task was executed or max_waiting_time
(in milliseconds) has reached.
If max_waiting_time is 0, the method is equivalent to loop_once.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
bool loop_once_for (std::chrono::milliseconds max_waiting_time);
/*
Tries to execute a single task.
This method returns when either a task was executed or timeout_time has reached.
If timeout_time has already expired, this method is equivalent to loop_once.
If shutdown is called from another thread, this method
returns and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
template < class clock_type , class duration_type >
bool loop_once_until (std::chrono::time_point<clock_type, duration_type> timeout_time);
/*
Tries to execute max_count enqueued tasks and returns the number of tasks that were executed.
This method does not wait: it returns when the executor
becomes empty from tasks or max_count tasks have been executed.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
size_t loop ( size_t max_count);
/*
Tries to execute max_count tasks.
This method returns when either max_count tasks were executed or a
total amount of max_waiting_time has passed.
If max_waiting_time is 0, the method is equivalent to loop.
Returns the actual amount of tasks that were executed.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
size_t loop_for ( size_t max_count, std::chrono::milliseconds max_waiting_time);
/*
Tries to execute max_count tasks.
This method returns when either max_count tasks were executed or timeout_time has reached.
If timeout_time has already expired, the method is equivalent to loop.
Returns the actual amount of tasks that were executed.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
template < class clock_type , class duration_type >
size_t loop_until ( size_t max_count, std::chrono::time_point<clock_type, duration_type> timeout_time);
/*
Waits for at least one task to be available for execution.
This method should be used as a hint,
as other threads (calling loop, for example) might empty the executor,
before this thread has a chance to do something with the newly enqueued tasks.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
void wait_for_task ();
/*
This method returns when one or more tasks are available for
execution or max_waiting_time has passed.
Returns true if at at least one task is available for execution, false otherwise.
This method should be used as a hint, as other threads (calling loop, for example)
might empty the executor, before this thread has a chance to do something
with the newly enqueued tasks.
If shutdown is called from another thread, this method
returns and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
bool wait_for_task_for (std::chrono::milliseconds max_waiting_time);
/*
This method returns when one or more tasks are available for execution or timeout_time has reached.
Returns true if at at least one task is available for execution, false otherwise.
This method should be used as a hint,
as other threads (calling loop, for example) might empty the executor,
before this thread has a chance to do something with the newly enqueued tasks.
If shutdown is called from another thread, this method
returns and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
template < class clock_type , class duration_type >
bool wait_for_task_until (std::chrono::time_point<clock_type, duration_type> timeout_time);
/*
This method returns when max_count or more tasks are available for execution.
This method should be used as a hint, as other threads
(calling loop, for example) might empty the executor,
before this thread has a chance to do something with the newly enqueued tasks.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
void wait_for_tasks ( size_t max_count);
/*
This method returns when max_count or more tasks are available for execution
or max_waiting_time (in milliseconds) has passed.
Returns the number of tasks available for execution when the method returns.
This method should be used as a hint, as other
threads (calling loop, for example) might empty the executor,
before this thread has a chance to do something with the newly enqueued tasks.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
size_t wait_for_tasks_for ( size_t count, std::chrono::milliseconds max_waiting_time);
/*
This method returns when max_count or more tasks are available for execution
or timeout_time is reached.
Returns the number of tasks available for execution when the method returns.
This method should be used as a hint, as other threads
(calling loop, for example) might empty the executor,
before this thread has a chance to do something with the newly enqueued tasks.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
template < class clock_type , class duration_type >
size_t wait_for_tasks_until ( size_t count, std::chrono::time_point<clock_type, duration_type> timeout_time);
};可以使用Concurrencpp結果對象消耗異步值和異常。 result類型代表了急切的任務的異步結果,而lazy_result表示懶惰任務的延期結果。
當任務(急切或懶惰)完成時,它要么返回有效的值,要么拋出異常。無論哪種情況,這種異步結果都傳遞給結果對象的消費者。
result對象形成不對稱的coroutines-執行呼叫者coroutine的執行不是由callee-coroutine執行而實現的,兩個coroutines均可獨立運行。只有在消耗callee-coroutine的結果時,呼叫者核心可能會被暫停等待Callee完成。直到那時,兩個共核能獨立運行。 Callee-coroutine是否會消耗其結果。
lazy_result對象形式形式對稱旋律 - callee-coroutine的執行僅在呼叫者-Coroutine懸掛後才發生。當等待懶惰的結果時,當前的Coroutine被暫停,並且與懶惰結果相關的懶惰任務開始運行。在Callee-Coroutine完成並產生結果後,恢復了呼叫者-Coroutine。如果不消耗懶惰的結果,其相關的懶惰任務永遠不會開始運行。
所有結果對像都是僅移動的類型,因此,將其內容移至另一個結果對像後,它們無法使用。在這種情況下,結果對像被認為是空的,並且試圖調用operator bool和operator =以外的任何方法都會引發異常。
在將異步結果從結果對像中抽出(例如,通過調用get或operator co_await )將結果對像變為空。可以用operator bool測試空虛。
等待結果意味著暫停當前的coroutine,直到結果對象準備就緒。如果從關聯的任務返回有效值,則將其從結果對象返回。如果關聯的任務引發了異常,則將重新投入。在等待的那一刻,如果結果已經準備好,那麼當前的Coroutine會立即恢復。否則,通過設置異步結果或異常的線程恢復它。
解決結果類似於等待它。不同之處在於, co_await表達式將以無空狀態以非空形式返回結果對象本身。然後可以使用get或co_await取消異步結果。
每個結果對像都具有指示異步結果狀態的狀態。結果狀態與result_status::idle (尚未產生異步結果或異常)變化為result_status::value (關聯的任務(通過返回有效值返回有效值)為result_status::exception (通過投擲例外終止的任務終止))。可以通過調用(lazy_)result::status 。
result類型result類型代表了正在進行的,異步任務的結果,類似於std::future 。
除了等待和解決結果對象result::get ,還可以通過調用result::wait_for result::wait result::wait_until等待它們:等待結果完成的是阻止操作(在情況下,異步結果尚未準備就緒),並且將暫停整個執行線程,等待異步結果可用。等待操作通常是勸阻的,並且僅在根級任務或允許其允許的上下文中允許,例如阻止主線程等待應用程序的其餘部分優雅地完成,或使用concurrencpp::blocking_executor或concurrencpp::thread_executor 。
通過使用co_await等待結果對象(通過這樣做,將當前功能/任務也轉換為coroutine)是消耗結果對象的首選方法,因為它不會阻止基礎線程。
result API class result {
/*
Creates an empty result that isn't associated with any task.
*/
result () noexcept = default ;
/*
Destroys the result. Associated tasks are not cancelled.
The destructor does not block waiting for the asynchronous result to become ready.
*/
~result () noexcept = default ;
/*
Moves the content of rhs to *this. After this call, rhs is empty.
*/
result (result&& rhs) noexcept = default ;
/*
Moves the content of rhs to *this. After this call, rhs is empty. Returns *this.
*/
result& operator = (result&& rhs) noexcept = default ;
/*
Returns true if this is a non-empty result.
Applications must not use this object if this->operator bool() is false.
*/
explicit operator bool () const noexcept ;
/*
Queries the status of *this.
The returned value is any of result_status::idle, result_status::value or result_status::exception.
Throws errors::empty_result if *this is empty.
*/
result_status status () const ;
/*
Blocks the current thread of execution until this result is ready,
when status() != result_status::idle.
Throws errors::empty_result if *this is empty.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
void wait ();
/*
Blocks until this result is ready or duration has passed. Returns the status
of this result after unblocking.
Throws errors::empty_result if *this is empty.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
template < class duration_unit , class ratio >
result_status wait_for (std::chrono::duration<duration_unit, ratio> duration);
/*
Blocks until this result is ready or timeout_time has reached. Returns the status
of this result after unblocking.
Throws errors::empty_result if *this is empty.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
template < class clock , class duration >
result_status wait_until (std::chrono::time_point<clock, duration> timeout_time);
/*
Blocks the current thread of execution until this result is ready,
when status() != result_status::idle.
If the result is a valid value, it is returned, otherwise, get rethrows the asynchronous exception.
Throws errors::empty_result if *this is empty.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
type get ();
/*
Returns an awaitable used to await this result.
If the result is already ready - the current coroutine resumes
immediately in the calling thread of execution.
If the result is not ready yet, the current coroutine is suspended
and resumed when the asynchronous result is ready,
by the thread which had set the asynchronous value or exception.
In either way, after resuming, if the result is a valid value, it is returned.
Otherwise, operator co_await rethrows the asynchronous exception.
Throws errors::empty_result if *this is empty.
*/
auto operator co_await ();
/*
Returns an awaitable used to resolve this result.
After co_await expression finishes, *this is returned in a non-empty form, in a ready state.
Throws errors::empty_result if *this is empty.
*/
auto resolve ();
};lazy_result類型懶惰結果對象表示遞延懶惰任務的結果。
lazy_result負責啟動相關的懶惰任務並將其遞延結果轉交給消費者。當等待或解決時,懶惰的結果暫停了當前的coroutine,並啟動了相關的懶惰任務。當關聯的任務完成時,其異步值將傳遞給呼叫者任務,然後恢復。
有時,API可能會返回懶惰的結果,但是應用程序需要其關聯的任務才能急切地運行(而無需暫停呼叫者任務)。在這種情況下,可以通過在其關聯的懶惰結果上調用run來將懶惰任務轉換為急切的任務。在這種情況下,關聯的任務將開始在內聯行動,而無需暫停呼叫者任務。原始的懶惰結果被清空,並將返回新啟動任務的有效result對象。
lazy_result api class lazy_result {
/*
Creates an empty lazy result that isn't associated with any task.
*/
lazy_result () noexcept = default ;
/*
Moves the content of rhs to *this. After this call, rhs is empty.
*/
lazy_result (lazy_result&& rhs) noexcept ;
/*
Destroys the result. If not empty, the destructor destroys the associated task without resuming it.
*/
~lazy_result () noexcept ;
/*
Moves the content of rhs to *this. After this call, rhs is empty. Returns *this.
If *this is not empty, then operator= destroys the associated task without resuming it.
*/
lazy_result& operator =(lazy_result&& rhs) noexcept ;
/*
Returns true if this is a non-empty result.
Applications must not use this object if this->operator bool() is false.
*/
explicit operator bool () const noexcept ;
/*
Queries the status of *this.
The returned value is any of result_status::idle, result_status::value or result_status::exception.
Throws errors::empty_result if *this is empty.
*/
result_status status () const ;
/*
Returns an awaitable used to start the associated task and await this result.
If the result is already ready - the current coroutine resumes immediately
in the calling thread of execution.
If the result is not ready yet, the current coroutine is suspended and
resumed when the asynchronous result is ready,
by the thread which had set the asynchronous value or exception.
In either way, after resuming, if the result is a valid value, it is returned.
Otherwise, operator co_await rethrows the asynchronous exception.
Throws errors::empty_result if *this is empty.
*/
auto operator co_await ();
/*
Returns an awaitable used to start the associated task and resolve this result.
If the result is already ready - the current coroutine resumes immediately
in the calling thread of execution.
If the result is not ready yet, the current coroutine is suspended and resumed
when the asynchronous result is ready, by the thread which
had set the asynchronous value or exception.
After co_await expression finishes, *this is returned in a non-empty form, in a ready state.
Throws errors::empty_result if *this is empty.
*/
auto resolve ();
/*
Runs the associated task inline and returns a result object that monitors the newly started task.
After this call, *this is empty.
Throws errors::empty_result if *this is empty.
Might throw std::bad_alloc if fails to allocate memory.
*/
result<type> run ();
};常規的急切coroutines在執行的調用線程中開始同步運行。如果Coroutine經歷重新安排,則執行可能會轉移到另一個執行線程,例如,通過等待其內部未準備就緒的結果對象。 concurrencpp還提供並行的coroutines,它們開始在給定的執行者內部運行,而不是在調用執行線程中運行。當編寫使用叉-Join模型的並發算法時,這種調度旋ou風格在編寫平行算法,遞歸算法和並發算法時特別有用。
每個平行的Coroutine都必須符合以下先決條件:
result / null_result 。executor_tag作為其第一個參數。type* / type& / std::shared_ptr<type> ,其中type是executor類別的第二個參數。co_await或co_return 。如果上述所有應用程序都適用,則該功能是一個並行的coroutine:concurrencpp將啟動coroutine吊銷,並立即重新安排其在提供的執行者中運行。 concurrencpp::executor_tag是一個虛擬佔位符,可以告訴concurrencpp運行時,此函數不是常規函數,它需要在給定的執行程序內開始運行。如果執行人傳遞給並行的coroutine為null,則Coroutine將不會開始運行,並且std::invalid_argument異常將同步投擲。如果滿足所有前提條件,則應用程序可以使用返回的結果對象消耗並行coroutine的結果。
在此示例中,我們以平行方式計算斐波那契序列的第30個成員。我們開始以自己的並行coroutine啟動每個斐波那契。第一個參數是虛擬executor_tag ,第二個參數是threadpool executor。每個遞歸步驟都會調用一個並行運行的新的並行式Coroutine。每個結果都co_return於其父任務,並通過使用co_await獲得。
當我們認為輸入足夠小以同步計算(當curr <= 10 )時,我們停止在自己的任務中執行每個遞歸步驟,而只是同步求解算法。
# include " concurrencpp/concurrencpp.h "
# include < iostream >
using namespace concurrencpp ;
int fibonacci_sync ( int i) {
if (i == 0 ) {
return 0 ;
}
if (i == 1 ) {
return 1 ;
}
return fibonacci_sync (i - 1 ) + fibonacci_sync (i - 2 );
}
result< int > fibonacci (executor_tag, std::shared_ptr<thread_pool_executor> tpe, const int curr) {
if (curr <= 10 ) {
co_return fibonacci_sync (curr);
}
auto fib_1 = fibonacci ({}, tpe, curr - 1 );
auto fib_2 = fibonacci ({}, tpe, curr - 2 );
co_return co_await fib_1 + co_await fib_2;
}
int main () {
concurrencpp::runtime runtime;
auto fibb_30 = fibonacci ({}, runtime. thread_pool_executor (), 30 ). get ();
std::cout << " fibonacci(30) = " << fibb_30 << std::endl;
return 0 ;
}為了比較,這是在不使用並行coroutines的情況下編寫相同代碼的方式,並依靠executor::submit 。由於fibonacci返回result<int> ,因此通過executor::submit將result<result<int>> 。
# include " concurrencpp/concurrencpp.h "
# include < iostream >
using namespace concurrencpp ;
int fibonacci_sync ( int i) {
if (i == 0 ) {
return 0 ;
}
if (i == 1 ) {
return 1 ;
}
return fibonacci_sync (i - 1 ) + fibonacci_sync (i - 2 );
}
result< int > fibonacci (std::shared_ptr<thread_pool_executor> tpe, const int curr) {
if (curr <= 10 ) {
co_return fibonacci_sync (curr);
}
auto fib_1 = tpe-> submit (fibonacci, tpe, curr - 1 );
auto fib_2 = tpe-> submit (fibonacci, tpe, curr - 2 );
co_return co_await co_await fib_1 +
co_await co_await fib_2;
}
int main () {
concurrencpp::runtime runtime;
auto fibb_30 = fibonacci (runtime. thread_pool_executor (), 30 ). get ();
std::cout << " fibonacci(30) = " << fibb_30 << std::endl;
return 0 ;
}結果對像是在Concurrencpp中傳遞數據之間數據的主要方法,我們已經看到了執行者和Coroutines如何產生此類對象。有時,我們想使用帶有非任務的結果對象的功能,例如使用第三方庫時。在這種情況下,我們可以使用result_promise完成結果對象。 result_promise類似於std::promise對象 - 應用程序可以手動設置異步結果或異常,並使關聯的result對象準備就緒。
就像結果對像一樣,結果促銷是移動後僅移動類型。同樣,設置結果或例外後,結果承諾也變為空。如果結果促銷脫離範圍並且沒有設置結果/異常,則使用set_exception方法將結果促銷驅動器設置一個concurrencpp::errors::broken_task異常。暫停和阻塞的任務等待關聯的結果對像被恢復/未阻止。
結果承諾可以將代碼的回調樣式轉換為async/await代碼樣式:每當組件需要回調以傳遞異步結果時,我們可以通過調用set_result或set_exception或set_exception (取決於異步結果本身)的回調,並返回相關結果。
result_promise api template < class type >
class result_promise {
/*
Constructs a valid result_promise.
Might throw std::bad_alloc if fails to allocate memory.
*/
result_promise ();
/*
Moves the content of rhs to *this. After this call, rhs is empty.
*/
result_promise (result_promise&& rhs) noexcept ;
/*
Destroys *this, possibly setting an errors::broken_task exception
by calling set_exception if *this is not empty at the time of destruction.
*/
~result_promise () noexcept ;
/*
Moves the content of rhs to *this. After this call, rhs is empty.
*/
result_promise& operator = (result_promise&& rhs) noexcept ;
/*
Returns true if this is a non-empty result-promise.
Applications must not use this object if this->operator bool() is false.
*/
explicit operator bool () const noexcept ;
/*
Sets a value by constructing <<type>> from arguments... in-place.
Makes the associated result object become ready - tasks waiting for it
to become ready are unblocked.
Suspended tasks are resumed inline.
After this call, *this becomes empty.
Throws errors::empty_result_promise exception If *this is empty.
Might throw any exception that the constructor
of type(std::forward<argument_types>(arguments)...) throws.
*/
template < class ... argument_types>
void set_result (argument_types&& ... arguments);
/*
Sets an exception.
Makes the associated result object become ready - tasks waiting for it
to become ready are unblocked.
Suspended tasks are resumed inline.
After this call, *this becomes empty.
Throws errors::empty_result_promise exception If *this is empty.
Throws std::invalid_argument exception if exception_ptr is null.
*/
void set_exception (std::exception_ptr exception_ptr);
/*
A convenience method that invokes a callable with arguments... and calls set_result
with the result of the invocation.
If an exception is thrown, the thrown exception is caught and set instead by calling set_exception.
After this call, *this becomes empty.
Throws errors::empty_result_promise exception If *this is empty.
Might throw any exception that callable(std::forward<argument_types>(arguments)...)
or the contructor of type(type&&) throw.
*/
template < class callable_type , class ... argument_types>
void set_from_function (callable_type&& callable, argument_types&& ... arguments);
/*
Gets the associated result object.
Throws errors::empty_result_promise exception If *this is empty.
Throws errors::result_already_retrieved exception if this method had been called before.
*/
result<type> get_result ();
};result_promise示例:在此示例中, result_promise用於從一個線程中推動數據,並且可以從另一個線程中將其從其關聯的result對像中提取。
# include " concurrencpp/concurrencpp.h "
# include < iostream >
int main () {
concurrencpp::result_promise<std::string> promise;
auto result = promise. get_result ();
std::thread my_3_party_executor ([promise = std::move (promise)] () mutable {
std::this_thread::sleep_for ( std::chrono::seconds ( 1 )); // Imitate real work
promise. set_result ( " hello world " );
});
auto asynchronous_string = result. get ();
std::cout << " result promise returned string: " << asynchronous_string << std::endl;
my_3_party_executor. join ();
}在此示例中,我們將std::thread用作第三方執行者。這代表了一種場景,當將非concurrencpp遺囑執行人用作應用程序生命週期的一部分時。在通過承諾並阻止主線程之前,我們提取結果對象,直到結果準備就緒。在my_3_party_executor中,我們設置了一個結果,就好像我們co_return為ed。
共享結果是一種特殊的結果對象,允許多個消費者訪問類似於std::shared_future異步結果。來自不同線程的不同消費者可以調用await ,以線程安全的方式get和resolve功能。
共享結果是由常規結果對象構建的,與常規結果對像不同,它們都是可複制的和可移動的。因此, shared_result行為類似於std::shared_ptr類型。如果共享結果實例移至另一個實例,則該實例變為空,並嘗試訪問它將引發異常。
為了支持多個消費者,共享結果返回對異步值的引用,而不是移動它(如常規結果)。例如, shared_result<int>返回int&何時get , await等等。如果shared_result的基本類型是void或參考類型(例如int& ),則它們會像往常一樣返回。如果異步結果是拋出的結果,則重新啟動。
請注意,在使用來自多個線程的shared_result _Result獲取異步結果的情況下是線程安全時,實際值可能不是線程安全的。例如,多個線程可以通過接收其參考( int& )來獲取異步整數。它不會使整數本身安全。如果異步值已經安全,則可以突變異步值。另外,鼓勵應用程序使用const類型以(例如const int )開始,並獲取恆定引用(例如const int& )防止突變。
shared_result api class share_result {
/*
Creates an empty shared-result that isn't associated with any task.
*/
shared_result () noexcept = default ;
/*
Destroys the shared-result. Associated tasks are not cancelled.
The destructor does not block waiting for the asynchronous result to become ready.
*/
~shared_result () noexcept = default ;
/*
Converts a regular result object to a shared-result object.
After this call, rhs is empty.
Might throw std::bad_alloc if fails to allocate memory.
*/
shared_result (result<type> rhs);
/*
Copy constructor. Creates a copy of the shared result object that monitors the same task.
*/
shared_result ( const shared_result&) noexcept = default ;
/*
Move constructor. Moves rhs to *this. After this call, rhs is empty.
*/
shared_result (shared_result&& rhs) noexcept = default ;
/*
Copy assignment operator. Copies rhs to *this and monitors the same task that rhs monitors.
*/
shared_result& operator =( const shared_result& rhs) noexcept ;
/*
Move assignment operator. Moves rhs to *this. After this call, rhs is empty.
*/
shared_result& operator =(shared_result&& rhs) noexcept ;
/*
Returns true if this is a non-empty shared-result.
Applications must not use this object if this->operator bool() is false.
*/
explicit operator bool () const noexcept ;
/*
Queries the status of *this.
The return value is any of result_status::idle, result_status::value or result_status::exception.
Throws errors::empty_result if *this is empty.
*/
result_status status () const ;
/*
Blocks the current thread of execution until this shared-result is ready,
when status() != result_status::idle.
Throws errors::empty_result if *this is empty.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
void wait ();
/*
Blocks until this shared-result is ready or duration has passed.
Returns the status of this shared-result after unblocking.
Throws errors::empty_result if *this is empty.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
template < class duration_type , class ratio_type >
result_status wait_for (std::chrono::duration<duration_type, ratio_type> duration);
/*
Blocks until this shared-result is ready or timeout_time has reached.
Returns the status of this result after unblocking.
Throws errors::empty_result if *this is empty.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
template < class clock_type , class duration_type >
result_status wait_until (std::chrono::time_point<clock_type, duration_type> timeout_time);
/*
Blocks the current thread of execution until this shared-result is ready,
when status() != result_status::idle.
If the result is a valid value, a reference to it is returned,
otherwise, get rethrows the asynchronous exception.
Throws errors::empty_result if *this is empty.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
std:: add_lvalue_reference_t <type> get ();
/*
Returns an awaitable used to await this shared-result.
If the shared-result is already ready - the current coroutine resumes
immediately in the calling thread of execution.
If the shared-result is not ready yet, the current coroutine is
suspended and resumed when the asynchronous result is ready,
by the thread which had set the asynchronous value or exception.
In either way, after resuming, if the result is a valid value, a reference to it is returned.
Otherwise, operator co_await rethrows the asynchronous exception.
Throws errors::empty_result if *this is empty.
*/
auto operator co_await ();
/*
Returns an awaitable used to resolve this shared-result.
After co_await expression finishes, *this is returned in a non-empty form, in a ready state.
Throws errors::empty_result if *this is empty.
*/
auto resolve ();
};shared_result示例:在此示例中, result對象將轉換為shared_result對象,並引用了與thread_executor催生的許多任務一起對異步int結果的引用。
# include " concurrencpp/concurrencpp.h "
# include < iostream >
# include < chrono >
concurrencpp::result< void > consume_shared_result (concurrencpp::shared_result< int > shared_result,
std::shared_ptr<concurrencpp::executor> resume_executor) {
std::cout << " Awaiting shared_result to have a value " << std::endl;
const auto & async_value = co_await shared_result;
concurrencpp::resume_on (resume_executor);
std::cout << " In thread id " << std::this_thread::get_id () << " , got: " << async_value << " , memory address: " << &async_value << std::endl;
}
int main () {
concurrencpp::runtime runtime;
auto result = runtime. background_executor ()-> submit ([] {
std::this_thread::sleep_for ( std::chrono::seconds ( 1 ));
return 100 ;
});
concurrencpp::shared_result< int > shared_result ( std::move (result));
concurrencpp::result< void > results[ 8 ];
for ( size_t i = 0 ; i < 8 ; i++) {
results[i] = consume_shared_result (shared_result, runtime. thread_pool_executor ());
}
std::cout << " Main thread waiting for all consumers to finish " << std::endl;
auto tpe = runtime. thread_pool_executor ();
auto all_consumed = concurrencpp::when_all (tpe, std::begin (results), std::end (results)). run ();
all_consumed. get ();
std::cout << " All consumers are done, exiting " << std::endl;
return 0 ;
}當運行時對象脫離main範圍時,它會迭代每個存儲的執行程序並調用其shutdown方法。嘗試訪問計時器或任何執行人都會丟棄errors::runtime_shutdown異常。當執行人關閉時,它會清除其內部任務隊列,破壞未執行的task對象。如果任務對象存儲一個confurencpp-coroutine,則將該coroutine恢復為嵌入線,並且在其中拋出了一個errors::broken_task異常。在拋出runtime_shutdown或broken_task異常的任何情況下,應用程序應盡快優雅地終止其當前代碼流。這些例外不應被忽略。 runtime_shutdown和broken_task都從errors::interrupted_task基類繼承,並且此類型也可以在catch子句中使用以以統一的方式處理終止。
許多Concurrencpp異步動作都需要執行人作為簡歷執行人的實例。當異步操作(以coroutine的形式實施)可以同步完成時,它會立即恢復執行的調用線程。如果異步動作無法同步完成,則在給定的簡歷執行者內部將恢復它。例如, when_any實用程序函數需要簡歷 - 執行的實例作為其第一個參數。 when_any返回一個lazy_result時,當至少一個給定的結果準備就緒時就準備就緒。如果在when_any時已經準備好了一個結果之一,則在執行的調用線程中同時恢復調用coroutine。如果沒有,至少在結果完成後,將在給定的簡歷執行中恢復調用Coroutine。簡歷執行者很重要,因為在不清楚應該恢復coroutine的情況下,恢復了coroutines的位置(例如,在when_any和when_all時),或者如果在confurrencpp工人中處理異步行動的情況下,僅處理該特定操作代碼,而不是處理該特定的操作代碼。
make_ready_result函數make_ready_result從給定參數創建一個就緒結果對象。等待此結果將導致當前的Coroutine立即恢復。 get和operator co_await將返回構造的值。
/*
Creates a ready result object by building <<type>> from arguments&&... in-place.
Might throw any exception that the constructor
of type(std::forward<argument_types>(arguments)...) throws.
Might throw std::bad_alloc exception if fails to allocate memory.
*/
template < class type , class ... argument_types>
result<type> make_ready_result (argument_types&& ... arguments);
/*
An overload for void type.
Might throw std::bad_alloc exception if fails to allocate memory.
*/
result< void > make_ready_result ();make_exceptional_result函數make_exceptional_result從給定的異常中創建一個就緒結果對象。等待此結果將導致當前的Coroutine立即恢復。 get和operator co_await將重新啟動給定的例外。
/*
Creates a ready result object from an exception pointer.
The returned result object will re-throw exception_ptr when calling get or await.
Throws std::invalid_argument if exception_ptr is null.
Might throw std::bad_alloc exception if fails to allocate memory.
*/
template < class type >
result<type> make_exceptional_result (std::exception_ptr exception_ptr);
/*
Overload. Similar to make_exceptional_result(std::exception_ptr),
but gets an exception object directly.
Might throw any exception that the constructor of exception_type(std::move(exception)) might throw.
Might throw std::bad_alloc exception if fails to allocate memory.
*/
template < class type , class exception_type >
result<type> make_exceptional_result (exception_type exception );when_all函數時when_all是一個實用程序函數時,可以創建一個懶惰的結果對象,該對像在完成所有輸入結果時就準備就緒。等待這個懶惰的結果將返回已準備就緒狀態的所有輸入物質對象。
when_all函數帶有三種口味時 - 一種接受異質範圍的結果對象範圍,另一個將一對迭代器帶到相同類型的一系列結果對象,最後是一個根本不接受結果對象的過載。如果沒有輸入結果對象 - 該函數返回空元組的就緒結果對象。
如果通過的結果對象之一為空,則將拋出一個例外。在這種情況下,輸入值不受函數的影響,可以在處理異常後再次使用。如果所有輸入結果對像都是有效的,則將它們通過此功能清空,並以有效且就緒狀態返回作為輸出結果。
當前, when_all僅接受result對象時。
所有過載都接受簡歷執行程序作為其第一個參數。當等待when_all返回的結果時,給定的簡歷執行人將恢復呼叫者Coroutine。
/*
Creates a result object that becomes ready when all the input results become ready.
Passed result objects are emptied and returned as a tuple.
Throws std::invalid_argument if any of the passed result objects is empty.
Might throw an std::bad_alloc exception if no memory is available.
*/
template < class ... result_types>
lazy_result<std::tuple< typename std::decay<result_types>::type...>>
when_all (std::shared_ptr<executor_type> resume_executor,
result_types&& ... results);
/*
Overload. Similar to when_all(result_types&& ...) but receives a pair of iterators referencing a range.
Passed result objects are emptied and returned as a vector.
If begin == end, the function returns immediately with an empty vector.
Throws std::invalid_argument if any of the passed result objects is empty.
Might throw an std::bad_alloc exception if no memory is available.
*/
template < class iterator_type >
lazy_result<std::vector< typename std::iterator_traits<iterator_type>::value_type>>
when_all (std::shared_ptr<executor_type> resume_executor,
iterator_type begin, iterator_type end);
/*
Overload. Returns a ready result object that doesn't monitor any asynchronous result.
Might throw an std::bad_alloc exception if no memory is available.
*/
lazy_result<std::tuple<>> when_all (std::shared_ptr<executor_type> resume_executor);when_any函數時when_any是一個實用程序函數時,可以創建一個懶惰的結果對象,該對象至少完成一個輸入結果時就可以準備就緒。等待此結果將返回一個輔助結構,其中包含所有輸入重點對像以及完成任務的索引。可能是在食用準備成果時,其他結果可能已經異步完成。 when_any反復以消耗現成的結果時,申請可以在所有結果均消耗之前就可以撥打。
when_any函數僅帶有兩種口味時 - 一種接受異質範圍的結果對象範圍,而另一個則將一對迭代器帶到相同類型的一系列結果對象。與when_all不同,當結果範圍完全為空時,等待至少一項任務即可完成一項任務沒有意義。因此,沒有沒有參數的超載。同樣,如果這些迭代器引用空範圍( begin == end ),則兩個迭代器的過載將引發異常。
如果通過的結果對象之一為空,則將拋出一個例外。在任何情況下,都會拋出異常,輸入星期對像不受該函數的影響,並且可以在處理異常後再次使用。如果所有輸入結果對像都是有效的,則將它們用此函數清空,並以有效狀態返回作為輸出結果。
當前, when_any僅接受result對象時。
所有過載都接受簡歷執行程序作為其第一個參數。當等待when_any返回的結果時,給定的簡歷執行人將恢復呼叫者Coroutine。
/*
Helper struct returned from when_any.
index is the position of the ready result in results sequence.
results is either an std::tuple or an std::vector of the results that were passed to when_any.
*/
template < class sequence_type >
struct when_any_result {
std:: size_t index;
sequence_type results;
};
/*
Creates a result object that becomes ready when at least one of the input results is ready.
Passed result objects are emptied and returned as a tuple.
Throws std::invalid_argument if any of the passed result objects is empty.
Might throw an std::bad_alloc exception if no memory is available.
*/
template < class ... result_types>
lazy_result<when_any_result<std::tuple<result_types...>>>
when_any (std::shared_ptr<executor_type> resume_executor,
result_types&& ... results);
/*
Overload. Similar to when_any(result_types&& ...) but receives a pair of iterators referencing a range.
Passed result objects are emptied and returned as a vector.
Throws std::invalid_argument if begin == end.
Throws std::invalid_argument if any of the passed result objects is empty.
Might throw an std::bad_alloc exception if no memory is available.
*/
template < class iterator_type >
lazy_result<when_any_result<std::vector< typename std::iterator_traits<iterator_type>::value_type>>>
when_any (std::shared_ptr<executor_type> resume_executor,
iterator_type begin, iterator_type end);resume_on函數resume_on返回了一個等待的,該期待暫停了當前的coroutine並將其恢復給定executor 。這是一個重要的功能,可確保coroutine在正確的執行者中運行。例如,應用程序可能會使用background_executor安排背景任務,並等待返回的結果對象。在這種情況下,等待的Coroutine將在後台執行者內恢復。與另一個CPU結合的執行程序resume_on的呼叫可確保一旦後台任務完成後,CPU結合的代碼不會在後台執行程序上運行。如果將任務重新安排在使用resume_on上的另一個執行程序上運行,但是該執行程序在可以恢復暫停任務之前被關閉,則立即恢復該任務,並拋出一個erros::broken_task異常。在這種情況下,應用程序需要非常優雅。
/*
Returns an awaitable that suspends the current coroutine and resumes it inside executor.
Might throw any exception that executor_type::enqueue throws.
*/
template < class executor_type >
auto resume_on (std::shared_ptr<executor_type> executor);Concurrencpp還提供計時器和計時器隊列。計時器是在定義明確的時間間隔內定義在執行器上運行的異步操作的對象。有三種類型的計時器 -常規計時器,啟示式計時器和延遲對象。
常規計時器有四個定義它們的屬性:
像concurrencpp中的其他對像一樣,計時器是一個可以空的移動類型。當調用計時器或timer::cancel時,計時器會取消其計劃,但尚未執行任務。正在進行的任務未有效果。計時器可呼叫必須安全。建議將適當的時間和計時器的頻率設置為50毫秒的粒度。
計時器隊列是一個confurencpp的工人,它管理了一系列計時器,並僅以一個執行線程處理它們。它也是用於創建新計時器的代理。當計時器截止日期(無論是計時器的適當時間還是頻率)到達時,計時器隊列通過安排其可呼叫以在關聯的執行器上運行的任務來“觸發”計時器。
就像執行者一樣,計時器隊列也遵守RAII概念。當運行時對象脫離範圍時,它將關閉計時器隊列,取消所有待處理的計時器。關閉計時器隊列後,任何後續呼叫to make_timer , make_onshot_timer和make_delay_object都會拋出一個errors::runtime_shutdown exception。應用程序不得試圖自己關閉計時器隊列。
timer_queue api: class timer_queue {
/*
Destroys this timer_queue.
*/
~timer_queue () noexcept ;
/*
Shuts down this timer_queue:
Tells the underlying thread of execution to quit and joins it.
Cancels all pending timers.
After this call, invocation of any method besides shutdown
and shutdown_requested will throw an errors::runtime_shutdown.
If shutdown had been called before, this method has no effect.
*/
void shutdown () noexcept ;
/*
Returns true if shutdown had been called before, false otherwise.
*/
bool shutdown_requested () const noexcept ;
/*
Creates a new running timer where *this is the associated timer_queue.
Throws std::invalid_argument if executor is null.
Throws errors::runtime_shutdown if shutdown had been called before.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if the one of the underlying synchronization primitives throws.
*/
template < class callable_type , class ... argumet_types>
timer make_timer (
std::chrono::milliseconds due_time,
std::chrono::milliseconds frequency,
std::shared_ptr<concurrencpp::executor> executor,
callable_type&& callable,
argumet_types&& ... arguments);
/*
Creates a new one-shot timer where *this is the associated timer_queue.
Throws std::invalid_argument if executor is null.
Throws errors::runtime_shutdown if shutdown had been called before.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if the one of the underlying synchronization primitives throws.
*/
template < class callable_type , class ... argumet_types>
timer make_one_shot_timer (
std::chrono::milliseconds due_time,
std::shared_ptr<concurrencpp::executor> executor,
callable_type&& callable,
argumet_types&& ... arguments);
/*
Creates a new delay object where *this is the associated timer_queue.
Throws std::invalid_argument if executor is null.
Throws errors::runtime_shutdown if shutdown had been called before.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if the one of the underlying synchronization primitives throws.
*/
result< void > make_delay_object (
std::chrono::milliseconds due_time,
std::shared_ptr<concurrencpp::executor> executor);
};timer API: class timer {
/*
Creates an empty timer.
*/
timer () noexcept = default ;
/*
Cancels the timer, if not empty.
*/
~timer () noexcept ;
/*
Moves the content of rhs to *this.
rhs is empty after this call.
*/
timer (timer&& rhs) noexcept = default ;
/*
Moves the content of rhs to *this.
rhs is empty after this call.
Returns *this.
*/
timer& operator = (timer&& rhs) noexcept ;
/*
Cancels this timer.
After this call, the associated timer_queue will not schedule *this
to run again and *this becomes empty.
Scheduled, but not yet executed tasks are cancelled.
Ongoing tasks are uneffected.
This method has no effect if *this is empty or the associated timer_queue has already expired.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
void cancel ();
/*
Returns the associated executor of this timer.
Throws concurrencpp::errors::empty_timer is *this is empty.
*/
std::shared_ptr<executor> get_executor () const ;
/*
Returns the associated timer_queue of this timer.
Throws concurrencpp::errors::empty_timer is *this is empty.
*/
std::weak_ptr<timer_queue> get_timer_queue () const ;
/*
Returns the due time of this timer.
Throws concurrencpp::errors::empty_timer is *this is empty.
*/
std::chrono::milliseconds get_due_time () const ;
/*
Returns the frequency of this timer.
Throws concurrencpp::errors::empty_timer is *this is empty.
*/
std::chrono::milliseconds get_frequency () const ;
/*
Sets new frequency for this timer.
Callables already scheduled to run at the time of invocation are not affected.
Throws concurrencpp::errors::empty_timer is *this is empty.
*/
void set_frequency (std::chrono::milliseconds new_frequency);
/*
Returns true is *this is not an empty timer, false otherwise.
The timer should not be used if this->operator bool() is false.
*/
explicit operator bool () const noexcept ;
};在此示例中,我們使用計時器隊列創建一個常規計時器。計時器安排其可呼叫以在1.5秒後運行,然後每2秒發射一次可可。給定的可呼叫運行在ThreadPool執行程序上。
# include " concurrencpp/concurrencpp.h "
# include < iostream >
using namespace std ::chrono_literals ;
int main () {
concurrencpp::runtime runtime;
std:: atomic_size_t counter = 1 ;
concurrencpp::timer timer = runtime. timer_queue ()-> make_timer (
1500ms,
2000ms,
runtime. thread_pool_executor (),
[&] {
const auto c = counter. fetch_add ( 1 );
std::cout << " timer was invoked for the " << c << " th time " << std::endl;
});
std::this_thread::sleep_for (12s);
return 0 ;
}OneShot計時器是一個一次性計時器,只有適當的時間 - 在安排其可召喚的可召喚之後,一旦它從未重新安排它再次運行。
在此示例中,我們創建一個僅運行一次的計時器 - 從其創建後3秒鐘後,計時器將安排其可呼叫以在新的執行線程上運行(使用thread_executor )。
# include " concurrencpp/concurrencpp.h "
# include < iostream >
using namespace std ::chrono_literals ;
int main () {
concurrencpp::runtime runtime;
concurrencpp::timer timer = runtime. timer_queue ()-> make_one_shot_timer (
3000ms,
runtime. thread_executor (),
[&] {
std::cout << " hello and goodbye " << std::endl;
});
std::this_thread::sleep_for (4s);
return 0 ;
}延遲對像是一個懶惰的結果對象,當它是co_await ed時就可以準備就緒的對象,並且到達了適當的時間。應用程序可以co_await此結果對象,以以非阻滯方式延遲當前的Coroutine。當前的coroutine由傳遞給make_delay_object執行人恢復。
在此示例中,我們產生了一個任務(不會返回任何結果或投擲異常),該任務通過在延遲對像上調用co_await來延遲循環。
# include " concurrencpp/concurrencpp.h "
# include < iostream >
using namespace std ::chrono_literals ;
concurrencpp::null_result delayed_task (
std::shared_ptr<concurrencpp::timer_queue> tq,
std::shared_ptr<concurrencpp::thread_pool_executor> ex) {
size_t counter = 1 ;
while ( true ) {
std::cout << " task was invoked " << counter << " times. " << std::endl;
counter++;
co_await tq-> make_delay_object (1500ms, ex);
}
}
int main () {
concurrencpp::runtime runtime;
delayed_task (runtime. timer_queue (), runtime. thread_pool_executor ());
std::this_thread::sleep_for (10s);
return 0 ;
}發電機是一種懶惰的同步Coroutine,能夠產生以消耗的值。發電機使用co_yield關鍵字將值回到消費者中。
發電機是同步使用的 - 他們只能使用co_yield關鍵字,並且不得使用co_await關鍵字。只要調用co_yield關鍵字,生成器將繼續產生值。如果co_return關鍵字被調用(明確或隱式),則生成器將停止產生值。同樣,如果拋出異常,則發電機將停止產生值,並且拋出的異常將被重新提交給發電機的消費者。
發電機應在range-for中使用:發電機隱式產生兩個迭代器 - begin和end ,以控制for循環的執行。這些迭代器不應手動處理或訪問。
當創建發電機時,它是從懶惰任務開始的。當調用其begin方法時,首次恢復發電機並返回迭代器。通過在返回的迭代器上調用operator++ ,可以重複恢復懶惰的任務。當發電機通過優雅地退出或拋出異常來完成執行時,返回的迭代器將等於end迭代器。如前所述,這是通過循環和發電機的內部機制在幕後發生的,不應直接調用。
像Concurrencpp中的其他對像一樣,發電機是僅動作的類型。移動發電機後,它被認為是空的,並且試圖訪問其內部方法( operator bool以外)將引發異常。發電機的空虛通常不應該發生 - 建議在for中創建發電機時消耗發電機,而不要試圖單獨調用其方法。
generator API class generator {
/*
Move constructor. After this call, rhs is empty.
*/
generator (generator&& rhs) noexcept ;
/*
Destructor. Invalidates existing iterators.
*/
~generator () noexcept ;
generator ( const generator& rhs) = delete ;
generator& operator =(generator&& rhs) = delete ;
generator& operator =( const generator& rhs) = delete ;
/*
Returns true if this generator is not empty.
Applications must not use this object if this->operator bool() is false.
*/
explicit operator bool () const noexcept ;
/*
Starts running this generator and returns an iterator.
Throws errors::empty_generator if *this is empty.
Re-throws any exception that is thrown inside the generator code.
*/
iterator begin ();
/*
Returns an end iterator.
*/
static generator_end_iterator end () noexcept ;
};
class generator_iterator {
using value_type = std:: remove_reference_t <type>;
using reference = value_type&;
using pointer = value_type*;
using iterator_category = std::input_iterator_tag;
using difference_type = std:: ptrdiff_t ;
/*
Resumes the suspended generator and returns *this.
Re-throws any exception that was thrown inside the generator code.
*/
generator_iterator& operator ++();
/*
Post-increment version of operator++.
*/
void operator ++( int );
/*
Returns the latest value produced by the associated generator.
*/
reference operator *() const noexcept ;
/*
Returns a pointer to the latest value produced by the associated generator.
*/
pointer operator ->() const noexcept ;
/*
Comparision operators.
*/
friend bool operator ==( const generator_iterator& it0, const generator_iterator& it1) noexcept ;
friend bool operator ==( const generator_iterator& it, generator_end_iterator) noexcept ;
friend bool operator ==(generator_end_iterator end_it, const generator_iterator& it) noexcept ;
friend bool operator !=( const generator_iterator& it, generator_end_iterator end_it) noexcept ;
friend bool operator !=(generator_end_iterator end_it, const generator_iterator& it) noexcept ;
};generator示例:在此示例中,我們將編寫一個生成器,該發電機產生序列S(n) = 1 + 2 + 3 + ... + n的n-thenter,其中n <= 100 :
concurrencpp::generator< int > sequence () {
int i = 1 ;
int sum = 0 ;
while (i <= 100 ) {
sum += i;
++i;
co_yield sum;
}
}
int main () {
for ( auto value : sequence ()) {
std::cout << value << std::end;
}
return 0 ;
} 由於多種原因,常規同步鎖無法安全地使用內部任務:
std::mutex )有望在同一執行線程中鎖定並解鎖。在沒有鎖定的線程中解鎖同步鎖定是不確定的行為。由於可以在任何執行線程中暫停任務並恢復任務,因此在內部任務中使用時同步鎖將破裂。 concurrencpp::async_lock通過提供與std::mutex類似的API來解決這些問題,並帶有呼叫concurrencpp::async_lock主要區別,將返回可以安全地將其安全地與內部任務co_awaited懶惰。如果一個任務試圖鎖定異步鎖並失敗,則該任務將被暫停,並且當鎖定任務解鎖和獲取時,將恢復任務。這使執行者可以處理大量的任務,等待獲得鎖定而無需昂貴的上下文開關和昂貴的內核電話。
類似於std::mutex工作方式,只有一個任務可以在任何給定時間獲取async_lock ,並且在獲取時讀取屏障。釋放異步鎖定的寫入障礙,並允許下一個任務獲取它,從而創建一個單模型的鏈條,以看到其他修飾符所做的更改,並發布其修改,以供下一個修飾符看到。
就像std::mutex一樣, concurrencpp::async_lock不是遞歸的。在獲取此類鎖時必須給出額外的關注 - 在已經獲得鎖定的另一個任務所產生的任務中,不得再次獲得鎖。在這種情況下,將發生不可避免的死鎖。與concurrencpp中的其他對像不同, async_lock既不是共配也不是可移動的。
像標準鎖一樣, concurrencpp::async_lock旨在與示波器包裝器一起使用,這些包裝器利用C ++ RAII IDIOM來確保在功能返回或投擲異常時始終解鎖鎖。 async_lock::lock返回了一個懶惰的包裝器的懶惰,該包裝器稱為async_lock::unlock破壞。不建議使用async_lock::unlock的原始用途。 concurrencpp::scoped_async_lock充當範圍的包裝器,並提供了與std::unique_lock幾乎相同的API。 concurrencpp::scoped_async_lock是可移動的,但不交流。
async_lock::lock and scoped_async_lock::lock需要簡歷 - 授權作為其參數。調用這些方法後,如果可以鎖定鎖定,則將其鎖定,並且當前任務立即恢復。如果不是,則當前任務將被暫停,並在最終獲得鎖定時將恢復給定的簡歷執行。
concurrencpp::scoped_async_lock包裝async_lock並確保其正確解鎖。像std::unique_lock一樣,在某些情況下它不會包裝任何鎖,在這種情況下,它被認為是空的。當調用空scoped_async_lock默認構造,移動或scoped_async_lock::release時,可能會發生。空的示波器鎖定鎖不會解鎖任何破壞的鎖。
即使范圍 - 驅動器鎖並不空,也不意味著它擁有基礎異步鎖,並且會在破壞時解鎖。如果使用scoped_async_lock::unlock scoped_async_lock(async_lock&, std::defer_lock_t)構建構建器,則非空和非所有範圍示波器鎖定鎖可能會發生。
async_lock api class async_lock {
/*
Constructs an async lock object.
*/
async_lock () noexcept ;
/*
Destructs an async lock object.
*this is not automatically unlocked at the moment of destruction.
*/
~async_lock () noexcept ;
/*
Asynchronously acquires the async lock.
If *this has already been locked by another non-parent task, the current task will be suspended
and will be resumed when *this is acquired, inside resume_executor.
If *this has not been locked by another task, then *this will be acquired and the current task will be resumed
immediately in the calling thread of execution.
If *this has already been locked by a parent task, then unavoidable dead-lock will occur.
Throws std::invalid_argument if resume_executor is null.
Throws std::system error if one of the underlying synhchronization primitives throws.
*/
lazy_result<scoped_async_lock> lock (std::shared_ptr<executor> resume_executor);
/*
Tries to acquire *this in the calling thread of execution.
Returns true if *this is acquired, false otherwise.
In any case, the current task is resumed immediately in the calling thread of execution.
Throws std::system error if one of the underlying synhchronization primitives throws.
*/
lazy_result< bool > try_lock ();
/*
Releases *this and allows other tasks (including suspended tasks waiting for *this) to acquire it.
Throws std::system error if *this is not locked at the moment of calling this method.
Throws std::system error if one of the underlying synhchronization primitives throws.
*/
void unlock ();
};scoped_async_lock api class scoped_async_lock {
/*
Constructs an async lock wrapper that does not wrap any async lock.
*/
scoped_async_lock () noexcept = default ;
/*
If *this wraps async_lock, this method releases the wrapped lock.
*/
~scoped_async_lock () noexcept ;
/*
Moves rhs to *this.
After this call, *rhs does not wrap any async lock.
*/
scoped_async_lock (scoped_async_lock&& rhs) noexcept ;
/*
Wrapps unlocked lock.
lock must not be in acquired mode when calling this method.
*/
scoped_async_lock (async_lock& lock, std:: defer_lock_t ) noexcept ;
/*
Wrapps locked lock.
lock must be already acquired when calling this method.
*/
scoped_async_lock (async_lock& lock, std:: adopt_lock_t ) noexcept ;
/*
Calls async_lock::lock on the wrapped locked, using resume_executor as a parameter.
Throws std::invalid_argument if resume_executor is nulll.
Throws std::system_error if *this does not wrap any lock.
Throws std::system_error if wrapped lock is already locked.
Throws any exception async_lock::lock throws.
*/
lazy_result< void > lock (std::shared_ptr<executor> resume_executor);
/*
Calls async_lock::try_lock on the wrapped lock.
Throws std::system_error if *this does not wrap any lock.
Throws std::system_error if wrapped lock is already locked.
Throws any exception async_lock::try_lock throws.
*/
lazy_result< bool > try_lock ();
/*
Calls async_lock::unlock on the wrapped lock.
If *this does not wrap any lock, this method does nothing.
Throws std::system_error if *this wraps a lock and it is not locked.
*/
void unlock ();
/*
Checks whether *this wraps a locked mutex or not.
Returns true if wrapped locked is in acquired state, false otherwise.
*/
bool owns_lock () const noexcept ;
/*
Equivalent to owns_lock.
*/
explicit operator bool () const noexcept ;
/*
Swaps the contents of *this and rhs.
*/
void swap (scoped_async_lock& rhs) noexcept ;
/*
Empties *this and returns a pointer to the previously wrapped lock.
After a call to this method, *this doesn't wrap any lock.
The previously wrapped lock is not released,
it must be released by either unlocking it manually through the returned pointer or by
capturing the pointer with another scoped_async_lock which will take ownerwhip over it.
*/
async_lock* release () noexcept ;
/*
Returns a pointer to the wrapped async_lock, or a null pointer if there is no wrapped async_lock.
*/
async_lock* mutex () const noexcept ;
};async_lock示例:在此示例中,我們同時將10,000,000個整數從不同的任務轉到std::vector對象,同時使用async_lock確保不會發生數據競爭,並且保留了該向量對象的內部狀態的正確性。
# include " concurrencpp/concurrencpp.h "
# include < vector >
# include < iostream >
std::vector< size_t > numbers;
concurrencpp::async_lock lock;
concurrencpp::result< void > add_numbers (concurrencpp::executor_tag,
std::shared_ptr<concurrencpp::executor> executor,
size_t begin,
size_t end) {
for ( auto i = begin; i < end; i++) {
concurrencpp::scoped_async_lock raii_wrapper = co_await lock. lock (executor);
numbers. push_back (i);
}
}
int main () {
concurrencpp::runtime runtime;
constexpr size_t range = 10'000'000 ;
constexpr size_t sections = 4 ;
concurrencpp::result< void > results[sections];
for ( size_t i = 0 ; i < 4 ; i++) {
const auto range_start = i * range / sections;
const auto range_end = (i + 1 ) * range / sections;
results[i] = add_numbers ({}, runtime. thread_pool_executor (), range_start, range_end);
}
for ( auto & result : results) {
result. get ();
}
std::cout << " vector size is " << numbers. size () << std::endl;
// make sure the vector state has not been corrupted by unprotected concurrent accesses
std::sort (numbers. begin (), numbers. end ());
for ( size_t i = 0 ; i < range; i++) {
if (numbers[i] != i) {
std::cerr << " vector state is corrupted. " << std::endl;
return - 1 ;
}
}
std::cout << " succeeded pushing range [0 - 10,000,000] concurrently to the vector! " << std::endl;
return 0 ;
}async_condition_variable模仿標準condition_variable ,可以與async_lock一起安全使用。 async_condition_variable與async_lock一起使用暫停任務,直到某些共享內存(受鎖定為鎖定)已更改。想要監視共享內存更改的任務將鎖定async_lock的實例,並調用async_condition_variable::await 。這將在原子上解鎖鎖定並暫停當前任務,直到某些修改器任務通知條件變量為止。修飾符任務獲取鎖,修改共享內存,解鎖鎖定並調用notify_one或notify_all 。當恢復懸掛任務(使用給出await的簡歷執行人)時,它再次鎖定鎖,從而使該任務從無縫的懸架點開始。像async_lock一樣, async_condition_variable既不是可移動或共配 - 它是在一個地方創建的,並通過多個任務訪問。
async_condition_variable::await重載需要簡歷 - 執行,該簡歷將用於恢復任務,以及鎖定的scoped_async_lock 。 async_condition_variable::await兩個過載 - 一種接受謂詞,一個不接受謂詞。不接受謂詞的過載將在調用後立即暫停調用任務,直到通知notify_* 。通過讓謂詞檢查共享內存並重複暫停任務,直到共享內存達到所需狀態,那確實接受謂詞的過載可以工作。示意性地工作就像打電話
while (!pred()) { // pred() inspects the shared memory and returns true or false
co_await await (resume_executor, lock); // suspend the current task until another task calls `notify_xxx`
}就像標準條件變量一樣,鼓勵應用程序使用謂詞置換載荷,因為它可以對懸架和恢復進行更細粒度的控制。 async_condition_variable可用於編寫並發集合和數據結構(例如並發隊列和頻道)。
在內部, async_condition_variable擁有一個懸架標題,其中任務在等待要通知的條件變量時會引入自己。當調用任何一個notify_*方法時,根據調用方法,通知任務刪除一個任務或所有任務。任務以FIFO的方式從懸架標題中脫水。例如,如果任務A呼叫await ,然後任務B調用await ,則任務C調用notify_one ,然後內部任務A將被脫水並恢復。任務B將保持暫停,直到調用notify_one或notify_all另一個呼叫。如果任務A和任務B被暫停,並且任務C調用notify_all ,則兩個任務將被脫水和恢復。
async_condition_variable API class async_condition_variable {
/*
Constructor.
*/
async_condition_variable () noexcept ;
/*
Atomically releases lock and suspends the current task by adding it to *this suspension-queue.
Throws std::invalid_argument if resume_executor is null.
Throws std::invalid_argument if lock is not locked at the moment of calling this method.
Might throw std::system_error if the underlying std::mutex throws.
*/
lazy_result< void > await (std::shared_ptr<executor> resume_executor, scoped_async_lock& lock);
/*
Equivalent to:
while (!pred()) {
co_await await(resume_executor, lock);
}
Might throw any exception that await(resume_executor, lock) might throw.
Might throw any exception that pred might throw.
*/
template < class predicate_type >
lazy_result< void > await (std::shared_ptr<executor> resume_executor, scoped_async_lock& lock, predicate_type pred);
/*
Dequeues one task from *this suspension-queue and resumes it, if any available at the moment of calling this method.
The suspended task is resumed by scheduling it to run on the executor given when await was called.
Might throw std::system_error if the underlying std::mutex throws.
*/
void notify_one ();
/*
Dequeues all tasks from *this suspension-queue and resumes them, if any available at the moment of calling this method.
The suspended tasks are resumed by scheduling them to run on the executors given when await was called.
Might throw std::system_error if the underlying std::mutex throws.
*/
void notify_all ();
};async_condition_variable示例:在此示例中, async_lock和async_condition_variable共同實現了一個並發隊列,該隊列可用於在任務之間發送數據(在此示例中,整數)。請注意,某些方法在另一個返回lazy_result的同時返回result ,顯示了渴望和懶惰任務如何一起工作。
# include " concurrencpp/concurrencpp.h "
# include < queue >
# include < iostream >
using namespace concurrencpp ;
class concurrent_queue {
private:
async_lock _lock;
async_condition_variable _cv;
std::queue< int > _queue;
bool _abort = false ;
public:
concurrent_queue () = default ;
result< void > shutdown (std::shared_ptr<executor> resume_executor) {
{
auto guard = co_await _lock. lock (resume_executor);
_abort = true ;
}
_cv. notify_all ();
}
lazy_result< void > push (std::shared_ptr<executor> resume_executor, int i) {
{
auto guard = co_await _lock. lock (resume_executor);
_queue. push (i);
}
_cv. notify_one ();
}
lazy_result< int > pop (std::shared_ptr<executor> resume_executor) {
auto guard = co_await _lock. lock (resume_executor);
co_await _cv. await (resume_executor, guard, [ this ] {
return _abort || !_queue. empty ();
});
if (!_queue. empty ()) {
auto result = _queue. front ();
_queue. pop ();
co_return result;
}
assert (_abort);
throw std::runtime_error ( " queue has been shut down. " );
}
};
result< void > producer_loop (executor_tag,
std::shared_ptr<thread_pool_executor> tpe,
concurrent_queue& queue,
int range_start,
int range_end) {
for (; range_start < range_end; ++range_start) {
co_await queue. push (tpe, range_start);
}
}
result< void > consumer_loop (executor_tag, std::shared_ptr<thread_pool_executor> tpe, concurrent_queue& queue) {
try {
while ( true ) {
std::cout << co_await queue. pop (tpe) << std::endl;
}
} catch ( const std:: exception & e) {
std::cerr << e. what () << std::endl;
}
}
int main () {
runtime runtime;
const auto thread_pool_executor = runtime. thread_pool_executor ();
concurrent_queue queue;
result< void > producers[ 4 ];
result< void > consumers[ 4 ];
for ( int i = 0 ; i < 4 ; i++) {
producers[i] = producer_loop ({}, thread_pool_executor, queue, i * 5 , (i + 1 ) * 5 );
}
for ( int i = 0 ; i < 4 ; i++) {
consumers[i] = consumer_loop ({}, thread_pool_executor, queue);
}
for ( int i = 0 ; i < 4 ; i++) {
producers[i]. get ();
}
queue. shutdown (thread_pool_executor). get ();
for ( int i = 0 ; i < 4 ; i++) {
consumers[i]. get ();
}
return 0 ;
}concurrencpp運行時對像是用於獲取,存儲和創建新執行者的代理。
運行時必須在主函數開始運行後立即創建為值類型。當Consurrencpp運行時脫離範圍時,它會在其存儲的執行者上迭代,並通過致電executor::shutdown將其一個接一個地關閉。然後,執行者退出其內部工作循環,任何隨後的安排新任務的嘗試都會引發concurrencpp::runtime_shutdown異常。運行時還包含用於創建計時器和延遲對象的全局計時器隊列。毀滅後,存儲的執行者摧毀了未執行的任務,並等待正在進行的任務完成。如果正在進行的任務試圖使用執行者催生新任務或安排自己的任務繼續 - 將會引發例外。在這種情況下,持續的任務需要盡快退出,允許其基礎執行者退出。計時器隊列也將關閉,取消所有運行計時器。有了這種RAII的代碼樣式,在創建運行時對像以及運行時/之後的範圍之後,無法處理任何任務。這使同時應用的應用程序不需要明確通信終止消息。只要運行時對像還活著,任務就是免費使用執行者。
runtime API class runtime {
/*
Creates a runtime object with default options.
*/
runtime ();
/*
Creates a runtime object with user defined options.
*/
runtime ( const concurrencpp::runtime_options& options);
/*
Destroys this runtime object.
Calls executor::shutdown on each monitored executor.
Calls timer_queue::shutdown on the global timer queue.
*/
~runtime () noexcept ;
/*
Returns this runtime timer queue used to create new times.
*/
std::shared_ptr<concurrencpp::timer_queue> timer_queue () const noexcept ;
/*
Returns this runtime concurrencpp::inline_executor
*/
std::shared_ptr<concurrencpp::inline_executor> inline_executor () const noexcept ;
/*
Returns this runtime concurrencpp::thread_pool_executor
*/
std::shared_ptr<concurrencpp::thread_pool_executor> thread_pool_executor () const noexcept ;
/*
Returns this runtime concurrencpp::background_executor
*/
std::shared_ptr<concurrencpp::thread_pool_executor> background_executor () const noexcept ;
/*
Returns this runtime concurrencpp::thread_executor
*/
std::shared_ptr<concurrencpp::thread_executor> thread_executor () const noexcept ;
/*
Creates a new concurrencpp::worker_thread_executor and registers it in this runtime.
Might throw std::bad_alloc or std::system_error if any underlying memory or system resource could not have been acquired.
*/
std::shared_ptr<concurrencpp::worker_thread_executor> make_worker_thread_executor ();
/*
Creates a new concurrencpp::manual_executor and registers it in this runtime.
Might throw std::bad_alloc or std::system_error if any underlying memory or system resource could not have been acquired.
*/
std::shared_ptr<concurrencpp::manual_executor> make_manual_executor ();
/*
Creates a new user defined executor and registers it in this runtime.
executor_type must be a valid concrete class of concurrencpp::executor.
Might throw std::bad_alloc if no memory is available.
Might throw any exception that the constructor of <<executor_type>> might throw.
*/
template < class executor_type , class ... argument_types>
std::shared_ptr<executor_type> make_executor (argument_types&& ... arguments);
/*
returns the version of concurrencpp that the library was built with.
*/
static std::tuple< unsigned int , unsigned int , unsigned int > version () noexcept ;
};在某些情況下,應用程序對監視線程創建和終止感興趣,例如,某些內存分配器需要在創建和終止時註冊並未註冊新線程。 concurrencpp運行時允許設置線程創建回調和線程終止回調。每當ConsurrenCpp工人創建一個新線程以及該線程終止時,這些回調將被調用。這些回調總是從創建/終止線程內部調用,因此std::this_thread::get_id將始終返回相關的線程ID。這些回調的簽名是void callback (std::string_view thread_name) 。 thread_name是一個confurrencpp特定標題,它給出了線程,可以在一些呈現線程名稱的辯論者中觀察到。線程名稱不能保證是唯一的,應用於記錄和調試。
為了設置線程創建回調和/或線程終止回調,應用程序可以設置runtime_options的thread_started_callback和/或thread_terminated_callback成員,該成員傳遞給了運行時構建器。由於這些回調被複製到可能創建線程的每個contrencpp工人,因此這些回調必須是可交互的。
# include " concurrencpp/concurrencpp.h "
# include < iostream >
int main () {
concurrencpp::runtime_options options;
options. thread_started_callback = [](std::string_view thread_name) {
std::cout << " A new thread is starting to run, name: " << thread_name << " , thread id: " << std::this_thread::get_id ()
<< std::endl;
};
options. thread_terminated_callback = [](std::string_view thread_name) {
std::cout << " A thread is terminating, name: " << thread_name << " , thread id: " << std::this_thread::get_id () << std::endl;
};
concurrencpp::runtime runtime (options);
const auto timer_queue = runtime. timer_queue ();
const auto thread_pool_executor = runtime. thread_pool_executor ();
concurrencpp::timer timer =
timer_queue-> make_timer ( std::chrono::milliseconds ( 100 ), std::chrono::milliseconds ( 500 ), thread_pool_executor, [] {
std::cout << " A timer callable is executing " << std::endl;
});
std::this_thread::sleep_for ( std::chrono::seconds ( 3 ));
return 0 ;
}可能的輸出:
A new thread is starting to run, name: concurrencpp::timer_queue worker, thread id: 7496
A new thread is starting to run, name: concurrencpp::thread_pool_executor worker, thread id: 21620
A timer callable is executing
A timer callable is executing
A timer callable is executing
A timer callable is executing
A timer callable is executing
A timer callable is executing
A thread is terminating, name: concurrencpp::timer_queue worker, thread id: 7496
A thread is terminating, name: concurrencpp::thread_pool_executor worker, thread id: 21620
應用程序可以通過繼承derivable_executor類創建自己的自定義執行類型。實現用戶定義的執行者時,有幾點要考慮:最重要的是要記住,執行者是從多個線程中使用的,因此實現的方法必須是線程安全的。
可以使用runtime::make_executor創建新執行者。僅通過使用runtime::make_executor ,應用程序不得使用普通實例化創建新的執行者(例如std::make_shared或Plain new )。此外,應用程序不得嘗試重新確定內置的concurrencpp執行者,例如thread_pool_executor或thread_executor ,這些執行者只能通過運行時對像中的現有實例訪問這些執行者。
另一個重要的一點是正確處理關閉: shutdown , shutdown_requested and enqueue均應監視執行者狀態並在調用時相應地行事:
shutdown應告訴基礎線程退出,然後加入它們。shutdown ,該方法必須通過忽略第一次調用後任何後續的shutdown調用來處理此情況。shutdown , enqueue必須投擲concurrencpp::errors::runtime_shutdown異常。 task對象實施執行者是罕見的情況之一,需要直接與concurrencpp::task類。 concurrencpp::task是一個類似對象的std::function ,但有一些差異。與std::function一樣,任務對象存儲可調用,該可容納可作為異步操作。與std::function不同, task是僅移動類型。在調用時,任務對像沒有接收參數並返回void 。此外,每個任務對像只能調用一次。第一次調用後,任務對像變為空。調用一個空任務對像等於調用一個空的lambda( []{} ),並且不會拋出任何例外。任務對象將其可調用作為轉發參考( type&& type是模板參數),而不是通過複製(例如std::function )。存儲的可呼叫的構造發生在就位。這允許任務對象包含一個僅移動類型的可可(例如std::unique_ptr和concurrencpp::result )。任務對象嘗試使用不同的方法來優化存儲類型的使用情況,例如,任務對象將短緩衝區優化(SBO)應用於常規,小可可的情況,並將對std::coroutine_handle<void>內聯呼叫將其直接致電無需虛擬派遣而撥打。
task API class task {
/*
Creates an empty task object.
*/
task () noexcept ;
/*
Creates a task object by moving the stored callable of rhs to *this.
If rhs is empty, then *this will also be empty after construction.
After this call, rhs is empty.
*/
task (task&& rhs) noexcept ;
/*
Creates a task object by storing callable in *this.
<<typename std::decay<callable_type>::type>> will be in-place-
constructed inside *this by perfect forwarding callable.
*/
template < class callable_type >
task (callable_type&& callable);
/*
Destroys stored callable, does nothing if empty.
*/
~task () noexcept ;
/*
If *this is empty, does nothing.
Invokes stored callable, and immediately destroys it.
After this call, *this is empty.
May throw any exception that the invoked callable may throw.
*/
void operator ()();
/*
Moves the stored callable of rhs to *this.
If rhs is empty, then *this will also be empty after this call.
If *this already contains a stored callable, operator = destroys it first.
*/
task& operator =(task&& rhs) noexcept ;
/*
If *this is not empty, task::clear destroys the stored callable and empties *this.
If *this is empty, clear does nothing.
*/
void clear () noexcept ;
/*
Returns true if *this stores a callable. false otherwise.
*/
explicit operator bool () const noexcept ;
/*
Returns true if *this stores a callable,
and that stored callable has the same type as <<typename std::decay<callable_type>::type>>
*/
template < class callable_type >
bool contains () const noexcept ;
};在實施用戶定義的執行者時,要按照執行者的內部機制來存儲task對象的實現(當enqueue入口時)。
在此示例中,我們創建了一個執行者,該執行人記錄了諸如完成任務或執行任務之類的操作。我們實現了executor接口,並請求運行時通過調用runtime::make_executor來創建和存儲它的實例。其餘的應用程序的行為與我們使用非用戶定義的執行者完全相同。
# include " concurrencpp/concurrencpp.h "
# include < iostream >
# include < queue >
# include < thread >
# include < mutex >
# include < condition_variable >
class logging_executor : public concurrencpp ::derivable_executor<logging_executor> {
private:
mutable std::mutex _lock;
std::queue<concurrencpp::task> _queue;
std::condition_variable _condition;
bool _shutdown_requested;
std::thread _thread;
const std::string _prefix;
void work_loop () {
while ( true ) {
std::unique_lock<std::mutex> lock (_lock);
if (_shutdown_requested) {
return ;
}
if (!_queue. empty ()) {
auto task = std::move (_queue. front ());
_queue. pop ();
lock. unlock ();
std::cout << _prefix << " A task is being executed " << std::endl;
task ();
continue ;
}
_condition. wait (lock, [ this ] {
return !_queue. empty () || _shutdown_requested;
});
}
}
public:
logging_executor (std::string_view prefix) :
derivable_executor<logging_executor>( " logging_executor " ),
_shutdown_requested ( false ),
_prefix (prefix) {
_thread = std::thread ([ this ] {
work_loop ();
});
}
void enqueue (concurrencpp::task task) override {
std::cout << _prefix << " A task is being enqueued! " << std::endl;
std::unique_lock<std::mutex> lock (_lock);
if (_shutdown_requested) {
throw concurrencpp::errors::runtime_shutdown ( " logging executor - executor was shutdown. " );
}
_queue. emplace ( std::move (task));
_condition. notify_one ();
}
void enqueue (std::span<concurrencpp::task> tasks) override {
std::cout << _prefix << tasks. size () << " tasks are being enqueued! " << std::endl;
std::unique_lock<std::mutex> lock (_lock);
if (_shutdown_requested) {
throw concurrencpp::errors::runtime_shutdown ( " logging executor - executor was shutdown. " );
}
for ( auto & task : tasks) {
_queue. emplace ( std::move (task));
}
_condition. notify_one ();
}
int max_concurrency_level () const noexcept override {
return 1 ;
}
bool shutdown_requested () const noexcept override {
std::unique_lock<std::mutex> lock (_lock);
return _shutdown_requested;
}
void shutdown () noexcept override {
std::cout << _prefix << " shutdown requested " << std::endl;
std::unique_lock<std::mutex> lock (_lock);
if (_shutdown_requested) return ; // nothing to do.
_shutdown_requested = true ;
lock. unlock ();
_condition. notify_one ();
_thread. join ();
}
};
int main () {
concurrencpp::runtime runtime;
auto logging_ex = runtime. make_executor <logging_executor>( " Session #1234 " );
for ( size_t i = 0 ; i < 10 ; i++) {
logging_ex-> post ([] {
std::cout << " hello world " << std::endl;
});
}
std::getchar ();
return 0 ;
}$ git clone https://github.com/David-Haim/concurrencpp.git
$ cd concurrencpp
$ cmake -S . -B build /lib
$ cmake -- build build /lib --config Release$ git clone https://github.com/David-Haim/concurrencpp.git
$ cd concurrencpp
$ cmake -S test -B build / test
$ cmake -- build build / test
< # for release mode: cmake --build build/test --config Release #>
$ cd build / test
$ ctest . -V -C Debug
< # for release mode: ctest . -V -C Release #> $ git clone https://github.com/David-Haim/concurrencpp.git
$ cd concurrencpp
$ cmake -DCMAKE_BUILD_TYPE=Release -S . -B build /lib
$ cmake -- build build /lib
#optional, install the library: sudo cmake --install build/lib 使用Clang和GCC,也可以使用TSAN(線程消毒器)支持進行測試。
$ git clone https://github.com/David-Haim/concurrencpp.git
$ cd concurrencpp
$ cmake -S test -B build / test
#for release mode: cmake -DCMAKE_BUILD_TYPE=Release -S test -B build/test
#for TSAN mode: cmake -DCMAKE_BUILD_TYPE=Release -DENABLE_THREAD_SANITIZER=Yes -S test -B build/test
$ cmake -- build build / test
$ cd build / test
$ ctest . -V在Linux上編譯時,該庫默認使用libstdc++使用。如果您打算將libc++作為標準庫實現,則應指定CMAKE_TOOLCHAIN_FILE標誌如下:
$ cmake -DCMAKE_TOOLCHAIN_FILE=../cmake/libc++.cmake -DCMAKE_BUILD_TYPE=Release -S . -B build /lib或者,為手動構建和安裝圖書館,開發人員可以通過VCPKG和Conan軟件包經理獲得Concurrencpp的穩定版本:
VCPKG:
$ vcpkg install concurrencpp柯南:同意
Concurrencpp帶有一個內置的沙盒程序,開發人員可以修改和實驗,而無需安裝或鏈接編譯的庫與其他代碼庫。為了使用沙箱,開發人員可以修改sandbox/main.cpp並使用以下命令編譯應用程序:
$ cmake -S sandbox -B build /sandbox
$ cmake -- build build /sandbox
< # for release mode: cmake --build build/sandbox --config Release #>
$ ./ build /sandbox < # runs the sandbox> $ cmake -S sandbox -B build /sandbox
#for release mode: cmake -DCMAKE_BUILD_TYPE=Release -S sandbox -B build/sandbox
$ cmake -- build build /sandbox
$ ./ build /sandbox #runs the sandbox