Concurrencpp membawa kekuatan tugas bersamaan ke dunia C ++, yang memungkinkan pengembang untuk menulis aplikasi yang sangat bersamaan dengan mudah dan aman dengan menggunakan tugas, pelaksana, dan coroutine. Dengan menggunakan aplikasi Concurrencpp dapat memecah prosedur besar yang perlu diproses secara tidak sinkron menjadi tugas-tugas kecil yang berjalan secara bersamaan dan bekerja secara kooperatif untuk mencapai hasil yang diinginkan. Concurrencpp juga memungkinkan aplikasi untuk menulis algoritma paralel dengan mudah dengan menggunakan coroutine paralel.
Keuntungan utama Concurrencpp adalah:
std::thread dan std::mutex .co_await .executor APIthread_pool_executor APImanual_executor APIresultresult APIlazy_resultlazy_resultresult_promiseresult_promiseshared_resultshared_resultmake_ready_resultmake_exceptional_resultwhen_allwhen_anyresume_ontimer_queuetimergeneratorgeneratorasync_lock APIscoped_async_lock APIasync_lockasync_condition_variable APIasync_condition_variableruntime APItasktaskConcurrencpp dibangun di sekitar konsep tugas bersamaan. Tugas adalah operasi asinkron. Tugas menawarkan tingkat abstraksi yang lebih tinggi untuk kode bersamaan daripada pendekatan thread-centric tradisional. Tugas dapat dirantai bersama, artinya tugas melewati hasil asinkronnya dari satu ke yang lain, di mana hasil dari satu tugas digunakan seolah -olah itu adalah parameter atau nilai menengah dari tugas lain yang sedang berlangsung. Tugas memungkinkan aplikasi untuk memanfaatkan sumber daya perangkat keras yang tersedia lebih baik dan skala lebih dari menggunakan utas mentah, karena tugas dapat ditangguhkan, menunggu tugas lain untuk menghasilkan hasil, tanpa memblokir threads yang mendasari. Tugas membawa lebih banyak produktivitas bagi pengembang dengan memungkinkan mereka untuk lebih fokus pada bisnis-logika dan lebih sedikit pada konsep-konsep tingkat rendah seperti manajemen utas dan sinkronisasi antar-thread.
Sementara tugas menentukan tindakan apa yang harus dieksekusi, pelaksana adalah objek pekerja yang menentukan di mana dan bagaimana menjalankan tugas. Eksekutor menyisihkan aplikasi manajemen kumpulan utas yang membosankan dan antrian tugas. Pelaksana juga memisahkan konsep -konsep tersebut dari kode aplikasi, dengan menyediakan API terpadu untuk membuat dan menjadwalkan tugas.
Tugas berkomunikasi satu sama lain menggunakan objek hasil . Objek hasil adalah pipa asinkron yang melewati hasil asinkron dari satu tugas ke tugas berkelanjutan lainnya. Hasil dapat ditunggu dan diselesaikan dengan cara yang tidak blokir.
Ketiga konsep ini - tugas, pelaksana dan hasil yang terkait adalah blok bangunan Concurrencpp. Eksekutor menjalankan tugas yang berkomunikasi satu sama lain dengan mengirimkan hasil melalui objek-hasil. Tugas, pelaksana, dan objek hasil bekerja bersama secara simbiotik untuk menghasilkan kode bersamaan yang cepat dan bersih.
Concurrencpp dibangun di sekitar konsep RAII. Untuk menggunakan tugas dan pelaksana, aplikasi membuat instance runtime di awal fungsi main . Runtime kemudian digunakan untuk memperoleh eksekutor yang ada dan mendaftarkan pelaksana yang ditentukan pengguna baru. Eksekutor digunakan untuk membuat dan menjadwalkan tugas untuk dijalankan, dan mereka mungkin mengembalikan objek result yang dapat digunakan untuk meneruskan hasil asinkron ke tugas lain yang bertindak sebagai konsumennya. Ketika runtime dihancurkan, ia beralih ke setiap pelaksana yang disimpan dan memanggil metode shutdown . Setiap pelaksana kemudian keluar dengan anggun. Tugas yang tidak terjadwal dihancurkan, dan upaya untuk membuat tugas baru akan melempar pengecualian.
# include " concurrencpp/concurrencpp.h "
# include < iostream >
int main () {
concurrencpp::runtime runtime;
auto result = runtime. thread_executor ()-> submit ([] {
std::cout << " hello world " << std::endl;
});
result. get ();
return 0 ;
} Dalam contoh dasar ini, kami membuat objek runtime, kemudian kami memperoleh eksekutor utas dari runtime. Kami menggunakan submit untuk melewati lambda sebagai callable kami yang diberikan. Lambda ini mengembalikan void , oleh karena itu, pelaksana mengembalikan result<void> objek yang melewati hasil asinkron kembali ke penelepon. Panggilan main get yang memblokir utas utama sampai hasilnya siap. Jika tidak ada pengecualian yang dilemparkan, get pengembalian void . Jika pengecualian dilemparkan, get kembali. Secara tidak sinkron, thread_executor meluncurkan utas eksekusi baru dan menjalankan lambda yang diberikan. Ini secara implisit co_return void dan tugas selesai. main kemudian tidak diblokir.
# include " concurrencpp/concurrencpp.h "
# include < iostream >
# include < vector >
# include < algorithm >
# include < ctime >
using namespace concurrencpp ;
std::vector< int > make_random_vector () {
std::vector< int > vec ( 64 * 1'024 );
std::srand ( std::time ( nullptr ));
for ( auto & i : vec) {
i = :: rand ();
}
return vec;
}
result< size_t > count_even (std::shared_ptr<thread_pool_executor> tpe, const std::vector< int >& vector) {
const auto vecor_size = vector. size ();
const auto concurrency_level = tpe-> max_concurrency_level ();
const auto chunk_size = vecor_size / concurrency_level;
std::vector<result< size_t >> chunk_count;
for ( auto i = 0 ; i < concurrency_level; i++) {
const auto chunk_begin = i * chunk_size;
const auto chunk_end = chunk_begin + chunk_size;
auto result = tpe-> submit ([&vector, chunk_begin, chunk_end]() -> size_t {
return std::count_if (vector. begin () + chunk_begin, vector. begin () + chunk_end, []( auto i) {
return i % 2 == 0 ;
});
});
chunk_count. emplace_back ( std::move (result));
}
size_t total_count = 0 ;
for ( auto & result : chunk_count) {
total_count += co_await result;
}
co_return total_count;
}
int main () {
concurrencpp::runtime runtime;
const auto vector = make_random_vector ();
auto result = count_even (runtime. thread_pool_executor (), vector);
const auto total_count = result. get ();
std::cout << " there are " << total_count << " even numbers in the vector " << std::endl;
return 0 ;
} Dalam contoh ini, kami memulai program dengan membuat objek runtime. Kami membuat vektor yang diisi dengan nomor acak, lalu kami memperoleh thread_pool_executor dari runtime dan hubungi count_even . count_even adalah coroutine yang memunculkan lebih banyak tugas dan co_await untuk mereka finis di dalam. max_concurrency_level Mengembalikan jumlah maksimum pekerja yang didukung oleh pelaksana, dalam kasus pelaksana ThreadPool, jumlah pekerja dihitung dari jumlah core. Kami kemudian mempartisi array agar sesuai dengan jumlah pekerja dan mengirim setiap potongan untuk diproses dalam tugasnya sendiri. Secara tidak sinkron, para pekerja menghitung berapa banyak angka yang ada di setiap potongan, dan co_return hasilnya. count_even merangkum setiap hasil dengan menarik jumlah menggunakan co_await , hasil akhirnya adalah co_return ed. Utas utama, yang diblokir dengan menelepon get tidak diblokir dan jumlah total dikembalikan. Main mencetak jumlah angka genap dan program berakhir dengan anggun.
Setiap operasi besar atau kompleks dapat dipecah menjadi langkah yang lebih kecil dan dapat diikat. Tugas adalah operasi asinkron yang mengimplementasikan langkah -langkah komputasi tersebut. Tugas dapat berjalan di mana saja dengan bantuan pelaksana. Sementara tugas dapat dibuat dari callable reguler (seperti functors dan lambdas), tugas sebagian besar digunakan dengan coroutine, yang memungkinkan suspensi dan dimulainya kembali yang halus. Dalam Concurrencpp, konsep tugas diwakili oleh concurrencpp::task . Meskipun konsep tugas adalah pusat dari ConcurrenPP, aplikasi jarang harus membuat dan memanipulasi objek tugas sendiri, karena objek tugas dibuat dan dijadwalkan oleh runtime tanpa bantuan eksternal.
Concurrencpp memungkinkan aplikasi untuk memproduksi dan mengonsumsi coroutine sebagai cara utama membuat tugas. Concurrencpp mendukung tugas yang bersemangat dan malas.
Tugas yang bersemangat mulai berjalan saat mereka dipanggil. Jenis eksekusi ini direkomendasikan ketika aplikasi perlu menembakkan tindakan asinkron dan mengkonsumsi hasilnya nanti (api dan konsumsi nanti), atau sepenuhnya mengabaikan hasil asinkron (api dan lupa).
Tugas yang bersemangat dapat mengembalikan result atau null_result . Jenis pengembalian result memberi tahu coroutine untuk melewati nilai yang dikembalikan atau pengecualian yang dilemparkan (api dan konsumsi nanti) sementara tipe pengembalian null_result memberi tahu coroutine untuk jatuh dan mengabaikan salah satu dari mereka (api dan lupa).
Coroutine yang bersemangat dapat mulai berjalan secara sinkron, di utas penelepon. Jenis coroutine ini disebut "coroutine biasa". Concurrencpp Eager Coroutines juga dapat mulai berjalan secara paralel, di dalam pelaksana yang diberikan, coroutine semacam ini disebut "coroutine paralel".
Tugas malas, di sisi lain, mulai berjalan hanya ketika co_await ed. Jenis tugas ini direkomendasikan ketika hasil tugas dimaksudkan untuk dikonsumsi segera setelah membuat tugas. Tugas malas, ditangguhkan, sedikit lebih dioptimalkan untuk kasus konsumsi langsung, karena mereka tidak memerlukan sinkronisasi utas khusus untuk meneruskan hasil asinkron kembali ke konsumennya. Kompiler mungkin juga mengoptimalkan beberapa alokasi memori yang diperlukan untuk membentuk janji coroutine yang mendasarinya. Sementara itu tidak mungkin untuk menembakkan tugas yang malas dan melaksanakan sesuatu yang lain-penembakan coroutine malas-callee tentu berarti penangguhan penelepon-koroutine. Penelepon coroutine hanya akan dilanjutkan ketika coroutine malas-Callee selesai. Tugas malas hanya bisa mengembalikan lazy_result .
Tugas malas dapat dikonversi menjadi tugas yang bersemangat dengan menelepon lazy_result::run . Metode ini menjalankan tugas malas secara inline dan mengembalikan objek result yang memantau tugas yang baru dimulai. Jika pengembang tidak yakin jenis hasil mana yang akan digunakan, mereka didorong untuk menggunakan hasil malas, karena mereka dapat dikonversi menjadi hasil reguler (bersemangat) jika diperlukan.
Ketika suatu fungsi mengembalikan salah satu dari lazy_result , result atau null_result dan berisi setidaknya satu co_await atau co_return di tubuhnya, fungsinya adalah coroutine concurrencpp. Setiap Concurrencpp Coroutine yang valid adalah tugas yang valid. Dalam contoh penghitungan kami di atas, count_even adalah coroutine. Kami pertama kali melahirkan count_even , lalu di dalamnya pelaksana Threadpool melahirkan lebih banyak tugas anak (yang dibuat dari Callable biasa), yang akhirnya bergabung menggunakan co_await .
Eksekutor Concurrencpp adalah objek yang dapat menjadwalkan dan menjalankan tugas. Pelaksana menyederhanakan pekerjaan mengelola sumber daya seperti utas, kumpulan utas, dan antrian tugas dengan memisahkan mereka dari kode aplikasi. Pelaksana menyediakan cara penjadwalan dan melaksanakan tugas yang terpadu, karena mereka semua memperluas concurrencpp::executor .
executor API class executor {
/*
Initializes a new executor and gives it a name.
*/
executor (std::string_view name);
/*
Destroys this executor.
*/
virtual ~executor () noexcept = default ;
/*
The name of the executor, used for logging and debugging.
*/
const std::string name;
/*
Schedules a task to run in this executor.
Throws concurrencpp::errors::runtime_shutdown exception if shutdown was called before.
*/
virtual void enqueue (concurrencpp::task task) = 0;
/*
Schedules a range of tasks to run in this executor.
Throws concurrencpp::errors::runtime_shutdown exception if shutdown was called before.
*/
virtual void enqueue (std::span<concurrencpp::task> tasks) = 0;
/*
Returns the maximum count of real OS threads this executor supports.
The actual count of threads this executor is running might be smaller than this number.
returns numeric_limits<int>::max if the executor does not have a limit for OS threads.
*/
virtual int max_concurrency_level () const noexcept = 0;
/*
Returns true if shutdown was called before, false otherwise.
*/
virtual bool shutdown_requested () const noexcept = 0;
/*
Shuts down the executor:
- Tells underlying threads to exit their work loop and joins them.
- Destroys unexecuted coroutines.
- Makes subsequent calls to enqueue, post, submit, bulk_post and
bulk_submit to throw concurrencpp::errors::runtime_shutdown exception.
- Makes shutdown_requested return true.
*/
virtual void shutdown () noexcept = 0;
/*
Turns a callable and its arguments into a task object and
schedules it to run in this executor using enqueue.
Arguments are passed to the task by decaying them first.
Throws errors::runtime_shutdown exception if shutdown has been called before.
*/
template < class callable_type , class ... argument_types>
void post (callable_type&& callable, argument_types&& ... arguments);
/*
Like post, but returns a result object that passes the asynchronous result.
Throws errors::runtime_shutdown exception if shutdown has been called before.
*/
template < class callable_type , class ... argument_types>
result<type> submit (callable_type&& callable, argument_types&& ... arguments);
/*
Turns an array of callables into an array of tasks and
schedules them to run in this executor using enqueue.
Throws errors::runtime_shutdown exception if shutdown has been called before.
*/
template < class callable_type >
void bulk_post (std::span<callable_type> callable_list);
/*
Like bulk_post, but returns an array of result objects that passes the asynchronous results.
Throws errors::runtime_shutdown exception if shutdown has been called before.
*/
template < class callable_type >
std::vector<concurrencpp::result<type>> bulk_submit (std::span<callable_type> callable_list);
};Seperti disebutkan di atas, Concurrencpp menyediakan eksekutor yang umum digunakan. Jenis pelaksana ini adalah:
Thread Pool Executor - Pelaksana tujuan umum yang memelihara kumpulan benang. Eksekutor kumpulan utas cocok untuk tugas-tugas singkat yang terikat CPU yang tidak memblokir. Aplikasi didorong untuk menggunakan pelaksana ini sebagai pelaksana default untuk tugas yang tidak memblokir. Kumpulan utas Concurrencpp menyediakan injeksi utas dinamis dan penyeimbangan kerja yang dinamis.
Executor Latar Belakang - Eksekutor ThreadPool dengan kumpulan utas yang lebih besar. Cocok untuk meluncurkan tugas pemblokiran pendek seperti kueri File IO dan DB. Catatan Penting: Saat mengkonsumsi hasil pelaksana ini dikembalikan dengan memanggil submit dan bulk_submit , penting untuk beralih eksekusi menggunakan resume_on ke pelaksana yang terikat CPU, untuk mencegah tugas yang terikat CPU diproses di dalam latar belakang_executor.
contoh:
auto result = background_executor.submit([] { /* some blocking action */ });
auto done_result = co_await result.resolve();
co_await resume_on (some_cpu_executor);
auto val = co_await done_result; // runs inside some_cpu_executorEksekutor Thread - Pelaksana yang meluncurkan setiap tugas yang dijalankan untuk dijalankan pada utas eksekusi baru. Thread tidak digunakan kembali. Eksekutor ini baik untuk tugas yang sudah berjalan lama, seperti objek yang menjalankan loop kerja, atau operasi pemblokiran panjang.
Eksekutor Thread Pekerja - Eksekutor utas tunggal yang mempertahankan satu antrian tugas. Cocok saat aplikasi menginginkan utas khusus yang menjalankan banyak tugas terkait.
Eksekutor Manual - Pelaksana yang tidak menjalankan coroutine dengan sendirinya. Kode aplikasi dapat menjalankan tugas yang ditentukan sebelumnya dengan secara manual memohon metode eksekusi.
Eksekutor Derivable - kelas dasar untuk pelaksana yang ditentukan pengguna. Meskipun mewarisi langsung dari concurrencpp::executor dimungkinkan, derivable_executor menggunakan pola CRTP yang memberikan beberapa peluang optimasi untuk kompiler.
Inline Executor - terutama digunakan untuk mengesampingkan perilaku eksekutor lainnya. Membuat tugas setara dengan memohonnya sejalan.
Mekanisme telanjang seorang pelaksana dienkapsulasi dalam metode enqueue . Metode ini memasukkan tugas untuk dieksekusi dan memiliki dua kelebihan beban: satu kelebihan beban menerima satu objek tugas sebagai argumen, dan satu lagi yang menerima rentang objek tugas. Kelebihan beban kedua digunakan untuk memasukkan sejumlah tugas. Ini memungkinkan heuristik penjadwalan yang lebih baik dan penurunan pertengkaran.
Aplikasi tidak harus mengandalkan enqueue saja, concurrencpp::executor menyediakan API untuk menjadwalkan pengguna yang dapat dipanggil dengan mengubahnya menjadi objek tugas di belakang layar. Aplikasi dapat meminta pelaksana untuk mengembalikan objek hasil yang melewati hasil asinkron dari yang disediakan. Ini dilakukan dengan menelepon executor::submit dan executor::bulk_submit . submit mendapat panggilan yang dapat dipanggil, dan mengembalikan objek hasil. executor::bulk_submit mendapat span yang dapat dipanggil dan mengembalikan vector objek hasil dengan cara yang sama submit karya. Dalam banyak kasus, aplikasi tidak tertarik pada nilai atau pengecualian asinkron. Dalam hal ini, aplikasi dapat menggunakan executor:::post dan executor::bulk_post untuk menjadwalkan panggilan yang dapat dipanggil atau span yang dapat dipanggil untuk dieksekusi, tetapi juga memberi tahu tugas untuk menjatuhkan nilai yang dikembalikan atau pengecualian yang dilemparkan. Tidak melewati hasil asinkron lebih cepat daripada lewat, tetapi kemudian kami tidak memiliki cara untuk mengetahui status atau hasil dari tugas yang sedang berlangsung.
post , bulk_post , submit dan bulk_submit menggunakan enqueue di belakang layar untuk mekanisme penjadwalan yang mendasarinya.
thread_pool_executor API Selain dari post , submit , bulk_post dan bulk_submit , thread_pool_executor menyediakan metode tambahan ini.
class thread_pool_executor {
/*
Returns the number of milliseconds each thread-pool worker
remains idle (lacks any task to execute) before exiting.
This constant can be set by passing a runtime_options object
to the constructor of the runtime class.
*/
std::chrono::milliseconds max_worker_idle_time () const noexcept ;
};manual_executor API Selain dari post , submit , bulk_post dan bulk_submit , manual_executor menyediakan metode tambahan ini.
class manual_executor {
/*
Destructor. Equivalent to clear.
*/
~manual_executor () noexcept ;
/*
Returns the number of enqueued tasks at the moment of invocation.
This number can change quickly by the time the application handles it, it should be used as a hint.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
size_t size () const noexcept ;
/*
Queries whether the executor is empty from tasks at the moment of invocation.
This value can change quickly by the time the application handles it, it should be used as a hint.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
bool empty () const noexcept ;
/*
Clears the executor from any enqueued but yet to-be-executed tasks,
and returns the number of cleared tasks.
Tasks enqueued to this executor by (post_)submit method are resumed
and errors::broken_task exception is thrown inside them.
Ongoing tasks that are being executed by loop_once(_XXX) or loop(_XXX) are uneffected.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
size_t clear ();
/*
Tries to execute a single task. If at the moment of invocation the executor
is empty, the method does nothing.
Returns true if a task was executed, false otherwise.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
bool loop_once ();
/*
Tries to execute a single task.
This method returns when either a task was executed or max_waiting_time
(in milliseconds) has reached.
If max_waiting_time is 0, the method is equivalent to loop_once.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
bool loop_once_for (std::chrono::milliseconds max_waiting_time);
/*
Tries to execute a single task.
This method returns when either a task was executed or timeout_time has reached.
If timeout_time has already expired, this method is equivalent to loop_once.
If shutdown is called from another thread, this method
returns and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
template < class clock_type , class duration_type >
bool loop_once_until (std::chrono::time_point<clock_type, duration_type> timeout_time);
/*
Tries to execute max_count enqueued tasks and returns the number of tasks that were executed.
This method does not wait: it returns when the executor
becomes empty from tasks or max_count tasks have been executed.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
size_t loop ( size_t max_count);
/*
Tries to execute max_count tasks.
This method returns when either max_count tasks were executed or a
total amount of max_waiting_time has passed.
If max_waiting_time is 0, the method is equivalent to loop.
Returns the actual amount of tasks that were executed.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
size_t loop_for ( size_t max_count, std::chrono::milliseconds max_waiting_time);
/*
Tries to execute max_count tasks.
This method returns when either max_count tasks were executed or timeout_time has reached.
If timeout_time has already expired, the method is equivalent to loop.
Returns the actual amount of tasks that were executed.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
template < class clock_type , class duration_type >
size_t loop_until ( size_t max_count, std::chrono::time_point<clock_type, duration_type> timeout_time);
/*
Waits for at least one task to be available for execution.
This method should be used as a hint,
as other threads (calling loop, for example) might empty the executor,
before this thread has a chance to do something with the newly enqueued tasks.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
void wait_for_task ();
/*
This method returns when one or more tasks are available for
execution or max_waiting_time has passed.
Returns true if at at least one task is available for execution, false otherwise.
This method should be used as a hint, as other threads (calling loop, for example)
might empty the executor, before this thread has a chance to do something
with the newly enqueued tasks.
If shutdown is called from another thread, this method
returns and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
bool wait_for_task_for (std::chrono::milliseconds max_waiting_time);
/*
This method returns when one or more tasks are available for execution or timeout_time has reached.
Returns true if at at least one task is available for execution, false otherwise.
This method should be used as a hint,
as other threads (calling loop, for example) might empty the executor,
before this thread has a chance to do something with the newly enqueued tasks.
If shutdown is called from another thread, this method
returns and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
template < class clock_type , class duration_type >
bool wait_for_task_until (std::chrono::time_point<clock_type, duration_type> timeout_time);
/*
This method returns when max_count or more tasks are available for execution.
This method should be used as a hint, as other threads
(calling loop, for example) might empty the executor,
before this thread has a chance to do something with the newly enqueued tasks.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
void wait_for_tasks ( size_t max_count);
/*
This method returns when max_count or more tasks are available for execution
or max_waiting_time (in milliseconds) has passed.
Returns the number of tasks available for execution when the method returns.
This method should be used as a hint, as other
threads (calling loop, for example) might empty the executor,
before this thread has a chance to do something with the newly enqueued tasks.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
size_t wait_for_tasks_for ( size_t count, std::chrono::milliseconds max_waiting_time);
/*
This method returns when max_count or more tasks are available for execution
or timeout_time is reached.
Returns the number of tasks available for execution when the method returns.
This method should be used as a hint, as other threads
(calling loop, for example) might empty the executor,
before this thread has a chance to do something with the newly enqueued tasks.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
template < class clock_type , class duration_type >
size_t wait_for_tasks_until ( size_t count, std::chrono::time_point<clock_type, duration_type> timeout_time);
}; Nilai dan pengecualian asinkron dapat dikonsumsi menggunakan objek hasil concurrencpp. Jenis result merupakan hasil asinkron dari tugas yang bersemangat sementara lazy_result mewakili hasil yang ditangguhkan dari tugas malas.
Ketika tugas (bersemangat atau malas) selesai, ia mengembalikan nilai yang valid atau melempar pengecualian. Dalam kedua kasus tersebut, hasil asinkron ini diteruskan ke konsumen dari objek hasil.
Objek result membentuk coroutine asimetris-pelaksanaan penelepon-koroutin tidak dipengaruhi oleh pelaksanaan callee-coroutine, kedua coroutine dapat berjalan secara mandiri. Hanya ketika mengkonsumsi hasil callee-coroutine, penelepon-koroutine mungkin ditangguhkan menunggu callee untuk diselesaikan. Sampai titik itu kedua coroutine berjalan secara mandiri. Callee-coroutine berjalan apakah hasilnya dikonsumsi atau tidak.
lazy_result objek membentuk simetris coroutine-eksekusi callee-coroutine terjadi hanya setelah suspensi penelepon-koroutine. Saat menunggu hasil yang malas, coroutine saat ini ditangguhkan dan tugas malas yang terkait dengan hasil malas mulai berjalan. Setelah callee-coroutine menyelesaikan dan menghasilkan hasilnya, penelepon-koroutine dilanjutkan. Jika hasil malas tidak dikonsumsi, tugas malas yang terkait tidak pernah mulai berjalan.
Semua objek hasil adalah tipe bergerak saja, dan dengan demikian, mereka tidak dapat digunakan setelah konten mereka dipindahkan ke objek hasil lain. Dalam hal ini, objek hasil dianggap kosong dan upaya untuk memanggil metode apa pun selain operator bool dan operator = akan melempar pengecualian.
Setelah hasil asinkron telah ditarik keluar dari objek hasil (misalnya, dengan menelepon get atau operator co_await ), objek hasil menjadi kosong. Kekosongan dapat diuji dengan operator bool .
Menunggu hasil sarana untuk menangguhkan coroutine saat ini sampai objek hasil siap. Jika nilai yang valid dikembalikan dari tugas terkait, itu dikembalikan dari objek hasil. Jika tugas yang terkait melempar pengecualian, itu dilemparkan kembali. Pada saat menunggu, jika hasilnya sudah siap, coroutine saat ini segera dilanjutkan. Kalau tidak, itu dilanjutkan oleh utas yang menetapkan hasil atau pengecualian asinkron.
Menyelesaikan hasilnya mirip dengan menunggu. Perbedaannya adalah bahwa ekspresi co_await akan mengembalikan objek hasil itu sendiri, dalam bentuk yang tidak kosong, dalam keadaan siap. Hasil asinkron kemudian dapat ditarik dengan menggunakan get atau co_await .
Setiap objek hasil memiliki status yang menunjukkan keadaan hasil asinkron. Status hasil bervariasi dari result_status::idle (hasil asinkron atau pengecualian belum diproduksi) ke result_status::value (tugas yang terkait diakhiri dengan anggun dengan mengembalikan nilai yang valid) ke result_status::exception (tugas diakhiri dengan melempar pengecualian). Status dapat ditanyakan dengan menelepon (lazy_)result::status .
result Jenis result merupakan hasil dari tugas yang sedang berlangsung dan asinkron, mirip dengan std::future .
Selain menunggu hasil hasil, mereka juga dapat menunggu dengan menelepon result::wait , result::wait_for , result::wait_until atau result::get . Menunggu hasil untuk menyelesaikan adalah operasi pemblokiran (dalam hal hasil asinkron tidak siap), dan akan menangguhkan seluruh utas eksekusi menunggu hasil asinkron tersedia. Operasi tunggu umumnya berkecil hati dan hanya diizinkan dalam tugas-tugas tingkat root atau dalam konteks yang memungkinkannya, seperti memblokir utas utama menunggu sisa aplikasi untuk menyelesaikan dengan anggun, atau menggunakan concurrencpp::blocking_executor atau concurrencpp::thread_executor .
Menunggu objek hasil dengan menggunakan co_await (dan dengan melakukan itu, mengubah fungsi/tugas saat ini menjadi coroutine juga) adalah cara yang lebih disukai untuk mengonsumsi objek hasil, karena tidak memblokir utas yang mendasarinya.
result API class result {
/*
Creates an empty result that isn't associated with any task.
*/
result () noexcept = default ;
/*
Destroys the result. Associated tasks are not cancelled.
The destructor does not block waiting for the asynchronous result to become ready.
*/
~result () noexcept = default ;
/*
Moves the content of rhs to *this. After this call, rhs is empty.
*/
result (result&& rhs) noexcept = default ;
/*
Moves the content of rhs to *this. After this call, rhs is empty. Returns *this.
*/
result& operator = (result&& rhs) noexcept = default ;
/*
Returns true if this is a non-empty result.
Applications must not use this object if this->operator bool() is false.
*/
explicit operator bool () const noexcept ;
/*
Queries the status of *this.
The returned value is any of result_status::idle, result_status::value or result_status::exception.
Throws errors::empty_result if *this is empty.
*/
result_status status () const ;
/*
Blocks the current thread of execution until this result is ready,
when status() != result_status::idle.
Throws errors::empty_result if *this is empty.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
void wait ();
/*
Blocks until this result is ready or duration has passed. Returns the status
of this result after unblocking.
Throws errors::empty_result if *this is empty.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
template < class duration_unit , class ratio >
result_status wait_for (std::chrono::duration<duration_unit, ratio> duration);
/*
Blocks until this result is ready or timeout_time has reached. Returns the status
of this result after unblocking.
Throws errors::empty_result if *this is empty.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
template < class clock , class duration >
result_status wait_until (std::chrono::time_point<clock, duration> timeout_time);
/*
Blocks the current thread of execution until this result is ready,
when status() != result_status::idle.
If the result is a valid value, it is returned, otherwise, get rethrows the asynchronous exception.
Throws errors::empty_result if *this is empty.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
type get ();
/*
Returns an awaitable used to await this result.
If the result is already ready - the current coroutine resumes
immediately in the calling thread of execution.
If the result is not ready yet, the current coroutine is suspended
and resumed when the asynchronous result is ready,
by the thread which had set the asynchronous value or exception.
In either way, after resuming, if the result is a valid value, it is returned.
Otherwise, operator co_await rethrows the asynchronous exception.
Throws errors::empty_result if *this is empty.
*/
auto operator co_await ();
/*
Returns an awaitable used to resolve this result.
After co_await expression finishes, *this is returned in a non-empty form, in a ready state.
Throws errors::empty_result if *this is empty.
*/
auto resolve ();
};lazy_resultObjek hasil malas mewakili hasil dari tugas malas yang ditangguhkan.
lazy_result memiliki tanggung jawab untuk memulai tugas malas terkait dan meneruskan hasil yang ditangguhkannya kembali kepada konsumennya. Ketika ditunggu atau diselesaikan, hasil malas menangguhkan coroutine saat ini dan memulai tugas malas yang terkait. Ketika tugas yang terkait selesai, nilai asinkronnya diteruskan ke tugas penelepon, yang kemudian dilanjutkan.
Terkadang, API mungkin mengembalikan hasil yang malas, tetapi aplikasi membutuhkan tugas yang terkait untuk berjalan dengan penuh semangat (tanpa menangguhkan tugas penelepon). Dalam hal ini, tugas -tugas malas dapat dikonversi menjadi tugas yang bersemangat dengan menelepon run hasil malas yang terkait. Dalam hal ini, tugas terkait akan mulai berjalan inline, tanpa menangguhkan tugas penelepon. Hasil malas asli dikosongkan dan objek result yang valid yang memantau tugas yang baru dimulai akan dikembalikan sebagai gantinya.
lazy_result class lazy_result {
/*
Creates an empty lazy result that isn't associated with any task.
*/
lazy_result () noexcept = default ;
/*
Moves the content of rhs to *this. After this call, rhs is empty.
*/
lazy_result (lazy_result&& rhs) noexcept ;
/*
Destroys the result. If not empty, the destructor destroys the associated task without resuming it.
*/
~lazy_result () noexcept ;
/*
Moves the content of rhs to *this. After this call, rhs is empty. Returns *this.
If *this is not empty, then operator= destroys the associated task without resuming it.
*/
lazy_result& operator =(lazy_result&& rhs) noexcept ;
/*
Returns true if this is a non-empty result.
Applications must not use this object if this->operator bool() is false.
*/
explicit operator bool () const noexcept ;
/*
Queries the status of *this.
The returned value is any of result_status::idle, result_status::value or result_status::exception.
Throws errors::empty_result if *this is empty.
*/
result_status status () const ;
/*
Returns an awaitable used to start the associated task and await this result.
If the result is already ready - the current coroutine resumes immediately
in the calling thread of execution.
If the result is not ready yet, the current coroutine is suspended and
resumed when the asynchronous result is ready,
by the thread which had set the asynchronous value or exception.
In either way, after resuming, if the result is a valid value, it is returned.
Otherwise, operator co_await rethrows the asynchronous exception.
Throws errors::empty_result if *this is empty.
*/
auto operator co_await ();
/*
Returns an awaitable used to start the associated task and resolve this result.
If the result is already ready - the current coroutine resumes immediately
in the calling thread of execution.
If the result is not ready yet, the current coroutine is suspended and resumed
when the asynchronous result is ready, by the thread which
had set the asynchronous value or exception.
After co_await expression finishes, *this is returned in a non-empty form, in a ready state.
Throws errors::empty_result if *this is empty.
*/
auto resolve ();
/*
Runs the associated task inline and returns a result object that monitors the newly started task.
After this call, *this is empty.
Throws errors::empty_result if *this is empty.
Might throw std::bad_alloc if fails to allocate memory.
*/
result<type> run ();
};Coroutines yang bersemangat reguler mulai berjalan secara sinkron di utas eksekusi panggilan. Eksekusi mungkin bergeser ke utas eksekusi lain jika coroutine mengalami penjadwalan ulang, misalnya dengan menunggu objek hasil yang tidak sehat di dalamnya. Concurrencpp juga menyediakan coroutines paralel, yang mulai berjalan di dalam pelaksana tertentu, bukan di utas eksekusi yang memohon. Gaya penjadwalan coroutine ini sangat membantu ketika menulis algoritma paralel, algoritma rekursif dan algoritma bersamaan yang menggunakan model fork-join.
Setiap coroutine paralel harus memenuhi prasyarat berikut:
result apa pun / null_result .executor_tag sebagai argumen pertama.type* / type& / std::shared_ptr<type> , di mana type adalah kelas executor konkret sebagai argumen kedua.co_await atau co_return di tubuhnya. Jika semua hal di atas berlaku, fungsinya adalah coroutine paralel: Concurrencpp akan memulai coroutine ditangguhkan dan segera menjadwal ulang untuk berjalan di pelaksana yang disediakan. concurrencpp::executor_tag adalah placeholder dummy untuk memberi tahu runtime Concurrencpp bahwa fungsi ini bukan fungsi reguler, ia perlu mulai berjalan di dalam pelaksana yang diberikan. Jika pelaksana lulus ke coroutine paralel adalah nol, coroutine tidak akan mulai berjalan dan pengecualian std::invalid_argument akan dilemparkan secara serempak. Jika semua prasyarat terpenuhi, aplikasi dapat mengkonsumsi hasil coroutine paralel dengan menggunakan objek hasil yang dikembalikan.
Dalam contoh ini, kami menghitung anggota ke-30 dari urutan Fibonacci secara paralel. Kami mulai meluncurkan setiap langkah Fibonacci dalam coroutine paralelnya sendiri. Argumen pertama adalah dummy executor_tag dan argumen kedua adalah eksekutor ThreadPool. Setiap langkah rekursif memunculkan coroutine paralel baru yang berjalan secara paralel. Setiap hasil adalah co_return ed ke tugas induknya dan diperoleh dengan menggunakan co_await .
Ketika kami menganggap input cukup kecil untuk dihitung secara serempak (ketika curr <= 10 ), kami berhenti mengeksekusi setiap langkah rekursif dalam tugasnya sendiri dan hanya menyelesaikan algoritma secara serempak.
# include " concurrencpp/concurrencpp.h "
# include < iostream >
using namespace concurrencpp ;
int fibonacci_sync ( int i) {
if (i == 0 ) {
return 0 ;
}
if (i == 1 ) {
return 1 ;
}
return fibonacci_sync (i - 1 ) + fibonacci_sync (i - 2 );
}
result< int > fibonacci (executor_tag, std::shared_ptr<thread_pool_executor> tpe, const int curr) {
if (curr <= 10 ) {
co_return fibonacci_sync (curr);
}
auto fib_1 = fibonacci ({}, tpe, curr - 1 );
auto fib_2 = fibonacci ({}, tpe, curr - 2 );
co_return co_await fib_1 + co_await fib_2;
}
int main () {
concurrencpp::runtime runtime;
auto fibb_30 = fibonacci ({}, runtime. thread_pool_executor (), 30 ). get ();
std::cout << " fibonacci(30) = " << fibb_30 << std::endl;
return 0 ;
} Untuk membandingkan, ini adalah bagaimana kode yang sama ditulis tanpa menggunakan coroutine paralel, dan mengandalkan executor::submit sendiri. Karena fibonacci mengembalikan result<int> , mengirimkannya secara rekursif melalui executor::submit akan menghasilkan result<result<int>> .
# include " concurrencpp/concurrencpp.h "
# include < iostream >
using namespace concurrencpp ;
int fibonacci_sync ( int i) {
if (i == 0 ) {
return 0 ;
}
if (i == 1 ) {
return 1 ;
}
return fibonacci_sync (i - 1 ) + fibonacci_sync (i - 2 );
}
result< int > fibonacci (std::shared_ptr<thread_pool_executor> tpe, const int curr) {
if (curr <= 10 ) {
co_return fibonacci_sync (curr);
}
auto fib_1 = tpe-> submit (fibonacci, tpe, curr - 1 );
auto fib_2 = tpe-> submit (fibonacci, tpe, curr - 2 );
co_return co_await co_await fib_1 +
co_await co_await fib_2;
}
int main () {
concurrencpp::runtime runtime;
auto fibb_30 = fibonacci (runtime. thread_pool_executor (), 30 ). get ();
std::cout << " fibonacci(30) = " << fibb_30 << std::endl;
return 0 ;
} Objek Hasil adalah cara utama untuk meneruskan data antara tugas -tugas di Concurrencpp dan kami telah melihat bagaimana pelaksana dan coroutine menghasilkan objek tersebut. Terkadang kami ingin menggunakan kemampuan objek hasil dengan non-tugas, misalnya saat menggunakan pustaka pihak ketiga. Dalam hal ini, kita dapat menyelesaikan objek hasil dengan menggunakan result_promise . result_promise menyerupai Objek std::promise - Aplikasi dapat secara manual mengatur hasil atau pengecualian asinkron dan membuat objek result yang terkait menjadi siap.
Sama seperti objek hasil, hasil hasil adalah jenis langkah saja yang menjadi kosong setelah bergerak. Demikian pula, setelah menetapkan hasil atau pengecualian, janji hasilnya menjadi kosong juga. Jika prajurit hasil keluar dari ruang lingkup dan tidak ada hasil/pengecualian yang telah ditetapkan, penghancur promisan hasil mengatur concurrencpp::errors::broken_task pengecualian menggunakan metode set_exception . Tugas yang ditangguhkan dan diblokir menunggu objek hasil terkait dilanjutkan/tidak diblokir.
Janji hasil dapat mengubah gaya callback kode menjadi gaya kode async/await : setiap kali komponen memerlukan panggilan balik untuk melewati hasil asinkron, kita dapat melewati panggilan balik yang memanggil set_result atau set_exception (tergantung pada hasil asinkron itu sendiri) pada janji hasil yang diteruskan, dan mengembalikan hasil yang terkait.
result_promise template < class type >
class result_promise {
/*
Constructs a valid result_promise.
Might throw std::bad_alloc if fails to allocate memory.
*/
result_promise ();
/*
Moves the content of rhs to *this. After this call, rhs is empty.
*/
result_promise (result_promise&& rhs) noexcept ;
/*
Destroys *this, possibly setting an errors::broken_task exception
by calling set_exception if *this is not empty at the time of destruction.
*/
~result_promise () noexcept ;
/*
Moves the content of rhs to *this. After this call, rhs is empty.
*/
result_promise& operator = (result_promise&& rhs) noexcept ;
/*
Returns true if this is a non-empty result-promise.
Applications must not use this object if this->operator bool() is false.
*/
explicit operator bool () const noexcept ;
/*
Sets a value by constructing <<type>> from arguments... in-place.
Makes the associated result object become ready - tasks waiting for it
to become ready are unblocked.
Suspended tasks are resumed inline.
After this call, *this becomes empty.
Throws errors::empty_result_promise exception If *this is empty.
Might throw any exception that the constructor
of type(std::forward<argument_types>(arguments)...) throws.
*/
template < class ... argument_types>
void set_result (argument_types&& ... arguments);
/*
Sets an exception.
Makes the associated result object become ready - tasks waiting for it
to become ready are unblocked.
Suspended tasks are resumed inline.
After this call, *this becomes empty.
Throws errors::empty_result_promise exception If *this is empty.
Throws std::invalid_argument exception if exception_ptr is null.
*/
void set_exception (std::exception_ptr exception_ptr);
/*
A convenience method that invokes a callable with arguments... and calls set_result
with the result of the invocation.
If an exception is thrown, the thrown exception is caught and set instead by calling set_exception.
After this call, *this becomes empty.
Throws errors::empty_result_promise exception If *this is empty.
Might throw any exception that callable(std::forward<argument_types>(arguments)...)
or the contructor of type(type&&) throw.
*/
template < class callable_type , class ... argument_types>
void set_from_function (callable_type&& callable, argument_types&& ... arguments);
/*
Gets the associated result object.
Throws errors::empty_result_promise exception If *this is empty.
Throws errors::result_already_retrieved exception if this method had been called before.
*/
result<type> get_result ();
};result_promise : Dalam contoh ini, result_promise digunakan untuk mendorong data dari satu utas, dan dapat ditarik dari objek result yang terkait dari utas lain.
# include " concurrencpp/concurrencpp.h "
# include < iostream >
int main () {
concurrencpp::result_promise<std::string> promise;
auto result = promise. get_result ();
std::thread my_3_party_executor ([promise = std::move (promise)] () mutable {
std::this_thread::sleep_for ( std::chrono::seconds ( 1 )); // Imitate real work
promise. set_result ( " hello world " );
});
auto asynchronous_string = result. get ();
std::cout << " result promise returned string: " << asynchronous_string << std::endl;
my_3_party_executor. join ();
} Dalam contoh ini, kami menggunakan std::thread sebagai pelaksana pihak ketiga. Ini mewakili skenario ketika pelaksana non-Koncurrencpp digunakan sebagai bagian dari siklus hidup aplikasi. Kami mengekstrak objek hasil sebelum kami memberikan janji dan memblokir utas utama sampai hasilnya siap. Di my_3_party_executor , kami menetapkan hasil seolah -olah kami co_return ed.
Hasil yang dibagikan adalah jenis khusus dari hasil hasil yang memungkinkan banyak konsumen untuk mengakses hasil asinkron, mirip dengan std::shared_future . Konsumen yang berbeda dari utas yang berbeda dapat memanggil fungsi seperti await , get dan resolve dengan cara yang aman.
Hasil bersama dibangun dari objek hasil reguler dan tidak seperti objek hasil biasa, keduanya dapat disalin dan bergerak. Dengan demikian, shared_result berperilaku seperti std::shared_ptr tipe. Jika instance hasil bersama dipindahkan ke instance lain, instance menjadi kosong, dan mencoba mengaksesnya akan melempar pengecualian.
Untuk mendukung banyak konsumen, hasil bersama mengembalikan referensi ke nilai asinkron alih -alih memindahkannya (seperti hasil reguler). Misalnya, shared_result<int> mengembalikan int& when get , await dll. Dipanggil. Jika jenis yang mendasari dari shared_result adalah void atau jenis referensi (seperti int& ), mereka dikembalikan seperti biasa. Jika hasil asinkron adalah pengecualian, itu dilemparkan kembali.
Perhatikan bahwa saat memperoleh hasil asinkron menggunakan shared_result dari beberapa utas aman-aman, nilai aktual mungkin tidak aman utas. Misalnya, beberapa utas dapat memperoleh integer asinkron dengan menerima referensi ( int& ). Itu tidak membuat integer itu sendiri aman. Tidak apa -apa untuk bermutasi nilai asinkron jika nilai asinkron sudah aman. Atau, aplikasi didorong untuk menggunakan tipe const untuk memulai (seperti const int ), dan memperoleh referensi konstan (seperti const int& ) yang mencegah mutasi.
shared_result class share_result {
/*
Creates an empty shared-result that isn't associated with any task.
*/
shared_result () noexcept = default ;
/*
Destroys the shared-result. Associated tasks are not cancelled.
The destructor does not block waiting for the asynchronous result to become ready.
*/
~shared_result () noexcept = default ;
/*
Converts a regular result object to a shared-result object.
After this call, rhs is empty.
Might throw std::bad_alloc if fails to allocate memory.
*/
shared_result (result<type> rhs);
/*
Copy constructor. Creates a copy of the shared result object that monitors the same task.
*/
shared_result ( const shared_result&) noexcept = default ;
/*
Move constructor. Moves rhs to *this. After this call, rhs is empty.
*/
shared_result (shared_result&& rhs) noexcept = default ;
/*
Copy assignment operator. Copies rhs to *this and monitors the same task that rhs monitors.
*/
shared_result& operator =( const shared_result& rhs) noexcept ;
/*
Move assignment operator. Moves rhs to *this. After this call, rhs is empty.
*/
shared_result& operator =(shared_result&& rhs) noexcept ;
/*
Returns true if this is a non-empty shared-result.
Applications must not use this object if this->operator bool() is false.
*/
explicit operator bool () const noexcept ;
/*
Queries the status of *this.
The return value is any of result_status::idle, result_status::value or result_status::exception.
Throws errors::empty_result if *this is empty.
*/
result_status status () const ;
/*
Blocks the current thread of execution until this shared-result is ready,
when status() != result_status::idle.
Throws errors::empty_result if *this is empty.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
void wait ();
/*
Blocks until this shared-result is ready or duration has passed.
Returns the status of this shared-result after unblocking.
Throws errors::empty_result if *this is empty.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
template < class duration_type , class ratio_type >
result_status wait_for (std::chrono::duration<duration_type, ratio_type> duration);
/*
Blocks until this shared-result is ready or timeout_time has reached.
Returns the status of this result after unblocking.
Throws errors::empty_result if *this is empty.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
template < class clock_type , class duration_type >
result_status wait_until (std::chrono::time_point<clock_type, duration_type> timeout_time);
/*
Blocks the current thread of execution until this shared-result is ready,
when status() != result_status::idle.
If the result is a valid value, a reference to it is returned,
otherwise, get rethrows the asynchronous exception.
Throws errors::empty_result if *this is empty.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
std:: add_lvalue_reference_t <type> get ();
/*
Returns an awaitable used to await this shared-result.
If the shared-result is already ready - the current coroutine resumes
immediately in the calling thread of execution.
If the shared-result is not ready yet, the current coroutine is
suspended and resumed when the asynchronous result is ready,
by the thread which had set the asynchronous value or exception.
In either way, after resuming, if the result is a valid value, a reference to it is returned.
Otherwise, operator co_await rethrows the asynchronous exception.
Throws errors::empty_result if *this is empty.
*/
auto operator co_await ();
/*
Returns an awaitable used to resolve this shared-result.
After co_await expression finishes, *this is returned in a non-empty form, in a ready state.
Throws errors::empty_result if *this is empty.
*/
auto resolve ();
};shared_result : Dalam contoh ini, objek result dikonversi ke objek shared_result dan referensi ke hasil int asinkron diperoleh oleh banyak tugas yang dikeluarkan dengan thread_executor .
# include " concurrencpp/concurrencpp.h "
# include < iostream >
# include < chrono >
concurrencpp::result< void > consume_shared_result (concurrencpp::shared_result< int > shared_result,
std::shared_ptr<concurrencpp::executor> resume_executor) {
std::cout << " Awaiting shared_result to have a value " << std::endl;
const auto & async_value = co_await shared_result;
concurrencpp::resume_on (resume_executor);
std::cout << " In thread id " << std::this_thread::get_id () << " , got: " << async_value << " , memory address: " << &async_value << std::endl;
}
int main () {
concurrencpp::runtime runtime;
auto result = runtime. background_executor ()-> submit ([] {
std::this_thread::sleep_for ( std::chrono::seconds ( 1 ));
return 100 ;
});
concurrencpp::shared_result< int > shared_result ( std::move (result));
concurrencpp::result< void > results[ 8 ];
for ( size_t i = 0 ; i < 8 ; i++) {
results[i] = consume_shared_result (shared_result, runtime. thread_pool_executor ());
}
std::cout << " Main thread waiting for all consumers to finish " << std::endl;
auto tpe = runtime. thread_pool_executor ();
auto all_consumed = concurrencpp::when_all (tpe, std::begin (results), std::end (results)). run ();
all_consumed. get ();
std::cout << " All consumers are done, exiting " << std::endl;
return 0 ;
} Ketika objek runtime keluar dari ruang lingkup main , ia mengulangi setiap pelaksana yang disimpan dan memanggil metode shutdown . Mencoba mengakses timer-queue atau pelaksana mana pun akan melempar errors::runtime_shutdown pengecualian. Ketika seorang pelaksana dimatikan, itu membersihkan antrian tugas batinnya, menghancurkan objek task yang tidak dieksekusi. Jika objek tugas menyimpan concurrencpp-coroutine, coroutine itu dilanjutkan secara inline dan errors::broken_task Exception dilemparkan ke dalamnya. Dalam hal apa pun di mana runtime_shutdown atau pengecualian broken_task dilemparkan, aplikasi harus mengakhiri aliran kode mereka saat ini dengan anggun sesegera mungkin. Pengecualian itu tidak boleh diabaikan. Baik runtime_shutdown dan broken_task mewarisi dari errors::interrupted_task , dan tipe ini juga dapat digunakan dalam klausa catch untuk menangani penghentian dengan cara yang terpadu.
Banyak tindakan asinkron concurrencpp membutuhkan contoh pelaksana sebagai pelaksana resume mereka. Ketika tindakan asinkron (diimplementasikan sebagai coroutine) dapat menyelesaikan secara serempak, ia segera dilanjutkan di utas eksekusi panggilan. Jika aksi asinkron tidak dapat diselesaikan secara sinkron, itu akan dilanjutkan ketika selesai, di dalam pelaksana resume yang diberikan. Misalnya, Fungsi Utilitas when_any membutuhkan instance resume-executor sebagai argumen pertama. when_any Returns a lazy_result yang menjadi siap ketika setidaknya satu hasil yang diberikan menjadi siap. Jika salah satu hasilnya sudah siap pada saat menelepon when_any , coroutine panggilan dilanjutkan secara serempak di utas eksekusi panggilan. Jika tidak, coroutine panggilan akan dilanjutkan ketika setidaknya hasilnya selesai, di dalam resume-executor yang diberikan. Pelaksana resume penting karena mereka mengamanatkan di mana coroutine dilanjutkan dalam kasus -kasus di mana tidak jelas di mana coroutine seharusnya dilanjutkan (misalnya, dalam kasus when_any dan when_all ), atau dalam kasus di mana tindakan asinkron diproses di dalam salah satu pekerja concurrencpp, yang hanya digunakan untuk memproses tindakan spesifik, dan bukan kode aplikasi.
make_ready_result make_ready_result membuat objek hasil siap dari argumen yang diberikan. Menunggu hasil seperti itu akan menyebabkan coroutine saat ini segera dilanjutkan. get dan operator co_await akan mengembalikan nilai yang dibangun.
/*
Creates a ready result object by building <<type>> from arguments&&... in-place.
Might throw any exception that the constructor
of type(std::forward<argument_types>(arguments)...) throws.
Might throw std::bad_alloc exception if fails to allocate memory.
*/
template < class type , class ... argument_types>
result<type> make_ready_result (argument_types&& ... arguments);
/*
An overload for void type.
Might throw std::bad_alloc exception if fails to allocate memory.
*/
result< void > make_ready_result ();make_exceptional_result make_exceptional_result membuat objek hasil siap dari pengecualian yang diberikan. Menunggu hasil seperti itu akan menyebabkan coroutine saat ini segera dilanjutkan. get dan operator co_await akan melemparkan kembali pengecualian yang diberikan.
/*
Creates a ready result object from an exception pointer.
The returned result object will re-throw exception_ptr when calling get or await.
Throws std::invalid_argument if exception_ptr is null.
Might throw std::bad_alloc exception if fails to allocate memory.
*/
template < class type >
result<type> make_exceptional_result (std::exception_ptr exception_ptr);
/*
Overload. Similar to make_exceptional_result(std::exception_ptr),
but gets an exception object directly.
Might throw any exception that the constructor of exception_type(std::move(exception)) might throw.
Might throw std::bad_alloc exception if fails to allocate memory.
*/
template < class type , class exception_type >
result<type> make_exceptional_result (exception_type exception );when_all when_all adalah fungsi utilitas yang membuat objek hasil malas yang menjadi siap ketika semua hasil input selesai. Menunggu hasil malas ini mengembalikan semua objek result input dalam keadaan siap, siap untuk dikonsumsi.
Fungsi when_all hadir dengan tiga rasa - satu yang menerima berbagai objek hasil yang heterogen, yang lain yang mendapatkan sepasang iterator ke berbagai objek hasil dari jenis yang sama, dan terakhir kelebihan beban yang tidak menerima objek hasil sama sekali. Dalam hal tidak ada objek hasil input - fungsi mengembalikan objek hasil siap dari tuple kosong.
Jika salah satu objek hasil yang dilewati kosong, pengecualian akan dilemparkan. Dalam hal ini, objek penghasilan input tidak terpengaruh oleh fungsi dan dapat digunakan lagi setelah pengecualian ditangani. Jika semua objek hasil input valid, mereka dikosongkan oleh fungsi ini, dan dikembalikan dalam keadaan yang valid dan siap sebagai hasil output.
Saat ini, when_all hanya menerima objek result .
Semua kelebihan menerima pelaksana resume sebagai parameter pertama mereka. Saat menunggu hasil yang dikembalikan oleh when_all , penelepon coroutine akan dilanjutkan oleh pelaksana resume yang diberikan.
/*
Creates a result object that becomes ready when all the input results become ready.
Passed result objects are emptied and returned as a tuple.
Throws std::invalid_argument if any of the passed result objects is empty.
Might throw an std::bad_alloc exception if no memory is available.
*/
template < class ... result_types>
lazy_result<std::tuple< typename std::decay<result_types>::type...>>
when_all (std::shared_ptr<executor_type> resume_executor,
result_types&& ... results);
/*
Overload. Similar to when_all(result_types&& ...) but receives a pair of iterators referencing a range.
Passed result objects are emptied and returned as a vector.
If begin == end, the function returns immediately with an empty vector.
Throws std::invalid_argument if any of the passed result objects is empty.
Might throw an std::bad_alloc exception if no memory is available.
*/
template < class iterator_type >
lazy_result<std::vector< typename std::iterator_traits<iterator_type>::value_type>>
when_all (std::shared_ptr<executor_type> resume_executor,
iterator_type begin, iterator_type end);
/*
Overload. Returns a ready result object that doesn't monitor any asynchronous result.
Might throw an std::bad_alloc exception if no memory is available.
*/
lazy_result<std::tuple<>> when_all (std::shared_ptr<executor_type> resume_executor);when_any Function when_any adalah fungsi utilitas yang membuat objek hasil malas yang menjadi siap ketika setidaknya satu hasil input selesai. Menunggu hasil ini akan mengembalikan struktur helper yang berisi semua objek penghasilan input ditambah indeks tugas yang diselesaikan. Bisa jadi pada saat mengkonsumsi hasil yang siap, hasil lain mungkin sudah selesai secara tidak sinkron. Aplikasi dapat menghubungi when_any berulang kali untuk mengkonsumsi hasil yang siap seiring selesai sampai semua hasil dikonsumsi.
when_any Function hadir dengan hanya dua rasa - satu yang menerima berbagai objek hasil yang heterogen dan lainnya yang mendapatkan sepasang iterator ke berbagai hasil -objek dari jenis yang sama. Berbeda dengan when_all , tidak ada arti menunggu setidaknya satu tugas untuk menyelesaikan ketika kisaran hasil benar -benar kosong. Oleh karena itu, tidak ada kelebihan tanpa argumen. Juga, kelebihan dua iterator akan melempar pengecualian jika iterator tersebut merujuk jangkauan kosong (saat begin == end ).
Jika salah satu objek hasil yang dilewati kosong, pengecualian akan dilemparkan. Dalam kasus apa pun pengecualian dilemparkan, objek penghasilan input tidak terpengaruh oleh fungsi dan dapat digunakan lagi setelah pengecualian ditangani. Jika semua objek hasil input valid, mereka dikosongkan oleh fungsi ini, dan dikembalikan dalam keadaan yang valid sebagai hasil output.
Saat ini, when_any hanya menerima objek result .
Semua kelebihan menerima pelaksana resume sebagai parameter pertama mereka. Saat menunggu hasil yang dikembalikan oleh when_any , penelepon coroutine akan dilanjutkan oleh pelaksana resume yang diberikan.
/*
Helper struct returned from when_any.
index is the position of the ready result in results sequence.
results is either an std::tuple or an std::vector of the results that were passed to when_any.
*/
template < class sequence_type >
struct when_any_result {
std:: size_t index;
sequence_type results;
};
/*
Creates a result object that becomes ready when at least one of the input results is ready.
Passed result objects are emptied and returned as a tuple.
Throws std::invalid_argument if any of the passed result objects is empty.
Might throw an std::bad_alloc exception if no memory is available.
*/
template < class ... result_types>
lazy_result<when_any_result<std::tuple<result_types...>>>
when_any (std::shared_ptr<executor_type> resume_executor,
result_types&& ... results);
/*
Overload. Similar to when_any(result_types&& ...) but receives a pair of iterators referencing a range.
Passed result objects are emptied and returned as a vector.
Throws std::invalid_argument if begin == end.
Throws std::invalid_argument if any of the passed result objects is empty.
Might throw an std::bad_alloc exception if no memory is available.
*/
template < class iterator_type >
lazy_result<when_any_result<std::vector< typename std::iterator_traits<iterator_type>::value_type>>>
when_any (std::shared_ptr<executor_type> resume_executor,
iterator_type begin, iterator_type end);resume_on resume_on mengembalikan yang menunggu yang menangguhkan coroutine saat ini dan melanjutkannya di dalam executor yang diberikan. Ini adalah fungsi penting yang memastikan coroutine berjalan di pelaksana yang tepat. Misalnya, aplikasi mungkin menjadwalkan tugas latar belakang menggunakan background_executor dan menunggu objek hasil yang dikembalikan. Dalam hal ini, coroutine yang menunggu akan dilanjutkan di dalam pelaksana latar belakang. Panggilan untuk resume_on dengan pelaksana yang terikat CPU memastikan bahwa baris kode yang terikat CPU tidak akan berjalan pada pelaksana latar belakang setelah tugas latar belakang selesai. Jika suatu tugas dijadwalkan kembali untuk menjalankan eksekutor lain menggunakan resume_on , tetapi eksekutor itu ditutup sebelum dapat melanjutkan tugas yang ditangguhkan, tugas itu dilanjutkan segera dan pengecualian erros::broken_task dilemparkan. Dalam hal ini, aplikasi perlu dengan sangat anggun.
/*
Returns an awaitable that suspends the current coroutine and resumes it inside executor.
Might throw any exception that executor_type::enqueue throws.
*/
template < class executor_type >
auto resume_on (std::shared_ptr<executor_type> executor);Concurrencpp juga menyediakan timer dan antrian pengatur waktu. Pengatur waktu adalah objek yang mendefinisikan tindakan asinkron yang berjalan pada pelaksana dalam interval waktu yang terdefinisi dengan baik. Ada tiga jenis timer - pengatur waktu reguler , pengatur waktu dan objek tunda .
Pengatur waktu reguler memiliki empat properti yang mendefinisikannya:
Seperti benda -benda lain di Concurrencpp, timer adalah jenis langkah saja yang bisa kosong. Ketika timer dirusak atau timer::cancel dipanggil, timer membatalkan tugas yang dijadwalkan tetapi belum dieksekusi. Tugas yang sedang berlangsung tidak terpengaruh. Pengatur waktu yang dapat dipanggil harus aman. Dianjurkan untuk mengatur waktunya dan frekuensi pengatur waktu menjadi granularitas 50 milidetik.
Antrian timer adalah pekerja Concurrencpp yang mengelola kumpulan pengatur waktu dan memprosesnya hanya dalam satu utas eksekusi. Ini juga agen yang digunakan untuk membuat timer baru. Ketika tenggat waktu timer (apakah itu waktu atau frekuensi penghitung waktu) telah mencapai, antrian timer "menembakkan" timer dengan menjadwalkan panggilannya untuk dijalankan pada pelaksana terkait sebagai tugas.
Sama seperti eksekutor, antrian pengatur waktu juga mematuhi konsep RAII. Ketika objek runtime keluar dari cakupan, itu menutup antrian timer, membatalkan semua pengatur waktu yang tertunda. Setelah antrian pengatur waktu ditutup, panggilan berikutnya ke make_timer , make_onshot_timer dan make_delay_object akan melempar errors::runtime_shutdown pengecualian. Aplikasi tidak boleh mencoba mematikan antrian pengatur waktu sendiri.
timer_queue API: class timer_queue {
/*
Destroys this timer_queue.
*/
~timer_queue () noexcept ;
/*
Shuts down this timer_queue:
Tells the underlying thread of execution to quit and joins it.
Cancels all pending timers.
After this call, invocation of any method besides shutdown
and shutdown_requested will throw an errors::runtime_shutdown.
If shutdown had been called before, this method has no effect.
*/
void shutdown () noexcept ;
/*
Returns true if shutdown had been called before, false otherwise.
*/
bool shutdown_requested () const noexcept ;
/*
Creates a new running timer where *this is the associated timer_queue.
Throws std::invalid_argument if executor is null.
Throws errors::runtime_shutdown if shutdown had been called before.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if the one of the underlying synchronization primitives throws.
*/
template < class callable_type , class ... argumet_types>
timer make_timer (
std::chrono::milliseconds due_time,
std::chrono::milliseconds frequency,
std::shared_ptr<concurrencpp::executor> executor,
callable_type&& callable,
argumet_types&& ... arguments);
/*
Creates a new one-shot timer where *this is the associated timer_queue.
Throws std::invalid_argument if executor is null.
Throws errors::runtime_shutdown if shutdown had been called before.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if the one of the underlying synchronization primitives throws.
*/
template < class callable_type , class ... argumet_types>
timer make_one_shot_timer (
std::chrono::milliseconds due_time,
std::shared_ptr<concurrencpp::executor> executor,
callable_type&& callable,
argumet_types&& ... arguments);
/*
Creates a new delay object where *this is the associated timer_queue.
Throws std::invalid_argument if executor is null.
Throws errors::runtime_shutdown if shutdown had been called before.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if the one of the underlying synchronization primitives throws.
*/
result< void > make_delay_object (
std::chrono::milliseconds due_time,
std::shared_ptr<concurrencpp::executor> executor);
};timer : class timer {
/*
Creates an empty timer.
*/
timer () noexcept = default ;
/*
Cancels the timer, if not empty.
*/
~timer () noexcept ;
/*
Moves the content of rhs to *this.
rhs is empty after this call.
*/
timer (timer&& rhs) noexcept = default ;
/*
Moves the content of rhs to *this.
rhs is empty after this call.
Returns *this.
*/
timer& operator = (timer&& rhs) noexcept ;
/*
Cancels this timer.
After this call, the associated timer_queue will not schedule *this
to run again and *this becomes empty.
Scheduled, but not yet executed tasks are cancelled.
Ongoing tasks are uneffected.
This method has no effect if *this is empty or the associated timer_queue has already expired.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
void cancel ();
/*
Returns the associated executor of this timer.
Throws concurrencpp::errors::empty_timer is *this is empty.
*/
std::shared_ptr<executor> get_executor () const ;
/*
Returns the associated timer_queue of this timer.
Throws concurrencpp::errors::empty_timer is *this is empty.
*/
std::weak_ptr<timer_queue> get_timer_queue () const ;
/*
Returns the due time of this timer.
Throws concurrencpp::errors::empty_timer is *this is empty.
*/
std::chrono::milliseconds get_due_time () const ;
/*
Returns the frequency of this timer.
Throws concurrencpp::errors::empty_timer is *this is empty.
*/
std::chrono::milliseconds get_frequency () const ;
/*
Sets new frequency for this timer.
Callables already scheduled to run at the time of invocation are not affected.
Throws concurrencpp::errors::empty_timer is *this is empty.
*/
void set_frequency (std::chrono::milliseconds new_frequency);
/*
Returns true is *this is not an empty timer, false otherwise.
The timer should not be used if this->operator bool() is false.
*/
explicit operator bool () const noexcept ;
};Dalam contoh ini kami membuat timer reguler dengan menggunakan antrian timer. Pengatur waktu menjadwalkan yang dapat dipanggil untuk berjalan setelah 1,5 detik, kemudian menembakkannya setiap 2 detik. Berjalan Callable yang diberikan pada Eksekutor ThreadPool.
# include " concurrencpp/concurrencpp.h "
# include < iostream >
using namespace std ::chrono_literals ;
int main () {
concurrencpp::runtime runtime;
std:: atomic_size_t counter = 1 ;
concurrencpp::timer timer = runtime. timer_queue ()-> make_timer (
1500ms,
2000ms,
runtime. thread_pool_executor (),
[&] {
const auto c = counter. fetch_add ( 1 );
std::cout << " timer was invoked for the " << c << " th time " << std::endl;
});
std::this_thread::sleep_for (12s);
return 0 ;
}Pengatur waktu Oneshot adalah pengatur waktu satu kali dengan hanya waktu - setelah menjadwalkannya untuk dijalankan setelah tidak pernah menjadwalkannya lagi untuk berjalan lagi.
Dalam contoh ini, kami membuat timer yang berjalan hanya sekali - setelah 3 detik dari kreasinya, timer akan menjadwalkan callable untuk dijalankan pada utas eksekusi baru (menggunakan thread_executor ).
# include " concurrencpp/concurrencpp.h "
# include < iostream >
using namespace std ::chrono_literals ;
int main () {
concurrencpp::runtime runtime;
concurrencpp::timer timer = runtime. timer_queue ()-> make_one_shot_timer (
3000ms,
runtime. thread_executor (),
[&] {
std::cout << " hello and goodbye " << std::endl;
});
std::this_thread::sleep_for (4s);
return 0 ;
} Objek penundaan adalah objek hasil yang malas yang siap ketika itu adalah co_await ed dan waktunya tercapai. Aplikasi dapat co_await hasil ini untuk menunda coroutine saat ini dengan cara yang tidak blokir. Coroutine saat ini dilanjutkan oleh pelaksana yang diteruskan ke make_delay_object .
Dalam contoh ini, kami memunculkan tugas (yang tidak mengembalikan hasil atau pengecualian), yang menunda dirinya dalam satu lingkaran dengan memanggil co_await pada objek penundaan.
# include " concurrencpp/concurrencpp.h "
# include < iostream >
using namespace std ::chrono_literals ;
concurrencpp::null_result delayed_task (
std::shared_ptr<concurrencpp::timer_queue> tq,
std::shared_ptr<concurrencpp::thread_pool_executor> ex) {
size_t counter = 1 ;
while ( true ) {
std::cout << " task was invoked " << counter << " times. " << std::endl;
counter++;
co_await tq-> make_delay_object (1500ms, ex);
}
}
int main () {
concurrencpp::runtime runtime;
delayed_task (runtime. timer_queue (), runtime. thread_pool_executor ());
std::this_thread::sleep_for (10s);
return 0 ;
} Generator adalah coroutine yang malas dan sinkron yang mampu menghasilkan aliran nilai untuk dikonsumsi. Generator menggunakan kata kunci co_yield untuk menghasilkan nilai kembali ke konsumen mereka.
Generator dimaksudkan untuk digunakan secara sinkron - mereka hanya dapat menggunakan kata kunci co_yield dan tidak boleh menggunakan kata kunci co_await . Generator akan terus menghasilkan nilai selama kata kunci co_yield dipanggil. Jika kata kunci co_return dipanggil (secara eksplisit atau implisit), maka generator akan berhenti menghasilkan nilai. Demikian pula, jika pengecualian dilemparkan maka generator akan berhenti menghasilkan nilai dan pengecualian yang dilemparkan akan dilemparkan kembali ke konsumen generator.
Generator dimaksudkan untuk digunakan dalam loop range-for : generator secara implisit menghasilkan dua iterator - begin dan end yang mengontrol pelaksanaan loop for . Iterator ini tidak boleh ditangani atau diakses secara manual.
Ketika generator dibuat, itu dimulai sebagai tugas yang malas. Ketika metode begin dipanggil, generator dilanjutkan untuk pertama kalinya dan iterator dikembalikan. Tugas malas dilanjutkan berulang kali dengan menelepon operator++ pada iterator yang dikembalikan. Iterator yang dikembalikan akan sama dengan end iterator ketika generator menyelesaikan eksekusi baik dengan keluar dengan anggun atau melemparkan pengecualian. Seperti yang disebutkan sebelumnya, ini terjadi di belakang layar oleh mekanisme batin loop dan generator, dan tidak boleh dipanggil secara langsung.
Seperti benda-benda lain di Concurrencpp, generator adalah tipe gerakan saja. Setelah generator dipindahkan, itu dianggap kosong dan mencoba mengakses metode dalamnya (selain operator bool ) akan melempar pengecualian. Kekosongan generator tidak boleh terjadi pada umumnya - disarankan untuk mengkonsumsi generator pada penciptaan mereka dalam lingkaran for loop dan tidak mencoba memanggil metode mereka secara individual.
generator class generator {
/*
Move constructor. After this call, rhs is empty.
*/
generator (generator&& rhs) noexcept ;
/*
Destructor. Invalidates existing iterators.
*/
~generator () noexcept ;
generator ( const generator& rhs) = delete ;
generator& operator =(generator&& rhs) = delete ;
generator& operator =( const generator& rhs) = delete ;
/*
Returns true if this generator is not empty.
Applications must not use this object if this->operator bool() is false.
*/
explicit operator bool () const noexcept ;
/*
Starts running this generator and returns an iterator.
Throws errors::empty_generator if *this is empty.
Re-throws any exception that is thrown inside the generator code.
*/
iterator begin ();
/*
Returns an end iterator.
*/
static generator_end_iterator end () noexcept ;
};
class generator_iterator {
using value_type = std:: remove_reference_t <type>;
using reference = value_type&;
using pointer = value_type*;
using iterator_category = std::input_iterator_tag;
using difference_type = std:: ptrdiff_t ;
/*
Resumes the suspended generator and returns *this.
Re-throws any exception that was thrown inside the generator code.
*/
generator_iterator& operator ++();
/*
Post-increment version of operator++.
*/
void operator ++( int );
/*
Returns the latest value produced by the associated generator.
*/
reference operator *() const noexcept ;
/*
Returns a pointer to the latest value produced by the associated generator.
*/
pointer operator ->() const noexcept ;
/*
Comparision operators.
*/
friend bool operator ==( const generator_iterator& it0, const generator_iterator& it1) noexcept ;
friend bool operator ==( const generator_iterator& it, generator_end_iterator) noexcept ;
friend bool operator ==(generator_end_iterator end_it, const generator_iterator& it) noexcept ;
friend bool operator !=( const generator_iterator& it, generator_end_iterator end_it) noexcept ;
friend bool operator !=(generator_end_iterator end_it, const generator_iterator& it) noexcept ;
};generator : Dalam contoh ini, kami akan menulis generator yang menghasilkan anggota n-th dari urutan S(n) = 1 + 2 + 3 + ... + n di mana n <= 100 :
concurrencpp::generator< int > sequence () {
int i = 1 ;
int sum = 0 ;
while (i <= 100 ) {
sum += i;
++i;
co_yield sum;
}
}
int main () {
for ( auto value : sequence ()) {
std::cout << value << std::end;
}
return 0 ;
} Kunci sinkron biasa tidak dapat digunakan dengan aman di dalam tugas karena sejumlah alasan:
std::mutex , diharapkan akan dikunci dan dibuka di utas eksekusi yang sama. Membuka kunci kunci sinkron di utas yang belum terkunci itu adalah perilaku yang tidak ditentukan. Karena tugas dapat ditangguhkan dan dilanjutkan dalam utas eksekusi apa pun, kunci sinkron akan pecah ketika digunakan di dalam tugas. concurrencpp::async_lock memecahkan masalah-masalah itu dengan memberikan API yang serupa dengan std::mutex , dengan perbedaan utama yang panggilan ke concurrencpp::async_lock akan mengembalikan hasil malas yang dapat co_awaited dengan aman di dalam tugas. Jika satu tugas mencoba untuk mengunci kunci async dan gagal, tugas akan ditangguhkan, dan akan dilanjutkan ketika kunci dibuka dan diperoleh dengan tugas yang ditangguhkan. Hal ini memungkinkan para pelaksana untuk memproses sejumlah besar tugas yang menunggu untuk mendapatkan kunci tanpa switching konteks yang mahal dan panggilan kernel mahal.
Mirip dengan cara kerja std::mutex , hanya satu tugas yang dapat memperoleh async_lock pada waktu tertentu, dan penghalang baca adalah tempat pada saat memperoleh. Melepaskan kunci async menempatkan penghalang tulis dan memungkinkan tugas berikutnya untuk mendapatkannya, membuat rantai satu modifier pada waktu yang melihat perubahan yang telah dilakukan pengubah lain dan memposting modifikasi untuk pengubah berikutnya untuk dilihat.
Seperti std::mutex , concurrencpp::async_lock tidak rekursif . Perhatian ekstra harus diberikan ketika memperoleh kunci tersebut - kunci tidak boleh diperoleh lagi dalam tugas yang telah dilahirkan oleh tugas lain yang telah memperoleh kunci. Dalam kasus seperti itu, kunci mati yang tidak dapat dihindari akan terjadi. Tidak seperti objek lain di Concurrencpp, async_lock tidak dapat dicopong atau bergerak.
Seperti kunci standar, concurrencpp::async_lock dimaksudkan untuk digunakan dengan pembungkus yang dilingkupi yang memanfaatkan idiom C ++ RAII untuk memastikan kunci selalu dibuka pada fungsi pengembalian atau pengecualian yang dilemparkan. async_lock::lock mengembalikan hasil malas dari pembungkus yang disebut async_lock::unlock kehancuran. Penggunaan mentah async_lock::unlock tidak dianjurkan. concurrencpp::scoped_async_lock bertindak sebagai pembungkus yang dilingkup dan menyediakan API yang hampir identik dengan std::unique_lock . concurrencpp::scoped_async_lock dapat dipindahkan, tetapi tidak dapat ditembus.
async_lock::lock dan scoped_async_lock::lock memerlukan resume-executor sebagai parameter mereka. Setelah memanggil metode -metode itu, jika kunci tersedia untuk dikunci, maka itu terkunci dan tugas saat ini segera dilanjutkan. Jika tidak, maka tugas saat ini ditangguhkan, dan akan dilanjutkan di dalam pelaksana resume yang diberikan ketika kunci akhirnya diperoleh.
concurrencpp::scoped_async_lock membungkus async_lock dan memastikannya dibuka dengan benar. Seperti std::unique_lock , ada kasus yang tidak membungkus kunci apa pun, dan dalam hal ini dianggap kosong. Sebuah scoped_async_lock yang kosong dapat terjadi ketika dibangun secara default, dipindahkan, atau scoped_async_lock::release dipanggil. Lock-in-lock yang kosong tidak akan membuka kunci apa pun pada kehancuran.
Bahkan jika lock-async-lock tidak kosong, itu tidak berarti bahwa ia memiliki kunci async yang mendasarinya dan itu akan membukanya saat penghancuran. Kunci scoped-async yang tidak kosong dan tidak dimiliki dapat terjadi jika scoped_async_lock::unlock dipanggil atau lock scoped-async dibangun menggunakan scoped_async_lock(async_lock&, std::defer_lock_t) konstruktor.
async_lock API class async_lock {
/*
Constructs an async lock object.
*/
async_lock () noexcept ;
/*
Destructs an async lock object.
*this is not automatically unlocked at the moment of destruction.
*/
~async_lock () noexcept ;
/*
Asynchronously acquires the async lock.
If *this has already been locked by another non-parent task, the current task will be suspended
and will be resumed when *this is acquired, inside resume_executor.
If *this has not been locked by another task, then *this will be acquired and the current task will be resumed
immediately in the calling thread of execution.
If *this has already been locked by a parent task, then unavoidable dead-lock will occur.
Throws std::invalid_argument if resume_executor is null.
Throws std::system error if one of the underlying synhchronization primitives throws.
*/
lazy_result<scoped_async_lock> lock (std::shared_ptr<executor> resume_executor);
/*
Tries to acquire *this in the calling thread of execution.
Returns true if *this is acquired, false otherwise.
In any case, the current task is resumed immediately in the calling thread of execution.
Throws std::system error if one of the underlying synhchronization primitives throws.
*/
lazy_result< bool > try_lock ();
/*
Releases *this and allows other tasks (including suspended tasks waiting for *this) to acquire it.
Throws std::system error if *this is not locked at the moment of calling this method.
Throws std::system error if one of the underlying synhchronization primitives throws.
*/
void unlock ();
};scoped_async_lock API class scoped_async_lock {
/*
Constructs an async lock wrapper that does not wrap any async lock.
*/
scoped_async_lock () noexcept = default ;
/*
If *this wraps async_lock, this method releases the wrapped lock.
*/
~scoped_async_lock () noexcept ;
/*
Moves rhs to *this.
After this call, *rhs does not wrap any async lock.
*/
scoped_async_lock (scoped_async_lock&& rhs) noexcept ;
/*
Wrapps unlocked lock.
lock must not be in acquired mode when calling this method.
*/
scoped_async_lock (async_lock& lock, std:: defer_lock_t ) noexcept ;
/*
Wrapps locked lock.
lock must be already acquired when calling this method.
*/
scoped_async_lock (async_lock& lock, std:: adopt_lock_t ) noexcept ;
/*
Calls async_lock::lock on the wrapped locked, using resume_executor as a parameter.
Throws std::invalid_argument if resume_executor is nulll.
Throws std::system_error if *this does not wrap any lock.
Throws std::system_error if wrapped lock is already locked.
Throws any exception async_lock::lock throws.
*/
lazy_result< void > lock (std::shared_ptr<executor> resume_executor);
/*
Calls async_lock::try_lock on the wrapped lock.
Throws std::system_error if *this does not wrap any lock.
Throws std::system_error if wrapped lock is already locked.
Throws any exception async_lock::try_lock throws.
*/
lazy_result< bool > try_lock ();
/*
Calls async_lock::unlock on the wrapped lock.
If *this does not wrap any lock, this method does nothing.
Throws std::system_error if *this wraps a lock and it is not locked.
*/
void unlock ();
/*
Checks whether *this wraps a locked mutex or not.
Returns true if wrapped locked is in acquired state, false otherwise.
*/
bool owns_lock () const noexcept ;
/*
Equivalent to owns_lock.
*/
explicit operator bool () const noexcept ;
/*
Swaps the contents of *this and rhs.
*/
void swap (scoped_async_lock& rhs) noexcept ;
/*
Empties *this and returns a pointer to the previously wrapped lock.
After a call to this method, *this doesn't wrap any lock.
The previously wrapped lock is not released,
it must be released by either unlocking it manually through the returned pointer or by
capturing the pointer with another scoped_async_lock which will take ownerwhip over it.
*/
async_lock* release () noexcept ;
/*
Returns a pointer to the wrapped async_lock, or a null pointer if there is no wrapped async_lock.
*/
async_lock* mutex () const noexcept ;
};async_lock : Dalam contoh ini kami mendorong 10.000.000 bilangan bulat ke objek std::vector dari berbagai tugas secara bersamaan, sambil menggunakan async_lock untuk memastikan tidak ada ras data yang terjadi dan kebenaran keadaan internal objek vektor tersebut dipertahankan.
# include " concurrencpp/concurrencpp.h "
# include < vector >
# include < iostream >
std::vector< size_t > numbers;
concurrencpp::async_lock lock;
concurrencpp::result< void > add_numbers (concurrencpp::executor_tag,
std::shared_ptr<concurrencpp::executor> executor,
size_t begin,
size_t end) {
for ( auto i = begin; i < end; i++) {
concurrencpp::scoped_async_lock raii_wrapper = co_await lock. lock (executor);
numbers. push_back (i);
}
}
int main () {
concurrencpp::runtime runtime;
constexpr size_t range = 10'000'000 ;
constexpr size_t sections = 4 ;
concurrencpp::result< void > results[sections];
for ( size_t i = 0 ; i < 4 ; i++) {
const auto range_start = i * range / sections;
const auto range_end = (i + 1 ) * range / sections;
results[i] = add_numbers ({}, runtime. thread_pool_executor (), range_start, range_end);
}
for ( auto & result : results) {
result. get ();
}
std::cout << " vector size is " << numbers. size () << std::endl;
// make sure the vector state has not been corrupted by unprotected concurrent accesses
std::sort (numbers. begin (), numbers. end ());
for ( size_t i = 0 ; i < range; i++) {
if (numbers[i] != i) {
std::cerr << " vector state is corrupted. " << std::endl;
return - 1 ;
}
}
std::cout << " succeeded pushing range [0 - 10,000,000] concurrently to the vector! " << std::endl;
return 0 ;
} async_condition_variable meniru condition_variable standar dan dapat digunakan dengan aman dengan tugas bersama async_lock . async_condition_variable bekerja dengan async_lock untuk menangguhkan tugas sampai beberapa memori bersama (dilindungi oleh kunci) telah berubah. Tugas yang ingin memantau perubahan memori bersama akan mengunci instance async_lock , dan hubungi async_condition_variable::await . Ini secara atom akan membuka kunci kunci dan menangguhkan tugas saat ini sampai beberapa tugas pengubah memberi tahu variabel kondisi. Tugas pengubah memperoleh kunci, memodifikasi memori bersama, membuka kunci kunci dan memanggil notify_one atau notify_all . Ketika tugas yang ditangguhkan dilanjutkan (menggunakan pelaksana resume yang diberikan untuk await ), itu mengunci kunci lagi, memungkinkan tugas untuk melanjutkan dari titik suspensi dengan mulus. Seperti async_lock , async_condition_variable tidak dapat dipindahkan atau dapat dihindari - itu dimaksudkan untuk dibuat di satu tempat dan diakses oleh banyak tugas.
async_condition_variable::await kelebihan membutuhkan resume-executor, yang akan digunakan untuk melanjutkan tugas, dan scoped_async_lock yang terkunci. async_condition_variable::await datang dengan dua kelebihan - satu yang menerima predikat dan yang tidak. Kelebihan beban yang tidak menerima predikat akan menangguhkan tugas panggilan segera setelah doa sampai dilanjutkan dengan panggilan untuk notify_* . Kelebihan beban yang menerima predikat karya dengan membiarkan predikat memeriksa memori bersama dan menangguhkan tugas berulang kali sampai memori bersama telah mencapai keadaan yang dicari. Secara skematis itu berfungsi seperti menelepon
while (!pred()) { // pred() inspects the shared memory and returns true or false
co_await await (resume_executor, lock); // suspend the current task until another task calls `notify_xxx`
} Sama seperti variabel kondisi standar, aplikasi didorong untuk menggunakan predikat-kelebihan, karena memungkinkan lebih banyak kontrol berbutir halus atas suspensi dan hasilnya. async_condition_variable dapat digunakan untuk menulis koleksi dan struktur data bersamaan seperti antrian dan saluran bersamaan.
Secara internal, async_condition_variable memegang suspensi-queue, di mana tugas-tugas itu membuat diri mereka sendiri ketika mereka menunggu variabel kondisi yang harus diberitahukan. Ketika salah satu metode notify_* dipanggil, tugas pemberitahuan dequeues baik satu tugas atau semua tugas, tergantung pada metode yang dipanggil. Tugas didequeued dari suspensi-kuimaten dengan cara FIFO. Misalnya, jika tugas A panggilan await dan kemudian Tugas B panggilan await , maka Tugas C Panggilan notify_one , maka Tugas Internal A akan dequeued dan dilanjutkan. Tugas B akan tetap ditangguhkan sampai panggilan lain untuk notify_one atau notify_all dipanggil. Jika Tugas A dan Tugas B ditangguhkan dan Tugas C Panggilan notify_all , maka kedua tugas akan didequeued dan dilanjutkan.
async_condition_variable API class async_condition_variable {
/*
Constructor.
*/
async_condition_variable () noexcept ;
/*
Atomically releases lock and suspends the current task by adding it to *this suspension-queue.
Throws std::invalid_argument if resume_executor is null.
Throws std::invalid_argument if lock is not locked at the moment of calling this method.
Might throw std::system_error if the underlying std::mutex throws.
*/
lazy_result< void > await (std::shared_ptr<executor> resume_executor, scoped_async_lock& lock);
/*
Equivalent to:
while (!pred()) {
co_await await(resume_executor, lock);
}
Might throw any exception that await(resume_executor, lock) might throw.
Might throw any exception that pred might throw.
*/
template < class predicate_type >
lazy_result< void > await (std::shared_ptr<executor> resume_executor, scoped_async_lock& lock, predicate_type pred);
/*
Dequeues one task from *this suspension-queue and resumes it, if any available at the moment of calling this method.
The suspended task is resumed by scheduling it to run on the executor given when await was called.
Might throw std::system_error if the underlying std::mutex throws.
*/
void notify_one ();
/*
Dequeues all tasks from *this suspension-queue and resumes them, if any available at the moment of calling this method.
The suspended tasks are resumed by scheduling them to run on the executors given when await was called.
Might throw std::system_error if the underlying std::mutex throws.
*/
void notify_all ();
};async_condition_variable Contoh: Dalam contoh ini, async_lock dan async_condition_variable bekerja bersama untuk mengimplementasikan antrian bersamaan yang dapat digunakan untuk mengirim data (dalam contoh ini, bilangan bulat) di antara tugas. Perhatikan bahwa beberapa metode mengembalikan result sementara pengembalian lain lazy_result , menunjukkan bagaimana tugas yang bersemangat dan malas dapat bekerja sama.
# include " concurrencpp/concurrencpp.h "
# include < queue >
# include < iostream >
using namespace concurrencpp ;
class concurrent_queue {
private:
async_lock _lock;
async_condition_variable _cv;
std::queue< int > _queue;
bool _abort = false ;
public:
concurrent_queue () = default ;
result< void > shutdown (std::shared_ptr<executor> resume_executor) {
{
auto guard = co_await _lock. lock (resume_executor);
_abort = true ;
}
_cv. notify_all ();
}
lazy_result< void > push (std::shared_ptr<executor> resume_executor, int i) {
{
auto guard = co_await _lock. lock (resume_executor);
_queue. push (i);
}
_cv. notify_one ();
}
lazy_result< int > pop (std::shared_ptr<executor> resume_executor) {
auto guard = co_await _lock. lock (resume_executor);
co_await _cv. await (resume_executor, guard, [ this ] {
return _abort || !_queue. empty ();
});
if (!_queue. empty ()) {
auto result = _queue. front ();
_queue. pop ();
co_return result;
}
assert (_abort);
throw std::runtime_error ( " queue has been shut down. " );
}
};
result< void > producer_loop (executor_tag,
std::shared_ptr<thread_pool_executor> tpe,
concurrent_queue& queue,
int range_start,
int range_end) {
for (; range_start < range_end; ++range_start) {
co_await queue. push (tpe, range_start);
}
}
result< void > consumer_loop (executor_tag, std::shared_ptr<thread_pool_executor> tpe, concurrent_queue& queue) {
try {
while ( true ) {
std::cout << co_await queue. pop (tpe) << std::endl;
}
} catch ( const std:: exception & e) {
std::cerr << e. what () << std::endl;
}
}
int main () {
runtime runtime;
const auto thread_pool_executor = runtime. thread_pool_executor ();
concurrent_queue queue;
result< void > producers[ 4 ];
result< void > consumers[ 4 ];
for ( int i = 0 ; i < 4 ; i++) {
producers[i] = producer_loop ({}, thread_pool_executor, queue, i * 5 , (i + 1 ) * 5 );
}
for ( int i = 0 ; i < 4 ; i++) {
consumers[i] = consumer_loop ({}, thread_pool_executor, queue);
}
for ( int i = 0 ; i < 4 ; i++) {
producers[i]. get ();
}
queue. shutdown (thread_pool_executor). get ();
for ( int i = 0 ; i < 4 ; i++) {
consumers[i]. get ();
}
return 0 ;
} Objek Runtime Concurrencpp adalah agen yang digunakan untuk memperoleh, menyimpan, dan membuat eksekutor baru.
Runtime harus dibuat sebagai tipe nilai segera setelah fungsi utama mulai berjalan. Ketika runtime Concurrencpp keluar dari ruang lingkup, ia mengulangi eksekutor yang tersimpan dan menutupnya satu per satu dengan memanggil executor::shutdown . Eksekutor kemudian keluar dari loop kerja batin mereka dan setiap upaya selanjutnya untuk menjadwalkan tugas baru akan melempar concurrencpp::runtime_shutdown Exception. Runtime ini juga berisi antrian pengatur waktu global yang digunakan untuk membuat pengatur waktu dan menunda objek. Setelah kehancuran, eksekutor yang disimpan menghancurkan tugas yang tidak dieksekusi, dan menunggu tugas yang sedang berlangsung selesai. Jika tugas yang sedang berlangsung mencoba menggunakan pelaksana untuk menelurkan tugas baru atau menjadwalkan kelanjutan tugasnya sendiri - pengecualian akan dilemparkan. Dalam hal ini, tugas -tugas yang sedang berlangsung perlu berhenti sesegera mungkin, memungkinkan pelaksana mereka yang mendasarinya untuk berhenti. Antrian timer juga akan ditutup, membatalkan semua pengatur waktu yang berjalan. Dengan gaya kode RAII ini, tidak ada tugas yang dapat diproses sebelum pembuatan objek runtime, dan sementara/setelah runtime keluar dari ruang lingkup. Ini membebaskan aplikasi bersamaan dari yang membutuhkan untuk mengkomunikasikan pesan penghentian secara eksplisit. Tugas adalah eksekutor penggunaan gratis selama objek runtime masih hidup.
runtime API class runtime {
/*
Creates a runtime object with default options.
*/
runtime ();
/*
Creates a runtime object with user defined options.
*/
runtime ( const concurrencpp::runtime_options& options);
/*
Destroys this runtime object.
Calls executor::shutdown on each monitored executor.
Calls timer_queue::shutdown on the global timer queue.
*/
~runtime () noexcept ;
/*
Returns this runtime timer queue used to create new times.
*/
std::shared_ptr<concurrencpp::timer_queue> timer_queue () const noexcept ;
/*
Returns this runtime concurrencpp::inline_executor
*/
std::shared_ptr<concurrencpp::inline_executor> inline_executor () const noexcept ;
/*
Returns this runtime concurrencpp::thread_pool_executor
*/
std::shared_ptr<concurrencpp::thread_pool_executor> thread_pool_executor () const noexcept ;
/*
Returns this runtime concurrencpp::background_executor
*/
std::shared_ptr<concurrencpp::thread_pool_executor> background_executor () const noexcept ;
/*
Returns this runtime concurrencpp::thread_executor
*/
std::shared_ptr<concurrencpp::thread_executor> thread_executor () const noexcept ;
/*
Creates a new concurrencpp::worker_thread_executor and registers it in this runtime.
Might throw std::bad_alloc or std::system_error if any underlying memory or system resource could not have been acquired.
*/
std::shared_ptr<concurrencpp::worker_thread_executor> make_worker_thread_executor ();
/*
Creates a new concurrencpp::manual_executor and registers it in this runtime.
Might throw std::bad_alloc or std::system_error if any underlying memory or system resource could not have been acquired.
*/
std::shared_ptr<concurrencpp::manual_executor> make_manual_executor ();
/*
Creates a new user defined executor and registers it in this runtime.
executor_type must be a valid concrete class of concurrencpp::executor.
Might throw std::bad_alloc if no memory is available.
Might throw any exception that the constructor of <<executor_type>> might throw.
*/
template < class executor_type , class ... argument_types>
std::shared_ptr<executor_type> make_executor (argument_types&& ... arguments);
/*
returns the version of concurrencpp that the library was built with.
*/
static std::tuple< unsigned int , unsigned int , unsigned int > version () noexcept ;
}; Dalam beberapa kasus, aplikasi tertarik untuk memantau pembuatan utas dan penghentian, misalnya, beberapa pengalokasi memori mengharuskan utas baru untuk didaftarkan dan tidak terdaftar pada penciptaan dan penghentiannya. Runtime Concurrencpp memungkinkan pengaturan panggilan balik pembuatan utas dan panggilan balik terminasi utas. Panggilan balik itu akan dipanggil setiap kali salah satu pekerja Concurrencpp membuat utas baru dan ketika utas itu berakhir. Panggilan balik itu selalu dipanggil dari dalam utas yang dibuat/diakhiri, jadi std::this_thread::get_id akan selalu mengembalikan ID utas yang relevan. Tanda tangan dari panggilan balik itu adalah void callback (std::string_view thread_name) . thread_name adalah judul spesifik Concurrencpp yang diberikan kepada utas dan dapat diamati di beberapa debugger yang menyajikan nama utas. Nama utas tidak dijamin unik dan harus digunakan untuk penebangan dan debugging.
Untuk mengatur panggilan balik ciptaan utas dan/atau panggilan balik terminasi utas, aplikasi dapat mengatur anggota thread_started_callback dan/atau thread_terminated_callback anggota runtime_options yang diteruskan ke konstruktor runtime. Karena panggilan balik tersebut disalin ke setiap pekerja Concurrencpp yang mungkin membuat utas, panggilan balik itu harus dapat dikopong.
# include " concurrencpp/concurrencpp.h "
# include < iostream >
int main () {
concurrencpp::runtime_options options;
options. thread_started_callback = [](std::string_view thread_name) {
std::cout << " A new thread is starting to run, name: " << thread_name << " , thread id: " << std::this_thread::get_id ()
<< std::endl;
};
options. thread_terminated_callback = [](std::string_view thread_name) {
std::cout << " A thread is terminating, name: " << thread_name << " , thread id: " << std::this_thread::get_id () << std::endl;
};
concurrencpp::runtime runtime (options);
const auto timer_queue = runtime. timer_queue ();
const auto thread_pool_executor = runtime. thread_pool_executor ();
concurrencpp::timer timer =
timer_queue-> make_timer ( std::chrono::milliseconds ( 100 ), std::chrono::milliseconds ( 500 ), thread_pool_executor, [] {
std::cout << " A timer callable is executing " << std::endl;
});
std::this_thread::sleep_for ( std::chrono::seconds ( 3 ));
return 0 ;
}Kemungkinan output:
A new thread is starting to run, name: concurrencpp::timer_queue worker, thread id: 7496
A new thread is starting to run, name: concurrencpp::thread_pool_executor worker, thread id: 21620
A timer callable is executing
A timer callable is executing
A timer callable is executing
A timer callable is executing
A timer callable is executing
A timer callable is executing
A thread is terminating, name: concurrencpp::timer_queue worker, thread id: 7496
A thread is terminating, name: concurrencpp::thread_pool_executor worker, thread id: 21620
Aplikasi dapat membuat tipe pelaksana kustom mereka sendiri dengan mewarisi kelas derivable_executor . Ada beberapa poin yang perlu dipertimbangkan ketika mengimplementasikan eksekutor yang ditentukan pengguna: yang paling penting adalah mengingat bahwa pelaksana digunakan dari banyak utas, sehingga metode yang diimplementasikan harus aman.
Eksekutor baru dapat dibuat menggunakan runtime::make_executor . Aplikasi tidak boleh membuat eksekutor baru dengan instantiasi biasa (seperti std::make_shared atau polos new ), hanya dengan menggunakan runtime::make_executor . Juga, aplikasi tidak boleh mencoba untuk menginstalasi kembali eksekutor concurrencpp bawaan, seperti thread_pool_executor atau thread_executor , para pelaksana hanya harus diakses melalui instance yang ada di objek runtime.
Poin penting lainnya adalah menangani shutdown dengan benar: shutdown , shutdown_requested dan enqueue semua harus memantau status pelaksana dan berperilaku sesuai ketika dipanggil:
shutdown harus memberi tahu utas yang mendasari untuk berhenti dan kemudian bergabung dengan mereka.shutdown mungkin disebut beberapa kali, dan metode harus menangani skenario ini dengan mengabaikan panggilan berikutnya untuk shutdown setelah doa pertama.enqueue harus melempar concurrencpp::errors::runtime_shutdown pengecualian jika shutdown telah dipanggil sebelumnya. task Menerapkan pelaksana adalah salah satu kasus langka di mana aplikasi perlu bekerja dengan concurrencpp::task secara langsung. concurrencpp::task adalah std::function seperti objek, tetapi dengan beberapa perbedaan. Seperti std::function , objek tugas menyimpan callable yang bertindak sebagai operasi asinkron. Tidak seperti std::function , task adalah jenis langkah saja. Saat doa, objek tugas tidak menerima parameter dan mengembalikan void . Selain itu, setiap objek tugas hanya dapat dipanggil sekali saja. Setelah doa pertama, objek tugas menjadi kosong. Memanggil objek tugas kosong setara dengan memohon lambda kosong ( []{} ), dan tidak akan melempar pengecualian apa pun. Objek tugas menerima callable sebagai referensi penerusan ( type&& di mana type adalah parameter template), dan bukan dengan salinan (seperti std::function ). Konstruksi yang disimpan Callable terjadi di tempat. Ini memungkinkan objek tugas berisi callable yang merupakan tipe bergerak saja (seperti std::unique_ptr dan concurrencpp::result ). Objek tugas mencoba menggunakan metode yang berbeda untuk mengoptimalkan penggunaan jenis yang tersimpan, misalnya, objek tugas menerapkan optimisasi pendek-puffer (SBO) untuk Callable Kecil, dan akan menyambung panggilan ke std::coroutine_handle<void> dengan menelepon mereka secara langsung tanpa pengiriman virtual.
task class task {
/*
Creates an empty task object.
*/
task () noexcept ;
/*
Creates a task object by moving the stored callable of rhs to *this.
If rhs is empty, then *this will also be empty after construction.
After this call, rhs is empty.
*/
task (task&& rhs) noexcept ;
/*
Creates a task object by storing callable in *this.
<<typename std::decay<callable_type>::type>> will be in-place-
constructed inside *this by perfect forwarding callable.
*/
template < class callable_type >
task (callable_type&& callable);
/*
Destroys stored callable, does nothing if empty.
*/
~task () noexcept ;
/*
If *this is empty, does nothing.
Invokes stored callable, and immediately destroys it.
After this call, *this is empty.
May throw any exception that the invoked callable may throw.
*/
void operator ()();
/*
Moves the stored callable of rhs to *this.
If rhs is empty, then *this will also be empty after this call.
If *this already contains a stored callable, operator = destroys it first.
*/
task& operator =(task&& rhs) noexcept ;
/*
If *this is not empty, task::clear destroys the stored callable and empties *this.
If *this is empty, clear does nothing.
*/
void clear () noexcept ;
/*
Returns true if *this stores a callable. false otherwise.
*/
explicit operator bool () const noexcept ;
/*
Returns true if *this stores a callable,
and that stored callable has the same type as <<typename std::decay<callable_type>::type>>
*/
template < class callable_type >
bool contains () const noexcept ;
}; Saat mengimplementasikan eksekutor yang ditentukan pengguna, tergantung pada implementasi untuk menyimpan objek task (ketika enqueue dipanggil), dan melaksanakannya sesuai dengan mekanisme dalam-mekanisme.
Dalam contoh ini, kami membuat eksekutor yang mencatat tindakan seperti tugas -tugas atau melaksanakannya. Kami mengimplementasikan antarmuka executor , dan kami meminta runtime untuk membuat dan menyimpan contohnya dengan menelepon runtime::make_executor . Sisa aplikasi berperilaku persis sama seperti jika kita menggunakan pelaksana yang tidak ditentukan pengguna.
# include " concurrencpp/concurrencpp.h "
# include < iostream >
# include < queue >
# include < thread >
# include < mutex >
# include < condition_variable >
class logging_executor : public concurrencpp ::derivable_executor<logging_executor> {
private:
mutable std::mutex _lock;
std::queue<concurrencpp::task> _queue;
std::condition_variable _condition;
bool _shutdown_requested;
std::thread _thread;
const std::string _prefix;
void work_loop () {
while ( true ) {
std::unique_lock<std::mutex> lock (_lock);
if (_shutdown_requested) {
return ;
}
if (!_queue. empty ()) {
auto task = std::move (_queue. front ());
_queue. pop ();
lock. unlock ();
std::cout << _prefix << " A task is being executed " << std::endl;
task ();
continue ;
}
_condition. wait (lock, [ this ] {
return !_queue. empty () || _shutdown_requested;
});
}
}
public:
logging_executor (std::string_view prefix) :
derivable_executor<logging_executor>( " logging_executor " ),
_shutdown_requested ( false ),
_prefix (prefix) {
_thread = std::thread ([ this ] {
work_loop ();
});
}
void enqueue (concurrencpp::task task) override {
std::cout << _prefix << " A task is being enqueued! " << std::endl;
std::unique_lock<std::mutex> lock (_lock);
if (_shutdown_requested) {
throw concurrencpp::errors::runtime_shutdown ( " logging executor - executor was shutdown. " );
}
_queue. emplace ( std::move (task));
_condition. notify_one ();
}
void enqueue (std::span<concurrencpp::task> tasks) override {
std::cout << _prefix << tasks. size () << " tasks are being enqueued! " << std::endl;
std::unique_lock<std::mutex> lock (_lock);
if (_shutdown_requested) {
throw concurrencpp::errors::runtime_shutdown ( " logging executor - executor was shutdown. " );
}
for ( auto & task : tasks) {
_queue. emplace ( std::move (task));
}
_condition. notify_one ();
}
int max_concurrency_level () const noexcept override {
return 1 ;
}
bool shutdown_requested () const noexcept override {
std::unique_lock<std::mutex> lock (_lock);
return _shutdown_requested;
}
void shutdown () noexcept override {
std::cout << _prefix << " shutdown requested " << std::endl;
std::unique_lock<std::mutex> lock (_lock);
if (_shutdown_requested) return ; // nothing to do.
_shutdown_requested = true ;
lock. unlock ();
_condition. notify_one ();
_thread. join ();
}
};
int main () {
concurrencpp::runtime runtime;
auto logging_ex = runtime. make_executor <logging_executor>( " Session #1234 " );
for ( size_t i = 0 ; i < 10 ; i++) {
logging_ex-> post ([] {
std::cout << " hello world " << std::endl;
});
}
std::getchar ();
return 0 ;
}$ git clone https://github.com/David-Haim/concurrencpp.git
$ cd concurrencpp
$ cmake -S . -B build /lib
$ cmake -- build build /lib --config Release$ git clone https://github.com/David-Haim/concurrencpp.git
$ cd concurrencpp
$ cmake -S test -B build / test
$ cmake -- build build / test
< # for release mode: cmake --build build/test --config Release #>
$ cd build / test
$ ctest . -V -C Debug
< # for release mode: ctest . -V -C Release #> $ git clone https://github.com/David-Haim/concurrencpp.git
$ cd concurrencpp
$ cmake -DCMAKE_BUILD_TYPE=Release -S . -B build /lib
$ cmake -- build build /lib
#optional, install the library: sudo cmake --install build/lib Dengan dentang dan GCC, juga dimungkinkan untuk menjalankan tes dengan dukungan TSAN (Thread Sanitizer).
$ git clone https://github.com/David-Haim/concurrencpp.git
$ cd concurrencpp
$ cmake -S test -B build / test
#for release mode: cmake -DCMAKE_BUILD_TYPE=Release -S test -B build/test
#for TSAN mode: cmake -DCMAKE_BUILD_TYPE=Release -DENABLE_THREAD_SANITIZER=Yes -S test -B build/test
$ cmake -- build build / test
$ cd build / test
$ ctest . -V Saat menyusun di Linux, perpustakaan mencoba menggunakan libstdc++ secara default. Jika Anda bermaksud menggunakan libc++ sebagai implementasi pustaka standar Anda, bendera CMAKE_TOOLCHAIN_FILE harus ditentukan seperti di bawah ini:
$ cmake -DCMAKE_TOOLCHAIN_FILE=../cmake/libc++.cmake -DCMAKE_BUILD_TYPE=Release -S . -B build /libAtau untuk membangun dan menginstal perpustakaan secara manual, pengembang mungkin mendapatkan rilis stabil Concurrencpp melalui manajer paket VCPKG dan Conan:
vcpkg:
$ vcpkg install concurrencppConan: Concurrencpp di Conancenter
Concurrencpp hadir dengan program kotak pasir bawaan yang dapat dimodifikasi dan bereksperimen pengembang, tanpa harus menginstal atau menautkan pustaka yang dikompilasi ke basis kode yang berbeda. Untuk bermain dengan kotak pasir, pengembang dapat memodifikasi sandbox/main.cpp dan mengkompilasi aplikasi menggunakan perintah berikut:
$ cmake -S sandbox -B build /sandbox
$ cmake -- build build /sandbox
< # for release mode: cmake --build build/sandbox --config Release #>
$ ./ build /sandbox < # runs the sandbox> $ cmake -S sandbox -B build /sandbox
#for release mode: cmake -DCMAKE_BUILD_TYPE=Release -S sandbox -B build/sandbox
$ cmake -- build build /sandbox
$ ./ build /sandbox #runs the sandbox