يجلب Concurrencpp قوة المهام المتزامنة إلى عالم C ++ ، مما يسمح للمطورين بكتابة تطبيقات متزامنة للغاية بسهولة وأمان باستخدام المهام والمنفذين والكوروتين. باستخدام تطبيقات concurrencpp يمكن أن تقطع الإجراءات الكبيرة التي يجب معالجتها بشكل غير متزامن في مهام أصغر تعمل بشكل متزامن وتعمل بطريقة تعاونية لتحقيق النتيجة المطلوبة. يسمح Concurrencpp أيضًا للتطبيقات بكتابة خوارزميات متوازية بسهولة باستخدام coroutines المتوازية.
المزايا الرئيسية Concurrencpp هي:
std::thread و std::mutex .co_await .executor APIthread_pool_executor APImanual_executor APIresultresult APIlazy_resultlazy_result APIresult_promise APIresult_promiseshared_resultshared_resultmake_ready_resultmake_exceptional_resultwhen_allwhen_anyresume_ontimer_queue APItimer واجهة برمجة التطبيقاتgenerator واجهة برمجة التطبيقاتgeneratorasync_lock APIscoped_async_lock APIasync_lockasync_condition_variable APIasync_condition_variableruntimetasktask APIتم بناء Concurrencpp حول مفهوم المهام المتزامنة. المهمة هي عملية غير متزامنة. توفر المهام مستوى أعلى من التجريد للرمز المتزامن من الأساليب التقليدية المتمحورة حول مؤشرات الترابط. يمكن ربط المهام ببعضها البعض ، مما يعني أن المهام تمر نتائجها غير المتزامنة من واحد إلى آخر ، حيث يتم استخدام نتيجة مهمة واحدة كما لو كانت معلمة أو قيمة وسيطة لمهمة أخرى مستمرة. تسمح المهام بالتطبيقات باستخدام موارد الأجهزة المتاحة بشكل أفضل وتوسيع نطاقها أكثر بكثير من استخدام الخيوط الأولية ، حيث يمكن تعليق المهام ، في انتظار مهمة أخرى لإنتاج نتيجة ، دون منع اختلافات نظام التشغيل الأساسي. تجلب المهام المزيد من الإنتاجية للمطورين من خلال السماح لهم بالتركيز بشكل أكبر على سقوط الأعمال وأقل على مفاهيم منخفضة المستوى مثل إدارة الخيوط ومزامنة بين الخيوط.
على الرغم من أن المهام تحدد الإجراءات التي يجب تنفيذها ، فإن المنفذين هم من العمال الذين يحددون أين وكيفية تنفيذ المهام. يقوم المنفذون بتنفيذ التطبيقات الإدارة الشاقة لمجمعات الخيوط وقوائم قوائم المهام. يفصل المنفذون أيضًا هذه المفاهيم بعيدًا عن رمز التطبيق ، من خلال توفير واجهة برمجة تطبيقات موحدة لإنشاء مهام وجدولة.
تتواصل المهام مع بعضها البعض باستخدام كائنات النتائج . كائن النتيجة هو أنبوب غير متزامن يمر بالنتيجة غير المتزامنة لمهمة إلى مهمة أخرى مستمرة. يمكن أن تنتظر النتائج وحلها بطريقة غير محظورة.
هذه المفاهيم الثلاثة - المهمة ، المنفذ والنتيجة المرتبطة بها هي اللبنات الأساسية لـ Concurrencpp. يقوم المنفذون بتشغيل المهام التي تتواصل مع بعضهم البعض عن طريق إرسال النتائج من خلال نتائج النتائج. تعمل المهام والمنفذون وكائنات النتائج معًا بشكل متماثل لإنتاج رمز متزامن سريع ونظيف.
تم بناء Concurrencpp حول مفهوم RAII. من أجل استخدام المهام والمنفذين ، تنشئ التطبيقات مثيل runtime في بداية الوظيفة main . ثم يتم استخدام وقت التشغيل لاكتساب المنفذين الحاليين وتسجيل منفذيين جدد المعرفة من قبل المستخدم. يتم استخدام المنفذين لإنشاء وتحديد موعد تشغيل المهام ، وقد يعيدون كائن result يمكن استخدامه لتمرير النتيجة غير المتزامنة إلى مهمة أخرى تعمل كمستهلك. عندما يتم تدمير وقت التشغيل ، فإنه يتكرر على كل منفذ مخزن ويطلق على طريقة shutdown . كل منفذ ثم يخرج برشاقة. يتم تدمير المهام غير المجدولة ، ومحاولات إنشاء مهام جديدة ستلقي استثناء.
# include " concurrencpp/concurrencpp.h "
# include < iostream >
int main () {
concurrencpp::runtime runtime;
auto result = runtime. thread_executor ()-> submit ([] {
std::cout << " hello world " << std::endl;
});
result. get ();
return 0 ;
} في هذا المثال الأساسي ، أنشأنا كائن وقت تشغيل ، ثم حصلنا على مؤشر ترابط المنفذ من وقت التشغيل. استخدمنا submit لتمرير Lambda كما لدينا قابلة للاتصال. يعيد هذا lambda void ، وبالتالي ، يعيد المنفذ result<void> التي تمرر النتيجة غير المتزامنة مرة أخرى إلى المتصل. get المكالمات main التي تحظر الخيط الرئيسي حتى تصبح النتيجة جاهزة. إذا لم يتم إلقاء استثناء ، get عودة void . إذا تم إلقاء استثناء ، get إعادة التهوية. بشكل غير متزامن ، يقوم thread_executor بتشغيل مؤشر ترابط جديد من التنفيذ ويدير Lambda المعطى. انها ضمنيا co_return void وينتهي المهمة. ثم يتم إلغاء main .
# include " concurrencpp/concurrencpp.h "
# include < iostream >
# include < vector >
# include < algorithm >
# include < ctime >
using namespace concurrencpp ;
std::vector< int > make_random_vector () {
std::vector< int > vec ( 64 * 1'024 );
std::srand ( std::time ( nullptr ));
for ( auto & i : vec) {
i = :: rand ();
}
return vec;
}
result< size_t > count_even (std::shared_ptr<thread_pool_executor> tpe, const std::vector< int >& vector) {
const auto vecor_size = vector. size ();
const auto concurrency_level = tpe-> max_concurrency_level ();
const auto chunk_size = vecor_size / concurrency_level;
std::vector<result< size_t >> chunk_count;
for ( auto i = 0 ; i < concurrency_level; i++) {
const auto chunk_begin = i * chunk_size;
const auto chunk_end = chunk_begin + chunk_size;
auto result = tpe-> submit ([&vector, chunk_begin, chunk_end]() -> size_t {
return std::count_if (vector. begin () + chunk_begin, vector. begin () + chunk_end, []( auto i) {
return i % 2 == 0 ;
});
});
chunk_count. emplace_back ( std::move (result));
}
size_t total_count = 0 ;
for ( auto & result : chunk_count) {
total_count += co_await result;
}
co_return total_count;
}
int main () {
concurrencpp::runtime runtime;
const auto vector = make_random_vector ();
auto result = count_even (runtime. thread_pool_executor (), vector);
const auto total_count = result. get ();
std::cout << " there are " << total_count << " even numbers in the vector " << std::endl;
return 0 ;
} في هذا المثال ، نبدأ البرنامج عن طريق إنشاء كائن وقت التشغيل. نقوم بإنشاء متجه مملوء بأرقام عشوائية ، ثم نحصل على thread_pool_executor من وقت التشغيل والمكالمات count_even . count_even هو coroutine الذي يولد المزيد من المهام و co_await s حتى ينتهي في الداخل. max_concurrency_level إرجاع الحد الأقصى للمبلغ من العمال الذي يدعمه المنفذ ، في حالة Executor Threadpool ، يتم حساب عدد العمال من عدد النوى. ثم نقوم بتقسيم الصفيف لتتناسب مع عدد العمال وإرسال كل قطعة لتتم معالجتها في مهمتها الخاصة. بشكل غير متزامن ، يحسب العمال عدد الأرقام الزوجية التي تحتوي عليها كل قطعة ، co_return النتيجة. يلخص count_even كل نتيجة عن طريق سحب العدد باستخدام co_await ، والنتيجة النهائية هي بعد ذلك co_return ed. الخيط الرئيسي ، الذي تم حظره عن طريق استدعاء get هو إلغاء حظره ويتم إرجاع العدد الكلي. يطبع الرئيسي عدد الأرقام الزوجية ويقوم البرنامج بإنهاء برشاقة.
يمكن تقسيم كل عملية كبيرة أو معقدة إلى خطوات أصغر وقابلة للسلسلة. المهام هي عمليات غير متزامنة تنفذ تلك الخطوات الحسابية. يمكن أن تعمل المهام في أي مكان بمساعدة المنفذين. على الرغم من أنه يمكن إنشاء المهام من callables العادية (مثل functors و lambdas) ، يتم استخدام المهام في الغالب مع coroutines ، والتي تتيح التعليق السلس واستئناف. في Concurrencpp ، يتم تمثيل مفهوم المهمة من قبل concurrencpp::task Class. على الرغم من أن مفهوم المهمة أمر أساسي في Concurrenpp ، إلا أنه نادراً ما يتعين على التطبيقات إنشاء كائنات مهمة ومعالجتها نفسها ، حيث يتم إنشاء كائنات المهمة وجدولةها بواسطة وقت التشغيل بدون مساعدة خارجية.
يسمح Concurrencpp للتطبيقات بإنتاج واستهلاك Coroutines كطريقة رئيسية لإنشاء المهام. Concurrencpp يدعم كل من المهام المتحمسة والكسل.
تبدأ المهام المتحمسة في الجري في اللحظة التي يتم فيها استدعاءها. يوصى بهذا النوع من التنفيذ عندما تحتاج التطبيقات إلى إطلاق إجراء غير متزامن واستهلاك نتيجته لاحقًا (النار والاستهلاك لاحقًا) ، أو تجاهل النتيجة غير المتزامنة تمامًا (النار والنسيان).
يمكن للمهام الحريصة إرجاع result أو null_result . يخبر نوع الإرجاع result عن Coroutine لتمرير القيمة التي تم إرجاعها أو الاستثناء الذي تم إلقاؤه (النار والاستهلاك لاحقًا) بينما يخبر null_result Return the Coroutine بإسقاطه وتجاهل أي منها (النار والنساء).
يمكن أن تبدأ coroutines المتحمسين في الركض بشكل متزامن ، في موضوع المتصل. هذا النوع من coroutines يسمى "coroutines العادية". يمكن أن يبدأ Concurrencpp Conaterines أيضًا في الركض بالتوازي ، داخل المنفذ المعطى ، ويسمى هذا النوع من coroutines "coroutines الموازية".
المهام البطيئة ، من ناحية أخرى ، تبدأ في الجري فقط عند co_await ed. يوصى بهذا النوع من المهام عندما يُقصد من نتيجة المهمة استهلاكها فور إنشاء المهمة. المهام البطيئة ، التي يتم تأجيلها ، هي أكثر تحسينًا قليلاً لحالة الاستهلاك الفوري ، لأنها لا تحتاج إلى تزامن خاص من أجل تمرير النتيجة غير المتزامنة إلى المستهلك. قد يقوم المترجم أيضًا بتحسين بعض تخصيصات الذاكرة اللازمة لتشكيل وعد Coroutine الأساسي. لا يمكن إطلاق مهمة كسول وتنفيذ شيء آخر في هذه الأثناء-إن إطلاق كوروتين كسول الكسول يعني بالضرورة تعليق كوروتين المتصل. لن يتم استئناف Caller Coroutine إلا عند اكتمال كوروتين الكسول. يمكن للمهام البطيئة العودة فقط lazy_result .
يمكن تحويل المهام البطيئة إلى مهام حريصة عن طريق الاتصال lazy_result::run . تقوم هذه الطريقة بتشغيل المهمة البطيئة المضمّنة وإرجاع كائن result يراقب المهمة التي بدأت حديثًا. إذا كان المطورون غير متأكدين من نوع النتيجة التي يجب استخدامها ، يتم تشجيعهم على استخدام النتائج البطيئة ، حيث يمكن تحويلها إلى نتائج منتظمة (متحمسة) إذا لزم الأمر.
عندما تُرجع وظيفة أي من lazy_result أو result أو null_result وتحتوي على واحد على الأقل co_await أو co_return في جسمها ، فإن الوظيفة هي concurrencpp coroutine. كل concurrencpp concuroutine هو مهمة صالحة. في مثالنا ، على سبيل المثال أعلاه ، يعد count_even مثل هذا الكوروتين. لقد ولدت أولاً count_even ، ثم داخلها ، أنتج Executor Threadpool المزيد من مهام الأطفال (التي يتم إنشاؤها من callables العادية) ، والتي تم ربطها في النهاية باستخدام co_await .
Concurrencpp Executor هو كائن قادر على جدولة المهام وتشغيلها. يقوم المنفذون بتبسيط عمل إدارة الموارد مثل مؤشرات الترابط ومجمعات مؤشرات الترابط وقوائم المهام عن طريق فصلها عن رمز التطبيق. يوفر المنفذون طريقة موحدة لجدولة المهام وتنفيذها ، لأنهم جميعًا يمتدون concurrencpp::executor .
executor API class executor {
/*
Initializes a new executor and gives it a name.
*/
executor (std::string_view name);
/*
Destroys this executor.
*/
virtual ~executor () noexcept = default ;
/*
The name of the executor, used for logging and debugging.
*/
const std::string name;
/*
Schedules a task to run in this executor.
Throws concurrencpp::errors::runtime_shutdown exception if shutdown was called before.
*/
virtual void enqueue (concurrencpp::task task) = 0;
/*
Schedules a range of tasks to run in this executor.
Throws concurrencpp::errors::runtime_shutdown exception if shutdown was called before.
*/
virtual void enqueue (std::span<concurrencpp::task> tasks) = 0;
/*
Returns the maximum count of real OS threads this executor supports.
The actual count of threads this executor is running might be smaller than this number.
returns numeric_limits<int>::max if the executor does not have a limit for OS threads.
*/
virtual int max_concurrency_level () const noexcept = 0;
/*
Returns true if shutdown was called before, false otherwise.
*/
virtual bool shutdown_requested () const noexcept = 0;
/*
Shuts down the executor:
- Tells underlying threads to exit their work loop and joins them.
- Destroys unexecuted coroutines.
- Makes subsequent calls to enqueue, post, submit, bulk_post and
bulk_submit to throw concurrencpp::errors::runtime_shutdown exception.
- Makes shutdown_requested return true.
*/
virtual void shutdown () noexcept = 0;
/*
Turns a callable and its arguments into a task object and
schedules it to run in this executor using enqueue.
Arguments are passed to the task by decaying them first.
Throws errors::runtime_shutdown exception if shutdown has been called before.
*/
template < class callable_type , class ... argument_types>
void post (callable_type&& callable, argument_types&& ... arguments);
/*
Like post, but returns a result object that passes the asynchronous result.
Throws errors::runtime_shutdown exception if shutdown has been called before.
*/
template < class callable_type , class ... argument_types>
result<type> submit (callable_type&& callable, argument_types&& ... arguments);
/*
Turns an array of callables into an array of tasks and
schedules them to run in this executor using enqueue.
Throws errors::runtime_shutdown exception if shutdown has been called before.
*/
template < class callable_type >
void bulk_post (std::span<callable_type> callable_list);
/*
Like bulk_post, but returns an array of result objects that passes the asynchronous results.
Throws errors::runtime_shutdown exception if shutdown has been called before.
*/
template < class callable_type >
std::vector<concurrencpp::result<type>> bulk_submit (std::span<callable_type> callable_list);
};كما ذكر أعلاه ، يوفر Concurrencpp تنفيذي شائع الاستخدام. هذه الأنواع المنفذة هي:
Pool Pool Executor - منفذي للأغراض العامة يحافظ على مجموعة من المواضيع. يعد Executor Pool Executor مناسبًا للمهام القصيرة التي لا تحظى بوحدة المعالجة المركزية التي لا تمنع. يتم تشجيع الطلبات على استخدام هذا المنفذ باعتباره المنفذ الافتراضي للمهام غير الحظر. يوفر مجموعة مؤشرات الترابط Concurrencpp حقن مؤشر ترابط ديناميكي وموازنة العمل الديناميكي.
خلفية المنفذ - منفذ ThreadPool مع مجموعة أكبر من المواضيع. مناسب لإطلاق مهام الحظر القصيرة مثل استفسارات ملف IO و DB. ملاحظة مهمة: عند استهلاك النتائج التي تم إرجاعها هذا المنفذ عن طريق استدعاء submit و bulk_submit ، من المهم تبديل التنفيذ باستخدام resume_on إلى المنفذ المرتبط بوحدة المعالجة المركزية ، من أجل منع معالجة المهام المرتبطة بوحدة المعالجة المركزية داخل Background_Executor.
مثال:
auto result = background_executor.submit([] { /* some blocking action */ });
auto done_result = co_await result.resolve();
co_await resume_on (some_cpu_executor);
auto val = co_await done_result; // runs inside some_cpu_executorThread Executor - المنفذ يقوم بتشغيل كل مهمة enqueued لتشغيلها على مؤشر ترابط جديد من التنفيذ. لا يتم إعادة استخدام المواضيع. هذا المنفذ مفيد للمهام طويلة المدى ، مثل الكائنات التي تدير حلقة عمل ، أو عمليات الحظر الطويلة.
Executor Thread Thread - منفذ مؤشر ترابط واحد يحافظ على قائمة انتظار مهمة واحدة. مناسبة عندما تريد التطبيقات موضوع مخصص ينفذ العديد من المهام ذات الصلة.
Manual Executor - المنفذ الذي لا ينفذ coroutines بمفرده. يمكن لبرمجة التطبيق تنفيذ المهام التي سبق أن تم توجيهها يدويًا عن أساليب التنفيذ.
Devivable Executor - فئة أساسية للمنفذين المعرّفين للمستخدم. على الرغم من أن الوراثة مباشرة من concurrencpp::executor ممكنة ، فإن derivable_executor يستخدم نمط CRTP الذي يوفر بعض فرص التحسين للمترجم.
INLINE Executor - يستخدم بشكل رئيسي لتجاوز سلوك المنفذين الآخرين. إن توسيع مهمة ما يعادل استدعاءها مضمّنًا.
يتم تغليف الآلية العارية للمنفذ في طريقة enqueue . هذه الطريقة تضع مهمة للتنفيذ ولها حمولين زائدان: يحمل أحد الحمل الزائد كائن مهمة واحدة كوسيطة ، وآخر يتلقى نطاقًا من كائنات المهمة. يتم استخدام الحمل الزائد الثاني لإثبات مجموعة من المهام. هذا يسمح بجدولة الاستدلال بشكل أفضل وانخفاض الخلاف.
لا يتعين على التطبيقات الاعتماد على enqueue وحدها ، concurrencpp::executor واجهة برمجة تطبيقات لجدولة مستخدمي المستخدم عن طريق تحويلها إلى كائنات مهمة وراء الكواليس. يمكن للتطبيقات أن تطلب من المنفذين إرجاع كائن نتيجة يمرر النتيجة غير المتزامنة للاستدعاء المقدمة. يتم ذلك عن طريق الاتصال على executor::submit و executor::bulk_submit . submit يحصل على قابلة للاستدعاء ، ويعيد كائن نتيجة. executor::bulk_submit يحصل على span من callables ويعيد vector كائنات النتائج بطريقة مماثلة submit الأعمال. في كثير من الحالات ، لا تهتم التطبيقات بالقيمة أو الاستثناء غير المتزامن. في هذه الحالة ، يمكن للتطبيقات استخدام executor:::post و executor::bulk_post لتحديد موعد لمكالمة أو span من callables ليتم تنفيذه ، ولكن أيضًا يخبر المهمة بإسقاط أي قيمة يتم إرجاعها أو إلقاءها. عدم تمرير النتيجة غير المتزامنة أسرع من المرور ، ولكن بعد ذلك ليس لدينا طريقة لمعرفة الحالة أو نتيجة المهمة المستمرة.
post ، bulk_post ، submit و bulk_submit استخدم enqueue خلف الكواليس لآلية الجدولة الأساسية.
thread_pool_executor API بصرف النظر عن post ، submit ، bulk_post و bulk_submit ، يوفر thread_pool_executor هذه الطرق الإضافية.
class thread_pool_executor {
/*
Returns the number of milliseconds each thread-pool worker
remains idle (lacks any task to execute) before exiting.
This constant can be set by passing a runtime_options object
to the constructor of the runtime class.
*/
std::chrono::milliseconds max_worker_idle_time () const noexcept ;
};manual_executor API بصرف النظر عن post ، submit ، bulk_post و bulk_submit ، يوفر manual_executor هذه الطرق الإضافية.
class manual_executor {
/*
Destructor. Equivalent to clear.
*/
~manual_executor () noexcept ;
/*
Returns the number of enqueued tasks at the moment of invocation.
This number can change quickly by the time the application handles it, it should be used as a hint.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
size_t size () const noexcept ;
/*
Queries whether the executor is empty from tasks at the moment of invocation.
This value can change quickly by the time the application handles it, it should be used as a hint.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
bool empty () const noexcept ;
/*
Clears the executor from any enqueued but yet to-be-executed tasks,
and returns the number of cleared tasks.
Tasks enqueued to this executor by (post_)submit method are resumed
and errors::broken_task exception is thrown inside them.
Ongoing tasks that are being executed by loop_once(_XXX) or loop(_XXX) are uneffected.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
size_t clear ();
/*
Tries to execute a single task. If at the moment of invocation the executor
is empty, the method does nothing.
Returns true if a task was executed, false otherwise.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
bool loop_once ();
/*
Tries to execute a single task.
This method returns when either a task was executed or max_waiting_time
(in milliseconds) has reached.
If max_waiting_time is 0, the method is equivalent to loop_once.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
bool loop_once_for (std::chrono::milliseconds max_waiting_time);
/*
Tries to execute a single task.
This method returns when either a task was executed or timeout_time has reached.
If timeout_time has already expired, this method is equivalent to loop_once.
If shutdown is called from another thread, this method
returns and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
template < class clock_type , class duration_type >
bool loop_once_until (std::chrono::time_point<clock_type, duration_type> timeout_time);
/*
Tries to execute max_count enqueued tasks and returns the number of tasks that were executed.
This method does not wait: it returns when the executor
becomes empty from tasks or max_count tasks have been executed.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
size_t loop ( size_t max_count);
/*
Tries to execute max_count tasks.
This method returns when either max_count tasks were executed or a
total amount of max_waiting_time has passed.
If max_waiting_time is 0, the method is equivalent to loop.
Returns the actual amount of tasks that were executed.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
size_t loop_for ( size_t max_count, std::chrono::milliseconds max_waiting_time);
/*
Tries to execute max_count tasks.
This method returns when either max_count tasks were executed or timeout_time has reached.
If timeout_time has already expired, the method is equivalent to loop.
Returns the actual amount of tasks that were executed.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
template < class clock_type , class duration_type >
size_t loop_until ( size_t max_count, std::chrono::time_point<clock_type, duration_type> timeout_time);
/*
Waits for at least one task to be available for execution.
This method should be used as a hint,
as other threads (calling loop, for example) might empty the executor,
before this thread has a chance to do something with the newly enqueued tasks.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
void wait_for_task ();
/*
This method returns when one or more tasks are available for
execution or max_waiting_time has passed.
Returns true if at at least one task is available for execution, false otherwise.
This method should be used as a hint, as other threads (calling loop, for example)
might empty the executor, before this thread has a chance to do something
with the newly enqueued tasks.
If shutdown is called from another thread, this method
returns and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
bool wait_for_task_for (std::chrono::milliseconds max_waiting_time);
/*
This method returns when one or more tasks are available for execution or timeout_time has reached.
Returns true if at at least one task is available for execution, false otherwise.
This method should be used as a hint,
as other threads (calling loop, for example) might empty the executor,
before this thread has a chance to do something with the newly enqueued tasks.
If shutdown is called from another thread, this method
returns and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
template < class clock_type , class duration_type >
bool wait_for_task_until (std::chrono::time_point<clock_type, duration_type> timeout_time);
/*
This method returns when max_count or more tasks are available for execution.
This method should be used as a hint, as other threads
(calling loop, for example) might empty the executor,
before this thread has a chance to do something with the newly enqueued tasks.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
void wait_for_tasks ( size_t max_count);
/*
This method returns when max_count or more tasks are available for execution
or max_waiting_time (in milliseconds) has passed.
Returns the number of tasks available for execution when the method returns.
This method should be used as a hint, as other
threads (calling loop, for example) might empty the executor,
before this thread has a chance to do something with the newly enqueued tasks.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
size_t wait_for_tasks_for ( size_t count, std::chrono::milliseconds max_waiting_time);
/*
This method returns when max_count or more tasks are available for execution
or timeout_time is reached.
Returns the number of tasks available for execution when the method returns.
This method should be used as a hint, as other threads
(calling loop, for example) might empty the executor,
before this thread has a chance to do something with the newly enqueued tasks.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
template < class clock_type , class duration_type >
size_t wait_for_tasks_until ( size_t count, std::chrono::time_point<clock_type, duration_type> timeout_time);
}; يمكن استهلاك القيم والاستثناءات غير المتزامنة باستخدام كائنات نتائج concurrencpp. يمثل نوع result النتيجة غير المتزامنة لمهمة حريصة بينما يمثل lazy_result النتيجة المؤجلة لمهمة كسول.
عند اكتمال مهمة (متحمس أو كسول) ، فإنها إما إرجاع قيمة صالحة أو ترمي استثناء. في كلتا الحالتين ، يتم تمرير هذه النتيجة غير المتزامنة إلى المستهلك لكائن النتيجة.
تشكل كائنات result coroutines غير متماثلة-تنفيذ كوروتين المتصل لا يتأثر بتنفيذ Callee-coroutine ، يمكن أن يعمل كلا coroutines بشكل مستقل. فقط عند استهلاك نتيجة Callee-coroutine ، قد يتم تعليق الكوروتين المتصل في انتظار إكمال Callee. حتى هذه النقطة تعمل كلا coroutines بشكل مستقل. يعمل Callee-coroutine سواء تم استهلاك النتيجة أم لا.
تشكل كائنات lazy_result coroutines متماثلة-لا يحدث تنفيذ Callee-coroutine إلا بعد تعليق Caller-Coroutine. عند انتظار نتيجة كسول ، يتم تعليق coroutine الحالي وتبدأ المهمة البطيئة المرتبطة بالنتيجة الكسول في التشغيل. بعد الانتهاء من Callee-coroutine ويعطي نتيجة ، يتم استئناف كوروتين المتصل. إذا لم يتم استهلاك نتيجة كسول ، فلن تبدأ مهمتها البطيئة المرتبطة بها أبدًا.
جميع كائنات النتائج هي نوع خطوة فقط ، وعلى هذا النحو ، لا يمكن استخدامها بعد نقل محتوىها إلى كائن نتيجة آخر. في هذه الحالة ، يعتبر كائن النتيجة فارغًا ويحاول استدعاء أي طريقة أخرى غير operator bool operator = سوف يرمي استثناء.
بعد سحب النتيجة غير المتزامنة من كائن النتيجة (على سبيل المثال ، عن طريق get أو operator co_await ) ، يصبح كائن النتيجة فارغًا. يمكن اختبار الفراغ مع operator bool .
في انتظار النتيجة يعني تعليق coroutine الحالي حتى يصبح كائن النتيجة جاهزًا. إذا تم إرجاع قيمة صالحة من المهمة المرتبطة بها ، يتم إرجاعها من كائن النتيجة. إذا كانت المهمة المرتبطة بها تلقي استثناءً ، فسيتم إعادة تشكيلها. في لحظة الانتظار ، إذا كانت النتيجة جاهزة بالفعل ، يستأنف Coroutine الحالي على الفور. خلاف ذلك ، يتم استئنافها بواسطة الخيط الذي يحدد النتيجة أو الاستثناء غير المتزامن.
حل النتيجة يشبه في انتظار ذلك. الفرق هو أن تعبير co_await سيعيد كائن النتيجة نفسه ، في شكل غير فارغ ، في حالة جاهزة. يمكن بعد ذلك سحب النتيجة غير المتزامنة باستخدام get أو co_await .
كل كائن نتيجة لديه حالة تشير إلى حالة النتيجة غير المتزامنة. تختلف حالة النتيجة من result_status::idle (النتيجة غير المتزامنة أو الاستثناء لم يتم إنتاجها بعد) إلى result_status::value (تم إنهاء المهمة المرتبطة بأمان من خلال إرجاع قيمة صالحة) إلى result_status::exception (المهمة التي تم إنهاءها عن طريق إلقاء استثناء). يمكن الاستعلام عن الحالة عن طريق الاتصال (lazy_)result::status .
result يمثل نوع result نتيجة لمهمة مستمرة غير متزامنة ، على غرار std::future .
بصرف النظر عن انتظار وحل النتائج ، يمكن أيضًا انتظارها عن طريق الاتصال بأي من result::wait ، result::wait_for ، result::wait_until أو result::get . في انتظار الانتهاء من النتيجة هي عملية حظر (في حالة أن النتيجة غير المتزامنة غير جاهزة) ، وسوف تعلق مؤشر ترابط التنفيذ بالكامل في انتظار أن تصبح النتيجة غير المتزامنة متوفرة. يتم تثبيط عمليات الانتظار عمومًا ولا يُسمح بها إلا في المهام على مستوى الجذر أو في سياقات تسمح بذلك ، مثل حظر الخيط الرئيسي في انتظار بقية التطبيق حتى ينتهي برشاقة ، أو باستخدام concurrencpp::blocking_executor أو concurrencpp::thread_executor .
إن انتظار كائنات النتائج باستخدام co_await (وبقيامها بذلك ، فإن تحويل الوظيفة/المهمة الحالية إلى coroutine أيضًا) هو الطريقة المفضلة لاستهلاك كائنات النتائج ، لأنها لا تمنع مؤشرات الترابط الأساسية.
result API class result {
/*
Creates an empty result that isn't associated with any task.
*/
result () noexcept = default ;
/*
Destroys the result. Associated tasks are not cancelled.
The destructor does not block waiting for the asynchronous result to become ready.
*/
~result () noexcept = default ;
/*
Moves the content of rhs to *this. After this call, rhs is empty.
*/
result (result&& rhs) noexcept = default ;
/*
Moves the content of rhs to *this. After this call, rhs is empty. Returns *this.
*/
result& operator = (result&& rhs) noexcept = default ;
/*
Returns true if this is a non-empty result.
Applications must not use this object if this->operator bool() is false.
*/
explicit operator bool () const noexcept ;
/*
Queries the status of *this.
The returned value is any of result_status::idle, result_status::value or result_status::exception.
Throws errors::empty_result if *this is empty.
*/
result_status status () const ;
/*
Blocks the current thread of execution until this result is ready,
when status() != result_status::idle.
Throws errors::empty_result if *this is empty.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
void wait ();
/*
Blocks until this result is ready or duration has passed. Returns the status
of this result after unblocking.
Throws errors::empty_result if *this is empty.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
template < class duration_unit , class ratio >
result_status wait_for (std::chrono::duration<duration_unit, ratio> duration);
/*
Blocks until this result is ready or timeout_time has reached. Returns the status
of this result after unblocking.
Throws errors::empty_result if *this is empty.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
template < class clock , class duration >
result_status wait_until (std::chrono::time_point<clock, duration> timeout_time);
/*
Blocks the current thread of execution until this result is ready,
when status() != result_status::idle.
If the result is a valid value, it is returned, otherwise, get rethrows the asynchronous exception.
Throws errors::empty_result if *this is empty.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
type get ();
/*
Returns an awaitable used to await this result.
If the result is already ready - the current coroutine resumes
immediately in the calling thread of execution.
If the result is not ready yet, the current coroutine is suspended
and resumed when the asynchronous result is ready,
by the thread which had set the asynchronous value or exception.
In either way, after resuming, if the result is a valid value, it is returned.
Otherwise, operator co_await rethrows the asynchronous exception.
Throws errors::empty_result if *this is empty.
*/
auto operator co_await ();
/*
Returns an awaitable used to resolve this result.
After co_await expression finishes, *this is returned in a non-empty form, in a ready state.
Throws errors::empty_result if *this is empty.
*/
auto resolve ();
};lazy_resultيمثل كائن نتيجة كسول نتيجة مهمة كسول مؤجلة.
تتحمل lazy_result مسؤولية بدء المهمة البطيئة المرتبطة بها وتمرير نتائجها المؤجلة إلى المستهلك. عندما تنتظر أو حل ، تعلق النتيجة الكسولة Coroutine الحالية وتبدأ المهمة البطيئة المرتبطة بها. عند اكتمال المهمة المرتبطة ، يتم تمرير قيمتها غير المتزامنة إلى مهمة المتصل ، والتي يتم استئنافها بعد ذلك.
في بعض الأحيان ، قد تُرجع واجهة برمجة التطبيقات إلى نتيجة كسول ، لكن التطبيقات تحتاج إلى مهمتها المرتبطة بالركض بفارغ الصبر (دون تعليق مهمة المتصل). في هذه الحالة ، يمكن تحويل المهام البطيئة إلى مهام حريصة عن طريق استدعاء run على النتيجة البطيئة المرتبطة بها. في هذه الحالة ، ستبدأ المهمة المرتبطة في تشغيلها ، دون تعليق مهمة المتصل. يتم إفراغ النتيجة البطيئة الأصلية وكائن result صالح يراقب المهمة التي تم البدء بها حديثًا بدلاً من ذلك.
lazy_result API class lazy_result {
/*
Creates an empty lazy result that isn't associated with any task.
*/
lazy_result () noexcept = default ;
/*
Moves the content of rhs to *this. After this call, rhs is empty.
*/
lazy_result (lazy_result&& rhs) noexcept ;
/*
Destroys the result. If not empty, the destructor destroys the associated task without resuming it.
*/
~lazy_result () noexcept ;
/*
Moves the content of rhs to *this. After this call, rhs is empty. Returns *this.
If *this is not empty, then operator= destroys the associated task without resuming it.
*/
lazy_result& operator =(lazy_result&& rhs) noexcept ;
/*
Returns true if this is a non-empty result.
Applications must not use this object if this->operator bool() is false.
*/
explicit operator bool () const noexcept ;
/*
Queries the status of *this.
The returned value is any of result_status::idle, result_status::value or result_status::exception.
Throws errors::empty_result if *this is empty.
*/
result_status status () const ;
/*
Returns an awaitable used to start the associated task and await this result.
If the result is already ready - the current coroutine resumes immediately
in the calling thread of execution.
If the result is not ready yet, the current coroutine is suspended and
resumed when the asynchronous result is ready,
by the thread which had set the asynchronous value or exception.
In either way, after resuming, if the result is a valid value, it is returned.
Otherwise, operator co_await rethrows the asynchronous exception.
Throws errors::empty_result if *this is empty.
*/
auto operator co_await ();
/*
Returns an awaitable used to start the associated task and resolve this result.
If the result is already ready - the current coroutine resumes immediately
in the calling thread of execution.
If the result is not ready yet, the current coroutine is suspended and resumed
when the asynchronous result is ready, by the thread which
had set the asynchronous value or exception.
After co_await expression finishes, *this is returned in a non-empty form, in a ready state.
Throws errors::empty_result if *this is empty.
*/
auto resolve ();
/*
Runs the associated task inline and returns a result object that monitors the newly started task.
After this call, *this is empty.
Throws errors::empty_result if *this is empty.
Might throw std::bad_alloc if fails to allocate memory.
*/
result<type> run ();
};تبدأ coroutines العادية العادية في الركض بشكل متزامن في مؤشر ترابط الاستدعاء للتنفيذ. قد يتحول التنفيذ إلى مؤشر ترابط آخر من التنفيذ إذا خضع coroutine لإعادة جدولة ، على سبيل المثال من خلال انتظار كائن نتيجة غير مستعدين بداخله. يوفر Concurrencpp أيضًا coroutines متوازية ، والتي تبدأ في الركض داخل منفذ معين ، وليس في مؤشر ترابط التنفيذ. هذا النمط من جدولة coroutines مفيد بشكل خاص عند كتابة الخوارزميات المتوازية والخوارزميات العودية والخوارزميات المتزامنة التي تستخدم نموذج الشوكة.
يجب أن تفي كل كوروتين متوازي بالشهور المسبقة التالية:
result / null_result .executor_tag كوسيطة لها.type* / type& / std::shared_ptr<type> ، حيث يكون type فئة ملموسة من executor كوسيطة ثانية له.co_await أو co_return في الجسم. إذا تم تطبيق كل ما سبق ، فإن الوظيفة هي coroutine متوازية: ستبدأ Concurrencpp من Coroutine معلقًا وإعادة جدولةها على الفور لتشغيلها في المنفذ المقدم. concurrencpp::executor_tag هو عنصر نائب وهمي لإخبار وقت تشغيل Concurrencpp بأن هذه الوظيفة ليست وظيفة منتظمة ، فهي تحتاج إلى بدء تشغيل داخل المنفذ المحدد. إذا كان المنفذ قد تم تمريره إلى coroutine الموازي خالية ، فلن يبدأ Coroutine في الركض وسيتم إلقاء استثناء std::invalid_argument بشكل متزامن. إذا تم استيفاء جميع الشروط المسبقة ، يمكن للتطبيقات أن تستهلك نتيجة coroutine المتوازية باستخدام كائن النتيجة التي تم إرجاعها.
في هذا المثال ، نقوم بحساب العضو 30 من تسلسل فيبوناتشي بطريقة موازية. نبدأ في إطلاق كل خطوة فيبوناتشي في كوروتين المتوازي. الوسيطة الأولى هي Dummy executor_tag والوسيطة الثانية هي Executor Threadpool. كل خطوة متكررة تستدعي كوروتين متوازي جديد يعمل بالتوازي. كل نتيجة هي co_return إلى مهمتها الأم واكتسبه باستخدام co_await .
عندما نرى أن المدخلات صغيرة بما يكفي ليتم حسابها بشكل متزامن (عندما يكون curr <= 10 ) ، نتوقف عن تنفيذ كل خطوة متكررة في مهمتها الخاصة ونحل الخوارزمية بشكل متزامن.
# include " concurrencpp/concurrencpp.h "
# include < iostream >
using namespace concurrencpp ;
int fibonacci_sync ( int i) {
if (i == 0 ) {
return 0 ;
}
if (i == 1 ) {
return 1 ;
}
return fibonacci_sync (i - 1 ) + fibonacci_sync (i - 2 );
}
result< int > fibonacci (executor_tag, std::shared_ptr<thread_pool_executor> tpe, const int curr) {
if (curr <= 10 ) {
co_return fibonacci_sync (curr);
}
auto fib_1 = fibonacci ({}, tpe, curr - 1 );
auto fib_2 = fibonacci ({}, tpe, curr - 2 );
co_return co_await fib_1 + co_await fib_2;
}
int main () {
concurrencpp::runtime runtime;
auto fibb_30 = fibonacci ({}, runtime. thread_pool_executor (), 30 ). get ();
std::cout << " fibonacci(30) = " << fibb_30 << std::endl;
return 0 ;
} للمقارنة ، هذه هي الطريقة التي يتم بها كتابة نفس الرمز دون استخدام coroutines المتوازية ، والاعتماد على executor::submit وحده. نظرًا لأن fibonacci يعيد result<int> ، فإن إرسالها بشكل متكرر عبر executor::submit سيؤدي إلى result<result<int>> .
# include " concurrencpp/concurrencpp.h "
# include < iostream >
using namespace concurrencpp ;
int fibonacci_sync ( int i) {
if (i == 0 ) {
return 0 ;
}
if (i == 1 ) {
return 1 ;
}
return fibonacci_sync (i - 1 ) + fibonacci_sync (i - 2 );
}
result< int > fibonacci (std::shared_ptr<thread_pool_executor> tpe, const int curr) {
if (curr <= 10 ) {
co_return fibonacci_sync (curr);
}
auto fib_1 = tpe-> submit (fibonacci, tpe, curr - 1 );
auto fib_2 = tpe-> submit (fibonacci, tpe, curr - 2 );
co_return co_await co_await fib_1 +
co_await co_await fib_2;
}
int main () {
concurrencpp::runtime runtime;
auto fibb_30 = fibonacci (runtime. thread_pool_executor (), 30 ). get ();
std::cout << " fibonacci(30) = " << fibb_30 << std::endl;
return 0 ;
} كائنات النتيجة هي الطريقة الرئيسية لتمرير البيانات بين المهام في Concurrencpp ورأينا كيف ينتج المنفذون والكوروتين مثل هذه الكائنات. في بعض الأحيان نريد استخدام إمكانات كائنات النتائج مع غير المهام ، على سبيل المثال عند استخدام مكتبة طرف ثالث. في هذه الحالة ، يمكننا إكمال كائن نتيجة باستخدام result_promise . result_promise يشبه كائن std::promise - يمكن للتطبيقات تعيين النتيجة أو الاستثناء غير المتزامن يدويًا وجعل كائن result المرتبطة جاهزًا.
تمامًا مثل كائنات النتائج ، تعتبر برامج النتيجة عبارة عن نوع تحرك فقط يصبح فارغًا بعد التحرك. وبالمثل ، بعد تحديد نتيجة أو استثناء ، يصبح وعد النتيجة فارغًا أيضًا. إذا خرجت النتيجة عن النطاق ولم يتم تعيين أي نتيجة/استثناء ، فإن Destructor الناتج عن النتيجة يقوم بتعيين استثناء concurrencpp::errors::broken_task باستخدام طريقة set_exception . يتم استئناف/إلغاء حظر المهام المعلقة والممنوعة في انتظار كائن النتيجة المرتبطة.
تتمكن وعود النتيجة من تحويل نمط رد الاتصال من الكود إلى نمط رمز async/await : كلما كان المكون يتطلب ردًا لتمرير النتيجة غير المتزامنة ، يمكننا تمرير رد اتصال يدعو set_result أو set_exception (اعتمادًا على النتيجة غير المتزامنة نفسها) على الوعد الناتج الناتج عن النتيجة ، وإعادة النتيجة المرتبطة.
result_promise API template < class type >
class result_promise {
/*
Constructs a valid result_promise.
Might throw std::bad_alloc if fails to allocate memory.
*/
result_promise ();
/*
Moves the content of rhs to *this. After this call, rhs is empty.
*/
result_promise (result_promise&& rhs) noexcept ;
/*
Destroys *this, possibly setting an errors::broken_task exception
by calling set_exception if *this is not empty at the time of destruction.
*/
~result_promise () noexcept ;
/*
Moves the content of rhs to *this. After this call, rhs is empty.
*/
result_promise& operator = (result_promise&& rhs) noexcept ;
/*
Returns true if this is a non-empty result-promise.
Applications must not use this object if this->operator bool() is false.
*/
explicit operator bool () const noexcept ;
/*
Sets a value by constructing <<type>> from arguments... in-place.
Makes the associated result object become ready - tasks waiting for it
to become ready are unblocked.
Suspended tasks are resumed inline.
After this call, *this becomes empty.
Throws errors::empty_result_promise exception If *this is empty.
Might throw any exception that the constructor
of type(std::forward<argument_types>(arguments)...) throws.
*/
template < class ... argument_types>
void set_result (argument_types&& ... arguments);
/*
Sets an exception.
Makes the associated result object become ready - tasks waiting for it
to become ready are unblocked.
Suspended tasks are resumed inline.
After this call, *this becomes empty.
Throws errors::empty_result_promise exception If *this is empty.
Throws std::invalid_argument exception if exception_ptr is null.
*/
void set_exception (std::exception_ptr exception_ptr);
/*
A convenience method that invokes a callable with arguments... and calls set_result
with the result of the invocation.
If an exception is thrown, the thrown exception is caught and set instead by calling set_exception.
After this call, *this becomes empty.
Throws errors::empty_result_promise exception If *this is empty.
Might throw any exception that callable(std::forward<argument_types>(arguments)...)
or the contructor of type(type&&) throw.
*/
template < class callable_type , class ... argument_types>
void set_from_function (callable_type&& callable, argument_types&& ... arguments);
/*
Gets the associated result object.
Throws errors::empty_result_promise exception If *this is empty.
Throws errors::result_already_retrieved exception if this method had been called before.
*/
result<type> get_result ();
};result_promise : مثال: في هذا المثال ، يتم استخدام result_promise لدفع البيانات من مؤشر ترابط واحد ، ويمكن سحبه من كائن result المرتبط به من مؤشر ترابط آخر.
# include " concurrencpp/concurrencpp.h "
# include < iostream >
int main () {
concurrencpp::result_promise<std::string> promise;
auto result = promise. get_result ();
std::thread my_3_party_executor ([promise = std::move (promise)] () mutable {
std::this_thread::sleep_for ( std::chrono::seconds ( 1 )); // Imitate real work
promise. set_result ( " hello world " );
});
auto asynchronous_string = result. get ();
std::cout << " result promise returned string: " << asynchronous_string << std::endl;
my_3_party_executor. join ();
} في هذا المثال ، نستخدم std::thread تابع لجهة خارجية. يمثل هذا سيناريو عند استخدام المنفذ غير المتواصل كجزء من دورة حياة التطبيق. نستخرج كائن النتيجة قبل أن نمر بالوعد وحظر الخيط الرئيسي حتى تصبح النتيجة جاهزة. في my_3_party_executor ، قمنا بتعيين نتيجة كما لو أننا co_return ed.
النتائج المشتركة هي نوع خاص من كائنات النتائج التي تسمح للعديد من المستهلكين بالوصول إلى النتيجة غير المتزامنة ، على غرار std::shared_future . يمكن للمستهلكين المختلفين من مؤشرات الترابط المختلفة استدعاء وظائف مثل await get resolve بطريقة آمنة لخيط.
تم تصميم النتائج المشتركة من كائنات النتائج العادية وعلى عكس كائنات النتائج العادية ، فهي قابلة للنسخ والتحريك. على هذا النحو ، يتصرف shared_result مثل نوع std::shared_ptr . إذا تم نقل مثيل نتيجة مشتركة إلى مثيل آخر ، يصبح المثيل فارغًا ، ومحاولة الوصول إليه ، فستقوم بإلقاء استثناء.
من أجل دعم العديد من المستهلكين ، تُرجع النتائج المشتركة إشارة إلى القيمة غير المتزامنة بدلاً من تحريكها (مثل النتائج العادية). على سبيل المثال ، يتم استدعاء A shared_result<int> int& get ، await وما إلى ذلك. إذا كان النوع الأساسي من shared_result void أو نوعًا مرجعيًا (مثل int& ) ، يتم إرجاعها كالمعتاد. إذا كانت النتيجة غير المتزامنة هي استثمارها ، فسيتم إعادة تشكيلها.
لاحظ أنه أثناء الحصول على النتيجة غير المتزامنة باستخدام shared_result من مؤشرات الترابط المتعددة ، فإن القيمة الآمنة مؤلمة ، قد لا تكون القيمة الفعلية آمنة. على سبيل المثال ، يمكن لخيوط متعددة الحصول على عدد صحيح غير متزامن عن طريق تلقي مرجعها ( int& ). لا يجعل عدد صحيح نفسه آمن. لا بأس في تحوير القيمة غير المتزامنة إذا كانت القيمة غير المتزامنة آمنة بالفعل. بدلاً من ذلك ، يتم تشجيع التطبيقات على استخدام أنواع const لتبدأ (مثل const int ) ، والحصول على مراجع ثابتة (مثل const int& ) التي تمنع الطفرة.
shared_result class share_result {
/*
Creates an empty shared-result that isn't associated with any task.
*/
shared_result () noexcept = default ;
/*
Destroys the shared-result. Associated tasks are not cancelled.
The destructor does not block waiting for the asynchronous result to become ready.
*/
~shared_result () noexcept = default ;
/*
Converts a regular result object to a shared-result object.
After this call, rhs is empty.
Might throw std::bad_alloc if fails to allocate memory.
*/
shared_result (result<type> rhs);
/*
Copy constructor. Creates a copy of the shared result object that monitors the same task.
*/
shared_result ( const shared_result&) noexcept = default ;
/*
Move constructor. Moves rhs to *this. After this call, rhs is empty.
*/
shared_result (shared_result&& rhs) noexcept = default ;
/*
Copy assignment operator. Copies rhs to *this and monitors the same task that rhs monitors.
*/
shared_result& operator =( const shared_result& rhs) noexcept ;
/*
Move assignment operator. Moves rhs to *this. After this call, rhs is empty.
*/
shared_result& operator =(shared_result&& rhs) noexcept ;
/*
Returns true if this is a non-empty shared-result.
Applications must not use this object if this->operator bool() is false.
*/
explicit operator bool () const noexcept ;
/*
Queries the status of *this.
The return value is any of result_status::idle, result_status::value or result_status::exception.
Throws errors::empty_result if *this is empty.
*/
result_status status () const ;
/*
Blocks the current thread of execution until this shared-result is ready,
when status() != result_status::idle.
Throws errors::empty_result if *this is empty.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
void wait ();
/*
Blocks until this shared-result is ready or duration has passed.
Returns the status of this shared-result after unblocking.
Throws errors::empty_result if *this is empty.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
template < class duration_type , class ratio_type >
result_status wait_for (std::chrono::duration<duration_type, ratio_type> duration);
/*
Blocks until this shared-result is ready or timeout_time has reached.
Returns the status of this result after unblocking.
Throws errors::empty_result if *this is empty.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
template < class clock_type , class duration_type >
result_status wait_until (std::chrono::time_point<clock_type, duration_type> timeout_time);
/*
Blocks the current thread of execution until this shared-result is ready,
when status() != result_status::idle.
If the result is a valid value, a reference to it is returned,
otherwise, get rethrows the asynchronous exception.
Throws errors::empty_result if *this is empty.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
std:: add_lvalue_reference_t <type> get ();
/*
Returns an awaitable used to await this shared-result.
If the shared-result is already ready - the current coroutine resumes
immediately in the calling thread of execution.
If the shared-result is not ready yet, the current coroutine is
suspended and resumed when the asynchronous result is ready,
by the thread which had set the asynchronous value or exception.
In either way, after resuming, if the result is a valid value, a reference to it is returned.
Otherwise, operator co_await rethrows the asynchronous exception.
Throws errors::empty_result if *this is empty.
*/
auto operator co_await ();
/*
Returns an awaitable used to resolve this shared-result.
After co_await expression finishes, *this is returned in a non-empty form, in a ready state.
Throws errors::empty_result if *this is empty.
*/
auto resolve ();
};shared_result : في هذا المثال ، يتم تحويل كائن result إلى كائن shared_result ويتم الحصول على مرجع إلى نتيجة int غير المتزامنة بواسطة العديد من المهام التي تسببت مع thread_executor .
# include " concurrencpp/concurrencpp.h "
# include < iostream >
# include < chrono >
concurrencpp::result< void > consume_shared_result (concurrencpp::shared_result< int > shared_result,
std::shared_ptr<concurrencpp::executor> resume_executor) {
std::cout << " Awaiting shared_result to have a value " << std::endl;
const auto & async_value = co_await shared_result;
concurrencpp::resume_on (resume_executor);
std::cout << " In thread id " << std::this_thread::get_id () << " , got: " << async_value << " , memory address: " << &async_value << std::endl;
}
int main () {
concurrencpp::runtime runtime;
auto result = runtime. background_executor ()-> submit ([] {
std::this_thread::sleep_for ( std::chrono::seconds ( 1 ));
return 100 ;
});
concurrencpp::shared_result< int > shared_result ( std::move (result));
concurrencpp::result< void > results[ 8 ];
for ( size_t i = 0 ; i < 8 ; i++) {
results[i] = consume_shared_result (shared_result, runtime. thread_pool_executor ());
}
std::cout << " Main thread waiting for all consumers to finish " << std::endl;
auto tpe = runtime. thread_pool_executor ();
auto all_consumed = concurrencpp::when_all (tpe, std::begin (results), std::end (results)). run ();
all_consumed. get ();
std::cout << " All consumers are done, exiting " << std::endl;
return 0 ;
} عندما يخرج كائن وقت التشغيل عن نطاق main ، فإنه يكرر كل منفذ مخزن ويطلق على طريقة shutdown . محاولة الوصول إلى المؤقت أو أي منفذ سوف يرمي errors::runtime_shutdown . عندما يغلق المنفذ ، يقوم بمسح قوائم انتظار المهام الداخلية ، مما يدمر كائنات task غير تنفذ. إذا قام كائن مهمة بتخزين concurrencpp-coroutine ، يتم استئناف هذا coroutine داخل الخط ويتم إلقاء استثناء errors::broken_task بداخله. في أي حال من الأحوال التي يتم فيها إلقاء استثناء runtime_shutdown أو broken_task ، يجب أن تنهي التطبيقات تدفق الكود الحالي بأمان في أقرب وقت ممكن. لا ينبغي تجاهل هذه الاستثناءات. يرث كل من runtime_shutdown و broken_task من فئة Base errors::interrupted_task ، ويمكن أيضًا استخدام هذا النوع في جملة catch للتعامل مع الإنهاء بطريقة موحدة.
تتطلب العديد من الإجراءات غير المتزامنة Concurrencpp مثيلًا للمنفذ كمنفذ استئناف . عندما يمكن أن تنتهي إجراء غير متزامن (تم تنفيذه كـ coroutine) بشكل متزامن ، فإنه يستأنف على الفور في مؤشر ترابط الاستدعاء للتنفيذ. إذا لم يتمكن الإجراء غير المتزامن بشكل متزامن ، فسيتم استئنافه عند الانتهاء ، داخل المستأنف المعطى. على سبيل المثال ، تتطلب وظيفة الأداة المساعدة when_any تتطلب دالة الأداة المساعدة مثيلًا لاستئناف التنفيذ كوسيطة أولى. when_any تُرجع _any lazy_result التي تصبح جاهزة عندما تصبح نتيجة واحدة على الأقل جاهزة. إذا كانت إحدى النتائج جاهزة بالفعل في لحظة الاتصال when_any ، يتم استئناف استدعاء coroutine بشكل متزامن في مؤشر ترابط الاستدعاء للتنفيذ. إذا لم يكن الأمر كذلك ، فسيتم استئناف Call Coroutine عند الانتهاء من النتيجة على الأقل ، داخل المستأنف المعطى. يعد منفذي السيرة الذاتية مهمين لأنهم يفرضون على مكان استئناف coroutines في الحالات التي لا يكون من المفترض فيها استئناف Coroutine (على سبيل المثال ، في حالة when_any و when_all ) ، أو في الحالات التي تتم فيها معالجة الإجراء غير المتزامن داخل أحد عمال Concurrencpp ، والتي يتم استخدامها فقط لمعالجة هذا الإجراء المحدد ، وليس رمز التطبيق.
make_ready_result make_ready_result ينشئ كائن نتيجة جاهزة من وسيطات معينة. إن انتظار هذه النتيجة سيؤدي إلى استئناف Coroutine الحالي على الفور. get and operator co_await ستعيد القيمة المصنفة.
/*
Creates a ready result object by building <<type>> from arguments&&... in-place.
Might throw any exception that the constructor
of type(std::forward<argument_types>(arguments)...) throws.
Might throw std::bad_alloc exception if fails to allocate memory.
*/
template < class type , class ... argument_types>
result<type> make_ready_result (argument_types&& ... arguments);
/*
An overload for void type.
Might throw std::bad_alloc exception if fails to allocate memory.
*/
result< void > make_ready_result ();make_exceptional_result وظيفة make_exceptional_result ينشئ كائن نتيجة جاهزة من استثناء معين. إن انتظار هذه النتيجة سيؤدي إلى استئناف Coroutine الحالي على الفور. get and operator co_await سيعيد إعادة تشكيل الاستثناء المعطى.
/*
Creates a ready result object from an exception pointer.
The returned result object will re-throw exception_ptr when calling get or await.
Throws std::invalid_argument if exception_ptr is null.
Might throw std::bad_alloc exception if fails to allocate memory.
*/
template < class type >
result<type> make_exceptional_result (std::exception_ptr exception_ptr);
/*
Overload. Similar to make_exceptional_result(std::exception_ptr),
but gets an exception object directly.
Might throw any exception that the constructor of exception_type(std::move(exception)) might throw.
Might throw std::bad_alloc exception if fails to allocate memory.
*/
template < class type , class exception_type >
result<type> make_exceptional_result (exception_type exception );when_all وظيفة _all when_all تكون _all وظيفة فائدة تنشئ كائن نتيجة كسول يصبح جاهزًا عند اكتمال جميع نتائج الإدخال. إن انتظار هذه النتيجة البطيئة تُرجع جميع كائنات الإدخال في حالة جاهزة ، جاهزة للاستهلاك.
when_all تأتي وظيفة _all بثلاثة نكهات - واحدة تقبل نطاقًا غير متجانس من كائنات النتائج ، وآخر يحصل على زوج من التكرار إلى مجموعة من كائنات النتائج من نفس النوع ، وأخيرًا تحميل زائد لا يقبل أي كائنات نتائج على الإطلاق. في حالة عدم وجود كائنات نتائج الإدخال - تقوم الدالة بإرجاع كائن نتيجة جاهزة لتوابل فارغة.
إذا كان أحد الأشياء التي تم تمريرها فارغة ، فسيتم طرح استثناء. في هذه الحالة ، لا تتأثر كائنات إدخال الإدخال بالوظيفة ويمكن استخدامها مرة أخرى بعد معالجة الاستثناء. إذا كانت جميع كائنات نتائج الإدخال صالحة ، يتم إفراغها بواسطة هذه الوظيفة ، وإعادتها في حالة صالحة وجاهزة كنتيجة للإخراج.
حاليا ، when_all يقبل فقط كائنات result .
جميع الأحمال الزائدة تقبل استئناف المنفذ كمعلمة الأولى. عند انتظار النتيجة التي تم إرجاعها بواسطة when_all ، سيتم استئناف المتصل Coroutine بواسطة منفذ السيرة الذاتية المحددة.
/*
Creates a result object that becomes ready when all the input results become ready.
Passed result objects are emptied and returned as a tuple.
Throws std::invalid_argument if any of the passed result objects is empty.
Might throw an std::bad_alloc exception if no memory is available.
*/
template < class ... result_types>
lazy_result<std::tuple< typename std::decay<result_types>::type...>>
when_all (std::shared_ptr<executor_type> resume_executor,
result_types&& ... results);
/*
Overload. Similar to when_all(result_types&& ...) but receives a pair of iterators referencing a range.
Passed result objects are emptied and returned as a vector.
If begin == end, the function returns immediately with an empty vector.
Throws std::invalid_argument if any of the passed result objects is empty.
Might throw an std::bad_alloc exception if no memory is available.
*/
template < class iterator_type >
lazy_result<std::vector< typename std::iterator_traits<iterator_type>::value_type>>
when_all (std::shared_ptr<executor_type> resume_executor,
iterator_type begin, iterator_type end);
/*
Overload. Returns a ready result object that doesn't monitor any asynchronous result.
Might throw an std::bad_alloc exception if no memory is available.
*/
lazy_result<std::tuple<>> when_all (std::shared_ptr<executor_type> resume_executor);when_any وظيفة _any when_any تكون _any وظيفة فائدة تنشئ كائن نتيجة كسول يصبح جاهزًا عند اكتمال نتيجة إدخال واحدة على الأقل. سيؤدي انتظار هذه النتيجة إلى إرجاع بنية مساعدة تحتوي على جميع كائنات إدخال الإدخال بالإضافة إلى فهرس المهمة المكتملة. قد يكون ذلك بحلول وقت استهلاك النتيجة الجاهزة ، ربما تكون نتائج أخرى قد أكملت بالفعل بشكل غير متزامن. يمكن أن تتصل التطبيقات when_any مرارًا وتكرارًا لاستهلاك نتائج جاهزة عند اكتمالها حتى يتم استهلاك جميع النتائج.
when_any تأتي وظيفة _any مع نكهات فقط - واحدة تقبل مجموعة غير متجانسة من كائنات النتائج والآخر يحصل على زوج من التكرار إلى مجموعة من النتائج الناتجة من نفس النوع. على عكس when_all ، لا يوجد معنى في انتظار مهمة واحدة على الأقل لإنهائها عندما يكون نطاق النتائج فارغًا تمامًا. وبالتالي ، لا يوجد تحميل زائد بدون حجج. وأيضًا ، فإن الحمل الزائد من اثنين من التكرار سوف يرمي استثناء إذا كان هؤلاء التكرار يشيرون إلى نطاق فارغ (عندما begin == end ).
إذا كان أحد الأشياء التي تم تمريرها فارغة ، فسيتم طرح استثناء. في أي حال ، يتم إلقاء استثناء ، لا تتأثر كائنات إدخال الإدخال بالوظيفة ويمكن استخدامها مرة أخرى بعد معالجة الاستثناء. إذا كانت جميع كائنات نتائج الإدخال صالحة ، يتم إفراغها بواسطة هذه الوظيفة ، وإعادتها في حالة صالحة كنتيجة مخرجات.
حاليا ، when_any يقبل فقط كائنات result .
جميع الأحمال الزائدة تقبل استئناف المنفذ كمعلمة الأولى. عند انتظار النتيجة التي تم إرجاعها بواسطة when_any ، سيتم استئناف المتصل Coroutine من قبل المنفذ السيرة الذاتية المحددة.
/*
Helper struct returned from when_any.
index is the position of the ready result in results sequence.
results is either an std::tuple or an std::vector of the results that were passed to when_any.
*/
template < class sequence_type >
struct when_any_result {
std:: size_t index;
sequence_type results;
};
/*
Creates a result object that becomes ready when at least one of the input results is ready.
Passed result objects are emptied and returned as a tuple.
Throws std::invalid_argument if any of the passed result objects is empty.
Might throw an std::bad_alloc exception if no memory is available.
*/
template < class ... result_types>
lazy_result<when_any_result<std::tuple<result_types...>>>
when_any (std::shared_ptr<executor_type> resume_executor,
result_types&& ... results);
/*
Overload. Similar to when_any(result_types&& ...) but receives a pair of iterators referencing a range.
Passed result objects are emptied and returned as a vector.
Throws std::invalid_argument if begin == end.
Throws std::invalid_argument if any of the passed result objects is empty.
Might throw an std::bad_alloc exception if no memory is available.
*/
template < class iterator_type >
lazy_result<when_any_result<std::vector< typename std::iterator_traits<iterator_type>::value_type>>>
when_any (std::shared_ptr<executor_type> resume_executor,
iterator_type begin, iterator_type end);resume_on وظيفة resume_on يعيد انتظارًا يعلق coroutine الحالي ويستأنفه داخل executor المعطى. هذه وظيفة مهمة تتأكد من تشغيل coroutine في المنفذ الصحيح. على سبيل المثال ، قد تحدد التطبيقات مهمة خلفية باستخدام background_executor وانتظار كائن النتيجة التي تم إرجاعها. في هذه الحالة ، سيتم استئناف كوروتين في انتظار داخل المنفذ الخلفي. تتأكد مكالمة resume_on مع منفذي آخر مقترن من وحدة المعالجة المركزية من عدم تشغيل خطوط التعليمات البرمجية المرتبطة بوحدة المعالجة المركزية على المنفذ الخلفية بمجرد اكتمال مهمة الخلفية. إذا تم إعادة جدولة المهمة لتشغيلها على منافس آخر باستخدام resume_on ، ولكن يتم إيقاف هذا المنفذ قبل أن يتمكن من استئناف المهمة المعلقة ، يتم استئناف هذه المهمة على الفور ويتم إلقاء استثناء erros::broken_task . في هذه الحالة ، تحتاج التطبيقات إلى برشاقة.
/*
Returns an awaitable that suspends the current coroutine and resumes it inside executor.
Might throw any exception that executor_type::enqueue throws.
*/
template < class executor_type >
auto resume_on (std::shared_ptr<executor_type> executor);يوفر Concurrencpp أيضًا أجهزة ضبط الوقت وقوائم مؤقت. أجهزة ضبط الوقت هي كائنات تحدد الإجراءات غير المتزامنة التي تعمل على المنفذ خلال فترة زمنية محددة جيدًا. هناك ثلاثة أنواع من أجهزة ضبط الوقت - أجهزة ضبط الوقت العادية ، والكرات البارزة وأشياء تأخير .
تحتوي أجهزة ضبط الوقت العادية على أربع عقارات تحددها:
مثل الكائنات الأخرى في concurrencpp ، فإن أجهزة ضبط الوقت هي نوع تحرك فقط يمكن أن يكون فارغًا. عندما يتم تدمير مؤقت أو timer::cancel إلغاء ، يلغي المؤقت مهامه المجدولة ولكن لم يتم تنفيذه بعد. المهام المستمرة غير متأثرة. يجب أن يكون المؤقت القابل للاتصال آمنًا. يوصى بتعيين الوقت الواجب وتواتر أجهزة ضبط الوقت إلى تفاصيل 50 ميلي ثانية.
قائمة انتظار الموقت هي عامل concurrencpp يدير مجموعة من أجهزة ضبط الوقت ومعالجتها في سلسلة واحدة فقط من التنفيذ. وهو أيضًا الوكيل المستخدم لإنشاء أجهزة ضبط الوقت الجديدة. عند الوصول إلى الموعد النهائي لوقت الموقت (سواء كان الوقت أو التردد الموقت) ، فإن قائمة انتظار الموقت "يطلق" الموقت عن طريق جدولة قابلية الاتصال بالتشغيل على المنفذ المرتبط بمهمة.
تمامًا مثل المنفذين ، تلتزم قوائم انتظار الموقت أيضًا بمفهوم RAII. عندما يخرج كائن وقت التشغيل عن النطاق ، فإنه يغلق قائمة انتظار الموقت ، مما يؤدي إلى إلغاء جميع أجهزة ضبط الوقت المعلقة. بعد إيقاف قائمة انتظار الموقت ، ستقوم أي مكالمة لاحقة لـ make_timer و make_onshot_timer و make_delay_object بإلقاء errors::runtime_shutdown . يجب ألا تحاول التطبيقات إيقاف قوائم انتظار الموقت بأنفسهم.
timer_queue API: class timer_queue {
/*
Destroys this timer_queue.
*/
~timer_queue () noexcept ;
/*
Shuts down this timer_queue:
Tells the underlying thread of execution to quit and joins it.
Cancels all pending timers.
After this call, invocation of any method besides shutdown
and shutdown_requested will throw an errors::runtime_shutdown.
If shutdown had been called before, this method has no effect.
*/
void shutdown () noexcept ;
/*
Returns true if shutdown had been called before, false otherwise.
*/
bool shutdown_requested () const noexcept ;
/*
Creates a new running timer where *this is the associated timer_queue.
Throws std::invalid_argument if executor is null.
Throws errors::runtime_shutdown if shutdown had been called before.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if the one of the underlying synchronization primitives throws.
*/
template < class callable_type , class ... argumet_types>
timer make_timer (
std::chrono::milliseconds due_time,
std::chrono::milliseconds frequency,
std::shared_ptr<concurrencpp::executor> executor,
callable_type&& callable,
argumet_types&& ... arguments);
/*
Creates a new one-shot timer where *this is the associated timer_queue.
Throws std::invalid_argument if executor is null.
Throws errors::runtime_shutdown if shutdown had been called before.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if the one of the underlying synchronization primitives throws.
*/
template < class callable_type , class ... argumet_types>
timer make_one_shot_timer (
std::chrono::milliseconds due_time,
std::shared_ptr<concurrencpp::executor> executor,
callable_type&& callable,
argumet_types&& ... arguments);
/*
Creates a new delay object where *this is the associated timer_queue.
Throws std::invalid_argument if executor is null.
Throws errors::runtime_shutdown if shutdown had been called before.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if the one of the underlying synchronization primitives throws.
*/
result< void > make_delay_object (
std::chrono::milliseconds due_time,
std::shared_ptr<concurrencpp::executor> executor);
};timer : class timer {
/*
Creates an empty timer.
*/
timer () noexcept = default ;
/*
Cancels the timer, if not empty.
*/
~timer () noexcept ;
/*
Moves the content of rhs to *this.
rhs is empty after this call.
*/
timer (timer&& rhs) noexcept = default ;
/*
Moves the content of rhs to *this.
rhs is empty after this call.
Returns *this.
*/
timer& operator = (timer&& rhs) noexcept ;
/*
Cancels this timer.
After this call, the associated timer_queue will not schedule *this
to run again and *this becomes empty.
Scheduled, but not yet executed tasks are cancelled.
Ongoing tasks are uneffected.
This method has no effect if *this is empty or the associated timer_queue has already expired.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
void cancel ();
/*
Returns the associated executor of this timer.
Throws concurrencpp::errors::empty_timer is *this is empty.
*/
std::shared_ptr<executor> get_executor () const ;
/*
Returns the associated timer_queue of this timer.
Throws concurrencpp::errors::empty_timer is *this is empty.
*/
std::weak_ptr<timer_queue> get_timer_queue () const ;
/*
Returns the due time of this timer.
Throws concurrencpp::errors::empty_timer is *this is empty.
*/
std::chrono::milliseconds get_due_time () const ;
/*
Returns the frequency of this timer.
Throws concurrencpp::errors::empty_timer is *this is empty.
*/
std::chrono::milliseconds get_frequency () const ;
/*
Sets new frequency for this timer.
Callables already scheduled to run at the time of invocation are not affected.
Throws concurrencpp::errors::empty_timer is *this is empty.
*/
void set_frequency (std::chrono::milliseconds new_frequency);
/*
Returns true is *this is not an empty timer, false otherwise.
The timer should not be used if this->operator bool() is false.
*/
explicit operator bool () const noexcept ;
};في هذا المثال ، نقوم بإنشاء مؤقت منتظم باستخدام قائمة انتظار المؤقت. يقوم الموقت بجدولة قابلة للاستدعاء بعد 1.5 ثانية ، ثم يطلق النار على قابلية الاتصال كل ثانيتين. يعمل المعطى القابل للاتصال على Executor Threadpool.
# include " concurrencpp/concurrencpp.h "
# include < iostream >
using namespace std ::chrono_literals ;
int main () {
concurrencpp::runtime runtime;
std:: atomic_size_t counter = 1 ;
concurrencpp::timer timer = runtime. timer_queue ()-> make_timer (
1500ms,
2000ms,
runtime. thread_pool_executor (),
[&] {
const auto c = counter. fetch_add ( 1 );
std::cout << " timer was invoked for the " << c << " th time " << std::endl;
});
std::this_thread::sleep_for (12s);
return 0 ;
}يعد Timer Oneshot مؤقتًا لمرة واحدة فقط مع وقت استحقاق فقط - بعد أن يحدد جدولةه للتشغيل بمجرد إعادة جدولةه أبدًا للركض مرة أخرى.
في هذا المثال ، نقوم بإنشاء جهاز توقيت يعمل مرة واحدة فقط - بعد 3 ثوانٍ من إنشائها ، سيقوم المؤقت بجدولة قابلة للاستدعاء على تشغيل مؤشر ترابط جديد من التنفيذ (باستخدام thread_executor ).
# include " concurrencpp/concurrencpp.h "
# include < iostream >
using namespace std ::chrono_literals ;
int main () {
concurrencpp::runtime runtime;
concurrencpp::timer timer = runtime. timer_queue ()-> make_one_shot_timer (
3000ms,
runtime. thread_executor (),
[&] {
std::cout << " hello and goodbye " << std::endl;
});
std::this_thread::sleep_for (4s);
return 0 ;
} كائن التأخير هو كائن نتيجة كسول يصبح جاهزًا عندما يتم co_await ed ويتم الوصول إلى وقت الاستحقاق. يمكن للتطبيقات co_await كائن النتيجة هذا لتأخير coroutine الحالي بطريقة غير حظر. يتم استئناف Coroutine الحالي بواسطة المنفذ الذي تم تمريره إلى make_delay_object .
في هذا المثال ، نفرد مهمة (لا تُرجع أي نتيجة أو استثناء تم إلقاؤها) ، والتي تؤخر نفسها في حلقة عن طريق استدعاء co_await على كائن تأخير.
# include " concurrencpp/concurrencpp.h "
# include < iostream >
using namespace std ::chrono_literals ;
concurrencpp::null_result delayed_task (
std::shared_ptr<concurrencpp::timer_queue> tq,
std::shared_ptr<concurrencpp::thread_pool_executor> ex) {
size_t counter = 1 ;
while ( true ) {
std::cout << " task was invoked " << counter << " times. " << std::endl;
counter++;
co_await tq-> make_delay_object (1500ms, ex);
}
}
int main () {
concurrencpp::runtime runtime;
delayed_task (runtime. timer_queue (), runtime. thread_pool_executor ());
std::this_thread::sleep_for (10s);
return 0 ;
} المولد هو coroutine كسول ، متزامن قادر على إنتاج تيار من القيم للاستهلاك. تستخدم المولدات الكلمة الرئيسية co_yield لإعداد القيم إلى المستهلكين.
من المفترض أن يتم استخدام المولدات بشكل متزامن - يمكنهم فقط استخدام الكلمة الرئيسية co_yield ويجب ألا تستخدم الكلمة الرئيسية co_await . سيستمر المولد في إنتاج قيم طالما تسمى الكلمة الرئيسية co_yield . إذا تم استدعاء الكلمة الرئيسية co_return (بشكل صريح أو ضمني) ، فسيتوقف المولد عن إنتاج القيم. وبالمثل ، إذا تم طرح استثناء ، فسيتوقف المولد عن إنتاج القيم وسيتم إعادة تراكم الاستثناء الذي تم طرحه إلى المستهلك للمولد.
من المفترض أن يتم استخدام المولدات في حلقة range-for المدى: تنتج المولدات ضمنيًا اثنين من التكرار - begin end يتحكم في تنفيذ for . لا ينبغي التعامل مع هؤلاء المتكررين أو الوصول إليه يدويًا.
عند إنشاء مولد ، يبدأ كمهمة كسول. عندما يتم استدعاء طريقة begin ، يتم استئناف المولد لأول مرة ويتم إرجاع التكرار. يتم استئناف المهمة الكسول مرارًا وتكرارًا عن طريق الاتصال operator++ على المتكرر الذي تم إرجاعه. سيكون التكرار الذي تم إرجاعه مساوياً للتكرار end عندما ينهي المولد التنفيذ إما عن طريق الخروج بأمان أو رمي استثناء. كما ذكرنا سابقًا ، يحدث هذا وراء الكواليس من قبل الآلية الداخلية للحلقة والمولد ، ويجب عدم استدعاءها مباشرة.
مثل الكائنات الأخرى في concurrencpp ، المولدات هي نوع الحركة فقط. بعد نقل المولد ، يعتبر فارغًا ويحاول الوصول إلى أساليبه الداخلية (بخلاف operator bool ) استثناء. يجب ألا يحدث فراغ المولد بشكل عام - يُنصح باستهلاك المولدات عند إنشائها في for وعدم محاولة استدعاء أساليبهم بشكل فردي.
generator واجهة برمجة التطبيقات class generator {
/*
Move constructor. After this call, rhs is empty.
*/
generator (generator&& rhs) noexcept ;
/*
Destructor. Invalidates existing iterators.
*/
~generator () noexcept ;
generator ( const generator& rhs) = delete ;
generator& operator =(generator&& rhs) = delete ;
generator& operator =( const generator& rhs) = delete ;
/*
Returns true if this generator is not empty.
Applications must not use this object if this->operator bool() is false.
*/
explicit operator bool () const noexcept ;
/*
Starts running this generator and returns an iterator.
Throws errors::empty_generator if *this is empty.
Re-throws any exception that is thrown inside the generator code.
*/
iterator begin ();
/*
Returns an end iterator.
*/
static generator_end_iterator end () noexcept ;
};
class generator_iterator {
using value_type = std:: remove_reference_t <type>;
using reference = value_type&;
using pointer = value_type*;
using iterator_category = std::input_iterator_tag;
using difference_type = std:: ptrdiff_t ;
/*
Resumes the suspended generator and returns *this.
Re-throws any exception that was thrown inside the generator code.
*/
generator_iterator& operator ++();
/*
Post-increment version of operator++.
*/
void operator ++( int );
/*
Returns the latest value produced by the associated generator.
*/
reference operator *() const noexcept ;
/*
Returns a pointer to the latest value produced by the associated generator.
*/
pointer operator ->() const noexcept ;
/*
Comparision operators.
*/
friend bool operator ==( const generator_iterator& it0, const generator_iterator& it1) noexcept ;
friend bool operator ==( const generator_iterator& it, generator_end_iterator) noexcept ;
friend bool operator ==(generator_end_iterator end_it, const generator_iterator& it) noexcept ;
friend bool operator !=( const generator_iterator& it, generator_end_iterator end_it) noexcept ;
friend bool operator !=(generator_end_iterator end_it, const generator_iterator& it) noexcept ;
};generator : في هذا المثال ، سنكتب مولدًا يعطي العضو N-th في التسلسل S(n) = 1 + 2 + 3 + ... + n حيث n <= 100 :
concurrencpp::generator< int > sequence () {
int i = 1 ;
int sum = 0 ;
while (i <= 100 ) {
sum += i;
++i;
co_yield sum;
}
}
int main () {
for ( auto value : sequence ()) {
std::cout << value << std::end;
}
return 0 ;
} لا يمكن استخدام الأقفال المتزامنة العادية بأمان داخل المهام لعدة أسباب:
std::mutex ، في نفس مؤشر ترابط التنفيذ. فتح قفل متزامن في خيط لم يغلقه هو سلوك غير محدد. نظرًا لأنه يمكن تعليق المهام واستئنافها في أي مؤشر ترابط التنفيذ ، فإن الأقفال المتزامنة ستنهار عند استخدامها داخل المهام. concurrencpp::async_lock يحل هذه المشكلات من خلال توفير واجهة برمجة تطبيقات مماثلة لـ std::mutex ، مع الفرق الرئيسي الذي يدعو إلى concurrencpp::async_lock ستعيد النسخ الكسول يمكن أن co_awaited بأمان داخل المهام. إذا حاولت إحدى المهام قفل قفل غير متزامن وفشل ، فسيتم تعليق المهمة ، وسيتم استئنافها عندما يتم فتح القفل واكتسبه من قبل المهمة المعلقة. يتيح ذلك للمنفذين معالجة قدر كبير من المهام في انتظار الحصول على قفل دون تبديل السياق ومكالمات النواة باهظة الثمن.
على غرار كيفية عمل std::mutex ، يمكن أن تحصل على مهمة واحدة فقط async_lock في أي وقت معين ، وحاجز القراءة هو مكان في لحظة الاستحواذ. يضع إطلاق قفل Async حاجزًا للكتابة ويسمح للمهمة التالية بالحصول عليها ، وإنشاء سلسلة من المعدل الواحد في وقت يرى أن التغييرات التي قامت بها المعدلات الأخرى التي قامت بها وتنشر تعديلاتها على المعدلات التالية.
مثل std::mutex ، concurrencpp::async_lock ليست متكررة . يجب إيلاء اهتمام إضافي عند الحصول على مثل هذا القفل - يجب عدم الحصول على القفل مرة أخرى في مهمة تم إنتاجها بمهمة أخرى اكتسبت بالفعل القفل. في مثل هذه الحالة ، سيحدث قفل ميت لا مفر منه. على عكس الكائنات الأخرى في concurrencpp ، فإن async_lock غير قابلة للنسخ ولا متحرك.
مثل الأقفال القياسية ، من المفترض أن يتم استخدام concurrencpp::async_lock مع الأغطية التي يتم تحديدها والتي تستفيد من تعبير C ++ RAII لضمان فتح الأقفال دائمًا عند إرجاع الوظيفة أو الاستثناء. async_lock::lock إرجاع النسخ الكسول من غلاف مندد يدعو async_lock::unlock على الدمار. يتم تثبيط الاستخدامات الخام async_lock::unlock . concurrencpp::scoped_async_lock بمثابة الغلاف المنحطي ويوفر واجهة برمجة تطبيقات متطابقة تقريبًا مع std::unique_lock . concurrencpp::scoped_async_lock متحرك ، ولكن غير قابل للنسخ.
async_lock::lock and scoped_async_lock::lock يتطلب استئناف تنفيذي كمعلمة. عند استدعاء هذه الطرق ، إذا كان القفل متاحًا للقفل ، فسيتم قفله ويتم استئناف المهمة الحالية على الفور. إذا لم يكن الأمر كذلك ، فسيتم تعليق المهمة الحالية ، وسيتم استئنافها داخل المستأنف المستأنف المعطى عند الحصول على القفل أخيرًا.
concurrencpp::scoped_async_lock يلتف a async_lock وتأكد من فتحه بشكل صحيح. مثل std::unique_lock ، هناك حالات لا تلف أي قفل ، وفي هذه الحالة تعتبر فارغة. يمكن أن يحدث scoped_async_lock الفارغ عندما يتم بناؤه بشكل افتراضي أو نقله أو scoped_async_lock::release . لن يفتح القفل الفارغ النسيان أي قفل على الدمار.
حتى إذا لم يكن قفل النطاق الممتلكين فارغًا ، فهذا لا يعني أنه يمتلك قفلًا متزامنًا أساسيًا وسيقوم بإلغاء تأمينه على الدمار. يمكن أن يحدث أقفال غير فارغة وغير مملوكة للمحطات إذا تم استدعاء scoped_async_lock::unlock أو تم بناء قفل SCOPED-ASYNC باستخدام scoped_async_lock(async_lock&, std::defer_lock_t) .
async_lock API class async_lock {
/*
Constructs an async lock object.
*/
async_lock () noexcept ;
/*
Destructs an async lock object.
*this is not automatically unlocked at the moment of destruction.
*/
~async_lock () noexcept ;
/*
Asynchronously acquires the async lock.
If *this has already been locked by another non-parent task, the current task will be suspended
and will be resumed when *this is acquired, inside resume_executor.
If *this has not been locked by another task, then *this will be acquired and the current task will be resumed
immediately in the calling thread of execution.
If *this has already been locked by a parent task, then unavoidable dead-lock will occur.
Throws std::invalid_argument if resume_executor is null.
Throws std::system error if one of the underlying synhchronization primitives throws.
*/
lazy_result<scoped_async_lock> lock (std::shared_ptr<executor> resume_executor);
/*
Tries to acquire *this in the calling thread of execution.
Returns true if *this is acquired, false otherwise.
In any case, the current task is resumed immediately in the calling thread of execution.
Throws std::system error if one of the underlying synhchronization primitives throws.
*/
lazy_result< bool > try_lock ();
/*
Releases *this and allows other tasks (including suspended tasks waiting for *this) to acquire it.
Throws std::system error if *this is not locked at the moment of calling this method.
Throws std::system error if one of the underlying synhchronization primitives throws.
*/
void unlock ();
};scoped_async_lock API class scoped_async_lock {
/*
Constructs an async lock wrapper that does not wrap any async lock.
*/
scoped_async_lock () noexcept = default ;
/*
If *this wraps async_lock, this method releases the wrapped lock.
*/
~scoped_async_lock () noexcept ;
/*
Moves rhs to *this.
After this call, *rhs does not wrap any async lock.
*/
scoped_async_lock (scoped_async_lock&& rhs) noexcept ;
/*
Wrapps unlocked lock.
lock must not be in acquired mode when calling this method.
*/
scoped_async_lock (async_lock& lock, std:: defer_lock_t ) noexcept ;
/*
Wrapps locked lock.
lock must be already acquired when calling this method.
*/
scoped_async_lock (async_lock& lock, std:: adopt_lock_t ) noexcept ;
/*
Calls async_lock::lock on the wrapped locked, using resume_executor as a parameter.
Throws std::invalid_argument if resume_executor is nulll.
Throws std::system_error if *this does not wrap any lock.
Throws std::system_error if wrapped lock is already locked.
Throws any exception async_lock::lock throws.
*/
lazy_result< void > lock (std::shared_ptr<executor> resume_executor);
/*
Calls async_lock::try_lock on the wrapped lock.
Throws std::system_error if *this does not wrap any lock.
Throws std::system_error if wrapped lock is already locked.
Throws any exception async_lock::try_lock throws.
*/
lazy_result< bool > try_lock ();
/*
Calls async_lock::unlock on the wrapped lock.
If *this does not wrap any lock, this method does nothing.
Throws std::system_error if *this wraps a lock and it is not locked.
*/
void unlock ();
/*
Checks whether *this wraps a locked mutex or not.
Returns true if wrapped locked is in acquired state, false otherwise.
*/
bool owns_lock () const noexcept ;
/*
Equivalent to owns_lock.
*/
explicit operator bool () const noexcept ;
/*
Swaps the contents of *this and rhs.
*/
void swap (scoped_async_lock& rhs) noexcept ;
/*
Empties *this and returns a pointer to the previously wrapped lock.
After a call to this method, *this doesn't wrap any lock.
The previously wrapped lock is not released,
it must be released by either unlocking it manually through the returned pointer or by
capturing the pointer with another scoped_async_lock which will take ownerwhip over it.
*/
async_lock* release () noexcept ;
/*
Returns a pointer to the wrapped async_lock, or a null pointer if there is no wrapped async_lock.
*/
async_lock* mutex () const noexcept ;
};async_lock : في هذا المثال ، ندفع 1000000 نسمة إلى كائن std::vector من مهام مختلفة بشكل متزامن ، أثناء استخدام async_lock للتأكد من عدم حدوث سباق بيانات ويتم الحفاظ على صحة الحالة الداخلية لهذا الكائن المتجه.
# include " concurrencpp/concurrencpp.h "
# include < vector >
# include < iostream >
std::vector< size_t > numbers;
concurrencpp::async_lock lock;
concurrencpp::result< void > add_numbers (concurrencpp::executor_tag,
std::shared_ptr<concurrencpp::executor> executor,
size_t begin,
size_t end) {
for ( auto i = begin; i < end; i++) {
concurrencpp::scoped_async_lock raii_wrapper = co_await lock. lock (executor);
numbers. push_back (i);
}
}
int main () {
concurrencpp::runtime runtime;
constexpr size_t range = 10'000'000 ;
constexpr size_t sections = 4 ;
concurrencpp::result< void > results[sections];
for ( size_t i = 0 ; i < 4 ; i++) {
const auto range_start = i * range / sections;
const auto range_end = (i + 1 ) * range / sections;
results[i] = add_numbers ({}, runtime. thread_pool_executor (), range_start, range_end);
}
for ( auto & result : results) {
result. get ();
}
std::cout << " vector size is " << numbers. size () << std::endl;
// make sure the vector state has not been corrupted by unprotected concurrent accesses
std::sort (numbers. begin (), numbers. end ());
for ( size_t i = 0 ; i < range; i++) {
if (numbers[i] != i) {
std::cerr << " vector state is corrupted. " << std::endl;
return - 1 ;
}
}
std::cout << " succeeded pushing range [0 - 10,000,000] concurrently to the vector! " << std::endl;
return 0 ;
} async_condition_variable يقلد الشرط القياسي condition_variable ويمكن استخدامه بأمان مع المهام إلى جانب async_lock . تعمل async_condition_variable مع async_lock لتعليق مهمة حتى تتغير بعض الذاكرة المشتركة (المحمية بواسطة القفل). المهام التي ترغب في مراقبة تغييرات الذاكرة المشتركة ستقفل مثيل async_lock ، والاتصال async_condition_variable::await . سيؤدي هذا إلى فتح القفل وتعليق المهمة الحالية حتى تقوم بعض المهام المعدل بإعلام متغير الحالة. تكتسب مهمة المعدل القفل ، ويعدل الذاكرة المشتركة ، وإلغاء تأمين القفل واتصل إما notify_one أو notify_all . عندما يتم استئناف مهمة معلقة (باستخدام المنفذ السيرة الذاتية التي أعطيت await ) ، فإنها تغلق القفل مرة أخرى ، مما يسمح للمهمة بالاستمرار من نقطة التعليق بسلاسة. مثل async_lock ، async_condition_variable ليس متحركًا أو قابلًا للنسخ - من المفترض أن يتم إنشاؤه في مكان واحد والوصول إليه بواسطة مهام متعددة.
تتطلب async_condition_variable::await الحمولة الزائدة من السيرة الذاتية ، والتي سيتم استخدامها لاستئناف المهمة ، و scoped_async_lock . async_condition_variable::await يأتي مع اثنين من الأحمال الزائدة - واحد يقبل المسند والآخر لا. سيقوم الحمل الزائد الذي لا يقبل مسند بتعليق مهمة الاتصال فور الاحتجاج حتى يتم استئنافها عن طريق مكالمة notify_* . يعمل الحمل الزائد الذي يقبل ما يعمل من خلال السماح للمسند بفحص الذاكرة المشتركة وتعليق المهمة مرارًا وتكرارًا حتى تصل الذاكرة المشتركة إلى حالتها المطلوبة. بشكل تخطيطي يعمل مثل الاتصال
while (!pred()) { // pred() inspects the shared memory and returns true or false
co_await await (resume_executor, lock); // suspend the current task until another task calls `notify_xxx`
} تمامًا مثل متغير الشرط القياسي ، يتم تشجيع التطبيقات على استخدام التحميل المسند ، حيث يتيح المزيد من التحكم في الحبيبات على التعليق والاستئناف. يمكن استخدام async_condition_variable لكتابة المجموعات المتزامنة وهياكل البيانات مثل قوائم الانتظار والقنوات المتزامنة.
داخليًا ، يحمل async_condition_variable موعدًا للتعليق ، حيث يضع المهام أنفسهم عندما ينتظرون متغير الحالة ليتم إخطاره. عندما يتم استدعاء أي من أساليب notify_* ، تخلط المهمة الإخطار إما مهمة واحدة أو جميع المهام ، اعتمادًا على الطريقة التي تم استدعاؤها. يتم إلغاء المهام من التعليق-بطريقة FIFO. على سبيل المثال ، إذا كانت المهمة A await المكالمات ثم await المكالمات B المهمة B ، فسيتم notify_one المكالمات المهمة C ، ثم يتم إزالة المهمة A داخليًا واستئنافها. ستبقى المهمة B معلقة حتى يتم استدعاء مكالمة أخرى notify_one or notify_all . إذا تم تعليق المهمة A والمهمة B ومكالمات المهمة C notify_all ، فسيتم إلغاء كلا المهمتين واستئنافها.
async_condition_variable API class async_condition_variable {
/*
Constructor.
*/
async_condition_variable () noexcept ;
/*
Atomically releases lock and suspends the current task by adding it to *this suspension-queue.
Throws std::invalid_argument if resume_executor is null.
Throws std::invalid_argument if lock is not locked at the moment of calling this method.
Might throw std::system_error if the underlying std::mutex throws.
*/
lazy_result< void > await (std::shared_ptr<executor> resume_executor, scoped_async_lock& lock);
/*
Equivalent to:
while (!pred()) {
co_await await(resume_executor, lock);
}
Might throw any exception that await(resume_executor, lock) might throw.
Might throw any exception that pred might throw.
*/
template < class predicate_type >
lazy_result< void > await (std::shared_ptr<executor> resume_executor, scoped_async_lock& lock, predicate_type pred);
/*
Dequeues one task from *this suspension-queue and resumes it, if any available at the moment of calling this method.
The suspended task is resumed by scheduling it to run on the executor given when await was called.
Might throw std::system_error if the underlying std::mutex throws.
*/
void notify_one ();
/*
Dequeues all tasks from *this suspension-queue and resumes them, if any available at the moment of calling this method.
The suspended tasks are resumed by scheduling them to run on the executors given when await was called.
Might throw std::system_error if the underlying std::mutex throws.
*/
void notify_all ();
};async_condition_variable : في هذا المثال ، تعمل async_lock و async_condition_variable معًا لتنفيذ قائمة انتظار متزامنة يمكن استخدامها لإرسال البيانات (في هذا المثال ، الأعداد الصحيحة) بين المهام. لاحظ أن بعض الطرق تُرجع result في حين أن إرجاع lazy_result آخر ، مما يوضح كيف يمكن أن تعمل المهام المتحمسة والكسل معًا.
# include " concurrencpp/concurrencpp.h "
# include < queue >
# include < iostream >
using namespace concurrencpp ;
class concurrent_queue {
private:
async_lock _lock;
async_condition_variable _cv;
std::queue< int > _queue;
bool _abort = false ;
public:
concurrent_queue () = default ;
result< void > shutdown (std::shared_ptr<executor> resume_executor) {
{
auto guard = co_await _lock. lock (resume_executor);
_abort = true ;
}
_cv. notify_all ();
}
lazy_result< void > push (std::shared_ptr<executor> resume_executor, int i) {
{
auto guard = co_await _lock. lock (resume_executor);
_queue. push (i);
}
_cv. notify_one ();
}
lazy_result< int > pop (std::shared_ptr<executor> resume_executor) {
auto guard = co_await _lock. lock (resume_executor);
co_await _cv. await (resume_executor, guard, [ this ] {
return _abort || !_queue. empty ();
});
if (!_queue. empty ()) {
auto result = _queue. front ();
_queue. pop ();
co_return result;
}
assert (_abort);
throw std::runtime_error ( " queue has been shut down. " );
}
};
result< void > producer_loop (executor_tag,
std::shared_ptr<thread_pool_executor> tpe,
concurrent_queue& queue,
int range_start,
int range_end) {
for (; range_start < range_end; ++range_start) {
co_await queue. push (tpe, range_start);
}
}
result< void > consumer_loop (executor_tag, std::shared_ptr<thread_pool_executor> tpe, concurrent_queue& queue) {
try {
while ( true ) {
std::cout << co_await queue. pop (tpe) << std::endl;
}
} catch ( const std:: exception & e) {
std::cerr << e. what () << std::endl;
}
}
int main () {
runtime runtime;
const auto thread_pool_executor = runtime. thread_pool_executor ();
concurrent_queue queue;
result< void > producers[ 4 ];
result< void > consumers[ 4 ];
for ( int i = 0 ; i < 4 ; i++) {
producers[i] = producer_loop ({}, thread_pool_executor, queue, i * 5 , (i + 1 ) * 5 );
}
for ( int i = 0 ; i < 4 ; i++) {
consumers[i] = consumer_loop ({}, thread_pool_executor, queue);
}
for ( int i = 0 ; i < 4 ; i++) {
producers[i]. get ();
}
queue. shutdown (thread_pool_executor). get ();
for ( int i = 0 ; i < 4 ; i++) {
consumers[i]. get ();
}
return 0 ;
} كائن وقت تشغيل concurrencpp هو الوكيل المستخدم لاكتساب وتخزين وإنشاء منفذيين جدد.
يجب إنشاء وقت التشغيل كنوع قيمة بمجرد بدء تشغيل الوظيفة الرئيسية. عندما يخرج وقت تشغيل Concurrencpp عن النطاق ، فإنه يتكرر على منفذيها المخزنون ويغلقهم واحدًا تلو الآخر عن طريق الاتصال بـ executor::shutdown . ثم يخرج المنفذون من حلقة العمل الداخلية وأي محاولة لاحقة لجدولة مهمة جديدة سوف يلقي استثناء concurrencpp::runtime_shutdown . يحتوي وقت التشغيل أيضًا على قائمة انتظار الموقت العالمية المستخدمة لإنشاء أجهزة ضبط الوقت وتأخير الكائنات. عند التدمير ، يدمر المنفذون المخزنون المهام غير المخللة ، وانتظر المهام المستمرة. إذا حاولت المهمة المستمرة استخدام المنفذ لتفرخ مهام جديدة أو جدولة استمرار مهمتها - فسيتم طرح استثناء. في هذه الحالة ، تحتاج المهام المستمرة إلى الإقلاع عن التدخين في أقرب وقت ممكن ، مما يتيح للمنفذين الأساسيين الإقلاع عن التدخين. سيتم أيضًا إيقاف قائمة انتظار الموقت ، وإلغاء جميع أجهزة ضبط الوقت قيد التشغيل. مع هذا النمط من الكود RAII ، لا يمكن معالجة أي مهام قبل إنشاء كائن وقت التشغيل ، وبينما يخرج وقت التشغيل عن النطاق. يحرر هذا التطبيقات المتزامنة من الحاجة إلى توصيل رسائل الإنهاء بشكل صريح. المهام هي الاستخدام المجاني للمنفذين طالما أن كائن وقت التشغيل على قيد الحياة.
runtime class runtime {
/*
Creates a runtime object with default options.
*/
runtime ();
/*
Creates a runtime object with user defined options.
*/
runtime ( const concurrencpp::runtime_options& options);
/*
Destroys this runtime object.
Calls executor::shutdown on each monitored executor.
Calls timer_queue::shutdown on the global timer queue.
*/
~runtime () noexcept ;
/*
Returns this runtime timer queue used to create new times.
*/
std::shared_ptr<concurrencpp::timer_queue> timer_queue () const noexcept ;
/*
Returns this runtime concurrencpp::inline_executor
*/
std::shared_ptr<concurrencpp::inline_executor> inline_executor () const noexcept ;
/*
Returns this runtime concurrencpp::thread_pool_executor
*/
std::shared_ptr<concurrencpp::thread_pool_executor> thread_pool_executor () const noexcept ;
/*
Returns this runtime concurrencpp::background_executor
*/
std::shared_ptr<concurrencpp::thread_pool_executor> background_executor () const noexcept ;
/*
Returns this runtime concurrencpp::thread_executor
*/
std::shared_ptr<concurrencpp::thread_executor> thread_executor () const noexcept ;
/*
Creates a new concurrencpp::worker_thread_executor and registers it in this runtime.
Might throw std::bad_alloc or std::system_error if any underlying memory or system resource could not have been acquired.
*/
std::shared_ptr<concurrencpp::worker_thread_executor> make_worker_thread_executor ();
/*
Creates a new concurrencpp::manual_executor and registers it in this runtime.
Might throw std::bad_alloc or std::system_error if any underlying memory or system resource could not have been acquired.
*/
std::shared_ptr<concurrencpp::manual_executor> make_manual_executor ();
/*
Creates a new user defined executor and registers it in this runtime.
executor_type must be a valid concrete class of concurrencpp::executor.
Might throw std::bad_alloc if no memory is available.
Might throw any exception that the constructor of <<executor_type>> might throw.
*/
template < class executor_type , class ... argument_types>
std::shared_ptr<executor_type> make_executor (argument_types&& ... arguments);
/*
returns the version of concurrencpp that the library was built with.
*/
static std::tuple< unsigned int , unsigned int , unsigned int > version () noexcept ;
}; في بعض الحالات ، تهتم التطبيقات بمراقبة إنشاء مؤشرات الترابط وإنهائها ، على سبيل المثال ، يتطلب بعض مخصصات الذاكرة أن يتم تسجيل مؤشرات ترابط جديدة وغير مسجلة عند إنشائها وإنهائها. يتيح وقت تشغيل Concurrencpp إعداد رد اتصال إنشاء مؤشر ترابط ورد اتصال لإنهاء مؤشر ترابط. سيتم استدعاء عمليات الاسترجاعات هذه كلما قام أحد عمال Concurrencpp بإنشاء موضوع جديد وعندما ينتهي هذا الموضوع. تتم استدعاء عمليات الاسترجاع هذه دائمًا من داخل مؤشر ترابط الإنهاء/إنهاءها ، لذلك ستعيد std::this_thread::get_id دائمًا معرف مؤشر الترابط ذي الصلة. توقيع هذه العوائد هو void callback (std::string_view thread_name) . thread_name هو عنوان محدد concurrencpp يتم إعطاؤه للمعلومات ويمكن ملاحظته في بعض أصحاب الأخطاء الذين يقدمون اسم مؤشر الترابط. لا يضمن اسم الخيط أن يكون فريدًا ويجب استخدامه لتسجيل وتصحيح الأخطاء.
من أجل تعيين رد اتصال لإنشاء مؤشرات الترابط و/أو رد اتصال لإنهاء مؤشرات الترابط ، يمكن للتطبيقات تعيين thread_started_callback و/أو thread_terminated_callback أعضاء في runtime_options التي يتم تمريرها إلى مُنشئ وقت التشغيل. نظرًا لأن عمليات الاسترجاعات هذه يتم نسخها إلى كل عامل concurrencpp الذي قد ينشئ مؤشرات الترابط ، يجب أن تكون عمليات الاسترجاعات هذه قابلة للنسخ.
# include " concurrencpp/concurrencpp.h "
# include < iostream >
int main () {
concurrencpp::runtime_options options;
options. thread_started_callback = [](std::string_view thread_name) {
std::cout << " A new thread is starting to run, name: " << thread_name << " , thread id: " << std::this_thread::get_id ()
<< std::endl;
};
options. thread_terminated_callback = [](std::string_view thread_name) {
std::cout << " A thread is terminating, name: " << thread_name << " , thread id: " << std::this_thread::get_id () << std::endl;
};
concurrencpp::runtime runtime (options);
const auto timer_queue = runtime. timer_queue ();
const auto thread_pool_executor = runtime. thread_pool_executor ();
concurrencpp::timer timer =
timer_queue-> make_timer ( std::chrono::milliseconds ( 100 ), std::chrono::milliseconds ( 500 ), thread_pool_executor, [] {
std::cout << " A timer callable is executing " << std::endl;
});
std::this_thread::sleep_for ( std::chrono::seconds ( 3 ));
return 0 ;
}الإخراج المحتمل:
A new thread is starting to run, name: concurrencpp::timer_queue worker, thread id: 7496
A new thread is starting to run, name: concurrencpp::thread_pool_executor worker, thread id: 21620
A timer callable is executing
A timer callable is executing
A timer callable is executing
A timer callable is executing
A timer callable is executing
A timer callable is executing
A thread is terminating, name: concurrencpp::timer_queue worker, thread id: 7496
A thread is terminating, name: concurrencpp::thread_pool_executor worker, thread id: 21620
يمكن للتطبيقات إنشاء نوع المنفذ المخصص الخاص بها عن طريق وراثة فئة derivable_executor . هناك بعض النقاط التي يجب مراعاتها عند تنفيذ المنفذين المعينين للمستخدم: الشيء الأكثر أهمية هو أن نتذكر أن المنفذين يتم استخدامهم من مؤشرات ترابط متعددة ، لذلك يجب أن تكون الأساليب التي يتم تنفيذها آمنة مؤشرات الترابط.
يمكن إنشاء منفذيين جدد باستخدام runtime::make_executor . يجب ألا تقوم التطبيقات بإنشاء منفذيين جدد ذوي إنشاء مثيل عادي (مثل std::make_shared أو new عادي) ، فقط باستخدام runtime::make_executor . أيضًا ، يجب ألا تحاول التطبيقات إعادة إمكانية إدخال منفذي Concurrencpp المدمج ، مثل thread_pool_executor أو thread_executor ، يجب الوصول إلى هؤلاء المنفذين فقط من خلال مثيلاتهم الحالية في كائن وقت التشغيل.
هناك نقطة مهمة أخرى هي التعامل مع الإغلاق بشكل صحيح: يجب على shutdown ، shutdown_requested و enqueue مراقبة جميع حالة المنفذ وتصرف وفقًا لذلك عندما يتم الاحتجاج:
shutdown المواضيع الأساسية للاستقالة ثم الانضمام إليهم.shutdown عدة مرات ، ويجب أن تتعامل الطريقة مع هذا السيناريو من خلال تجاهل أي مكالمات لاحقة shutdown بعد الاحتجاج الأول.enqueue رمي concurrencpp::errors::runtime_shutdown استثناء إذا تم استدعاء shutdown من قبل. task يعد تنفيذ المنفذين أحد الحالات النادرة التي تحتاج فيها التطبيقات إلى العمل مع concurrencpp::task Class مباشرة. concurrencpp::task هي std::function مثل كائن ، ولكن مع بعض الاختلافات. مثل std::function ، يقوم كائن المهمة بتخزين قابلة للاستدعاء تعمل كعملية غير متزامنة. على عكس std::function ، فإن task هي نوع الحركة فقط. عند الاحتجاج ، لا تتلقى كائنات المهمة أي معلمات وإرجاع void . علاوة على ذلك ، يمكن استدعاء كل كائن مهمة مرة واحدة فقط. بعد الاحتجاج الأول ، يصبح كائن المهمة فارغًا. إن استدعاء كائن مهمة فارغ يعادل استدعاء Lambda فارغ ( []{} ) ، ولن يلقي أي استثناء. تتلقى كائنات المهمة قابلة للاستدعاء كمرجع توجيه ( type&& حيث type هو معلمة قالب) ، وليس عن طريق نسخة (مثل std::function ). يحدث بناء القابل للتخزين في مكانه. يتيح ذلك أن تحتوي كائنات المهمة على callables التي يتم نقلها فقط (مثل std::unique_ptr و concurrencpp::result ). تحاول كائنات المهمة استخدام طرق مختلفة لتحسين استخدام الأنواع المخزنة ، على سبيل المثال ، تطبيقات المهمة تطبق التحسين المختصر (SBO) من أجل callables العادية والصغيرة ، وسوف تضمن المكالمات إلى std::coroutine_handle<void> عن طريق استدعاءها مباشرة دون الإرسال الافتراضي.
task API class task {
/*
Creates an empty task object.
*/
task () noexcept ;
/*
Creates a task object by moving the stored callable of rhs to *this.
If rhs is empty, then *this will also be empty after construction.
After this call, rhs is empty.
*/
task (task&& rhs) noexcept ;
/*
Creates a task object by storing callable in *this.
<<typename std::decay<callable_type>::type>> will be in-place-
constructed inside *this by perfect forwarding callable.
*/
template < class callable_type >
task (callable_type&& callable);
/*
Destroys stored callable, does nothing if empty.
*/
~task () noexcept ;
/*
If *this is empty, does nothing.
Invokes stored callable, and immediately destroys it.
After this call, *this is empty.
May throw any exception that the invoked callable may throw.
*/
void operator ()();
/*
Moves the stored callable of rhs to *this.
If rhs is empty, then *this will also be empty after this call.
If *this already contains a stored callable, operator = destroys it first.
*/
task& operator =(task&& rhs) noexcept ;
/*
If *this is not empty, task::clear destroys the stored callable and empties *this.
If *this is empty, clear does nothing.
*/
void clear () noexcept ;
/*
Returns true if *this stores a callable. false otherwise.
*/
explicit operator bool () const noexcept ;
/*
Returns true if *this stores a callable,
and that stored callable has the same type as <<typename std::decay<callable_type>::type>>
*/
template < class callable_type >
bool contains () const noexcept ;
}; عند تنفيذ المنفذين المعرفة من قبل المستخدم ، فإن الأمر متروك للتنفيذ لتخزين كائنات task (عند استدعاء enqueue ) ، وتنفيذها وفقًا للآلية الداخلية المنفذة.
في هذا المثال ، نقوم بإنشاء منفذي يقوم بتسجيل إجراءات مثل المهام أو تنفيذها. نقوم بتنفيذ واجهة executor ، ونطلب من وقت التشغيل إنشاء وتخزين مثيل منه عن طريق الاتصال runtime::make_executor . يتصرف بقية التطبيق تمامًا كما لو كنا نستخدم المنفذين غير المعرفة من قبل المستخدم.
# include " concurrencpp/concurrencpp.h "
# include < iostream >
# include < queue >
# include < thread >
# include < mutex >
# include < condition_variable >
class logging_executor : public concurrencpp ::derivable_executor<logging_executor> {
private:
mutable std::mutex _lock;
std::queue<concurrencpp::task> _queue;
std::condition_variable _condition;
bool _shutdown_requested;
std::thread _thread;
const std::string _prefix;
void work_loop () {
while ( true ) {
std::unique_lock<std::mutex> lock (_lock);
if (_shutdown_requested) {
return ;
}
if (!_queue. empty ()) {
auto task = std::move (_queue. front ());
_queue. pop ();
lock. unlock ();
std::cout << _prefix << " A task is being executed " << std::endl;
task ();
continue ;
}
_condition. wait (lock, [ this ] {
return !_queue. empty () || _shutdown_requested;
});
}
}
public:
logging_executor (std::string_view prefix) :
derivable_executor<logging_executor>( " logging_executor " ),
_shutdown_requested ( false ),
_prefix (prefix) {
_thread = std::thread ([ this ] {
work_loop ();
});
}
void enqueue (concurrencpp::task task) override {
std::cout << _prefix << " A task is being enqueued! " << std::endl;
std::unique_lock<std::mutex> lock (_lock);
if (_shutdown_requested) {
throw concurrencpp::errors::runtime_shutdown ( " logging executor - executor was shutdown. " );
}
_queue. emplace ( std::move (task));
_condition. notify_one ();
}
void enqueue (std::span<concurrencpp::task> tasks) override {
std::cout << _prefix << tasks. size () << " tasks are being enqueued! " << std::endl;
std::unique_lock<std::mutex> lock (_lock);
if (_shutdown_requested) {
throw concurrencpp::errors::runtime_shutdown ( " logging executor - executor was shutdown. " );
}
for ( auto & task : tasks) {
_queue. emplace ( std::move (task));
}
_condition. notify_one ();
}
int max_concurrency_level () const noexcept override {
return 1 ;
}
bool shutdown_requested () const noexcept override {
std::unique_lock<std::mutex> lock (_lock);
return _shutdown_requested;
}
void shutdown () noexcept override {
std::cout << _prefix << " shutdown requested " << std::endl;
std::unique_lock<std::mutex> lock (_lock);
if (_shutdown_requested) return ; // nothing to do.
_shutdown_requested = true ;
lock. unlock ();
_condition. notify_one ();
_thread. join ();
}
};
int main () {
concurrencpp::runtime runtime;
auto logging_ex = runtime. make_executor <logging_executor>( " Session #1234 " );
for ( size_t i = 0 ; i < 10 ; i++) {
logging_ex-> post ([] {
std::cout << " hello world " << std::endl;
});
}
std::getchar ();
return 0 ;
}$ git clone https://github.com/David-Haim/concurrencpp.git
$ cd concurrencpp
$ cmake -S . -B build /lib
$ cmake -- build build /lib --config Release$ git clone https://github.com/David-Haim/concurrencpp.git
$ cd concurrencpp
$ cmake -S test -B build / test
$ cmake -- build build / test
< # for release mode: cmake --build build/test --config Release #>
$ cd build / test
$ ctest . -V -C Debug
< # for release mode: ctest . -V -C Release #> $ git clone https://github.com/David-Haim/concurrencpp.git
$ cd concurrencpp
$ cmake -DCMAKE_BUILD_TYPE=Release -S . -B build /lib
$ cmake -- build build /lib
#optional, install the library: sudo cmake --install build/lib مع Clang و GCC ، من الممكن أيضًا إجراء الاختبارات باستخدام دعم Tsan (Throse Thitizer).
$ git clone https://github.com/David-Haim/concurrencpp.git
$ cd concurrencpp
$ cmake -S test -B build / test
#for release mode: cmake -DCMAKE_BUILD_TYPE=Release -S test -B build/test
#for TSAN mode: cmake -DCMAKE_BUILD_TYPE=Release -DENABLE_THREAD_SANITIZER=Yes -S test -B build/test
$ cmake -- build build / test
$ cd build / test
$ ctest . -V عند التجميع على Linux ، تحاول المكتبة استخدام libstdc++ افتراضيًا. إذا كنت تنوي استخدام libc++ كتطبيق مكتبة قياسي ، فيجب تحديد علامة CMAKE_TOOLCHAIN_FILE على النحو التالي:
$ cmake -DCMAKE_TOOLCHAIN_FILE=../cmake/libc++.cmake -DCMAKE_BUILD_TYPE=Release -S . -B build /libبدلاً من ذلك لبناء المكتبة وتثبيتها يدويًا ، قد يحصل المطورون على إصدارات مستقرة من Concurrencpp عبر مديري حزم VCPKG و Conan:
VCPKG:
$ vcpkg install concurrencppكونان: Concurrencpp على Conancenter
يأتي Concurrencpp مع برنامج صندوق رمل مدمج يمكن للمطورين تعديله والتجربة ، دون الحاجة إلى تثبيت المكتبة المترجمة أو ربطها بقاعدة رمز مختلفة. من أجل اللعب مع صندوق الرمل ، يمكن للمطورين تعديل sandbox/main.cpp وتجميع التطبيق باستخدام الأوامر التالية:
$ cmake -S sandbox -B build /sandbox
$ cmake -- build build /sandbox
< # for release mode: cmake --build build/sandbox --config Release #>
$ ./ build /sandbox < # runs the sandbox> $ cmake -S sandbox -B build /sandbox
#for release mode: cmake -DCMAKE_BUILD_TYPE=Release -S sandbox -B build/sandbox
$ cmake -- build build /sandbox
$ ./ build /sandbox #runs the sandbox