concurrencpp brings the power of concurrent tasks to the C++ world, allowing developers to write highly concurrent applications easily and safely by using tasks, executors and coroutines. By using concurrencpp, applications can break down big procedures that need to be processed asynchronously into smaller tasks that run concurrently and work in a cooperative manner to achieve the wanted result. concurrencpp also allows applications to write parallel algorithms easily by using parallel coroutines.
The main advantages of concurrencpp include writing concurrent code with higher-level tasks instead of low-level primitives such as std::thread and std::mutex, and consuming asynchronous results with co_await.
concurrencpp is built around the concept of concurrent tasks. A task is an asynchronous operation. Tasks offer a higher level of abstraction for concurrent code than traditional thread-centric approaches. Tasks can be chained together, meaning that tasks pass their asynchronous result from one to another, where the result of one task is used as if it were a parameter or an intermediate value of another ongoing task. Tasks allow applications to utilize available hardware resources better and scale much more than raw threads do, since tasks can be suspended, waiting for another task to produce a result, without blocking the underlying OS threads. Tasks also make developers more productive by letting them focus more on business logic and less on low-level concepts like thread management and inter-thread synchronization.
While tasks specify what actions have to be executed, executors are worker objects that specify where and how to execute tasks. Executors spare applications the tedious management of thread pools and task queues. Executors also decouple those concepts from application code by providing a unified API for creating and scheduling tasks.
Tasks communicate with each other using result objects. A result object is an asynchronous pipe that passes the asynchronous result of one task to another ongoing task. Results can be awaited and resolved in a non-blocking manner.
These three concepts (the task, the executor and the associated result) are the building blocks of concurrencpp. Executors run tasks that communicate with each other by sending results through result objects. Tasks, executors and result objects work together symbiotically to produce concurrent code that is fast and clean.
concurrencpp is built around the RAII concept. In order to use tasks and executors, applications create a runtime instance at the beginning of the main function. The runtime is then used to acquire existing executors and to register new user-defined executors. Executors are used to create and schedule tasks to run, and they may return a result object that can be used to pass the asynchronous result to another task that acts as its consumer. When the runtime is destroyed, it iterates over every stored executor and calls its shutdown method. Every executor then exits gracefully. Unscheduled tasks are destroyed, and attempts to create new tasks will throw an exception.
#include "concurrencpp/concurrencpp.h"
#include <iostream>
int main() {
    concurrencpp::runtime runtime;
    auto result = runtime.thread_executor()->submit([] {
        std::cout << "hello world" << std::endl;
    });
    result.get();
    return 0;
}
In this basic example, we created a runtime object, then acquired the thread executor from the runtime. We used submit to pass a lambda as our given callable. This lambda returns void, hence the executor returns a result<void> object that passes the asynchronous result back to the caller. main calls get, which blocks the main thread until the result becomes ready. If no exception was thrown, get returns void. If an exception was thrown, get re-throws it. Asynchronously, thread_executor launches a new thread of execution and runs the given lambda. It implicitly co_returns void and the task is finished. main is then unblocked.
#include "concurrencpp/concurrencpp.h"
#include <iostream>
#include <vector>
#include <algorithm>
#include <cstdlib>
#include <ctime>
using namespace concurrencpp;
std::vector<int> make_random_vector() {
    std::vector<int> vec(64 * 1'024);
    std::srand(std::time(nullptr));
    for (auto& i : vec) {
        i = ::rand();
    }
    return vec;
}
result<size_t> count_even(std::shared_ptr<thread_pool_executor> tpe, const std::vector<int>& vector) {
    const auto vector_size = vector.size();
    const auto concurrency_level = tpe->max_concurrency_level();
    const auto chunk_size = vector_size / concurrency_level;
    std::vector<result<size_t>> chunk_count;
    for (auto i = 0; i < concurrency_level; i++) {
        const auto chunk_begin = i * chunk_size;
        // The last chunk also takes the remainder, so no element is skipped
        // when the vector size is not a multiple of the concurrency level.
        const auto chunk_end = (i == concurrency_level - 1) ? vector_size : chunk_begin + chunk_size;
        auto result = tpe->submit([&vector, chunk_begin, chunk_end]() -> size_t {
            return std::count_if(vector.begin() + chunk_begin, vector.begin() + chunk_end, [](auto i) {
                return i % 2 == 0;
            });
        });
        chunk_count.emplace_back(std::move(result));
    }
    size_t total_count = 0;
    for (auto& result : chunk_count) {
        total_count += co_await result;
    }
    co_return total_count;
}
int main() {
    concurrencpp::runtime runtime;
    const auto vector = make_random_vector();
    auto result = count_even(runtime.thread_pool_executor(), vector);
    const auto total_count = result.get();
    std::cout << "there are " << total_count << " even numbers in the vector" << std::endl;
    return 0;
}
In this example, we start the program by creating a runtime object. We create a vector filled with random numbers, then we acquire the thread_pool_executor from the runtime and call count_even. count_even is a coroutine that spawns more tasks and co_awaits for them to finish inside. max_concurrency_level returns the maximum number of workers that the executor supports. In the threadpool executor case, the number of workers is calculated from the number of cores. We then partition the array to match the number of workers and send every chunk to be processed in its own task; the last chunk is extended to the end of the vector so that no element is skipped. Asynchronously, the workers count how many even numbers each chunk contains and return the result. count_even sums every result by pulling the count using co_await, and the final result is then co_returned. The main thread, which was blocked by calling get, is unblocked and the total count is returned. main prints the number of even numbers and the program terminates gracefully.
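The partitioning step described above can be sketched with the standard library alone. This is a minimal illustration rather than anything concurrencpp provides: split_range is a hypothetical helper, and it spreads the remainder over the first chunks so that every element lands in exactly one chunk even when the size is not a multiple of the number of workers.

```cpp
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical helper (not part of concurrencpp).
// Splits the index range [0, total) into `parts` contiguous half-open chunks.
// The first (total % parts) chunks receive one extra element, so no element
// is dropped when `total` is not a multiple of `parts`.
std::vector<std::pair<std::size_t, std::size_t>> split_range(std::size_t total, std::size_t parts) {
    std::vector<std::pair<std::size_t, std::size_t>> chunks;
    if (parts == 0) {
        return chunks;  // nothing to split into
    }

    const std::size_t base = total / parts;
    const std::size_t extra = total % parts;

    std::size_t begin = 0;
    for (std::size_t i = 0; i < parts; ++i) {
        const std::size_t size = base + (i < extra ? 1 : 0);
        chunks.emplace_back(begin, begin + size);
        begin += size;
    }
    return chunks;
}
```

Each returned pair could then serve as the chunk_begin and chunk_end of one submitted task.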
Every big or complex operation can be broken down into smaller, chainable steps. Tasks are asynchronous operations implementing those computational steps. Tasks can run anywhere with the help of executors. While tasks can be created from regular callables (such as functors and lambdas), tasks are mostly used with coroutines, which allow smooth suspension and resumption. In concurrencpp, the task concept is represented by the concurrencpp::task class. Although the task concept is central to concurrencpp, applications will rarely have to create and manipulate task objects themselves, as task objects are created and scheduled by the runtime with no external help.
concurrencpp allows applications to produce and consume coroutines as the main way of creating tasks. concurrencpp supports both eager and lazy tasks.
Eager tasks start to run the moment they are invoked. This type of execution is recommended when applications need to fire an asynchronous action and consume its result later on (fire and consume later), or completely ignore the asynchronous result (fire and forget).
Eager tasks can return result or null_result. The result return type tells the coroutine to pass on the returned value or the thrown exception (fire and consume later), while the null_result return type tells the coroutine to drop and ignore both (fire and forget).
Eager coroutines can start to run synchronously, in the caller thread. This kind of coroutine is called a "regular coroutine". concurrencpp eager coroutines can also start to run in parallel, inside a given executor; this kind of coroutine is called a "parallel coroutine".
Lazy tasks, on the other hand, start to run only when co_awaited. This type of task is recommended when the result of the task is meant to be consumed immediately after creating the task. Lazy tasks, being deferred, are a bit more optimized for the case of immediate consumption, as they do not need special thread synchronization in order to pass the asynchronous result back to its consumer. The compiler might also optimize away some memory allocations needed to form the underlying coroutine promise. It is not possible to fire a lazy task and execute something else in the meantime: firing a lazy callee coroutine necessarily means suspending the caller coroutine. The caller coroutine will only be resumed when the lazy callee coroutine completes. Lazy tasks can only return lazy_result.
Lazy tasks can be converted to eager tasks by calling lazy_result::run. This method runs the lazy task inline and returns a result object that monitors the newly started task. If developers are unsure which result type to use, they are encouraged to use lazy results, as they can be converted to regular (eager) results if needed.
When a function returns any of lazy_result, result or null_result and contains at least one co_await or co_return in its body, the function is a concurrencpp coroutine. Every valid concurrencpp coroutine is a valid task. In our count-even example above, count_even is such a coroutine. We first spawned count_even, then inside it the threadpool executor spawned more child tasks (created from regular callables), which were eventually joined using co_await.
A concurrencpp executor is an object that is able to schedule and run tasks. Executors simplify the work of managing resources such as threads, thread pools and task queues by decoupling them from application code. Executors provide a unified way of scheduling and executing tasks, since they all extend concurrencpp::executor.
executor API
class executor {
/*
Initializes a new executor and gives it a name.
*/
executor (std::string_view name);
/*
Destroys this executor.
*/
virtual ~executor () noexcept = default ;
/*
The name of the executor, used for logging and debugging.
*/
const std::string name;
/*
Schedules a task to run in this executor.
Throws concurrencpp::errors::runtime_shutdown exception if shutdown was called before.
*/
virtual void enqueue (concurrencpp::task task) = 0;
/*
Schedules a range of tasks to run in this executor.
Throws concurrencpp::errors::runtime_shutdown exception if shutdown was called before.
*/
virtual void enqueue (std::span<concurrencpp::task> tasks) = 0;
/*
Returns the maximum count of real OS threads this executor supports.
The actual count of threads this executor is running might be smaller than this number.
returns numeric_limits<int>::max if the executor does not have a limit for OS threads.
*/
virtual int max_concurrency_level () const noexcept = 0;
/*
Returns true if shutdown was called before, false otherwise.
*/
virtual bool shutdown_requested () const noexcept = 0;
/*
Shuts down the executor:
- Tells underlying threads to exit their work loop and joins them.
- Destroys unexecuted coroutines.
- Makes subsequent calls to enqueue, post, submit, bulk_post and
bulk_submit to throw concurrencpp::errors::runtime_shutdown exception.
- Makes shutdown_requested return true.
*/
virtual void shutdown () noexcept = 0;
/*
Turns a callable and its arguments into a task object and
schedules it to run in this executor using enqueue.
Arguments are passed to the task by decaying them first.
Throws errors::runtime_shutdown exception if shutdown has been called before.
*/
template < class callable_type , class ... argument_types>
void post (callable_type&& callable, argument_types&& ... arguments);
/*
Like post, but returns a result object that passes the asynchronous result.
Throws errors::runtime_shutdown exception if shutdown has been called before.
*/
template < class callable_type , class ... argument_types>
result<type> submit (callable_type&& callable, argument_types&& ... arguments);
/*
Turns an array of callables into an array of tasks and
schedules them to run in this executor using enqueue.
Throws errors::runtime_shutdown exception if shutdown has been called before.
*/
template < class callable_type >
void bulk_post (std::span<callable_type> callable_list);
/*
Like bulk_post, but returns an array of result objects that passes the asynchronous results.
Throws errors::runtime_shutdown exception if shutdown has been called before.
*/
template < class callable_type >
std::vector<concurrencpp::result<type>> bulk_submit (std::span<callable_type> callable_list);
};
As mentioned above, concurrencpp provides commonly used executors. These executor types are:
Thread pool executor: a general-purpose executor that maintains a pool of threads. The thread pool executor is suitable for short CPU-bound tasks that don't block. Applications are encouraged to use this executor as the default executor for non-blocking tasks. The concurrencpp thread pool provides dynamic thread injection and dynamic work balancing.
Background executor: a threadpool executor with a larger pool of threads. Suitable for launching short blocking tasks like file IO and DB queries. Important note: when consuming results that this executor returned from submit and bulk_submit, it is important to switch execution to a CPU-bound executor using resume_on, in order to prevent CPU-bound tasks from being processed inside background_executor.
Example:
auto result = background_executor.submit([] { /* some blocking action */ });
auto done_result = co_await result.resolve();
co_await resume_on(some_cpu_executor);
auto val = co_await done_result; // runs inside some_cpu_executor
Thread executor: an executor that launches each enqueued task to run on a new thread of execution. Threads are not reused. This executor is good for long-running tasks, like objects that run a work loop, or long blocking operations.
Worker thread executor: a single-thread executor that maintains a single task queue. Suitable when applications want a dedicated thread that executes many related tasks.
Manual executor: an executor that does not execute coroutines by itself. Application code can execute previously enqueued tasks by manually invoking its execution methods.
Derivable executor: a base class for user-defined executors. Although inheriting directly from concurrencpp::executor is possible, derivable_executor uses the CRTP pattern, which provides some optimization opportunities for the compiler.
Inline executor: mainly used to override the behavior of other executors. Enqueuing a task is equivalent to invoking it inline.
The bare mechanism of an executor is encapsulated in its enqueue method. This method enqueues a task for execution and has two overloads: one receives a single task object as an argument, and the other receives a span of task objects. The second overload is used to enqueue a batch of tasks. This allows better scheduling heuristics and decreased contention.
Applications don't have to rely on enqueue alone: concurrencpp::executor provides an API for scheduling user callables by converting them to task objects behind the scenes. Applications can request executors to return a result object that passes the asynchronous result of the provided callable. This is done by calling executor::submit and executor::bulk_submit. submit gets a callable and returns a result object. executor::bulk_submit gets a span of callables and returns a vector of result objects, in the same way submit works. In many cases, applications are not interested in the asynchronous value or exception. In this case, applications can use executor::post and executor::bulk_post to schedule a callable or a span of callables to be executed, while also telling the task to drop any returned value or thrown exception. Not passing the asynchronous result is faster than passing it, but then there is no way of knowing the status or the result of the ongoing task.
post, bulk_post, submit and bulk_submit use enqueue behind the scenes for the underlying scheduling mechanism.
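The layering described above (post and submit as thin wrappers over enqueue) can be illustrated with a toy, single-threaded model written with the standard library only. It is a sketch of the idea, not concurrencpp's implementation: toy_manual_executor and all of its members are hypothetical names, tasks are plain std::function objects rather than concurrencpp::task, and the "result" is just a shared optional slot with no synchronization.

```cpp
#include <cstddef>
#include <deque>
#include <functional>
#include <memory>
#include <optional>
#include <utility>

// Toy model only; none of these names belong to concurrencpp.
class toy_manual_executor {
public:
    using task = std::function<void()>;

    // The single scheduling primitive: store the task for later execution.
    void enqueue(task t) {
        m_tasks.push_back(std::move(t));
    }

    // post: wrap the callable in a task and drop whatever it returns.
    template<class callable_type>
    void post(callable_type callable) {
        enqueue([callable = std::move(callable)]() mutable {
            (void)callable();
        });
    }

    // submit: like post, but the returned value is written into a shared slot
    // that the caller can inspect later. Works for non-void callables only.
    template<class callable_type>
    auto submit(callable_type callable) {
        using value_type = decltype(callable());
        auto slot = std::make_shared<std::optional<value_type>>();
        enqueue([slot, callable = std::move(callable)]() mutable {
            *slot = callable();
        });
        return slot;
    }

    // Runs at most one queued task. Returns true if a task was executed.
    bool loop_once() {
        if (m_tasks.empty()) {
            return false;
        }
        task t = std::move(m_tasks.front());
        m_tasks.pop_front();
        t();
        return true;
    }

    std::size_t size() const noexcept {
        return m_tasks.size();
    }

private:
    std::deque<task> m_tasks;
};
```

In this model nothing runs until loop_once is called, which mirrors the manual executor idea: a task posted with post leaves no trace of its return value, while a task scheduled with submit fills its slot once it has been executed.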
thread_pool_executor API
Aside from post, submit, bulk_post and bulk_submit, the thread_pool_executor provides these additional methods.
class thread_pool_executor {
/*
Returns the number of milliseconds each thread-pool worker
remains idle (lacks any task to execute) before exiting.
This constant can be set by passing a runtime_options object
to the constructor of the runtime class.
*/
std::chrono::milliseconds max_worker_idle_time () const noexcept ;
};
manual_executor
Aside from post, submit, bulk_post and bulk_submit, the manual_executor provides these additional methods.
class manual_executor {
/*
Destructor. Equivalent to clear.
*/
~manual_executor () noexcept ;
/*
Returns the number of enqueued tasks at the moment of invocation.
This number can change quickly by the time the application handles it, it should be used as a hint.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
size_t size () const noexcept ;
/*
Queries whether the executor is empty from tasks at the moment of invocation.
This value can change quickly by the time the application handles it, it should be used as a hint.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
bool empty () const noexcept ;
/*
Clears the executor from any enqueued but yet to-be-executed tasks,
and returns the number of cleared tasks.
Tasks enqueued to this executor by (post_)submit method are resumed
and errors::broken_task exception is thrown inside them.
Ongoing tasks that are being executed by loop_once(_XXX) or loop(_XXX) are unaffected.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
size_t clear ();
/*
Tries to execute a single task. If at the moment of invocation the executor
is empty, the method does nothing.
Returns true if a task was executed, false otherwise.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
bool loop_once ();
/*
Tries to execute a single task.
This method returns when either a task was executed or max_waiting_time
(in milliseconds) has reached.
If max_waiting_time is 0, the method is equivalent to loop_once.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
bool loop_once_for (std::chrono::milliseconds max_waiting_time);
/*
Tries to execute a single task.
This method returns when either a task was executed or timeout_time has reached.
If timeout_time has already expired, this method is equivalent to loop_once.
If shutdown is called from another thread, this method
returns and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
template < class clock_type , class duration_type >
bool loop_once_until (std::chrono::time_point<clock_type, duration_type> timeout_time);
/*
Tries to execute max_count enqueued tasks and returns the number of tasks that were executed.
This method does not wait: it returns when the executor
becomes empty from tasks or max_count tasks have been executed.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
size_t loop ( size_t max_count);
/*
Tries to execute max_count tasks.
This method returns when either max_count tasks were executed or a
total amount of max_waiting_time has passed.
If max_waiting_time is 0, the method is equivalent to loop.
Returns the actual amount of tasks that were executed.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
size_t loop_for ( size_t max_count, std::chrono::milliseconds max_waiting_time);
/*
Tries to execute max_count tasks.
This method returns when either max_count tasks were executed or timeout_time has reached.
If timeout_time has already expired, the method is equivalent to loop.
Returns the actual amount of tasks that were executed.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
template < class clock_type , class duration_type >
size_t loop_until ( size_t max_count, std::chrono::time_point<clock_type, duration_type> timeout_time);
/*
Waits for at least one task to be available for execution.
This method should be used as a hint,
as other threads (calling loop, for example) might empty the executor,
before this thread has a chance to do something with the newly enqueued tasks.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
void wait_for_task ();
/*
This method returns when one or more tasks are available for
execution or max_waiting_time has passed.
Returns true if at least one task is available for execution, false otherwise.
This method should be used as a hint, as other threads (calling loop, for example)
might empty the executor, before this thread has a chance to do something
with the newly enqueued tasks.
If shutdown is called from another thread, this method
returns and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
bool wait_for_task_for (std::chrono::milliseconds max_waiting_time);
/*
This method returns when one or more tasks are available for execution or timeout_time has reached.
Returns true if at least one task is available for execution, false otherwise.
This method should be used as a hint,
as other threads (calling loop, for example) might empty the executor,
before this thread has a chance to do something with the newly enqueued tasks.
If shutdown is called from another thread, this method
returns and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
template < class clock_type , class duration_type >
bool wait_for_task_until (std::chrono::time_point<clock_type, duration_type> timeout_time);
/*
This method returns when max_count or more tasks are available for execution.
This method should be used as a hint, as other threads
(calling loop, for example) might empty the executor,
before this thread has a chance to do something with the newly enqueued tasks.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
void wait_for_tasks ( size_t max_count);
/*
This method returns when max_count or more tasks are available for execution
or max_waiting_time (in milliseconds) has passed.
Returns the number of tasks available for execution when the method returns.
This method should be used as a hint, as other
threads (calling loop, for example) might empty the executor,
before this thread has a chance to do something with the newly enqueued tasks.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
size_t wait_for_tasks_for ( size_t count, std::chrono::milliseconds max_waiting_time);
/*
This method returns when max_count or more tasks are available for execution
or timeout_time is reached.
Returns the number of tasks available for execution when the method returns.
This method should be used as a hint, as other threads
(calling loop, for example) might empty the executor,
before this thread has a chance to do something with the newly enqueued tasks.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
template < class clock_type , class duration_type >
size_t wait_for_tasks_until ( size_t count, std::chrono::time_point<clock_type, duration_type> timeout_time);
};
Asynchronous values and exceptions can be consumed using concurrencpp result objects. The result type represents the asynchronous result of an eager task, while lazy_result represents the deferred result of a lazy task.
When a task (eager or lazy) completes, it either returns a valid value or throws an exception. In either case, this asynchronous result is passed to the consumer of the result object.
result objects form asymmetric coroutines: the execution of a caller coroutine is not affected by the execution of a callee coroutine, and both coroutines can run independently. Only when consuming the result of the callee coroutine might the caller coroutine be suspended, awaiting the callee to complete. Up until that point both coroutines run independently. The callee coroutine runs whether its result is consumed or not.
lazy_result objects form symmetric coroutines: execution of a callee coroutine happens only after the suspension of the caller coroutine. When a lazy result is awaited, the current coroutine is suspended and the lazy task associated with the lazy result starts to run. After the callee coroutine completes and yields a result, the caller coroutine is resumed. If a lazy result is not consumed, its associated lazy task never starts to run.
All result objects are move-only types, and as such, they cannot be used after their content has been moved to another result object. In this case, the result object is considered to be empty, and attempts to call any method other than operator bool and operator = will throw an exception.
After the asynchronous result has been pulled out of the result object (for example, by calling get or operator co_await), the result object becomes empty. Emptiness can be tested with operator bool.
Awaiting a result means suspending the current coroutine until the result object is ready. If a valid value was returned from the associated task, it is returned from the result object. If the associated task throws an exception, it is re-thrown. At the moment of awaiting, if the result is already ready, the current coroutine resumes immediately. Otherwise, it is resumed by the thread that sets the asynchronous result or exception.
Resolving a result is similar to awaiting it. The difference is that the co_await expression will return the result object itself, in a non-empty form, in a ready state. The asynchronous result can then be pulled by using get or co_await.
Every result object has a status indicating the state of the asynchronous result. The result status varies from result_status::idle (the asynchronous result or exception hasn't been produced yet) to result_status::value (the associated task terminated gracefully by returning a valid value) to result_status::exception (the task terminated by throwing an exception). The status can be queried by calling (lazy_)result::status.
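The rules above (move-only ownership, emptiness after a move or after the value is pulled, and the three statuses) can be modeled in a few lines of standard C++. The sketch below is only an illustration of those rules under simplifying assumptions: toy_result and toy_status are hypothetical names, the shared state is filled in by hand instead of by a task, nothing is thread safe, and std::logic_error stands in for the library's own exception types.

```cpp
#include <exception>
#include <memory>
#include <stdexcept>
#include <utility>
#include <variant>

// Toy model only; none of these names belong to concurrencpp.
enum class toy_status { idle, value, exception };

template<class type>
class toy_result {
public:
    // index 0: nothing produced yet, index 1: a value, index 2: an exception
    using state = std::variant<std::monostate, type, std::exception_ptr>;

    toy_result() noexcept = default;
    explicit toy_result(std::shared_ptr<state> s) noexcept : m_state(std::move(s)) {}

    // Move-only: a moved-from toy_result holds a null pointer, i.e. it is empty.
    toy_result(toy_result&&) noexcept = default;
    toy_result& operator=(toy_result&&) noexcept = default;
    toy_result(const toy_result&) = delete;
    toy_result& operator=(const toy_result&) = delete;

    // True if this object is still associated with a shared state.
    explicit operator bool() const noexcept {
        return static_cast<bool>(m_state);
    }

    toy_status status() const {
        throw_if_empty();
        switch (m_state->index()) {
            case 1:
                return toy_status::value;
            case 2:
                return toy_status::exception;
            default:
                return toy_status::idle;
        }
    }

    // Pulls the outcome out and leaves *this empty.
    // Unlike a real blocking get, this toy version throws if nothing is ready.
    type get() {
        throw_if_empty();
        if (m_state->index() == 0) {
            throw std::logic_error("toy result is not ready");
        }
        auto s = std::move(m_state);  // *this is empty from here on
        if (s->index() == 2) {
            std::rethrow_exception(std::get<2>(*s));
        }
        return std::move(std::get<1>(*s));
    }

private:
    void throw_if_empty() const {
        if (!m_state) {
            throw std::logic_error("toy result is empty");
        }
    }

    std::shared_ptr<state> m_state;
};
```

A producer would hold the same shared state and call emplace<1> with a value or emplace<2> with an exception pointer; the consumer then observes the status change and pulls the outcome exactly once.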
result
The result type represents the asynchronous result of an ongoing asynchronous task, similar to std::future.
Aside from awaiting and resolving result objects, they can also be waited on by calling any of result::wait, result::wait_for, result::wait_until or result::get. Waiting for a result to finish is a blocking operation (in the case where the asynchronous result is not ready), and it will suspend the entire thread of execution until the asynchronous result becomes available. Waiting operations are generally discouraged and only allowed in root-level tasks or in contexts which allow it, like blocking the main thread waiting for the rest of the application to finish gracefully, or using concurrencpp::blocking_executor or concurrencpp::thread_executor.
Awaiting result objects by using co_await (and by doing so, turning the current function/task into a coroutine as well) is the preferred way of consuming result objects, as it does not block the underlying threads.
result
class result {
/*
Creates an empty result that isn't associated with any task.
*/
result () noexcept = default ;
/*
Destroys the result. Associated tasks are not cancelled.
The destructor does not block waiting for the asynchronous result to become ready.
*/
~result () noexcept = default ;
/*
Moves the content of rhs to *this. After this call, rhs is empty.
*/
result (result&& rhs) noexcept = default ;
/*
Moves the content of rhs to *this. After this call, rhs is empty. Returns *this.
*/
result& operator = (result&& rhs) noexcept = default ;
/*
Returns true if this is a non-empty result.
Applications must not use this object if this->operator bool() is false.
*/
explicit operator bool () const noexcept ;
/*
Queries the status of *this.
The returned value is any of result_status::idle, result_status::value or result_status::exception.
Throws errors::empty_result if *this is empty.
*/
result_status status () const ;
/*
Blocks the current thread of execution until this result is ready,
when status() != result_status::idle.
Throws errors::empty_result if *this is empty.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
void wait ();
/*
Blocks until this result is ready or duration has passed. Returns the status
of this result after unblocking.
Throws errors::empty_result if *this is empty.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
template < class duration_unit , class ratio >
result_status wait_for (std::chrono::duration<duration_unit, ratio> duration);
/*
Blocks until this result is ready or timeout_time has been reached. Returns the status
of this result after unblocking.
Throws errors::empty_result if *this is empty.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
template < class clock , class duration >
result_status wait_until (std::chrono::time_point<clock, duration> timeout_time);
/*
Blocks the current thread of execution until this result is ready,
when status() != result_status::idle.
If the result is a valid value, it is returned, otherwise, get rethrows the asynchronous exception.
Throws errors::empty_result if *this is empty.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
type get ();
/*
Returns an awaitable used to await this result.
If the result is already ready - the current coroutine resumes
immediately in the calling thread of execution.
If the result is not ready yet, the current coroutine is suspended
and resumed when the asynchronous result is ready,
by the thread which had set the asynchronous value or exception.
In either way, after resuming, if the result is a valid value, it is returned.
Otherwise, operator co_await rethrows the asynchronous exception.
Throws errors::empty_result if *this is empty.
*/
auto operator co_await ();
/*
Returns an awaitable used to resolve this result.
After co_await expression finishes, *this is returned in a non-empty form, in a ready state.
Throws errors::empty_result if *this is empty.
*/
auto resolve ();
};
lazy_result type
A lazy result object represents the result of a deferred lazy task.
lazy_result is responsible both for starting the associated lazy task and for relaying its deferred result back to its consumer. When awaited or resolved, the lazy result suspends the current coroutine and starts the associated lazy task. When the associated task completes, its asynchronous value is passed to the caller task, which is then resumed.
Sometimes an API returns a lazy result, but the application needs the associated task to run eagerly (without suspending the caller task). In this case, a lazy task can be converted to an eager task by calling run on its associated lazy result. The associated task then starts to run inline, without suspending the caller task. The original lazy result is emptied, and a valid result object that monitors the newly started task is returned instead.
lazy_result API
template <class type>
class lazy_result {
/*
Creates an empty lazy result that isn't associated with any task.
*/
lazy_result () noexcept = default ;
/*
Moves the content of rhs to *this. After this call, rhs is empty.
*/
lazy_result (lazy_result&& rhs) noexcept ;
/*
Destroys the result. If not empty, the destructor destroys the associated task without resuming it.
*/
~lazy_result () noexcept ;
/*
Moves the content of rhs to *this. After this call, rhs is empty. Returns *this.
If *this is not empty, then operator= destroys the associated task without resuming it.
*/
lazy_result& operator =(lazy_result&& rhs) noexcept ;
/*
Returns true if this is a non-empty result.
Applications must not use this object if this->operator bool() is false.
*/
explicit operator bool () const noexcept ;
/*
Queries the status of *this.
The returned value is any of result_status::idle, result_status::value or result_status::exception.
Throws errors::empty_result if *this is empty.
*/
result_status status () const ;
/*
Returns an awaitable used to start the associated task and await this result.
If the result is already ready - the current coroutine resumes immediately
in the calling thread of execution.
If the result is not ready yet, the current coroutine is suspended and
resumed when the asynchronous result is ready,
by the thread which had set the asynchronous value or exception.
In either way, after resuming, if the result is a valid value, it is returned.
Otherwise, operator co_await rethrows the asynchronous exception.
Throws errors::empty_result if *this is empty.
*/
auto operator co_await ();
/*
Returns an awaitable used to start the associated task and resolve this result.
If the result is already ready - the current coroutine resumes immediately
in the calling thread of execution.
If the result is not ready yet, the current coroutine is suspended and resumed
when the asynchronous result is ready, by the thread which
had set the asynchronous value or exception.
After co_await expression finishes, *this is returned in a non-empty form, in a ready state.
Throws errors::empty_result if *this is empty.
*/
auto resolve ();
/*
Runs the associated task inline and returns a result object that monitors the newly started task.
After this call, *this is empty.
Throws errors::empty_result if *this is empty.
Might throw std::bad_alloc if fails to allocate memory.
*/
result<type> run ();
};
Regular eager coroutines start to run synchronously in the calling thread of execution. Execution might shift to another thread of execution if the coroutine is rescheduled, for example by awaiting a result object that is not ready yet. concurrencpp also provides parallel coroutines, which start to run inside a given executor rather than in the invoking thread of execution. This style of scheduling coroutines is especially helpful when writing parallel algorithms, recursive algorithms and concurrent algorithms that use the fork-join model.
Every parallel coroutine must meet the following preconditions:
- It returns any of result / null_result .
- It takes executor_tag as its first argument.
- It takes any of type* / type& / std::shared_ptr<type> , where type is a concrete class of executor , as its second argument.
- It contains any of co_await or co_return in its body.
If all of the above applies, the function is a parallel coroutine: concurrencpp starts the coroutine suspended and immediately reschedules it to run in the provided executor. concurrencpp::executor_tag is a dummy placeholder that tells the concurrencpp runtime that this function is not a regular function and that it needs to start running inside the given executor. If the executor passed to the parallel coroutine is null, the coroutine does not start running and an std::invalid_argument exception is thrown synchronously. If all preconditions are met, applications can consume the result of the parallel coroutine through the returned result object.
In this example, we calculate the 30th member of the Fibonacci sequence in parallel. We launch each Fibonacci step in its own parallel coroutine. The first argument is a dummy executor_tag and the second argument is the thread pool executor. Every recursive step invokes a new parallel coroutine that runs in parallel. Each result is co_return ed to its parent task and acquired by using co_await .
When we deem the input small enough to be calculated synchronously (when curr <= 10 ), we stop executing each recursive step in its own task and simply solve the algorithm synchronously.
#include "concurrencpp/concurrencpp.h"
#include <iostream>

using namespace concurrencpp;

int fibonacci_sync(int i) {
    if (i == 0) {
        return 0;
    }
    if (i == 1) {
        return 1;
    }
    return fibonacci_sync(i - 1) + fibonacci_sync(i - 2);
}

result<int> fibonacci(executor_tag, std::shared_ptr<thread_pool_executor> tpe, const int curr) {
    if (curr <= 10) {
        co_return fibonacci_sync(curr);
    }
    auto fib_1 = fibonacci({}, tpe, curr - 1);
    auto fib_2 = fibonacci({}, tpe, curr - 2);
    co_return co_await fib_1 + co_await fib_2;
}

int main() {
    concurrencpp::runtime runtime;
    auto fibb_30 = fibonacci({}, runtime.thread_pool_executor(), 30).get();
    std::cout << "fibonacci(30) = " << fibb_30 << std::endl;
    return 0;
}
For comparison, this is how the same code is written without parallel coroutines, relying on executor::submit alone. Since fibonacci returns a result<int> , submitting it recursively via executor::submit yields a result<result<int>> .
#include "concurrencpp/concurrencpp.h"
#include <iostream>

using namespace concurrencpp;

int fibonacci_sync(int i) {
    if (i == 0) {
        return 0;
    }
    if (i == 1) {
        return 1;
    }
    return fibonacci_sync(i - 1) + fibonacci_sync(i - 2);
}

result<int> fibonacci(std::shared_ptr<thread_pool_executor> tpe, const int curr) {
    if (curr <= 10) {
        co_return fibonacci_sync(curr);
    }
    auto fib_1 = tpe->submit(fibonacci, tpe, curr - 1);
    auto fib_2 = tpe->submit(fibonacci, tpe, curr - 2);
    // Each submit returns a result<result<int>>, hence the double co_await.
    co_return co_await co_await fib_1 +
        co_await co_await fib_2;
}

int main() {
    concurrencpp::runtime runtime;
    auto fibb_30 = fibonacci(runtime.thread_pool_executor(), 30).get();
    std::cout << "fibonacci(30) = " << fibb_30 << std::endl;
    return 0;
}
Result objects are the main way to pass data between tasks in concurrencpp, and we have seen how executors and coroutines produce such objects. Sometimes we want to use the capabilities of a result object with non-tasks, for example when using a third-party library. In this case, we can complete a result object by using a result_promise . result_promise resembles a std::promise object: applications can manually set the asynchronous result or exception and make the associated result object become ready.
Just like result objects, result promises are a move-only type that becomes empty after being moved. Similarly, after a result or an exception is set, the result promise becomes empty as well. If a result promise goes out of scope and no result/exception has been set, the result promise destructor sets a concurrencpp::errors::broken_task exception using the set_exception method. Suspended and blocked tasks waiting for the associated result object are resumed/unblocked.
Result promises can convert callback-style code into async/await -style code: whenever a component requires a callback to pass the asynchronous result, we can pass a callback that calls set_result or set_exception (depending on the asynchronous result itself) on the passed result promise, and return the associated result.
result_promise API
template <class type>
class result_promise {
/*
Constructs a valid result_promise.
Might throw std::bad_alloc if fails to allocate memory.
*/
result_promise ();
/*
Moves the content of rhs to *this. After this call, rhs is empty.
*/
result_promise (result_promise&& rhs) noexcept ;
/*
Destroys *this, possibly setting an errors::broken_task exception
by calling set_exception if *this is not empty at the time of destruction.
*/
~result_promise () noexcept ;
/*
Moves the content of rhs to *this. After this call, rhs is empty.
*/
result_promise& operator = (result_promise&& rhs) noexcept ;
/*
Returns true if this is a non-empty result-promise.
Applications must not use this object if this->operator bool() is false.
*/
explicit operator bool () const noexcept ;
/*
Sets a value by constructing <<type>> from arguments... in-place.
Makes the associated result object become ready - tasks waiting for it
to become ready are unblocked.
Suspended tasks are resumed inline.
After this call, *this becomes empty.
Throws errors::empty_result_promise exception If *this is empty.
Might throw any exception that the constructor
of type(std::forward<argument_types>(arguments)...) throws.
*/
template < class ... argument_types>
void set_result (argument_types&& ... arguments);
/*
Sets an exception.
Makes the associated result object become ready - tasks waiting for it
to become ready are unblocked.
Suspended tasks are resumed inline.
After this call, *this becomes empty.
Throws errors::empty_result_promise exception If *this is empty.
Throws std::invalid_argument exception if exception_ptr is null.
*/
void set_exception (std::exception_ptr exception_ptr);
/*
A convenience method that invokes a callable with arguments... and calls set_result
with the result of the invocation.
If an exception is thrown, the thrown exception is caught and set instead by calling set_exception.
After this call, *this becomes empty.
Throws errors::empty_result_promise exception If *this is empty.
Might throw any exception that callable(std::forward<argument_types>(arguments)...)
or the constructor of type(type&&) throw.
*/
template < class callable_type , class ... argument_types>
void set_from_function (callable_type&& callable, argument_types&& ... arguments);
/*
Gets the associated result object.
Throws errors::empty_result_promise exception If *this is empty.
Throws errors::result_already_retrieved exception if this method had been called before.
*/
result<type> get_result ();
};
result_promise example
In this example, result_promise is used to push data from one thread, and the data is pulled from the associated result object in another thread.
#include "concurrencpp/concurrencpp.h"
#include <chrono>
#include <iostream>
#include <string>
#include <thread>

int main() {
    concurrencpp::result_promise<std::string> promise;
    auto result = promise.get_result();

    std::thread my_3_party_executor([promise = std::move(promise)]() mutable {
        std::this_thread::sleep_for(std::chrono::seconds(1));  // Imitate real work
        promise.set_result("hello world");
    });

    auto asynchronous_string = result.get();
    std::cout << "result promise returned string: " << asynchronous_string << std::endl;

    my_3_party_executor.join();
}
In this example, we use std::thread as a third-party executor. This represents a scenario in which a non-concurrencpp executor is used as part of the application life cycle. We extract the result object before we pass the promise, and block the main thread until the result becomes ready. In my_3_party_executor , we set a result as if we had co_return ed it.
Shared results are a special kind of result object that allows multiple consumers to access the asynchronous result, similar to std::shared_future . Different consumers from different threads can call functions like await , get and resolve in a thread-safe manner.
Shared results are built from regular result objects and, unlike regular result objects, they are both copyable and movable. As such, shared_result behaves like the std::shared_ptr type. If a shared result instance is moved to another instance, the moved-from instance becomes empty, and trying to access it throws an exception.
To support multiple consumers, shared results return a reference to the asynchronous value instead of moving it (as a regular result does). For example, a shared_result<int> returns an int& when get , await etc. are called. If the underlying type of the shared_result is void or a reference type (like int& ), it is returned as usual. If the asynchronous result is a thrown exception, it is re-thrown.
Note that while acquiring the asynchronous result through shared_result from multiple threads is thread safe, the actual value might not be. For example, multiple threads can acquire an asynchronous integer by receiving its reference ( int& ). This does not make the integer itself thread safe. It is fine to mutate the asynchronous value if the value is already thread safe. Alternatively, applications are encouraged to use const types to begin with (like const int ) and acquire constant references (like const int& ) that prevent mutation.
shared_result API
template <class type>
class shared_result {
/*
Creates an empty shared-result that isn't associated with any task.
*/
shared_result () noexcept = default ;
/*
Destroys the shared-result. Associated tasks are not cancelled.
The destructor does not block waiting for the asynchronous result to become ready.
*/
~shared_result () noexcept = default ;
/*
Converts a regular result object to a shared-result object.
After this call, rhs is empty.
Might throw std::bad_alloc if fails to allocate memory.
*/
shared_result (result<type> rhs);
/*
Copy constructor. Creates a copy of the shared result object that monitors the same task.
*/
shared_result ( const shared_result&) noexcept = default ;
/*
Move constructor. Moves rhs to *this. After this call, rhs is empty.
*/
shared_result (shared_result&& rhs) noexcept = default ;
/*
Copy assignment operator. Copies rhs to *this and monitors the same task that rhs monitors.
*/
shared_result& operator =( const shared_result& rhs) noexcept ;
/*
Move assignment operator. Moves rhs to *this. After this call, rhs is empty.
*/
shared_result& operator =(shared_result&& rhs) noexcept ;
/*
Returns true if this is a non-empty shared-result.
Applications must not use this object if this->operator bool() is false.
*/
explicit operator bool () const noexcept ;
/*
Queries the status of *this.
The return value is any of result_status::idle, result_status::value or result_status::exception.
Throws errors::empty_result if *this is empty.
*/
result_status status () const ;
/*
Blocks the current thread of execution until this shared-result is ready,
when status() != result_status::idle.
Throws errors::empty_result if *this is empty.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
void wait ();
/*
Blocks until this shared-result is ready or duration has passed.
Returns the status of this shared-result after unblocking.
Throws errors::empty_result if *this is empty.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
template < class duration_type , class ratio_type >
result_status wait_for (std::chrono::duration<duration_type, ratio_type> duration);
/*
Blocks until this shared-result is ready or timeout_time has been reached.
Returns the status of this result after unblocking.
Throws errors::empty_result if *this is empty.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
template < class clock_type , class duration_type >
result_status wait_until (std::chrono::time_point<clock_type, duration_type> timeout_time);
/*
Blocks the current thread of execution until this shared-result is ready,
when status() != result_status::idle.
If the result is a valid value, a reference to it is returned,
otherwise, get rethrows the asynchronous exception.
Throws errors::empty_result if *this is empty.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
std:: add_lvalue_reference_t <type> get ();
/*
Returns an awaitable used to await this shared-result.
If the shared-result is already ready - the current coroutine resumes
immediately in the calling thread of execution.
If the shared-result is not ready yet, the current coroutine is
suspended and resumed when the asynchronous result is ready,
by the thread which had set the asynchronous value or exception.
In either way, after resuming, if the result is a valid value, a reference to it is returned.
Otherwise, operator co_await rethrows the asynchronous exception.
Throws errors::empty_result if *this is empty.
*/
auto operator co_await ();
/*
Returns an awaitable used to resolve this shared-result.
After co_await expression finishes, *this is returned in a non-empty form, in a ready state.
Throws errors::empty_result if *this is empty.
*/
auto resolve ();
};
shared_result example
In this example, a result object is converted to a shared_result object, and a reference to an asynchronous int result is acquired by several consumer tasks, each of which resumes on the thread pool executor.
#include "concurrencpp/concurrencpp.h"
#include <chrono>
#include <iostream>
#include <thread>

concurrencpp::result<void> consume_shared_result(concurrencpp::shared_result<int> shared_result,
                                                 std::shared_ptr<concurrencpp::executor> resume_executor) {
    std::cout << "Awaiting shared_result to have a value" << std::endl;

    const auto& async_value = co_await shared_result;
    // resume_on returns an awaitable, so it must be awaited to take effect.
    co_await concurrencpp::resume_on(resume_executor);

    std::cout << "In thread id " << std::this_thread::get_id() << ", got: " << async_value
              << ", memory address: " << &async_value << std::endl;
}

int main() {
    concurrencpp::runtime runtime;
    auto result = runtime.background_executor()->submit([] {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        return 100;
    });

    concurrencpp::shared_result<int> shared_result(std::move(result));
    concurrencpp::result<void> results[8];

    for (size_t i = 0; i < 8; i++) {
        results[i] = consume_shared_result(shared_result, runtime.thread_pool_executor());
    }

    std::cout << "Main thread waiting for all consumers to finish" << std::endl;

    auto tpe = runtime.thread_pool_executor();
    auto all_consumed = concurrencpp::when_all(tpe, std::begin(results), std::end(results)).run();
    all_consumed.get();

    std::cout << "All consumers are done, exiting" << std::endl;
    return 0;
}
When the runtime object goes out of the scope of main , it iterates over each stored executor and calls its shutdown method. Trying to access the timer queue or any executor after that throws an errors::runtime_shutdown exception. When an executor shuts down, it clears its inner task queues, destroying unexecuted task objects. If a task object stores a concurrencpp coroutine, that coroutine is resumed inline and an errors::broken_task exception is thrown inside it. Whenever a runtime_shutdown or a broken_task exception is thrown, applications should terminate their current code flow gracefully as soon as possible. These exceptions should not be ignored. Both runtime_shutdown and broken_task inherit from the errors::interrupted_task base class, and this type can also be used in a catch clause to handle termination in a unified way.
Many concurrencpp asynchronous actions require an instance of an executor as their resume executor. When an asynchronous action (implemented as a coroutine) can finish synchronously, it resumes immediately in the calling thread of execution. If the asynchronous action cannot finish synchronously, it is resumed when it finishes, inside the given resume executor. For example, the when_any utility function requires an instance of a resume executor as its first argument. when_any returns a lazy_result that becomes ready when at least one of the given results becomes ready. If one of the results is already ready at the moment when_any is called, the calling coroutine is resumed synchronously in the calling thread of execution. Otherwise, the calling coroutine is resumed when at least one result is finished, inside the given resume executor. Resume executors are important because they dictate where coroutines are resumed in cases where it is not clear where a coroutine is supposed to resume (for example, in the case of when_any and when_all ), or in cases where the asynchronous action is processed inside one of the concurrencpp workers, which are only used to process that specific action and not application code.
make_ready_result
make_ready_result creates a ready result object from the given arguments. Awaiting such a result causes the current coroutine to resume immediately. get and operator co_await return the constructed value.
/*
Creates a ready result object by building <<type>> from arguments&&... in-place.
Might throw any exception that the constructor
of type(std::forward<argument_types>(arguments)...) throws.
Might throw std::bad_alloc exception if fails to allocate memory.
*/
template < class type , class ... argument_types>
result<type> make_ready_result (argument_types&& ... arguments);
/*
An overload for void type.
Might throw std::bad_alloc exception if fails to allocate memory.
*/
result<void> make_ready_result();
make_exceptional_result
make_exceptional_result creates a ready result object from a given exception. Awaiting such a result causes the current coroutine to resume immediately. get and operator co_await re-throw the given exception.
/*
Creates a ready result object from an exception pointer.
The returned result object will re-throw exception_ptr when calling get or await.
Throws std::invalid_argument if exception_ptr is null.
Might throw std::bad_alloc exception if fails to allocate memory.
*/
template < class type >
result<type> make_exceptional_result (std::exception_ptr exception_ptr);
/*
Overload. Similar to make_exceptional_result(std::exception_ptr),
but gets an exception object directly.
Might throw any exception that the constructor of exception_type(std::move(exception)) might throw.
Might throw std::bad_alloc exception if fails to allocate memory.
*/
template < class type , class exception_type >
result<type> make_exceptional_result(exception_type exception);
when_all function
when_all is a utility function that creates a lazy result object which becomes ready when all input results are completed. Awaiting this lazy result returns all input result objects in a ready state, ready to be consumed.
The when_all function comes in three flavors: one that accepts a heterogeneous range of result objects, another that takes a pair of iterators to a range of result objects of the same type, and lastly an overload that accepts no result objects at all. When there are no input result objects, the function returns a ready result object of an empty tuple.
If one of the passed result objects is empty, an exception is thrown. In this case, the input result objects are unaffected by the function and can be used again after the exception has been handled. If all input result objects are valid, they are emptied by this function and returned in a valid and ready state as the output result.
Currently, when_all only accepts result objects.
All overloads accept a resume executor as their first parameter. When awaiting a result returned by when_all , the calling coroutine is resumed by the given resume executor.
/*
Creates a result object that becomes ready when all the input results become ready.
Passed result objects are emptied and returned as a tuple.
Throws std::invalid_argument if any of the passed result objects is empty.
Might throw an std::bad_alloc exception if no memory is available.
*/
template < class ... result_types>
lazy_result<std::tuple< typename std::decay<result_types>::type...>>
when_all (std::shared_ptr<executor_type> resume_executor,
result_types&& ... results);
/*
Overload. Similar to when_all(result_types&& ...) but receives a pair of iterators referencing a range.
Passed result objects are emptied and returned as a vector.
If begin == end, the function returns immediately with an empty vector.
Throws std::invalid_argument if any of the passed result objects is empty.
Might throw an std::bad_alloc exception if no memory is available.
*/
template < class iterator_type >
lazy_result<std::vector< typename std::iterator_traits<iterator_type>::value_type>>
when_all (std::shared_ptr<executor_type> resume_executor,
iterator_type begin, iterator_type end);
/*
Overload. Returns a ready result object that doesn't monitor any asynchronous result.
Might throw an std::bad_alloc exception if no memory is available.
*/
lazy_result<std::tuple<>> when_all(std::shared_ptr<executor_type> resume_executor);
when_any function
when_any is a utility function that creates a lazy result object which becomes ready when at least one input result is completed. Awaiting this result returns a helper struct containing all input result objects plus the index of the completed task. By the time the ready result is consumed, other results might have already completed asynchronously. Applications can call when_any repeatedly to consume ready results as they complete, until all results are consumed.
The when_any function comes in only two flavors: one that accepts a heterogeneous range of result objects and another that takes a pair of iterators to a range of result objects of the same type. Unlike when_all , there is no point in waiting for at least one task to finish when the range of results is completely empty. Hence, there is no overload without arguments. Also, the two-iterator overload throws an exception if the iterators reference an empty range (when begin == end ).
If one of the passed result objects is empty, an exception is thrown. Whenever an exception is thrown, the input result objects are unaffected by the function and can be used again after the exception has been handled. If all input result objects are valid, they are emptied by this function and returned in a valid state as the output result.
Currently, when_any only accepts result objects.
All overloads accept a resume executor as their first parameter. When awaiting a result returned by when_any , the calling coroutine is resumed by the given resume executor.
/*
Helper struct returned from when_any.
index is the position of the ready result in results sequence.
results is either an std::tuple or an std::vector of the results that were passed to when_any.
*/
template < class sequence_type >
struct when_any_result {
std:: size_t index;
sequence_type results;
};
/*
Creates a result object that becomes ready when at least one of the input results is ready.
Passed result objects are emptied and returned as a tuple.
Throws std::invalid_argument if any of the passed result objects is empty.
Might throw an std::bad_alloc exception if no memory is available.
*/
template < class ... result_types>
lazy_result<when_any_result<std::tuple<result_types...>>>
when_any (std::shared_ptr<executor_type> resume_executor,
result_types&& ... results);
/*
Overload. Similar to when_any(result_types&& ...) but receives a pair of iterators referencing a range.
Passed result objects are emptied and returned as a vector.
Throws std::invalid_argument if begin == end.
Throws std::invalid_argument if any of the passed result objects is empty.
Might throw an std::bad_alloc exception if no memory is available.
*/
template < class iterator_type >
lazy_result<when_any_result<std::vector< typename std::iterator_traits<iterator_type>::value_type>>>
when_any (std::shared_ptr<executor_type> resume_executor,
iterator_type begin, iterator_type end);
resume_on
resume_on returns an awaitable that suspends the current coroutine and resumes it inside the given executor. This is an important function that makes sure a coroutine is running in the right executor. For example, applications might schedule a background task using the background_executor and await the returned result object. In this case, the awaiting coroutine will be resumed inside the background executor. A call to resume_on with another CPU-bound executor makes sure that CPU-bound lines of code do not run on the background executor once the background task is completed. If a task is rescheduled to run on another executor using resume_on, but that executor is shut down before it can resume the suspended task, that task is resumed immediately and an errors::broken_task exception is thrown. In this case, applications need to quit gracefully.
/*
Returns an awaitable that suspends the current coroutine and resumes it inside executor.
Might throw any exception that executor_type::enqueue throws.
*/
template < class executor_type >
auto resume_on (std::shared_ptr<executor_type> executor);
concurrencpp also provides timers and timer queues. Timers are objects that define asynchronous actions running on an executor within a well-defined interval of time. There are three types of timers: regular timers, oneshot timers and delay objects.
Regular timers have four properties that define them: a callable, an executor, a due time and a frequency, matching the parameters of timer_queue::make_timer.
Like other objects in concurrencpp, timers are a move-only type that can be empty. When a timer is destructed or timer::cancel is called, the timer cancels its scheduled but not yet executed tasks. Ongoing tasks are unaffected. The timer callable must be thread safe. It is recommended to set the due time and the frequency of timers to a granularity of 50 milliseconds.
A timer queue is a concurrencpp worker that manages a collection of timers and processes them in just one thread of execution. It is also the agent used to create new timers. When a timer deadline (whether it is the timer's due time or frequency) has been reached, the timer queue "fires" the timer by scheduling its callable to run on the associated executor as a task.
Just like executors, timer queues also adhere to the RAII concept. When the runtime object gets out of scope, it shuts down the timer queue, cancelling all pending timers. After a timer queue has been shut down, any subsequent call to make_timer, make_one_shot_timer and make_delay_object will throw an errors::runtime_shutdown exception. Applications must not try to shut down timer queues by themselves.
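To make the interplay of due time and frequency concrete, the sketch below lists the offsets at which a regular timer would be expected to fire. `expected_fire_times` is a hypothetical helper written for illustration only; it is not part of concurrencpp, and it assumes ideal timing, whereas a real timer queue fires only approximately at these offsets.

```cpp
#include <chrono>
#include <vector>

// Hypothetical helper (not a concurrencpp API): returns the offsets, measured
// from timer creation, at which a regular timer is expected to fire within
// `window`, assuming the first firing happens at `due_time` and every later
// firing happens `frequency` after the previous one.
std::vector<std::chrono::milliseconds> expected_fire_times(
    std::chrono::milliseconds due_time,
    std::chrono::milliseconds frequency,
    std::chrono::milliseconds window) {
    std::vector<std::chrono::milliseconds> times;

    // A non-positive frequency would never advance; report only the first firing.
    if (frequency <= std::chrono::milliseconds::zero()) {
        if (due_time <= window) {
            times.push_back(due_time);
        }
        return times;
    }

    for (auto t = due_time; t <= window; t += frequency) {
        times.push_back(t);
    }
    return times;
}
```

For a due time of 1500 ms, a frequency of 2000 ms and a 12 second window, the helper returns six offsets: 1500, 3500, 5500, 7500, 9500 and 11500 ms.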
timer_queue API:
class timer_queue {
/*
Destroys this timer_queue.
*/
~timer_queue () noexcept ;
/*
Shuts down this timer_queue:
Tells the underlying thread of execution to quit and joins it.
Cancels all pending timers.
After this call, invocation of any method besides shutdown
and shutdown_requested will throw an errors::runtime_shutdown.
If shutdown had been called before, this method has no effect.
*/
void shutdown () noexcept ;
/*
Returns true if shutdown had been called before, false otherwise.
*/
bool shutdown_requested () const noexcept ;
/*
Creates a new running timer where *this is the associated timer_queue.
Throws std::invalid_argument if executor is null.
Throws errors::runtime_shutdown if shutdown had been called before.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
template < class callable_type , class ... argumet_types>
timer make_timer (
std::chrono::milliseconds due_time,
std::chrono::milliseconds frequency,
std::shared_ptr<concurrencpp::executor> executor,
callable_type&& callable,
argumet_types&& ... arguments);
/*
Creates a new one-shot timer where *this is the associated timer_queue.
Throws std::invalid_argument if executor is null.
Throws errors::runtime_shutdown if shutdown had been called before.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
template < class callable_type , class ... argumet_types>
timer make_one_shot_timer (
std::chrono::milliseconds due_time,
std::shared_ptr<concurrencpp::executor> executor,
callable_type&& callable,
argumet_types&& ... arguments);
/*
Creates a new delay object where *this is the associated timer_queue.
Throws std::invalid_argument if executor is null.
Throws errors::runtime_shutdown if shutdown had been called before.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
result< void > make_delay_object (
std::chrono::milliseconds due_time,
std::shared_ptr<concurrencpp::executor> executor);
};
timer API:
class timer {
/*
Creates an empty timer.
*/
timer () noexcept = default ;
/*
Cancels the timer, if not empty.
*/
~timer () noexcept ;
/*
Moves the content of rhs to *this.
rhs is empty after this call.
*/
timer (timer&& rhs) noexcept = default ;
/*
Moves the content of rhs to *this.
rhs is empty after this call.
Returns *this.
*/
timer& operator = (timer&& rhs) noexcept ;
/*
Cancels this timer.
After this call, the associated timer_queue will not schedule *this
to run again and *this becomes empty.
Scheduled, but not yet executed tasks are cancelled.
Ongoing tasks are unaffected.
This method has no effect if *this is empty or the associated timer_queue has already expired.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
void cancel ();
/*
Returns the associated executor of this timer.
Throws concurrencpp::errors::empty_timer if *this is empty.
*/
std::shared_ptr<executor> get_executor () const ;
/*
Returns the associated timer_queue of this timer.
Throws concurrencpp::errors::empty_timer if *this is empty.
*/
std::weak_ptr<timer_queue> get_timer_queue () const ;
/*
Returns the due time of this timer.
Throws concurrencpp::errors::empty_timer if *this is empty.
*/
std::chrono::milliseconds get_due_time () const ;
/*
Returns the frequency of this timer.
Throws concurrencpp::errors::empty_timer if *this is empty.
*/
std::chrono::milliseconds get_frequency () const ;
/*
Sets new frequency for this timer.
Callables already scheduled to run at the time of invocation are not affected.
Throws concurrencpp::errors::empty_timer if *this is empty.
*/
void set_frequency (std::chrono::milliseconds new_frequency);
/*
Returns true if *this is not an empty timer, false otherwise.
The timer should not be used if this->operator bool() is false.
*/
explicit operator bool () const noexcept ;
};
In this example we create a regular timer by using the timer queue. The timer schedules its callable to run after 1.5 seconds, then fires its callable every 2 seconds. The given callable runs on the threadpool executor.
#include "concurrencpp/concurrencpp.h"
#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>
using namespace std::chrono_literals;
int main() {
    concurrencpp::runtime runtime;
    std::atomic_size_t counter = 1;
    // First firing after 1500ms, then every 2000ms, on the thread pool executor.
    concurrencpp::timer timer = runtime.timer_queue()->make_timer(
        1500ms,
        2000ms,
        runtime.thread_pool_executor(),
        [&] {
            const auto c = counter.fetch_add(1);
            std::cout << "timer was invoked for the " << c << "th time" << std::endl;
        });
    // Keep the runtime (and the timer) alive long enough to observe several firings.
    std::this_thread::sleep_for(12s);
    return 0;
}
A oneshot timer is a one-time timer with only a due time: after it schedules its callable to run once, it never reschedules it to run again.
In this example, we create a timer that runs only once - after 3 seconds from its creation, the timer will schedule its callable to run on a new thread of execution (using thread_executor ).
#include "concurrencpp/concurrencpp.h"
#include <chrono>
#include <iostream>
#include <thread>
using namespace std::chrono_literals;
int main() {
    concurrencpp::runtime runtime;
    // Fires exactly once, 3000ms after creation, on a new thread of execution.
    concurrencpp::timer timer = runtime.timer_queue()->make_one_shot_timer(
        3000ms,
        runtime.thread_executor(),
        [&] {
            std::cout << "hello and goodbye" << std::endl;
        });
    std::this_thread::sleep_for(4s);
    return 0;
}
A delay object is a lazy result object that becomes ready when it's co_awaited and its due time is reached. Applications can co_await this result object to delay the current coroutine in a non-blocking way. The current coroutine is resumed by the executor that was passed to make_delay_object.
In this example, we spawn a task (that does not return any result or throw any exception), which delays itself in a loop by calling co_await on a delay object.
#include "concurrencpp/concurrencpp.h"
#include <chrono>
#include <iostream>
#include <thread>
using namespace std::chrono_literals;
concurrencpp::null_result delayed_task(
    std::shared_ptr<concurrencpp::timer_queue> tq,
    std::shared_ptr<concurrencpp::thread_pool_executor> ex) {
    size_t counter = 1;
    while (true) {
        std::cout << "task was invoked " << counter << " times." << std::endl;
        counter++;
        // Suspend this coroutine for 1500ms without blocking a thread; it resumes on ex.
        co_await tq->make_delay_object(1500ms, ex);
    }
}
int main() {
    concurrencpp::runtime runtime;
    delayed_task(runtime.timer_queue(), runtime.thread_pool_executor());
    std::this_thread::sleep_for(10s);
    return 0;
}
A generator is a lazy, synchronous coroutine that is able to produce a stream of values to consume. Generators use the co_yield keyword to yield values back to their consumers.
Generators are meant to be used synchronously - they can only use the co_yield keyword and must not use the co_await keyword. A generator will continue to produce values as long as the co_yield keyword is called. If the co_return keyword is called (explicitly or implicitly), then the generator will stop producing values. Similarly, if an exception is thrown then the generator will stop producing values and the thrown exception will be re-thrown to the consumer of the generator.
Generators are meant to be used in a range-for loop: Generators implicitly produce two iterators - begin and end which control the execution of the for loop. These iterators should not be handled or accessed manually.
When a generator is created, it starts as a lazy task. When its begin method is called, the generator is resumed for the first time and an iterator is returned. The lazy task is resumed repeatedly by calling operator++ on the returned iterator. The returned iterator will be equal to end iterator when the generator finishes execution either by exiting gracefully or throwing an exception. As mentioned earlier, this happens behind the scenes by the inner mechanism of the loop and the generator, and should not be called directly.
Like other objects in concurrencpp, generators are a move-only type. After a generator has been moved, it is considered empty and trying to access its inner methods (other than operator bool) will throw an exception. An empty generator should not generally occur: it is advised to consume generators upon their creation in a for loop and not to call their methods individually.
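The begin / operator++ / end protocol that the range-for loop drives can be imitated with an ordinary class. The sketch below is not a coroutine and does not use concurrencpp; `partial_sums`, `end_sentinel` and `collect` are hypothetical names chosen for illustration. It only shows how a range-for loop pulls one lazily computed value at a time and stops when the iterator compares equal to a separate end-marker type (this requires C++17, which allows begin and end to have different types).

```cpp
#include <vector>

// Hypothetical end marker, playing the role of an end iterator.
struct end_sentinel {};

// Hypothetical stand-in for a generator: lazily produces S(n) = 1 + 2 + ... + n
// for n = 1..limit, one value per iteration step.
class partial_sums {
public:
    class iterator {
    public:
        explicit iterator(int limit) noexcept : m_limit(limit) {}

        // Advances to the next value, like resuming a suspended generator.
        iterator& operator++() noexcept {
            ++m_n;
            m_sum += m_n;
            return *this;
        }

        // Returns the latest produced value.
        int operator*() const noexcept { return m_sum; }

        // The loop keeps running while the sequence is not exhausted.
        bool operator!=(end_sentinel) const noexcept { return m_n <= m_limit; }

    private:
        int m_n = 1;
        int m_sum = 1;
        int m_limit;
    };

    explicit partial_sums(int limit) noexcept : m_limit(limit) {}

    iterator begin() const noexcept { return iterator(m_limit); }
    end_sentinel end() const noexcept { return {}; }

private:
    int m_limit;
};

// Consumes the sequence with a range-for loop, the intended way to use it.
std::vector<int> collect(int limit) {
    std::vector<int> values;
    for (int value : partial_sums(limit)) {
        values.push_back(value);
    }
    return values;
}
```

Calling `collect(4)` produces 1, 3, 6, 10. A real generator differs in that its body is a coroutine suspended at each co_yield, but the consumer-side loop has the same shape.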
generator API
class generator {
/*
Move constructor. After this call, rhs is empty.
*/
generator (generator&& rhs) noexcept ;
/*
Destructor. Invalidates existing iterators.
*/
~generator () noexcept ;
generator ( const generator& rhs) = delete ;
generator& operator =(generator&& rhs) = delete ;
generator& operator =( const generator& rhs) = delete ;
/*
Returns true if this generator is not empty.
Applications must not use this object if this->operator bool() is false.
*/
explicit operator bool () const noexcept ;
/*
Starts running this generator and returns an iterator.
Throws errors::empty_generator if *this is empty.
Re-throws any exception that is thrown inside the generator code.
*/
iterator begin ();
/*
Returns an end iterator.
*/
static generator_end_iterator end () noexcept ;
};
class generator_iterator {
using value_type = std:: remove_reference_t <type>;
using reference = value_type&;
using pointer = value_type*;
using iterator_category = std::input_iterator_tag;
using difference_type = std:: ptrdiff_t ;
/*
Resumes the suspended generator and returns *this.
Re-throws any exception that was thrown inside the generator code.
*/
generator_iterator& operator ++();
/*
Post-increment version of operator++.
*/
void operator ++( int );
/*
Returns the latest value produced by the associated generator.
*/
reference operator *() const noexcept ;
/*
Returns a pointer to the latest value produced by the associated generator.
*/
pointer operator ->() const noexcept ;
/*
Comparison operators.
*/
friend bool operator ==( const generator_iterator& it0, const generator_iterator& it1) noexcept ;
friend bool operator ==( const generator_iterator& it, generator_end_iterator) noexcept ;
friend bool operator ==(generator_end_iterator end_it, const generator_iterator& it) noexcept ;
friend bool operator !=( const generator_iterator& it, generator_end_iterator end_it) noexcept ;
friend bool operator !=(generator_end_iterator end_it, const generator_iterator& it) noexcept ;
};
generator example: In this example, we will write a generator that yields the n-th member of the sequence S(n) = 1 + 2 + 3 + ... + n where n <= 100:
#include "concurrencpp/concurrencpp.h"
#include <iostream>
concurrencpp::generator<int> sequence() {
    int i = 1;
    int sum = 0;
    while (i <= 100) {
        sum += i;
        ++i;
        co_yield sum;  // hand the running sum to the consumer and suspend
    }
}
int main() {
    for (auto value : sequence()) {
        std::cout << value << std::endl;
    }
    return 0;
}
Regular synchronous locks cannot be used safely inside tasks for a number of reasons:
Synchronous locks, such as std::mutex, are expected to be locked and unlocked in the same thread of execution. Unlocking a synchronous lock in a thread which had not locked it is undefined behavior. Since tasks can be suspended and resumed in any thread of execution, synchronous locks will break when used inside tasks.
concurrencpp::async_lock solves those issues by providing a similar API to std::mutex, with the main difference that calls to concurrencpp::async_lock return a lazy result that can be co_awaited safely inside tasks. If one task tries to lock an async lock and fails, the task is suspended, and is resumed when the lock is unlocked and acquired by the suspended task. This allows executors to process a huge number of tasks waiting to acquire a lock without expensive context switching and expensive kernel calls.
Similar to how std::mutex works, only one task can acquire async_lock at any given time, and a read barrier is placed at the moment of acquiring. Releasing an async lock places a write barrier and allows the next task to acquire it, creating a chain of one modifier at a time which sees the changes other modifiers have made and posts its modifications for the next modifiers to see.
Like std::mutex, concurrencpp::async_lock is not recursive. Extra attention must be given when acquiring such a lock: a lock must not be acquired again in a task that has been spawned by another task which had already acquired the lock. In such a case, an unavoidable deadlock will occur. Unlike other objects in concurrencpp, async_lock is neither copyable nor movable.
Like standard locks, concurrencpp::async_lock is meant to be used with scoped wrappers which leverage the C++ RAII idiom to ensure locks are always unlocked upon function return or a thrown exception. async_lock::lock returns a lazy result of a scoped wrapper that calls async_lock::unlock on destruction. Raw uses of async_lock::unlock are discouraged. concurrencpp::scoped_async_lock acts as the scoped wrapper and provides an API which is almost identical to std::unique_lock. concurrencpp::scoped_async_lock is movable, but not copyable.
async_lock::lock and scoped_async_lock::lock require a resume-executor as their parameter. Upon calling those methods, if the lock is available for locking, then it is locked and the current task is resumed immediately. If not, then the current task is suspended, and will be resumed inside the given resume-executor when the lock is finally acquired.
concurrencpp::scoped_async_lock wraps an async_lock and ensures it's properly unlocked. Like std::unique_lock, there are cases where it does not wrap any lock, and in this case it's considered to be empty. An empty scoped_async_lock can result when it's default-constructed, moved from, or when the scoped_async_lock::release method is called. An empty scoped-async-lock will not unlock any lock on destruction.
Even if the scoped-async-lock is not empty, it does not mean that it owns the underlying async-lock and it will unlock it on destruction. Non-empty and non-owning scoped-async locks can happen if scoped_async_lock::unlock was called or the scoped-async-lock was constructed using scoped_async_lock(async_lock&, std::defer_lock_t) constructor.
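Because scoped_async_lock is described above as almost identical to std::unique_lock, the three states (empty, non-empty but non-owning, and owning) can be illustrated with the standard type. The sketch below is an analogy only: it uses std::unique_lock and std::mutex rather than concurrencpp, and `lock_state` and `inspect` are hypothetical names introduced for illustration.

```cpp
#include <mutex>

// Hypothetical summary of a lock wrapper's state.
struct lock_state {
    bool wraps_lock;  // the wrapper refers to some lock (it is not empty)
    bool owns_lock;   // the wrapper will unlock that lock on destruction
};

// Works with any wrapper exposing mutex() and owns_lock(), such as std::unique_lock.
template<class wrapper_type>
lock_state inspect(const wrapper_type& wrapper) noexcept {
    return {wrapper.mutex() != nullptr, wrapper.owns_lock()};
}
```

A usage sketch with the standard types:

```cpp
std::mutex m;
std::unique_lock<std::mutex> empty;                        // {false, false}: wraps nothing
std::unique_lock<std::mutex> deferred(m, std::defer_lock); // {true, false}: wraps m, does not own it
deferred.lock();                                           // {true, true}: unlocks m on destruction
deferred.release();                                        // {false, false}: m stays locked, unlock it manually
m.unlock();
```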
async_lock API
class async_lock {
/*
Constructs an async lock object.
*/
async_lock () noexcept ;
/*
Destructs an async lock object.
*this is not automatically unlocked at the moment of destruction.
*/
~async_lock () noexcept ;
/*
Asynchronously acquires the async lock.
If *this has already been locked by another non-parent task, the current task will be suspended
and will be resumed when *this is acquired, inside resume_executor.
If *this has not been locked by another task, then *this will be acquired and the current task will be resumed
immediately in the calling thread of execution.
If *this has already been locked by a parent task, then unavoidable dead-lock will occur.
Throws std::invalid_argument if resume_executor is null.
Throws std::system_error if one of the underlying synchronization primitives throws.
*/
lazy_result<scoped_async_lock> lock (std::shared_ptr<executor> resume_executor);
/*
Tries to acquire *this in the calling thread of execution.
Returns true if *this is acquired, false otherwise.
In any case, the current task is resumed immediately in the calling thread of execution.
Throws std::system_error if one of the underlying synchronization primitives throws.
*/
lazy_result< bool > try_lock ();
/*
Releases *this and allows other tasks (including suspended tasks waiting for *this) to acquire it.
Throws std::system_error if *this is not locked at the moment of calling this method.
Throws std::system_error if one of the underlying synchronization primitives throws.
*/
void unlock ();
};
scoped_async_lock API
class scoped_async_lock {
/*
Constructs an async lock wrapper that does not wrap any async lock.
*/
scoped_async_lock () noexcept = default ;
/*
If *this wraps async_lock, this method releases the wrapped lock.
*/
~scoped_async_lock () noexcept ;
/*
Moves rhs to *this.
After this call, rhs does not wrap any async lock.
*/
scoped_async_lock (scoped_async_lock&& rhs) noexcept ;
/*
Wraps an unlocked lock.
lock must not be in acquired mode when calling this method.
*/
scoped_async_lock (async_lock& lock, std:: defer_lock_t ) noexcept ;
/*
Wraps a locked lock.
lock must be already acquired when calling this method.
*/
scoped_async_lock (async_lock& lock, std:: adopt_lock_t ) noexcept ;
/*
Calls async_lock::lock on the wrapped lock, using resume_executor as a parameter.
Throws std::invalid_argument if resume_executor is null.
Throws std::system_error if *this does not wrap any lock.
Throws std::system_error if wrapped lock is already locked.
Throws any exception async_lock::lock throws.
*/
lazy_result< void > lock (std::shared_ptr<executor> resume_executor);
/*
Calls async_lock::try_lock on the wrapped lock.
Throws std::system_error if *this does not wrap any lock.
Throws std::system_error if wrapped lock is already locked.
Throws any exception async_lock::try_lock throws.
*/
lazy_result< bool > try_lock ();
/*
Calls async_lock::unlock on the wrapped lock.
If *this does not wrap any lock, this method does nothing.
Throws std::system_error if *this wraps a lock and it is not locked.
*/
void unlock ();
/*
Checks whether *this wraps a locked mutex or not.
Returns true if the wrapped lock is in an acquired state, false otherwise.
*/
bool owns_lock () const noexcept ;
/*
Equivalent to owns_lock.
*/
explicit operator bool () const noexcept ;
/*
Swaps the contents of *this and rhs.
*/
void swap (scoped_async_lock& rhs) noexcept ;
/*
Empties *this and returns a pointer to the previously wrapped lock.
After a call to this method, *this doesn't wrap any lock.
The previously wrapped lock is not released,
it must be released by either unlocking it manually through the returned pointer or by
capturing the pointer with another scoped_async_lock which will take ownership over it.
*/
async_lock* release () noexcept ;
/*
Returns a pointer to the wrapped async_lock, or a null pointer if there is no wrapped async_lock.
*/
async_lock* mutex () const noexcept ;
};
async_lock example: In this example we push 10,000,000 integers to an std::vector object from different tasks concurrently, while using async_lock to make sure no data race occurs and the correctness of the internal state of that vector object is preserved.
#include "concurrencpp/concurrencpp.h"
#include <algorithm>
#include <iostream>
#include <vector>
std::vector<size_t> numbers;
concurrencpp::async_lock lock;
concurrencpp::result<void> add_numbers(concurrencpp::executor_tag,
                                       std::shared_ptr<concurrencpp::executor> executor,
                                       size_t begin,
                                       size_t end) {
    for (auto i = begin; i < end; i++) {
        // The wrapper releases the lock at the end of each loop iteration.
        concurrencpp::scoped_async_lock raii_wrapper = co_await lock.lock(executor);
        numbers.push_back(i);
    }
}
int main() {
    concurrencpp::runtime runtime;
    constexpr size_t range = 10'000'000;
    constexpr size_t sections = 4;
    concurrencpp::result<void> results[sections];
    // Split [0, range) into equally sized, contiguous sections, one per task.
    for (size_t i = 0; i < sections; i++) {
        const auto range_start = i * range / sections;
        const auto range_end = (i + 1) * range / sections;
        results[i] = add_numbers({}, runtime.thread_pool_executor(), range_start, range_end);
    }
    for (auto& result : results) {
        result.get();
    }
    std::cout << "vector size is " << numbers.size() << std::endl;
    // make sure the vector state has not been corrupted by unprotected concurrent accesses
    std::sort(numbers.begin(), numbers.end());
    for (size_t i = 0; i < range; i++) {
        if (numbers[i] != i) {
            std::cerr << "vector state is corrupted." << std::endl;
            return -1;
        }
    }
    std::cout << "succeeded pushing range [0 - 10,000,000] concurrently to the vector!" << std::endl;
    return 0;
}
async_condition_variable imitates the standard condition_variable and can be used safely with tasks alongside async_lock. async_condition_variable works with async_lock to suspend a task until some shared memory (protected by the lock) has changed. Tasks that want to monitor shared memory changes lock an instance of async_lock and call async_condition_variable::await. This atomically unlocks the lock and suspends the current task until some modifier task notifies the condition variable. A modifier task acquires the lock, modifies the shared memory, unlocks the lock and calls either notify_one or notify_all. When a suspended task is resumed (using the resume executor that was given to await), it locks the lock again, allowing the task to continue from the point of suspension seamlessly. Like async_lock, async_condition_variable is neither movable nor copyable: it is meant to be created in one place and accessed by multiple tasks.
async_condition_variable::await overloads require a resume-executor, which will be used to resume the task, and a locked scoped_async_lock. async_condition_variable::await comes with two overloads: one that accepts a predicate and one that doesn't. The overload which does not accept a predicate suspends the calling task immediately upon invocation until it's resumed by a call to notify_*. The overload which does accept a predicate works by letting the predicate inspect the shared memory and suspending the task repeatedly until the shared memory has reached its wanted state. Schematically, it works like calling
while (!pred()) { // pred() inspects the shared memory and returns true or false
    co_await await(resume_executor, lock); // suspend the current task until another task calls `notify_xxx`
}
Just like the standard condition variable, applications are encouraged to use the predicate overload, as it allows more fine-grained control over suspensions and resumptions. async_condition_variable can be used to write concurrent collections and data structures like concurrent queues and channels.
Internally, async_condition_variable holds a suspension-queue, in which tasks enqueue themselves when they await the condition variable to be notified. When any of the notify_* methods is called, the notifying task dequeues either one task or all of the tasks, depending on the invoked method. Tasks are dequeued from the suspension-queue in a FIFO manner. For example, if task A calls await, then task B calls await, and then task C calls notify_one, then internally task A will be dequeued and resumed. Task B will remain suspended until another call to notify_one or notify_all is made. If task A and task B are suspended and task C calls notify_all, then both tasks will be dequeued and resumed.
async_condition_variable API
class async_condition_variable {
/*
Constructor.
*/
async_condition_variable () noexcept ;
/*
Atomically releases lock and suspends the current task by adding it to *this suspension-queue.
Throws std::invalid_argument if resume_executor is null.
Throws std::invalid_argument if lock is not locked at the moment of calling this method.
Might throw std::system_error if the underlying std::mutex throws.
*/
lazy_result< void > await (std::shared_ptr<executor> resume_executor, scoped_async_lock& lock);
/*
Equivalent to:
while (!pred()) {
co_await await(resume_executor, lock);
}
Might throw any exception that await(resume_executor, lock) might throw.
Might throw any exception that pred might throw.
*/
template < class predicate_type >
lazy_result< void > await (std::shared_ptr<executor> resume_executor, scoped_async_lock& lock, predicate_type pred);
/*
Dequeues one task from *this suspension-queue and resumes it, if any available at the moment of calling this method.
The suspended task is resumed by scheduling it to run on the executor given when await was called.
Might throw std::system_error if the underlying std::mutex throws.
*/
void notify_one ();
/*
Dequeues all tasks from *this suspension-queue and resumes them, if any available at the moment of calling this method.
The suspended tasks are resumed by scheduling them to run on the executors given when await was called.
Might throw std::system_error if the underlying std::mutex throws.
*/
void notify_all ();
};
async_condition_variable example: In this example, async_lock and async_condition_variable work together to implement a concurrent queue that can be used to send data (in this example, integers) between tasks. Note that some methods return a result while others return a lazy_result, showing how both eager and lazy tasks can work together.
#include "concurrencpp/concurrencpp.h"
#include <cassert>
#include <iostream>
#include <queue>
#include <stdexcept>
using namespace concurrencpp;
class concurrent_queue {
private:
    async_lock _lock;
    async_condition_variable _cv;
    std::queue<int> _queue;
    bool _abort = false;
public:
    concurrent_queue() = default;
    result<void> shutdown(std::shared_ptr<executor> resume_executor) {
        {
            auto guard = co_await _lock.lock(resume_executor);
            _abort = true;
        }
        // Notify after releasing the lock so that every waiting consumer can observe _abort.
        _cv.notify_all();
    }
    lazy_result<void> push(std::shared_ptr<executor> resume_executor, int i) {
        {
            auto guard = co_await _lock.lock(resume_executor);
            _queue.push(i);
        }
        _cv.notify_one();
    }
    lazy_result<int> pop(std::shared_ptr<executor> resume_executor) {
        auto guard = co_await _lock.lock(resume_executor);
        // Suspend until there is something to pop or the queue has been shut down.
        co_await _cv.await(resume_executor, guard, [this] {
            return _abort || !_queue.empty();
        });
        if (!_queue.empty()) {
            auto result = _queue.front();
            _queue.pop();
            co_return result;
        }
        assert(_abort);
        throw std::runtime_error("queue has been shut down.");
    }
};
result<void> producer_loop(executor_tag,
                           std::shared_ptr<thread_pool_executor> tpe,
                           concurrent_queue& queue,
                           int range_start,
                           int range_end) {
    for (; range_start < range_end; ++range_start) {
        co_await queue.push(tpe, range_start);
    }
}
result<void> consumer_loop(executor_tag, std::shared_ptr<thread_pool_executor> tpe, concurrent_queue& queue) {
    try {
        while (true) {
            std::cout << co_await queue.pop(tpe) << std::endl;
        }
    } catch (const std::exception& e) {
        // pop throws once the queue is empty and has been shut down, which ends the loop.
        std::cerr << e.what() << std::endl;
    }
}
int main() {
    runtime runtime;
    const auto thread_pool_executor = runtime.thread_pool_executor();
    concurrent_queue queue;
    result<void> producers[4];
    result<void> consumers[4];
    for (int i = 0; i < 4; i++) {
        producers[i] = producer_loop({}, thread_pool_executor, queue, i * 5, (i + 1) * 5);
    }
    for (int i = 0; i < 4; i++) {
        consumers[i] = consumer_loop({}, thread_pool_executor, queue);
    }
    for (int i = 0; i < 4; i++) {
        producers[i].get();
    }
    queue.shutdown(thread_pool_executor).get();
    for (int i = 0; i < 4; i++) {
        consumers[i].get();
    }
    return 0;
}
The concurrencpp runtime object is the agent used to acquire, store and create new executors.
The runtime must be created as a value type as soon as the main function starts to run. When the concurrencpp runtime gets out of scope, it iterates over its stored executors and shuts them down one by one by calling executor::shutdown. Executors then exit their inner work loop and any subsequent attempt to schedule a new task will throw a concurrencpp::errors::runtime_shutdown exception. The runtime also contains the global timer queue used to create timers and delay objects. Upon destruction, stored executors destroy unexecuted tasks and wait for ongoing tasks to finish. If an ongoing task tries to use an executor to spawn new tasks or schedule its own task continuation, an exception will be thrown. In this case, ongoing tasks need to quit as soon as possible, allowing their underlying executors to quit. The timer queue will also be shut down, cancelling all running timers. With this RAII style of code, no tasks can be processed before the creation of the runtime object, or while or after the runtime gets out of scope. This frees concurrent applications from needing to communicate termination messages explicitly. Tasks are free to use executors as long as the runtime object is alive.
runtime API
class runtime {
/*
Creates a runtime object with default options.
*/
runtime ();
/*
Creates a runtime object with user defined options.
*/
runtime ( const concurrencpp::runtime_options& options);
/*
Destroys this runtime object.
Calls executor::shutdown on each monitored executor.
Calls timer_queue::shutdown on the global timer queue.
*/
~runtime () noexcept ;
/*
Returns this runtime timer queue used to create new timers.
*/
std::shared_ptr<concurrencpp::timer_queue> timer_queue () const noexcept ;
/*
Returns this runtime concurrencpp::inline_executor
*/
std::shared_ptr<concurrencpp::inline_executor> inline_executor () const noexcept ;
/*
Returns this runtime concurrencpp::thread_pool_executor
*/
std::shared_ptr<concurrencpp::thread_pool_executor> thread_pool_executor () const noexcept ;
/*
Returns this runtime concurrencpp::background_executor
*/
std::shared_ptr<concurrencpp::thread_pool_executor> background_executor () const noexcept ;
/*
Returns this runtime concurrencpp::thread_executor
*/
std::shared_ptr<concurrencpp::thread_executor> thread_executor () const noexcept ;
/*
Creates a new concurrencpp::worker_thread_executor and registers it in this runtime.
Might throw std::bad_alloc or std::system_error if any underlying memory or system resource could not have been acquired.
*/
std::shared_ptr<concurrencpp::worker_thread_executor> make_worker_thread_executor ();
/*
Creates a new concurrencpp::manual_executor and registers it in this runtime.
Might throw std::bad_alloc or std::system_error if any underlying memory or system resource could not have been acquired.
*/
std::shared_ptr<concurrencpp::manual_executor> make_manual_executor ();
/*
Creates a new user defined executor and registers it in this runtime.
executor_type must be a valid concrete class of concurrencpp::executor.
Might throw std::bad_alloc if no memory is available.
Might throw any exception that the constructor of <<executor_type>> might throw.
*/
template < class executor_type , class ... argument_types>
std::shared_ptr<executor_type> make_executor (argument_types&& ... arguments);
/*
Returns the version of concurrencpp that the library was built with.
*/
static std::tuple< unsigned int , unsigned int , unsigned int > version () noexcept ;
};

In some cases, applications are interested in monitoring thread creation and termination. For example, some memory allocators require new threads to be registered and unregistered upon their creation and termination. The concurrencpp runtime allows setting a thread creation callback and a thread termination callback. Those callbacks are called whenever one of the concurrencpp workers creates a new thread and when that thread is terminating. They are always called from inside the created/terminating thread, so std::this_thread::get_id will always return the relevant thread ID. The signature of those callbacks is void callback (std::string_view thread_name). thread_name is a concurrencpp-specific title that is given to the thread and can be observed in debuggers that present the thread name. The thread name is not guaranteed to be unique and should only be used for logging and debugging.
In order to set a thread creation callback and/or a thread termination callback, applications can set the thread_started_callback and/or thread_terminated_callback members of the runtime_options object which is passed to the runtime constructor. Since those callbacks are copied to each concurrencpp worker that might create threads, they have to be copyable.
#include "concurrencpp/concurrencpp.h"

#include <chrono>
#include <iostream>
#include <string_view>
#include <thread>

int main() {
    concurrencpp::runtime_options options;

    // Called from inside every thread that a concurrencpp worker creates.
    options.thread_started_callback = [](std::string_view thread_name) {
        std::cout << "A new thread is starting to run, name: " << thread_name << ", thread id: " << std::this_thread::get_id()
                  << std::endl;
    };

    // Called from inside every such thread just before it terminates.
    options.thread_terminated_callback = [](std::string_view thread_name) {
        std::cout << "A thread is terminating, name: " << thread_name << ", thread id: " << std::this_thread::get_id() << std::endl;
    };

    concurrencpp::runtime runtime(options);

    const auto timer_queue = runtime.timer_queue();
    const auto thread_pool_executor = runtime.thread_pool_executor();

    // Fires after 100 ms and then every 500 ms on the thread pool executor.
    concurrencpp::timer timer =
        timer_queue->make_timer(std::chrono::milliseconds(100), std::chrono::milliseconds(500), thread_pool_executor, [] {
            std::cout << "A timer callable is executing" << std::endl;
        });

    std::this_thread::sleep_for(std::chrono::seconds(3));
    return 0;
}

Possible output:
A new thread is starting to run, name: concurrencpp::timer_queue worker, thread id: 7496
A new thread is starting to run, name: concurrencpp::thread_pool_executor worker, thread id: 21620
A timer callable is executing
A timer callable is executing
A timer callable is executing
A timer callable is executing
A timer callable is executing
A timer callable is executing
A thread is terminating, name: concurrencpp::timer_queue worker, thread id: 7496
A thread is terminating, name: concurrencpp::thread_pool_executor worker, thread id: 21620
Applications can create their own custom executor type by inheriting from the derivable_executor class. There are a few points to consider when implementing user-defined executors. The most important one is that executors are used from multiple threads, so implemented methods must be thread-safe.
New executors can be created using runtime::make_executor. Applications must not create new executors with plain instantiation (such as std::make_shared or plain new), only by using runtime::make_executor. Also, applications must not try to re-instantiate the built-in concurrencpp executors, like the thread_pool_executor or the thread_executor; those executors must only be accessed through their existing instances in the runtime object.
Another important point is to handle shutdown correctly: shutdown, shutdown_requested and enqueue should all monitor the executor state and behave accordingly when invoked:
- shutdown should tell underlying threads to quit and then join them.
- shutdown might be called multiple times, and the method must handle this scenario by ignoring any subsequent calls to shutdown after the first invocation.
- enqueue must throw a concurrencpp::errors::runtime_shutdown exception if shutdown had been called before.

task objects
Implementing executors is one of the rare cases where applications need to work with the concurrencpp::task class directly. concurrencpp::task is a std::function-like object, but with a few differences. Like std::function, the task object stores a callable that acts as the asynchronous operation. Unlike std::function, task is a move-only type. On invocation, task objects receive no parameters and return void. Moreover, every task object can be invoked only once. After the first invocation, the task object becomes empty. Invoking an empty task object is equivalent to invoking an empty lambda ( []{} ), and will not throw any exception. Task objects receive their callable as a forwarding reference ( type&& where type is a template parameter), and not by copy (like std::function). Construction of the stored callable happens in-place. This allows task objects to contain callables that are move-only types (like std::unique_ptr and concurrencpp::result). Task objects try to use different methods to optimize the usage of the stored types. For example, task objects apply the short-buffer-optimization (sbo) for regular, small callables, and will inline calls to std::coroutine_handle<void> by calling them directly without virtual dispatch.
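To make the semantics of task objects concrete, here is a minimal sketch of a move-only, invoke-once callable wrapper written with the standard library only. one_shot_task and demo_one_shot_task are hypothetical names used for illustration; concurrencpp::task is implemented differently (for instance, it avoids heap allocation for small callables, which this sketch does not attempt). The sketch only demonstrates three of the properties described above: the callable is taken by forwarding reference, move-only callables can be stored, and the object becomes empty after its first invocation.

```cpp
#include <cassert>
#include <memory>
#include <type_traits>
#include <utility>

// Hypothetical illustration of a move-only, single-shot callable wrapper.
class one_shot_task {
    struct callable_base {
        virtual ~callable_base() = default;
        virtual void invoke() = 0;
    };

    template<class callable_type>
    struct callable_holder final : callable_base {
        callable_type callable;

        template<class argument_type>
        explicit callable_holder(argument_type&& argument) : callable(std::forward<argument_type>(argument)) {}

        void invoke() override {
            callable();
        }
    };

    std::unique_ptr<callable_base> _stored;

public:
    one_shot_task() noexcept = default;

    // Move-only: moving leaves the source empty, copying is not available.
    one_shot_task(one_shot_task&&) noexcept = default;
    one_shot_task& operator=(one_shot_task&&) noexcept = default;

    // Takes the callable by forwarding reference and stores a decayed copy of it.
    template<class callable_type, class = std::enable_if_t<!std::is_same_v<std::decay_t<callable_type>, one_shot_task>>>
    one_shot_task(callable_type&& callable) :
        _stored(std::make_unique<callable_holder<std::decay_t<callable_type>>>(std::forward<callable_type>(callable))) {}

    // Invoking an empty object does nothing; a non-empty object becomes empty.
    void operator()() {
        if (!_stored) {
            return;
        }
        auto stored = std::move(_stored);  // *this is empty from here on
        stored->invoke();
    }

    explicit operator bool() const noexcept {
        return static_cast<bool>(_stored);
    }
};

// Stores a lambda that owns a std::unique_ptr (a move-only callable),
// invokes the wrapper twice and returns how many times the lambda actually ran.
int demo_one_shot_task() {
    auto counter = std::make_unique<int>(0);
    int observed = 0;

    one_shot_task task([c = std::move(counter), &observed]() mutable {
        observed = ++(*c);
    });

    assert(static_cast<bool>(task));
    task();  // runs the lambda
    task();  // no-op: the wrapper is already empty
    assert(!task);

    return observed;
}
```

In this sketch demo_one_shot_task() returns 1: the second invocation finds the wrapper empty and does nothing, mirroring the "invoked only once" rule.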
task API
class task {
/*
Creates an empty task object.
*/
task () noexcept ;
/*
Creates a task object by moving the stored callable of rhs to *this.
If rhs is empty, then *this will also be empty after construction.
After this call, rhs is empty.
*/
task (task&& rhs) noexcept ;
/*
Creates a task object by storing callable in *this.
<<typename std::decay<callable_type>::type>> will be in-place-
constructed inside *this by perfect forwarding callable.
*/
template < class callable_type >
task (callable_type&& callable);
/*
Destroys stored callable, does nothing if empty.
*/
~task () noexcept ;
/*
If *this is empty, does nothing.
Invokes stored callable, and immediately destroys it.
After this call, *this is empty.
May throw any exception that the invoked callable may throw.
*/
void operator ()();
/*
Moves the stored callable of rhs to *this.
If rhs is empty, then *this will also be empty after this call.
If *this already contains a stored callable, operator = destroys it first.
*/
task& operator =(task&& rhs) noexcept ;
/*
If *this is not empty, task::clear destroys the stored callable and empties *this.
If *this is empty, clear does nothing.
*/
void clear () noexcept ;
/*
Returns true if *this stores a callable. false otherwise.
*/
explicit operator bool () const noexcept ;
/*
Returns true if *this stores a callable,
and that stored callable has the same type as <<typename std::decay<callable_type>::type>>
*/
template < class callable_type >
bool contains () const noexcept ;
};

When implementing user-defined executors, it is up to the implementation to store task objects (when enqueue is called), and to execute them according to the executor's inner mechanism.
In this example, we create an executor which logs actions like enqueuing tasks or executing them. We implement the executor interface, and we request the runtime to create and store an instance of it by calling runtime::make_executor. The rest of the application behaves exactly as it would with the built-in executors.
#include "concurrencpp/concurrencpp.h"

#include <condition_variable>
#include <cstdio>
#include <iostream>
#include <mutex>
#include <queue>
#include <span>
#include <string>
#include <string_view>
#include <thread>

class logging_executor : public concurrencpp::derivable_executor<logging_executor> {

private:
    mutable std::mutex _lock;
    std::queue<concurrencpp::task> _queue;
    std::condition_variable _condition;
    bool _shutdown_requested;
    std::thread _thread;
    const std::string _prefix;

    // Runs on the worker thread: pops tasks one by one until shutdown is requested.
    void work_loop() {
        while (true) {
            std::unique_lock<std::mutex> lock(_lock);
            if (_shutdown_requested) {
                return;
            }

            if (!_queue.empty()) {
                auto task = std::move(_queue.front());
                _queue.pop();
                lock.unlock();  // never run user code while holding the lock
                std::cout << _prefix << " A task is being executed" << std::endl;
                task();
                continue;
            }

            _condition.wait(lock, [this] {
                return !_queue.empty() || _shutdown_requested;
            });
        }
    }

public:
    logging_executor(std::string_view prefix) :
        derivable_executor<logging_executor>("logging_executor"),
        _shutdown_requested(false),
        _prefix(prefix) {
        _thread = std::thread([this] {
            work_loop();
        });
    }

    void enqueue(concurrencpp::task task) override {
        std::cout << _prefix << " A task is being enqueued!" << std::endl;

        std::unique_lock<std::mutex> lock(_lock);
        if (_shutdown_requested) {
            throw concurrencpp::errors::runtime_shutdown("logging executor - executor was shutdown.");
        }

        _queue.emplace(std::move(task));
        _condition.notify_one();
    }

    void enqueue(std::span<concurrencpp::task> tasks) override {
        std::cout << _prefix << tasks.size() << " tasks are being enqueued!" << std::endl;

        std::unique_lock<std::mutex> lock(_lock);
        if (_shutdown_requested) {
            throw concurrencpp::errors::runtime_shutdown("logging executor - executor was shutdown.");
        }

        for (auto& task : tasks) {
            _queue.emplace(std::move(task));
        }

        _condition.notify_one();
    }

    int max_concurrency_level() const noexcept override {
        return 1;
    }

    bool shutdown_requested() const noexcept override {
        std::unique_lock<std::mutex> lock(_lock);
        return _shutdown_requested;
    }

    void shutdown() noexcept override {
        std::cout << _prefix << " shutdown requested" << std::endl;

        std::unique_lock<std::mutex> lock(_lock);
        if (_shutdown_requested) return;  // nothing to do.
        _shutdown_requested = true;
        lock.unlock();

        _condition.notify_one();
        _thread.join();
    }
};

int main() {
    concurrencpp::runtime runtime;
    auto logging_ex = runtime.make_executor<logging_executor>("Session #1234");

    for (size_t i = 0; i < 10; i++) {
        logging_ex->post([] {
            std::cout << "hello world" << std::endl;
        });
    }

    std::getchar();
    return 0;
}

Building the library on Windows (release mode):
$ git clone https://github.com/David-Haim/concurrencpp.git
$ cd concurrencpp
$ cmake -S . -B build/lib
$ cmake --build build/lib --config Release

Running the tests on Windows (debug and release mode):
$ git clone https://github.com/David-Haim/concurrencpp.git
$ cd concurrencpp
$ cmake -S test -B build/test
$ cmake --build build/test
    <# for release mode: cmake --build build/test --config Release #>
$ cd build/test
$ ctest . -V -C Debug
    <# for release mode: ctest . -V -C Release #>

Building the library on *nix platforms (release mode):
$ git clone https://github.com/David-Haim/concurrencpp.git
$ cd concurrencpp
$ cmake -DCMAKE_BUILD_TYPE=Release -S . -B build/lib
$ cmake --build build/lib
    #optional, install the library: sudo cmake --install build/lib

Running the tests on *nix platforms:
With clang and gcc, it is also possible to run the tests with TSAN (thread sanitizer) support.
$ git clone https://github.com/David-Haim/concurrencpp.git
$ cd concurrencpp
$ cmake -S test -B build/test
    #for release mode: cmake -DCMAKE_BUILD_TYPE=Release -S test -B build/test
    #for TSAN mode: cmake -DCMAKE_BUILD_TYPE=Release -DENABLE_THREAD_SANITIZER=Yes -S test -B build/test
$ cmake --build build/test
$ cd build/test
$ ctest . -V

When compiling on Linux, the library tries to use libstdc++ by default. If you intend to use libc++ as your standard library implementation, the CMAKE_TOOLCHAIN_FILE flag should be specified as below:
$ cmake -DCMAKE_TOOLCHAIN_FILE=../cmake/libc++.cmake -DCMAKE_BUILD_TYPE=Release -S . -B build/lib

As an alternative to building and installing the library manually, developers may get stable releases of concurrencpp via the vcpkg and Conan package managers:
vcpkg:
$ vcpkg install concurrencpp
Conan: concurrencpp on ConanCenter
concurrencpp comes with a built-in sandbox program which developers can modify and experiment with, without having to install or link the compiled library to a different code-base. In order to play with the sandbox, developers can modify sandbox/main.cpp and compile the application using the following commands:

On Windows:
$ cmake -S sandbox -B build/sandbox
$ cmake --build build/sandbox
    <# for release mode: cmake --build build/sandbox --config Release #>
$ ./build/sandbox <# runs the sandbox #>

On *nix platforms:
$ cmake -S sandbox -B build/sandbox
    #for release mode: cmake -DCMAKE_BUILD_TYPE=Release -S sandbox -B build/sandbox
$ cmake --build build/sandbox
$ ./build/sandbox #runs the sandbox