Библиотека «Cppcoro» предоставляет большой набор примитивов общего назначения для использования предложения Coroutines TS, описанного в N4680.
К ним относятся:
task<T>shared_task<T>generator<T>recursive_generator<T>async_generator<T>single_consumer_eventsingle_consumer_async_auto_reset_eventasync_mutexasync_manual_reset_eventasync_auto_reset_eventasync_latchsequence_barriermulti_producer_sequencersingle_producer_sequencersync_wait()when_all()when_all_ready()fmap()schedule_on()resume_on()cancellation_tokencancellation_sourcecancellation_registrationstatic_thread_poolio_service и io_work_scopefile , readable_file , writable_fileread_only_file , write_only_file , read_write_filesocketip_address , ipv4_address , ipv6_addressip_endpoint , ipv4_endpoint , ipv6_endpointis_awaitable<T>awaitable_traits<T>Awaitable<T>Awaiter<T>SchedulerDelayedSchedulerЭта библиотека представляет собой экспериментальную библиотеку, которая изучает пространство высокопроизводительных, масштабируемых асинхронных абстракций программирования, которые могут быть построены поверх предложения C ++ Coroutines.
Он был открыт в надежде, что другие найдут его полезным и что сообщество C ++ может дать отзывы об этом и способы ее улучшения.
Требуется компилятор, который поддерживает Coroutines TS:
Версия Linux функциональна, за исключением классов io_context и File I/O, которые еще не были реализованы для Linux (см. Выпуск № 15 для получения дополнительной информации).
task<T>Задача представляет собой асинхронное вычисление, которое лениво выполняется в том, что выполнение коратики не начинается до тех пор, пока задача не будет ожидается.
Пример:
# include < cppcoro/read_only_file.hpp >
# include < cppcoro/task.hpp >
cppcoro::task< int > count_lines (std::string path)
{
auto file = co_await cppcoro::read_only_file::open (path);
int lineCount = 0 ;
char buffer[ 1024 ];
size_t bytesRead;
std:: uint64_t offset = 0 ;
do
{
bytesRead = co_await file. read (offset, buffer, sizeof (buffer));
lineCount += std::count (buffer, buffer + bytesRead, ' n ' );
offset += bytesRead;
} while (bytesRead > 0 );
co_return lineCount;
}
cppcoro::task<> usage_example ()
{
// Calling function creates a new task but doesn't start
// executing the coroutine yet.
cppcoro::task< int > countTask = count_lines ( " foo.txt " );
// ...
// Coroutine is only started when we later co_await the task.
int lineCount = co_await countTask;
std::cout << " line count = " << lineCount << std::endl;
}Обзор API:
// <cppcoro/task.hpp>
namespace cppcoro
{
template < typename T>
class task
{
public:
using promise_type = <unspecified>;
using value_type = T;
task () noexcept ;
task (task&& other) noexcept ;
task& operator =(task&& other);
// task is a move-only type.
task ( const task& other) = delete ;
task& operator =( const task& other) = delete ;
// Query if the task result is ready.
bool is_ready () const noexcept ;
// Wait for the task to complete and return the result or rethrow the
// exception if the operation completed with an unhandled exception.
//
// If the task is not yet ready then the awaiting coroutine will be
// suspended until the task completes. If the the task is_ready() then
// this operation will return the result synchronously without suspending.
Awaiter<T&> operator co_await () const & noexcept ;
Awaiter<T&&> operator co_await () const && noexcept ;
// Returns an awaitable that can be co_await'ed to suspend the current
// coroutine until the task completes.
//
// The 'co_await t.when_ready()' expression differs from 'co_await t' in
// that when_ready() only performs synchronization, it does not return
// the result or rethrow the exception.
//
// This can be useful if you want to synchronize with the task without
// the possibility of it throwing an exception.
Awaitable< void > when_ready () const noexcept ;
};
template < typename T>
void swap (task<T>& a, task<T>& b);
// Creates a task that yields the result of co_await'ing the specified awaitable.
//
// This can be used as a form of type-erasure of the concrete awaitable, allowing
// different awaitables that return the same await-result type to be stored in
// the same task<RESULT> type.
template <
typename AWAITABLE,
typename RESULT = typename awaitable_traits<AWAITABLE>:: await_result_t >
task<RESULT> make_task (AWAITABLE awaitable);
} Вы можете создать объект task<T> , вызывая функцию CORUTINE, которая возвращает task<T> .
Коратика должна содержать использование co_await или co_return . Обратите внимание, что task<T> Coroutine не может использовать ключевое слово co_yield .
Когда вызывается коратика, которая возвращает task<T> , при необходимости выделяется кадр -коратика, а параметры фиксируются в кадре корауки. Коратика приостановлена в начале корпуса коратики, и выполнение возвращается вызывающему абоненту, а значение task<T> , представляющее асинхронное вычисление, возвращается из вызова функции.
Тело Coroutine начнет выполнять, когда значение task<T> co_await Ed. Это приостановит ожидающуюся корутину и запустить выполнение коратики, связанной с ожидаемой task<T> .
В ожидании в ожидании Coroutine будет возобновлена возобновляемость в потоке, который завершает выполнение ожидаемой task<T> Coroutine. т.е. Поток, который выполняет co_return или который бросает нездоровое исключение, которое завершает выполнение коратики.
Если задача уже выполнилась до завершения, ожидание ее снова получит уже скомпионатный результат без приостановки ожидающей коратики.
Если объект task разрушен до того, как он будет ожидается, то Coroutine никогда не выполняется, и деструктор просто разрушает захваченные параметры и освобождает любую память, используемую в коратике.
shared_task<T> Класс shared_task<T> - это тип Coroutine, который дает одно значение асинхронно.
Это «ленивое» в этом выполнении задачи не начинается, пока не будет ожидается какая -то коратика.
Он «общий» в том смысле, что значение задачи может быть скопировано, что позволяет несколько ссылок на результат создания задачи. Это также позволяет нескольким коратикам одновременно ждать результата.
Задача начнет выполнять в потоке, который первым co_await задачу. Последующие аспекты будут либо приостановлены, и будут в очереди на возобновление, когда задача выполнится, либо будет продолжаться синхронно, если задача уже выполнилась до завершения.
Если awaiter приостановлен в ожидании выполнения задачи, то он будет возобновлен в потоке, который завершает выполнение задачи. т.е. Поток, который выполняет co_return или который бросает невозможное исключение, которое завершает выполнение коратики.
АПИ резюме
namespace cppcoro
{
template < typename T = void >
class shared_task
{
public:
using promise_type = <unspecified>;
using value_type = T;
shared_task () noexcept ;
shared_task ( const shared_task& other) noexcept ;
shared_task (shared_task&& other) noexcept ;
shared_task& operator =( const shared_task& other) noexcept ;
shared_task& operator =(shared_task&& other) noexcept ;
void swap (shared_task& other) noexcept ;
// Query if the task has completed and the result is ready.
bool is_ready () const noexcept ;
// Returns an operation that when awaited will suspend the
// current coroutine until the task completes and the result
// is available.
//
// The type of the result of the 'co_await someTask' expression
// is an l-value reference to the task's result value (unless T
// is void in which case the expression has type 'void').
// If the task completed with an unhandled exception then the
// exception will be rethrown by the co_await expression.
Awaiter<T&> operator co_await () const noexcept ;
// Returns an operation that when awaited will suspend the
// calling coroutine until the task completes and the result
// is available.
//
// The result is not returned from the co_await expression.
// This can be used to synchronize with the task without the
// possibility of the co_await expression throwing an exception.
Awaiter< void > when_ready () const noexcept ;
};
template < typename T>
bool operator ==( const shared_task<T>& a, const shared_task<T>& b) noexcept ;
template < typename T>
bool operator !=( const shared_task<T>& a, const shared_task<T>& b) noexcept ;
template < typename T>
void swap (shared_task<T>& a, shared_task<T>& b) noexcept ;
// Wrap an awaitable value in a shared_task to allow multiple coroutines
// to concurrently await the result.
template <
typename AWAITABLE,
typename RESULT = typename awaitable_traits<AWAITABLE>:: await_result_t >
shared_task<RESULT> make_shared_task (AWAITABLE awaitable);
} Все константы на shared_task<T> безопасны для вызова одновременно с другими константами в одном и том же экземпляре из нескольких потоков. Небезопасно называть неконтролируемые методы shared_task<T> одновременно с любым другим методом в том же экземпляре shared_task<T> .
task<T> Класс shared_task<T> аналогичен task<T> в том, что задача не запускает выполнение сразу после вызванной функции Coroutine. Задача начинает выполнять только тогда, когда она впервые ожидается.
Он отличается от task<T> тем, что полученный объект задачи может быть скопирован, что позволяет нескольким объектам задачи ссылаться на один и тот же асинхронный результат. Он также поддерживает несколько коратиков одновременно в ожидании результата задачи.
Компромисс заключается в том, что результатом всегда является ссылка на значение L, никогда не является ссылкой на R-значение (поскольку результат может быть общий), которая может ограничивать способность перемещать результат в локальную переменную. Он также имеет немного более высокие затраты на выполнение из-за необходимости поддерживать количество справочных материалов и поддержать несколько avauters.
generator<T> generator представляет собой тип Coroutine, который дает последовательность значений типа T , где значения создаются лениво и синхронно.
Тело Coroutine может дать значения типа T используя ключевое слово co_yield . Обратите внимание, однако, что корпус Coroutine не может использовать ключевое слово co_await ; Значения должны быть созданы синхронно.
Например:
cppcoro::generator< const std:: uint64_t > fibonacci ()
{
std:: uint64_t a = 0 , b = 1 ;
while ( true )
{
co_yield b;
auto tmp = a;
a = b;
b += tmp;
}
}
void usage ()
{
for ( auto i : fibonacci ())
{
if (i > 1'000'000 ) break ;
std::cout << i << std::endl;
}
} Когда функция COROUTINE, возвращающая generator<T> , называется COROUTINE, создается первоначально. Выполнение Coroutine поступает в корпус Coroutine, когда метод generator<T>::begin() вызывается и продолжается до тех пор, пока не будет достигнут первый оператор co_yield , либо коратика не выполнится до завершения.
Если возвращенный итератор не равен итератору end() то Derefercring, итератор вернет ссылку на значение, передаваемое оператору co_yield .
operator++() на итераторе возобновит выполнение COROUTINE и будет продолжаться до тех пор, пока не будет достигнута следующая точка co_yield , либо COROUTINE выполняется до завершения ().
Любые нездоровые исключения, брошенные Coroutine, будут распространяться из begin() или operator++() вызовов для вызывающего абонента.
Резюме API:
namespace cppcoro
{
template < typename T>
class generator
{
public:
using promise_type = <unspecified>;
class iterator
{
public:
using iterator_category = std::input_iterator_tag;
using value_type = std:: remove_reference_t <T>;
using reference = value_type&;
using pointer = value_type*;
using difference_type = std:: size_t ;
iterator ( const iterator& other) noexcept ;
iterator& operator =( const iterator& other) noexcept ;
// If the generator coroutine throws an unhandled exception before producing
// the next element then the exception will propagate out of this call.
iterator& operator ++();
reference operator *() const noexcept ;
pointer operator ->() const noexcept ;
bool operator ==( const iterator& other) const noexcept ;
bool operator !=( const iterator& other) const noexcept ;
};
// Constructs to the empty sequence.
generator () noexcept ;
generator (generator&& other) noexcept ;
generator& operator =(generator&& other) noexcept ;
generator ( const generator& other) = delete ;
generator& operator =( const generator&) = delete ;
~generator ();
// Starts executing the generator coroutine which runs until either a value is yielded
// or the coroutine runs to completion or an unhandled exception propagates out of the
// the coroutine.
iterator begin ();
iterator end () noexcept ;
// Swap the contents of two generators.
void swap (generator& other) noexcept ;
};
template < typename T>
void swap (generator<T>& a, generator<T>& b) noexcept ;
// Apply function, func, lazily to each element of the source generator
// and yield a sequence of the results of calls to func().
template < typename FUNC, typename T>
generator<std:: invoke_result_t <FUNC, T&>> fmap (FUNC func, generator<T> source);
}recursive_generator<T> recursive_generator аналогичен generator , за исключением того, что он предназначен для более эффективной поддержки, давая элементы вложенной последовательности в качестве элементов внешней последовательности.
В дополнение к тому, что вы можете co_yield значение типа T вы также можете co_yield значение типа recursive_generator<T> .
Когда вы co_yield значение recursive_generator<T> все элементы выходного генератора даются в качестве элементов текущего генератора. Нынешняя коратика приостановлен до тех пор, пока потребитель не закончит потребление всех элементов вложенного генератора, после чего выполнение точки текущей коратики возобновит выполнение для создания следующего элемента.
Преимущество recursive_generator<T> над generator<T> для итерации над рекурсивными структурами данных заключается в том, что iterator::operator++() может непосредственно возобновить кораутину листья для получения следующего элемента, а не возобновить/подвешивать C (глубину) Coroutines для каждого элемента. Нижняя сторона заключается в том, что есть дополнительные накладные расходы
Например:
// Lists the immediate contents of a directory.
cppcoro::generator<dir_entry> list_directory (std::filesystem::path path);
cppcoro::recursive_generator<dir_entry> list_directory_recursive (std::filesystem::path path)
{
for ( auto & entry : list_directory (path))
{
co_yield entry;
if (entry. is_directory ())
{
co_yield list_directory_recursive (entry. path ());
}
}
} Обратите внимание, что применение оператора fmap() к recursive_generator<T> даст generator<U> тип, а не recursive_generator<U> . Это связано с тем, что использование fmap , как правило, не используется в рекурсивных контекстах, и мы стараемся избегать дополнительных накладных расходов, понесенных recursive_generator .
async_generator<T> async_generator представляет собой тип коратики, который дает последовательность значений типа, T , где значения создаются лениво, а значения могут быть созданы асинхронно.
Тело Coroutine может использовать выражения как co_await , так и co_yield .
Потребители генератора могут использовать for co_await -диапазона для использования значений.
Пример
cppcoro::async_generator< int > ticker ( int count, threadpool& tp)
{
for ( int i = 0 ; i < count; ++i)
{
co_await tp. delay ( std::chrono::seconds ( 1 ));
co_yield i;
}
}
cppcoro::task<> consumer (threadpool& tp)
{
auto sequence = ticker ( 10 , tp);
for co_await (std:: uint32_t i : sequence)
{
std::cout << " Tick " << i << std::endl;
}
}АПИ резюме
// <cppcoro/async_generator.hpp>
namespace cppcoro
{
template < typename T>
class async_generator
{
public:
class iterator
{
public:
using iterator_tag = std::forward_iterator_tag;
using difference_type = std:: size_t ;
using value_type = std:: remove_reference_t <T>;
using reference = value_type&;
using pointer = value_type*;
iterator ( const iterator& other) noexcept ;
iterator& operator =( const iterator& other) noexcept ;
// Resumes the generator coroutine if suspended
// Returns an operation object that must be awaited to wait
// for the increment operation to complete.
// If the coroutine runs to completion then the iterator
// will subsequently become equal to the end() iterator.
// If the coroutine completes with an unhandled exception then
// that exception will be rethrown from the co_await expression.
Awaitable<iterator&> operator ++() noexcept ;
// Dereference the iterator.
pointer operator ->() const noexcept ;
reference operator *() const noexcept ;
bool operator ==( const iterator& other) const noexcept ;
bool operator !=( const iterator& other) const noexcept ;
};
// Construct to the empty sequence.
async_generator () noexcept ;
async_generator ( const async_generator&) = delete ;
async_generator (async_generator&& other) noexcept ;
~async_generator ();
async_generator& operator =( const async_generator&) = delete ;
async_generator& operator =(async_generator&& other) noexcept ;
void swap (async_generator& other) noexcept ;
// Starts execution of the coroutine and returns an operation object
// that must be awaited to wait for the first value to become available.
// The result of co_await'ing the returned object is an iterator that
// can be used to advance to subsequent elements of the sequence.
//
// This method is not valid to be called once the coroutine has
// run to completion.
Awaitable<iterator> begin () noexcept ;
iterator end () noexcept ;
};
template < typename T>
void swap (async_generator<T>& a, async_generator<T>& b);
// Apply 'func' to each element of the source generator, yielding a sequence of
// the results of calling 'func' on the source elements.
template < typename FUNC, typename T>
async_generator<std:: invoke_result_t <FUNC, T&>> fmap (FUNC func, async_generator<T> source);
} Когда объект async_generator разрушается, он запрашивает отмену базовой коратики. Если Coroutine уже заканчивается до завершения или в настоящее время приостановлен в выражении co_yield , то коратика уничтожается немедленно. В противном случае, Coroutine будет продолжать выполнять, пока она не выполнится до завершения, либо не достигнет следующего выражения co_yield .
Когда рамка коратика разрушена, деструкторы всех переменных в области масштаба в этот момент будут выполнены, чтобы обеспечить очистку ресурсов генератора.
Обратите внимание, что вызывающий абонент должен убедиться, что объект async_generator не должен быть уничтожен, в то время как потребительская коратика выполняет выражение co_await ожидающее получения следующего элемента.
single_consumer_eventЭто простой тип события вручную, который поддерживает только одну корутину, ожидающую его за раз. Это можно использовать для
Резюме API:
// <cppcoro/single_consumer_event.hpp>
namespace cppcoro
{
class single_consumer_event
{
public:
single_consumer_event ( bool initiallySet = false ) noexcept ;
bool is_set () const noexcept ;
void set ();
void reset () noexcept ;
Awaiter< void > operator co_await () const noexcept ;
};
}Пример:
# include < cppcoro/single_consumer_event.hpp >
cppcoro::single_consumer_event event;
std::string value;
cppcoro::task<> consumer ()
{
// Coroutine will suspend here until some thread calls event.set()
// eg. inside the producer() function below.
co_await event;
std::cout << value << std::endl;
}
void producer ()
{
value = " foo " ;
// This will resume the consumer() coroutine inside the call to set()
// if it is currently suspended.
event. set ();
}single_consumer_async_auto_reset_event Этот класс обеспечивает асинхронную синхронизационную примитиву, который позволяет одной коратике ждать, пока событие не будет сигнализировано вызовом метода set() .
После того, как коратика, которая ожидает события, будет выпущена либо предварительным, либо последующим вызовом set() событие автоматически сбросится обратно в состояние «не установлен».
Этот класс является более эффективной версией async_auto_reset_event , которая может использоваться в тех случаях, когда за раз будет ожидать только одного коратика. Если вам нужно поддержать несколько параллельных ожидающих кораток на событии, вместо этого используйте класс async_auto_reset_event .
Резюме API:
// <cppcoro/single_consumer_async_auto_reset_event.hpp>
namespace cppcoro
{
class single_consumer_async_auto_reset_event
{
public:
single_consumer_async_auto_reset_event (
bool initiallySet = false ) noexcept ;
// Change the event to the 'set' state. If a coroutine is awaiting the
// event then the event is immediately transitioned back to the 'not set'
// state and the coroutine is resumed.
void set () noexcept ;
// Returns an Awaitable type that can be awaited to wait until
// the event becomes 'set' via a call to the .set() method. If
// the event is already in the 'set' state then the coroutine
// continues without suspending.
// The event is automatically reset back to the 'not set' state
// before resuming the coroutine.
Awaiter< void > operator co_await () const noexcept ;
};
}Пример использования:
std::atomic< int > value;
cppcoro::single_consumer_async_auto_reset_event valueDecreasedEvent;
cppcoro::task<> wait_until_value_is_below ( int limit)
{
while (value. load (std::memory_order_relaxed) >= limit)
{
// Wait until there has been some change that we're interested in.
co_await valueDecreasedEvent;
}
}
void change_value ( int delta)
{
value. fetch_add (delta, std::memory_order_relaxed);
// Notify the waiter if there has been some change.
if (delta < 0 ) valueDecreasedEvent. set ();
}async_mutexОбеспечивает простую взаимную исключительную абстракцию, которая позволяет вызывающему абоненту «Co_await» мутекс изнутри коратики, чтобы приостановить кораку, пока не будет получена блокировка Mutex.
Реализация не содержит блокировки в том смысле, что вмешательство, которая ожидает, что мутекс не будет блокировать поток, а вместо этого приостановит корутину, а затем возобновит его в вызове, чтобы unlock() предыдущим держателем блокировки.
Резюме API:
// <cppcoro/async_mutex.hpp>
namespace cppcoro
{
class async_mutex_lock ;
class async_mutex_lock_operation ;
class async_mutex_scoped_lock_operation ;
class async_mutex
{
public:
async_mutex () noexcept ;
~async_mutex ();
async_mutex ( const async_mutex&) = delete ;
async_mutex& operator ( const async_mutex&) = delete;
bool try_lock () noexcept ;
async_mutex_lock_operation lock_async () noexcept ;
async_mutex_scoped_lock_operation scoped_lock_async () noexcept ;
void unlock ();
};
class async_mutex_lock_operation
{
public:
bool await_ready () const noexcept ;
bool await_suspend (std::experimental::coroutine_handle<> awaiter) noexcept ;
void await_resume () const noexcept ;
};
class async_mutex_scoped_lock_operation
{
public:
bool await_ready () const noexcept ;
bool await_suspend (std::experimental::coroutine_handle<> awaiter) noexcept ;
[[nodiscard]] async_mutex_lock await_resume () const noexcept ;
};
class async_mutex_lock
{
public:
// Takes ownership of the lock.
async_mutex_lock (async_mutex& mutex, std:: adopt_lock_t ) noexcept ;
// Transfer ownership of the lock.
async_mutex_lock (async_mutex_lock&& other) noexcept ;
async_mutex_lock ( const async_mutex_lock&) = delete ;
async_mutex_lock& operator =( const async_mutex_lock&) = delete ;
// Releases the lock by calling unlock() on the mutex.
~async_mutex_lock ();
};
}Пример использования:
# include < cppcoro/async_mutex.hpp >
# include < cppcoro/task.hpp >
# include < set >
# include < string >
cppcoro::async_mutex mutex;
std::set<std::string> values;
cppcoro::task<> add_item (std::string value)
{
cppcoro::async_mutex_lock lock = co_await mutex. scoped_lock_async ();
values. insert ( std::move (value));
}async_manual_reset_event Событие с ручным разрешением-это примитив с помощью коратины/синхронизации потока, который позволяет одним или нескольким потокам ждать, пока событие не будет сигнализировано потоком, который вызывает set() .
Мероприятие находится в одном из двух штатов; 'set' и 'не Set' .
Если событие находится в состоянии «установленного» , когда коратика ждет события, то Coroutine продолжается без приостановки. Однако, если Coroutine находится в состоянии «не установлен» , то коратика приостановлен, пока впоследствии какой -либо поток не вызовет метод set() .
Любые потоки, которые были приостановлены во время ожидания, когда событие станет «установленным», будут возобновлены в следующем вызове set() некоторым потоком.
Обратите внимание, что вы должны убедиться, что ни один из случаев не ожидает события «не установлен», когда событие будет разрушено, поскольку они не будут возобновлены.
Пример:
cppcoro::async_manual_reset_event event;
std::string value;
void producer ()
{
value = get_some_string_value ();
// Publish a value by setting the event.
event. set ();
}
// Can be called many times to create many tasks.
// All consumer tasks will wait until value has been published.
cppcoro::task<> consumer ()
{
// Wait until value has been published by awaiting event.
co_await event;
consume_value (value);
}Резюме API:
namespace cppcoro
{
class async_manual_reset_event_operation ;
class async_manual_reset_event
{
public:
async_manual_reset_event ( bool initiallySet = false ) noexcept ;
~async_manual_reset_event ();
async_manual_reset_event ( const async_manual_reset_event&) = delete ;
async_manual_reset_event (async_manual_reset_event&&) = delete ;
async_manual_reset_event& operator =( const async_manual_reset_event&) = delete ;
async_manual_reset_event& operator =(async_manual_reset_event&&) = delete ;
// Wait until the event becomes set.
async_manual_reset_event_operation operator co_await () const noexcept ;
bool is_set () const noexcept ;
void set () noexcept ;
void reset () noexcept ;
};
class async_manual_reset_event_operation
{
public:
async_manual_reset_event_operation (async_manual_reset_event& event) noexcept ;
bool await_ready () const noexcept ;
bool await_suspend (std::experimental::coroutine_handle<> awaiter) noexcept ;
void await_resume () const noexcept ;
};
}async_auto_reset_event Событие с автоматическим разрешением-это примитив с помощью коратины/синхронизации потока, который позволяет одним или нескольким потокам ждать, пока событие не будет сигнализировано потоком, вызывая set() .
После того, как коратика, которая ожидает события, будет выпущена либо предварительным, так и последующим вызовом set() событие автоматически сбросится обратно в состояние «не установлен».
Резюме API:
// <cppcoro/async_auto_reset_event.hpp>
namespace cppcoro
{
class async_auto_reset_event_operation ;
class async_auto_reset_event
{
public:
async_auto_reset_event ( bool initiallySet = false ) noexcept ;
~async_auto_reset_event ();
async_auto_reset_event ( const async_auto_reset_event&) = delete ;
async_auto_reset_event (async_auto_reset_event&&) = delete ;
async_auto_reset_event& operator =( const async_auto_reset_event&) = delete ;
async_auto_reset_event& operator =(async_auto_reset_event&&) = delete ;
// Wait for the event to enter the 'set' state.
//
// If the event is already 'set' then the event is set to the 'not set'
// state and the awaiting coroutine continues without suspending.
// Otherwise, the coroutine is suspended and later resumed when some
// thread calls 'set()'.
//
// Note that the coroutine may be resumed inside a call to 'set()'
// or inside another thread's call to 'operator co_await()'.
async_auto_reset_event_operation operator co_await () const noexcept ;
// Set the state of the event to 'set'.
//
// If there are pending coroutines awaiting the event then one
// pending coroutine is resumed and the state is immediately
// set back to the 'not set' state.
//
// This operation is a no-op if the event was already 'set'.
void set () noexcept ;
// Set the state of the event to 'not-set'.
//
// This is a no-op if the state was already 'not set'.
void reset () noexcept ;
};
class async_auto_reset_event_operation
{
public:
explicit async_auto_reset_event_operation (async_auto_reset_event& event) noexcept ;
async_auto_reset_event_operation ( const async_auto_reset_event_operation& other) noexcept ;
bool await_ready () const noexcept ;
bool await_suspend (std::experimental::coroutine_handle<> awaiter) noexcept ;
void await_resume () const noexcept ;
};
}async_latchАсинхронная защелка - это примитив синхронизации, которая позволяет асинхронно ждать, пока счетчик не будет уменьшен до нуля.
Защелка является объектом с одним использованием. Как только счетчик достигнет нуля, защелка станет «готовой» и останется готовой, пока защелка не будет уничтожена.
Резюме API:
// <cppcoro/async_latch.hpp>
namespace cppcoro
{
class async_latch
{
public:
// Initialise the latch with the specified count.
async_latch (std:: ptrdiff_t initialCount) noexcept ;
// Query if the count has reached zero yet.
bool is_ready () const noexcept ;
// Decrement the count by n.
// This will resume any waiting coroutines if the count reaches zero
// as a result of this call.
// It is undefined behaviour to decrement the count below zero.
void count_down (std:: ptrdiff_t n = 1 ) noexcept ;
// Wait until the latch becomes ready.
// If the latch count is not yet zero then the awaiting coroutine will
// be suspended and later resumed by a call to count_down() that decrements
// the count to zero. If the latch count was already zero then the coroutine
// continues without suspending.
Awaiter< void > operator co_await () const noexcept ;
};
}sequence_barrier sequence_barrier -это примитив синхронизации, которая позволяет однопродуцированному и нескольким потребителям координировать по отношению к монотонно увеличивающемуся номеру последовательности.
Один продюсер продвигает номер последовательности, публикуя новые номера последовательности в монотонно увеличивающемся порядке. Один или несколько потребителей могут запросить последний опубликованный номер последовательности и могут подождать, пока не будет опубликован конкретный номер последовательности.
Барьер последовательности может быть использован для представления курсора в защитный производитель/потребительский продюсер/потребительский кольцо
См. Паттерн Disruptor LMAX для получения дополнительной информации: https://lmax-exchange.github.io/disruptor/files/disruptor-1.0.pdf
Synopsis API:
namespace cppcoro
{
template < typename SEQUENCE = std:: size_t ,
typename TRAITS = sequence_traits<SEQUENCE>>
class sequence_barrier
{
public:
sequence_barrier (SEQUENCE initialSequence = TRAITS::initial_sequence) noexcept ;
~sequence_barrier ();
SEQUENCE last_published () const noexcept ;
// Wait until the specified targetSequence number has been published.
//
// If the operation does not complete synchronously then the awaiting
// coroutine is resumed on the specified scheduler. Otherwise, the
// coroutine continues without suspending.
//
// The co_await expression resumes with the updated last_published()
// value, which is guaranteed to be at least 'targetSequence'.
template < typename SCHEDULER>
[[nodiscard]]
Awaitable<SEQUENCE> wait_until_published (SEQUENCE targetSequence,
SCHEDULER& scheduler) const noexcept ;
void publish (SEQUENCE sequence) noexcept ;
};
}single_producer_sequencer single_producer_sequencer -это примитив синхронизации, который можно использовать для координации доступа к кольцевому буферу для одного производителя и одного или нескольких потребителей.
Продюсер сначала приобретает один или несколько слотов в кольцевом буфере, пишет элементам буфера кольца, соответствующим этим слотам, а затем, наконец, публикует значения, записанные в эти слоты. Производитель никогда не может производить больше, чем «буферизовать» элементы до того, где потребитель потреблялся.
Затем потребитель ожидает публикации определенных элементов, обрабатывает элементы, а затем уведомляет продюсера, когда он закончил обработку элементов, опубликовав номер последовательности, который он закончил потреблять в объекте sequence_barrier .
Synopsis API:
// <cppcoro/single_producer_sequencer.hpp>
namespace cppcoro
{
template <
typename SEQUENCE = std:: size_t ,
typename TRAITS = sequence_traits<SEQUENCE>>
class single_producer_sequencer
{
public:
using size_type = typename sequence_range<SEQUENCE, TRAITS>::size_type;
single_producer_sequencer (
const sequence_barrier<SEQUENCE, TRAITS>& consumerBarrier,
std:: size_t bufferSize,
SEQUENCE initialSequence = TRAITS::initial_sequence) noexcept ;
// Publisher API:
template < typename SCHEDULER>
[[nodiscard]]
Awaitable<SEQUENCE> claim_one (SCHEDULER& scheduler) noexcept ;
template < typename SCHEDULER>
[[nodiscard]]
Awaitable<sequence_range<SEQUENCE>> claim_up_to (
std:: size_t count,
SCHEDULER& scheduler) noexcept ;
void publish (SEQUENCE sequence) noexcept ;
// Consumer API:
SEQUENCE last_published () const noexcept ;
template < typename SCHEDULER>
[[nodiscard]]
Awaitable<SEQUENCE> wait_until_published (
SEQUENCE targetSequence,
SCHEDULER& scheduler) const noexcept ;
};
}Пример использования:
using namespace cppcoro ;
using namespace std ::chrono ;
struct message
{
int id;
steady_clock::time_point timestamp;
float data;
};
constexpr size_t bufferSize = 16384 ; // Must be power-of-two
constexpr size_t indexMask = bufferSize - 1 ;
message buffer[bufferSize];
task< void > producer (
io_service& ioSvc,
single_producer_sequencer< size_t >& sequencer)
{
auto start = steady_clock::now ();
for ( int i = 0 ; i < 1'000'000 ; ++i)
{
// Wait until a slot is free in the buffer.
size_t seq = co_await sequencer. claim_one (ioSvc);
// Populate the message.
auto & msg = buffer[seq & indexMask];
msg. id = i;
msg. timestamp = steady_clock::now ();
msg. data = 123 ;
// Publish the message.
sequencer. publish (seq);
}
// Publish a sentinel
auto seq = co_await sequencer. claim_one (ioSvc);
auto & msg = buffer[seq & indexMask];
msg. id = - 1 ;
sequencer. publish (seq);
}
task< void > consumer (
static_thread_pool& threadPool,
const single_producer_sequencer< size_t >& sequencer,
sequence_barrier< size_t >& consumerBarrier)
{
size_t nextToRead = 0 ;
while ( true )
{
// Wait until the next message is available
// There may be more than one available.
const size_t available = co_await sequencer. wait_until_published (nextToRead, threadPool);
do {
auto & msg = buffer[nextToRead & indexMask];
if (msg. id == - 1 )
{
consumerBarrier. publish (nextToRead);
co_return ;
}
processMessage (msg);
} while (nextToRead++ != available);
// Notify the producer that we've finished processing
// up to 'nextToRead - 1'.
consumerBarrier. publish (available);
}
}
task< void > example (io_service& ioSvc, static_thread_pool& threadPool)
{
sequence_barrier< size_t > barrier;
single_producer_sequencer< size_t > sequencer{barrier, bufferSize};
co_await when_all (
producer (tp, sequencer),
consumer (tp, sequencer, barrier));
}multi_producer_sequencer Класс multi_producer_sequencer -это примитив синхронизации, который координирует доступ к кольцевому буферу для нескольких производителей и одного или нескольких потребителей.
Для одного продюсерного варианта см. single_producer_sequencer Class.
Обратите внимание, что кольцевой буфер должен иметь размер, который является мощностью двоих. Это связано с тем, что реализация использует Bitmasks вместо целочисленного деления/модуля для расчета смещения в буфер. Кроме того, это позволяет номеру последовательности безопасно обернуться вокруг 32-битного/64-битного значения.
Резюме API:
// <cppcoro/multi_producer_sequencer.hpp>
namespace cppcoro
{
template < typename SEQUENCE = std:: size_t ,
typename TRAITS = sequence_traits<SEQUENCE>>
class multi_producer_sequencer
{
public:
multi_producer_sequencer (
const sequence_barrier<SEQUENCE, TRAITS>& consumerBarrier,
SEQUENCE initialSequence = TRAITS::initial_sequence);
std:: size_t buffer_size () const noexcept ;
// Consumer interface
//
// Each consumer keeps track of their own 'lastKnownPublished' value
// and must pass this to the methods that query for an updated last-known
// published sequence number.
SEQUENCE last_published_after (SEQUENCE lastKnownPublished) const noexcept ;
template < typename SCHEDULER>
Awaitable<SEQUENCE> wait_until_published (
SEQUENCE targetSequence,
SEQUENCE lastKnownPublished,
SCHEDULER& scheduler) const noexcept ;
// Producer interface
// Query whether any slots available for claiming (approx.)
bool any_available () const noexcept ;
template < typename SCHEDULER>
Awaitable<SEQUENCE> claim_one (SCHEDULER& scheduler) noexcept ;
template < typename SCHEDULER>
Awaitable<sequence_range<SEQUENCE, TRAITS>> claim_up_to (
std:: size_t count,
SCHEDULER& scheduler) noexcept ;
// Mark the specified sequence number as published.
void publish (SEQUENCE sequence) noexcept ;
// Mark all sequence numbers in the specified range as published.
void publish ( const sequence_range<SEQUENCE, TRAITS>& range) noexcept ;
};
} cancellation_token - это значение, которое может быть передано функции, которая позволяет вызывающему абоненту впоследствии передавать запрос на отмену операции этой функции.
Чтобы получить cancellation_token , который может быть отменен, вы должны сначала создать объект cancellation_source . Метод cancellation_source::token() может использоваться для изготовления новых значений cancellation_token , которые связаны с этим объектом cancellation_source .
Если вы захотите запросить отмену операции, вы прошли cancellation_token , который вы можете вызвать cancellation_source::request_cancellation() на связанном объекте cancellation_source .
Функции могут ответить на запрос об отмене одним из двух способов:
cancellation_token::is_cancellation_requested() или cancellation_token::throw_if_cancellation_requested() .cancellation_registration .Резюме API:
namespace cppcoro
{
class cancellation_source
{
public:
// Construct a new, independently cancellable cancellation source.
cancellation_source ();
// Construct a new reference to the same cancellation state.
cancellation_source ( const cancellation_source& other) noexcept ;
cancellation_source (cancellation_source&& other) noexcept ;
~cancellation_source ();
cancellation_source& operator =( const cancellation_source& other) noexcept ;
cancellation_source& operator =(cancellation_source&& other) noexcept ;
bool is_cancellation_requested () const noexcept ;
bool can_be_cancelled () const noexcept ;
void request_cancellation ();
cancellation_token token () const noexcept ;
};
class cancellation_token
{
public:
// Construct a token that can't be cancelled.
cancellation_token () noexcept ;
cancellation_token ( const cancellation_token& other) noexcept ;
cancellation_token (cancellation_token&& other) noexcept ;
~cancellation_token ();
cancellation_token& operator =( const cancellation_token& other) noexcept ;
cancellation_token& operator =(cancellation_token&& other) noexcept ;
bool is_cancellation_requested () const noexcept ;
void throw_if_cancellation_requested () const ;
// Query if this token can ever have cancellation requested.
// Code can use this to take a more efficient code-path in cases
// that the operation does not need to handle cancellation.
bool can_be_cancelled () const noexcept ;
};
// RAII class for registering a callback to be executed if cancellation
// is requested on a particular cancellation token.
class cancellation_registration
{
public:
// Register a callback to be executed if cancellation is requested.
// Callback will be called with no arguments on the thread that calls
// request_cancellation() if cancellation is not yet requested, or
// called immediately if cancellation has already been requested.
// Callback must not throw an unhandled exception when called.
template < typename CALLBACK>
cancellation_registration (cancellation_token token, CALLBACK&& callback);
cancellation_registration ( const cancellation_registration& other) = delete ;
~cancellation_registration ();
};
class operation_cancelled : public std :: exception
{
public:
operation_cancelled ();
const char * what () const override ;
};
}Пример: подход к опросу
cppcoro::task<> do_something_async (cppcoro::cancellation_token token)
{
// Explicitly define cancellation points within the function
// by calling throw_if_cancellation_requested().
token. throw_if_cancellation_requested ();
co_await do_step_1 ();
token. throw_if_cancellation_requested ();
do_step_2 ();
// Alternatively, you can query if cancellation has been
// requested to allow yourself to do some cleanup before
// returning.
if (token. is_cancellation_requested ())
{
display_message_to_user ( " Cancelling operation... " );
do_cleanup ();
throw cppcoro::operation_cancelled{};
}
do_final_step ();
}Пример: подход обратного вызова
// Say we already have a timer abstraction that supports being
// cancelled but it doesn't support cancellation_tokens natively.
// You can use a cancellation_registration to register a callback
// that calls the existing cancellation API. e.g.
cppcoro::task<> cancellable_timer_wait (cppcoro::cancellation_token token)
{
auto timer = create_timer (10s);
cppcoro::cancellation_registration registration (token, [&]
{
// Call existing timer cancellation API.
timer. cancel ();
});
co_await timer;
}static_thread_pool Класс static_thread_pool обеспечивает абстракцию, которая позволяет планировать работу над пулом потоков фиксированного размера.
Этот класс реализует концепцию планировщика (см. Ниже).
Вы можете подать работу в The Thread-Pool, выполнив co_await threadPool.schedule() . Эта операция приостановит текущую кораку, включите ее для выполнения на резьбе, а пул резьбы затем возобновит корешку, когда поток в резьбе будет следующим, чтобы запустить Coroutine. Эта операция гарантированно не бросить, и в общем случае не будет распределять память .
В этом классе используется алгоритм очистки работы для работы по загрузке с несколькими потоками. Работа, включенная в резьбу из потока потока, будет запланирована для выполнения в том же потоке в очереди LIFO. Работа, включенная в резьбу из дистанционного потока, будет включена в глобальную очередь FIFO. Когда рабочая нить исчерпает работу из своей локальной очереди, она сначала пытается выполнить работу из глобальной очереди. Если этот очередь пуст, то он затем пытается украсть работу из задней части очередей других рабочих потоков.
Резюме API:
namespace cppcoro
{
class static_thread_pool
{
public:
// Initialise the thread-pool with a number of threads equal to
// std::thread::hardware_concurrency().
static_thread_pool ();
// Initialise the thread pool with the specified number of threads.
explicit static_thread_pool (std:: uint32_t threadCount);
std:: uint32_t thread_count () const noexcept ;
class schedule_operation
{
public:
schedule_operation (static_thread_pool* tp) noexcept ;
bool await_ready () noexcept ;
bool await_suspend (std::experimental::coroutine_handle<> h) noexcept ;
bool await_resume () noexcept ;
private:
// unspecified
};
// Return an operation that can be awaited by a coroutine.
//
//
[[nodiscard]]
schedule_operation schedule () noexcept ;
private:
// Unspecified
};
}Пример использования: просто
cppcoro::task<std::string> do_something_on_threadpool (cppcoro::static_thread_pool& tp)
{
// First schedule the coroutine onto the threadpool.
co_await tp. schedule ();
// When it resumes, this coroutine is now running on the threadpool.
do_something ();
} Пример Использования: Делать вещи параллельно - с помощью оператора schedule_on() с помощью static_thread_pool .
cppcoro::task< double > dot_product (static_thread_pool& tp, double a[], double b[], size_t count)
{
if (count > 1000 )
{
// Subdivide the work recursively into two equal tasks
// The first half is scheduled to the thread pool so it can run concurrently
// with the second half which continues on this thread.
size_t halfCount = count / 2 ;
auto [first, second] = co_await when_all (
schedule_on (tp, dot_product (tp, a, b, halfCount),
dot_product (tp, a + halfCount, b + halfCount, count - halfCount));
co_return first + second;
}
else
{
double sum = 0.0 ;
for ( size_t i = 0 ; i < count; ++i)
{
sum += a[i] * b[i];
}
co_return sum;
}
}io_service и io_work_scope Класс io_service обеспечивает абстракцию для обработки событий ввода -вывода из асинхронных операций ввода -вывода.
Когда завершается асинхронная операция ввода/вывода, в корутине, которая ожидалась, что операция будет возобновлена на потоке ввода/вывода в вызове одного из методов обработки событий: process_events() , process_pending_events() , process_one_event() или process_one_pending_event() .
Класс io_service не управляет ними ввода -вывода. Вы должны убедиться, что некоторые потоки вызовывают один из методов обработки событий для Coroutines, ожидающих отправки событий ввода-вывода. Это может быть выделенный поток, который вызывает process_events() или смешан с некоторым другим циклом события (например, цикл событий пользовательского интерфейса), периодически опрошенным для новых событий с помощью вызова process_pending_events() или process_one_pending_event() .
Это позволяет интегрировать петлю событий io_service с другими петлями событий, такими как контура событий пользователя.
Вы можете мультиплексная обработка событий по нескольким потокам, имея несколько потоков Call process_events() . Вы можете указать намек на максимальное количество потоков, чтобы активно обрабатывать события через необязательный параметр конструктора io_service .
В Windows реализация использует установку порта ввода/вывода Windows для рассылки событий в потоки ввода/вывода масштабируемым образом.
Резюме API:
namespace cppcoro
{
class io_service
{
public:
class schedule_operation ;
class timed_schedule_operation ;
io_service ();
io_service (std:: uint32_t concurrencyHint);
io_service (io_service&&) = delete ;
io_service ( const io_service&) = delete ;
io_service& operator =(io_service&&) = delete ;
io_service& operator =( const io_service&) = delete ;
~io_service ();
// Scheduler methods
[[nodiscard]]
schedule_operation schedule () noexcept ;
template < typename REP, typename RATIO>
[[nodiscard]]
timed_schedule_operation schedule_after (
std::chrono::duration<REP, RATIO> delay,
cppcoro::cancellation_token cancellationToken = {}) noexcept ;
// Event-loop methods
//
// I/O threads must call these to process I/O events and execute
// scheduled coroutines.
std:: uint64_t process_events ();
std:: uint64_t process_pending_events ();
std:: uint64_t process_one_event ();
std:: uint64_t process_one_pending_event ();
// Request that all threads processing events exit their event loops.
void stop () noexcept ;
// Query if some thread has called stop()
bool is_stop_requested () const noexcept ;
// Reset the event-loop after a call to stop() so that threads can
// start processing events again.
void reset ();
// Reference-counting methods for tracking outstanding references
// to the io_service.
//
// The io_service::stop() method will be called when the last work
// reference is decremented.
//
// Use the io_work_scope RAII class to manage calling these methods on
// entry-to and exit-from a scope.
void notify_work_started () noexcept ;
void notify_work_finished () noexcept ;
};
class io_service ::schedule_operation
{
public:
schedule_operation ( const schedule_operation&) noexcept ;
schedule_operation& operator =( const schedule_operation&) noexcept ;
bool await_ready () const noexcept ;
void await_suspend (std::experimental::coroutine_handle<> awaiter) noexcept ;
void await_resume () noexcept ;
};
class io_service ::timed_schedule_operation
{
public:
timed_schedule_operation (timed_schedule_operation&&) noexcept ;
timed_schedule_operation ( const timed_schedule_operation&) = delete ;
timed_schedule_operation& operator =( const timed_schedule_operation&) = delete ;
timed_schedule_operation& operator =(timed_schedule_operation&&) = delete ;
bool await_ready () const noexcept ;
void await_suspend (std::experimental::coroutine_handle<> awaiter);
void await_resume ();
};
class io_work_scope
{
public:
io_work_scope (io_service& ioService) noexcept ;
io_work_scope ( const io_work_scope& other) noexcept ;
io_work_scope (io_work_scope&& other) noexcept ;
~io_work_scope ();
io_work_scope& operator =( const io_work_scope& other) noexcept ;
io_work_scope& operator =(io_work_scope&& other) noexcept ;
io_service& service () const noexcept ;
};
}Пример:
# include < cppcoro/task.hpp >
# include < cppcoro/task.hpp >
# include < cppcoro/io_service.hpp >
# include < cppcoro/read_only_file.hpp >
# include < experimental/filesystem >
# include < memory >
# include < algorithm >
# include < iostream >
namespace fs = std::experimental::filesystem;
cppcoro::task<std:: uint64_t > count_lines (cppcoro::io_service& ioService, fs::path path)
{
auto file = cppcoro::read_only_file::open (ioService, path);
constexpr size_t bufferSize = 4096 ;
auto buffer = std::make_unique<std:: uint8_t []>(bufferSize);
std:: uint64_t newlineCount = 0 ;
for (std:: uint64_t offset = 0 , fileSize = file. size (); offset < fileSize;)
{
const auto bytesToRead = static_cast < size_t >(
std::min<std:: uint64_t >(bufferSize, fileSize - offset));
const auto bytesRead = co_await file. read (offset, buffer. get (), bytesToRead);
newlineCount += std::count (buffer. get (), buffer. get () + bytesRead, ' n ' );
offset += bytesRead;
}
co_return newlineCount;
}
cppcoro::task<> run (cppcoro::io_service& ioService)
{
cppcoro::io_work_scope ioScope (ioService);
auto lineCount = co_await count_lines (ioService, fs::path{ " foo.txt " });
std::cout << " foo.txt has " << lineCount << " lines. " << std::endl;;
}
cppcoro::task<> process_events (cppcoro::io_service& ioService)
{
// Process events until the io_service is stopped.
// ie. when the last io_work_scope goes out of scope.
ioService. process_events ();
co_return ;
}
int main ()
{
cppcoro::io_service ioService;
cppcoro::sync_wait ( cppcoro::when_all_ready (
run (ioService),
process_events (ioService)));
return 0 ;
}io_service как планировщик Класс io_service реализует интерфейсы для концепций Scheduler и DelayedScheduler .
Это позволяет коратике приостановить выполнение в текущем потоке и самом расписании для возобновления в потоке ввода/вывода, связанном с конкретным объектом io_service .
Пример:
cppcoro::task<> do_something (cppcoro::io_service& ioService)
{
// Coroutine starts execution on the thread of the task awaiter.
// A coroutine can transfer execution to an I/O thread by awaiting the
// result of io_service::schedule().
co_await ioService. schedule ();
// At this point, the coroutine is now executing on an I/O thread
// inside a call to one of the io_service event processing methods.
// A coroutine can also perform a delayed-schedule that will suspend
// the coroutine for a specified duration of time before scheduling
// it for resumption on an I/O thread.
co_await ioService. schedule_after (100ms);
// At this point, the coroutine is executing on a potentially different I/O thread.
}file , readable_file , writable_fileЭти типы являются абстрактными базовыми классами для выполнения конкретного ввода/вывода файла.
Резюме API:
namespace cppcoro
{
class file_read_operation ;
class file_write_operation ;
class file
{
public:
virtual ~file ();
std:: uint64_t size () const ;
protected:
file (file&& other) noexcept ;
};
class readable_file : public virtual file
{
public:
[[nodiscard]]
file_read_operation read (
std:: uint64_t offset,
void * buffer,
std:: size_t byteCount,
cancellation_token ct = {}) const noexcept ;
};
class writable_file : public virtual file
{
public:
void set_size (std:: uint64_t fileSize);
[[nodiscard]]
file_write_operation write (
std:: uint64_t offset,
const void * buffer,
std:: size_t byteCount,
cancellation_token ct = {}) noexcept ;
};
class file_read_operation
{
public:
file_read_operation (file_read_operation&& other) noexcept ;
bool await_ready () const noexcept ;
bool await_suspend (std::experimental::coroutine_handle<> awaiter);
std:: size_t await_resume ();
};
class file_write_operation
{
public:
file_write_operation (file_write_operation&& other) noexcept ;
bool await_ready () const noexcept ;
bool await_suspend (std::experimental::coroutine_handle<> awaiter);
std:: size_t await_resume ();
};
}read_only_file , write_only_file , read_write_fileЭти типы представляют собой занятия ввода/вывода.
Резюме API:
namespace cppcoro
{
class read_only_file : public readable_file
{
public:
[[nodiscard]]
static read_only_file open (
io_service& ioService,
const std::experimental::filesystem::path& path,
file_share_mode shareMode = file_share_mode::read,
file_buffering_mode bufferingMode = file_buffering_mode::default_);
};
class write_only_file : public writable_file
{
public:
[[nodiscard]]
static write_only_file open (
io_service& ioService,
const std::experimental::filesystem::path& path,
file_open_mode openMode = file_open_mode::create_or_open,
file_share_mode shareMode = file_share_mode::none,
file_buffering_mode bufferingMode = file_buffering_mode::default_);
};
class read_write_file : public readable_file , public writable_file
{
public:
[[nodiscard]]
static read_write_file open (
io_service& ioService,
const std::experimental::filesystem::path& path,
file_open_mode openMode = file_open_mode::create_or_open,
file_share_mode shareMode = file_share_mode::none,
file_buffering_mode bufferingMode = file_buffering_mode::default_);
};
} Все функции open() бросают std::system_error при сбое.
ПРИМЕЧАНИЕ. Сетевые абстракции в настоящее время поддерживаются только на платформе Windows. Поддержка Linux скоро появится.
socketКласс сокетов может использоваться для отправки/получения данных по сети асинхронно.
В настоящее время поддерживает только TCP/IP, UDP/IP через IPv4 и IPv6.
Резюме API:
// <cppcoro/net/socket.hpp>
namespace cppcoro ::net
{
class socket
{
public:
static socket create_tcpv4 (ip_service& ioSvc);
static socket create_tcpv6 (ip_service& ioSvc);
static socket create_updv4 (ip_service& ioSvc);
static socket create_udpv6 (ip_service& ioSvc);
socket (socket&& other) noexcept ;
~socket ();
socket& operator =(socket&& other) noexcept ;
// Return the native socket handle for the socket
<platform-specific> native_handle () noexcept ;
const ip_endpoint& local_endpoint () const noexcept ;
const ip_endpoint& remote_endpoint () const noexcept ;
void bind ( const ip_endpoint& localEndPoint);
void listen ();
[[nodiscard]]
Awaitable< void > connect ( const ip_endpoint& remoteEndPoint) noexcept ;
[[nodiscard]]
Awaitable< void > connect ( const ip_endpoint& remoteEndPoint,
cancellation_token ct) noexcept ;
[[nodiscard]]
Awaitable< void > accept (socket& acceptingSocket) noexcept ;
[[nodiscard]]
Awaitable< void > accept (socket& acceptingSocket,
cancellation_token ct) noexcept ;
[[nodiscard]]
Awaitable< void > disconnect () noexcept ;
[[nodiscard]]
Awaitable< void > disconnect (cancellation_token ct) noexcept ;
[[nodiscard]]
Awaitable<std:: size_t > send ( const void * buffer, std:: size_t size) noexcept ;
[[nodiscard]]
Awaitable<std:: size_t > send ( const void * buffer,
std:: size_t size,
cancellation_token ct) noexcept ;
[[nodiscard]]
Awaitable<std:: size_t > recv ( void * buffer, std:: size_t size) noexcept ;
[[nodiscard]]
Awaitable<std:: size_t > recv ( void * buffer,
std:: size_t size,
cancellation_token ct) noexcept ;
[[nodiscard]]
socket_recv_from_operation recv_from (
void * buffer,
std:: size_t size) noexcept ;
[[nodiscard]]
socket_recv_from_operation_cancellable recv_from (
void * buffer,
std:: size_t size,
cancellation_token ct) noexcept ;
[[nodiscard]]
socket_send_to_operation send_to (
const ip_endpoint& destination,
const void * buffer,
std:: size_t size) noexcept ;
[[nodiscard]]
socket_send_to_operation_cancellable send_to (
const ip_endpoint& destination,
const void * buffer,
std:: size_t size,
cancellation_token ct) noexcept ;
void close_send ();
void close_recv ();
};
}Пример: Echo Server
# include < cppcoro/net/socket.hpp >
# include < cppcoro/io_service.hpp >
# include < cppcoro/cancellation_source.hpp >
# include < cppcoro/async_scope.hpp >
# include < cppcoro/on_scope_exit.hpp >
# include < memory >
# include < iostream >
cppcoro::task< void > handle_connection (socket s)
{
try
{
const size_t bufferSize = 16384 ;
auto buffer = std::make_unique< unsigned char []>(bufferSize);
size_t bytesRead;
do {
// Read some bytes
bytesRead = co_await s. recv (buffer. get (), bufferSize);
// Write some bytes
size_t bytesWritten = 0 ;
while (bytesWritten < bytesRead) {
bytesWritten += co_await s. send (
buffer. get () + bytesWritten,
bytesRead - bytesWritten);
}
} while (bytesRead != 0 );
s. close_send ();
co_await s. disconnect ();
}
catch (...)
{
std::cout << " connection failed " << std::
}
}
cppcoro::task< void > echo_server (
cppcoro::net::ipv4_endpoint endpoint,
cppcoro::io_service& ioSvc,
cancellation_token ct)
{
cppcoro::async_scope scope;
std::exception_ptr ex;
try
{
auto listeningSocket = cppcoro::net::socket::create_tcpv4 (ioSvc);
listeningSocket. bind (endpoint);
listeningSocket. listen ();
while ( true ) {
auto connection = cppcoro::net::socket::create_tcpv4 (ioSvc);
co_await listeningSocket. accept (connection, ct);
scope. spawn ( handle_connection ( std::move (connection)));
}
}
catch (cppcoro::operation_cancelled)
{
}
catch (...)
{
ex = std::current_exception ();
}
// Wait until all handle_connection tasks have finished.
co_await scope. join ();
if (ex) std::rethrow_exception (ex);
}
int main ( int argc, const char * argv[])
{
cppcoro::io_service ioSvc;
if (argc != 2 ) return - 1 ;
auto endpoint = cppcoro::ipv4_endpoint::from_string (argv[ 1 ]);
if (!endpoint) return - 1 ;
( void ) cppcoro::sync_wait ( cppcoro::when_all (
[&]() -> task<>
{
// Shutdown the event loop once finished.
auto stopOnExit = cppcoro::on_scope_exit ([&] { ioSvc. stop (); });
cppcoro::cancellation_source canceller;
co_await cppcoro::when_all (
[&]() -> task<>
{
// Run for 30s then stop accepting new connections.
co_await ioSvc. schedule_after ( std::chrono::seconds ( 30 ));
canceller. request_cancellation ();
}(),
echo_server (*endpoint, ioSvc, canceller. token ()));
}(),
[&]() -> task<>
{
ioSvc. process_events ();
}()));
return 0 ;
}ip_address , ipv4_address , ipv6_addressПомощные классы для представления IP -адреса.
Synopsis API:
namespace cppcoro ::net
{
class ipv4_address
{
using bytes_t = std:: uint8_t [ 4 ];
public:
constexpr ipv4_address ();
explicit constexpr ipv4_address (std:: uint32_t integer);
explicit constexpr ipv4_address ( const std::uint8_t (&bytes)[4]);
explicit constexpr ipv4_address (std:: uint8_t b0,
std:: uint8_t b1,
std:: uint8_t b2,
std:: uint8_t b3);
constexpr const bytes_t & bytes () const ;
constexpr std:: uint32_t to_integer () const ;
static constexpr ipv4_address loopback ();
constexpr bool is_loopback () const ;
constexpr bool is_private_network () const ;
constexpr bool operator ==(ipv4_address other) const ;
constexpr bool operator !=(ipv4_address other) const ;
constexpr bool operator <(ipv4_address other) const ;
constexpr bool operator >(ipv4_address other) const ;
constexpr bool operator <=(ipv4_address other) const ;
constexpr bool operator >=(ipv4_address other) const ;
std::string to_string ();
static std::optional<ipv4_address> from_string (std::string_view string) noexcept ;
};
class ipv6_address
{
using bytes_t = std:: uint8_t [ 16 ];
public:
constexpr ipv6_address ();
explicit constexpr ipv6_address (
std:: uint64_t subnetPrefix,
std:: uint64_t interfaceIdentifier);
constexpr ipv6_address (
std:: uint16_t part0,
std:: uint16_t part1,
std:: uint16_t part2,
std:: uint16_t part3,
std:: uint16_t part4,
std:: uint16_t part5,
std:: uint16_t part6,
std:: uint16_t part7);
explicit constexpr ipv6_address (
const std::uint16_t (&parts)[8]);
explicit constexpr ipv6_address (
const std::uint8_t (bytes)[16]);
constexpr const bytes_t & bytes () const ;
constexpr std:: uint64_t subnet_prefix () const ;
constexpr std:: uint64_t interface_identifier () const ;
static constexpr ipv6_address unspecified ();
static constexpr ipv6_address loopback ();
static std::optional<ipv6_address> from_string (std::string_view string) noexcept ;
std::string to_string () const ;
constexpr bool operator ==( const ipv6_address& other) const ;
constexpr bool operator !=( const ipv6_address& other) const ;
constexpr bool operator <( const ipv6_address& other) const ;
constexpr bool operator >( const ipv6_address& other) const ;
constexpr bool operator <=( const ipv6_address& other) const ;
constexpr bool operator >=( const ipv6_address& other) const ;
};
class ip_address
{
public:
// Constructs to IPv4 address 0.0.0.0
ip_address () noexcept ;
ip_address (ipv4_address address) noexcept ;
ip_address (ipv6_address address) noexcept ;
bool is_ipv4 () const noexcept ;
bool is_ipv6 () const noexcept ;
const ipv4_address& to_ipv4 () const ;
const ipv6_address& to_ipv6 () const ;
const std:: uint8_t * bytes () const noexcept ;
std::string to_string () const ;
static std::optional<ip_address> from_string (std::string_view string) noexcept ;
bool operator ==( const ip_address& rhs) const noexcept ;
bool operator !=( const ip_address& rhs) const noexcept ;
// ipv4_address sorts less than ipv6_address
bool operator <( const ip_address& rhs) const noexcept ;
bool operator >( const ip_address& rhs) const noexcept ;
bool operator <=( const ip_address& rhs) const noexcept ;
bool operator >=( const ip_address& rhs) const noexcept ;
};
}ip_endpoint , ipv4_endpoint ipv6_endpointПомощные классы для представления IP-адреса и портового номера.
Synopsis API:
namespace cppcoro ::net
{
class ipv4_endpoint
{
public:
ipv4_endpoint () noexcept ;
explicit ipv4_endpoint (ipv4_address address, std:: uint16_t port = 0 ) noexcept ;
const ipv4_address& address () const noexcept ;
std:: uint16_t port () const noexcept ;
std::string to_string () const ;
static std::optional<ipv4_endpoint> from_string (std::string_view string) noexcept ;
};
bool operator ==( const ipv4_endpoint& a, const ipv4_endpoint& b);
bool operator !=( const ipv4_endpoint& a, const ipv4_endpoint& b);
bool operator <( const ipv4_endpoint& a, const ipv4_endpoint& b);
bool operator >( const ipv4_endpoint& a, const ipv4_endpoint& b);
bool operator <=( const ipv4_endpoint& a, const ipv4_endpoint& b);
bool operator >=( const ipv4_endpoint& a, const ipv4_endpoint& b);
class ipv6_endpoint
{
public:
ipv6_endpoint () noexcept ;
explicit ipv6_endpoint (ipv6_address address, std:: uint16_t port = 0 ) noexcept ;
const ipv6_address& address () const noexcept ;
std:: uint16_t port () const noexcept ;
std::string to_string () const ;
static std::optional<ipv6_endpoint> from_string (std::string_view string) noexcept ;
};
bool operator ==( const ipv6_endpoint& a, const ipv6_endpoint& b);
bool operator !=( const ipv6_endpoint& a, const ipv6_endpoint& b);
bool operator <( const ipv6_endpoint& a, const ipv6_endpoint& b);
bool operator >( const ipv6_endpoint& a, const ipv6_endpoint& b);
bool operator <=( const ipv6_endpoint& a, const ipv6_endpoint& b);
bool operator >=( const ipv6_endpoint& a, const ipv6_endpoint& b);
class ip_endpoint
{
public:
// Constructs to IPv4 end-point 0.0.0.0:0
ip_endpoint () noexcept ;
ip_endpoint (ipv4_endpoint endpoint) noexcept ;
ip_endpoint (ipv6_endpoint endpoint) noexcept ;
bool is_ipv4 () const noexcept ;
bool is_ipv6 () const noexcept ;
const ipv4_endpoint& to_ipv4 () const ;
const ipv6_endpoint& to_ipv6 () const ;
ip_address address () const noexcept ;
std:: uint16_t port () const noexcept ;
std::string to_string () const ;
static std::optional<ip_endpoint> from_string (std::string_view string) noexcept ;
bool operator ==( const ip_endpoint& rhs) const noexcept ;
bool operator !=( const ip_endpoint& rhs) const noexcept ;
// ipv4_endpoint sorts less than ipv6_endpoint
bool operator <( const ip_endpoint& rhs) const noexcept ;
bool operator >( const ip_endpoint& rhs) const noexcept ;
bool operator <=( const ip_endpoint& rhs) const noexcept ;
bool operator >=( const ip_endpoint& rhs) const noexcept ;
};
}sync_wait() Функцию sync_wait() может использоваться для синхронного ожидания, пока не завершится указанное awaitable .
Указанный ожидаемый co_await будет в текущем потоке внутри недавно созданной Coroutine.
Вызов sync_wait() будет блокироваться до тех пор, пока операция не завершится, и не вернет результат выражения co_await или переосмыслит исключение, если выражение co_await завершено с нечетному исключению.
Функция sync_wait() в основном полезна для запуска задачи верхнего уровня из main() и ожидания до завершения задачи, на практике это единственный способ начать task первого/верхнего уровня.
Резюме API:
// <cppcoro/sync_wait.hpp>
namespace cppcoro
{
template < typename AWAITABLE>
auto sync_wait (AWAITABLE&& awaitable)
-> typename awaitable_traits<AWAITABLE&&>::await_result_t;
}Примеры:
void example_task ()
{
auto makeTask = []() -> task<std::string>
{
co_return " foo " ;
};
auto task = makeTask ();
// start the lazy task and wait until it completes
sync_wait (task); // -> "foo"
sync_wait ( makeTask ()); // -> "foo"
}
void example_shared_task ()
{
auto makeTask = []() -> shared_task<std::string>
{
co_return " foo " ;
};
auto task = makeTask ();
// start the shared task and wait until it completes
sync_wait (task) == " foo " ;
sync_wait ( makeTask ()) == " foo " ;
}when_all_ready() Функция when_all_ready() может использоваться для создания нового ожидаемого, которое завершается, когда все входные ожидания завершены.
Входные задачи могут быть любым типом ожидаемого.
Когда возвращаемый ожидаемый co_await получить, он будет co_await каждый из ожидаемых вводов в очереди на ожидающем потоке в том порядке, которую они передаются в функцию when_all_ready() . Если эти задачи не выполняют синхронно, они выполнятся одновременно.
После того, как все выражения co_await в ожидаемых вводах пройдут, чтобы завершить возвращенные ожидаемые, которые завершат и возобновит ожидающуюся коратину. В ожидании Coroutine будет возобновлено в потоке ожидаемого ввода ожидаемого, которое последнее.
Возвращенное ожидаемое гарантированно не выкладывает исключение при co_await ED, даже если некоторые из ожидаемых ввода не удастся с нездоровым исключением.
Обратите внимание, однако, что сам вызов when_all_ready() может бросить std::bad_alloc если он не смог выделить память для кадров Coroutine, необходимых для ожидания каждого из ожидаемых ввода. Это также может добавить исключение, если какой -либо из входных ожидаемых объектов бросается из их конструкторов копирования/перемещения.
Результатом co_await возвращаемого ожидаемого является std::tuple или std::vector when_all_task<RESULT> объекты. Эти объекты позволяют вам получить результат (или исключение) каждого входного ожидания отдельно, вызывая метод when_all_task<RESULT>::result() соответствующей задачи вывода. Это позволяет вызывающему одновременно ожидание нескольких ожидаемых и синхронизировать их завершение, сохраняя при этом возможность впоследствии осмотреть результаты каждой из операций co_await для успеха/сбоя.
Это отличается от when_all() , где сбой любой отдельной операции co_await приводит к сбои общей операции с исключением. Это означает, что вы не можете определить, какая из операций Component co_await не удалась, а также не позволяет вам получить результаты других операций co_await .
Резюме API:
// <cppcoro/when_all_ready.hpp>
namespace cppcoro
{
// Concurrently await multiple awaitables.
//
// Returns an awaitable object that, when co_await'ed, will co_await each of the input
// awaitable objects and will resume the awaiting coroutine only when all of the
// component co_await operations complete.
//
// Result of co_await'ing the returned awaitable is a std::tuple of detail::when_all_task<T>,
// one for each input awaitable and where T is the result-type of the co_await expression
// on the corresponding awaitable.
//
// AWAITABLES must be awaitable types and must be movable (if passed as rvalue) or copyable
// (if passed as lvalue). The co_await expression will be executed on an rvalue of the
// copied awaitable.
template < typename ... AWAITABLES>
auto when_all_ready (AWAITABLES&&... awaitables)
-> Awaitable<std::tuple<detail::when_all_task<typename awaitable_traits<AWAITABLES>::await_result_t>...>>;
// Concurrently await each awaitable in a vector of input awaitables.
template <
typename AWAITABLE,
typename RESULT = typename awaitable_traits<AWAITABLE>:: await_result_t >
auto when_all_ready (std::vector<AWAITABLE> awaitables)
-> Awaitable<std::vector<detail::when_all_task<RESULT>>>;
}Пример использования:
task<std::string> get_record ( int id);
task<> example1 ()
{
// Run 3 get_record() operations concurrently and wait until they're all ready.
// Returns a std::tuple of tasks that can be unpacked using structured bindings.
auto [task1, task2, task3] = co_await when_all_ready (
get_record ( 123 ),
get_record ( 456 ),
get_record ( 789 ));
// Unpack the result of each task
std::string& record1 = task1. result ();
std::string& record2 = task2. result ();
std::string& record3 = task3. result ();
// Use records....
}
task<> example2 ()
{
// Create the input tasks. They don't start executing yet.
std::vector<task<std::string>> tasks;
for ( int i = 0 ; i < 1000 ; ++i)
{
tasks. emplace_back ( get_record (i));
}
// Execute all tasks concurrently.
std::vector<detail::when_all_task<std::string>> resultTasks =
co_await when_all_ready ( std::move (tasks));
// Unpack and handle each result individually once they're all complete.
for ( int i = 0 ; i < 1000 ; ++i)
{
try
{
std::string& record = tasks[i]. result ();
std::cout << i << " = " << record << std::endl;
}
catch ( const std:: exception & ex)
{
std::cout << i << " : " << ex. what () << std::endl;
}
}
}when_all() Функция when_all() может использоваться для создания нового ожидаемого, что, когда co_await Ed будет одновременно co_await каждого из входных ожидаемых, и вернет заполнитель их индивидуальных результатов.
Когда ожидается возвращенное ожидаемое, он будет продолжаться, он будет co_await каждый из ожидаемых ввода в текущем потоке. После первого ожидаемого приостановления будет начато вторая задача и так далее. Операции выполняются одновременно до тех пор, пока они не будут выполнены до завершения.
После того, как все операции компонентов co_await пройдут до завершения, совокупность результатов создается из каждого отдельного результата. Если исключение брошено любой из входных задач или если конструкция совокупного результата вызывает исключение, то исключение будет распространяться из co_await возвращаемого ожидаемого.
Если несколько операций co_await не выполняются за исключением, то одно из исключений будет распространяться из выражения co_await when_all() другие исключения будут молча игнорируются. Не указано, какая исключение операции будет выбрано.
Если важно знать, какая операция COMPONT co_await не удалась или сохранить возможность получить результаты других операций, даже если некоторые из них не стержли, то вам следует использовать when_all_ready() .
Резюме API:
// <cppcoro/when_all.hpp>
namespace cppcoro
{
// Variadic version.
//
// Note that if the result of `co_await awaitable` yields a void-type
// for some awaitables then the corresponding component for that awaitable
// in the tuple will be an empty struct of type detail::void_value.
template < typename ... AWAITABLES>
auto when_all (AWAITABLES&&... awaitables)
-> Awaitable<std::tuple<typename awaitable_traits<AWAITABLES>::await_result_t...>>;
// Overload for vector<Awaitable<void>>.
template <
typename AWAITABLE,
typename RESULT = typename awaitable_traits<AWAITABLE>:: await_result_t ,
std:: enable_if_t <std::is_void_v<RESULT>, int > = 0 >
auto when_all (std::vector<AWAITABLE> awaitables)
-> Awaitable<void>;
// Overload for vector<Awaitable<NonVoid>> that yield a value when awaited.
template <
typename AWAITABLE,
typename RESULT = typename awaitable_traits<AWAITABLE>:: await_result_t ,
std:: enable_if_t <!std::is_void_v<RESULT>, int > = 0 >
auto when_all (std::vector<AWAITABLE> awaitables)
-> Awaitable<std::vector<std::conditional_t<
std::is_lvalue_reference_v<RESULT>,
std::reference_wrapper<std::remove_reference_t<RESULT>>,
std::remove_reference_t<RESULT>>>>;
}Примеры:
task<A> get_a ();
task<B> get_b ();
task<> example1 ()
{
// Run get_a() and get_b() concurrently.
// Task yields a std::tuple<A, B> which can be unpacked using structured bindings.
auto [a, b] = co_await when_all ( get_a (), get_b ());
// use a, b
}
task<std::string> get_record ( int id);
task<> example2 ()
{
std::vector<task<std::string>> tasks;
for ( int i = 0 ; i < 1000 ; ++i)
{
tasks. emplace_back ( get_record (i));
}
// Concurrently execute all get_record() tasks.
// If any of them fail with an exception then the exception will propagate
// out of the co_await expression once they have all completed.
std::vector<std::string> records = co_await when_all ( std::move (tasks));
// Process results
for ( int i = 0 ; i < 1000 ; ++i)
{
std::cout << i << " = " << records[i] << std::endl;
}
}fmap() Функция fmap() может использоваться для применения выявленной функции к значению, содержащемуся в контейнерном типе, возвращая новый тип контейнера результатов применения функции.
Функция fmap() может применить функцию к значениям generator<T> , recursive_generator<T> и async_generator<T> , а также любое значение, которое поддерживает Awaitable концепцию (например, task<T> ).
Каждый из этих типов обеспечивает перегрузку для fmap() , которая принимает два аргумента; функция для применения и значение контейнера. См. Документацию для каждого типа для поддерживаемых перегрузки fmap() .
Например, функция fmap() может использоваться для применения функции к возможному результату task<T> , создавая новую task<U> , которая будет завершена с возвратной стоимостью функции.
// Given a function you want to apply that converts
// a value of type A to value of type B.
B a_to_b (A value);
// And a task that yields a value of type A
cppcoro::task<A> get_an_a ();
// We can apply the function to the result of the task using fmap()
// and obtain a new task yielding the result.
cppcoro::task<B> bTask = fmap(a_to_b, get_an_a());
// An alternative syntax is to use the pipe notation.
cppcoro::task<B> bTask = get_an_a() | cppcoro::fmap(a_to_b);Резюме API:
// <cppcoro/fmap.hpp>
namespace cppcoro
{
template < typename FUNC>
struct fmap_transform
{
fmap_transform (FUNC&& func) noexcept (std::is_nothrow_move_constructible_v<FUNC>);
FUNC func;
};
// Type-deducing constructor for fmap_transform object that can be used
// in conjunction with operator|.
template < typename FUNC>
fmap_transform<FUNC> fmap (FUNC&& func);
// operator| overloads for providing pipe-based syntactic sugar for fmap()
// such that the expression:
// <value-expr> | cppcoro::fmap(<func-expr>)
// is equivalent to:
// fmap(<func-expr>, <value-expr>)
template < typename T, typename FUNC>
decltype ( auto ) operator |(T&& value, fmap_transform<FUNC>&& transform);
template < typename T, typename FUNC>
decltype ( auto ) operator |(T&& value, fmap_transform<FUNC>& transform);
template < typename T, typename FUNC>
decltype ( auto ) operator |(T&& value, const fmap_transform<FUNC>& transform);
// Generic overload for all awaitable types.
//
// Returns an awaitable that when co_awaited, co_awaits the specified awaitable
// and applies the specified func to the result of the 'co_await awaitable'
// expression as if by 'std::invoke(func, co_await awaitable)'.
//
// If the type of 'co_await awaitable' expression is 'void' then co_awaiting the
// returned awaitable is equivalent to 'co_await awaitable, func()'.
template <
typename FUNC,
typename AWAITABLE,
std:: enable_if_t <is_awaitable_v<AWAITABLE>, int > = 0 >
auto fmap (FUNC&& func, AWAITABLE&& awaitable)
-> Awaitable<std::invoke_result_t<FUNC, typename awaitable_traits<AWAITABLE>::await_result_t>>;
} Функция fmap() предназначена для поиска правильной перегрузки с помощью аргументации, зависящего от поиска (ADL), поэтому ее обычно следует вызывать без префикса cppcoro:: .
resume_on() Функция resume_on() может использоваться для управления контекстом выполнения, на котором ожидается возобновление ожидаемой коратики, когда ожидается. При применении к async_generator он контролирует, какой контекст выполнения co_await g.begin() и co_await ++it -операции возобновляют ожидающие коратики.
Обычно ожидающая коратика ожидаемой (например, task ) или async_generator возобновит выполнение в любом потоке, на которой выполнена операция. В некоторых случаях это может быть не тот поток, который вы хотите продолжать выполнять. В этих случаях вы можете использовать функцию resume_on() для создания нового ожидаемого или генератора, который возобновит выполнение в потоке, связанном с указанным планировщиком.
Функцию resume_on() может использоваться либо в качестве нормальной функции, возвращающей новый ожидаемый/генератор. Или это может быть использовано в трубопроводе-синтаксисе.
Пример:
task<record> load_record ( int id);
ui_thread_scheduler uiThreadScheduler;
task<> example ()
{
// This will start load_record() on the current thread.
// Then when load_record() completes (probably on an I/O thread)
// it will reschedule execution onto thread pool and call to_json
// Once to_json completes it will transfer execution onto the
// ui thread before resuming this coroutine and returning the json text.
task<std::string> jsonTask =
load_record ( 123 )
| cppcoro::resume_on ( threadpool::default ())
| cppcoro::fmap (to_json)
| cppcoro::resume_on (uiThreadScheduler);
// At this point, all we've done is create a pipeline of tasks.
// The tasks haven't started executing yet.
// Await the result. Starts the pipeline of tasks.
std::string jsonText = co_await jsonTask;
// Guaranteed to be executing on ui thread here.
someUiControl. set_text (jsonText);
}Резюме API:
// <cppcoro/resume_on.hpp>
namespace cppcoro
{
template < typename SCHEDULER, typename AWAITABLE>
auto resume_on (SCHEDULER& scheduler, AWAITABLE awaitable)
-> Awaitable<typename awaitable_traits<AWAITABLE>::await_traits_t>;
template < typename SCHEDULER, typename T>
async_generator<T> resume_on (SCHEDULER& scheduler, async_generator<T> source);
template < typename SCHEDULER>
struct resume_on_transform
{
explicit resume_on_transform (SCHEDULER& scheduler) noexcept ;
SCHEDULER& scheduler;
};
// Construct a transform/operation that can be applied to a source object
// using "pipe" notation (ie. operator|).
template < typename SCHEDULER>
resume_on_transform<SCHEDULER> resume_on (SCHEDULER& scheduler) noexcept ;
// Equivalent to 'resume_on(transform.scheduler, std::forward<T>(value))'
template < typename T, typename SCHEDULER>
decltype ( auto ) operator |(T&& value, resume_on_transform<SCHEDULER> transform)
{
return resume_on (transform. scheduler , std::forward<T>(value));
}
}schedule_on() Функция schedule_on() может использоваться для изменения контекста выполнения, который начинает выполнять данный ожидаемый или async_generator .
Применительно к async_generator , он также влияет на то, какой контекст выполнения он возобновляется после оператора co_yield .
Обратите внимание, что преобразование schedule_on не указывает поток, который ожидается или async_generator заполнит или даст результаты, что зависит от реализации ожидаемого или генератора.
См. Оператор resume_on() для преобразования, который управляет потоком, включенным.
Например:
task< int > get_value ();
io_service ioSvc;
task<> example ()
{
// Starts executing get_value() on the current thread.
int a = co_await get_value ();
// Starts executing get_value() on a thread associated with ioSvc.
int b = co_await schedule_on (ioSvc, get_value ());
}Резюме API:
// <cppcoro/schedule_on.hpp>
namespace cppcoro
{
// Return a task that yields the same result as 't' but that
// ensures that 't' is co_await'ed on a thread associated with
// the specified scheduler. Resulting task will complete on
// whatever thread 't' would normally complete on.
template < typename SCHEDULER, typename AWAITABLE>
auto schedule_on (SCHEDULER& scheduler, AWAITABLE awaitable)
-> Awaitable<typename awaitable_traits<AWAITABLE>::await_result_t>;
// Return a generator that yields the same sequence of results as
// 'source' but that ensures that execution of the coroutine starts
// execution on a thread associated with 'scheduler' and resumes
// after a 'co_yield' on a thread associated with 'scheduler'.
template < typename SCHEDULER, typename T>
async_generator<T> schedule_on (SCHEDULER& scheduler, async_generator<T> source);
template < typename SCHEDULER>
struct schedule_on_transform
{
explicit schedule_on_transform (SCHEDULER& scheduler) noexcept ;
SCHEDULER& scheduler;
};
template < typename SCHEDULER>
schedule_on_transform<SCHEDULER> schedule_on (SCHEDULER& scheduler) noexcept ;
template < typename T, typename SCHEDULER>
decltype ( auto ) operator |(T&& value, schedule_on_transform<SCHEDULER> transform);
}awaitable_traits<T> Эта метафункция шаблона может быть использована для определения того, каким будет результирующий тип выражения co_await , если применить к выражению типа T
Обратите внимание, что это предполагает, что значение типа T ожидается в контексте, в котором он не зависит от любого await_transform , применяемого объектом обещания Coroutine. Результаты могут отличаться, если в таком контексте ожидается значение типа T
Метафункция шаблона awaitable_traits<T> не определяет awaiter_t или await_result_t вложенные typedefs, если тип, T , не может быть ожидается. Это позволяет использовать его в контекстах Sfinae, который отключает перегрузку, когда T не может быть ожидается.
Резюме API:
// <cppcoro/awaitable_traits.hpp>
namespace cppcoro
{
template < typename T>
struct awaitable_traits
{
// The type that results from applying `operator co_await()` to a value
// of type T, if T supports an `operator co_await()`, otherwise is type `T&&`.
typename awaiter_t = <unspecified>;
// The type of the result of co_await'ing a value of type T.
typename await_result_t = <unspecified>;
};
}is_awaitable<T> Метафункция шаблона is_awaitable<T> позволяет вам запросить, может ли определенный тип быть co_await ED или нет из -за коратины.
Резюме API:
// <cppcoro/is_awaitable.hpp>
namespace cppcoro
{
template < typename T>
struct is_awaitable : std::bool_constant<...>
{};
template < typename T>
constexpr bool is_awaitable_v = is_awaitable<T>::value;
}Awaitable<T> концепция Awaitable<T> - это концепция, которая указывает на то, что тип может быть co_await в контексте коратиков, который не имеет перегрузки await_transform и что результат выражения co_await имеет тип, T .
Например, Type task<T> реализует концепцию Awaitable<T&&> тогда как Type task<T>& реализует концепцию Awaitable<T&> .
Awaiter<T> концепция Awaiter<T> - это концепция, которая указывает на тип, содержит методы await_ready , await_suspend и await_resume необходимые для реализации протокола для приостановки/возобновления ожидающегося коратики.
Тип, который удовлетворяет Awaiter<T> должен иметь экземпляр типа, awaiter :
awaiter.await_ready() -> boolawaiter.await_suspend(std::experimental::coroutine_handle<void>{}) -> void или bool или std::experimental::coroutine_handle<P> для некоторого P .awaiter.await_resume() -> T Любой тип, который реализует концепцию Awaiter<T> также реализует Awaitable<T> .
Scheduler Scheduler - это концепция, которая позволяет планировать выполнение CORUTINES в некотором контексте выполнения.
concept Scheduler
{
Awaitable< void > schedule ();
} Учитывая тип, S , который реализует концепцию Scheduler и экземпляр s , типа S :
s.schedule() возвращает ожидаемый тип, который co_await s.schedule() безоговорочно приостановит текущую скручину и планирует его для возобновления в контексте выполнения, связанном с планировщиком, s .co_await s.schedule() имеет тип void . cppcoro::task<> f (Scheduler& scheduler)
{
// Execution of the coroutine is initially on the caller's execution context.
// Suspends execution of the coroutine and schedules it for resumption on
// the scheduler's execution context.
co_await scheduler. schedule ();
// At this point the coroutine is now executing on the scheduler's
// execution context.
}DelayedScheduler DelayedScheduler - это концепция, которая позволяет коратике планировать себя для выполнения в контексте выполнения планировщика после указанной продолжительности времени.
concept DelayedScheduler : Scheduler
{
template < typename REP, typename RATIO>
Awaitable< void > schedule_after (std::chrono::duration<REP, RATIO> delay);
template < typename REP, typename RATIO>
Awaitable< void > schedule_after (
std::chrono::duration<REP, RATIO> delay,
cppcoro::cancellation_token cancellationToken);
} Учитывая тип, S , который реализует DelayedScheduler и s типа S :
s.schedule_after(delay) возвращает объект, который можно ожидать таким образом, что co_await s.schedule_after(delay) приостанавливает текущую корутину для продолжительности delay перед планированием коратики для возобновления в контексте выполнения, связанного с планировщиком, s .co_await s.schedule_after(delay) имеет тип void .Библиотека CPPCORO поддерживает здание под Windows с Visual Studio 2017 и Linux с Clang 5.0+.
Эта библиотека использует систему сборки торта (нет, а не C# One).
Система сборки торта автоматически проверяется как подмодуль GIT, поэтому вам не нужно загружать или устанавливать его отдельно.
В настоящее время эта библиотека требует Visual Studio 2017 или более поздней версии, а также SDK Windows 10.
Поддержка Clang (#3) и Linux (#15) запланирована.
Система сборки торта реализована в Python и требует установки Python 2.7.
Убедитесь, что интерпретатор Python 2.7 находится на вашем пути и доступен как «Python».
Убедитесь, что Visual Studio 2017 Обновление 3 или более поздней версии установлено. Обратите внимание, что есть некоторые известные проблемы с Coroutines в обновлении 2 или ранее, которые были зафиксированы в обновлении 3.
Вы также можете использовать экспериментальную версию компилятора Visual Studio, загрузив пакет Nuget с https://vcppdogfooding.azurewebsites.net/ и разобщив файл .nuget в каталог. Просто обновите файл config.cake , чтобы указать в отделении рассмотрения, изменяя и не наносив пост следующей строки:
nugetPath = None # r'C:PathToVisualCppTools.14.0.25224-Pre'Убедитесь, что у вас установлен Windows 10 SDK. По умолчанию он будет использовать последнюю версию Windows 10 SDK и Universal C.
Репозиторий CPPCORO использует подмодулы GIT, чтобы втянуть источник для системы сборки торта.
Это означает, что вам нужно передать --recursive флаг команде git clone . например.
c:Code> git clone --recursive https://github.com/lewissbaker/cppcoro.git
Если вы уже клонировали CPPCORO, вам следует обновить подмодули после вытягивания изменений.
c:Codecppcoro> git submodule update --init --recursive
Чтобы построить из командной строки, просто запустите «Cake.bat» в корне рабочей области.
например.
C:cppcoro> cake.bat
Building with C:cppcoroconfig.cake - Variant(release='debug', platform='windows', architecture='x86', compilerFamily='msvc', compiler='msvc14.10')
Building with C:cppcoroconfig.cake - Variant(release='optimised', platform='windows', architecture='x64', compilerFamily='msvc', compiler='msvc14.10')
Building with C:cppcoroconfig.cake - Variant(release='debug', platform='windows', architecture='x64', compilerFamily='msvc', compiler='msvc14.10')
Building with C:cppcoroconfig.cake - Variant(release='optimised', platform='windows', architecture='x86', compilerFamily='msvc', compiler='msvc14.10')
Compiling testmain.cpp
Compiling testmain.cpp
Compiling testmain.cpp
Compiling testmain.cpp
...
Linking buildwindows_x86_msvc14.10_debugtestrun.exe
Linking buildwindows_x64_msvc14.10_optimisedtestrun.exe
Linking buildwindows_x86_msvc14.10_optimisedtestrun.exe
Linking buildwindows_x64_msvc14.10_debugtestrun.exe
Generating code
Finished generating code
Generating code
Finished generating code
Build succeeded.
Build took 0:00:02.419.
По умолчанию, запуск cake без аргументов будет создавать все проекты со всеми вариантами сборки и выполнит единичные тесты. Вы можете сузить то, что построено, передавая дополнительные аргументы командной строки. например.
c:cppcoro> cake.bat release=debug architecture=x64 lib/build.cake
Building with C:UsersLewisCodecppcoroconfig.cake - Variant(release='debug', platform='windows', architecture='x64', compilerFamily='msvc', compiler='msvc14.10')
Archiving buildwindows_x64_msvc14.10_debuglibcppcoro.lib
Build succeeded.
Build took 0:00:00.321.
Вы можете запустить cake --help чтобы перечислить доступные параметры командной строки.
Для разработки из Visual Studio вы можете построить файлы .vcproj/.sln, используя cake.bat -p .
например.
c:cppcoro> cake.bat -p
Building with C:cppcoroconfig.cake - Variant(release='debug', platform='windows', architecture='x86', compilerFamily='msvc', compiler='msvc14.10')
Building with C:cppcoroconfig.cake - Variant(release='optimised', platform='windows', architecture='x64', compilerFamily='msvc', compiler='msvc14.10')
Building with C:cppcoroconfig.cake - Variant(release='debug', platform='windows', architecture='x64', compilerFamily='msvc', compiler='msvc14.10')
Building with C:cppcoroconfig.cake - Variant(release='optimised', platform='windows', architecture='x86', compilerFamily='msvc', compiler='msvc14.10')
Generating Solution build/project/cppcoro.sln
Generating Project build/project/cppcoro_tests.vcxproj
Generating Filters build/project/cppcoro_tests.vcxproj.filters
Generating Project build/project/cppcoro.vcxproj
Generating Filters build/project/cppcoro.vcxproj.filters
Build succeeded.
Build took 0:00:00.247.
Когда вы создаете эти проекты из Visual Studio, он вызовет торт, чтобы выполнить сборник.
Проект CPPCORO также может быть построен под Linux, используя CLANG+ LIBC ++ 5.0 или более поздней версии.
Здание CPPCORO было проверено под Ubuntu 17.04.
Убедитесь, что у вас установлены следующие пакеты:
Это предполагает, что вы создали и установили Clang и Libc ++.
Если у вас еще нет настроения Clang, см. Следующие разделы для получения подробной информации о настройке Clang для построения с помощью CPPCoro.
Посмотреть Cppcoro и его подмодули:
git clone --recursive https://github.com/lewissbaker/cppcoro.git cppcoro
Запустите init.sh , чтобы настроить функцию cake Bash:
cd cppcoro
source init.sh
Затем вы можете запустить cake из корня рабочей области, чтобы построить cppcoro и запустить тесты:
$ cake
Вы можете указать дополнительные аргументы командной строки для настройки сборки:
--help распечатает помощь для аргументов командной строки--debug=run покажет запуск командных линий сборкиrelease=debug или release=optimised ограничит вариант сборки либо отладкой, либо оптимизированным (по умолчанию он построит оба).lib/build.cake просто построит библиотеку Cppcoro, а не тесты.test/build.cake@task_tests.cpp просто скомпилируется конкретным исходным файломtest/build.cake@testresult построит и запустит тестыНапример:
$ cake --debug=run release=debug lib/build.cake
Если ваш компилятор Clang не находится по адресу /usr/bin/clang , вы можете указать альтернативное местоположение, используя один или несколько из следующих вариантов командной строки для cake :
--clang-executable=<name> -укажите исполняемое имя Clang для использования вместо clang . например. Чтобы заставить использование Clang 8.0 Pass --clang-executable=clang-8--clang-executable=<abspath> -Укажите полный путь к исполнению Clang. Система сборки также будет искать другие исполняемые файлы в том же каталоге. Если этот путь имеет форму <prefix>/bin/<name> то это также установит Clang-install-prefix по умолчанию <prefix> .--clang-install-prefix=<path> -Укажите путь, где был установлен Clang. Это приведет к тому, что система сборки будет искать кланг под <path>/bin (если только переопределяется по --clang-executable ).--libcxx-install-prefix=<path> -Укажите путь, где был установлен LIBC ++. По умолчанию система сборки будет искать Libc ++ в том же месте, что и Clang. Используйте эту опцию командной строки, если она установлена в другом месте.Пример: используйте определенную версию Clang, установленную в месте по умолчанию
$ cake --clang-executable=clang-8
Пример: используйте версию Clang по умолчанию из пользовательского местоположения
$ cake --clang-install-prefix=/path/to/clang-install
Пример: используйте конкретную версию Clang, в пользовательском месте, с LIBC ++ из другого места
$ cake --clang-executable=/path/to/clang-install/bin/clang-8 --libcxx-install-prefix=/path/to/libcxx-install
If your Linux distribution does not have a version of Clang 5.0 or later available, you can install a snapshot build from the LLVM project.
Follow instructions at http://apt.llvm.org/ to setup your package manager to support pulling from the LLVM package manager.
For example, for Ubuntu 17.04 Zesty:
Edit /etc/apt/sources.list and add the following lines:
deb http://apt.llvm.org/zesty/ llvm-toolchain-zesty main
deb-src http://apt.llvm.org/zesty/ llvm-toolchain-zesty main
Install the PGP key for those packages:
$ wget -O - https://apt.llvm.org/llvm-snapshot.gpg.key | sudo apt-key add -
Install Clang and LLD:
$ sudo apt-get install clang-6.0 lld-6.0
The LLVM snapshot builds do not include libc++ versions so you'll need to build that yourself. См. ниже.
You can also use the bleeding-edge Clang version by building Clang from source yourself.
See instructions here:
To do this you will need to install the following pre-requisites:
$ sudo apt-get install git cmake ninja-build clang lld
Note that we are using your distribution's version of clang to build clang from source. GCC could also be used here instead.
Checkout LLVM + Clang + LLD + libc++ repositories:
mkdir llvm
cd llvm
git clone --depth=1 https://github.com/llvm-mirror/llvm.git llvm
git clone --depth=1 https://github.com/llvm-mirror/clang.git llvm/tools/clang
git clone --depth=1 https://github.com/llvm-mirror/lld.git llvm/tools/lld
git clone --depth=1 https://github.com/llvm-mirror/libcxx.git llvm/projects/libcxx
ln -s llvm/tools/clang clang
ln -s llvm/tools/lld lld
ln -s llvm/projects/libcxx libcxx
Configure and build Clang:
mkdir clang-build
cd clang-build
cmake -GNinja
-DCMAKE_CXX_COMPILER=/usr/bin/clang++
-DCMAKE_C_COMPILER=/usr/bin/clang
-DCMAKE_BUILD_TYPE=MinSizeRel
-DCMAKE_INSTALL_PREFIX="/path/to/clang/install"
-DCMAKE_BUILD_WITH_INSTALL_RPATH="yes"
-DLLVM_TARGETS_TO_BUILD=X86
-DLLVM_ENABLE_PROJECTS="lld;clang"
../llvm
ninja install-clang
install-clang-headers
install-llvm-ar
install-lld
The cppcoro project requires libc++ as it contains the <experimental/coroutine> header required to use C++ coroutines under Clang.
Checkout libc++ + llvm :
mkdir llvm
cd llvm
git clone --depth=1 https://github.com/llvm-mirror/llvm.git llvm
git clone --depth=1 https://github.com/llvm-mirror/libcxx.git llvm/projects/libcxx
ln -s llvm/projects/libcxx libcxx
Build libc++ :
mkdir libcxx-build
cd libcxx-build
cmake -GNinja
-DCMAKE_CXX_COMPILER="/path/to/clang/install/bin/clang++"
-DCMAKE_C_COMPILER="/path/to/clang/install/bin/clang"
-DCMAKE_BUILD_TYPE=Release
-DCMAKE_INSTALL_PREFIX="/path/to/clang/install"
-DLLVM_PATH="../llvm"
-DLIBCXX_CXX_ABI=libstdc++
-DLIBCXX_CXX_ABI_INCLUDE_PATHS="/usr/include/c++/6.3.0/;/usr/include/x86_64-linux-gnu/c++/6.3.0/"
../libcxx
ninja cxx
ninja install
This will build and install libc++ into the same install directory where you have clang installed.
The cppcoro port in vcpkg is kept up to date by Microsoft team members and community contributors. The url of vcpkg is: https://github.com/Microsoft/vcpkg . You can download and install cppcoro using the vcpkg dependency manager:
git clone https://github.com/Microsoft/vcpkg.git
cd vcpkg
./bootstrap-vcpkg.sh # ./bootstrap-vcpkg.bat for Windows
./vcpkg integrate install
./vcpkg install cppcoroЕсли версия установлена на устаре, пожалуйста, создайте проблему или запрос на вытягивание в репозитории VCPKG.
GitHub issues are the primary mechanism for support, bug reports and feature requests.
Contributions are welcome and pull-requests will be happily reviewed. I only ask that you agree to license any contributions that you make under the MIT license.
If you have general questions about C++ coroutines, you can generally find someone to help in the #coroutines channel on Cpplang Slack group.