La biblioteca 'CPPCORO' proporciona un gran conjunto de primitivas de uso general para hacer uso de la propuesta de Coroutinas TS descrita en N4680.
Estos incluyen:
task<T>shared_task<T>generator<T>recursive_generator<T>async_generator<T>single_consumer_eventsingle_consumer_async_auto_reset_eventasync_mutexasync_manual_reset_eventasync_auto_reset_eventasync_latchsequence_barriermulti_producer_sequencersingle_producer_sequencersync_wait()when_all()when_all_ready()fmap()schedule_on()resume_on()cancellation_tokencancellation_sourcecancellation_registrationstatic_thread_poolio_service y io_work_scopefile , readable_file , writable_fileread_only_file , write_only_file , read_write_filesocketip_address , ipv4_address , ipv6_addressip_endpoint , ipv4_endpoint , ipv6_endpointis_awaitable<T>awaitable_traits<T>Awaitable<T>Awaiter<T>SchedulerDelayedSchedulerEsta biblioteca es una biblioteca experimental que está explorando el espacio de abstracciones de programación asíncrona escalable y de alto rendimiento que se pueden construir sobre la propuesta de Coroutinas C ++.
Ha sido de origen abierto con la esperanza de que otros les resulte útil y que la comunidad C ++ pueda proporcionar comentarios y formas de mejorarlo.
Requiere un compilador que admite las coroutinas TS:
La versión de Linux es funcional, excepto para las clases relacionadas con io_context y Archone I/O que aún no se han implementado para Linux (consulte el número 15 para obtener más información).
task<T>Una tarea representa un cálculo asincrónico que se ejecuta perezosamente en el sentido de que la ejecución de la coroutina no comienza hasta que se espera la tarea.
Ejemplo:
# include < cppcoro/read_only_file.hpp >
# include < cppcoro/task.hpp >
cppcoro::task< int > count_lines (std::string path)
{
auto file = co_await cppcoro::read_only_file::open (path);
int lineCount = 0 ;
char buffer[ 1024 ];
size_t bytesRead;
std:: uint64_t offset = 0 ;
do
{
bytesRead = co_await file. read (offset, buffer, sizeof (buffer));
lineCount += std::count (buffer, buffer + bytesRead, ' n ' );
offset += bytesRead;
} while (bytesRead > 0 );
co_return lineCount;
}
cppcoro::task<> usage_example ()
{
// Calling function creates a new task but doesn't start
// executing the coroutine yet.
cppcoro::task< int > countTask = count_lines ( " foo.txt " );
// ...
// Coroutine is only started when we later co_await the task.
int lineCount = co_await countTask;
std::cout << " line count = " << lineCount << std::endl;
}Descripción general de la API:
// <cppcoro/task.hpp>
namespace cppcoro
{
template < typename T>
class task
{
public:
using promise_type = <unspecified>;
using value_type = T;
task () noexcept ;
task (task&& other) noexcept ;
task& operator =(task&& other);
// task is a move-only type.
task ( const task& other) = delete ;
task& operator =( const task& other) = delete ;
// Query if the task result is ready.
bool is_ready () const noexcept ;
// Wait for the task to complete and return the result or rethrow the
// exception if the operation completed with an unhandled exception.
//
// If the task is not yet ready then the awaiting coroutine will be
// suspended until the task completes. If the the task is_ready() then
// this operation will return the result synchronously without suspending.
Awaiter<T&> operator co_await () const & noexcept ;
Awaiter<T&&> operator co_await () const && noexcept ;
// Returns an awaitable that can be co_await'ed to suspend the current
// coroutine until the task completes.
//
// The 'co_await t.when_ready()' expression differs from 'co_await t' in
// that when_ready() only performs synchronization, it does not return
// the result or rethrow the exception.
//
// This can be useful if you want to synchronize with the task without
// the possibility of it throwing an exception.
Awaitable< void > when_ready () const noexcept ;
};
template < typename T>
void swap (task<T>& a, task<T>& b);
// Creates a task that yields the result of co_await'ing the specified awaitable.
//
// This can be used as a form of type-erasure of the concrete awaitable, allowing
// different awaitables that return the same await-result type to be stored in
// the same task<RESULT> type.
template <
typename AWAITABLE,
typename RESULT = typename awaitable_traits<AWAITABLE>:: await_result_t >
task<RESULT> make_task (AWAITABLE awaitable);
} Puede crear una task<T> objeto llamando a una función de Coroutine que devuelve una task<T> .
La coroutina debe contener un uso de co_await o co_return . Tenga en cuenta que una task<T> Coroutine no puede usar la palabra clave co_yield .
Cuando se llama a una corutina que devuelve una task<T> , se asigna un marco de coroutina si es necesario y los parámetros se capturan en el marco de Coroutine. El Coroutine se suspende al comienzo del cuerpo de Coroutine y la ejecución se devuelve a la persona que llama y un valor task<T> que representa el cálculo asincrónico se devuelve de la llamada de función.
El cuerpo de Coroutine comenzará a ejecutar cuando la task<T> valor sea co_await ed. Esto suspenderá la coroutina en espera y comenzará la ejecución de la coroutina asociada con el valor de la task<T> .
El Coroutine en espera luego se reanudará en el hilo que completa la ejecución de la coroutina de la task<T> . es decir. El hilo que ejecuta el co_return o que lanza una excepción no controlada que termina la ejecución de la coroutina.
Si la tarea ya se ha ejecutado hasta la finalización, entonces la espera nuevamente obtendrá el resultado ya computado sin suspender la coroutina en espera.
Si el objeto task se destruye antes de que se espere, la coroutina nunca se ejecuta y el destructor simplemente destruye los parámetros capturados y libera cualquier memoria utilizada por el marco de la coroutina.
shared_task<T> La clase shared_task<T> es un tipo de coroutina que produce un solo valor asincrónico.
Es 'perezoso' en esa ejecución de la tarea no comienza hasta que la coroutina la espera.
Se 'comparte' en que el valor de la tarea se puede copiar, lo que permite múltiples referencias al resultado de la tarea que se creará. También permite que múltiples coroutinas esperen simultáneamente el resultado.
La tarea comenzará a ejecutar en el hilo que primero co_await es la tarea. Los pilotos posteriores serán suspendidos y se colocarán en cola para su reanudación cuando la tarea se complete o continuará sincrónicamente si la tarea ya se ha ejecutado hasta su finalización.
Si se suspende a un agresor mientras espera que la tarea se complete, se reanudará en el hilo que completa la ejecución de la tarea. es decir. El hilo que ejecuta el co_return o que lanza la excepción no controlada que termina la ejecución de la coroutina.
Resumen de la API
namespace cppcoro
{
template < typename T = void >
class shared_task
{
public:
using promise_type = <unspecified>;
using value_type = T;
shared_task () noexcept ;
shared_task ( const shared_task& other) noexcept ;
shared_task (shared_task&& other) noexcept ;
shared_task& operator =( const shared_task& other) noexcept ;
shared_task& operator =(shared_task&& other) noexcept ;
void swap (shared_task& other) noexcept ;
// Query if the task has completed and the result is ready.
bool is_ready () const noexcept ;
// Returns an operation that when awaited will suspend the
// current coroutine until the task completes and the result
// is available.
//
// The type of the result of the 'co_await someTask' expression
// is an l-value reference to the task's result value (unless T
// is void in which case the expression has type 'void').
// If the task completed with an unhandled exception then the
// exception will be rethrown by the co_await expression.
Awaiter<T&> operator co_await () const noexcept ;
// Returns an operation that when awaited will suspend the
// calling coroutine until the task completes and the result
// is available.
//
// The result is not returned from the co_await expression.
// This can be used to synchronize with the task without the
// possibility of the co_await expression throwing an exception.
Awaiter< void > when_ready () const noexcept ;
};
template < typename T>
bool operator ==( const shared_task<T>& a, const shared_task<T>& b) noexcept ;
template < typename T>
bool operator !=( const shared_task<T>& a, const shared_task<T>& b) noexcept ;
template < typename T>
void swap (shared_task<T>& a, shared_task<T>& b) noexcept ;
// Wrap an awaitable value in a shared_task to allow multiple coroutines
// to concurrently await the result.
template <
typename AWAITABLE,
typename RESULT = typename awaitable_traits<AWAITABLE>:: await_result_t >
shared_task<RESULT> make_shared_task (AWAITABLE awaitable);
} Todos los métodos constantes en shared_task<T> son seguros de llamar simultáneamente con otros métodos constantes en la misma instancia de múltiples hilos. No es seguro llamar a los métodos no constados de shared_task<T> simultáneamente con cualquier otro método en la misma instancia de un shared_task<T> .
task<T> La clase shared_task<T> es similar a task<T> en que la tarea no comienza a la ejecución inmediatamente tras la función de Coroutine que se llama. La tarea solo comienza a ejecutar cuando se espera por primera vez.
Se diferencia de task<T> en que el objeto de tarea resultante se puede copiar, lo que permite que múltiples objetos de tarea haga referencia al mismo resultado asincrónico. También admite múltiples corutinas que contienen al mismo tiempo el resultado de la tarea.
La compensación es que el resultado es siempre una referencia de valor L al resultado, nunca una referencia de valor R (ya que el resultado puede compartirse) que puede limitar la capacidad de mover el resultado en una variable local. También tiene un costo de tiempo de ejecución ligeramente más alto debido a la necesidad de mantener un recuento de referencias y apoyar múltiples a los esperanzas.
generator<T> Un generator representa un tipo de coroutina que produce una secuencia de valores de tipo, T , donde los valores se producen perezosamente y sincrónicamente.
El cuerpo de Coroutine puede producir valores del Tipo T utilizando la palabra clave co_yield . Tenga en cuenta, sin embargo, que el cuerpo de Coroutine no puede usar la palabra clave co_await ; Los valores deben producirse sincrónicamente.
Por ejemplo:
cppcoro::generator< const std:: uint64_t > fibonacci ()
{
std:: uint64_t a = 0 , b = 1 ;
while ( true )
{
co_yield b;
auto tmp = a;
a = b;
b += tmp;
}
}
void usage ()
{
for ( auto i : fibonacci ())
{
if (i > 1'000'000 ) break ;
std::cout << i << std::endl;
}
} Cuando una función de Coroutina que devuelve un generator<T> se llama Coroutine, se crea inicialmente suspendido. La ejecución de la coroutina ingresa al cuerpo de la coroutina cuando se llama y continúa el método del generator<T>::begin() hasta que se alcanza la primera declaración co_yield o la coroutina funciona hasta su finalización.
Si el iterador devuelto no es igual al iterador end() entonces desamparando el iterador devolverá una referencia al valor pasado a la instrucción co_yield .
Llamar operator++() en el iterador reanudará la ejecución de la coroutina y continuará hasta que se alcance el próximo punto co_yield o el Coroutine se desarrolle hasta su finalización ().
Cualquier excepción no controlada lanzadas por el Coroutine se propagará fuera de las llamadas begin() o operator++() a la persona que llama.
Resumen de la API:
namespace cppcoro
{
template < typename T>
class generator
{
public:
using promise_type = <unspecified>;
class iterator
{
public:
using iterator_category = std::input_iterator_tag;
using value_type = std:: remove_reference_t <T>;
using reference = value_type&;
using pointer = value_type*;
using difference_type = std:: size_t ;
iterator ( const iterator& other) noexcept ;
iterator& operator =( const iterator& other) noexcept ;
// If the generator coroutine throws an unhandled exception before producing
// the next element then the exception will propagate out of this call.
iterator& operator ++();
reference operator *() const noexcept ;
pointer operator ->() const noexcept ;
bool operator ==( const iterator& other) const noexcept ;
bool operator !=( const iterator& other) const noexcept ;
};
// Constructs to the empty sequence.
generator () noexcept ;
generator (generator&& other) noexcept ;
generator& operator =(generator&& other) noexcept ;
generator ( const generator& other) = delete ;
generator& operator =( const generator&) = delete ;
~generator ();
// Starts executing the generator coroutine which runs until either a value is yielded
// or the coroutine runs to completion or an unhandled exception propagates out of the
// the coroutine.
iterator begin ();
iterator end () noexcept ;
// Swap the contents of two generators.
void swap (generator& other) noexcept ;
};
template < typename T>
void swap (generator<T>& a, generator<T>& b) noexcept ;
// Apply function, func, lazily to each element of the source generator
// and yield a sequence of the results of calls to func().
template < typename FUNC, typename T>
generator<std:: invoke_result_t <FUNC, T&>> fmap (FUNC func, generator<T> source);
}recursive_generator<T> Un recursive_generator es similar a un generator , excepto que está diseñado para apoyar de manera más eficiente producir los elementos de una secuencia anidada como elementos de una secuencia externa.
Además de poder co_yield un valor del tipo T también puede co_yield un valor de tipo recursive_generator<T> .
Cuando co_yield un valor recursive_generator<T> todos los elementos del generador producido se producen como elementos del generador de corriente. La coroutina actual se suspende hasta que el consumidor haya terminado de consumir todos los elementos del generador anidado, después de lo cual la ejecución del punto de la coroutina actual reanudará la ejecución para producir el siguiente elemento.
El beneficio de recursive_generator<T> sobre generator<T> para iterar sobre estructuras de datos recursivas es que el iterator::operator++() puede reanudar directamente la coroutina más hojas para producir el siguiente elemento, en lugar de tener que reanudar/suspender los coroutines o (profundidad) para cada elemento. La parte baja es que hay una sobrecarga adicional
Por ejemplo:
// Lists the immediate contents of a directory.
cppcoro::generator<dir_entry> list_directory (std::filesystem::path path);
cppcoro::recursive_generator<dir_entry> list_directory_recursive (std::filesystem::path path)
{
for ( auto & entry : list_directory (path))
{
co_yield entry;
if (entry. is_directory ())
{
co_yield list_directory_recursive (entry. path ());
}
}
} Tenga en cuenta que la aplicación del operador fmap() a un recursive_generator<T> producirá un tipo de generator<U> en lugar de un recursive_generator<U> . Esto se debe a que los usos de fmap generalmente no se usan en contextos recursivos e intentamos evitar la sobrecarga adicional incurrida por recursive_generator .
async_generator<T> Un async_generator representa un tipo de coroutina que produce una secuencia de valores de tipo, T , donde los valores se producen perezosamente y los valores pueden producirse de manera asincrónica.
El cuerpo de Coroutine puede usar las expresiones co_await y co_yield .
Los consumidores del generador pueden usar un bucle for co_await para consumir los valores.
Ejemplo
cppcoro::async_generator< int > ticker ( int count, threadpool& tp)
{
for ( int i = 0 ; i < count; ++i)
{
co_await tp. delay ( std::chrono::seconds ( 1 ));
co_yield i;
}
}
cppcoro::task<> consumer (threadpool& tp)
{
auto sequence = ticker ( 10 , tp);
for co_await (std:: uint32_t i : sequence)
{
std::cout << " Tick " << i << std::endl;
}
}Resumen de la API
// <cppcoro/async_generator.hpp>
namespace cppcoro
{
template < typename T>
class async_generator
{
public:
class iterator
{
public:
using iterator_tag = std::forward_iterator_tag;
using difference_type = std:: size_t ;
using value_type = std:: remove_reference_t <T>;
using reference = value_type&;
using pointer = value_type*;
iterator ( const iterator& other) noexcept ;
iterator& operator =( const iterator& other) noexcept ;
// Resumes the generator coroutine if suspended
// Returns an operation object that must be awaited to wait
// for the increment operation to complete.
// If the coroutine runs to completion then the iterator
// will subsequently become equal to the end() iterator.
// If the coroutine completes with an unhandled exception then
// that exception will be rethrown from the co_await expression.
Awaitable<iterator&> operator ++() noexcept ;
// Dereference the iterator.
pointer operator ->() const noexcept ;
reference operator *() const noexcept ;
bool operator ==( const iterator& other) const noexcept ;
bool operator !=( const iterator& other) const noexcept ;
};
// Construct to the empty sequence.
async_generator () noexcept ;
async_generator ( const async_generator&) = delete ;
async_generator (async_generator&& other) noexcept ;
~async_generator ();
async_generator& operator =( const async_generator&) = delete ;
async_generator& operator =(async_generator&& other) noexcept ;
void swap (async_generator& other) noexcept ;
// Starts execution of the coroutine and returns an operation object
// that must be awaited to wait for the first value to become available.
// The result of co_await'ing the returned object is an iterator that
// can be used to advance to subsequent elements of the sequence.
//
// This method is not valid to be called once the coroutine has
// run to completion.
Awaitable<iterator> begin () noexcept ;
iterator end () noexcept ;
};
template < typename T>
void swap (async_generator<T>& a, async_generator<T>& b);
// Apply 'func' to each element of the source generator, yielding a sequence of
// the results of calling 'func' on the source elements.
template < typename FUNC, typename T>
async_generator<std:: invoke_result_t <FUNC, T&>> fmap (FUNC func, async_generator<T> source);
} Cuando se destruye el objeto async_generator , solicita la cancelación de la coroutina subyacente. Si la coroutina ya se ha completado o actualmente está suspendida en una expresión co_yield , la coroutina se destruye de inmediato. De lo contrario, la coroutina continuará la ejecución hasta que se ejecute hasta su finalización o alcance la siguiente expresión co_yield .
Cuando se destruye el marco de Coroutine, los destructores de todas las variables en el alcance en ese punto se ejecutarán para garantizar que los recursos del generador se limpien.
Tenga en cuenta que la persona que llama debe asegurarse de que el objeto async_generator no debe destruirse mientras una coroutina de consumo está ejecutando una expresión co_await esperando que se produzca el siguiente elemento.
single_consumer_eventEste es un tipo de evento de resolución manual simple que admite una sola corutina que lo espera a la vez. Esto se puede usar para
Resumen de la API:
// <cppcoro/single_consumer_event.hpp>
namespace cppcoro
{
class single_consumer_event
{
public:
single_consumer_event ( bool initiallySet = false ) noexcept ;
bool is_set () const noexcept ;
void set ();
void reset () noexcept ;
Awaiter< void > operator co_await () const noexcept ;
};
}Ejemplo:
# include < cppcoro/single_consumer_event.hpp >
cppcoro::single_consumer_event event;
std::string value;
cppcoro::task<> consumer ()
{
// Coroutine will suspend here until some thread calls event.set()
// eg. inside the producer() function below.
co_await event;
std::cout << value << std::endl;
}
void producer ()
{
value = " foo " ;
// This will resume the consumer() coroutine inside the call to set()
// if it is currently suspended.
event. set ();
}single_consumer_async_auto_reset_event Esta clase proporciona una primitiva de sincronización asíncrona que permite que una sola coroutina espere hasta que el evento sea señalado por una llamada al método set() .
Una vez que la coroutina que está en espera del evento es lanzada por una llamada anterior o posterior para set() el evento se restablece automáticamente al estado 'no establecido'.
Esta clase es una versión más eficiente de async_auto_reset_event que puede usarse en casos en que solo una coroutina solo estará esperando el evento a la vez. Si necesita admitir múltiples coroutinas de espera concurrentes en el evento, use la clase async_auto_reset_event en su lugar.
Resumen de la API:
// <cppcoro/single_consumer_async_auto_reset_event.hpp>
namespace cppcoro
{
class single_consumer_async_auto_reset_event
{
public:
single_consumer_async_auto_reset_event (
bool initiallySet = false ) noexcept ;
// Change the event to the 'set' state. If a coroutine is awaiting the
// event then the event is immediately transitioned back to the 'not set'
// state and the coroutine is resumed.
void set () noexcept ;
// Returns an Awaitable type that can be awaited to wait until
// the event becomes 'set' via a call to the .set() method. If
// the event is already in the 'set' state then the coroutine
// continues without suspending.
// The event is automatically reset back to the 'not set' state
// before resuming the coroutine.
Awaiter< void > operator co_await () const noexcept ;
};
}Ejemplo de uso:
std::atomic< int > value;
cppcoro::single_consumer_async_auto_reset_event valueDecreasedEvent;
cppcoro::task<> wait_until_value_is_below ( int limit)
{
while (value. load (std::memory_order_relaxed) >= limit)
{
// Wait until there has been some change that we're interested in.
co_await valueDecreasedEvent;
}
}
void change_value ( int delta)
{
value. fetch_add (delta, std::memory_order_relaxed);
// Notify the waiter if there has been some change.
if (delta < 0 ) valueDecreasedEvent. set ();
}async_mutexProporciona una simple abstracción de exclusión mutua que le permite a la persona que llama 'co_await' el mutex desde dentro de una coroutina suspender la corutina hasta que se adquiere la cerradura mutex.
La implementación está libre de bloqueo en que una coroutina que espera el Mutex no bloqueará el hilo, sino que suspenderá la coroutina y luego la reanudará dentro de la llamada para unlock() por el anterior titular de bloqueo.
Resumen de la API:
// <cppcoro/async_mutex.hpp>
namespace cppcoro
{
class async_mutex_lock ;
class async_mutex_lock_operation ;
class async_mutex_scoped_lock_operation ;
class async_mutex
{
public:
async_mutex () noexcept ;
~async_mutex ();
async_mutex ( const async_mutex&) = delete ;
async_mutex& operator ( const async_mutex&) = delete;
bool try_lock () noexcept ;
async_mutex_lock_operation lock_async () noexcept ;
async_mutex_scoped_lock_operation scoped_lock_async () noexcept ;
void unlock ();
};
class async_mutex_lock_operation
{
public:
bool await_ready () const noexcept ;
bool await_suspend (std::experimental::coroutine_handle<> awaiter) noexcept ;
void await_resume () const noexcept ;
};
class async_mutex_scoped_lock_operation
{
public:
bool await_ready () const noexcept ;
bool await_suspend (std::experimental::coroutine_handle<> awaiter) noexcept ;
[[nodiscard]] async_mutex_lock await_resume () const noexcept ;
};
class async_mutex_lock
{
public:
// Takes ownership of the lock.
async_mutex_lock (async_mutex& mutex, std:: adopt_lock_t ) noexcept ;
// Transfer ownership of the lock.
async_mutex_lock (async_mutex_lock&& other) noexcept ;
async_mutex_lock ( const async_mutex_lock&) = delete ;
async_mutex_lock& operator =( const async_mutex_lock&) = delete ;
// Releases the lock by calling unlock() on the mutex.
~async_mutex_lock ();
};
}Ejemplo de uso:
# include < cppcoro/async_mutex.hpp >
# include < cppcoro/task.hpp >
# include < set >
# include < string >
cppcoro::async_mutex mutex;
std::set<std::string> values;
cppcoro::task<> add_item (std::string value)
{
cppcoro::async_mutex_lock lock = co_await mutex. scoped_lock_async ();
values. insert ( std::move (value));
}async_manual_reset_event Un evento de reinicio manual es un primitivo de coroutina/sincronización de hilos que permite que uno o más hilos esperen hasta que el evento sea señalado por un hilo que llama set() .
El evento está en uno de los dos estados; 'set' y 'no establecido' .
Si el evento está en el estado 'establecido' cuando una coroutina espera el evento, entonces la coroutina continúa sin suspender. Sin embargo, si la coroutina está en el estado 'no establecido' , la coroutina se suspende hasta que algún hilo llama posteriormente el método set() .
Cualquier hilo que se suspendiera mientras se esperaba que el evento se reanude se reanudará dentro de la próxima llamada para set() por algún hilo.
Tenga en cuenta que debe asegurarse de que no hay coroutinas esperando un evento 'no establecido' cuando el evento se destruya, ya que no se reanudan.
Ejemplo:
cppcoro::async_manual_reset_event event;
std::string value;
void producer ()
{
value = get_some_string_value ();
// Publish a value by setting the event.
event. set ();
}
// Can be called many times to create many tasks.
// All consumer tasks will wait until value has been published.
cppcoro::task<> consumer ()
{
// Wait until value has been published by awaiting event.
co_await event;
consume_value (value);
}Resumen de la API:
namespace cppcoro
{
class async_manual_reset_event_operation ;
class async_manual_reset_event
{
public:
async_manual_reset_event ( bool initiallySet = false ) noexcept ;
~async_manual_reset_event ();
async_manual_reset_event ( const async_manual_reset_event&) = delete ;
async_manual_reset_event (async_manual_reset_event&&) = delete ;
async_manual_reset_event& operator =( const async_manual_reset_event&) = delete ;
async_manual_reset_event& operator =(async_manual_reset_event&&) = delete ;
// Wait until the event becomes set.
async_manual_reset_event_operation operator co_await () const noexcept ;
bool is_set () const noexcept ;
void set () noexcept ;
void reset () noexcept ;
};
class async_manual_reset_event_operation
{
public:
async_manual_reset_event_operation (async_manual_reset_event& event) noexcept ;
bool await_ready () const noexcept ;
bool await_suspend (std::experimental::coroutine_handle<> awaiter) noexcept ;
void await_resume () const noexcept ;
};
}async_auto_reset_event Un evento de resumen automático es un primitivo de sincronización de coroutina/hilo que permite que uno o más hilos esperen hasta que el evento sea señalado por un hilo llamando set() .
Una vez que una corutina que está a la espera del evento es lanzada por una llamada previa o posterior para set() el evento se restablece automáticamente al estado 'no establecido'.
Resumen de la API:
// <cppcoro/async_auto_reset_event.hpp>
namespace cppcoro
{
class async_auto_reset_event_operation ;
class async_auto_reset_event
{
public:
async_auto_reset_event ( bool initiallySet = false ) noexcept ;
~async_auto_reset_event ();
async_auto_reset_event ( const async_auto_reset_event&) = delete ;
async_auto_reset_event (async_auto_reset_event&&) = delete ;
async_auto_reset_event& operator =( const async_auto_reset_event&) = delete ;
async_auto_reset_event& operator =(async_auto_reset_event&&) = delete ;
// Wait for the event to enter the 'set' state.
//
// If the event is already 'set' then the event is set to the 'not set'
// state and the awaiting coroutine continues without suspending.
// Otherwise, the coroutine is suspended and later resumed when some
// thread calls 'set()'.
//
// Note that the coroutine may be resumed inside a call to 'set()'
// or inside another thread's call to 'operator co_await()'.
async_auto_reset_event_operation operator co_await () const noexcept ;
// Set the state of the event to 'set'.
//
// If there are pending coroutines awaiting the event then one
// pending coroutine is resumed and the state is immediately
// set back to the 'not set' state.
//
// This operation is a no-op if the event was already 'set'.
void set () noexcept ;
// Set the state of the event to 'not-set'.
//
// This is a no-op if the state was already 'not set'.
void reset () noexcept ;
};
class async_auto_reset_event_operation
{
public:
explicit async_auto_reset_event_operation (async_auto_reset_event& event) noexcept ;
async_auto_reset_event_operation ( const async_auto_reset_event_operation& other) noexcept ;
bool await_ready () const noexcept ;
bool await_suspend (std::experimental::coroutine_handle<> awaiter) noexcept ;
void await_resume () const noexcept ;
};
}async_latchUn pestillo de asíncrono es un primitivo de sincronización que permite que las coroutinas esperen asincrónicamente hasta que un contador se haya disminuido a cero.
El pestillo es un objeto de uso único. Una vez que el contador alcanza cero, el pestillo se 'listo' y permanecerá listo hasta que el pestillo sea destruido.
Resumen de la API:
// <cppcoro/async_latch.hpp>
namespace cppcoro
{
class async_latch
{
public:
// Initialise the latch with the specified count.
async_latch (std:: ptrdiff_t initialCount) noexcept ;
// Query if the count has reached zero yet.
bool is_ready () const noexcept ;
// Decrement the count by n.
// This will resume any waiting coroutines if the count reaches zero
// as a result of this call.
// It is undefined behaviour to decrement the count below zero.
void count_down (std:: ptrdiff_t n = 1 ) noexcept ;
// Wait until the latch becomes ready.
// If the latch count is not yet zero then the awaiting coroutine will
// be suspended and later resumed by a call to count_down() that decrements
// the count to zero. If the latch count was already zero then the coroutine
// continues without suspending.
Awaiter< void > operator co_await () const noexcept ;
};
}sequence_barrier Un sequence_barrier es una primitiva de sincronización que permite que un solo productor y múltiples consumidores coordinen con respecto a un número de secuencia que aumenta monotónicamente.
Un solo productor avanza el número de secuencia publicando nuevos números de secuencia en un orden monotónicamente creciente. Uno o más consumidores pueden consultar el último número de secuencia publicada y pueden esperar hasta que se haya publicado un número de secuencia particular.
Se puede utilizar una barrera de secuencia para representar un cursor en un productor de hilo/bufón de anillo de consumo
Consulte el patrón de disruptor LMAX para obtener más antecedentes: https://lmax-exchange.github.io/disrupruptor/files/disruprupor-1.0.pdf
Sinopsis de API:
namespace cppcoro
{
template < typename SEQUENCE = std:: size_t ,
typename TRAITS = sequence_traits<SEQUENCE>>
class sequence_barrier
{
public:
sequence_barrier (SEQUENCE initialSequence = TRAITS::initial_sequence) noexcept ;
~sequence_barrier ();
SEQUENCE last_published () const noexcept ;
// Wait until the specified targetSequence number has been published.
//
// If the operation does not complete synchronously then the awaiting
// coroutine is resumed on the specified scheduler. Otherwise, the
// coroutine continues without suspending.
//
// The co_await expression resumes with the updated last_published()
// value, which is guaranteed to be at least 'targetSequence'.
template < typename SCHEDULER>
[[nodiscard]]
Awaitable<SEQUENCE> wait_until_published (SEQUENCE targetSequence,
SCHEDULER& scheduler) const noexcept ;
void publish (SEQUENCE sequence) noexcept ;
};
}single_producer_sequencer Un single_producer_sequencer es un primitivo de sincronización que se puede usar para coordinar el acceso a un bufón de anillo para un solo productor y uno o más consumidores.
Un productor primero adquiere una o más ranuras en un bufón de anillo, escribe en los elementos de bufón de anillo correspondientes a esas ranuras y finalmente publica los valores escritos en esas ranuras. Un productor nunca puede producir más que los elementos de 'amortiguamiento' antes de dónde se ha consumido el consumidor.
Luego, un consumidor espera que se publiquen ciertos elementos, procesa los elementos y luego notifica al productor cuando ha terminado de procesar elementos publicando el número de secuencia que ha terminado de consumir en un objeto sequence_barrier .
Sinopsis de API:
// <cppcoro/single_producer_sequencer.hpp>
namespace cppcoro
{
template <
typename SEQUENCE = std:: size_t ,
typename TRAITS = sequence_traits<SEQUENCE>>
class single_producer_sequencer
{
public:
using size_type = typename sequence_range<SEQUENCE, TRAITS>::size_type;
single_producer_sequencer (
const sequence_barrier<SEQUENCE, TRAITS>& consumerBarrier,
std:: size_t bufferSize,
SEQUENCE initialSequence = TRAITS::initial_sequence) noexcept ;
// Publisher API:
template < typename SCHEDULER>
[[nodiscard]]
Awaitable<SEQUENCE> claim_one (SCHEDULER& scheduler) noexcept ;
template < typename SCHEDULER>
[[nodiscard]]
Awaitable<sequence_range<SEQUENCE>> claim_up_to (
std:: size_t count,
SCHEDULER& scheduler) noexcept ;
void publish (SEQUENCE sequence) noexcept ;
// Consumer API:
SEQUENCE last_published () const noexcept ;
template < typename SCHEDULER>
[[nodiscard]]
Awaitable<SEQUENCE> wait_until_published (
SEQUENCE targetSequence,
SCHEDULER& scheduler) const noexcept ;
};
}Ejemplo de uso:
using namespace cppcoro ;
using namespace std ::chrono ;
struct message
{
int id;
steady_clock::time_point timestamp;
float data;
};
constexpr size_t bufferSize = 16384 ; // Must be power-of-two
constexpr size_t indexMask = bufferSize - 1 ;
message buffer[bufferSize];
task< void > producer (
io_service& ioSvc,
single_producer_sequencer< size_t >& sequencer)
{
auto start = steady_clock::now ();
for ( int i = 0 ; i < 1'000'000 ; ++i)
{
// Wait until a slot is free in the buffer.
size_t seq = co_await sequencer. claim_one (ioSvc);
// Populate the message.
auto & msg = buffer[seq & indexMask];
msg. id = i;
msg. timestamp = steady_clock::now ();
msg. data = 123 ;
// Publish the message.
sequencer. publish (seq);
}
// Publish a sentinel
auto seq = co_await sequencer. claim_one (ioSvc);
auto & msg = buffer[seq & indexMask];
msg. id = - 1 ;
sequencer. publish (seq);
}
task< void > consumer (
static_thread_pool& threadPool,
const single_producer_sequencer< size_t >& sequencer,
sequence_barrier< size_t >& consumerBarrier)
{
size_t nextToRead = 0 ;
while ( true )
{
// Wait until the next message is available
// There may be more than one available.
const size_t available = co_await sequencer. wait_until_published (nextToRead, threadPool);
do {
auto & msg = buffer[nextToRead & indexMask];
if (msg. id == - 1 )
{
consumerBarrier. publish (nextToRead);
co_return ;
}
processMessage (msg);
} while (nextToRead++ != available);
// Notify the producer that we've finished processing
// up to 'nextToRead - 1'.
consumerBarrier. publish (available);
}
}
task< void > example (io_service& ioSvc, static_thread_pool& threadPool)
{
sequence_barrier< size_t > barrier;
single_producer_sequencer< size_t > sequencer{barrier, bufferSize};
co_await when_all (
producer (tp, sequencer),
consumer (tp, sequencer, barrier));
}multi_producer_sequencer La clase multi_producer_sequencer es una primitiva de sincronización que coordina el acceso a un buffer de anillo para múltiples productores y uno o más consumidores.
Para una variante de productor único, consulte la clase single_producer_sequencer .
Tenga en cuenta que el bufón de anillo debe tener un tamaño que sea un poder de dos. Esto se debe a que la implementación usa maestros de bits en lugar de división/módulo enteros para calcular el desplazamiento en el búfer. Además, esto permite que el número de secuencia envuelva de manera segura el valor de 32 bits/64 bits.
Resumen de la API:
// <cppcoro/multi_producer_sequencer.hpp>
namespace cppcoro
{
template < typename SEQUENCE = std:: size_t ,
typename TRAITS = sequence_traits<SEQUENCE>>
class multi_producer_sequencer
{
public:
multi_producer_sequencer (
const sequence_barrier<SEQUENCE, TRAITS>& consumerBarrier,
SEQUENCE initialSequence = TRAITS::initial_sequence);
std:: size_t buffer_size () const noexcept ;
// Consumer interface
//
// Each consumer keeps track of their own 'lastKnownPublished' value
// and must pass this to the methods that query for an updated last-known
// published sequence number.
SEQUENCE last_published_after (SEQUENCE lastKnownPublished) const noexcept ;
template < typename SCHEDULER>
Awaitable<SEQUENCE> wait_until_published (
SEQUENCE targetSequence,
SEQUENCE lastKnownPublished,
SCHEDULER& scheduler) const noexcept ;
// Producer interface
// Query whether any slots available for claiming (approx.)
bool any_available () const noexcept ;
template < typename SCHEDULER>
Awaitable<SEQUENCE> claim_one (SCHEDULER& scheduler) noexcept ;
template < typename SCHEDULER>
Awaitable<sequence_range<SEQUENCE, TRAITS>> claim_up_to (
std:: size_t count,
SCHEDULER& scheduler) noexcept ;
// Mark the specified sequence number as published.
void publish (SEQUENCE sequence) noexcept ;
// Mark all sequence numbers in the specified range as published.
void publish ( const sequence_range<SEQUENCE, TRAITS>& range) noexcept ;
};
} Un cancellation_token es un valor que se puede pasar a una función que permite a la persona que llama comunicar posteriormente una solicitud para cancelar la operación a esa función.
Para obtener un cancellation_token que se puede cancelar primero, primero debe crear un objeto cancellation_source . El método cancellation_source::token() se puede usar para fabricar nuevos valores cancellation_token que están vinculados a ese objeto cancellation_source .
Cuando desee solicitar la cancelación posterior de una operación, ha aprobado una cancellation_token , puede llamar a cancellation_source::request_cancellation() en un objeto cancellation_source asociado.
Las funciones pueden responder a una solicitud de cancelación de una de dos maneras:
cancellation_token::is_cancellation_requested() o cancellation_token::throw_if_cancellation_requested() .cancellation_registration .Resumen de la API:
namespace cppcoro
{
class cancellation_source
{
public:
// Construct a new, independently cancellable cancellation source.
cancellation_source ();
// Construct a new reference to the same cancellation state.
cancellation_source ( const cancellation_source& other) noexcept ;
cancellation_source (cancellation_source&& other) noexcept ;
~cancellation_source ();
cancellation_source& operator =( const cancellation_source& other) noexcept ;
cancellation_source& operator =(cancellation_source&& other) noexcept ;
bool is_cancellation_requested () const noexcept ;
bool can_be_cancelled () const noexcept ;
void request_cancellation ();
cancellation_token token () const noexcept ;
};
class cancellation_token
{
public:
// Construct a token that can't be cancelled.
cancellation_token () noexcept ;
cancellation_token ( const cancellation_token& other) noexcept ;
cancellation_token (cancellation_token&& other) noexcept ;
~cancellation_token ();
cancellation_token& operator =( const cancellation_token& other) noexcept ;
cancellation_token& operator =(cancellation_token&& other) noexcept ;
bool is_cancellation_requested () const noexcept ;
void throw_if_cancellation_requested () const ;
// Query if this token can ever have cancellation requested.
// Code can use this to take a more efficient code-path in cases
// that the operation does not need to handle cancellation.
bool can_be_cancelled () const noexcept ;
};
// RAII class for registering a callback to be executed if cancellation
// is requested on a particular cancellation token.
class cancellation_registration
{
public:
// Register a callback to be executed if cancellation is requested.
// Callback will be called with no arguments on the thread that calls
// request_cancellation() if cancellation is not yet requested, or
// called immediately if cancellation has already been requested.
// Callback must not throw an unhandled exception when called.
template < typename CALLBACK>
cancellation_registration (cancellation_token token, CALLBACK&& callback);
cancellation_registration ( const cancellation_registration& other) = delete ;
~cancellation_registration ();
};
class operation_cancelled : public std :: exception
{
public:
operation_cancelled ();
const char * what () const override ;
};
}Ejemplo: enfoque de votación
cppcoro::task<> do_something_async (cppcoro::cancellation_token token)
{
// Explicitly define cancellation points within the function
// by calling throw_if_cancellation_requested().
token. throw_if_cancellation_requested ();
co_await do_step_1 ();
token. throw_if_cancellation_requested ();
do_step_2 ();
// Alternatively, you can query if cancellation has been
// requested to allow yourself to do some cleanup before
// returning.
if (token. is_cancellation_requested ())
{
display_message_to_user ( " Cancelling operation... " );
do_cleanup ();
throw cppcoro::operation_cancelled{};
}
do_final_step ();
}Ejemplo: enfoque de devolución de llamada
// Say we already have a timer abstraction that supports being
// cancelled but it doesn't support cancellation_tokens natively.
// You can use a cancellation_registration to register a callback
// that calls the existing cancellation API. e.g.
cppcoro::task<> cancellable_timer_wait (cppcoro::cancellation_token token)
{
auto timer = create_timer (10s);
cppcoro::cancellation_registration registration (token, [&]
{
// Call existing timer cancellation API.
timer. cancel ();
});
co_await timer;
}static_thread_pool La clase static_thread_pool proporciona una abstracción que le permite programar el trabajo en un grupo de hilos de tamaño fijo.
Esta clase implementa el concepto de programador (ver más abajo).
Puede enqueue el trabajo en el grupo de subprocesos ejecutando co_await threadPool.schedule() . Esta operación suspenderá la coroutina actual, la eneue para su ejecución en el pido de subprocesos y la piscina de subprocesos reanudará la coroutina cuando un hilo en el pecho de subprocesos sea el siguiente libre para ejecutar la coroutina. Esta operación está garantizada para no lanzar y, en el caso común, no asignará ningún recuerdo .
Esta clase utiliza un algoritmo de robo de trabajo para el trabajo de equilibrio de carga en múltiples hilos. El trabajo enqueado a la piscina de subprocesos desde un hilo de pisciobras de hilo se programará para la ejecución en el mismo hilo en una cola LIFO. El trabajo enqueado a la piscina de subprocesos de un hilo remoto se enmarará a una cola Global FIFO. Cuando un hilo de trabajadores se queda sin trabajo de su cola local, primero intenta eliminar el trabajo de la cola global. Si esa cola está vacía, entonces trata de robar el trabajo desde la parte posterior de las colas de los otros hilos de trabajadores.
Resumen de la API:
namespace cppcoro
{
class static_thread_pool
{
public:
// Initialise the thread-pool with a number of threads equal to
// std::thread::hardware_concurrency().
static_thread_pool ();
// Initialise the thread pool with the specified number of threads.
explicit static_thread_pool (std:: uint32_t threadCount);
std:: uint32_t thread_count () const noexcept ;
class schedule_operation
{
public:
schedule_operation (static_thread_pool* tp) noexcept ;
bool await_ready () noexcept ;
bool await_suspend (std::experimental::coroutine_handle<> h) noexcept ;
bool await_resume () noexcept ;
private:
// unspecified
};
// Return an operation that can be awaited by a coroutine.
//
//
[[nodiscard]]
schedule_operation schedule () noexcept ;
private:
// Unspecified
};
}Uso de ejemplo: simple
cppcoro::task<std::string> do_something_on_threadpool (cppcoro::static_thread_pool& tp)
{
// First schedule the coroutine onto the threadpool.
co_await tp. schedule ();
// When it resumes, this coroutine is now running on the threadpool.
do_something ();
} Uso de ejemplo: Hacer las cosas en paralelo - usando el operador schedule_on() con static_thread_pool .
cppcoro::task< double > dot_product (static_thread_pool& tp, double a[], double b[], size_t count)
{
if (count > 1000 )
{
// Subdivide the work recursively into two equal tasks
// The first half is scheduled to the thread pool so it can run concurrently
// with the second half which continues on this thread.
size_t halfCount = count / 2 ;
auto [first, second] = co_await when_all (
schedule_on (tp, dot_product (tp, a, b, halfCount),
dot_product (tp, a + halfCount, b + halfCount, count - halfCount));
co_return first + second;
}
else
{
double sum = 0.0 ;
for ( size_t i = 0 ; i < count; ++i)
{
sum += a[i] * b[i];
}
co_return sum;
}
}io_service y io_work_scope La clase io_service proporciona una abstracción para procesar eventos de finalización de E/S de operaciones de E/S asíncronas.
Cuando se completa una operación de E/S asincrónica, la coroutina que esperaba esa operación se reanudará en un hilo de E/S dentro de una llamada a uno de los métodos de procesamiento de eventos: process_events() , process_pending_events() , process_one_event() o process_one_pending_event() .
La clase io_service no administra ningún hilo de E/S. Debe asegurarse de que algún hilo llame a uno de los métodos de procesamiento de eventos para las corutinas que esperan eventos de finalización de E/S para enviar. Este puede ser un hilo dedicado que llama process_events() o mezclar con algún otro bucle de eventos (por ejemplo, un bucle de evento de interfaz de usuario) sondeando periódicamente para nuevos eventos a través de una llamada a process_pending_events() o process_one_pending_event() .
Esto permite la integración del bucle de evento io_service con otros bucles de eventos, como un bucle de evento de interfaz de usuario.
Puede procesar multiplex de eventos a través de múltiples hilos haciendo múltiples hilos de llamada process_events() . Puede especificar una pista sobre el número máximo de hilos para tener eventos de procesamiento activo a través de un parámetro de constructor io_service opcional.
En Windows, la implementación utiliza la instalación del puerto de finalización de I/O de Windows para enviar eventos a hilos de E/S de manera escalable.
Resumen de la API:
namespace cppcoro
{
class io_service
{
public:
class schedule_operation ;
class timed_schedule_operation ;
io_service ();
io_service (std:: uint32_t concurrencyHint);
io_service (io_service&&) = delete ;
io_service ( const io_service&) = delete ;
io_service& operator =(io_service&&) = delete ;
io_service& operator =( const io_service&) = delete ;
~io_service ();
// Scheduler methods
[[nodiscard]]
schedule_operation schedule () noexcept ;
template < typename REP, typename RATIO>
[[nodiscard]]
timed_schedule_operation schedule_after (
std::chrono::duration<REP, RATIO> delay,
cppcoro::cancellation_token cancellationToken = {}) noexcept ;
// Event-loop methods
//
// I/O threads must call these to process I/O events and execute
// scheduled coroutines.
std:: uint64_t process_events ();
std:: uint64_t process_pending_events ();
std:: uint64_t process_one_event ();
std:: uint64_t process_one_pending_event ();
// Request that all threads processing events exit their event loops.
void stop () noexcept ;
// Query if some thread has called stop()
bool is_stop_requested () const noexcept ;
// Reset the event-loop after a call to stop() so that threads can
// start processing events again.
void reset ();
// Reference-counting methods for tracking outstanding references
// to the io_service.
//
// The io_service::stop() method will be called when the last work
// reference is decremented.
//
// Use the io_work_scope RAII class to manage calling these methods on
// entry-to and exit-from a scope.
void notify_work_started () noexcept ;
void notify_work_finished () noexcept ;
};
class io_service ::schedule_operation
{
public:
schedule_operation ( const schedule_operation&) noexcept ;
schedule_operation& operator =( const schedule_operation&) noexcept ;
bool await_ready () const noexcept ;
void await_suspend (std::experimental::coroutine_handle<> awaiter) noexcept ;
void await_resume () noexcept ;
};
class io_service ::timed_schedule_operation
{
public:
timed_schedule_operation (timed_schedule_operation&&) noexcept ;
timed_schedule_operation ( const timed_schedule_operation&) = delete ;
timed_schedule_operation& operator =( const timed_schedule_operation&) = delete ;
timed_schedule_operation& operator =(timed_schedule_operation&&) = delete ;
bool await_ready () const noexcept ;
void await_suspend (std::experimental::coroutine_handle<> awaiter);
void await_resume ();
};
class io_work_scope
{
public:
io_work_scope (io_service& ioService) noexcept ;
io_work_scope ( const io_work_scope& other) noexcept ;
io_work_scope (io_work_scope&& other) noexcept ;
~io_work_scope ();
io_work_scope& operator =( const io_work_scope& other) noexcept ;
io_work_scope& operator =(io_work_scope&& other) noexcept ;
io_service& service () const noexcept ;
};
}Ejemplo:
# include < cppcoro/task.hpp >
# include < cppcoro/task.hpp >
# include < cppcoro/io_service.hpp >
# include < cppcoro/read_only_file.hpp >
# include < experimental/filesystem >
# include < memory >
# include < algorithm >
# include < iostream >
namespace fs = std::experimental::filesystem;
cppcoro::task<std:: uint64_t > count_lines (cppcoro::io_service& ioService, fs::path path)
{
auto file = cppcoro::read_only_file::open (ioService, path);
constexpr size_t bufferSize = 4096 ;
auto buffer = std::make_unique<std:: uint8_t []>(bufferSize);
std:: uint64_t newlineCount = 0 ;
for (std:: uint64_t offset = 0 , fileSize = file. size (); offset < fileSize;)
{
const auto bytesToRead = static_cast < size_t >(
std::min<std:: uint64_t >(bufferSize, fileSize - offset));
const auto bytesRead = co_await file. read (offset, buffer. get (), bytesToRead);
newlineCount += std::count (buffer. get (), buffer. get () + bytesRead, ' n ' );
offset += bytesRead;
}
co_return newlineCount;
}
cppcoro::task<> run (cppcoro::io_service& ioService)
{
cppcoro::io_work_scope ioScope (ioService);
auto lineCount = co_await count_lines (ioService, fs::path{ " foo.txt " });
std::cout << " foo.txt has " << lineCount << " lines. " << std::endl;;
}
cppcoro::task<> process_events (cppcoro::io_service& ioService)
{
// Process events until the io_service is stopped.
// ie. when the last io_work_scope goes out of scope.
ioService. process_events ();
co_return ;
}
int main ()
{
cppcoro::io_service ioService;
cppcoro::sync_wait ( cppcoro::when_all_ready (
run (ioService),
process_events (ioService)));
return 0 ;
}io_service como programador Una clase io_service implementa las interfaces para los conceptos de Scheduler y DelayedScheduler .
Esto permite que una coroutina suspenda la ejecución en el hilo actual y se programe para su reanudación en un hilo de E/S asociado con un objeto io_service particular.
Ejemplo:
cppcoro::task<> do_something (cppcoro::io_service& ioService)
{
// Coroutine starts execution on the thread of the task awaiter.
// A coroutine can transfer execution to an I/O thread by awaiting the
// result of io_service::schedule().
co_await ioService. schedule ();
// At this point, the coroutine is now executing on an I/O thread
// inside a call to one of the io_service event processing methods.
// A coroutine can also perform a delayed-schedule that will suspend
// the coroutine for a specified duration of time before scheduling
// it for resumption on an I/O thread.
co_await ioService. schedule_after (100ms);
// At this point, the coroutine is executing on a potentially different I/O thread.
}file , readable_file , writable_fileEstos tipos son clases de base abstractas para realizar E/S de archivos concretos.
Resumen de la API:
namespace cppcoro
{
class file_read_operation ;
class file_write_operation ;
class file
{
public:
virtual ~file ();
std:: uint64_t size () const ;
protected:
file (file&& other) noexcept ;
};
class readable_file : public virtual file
{
public:
[[nodiscard]]
file_read_operation read (
std:: uint64_t offset,
void * buffer,
std:: size_t byteCount,
cancellation_token ct = {}) const noexcept ;
};
class writable_file : public virtual file
{
public:
void set_size (std:: uint64_t fileSize);
[[nodiscard]]
file_write_operation write (
std:: uint64_t offset,
const void * buffer,
std:: size_t byteCount,
cancellation_token ct = {}) noexcept ;
};
class file_read_operation
{
public:
file_read_operation (file_read_operation&& other) noexcept ;
bool await_ready () const noexcept ;
bool await_suspend (std::experimental::coroutine_handle<> awaiter);
std:: size_t await_resume ();
};
class file_write_operation
{
public:
file_write_operation (file_write_operation&& other) noexcept ;
bool await_ready () const noexcept ;
bool await_suspend (std::experimental::coroutine_handle<> awaiter);
std:: size_t await_resume ();
};
}read_only_file , write_only_file , read_write_fileEstos tipos representan clases de E/S de archivos concretos.
Resumen de la API:
namespace cppcoro
{
class read_only_file : public readable_file
{
public:
[[nodiscard]]
static read_only_file open (
io_service& ioService,
const std::experimental::filesystem::path& path,
file_share_mode shareMode = file_share_mode::read,
file_buffering_mode bufferingMode = file_buffering_mode::default_);
};
class write_only_file : public writable_file
{
public:
[[nodiscard]]
static write_only_file open (
io_service& ioService,
const std::experimental::filesystem::path& path,
file_open_mode openMode = file_open_mode::create_or_open,
file_share_mode shareMode = file_share_mode::none,
file_buffering_mode bufferingMode = file_buffering_mode::default_);
};
class read_write_file : public readable_file , public writable_file
{
public:
[[nodiscard]]
static read_write_file open (
io_service& ioService,
const std::experimental::filesystem::path& path,
file_open_mode openMode = file_open_mode::create_or_open,
file_share_mode shareMode = file_share_mode::none,
file_buffering_mode bufferingMode = file_buffering_mode::default_);
};
} Todas las funciones open() lanzan std::system_error en la falla.
Nota: Actualmente, las abstracciones de redes solo son compatibles en la plataforma Windows. El soporte de Linux llegará pronto.
socketLa clase Socket se puede usar para enviar/recibir datos a través de la red de forma asincrónica.
Actualmente solo admite TCP/IP, UDP/IP a través de IPv4 e IPv6.
Resumen de la API:
// <cppcoro/net/socket.hpp>
namespace cppcoro ::net
{
class socket
{
public:
static socket create_tcpv4 (ip_service& ioSvc);
static socket create_tcpv6 (ip_service& ioSvc);
static socket create_updv4 (ip_service& ioSvc);
static socket create_udpv6 (ip_service& ioSvc);
socket (socket&& other) noexcept ;
~socket ();
socket& operator =(socket&& other) noexcept ;
// Return the native socket handle for the socket
<platform-specific> native_handle () noexcept ;
const ip_endpoint& local_endpoint () const noexcept ;
const ip_endpoint& remote_endpoint () const noexcept ;
void bind ( const ip_endpoint& localEndPoint);
void listen ();
[[nodiscard]]
Awaitable< void > connect ( const ip_endpoint& remoteEndPoint) noexcept ;
[[nodiscard]]
Awaitable< void > connect ( const ip_endpoint& remoteEndPoint,
cancellation_token ct) noexcept ;
[[nodiscard]]
Awaitable< void > accept (socket& acceptingSocket) noexcept ;
[[nodiscard]]
Awaitable< void > accept (socket& acceptingSocket,
cancellation_token ct) noexcept ;
[[nodiscard]]
Awaitable< void > disconnect () noexcept ;
[[nodiscard]]
Awaitable< void > disconnect (cancellation_token ct) noexcept ;
[[nodiscard]]
Awaitable<std:: size_t > send ( const void * buffer, std:: size_t size) noexcept ;
[[nodiscard]]
Awaitable<std:: size_t > send ( const void * buffer,
std:: size_t size,
cancellation_token ct) noexcept ;
[[nodiscard]]
Awaitable<std:: size_t > recv ( void * buffer, std:: size_t size) noexcept ;
[[nodiscard]]
Awaitable<std:: size_t > recv ( void * buffer,
std:: size_t size,
cancellation_token ct) noexcept ;
[[nodiscard]]
socket_recv_from_operation recv_from (
void * buffer,
std:: size_t size) noexcept ;
[[nodiscard]]
socket_recv_from_operation_cancellable recv_from (
void * buffer,
std:: size_t size,
cancellation_token ct) noexcept ;
[[nodiscard]]
socket_send_to_operation send_to (
const ip_endpoint& destination,
const void * buffer,
std:: size_t size) noexcept ;
[[nodiscard]]
socket_send_to_operation_cancellable send_to (
const ip_endpoint& destination,
const void * buffer,
std:: size_t size,
cancellation_token ct) noexcept ;
void close_send ();
void close_recv ();
};
}Ejemplo: echo servidor
# include < cppcoro/net/socket.hpp >
# include < cppcoro/io_service.hpp >
# include < cppcoro/cancellation_source.hpp >
# include < cppcoro/async_scope.hpp >
# include < cppcoro/on_scope_exit.hpp >
# include < memory >
# include < iostream >
cppcoro::task< void > handle_connection (socket s)
{
try
{
const size_t bufferSize = 16384 ;
auto buffer = std::make_unique< unsigned char []>(bufferSize);
size_t bytesRead;
do {
// Read some bytes
bytesRead = co_await s. recv (buffer. get (), bufferSize);
// Write some bytes
size_t bytesWritten = 0 ;
while (bytesWritten < bytesRead) {
bytesWritten += co_await s. send (
buffer. get () + bytesWritten,
bytesRead - bytesWritten);
}
} while (bytesRead != 0 );
s. close_send ();
co_await s. disconnect ();
}
catch (...)
{
std::cout << " connection failed " << std::
}
}
cppcoro::task< void > echo_server (
cppcoro::net::ipv4_endpoint endpoint,
cppcoro::io_service& ioSvc,
cancellation_token ct)
{
cppcoro::async_scope scope;
std::exception_ptr ex;
try
{
auto listeningSocket = cppcoro::net::socket::create_tcpv4 (ioSvc);
listeningSocket. bind (endpoint);
listeningSocket. listen ();
while ( true ) {
auto connection = cppcoro::net::socket::create_tcpv4 (ioSvc);
co_await listeningSocket. accept (connection, ct);
scope. spawn ( handle_connection ( std::move (connection)));
}
}
catch (cppcoro::operation_cancelled)
{
}
catch (...)
{
ex = std::current_exception ();
}
// Wait until all handle_connection tasks have finished.
co_await scope. join ();
if (ex) std::rethrow_exception (ex);
}
int main ( int argc, const char * argv[])
{
cppcoro::io_service ioSvc;
if (argc != 2 ) return - 1 ;
auto endpoint = cppcoro::ipv4_endpoint::from_string (argv[ 1 ]);
if (!endpoint) return - 1 ;
( void ) cppcoro::sync_wait ( cppcoro::when_all (
[&]() -> task<>
{
// Shutdown the event loop once finished.
auto stopOnExit = cppcoro::on_scope_exit ([&] { ioSvc. stop (); });
cppcoro::cancellation_source canceller;
co_await cppcoro::when_all (
[&]() -> task<>
{
// Run for 30s then stop accepting new connections.
co_await ioSvc. schedule_after ( std::chrono::seconds ( 30 ));
canceller. request_cancellation ();
}(),
echo_server (*endpoint, ioSvc, canceller. token ()));
}(),
[&]() -> task<>
{
ioSvc. process_events ();
}()));
return 0 ;
}ip_address , ipv4_address , ipv6_addressClases auxiliares para representar una dirección IP.
Sinopsis de API:
namespace cppcoro ::net
{
class ipv4_address
{
using bytes_t = std:: uint8_t [ 4 ];
public:
constexpr ipv4_address ();
explicit constexpr ipv4_address (std:: uint32_t integer);
explicit constexpr ipv4_address ( const std::uint8_t (&bytes)[4]);
explicit constexpr ipv4_address (std:: uint8_t b0,
std:: uint8_t b1,
std:: uint8_t b2,
std:: uint8_t b3);
constexpr const bytes_t & bytes () const ;
constexpr std:: uint32_t to_integer () const ;
static constexpr ipv4_address loopback ();
constexpr bool is_loopback () const ;
constexpr bool is_private_network () const ;
constexpr bool operator ==(ipv4_address other) const ;
constexpr bool operator !=(ipv4_address other) const ;
constexpr bool operator <(ipv4_address other) const ;
constexpr bool operator >(ipv4_address other) const ;
constexpr bool operator <=(ipv4_address other) const ;
constexpr bool operator >=(ipv4_address other) const ;
std::string to_string ();
static std::optional<ipv4_address> from_string (std::string_view string) noexcept ;
};
class ipv6_address
{
using bytes_t = std:: uint8_t [ 16 ];
public:
constexpr ipv6_address ();
explicit constexpr ipv6_address (
std:: uint64_t subnetPrefix,
std:: uint64_t interfaceIdentifier);
constexpr ipv6_address (
std:: uint16_t part0,
std:: uint16_t part1,
std:: uint16_t part2,
std:: uint16_t part3,
std:: uint16_t part4,
std:: uint16_t part5,
std:: uint16_t part6,
std:: uint16_t part7);
explicit constexpr ipv6_address (
const std::uint16_t (&parts)[8]);
explicit constexpr ipv6_address (
const std::uint8_t (bytes)[16]);
constexpr const bytes_t & bytes () const ;
constexpr std:: uint64_t subnet_prefix () const ;
constexpr std:: uint64_t interface_identifier () const ;
static constexpr ipv6_address unspecified ();
static constexpr ipv6_address loopback ();
static std::optional<ipv6_address> from_string (std::string_view string) noexcept ;
std::string to_string () const ;
constexpr bool operator ==( const ipv6_address& other) const ;
constexpr bool operator !=( const ipv6_address& other) const ;
constexpr bool operator <( const ipv6_address& other) const ;
constexpr bool operator >( const ipv6_address& other) const ;
constexpr bool operator <=( const ipv6_address& other) const ;
constexpr bool operator >=( const ipv6_address& other) const ;
};
class ip_address
{
public:
// Constructs to IPv4 address 0.0.0.0
ip_address () noexcept ;
ip_address (ipv4_address address) noexcept ;
ip_address (ipv6_address address) noexcept ;
bool is_ipv4 () const noexcept ;
bool is_ipv6 () const noexcept ;
const ipv4_address& to_ipv4 () const ;
const ipv6_address& to_ipv6 () const ;
const std:: uint8_t * bytes () const noexcept ;
std::string to_string () const ;
static std::optional<ip_address> from_string (std::string_view string) noexcept ;
bool operator ==( const ip_address& rhs) const noexcept ;
bool operator !=( const ip_address& rhs) const noexcept ;
// ipv4_address sorts less than ipv6_address
bool operator <( const ip_address& rhs) const noexcept ;
bool operator >( const ip_address& rhs) const noexcept ;
bool operator <=( const ip_address& rhs) const noexcept ;
bool operator >=( const ip_address& rhs) const noexcept ;
};
}ip_endpoint , ipv4_endpoint ipv6_endpointClases auxiliares para representar una dirección IP y un número de puerto.
Sinopsis de API:
namespace cppcoro ::net
{
class ipv4_endpoint
{
public:
ipv4_endpoint () noexcept ;
explicit ipv4_endpoint (ipv4_address address, std:: uint16_t port = 0 ) noexcept ;
const ipv4_address& address () const noexcept ;
std:: uint16_t port () const noexcept ;
std::string to_string () const ;
static std::optional<ipv4_endpoint> from_string (std::string_view string) noexcept ;
};
bool operator ==( const ipv4_endpoint& a, const ipv4_endpoint& b);
bool operator !=( const ipv4_endpoint& a, const ipv4_endpoint& b);
bool operator <( const ipv4_endpoint& a, const ipv4_endpoint& b);
bool operator >( const ipv4_endpoint& a, const ipv4_endpoint& b);
bool operator <=( const ipv4_endpoint& a, const ipv4_endpoint& b);
bool operator >=( const ipv4_endpoint& a, const ipv4_endpoint& b);
class ipv6_endpoint
{
public:
ipv6_endpoint () noexcept ;
explicit ipv6_endpoint (ipv6_address address, std:: uint16_t port = 0 ) noexcept ;
const ipv6_address& address () const noexcept ;
std:: uint16_t port () const noexcept ;
std::string to_string () const ;
static std::optional<ipv6_endpoint> from_string (std::string_view string) noexcept ;
};
bool operator ==( const ipv6_endpoint& a, const ipv6_endpoint& b);
bool operator !=( const ipv6_endpoint& a, const ipv6_endpoint& b);
bool operator <( const ipv6_endpoint& a, const ipv6_endpoint& b);
bool operator >( const ipv6_endpoint& a, const ipv6_endpoint& b);
bool operator <=( const ipv6_endpoint& a, const ipv6_endpoint& b);
bool operator >=( const ipv6_endpoint& a, const ipv6_endpoint& b);
class ip_endpoint
{
public:
// Constructs to IPv4 end-point 0.0.0.0:0
ip_endpoint () noexcept ;
ip_endpoint (ipv4_endpoint endpoint) noexcept ;
ip_endpoint (ipv6_endpoint endpoint) noexcept ;
bool is_ipv4 () const noexcept ;
bool is_ipv6 () const noexcept ;
const ipv4_endpoint& to_ipv4 () const ;
const ipv6_endpoint& to_ipv6 () const ;
ip_address address () const noexcept ;
std:: uint16_t port () const noexcept ;
std::string to_string () const ;
static std::optional<ip_endpoint> from_string (std::string_view string) noexcept ;
bool operator ==( const ip_endpoint& rhs) const noexcept ;
bool operator !=( const ip_endpoint& rhs) const noexcept ;
// ipv4_endpoint sorts less than ipv6_endpoint
bool operator <( const ip_endpoint& rhs) const noexcept ;
bool operator >( const ip_endpoint& rhs) const noexcept ;
bool operator <=( const ip_endpoint& rhs) const noexcept ;
bool operator >=( const ip_endpoint& rhs) const noexcept ;
};
}sync_wait() La función sync_wait() se puede usar para esperar sincrónicamente hasta que se complete el especificado awaitable .
El ATEATABLE especificado será co_await en el hilo actual dentro de una coroutina recién creada.
La llamada sync_wait() se bloqueará hasta que la operación se complete y devolverá el resultado de la expresión co_await o vuelva a solucionar la excepción si la expresión co_await se completó con una excepción no controlada.
La función sync_wait() es principalmente útil para iniciar una tarea de nivel superior desde main() y esperar hasta que termine la tarea, en la práctica es la única forma de iniciar la task de primer/nivel superior.
Resumen de la API:
// <cppcoro/sync_wait.hpp>
namespace cppcoro
{
template < typename AWAITABLE>
auto sync_wait (AWAITABLE&& awaitable)
-> typename awaitable_traits<AWAITABLE&&>::await_result_t;
}Ejemplos:
void example_task ()
{
auto makeTask = []() -> task<std::string>
{
co_return " foo " ;
};
auto task = makeTask ();
// start the lazy task and wait until it completes
sync_wait (task); // -> "foo"
sync_wait ( makeTask ()); // -> "foo"
}
void example_shared_task ()
{
auto makeTask = []() -> shared_task<std::string>
{
co_return " foo " ;
};
auto task = makeTask ();
// start the shared task and wait until it completes
sync_wait (task) == " foo " ;
sync_wait ( makeTask ()) == " foo " ;
}when_all_ready() La función when_all_ready() se puede usar para crear un nuevo esperable que se complete cuando se completen todas las entradas.
Las tareas de entrada pueden ser cualquier tipo de espera.
Cuando el devuelto esperable sea co_await ed, co_await cada uno de los de entrada espera, a su vez, el hilo de espera en el orden de que se pasan a la función when_all_ready() . Si estas tareas no se completan sincrónicamente, se ejecutarán simultáneamente.
Una vez que todas las expresiones co_await en la entrada, los esperanza de entrada se hayan ejecutado hasta la finalización, los devueltos esperados se completen y reanudarán la coroutina en espera. El Coroutine en espera se reanudará en el hilo de la entrada esperable que se complete por última vez.
Se garantiza que el esperable returado no arroje una excepción cuando co_await ed, incluso si algunos de los esperanzas de entrada fallan con una excepción no controlada.
Tenga en cuenta, sin embargo, que la llamada when_all_ready() en sí misma puede lanzar std::bad_alloc si no pudo asignar memoria para los marcos de coroutine requeridos para esperar cada uno de los esperanzas de entrada. También puede lanzar una excepción si alguno de los objetos esperados de entrada arrojan desde sus constructores de copia/movimiento.
El resultado de co_await el returado en espera es un std::tuple o std::vector de when_all_task<RESULT> objetos. Estos objetos le permiten obtener el resultado (o excepción) de cada entrada que se espera por separado llamando al método when_all_task<RESULT>::result() de la tarea de salida correspondiente. Esto permite que la persona que llama espera simultáneamente múltiples esperanza y sincronice su finalización, mientras que conserva la capacidad de inspeccionar posteriormente los resultados de cada una de las operaciones co_await para el éxito/falla.
Esto difiere de when_all() donde la falla de cualquier operación individual co_await hace que la operación general falle con una excepción. Esto significa que no puede determinar cuál de las operaciones co_await de componentes falló y también evita que obtenga los resultados de las otras operaciones co_await .
Resumen de la API:
// <cppcoro/when_all_ready.hpp>
namespace cppcoro
{
// Concurrently await multiple awaitables.
//
// Returns an awaitable object that, when co_await'ed, will co_await each of the input
// awaitable objects and will resume the awaiting coroutine only when all of the
// component co_await operations complete.
//
// Result of co_await'ing the returned awaitable is a std::tuple of detail::when_all_task<T>,
// one for each input awaitable and where T is the result-type of the co_await expression
// on the corresponding awaitable.
//
// AWAITABLES must be awaitable types and must be movable (if passed as rvalue) or copyable
// (if passed as lvalue). The co_await expression will be executed on an rvalue of the
// copied awaitable.
template < typename ... AWAITABLES>
auto when_all_ready (AWAITABLES&&... awaitables)
-> Awaitable<std::tuple<detail::when_all_task<typename awaitable_traits<AWAITABLES>::await_result_t>...>>;
// Concurrently await each awaitable in a vector of input awaitables.
template <
typename AWAITABLE,
typename RESULT = typename awaitable_traits<AWAITABLE>:: await_result_t >
auto when_all_ready (std::vector<AWAITABLE> awaitables)
-> Awaitable<std::vector<detail::when_all_task<RESULT>>>;
}Ejemplo de uso:
task<std::string> get_record ( int id);
task<> example1 ()
{
// Run 3 get_record() operations concurrently and wait until they're all ready.
// Returns a std::tuple of tasks that can be unpacked using structured bindings.
auto [task1, task2, task3] = co_await when_all_ready (
get_record ( 123 ),
get_record ( 456 ),
get_record ( 789 ));
// Unpack the result of each task
std::string& record1 = task1. result ();
std::string& record2 = task2. result ();
std::string& record3 = task3. result ();
// Use records....
}
task<> example2 ()
{
// Create the input tasks. They don't start executing yet.
std::vector<task<std::string>> tasks;
for ( int i = 0 ; i < 1000 ; ++i)
{
tasks. emplace_back ( get_record (i));
}
// Execute all tasks concurrently.
std::vector<detail::when_all_task<std::string>> resultTasks =
co_await when_all_ready ( std::move (tasks));
// Unpack and handle each result individually once they're all complete.
for ( int i = 0 ; i < 1000 ; ++i)
{
try
{
std::string& record = tasks[i]. result ();
std::cout << i << " = " << record << std::endl;
}
catch ( const std:: exception & ex)
{
std::cout << i << " : " << ex. what () << std::endl;
}
}
}when_all() La función when_all() se puede utilizar para crear un nuevo esperable que cuando co_await ed co_await cada uno de los esperanza de entrada al mismo tiempo y devuelva un agregado de sus resultados individuales.
Cuando se espera el returado en espera, co_await cada uno de los de entrada espera en el hilo actual. Una vez que se suspenda el primero en espera, se iniciará la segunda tarea, y así sucesivamente. Las operaciones se ejecutan simultáneamente hasta que todas se han completado.
Una vez que todas las operaciones co_await de componentes se han ejecutado hasta su finalización, se construye un agregado de los resultados a partir de cada resultado individual. Si una excepción es lanzada por cualquiera de las tareas de entrada o si la construcción del resultado agregado presenta una excepción, la excepción se propagará fuera del co_await del returado esperado.
Si las operaciones co_await múltiples fallan con una excepción, una de las excepciones se propagará de la expresión de co_await when_all() Las otras excepciones serán ignoradas en silencio. No se especifica qué excepción de la operación se eligirá.
Si es importante saber qué componente falló la operación co_await o retener la capacidad de obtener resultados de otras operaciones, incluso si algunas de ellas fallan, entonces debe usar when_all_ready() en su lugar.
Resumen de la API:
// <cppcoro/when_all.hpp>
namespace cppcoro
{
// Variadic version.
//
// Note that if the result of `co_await awaitable` yields a void-type
// for some awaitables then the corresponding component for that awaitable
// in the tuple will be an empty struct of type detail::void_value.
template < typename ... AWAITABLES>
auto when_all (AWAITABLES&&... awaitables)
-> Awaitable<std::tuple<typename awaitable_traits<AWAITABLES>::await_result_t...>>;
// Overload for vector<Awaitable<void>>.
template <
typename AWAITABLE,
typename RESULT = typename awaitable_traits<AWAITABLE>:: await_result_t ,
std:: enable_if_t <std::is_void_v<RESULT>, int > = 0 >
auto when_all (std::vector<AWAITABLE> awaitables)
-> Awaitable<void>;
// Overload for vector<Awaitable<NonVoid>> that yield a value when awaited.
template <
typename AWAITABLE,
typename RESULT = typename awaitable_traits<AWAITABLE>:: await_result_t ,
std:: enable_if_t <!std::is_void_v<RESULT>, int > = 0 >
auto when_all (std::vector<AWAITABLE> awaitables)
-> Awaitable<std::vector<std::conditional_t<
std::is_lvalue_reference_v<RESULT>,
std::reference_wrapper<std::remove_reference_t<RESULT>>,
std::remove_reference_t<RESULT>>>>;
}Ejemplos:
task<A> get_a ();
task<B> get_b ();
task<> example1 ()
{
// Run get_a() and get_b() concurrently.
// Task yields a std::tuple<A, B> which can be unpacked using structured bindings.
auto [a, b] = co_await when_all ( get_a (), get_b ());
// use a, b
}
task<std::string> get_record ( int id);
task<> example2 ()
{
std::vector<task<std::string>> tasks;
for ( int i = 0 ; i < 1000 ; ++i)
{
tasks. emplace_back ( get_record (i));
}
// Concurrently execute all get_record() tasks.
// If any of them fail with an exception then the exception will propagate
// out of the co_await expression once they have all completed.
std::vector<std::string> records = co_await when_all ( std::move (tasks));
// Process results
for ( int i = 0 ; i < 1000 ; ++i)
{
std::cout << i << " = " << records[i] << std::endl;
}
}fmap() La función fmap() se puede utilizar para aplicar una función invocable al valor (s) contenido dentro de un tipo de contenedor, devolviendo un nuevo tipo de contenedor de los resultados de aplicar la función de los valores contenidos.
La función fmap() puede aplicar una función a los valores del generator<T> , recursive_generator<T> y async_generator<T> , así como cualquier valor que admita el concepto Awaitable (por ejemplo, task<T> ).
Cada uno de estos tipos proporciona una sobrecarga para fmap() que toma dos argumentos; una función para aplicar y el valor del contenedor. Consulte la documentación para cada tipo para las sobrecargas fmap() compatibles.
Por ejemplo, la función fmap() se puede usar para aplicar una función al resultado eventual de una task<T> , produciendo una nueva task<U> que se completará con el valor de retorno de la función.
// Given a function you want to apply that converts
// a value of type A to value of type B.
B a_to_b (A value);
// And a task that yields a value of type A
cppcoro::task<A> get_an_a ();
// We can apply the function to the result of the task using fmap()
// and obtain a new task yielding the result.
cppcoro::task<B> bTask = fmap(a_to_b, get_an_a());
// An alternative syntax is to use the pipe notation.
cppcoro::task<B> bTask = get_an_a() | cppcoro::fmap(a_to_b);Resumen de la API:
// <cppcoro/fmap.hpp>
namespace cppcoro
{
template < typename FUNC>
struct fmap_transform
{
fmap_transform (FUNC&& func) noexcept (std::is_nothrow_move_constructible_v<FUNC>);
FUNC func;
};
// Type-deducing constructor for fmap_transform object that can be used
// in conjunction with operator|.
template < typename FUNC>
fmap_transform<FUNC> fmap (FUNC&& func);
// operator| overloads for providing pipe-based syntactic sugar for fmap()
// such that the expression:
// <value-expr> | cppcoro::fmap(<func-expr>)
// is equivalent to:
// fmap(<func-expr>, <value-expr>)
template < typename T, typename FUNC>
decltype ( auto ) operator |(T&& value, fmap_transform<FUNC>&& transform);
template < typename T, typename FUNC>
decltype ( auto ) operator |(T&& value, fmap_transform<FUNC>& transform);
template < typename T, typename FUNC>
decltype ( auto ) operator |(T&& value, const fmap_transform<FUNC>& transform);
// Generic overload for all awaitable types.
//
// Returns an awaitable that when co_awaited, co_awaits the specified awaitable
// and applies the specified func to the result of the 'co_await awaitable'
// expression as if by 'std::invoke(func, co_await awaitable)'.
//
// If the type of 'co_await awaitable' expression is 'void' then co_awaiting the
// returned awaitable is equivalent to 'co_await awaitable, func()'.
template <
typename FUNC,
typename AWAITABLE,
std:: enable_if_t <is_awaitable_v<AWAITABLE>, int > = 0 >
auto fmap (FUNC&& func, AWAITABLE&& awaitable)
-> Awaitable<std::invoke_result_t<FUNC, typename awaitable_traits<AWAITABLE>::await_result_t>>;
} La función fmap() está diseñada para buscar la sobrecarga correcta por búsqueda dependiente del argumento (ADL), por lo que generalmente debe llamarse sin el cppcoro:: Prefix.
resume_on() La función resume_on() se puede utilizar para controlar el contexto de ejecución que un esperable reanudará la coroutina en espera cuando se espera. Cuando se aplica a un async_generator , controla qué contexto de ejecución el co_await g.begin() y las operaciones co_await ++it reanudan las coroutinas en espera.
Normalmente, la coroutina en espera de una espera (por ejemplo, una task ) o async_generator reanudará la ejecución en cualquier subproceso en el que se haya completado la operación. En algunos casos, este puede no ser el hilo en el que desea continuar ejecutando. En estos casos, puede usar la función resume_on() para crear un nuevo esperable o generador que reanudará la ejecución en un hilo asociado con un programador especificado.
La función resume_on() se puede usar como una función normal que devuelve un nuevo agente/generador. O se puede usar en una tubería sintaxa.
Ejemplo:
task<record> load_record ( int id);
ui_thread_scheduler uiThreadScheduler;
task<> example ()
{
// This will start load_record() on the current thread.
// Then when load_record() completes (probably on an I/O thread)
// it will reschedule execution onto thread pool and call to_json
// Once to_json completes it will transfer execution onto the
// ui thread before resuming this coroutine and returning the json text.
task<std::string> jsonTask =
load_record ( 123 )
| cppcoro::resume_on ( threadpool::default ())
| cppcoro::fmap (to_json)
| cppcoro::resume_on (uiThreadScheduler);
// At this point, all we've done is create a pipeline of tasks.
// The tasks haven't started executing yet.
// Await the result. Starts the pipeline of tasks.
std::string jsonText = co_await jsonTask;
// Guaranteed to be executing on ui thread here.
someUiControl. set_text (jsonText);
}Resumen de la API:
// <cppcoro/resume_on.hpp>
namespace cppcoro
{
template < typename SCHEDULER, typename AWAITABLE>
auto resume_on (SCHEDULER& scheduler, AWAITABLE awaitable)
-> Awaitable<typename awaitable_traits<AWAITABLE>::await_traits_t>;
template < typename SCHEDULER, typename T>
async_generator<T> resume_on (SCHEDULER& scheduler, async_generator<T> source);
template < typename SCHEDULER>
struct resume_on_transform
{
explicit resume_on_transform (SCHEDULER& scheduler) noexcept ;
SCHEDULER& scheduler;
};
// Construct a transform/operation that can be applied to a source object
// using "pipe" notation (ie. operator|).
template < typename SCHEDULER>
resume_on_transform<SCHEDULER> resume_on (SCHEDULER& scheduler) noexcept ;
// Equivalent to 'resume_on(transform.scheduler, std::forward<T>(value))'
template < typename T, typename SCHEDULER>
decltype ( auto ) operator |(T&& value, resume_on_transform<SCHEDULER> transform)
{
return resume_on (transform. scheduler , std::forward<T>(value));
}
}schedule_on() La función schedule_on() se puede usar para cambiar el contexto de ejecución en el que un dado esperable o async_generator comienza a ejecutarse.
Cuando se aplica a un async_generator , también afecta en qué contexto de ejecución se reanuda después de la instrucción co_yield .
Tenga en cuenta que la transformación schedule_on no especifica el hilo que el esperable o async_generator completará o producirá resultados, que depende de la implementación del esperable o generador.
Consulte el operador resume_on() para obtener una transformación que controla el subproceso en la que se completa la operación.
Por ejemplo:
task< int > get_value ();
io_service ioSvc;
task<> example ()
{
// Starts executing get_value() on the current thread.
int a = co_await get_value ();
// Starts executing get_value() on a thread associated with ioSvc.
int b = co_await schedule_on (ioSvc, get_value ());
}Resumen de la API:
// <cppcoro/schedule_on.hpp>
namespace cppcoro
{
// Return a task that yields the same result as 't' but that
// ensures that 't' is co_await'ed on a thread associated with
// the specified scheduler. Resulting task will complete on
// whatever thread 't' would normally complete on.
template < typename SCHEDULER, typename AWAITABLE>
auto schedule_on (SCHEDULER& scheduler, AWAITABLE awaitable)
-> Awaitable<typename awaitable_traits<AWAITABLE>::await_result_t>;
// Return a generator that yields the same sequence of results as
// 'source' but that ensures that execution of the coroutine starts
// execution on a thread associated with 'scheduler' and resumes
// after a 'co_yield' on a thread associated with 'scheduler'.
template < typename SCHEDULER, typename T>
async_generator<T> schedule_on (SCHEDULER& scheduler, async_generator<T> source);
template < typename SCHEDULER>
struct schedule_on_transform
{
explicit schedule_on_transform (SCHEDULER& scheduler) noexcept ;
SCHEDULER& scheduler;
};
template < typename SCHEDULER>
schedule_on_transform<SCHEDULER> schedule_on (SCHEDULER& scheduler) noexcept ;
template < typename T, typename SCHEDULER>
decltype ( auto ) operator |(T&& value, schedule_on_transform<SCHEDULER> transform);
}awaitable_traits<T> Esta metafunción de plantilla se puede usar para determinar cuál será el tipo resultante de una expresión de co_await si se aplica a una expresión del tipo T .
Tenga en cuenta que esto supone que el valor del tipo T se está esperando en un contexto en el que no se ve afectado por ningún tipo await_transform aplicado por el objeto prometedor del Coroutine. Los resultados pueden diferir si se espera un valor de tipo T en dicho contexto.
La metafunción de la plantilla awaitable_traits<T> no define el awaiter_t o await_result_t anidados typedefs si el tipo, T , no se puede esperar. Esto permite su uso en los contextos SFINAE que deshabilita las sobrecargas cuando no T puede esperar.
Resumen de la API:
// <cppcoro/awaitable_traits.hpp>
namespace cppcoro
{
template < typename T>
struct awaitable_traits
{
// The type that results from applying `operator co_await()` to a value
// of type T, if T supports an `operator co_await()`, otherwise is type `T&&`.
typename awaiter_t = <unspecified>;
// The type of the result of co_await'ing a value of type T.
typename await_result_t = <unspecified>;
};
}is_awaitable<T> La metafunción is_awaitable<T> le permite consultar si un tipo dado puede ser co_await ed o no desde una coroutina.
Resumen de la API:
// <cppcoro/is_awaitable.hpp>
namespace cppcoro
{
template < typename T>
struct is_awaitable : std::bool_constant<...>
{};
template < typename T>
constexpr bool is_awaitable_v = is_awaitable<T>::value;
}Awaitable<T> Un Awaitable<T> es un concepto que indica que un tipo puede ser co_await en un contexto de coroutina que no tiene sobrecargas await_transform y que el resultado de la expresión co_await tiene tipo, T .
Por ejemplo, la task<T> implementa el concepto Awaitable<T&&> mientras que la task<T>& implementa el concepto Awaitable<T&> .
Awaiter<T> concepto Un Awaiter<T> es un concepto que indica un tipo contiene los métodos await_ready , await_suspend y await_resume requeridos para implementar el protocolo para suspender/reanudar una coroutina en espera.
Un tipo que satisface Awaiter<T> debe tener, por un caso del tipo, awaiter :
awaiter.await_ready() -> boolawaiter.await_suspend(std::experimental::coroutine_handle<void>{}) -> void o bool o std::experimental::coroutine_handle<P> Para algunos P .awaiter.await_resume() -> T Cualquier tipo que implementa el concepto Awaiter<T> también implementa el concepto Awaitable<T> .
Scheduler Un Scheduler es un concepto que permite programar la ejecución de las coroutinas dentro de algún contexto de ejecución.
concept Scheduler
{
Awaitable< void > schedule ();
} Dado un tipo, S , que implementa el concepto Scheduler , y una instancia, s , del tipo S :
s.schedule() devuelve un tipo esperable de tal manera que co_await s.schedule() suspenderá incondicionalmente la coroutina actual y la programará para su reanudación en el contexto de ejecución asociado con el programador, s .co_await s.schedule() tiene tipo void . cppcoro::task<> f (Scheduler& scheduler)
{
// Execution of the coroutine is initially on the caller's execution context.
// Suspends execution of the coroutine and schedules it for resumption on
// the scheduler's execution context.
co_await scheduler. schedule ();
// At this point the coroutine is now executing on the scheduler's
// execution context.
}DelayedScheduler Un DelayedScheduler es un concepto que permite que una coroutina se programe para su ejecución en el contexto de ejecución del planificador después de que haya transcurrido una duración específica del tiempo.
concept DelayedScheduler : Scheduler
{
template < typename REP, typename RATIO>
Awaitable< void > schedule_after (std::chrono::duration<REP, RATIO> delay);
template < typename REP, typename RATIO>
Awaitable< void > schedule_after (
std::chrono::duration<REP, RATIO> delay,
cppcoro::cancellation_token cancellationToken);
} Dado un tipo, S , que implementa el DelayedScheduler y una instancia, s de tipo S :
s.schedule_after(delay) devuelve un objeto que se puede esperar de manera que co_await s.schedule_after(delay) suspenda la coroutina actual durante una duración de delay antes de programar la coroutina para la reanudación en el contexto de ejecución asociado con el programador, s .co_await s.schedule_after(delay) tiene tipo void .La biblioteca CPPCORO admite la construcción en Windows con Visual Studio 2017 y Linux con Clang 5.0+.
Esta biblioteca utiliza el sistema de construcción de pasteles (no, no el C# uno).
El sistema de compilación de pasteles se revisa automáticamente como un submódulo GIT para que no necesite descargarlo o instalarlo por separado.
Esta biblioteca actualmente requiere Visual Studio 2017 o posterior y el SDK de Windows 10.
Se planifica el soporte para Clang (#3) y Linux (#15).
El sistema de compilación de pasteles se implementa en Python y requiere que se instale Python 2.7.
Asegúrese de que el intérprete Python 2.7 esté en su camino y esté disponible como 'Python'.
Asegúrese de que se instale la actualización 3 o posterior de Visual Studio 2017. Tenga en cuenta que hay algunos problemas conocidos con las coroutinas en la actualización 2 o anterior que se han solucionado en la actualización 3.
También puede usar una versión experimental del compilador de Visual Studio descargando un paquete Nuget de https://vcppdogfooding.azurewebsites.net/ y desabrochando el archivo .nuget a un directorio. Simplemente actualice el archivo config.cake para apuntar en la ubicación desabrochable modificando y sin comodidad la siguiente línea:
nugetPath = None # r'C:PathToVisualCppTools.14.0.25224-Pre'Asegúrese de tener el SDK de Windows 10 instalado. Utilizará la última versión de Windows 10 SDK y Universal C Runtime de forma predeterminada.
El repositorio de CPPCORO utiliza submódulos GIT para atraer la fuente para el sistema de compilación de pasteles.
Esto significa que debe pasar la bandera --recursive al comando git clone . p.ej.
c:Code> git clone --recursive https://github.com/lewissbaker/cppcoro.git
Si ya ha clonado cppcoro, debe actualizar los submódulos después de extraer cambios.
c:Codecppcoro> git submodule update --init --recursive
Para construir desde la línea de comandos, simplemente ejecute 'Cake.Bat' en la raíz del espacio de trabajo.
p.ej.
C:cppcoro> cake.bat
Building with C:cppcoroconfig.cake - Variant(release='debug', platform='windows', architecture='x86', compilerFamily='msvc', compiler='msvc14.10')
Building with C:cppcoroconfig.cake - Variant(release='optimised', platform='windows', architecture='x64', compilerFamily='msvc', compiler='msvc14.10')
Building with C:cppcoroconfig.cake - Variant(release='debug', platform='windows', architecture='x64', compilerFamily='msvc', compiler='msvc14.10')
Building with C:cppcoroconfig.cake - Variant(release='optimised', platform='windows', architecture='x86', compilerFamily='msvc', compiler='msvc14.10')
Compiling testmain.cpp
Compiling testmain.cpp
Compiling testmain.cpp
Compiling testmain.cpp
...
Linking buildwindows_x86_msvc14.10_debugtestrun.exe
Linking buildwindows_x64_msvc14.10_optimisedtestrun.exe
Linking buildwindows_x86_msvc14.10_optimisedtestrun.exe
Linking buildwindows_x64_msvc14.10_debugtestrun.exe
Generating code
Finished generating code
Generating code
Finished generating code
Build succeeded.
Build took 0:00:02.419.
Por defecto, ejecutar cake sin argumentos construirá todos los proyectos con todas las variantes de compilación y ejecutará las pruebas unitarias. Puede reducir lo que se construye al pasar argumentos de línea de comandos adicionales. p.ej.
c:cppcoro> cake.bat release=debug architecture=x64 lib/build.cake
Building with C:UsersLewisCodecppcoroconfig.cake - Variant(release='debug', platform='windows', architecture='x64', compilerFamily='msvc', compiler='msvc14.10')
Archiving buildwindows_x64_msvc14.10_debuglibcppcoro.lib
Build succeeded.
Build took 0:00:00.321.
Puede ejecutar cake --help para enumerar las opciones de línea de comandos disponibles.
Para desarrollar desde Visual Studio, puede construir archivos .vcproj/.sln ejecutando cake.bat -p .
p.ej.
c:cppcoro> cake.bat -p
Building with C:cppcoroconfig.cake - Variant(release='debug', platform='windows', architecture='x86', compilerFamily='msvc', compiler='msvc14.10')
Building with C:cppcoroconfig.cake - Variant(release='optimised', platform='windows', architecture='x64', compilerFamily='msvc', compiler='msvc14.10')
Building with C:cppcoroconfig.cake - Variant(release='debug', platform='windows', architecture='x64', compilerFamily='msvc', compiler='msvc14.10')
Building with C:cppcoroconfig.cake - Variant(release='optimised', platform='windows', architecture='x86', compilerFamily='msvc', compiler='msvc14.10')
Generating Solution build/project/cppcoro.sln
Generating Project build/project/cppcoro_tests.vcxproj
Generating Filters build/project/cppcoro_tests.vcxproj.filters
Generating Project build/project/cppcoro.vcxproj
Generating Filters build/project/cppcoro.vcxproj.filters
Build succeeded.
Build took 0:00:00.247.
Cuando construya estos proyectos desde Visual Studio, llamará a Cake para realizar la compilación.
El proyecto CPPCORO también se puede construir en Linux usando Clang+ LibC ++ 5.0 o posterior.
El edificio CPPCORO ha sido probado bajo Ubuntu 17.04.
Asegúrese de tener los siguientes paquetes instalados:
Esto supone que tiene Clang y LibC ++ construidos e instalados.
Si aún no tiene CLANG configurado, consulte las siguientes secciones para obtener detalles sobre la configuración de Clang para construir con CPPCORO.
Vea el cppcoro y sus submódulos:
git clone --recursive https://github.com/lewissbaker/cppcoro.git cppcoro
Ejecute init.sh para configurar la función de tarta de cake :
cd cppcoro
source init.sh
Luego puede ejecutar cake desde la raíz del espacio de trabajo para construir cppcoro y ejecutar pruebas:
$ cake
Puede especificar argumentos de línea de comandos adicionales para personalizar la compilación:
--help imprimirá ayuda para los argumentos de línea de comandos--debug=run mostrará las líneas de comandos de compilación que se ejecutanrelease=debug o release=optimised limitará la variante de compilación a depurar u optimizar (por defecto, construirá ambos).lib/build.cake simplemente construirá la biblioteca CPPCORO y no las pruebas.test/build.cake@task_tests.cpp simplemente compilará un archivo fuente particulartest/build.cake@testresult construirá y ejecutará las pruebasPor ejemplo:
$ cake --debug=run release=debug lib/build.cake
Si su compilador Clang no está ubicado AT /usr/bin/clang , puede especificar una ubicación alternativa utilizando una o más de las siguientes opciones de línea de comandos para cake :
--clang-executable=<name> -Especifique el nombre ejecutable de Clang para usar en lugar de clang . p.ej. Para forzar el uso de Clang 8.0 Pass --clang-executable=clang-8--clang-executable=<abspath> -Especifique la ruta completa al ejecutable de Clang. El sistema de compilación también buscará otros ejecutables en el mismo directorio. Si esta ruta tiene el formulario <prefix>/bin/<name> entonces esto también establecerá el PREFIX PREFACIO DE CLANG-INSTALL-PREFIX en <prefix> .--clang-install-prefix=<path> -Especifique la ruta donde se ha instalado ruido. Esto hará que el sistema de compilación busque el rango en <path>/bin (a menos que se anule por --clang-executable ).--libcxx-install-prefix=<path> -Especifique la ruta donde se ha instalado libc ++. Por defecto, el sistema de compilación buscará libc ++ en la misma ubicación que el clang. Use esta opción de línea de comandos si está instalada en una ubicación diferente.Ejemplo: use una versión específica de Clang instalada en la ubicación predeterminada
$ cake --clang-executable=clang-8
Ejemplo: use la versión predeterminada de Clang desde una ubicación personalizada
$ cake --clang-install-prefix=/path/to/clang-install
Ejemplo: use una versión específica de Clang, en una ubicación personalizada, con libc ++ desde una ubicación diferente
$ cake --clang-executable=/path/to/clang-install/bin/clang-8 --libcxx-install-prefix=/path/to/libcxx-install
If your Linux distribution does not have a version of Clang 5.0 or later available, you can install a snapshot build from the LLVM project.
Follow instructions at http://apt.llvm.org/ to setup your package manager to support pulling from the LLVM package manager.
For example, for Ubuntu 17.04 Zesty:
Edit /etc/apt/sources.list and add the following lines:
deb http://apt.llvm.org/zesty/ llvm-toolchain-zesty main
deb-src http://apt.llvm.org/zesty/ llvm-toolchain-zesty main
Install the PGP key for those packages:
$ wget -O - https://apt.llvm.org/llvm-snapshot.gpg.key | sudo apt-key add -
Install Clang and LLD:
$ sudo apt-get install clang-6.0 lld-6.0
The LLVM snapshot builds do not include libc++ versions so you'll need to build that yourself. Vea abajo.
You can also use the bleeding-edge Clang version by building Clang from source yourself.
See instructions here:
To do this you will need to install the following pre-requisites:
$ sudo apt-get install git cmake ninja-build clang lld
Note that we are using your distribution's version of clang to build clang from source. GCC could also be used here instead.
Checkout LLVM + Clang + LLD + libc++ repositories:
mkdir llvm
cd llvm
git clone --depth=1 https://github.com/llvm-mirror/llvm.git llvm
git clone --depth=1 https://github.com/llvm-mirror/clang.git llvm/tools/clang
git clone --depth=1 https://github.com/llvm-mirror/lld.git llvm/tools/lld
git clone --depth=1 https://github.com/llvm-mirror/libcxx.git llvm/projects/libcxx
ln -s llvm/tools/clang clang
ln -s llvm/tools/lld lld
ln -s llvm/projects/libcxx libcxx
Configure and build Clang:
mkdir clang-build
cd clang-build
cmake -GNinja
-DCMAKE_CXX_COMPILER=/usr/bin/clang++
-DCMAKE_C_COMPILER=/usr/bin/clang
-DCMAKE_BUILD_TYPE=MinSizeRel
-DCMAKE_INSTALL_PREFIX="/path/to/clang/install"
-DCMAKE_BUILD_WITH_INSTALL_RPATH="yes"
-DLLVM_TARGETS_TO_BUILD=X86
-DLLVM_ENABLE_PROJECTS="lld;clang"
../llvm
ninja install-clang
install-clang-headers
install-llvm-ar
install-lld
The cppcoro project requires libc++ as it contains the <experimental/coroutine> header required to use C++ coroutines under Clang.
Checkout libc++ + llvm :
mkdir llvm
cd llvm
git clone --depth=1 https://github.com/llvm-mirror/llvm.git llvm
git clone --depth=1 https://github.com/llvm-mirror/libcxx.git llvm/projects/libcxx
ln -s llvm/projects/libcxx libcxx
Build libc++ :
mkdir libcxx-build
cd libcxx-build
cmake -GNinja
-DCMAKE_CXX_COMPILER="/path/to/clang/install/bin/clang++"
-DCMAKE_C_COMPILER="/path/to/clang/install/bin/clang"
-DCMAKE_BUILD_TYPE=Release
-DCMAKE_INSTALL_PREFIX="/path/to/clang/install"
-DLLVM_PATH="../llvm"
-DLIBCXX_CXX_ABI=libstdc++
-DLIBCXX_CXX_ABI_INCLUDE_PATHS="/usr/include/c++/6.3.0/;/usr/include/x86_64-linux-gnu/c++/6.3.0/"
../libcxx
ninja cxx
ninja install
This will build and install libc++ into the same install directory where you have clang installed.
The cppcoro port in vcpkg is kept up to date by Microsoft team members and community contributors. The url of vcpkg is: https://github.com/Microsoft/vcpkg . You can download and install cppcoro using the vcpkg dependency manager:
git clone https://github.com/Microsoft/vcpkg.git
cd vcpkg
./bootstrap-vcpkg.sh # ./bootstrap-vcpkg.bat for Windows
./vcpkg integrate install
./vcpkg install cppcoroSi la versión está desactualizada, cree un problema o extraiga la solicitud en el repositorio de VCPKG.
GitHub issues are the primary mechanism for support, bug reports and feature requests.
Contributions are welcome and pull-requests will be happily reviewed. I only ask that you agree to license any contributions that you make under the MIT license.
If you have general questions about C++ coroutines, you can generally find someone to help in the #coroutines channel on Cpplang Slack group.