La bibliothèque «CPPCORO» fournit un large ensemble de primitives à usage général pour utiliser la proposition Coroutines TS décrite dans N4680.
Ceux-ci incluent:
task<T>shared_task<T>generator<T>recursive_generator<T>async_generator<T>single_consumer_eventsingle_consumer_async_auto_reset_eventasync_mutexasync_manual_reset_eventasync_auto_reset_eventasync_latchsequence_barriermulti_producer_sequencersingle_producer_sequencersync_wait()when_all()when_all_ready()fmap()schedule_on()resume_on()cancellation_tokencancellation_sourcecancellation_registrationstatic_thread_poolio_service et io_work_scopefile , readable_file , writable_fileread_only_file , write_only_file , read_write_filesocketip_address , ipv4_address , ipv6_addressip_endpoint , ipv4_endpoint , ipv6_endpointis_awaitable<T>awaitable_traits<T>Awaitable<T>Awaiter<T>SchedulerDelayedSchedulerCette bibliothèque est une bibliothèque expérimentale qui explore l'espace des abstractions de programmation asynchrones à haute performance et évolutives qui peuvent être construites sur la proposition C ++ Coroutines.
Il a été open source dans l'espoir que d'autres le trouveront utile et que la communauté C ++ peut fournir des commentaires à ce sujet et des moyens de l'améliorer.
Il nécessite un compilateur qui prend en charge les coroutines TS:
La version Linux est fonctionnelle, à l'exception des classes liées io_context et de fichiers qui n'ont pas encore été implémentées pour Linux (voir le numéro 15 pour plus d'informations).
task<T>Une tâche représente un calcul asynchrone qui est exécuté paresseusement en ce que l'exécution de la coroutine ne commence pas tant que la tâche est attendue.
Exemple:
# include < cppcoro/read_only_file.hpp >
# include < cppcoro/task.hpp >
cppcoro::task< int > count_lines (std::string path)
{
auto file = co_await cppcoro::read_only_file::open (path);
int lineCount = 0 ;
char buffer[ 1024 ];
size_t bytesRead;
std:: uint64_t offset = 0 ;
do
{
bytesRead = co_await file. read (offset, buffer, sizeof (buffer));
lineCount += std::count (buffer, buffer + bytesRead, ' n ' );
offset += bytesRead;
} while (bytesRead > 0 );
co_return lineCount;
}
cppcoro::task<> usage_example ()
{
// Calling function creates a new task but doesn't start
// executing the coroutine yet.
cppcoro::task< int > countTask = count_lines ( " foo.txt " );
// ...
// Coroutine is only started when we later co_await the task.
int lineCount = co_await countTask;
std::cout << " line count = " << lineCount << std::endl;
}Présentation de l'API:
// <cppcoro/task.hpp>
namespace cppcoro
{
template < typename T>
class task
{
public:
using promise_type = <unspecified>;
using value_type = T;
task () noexcept ;
task (task&& other) noexcept ;
task& operator =(task&& other);
// task is a move-only type.
task ( const task& other) = delete ;
task& operator =( const task& other) = delete ;
// Query if the task result is ready.
bool is_ready () const noexcept ;
// Wait for the task to complete and return the result or rethrow the
// exception if the operation completed with an unhandled exception.
//
// If the task is not yet ready then the awaiting coroutine will be
// suspended until the task completes. If the the task is_ready() then
// this operation will return the result synchronously without suspending.
Awaiter<T&> operator co_await () const & noexcept ;
Awaiter<T&&> operator co_await () const && noexcept ;
// Returns an awaitable that can be co_await'ed to suspend the current
// coroutine until the task completes.
//
// The 'co_await t.when_ready()' expression differs from 'co_await t' in
// that when_ready() only performs synchronization, it does not return
// the result or rethrow the exception.
//
// This can be useful if you want to synchronize with the task without
// the possibility of it throwing an exception.
Awaitable< void > when_ready () const noexcept ;
};
template < typename T>
void swap (task<T>& a, task<T>& b);
// Creates a task that yields the result of co_await'ing the specified awaitable.
//
// This can be used as a form of type-erasure of the concrete awaitable, allowing
// different awaitables that return the same await-result type to be stored in
// the same task<RESULT> type.
template <
typename AWAITABLE,
typename RESULT = typename awaitable_traits<AWAITABLE>:: await_result_t >
task<RESULT> make_task (AWAITABLE awaitable);
} Vous pouvez créer un objet task<T> en appelant une fonction Coroutine qui renvoie une task<T> .
La coroutine doit contenir une utilisation de co_await ou co_return . Notez qu'une task<T> Coroutine peut ne pas utiliser le mot-clé co_yield .
Lorsqu'une coroutine qui renvoie une task<T> est appelée, une trame coroutine est allouée si nécessaire et les paramètres sont capturés dans le cadre de la coroutine. La coroutine est suspendue au début du corps de Coroutine et l'exécution est renvoyée à l'appelant et une valeur task<T> qui représente le calcul asynchrone est renvoyée de l'appel de fonction.
Le corps Coroutine commencera à s'exécuter lorsque la valeur task<T> est co_await ed. Cela suspendra la coroutine en attente et commencera l'exécution de la coroutine associée à la valeur task<T> attendue.
La coroutine en attente sera plus tard repris sur le fil qui termine l'exécution de la coroutine de la task<T> attendue. c'est-à-dire Le thread qui exécute le co_return ou qui lance une exception non gérée qui termine l'exécution de la coroutine.
Si la tâche s'est déjà déroulée jusqu'à l'attente, l'attendre obtiendra à nouveau le résultat déjà composé sans suspendre la coroutine en attente.
Si l'objet task est détruit avant qu'il ne soit attendu, la coroutine ne s'exécute jamais et le destructeur détruit simplement les paramètres capturés et libère toute mémoire utilisée par le cadre coroutine.
shared_task<T> La classe shared_task<T> est un type de coroutine qui donne une seule valeur de manière asynchrone.
Il est «paresseux» dans cette exécution de la tâche ne démarre pas avant qu'il ne soit attendu par une coroutine.
Il est «partagé» en ce que la valeur de la tâche peut être copiée, permettant de créer plusieurs références au résultat de la tâche. Il permet également à plusieurs coroutines d'attendre simultanément le résultat.
La tâche commencera à s'exécuter sur le thread qui co_await s la tâche. Les attentes ultérieurs seront soit suspendus et seront mis en file d'attente pour la reprise à la fin de la tâche ou continueront de manière synchrone si la tâche s'est déjà exécutée à la fin.
Si un attente est suspendu en attendant que la tâche se termine, elle reprendra sur le thread qui termine l'exécution de la tâche. c'est-à-dire Le thread qui exécute le co_return ou qui lance l'exception non perdue qui termine l'exécution de la coroutine.
Résumé de l'API
namespace cppcoro
{
template < typename T = void >
class shared_task
{
public:
using promise_type = <unspecified>;
using value_type = T;
shared_task () noexcept ;
shared_task ( const shared_task& other) noexcept ;
shared_task (shared_task&& other) noexcept ;
shared_task& operator =( const shared_task& other) noexcept ;
shared_task& operator =(shared_task&& other) noexcept ;
void swap (shared_task& other) noexcept ;
// Query if the task has completed and the result is ready.
bool is_ready () const noexcept ;
// Returns an operation that when awaited will suspend the
// current coroutine until the task completes and the result
// is available.
//
// The type of the result of the 'co_await someTask' expression
// is an l-value reference to the task's result value (unless T
// is void in which case the expression has type 'void').
// If the task completed with an unhandled exception then the
// exception will be rethrown by the co_await expression.
Awaiter<T&> operator co_await () const noexcept ;
// Returns an operation that when awaited will suspend the
// calling coroutine until the task completes and the result
// is available.
//
// The result is not returned from the co_await expression.
// This can be used to synchronize with the task without the
// possibility of the co_await expression throwing an exception.
Awaiter< void > when_ready () const noexcept ;
};
template < typename T>
bool operator ==( const shared_task<T>& a, const shared_task<T>& b) noexcept ;
template < typename T>
bool operator !=( const shared_task<T>& a, const shared_task<T>& b) noexcept ;
template < typename T>
void swap (shared_task<T>& a, shared_task<T>& b) noexcept ;
// Wrap an awaitable value in a shared_task to allow multiple coroutines
// to concurrently await the result.
template <
typename AWAITABLE,
typename RESULT = typename awaitable_traits<AWAITABLE>:: await_result_t >
shared_task<RESULT> make_shared_task (AWAITABLE awaitable);
} Toutes les constants constants sur shared_task<T> sont sûrs d'appeler simultanément avec d'autres constants sur la même instance à partir de plusieurs threads. Il n'est pas sûr d'appeler les méthodes non-constant de shared_task<T> simultanément avec toute autre méthode sur la même instance d'un shared_task<T> .
task<T> La classe shared_task<T> est similaire à task<T> en ce que la tâche ne commence pas l'exécution immédiatement après l'appel de la fonction coroutine. La tâche commence à s'exécuter uniquement lorsqu'elle est attendue pour la première fois.
Il diffère de task<T> en ce que l'objet de tâche résultant peut être copié, permettant à plusieurs objets de tâche de référencer le même résultat asynchrone. Il prend également en charge plusieurs coroutines en attendant simultanément le résultat de la tâche.
Le compromis est que le résultat est toujours une référence de valeur en L au résultat, jamais une référence de valeur R (puisque le résultat peut être partagé) qui peut limiter la capacité à déplacer le résultat dans une variable locale. Il a également un coût d'exécution légèrement plus élevé en raison de la nécessité de maintenir un nombre de références et de prendre en charge plusieurs attentes.
generator<T> Un generator représente un type de coroutine qui produit une séquence de valeurs de type, T , où les valeurs sont produites paresseusement et de manière synchrone.
Le corps Coroutine est capable de produire des valeurs de type T en utilisant le mot-clé co_yield . Notez cependant que le corps Coroutine n'est pas en mesure d'utiliser le mot-clé co_await ; Les valeurs doivent être produites de manière synchrone.
Par exemple:
cppcoro::generator< const std:: uint64_t > fibonacci ()
{
std:: uint64_t a = 0 , b = 1 ;
while ( true )
{
co_yield b;
auto tmp = a;
a = b;
b += tmp;
}
}
void usage ()
{
for ( auto i : fibonacci ())
{
if (i > 1'000'000 ) break ;
std::cout << i << std::endl;
}
} Lorsqu'une fonction coroutine renvoyant un generator<T> est appelée la coroutine est créée initialement suspendue. L'exécution de la coroutine entre dans le corps de Coroutine lorsque la méthode generator<T>::begin() est appelée et se poursuit jusqu'à ce que la première instruction co_yield soit atteinte ou que la coroutine se termine.
Si l'itérateur retourné n'est pas égal à l'itérateur end() , la déréférence, l'itérateur renvoie une référence à la valeur transmise à l'instruction co_yield .
Appel operator++() sur l'itérateur reprendra l'exécution de la coroutine et continuera jusqu'à ce que le prochain point co_yield soit atteint ou que la coroutine s'exécute vers l'achèvement ().
Toutes les exceptions non gérées lancées par la coroutine se propageont à partir des appels begin() ou operator++() à l'appelant.
Résumé de l'API:
namespace cppcoro
{
template < typename T>
class generator
{
public:
using promise_type = <unspecified>;
class iterator
{
public:
using iterator_category = std::input_iterator_tag;
using value_type = std:: remove_reference_t <T>;
using reference = value_type&;
using pointer = value_type*;
using difference_type = std:: size_t ;
iterator ( const iterator& other) noexcept ;
iterator& operator =( const iterator& other) noexcept ;
// If the generator coroutine throws an unhandled exception before producing
// the next element then the exception will propagate out of this call.
iterator& operator ++();
reference operator *() const noexcept ;
pointer operator ->() const noexcept ;
bool operator ==( const iterator& other) const noexcept ;
bool operator !=( const iterator& other) const noexcept ;
};
// Constructs to the empty sequence.
generator () noexcept ;
generator (generator&& other) noexcept ;
generator& operator =(generator&& other) noexcept ;
generator ( const generator& other) = delete ;
generator& operator =( const generator&) = delete ;
~generator ();
// Starts executing the generator coroutine which runs until either a value is yielded
// or the coroutine runs to completion or an unhandled exception propagates out of the
// the coroutine.
iterator begin ();
iterator end () noexcept ;
// Swap the contents of two generators.
void swap (generator& other) noexcept ;
};
template < typename T>
void swap (generator<T>& a, generator<T>& b) noexcept ;
// Apply function, func, lazily to each element of the source generator
// and yield a sequence of the results of calls to func().
template < typename FUNC, typename T>
generator<std:: invoke_result_t <FUNC, T&>> fmap (FUNC func, generator<T> source);
}recursive_generator<T> Un recursive_generator est similaire à un generator , sauf qu'il est conçu pour soutenir plus efficacement les éléments d'une séquence imbriquée en tant qu'éléments d'une séquence extérieure.
En plus de pouvoir co_yield une valeur de type T vous pouvez également co_yield une valeur de type recursive_generator<T> .
Lorsque vous co_yield une valeur recursive_generator<T> tous les éléments du générateur donné sont cédés sous forme d'éléments du générateur de courant. La coroutine actuelle est suspendue jusqu'à ce que le consommateur ait fini de consommer tous les éléments du générateur imbriqué, après quoi l'exécution de la coroutine actuelle reprendra l'exécution pour produire le prochain élément.
L'avantage de recursive_generator<T> sur generator<T> pour itération sur les structures de données récursives est que l' iterator::operator++() est capable de reprendre directement la coroutine la plus à la feuille pour produire l'élément suivant, plutôt que d'avoir à reprendre / suspendre les cooutines (de profondeur) pour chaque élément. L'inconvénient est qu'il y a des frais généraux supplémentaires
Par exemple:
// Lists the immediate contents of a directory.
cppcoro::generator<dir_entry> list_directory (std::filesystem::path path);
cppcoro::recursive_generator<dir_entry> list_directory_recursive (std::filesystem::path path)
{
for ( auto & entry : list_directory (path))
{
co_yield entry;
if (entry. is_directory ())
{
co_yield list_directory_recursive (entry. path ());
}
}
} Notez que l'application de l'opérateur fmap() à un recursive_generator<T> produira un type de generator<U> plutôt qu'un recursive_generator<U> . En effet, les utilisations de fmap ne sont généralement pas utilisées dans des contextes récursifs et nous essayons d'éviter les frais généraux supplémentaires encourus par recursive_generator .
async_generator<T> Un async_generator représente un type de coroutine qui produit une séquence de valeurs de type, T , où les valeurs sont produites paresseusement et les valeurs peuvent être produites de manière asynchrone.
Le corps de Coroutine est capable d'utiliser les expressions co_await et co_yield .
Les consommateurs du générateur peuvent utiliser A for co_await pour consommer les valeurs.
Exemple
cppcoro::async_generator< int > ticker ( int count, threadpool& tp)
{
for ( int i = 0 ; i < count; ++i)
{
co_await tp. delay ( std::chrono::seconds ( 1 ));
co_yield i;
}
}
cppcoro::task<> consumer (threadpool& tp)
{
auto sequence = ticker ( 10 , tp);
for co_await (std:: uint32_t i : sequence)
{
std::cout << " Tick " << i << std::endl;
}
}Résumé de l'API
// <cppcoro/async_generator.hpp>
namespace cppcoro
{
template < typename T>
class async_generator
{
public:
class iterator
{
public:
using iterator_tag = std::forward_iterator_tag;
using difference_type = std:: size_t ;
using value_type = std:: remove_reference_t <T>;
using reference = value_type&;
using pointer = value_type*;
iterator ( const iterator& other) noexcept ;
iterator& operator =( const iterator& other) noexcept ;
// Resumes the generator coroutine if suspended
// Returns an operation object that must be awaited to wait
// for the increment operation to complete.
// If the coroutine runs to completion then the iterator
// will subsequently become equal to the end() iterator.
// If the coroutine completes with an unhandled exception then
// that exception will be rethrown from the co_await expression.
Awaitable<iterator&> operator ++() noexcept ;
// Dereference the iterator.
pointer operator ->() const noexcept ;
reference operator *() const noexcept ;
bool operator ==( const iterator& other) const noexcept ;
bool operator !=( const iterator& other) const noexcept ;
};
// Construct to the empty sequence.
async_generator () noexcept ;
async_generator ( const async_generator&) = delete ;
async_generator (async_generator&& other) noexcept ;
~async_generator ();
async_generator& operator =( const async_generator&) = delete ;
async_generator& operator =(async_generator&& other) noexcept ;
void swap (async_generator& other) noexcept ;
// Starts execution of the coroutine and returns an operation object
// that must be awaited to wait for the first value to become available.
// The result of co_await'ing the returned object is an iterator that
// can be used to advance to subsequent elements of the sequence.
//
// This method is not valid to be called once the coroutine has
// run to completion.
Awaitable<iterator> begin () noexcept ;
iterator end () noexcept ;
};
template < typename T>
void swap (async_generator<T>& a, async_generator<T>& b);
// Apply 'func' to each element of the source generator, yielding a sequence of
// the results of calling 'func' on the source elements.
template < typename FUNC, typename T>
async_generator<std:: invoke_result_t <FUNC, T&>> fmap (FUNC func, async_generator<T> source);
} Lorsque l'objet async_generator est détruit, il demande l'annulation de la coroutine sous-jacente. Si la coroutine s'est déjà déroulée à la fin ou est actuellement suspendue dans une expression co_yield , la coroutine est immédiatement détruite. Sinon, la Coroutine continuera l'exécution jusqu'à ce qu'elle s'exécute jusqu'à la fin ou atteigne l'expression co_yield suivante.
Lorsque le cadre de la coroutine est détruit, les destructeurs de toutes les variables dans la portée à ce stade seront exécutés pour garantir que les ressources du générateur sont nettoyées.
Notez que l'appelant doit s'assurer que l'objet async_generator ne doit pas être détruit pendant qu'une coroutine grand public exécute une expression co_await en attendant la production de l'élément suivant.
single_consumer_eventIl s'agit d'un type d'événement manuel simple qui ne prend en charge qu'une seule coroutine qui l'attend à la fois. Cela peut être utilisé pour
Résumé de l'API:
// <cppcoro/single_consumer_event.hpp>
namespace cppcoro
{
class single_consumer_event
{
public:
single_consumer_event ( bool initiallySet = false ) noexcept ;
bool is_set () const noexcept ;
void set ();
void reset () noexcept ;
Awaiter< void > operator co_await () const noexcept ;
};
}Exemple:
# include < cppcoro/single_consumer_event.hpp >
cppcoro::single_consumer_event event;
std::string value;
cppcoro::task<> consumer ()
{
// Coroutine will suspend here until some thread calls event.set()
// eg. inside the producer() function below.
co_await event;
std::cout << value << std::endl;
}
void producer ()
{
value = " foo " ;
// This will resume the consumer() coroutine inside the call to set()
// if it is currently suspended.
event. set ();
}single_consumer_async_auto_reset_event Cette classe fournit une primitive de synchronisation asynchrone qui permet à une seule coroutine d'attendre que l'événement soit signalé par un appel à la méthode set() .
Une fois que la coroutine qui attend l'événement est publiée par un appel préalable ou ultérieur à set() l'événement est automatiquement réinitialisé à l'état «non défini».
Cette classe est une version plus efficace d' async_auto_reset_event qui peut être utilisée dans les cas où une seule coroutine attendra l'événement à la fois. Si vous devez prendre en charge plusieurs coroutines en attente simultanée sur l'événement, utilisez plutôt la classe async_auto_reset_event .
Résumé de l'API:
// <cppcoro/single_consumer_async_auto_reset_event.hpp>
namespace cppcoro
{
class single_consumer_async_auto_reset_event
{
public:
single_consumer_async_auto_reset_event (
bool initiallySet = false ) noexcept ;
// Change the event to the 'set' state. If a coroutine is awaiting the
// event then the event is immediately transitioned back to the 'not set'
// state and the coroutine is resumed.
void set () noexcept ;
// Returns an Awaitable type that can be awaited to wait until
// the event becomes 'set' via a call to the .set() method. If
// the event is already in the 'set' state then the coroutine
// continues without suspending.
// The event is automatically reset back to the 'not set' state
// before resuming the coroutine.
Awaiter< void > operator co_await () const noexcept ;
};
}Exemple d'utilisation:
std::atomic< int > value;
cppcoro::single_consumer_async_auto_reset_event valueDecreasedEvent;
cppcoro::task<> wait_until_value_is_below ( int limit)
{
while (value. load (std::memory_order_relaxed) >= limit)
{
// Wait until there has been some change that we're interested in.
co_await valueDecreasedEvent;
}
}
void change_value ( int delta)
{
value. fetch_add (delta, std::memory_order_relaxed);
// Notify the waiter if there has been some change.
if (delta < 0 ) valueDecreasedEvent. set ();
}async_mutexFournit une abstraction d'exclusion mutuelle simple qui permet à l'appelant de «co_await» le mutex à partir d'une coroutine pour suspendre la coroutine jusqu'à ce que le verrou de mutex soit acquis.
L'implémentation est sans serrure en ce sens qu'une coroutine qui attend le mutex ne bloquera pas le thread mais qui suspendra à la place la coroutine et la reprendra plus tard à l'intérieur de l'appel à unlock() par le porte-bloc précédent.
Résumé de l'API:
// <cppcoro/async_mutex.hpp>
namespace cppcoro
{
class async_mutex_lock ;
class async_mutex_lock_operation ;
class async_mutex_scoped_lock_operation ;
class async_mutex
{
public:
async_mutex () noexcept ;
~async_mutex ();
async_mutex ( const async_mutex&) = delete ;
async_mutex& operator ( const async_mutex&) = delete;
bool try_lock () noexcept ;
async_mutex_lock_operation lock_async () noexcept ;
async_mutex_scoped_lock_operation scoped_lock_async () noexcept ;
void unlock ();
};
class async_mutex_lock_operation
{
public:
bool await_ready () const noexcept ;
bool await_suspend (std::experimental::coroutine_handle<> awaiter) noexcept ;
void await_resume () const noexcept ;
};
class async_mutex_scoped_lock_operation
{
public:
bool await_ready () const noexcept ;
bool await_suspend (std::experimental::coroutine_handle<> awaiter) noexcept ;
[[nodiscard]] async_mutex_lock await_resume () const noexcept ;
};
class async_mutex_lock
{
public:
// Takes ownership of the lock.
async_mutex_lock (async_mutex& mutex, std:: adopt_lock_t ) noexcept ;
// Transfer ownership of the lock.
async_mutex_lock (async_mutex_lock&& other) noexcept ;
async_mutex_lock ( const async_mutex_lock&) = delete ;
async_mutex_lock& operator =( const async_mutex_lock&) = delete ;
// Releases the lock by calling unlock() on the mutex.
~async_mutex_lock ();
};
}Exemple d'utilisation:
# include < cppcoro/async_mutex.hpp >
# include < cppcoro/task.hpp >
# include < set >
# include < string >
cppcoro::async_mutex mutex;
std::set<std::string> values;
cppcoro::task<> add_item (std::string value)
{
cppcoro::async_mutex_lock lock = co_await mutex. scoped_lock_async ();
values. insert ( std::move (value));
}async_manual_reset_event Un événement de réinitialisation manuelle est une primitive coroutine / synchronisation de filetage qui permet à un ou plusieurs threads d'attendre que l'événement soit signalé par un thread qui appelle set() .
L'événement se trouve dans l'un des deux États; «set» et «pas set» .
Si l'événement est dans l'état «défini» lorsqu'une coroutine attend l'événement, la coroutine continue sans suspendre. Cependant, si la coroutine est à l'état «non défini» , la coroutine est suspendue jusqu'à ce qu'un thread appelle par la suite la méthode set() .
Tous les fils qui ont été suspendus en attendant que l'événement deviendront «set» sera reprise à l'intérieur de l'appel suivant pour set() par un thread.
Notez que vous devez vous assurer qu'aucune coroutine n'attend un événement «pas défini» lorsque l'événement est détruit car ils ne reprendront pas.
Exemple:
cppcoro::async_manual_reset_event event;
std::string value;
void producer ()
{
value = get_some_string_value ();
// Publish a value by setting the event.
event. set ();
}
// Can be called many times to create many tasks.
// All consumer tasks will wait until value has been published.
cppcoro::task<> consumer ()
{
// Wait until value has been published by awaiting event.
co_await event;
consume_value (value);
}Résumé de l'API:
namespace cppcoro
{
class async_manual_reset_event_operation ;
class async_manual_reset_event
{
public:
async_manual_reset_event ( bool initiallySet = false ) noexcept ;
~async_manual_reset_event ();
async_manual_reset_event ( const async_manual_reset_event&) = delete ;
async_manual_reset_event (async_manual_reset_event&&) = delete ;
async_manual_reset_event& operator =( const async_manual_reset_event&) = delete ;
async_manual_reset_event& operator =(async_manual_reset_event&&) = delete ;
// Wait until the event becomes set.
async_manual_reset_event_operation operator co_await () const noexcept ;
bool is_set () const noexcept ;
void set () noexcept ;
void reset () noexcept ;
};
class async_manual_reset_event_operation
{
public:
async_manual_reset_event_operation (async_manual_reset_event& event) noexcept ;
bool await_ready () const noexcept ;
bool await_suspend (std::experimental::coroutine_handle<> awaiter) noexcept ;
void await_resume () const noexcept ;
};
}async_auto_reset_event Un événement de réinitialisation automatique est une primitive coroutine / fil de filetage qui permet à un ou plusieurs threads d'attendre que l'événement soit signalé par un thread en appelant set() .
Une fois qu'une coroutine qui attend l'événement est publiée par un appel préalable ou ultérieur à set() l'événement est automatiquement réinitialisé à l'état «non défini».
Résumé de l'API:
// <cppcoro/async_auto_reset_event.hpp>
namespace cppcoro
{
class async_auto_reset_event_operation ;
class async_auto_reset_event
{
public:
async_auto_reset_event ( bool initiallySet = false ) noexcept ;
~async_auto_reset_event ();
async_auto_reset_event ( const async_auto_reset_event&) = delete ;
async_auto_reset_event (async_auto_reset_event&&) = delete ;
async_auto_reset_event& operator =( const async_auto_reset_event&) = delete ;
async_auto_reset_event& operator =(async_auto_reset_event&&) = delete ;
// Wait for the event to enter the 'set' state.
//
// If the event is already 'set' then the event is set to the 'not set'
// state and the awaiting coroutine continues without suspending.
// Otherwise, the coroutine is suspended and later resumed when some
// thread calls 'set()'.
//
// Note that the coroutine may be resumed inside a call to 'set()'
// or inside another thread's call to 'operator co_await()'.
async_auto_reset_event_operation operator co_await () const noexcept ;
// Set the state of the event to 'set'.
//
// If there are pending coroutines awaiting the event then one
// pending coroutine is resumed and the state is immediately
// set back to the 'not set' state.
//
// This operation is a no-op if the event was already 'set'.
void set () noexcept ;
// Set the state of the event to 'not-set'.
//
// This is a no-op if the state was already 'not set'.
void reset () noexcept ;
};
class async_auto_reset_event_operation
{
public:
explicit async_auto_reset_event_operation (async_auto_reset_event& event) noexcept ;
async_auto_reset_event_operation ( const async_auto_reset_event_operation& other) noexcept ;
bool await_ready () const noexcept ;
bool await_suspend (std::experimental::coroutine_handle<> awaiter) noexcept ;
void await_resume () const noexcept ;
};
}async_latchUn verrou asynchrone est une primitive de synchronisation qui permet aux coroutines d'attendre de manière asynchrone qu'un compteur soit décrémenté à zéro.
Le verrou est un objet à usage unique. Une fois que le comptoir atteint zéro, le verrou devient «prêt» et restera prêt jusqu'à ce que le verrou soit détruit.
Résumé de l'API:
// <cppcoro/async_latch.hpp>
namespace cppcoro
{
class async_latch
{
public:
// Initialise the latch with the specified count.
async_latch (std:: ptrdiff_t initialCount) noexcept ;
// Query if the count has reached zero yet.
bool is_ready () const noexcept ;
// Decrement the count by n.
// This will resume any waiting coroutines if the count reaches zero
// as a result of this call.
// It is undefined behaviour to decrement the count below zero.
void count_down (std:: ptrdiff_t n = 1 ) noexcept ;
// Wait until the latch becomes ready.
// If the latch count is not yet zero then the awaiting coroutine will
// be suspended and later resumed by a call to count_down() that decrements
// the count to zero. If the latch count was already zero then the coroutine
// continues without suspending.
Awaiter< void > operator co_await () const noexcept ;
};
}sequence_barrier Une sequence_barrier est une primitive de synchronisation qui permet à un seul producteur et à plusieurs consommateurs de se coordonner en ce qui concerne un numéro de séquence augmentant monotone.
Un seul producteur fait progresser le numéro de séquence en publiant de nouveaux numéros de séquence dans un ordre croissant monotone. Un ou plusieurs consommateurs peuvent interroger le dernier numéro de séquence publié et attendre qu'un numéro de séquence particulier ait été publié.
Une barrière de séquence peut être utilisée pour représenter un curseur dans un producteur / anneau de consommation
Voir le motif de perturbateur LMAX pour plus de fond: https://lmax-exchange.github.io/disruptor/files/disruptor-1.0.pdf
Synopsis API:
namespace cppcoro
{
template < typename SEQUENCE = std:: size_t ,
typename TRAITS = sequence_traits<SEQUENCE>>
class sequence_barrier
{
public:
sequence_barrier (SEQUENCE initialSequence = TRAITS::initial_sequence) noexcept ;
~sequence_barrier ();
SEQUENCE last_published () const noexcept ;
// Wait until the specified targetSequence number has been published.
//
// If the operation does not complete synchronously then the awaiting
// coroutine is resumed on the specified scheduler. Otherwise, the
// coroutine continues without suspending.
//
// The co_await expression resumes with the updated last_published()
// value, which is guaranteed to be at least 'targetSequence'.
template < typename SCHEDULER>
[[nodiscard]]
Awaitable<SEQUENCE> wait_until_published (SEQUENCE targetSequence,
SCHEDULER& scheduler) const noexcept ;
void publish (SEQUENCE sequence) noexcept ;
};
}single_producer_sequencer Un single_producer_sequencer est une primitive de synchronisation qui peut être utilisée pour coordonner l'accès à un bouffoir de ring pour un seul producteur et un ou plusieurs consommateurs.
Un producteur acquiert d'abord un ou plusieurs emplacements dans un bouffon d'anneau, écrit sur les éléments de bouffée d'anneau correspondant à ces emplacements, puis publie enfin les valeurs écrites à ces créneaux. Un producteur ne peut jamais produire plus que des éléments «tamponze» avant où le consommateur a consommé.
Un consommateur attend alors que certains éléments soient publiés, traite les éléments, puis informe le producteur lorsqu'il a terminé les éléments de traitement en publiant le numéro de séquence qu'il a fini de consommer dans un objet sequence_barrier .
Synopsis API:
// <cppcoro/single_producer_sequencer.hpp>
namespace cppcoro
{
template <
typename SEQUENCE = std:: size_t ,
typename TRAITS = sequence_traits<SEQUENCE>>
class single_producer_sequencer
{
public:
using size_type = typename sequence_range<SEQUENCE, TRAITS>::size_type;
single_producer_sequencer (
const sequence_barrier<SEQUENCE, TRAITS>& consumerBarrier,
std:: size_t bufferSize,
SEQUENCE initialSequence = TRAITS::initial_sequence) noexcept ;
// Publisher API:
template < typename SCHEDULER>
[[nodiscard]]
Awaitable<SEQUENCE> claim_one (SCHEDULER& scheduler) noexcept ;
template < typename SCHEDULER>
[[nodiscard]]
Awaitable<sequence_range<SEQUENCE>> claim_up_to (
std:: size_t count,
SCHEDULER& scheduler) noexcept ;
void publish (SEQUENCE sequence) noexcept ;
// Consumer API:
SEQUENCE last_published () const noexcept ;
template < typename SCHEDULER>
[[nodiscard]]
Awaitable<SEQUENCE> wait_until_published (
SEQUENCE targetSequence,
SCHEDULER& scheduler) const noexcept ;
};
}Exemple d'utilisation:
using namespace cppcoro ;
using namespace std ::chrono ;
struct message
{
int id;
steady_clock::time_point timestamp;
float data;
};
constexpr size_t bufferSize = 16384 ; // Must be power-of-two
constexpr size_t indexMask = bufferSize - 1 ;
message buffer[bufferSize];
task< void > producer (
io_service& ioSvc,
single_producer_sequencer< size_t >& sequencer)
{
auto start = steady_clock::now ();
for ( int i = 0 ; i < 1'000'000 ; ++i)
{
// Wait until a slot is free in the buffer.
size_t seq = co_await sequencer. claim_one (ioSvc);
// Populate the message.
auto & msg = buffer[seq & indexMask];
msg. id = i;
msg. timestamp = steady_clock::now ();
msg. data = 123 ;
// Publish the message.
sequencer. publish (seq);
}
// Publish a sentinel
auto seq = co_await sequencer. claim_one (ioSvc);
auto & msg = buffer[seq & indexMask];
msg. id = - 1 ;
sequencer. publish (seq);
}
task< void > consumer (
static_thread_pool& threadPool,
const single_producer_sequencer< size_t >& sequencer,
sequence_barrier< size_t >& consumerBarrier)
{
size_t nextToRead = 0 ;
while ( true )
{
// Wait until the next message is available
// There may be more than one available.
const size_t available = co_await sequencer. wait_until_published (nextToRead, threadPool);
do {
auto & msg = buffer[nextToRead & indexMask];
if (msg. id == - 1 )
{
consumerBarrier. publish (nextToRead);
co_return ;
}
processMessage (msg);
} while (nextToRead++ != available);
// Notify the producer that we've finished processing
// up to 'nextToRead - 1'.
consumerBarrier. publish (available);
}
}
task< void > example (io_service& ioSvc, static_thread_pool& threadPool)
{
sequence_barrier< size_t > barrier;
single_producer_sequencer< size_t > sequencer{barrier, bufferSize};
co_await when_all (
producer (tp, sequencer),
consumer (tp, sequencer, barrier));
}multi_producer_sequencer La classe multi_producer_sequencer est une primitive de synchronisation qui coordonne l'accès à un bouffon de ring pour plusieurs producteurs et un ou plusieurs consommateurs.
Pour une variante unique du produit, consultez la classe single_producer_sequencer .
Notez que la bague doit avoir une taille qui est une puissance de deux. En effet En outre, cela permet au numéro de séquence de s'enrouler en toute sécurité autour de la valeur 32 bits / 64 bits.
Résumé de l'API:
// <cppcoro/multi_producer_sequencer.hpp>
namespace cppcoro
{
template < typename SEQUENCE = std:: size_t ,
typename TRAITS = sequence_traits<SEQUENCE>>
class multi_producer_sequencer
{
public:
multi_producer_sequencer (
const sequence_barrier<SEQUENCE, TRAITS>& consumerBarrier,
SEQUENCE initialSequence = TRAITS::initial_sequence);
std:: size_t buffer_size () const noexcept ;
// Consumer interface
//
// Each consumer keeps track of their own 'lastKnownPublished' value
// and must pass this to the methods that query for an updated last-known
// published sequence number.
SEQUENCE last_published_after (SEQUENCE lastKnownPublished) const noexcept ;
template < typename SCHEDULER>
Awaitable<SEQUENCE> wait_until_published (
SEQUENCE targetSequence,
SEQUENCE lastKnownPublished,
SCHEDULER& scheduler) const noexcept ;
// Producer interface
// Query whether any slots available for claiming (approx.)
bool any_available () const noexcept ;
template < typename SCHEDULER>
Awaitable<SEQUENCE> claim_one (SCHEDULER& scheduler) noexcept ;
template < typename SCHEDULER>
Awaitable<sequence_range<SEQUENCE, TRAITS>> claim_up_to (
std:: size_t count,
SCHEDULER& scheduler) noexcept ;
// Mark the specified sequence number as published.
void publish (SEQUENCE sequence) noexcept ;
// Mark all sequence numbers in the specified range as published.
void publish ( const sequence_range<SEQUENCE, TRAITS>& range) noexcept ;
};
} Un cancellation_token est une valeur qui peut être transmise à une fonction qui permet à l'appelant de communiquer par la suite une demande d'annulation de l'opération à cette fonction.
Pour obtenir une cancellation_token qui peut être annulée, vous devez d'abord créer un objet cancellation_source . La méthode cancellation_source::token() peut être utilisée pour fabriquer de nouvelles valeurs cancellation_token qui sont liées à cet objet cancellation_source .
Lorsque vous souhaitez demander plus tard l'annulation d'une opération, vous avez transmis une cancellation_token à vous pouvez appeler cancellation_source::request_cancellation() sur un objet associé cancellation_source .
Les fonctions peuvent répondre à une demande d'annulation de deux manières:
cancellation_token::is_cancellation_requested() ou cancellation_token::throw_if_cancellation_requested() .cancellation_registration .Résumé de l'API:
namespace cppcoro
{
class cancellation_source
{
public:
// Construct a new, independently cancellable cancellation source.
cancellation_source ();
// Construct a new reference to the same cancellation state.
cancellation_source ( const cancellation_source& other) noexcept ;
cancellation_source (cancellation_source&& other) noexcept ;
~cancellation_source ();
cancellation_source& operator =( const cancellation_source& other) noexcept ;
cancellation_source& operator =(cancellation_source&& other) noexcept ;
bool is_cancellation_requested () const noexcept ;
bool can_be_cancelled () const noexcept ;
void request_cancellation ();
cancellation_token token () const noexcept ;
};
class cancellation_token
{
public:
// Construct a token that can't be cancelled.
cancellation_token () noexcept ;
cancellation_token ( const cancellation_token& other) noexcept ;
cancellation_token (cancellation_token&& other) noexcept ;
~cancellation_token ();
cancellation_token& operator =( const cancellation_token& other) noexcept ;
cancellation_token& operator =(cancellation_token&& other) noexcept ;
bool is_cancellation_requested () const noexcept ;
void throw_if_cancellation_requested () const ;
// Query if this token can ever have cancellation requested.
// Code can use this to take a more efficient code-path in cases
// that the operation does not need to handle cancellation.
bool can_be_cancelled () const noexcept ;
};
// RAII class for registering a callback to be executed if cancellation
// is requested on a particular cancellation token.
class cancellation_registration
{
public:
// Register a callback to be executed if cancellation is requested.
// Callback will be called with no arguments on the thread that calls
// request_cancellation() if cancellation is not yet requested, or
// called immediately if cancellation has already been requested.
// Callback must not throw an unhandled exception when called.
template < typename CALLBACK>
cancellation_registration (cancellation_token token, CALLBACK&& callback);
cancellation_registration ( const cancellation_registration& other) = delete ;
~cancellation_registration ();
};
class operation_cancelled : public std :: exception
{
public:
operation_cancelled ();
const char * what () const override ;
};
}Exemple: approche de sondage
cppcoro::task<> do_something_async (cppcoro::cancellation_token token)
{
// Explicitly define cancellation points within the function
// by calling throw_if_cancellation_requested().
token. throw_if_cancellation_requested ();
co_await do_step_1 ();
token. throw_if_cancellation_requested ();
do_step_2 ();
// Alternatively, you can query if cancellation has been
// requested to allow yourself to do some cleanup before
// returning.
if (token. is_cancellation_requested ())
{
display_message_to_user ( " Cancelling operation... " );
do_cleanup ();
throw cppcoro::operation_cancelled{};
}
do_final_step ();
}Exemple: approche de rappel
// Say we already have a timer abstraction that supports being
// cancelled but it doesn't support cancellation_tokens natively.
// You can use a cancellation_registration to register a callback
// that calls the existing cancellation API. e.g.
cppcoro::task<> cancellable_timer_wait (cppcoro::cancellation_token token)
{
auto timer = create_timer (10s);
cppcoro::cancellation_registration registration (token, [&]
{
// Call existing timer cancellation API.
timer. cancel ();
});
co_await timer;
}static_thread_pool La classe static_thread_pool fournit une abstraction qui vous permet de planifier le travail sur un pool de threads de taille fixe.
Cette classe implémente le concept de planificateur (voir ci-dessous).
Vous pouvez enterrer le travail sur le filetage en exécutant co_await threadPool.schedule() . Cette opération suspendra la coroutine actuelle, en a-t-elle pour exécuter l'exécution sur le pool de thread et le pool de threads reprendra la coroutine lorsqu'un fil dans le pool de threads est le prochain libre pour exécuter la coroutine. Cette opération est garantie de ne pas lancer et, dans le cas commun, n'allouera aucune mémoire .
Cette classe utilise un algorithme de vol de travail pour le travail de chargement sur plusieurs threads. Le travail en cours de filetage à partir d'un thread de filetage sera prévu pour l'exécution sur le même thread dans une file d'attente LIFO. Le travail en cours de filetage à partir d'un fil distant sera mis en file d'attente à une file d'attente FIFO mondiale. Lorsqu'un fil de travail ne manque pas de travail à partir de sa file d'attente locale, il essaie d'abord de désactiver le travail de la file d'attente mondiale. Si cette file d'attente est vide, elle essaie ensuite de voler des travaux à l'arrière des files d'attente des autres fils de travailleur.
Résumé de l'API:
namespace cppcoro
{
class static_thread_pool
{
public:
// Initialise the thread-pool with a number of threads equal to
// std::thread::hardware_concurrency().
static_thread_pool ();
// Initialise the thread pool with the specified number of threads.
explicit static_thread_pool (std:: uint32_t threadCount);
std:: uint32_t thread_count () const noexcept ;
class schedule_operation
{
public:
schedule_operation (static_thread_pool* tp) noexcept ;
bool await_ready () noexcept ;
bool await_suspend (std::experimental::coroutine_handle<> h) noexcept ;
bool await_resume () noexcept ;
private:
// unspecified
};
// Return an operation that can be awaited by a coroutine.
//
//
[[nodiscard]]
schedule_operation schedule () noexcept ;
private:
// Unspecified
};
}Exemple d'utilisation: simple
cppcoro::task<std::string> do_something_on_threadpool (cppcoro::static_thread_pool& tp)
{
// First schedule the coroutine onto the threadpool.
co_await tp. schedule ();
// When it resumes, this coroutine is now running on the threadpool.
do_something ();
} Exemple d'utilisation: faire des choses en parallèle - en utilisant l'opérateur schedule_on() avec static_thread_pool .
cppcoro::task< double > dot_product (static_thread_pool& tp, double a[], double b[], size_t count)
{
if (count > 1000 )
{
// Subdivide the work recursively into two equal tasks
// The first half is scheduled to the thread pool so it can run concurrently
// with the second half which continues on this thread.
size_t halfCount = count / 2 ;
auto [first, second] = co_await when_all (
schedule_on (tp, dot_product (tp, a, b, halfCount),
dot_product (tp, a + halfCount, b + halfCount, count - halfCount));
co_return first + second;
}
else
{
double sum = 0.0 ;
for ( size_t i = 0 ; i < count; ++i)
{
sum += a[i] * b[i];
}
co_return sum;
}
}io_service et io_work_scope La classe io_service fournit une abstraction pour le traitement des événements d'achèvement d'E / S à partir d'opérations d'E / S asynchrones.
Lorsqu'une opération d'E / S asynchrone se termine, la coroutine qui attendait cette opération reprendra sur un fil d'E / S à l'intérieur d'un appel à l'une des méthodes de traitement des événements: process_events() , process_pending_events() , process_one_event() ou process_one_pending_event() .
La classe io_service ne gère aucun threads d'E / S. Vous devez vous assurer que certains discussions appellent l'une des méthodes de traitement des événements pour les Coroutines en attente d'événements d'achèvement d'E / S à expédier. Cela peut être un thread dédié qui appelle process_events() ou mélangé avec une autre boucle d'événements (par exemple une boucle d'événements d'interface utilisateur) en interrogeant périodiquement pour de nouveaux événements via un appel à process_pending_events() ou process_one_pending_event() .
Cela permet l'intégration de la boucle d'événements io_service avec d'autres boucles d'événements, telles qu'une boucle d'événement d'interface utilisateur.
Vous pouvez multiplexer le traitement des événements sur plusieurs threads en ayant plusieurs threads Call process_events() . Vous pouvez spécifier un indice quant au nombre maximum de threads pour avoir un traitement actif des événements via un paramètre de constructeur io_service facultatif.
Sous Windows, l'implémentation utilise l'installation de port d'achèvement d'E / S Windows pour expédier des événements aux threads d'E / S de manière évolutive.
Résumé de l'API:
namespace cppcoro
{
class io_service
{
public:
class schedule_operation ;
class timed_schedule_operation ;
io_service ();
io_service (std:: uint32_t concurrencyHint);
io_service (io_service&&) = delete ;
io_service ( const io_service&) = delete ;
io_service& operator =(io_service&&) = delete ;
io_service& operator =( const io_service&) = delete ;
~io_service ();
// Scheduler methods
[[nodiscard]]
schedule_operation schedule () noexcept ;
template < typename REP, typename RATIO>
[[nodiscard]]
timed_schedule_operation schedule_after (
std::chrono::duration<REP, RATIO> delay,
cppcoro::cancellation_token cancellationToken = {}) noexcept ;
// Event-loop methods
//
// I/O threads must call these to process I/O events and execute
// scheduled coroutines.
std:: uint64_t process_events ();
std:: uint64_t process_pending_events ();
std:: uint64_t process_one_event ();
std:: uint64_t process_one_pending_event ();
// Request that all threads processing events exit their event loops.
void stop () noexcept ;
// Query if some thread has called stop()
bool is_stop_requested () const noexcept ;
// Reset the event-loop after a call to stop() so that threads can
// start processing events again.
void reset ();
// Reference-counting methods for tracking outstanding references
// to the io_service.
//
// The io_service::stop() method will be called when the last work
// reference is decremented.
//
// Use the io_work_scope RAII class to manage calling these methods on
// entry-to and exit-from a scope.
void notify_work_started () noexcept ;
void notify_work_finished () noexcept ;
};
class io_service ::schedule_operation
{
public:
schedule_operation ( const schedule_operation&) noexcept ;
schedule_operation& operator =( const schedule_operation&) noexcept ;
bool await_ready () const noexcept ;
void await_suspend (std::experimental::coroutine_handle<> awaiter) noexcept ;
void await_resume () noexcept ;
};
class io_service ::timed_schedule_operation
{
public:
timed_schedule_operation (timed_schedule_operation&&) noexcept ;
timed_schedule_operation ( const timed_schedule_operation&) = delete ;
timed_schedule_operation& operator =( const timed_schedule_operation&) = delete ;
timed_schedule_operation& operator =(timed_schedule_operation&&) = delete ;
bool await_ready () const noexcept ;
void await_suspend (std::experimental::coroutine_handle<> awaiter);
void await_resume ();
};
class io_work_scope
{
public:
io_work_scope (io_service& ioService) noexcept ;
io_work_scope ( const io_work_scope& other) noexcept ;
io_work_scope (io_work_scope&& other) noexcept ;
~io_work_scope ();
io_work_scope& operator =( const io_work_scope& other) noexcept ;
io_work_scope& operator =(io_work_scope&& other) noexcept ;
io_service& service () const noexcept ;
};
}Exemple:
# include < cppcoro/task.hpp >
# include < cppcoro/task.hpp >
# include < cppcoro/io_service.hpp >
# include < cppcoro/read_only_file.hpp >
# include < experimental/filesystem >
# include < memory >
# include < algorithm >
# include < iostream >
namespace fs = std::experimental::filesystem;
cppcoro::task<std:: uint64_t > count_lines (cppcoro::io_service& ioService, fs::path path)
{
auto file = cppcoro::read_only_file::open (ioService, path);
constexpr size_t bufferSize = 4096 ;
auto buffer = std::make_unique<std:: uint8_t []>(bufferSize);
std:: uint64_t newlineCount = 0 ;
for (std:: uint64_t offset = 0 , fileSize = file. size (); offset < fileSize;)
{
const auto bytesToRead = static_cast < size_t >(
std::min<std:: uint64_t >(bufferSize, fileSize - offset));
const auto bytesRead = co_await file. read (offset, buffer. get (), bytesToRead);
newlineCount += std::count (buffer. get (), buffer. get () + bytesRead, ' n ' );
offset += bytesRead;
}
co_return newlineCount;
}
cppcoro::task<> run (cppcoro::io_service& ioService)
{
cppcoro::io_work_scope ioScope (ioService);
auto lineCount = co_await count_lines (ioService, fs::path{ " foo.txt " });
std::cout << " foo.txt has " << lineCount << " lines. " << std::endl;;
}
cppcoro::task<> process_events (cppcoro::io_service& ioService)
{
// Process events until the io_service is stopped.
// ie. when the last io_work_scope goes out of scope.
ioService. process_events ();
co_return ;
}
int main ()
{
cppcoro::io_service ioService;
cppcoro::sync_wait ( cppcoro::when_all_ready (
run (ioService),
process_events (ioService)));
return 0 ;
}io_service en tant que planificateur Une classe io_service implémente les interfaces pour le Scheduler et les concepts DelayedScheduler .
Cela permet à une coroutine de suspendre l'exécution sur le thread actuel et de planifier lui-même la reprise sur un thread d'E / S associé à un objet io_service particulier.
Exemple:
cppcoro::task<> do_something (cppcoro::io_service& ioService)
{
// Coroutine starts execution on the thread of the task awaiter.
// A coroutine can transfer execution to an I/O thread by awaiting the
// result of io_service::schedule().
co_await ioService. schedule ();
// At this point, the coroutine is now executing on an I/O thread
// inside a call to one of the io_service event processing methods.
// A coroutine can also perform a delayed-schedule that will suspend
// the coroutine for a specified duration of time before scheduling
// it for resumption on an I/O thread.
co_await ioService. schedule_after (100ms);
// At this point, the coroutine is executing on a potentially different I/O thread.
}file , readable_file , writable_fileCes types sont des classes de base abstraites pour effectuer des E / S de fichiers en béton.
Résumé de l'API:
namespace cppcoro
{
class file_read_operation ;
class file_write_operation ;
class file
{
public:
virtual ~file ();
std:: uint64_t size () const ;
protected:
file (file&& other) noexcept ;
};
class readable_file : public virtual file
{
public:
[[nodiscard]]
file_read_operation read (
std:: uint64_t offset,
void * buffer,
std:: size_t byteCount,
cancellation_token ct = {}) const noexcept ;
};
class writable_file : public virtual file
{
public:
void set_size (std:: uint64_t fileSize);
[[nodiscard]]
file_write_operation write (
std:: uint64_t offset,
const void * buffer,
std:: size_t byteCount,
cancellation_token ct = {}) noexcept ;
};
class file_read_operation
{
public:
file_read_operation (file_read_operation&& other) noexcept ;
bool await_ready () const noexcept ;
bool await_suspend (std::experimental::coroutine_handle<> awaiter);
std:: size_t await_resume ();
};
class file_write_operation
{
public:
file_write_operation (file_write_operation&& other) noexcept ;
bool await_ready () const noexcept ;
bool await_suspend (std::experimental::coroutine_handle<> awaiter);
std:: size_t await_resume ();
};
}read_only_file , write_only_file , read_write_fileCes types représentent des classes d'E / S de fichiers concrètes.
Résumé de l'API:
namespace cppcoro
{
class read_only_file : public readable_file
{
public:
[[nodiscard]]
static read_only_file open (
io_service& ioService,
const std::experimental::filesystem::path& path,
file_share_mode shareMode = file_share_mode::read,
file_buffering_mode bufferingMode = file_buffering_mode::default_);
};
class write_only_file : public writable_file
{
public:
[[nodiscard]]
static write_only_file open (
io_service& ioService,
const std::experimental::filesystem::path& path,
file_open_mode openMode = file_open_mode::create_or_open,
file_share_mode shareMode = file_share_mode::none,
file_buffering_mode bufferingMode = file_buffering_mode::default_);
};
class read_write_file : public readable_file , public writable_file
{
public:
[[nodiscard]]
static read_write_file open (
io_service& ioService,
const std::experimental::filesystem::path& path,
file_open_mode openMode = file_open_mode::create_or_open,
file_share_mode shareMode = file_share_mode::none,
file_buffering_mode bufferingMode = file_buffering_mode::default_);
};
} Toutes les fonctions open() lancent std::system_error sur l'échec.
Remarque: Les abstractions de réseautage ne sont actuellement prises en charge que sur la plate-forme Windows. Le support Linux arrivera bientôt.
socketLa classe de socket peut être utilisée pour envoyer / recevoir des données sur le réseau de manière asynchrone.
Actuellement, ne prend en charge que TCP / IP, UDP / IP sur IPv4 et IPv6.
Résumé de l'API:
// <cppcoro/net/socket.hpp>
namespace cppcoro ::net
{
class socket
{
public:
static socket create_tcpv4 (ip_service& ioSvc);
static socket create_tcpv6 (ip_service& ioSvc);
static socket create_updv4 (ip_service& ioSvc);
static socket create_udpv6 (ip_service& ioSvc);
socket (socket&& other) noexcept ;
~socket ();
socket& operator =(socket&& other) noexcept ;
// Return the native socket handle for the socket
<platform-specific> native_handle () noexcept ;
const ip_endpoint& local_endpoint () const noexcept ;
const ip_endpoint& remote_endpoint () const noexcept ;
void bind ( const ip_endpoint& localEndPoint);
void listen ();
[[nodiscard]]
Awaitable< void > connect ( const ip_endpoint& remoteEndPoint) noexcept ;
[[nodiscard]]
Awaitable< void > connect ( const ip_endpoint& remoteEndPoint,
cancellation_token ct) noexcept ;
[[nodiscard]]
Awaitable< void > accept (socket& acceptingSocket) noexcept ;
[[nodiscard]]
Awaitable< void > accept (socket& acceptingSocket,
cancellation_token ct) noexcept ;
[[nodiscard]]
Awaitable< void > disconnect () noexcept ;
[[nodiscard]]
Awaitable< void > disconnect (cancellation_token ct) noexcept ;
[[nodiscard]]
Awaitable<std:: size_t > send ( const void * buffer, std:: size_t size) noexcept ;
[[nodiscard]]
Awaitable<std:: size_t > send ( const void * buffer,
std:: size_t size,
cancellation_token ct) noexcept ;
[[nodiscard]]
Awaitable<std:: size_t > recv ( void * buffer, std:: size_t size) noexcept ;
[[nodiscard]]
Awaitable<std:: size_t > recv ( void * buffer,
std:: size_t size,
cancellation_token ct) noexcept ;
[[nodiscard]]
socket_recv_from_operation recv_from (
void * buffer,
std:: size_t size) noexcept ;
[[nodiscard]]
socket_recv_from_operation_cancellable recv_from (
void * buffer,
std:: size_t size,
cancellation_token ct) noexcept ;
[[nodiscard]]
socket_send_to_operation send_to (
const ip_endpoint& destination,
const void * buffer,
std:: size_t size) noexcept ;
[[nodiscard]]
socket_send_to_operation_cancellable send_to (
const ip_endpoint& destination,
const void * buffer,
std:: size_t size,
cancellation_token ct) noexcept ;
void close_send ();
void close_recv ();
};
}Exemple: Echo Server
# include < cppcoro/net/socket.hpp >
# include < cppcoro/io_service.hpp >
# include < cppcoro/cancellation_source.hpp >
# include < cppcoro/async_scope.hpp >
# include < cppcoro/on_scope_exit.hpp >
# include < memory >
# include < iostream >
cppcoro::task< void > handle_connection (socket s)
{
try
{
const size_t bufferSize = 16384 ;
auto buffer = std::make_unique< unsigned char []>(bufferSize);
size_t bytesRead;
do {
// Read some bytes
bytesRead = co_await s. recv (buffer. get (), bufferSize);
// Write some bytes
size_t bytesWritten = 0 ;
while (bytesWritten < bytesRead) {
bytesWritten += co_await s. send (
buffer. get () + bytesWritten,
bytesRead - bytesWritten);
}
} while (bytesRead != 0 );
s. close_send ();
co_await s. disconnect ();
}
catch (...)
{
std::cout << " connection failed " << std::
}
}
cppcoro::task< void > echo_server (
cppcoro::net::ipv4_endpoint endpoint,
cppcoro::io_service& ioSvc,
cancellation_token ct)
{
cppcoro::async_scope scope;
std::exception_ptr ex;
try
{
auto listeningSocket = cppcoro::net::socket::create_tcpv4 (ioSvc);
listeningSocket. bind (endpoint);
listeningSocket. listen ();
while ( true ) {
auto connection = cppcoro::net::socket::create_tcpv4 (ioSvc);
co_await listeningSocket. accept (connection, ct);
scope. spawn ( handle_connection ( std::move (connection)));
}
}
catch (cppcoro::operation_cancelled)
{
}
catch (...)
{
ex = std::current_exception ();
}
// Wait until all handle_connection tasks have finished.
co_await scope. join ();
if (ex) std::rethrow_exception (ex);
}
int main ( int argc, const char * argv[])
{
cppcoro::io_service ioSvc;
if (argc != 2 ) return - 1 ;
auto endpoint = cppcoro::ipv4_endpoint::from_string (argv[ 1 ]);
if (!endpoint) return - 1 ;
( void ) cppcoro::sync_wait ( cppcoro::when_all (
[&]() -> task<>
{
// Shutdown the event loop once finished.
auto stopOnExit = cppcoro::on_scope_exit ([&] { ioSvc. stop (); });
cppcoro::cancellation_source canceller;
co_await cppcoro::when_all (
[&]() -> task<>
{
// Run for 30s then stop accepting new connections.
co_await ioSvc. schedule_after ( std::chrono::seconds ( 30 ));
canceller. request_cancellation ();
}(),
echo_server (*endpoint, ioSvc, canceller. token ()));
}(),
[&]() -> task<>
{
ioSvc. process_events ();
}()));
return 0 ;
}ip_address , ipv4_address , ipv6_addressClasses auxiliaires pour représenter une adresse IP.
Synopsis API:
namespace cppcoro ::net
{
class ipv4_address
{
using bytes_t = std:: uint8_t [ 4 ];
public:
constexpr ipv4_address ();
explicit constexpr ipv4_address (std:: uint32_t integer);
explicit constexpr ipv4_address ( const std::uint8_t (&bytes)[4]);
explicit constexpr ipv4_address (std:: uint8_t b0,
std:: uint8_t b1,
std:: uint8_t b2,
std:: uint8_t b3);
constexpr const bytes_t & bytes () const ;
constexpr std:: uint32_t to_integer () const ;
static constexpr ipv4_address loopback ();
constexpr bool is_loopback () const ;
constexpr bool is_private_network () const ;
constexpr bool operator ==(ipv4_address other) const ;
constexpr bool operator !=(ipv4_address other) const ;
constexpr bool operator <(ipv4_address other) const ;
constexpr bool operator >(ipv4_address other) const ;
constexpr bool operator <=(ipv4_address other) const ;
constexpr bool operator >=(ipv4_address other) const ;
std::string to_string ();
static std::optional<ipv4_address> from_string (std::string_view string) noexcept ;
};
class ipv6_address
{
using bytes_t = std:: uint8_t [ 16 ];
public:
constexpr ipv6_address ();
explicit constexpr ipv6_address (
std:: uint64_t subnetPrefix,
std:: uint64_t interfaceIdentifier);
constexpr ipv6_address (
std:: uint16_t part0,
std:: uint16_t part1,
std:: uint16_t part2,
std:: uint16_t part3,
std:: uint16_t part4,
std:: uint16_t part5,
std:: uint16_t part6,
std:: uint16_t part7);
explicit constexpr ipv6_address (
const std::uint16_t (&parts)[8]);
explicit constexpr ipv6_address (
const std::uint8_t (bytes)[16]);
constexpr const bytes_t & bytes () const ;
constexpr std:: uint64_t subnet_prefix () const ;
constexpr std:: uint64_t interface_identifier () const ;
static constexpr ipv6_address unspecified ();
static constexpr ipv6_address loopback ();
static std::optional<ipv6_address> from_string (std::string_view string) noexcept ;
std::string to_string () const ;
constexpr bool operator ==( const ipv6_address& other) const ;
constexpr bool operator !=( const ipv6_address& other) const ;
constexpr bool operator <( const ipv6_address& other) const ;
constexpr bool operator >( const ipv6_address& other) const ;
constexpr bool operator <=( const ipv6_address& other) const ;
constexpr bool operator >=( const ipv6_address& other) const ;
};
class ip_address
{
public:
// Constructs to IPv4 address 0.0.0.0
ip_address () noexcept ;
ip_address (ipv4_address address) noexcept ;
ip_address (ipv6_address address) noexcept ;
bool is_ipv4 () const noexcept ;
bool is_ipv6 () const noexcept ;
const ipv4_address& to_ipv4 () const ;
const ipv6_address& to_ipv6 () const ;
const std:: uint8_t * bytes () const noexcept ;
std::string to_string () const ;
static std::optional<ip_address> from_string (std::string_view string) noexcept ;
bool operator ==( const ip_address& rhs) const noexcept ;
bool operator !=( const ip_address& rhs) const noexcept ;
// ipv4_address sorts less than ipv6_address
bool operator <( const ip_address& rhs) const noexcept ;
bool operator >( const ip_address& rhs) const noexcept ;
bool operator <=( const ip_address& rhs) const noexcept ;
bool operator >=( const ip_address& rhs) const noexcept ;
};
}ip_endpoint , ipv4_endpoint ipv6_endpointClasses d'assistance pour représenter une adresse IP et un numéro de port.
Synopsis API:
namespace cppcoro ::net
{
class ipv4_endpoint
{
public:
ipv4_endpoint () noexcept ;
explicit ipv4_endpoint (ipv4_address address, std:: uint16_t port = 0 ) noexcept ;
const ipv4_address& address () const noexcept ;
std:: uint16_t port () const noexcept ;
std::string to_string () const ;
static std::optional<ipv4_endpoint> from_string (std::string_view string) noexcept ;
};
bool operator ==( const ipv4_endpoint& a, const ipv4_endpoint& b);
bool operator !=( const ipv4_endpoint& a, const ipv4_endpoint& b);
bool operator <( const ipv4_endpoint& a, const ipv4_endpoint& b);
bool operator >( const ipv4_endpoint& a, const ipv4_endpoint& b);
bool operator <=( const ipv4_endpoint& a, const ipv4_endpoint& b);
bool operator >=( const ipv4_endpoint& a, const ipv4_endpoint& b);
class ipv6_endpoint
{
public:
ipv6_endpoint () noexcept ;
explicit ipv6_endpoint (ipv6_address address, std:: uint16_t port = 0 ) noexcept ;
const ipv6_address& address () const noexcept ;
std:: uint16_t port () const noexcept ;
std::string to_string () const ;
static std::optional<ipv6_endpoint> from_string (std::string_view string) noexcept ;
};
bool operator ==( const ipv6_endpoint& a, const ipv6_endpoint& b);
bool operator !=( const ipv6_endpoint& a, const ipv6_endpoint& b);
bool operator <( const ipv6_endpoint& a, const ipv6_endpoint& b);
bool operator >( const ipv6_endpoint& a, const ipv6_endpoint& b);
bool operator <=( const ipv6_endpoint& a, const ipv6_endpoint& b);
bool operator >=( const ipv6_endpoint& a, const ipv6_endpoint& b);
class ip_endpoint
{
public:
// Constructs to IPv4 end-point 0.0.0.0:0
ip_endpoint () noexcept ;
ip_endpoint (ipv4_endpoint endpoint) noexcept ;
ip_endpoint (ipv6_endpoint endpoint) noexcept ;
bool is_ipv4 () const noexcept ;
bool is_ipv6 () const noexcept ;
const ipv4_endpoint& to_ipv4 () const ;
const ipv6_endpoint& to_ipv6 () const ;
ip_address address () const noexcept ;
std:: uint16_t port () const noexcept ;
std::string to_string () const ;
static std::optional<ip_endpoint> from_string (std::string_view string) noexcept ;
bool operator ==( const ip_endpoint& rhs) const noexcept ;
bool operator !=( const ip_endpoint& rhs) const noexcept ;
// ipv4_endpoint sorts less than ipv6_endpoint
bool operator <( const ip_endpoint& rhs) const noexcept ;
bool operator >( const ip_endpoint& rhs) const noexcept ;
bool operator <=( const ip_endpoint& rhs) const noexcept ;
bool operator >=( const ip_endpoint& rhs) const noexcept ;
};
}sync_wait() La fonction sync_wait() peut être utilisée pour attendre de manière synchrone jusqu'à ce que les fins awaitable spécifiés.
L'attendu spécifié sera co_await ed sur le fil actuel à l'intérieur d'une coroutine nouvellement créée.
L'appel sync_wait() se bloquera jusqu'à la fin de l'opération et renverra le résultat de l'expression co_await ou reménagera l'exception si l'expression co_await s'est terminée avec une exception non perdue.
La fonction sync_wait() est principalement utile pour démarrer une tâche de niveau supérieur à partir de main() et en attendant que la tâche se termine, en pratique, c'est la seule façon de démarrer la task du premier / de haut niveau.
Résumé de l'API:
// <cppcoro/sync_wait.hpp>
namespace cppcoro
{
template < typename AWAITABLE>
auto sync_wait (AWAITABLE&& awaitable)
-> typename awaitable_traits<AWAITABLE&&>::await_result_t;
}Exemples:
void example_task ()
{
auto makeTask = []() -> task<std::string>
{
co_return " foo " ;
};
auto task = makeTask ();
// start the lazy task and wait until it completes
sync_wait (task); // -> "foo"
sync_wait ( makeTask ()); // -> "foo"
}
void example_shared_task ()
{
auto makeTask = []() -> shared_task<std::string>
{
co_return " foo " ;
};
auto task = makeTask ();
// start the shared task and wait until it completes
sync_wait (task) == " foo " ;
sync_wait ( makeTask ()) == " foo " ;
}when_all_ready() La fonction when_all_ready() peut être utilisée pour créer un nouvel attente qui se termine lorsque toutes les entrées attendables sont terminées.
Les tâches d'entrée peuvent être n'importe quel type attendu.
Lorsque l'attente retourné est co_await ed, il co_await chacun des attentes d'entrée à son tour sur le thread en attente dans l'ordre, il est transmis à la fonction when_all_ready() . Si ces tâches pour ne pas terminer de manière synchrone, elles s'exécuteront simultanément.
Une fois que toutes les expressions co_await sur l'entrée Awaitables ont été exécutées pour s'achever, le retour retourné terminera et reprendra la coroutine en attente. La coroutine en attente sera reprise sur le thread de l'entrée attendable qui est la dernière à terminer.
L'attendu retourné est garanti de ne pas lancer d'exception lorsque co_await Ed, même si certains des entrées attendables échouent à une exception non perdue.
Remarque, cependant, que l'appel when_all_ready() peut lancer std::bad_alloc s'il n'a pas pu allouer de mémoire aux cadres Coroutine requis pour attendre chacun des entrées attendables. Il peut également lancer une exception si l'un des objets en attente d'entrée jette de leurs constructeurs de copie / déplace.
Le résultat de co_await Ing The Returned Waitable est un std::tuple ou std::vector d'objets when_all_task<RESULT> . Ces objets vous permettent d'obtenir le résultat (ou l'exception) de chaque entrée attendue séparément en appelant la méthode when_all_task<RESULT>::result() de la tâche de sortie correspondante. Cela permet à l'appelant d'attendre simultanément plusieurs attendables et de se synchroniser sur leur achèvement tout en conservant la possibilité d'inspecter par la suite les résultats de chacune des opérations co_await pour succès / échec.
Cela diffère de when_all() lorsque la défaillance de toute opération co_await individuelle fait échouer l'opération globale à une exception. Cela signifie que vous ne pouvez pas déterminer lequel des opérations co_await des composants a échoué et vous empêche également d'obtenir les résultats des autres opérations co_await .
Résumé de l'API:
// <cppcoro/when_all_ready.hpp>
namespace cppcoro
{
// Concurrently await multiple awaitables.
//
// Returns an awaitable object that, when co_await'ed, will co_await each of the input
// awaitable objects and will resume the awaiting coroutine only when all of the
// component co_await operations complete.
//
// Result of co_await'ing the returned awaitable is a std::tuple of detail::when_all_task<T>,
// one for each input awaitable and where T is the result-type of the co_await expression
// on the corresponding awaitable.
//
// AWAITABLES must be awaitable types and must be movable (if passed as rvalue) or copyable
// (if passed as lvalue). The co_await expression will be executed on an rvalue of the
// copied awaitable.
template < typename ... AWAITABLES>
auto when_all_ready (AWAITABLES&&... awaitables)
-> Awaitable<std::tuple<detail::when_all_task<typename awaitable_traits<AWAITABLES>::await_result_t>...>>;
// Concurrently await each awaitable in a vector of input awaitables.
template <
typename AWAITABLE,
typename RESULT = typename awaitable_traits<AWAITABLE>:: await_result_t >
auto when_all_ready (std::vector<AWAITABLE> awaitables)
-> Awaitable<std::vector<detail::when_all_task<RESULT>>>;
}Exemple d'utilisation:
task<std::string> get_record ( int id);
task<> example1 ()
{
// Run 3 get_record() operations concurrently and wait until they're all ready.
// Returns a std::tuple of tasks that can be unpacked using structured bindings.
auto [task1, task2, task3] = co_await when_all_ready (
get_record ( 123 ),
get_record ( 456 ),
get_record ( 789 ));
// Unpack the result of each task
std::string& record1 = task1. result ();
std::string& record2 = task2. result ();
std::string& record3 = task3. result ();
// Use records....
}
task<> example2 ()
{
// Create the input tasks. They don't start executing yet.
std::vector<task<std::string>> tasks;
for ( int i = 0 ; i < 1000 ; ++i)
{
tasks. emplace_back ( get_record (i));
}
// Execute all tasks concurrently.
std::vector<detail::when_all_task<std::string>> resultTasks =
co_await when_all_ready ( std::move (tasks));
// Unpack and handle each result individually once they're all complete.
for ( int i = 0 ; i < 1000 ; ++i)
{
try
{
std::string& record = tasks[i]. result ();
std::cout << i << " = " << record << std::endl;
}
catch ( const std:: exception & ex)
{
std::cout << i << " : " << ex. what () << std::endl;
}
}
}when_all() La fonction when_all() peut être utilisée pour créer un nouveau attendable que lorsque co_await Ed co_await chacun des entrées est attendu simultanément et renvoie un agrégat de leurs résultats individuels.
Lorsque l'attente retourné est attendu, il co_await chacun des attentes d'entrée sur le thread actuel. Une fois les premiers suspendus attendables, la deuxième tâche sera lancée, etc. Les opérations s'exécutent simultanément jusqu'à ce qu'elles aient tous couru jusqu'à la fin.
Une fois que toutes les opérations co_await des composants sont exécutées jusqu'à la fin, un agrégat des résultats est construit à partir de chaque résultat individuel. Si une exception est lancée par l'une des tâches d'entrée ou si la construction du résultat agrégé lance une exception, l'exception se propagera à partir du co_await des attendables retournés.
Si plusieurs opérations co_await échouent à une exception, l'une des exceptions se propagera à partir de l'expression co_await when_all() les autres exceptions seront ignorées silencieusement. Il n'est pas précisé quelle exception de l'opération sera choisie.
S'il est important de savoir quel composant l'opération co_await a échoué ou de conserver la possibilité d'obtenir des résultats d'autres opérations même si certains d'entre eux échouent, vous devez utiliser when_all_ready() .
Résumé de l'API:
// <cppcoro/when_all.hpp>
namespace cppcoro
{
// Variadic version.
//
// Note that if the result of `co_await awaitable` yields a void-type
// for some awaitables then the corresponding component for that awaitable
// in the tuple will be an empty struct of type detail::void_value.
template < typename ... AWAITABLES>
auto when_all (AWAITABLES&&... awaitables)
-> Awaitable<std::tuple<typename awaitable_traits<AWAITABLES>::await_result_t...>>;
// Overload for vector<Awaitable<void>>.
template <
typename AWAITABLE,
typename RESULT = typename awaitable_traits<AWAITABLE>:: await_result_t ,
std:: enable_if_t <std::is_void_v<RESULT>, int > = 0 >
auto when_all (std::vector<AWAITABLE> awaitables)
-> Awaitable<void>;
// Overload for vector<Awaitable<NonVoid>> that yield a value when awaited.
template <
typename AWAITABLE,
typename RESULT = typename awaitable_traits<AWAITABLE>:: await_result_t ,
std:: enable_if_t <!std::is_void_v<RESULT>, int > = 0 >
auto when_all (std::vector<AWAITABLE> awaitables)
-> Awaitable<std::vector<std::conditional_t<
std::is_lvalue_reference_v<RESULT>,
std::reference_wrapper<std::remove_reference_t<RESULT>>,
std::remove_reference_t<RESULT>>>>;
}Exemples:
task<A> get_a ();
task<B> get_b ();
task<> example1 ()
{
// Run get_a() and get_b() concurrently.
// Task yields a std::tuple<A, B> which can be unpacked using structured bindings.
auto [a, b] = co_await when_all ( get_a (), get_b ());
// use a, b
}
task<std::string> get_record ( int id);
task<> example2 ()
{
std::vector<task<std::string>> tasks;
for ( int i = 0 ; i < 1000 ; ++i)
{
tasks. emplace_back ( get_record (i));
}
// Concurrently execute all get_record() tasks.
// If any of them fail with an exception then the exception will propagate
// out of the co_await expression once they have all completed.
std::vector<std::string> records = co_await when_all ( std::move (tasks));
// Process results
for ( int i = 0 ; i < 1000 ; ++i)
{
std::cout << i << " = " << records[i] << std::endl;
}
}fmap() La fonction fmap() peut être utilisée pour appliquer une fonction appelable à la valeur (s) contenue dans un type de conteneur, renvoyant un nouveau type de conteneur des résultats de l'application de la fonction (s) contenue.
La fonction fmap() peut appliquer une fonction aux valeurs du generator<T> , recursive_generator<T> et async_generator<T> ainsi que toute valeur qui prend en charge le concept Awaitable (par exemple task<T> ).
Chacun de ces types fournit une surcharge pour fmap() qui prend deux arguments; une fonction à appliquer et la valeur du conteneur. Voir la documentation de chaque type pour les surcharges fmap() prises en charge.
Par exemple, la fonction fmap() peut être utilisée pour appliquer une fonction au résultat éventuel d'une task<T> , produisant une nouvelle task<U> qui s'achevera avec la valeur de retour de la fonction.
// Given a function you want to apply that converts
// a value of type A to value of type B.
B a_to_b (A value);
// And a task that yields a value of type A
cppcoro::task<A> get_an_a ();
// We can apply the function to the result of the task using fmap()
// and obtain a new task yielding the result.
cppcoro::task<B> bTask = fmap(a_to_b, get_an_a());
// An alternative syntax is to use the pipe notation.
cppcoro::task<B> bTask = get_an_a() | cppcoro::fmap(a_to_b);Résumé de l'API:
// <cppcoro/fmap.hpp>
namespace cppcoro
{
template < typename FUNC>
struct fmap_transform
{
fmap_transform (FUNC&& func) noexcept (std::is_nothrow_move_constructible_v<FUNC>);
FUNC func;
};
// Type-deducing constructor for fmap_transform object that can be used
// in conjunction with operator|.
template < typename FUNC>
fmap_transform<FUNC> fmap (FUNC&& func);
// operator| overloads for providing pipe-based syntactic sugar for fmap()
// such that the expression:
// <value-expr> | cppcoro::fmap(<func-expr>)
// is equivalent to:
// fmap(<func-expr>, <value-expr>)
template < typename T, typename FUNC>
decltype ( auto ) operator |(T&& value, fmap_transform<FUNC>&& transform);
template < typename T, typename FUNC>
decltype ( auto ) operator |(T&& value, fmap_transform<FUNC>& transform);
template < typename T, typename FUNC>
decltype ( auto ) operator |(T&& value, const fmap_transform<FUNC>& transform);
// Generic overload for all awaitable types.
//
// Returns an awaitable that when co_awaited, co_awaits the specified awaitable
// and applies the specified func to the result of the 'co_await awaitable'
// expression as if by 'std::invoke(func, co_await awaitable)'.
//
// If the type of 'co_await awaitable' expression is 'void' then co_awaiting the
// returned awaitable is equivalent to 'co_await awaitable, func()'.
template <
typename FUNC,
typename AWAITABLE,
std:: enable_if_t <is_awaitable_v<AWAITABLE>, int > = 0 >
auto fmap (FUNC&& func, AWAITABLE&& awaitable)
-> Awaitable<std::invoke_result_t<FUNC, typename awaitable_traits<AWAITABLE>::await_result_t>>;
} La fonction fmap() est conçue pour rechercher la surcharge correcte par la recherche dépendante de l'argument (ADL), il doit donc généralement être appelé sans le préfixe cppcoro:: .
resume_on() La fonction resume_on() peut être utilisée pour contrôler le contexte d'exécution selon lequel un attendable reprendra la coroutine en attente lors de l'attente. Lorsqu'il est appliqué à un async_generator , il contrôle le contexte d'exécution, le contexte co_await g.begin() et co_await ++it opérations informatiques reprennent les coroutines en attente.
Normalement, la coroutine en attente d'un attendable (par exemple une task ) ou async_generator reprendra l'exécution sur le thread de l'opération terminée. Dans certains cas, ce n'est peut-être pas le fil sur lequel vous souhaitez continuer à exécuter. Dans ces cas, vous pouvez utiliser la fonction resume_on() pour créer un nouveau attendable ou un générateur qui reprendra l'exécution sur un thread associé à un planificateur spécifié.
La fonction resume_on() peut être utilisée soit comme une fonction normale, renvoyant un nouveau attendable / générateur. Ou il peut être utilisé dans un pipeline-syntax.
Exemple:
task<record> load_record ( int id);
ui_thread_scheduler uiThreadScheduler;
task<> example ()
{
// This will start load_record() on the current thread.
// Then when load_record() completes (probably on an I/O thread)
// it will reschedule execution onto thread pool and call to_json
// Once to_json completes it will transfer execution onto the
// ui thread before resuming this coroutine and returning the json text.
task<std::string> jsonTask =
load_record ( 123 )
| cppcoro::resume_on ( threadpool::default ())
| cppcoro::fmap (to_json)
| cppcoro::resume_on (uiThreadScheduler);
// At this point, all we've done is create a pipeline of tasks.
// The tasks haven't started executing yet.
// Await the result. Starts the pipeline of tasks.
std::string jsonText = co_await jsonTask;
// Guaranteed to be executing on ui thread here.
someUiControl. set_text (jsonText);
}Résumé de l'API:
// <cppcoro/resume_on.hpp>
namespace cppcoro
{
template < typename SCHEDULER, typename AWAITABLE>
auto resume_on (SCHEDULER& scheduler, AWAITABLE awaitable)
-> Awaitable<typename awaitable_traits<AWAITABLE>::await_traits_t>;
template < typename SCHEDULER, typename T>
async_generator<T> resume_on (SCHEDULER& scheduler, async_generator<T> source);
template < typename SCHEDULER>
struct resume_on_transform
{
explicit resume_on_transform (SCHEDULER& scheduler) noexcept ;
SCHEDULER& scheduler;
};
// Construct a transform/operation that can be applied to a source object
// using "pipe" notation (ie. operator|).
template < typename SCHEDULER>
resume_on_transform<SCHEDULER> resume_on (SCHEDULER& scheduler) noexcept ;
// Equivalent to 'resume_on(transform.scheduler, std::forward<T>(value))'
template < typename T, typename SCHEDULER>
decltype ( auto ) operator |(T&& value, resume_on_transform<SCHEDULER> transform)
{
return resume_on (transform. scheduler , std::forward<T>(value));
}
}schedule_on() La fonction schedule_on() peut être utilisée pour modifier le contexte d'exécution sur lequel un peu attendable ou async_generator commence à exécuter.
Lorsqu'il est appliqué à un async_generator il affecte également le contexte d'exécution sur lequel il reprend après l'instruction co_yield .
Notez que la transformée schedule_on ne spécifie pas le thread sur lequel les résultats attendables ou async_generator compléteront ou donnent des résultats, ce qui dépend de l'implémentation de l'attente ou du générateur.
Voir l'opérateur resume_on() pour une transformation qui contrôle le thread sur lequel l'opération complète.
Par exemple:
task< int > get_value ();
io_service ioSvc;
task<> example ()
{
// Starts executing get_value() on the current thread.
int a = co_await get_value ();
// Starts executing get_value() on a thread associated with ioSvc.
int b = co_await schedule_on (ioSvc, get_value ());
}Résumé de l'API:
// <cppcoro/schedule_on.hpp>
namespace cppcoro
{
// Return a task that yields the same result as 't' but that
// ensures that 't' is co_await'ed on a thread associated with
// the specified scheduler. Resulting task will complete on
// whatever thread 't' would normally complete on.
template < typename SCHEDULER, typename AWAITABLE>
auto schedule_on (SCHEDULER& scheduler, AWAITABLE awaitable)
-> Awaitable<typename awaitable_traits<AWAITABLE>::await_result_t>;
// Return a generator that yields the same sequence of results as
// 'source' but that ensures that execution of the coroutine starts
// execution on a thread associated with 'scheduler' and resumes
// after a 'co_yield' on a thread associated with 'scheduler'.
template < typename SCHEDULER, typename T>
async_generator<T> schedule_on (SCHEDULER& scheduler, async_generator<T> source);
template < typename SCHEDULER>
struct schedule_on_transform
{
explicit schedule_on_transform (SCHEDULER& scheduler) noexcept ;
SCHEDULER& scheduler;
};
template < typename SCHEDULER>
schedule_on_transform<SCHEDULER> schedule_on (SCHEDULER& scheduler) noexcept ;
template < typename T, typename SCHEDULER>
decltype ( auto ) operator |(T&& value, schedule_on_transform<SCHEDULER> transform);
}awaitable_traits<T> Cette métafonction de modèle peut être utilisée pour déterminer le type résultant d'une expression co_await s'il est appliqué à une expression de type T .
Notez que cela suppose que la valeur du type T est attendue dans un contexte où il n'est pas affecté par aucun await_transform appliqué par l'objet de promesse de Coroutine. Les résultats peuvent différer si une valeur de type T est attendue dans un tel contexte.
La métafonction awaitable_traits<T> ne définit pas les types de types NESSETS awaiter_t ou await_result_t si type, T , n'est pas attendu. Cela permet son utilisation dans des contextes SFINAE qui désactive les surcharges lorsque T n'est pas attendu.
Résumé de l'API:
// <cppcoro/awaitable_traits.hpp>
namespace cppcoro
{
template < typename T>
struct awaitable_traits
{
// The type that results from applying `operator co_await()` to a value
// of type T, if T supports an `operator co_await()`, otherwise is type `T&&`.
typename awaiter_t = <unspecified>;
// The type of the result of co_await'ing a value of type T.
typename await_result_t = <unspecified>;
};
}is_awaitable<T> La métafonction is_awaitable<T> vous permet de demander si un type donné peut être co_await ed ou non à partir d'une coroutine.
Résumé de l'API:
// <cppcoro/is_awaitable.hpp>
namespace cppcoro
{
template < typename T>
struct is_awaitable : std::bool_constant<...>
{};
template < typename T>
constexpr bool is_awaitable_v = is_awaitable<T>::value;
}Awaitable<T> Un Awaitable<T> est un concept qui indique qu'un type peut être co_await ed dans un contexte coroutine qui n'a pas de surcharges await_transform et que le résultat de l'expression co_await a le type, T .
Par exemple, la task<T> implémente le concept Awaitable<T&&> alors que la task<T>& implémente le concept Awaitable<T&> .
Awaiter<T> Concept Un Awaiter<T> est un concept qui indique qu'un type contient les méthodes await_ready , await_suspend et await_resume requises pour implémenter le protocole pour suspendre / reprendre une coroutine en attente.
Un type qui satisfait Awaiter<T> doit avoir, pour une instance du type, awaiter :
awaiter.await_ready() -> boolawaiter.await_suspend(std::experimental::coroutine_handle<void>{}) -> void ou bool ou std::experimental::coroutine_handle<P> pour certains P .awaiter.await_resume() -> T Tout type qui met en œuvre le concept Awaiter<T> implémente également le concept Awaitable<T> .
Scheduler Un Scheduler est un concept qui permet de planifier l'exécution de coroutines dans un contexte d'exécution.
concept Scheduler
{
Awaitable< void > schedule ();
} Étant donné un type, S , qui implémente le concept Scheduler , et une instance, s , de type S :
s.schedule() renvoie un type attendable tel que co_await s.schedule() suspendra inconditionnellement la coroutine actuelle et la planifiera pour reprise sur le contexte d'exécution associé au planificateur, s .co_await s.schedule() a du type void . cppcoro::task<> f (Scheduler& scheduler)
{
// Execution of the coroutine is initially on the caller's execution context.
// Suspends execution of the coroutine and schedules it for resumption on
// the scheduler's execution context.
co_await scheduler. schedule ();
// At this point the coroutine is now executing on the scheduler's
// execution context.
}DelayedScheduler Un DelayedScheduler est un concept qui permet à une coroutine de se planifier l'exécution sur le contexte d'exécution du planificateur après que une durée spécifiée s'est écoulée.
concept DelayedScheduler : Scheduler
{
template < typename REP, typename RATIO>
Awaitable< void > schedule_after (std::chrono::duration<REP, RATIO> delay);
template < typename REP, typename RATIO>
Awaitable< void > schedule_after (
std::chrono::duration<REP, RATIO> delay,
cppcoro::cancellation_token cancellationToken);
} Étant donné un type, S , qui implémente le DelayedScheduler et une instance, s de type S :
s.schedule_after(delay) renvoie un objet qui peut être attendu de telle sorte que co_await s.schedule_after(delay) suspend la coroutine actuelle pour une durée de delay avant de planifier la coroutine pour la reprise du contexte d'exécution associé au planificateur, s .co_await s.schedule_after(delay) a un type void .La bibliothèque CPPCoro prend en charge la construction sous Windows avec Visual Studio 2017 et Linux avec Clang 5.0+.
Cette bibliothèque utilise le système de construction de gâteaux (non, pas le C # One).
Le système de construction de gâteaux est vérifié automatiquement sous forme de sous-module GIT afin que vous n'ayez pas besoin de le télécharger ou de l'installer séparément.
Cette bibliothèque nécessite actuellement Visual Studio 2017 ou version ultérieure et le SDK Windows 10.
La prise en charge de Clang (# 3) et Linux (# 15) est prévue.
Le système de construction de gâteaux est implémenté en Python et nécessite l'installation de Python 2.7.
Assurez-vous que l'interprète Python 2.7 est sur votre chemin et disponible en tant que «Python».
Assurez-vous que Visual Studio 2017 Update 3 ou version ultérieure est installé. Notez qu'il y a des problèmes connus avec les coroutines dans la mise à jour 2 ou antérieurs qui ont été résolus dans la mise à jour 3.
Vous pouvez également utiliser une version expérimentale du compilateur Visual Studio en téléchargeant un package NuGet à partir de https://vcppdogfooding.azurewebsites.net/ et dézipper le fichier .nuget dans un répertoire. Mettez simplement à jour le fichier config.cake pour pointer à l'emplacement dézippé en modifiant et en décrochant la ligne suivante:
nugetPath = None # r'C:PathToVisualCppTools.14.0.25224-Pre'Assurez-vous que le SDK Windows 10 a installé. Il utilisera le dernier SDK Windows 10 et Universal C Runtime Version par défaut.
Le référentiel CPPCORO utilise des sous-modules Git pour tirer la source du système de construction de gâteaux.
Cela signifie que vous devez passer l'indicateur --recursive à la commande git clone . par exemple, par exemple
c:Code> git clone --recursive https://github.com/lewissbaker/cppcoro.git
Si vous avez déjà cloné CPPCoro, vous devez mettre à jour les sous-modules après avoir tiré des modifications.
c:Codecppcoro> git submodule update --init --recursive
Pour construire à partir de la ligne de commande, exécutez «Cake.bat» dans la racine de l'espace de travail.
par exemple, par exemple
C:cppcoro> cake.bat
Building with C:cppcoroconfig.cake - Variant(release='debug', platform='windows', architecture='x86', compilerFamily='msvc', compiler='msvc14.10')
Building with C:cppcoroconfig.cake - Variant(release='optimised', platform='windows', architecture='x64', compilerFamily='msvc', compiler='msvc14.10')
Building with C:cppcoroconfig.cake - Variant(release='debug', platform='windows', architecture='x64', compilerFamily='msvc', compiler='msvc14.10')
Building with C:cppcoroconfig.cake - Variant(release='optimised', platform='windows', architecture='x86', compilerFamily='msvc', compiler='msvc14.10')
Compiling testmain.cpp
Compiling testmain.cpp
Compiling testmain.cpp
Compiling testmain.cpp
...
Linking buildwindows_x86_msvc14.10_debugtestrun.exe
Linking buildwindows_x64_msvc14.10_optimisedtestrun.exe
Linking buildwindows_x86_msvc14.10_optimisedtestrun.exe
Linking buildwindows_x64_msvc14.10_debugtestrun.exe
Generating code
Finished generating code
Generating code
Finished generating code
Build succeeded.
Build took 0:00:02.419.
Par défaut, l'exécution cake sans arguments créera tous les projets avec toutes les variantes de construction et exécutera les tests unitaires. Vous pouvez restreindre ce qui est construit en faisant passer des arguments de ligne de commande supplémentaires. par exemple, par exemple
c:cppcoro> cake.bat release=debug architecture=x64 lib/build.cake
Building with C:UsersLewisCodecppcoroconfig.cake - Variant(release='debug', platform='windows', architecture='x64', compilerFamily='msvc', compiler='msvc14.10')
Archiving buildwindows_x64_msvc14.10_debuglibcppcoro.lib
Build succeeded.
Build took 0:00:00.321.
Vous pouvez exécuter cake --help pour énumérer les options de ligne de commande disponibles.
Pour développer à partir de Visual Studio, vous pouvez créer des fichiers .vcproj / .sln en exécutant cake.bat -p .
par exemple, par exemple
c:cppcoro> cake.bat -p
Building with C:cppcoroconfig.cake - Variant(release='debug', platform='windows', architecture='x86', compilerFamily='msvc', compiler='msvc14.10')
Building with C:cppcoroconfig.cake - Variant(release='optimised', platform='windows', architecture='x64', compilerFamily='msvc', compiler='msvc14.10')
Building with C:cppcoroconfig.cake - Variant(release='debug', platform='windows', architecture='x64', compilerFamily='msvc', compiler='msvc14.10')
Building with C:cppcoroconfig.cake - Variant(release='optimised', platform='windows', architecture='x86', compilerFamily='msvc', compiler='msvc14.10')
Generating Solution build/project/cppcoro.sln
Generating Project build/project/cppcoro_tests.vcxproj
Generating Filters build/project/cppcoro_tests.vcxproj.filters
Generating Project build/project/cppcoro.vcxproj
Generating Filters build/project/cppcoro.vcxproj.filters
Build succeeded.
Build took 0:00:00.247.
Lorsque vous construisez ces projets à partir de Visual Studio, il appellera le gâteau pour effectuer la compilation.
Le projet CPPCORO peut également être construit sous Linux à l'aide de Clang + LIBC ++ 5.0 ou ultérieure.
Le bâtiment CPPCORO a été testé sous Ubuntu 17.04.
Assurez-vous que les packages suivants sont installés:
Cela suppose que Clang et Libc ++ sont construits et installés.
Si vous n'avez pas encore de configuration de Clang, consultez les sections suivantes pour plus de détails sur la configuration de Clang pour la construction avec CPPCORO.
Découvrez Cppcoro et ses sous-modules:
git clone --recursive https://github.com/lewissbaker/cppcoro.git cppcoro
Exécutez init.sh pour configurer la fonction cake Bash:
cd cppcoro
source init.sh
Ensuite, vous pouvez exécuter cake à partir de la racine de l'espace de travail pour construire Cppcoro et exécuter des tests:
$ cake
Vous pouvez spécifier des arguments de ligne de commande supplémentaires pour personnaliser la version:
--help imprimera l'aide pour les arguments en ligne de commande--debug=run affichera les lignes de commande de construction en cours d'exécutionrelease=debug ou release=optimised limitera la variante de build à déboguer ou optimisé (par défaut, il construire les deux).lib/build.cake ne fera que construire la bibliothèque CPPCORO et non les tests.test/build.cake@task_tests.cpp ne fera que compiler un fichier source particuliertest/build.cake@testresult construire et exécuter les testsPar exemple:
$ cake --debug=run release=debug lib/build.cake
Si votre compilateur Clang n'est pas situé sur /usr/bin/clang , vous pouvez spécifier un autre emplacement en utilisant une ou plusieurs des options de ligne de commande suivantes pour cake :
--clang-executable=<name> - Spécifiez le nom exécutable Clang à utiliser au lieu de clang . par exemple, par exemple Pour forcer l'utilisation de Clang 8.0 Pass --clang-executable=clang-8--clang-executable=<abspath> - Spécifiez le chemin complet vers Clang Exécutable. Le système de construction recherchera également d'autres exécutables dans le même répertoire. Si ce chemin a le formulaire <prefix>/bin/<name> , cela définira également le clang-stall-prefix par défaut sur <prefix> .--clang-install-prefix=<path> - Spécifiez le chemin où Clang a été installé. Cela entraînera la recherche du système de construction sous <path>/bin (sauf dépassement de --clang-executable ).--libcxx-install-prefix=<path> - Spécifiez le chemin où LIBC ++ a été installé. Par défaut, le système de construction recherchera Libc ++ au même endroit que Clang. Utilisez cette option de ligne de commande si elle est installée à un emplacement différent.Exemple: utilisez une version spécifique de Clang installé dans l'emplacement par défaut
$ cake --clang-executable=clang-8
Exemple: utilisez la version par défaut de Clang à partir d'un emplacement personnalisé
$ cake --clang-install-prefix=/path/to/clang-install
Exemple: utilisez une version spécifique de Clang, dans un emplacement personnalisé, avec libc ++ à partir d'un emplacement différent
$ cake --clang-executable=/path/to/clang-install/bin/clang-8 --libcxx-install-prefix=/path/to/libcxx-install
If your Linux distribution does not have a version of Clang 5.0 or later available, you can install a snapshot build from the LLVM project.
Follow instructions at http://apt.llvm.org/ to setup your package manager to support pulling from the LLVM package manager.
For example, for Ubuntu 17.04 Zesty:
Edit /etc/apt/sources.list and add the following lines:
deb http://apt.llvm.org/zesty/ llvm-toolchain-zesty main
deb-src http://apt.llvm.org/zesty/ llvm-toolchain-zesty main
Install the PGP key for those packages:
$ wget -O - https://apt.llvm.org/llvm-snapshot.gpg.key | sudo apt-key add -
Install Clang and LLD:
$ sudo apt-get install clang-6.0 lld-6.0
The LLVM snapshot builds do not include libc++ versions so you'll need to build that yourself. Voir ci-dessous.
You can also use the bleeding-edge Clang version by building Clang from source yourself.
See instructions here:
To do this you will need to install the following pre-requisites:
$ sudo apt-get install git cmake ninja-build clang lld
Note that we are using your distribution's version of clang to build clang from source. GCC could also be used here instead.
Checkout LLVM + Clang + LLD + libc++ repositories:
mkdir llvm
cd llvm
git clone --depth=1 https://github.com/llvm-mirror/llvm.git llvm
git clone --depth=1 https://github.com/llvm-mirror/clang.git llvm/tools/clang
git clone --depth=1 https://github.com/llvm-mirror/lld.git llvm/tools/lld
git clone --depth=1 https://github.com/llvm-mirror/libcxx.git llvm/projects/libcxx
ln -s llvm/tools/clang clang
ln -s llvm/tools/lld lld
ln -s llvm/projects/libcxx libcxx
Configure and build Clang:
mkdir clang-build
cd clang-build
cmake -GNinja
-DCMAKE_CXX_COMPILER=/usr/bin/clang++
-DCMAKE_C_COMPILER=/usr/bin/clang
-DCMAKE_BUILD_TYPE=MinSizeRel
-DCMAKE_INSTALL_PREFIX="/path/to/clang/install"
-DCMAKE_BUILD_WITH_INSTALL_RPATH="yes"
-DLLVM_TARGETS_TO_BUILD=X86
-DLLVM_ENABLE_PROJECTS="lld;clang"
../llvm
ninja install-clang
install-clang-headers
install-llvm-ar
install-lld
The cppcoro project requires libc++ as it contains the <experimental/coroutine> header required to use C++ coroutines under Clang.
Checkout libc++ + llvm :
mkdir llvm
cd llvm
git clone --depth=1 https://github.com/llvm-mirror/llvm.git llvm
git clone --depth=1 https://github.com/llvm-mirror/libcxx.git llvm/projects/libcxx
ln -s llvm/projects/libcxx libcxx
Build libc++ :
mkdir libcxx-build
cd libcxx-build
cmake -GNinja
-DCMAKE_CXX_COMPILER="/path/to/clang/install/bin/clang++"
-DCMAKE_C_COMPILER="/path/to/clang/install/bin/clang"
-DCMAKE_BUILD_TYPE=Release
-DCMAKE_INSTALL_PREFIX="/path/to/clang/install"
-DLLVM_PATH="../llvm"
-DLIBCXX_CXX_ABI=libstdc++
-DLIBCXX_CXX_ABI_INCLUDE_PATHS="/usr/include/c++/6.3.0/;/usr/include/x86_64-linux-gnu/c++/6.3.0/"
../libcxx
ninja cxx
ninja install
This will build and install libc++ into the same install directory where you have clang installed.
The cppcoro port in vcpkg is kept up to date by Microsoft team members and community contributors. The url of vcpkg is: https://github.com/Microsoft/vcpkg . You can download and install cppcoro using the vcpkg dependency manager:
git clone https://github.com/Microsoft/vcpkg.git
cd vcpkg
./bootstrap-vcpkg.sh # ./bootstrap-vcpkg.bat for Windows
./vcpkg integrate install
./vcpkg install cppcoroSi la version est obsolète, veuillez créer une demande de problème ou d'extraction sur le référentiel VCPKG.
GitHub issues are the primary mechanism for support, bug reports and feature requests.
Contributions are welcome and pull-requests will be happily reviewed. I only ask that you agree to license any contributions that you make under the MIT license.
If you have general questions about C++ coroutines, you can generally find someone to help in the #coroutines channel on Cpplang Slack group.