Concurrencp apporte la puissance des tâches simultanées au monde C ++, permettant aux développeurs d'écrire des applications très concurrentes facilement et en toute sécurité en utilisant des tâches, des exécuteurs et des coroutines. En utilisant des applications concurrencp, peuvent décomposer les grandes procédures qui doivent être traitées de manière asynchrone en tâches plus petites qui s'exécutent simultanément et fonctionnent de manière coopérative pour atteindre le résultat recherché. ConcurrenCP permet également aux applications d'écrire facilement des algorithmes parallèles en utilisant des coroutines parallèles.
Concurrencp Les principaux avantages sont:
std::thread et std::mutex .co_await .executorthread_pool_executormanual_executorresultresultlazy_resultlazy_resultresult_promise APIresult_promise Exempleshared_resultshared_resultmake_ready_resultmake_exceptional_resultwhen_allwhen_anyresume_ontimer_queuetimergeneratorgeneratorasync_lockscoped_async_lockasync_lockasync_condition_variableasync_condition_variableruntimetasktaskConcurrencp est construit autour du concept de tâches simultanées. Une tâche est une opération asynchrone. Les tâches offrent un niveau d'abstraction plus élevé pour le code simultané que les approches traditionnelles axées sur les filetages. Les tâches peuvent être enchaînées, ce qui signifie que les tâches passent leur résultat asynchrone de l'un à l'autre, où le résultat d'une tâche est utilisé comme s'il s'agissait d'un paramètre ou d'une valeur intermédiaire d'une autre tâche en cours. Les tâches permettent aux applications d'utiliser mieux les ressources matérielles disponibles et de faire évoluer beaucoup plus que d'utiliser des threads bruts, car les tâches peuvent être suspendues, en attendant une autre tâche pour produire un résultat, sans bloquer les threads OS sous-jacents. Les tâches apportent beaucoup plus de productivité aux développeurs en leur permettant de se concentrer davantage sur les entreprises commerciales et moins sur les concepts de bas niveau comme la gestion des threads et la synchronisation inter-thread.
Alors que les tâches spécifient quelles actions doivent être exécutées, les exécuteurs sont des objets de travail qui spécifient où et comment exécuter des tâches. Les applications de rechange de l'exécuteur de rechange la gestion fastidieuse des pools de threads et des files d'attente de tâches. Les exécuteurs exécuteurs découplent également ces concepts loin du code d'application, en fournissant une API unifiée pour la création et la planification des tâches.
Les tâches communiquent entre elles à l'aide d'objets de résultat . Un objet résultat est un tuyau asynchrone qui passe le résultat asynchrone d'une tâche à une autre tâche en cours. Les résultats peuvent être attendus et résolus de manière non bloquante.
Ces trois concepts - la tâche, l'exécuteur testamentaire et le résultat associé sont les éléments constitutifs de concurrencp. Les exécuteurs exécutent des tâches qui communiquent entre elles en envoyant des résultats via des objets de résultat. Les tâches, les exécuteurs et les objets de résultat fonctionnent symbiotiquement pour produire du code simultané qui est rapide et propre.
Concurrencp est construit autour du concept RAII. Afin d'utiliser des tâches et des exécuteurs, les applications créent une instance runtime au début de la fonction main . Le runtime est ensuite utilisé pour acquérir des exécuteurs existants et enregistrer de nouveaux exécuteurs définis par l'utilisateur. Les exécuteurs sont utilisés pour créer et planifier des tâches à exécuter, et ils peuvent renvoyer un objet result qui peut être utilisé pour passer le résultat asynchrone à une autre tâche qui agit comme son consommateur. Lorsque le temps d'exécution est détruit, il itère sur chaque exécuteur stocké et appelle sa méthode shutdown . Chaque exécuteur testamentaire sort alors gracieusement. Les tâches imprévues sont détruites et les tentatives de créer de nouvelles tâches lanceront une exception.
# include " concurrencpp/concurrencpp.h "
# include < iostream >
int main () {
concurrencpp::runtime runtime;
auto result = runtime. thread_executor ()-> submit ([] {
std::cout << " hello world " << std::endl;
});
result. get ();
return 0 ;
} Dans cet exemple de base, nous avons créé un objet d'exécution, puis nous avons acquis l'exécuteur de thread à partir de l'exécution. Nous avons utilisé submit pour passer un lambda comme notre appelable donné. Ce Lambda renvoie void , par conséquent, l'exécuteur renvoie un objet result<void> qui transmet le résultat asynchrone à l'appelant. Les appels main get ce qui bloque le thread principal jusqu'à ce que le résultat devienne prêt. Si aucune exception n'a été lancée, get les rendements void . Si une exception a été lancée, get -le le reménage. De façon asynchrone, thread_executor lance un nouveau thread d'exécution et exécute la lambda donnée. Il est implicitement co_return void et la tâche est terminée. main est alors débloqué.
# include " concurrencpp/concurrencpp.h "
# include < iostream >
# include < vector >
# include < algorithm >
# include < ctime >
using namespace concurrencpp ;
std::vector< int > make_random_vector () {
std::vector< int > vec ( 64 * 1'024 );
std::srand ( std::time ( nullptr ));
for ( auto & i : vec) {
i = :: rand ();
}
return vec;
}
result< size_t > count_even (std::shared_ptr<thread_pool_executor> tpe, const std::vector< int >& vector) {
const auto vecor_size = vector. size ();
const auto concurrency_level = tpe-> max_concurrency_level ();
const auto chunk_size = vecor_size / concurrency_level;
std::vector<result< size_t >> chunk_count;
for ( auto i = 0 ; i < concurrency_level; i++) {
const auto chunk_begin = i * chunk_size;
const auto chunk_end = chunk_begin + chunk_size;
auto result = tpe-> submit ([&vector, chunk_begin, chunk_end]() -> size_t {
return std::count_if (vector. begin () + chunk_begin, vector. begin () + chunk_end, []( auto i) {
return i % 2 == 0 ;
});
});
chunk_count. emplace_back ( std::move (result));
}
size_t total_count = 0 ;
for ( auto & result : chunk_count) {
total_count += co_await result;
}
co_return total_count;
}
int main () {
concurrencpp::runtime runtime;
const auto vector = make_random_vector ();
auto result = count_even (runtime. thread_pool_executor (), vector);
const auto total_count = result. get ();
std::cout << " there are " << total_count << " even numbers in the vector " << std::endl;
return 0 ;
} Dans cet exemple, nous commençons le programme en créant un objet d'exécution. Nous créons un vecteur rempli de nombres aléatoires, puis nous acquérons le thread_pool_executor à partir de l'exécution et des appels count_even . count_even est une coroutine qui engendre plus de tâches et co_await pour eux pour terminer à l'intérieur. max_concurrency_level Renvoie la quantité maximale de travailleurs que l'exécuteur prend en charge, dans le cas d'exécuteur Threadpool, le nombre de travailleurs est calculé à partir du nombre de cœurs. Nous partitions ensuite le tableau pour correspondre au nombre de travailleurs et envoyons chaque morceau à traiter dans sa propre tâche. De manière asynchrone, les travailleurs comptent combien de nombres contiennent chaque morceau et co_return le résultat. count_even résume chaque résultat en tirant le compte à l'aide de co_await , le résultat final est alors co_return ed. Le fil principal, qui a été bloqué par l'appel get est débloqué et le nombre total est retourné. Le principal imprime le nombre de nombres pair et le programme se termine gracieusement.
Chaque opération grande ou complexe peut être décomposée en étapes plus petites et chaînables. Les tâches sont des opérations asynchrones implémentant ces étapes de calcul. Les tâches peuvent fonctionner n'importe où avec l'aide d'exécuteurs. Bien que les tâches puissent être créées à partir de callables réguliers (tels que des fonds et des lambdas), les tâches sont principalement utilisées avec des coroutines, qui permettent une suspension et une reprise lisses. Dans concurrencp, le concept de tâche est représenté par la classe concurrencpp::task . Bien que le concept de tâche soit central pour concurrenpp, les applications devront rarement créer et manipuler eux-mêmes des objets de tâche, car les objets de tâche sont créés et planifiés par le temps d'exécution sans aide externe.
Concurrencp permet aux applications de produire et de consommer des coroutines comme principal moyen de créer des tâches. Concurrencp soutient les tâches à la fois impatientes et paresseuses.
Les tâches impatientes commencent à courir le moment où ils sont invoqués. Ce type d'exécution est recommandé lorsque les applications doivent tirer une action asynchrones et consommer son résultat plus tard (incendier et consommer plus tard), ou ignorer complètement le résultat asynchrone (feu et oublier).
Les tâches désireuses peuvent renvoyer result ou null_result . Le type de retour result indique à la coroutine de passer la valeur retournée ou l'exception lancée (incendie et consommer plus tard) tandis que le type de retour null_result dit à la coroutine de tomber et d'ignorer l'un d'eux (incendier et oublier).
Les coroutines désireuses peuvent commencer à s'exécuter de manière synchrone, dans le fil de l'appelant. Ce type de coroutines est appelé "coroutines régulières". Les coroutines désireuses de concurrencp peuvent également commencer à s'exécuter en parallèle, à l'intérieur d'un exécuteur donné, ce type de coroutines est appelé "coroutines parallèles".
Les tâches paresseuses, en revanche, commencent à fonctionner uniquement lorsque co_await Ed. Ce type de tâches est recommandé lorsque le résultat de la tâche est censé être consommé immédiatement après la création de la tâche. Les tâches paresseuses, reportées, sont un peu plus optimisées pour le cas de la consommation immédiate, car ils n'ont pas besoin de synchronisation spéciale pour transmettre le résultat asynchrone à son consommateur. Le compilateur peut également optimiser certaines allocations de mémoire nécessaires pour former la promesse de Coroutine sous-jacente. Il n'est pas possible de tirer une tâche paresseuse et d'exécuter autre chose - le tir d'une coroutine paresseuse signifie nécessairement la suspension de l'appelant-coroutine. L'appelant Coroutine ne reprendra que lorsque la coroutine paresseuse se terminera. Les tâches paresseuses ne peuvent retourner que lazy_result .
Les tâches paresseuses peuvent être converties en tâches impatientes en appelant lazy_result::run . Cette méthode exécute la tâche paresseuse en ligne et renvoie un objet result qui surveille la tâche nouvellement démarrée. Si les développeurs ne savent pas quel type de résultat utiliser, ils sont encouragés à utiliser des résultats paresseux, car ils peuvent être convertis en résultats réguliers (désireux) si nécessaire.
Lorsqu'une fonction renvoie l'un de lazy_result , result ou null_result et contient au moins un co_await ou co_return dans son corps, la fonction est une coroutine concurrencp. Chaque coroutine concurrencp valide est une tâche valide. Dans notre exemple de compte ci-dessus, count_even est une telle coroutine. Nous avons d'abord engendré count_even , puis à l'intérieur de lui, l'exécuteur de threadpool a engendré plus de tâches enfants (qui sont créées à partir de callables réguliers), qui ont finalement été rejoints à l'aide de co_await .
Un exécuteur concurrencp est un objet qui est capable de planifier et d'exécuter des tâches. Les exécuteurs simplifient le travail de gestion des ressources telles que les threads, les pools de threads et les files d'attente de tâches en les découplant loin du code d'application. Les exécuteurs fournissent un moyen unifié de planifier et d'exécuter des tâches, car ils étendent tous concurrencpp::executor .
executor class executor {
/*
Initializes a new executor and gives it a name.
*/
executor (std::string_view name);
/*
Destroys this executor.
*/
virtual ~executor () noexcept = default ;
/*
The name of the executor, used for logging and debugging.
*/
const std::string name;
/*
Schedules a task to run in this executor.
Throws concurrencpp::errors::runtime_shutdown exception if shutdown was called before.
*/
virtual void enqueue (concurrencpp::task task) = 0;
/*
Schedules a range of tasks to run in this executor.
Throws concurrencpp::errors::runtime_shutdown exception if shutdown was called before.
*/
virtual void enqueue (std::span<concurrencpp::task> tasks) = 0;
/*
Returns the maximum count of real OS threads this executor supports.
The actual count of threads this executor is running might be smaller than this number.
returns numeric_limits<int>::max if the executor does not have a limit for OS threads.
*/
virtual int max_concurrency_level () const noexcept = 0;
/*
Returns true if shutdown was called before, false otherwise.
*/
virtual bool shutdown_requested () const noexcept = 0;
/*
Shuts down the executor:
- Tells underlying threads to exit their work loop and joins them.
- Destroys unexecuted coroutines.
- Makes subsequent calls to enqueue, post, submit, bulk_post and
bulk_submit to throw concurrencpp::errors::runtime_shutdown exception.
- Makes shutdown_requested return true.
*/
virtual void shutdown () noexcept = 0;
/*
Turns a callable and its arguments into a task object and
schedules it to run in this executor using enqueue.
Arguments are passed to the task by decaying them first.
Throws errors::runtime_shutdown exception if shutdown has been called before.
*/
template < class callable_type , class ... argument_types>
void post (callable_type&& callable, argument_types&& ... arguments);
/*
Like post, but returns a result object that passes the asynchronous result.
Throws errors::runtime_shutdown exception if shutdown has been called before.
*/
template < class callable_type , class ... argument_types>
result<type> submit (callable_type&& callable, argument_types&& ... arguments);
/*
Turns an array of callables into an array of tasks and
schedules them to run in this executor using enqueue.
Throws errors::runtime_shutdown exception if shutdown has been called before.
*/
template < class callable_type >
void bulk_post (std::span<callable_type> callable_list);
/*
Like bulk_post, but returns an array of result objects that passes the asynchronous results.
Throws errors::runtime_shutdown exception if shutdown has been called before.
*/
template < class callable_type >
std::vector<concurrencpp::result<type>> bulk_submit (std::span<callable_type> callable_list);
};Comme mentionné ci-dessus, ConcurrenCP fournit des exécuteurs couramment utilisés. Ces types d'exécuteur sont:
Exécuteur de la pool de threads - un exécuteur à usage général qui maintient un pool de threads. L'exécuteur de la pool de threads convient aux tâches courtes liées au processeur qui ne bloquent pas. Les applications sont encouragées à utiliser cet exécuteur exécuteur comme exécuteur par défaut pour les tâches non bloquantes. Le pool de threads concurrencp fournit une injection de filetage dynamique et un équilibrage de travail dynamique.
Exécuteur en arrière-plan - un exécuteur de threadpool avec un plus grand pool de threads. Convient pour le lancement de tâches de blocage courtes comme le fichier IO et les requêtes DB. Remarque importante: Lors de la consommation de résultats, cet exécuteur est renvoyé en appelant submit et bulk_submit , il est important de changer l'exécution à l'aide resume_on à un exécuteur lié au processeur, afin d'éviter les tâches liées au processeur à traiter dans background_executor.
exemple:
auto result = background_executor.submit([] { /* some blocking action */ });
auto done_result = co_await result.resolve();
co_await resume_on (some_cpu_executor);
auto val = co_await done_result; // runs inside some_cpu_executorThread Exécuteur - un exécuteur testamentaire qui lance chaque tâche en file d'attente pour s'exécuter sur un nouveau thread d'exécution. Les fils ne sont pas réutilisés. Cet exécuteur est bon pour les tâches de longue durée, comme des objets qui exécutent une boucle de travail ou des opérations de blocage longues.
Exécuteur de threads de travail - Un exécuteur de thread unique qui maintient une seule file d'attente de tâches. Convient lorsque les applications souhaitent un thread dédié qui exécute de nombreuses tâches connexes.
Exécuteur manuel - Un exécuteur testamentaire qui n'exécute pas les coroutines en soi. Le code d'application peut exécuter des tâches précédemment enquêté en invoquant manuellement ses méthodes d'exécution.
Exécuteur dérivable - une classe de base pour les exécuteurs définis par l'utilisateur. Bien que héritage directement de concurrencpp::executor est possible, derivable_executor utilise le modèle CRTP qui offre quelques opportunités d'optimisation pour le compilateur.
Exécuteur en ligne - principalement utilisé pour remplacer le comportement d'autres exécuteurs. L'enquête d'une tâche équivaut à l'invoquer en ligne.
Le mécanisme nu d'un exécuteur testamentaire est encapsulé dans sa méthode enqueue . Cette méthode entame une tâche d'exécution et a deux surcharges: une surcharge reçoit un seul objet de tâche comme argument, et un autre qui reçoit une portée d'objets de tâche. La deuxième surcharge est utilisée pour mettre en place un lot de tâches. Cela permet une meilleure horaire des heuristiques et une diminution des affirmations.
Les applications n'ont pas à s'appuyer uniquement sur enqueue , concurrencpp::executor fournit une API pour planifier des callables utilisateur en les convertissant en objets de tâche dans les coulisses. Les applications peuvent demander aux exécuteurs des exécuteurs de renvoyer un objet de résultat qui passe le résultat asynchrone de la fourniture fournie. Cela se fait en appelant executor::submit et executor::bulk_submit . submit obtient un appelable et renvoie un objet de résultat. executor::bulk_submit obtient une span de callables et renvoie un vector d'objets de résultat d'une manière similaire submit . Dans de nombreux cas, les applications ne sont pas intéressées par la valeur ou l'exception asynchrone. Dans ce cas, les applications peuvent utiliser executor:::post et executor::bulk_post pour planifier une tentative ou une span de callmables à exécuter, mais indique également à la tâche de supprimer toute valeur renvoyée ou exception jetée. Ne pas passer le résultat asynchrone est plus rapide que le passage, mais nous n'avons donc aucun moyen de connaître le statut ou le résultat de la tâche en cours.
post , bulk_post , submit et bulk_submit utilisent enqueue dans les coulisses pour le mécanisme de planification sous-jacent.
thread_pool_executor Mis à part post , submit , bulk_post et bulk_submit , le thread_pool_executor fournit ces méthodes supplémentaires.
class thread_pool_executor {
/*
Returns the number of milliseconds each thread-pool worker
remains idle (lacks any task to execute) before exiting.
This constant can be set by passing a runtime_options object
to the constructor of the runtime class.
*/
std::chrono::milliseconds max_worker_idle_time () const noexcept ;
};manual_executor Mis à part post , submit , bulk_post et bulk_submit , le manual_executor fournit ces méthodes supplémentaires.
class manual_executor {
/*
Destructor. Equivalent to clear.
*/
~manual_executor () noexcept ;
/*
Returns the number of enqueued tasks at the moment of invocation.
This number can change quickly by the time the application handles it, it should be used as a hint.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
size_t size () const noexcept ;
/*
Queries whether the executor is empty from tasks at the moment of invocation.
This value can change quickly by the time the application handles it, it should be used as a hint.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
bool empty () const noexcept ;
/*
Clears the executor from any enqueued but yet to-be-executed tasks,
and returns the number of cleared tasks.
Tasks enqueued to this executor by (post_)submit method are resumed
and errors::broken_task exception is thrown inside them.
Ongoing tasks that are being executed by loop_once(_XXX) or loop(_XXX) are uneffected.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
size_t clear ();
/*
Tries to execute a single task. If at the moment of invocation the executor
is empty, the method does nothing.
Returns true if a task was executed, false otherwise.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
bool loop_once ();
/*
Tries to execute a single task.
This method returns when either a task was executed or max_waiting_time
(in milliseconds) has reached.
If max_waiting_time is 0, the method is equivalent to loop_once.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
bool loop_once_for (std::chrono::milliseconds max_waiting_time);
/*
Tries to execute a single task.
This method returns when either a task was executed or timeout_time has reached.
If timeout_time has already expired, this method is equivalent to loop_once.
If shutdown is called from another thread, this method
returns and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
template < class clock_type , class duration_type >
bool loop_once_until (std::chrono::time_point<clock_type, duration_type> timeout_time);
/*
Tries to execute max_count enqueued tasks and returns the number of tasks that were executed.
This method does not wait: it returns when the executor
becomes empty from tasks or max_count tasks have been executed.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
size_t loop ( size_t max_count);
/*
Tries to execute max_count tasks.
This method returns when either max_count tasks were executed or a
total amount of max_waiting_time has passed.
If max_waiting_time is 0, the method is equivalent to loop.
Returns the actual amount of tasks that were executed.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
size_t loop_for ( size_t max_count, std::chrono::milliseconds max_waiting_time);
/*
Tries to execute max_count tasks.
This method returns when either max_count tasks were executed or timeout_time has reached.
If timeout_time has already expired, the method is equivalent to loop.
Returns the actual amount of tasks that were executed.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
template < class clock_type , class duration_type >
size_t loop_until ( size_t max_count, std::chrono::time_point<clock_type, duration_type> timeout_time);
/*
Waits for at least one task to be available for execution.
This method should be used as a hint,
as other threads (calling loop, for example) might empty the executor,
before this thread has a chance to do something with the newly enqueued tasks.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
void wait_for_task ();
/*
This method returns when one or more tasks are available for
execution or max_waiting_time has passed.
Returns true if at at least one task is available for execution, false otherwise.
This method should be used as a hint, as other threads (calling loop, for example)
might empty the executor, before this thread has a chance to do something
with the newly enqueued tasks.
If shutdown is called from another thread, this method
returns and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
bool wait_for_task_for (std::chrono::milliseconds max_waiting_time);
/*
This method returns when one or more tasks are available for execution or timeout_time has reached.
Returns true if at at least one task is available for execution, false otherwise.
This method should be used as a hint,
as other threads (calling loop, for example) might empty the executor,
before this thread has a chance to do something with the newly enqueued tasks.
If shutdown is called from another thread, this method
returns and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
template < class clock_type , class duration_type >
bool wait_for_task_until (std::chrono::time_point<clock_type, duration_type> timeout_time);
/*
This method returns when max_count or more tasks are available for execution.
This method should be used as a hint, as other threads
(calling loop, for example) might empty the executor,
before this thread has a chance to do something with the newly enqueued tasks.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
void wait_for_tasks ( size_t max_count);
/*
This method returns when max_count or more tasks are available for execution
or max_waiting_time (in milliseconds) has passed.
Returns the number of tasks available for execution when the method returns.
This method should be used as a hint, as other
threads (calling loop, for example) might empty the executor,
before this thread has a chance to do something with the newly enqueued tasks.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
size_t wait_for_tasks_for ( size_t count, std::chrono::milliseconds max_waiting_time);
/*
This method returns when max_count or more tasks are available for execution
or timeout_time is reached.
Returns the number of tasks available for execution when the method returns.
This method should be used as a hint, as other threads
(calling loop, for example) might empty the executor,
before this thread has a chance to do something with the newly enqueued tasks.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
template < class clock_type , class duration_type >
size_t wait_for_tasks_until ( size_t count, std::chrono::time_point<clock_type, duration_type> timeout_time);
}; Les valeurs et exceptions asynchrones peuvent être consommées à l'aide d'objets de résultat concurrencp. Le type result représente le résultat asynchrone d'une tâche impatient tandis que lazy_result représente le résultat différé d'une tâche paresseuse.
Lorsqu'une tâche (désireuse ou paresseuse) se termine, elle renvoie une valeur valide ou lance une exception. Dans les deux cas, ce résultat asynchrone est transmis au consommateur de l'objet résultat.
Les objets result forment des coroutines asymétriques - L'exécution d'un appelant-coroutine n'est pas effectuée par l'exécution d'une callee-coroutine, les deux coroutines peuvent s'exécuter indépendamment. Ce n'est que lorsque vous consommant le résultat de la Callee-Coroutine, l'appelant-coroutine pourrait être suspendu en attendant la Callee à terminer. Jusque-là, les deux coroutines fonctionnent indépendamment. La Callee-Coroutine fonctionne, que son résultat soit consommé ou non.
Les objets lazy_result forment les coroutines symétriques - L'exécution d'une callee-coroutine ne se produit qu'après la suspension de l'appelant-corotine. En attendant un résultat paresseux, la coroutine actuelle est suspendue et la tâche paresseuse associée au résultat paresseux commence à s'exécuter. Une fois la Callee-Coroutine terminée et donné un résultat, l'appelant-coroutine reprend. Si un résultat paresseux n'est pas consommé, sa tâche paresseuse associée ne commence jamais à s'exécuter.
Tous les objets de résultat sont un type de déplacement uniquement, et en tant que tels, ils ne peuvent pas être utilisés après que leur contenu a été déplacé vers un autre objet de résultat. Dans ce cas, l'objet de résultat est considéré comme vide et tente d'appeler une méthode autre que operator bool et operator = lancera une exception.
Une fois que le résultat asynchrone a été retiré de l'objet de résultat (par exemple, en appelant get ou operator co_await ), l'objet résultat devient vide. Le vide peut être testé avec operator bool .
En attente d'un résultat signifie suspendre la coroutine actuelle jusqu'à ce que l'objet résultat soit prêt. Si une valeur valide était renvoyée de la tâche associée, elle est renvoyée de l'objet résultat. Si la tâche associée lance une exception, elle est retrouvée. Au moment de l'attente, si le résultat est déjà prêt, la coroutine actuelle reprend immédiatement. Sinon, il est repris par le fil qui définit le résultat ou l'exception asynchrone.
La résolution d'un résultat est similaire à l'attendre. La différence est que l'expression co_await renverra l'objet de résultat lui-même, sous une forme non vide, dans un état prêt. Le résultat asynchrone peut ensuite être tiré en utilisant get ou co_await .
Chaque objet de résultat a un statut indiquant l'état du résultat asynchrone. L'état des résultats varie de result_status::idle (le résultat ou l'exception asynchrone n'a pas encore été produit) sur result_status::value (la tâche associée s'est terminée gracieusement en renvoyant une valeur valide) à result_status::exception (la tâche terminée en lançant une exception). Le statut peut être interrogé en appelant (lazy_)result::status .
result Le type result représente le résultat d'une tâche asynchrone continue, similaire à std::future .
En plus d'attendre et de résoudre les objets de résultat, ils peuvent également être attendus en appelant n'importe quel result::wait , result::wait_for , result::wait_until ou result::get . Attendre un résultat pour terminer est une opération de blocage (dans le cas, le résultat asynchrone n'est pas prêt) et suspendre l'ensemble du fil d'exécution en attendant que le résultat asynchrone soit disponible. Les opérations d'attente sont généralement découragées et autorisées uniquement dans les tâches au niveau de la racine ou dans des contextes qui le permettent, comme bloquer le thread principal en attendant que le reste de l'application se termine gracieusement, ou en utilisant concurrencpp::blocking_executor ou concurrencpp::thread_executor .
En attente d'objets de résultat en utilisant co_await (et ce faisant, transformer également la fonction / tâche actuelle en coroutine) est le moyen préféré de consommer des objets de résultat, car il ne bloque pas les threads sous-jacents.
result class result {
/*
Creates an empty result that isn't associated with any task.
*/
result () noexcept = default ;
/*
Destroys the result. Associated tasks are not cancelled.
The destructor does not block waiting for the asynchronous result to become ready.
*/
~result () noexcept = default ;
/*
Moves the content of rhs to *this. After this call, rhs is empty.
*/
result (result&& rhs) noexcept = default ;
/*
Moves the content of rhs to *this. After this call, rhs is empty. Returns *this.
*/
result& operator = (result&& rhs) noexcept = default ;
/*
Returns true if this is a non-empty result.
Applications must not use this object if this->operator bool() is false.
*/
explicit operator bool () const noexcept ;
/*
Queries the status of *this.
The returned value is any of result_status::idle, result_status::value or result_status::exception.
Throws errors::empty_result if *this is empty.
*/
result_status status () const ;
/*
Blocks the current thread of execution until this result is ready,
when status() != result_status::idle.
Throws errors::empty_result if *this is empty.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
void wait ();
/*
Blocks until this result is ready or duration has passed. Returns the status
of this result after unblocking.
Throws errors::empty_result if *this is empty.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
template < class duration_unit , class ratio >
result_status wait_for (std::chrono::duration<duration_unit, ratio> duration);
/*
Blocks until this result is ready or timeout_time has reached. Returns the status
of this result after unblocking.
Throws errors::empty_result if *this is empty.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
template < class clock , class duration >
result_status wait_until (std::chrono::time_point<clock, duration> timeout_time);
/*
Blocks the current thread of execution until this result is ready,
when status() != result_status::idle.
If the result is a valid value, it is returned, otherwise, get rethrows the asynchronous exception.
Throws errors::empty_result if *this is empty.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
type get ();
/*
Returns an awaitable used to await this result.
If the result is already ready - the current coroutine resumes
immediately in the calling thread of execution.
If the result is not ready yet, the current coroutine is suspended
and resumed when the asynchronous result is ready,
by the thread which had set the asynchronous value or exception.
In either way, after resuming, if the result is a valid value, it is returned.
Otherwise, operator co_await rethrows the asynchronous exception.
Throws errors::empty_result if *this is empty.
*/
auto operator co_await ();
/*
Returns an awaitable used to resolve this result.
After co_await expression finishes, *this is returned in a non-empty form, in a ready state.
Throws errors::empty_result if *this is empty.
*/
auto resolve ();
};lazy_resultUn objet de résultat paresseux représente le résultat d'une tâche paresseuse différée.
lazy_result a la responsabilité à la fois de démarrer la tâche paresseuse associée et de remettre son résultat différé à son consommateur. Lorsqu'il attend ou résolu, le résultat paresseux suspend la coroutine actuelle et démarre la tâche paresseuse associée. Lorsque la tâche associée se termine, sa valeur asynchrone est transmise à la tâche de l'appelant, qui reprend ensuite.
Parfois, une API peut renvoyer un résultat paresseux, mais les applications ont besoin de sa tâche associée pour s'exécuter avec impatience (sans suspendre la tâche de l'appelant). Dans ce cas, les tâches paresseuses peuvent être converties en tâches impatientes en appelant run sur son résultat paresseux associé. Dans ce cas, la tâche associée commencera à exécuter en ligne, sans suspendre la tâche de l'appelant. Le résultat paresseux d'origine est vidé et un objet result valide qui surveille la tâche nouvellement démarrée sera retournée à la place.
lazy_result class lazy_result {
/*
Creates an empty lazy result that isn't associated with any task.
*/
lazy_result () noexcept = default ;
/*
Moves the content of rhs to *this. After this call, rhs is empty.
*/
lazy_result (lazy_result&& rhs) noexcept ;
/*
Destroys the result. If not empty, the destructor destroys the associated task without resuming it.
*/
~lazy_result () noexcept ;
/*
Moves the content of rhs to *this. After this call, rhs is empty. Returns *this.
If *this is not empty, then operator= destroys the associated task without resuming it.
*/
lazy_result& operator =(lazy_result&& rhs) noexcept ;
/*
Returns true if this is a non-empty result.
Applications must not use this object if this->operator bool() is false.
*/
explicit operator bool () const noexcept ;
/*
Queries the status of *this.
The returned value is any of result_status::idle, result_status::value or result_status::exception.
Throws errors::empty_result if *this is empty.
*/
result_status status () const ;
/*
Returns an awaitable used to start the associated task and await this result.
If the result is already ready - the current coroutine resumes immediately
in the calling thread of execution.
If the result is not ready yet, the current coroutine is suspended and
resumed when the asynchronous result is ready,
by the thread which had set the asynchronous value or exception.
In either way, after resuming, if the result is a valid value, it is returned.
Otherwise, operator co_await rethrows the asynchronous exception.
Throws errors::empty_result if *this is empty.
*/
auto operator co_await ();
/*
Returns an awaitable used to start the associated task and resolve this result.
If the result is already ready - the current coroutine resumes immediately
in the calling thread of execution.
If the result is not ready yet, the current coroutine is suspended and resumed
when the asynchronous result is ready, by the thread which
had set the asynchronous value or exception.
After co_await expression finishes, *this is returned in a non-empty form, in a ready state.
Throws errors::empty_result if *this is empty.
*/
auto resolve ();
/*
Runs the associated task inline and returns a result object that monitors the newly started task.
After this call, *this is empty.
Throws errors::empty_result if *this is empty.
Might throw std::bad_alloc if fails to allocate memory.
*/
result<type> run ();
};Les coroutines désireuses régulières commencent à s'exécuter de manière synchrone dans le fil d'appel de l'exécution. L'exécution peut se déplacer vers un autre thread d'exécution si une coroutine subit un reprogrammation, par exemple en attendant un objet de résultat non prudent à l'intérieur. ConcurrencP fournit également des coroutines parallèles, qui commencent à s'exécuter à l'intérieur d'un exécuteur donné, pas dans le fil d'Exécution invoquant. Ce style de planification des coroutines est particulièrement utile lors de la rédaction d'algorithmes parallèles, d'algorithmes récursifs et d'algorithmes simultanés qui utilisent le modèle de jointure de fourche.
Chaque coroutine parallèle doit répondre aux conditions préalables suivantes:
result / null_result .executor_tag comme premier argument.type* / type& / std::shared_ptr<type> , où type est une classe concrete d' executor comme deuxième argument.co_await ou co_return dans son corps. Si tout ce qui précède s'applique, la fonction est une coroutine parallèle: concurrencp démarrera la coroutine suspendue et la reprogrammera immédiatement pour s'exécuter dans l'exécuteur fourni. concurrencpp::executor_tag est un espace réservé factice pour indiquer au concurrencp Runtime que cette fonction n'est pas une fonction régulière, elle doit commencer à fonctionner à l'intérieur de l'exécuteur donné. Si l'exécuteur transmis à la coroutine parallèle est nul, la coroutine ne commencera pas à s'exécuter et une exception std::invalid_argument sera jetée de manière synchrone. Si toutes les conditions préalables sont remplies, les applications peuvent consommer le résultat de la coroutine parallèle en utilisant l'objet de résultat renvoyé.
Dans cet exemple, nous calculons le 30e membre de la séquence Fibonacci de manière parallèle. Nous commençons à lancer chaque étape de Fibonacci dans sa propre coroutine parallèle. Le premier argument est un executor_tag factice et le deuxième argument est l'exécuteur Threadpool. Chaque étape récursive invoque une nouvelle coroutine parallèle qui fonctionne en parallèle. Chaque résultat est co_return ed à sa tâche parent et acquis en utilisant co_await .
Lorsque nous jugeons que l'entrée est suffisamment petite pour être calculée de manière synchrone (lorsque curr <= 10 ), nous cessons d'exécuter chaque étape récursive dans sa propre tâche et résolvons simplement l'algorithme de manière synchrone.
# include " concurrencpp/concurrencpp.h "
# include < iostream >
using namespace concurrencpp ;
int fibonacci_sync ( int i) {
if (i == 0 ) {
return 0 ;
}
if (i == 1 ) {
return 1 ;
}
return fibonacci_sync (i - 1 ) + fibonacci_sync (i - 2 );
}
result< int > fibonacci (executor_tag, std::shared_ptr<thread_pool_executor> tpe, const int curr) {
if (curr <= 10 ) {
co_return fibonacci_sync (curr);
}
auto fib_1 = fibonacci ({}, tpe, curr - 1 );
auto fib_2 = fibonacci ({}, tpe, curr - 2 );
co_return co_await fib_1 + co_await fib_2;
}
int main () {
concurrencpp::runtime runtime;
auto fibb_30 = fibonacci ({}, runtime. thread_pool_executor (), 30 ). get ();
std::cout << " fibonacci(30) = " << fibb_30 << std::endl;
return 0 ;
} Pour comparer, c'est ainsi que le même code est écrit sans utiliser de coroutines parallèles et s'appuyant sur executor::submit seul. Puisque fibonacci renvoie un result<int> , le soumettre de manière récursive via executor::submit résultera un result<result<int>> .
# include " concurrencpp/concurrencpp.h "
# include < iostream >
using namespace concurrencpp ;
int fibonacci_sync ( int i) {
if (i == 0 ) {
return 0 ;
}
if (i == 1 ) {
return 1 ;
}
return fibonacci_sync (i - 1 ) + fibonacci_sync (i - 2 );
}
result< int > fibonacci (std::shared_ptr<thread_pool_executor> tpe, const int curr) {
if (curr <= 10 ) {
co_return fibonacci_sync (curr);
}
auto fib_1 = tpe-> submit (fibonacci, tpe, curr - 1 );
auto fib_2 = tpe-> submit (fibonacci, tpe, curr - 2 );
co_return co_await co_await fib_1 +
co_await co_await fib_2;
}
int main () {
concurrencpp::runtime runtime;
auto fibb_30 = fibonacci (runtime. thread_pool_executor (), 30 ). get ();
std::cout << " fibonacci(30) = " << fibb_30 << std::endl;
return 0 ;
} Les objets de résultat sont le principal moyen de transmettre des données entre les tâches dans concurrencp et nous avons vu comment les exécuteurs et les coroutines produisent de tels objets. Parfois, nous voulons utiliser les capacités des objets de résultat avec des non-tâches, par exemple lors de l'utilisation d'une bibliothèque tierce. Dans ce cas, nous pouvons compléter un objet de résultat en utilisant un result_promise . result_promise ressemble à un objet std::promise - les applications peuvent définir manuellement le résultat ou l'exception asynchrone et rendre l'objet result associé à être prêt.
Tout comme les objets de résultat, les promesses de résultat sont un type de mouvement uniquement qui devient vide après le mouvement. De même, après avoir établi un résultat ou une exception, la promesse du résultat devient également vide. Si un résultat-promis sort de la portée et qu'aucun résultat / exception n'a été défini, le destructeur de promis de résultat définit une exception concurrencpp::errors::broken_task à l'aide de la méthode set_exception . Les tâches suspendues et bloquées en attente de l'objet de résultat associé sont repris / non bloqués.
Les promesses de résultat peuvent convertir le style de code de code en style de code async/await : chaque fois qu'un composant nécessite un rappel pour passer le résultat asynchrone, nous pouvons passer un rappel qui appelle set_result ou set_exception (en fonction du résultat asynchrone lui-même) sur la promesse de résultats passée et renvoyez le résultat associé.
result_promise API template < class type >
class result_promise {
/*
Constructs a valid result_promise.
Might throw std::bad_alloc if fails to allocate memory.
*/
result_promise ();
/*
Moves the content of rhs to *this. After this call, rhs is empty.
*/
result_promise (result_promise&& rhs) noexcept ;
/*
Destroys *this, possibly setting an errors::broken_task exception
by calling set_exception if *this is not empty at the time of destruction.
*/
~result_promise () noexcept ;
/*
Moves the content of rhs to *this. After this call, rhs is empty.
*/
result_promise& operator = (result_promise&& rhs) noexcept ;
/*
Returns true if this is a non-empty result-promise.
Applications must not use this object if this->operator bool() is false.
*/
explicit operator bool () const noexcept ;
/*
Sets a value by constructing <<type>> from arguments... in-place.
Makes the associated result object become ready - tasks waiting for it
to become ready are unblocked.
Suspended tasks are resumed inline.
After this call, *this becomes empty.
Throws errors::empty_result_promise exception If *this is empty.
Might throw any exception that the constructor
of type(std::forward<argument_types>(arguments)...) throws.
*/
template < class ... argument_types>
void set_result (argument_types&& ... arguments);
/*
Sets an exception.
Makes the associated result object become ready - tasks waiting for it
to become ready are unblocked.
Suspended tasks are resumed inline.
After this call, *this becomes empty.
Throws errors::empty_result_promise exception If *this is empty.
Throws std::invalid_argument exception if exception_ptr is null.
*/
void set_exception (std::exception_ptr exception_ptr);
/*
A convenience method that invokes a callable with arguments... and calls set_result
with the result of the invocation.
If an exception is thrown, the thrown exception is caught and set instead by calling set_exception.
After this call, *this becomes empty.
Throws errors::empty_result_promise exception If *this is empty.
Might throw any exception that callable(std::forward<argument_types>(arguments)...)
or the contructor of type(type&&) throw.
*/
template < class callable_type , class ... argument_types>
void set_from_function (callable_type&& callable, argument_types&& ... arguments);
/*
Gets the associated result object.
Throws errors::empty_result_promise exception If *this is empty.
Throws errors::result_already_retrieved exception if this method had been called before.
*/
result<type> get_result ();
};result_promise Exemple: Dans cet exemple, result_promise est utilisé pour pousser les données à partir d'un thread, et il peut être tiré de son objet result associé à partir d'un autre thread.
# include " concurrencpp/concurrencpp.h "
# include < iostream >
int main () {
concurrencpp::result_promise<std::string> promise;
auto result = promise. get_result ();
std::thread my_3_party_executor ([promise = std::move (promise)] () mutable {
std::this_thread::sleep_for ( std::chrono::seconds ( 1 )); // Imitate real work
promise. set_result ( " hello world " );
});
auto asynchronous_string = result. get ();
std::cout << " result promise returned string: " << asynchronous_string << std::endl;
my_3_party_executor. join ();
} Dans cet exemple, nous utilisons std::thread comme exécuteur tiers. Cela représente un scénario lorsqu'un exécuteur non-CurrencPP est utilisé dans le cadre du cycle de vie de l'application. Nous extraissons l'objet de résultat avant de passer la promesse et bloquons le thread principal jusqu'à ce que le résultat devienne prêt. Dans my_3_party_executor , nous avons établi un résultat comme si nous le co_return .
Les résultats partagés sont un type spécial d'objets de résultat qui permettent à plusieurs consommateurs d'accéder au résultat asynchrone, similaire à std::shared_future . Différents consommateurs de différents threads peuvent appeler des fonctions telles que await , get et resolve de manière sûre.
Les résultats partagés sont construits à partir d'objets de résultats réguliers et contrairement aux objets de résultats réguliers, ils sont à la fois copyables et mobiles. En tant que tel, shared_result se comporte comme std::shared_ptr Type. Si une instance de résultat partagée est déplacée vers une autre instance, l'instance devient vide et essayer d'y accéder lancera une exception.
Afin de prendre en charge plusieurs consommateurs, les résultats partagés renvoient une référence à la valeur asynchrone au lieu de le déplacer (comme un résultat régulier). Par exemple, un shared_result<int> renvoie un int& quand get , await etc. sont appelés. Si le type sous-jacent du shared_result est void ou un type de référence (comme int& ), ils sont retournés comme d'habitude. Si le résultat asynchrone est une exception lancée, il est retrouvé.
Notez que lors de l'acquisition du résultat asynchrone à l'aide de shared_result à partir de plusieurs threads est en file d'attente, la valeur réelle peut ne pas être sûre. Par exemple, plusieurs threads peuvent acquérir un entier asynchrone en recevant sa référence ( int& ). Il ne rend pas le fil entier lui-même sûr. Il est bon de muter la valeur asynchrone si la valeur asynchrone est déjà sûre. Alternativement, les applications sont encouragées à utiliser les types const pour commencer (comme const int ), et acquérir des références constantes (comme const int& ) qui empêchent la mutation.
shared_result class share_result {
/*
Creates an empty shared-result that isn't associated with any task.
*/
shared_result () noexcept = default ;
/*
Destroys the shared-result. Associated tasks are not cancelled.
The destructor does not block waiting for the asynchronous result to become ready.
*/
~shared_result () noexcept = default ;
/*
Converts a regular result object to a shared-result object.
After this call, rhs is empty.
Might throw std::bad_alloc if fails to allocate memory.
*/
shared_result (result<type> rhs);
/*
Copy constructor. Creates a copy of the shared result object that monitors the same task.
*/
shared_result ( const shared_result&) noexcept = default ;
/*
Move constructor. Moves rhs to *this. After this call, rhs is empty.
*/
shared_result (shared_result&& rhs) noexcept = default ;
/*
Copy assignment operator. Copies rhs to *this and monitors the same task that rhs monitors.
*/
shared_result& operator =( const shared_result& rhs) noexcept ;
/*
Move assignment operator. Moves rhs to *this. After this call, rhs is empty.
*/
shared_result& operator =(shared_result&& rhs) noexcept ;
/*
Returns true if this is a non-empty shared-result.
Applications must not use this object if this->operator bool() is false.
*/
explicit operator bool () const noexcept ;
/*
Queries the status of *this.
The return value is any of result_status::idle, result_status::value or result_status::exception.
Throws errors::empty_result if *this is empty.
*/
result_status status () const ;
/*
Blocks the current thread of execution until this shared-result is ready,
when status() != result_status::idle.
Throws errors::empty_result if *this is empty.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
void wait ();
/*
Blocks until this shared-result is ready or duration has passed.
Returns the status of this shared-result after unblocking.
Throws errors::empty_result if *this is empty.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
template < class duration_type , class ratio_type >
result_status wait_for (std::chrono::duration<duration_type, ratio_type> duration);
/*
Blocks until this shared-result is ready or timeout_time has reached.
Returns the status of this result after unblocking.
Throws errors::empty_result if *this is empty.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
template < class clock_type , class duration_type >
result_status wait_until (std::chrono::time_point<clock_type, duration_type> timeout_time);
/*
Blocks the current thread of execution until this shared-result is ready,
when status() != result_status::idle.
If the result is a valid value, a reference to it is returned,
otherwise, get rethrows the asynchronous exception.
Throws errors::empty_result if *this is empty.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
std:: add_lvalue_reference_t <type> get ();
/*
Returns an awaitable used to await this shared-result.
If the shared-result is already ready - the current coroutine resumes
immediately in the calling thread of execution.
If the shared-result is not ready yet, the current coroutine is
suspended and resumed when the asynchronous result is ready,
by the thread which had set the asynchronous value or exception.
In either way, after resuming, if the result is a valid value, a reference to it is returned.
Otherwise, operator co_await rethrows the asynchronous exception.
Throws errors::empty_result if *this is empty.
*/
auto operator co_await ();
/*
Returns an awaitable used to resolve this shared-result.
After co_await expression finishes, *this is returned in a non-empty form, in a ready state.
Throws errors::empty_result if *this is empty.
*/
auto resolve ();
};shared_result : Dans cet exemple, un objet result est converti en un objet shared_result et une référence à un résultat int asynchrone est acquise par de nombreuses tâches engendrées avec thread_executor .
# include " concurrencpp/concurrencpp.h "
# include < iostream >
# include < chrono >
concurrencpp::result< void > consume_shared_result (concurrencpp::shared_result< int > shared_result,
std::shared_ptr<concurrencpp::executor> resume_executor) {
std::cout << " Awaiting shared_result to have a value " << std::endl;
const auto & async_value = co_await shared_result;
concurrencpp::resume_on (resume_executor);
std::cout << " In thread id " << std::this_thread::get_id () << " , got: " << async_value << " , memory address: " << &async_value << std::endl;
}
int main () {
concurrencpp::runtime runtime;
auto result = runtime. background_executor ()-> submit ([] {
std::this_thread::sleep_for ( std::chrono::seconds ( 1 ));
return 100 ;
});
concurrencpp::shared_result< int > shared_result ( std::move (result));
concurrencpp::result< void > results[ 8 ];
for ( size_t i = 0 ; i < 8 ; i++) {
results[i] = consume_shared_result (shared_result, runtime. thread_pool_executor ());
}
std::cout << " Main thread waiting for all consumers to finish " << std::endl;
auto tpe = runtime. thread_pool_executor ();
auto all_consumed = concurrencpp::when_all (tpe, std::begin (results), std::end (results)). run ();
all_consumed. get ();
std::cout << " All consumers are done, exiting " << std::endl;
return 0 ;
} Lorsque l'objet d'exécution sort de la portée de main , il itère chaque exécuteur stocké et appelle sa méthode shutdown . Essayer d'accéder à la fibre de temporisation ou à tout exécuteur lancera une exception errors::runtime_shutdown . Lorsqu'un exécuteur s'arrête, il efface ses files d'attente de tâches intérieures, détruisant des objets task non exécutés. Si un objet de tâche stocke un concurrencp-coroutine, cette coroutine reprend en ligne et une exception errors::broken_task est jetée à l'intérieur. Dans tous les cas où une exception runtime_shutdown ou une exception broken_task est lancée, les applications doivent mettre fin à leur flux de code actuel gracieusement dès que possible. Ces exceptions ne doivent pas être ignorées. runtime_shutdown et broken_task héritent de errors::interrupted_task Base Class, et ce type peut également être utilisé dans une clause catch pour gérer la terminaison de manière unifiée.
De nombreuses actions asynchrones concurrencp nécessitent une instance d'un exécuteur testamentaire comme exécuteur de CV . Lorsqu'une action asynchrone (implémentée sous forme de coroutine) peut terminer de manière synchrone, elle reprend immédiatement dans le fil d'appel de l'exécution. Si l'action asynchrone ne peut pas terminer de manière synchrone, elle reprendra à la fin, à l'intérieur de l'exécuteur de CV donné. Par exemple, la fonction utilitaire when_any nécessite une instance d'un EXECUTOR CV comme son premier argument. when_any renvoie un lazy_result qui devient prêt quand au moins un résultat donné devient prêt. Si l'un des résultats est déjà prêt au moment de l'appel when_any , la Coroutine d'appel reprend de manière synchrone dans le fil d'appel de l'exécution. Sinon, la coroutine appelle sera reprise lorsque au moins le résultat sera terminé, à l'intérieur de l'exécuteur de CV donné. Les exécuteurs de reprise sont importants car ils obligent où les coroutines relèvent dans les cas où il n'est pas clair où une coroutine est censée reprendre (par exemple, dans le cas de when_any and when_all ), ou dans les cas où l'action asynchrone est traitée dans l'un des travailleurs concurrencp, qui ne sont utilisés que pour traiter cette action spécifique et non le code d'application.
make_ready_result make_ready_result crée un objet de résultat prêt à partir d'arguments donnés. En attente d'un tel résultat, la Coroutine actuelle reprendra immédiatement. get et operator co_await renverra la valeur construite.
/*
Creates a ready result object by building <<type>> from arguments&&... in-place.
Might throw any exception that the constructor
of type(std::forward<argument_types>(arguments)...) throws.
Might throw std::bad_alloc exception if fails to allocate memory.
*/
template < class type , class ... argument_types>
result<type> make_ready_result (argument_types&& ... arguments);
/*
An overload for void type.
Might throw std::bad_alloc exception if fails to allocate memory.
*/
result< void > make_ready_result ();make_exceptional_result make_exceptional_result crée un objet de résultat prêt à partir d'une exception donnée. En attente d'un tel résultat, la Coroutine actuelle reprendra immédiatement. get and operator co_await remémorera l'exception donnée.
/*
Creates a ready result object from an exception pointer.
The returned result object will re-throw exception_ptr when calling get or await.
Throws std::invalid_argument if exception_ptr is null.
Might throw std::bad_alloc exception if fails to allocate memory.
*/
template < class type >
result<type> make_exceptional_result (std::exception_ptr exception_ptr);
/*
Overload. Similar to make_exceptional_result(std::exception_ptr),
but gets an exception object directly.
Might throw any exception that the constructor of exception_type(std::move(exception)) might throw.
Might throw std::bad_alloc exception if fails to allocate memory.
*/
template < class type , class exception_type >
result<type> make_exceptional_result (exception_type exception );when_all when_all est une fonction utilitaire qui crée un objet de résultat paresseux qui devient prêt lorsque tous les résultats d'entrée sont terminés. En attente de ce résultat paresseux renvoie tous les objets de resulte d'entrée dans un état prêt, prêt à être consommé.
La fonction when_all est livrée avec trois saveurs - une qui accepte une gamme hétérogène d'objets de résultat, une autre qui obtient une paire d'itérateurs à une gamme d'objets de résultat du même type, et enfin une surcharge qui n'accepte aucun objet de résultat du tout. Dans le cas de l'absence d'objets de résultat d'entrée - la fonction renvoie un objet de résultat prêt à un tuple vide.
Si l'un des objets de résultat passé est vide, une exception sera lancée. Dans ce cas, les objets de resulte d'entrée ne sont pas affectés par la fonction et peuvent être utilisés à nouveau après la gestion de l'exception. Si tous les objets de résultat d'entrée sont valides, ils sont vidés par cette fonction et renvoyés dans un état valide et prêt comme le résultat de la sortie.
Actuellement, when_all accepte uniquement les objets result .
Toutes les surcharges acceptent un exécuteur de CV comme premier paramètre. En attendant un résultat renvoyé par when_all , l'appelant Coroutine sera repris par l'exécuteur de CV donné.
/*
Creates a result object that becomes ready when all the input results become ready.
Passed result objects are emptied and returned as a tuple.
Throws std::invalid_argument if any of the passed result objects is empty.
Might throw an std::bad_alloc exception if no memory is available.
*/
template < class ... result_types>
lazy_result<std::tuple< typename std::decay<result_types>::type...>>
when_all (std::shared_ptr<executor_type> resume_executor,
result_types&& ... results);
/*
Overload. Similar to when_all(result_types&& ...) but receives a pair of iterators referencing a range.
Passed result objects are emptied and returned as a vector.
If begin == end, the function returns immediately with an empty vector.
Throws std::invalid_argument if any of the passed result objects is empty.
Might throw an std::bad_alloc exception if no memory is available.
*/
template < class iterator_type >
lazy_result<std::vector< typename std::iterator_traits<iterator_type>::value_type>>
when_all (std::shared_ptr<executor_type> resume_executor,
iterator_type begin, iterator_type end);
/*
Overload. Returns a ready result object that doesn't monitor any asynchronous result.
Might throw an std::bad_alloc exception if no memory is available.
*/
lazy_result<std::tuple<>> when_all (std::shared_ptr<executor_type> resume_executor);when_any la fonction when_any et une fonction utilitaire qui crée un objet de résultat paresseux qui devient prêt lorsque au moins un résultat d'entrée est terminé. En attente de ce résultat, une structure d'assistance contenant tous les objets de resulte d'entrée plus l'index de la tâche terminée. Il se pourrait qu'au moment de consommer le résultat prêt, d'autres résultats auraient déjà terminé de manière asynchrone. Les applications peuvent appeler when_any à plusieurs reprises afin de consommer des résultats prêts à la fin de leur complétude jusqu'à ce que tous les résultats soient consommés.
when_any la fonction est livrée avec seulement deux saveurs - une qui accepte une gamme hétérogène d'objets de résultat et une autre qui obtient une paire d'itérateurs à une gamme d'objets de résultat du même type. Contrairement à when_all , il n'y a aucun sens à attendre au moins une tâche à terminer lorsque la plage de résultats est complètement vide. Par conséquent, il n'y a pas de surcharge sans arguments. De plus, la surcharge de deux itérateurs lèvera une exception si ces itérateurs font référence à une plage vide (lorsque begin == end ).
Si l'un des objets de résultat passé est vide, une exception sera lancée. Dans tous les cas, une exception est lancée, les objets à résults d'entrée ne sont pas affectés par la fonction et peuvent être utilisés à nouveau après la gestion de l'exception. Si tous les objets de résultat d'entrée sont valides, ils sont vidés par cette fonction et renvoyés à l'état valide en fonction de la sortie.
Actuellement, when_any accepte uniquement les objets result .
Toutes les surcharges acceptent un exécuteur de CV comme premier paramètre. En attendant un résultat renvoyé par when_any , l'appelant Coroutine sera repris par l'exécuteur de curriculum vitae donné.
/*
Helper struct returned from when_any.
index is the position of the ready result in results sequence.
results is either an std::tuple or an std::vector of the results that were passed to when_any.
*/
template < class sequence_type >
struct when_any_result {
std:: size_t index;
sequence_type results;
};
/*
Creates a result object that becomes ready when at least one of the input results is ready.
Passed result objects are emptied and returned as a tuple.
Throws std::invalid_argument if any of the passed result objects is empty.
Might throw an std::bad_alloc exception if no memory is available.
*/
template < class ... result_types>
lazy_result<when_any_result<std::tuple<result_types...>>>
when_any (std::shared_ptr<executor_type> resume_executor,
result_types&& ... results);
/*
Overload. Similar to when_any(result_types&& ...) but receives a pair of iterators referencing a range.
Passed result objects are emptied and returned as a vector.
Throws std::invalid_argument if begin == end.
Throws std::invalid_argument if any of the passed result objects is empty.
Might throw an std::bad_alloc exception if no memory is available.
*/
template < class iterator_type >
lazy_result<when_any_result<std::vector< typename std::iterator_traits<iterator_type>::value_type>>>
when_any (std::shared_ptr<executor_type> resume_executor,
iterator_type begin, iterator_type end);resume_on resume_on renvoie un attendable qui suspend la coroutine actuelle et la reprend à l'intérieur executor donné. Il s'agit d'une fonction importante qui garantit qu'une coroutine s'exécute dans le bon exécuteur. Par exemple, les applications peuvent planifier une tâche d'arrière-plan à l'aide du background_executor et attendre l'objet de résultat renvoyé. Dans ce cas, la coroutine en attente sera reprise à l'intérieur de l'exécuteur de l'arrière-plan. Un appel à resume_on avec un autre exécuteur lié au CPU s'assure que les lignes de code liées au CPU ne s'exécuteront pas sur l'exécuteur de l'arrière-plan une fois la tâche d'arrière-plan terminée. Si une tâche est reprochée pour s'exécuter sur un autre exécuteur à l'aide resume_on , mais que l'exécuteur est fermé avant de pouvoir reprendre la tâche suspendue, cette tâche est reprise immédiatement et une exception erros::broken_task est lancée. Dans ce cas, les candidatures doivent être avec élégance.
/*
Returns an awaitable that suspends the current coroutine and resumes it inside executor.
Might throw any exception that executor_type::enqueue throws.
*/
template < class executor_type >
auto resume_on (std::shared_ptr<executor_type> executor);Concurrencp fournit également des chronoméateurs et des files d'attente de minuterie. Les minuteries sont des objets qui définissent des actions asynchrones exécutées sur un exécuteur exécuteur dans un intervalle de temps bien défini. Il existe trois types de minuteries - minuteries régulières , ancs à chronométrage et objets de retard .
Les minuteries régulières ont quatre propriétés qui les définissent:
Comme d'autres objets dans concurrencp, les minuteries sont un type de mouvement uniquement qui peut être vide. Lorsqu'une minuterie est détruite ou timer::cancel est appelée, le minuteur annule ses tâches planifiées mais pas exécutées. Les tâches en cours ne sont pas effectées. La minuterie appelant le fil doit être sûre. Il est recommandé de définir le temps prévu et la fréquence des minuteries à une granularité de 50 millisecondes.
Une file d'attente de temporisation est un travailleur concurrencp qui gère une collection de minuteries et les traite dans un seul fil d'exécution. C'est également l'agent utilisé pour créer de nouveaux minuteries. Lorsqu'une date limite de minuterie (qu'il s'agisse de l'heure ou de la fréquence du minuteur), la file d'attente de la minuterie "tire" la minuterie en planifiant son appel à exécuter sur l'exécuteur associé en tant que tâche.
Tout comme les exécuteurs, les files d'attente de minuterie adhèrent également au concept RAII. Lorsque l'objet d'exécution sort de la portée, il arrête la file d'attente de la minuterie, annulant tous les minuteries en attente. Une fois qu'une file d'attente de minuterie a été arrêtée, tout appel ultérieur à make_timer , make_onshot_timer et make_delay_object lancera une exception errors::runtime_shutdown . Les applications ne doivent pas essayer d'arrêter les files d'attente de minuterie par elles-mêmes.
timer_queue : class timer_queue {
/*
Destroys this timer_queue.
*/
~timer_queue () noexcept ;
/*
Shuts down this timer_queue:
Tells the underlying thread of execution to quit and joins it.
Cancels all pending timers.
After this call, invocation of any method besides shutdown
and shutdown_requested will throw an errors::runtime_shutdown.
If shutdown had been called before, this method has no effect.
*/
void shutdown () noexcept ;
/*
Returns true if shutdown had been called before, false otherwise.
*/
bool shutdown_requested () const noexcept ;
/*
Creates a new running timer where *this is the associated timer_queue.
Throws std::invalid_argument if executor is null.
Throws errors::runtime_shutdown if shutdown had been called before.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if the one of the underlying synchronization primitives throws.
*/
template < class callable_type , class ... argumet_types>
timer make_timer (
std::chrono::milliseconds due_time,
std::chrono::milliseconds frequency,
std::shared_ptr<concurrencpp::executor> executor,
callable_type&& callable,
argumet_types&& ... arguments);
/*
Creates a new one-shot timer where *this is the associated timer_queue.
Throws std::invalid_argument if executor is null.
Throws errors::runtime_shutdown if shutdown had been called before.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if the one of the underlying synchronization primitives throws.
*/
template < class callable_type , class ... argumet_types>
timer make_one_shot_timer (
std::chrono::milliseconds due_time,
std::shared_ptr<concurrencpp::executor> executor,
callable_type&& callable,
argumet_types&& ... arguments);
/*
Creates a new delay object where *this is the associated timer_queue.
Throws std::invalid_argument if executor is null.
Throws errors::runtime_shutdown if shutdown had been called before.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if the one of the underlying synchronization primitives throws.
*/
result< void > make_delay_object (
std::chrono::milliseconds due_time,
std::shared_ptr<concurrencpp::executor> executor);
};timer : class timer {
/*
Creates an empty timer.
*/
timer () noexcept = default ;
/*
Cancels the timer, if not empty.
*/
~timer () noexcept ;
/*
Moves the content of rhs to *this.
rhs is empty after this call.
*/
timer (timer&& rhs) noexcept = default ;
/*
Moves the content of rhs to *this.
rhs is empty after this call.
Returns *this.
*/
timer& operator = (timer&& rhs) noexcept ;
/*
Cancels this timer.
After this call, the associated timer_queue will not schedule *this
to run again and *this becomes empty.
Scheduled, but not yet executed tasks are cancelled.
Ongoing tasks are uneffected.
This method has no effect if *this is empty or the associated timer_queue has already expired.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
void cancel ();
/*
Returns the associated executor of this timer.
Throws concurrencpp::errors::empty_timer is *this is empty.
*/
std::shared_ptr<executor> get_executor () const ;
/*
Returns the associated timer_queue of this timer.
Throws concurrencpp::errors::empty_timer is *this is empty.
*/
std::weak_ptr<timer_queue> get_timer_queue () const ;
/*
Returns the due time of this timer.
Throws concurrencpp::errors::empty_timer is *this is empty.
*/
std::chrono::milliseconds get_due_time () const ;
/*
Returns the frequency of this timer.
Throws concurrencpp::errors::empty_timer is *this is empty.
*/
std::chrono::milliseconds get_frequency () const ;
/*
Sets new frequency for this timer.
Callables already scheduled to run at the time of invocation are not affected.
Throws concurrencpp::errors::empty_timer is *this is empty.
*/
void set_frequency (std::chrono::milliseconds new_frequency);
/*
Returns true is *this is not an empty timer, false otherwise.
The timer should not be used if this->operator bool() is false.
*/
explicit operator bool () const noexcept ;
};Dans cet exemple, nous créons une minuterie régulière en utilisant la file d'attente de la minuterie. La minuterie planifie son appel à exécuter après 1,5 seconde, puis tire son appelable toutes les 2 secondes. Le Callable est donné sur l'exécuteur Threadpool.
# include " concurrencpp/concurrencpp.h "
# include < iostream >
using namespace std ::chrono_literals ;
int main () {
concurrencpp::runtime runtime;
std:: atomic_size_t counter = 1 ;
concurrencpp::timer timer = runtime. timer_queue ()-> make_timer (
1500ms,
2000ms,
runtime. thread_pool_executor (),
[&] {
const auto c = counter. fetch_add ( 1 );
std::cout << " timer was invoked for the " << c << " th time " << std::endl;
});
std::this_thread::sleep_for (12s);
return 0 ;
}Une minuterie OneShot est une minuterie ponctuelle avec seulement une durée prévue - après avoir planifié son appel à fonctionner une fois qu'il ne le reprograisse jamais pour fonctionner à nouveau.
Dans cet exemple, nous créons une minuterie qui ne fonctionne qu'une seule fois - après 3 secondes à partir de sa création, le minuteur planifie son appelable pour exécuter sur un nouveau thread d'exécution (en utilisant thread_executor ).
# include " concurrencpp/concurrencpp.h "
# include < iostream >
using namespace std ::chrono_literals ;
int main () {
concurrencpp::runtime runtime;
concurrencpp::timer timer = runtime. timer_queue ()-> make_one_shot_timer (
3000ms,
runtime. thread_executor (),
[&] {
std::cout << " hello and goodbye " << std::endl;
});
std::this_thread::sleep_for (4s);
return 0 ;
} Un objet de retard est un objet de résultat paresseux qui devient prêt lorsqu'il est co_await ed et que son temps prévue est atteint. Les applications peuvent co_await cet objet de résultat pour retarder la coroutine actuelle de manière non bloquante. La coroutine actuelle est reprise par l'exécuteur qui a été transmis à make_delay_object .
Dans cet exemple, nous engendrer une tâche (qui ne renvoie aucun résultat ou exception lancé), qui se retarde dans une boucle en appelant co_await sur un objet de retard.
# include " concurrencpp/concurrencpp.h "
# include < iostream >
using namespace std ::chrono_literals ;
concurrencpp::null_result delayed_task (
std::shared_ptr<concurrencpp::timer_queue> tq,
std::shared_ptr<concurrencpp::thread_pool_executor> ex) {
size_t counter = 1 ;
while ( true ) {
std::cout << " task was invoked " << counter << " times. " << std::endl;
counter++;
co_await tq-> make_delay_object (1500ms, ex);
}
}
int main () {
concurrencpp::runtime runtime;
delayed_task (runtime. timer_queue (), runtime. thread_pool_executor ());
std::this_thread::sleep_for (10s);
return 0 ;
} Un générateur est une coroutine paresseuse et synchrone qui est capable de produire un flux de valeurs à consommer. Les générateurs utilisent le mot-clé co_yield pour réintégrer les valeurs à leurs consommateurs.
Les générateurs sont censés être utilisés de manière synchrone - ils ne peuvent utiliser que le mot-clé co_yield et ne doivent pas utiliser le mot-clé co_await . Un générateur continuera de produire des valeurs tant que le mot-clé co_yield est appelé. Si le mot clé co_return est appelé (explicitement ou implicitement), le générateur cessera de produire des valeurs. De même, si une exception est lancée, le générateur cessera de produire des valeurs et l'exception lancée sera remontée au consommateur du générateur.
Les générateurs sont destinés à être utilisés dans une boucle de range-for : les générateurs produisent implicitement deux itérateurs - begin et end qui contrôlent l'exécution de la boucle for . Ces itérateurs ne doivent pas être manipulés ou accessibles manuellement.
Lorsqu'un générateur est créé, il commence comme une tâche paresseuse. Lorsque sa méthode begin est appelée, le générateur est repris pour la première fois et un itérateur est renvoyé. La tâche paresseuse est reprise à plusieurs reprises en appelant operator++ sur l'itérateur retourné. L'itérateur retourné sera égal à Iterator end lorsque le générateur termine l'exécution soit en sortant gracieusement ou en lançant une exception. Comme mentionné précédemment, cela se produit dans les coulisses par le mécanisme intérieur de la boucle et du générateur, et ne doit pas être appelé directement.
Comme d'autres objets dans Concurrencp, les générateurs sont un type de déplacement uniquement. Une fois qu'un générateur a été déplacé, il est considéré comme vide et essaie d'accéder à ses méthodes intérieures (autres que operator bool ) lancera une exception. Le vide d'un générateur ne doit généralement pas se produire - il est conseillé de consommer des générateurs sur leur création dans une for pour ne pas essayer d'appeler leurs méthodes individuellement.
generator class generator {
/*
Move constructor. After this call, rhs is empty.
*/
generator (generator&& rhs) noexcept ;
/*
Destructor. Invalidates existing iterators.
*/
~generator () noexcept ;
generator ( const generator& rhs) = delete ;
generator& operator =(generator&& rhs) = delete ;
generator& operator =( const generator& rhs) = delete ;
/*
Returns true if this generator is not empty.
Applications must not use this object if this->operator bool() is false.
*/
explicit operator bool () const noexcept ;
/*
Starts running this generator and returns an iterator.
Throws errors::empty_generator if *this is empty.
Re-throws any exception that is thrown inside the generator code.
*/
iterator begin ();
/*
Returns an end iterator.
*/
static generator_end_iterator end () noexcept ;
};
class generator_iterator {
using value_type = std:: remove_reference_t <type>;
using reference = value_type&;
using pointer = value_type*;
using iterator_category = std::input_iterator_tag;
using difference_type = std:: ptrdiff_t ;
/*
Resumes the suspended generator and returns *this.
Re-throws any exception that was thrown inside the generator code.
*/
generator_iterator& operator ++();
/*
Post-increment version of operator++.
*/
void operator ++( int );
/*
Returns the latest value produced by the associated generator.
*/
reference operator *() const noexcept ;
/*
Returns a pointer to the latest value produced by the associated generator.
*/
pointer operator ->() const noexcept ;
/*
Comparision operators.
*/
friend bool operator ==( const generator_iterator& it0, const generator_iterator& it1) noexcept ;
friend bool operator ==( const generator_iterator& it, generator_end_iterator) noexcept ;
friend bool operator ==(generator_end_iterator end_it, const generator_iterator& it) noexcept ;
friend bool operator !=( const generator_iterator& it, generator_end_iterator end_it) noexcept ;
friend bool operator !=(generator_end_iterator end_it, const generator_iterator& it) noexcept ;
};generator : Dans cet exemple, nous écrivons un générateur qui donne le membre n-th de la séquence S(n) = 1 + 2 + 3 + ... + n où n <= 100 :
concurrencpp::generator< int > sequence () {
int i = 1 ;
int sum = 0 ;
while (i <= 100 ) {
sum += i;
++i;
co_yield sum;
}
}
int main () {
for ( auto value : sequence ()) {
std::cout << value << std::end;
}
return 0 ;
} Les serrures synchrones régulières ne peuvent pas être utilisées en toute sécurité à l'intérieur des tâches pour un certain nombre de raisons:
std::mutex , devraient être verrouillés et déverrouillés dans le même thread d'exécution. Déverrouiller un verrou synchrone dans un fil qui n'avait pas verrouillé il est un comportement non défini. Étant donné que les tâches peuvent être suspendues et reprendre dans n'importe quel thread d'exécution, les verrous synchrones se briseront lorsqu'ils sont utilisés à l'intérieur des tâches. concurrencpp::async_lock résout ces problèmes en fournissant une API similaire à std::mutex , avec la principale différence qui appelle à concurrencpp::async_lock renverra un résulte paresseux qui peut être co_awaited en toute sécurité à l'intérieur des tâches. Si une tâche essaie de verrouiller un verrouillage asynchrone et échoue, la tâche sera suspendue et reprendra lorsque le verrou est déverrouillé et acquis par la tâche suspendue. Cela permet aux exécuteurs testamentaires de traiter une énorme quantité de tâches en attendant d'acquérir un verrou sans commutation de contexte coûteuse et des appels de noyau coûteux.
Semblable à la fonctionnalité std::mutex , une seule tâche peut acquérir async_lock à tout moment, et une barrière de lecture est place au moment de l'acquisition. La libération d'un verrouillage asynchrone place une barrière d'écriture et permet à la prochaine tâche de l'acquérir, créant une chaîne de modificateurs à un moment qui voit les changements que d'autres modificateurs avaient effectués et publie ses modifications pour les modificateurs suivants.
Comme std::mutex , concurrencpp::async_lock n'est pas récursif . Une attention supplémentaire doit être accordée lors de l'acquisition d'un tel verrou - une serrure ne doit pas être acquise à nouveau dans une tâche qui a été engendrée par une autre tâche qui avait déjà acquis le verrou. Dans un tel cas, un verrouillage mort inévitable se produira. Contrairement à d'autres objets dans concurrencp, async_lock n'est ni copiable ni mobile.
Comme les verrous standard, concurrencpp::async_lock est destiné à être utilisé avec des emballages dans les lavabos qui exploitent l'idiome RAII C ++ pour s'assurer que les verrous sont toujours déverrouillés lors du retour de la fonction ou une exception lancée. async_lock::lock renvoie un résulte paresseux d'un wrapper de portée qui appelle async_lock::unlock sur la destruction. Les utilisations brutes de async_lock::unlock sont découragées. concurrencpp::scoped_async_lock agit comme l'emballage dans le cadre et fournit une API qui est presque identique à std::unique_lock . concurrencpp::scoped_async_lock est mobile, mais pas copiable.
async_lock::lock et scoped_async_lock::lock nécessitent un reopproc-excutor comme paramètre. Lors de l'appel de ces méthodes, si le verrou est disponible pour le verrouillage, il est verrouillé et la tâche actuelle reprend immédiatement. Sinon, la tâche actuelle est suspendue et reprendra à l'intérieur de l'exécuteur de curriculum vitae donné lorsque le verrou est finalement acquis.
concurrencpp::scoped_async_lock enveloppe un async_lock et s'assurez qu'il est correctement déverrouillé. Comme std::unique_lock , il y a des cas où il n'enroule aucune serrure, et dans ce cas, il est considéré comme vide. Un scoped_async_lock vide peut se produire lorsqu'il est construit par défaut, déplacé ou scoped_async_lock::release est appelée. Un Lock-Lock de portée vide ne déverrouillera aucune serrure sur la destruction.
Même si le Lamage-Async-Lock n'est pas vide, cela ne signifie pas qu'il possède le verrouillage asynchrone sous-jacent et qu'il le déverrouillera sur la destruction. Les verrous non excités et non accessibles peuvent se produire si scoped_async_lock::unlock a été appelé ou si le constructeur SCOPED-ASYNC a été construit à l'aide du constructeur scoped_async_lock(async_lock&, std::defer_lock_t) .
async_lock class async_lock {
/*
Constructs an async lock object.
*/
async_lock () noexcept ;
/*
Destructs an async lock object.
*this is not automatically unlocked at the moment of destruction.
*/
~async_lock () noexcept ;
/*
Asynchronously acquires the async lock.
If *this has already been locked by another non-parent task, the current task will be suspended
and will be resumed when *this is acquired, inside resume_executor.
If *this has not been locked by another task, then *this will be acquired and the current task will be resumed
immediately in the calling thread of execution.
If *this has already been locked by a parent task, then unavoidable dead-lock will occur.
Throws std::invalid_argument if resume_executor is null.
Throws std::system error if one of the underlying synhchronization primitives throws.
*/
lazy_result<scoped_async_lock> lock (std::shared_ptr<executor> resume_executor);
/*
Tries to acquire *this in the calling thread of execution.
Returns true if *this is acquired, false otherwise.
In any case, the current task is resumed immediately in the calling thread of execution.
Throws std::system error if one of the underlying synhchronization primitives throws.
*/
lazy_result< bool > try_lock ();
/*
Releases *this and allows other tasks (including suspended tasks waiting for *this) to acquire it.
Throws std::system error if *this is not locked at the moment of calling this method.
Throws std::system error if one of the underlying synhchronization primitives throws.
*/
void unlock ();
};scoped_async_lock class scoped_async_lock {
/*
Constructs an async lock wrapper that does not wrap any async lock.
*/
scoped_async_lock () noexcept = default ;
/*
If *this wraps async_lock, this method releases the wrapped lock.
*/
~scoped_async_lock () noexcept ;
/*
Moves rhs to *this.
After this call, *rhs does not wrap any async lock.
*/
scoped_async_lock (scoped_async_lock&& rhs) noexcept ;
/*
Wrapps unlocked lock.
lock must not be in acquired mode when calling this method.
*/
scoped_async_lock (async_lock& lock, std:: defer_lock_t ) noexcept ;
/*
Wrapps locked lock.
lock must be already acquired when calling this method.
*/
scoped_async_lock (async_lock& lock, std:: adopt_lock_t ) noexcept ;
/*
Calls async_lock::lock on the wrapped locked, using resume_executor as a parameter.
Throws std::invalid_argument if resume_executor is nulll.
Throws std::system_error if *this does not wrap any lock.
Throws std::system_error if wrapped lock is already locked.
Throws any exception async_lock::lock throws.
*/
lazy_result< void > lock (std::shared_ptr<executor> resume_executor);
/*
Calls async_lock::try_lock on the wrapped lock.
Throws std::system_error if *this does not wrap any lock.
Throws std::system_error if wrapped lock is already locked.
Throws any exception async_lock::try_lock throws.
*/
lazy_result< bool > try_lock ();
/*
Calls async_lock::unlock on the wrapped lock.
If *this does not wrap any lock, this method does nothing.
Throws std::system_error if *this wraps a lock and it is not locked.
*/
void unlock ();
/*
Checks whether *this wraps a locked mutex or not.
Returns true if wrapped locked is in acquired state, false otherwise.
*/
bool owns_lock () const noexcept ;
/*
Equivalent to owns_lock.
*/
explicit operator bool () const noexcept ;
/*
Swaps the contents of *this and rhs.
*/
void swap (scoped_async_lock& rhs) noexcept ;
/*
Empties *this and returns a pointer to the previously wrapped lock.
After a call to this method, *this doesn't wrap any lock.
The previously wrapped lock is not released,
it must be released by either unlocking it manually through the returned pointer or by
capturing the pointer with another scoped_async_lock which will take ownerwhip over it.
*/
async_lock* release () noexcept ;
/*
Returns a pointer to the wrapped async_lock, or a null pointer if there is no wrapped async_lock.
*/
async_lock* mutex () const noexcept ;
};async_lock : Dans cet exemple, nous poussons 10 000 000 entiers vers un objet std::vector à partir de différentes tâches simultanément, tout en utilisant async_lock pour s'assurer qu'aucune race de données ne se produit et que l'exactitude de l'état interne de cet objet vectoriel n'est préservé.
# include " concurrencpp/concurrencpp.h "
# include < vector >
# include < iostream >
std::vector< size_t > numbers;
concurrencpp::async_lock lock;
concurrencpp::result< void > add_numbers (concurrencpp::executor_tag,
std::shared_ptr<concurrencpp::executor> executor,
size_t begin,
size_t end) {
for ( auto i = begin; i < end; i++) {
concurrencpp::scoped_async_lock raii_wrapper = co_await lock. lock (executor);
numbers. push_back (i);
}
}
int main () {
concurrencpp::runtime runtime;
constexpr size_t range = 10'000'000 ;
constexpr size_t sections = 4 ;
concurrencpp::result< void > results[sections];
for ( size_t i = 0 ; i < 4 ; i++) {
const auto range_start = i * range / sections;
const auto range_end = (i + 1 ) * range / sections;
results[i] = add_numbers ({}, runtime. thread_pool_executor (), range_start, range_end);
}
for ( auto & result : results) {
result. get ();
}
std::cout << " vector size is " << numbers. size () << std::endl;
// make sure the vector state has not been corrupted by unprotected concurrent accesses
std::sort (numbers. begin (), numbers. end ());
for ( size_t i = 0 ; i < range; i++) {
if (numbers[i] != i) {
std::cerr << " vector state is corrupted. " << std::endl;
return - 1 ;
}
}
std::cout << " succeeded pushing range [0 - 10,000,000] concurrently to the vector! " << std::endl;
return 0 ;
} async_condition_variable imite la condition_variable standard_variable et peut être utilisé en toute sécurité avec des tâches aux côtés async_lock . async_condition_variable fonctionne avec async_lock pour suspendre une tâche jusqu'à ce qu'une mémoire partagée (protégée par le verrou) ait modifié. Les tâches qui souhaitent surveiller les modifications de mémoire partagées verrouilleront une instance d' async_lock et appellent async_condition_variable::await . Cela déverrouillera atomiquement le verrou et suspendra la tâche actuelle jusqu'à ce qu'une tâche de modificateur avise la variable de condition. Une tâche de modificateur acquiert le verrou, modifie la mémoire partagée, déverrouille le verrou et appelle notify_one ou notify_all . Lorsqu'une tâche suspendue est reprise (en utilisant l'exécuteur de CV qui a été donné à await ), il verrouille à nouveau le verrou, permettant à la tâche de continuer à partir du point de suspension de manière transparente. Comme async_lock , async_condition_variable n'est ni mobile ni copiable - il est censé être créé en un seul endroit et accessible par plusieurs tâches.
async_condition_variable::await les surcharges nécessitent un exécuteur de CV, qui sera utilisé pour reprendre la tâche, et un scoped_async_lock verrouillé. async_condition_variable::await est livré avec deux surcharges - une qui accepte un prédicat et qui ne le fait pas. La surcharge qui n'accepte pas un prédicat suspendre la tâche d'appel immédiatement après l'invocation jusqu'à ce qu'elle soit reproduite par un appel à notify_* . La surcharge qui accepte un prédicat fonctionne en laissant le prédicat inspecter la mémoire partagée et suspendre la tâche à plusieurs reprises jusqu'à ce que la mémoire partagée atteigne son état recherché. schématiquement cela fonctionne comme appeler
while (!pred()) { // pred() inspects the shared memory and returns true or false
co_await await (resume_executor, lock); // suspend the current task until another task calls `notify_xxx`
} Tout comme la variable de condition standard, les applications sont encouragées à utiliser la surcharge de prédicat, car elle permet un contrôle plus fin sur les suspensions et les reprise. async_condition_variable peut être utilisé pour écrire des collections et des structures de données simultanés comme des files d'attente et des canaux simultanés.
En interne, async_condition_variable tient une file de suspension, dans laquelle les tâches se sont en cours lorsqu'ils attendent la variable de condition à notifier. Lorsque l'une des méthodes notify_* est appelée, la tâche de notification désoblige soit une tâche ou toutes les tâches, en fonction de la méthode invoquée. Les tâches sont désactivées de la fibre de suspension de manière FIFO. Par exemple, si la tâche A appelle await , puis les appels de la tâche B await , alors les appels de tâche C notify_one , alors la tâche en interne A sera désactivée et reprise. La tâche B restera suspendue jusqu'à ce qu'un autre appel à notify_one ou notify_all soit appelé. Si la tâche A et la tâche B sont suspendues et que la tâche C appelle notify_all , les deux tâches seront désactivées et relèveront.
async_condition_variable class async_condition_variable {
/*
Constructor.
*/
async_condition_variable () noexcept ;
/*
Atomically releases lock and suspends the current task by adding it to *this suspension-queue.
Throws std::invalid_argument if resume_executor is null.
Throws std::invalid_argument if lock is not locked at the moment of calling this method.
Might throw std::system_error if the underlying std::mutex throws.
*/
lazy_result< void > await (std::shared_ptr<executor> resume_executor, scoped_async_lock& lock);
/*
Equivalent to:
while (!pred()) {
co_await await(resume_executor, lock);
}
Might throw any exception that await(resume_executor, lock) might throw.
Might throw any exception that pred might throw.
*/
template < class predicate_type >
lazy_result< void > await (std::shared_ptr<executor> resume_executor, scoped_async_lock& lock, predicate_type pred);
/*
Dequeues one task from *this suspension-queue and resumes it, if any available at the moment of calling this method.
The suspended task is resumed by scheduling it to run on the executor given when await was called.
Might throw std::system_error if the underlying std::mutex throws.
*/
void notify_one ();
/*
Dequeues all tasks from *this suspension-queue and resumes them, if any available at the moment of calling this method.
The suspended tasks are resumed by scheduling them to run on the executors given when await was called.
Might throw std::system_error if the underlying std::mutex throws.
*/
void notify_all ();
};async_condition_variable : Dans cet exemple, async_lock et async_condition_variable travaillent ensemble pour implémenter une file d'attente simultanée qui peut être utilisée pour envoyer des données (dans cet exemple, entiers) entre les tâches. Notez que certaines méthodes renvoient un result tandis qu'un autre retourne lazy_result , montrant comment les tâches désireuses et paresseuses peuvent fonctionner ensemble.
# include " concurrencpp/concurrencpp.h "
# include < queue >
# include < iostream >
using namespace concurrencpp ;
class concurrent_queue {
private:
async_lock _lock;
async_condition_variable _cv;
std::queue< int > _queue;
bool _abort = false ;
public:
concurrent_queue () = default ;
result< void > shutdown (std::shared_ptr<executor> resume_executor) {
{
auto guard = co_await _lock. lock (resume_executor);
_abort = true ;
}
_cv. notify_all ();
}
lazy_result< void > push (std::shared_ptr<executor> resume_executor, int i) {
{
auto guard = co_await _lock. lock (resume_executor);
_queue. push (i);
}
_cv. notify_one ();
}
lazy_result< int > pop (std::shared_ptr<executor> resume_executor) {
auto guard = co_await _lock. lock (resume_executor);
co_await _cv. await (resume_executor, guard, [ this ] {
return _abort || !_queue. empty ();
});
if (!_queue. empty ()) {
auto result = _queue. front ();
_queue. pop ();
co_return result;
}
assert (_abort);
throw std::runtime_error ( " queue has been shut down. " );
}
};
result< void > producer_loop (executor_tag,
std::shared_ptr<thread_pool_executor> tpe,
concurrent_queue& queue,
int range_start,
int range_end) {
for (; range_start < range_end; ++range_start) {
co_await queue. push (tpe, range_start);
}
}
result< void > consumer_loop (executor_tag, std::shared_ptr<thread_pool_executor> tpe, concurrent_queue& queue) {
try {
while ( true ) {
std::cout << co_await queue. pop (tpe) << std::endl;
}
} catch ( const std:: exception & e) {
std::cerr << e. what () << std::endl;
}
}
int main () {
runtime runtime;
const auto thread_pool_executor = runtime. thread_pool_executor ();
concurrent_queue queue;
result< void > producers[ 4 ];
result< void > consumers[ 4 ];
for ( int i = 0 ; i < 4 ; i++) {
producers[i] = producer_loop ({}, thread_pool_executor, queue, i * 5 , (i + 1 ) * 5 );
}
for ( int i = 0 ; i < 4 ; i++) {
consumers[i] = consumer_loop ({}, thread_pool_executor, queue);
}
for ( int i = 0 ; i < 4 ; i++) {
producers[i]. get ();
}
queue. shutdown (thread_pool_executor). get ();
for ( int i = 0 ; i < 4 ; i++) {
consumers[i]. get ();
}
return 0 ;
} L'objet Runtime ConcurrenCP est l'agent utilisé pour acquérir, stocker et créer de nouveaux exécuteurs.
Le runtime doit être créé en tant que type de valeur dès que la fonction principale commence à s'exécuter. Lorsque le runtime concurrencp sort de la portée, il itère sur ses exécuteurs stockés et les arrête un par un en appelant executor::shutdown . Les exécuteurs quittent ensuite leur boucle de travail intérieure et toute tentative ultérieure de planifier une nouvelle tâche lancera une exception concurrencpp::runtime_shutdown . Le runtime contient également la file d'attente de minuterie globale utilisée pour créer des minuteries et retarder des objets. Lors de la destruction, les exécuteurs stockés détruisent des tâches non exécutées et attendent que les tâches en cours se terminent. Si une tâche en cours essaie d'utiliser un exécuteur testamentaire pour engendrer de nouvelles tâches ou planifier sa propre continuation de tâche - une exception sera lancée. Dans ce cas, les tâches en cours doivent arrêter le plus tôt possible, permettant à leurs exécuteurs sous-jacents d'arrêter. La file d'attente de la minuterie sera également arrêtée, annulant tous les minuteries d'exécution. Avec ce style de code RAII, aucune tâche ne peut être traitée avant la création de l'objet d'exécution, et tandis que / après l'exécution sort de la portée. Cela libère des applications simultanées de la nécessité de communiquer explicitement les messages de terminaison. Les tâches sont des exécuteurs à utiliser gratuits tant que l'objet d'exécution est vivant.
runtime class runtime {
/*
Creates a runtime object with default options.
*/
runtime ();
/*
Creates a runtime object with user defined options.
*/
runtime ( const concurrencpp::runtime_options& options);
/*
Destroys this runtime object.
Calls executor::shutdown on each monitored executor.
Calls timer_queue::shutdown on the global timer queue.
*/
~runtime () noexcept ;
/*
Returns this runtime timer queue used to create new times.
*/
std::shared_ptr<concurrencpp::timer_queue> timer_queue () const noexcept ;
/*
Returns this runtime concurrencpp::inline_executor
*/
std::shared_ptr<concurrencpp::inline_executor> inline_executor () const noexcept ;
/*
Returns this runtime concurrencpp::thread_pool_executor
*/
std::shared_ptr<concurrencpp::thread_pool_executor> thread_pool_executor () const noexcept ;
/*
Returns this runtime concurrencpp::background_executor
*/
std::shared_ptr<concurrencpp::thread_pool_executor> background_executor () const noexcept ;
/*
Returns this runtime concurrencpp::thread_executor
*/
std::shared_ptr<concurrencpp::thread_executor> thread_executor () const noexcept ;
/*
Creates a new concurrencpp::worker_thread_executor and registers it in this runtime.
Might throw std::bad_alloc or std::system_error if any underlying memory or system resource could not have been acquired.
*/
std::shared_ptr<concurrencpp::worker_thread_executor> make_worker_thread_executor ();
/*
Creates a new concurrencpp::manual_executor and registers it in this runtime.
Might throw std::bad_alloc or std::system_error if any underlying memory or system resource could not have been acquired.
*/
std::shared_ptr<concurrencpp::manual_executor> make_manual_executor ();
/*
Creates a new user defined executor and registers it in this runtime.
executor_type must be a valid concrete class of concurrencpp::executor.
Might throw std::bad_alloc if no memory is available.
Might throw any exception that the constructor of <<executor_type>> might throw.
*/
template < class executor_type , class ... argument_types>
std::shared_ptr<executor_type> make_executor (argument_types&& ... arguments);
/*
returns the version of concurrencpp that the library was built with.
*/
static std::tuple< unsigned int , unsigned int , unsigned int > version () noexcept ;
}; Dans certains cas, les applications sont intéressées à surveiller la création et la résiliation des threads, par exemple, certains allocateurs de mémoire nécessitent que de nouveaux threads soient enregistrés et non enregistrés lors de leur création et de leur résiliation. Le concurrencp Runtime permet de définir un rappel de création de threads et un rappel de terminaison de thread. Ces rappels seront appelés chaque fois que l'un des travailleurs concurrencp créera un nouveau thread et lorsque ce thread se termine. Ces rappels sont toujours appelés de l'intérieur du fil créé / terminant, donc std::this_thread::get_id renvoie toujours l'ID de thread pertinent. La signature de ces rappels est void callback (std::string_view thread_name) . thread_name est un titre spécifique concurrencp donné au thread et peut être observé dans certains débogueurs qui présentent le nom du fil. Le nom du fil n'est pas garanti d'être unique et doit être utilisé pour l'exploitation forestière et le débogage.
Afin de définir un rappel de création de threads et / ou un rappel de terminaison de thread, les applications peuvent définir les membres thread_started_callback et / ou thread_terminated_callback des runtime_options qui sont transmis au constructeur d'exécution. Étant donné que ces rappels sont copiés pour chaque travailleur concurrencp qui pourrait créer des threads, ces rappels doivent être copiables.
# include " concurrencpp/concurrencpp.h "
# include < iostream >
int main () {
concurrencpp::runtime_options options;
options. thread_started_callback = [](std::string_view thread_name) {
std::cout << " A new thread is starting to run, name: " << thread_name << " , thread id: " << std::this_thread::get_id ()
<< std::endl;
};
options. thread_terminated_callback = [](std::string_view thread_name) {
std::cout << " A thread is terminating, name: " << thread_name << " , thread id: " << std::this_thread::get_id () << std::endl;
};
concurrencpp::runtime runtime (options);
const auto timer_queue = runtime. timer_queue ();
const auto thread_pool_executor = runtime. thread_pool_executor ();
concurrencpp::timer timer =
timer_queue-> make_timer ( std::chrono::milliseconds ( 100 ), std::chrono::milliseconds ( 500 ), thread_pool_executor, [] {
std::cout << " A timer callable is executing " << std::endl;
});
std::this_thread::sleep_for ( std::chrono::seconds ( 3 ));
return 0 ;
}Sortie possible:
A new thread is starting to run, name: concurrencpp::timer_queue worker, thread id: 7496
A new thread is starting to run, name: concurrencpp::thread_pool_executor worker, thread id: 21620
A timer callable is executing
A timer callable is executing
A timer callable is executing
A timer callable is executing
A timer callable is executing
A timer callable is executing
A thread is terminating, name: concurrencpp::timer_queue worker, thread id: 7496
A thread is terminating, name: concurrencpp::thread_pool_executor worker, thread id: 21620
Les applications peuvent créer leur propre type d'exécuteur personnalisé en héritant de la classe derivable_executor . Il y a quelques points à considérer lors de la mise en œuvre d'exécuteurs définis par l'utilisateur: la chose la plus importante est de se rappeler que les exécuteurs sont utilisés à partir de plusieurs threads, donc les méthodes implémentées doivent être des filets.
Les nouveaux exécuteurs peuvent être créés à l'aide de runtime::make_executor . Les applications ne doivent pas créer de nouveaux exécuteurs avec une instanciation simple (comme std::make_shared ou Plain new ), uniquement en utilisant runtime::make_executor . En outre, les applications ne doivent pas essayer de réinstaller les exécuteurs concurrencp intégrés, comme le thread_pool_executor ou le thread_executor , ces exécuteurs ne doivent être accessibles que par leurs instances existantes dans l'objet d'exécution.
Un autre point important consiste à gérer correctement l'arrêt: shutdown , shutdown_requested et enqueue devraient tous surveiller l'état de l'exécuteur et se comporter en conséquence lorsqu'ils sont invoqués:
shutdown devrait indiquer aux fils sous-jacents de quitter, puis de les rejoindre.shutdown peut être appelé plusieurs fois et la méthode doit gérer ce scénario en ignorant les appels ultérieurs à shutdown après la première invocation.enqueue doit lancer un concurrencpp::errors::runtime_shutdown si shutdown avait été appelé auparavant. task La mise en œuvre des exécuteurs est l'un des rares cas où les applications doivent travailler directement avec concurrencpp::task Class. concurrencpp::task est une std::function comme objet, mais avec quelques différences. Comme std::function , l'objet Task stocke un appelable qui agit comme l'opération asynchrone. Contrairement à std::function , task est un type de mouvement uniquement. Sur Invocation, les objets de tâche ne reçoivent aucun paramètre et retourne void . De plus, chaque objet de tâche ne peut être invoqué qu'une seule fois. Après la première invocation, l'objet de tâche devient vide. Invoquer un objet de tâche vide équivaut à invoquer un lambda vide ( []{} ), et ne lancera aucune exception. Les objets de tâche reçoivent leur appelable en tant que référence de transfert ( type&& où type est un paramètre de modèle), et non par copie (comme std::function ). La construction de l'appelable stocké se produit en place. Cela permet aux objets de tâche de contenir des callables qui sont de type uniquement (comme std::unique_ptr et concurrencpp::result ). Les objets de tâche essaient d'utiliser différentes méthodes pour optimiser l'utilisation des types stockés, par exemple, les objets de tâche appliquent l'optimisation courte-bouffère (SBO) pour les petites callables régulières et en inserteront les appels à std::coroutine_handle<void> en les appelant directement sans répartition virtuelle.
task class task {
/*
Creates an empty task object.
*/
task () noexcept ;
/*
Creates a task object by moving the stored callable of rhs to *this.
If rhs is empty, then *this will also be empty after construction.
After this call, rhs is empty.
*/
task (task&& rhs) noexcept ;
/*
Creates a task object by storing callable in *this.
<<typename std::decay<callable_type>::type>> will be in-place-
constructed inside *this by perfect forwarding callable.
*/
template < class callable_type >
task (callable_type&& callable);
/*
Destroys stored callable, does nothing if empty.
*/
~task () noexcept ;
/*
If *this is empty, does nothing.
Invokes stored callable, and immediately destroys it.
After this call, *this is empty.
May throw any exception that the invoked callable may throw.
*/
void operator ()();
/*
Moves the stored callable of rhs to *this.
If rhs is empty, then *this will also be empty after this call.
If *this already contains a stored callable, operator = destroys it first.
*/
task& operator =(task&& rhs) noexcept ;
/*
If *this is not empty, task::clear destroys the stored callable and empties *this.
If *this is empty, clear does nothing.
*/
void clear () noexcept ;
/*
Returns true if *this stores a callable. false otherwise.
*/
explicit operator bool () const noexcept ;
/*
Returns true if *this stores a callable,
and that stored callable has the same type as <<typename std::decay<callable_type>::type>>
*/
template < class callable_type >
bool contains () const noexcept ;
}; Lors de la mise en œuvre d'exécuteurs définis par l'utilisateur, il appartient à l'implémentation de stocker des objets task (lorsque enqueue est appelé) et de les exécuter en fonction du mécanisme interne exécuteur.
Dans cet exemple, nous créons un exécuteur testamentaire qui enregistre des actions comme enterre les tâches ou les exécuter. Nous implémentons l'interface executor et nous demandons l'exécution pour créer et en stocker une instance en appelant runtime::make_executor . Le reste de l'application se comporte exactement de la même manière que si nous devions utiliser des exécuteurs non définis par l'utilisateur.
# include " concurrencpp/concurrencpp.h "
# include < iostream >
# include < queue >
# include < thread >
# include < mutex >
# include < condition_variable >
class logging_executor : public concurrencpp ::derivable_executor<logging_executor> {
private:
mutable std::mutex _lock;
std::queue<concurrencpp::task> _queue;
std::condition_variable _condition;
bool _shutdown_requested;
std::thread _thread;
const std::string _prefix;
void work_loop () {
while ( true ) {
std::unique_lock<std::mutex> lock (_lock);
if (_shutdown_requested) {
return ;
}
if (!_queue. empty ()) {
auto task = std::move (_queue. front ());
_queue. pop ();
lock. unlock ();
std::cout << _prefix << " A task is being executed " << std::endl;
task ();
continue ;
}
_condition. wait (lock, [ this ] {
return !_queue. empty () || _shutdown_requested;
});
}
}
public:
logging_executor (std::string_view prefix) :
derivable_executor<logging_executor>( " logging_executor " ),
_shutdown_requested ( false ),
_prefix (prefix) {
_thread = std::thread ([ this ] {
work_loop ();
});
}
void enqueue (concurrencpp::task task) override {
std::cout << _prefix << " A task is being enqueued! " << std::endl;
std::unique_lock<std::mutex> lock (_lock);
if (_shutdown_requested) {
throw concurrencpp::errors::runtime_shutdown ( " logging executor - executor was shutdown. " );
}
_queue. emplace ( std::move (task));
_condition. notify_one ();
}
void enqueue (std::span<concurrencpp::task> tasks) override {
std::cout << _prefix << tasks. size () << " tasks are being enqueued! " << std::endl;
std::unique_lock<std::mutex> lock (_lock);
if (_shutdown_requested) {
throw concurrencpp::errors::runtime_shutdown ( " logging executor - executor was shutdown. " );
}
for ( auto & task : tasks) {
_queue. emplace ( std::move (task));
}
_condition. notify_one ();
}
int max_concurrency_level () const noexcept override {
return 1 ;
}
bool shutdown_requested () const noexcept override {
std::unique_lock<std::mutex> lock (_lock);
return _shutdown_requested;
}
void shutdown () noexcept override {
std::cout << _prefix << " shutdown requested " << std::endl;
std::unique_lock<std::mutex> lock (_lock);
if (_shutdown_requested) return ; // nothing to do.
_shutdown_requested = true ;
lock. unlock ();
_condition. notify_one ();
_thread. join ();
}
};
int main () {
concurrencpp::runtime runtime;
auto logging_ex = runtime. make_executor <logging_executor>( " Session #1234 " );
for ( size_t i = 0 ; i < 10 ; i++) {
logging_ex-> post ([] {
std::cout << " hello world " << std::endl;
});
}
std::getchar ();
return 0 ;
}$ git clone https://github.com/David-Haim/concurrencpp.git
$ cd concurrencpp
$ cmake -S . -B build /lib
$ cmake -- build build /lib --config Release$ git clone https://github.com/David-Haim/concurrencpp.git
$ cd concurrencpp
$ cmake -S test -B build / test
$ cmake -- build build / test
< # for release mode: cmake --build build/test --config Release #>
$ cd build / test
$ ctest . -V -C Debug
< # for release mode: ctest . -V -C Release #> $ git clone https://github.com/David-Haim/concurrencpp.git
$ cd concurrencpp
$ cmake -DCMAKE_BUILD_TYPE=Release -S . -B build /lib
$ cmake -- build build /lib
#optional, install the library: sudo cmake --install build/lib Avec Clang et GCC, il est également possible d'exécuter les tests avec la prise en charge de TSAN (désinfectant Thread).
$ git clone https://github.com/David-Haim/concurrencpp.git
$ cd concurrencpp
$ cmake -S test -B build / test
#for release mode: cmake -DCMAKE_BUILD_TYPE=Release -S test -B build/test
#for TSAN mode: cmake -DCMAKE_BUILD_TYPE=Release -DENABLE_THREAD_SANITIZER=Yes -S test -B build/test
$ cmake -- build build / test
$ cd build / test
$ ctest . -V Lors de la compilation sur Linux, la bibliothèque essaie d'utiliser libstdc++ par défaut. Si vous avez l'intention d'utiliser libc++ comme implémentation de la bibliothèque standard, l'indicateur CMAKE_TOOLCHAIN_FILE doit être spécifié comme ci-dessous:
$ cmake -DCMAKE_TOOLCHAIN_FILE=../cmake/libc++.cmake -DCMAKE_BUILD_TYPE=Release -S . -B build /libAlternativement à la construction et à l'installation manuellement de la bibliothèque, les développeurs peuvent obtenir des versions stables de concurrencp via les gestionnaires de packages VCPKG et Conan:
VCPKG:
$ vcpkg install concurrencppConan: concurrencp sur Conopenter
ConcurrenCP est livré avec un programme de bac à sable intégré que les développeurs peuvent modifier et expérimenter, sans avoir à installer ou à lier la bibliothèque compilée à une base de code différente. Afin de jouer avec le bac à sable, les développeurs peuvent modifier sandbox/main.cpp et compiler l'application à l'aide des commandes suivantes:
$ cmake -S sandbox -B build /sandbox
$ cmake -- build build /sandbox
< # for release mode: cmake --build build/sandbox --config Release #>
$ ./ build /sandbox < # runs the sandbox> $ cmake -S sandbox -B build /sandbox
#for release mode: cmake -DCMAKE_BUILD_TYPE=Release -S sandbox -B build/sandbox
$ cmake -- build build /sandbox
$ ./ build /sandbox #runs the sandbox