“ CPPCORO”库提供了一系列通用原语,用于利用N4680中描述的Coroutines TS提案。
其中包括:
task<T>shared_task<T>generator<T>recursive_generator<T>async_generator<T>single_consumer_eventsingle_consumer_async_auto_reset_eventasync_mutexasync_manual_reset_eventasync_auto_reset_eventasync_latchsequence_barriermulti_producer_sequencersingle_producer_sequencersync_wait()when_all()when_all_ready()fmap()schedule_on()resume_on()cancellation_tokencancellation_sourcecancellation_registrationstatic_thread_poolio_service和io_work_scopefile , readable_file , writable_fileread_only_file , write_only_file , read_write_filesocketip_address , ipv4_address , ipv6_addressip_endpoint , ipv4_endpoint , ipv6_endpointis_awaitable<T>awaitable_traits<T>Awaitable<T>Awaiter<T>SchedulerDelayedScheduler该库是一个实验库,正在探索可以在C ++ Coroutines建议顶部构建的高性能,可扩展异步编程摘要的空间。
它是开源的,希望其他人会发现它有用,并且C ++社区可以提供有关它的反馈和改善它的方法。
它需要一个支持Coroutines ts的编译器:
Linux版本具有功能性,除了io_context和File I/O相关类,该类尚未针对Linux实施(有关更多信息,请参见第15期)。
task<T>一项任务代表了一个异步计算,该计算懒惰地执行,因为在等待任务之前,coroutine的执行不会启动。
例子:
# include < cppcoro/read_only_file.hpp >
# include < cppcoro/task.hpp >
cppcoro::task< int > count_lines (std::string path)
{
auto file = co_await cppcoro::read_only_file::open (path);
int lineCount = 0 ;
char buffer[ 1024 ];
size_t bytesRead;
std:: uint64_t offset = 0 ;
do
{
bytesRead = co_await file. read (offset, buffer, sizeof (buffer));
lineCount += std::count (buffer, buffer + bytesRead, ' n ' );
offset += bytesRead;
} while (bytesRead > 0 );
co_return lineCount;
}
cppcoro::task<> usage_example ()
{
// Calling function creates a new task but doesn't start
// executing the coroutine yet.
cppcoro::task< int > countTask = count_lines ( " foo.txt " );
// ...
// Coroutine is only started when we later co_await the task.
int lineCount = co_await countTask;
std::cout << " line count = " << lineCount << std::endl;
}API概述:
// <cppcoro/task.hpp>
namespace cppcoro
{
template < typename T>
class task
{
public:
using promise_type = <unspecified>;
using value_type = T;
task () noexcept ;
task (task&& other) noexcept ;
task& operator =(task&& other);
// task is a move-only type.
task ( const task& other) = delete ;
task& operator =( const task& other) = delete ;
// Query if the task result is ready.
bool is_ready () const noexcept ;
// Wait for the task to complete and return the result or rethrow the
// exception if the operation completed with an unhandled exception.
//
// If the task is not yet ready then the awaiting coroutine will be
// suspended until the task completes. If the the task is_ready() then
// this operation will return the result synchronously without suspending.
Awaiter<T&> operator co_await () const & noexcept ;
Awaiter<T&&> operator co_await () const && noexcept ;
// Returns an awaitable that can be co_await'ed to suspend the current
// coroutine until the task completes.
//
// The 'co_await t.when_ready()' expression differs from 'co_await t' in
// that when_ready() only performs synchronization, it does not return
// the result or rethrow the exception.
//
// This can be useful if you want to synchronize with the task without
// the possibility of it throwing an exception.
Awaitable< void > when_ready () const noexcept ;
};
template < typename T>
void swap (task<T>& a, task<T>& b);
// Creates a task that yields the result of co_await'ing the specified awaitable.
//
// This can be used as a form of type-erasure of the concrete awaitable, allowing
// different awaitables that return the same await-result type to be stored in
// the same task<RESULT> type.
template <
typename AWAITABLE,
typename RESULT = typename awaitable_traits<AWAITABLE>:: await_result_t >
task<RESULT> make_task (AWAITABLE awaitable);
}您可以通过调用返回task<T>的coroutine函数来创建task<T>对象。
Coroutine必须包含co_await或co_return的用法。请注意, task<T> coroutine可能不使用co_yield关键字。
当调用返回task<T>的Coroutine时,必要时会分配Coroutine框架,并在Coroutine框架中捕获参数。 Coroutine在Coroutine主体的开头悬挂,并将执行返回给呼叫者,并且一个代表异步计算的task<T>值将从函数调用返回。
当task<T>值为co_await ed时,Coroutine主体将开始执行。这将暂停等待的Coroutine,并开始执行与已久的task<T>值相关的Coroutine。
等待的coroutine将在完成已久task<T>的Coroutine执行的线程上恢复。 IE。执行co_return线程或抛出未经处理的异常,以终止执行coroutine。
如果任务已经完成完成,那么再次等待它将获得已经计算的结果,而无需暂停等待Coroutine。
如果task对象在等待之前被销毁,那么Coroutine将永远不会执行,而破坏者只是破坏了捕获的参数,并释放了Coroutine框架使用的任何内存。
shared_task<T> shared_task<T>类是一种coroutine类型,可异步产生单个值。
这是“懒惰的”,因为任务的执行才能启动,直到某些Coroutine等待它。
它是“共享”的,因为可以复制任务值,从而可以对创建任务的结果进行多个引用。它还允许多个Coroutines同时等待结果。
该任务将开始在首次co_await线程上执行任务。随后的等待者将被暂停,并在任务完成后排队以恢复,或者如果任务已经完成完成,则将继续同步。
如果等待任务完成时暂停了等待者,则将在完成任务执行的线程上恢复。 IE。执行co_return线程或抛出终止执行coroutine的未经处理的异常。
API摘要
namespace cppcoro
{
template < typename T = void >
class shared_task
{
public:
using promise_type = <unspecified>;
using value_type = T;
shared_task () noexcept ;
shared_task ( const shared_task& other) noexcept ;
shared_task (shared_task&& other) noexcept ;
shared_task& operator =( const shared_task& other) noexcept ;
shared_task& operator =(shared_task&& other) noexcept ;
void swap (shared_task& other) noexcept ;
// Query if the task has completed and the result is ready.
bool is_ready () const noexcept ;
// Returns an operation that when awaited will suspend the
// current coroutine until the task completes and the result
// is available.
//
// The type of the result of the 'co_await someTask' expression
// is an l-value reference to the task's result value (unless T
// is void in which case the expression has type 'void').
// If the task completed with an unhandled exception then the
// exception will be rethrown by the co_await expression.
Awaiter<T&> operator co_await () const noexcept ;
// Returns an operation that when awaited will suspend the
// calling coroutine until the task completes and the result
// is available.
//
// The result is not returned from the co_await expression.
// This can be used to synchronize with the task without the
// possibility of the co_await expression throwing an exception.
Awaiter< void > when_ready () const noexcept ;
};
template < typename T>
bool operator ==( const shared_task<T>& a, const shared_task<T>& b) noexcept ;
template < typename T>
bool operator !=( const shared_task<T>& a, const shared_task<T>& b) noexcept ;
template < typename T>
void swap (shared_task<T>& a, shared_task<T>& b) noexcept ;
// Wrap an awaitable value in a shared_task to allow multiple coroutines
// to concurrently await the result.
template <
typename AWAITABLE,
typename RESULT = typename awaitable_traits<AWAITABLE>:: await_result_t >
shared_task<RESULT> make_shared_task (AWAITABLE awaitable);
} shared_task<T>上的所有const方法都是安全地与来自多个线程的同一实例上的其他const方法同时调用的。在shared_task<T>同一实例上同时调用共享shared_task<T>的非const方法是不安全的。
task<T> shared_task<T>类类似于task<T> ,因为该任务在调用coroutine函数时不会立即启动执行。该任务首先在等待时才开始执行。
它与task<T>不同,因为可以复制所得的任务对象,从而允许多个任务对象引用相同的异步结果。它还支持多个珊瑚酸,同时等待任务的结果。
权衡取舍是结果始终是对结果的L值参考,从来没有R值参考(由于结果可以共享),这可能会限制将结果构造到局部变量中的能力。由于需要维持参考数量并支持多个等待者,因此它的运行时间成本也略高。
generator<T> generator代表一种产生类型T值序列的Coroutine类型,其中值懒惰和同步产生。
Coroutine主体能够使用co_yield关键字产生T型的值。但是请注意,Coroutine主体无法使用co_await关键字。值必须同步产生。
例如:
cppcoro::generator< const std:: uint64_t > fibonacci ()
{
std:: uint64_t a = 0 , b = 1 ;
while ( true )
{
co_yield b;
auto tmp = a;
a = b;
b += tmp;
}
}
void usage ()
{
for ( auto i : fibonacci ())
{
if (i > 1'000'000 ) break ;
std::cout << i << std::endl;
}
}当Coroutine函数返回generator<T>时,将创建Coroutine最初悬挂。当generator<T>::begin()方法被调用并继续进行直到到达第一个co_yield语句或完成Coroutine运行到完成时,执行Coroutine进入Coroutine主体。
如果返回的迭代器不等于end()迭代器,则指定迭代器将返回对传递给co_yield语句的值的引用。
在迭代器上调用operator++()将恢复执行coroutine,并继续执行,直到达到下一个co_yield点或coroutine运行到完成()。
Coroutine抛出的任何未经处理的异常都会从begin()或operator++()调用对呼叫者中传播。
API摘要:
namespace cppcoro
{
template < typename T>
class generator
{
public:
using promise_type = <unspecified>;
class iterator
{
public:
using iterator_category = std::input_iterator_tag;
using value_type = std:: remove_reference_t <T>;
using reference = value_type&;
using pointer = value_type*;
using difference_type = std:: size_t ;
iterator ( const iterator& other) noexcept ;
iterator& operator =( const iterator& other) noexcept ;
// If the generator coroutine throws an unhandled exception before producing
// the next element then the exception will propagate out of this call.
iterator& operator ++();
reference operator *() const noexcept ;
pointer operator ->() const noexcept ;
bool operator ==( const iterator& other) const noexcept ;
bool operator !=( const iterator& other) const noexcept ;
};
// Constructs to the empty sequence.
generator () noexcept ;
generator (generator&& other) noexcept ;
generator& operator =(generator&& other) noexcept ;
generator ( const generator& other) = delete ;
generator& operator =( const generator&) = delete ;
~generator ();
// Starts executing the generator coroutine which runs until either a value is yielded
// or the coroutine runs to completion or an unhandled exception propagates out of the
// the coroutine.
iterator begin ();
iterator end () noexcept ;
// Swap the contents of two generators.
void swap (generator& other) noexcept ;
};
template < typename T>
void swap (generator<T>& a, generator<T>& b) noexcept ;
// Apply function, func, lazily to each element of the source generator
// and yield a sequence of the results of calls to func().
template < typename FUNC, typename T>
generator<std:: invoke_result_t <FUNC, T&>> fmap (FUNC func, generator<T> source);
}recursive_generator<T> recursive_generator类似于generator ,除了它被设计为更有效地支持将嵌套序列的元素作为外序元素的元素的元素。
除了能够co_yield二型T型值外,您还可以将recursive_generator<T>类型的值co_yield 。
当您co_yield时, recursive_generator<T>值<t value,屈服发生器的所有元素作为当前发电机的元素产生。当前的Coroutine被暂停,直到消费者消耗了嵌套发生器的所有元素为止,此后,当前Coroutine的点执行将恢复执行以产生下一个元素。
在递归数据结构上迭代的递归generator<T> recursive_generator<T>的好处是, iterator::operator++()能够直接恢复叶片的coroutine以产生下一个元素,而不必为每个元素恢复/悬浮o(deptens o(Depth)Coroutines。侧面是还有其他开销
例如:
// Lists the immediate contents of a directory.
cppcoro::generator<dir_entry> list_directory (std::filesystem::path path);
cppcoro::recursive_generator<dir_entry> list_directory_recursive (std::filesystem::path path)
{
for ( auto & entry : list_directory (path))
{
co_yield entry;
if (entry. is_directory ())
{
co_yield list_directory_recursive (entry. path ());
}
}
}请注意,将fmap()运算符应用于recursive_generator<T>将产生generator<U>类型而不是recursive_generator<U> 。这是因为fmap的用途通常在递归上下文中不使用,我们试图避免递归recursive_generator产生的额外开销。
async_generator<T> async_generator代表一种产生类型T值序列的Coroutine类型,其中懒惰产生值,并且可能异步产生值。
Coroutine主体能够同时使用co_await和co_yield表达式。
发电机的消费者可以将基于for co_await范围的for-loop使用来消耗这些值。
例子
cppcoro::async_generator< int > ticker ( int count, threadpool& tp)
{
for ( int i = 0 ; i < count; ++i)
{
co_await tp. delay ( std::chrono::seconds ( 1 ));
co_yield i;
}
}
cppcoro::task<> consumer (threadpool& tp)
{
auto sequence = ticker ( 10 , tp);
for co_await (std:: uint32_t i : sequence)
{
std::cout << " Tick " << i << std::endl;
}
}API摘要
// <cppcoro/async_generator.hpp>
namespace cppcoro
{
template < typename T>
class async_generator
{
public:
class iterator
{
public:
using iterator_tag = std::forward_iterator_tag;
using difference_type = std:: size_t ;
using value_type = std:: remove_reference_t <T>;
using reference = value_type&;
using pointer = value_type*;
iterator ( const iterator& other) noexcept ;
iterator& operator =( const iterator& other) noexcept ;
// Resumes the generator coroutine if suspended
// Returns an operation object that must be awaited to wait
// for the increment operation to complete.
// If the coroutine runs to completion then the iterator
// will subsequently become equal to the end() iterator.
// If the coroutine completes with an unhandled exception then
// that exception will be rethrown from the co_await expression.
Awaitable<iterator&> operator ++() noexcept ;
// Dereference the iterator.
pointer operator ->() const noexcept ;
reference operator *() const noexcept ;
bool operator ==( const iterator& other) const noexcept ;
bool operator !=( const iterator& other) const noexcept ;
};
// Construct to the empty sequence.
async_generator () noexcept ;
async_generator ( const async_generator&) = delete ;
async_generator (async_generator&& other) noexcept ;
~async_generator ();
async_generator& operator =( const async_generator&) = delete ;
async_generator& operator =(async_generator&& other) noexcept ;
void swap (async_generator& other) noexcept ;
// Starts execution of the coroutine and returns an operation object
// that must be awaited to wait for the first value to become available.
// The result of co_await'ing the returned object is an iterator that
// can be used to advance to subsequent elements of the sequence.
//
// This method is not valid to be called once the coroutine has
// run to completion.
Awaitable<iterator> begin () noexcept ;
iterator end () noexcept ;
};
template < typename T>
void swap (async_generator<T>& a, async_generator<T>& b);
// Apply 'func' to each element of the source generator, yielding a sequence of
// the results of calling 'func' on the source elements.
template < typename FUNC, typename T>
async_generator<std:: invoke_result_t <FUNC, T&>> fmap (FUNC func, async_generator<T> source);
}当破坏async_generator对象时,请求取消基础Coroutine。如果Coroutine已经运行到完成或目前被暂停在co_yield表达式中,则立即将Coroutine销毁。否则,Coroutine将继续执行,直到完成完成或到达下一个co_yield表达式为止。
当Coroutine框架被销毁时,将执行所有变量范围中所有变量的破坏者,以确保清理发电机的资源。
请注意,呼叫者必须确保在消费者coroutine执行co_await表达式等待下一个项目时,必须确保不得破坏async_generator对象。
single_consumer_event这是一种简单的手动递归事件类型,一次仅支持一个coroutine等待它。这可以用来
API摘要:
// <cppcoro/single_consumer_event.hpp>
namespace cppcoro
{
class single_consumer_event
{
public:
single_consumer_event ( bool initiallySet = false ) noexcept ;
bool is_set () const noexcept ;
void set ();
void reset () noexcept ;
Awaiter< void > operator co_await () const noexcept ;
};
}例子:
# include < cppcoro/single_consumer_event.hpp >
cppcoro::single_consumer_event event;
std::string value;
cppcoro::task<> consumer ()
{
// Coroutine will suspend here until some thread calls event.set()
// eg. inside the producer() function below.
co_await event;
std::cout << value << std::endl;
}
void producer ()
{
value = " foo " ;
// This will resume the consumer() coroutine inside the call to set()
// if it is currently suspended.
event. set ();
}single_consumer_async_auto_reset_event该类提供了异步同步原始的原始性,它允许单个coroutine等到事件通过呼叫对set()方法发出信号。
一旦等待事件的coroutine由以前或后续调用set()释放()事件将自动重置回“不设置”状态。
该类是async_auto_reset_event的更有效版本,在一次只能等待事件的情况下,可以使用。如果您需要支持在事件上的多个同时等待coroutines,请改用async_auto_reset_event类。
API摘要:
// <cppcoro/single_consumer_async_auto_reset_event.hpp>
namespace cppcoro
{
class single_consumer_async_auto_reset_event
{
public:
single_consumer_async_auto_reset_event (
bool initiallySet = false ) noexcept ;
// Change the event to the 'set' state. If a coroutine is awaiting the
// event then the event is immediately transitioned back to the 'not set'
// state and the coroutine is resumed.
void set () noexcept ;
// Returns an Awaitable type that can be awaited to wait until
// the event becomes 'set' via a call to the .set() method. If
// the event is already in the 'set' state then the coroutine
// continues without suspending.
// The event is automatically reset back to the 'not set' state
// before resuming the coroutine.
Awaiter< void > operator co_await () const noexcept ;
};
}示例用法:
std::atomic< int > value;
cppcoro::single_consumer_async_auto_reset_event valueDecreasedEvent;
cppcoro::task<> wait_until_value_is_below ( int limit)
{
while (value. load (std::memory_order_relaxed) >= limit)
{
// Wait until there has been some change that we're interested in.
co_await valueDecreasedEvent;
}
}
void change_value ( int delta)
{
value. fetch_add (delta, std::memory_order_relaxed);
// Notify the waiter if there has been some change.
if (delta < 0 ) valueDecreasedEvent. set ();
}async_mutex提供了一个简单的相互排除抽象,该抽象使呼叫者可以从Coroutine内“ co_await”“ co_await”,以暂停Coroutine,直到获得静音锁。
该实现是无锁的,因为等待Mutex的Coroutine不会阻止线程,而是将暂停Coroutine,然后将其恢复到以前的锁定器中的呼叫unlock()中。
API摘要:
// <cppcoro/async_mutex.hpp>
namespace cppcoro
{
class async_mutex_lock ;
class async_mutex_lock_operation ;
class async_mutex_scoped_lock_operation ;
class async_mutex
{
public:
async_mutex () noexcept ;
~async_mutex ();
async_mutex ( const async_mutex&) = delete ;
async_mutex& operator ( const async_mutex&) = delete;
bool try_lock () noexcept ;
async_mutex_lock_operation lock_async () noexcept ;
async_mutex_scoped_lock_operation scoped_lock_async () noexcept ;
void unlock ();
};
class async_mutex_lock_operation
{
public:
bool await_ready () const noexcept ;
bool await_suspend (std::experimental::coroutine_handle<> awaiter) noexcept ;
void await_resume () const noexcept ;
};
class async_mutex_scoped_lock_operation
{
public:
bool await_ready () const noexcept ;
bool await_suspend (std::experimental::coroutine_handle<> awaiter) noexcept ;
[[nodiscard]] async_mutex_lock await_resume () const noexcept ;
};
class async_mutex_lock
{
public:
// Takes ownership of the lock.
async_mutex_lock (async_mutex& mutex, std:: adopt_lock_t ) noexcept ;
// Transfer ownership of the lock.
async_mutex_lock (async_mutex_lock&& other) noexcept ;
async_mutex_lock ( const async_mutex_lock&) = delete ;
async_mutex_lock& operator =( const async_mutex_lock&) = delete ;
// Releases the lock by calling unlock() on the mutex.
~async_mutex_lock ();
};
}示例用法:
# include < cppcoro/async_mutex.hpp >
# include < cppcoro/task.hpp >
# include < set >
# include < string >
cppcoro::async_mutex mutex;
std::set<std::string> values;
cppcoro::task<> add_item (std::string value)
{
cppcoro::async_mutex_lock lock = co_await mutex. scoped_lock_async ();
values. insert ( std::move (value));
}async_manual_reset_event手动递归事件是一个coroutine/螺纹同步原始化,它允许一个或多个线程等待,直到事件通过调用set()线程发出信号。
该活动在两个州之一。 “设置”和“未设置” 。
如果事件处于“集合”状态时,当Coroutine等待事件时,那么Coroutine继续而不会暂停。但是,如果Coroutine处于“未集”状态,则暂停Coroutine,直到某个线程随后调用set()方法为止。
等待事件变为“集合”时悬挂的任何线程都将通过某个线程恢复到set()的下一个呼叫中。
请注意,您必须确保在事件被破坏时不在等待“不设置”事件,因为它们不会恢复。
例子:
cppcoro::async_manual_reset_event event;
std::string value;
void producer ()
{
value = get_some_string_value ();
// Publish a value by setting the event.
event. set ();
}
// Can be called many times to create many tasks.
// All consumer tasks will wait until value has been published.
cppcoro::task<> consumer ()
{
// Wait until value has been published by awaiting event.
co_await event;
consume_value (value);
}API摘要:
namespace cppcoro
{
class async_manual_reset_event_operation ;
class async_manual_reset_event
{
public:
async_manual_reset_event ( bool initiallySet = false ) noexcept ;
~async_manual_reset_event ();
async_manual_reset_event ( const async_manual_reset_event&) = delete ;
async_manual_reset_event (async_manual_reset_event&&) = delete ;
async_manual_reset_event& operator =( const async_manual_reset_event&) = delete ;
async_manual_reset_event& operator =(async_manual_reset_event&&) = delete ;
// Wait until the event becomes set.
async_manual_reset_event_operation operator co_await () const noexcept ;
bool is_set () const noexcept ;
void set () noexcept ;
void reset () noexcept ;
};
class async_manual_reset_event_operation
{
public:
async_manual_reset_event_operation (async_manual_reset_event& event) noexcept ;
bool await_ready () const noexcept ;
bool await_suspend (std::experimental::coroutine_handle<> awaiter) noexcept ;
void await_resume () const noexcept ;
};
}async_auto_reset_event一个自动收集事件是一个coroutine/螺纹同步原始事件,它允许一个或多个线程等到通过调用set()通过线程发出事件的信号,直到事件发出信号()。
一旦在等待事件的Coroutine被以前或后续的调用set()发布()事件会自动重置回“不设置”状态。
API摘要:
// <cppcoro/async_auto_reset_event.hpp>
namespace cppcoro
{
class async_auto_reset_event_operation ;
class async_auto_reset_event
{
public:
async_auto_reset_event ( bool initiallySet = false ) noexcept ;
~async_auto_reset_event ();
async_auto_reset_event ( const async_auto_reset_event&) = delete ;
async_auto_reset_event (async_auto_reset_event&&) = delete ;
async_auto_reset_event& operator =( const async_auto_reset_event&) = delete ;
async_auto_reset_event& operator =(async_auto_reset_event&&) = delete ;
// Wait for the event to enter the 'set' state.
//
// If the event is already 'set' then the event is set to the 'not set'
// state and the awaiting coroutine continues without suspending.
// Otherwise, the coroutine is suspended and later resumed when some
// thread calls 'set()'.
//
// Note that the coroutine may be resumed inside a call to 'set()'
// or inside another thread's call to 'operator co_await()'.
async_auto_reset_event_operation operator co_await () const noexcept ;
// Set the state of the event to 'set'.
//
// If there are pending coroutines awaiting the event then one
// pending coroutine is resumed and the state is immediately
// set back to the 'not set' state.
//
// This operation is a no-op if the event was already 'set'.
void set () noexcept ;
// Set the state of the event to 'not-set'.
//
// This is a no-op if the state was already 'not set'.
void reset () noexcept ;
};
class async_auto_reset_event_operation
{
public:
explicit async_auto_reset_event_operation (async_auto_reset_event& event) noexcept ;
async_auto_reset_event_operation ( const async_auto_reset_event_operation& other) noexcept ;
bool await_ready () const noexcept ;
bool await_suspend (std::experimental::coroutine_handle<> awaiter) noexcept ;
void await_resume () const noexcept ;
};
}async_latch异步闩锁是一种同步原始性,它允许旋ou等待,直到将计数器降低到零为止。
闩锁是一次使用对象。一旦计数器达到零,闩锁就会“准备就绪”,并将保持准备,直到闩锁被摧毁为止。
API摘要:
// <cppcoro/async_latch.hpp>
namespace cppcoro
{
class async_latch
{
public:
// Initialise the latch with the specified count.
async_latch (std:: ptrdiff_t initialCount) noexcept ;
// Query if the count has reached zero yet.
bool is_ready () const noexcept ;
// Decrement the count by n.
// This will resume any waiting coroutines if the count reaches zero
// as a result of this call.
// It is undefined behaviour to decrement the count below zero.
void count_down (std:: ptrdiff_t n = 1 ) noexcept ;
// Wait until the latch becomes ready.
// If the latch count is not yet zero then the awaiting coroutine will
// be suspended and later resumed by a call to count_down() that decrements
// the count to zero. If the latch count was already zero then the coroutine
// continues without suspending.
Awaiter< void > operator co_await () const noexcept ;
};
}sequence_barrier sequence_barrier是一种同步原始化,它允许单个生产者和多个消费者相对于单调增加的序列数进行协调。
单个生产者通过以单调增加的顺序发布新序列编号来推进序列编号。一个或多个消费者可以查询最后发布的序列编号,并且可以等到发布特定序列编号为止。
序列屏障可以用来将光标表示为线程安全/消费者环形弹跳器
有关更多背景:https://lmax-exchange.github.io/disruptor/files/files/disruptor-1.0.pdf
API摘要:
namespace cppcoro
{
template < typename SEQUENCE = std:: size_t ,
typename TRAITS = sequence_traits<SEQUENCE>>
class sequence_barrier
{
public:
sequence_barrier (SEQUENCE initialSequence = TRAITS::initial_sequence) noexcept ;
~sequence_barrier ();
SEQUENCE last_published () const noexcept ;
// Wait until the specified targetSequence number has been published.
//
// If the operation does not complete synchronously then the awaiting
// coroutine is resumed on the specified scheduler. Otherwise, the
// coroutine continues without suspending.
//
// The co_await expression resumes with the updated last_published()
// value, which is guaranteed to be at least 'targetSequence'.
template < typename SCHEDULER>
[[nodiscard]]
Awaitable<SEQUENCE> wait_until_published (SEQUENCE targetSequence,
SCHEDULER& scheduler) const noexcept ;
void publish (SEQUENCE sequence) noexcept ;
};
}single_producer_sequencer single_producer_sequencer是一种同步原始化,可用于协调单个生产商和一个或多个消费者的弹跳器的访问。
生产者首先在弹跳器中获取一个或多个插槽,将与这些插槽相对应的环形弹跳器元素写入,然后最终发布写入这些插槽的值。在消费者消费之前,生产商永远不会生产更多的“缓冲”元素。
然后,消费者等待发布某些元素,处理项目,然后通过发布sequence_barrier对象完成的序列编号完成处理项目时通知生产商。
API摘要:
// <cppcoro/single_producer_sequencer.hpp>
namespace cppcoro
{
template <
typename SEQUENCE = std:: size_t ,
typename TRAITS = sequence_traits<SEQUENCE>>
class single_producer_sequencer
{
public:
using size_type = typename sequence_range<SEQUENCE, TRAITS>::size_type;
single_producer_sequencer (
const sequence_barrier<SEQUENCE, TRAITS>& consumerBarrier,
std:: size_t bufferSize,
SEQUENCE initialSequence = TRAITS::initial_sequence) noexcept ;
// Publisher API:
template < typename SCHEDULER>
[[nodiscard]]
Awaitable<SEQUENCE> claim_one (SCHEDULER& scheduler) noexcept ;
template < typename SCHEDULER>
[[nodiscard]]
Awaitable<sequence_range<SEQUENCE>> claim_up_to (
std:: size_t count,
SCHEDULER& scheduler) noexcept ;
void publish (SEQUENCE sequence) noexcept ;
// Consumer API:
SEQUENCE last_published () const noexcept ;
template < typename SCHEDULER>
[[nodiscard]]
Awaitable<SEQUENCE> wait_until_published (
SEQUENCE targetSequence,
SCHEDULER& scheduler) const noexcept ;
};
}示例用法:
using namespace cppcoro ;
using namespace std ::chrono ;
struct message
{
int id;
steady_clock::time_point timestamp;
float data;
};
constexpr size_t bufferSize = 16384 ; // Must be power-of-two
constexpr size_t indexMask = bufferSize - 1 ;
message buffer[bufferSize];
task< void > producer (
io_service& ioSvc,
single_producer_sequencer< size_t >& sequencer)
{
auto start = steady_clock::now ();
for ( int i = 0 ; i < 1'000'000 ; ++i)
{
// Wait until a slot is free in the buffer.
size_t seq = co_await sequencer. claim_one (ioSvc);
// Populate the message.
auto & msg = buffer[seq & indexMask];
msg. id = i;
msg. timestamp = steady_clock::now ();
msg. data = 123 ;
// Publish the message.
sequencer. publish (seq);
}
// Publish a sentinel
auto seq = co_await sequencer. claim_one (ioSvc);
auto & msg = buffer[seq & indexMask];
msg. id = - 1 ;
sequencer. publish (seq);
}
task< void > consumer (
static_thread_pool& threadPool,
const single_producer_sequencer< size_t >& sequencer,
sequence_barrier< size_t >& consumerBarrier)
{
size_t nextToRead = 0 ;
while ( true )
{
// Wait until the next message is available
// There may be more than one available.
const size_t available = co_await sequencer. wait_until_published (nextToRead, threadPool);
do {
auto & msg = buffer[nextToRead & indexMask];
if (msg. id == - 1 )
{
consumerBarrier. publish (nextToRead);
co_return ;
}
processMessage (msg);
} while (nextToRead++ != available);
// Notify the producer that we've finished processing
// up to 'nextToRead - 1'.
consumerBarrier. publish (available);
}
}
task< void > example (io_service& ioSvc, static_thread_pool& threadPool)
{
sequence_barrier< size_t > barrier;
single_producer_sequencer< size_t > sequencer{barrier, bufferSize};
co_await when_all (
producer (tp, sequencer),
consumer (tp, sequencer, barrier));
}multi_producer_sequencer multi_producer_sequencer类是一种同步原始化,可协调多个生产商和一个或多个消费者的弹跳器的访问。
有关单生产商变体,请参见single_producer_sequencer类。
请注意,弹跳器必须具有两个尺寸的两个尺寸。这是因为实现使用BitMasks代替整数除法/模块来计算缓冲区中的偏移量。另外,这允许序列编号安全地包裹在32位/64位值周围。
API摘要:
// <cppcoro/multi_producer_sequencer.hpp>
namespace cppcoro
{
template < typename SEQUENCE = std:: size_t ,
typename TRAITS = sequence_traits<SEQUENCE>>
class multi_producer_sequencer
{
public:
multi_producer_sequencer (
const sequence_barrier<SEQUENCE, TRAITS>& consumerBarrier,
SEQUENCE initialSequence = TRAITS::initial_sequence);
std:: size_t buffer_size () const noexcept ;
// Consumer interface
//
// Each consumer keeps track of their own 'lastKnownPublished' value
// and must pass this to the methods that query for an updated last-known
// published sequence number.
SEQUENCE last_published_after (SEQUENCE lastKnownPublished) const noexcept ;
template < typename SCHEDULER>
Awaitable<SEQUENCE> wait_until_published (
SEQUENCE targetSequence,
SEQUENCE lastKnownPublished,
SCHEDULER& scheduler) const noexcept ;
// Producer interface
// Query whether any slots available for claiming (approx.)
bool any_available () const noexcept ;
template < typename SCHEDULER>
Awaitable<SEQUENCE> claim_one (SCHEDULER& scheduler) noexcept ;
template < typename SCHEDULER>
Awaitable<sequence_range<SEQUENCE, TRAITS>> claim_up_to (
std:: size_t count,
SCHEDULER& scheduler) noexcept ;
// Mark the specified sequence number as published.
void publish (SEQUENCE sequence) noexcept ;
// Mark all sequence numbers in the specified range as published.
void publish ( const sequence_range<SEQUENCE, TRAITS>& range) noexcept ;
};
}cancellation_token是一个值,它可以传递给函数,该功能允许呼叫者随后将请求传达以取消操作的请求。
要获得能够取消的cancellation_token ,必须首先创建一个cancellation_source对象。 cancellation_source::token()方法可用于制造与cancellation_source对象链接的新cancellation_token值。
当您稍后请求取消操作时,您已传递了cancellation_token ,您可以在关联的cancellation_source对象上调用cancellation_source::request_cancellation() 。
功能可以通过两种方式之一响应取消请求:
cancellation_token::is_cancellation_requested()或cancellation_token::throw_if_cancellation_requested() ,以定期取消进行投票。cancellation_registration类请求取消时,注册要执行的回调。API摘要:
namespace cppcoro
{
class cancellation_source
{
public:
// Construct a new, independently cancellable cancellation source.
cancellation_source ();
// Construct a new reference to the same cancellation state.
cancellation_source ( const cancellation_source& other) noexcept ;
cancellation_source (cancellation_source&& other) noexcept ;
~cancellation_source ();
cancellation_source& operator =( const cancellation_source& other) noexcept ;
cancellation_source& operator =(cancellation_source&& other) noexcept ;
bool is_cancellation_requested () const noexcept ;
bool can_be_cancelled () const noexcept ;
void request_cancellation ();
cancellation_token token () const noexcept ;
};
class cancellation_token
{
public:
// Construct a token that can't be cancelled.
cancellation_token () noexcept ;
cancellation_token ( const cancellation_token& other) noexcept ;
cancellation_token (cancellation_token&& other) noexcept ;
~cancellation_token ();
cancellation_token& operator =( const cancellation_token& other) noexcept ;
cancellation_token& operator =(cancellation_token&& other) noexcept ;
bool is_cancellation_requested () const noexcept ;
void throw_if_cancellation_requested () const ;
// Query if this token can ever have cancellation requested.
// Code can use this to take a more efficient code-path in cases
// that the operation does not need to handle cancellation.
bool can_be_cancelled () const noexcept ;
};
// RAII class for registering a callback to be executed if cancellation
// is requested on a particular cancellation token.
class cancellation_registration
{
public:
// Register a callback to be executed if cancellation is requested.
// Callback will be called with no arguments on the thread that calls
// request_cancellation() if cancellation is not yet requested, or
// called immediately if cancellation has already been requested.
// Callback must not throw an unhandled exception when called.
template < typename CALLBACK>
cancellation_registration (cancellation_token token, CALLBACK&& callback);
cancellation_registration ( const cancellation_registration& other) = delete ;
~cancellation_registration ();
};
class operation_cancelled : public std :: exception
{
public:
operation_cancelled ();
const char * what () const override ;
};
}示例:投票方法
cppcoro::task<> do_something_async (cppcoro::cancellation_token token)
{
// Explicitly define cancellation points within the function
// by calling throw_if_cancellation_requested().
token. throw_if_cancellation_requested ();
co_await do_step_1 ();
token. throw_if_cancellation_requested ();
do_step_2 ();
// Alternatively, you can query if cancellation has been
// requested to allow yourself to do some cleanup before
// returning.
if (token. is_cancellation_requested ())
{
display_message_to_user ( " Cancelling operation... " );
do_cleanup ();
throw cppcoro::operation_cancelled{};
}
do_final_step ();
}示例:回调方法
// Say we already have a timer abstraction that supports being
// cancelled but it doesn't support cancellation_tokens natively.
// You can use a cancellation_registration to register a callback
// that calls the existing cancellation API. e.g.
cppcoro::task<> cancellable_timer_wait (cppcoro::cancellation_token token)
{
auto timer = create_timer (10s);
cppcoro::cancellation_registration registration (token, [&]
{
// Call existing timer cancellation API.
timer. cancel ();
});
co_await timer;
}static_thread_pool static_thread_pool类提供了一个抽象,使您可以安排在固定尺寸的线程池上工作。
该课程实现了调度程序概念(见下文)。
您可以通过执行co_await threadPool.schedule()来加入线程池。此操作将暂停当前的coroutine,在螺纹池上执行它,然后在接下来免费运行螺纹池中的线程时,线程池将恢复coroutine。保证不投掷此操作,在常见情况下,不会分配任何内存。
该课程利用偷窃算法来负载跨多个线程的负载平衡工作。将从线程池螺纹汇到线程池的工作将安排在LIFO队列中的同一线程上执行。从远程线程中加入线程池的工作将被纳入全局FIFO队列。当工人线程从其本地队列中用完工作用尽时,它首先尝试从全球队列中脱离工作。如果该队列为空,那么接下来它会试图从其他工人线程的队列的背面窃取工作。
API摘要:
namespace cppcoro
{
class static_thread_pool
{
public:
// Initialise the thread-pool with a number of threads equal to
// std::thread::hardware_concurrency().
static_thread_pool ();
// Initialise the thread pool with the specified number of threads.
explicit static_thread_pool (std:: uint32_t threadCount);
std:: uint32_t thread_count () const noexcept ;
class schedule_operation
{
public:
schedule_operation (static_thread_pool* tp) noexcept ;
bool await_ready () noexcept ;
bool await_suspend (std::experimental::coroutine_handle<> h) noexcept ;
bool await_resume () noexcept ;
private:
// unspecified
};
// Return an operation that can be awaited by a coroutine.
//
//
[[nodiscard]]
schedule_operation schedule () noexcept ;
private:
// Unspecified
};
}示例用法:简单
cppcoro::task<std::string> do_something_on_threadpool (cppcoro::static_thread_pool& tp)
{
// First schedule the coroutine onto the threadpool.
co_await tp. schedule ();
// When it resumes, this coroutine is now running on the threadpool.
do_something ();
}示例用法:并行执行操作 - 使用static_thread_pool使用schedule_on()运算符。
cppcoro::task< double > dot_product (static_thread_pool& tp, double a[], double b[], size_t count)
{
if (count > 1000 )
{
// Subdivide the work recursively into two equal tasks
// The first half is scheduled to the thread pool so it can run concurrently
// with the second half which continues on this thread.
size_t halfCount = count / 2 ;
auto [first, second] = co_await when_all (
schedule_on (tp, dot_product (tp, a, b, halfCount),
dot_product (tp, a + halfCount, b + halfCount, count - halfCount));
co_return first + second;
}
else
{
double sum = 0.0 ;
for ( size_t i = 0 ; i < count; ++i)
{
sum += a[i] * b[i];
}
co_return sum;
}
}io_service和io_work_scope io_service类为从异步I/O操作处理I/O完成事件提供了一个抽象。
当异步I/O操作完成时,等待该操作的Coroutine将在呼叫中的I/O线程上恢复到一个事件处理方法之一: process_events() , process_pending_events() ,process_one_event(), process_one_event()或process_one_pending_event() 。
io_service类不管理任何I/O线程。您必须确保某些线程调用一种用于派遣I/O完成事件的事件处理方法之一。这可以是一个专用线程,通过通过调用process_events()或process_one_pending_event()定期轮询新事件来调用process_pending_events()或与其他事件循环(例如UI事件循环)混合。
这允许将io_service事件环与其他事件循环(例如用户界面事件循环)集成。
您可以通过拥有多个线程call process_events()来对事件进行多路复用处理。您可以指定通过可选的io_service构造函数参数积极处理事件的最大线程数量的提示。
在Windows上,该实现利用Windows I/O完成端口工厂以可扩展的方式将事件分配给I/O线程。
API摘要:
namespace cppcoro
{
class io_service
{
public:
class schedule_operation ;
class timed_schedule_operation ;
io_service ();
io_service (std:: uint32_t concurrencyHint);
io_service (io_service&&) = delete ;
io_service ( const io_service&) = delete ;
io_service& operator =(io_service&&) = delete ;
io_service& operator =( const io_service&) = delete ;
~io_service ();
// Scheduler methods
[[nodiscard]]
schedule_operation schedule () noexcept ;
template < typename REP, typename RATIO>
[[nodiscard]]
timed_schedule_operation schedule_after (
std::chrono::duration<REP, RATIO> delay,
cppcoro::cancellation_token cancellationToken = {}) noexcept ;
// Event-loop methods
//
// I/O threads must call these to process I/O events and execute
// scheduled coroutines.
std:: uint64_t process_events ();
std:: uint64_t process_pending_events ();
std:: uint64_t process_one_event ();
std:: uint64_t process_one_pending_event ();
// Request that all threads processing events exit their event loops.
void stop () noexcept ;
// Query if some thread has called stop()
bool is_stop_requested () const noexcept ;
// Reset the event-loop after a call to stop() so that threads can
// start processing events again.
void reset ();
// Reference-counting methods for tracking outstanding references
// to the io_service.
//
// The io_service::stop() method will be called when the last work
// reference is decremented.
//
// Use the io_work_scope RAII class to manage calling these methods on
// entry-to and exit-from a scope.
void notify_work_started () noexcept ;
void notify_work_finished () noexcept ;
};
class io_service ::schedule_operation
{
public:
schedule_operation ( const schedule_operation&) noexcept ;
schedule_operation& operator =( const schedule_operation&) noexcept ;
bool await_ready () const noexcept ;
void await_suspend (std::experimental::coroutine_handle<> awaiter) noexcept ;
void await_resume () noexcept ;
};
class io_service ::timed_schedule_operation
{
public:
timed_schedule_operation (timed_schedule_operation&&) noexcept ;
timed_schedule_operation ( const timed_schedule_operation&) = delete ;
timed_schedule_operation& operator =( const timed_schedule_operation&) = delete ;
timed_schedule_operation& operator =(timed_schedule_operation&&) = delete ;
bool await_ready () const noexcept ;
void await_suspend (std::experimental::coroutine_handle<> awaiter);
void await_resume ();
};
class io_work_scope
{
public:
io_work_scope (io_service& ioService) noexcept ;
io_work_scope ( const io_work_scope& other) noexcept ;
io_work_scope (io_work_scope&& other) noexcept ;
~io_work_scope ();
io_work_scope& operator =( const io_work_scope& other) noexcept ;
io_work_scope& operator =(io_work_scope&& other) noexcept ;
io_service& service () const noexcept ;
};
}例子:
# include < cppcoro/task.hpp >
# include < cppcoro/task.hpp >
# include < cppcoro/io_service.hpp >
# include < cppcoro/read_only_file.hpp >
# include < experimental/filesystem >
# include < memory >
# include < algorithm >
# include < iostream >
namespace fs = std::experimental::filesystem;
cppcoro::task<std:: uint64_t > count_lines (cppcoro::io_service& ioService, fs::path path)
{
auto file = cppcoro::read_only_file::open (ioService, path);
constexpr size_t bufferSize = 4096 ;
auto buffer = std::make_unique<std:: uint8_t []>(bufferSize);
std:: uint64_t newlineCount = 0 ;
for (std:: uint64_t offset = 0 , fileSize = file. size (); offset < fileSize;)
{
const auto bytesToRead = static_cast < size_t >(
std::min<std:: uint64_t >(bufferSize, fileSize - offset));
const auto bytesRead = co_await file. read (offset, buffer. get (), bytesToRead);
newlineCount += std::count (buffer. get (), buffer. get () + bytesRead, ' n ' );
offset += bytesRead;
}
co_return newlineCount;
}
cppcoro::task<> run (cppcoro::io_service& ioService)
{
cppcoro::io_work_scope ioScope (ioService);
auto lineCount = co_await count_lines (ioService, fs::path{ " foo.txt " });
std::cout << " foo.txt has " << lineCount << " lines. " << std::endl;;
}
cppcoro::task<> process_events (cppcoro::io_service& ioService)
{
// Process events until the io_service is stopped.
// ie. when the last io_work_scope goes out of scope.
ioService. process_events ();
co_return ;
}
int main ()
{
cppcoro::io_service ioService;
cppcoro::sync_wait ( cppcoro::when_all_ready (
run (ioService),
process_events (ioService)));
return 0 ;
}io_service作为调度程序io_service类实现Scheduler和DelayedScheduler概念的接口。
这允许Coroutine暂停当前线程上的执行,并安排与特定io_service对象关联的I/O线程恢复。
例子:
cppcoro::task<> do_something (cppcoro::io_service& ioService)
{
// Coroutine starts execution on the thread of the task awaiter.
// A coroutine can transfer execution to an I/O thread by awaiting the
// result of io_service::schedule().
co_await ioService. schedule ();
// At this point, the coroutine is now executing on an I/O thread
// inside a call to one of the io_service event processing methods.
// A coroutine can also perform a delayed-schedule that will suspend
// the coroutine for a specified duration of time before scheduling
// it for resumption on an I/O thread.
co_await ioService. schedule_after (100ms);
// At this point, the coroutine is executing on a potentially different I/O thread.
}file , readable_file , writable_file这些类型是用于执行混凝土文件I/O的抽象基类。
API摘要:
namespace cppcoro
{
class file_read_operation ;
class file_write_operation ;
class file
{
public:
virtual ~file ();
std:: uint64_t size () const ;
protected:
file (file&& other) noexcept ;
};
class readable_file : public virtual file
{
public:
[[nodiscard]]
file_read_operation read (
std:: uint64_t offset,
void * buffer,
std:: size_t byteCount,
cancellation_token ct = {}) const noexcept ;
};
class writable_file : public virtual file
{
public:
void set_size (std:: uint64_t fileSize);
[[nodiscard]]
file_write_operation write (
std:: uint64_t offset,
const void * buffer,
std:: size_t byteCount,
cancellation_token ct = {}) noexcept ;
};
class file_read_operation
{
public:
file_read_operation (file_read_operation&& other) noexcept ;
bool await_ready () const noexcept ;
bool await_suspend (std::experimental::coroutine_handle<> awaiter);
std:: size_t await_resume ();
};
class file_write_operation
{
public:
file_write_operation (file_write_operation&& other) noexcept ;
bool await_ready () const noexcept ;
bool await_suspend (std::experimental::coroutine_handle<> awaiter);
std:: size_t await_resume ();
};
}read_only_file , write_only_file , read_write_file这些类型代表具体文件I/O类。
API摘要:
namespace cppcoro
{
class read_only_file : public readable_file
{
public:
[[nodiscard]]
static read_only_file open (
io_service& ioService,
const std::experimental::filesystem::path& path,
file_share_mode shareMode = file_share_mode::read,
file_buffering_mode bufferingMode = file_buffering_mode::default_);
};
class write_only_file : public writable_file
{
public:
[[nodiscard]]
static write_only_file open (
io_service& ioService,
const std::experimental::filesystem::path& path,
file_open_mode openMode = file_open_mode::create_or_open,
file_share_mode shareMode = file_share_mode::none,
file_buffering_mode bufferingMode = file_buffering_mode::default_);
};
class read_write_file : public readable_file , public writable_file
{
public:
[[nodiscard]]
static read_write_file open (
io_service& ioService,
const std::experimental::filesystem::path& path,
file_open_mode openMode = file_open_mode::create_or_open,
file_share_mode shareMode = file_share_mode::none,
file_buffering_mode bufferingMode = file_buffering_mode::default_);
};
}所有open()函数投掷std::system_error失败。
注意:当前仅在Windows平台上支持网络抽象。 Linux支持即将推出。
socket插座类可用于异步通过网络发送/接收数据。
当前仅支持IPv4和IPv6上的TCP/IP,UDP/IP。
API摘要:
// <cppcoro/net/socket.hpp>
namespace cppcoro ::net
{
class socket
{
public:
static socket create_tcpv4 (ip_service& ioSvc);
static socket create_tcpv6 (ip_service& ioSvc);
static socket create_updv4 (ip_service& ioSvc);
static socket create_udpv6 (ip_service& ioSvc);
socket (socket&& other) noexcept ;
~socket ();
socket& operator =(socket&& other) noexcept ;
// Return the native socket handle for the socket
<platform-specific> native_handle () noexcept ;
const ip_endpoint& local_endpoint () const noexcept ;
const ip_endpoint& remote_endpoint () const noexcept ;
void bind ( const ip_endpoint& localEndPoint);
void listen ();
[[nodiscard]]
Awaitable< void > connect ( const ip_endpoint& remoteEndPoint) noexcept ;
[[nodiscard]]
Awaitable< void > connect ( const ip_endpoint& remoteEndPoint,
cancellation_token ct) noexcept ;
[[nodiscard]]
Awaitable< void > accept (socket& acceptingSocket) noexcept ;
[[nodiscard]]
Awaitable< void > accept (socket& acceptingSocket,
cancellation_token ct) noexcept ;
[[nodiscard]]
Awaitable< void > disconnect () noexcept ;
[[nodiscard]]
Awaitable< void > disconnect (cancellation_token ct) noexcept ;
[[nodiscard]]
Awaitable<std:: size_t > send ( const void * buffer, std:: size_t size) noexcept ;
[[nodiscard]]
Awaitable<std:: size_t > send ( const void * buffer,
std:: size_t size,
cancellation_token ct) noexcept ;
[[nodiscard]]
Awaitable<std:: size_t > recv ( void * buffer, std:: size_t size) noexcept ;
[[nodiscard]]
Awaitable<std:: size_t > recv ( void * buffer,
std:: size_t size,
cancellation_token ct) noexcept ;
[[nodiscard]]
socket_recv_from_operation recv_from (
void * buffer,
std:: size_t size) noexcept ;
[[nodiscard]]
socket_recv_from_operation_cancellable recv_from (
void * buffer,
std:: size_t size,
cancellation_token ct) noexcept ;
[[nodiscard]]
socket_send_to_operation send_to (
const ip_endpoint& destination,
const void * buffer,
std:: size_t size) noexcept ;
[[nodiscard]]
socket_send_to_operation_cancellable send_to (
const ip_endpoint& destination,
const void * buffer,
std:: size_t size,
cancellation_token ct) noexcept ;
void close_send ();
void close_recv ();
};
}示例:回声服务器
# include < cppcoro/net/socket.hpp >
# include < cppcoro/io_service.hpp >
# include < cppcoro/cancellation_source.hpp >
# include < cppcoro/async_scope.hpp >
# include < cppcoro/on_scope_exit.hpp >
# include < memory >
# include < iostream >
cppcoro::task< void > handle_connection (socket s)
{
try
{
const size_t bufferSize = 16384 ;
auto buffer = std::make_unique< unsigned char []>(bufferSize);
size_t bytesRead;
do {
// Read some bytes
bytesRead = co_await s. recv (buffer. get (), bufferSize);
// Write some bytes
size_t bytesWritten = 0 ;
while (bytesWritten < bytesRead) {
bytesWritten += co_await s. send (
buffer. get () + bytesWritten,
bytesRead - bytesWritten);
}
} while (bytesRead != 0 );
s. close_send ();
co_await s. disconnect ();
}
catch (...)
{
std::cout << " connection failed " << std::
}
}
cppcoro::task< void > echo_server (
cppcoro::net::ipv4_endpoint endpoint,
cppcoro::io_service& ioSvc,
cancellation_token ct)
{
cppcoro::async_scope scope;
std::exception_ptr ex;
try
{
auto listeningSocket = cppcoro::net::socket::create_tcpv4 (ioSvc);
listeningSocket. bind (endpoint);
listeningSocket. listen ();
while ( true ) {
auto connection = cppcoro::net::socket::create_tcpv4 (ioSvc);
co_await listeningSocket. accept (connection, ct);
scope. spawn ( handle_connection ( std::move (connection)));
}
}
catch (cppcoro::operation_cancelled)
{
}
catch (...)
{
ex = std::current_exception ();
}
// Wait until all handle_connection tasks have finished.
co_await scope. join ();
if (ex) std::rethrow_exception (ex);
}
int main ( int argc, const char * argv[])
{
cppcoro::io_service ioSvc;
if (argc != 2 ) return - 1 ;
auto endpoint = cppcoro::ipv4_endpoint::from_string (argv[ 1 ]);
if (!endpoint) return - 1 ;
( void ) cppcoro::sync_wait ( cppcoro::when_all (
[&]() -> task<>
{
// Shutdown the event loop once finished.
auto stopOnExit = cppcoro::on_scope_exit ([&] { ioSvc. stop (); });
cppcoro::cancellation_source canceller;
co_await cppcoro::when_all (
[&]() -> task<>
{
// Run for 30s then stop accepting new connections.
co_await ioSvc. schedule_after ( std::chrono::seconds ( 30 ));
canceller. request_cancellation ();
}(),
echo_server (*endpoint, ioSvc, canceller. token ()));
}(),
[&]() -> task<>
{
ioSvc. process_events ();
}()));
return 0 ;
}ip_address , ipv4_address , ipv6_address用于表示IP地址的辅助类。
API摘要:
namespace cppcoro ::net
{
class ipv4_address
{
using bytes_t = std:: uint8_t [ 4 ];
public:
constexpr ipv4_address ();
explicit constexpr ipv4_address (std:: uint32_t integer);
explicit constexpr ipv4_address ( const std::uint8_t (&bytes)[4]);
explicit constexpr ipv4_address (std:: uint8_t b0,
std:: uint8_t b1,
std:: uint8_t b2,
std:: uint8_t b3);
constexpr const bytes_t & bytes () const ;
constexpr std:: uint32_t to_integer () const ;
static constexpr ipv4_address loopback ();
constexpr bool is_loopback () const ;
constexpr bool is_private_network () const ;
constexpr bool operator ==(ipv4_address other) const ;
constexpr bool operator !=(ipv4_address other) const ;
constexpr bool operator <(ipv4_address other) const ;
constexpr bool operator >(ipv4_address other) const ;
constexpr bool operator <=(ipv4_address other) const ;
constexpr bool operator >=(ipv4_address other) const ;
std::string to_string ();
static std::optional<ipv4_address> from_string (std::string_view string) noexcept ;
};
class ipv6_address
{
using bytes_t = std:: uint8_t [ 16 ];
public:
constexpr ipv6_address ();
explicit constexpr ipv6_address (
std:: uint64_t subnetPrefix,
std:: uint64_t interfaceIdentifier);
constexpr ipv6_address (
std:: uint16_t part0,
std:: uint16_t part1,
std:: uint16_t part2,
std:: uint16_t part3,
std:: uint16_t part4,
std:: uint16_t part5,
std:: uint16_t part6,
std:: uint16_t part7);
explicit constexpr ipv6_address (
const std::uint16_t (&parts)[8]);
explicit constexpr ipv6_address (
const std::uint8_t (bytes)[16]);
constexpr const bytes_t & bytes () const ;
constexpr std:: uint64_t subnet_prefix () const ;
constexpr std:: uint64_t interface_identifier () const ;
static constexpr ipv6_address unspecified ();
static constexpr ipv6_address loopback ();
static std::optional<ipv6_address> from_string (std::string_view string) noexcept ;
std::string to_string () const ;
constexpr bool operator ==( const ipv6_address& other) const ;
constexpr bool operator !=( const ipv6_address& other) const ;
constexpr bool operator <( const ipv6_address& other) const ;
constexpr bool operator >( const ipv6_address& other) const ;
constexpr bool operator <=( const ipv6_address& other) const ;
constexpr bool operator >=( const ipv6_address& other) const ;
};
class ip_address
{
public:
// Constructs to IPv4 address 0.0.0.0
ip_address () noexcept ;
ip_address (ipv4_address address) noexcept ;
ip_address (ipv6_address address) noexcept ;
bool is_ipv4 () const noexcept ;
bool is_ipv6 () const noexcept ;
const ipv4_address& to_ipv4 () const ;
const ipv6_address& to_ipv6 () const ;
const std:: uint8_t * bytes () const noexcept ;
std::string to_string () const ;
static std::optional<ip_address> from_string (std::string_view string) noexcept ;
bool operator ==( const ip_address& rhs) const noexcept ;
bool operator !=( const ip_address& rhs) const noexcept ;
// ipv4_address sorts less than ipv6_address
bool operator <( const ip_address& rhs) const noexcept ;
bool operator >( const ip_address& rhs) const noexcept ;
bool operator <=( const ip_address& rhs) const noexcept ;
bool operator >=( const ip_address& rhs) const noexcept ;
};
}ip_endpoint , ipv4_endpoint ipv6_endpoint用于表示IP地址和端口数的帮助课程。
API摘要:
namespace cppcoro ::net
{
class ipv4_endpoint
{
public:
ipv4_endpoint () noexcept ;
explicit ipv4_endpoint (ipv4_address address, std:: uint16_t port = 0 ) noexcept ;
const ipv4_address& address () const noexcept ;
std:: uint16_t port () const noexcept ;
std::string to_string () const ;
static std::optional<ipv4_endpoint> from_string (std::string_view string) noexcept ;
};
bool operator ==( const ipv4_endpoint& a, const ipv4_endpoint& b);
bool operator !=( const ipv4_endpoint& a, const ipv4_endpoint& b);
bool operator <( const ipv4_endpoint& a, const ipv4_endpoint& b);
bool operator >( const ipv4_endpoint& a, const ipv4_endpoint& b);
bool operator <=( const ipv4_endpoint& a, const ipv4_endpoint& b);
bool operator >=( const ipv4_endpoint& a, const ipv4_endpoint& b);
class ipv6_endpoint
{
public:
ipv6_endpoint () noexcept ;
explicit ipv6_endpoint (ipv6_address address, std:: uint16_t port = 0 ) noexcept ;
const ipv6_address& address () const noexcept ;
std:: uint16_t port () const noexcept ;
std::string to_string () const ;
static std::optional<ipv6_endpoint> from_string (std::string_view string) noexcept ;
};
bool operator ==( const ipv6_endpoint& a, const ipv6_endpoint& b);
bool operator !=( const ipv6_endpoint& a, const ipv6_endpoint& b);
bool operator <( const ipv6_endpoint& a, const ipv6_endpoint& b);
bool operator >( const ipv6_endpoint& a, const ipv6_endpoint& b);
bool operator <=( const ipv6_endpoint& a, const ipv6_endpoint& b);
bool operator >=( const ipv6_endpoint& a, const ipv6_endpoint& b);
class ip_endpoint
{
public:
// Constructs to IPv4 end-point 0.0.0.0:0
ip_endpoint () noexcept ;
ip_endpoint (ipv4_endpoint endpoint) noexcept ;
ip_endpoint (ipv6_endpoint endpoint) noexcept ;
bool is_ipv4 () const noexcept ;
bool is_ipv6 () const noexcept ;
const ipv4_endpoint& to_ipv4 () const ;
const ipv6_endpoint& to_ipv6 () const ;
ip_address address () const noexcept ;
std:: uint16_t port () const noexcept ;
std::string to_string () const ;
static std::optional<ip_endpoint> from_string (std::string_view string) noexcept ;
bool operator ==( const ip_endpoint& rhs) const noexcept ;
bool operator !=( const ip_endpoint& rhs) const noexcept ;
// ipv4_endpoint sorts less than ipv6_endpoint
bool operator <( const ip_endpoint& rhs) const noexcept ;
bool operator >( const ip_endpoint& rhs) const noexcept ;
bool operator <=( const ip_endpoint& rhs) const noexcept ;
bool operator >=( const ip_endpoint& rhs) const noexcept ;
};
}sync_wait() sync_wait()函数可用于同步等待,直到指定的awaitable完成。
指定的等待期将在新创建的coroutine内部的当前线程上co_await 。
sync_wait()调用将阻塞直到操作完成,并返回co_await表达式的结果,或者如果co_await表达式完成,则以未手动异常而重新归还例外。
sync_wait()函数对于从main()内部启动顶级任务并等到任务完成为止非常有用,实际上,这是启动第一个/顶级task唯一方法。
API摘要:
// <cppcoro/sync_wait.hpp>
namespace cppcoro
{
template < typename AWAITABLE>
auto sync_wait (AWAITABLE&& awaitable)
-> typename awaitable_traits<AWAITABLE&&>::await_result_t;
}示例:
void example_task ()
{
auto makeTask = []() -> task<std::string>
{
co_return " foo " ;
};
auto task = makeTask ();
// start the lazy task and wait until it completes
sync_wait (task); // -> "foo"
sync_wait ( makeTask ()); // -> "foo"
}
void example_shared_task ()
{
auto makeTask = []() -> shared_task<std::string>
{
co_return " foo " ;
};
auto task = makeTask ();
// start the shared task and wait until it completes
sync_wait (task) == " foo " ;
sync_wait ( makeTask ()) == " foo " ;
}when_all_ready() when_all_ready()函数可用于创建一个新的等待,该函数在所有输入等待设备完成后完成。
输入任务可以是任何类型的期待。
当返回的等待期被co_await使用时,它将co_await其传递给when_all_ready()函数的顺序依次在等待线程的每个输入等待线索中。如果这些任务不同步完成,则它们将同时执行。
一旦输入就诊的所有co_await表达式已经完成完成,返回的等待功能将完成并恢复等待的Coroutine。等待的Coroutine将恢复在最后要完成的输入等待的线程上。
当co_await Ed时,即使某些输入等待因素失败而没有例外,也可以保证返回的等待期。
但是,请注意,如果when_all_ready()呼叫本身可能会抛出std::bad_alloc如果它无法为等待每个输入的apotables所需的coroutine帧分配内存。如果任何输入等待的对象都会从其复制/移动构造函数中抛出任何输入的对象,也可能会引发异常。
返回的等待co_await的结果是std::tuple或std::vector的when_all_task<RESULT>结果>对象的vector。这些对象允许您通过调用相应输出任务的when_all_task<RESULT>::result()方法分别获得每个输入等待的结果(或异常)。这使呼叫者可以同时等待多个等待材料并同步其完成,同时仍保留随后检查每个co_await操作的结果的能力,以获得成功/失败。
这不同于when_all()任何单个co_await操作的故障导致整体操作失败,而异常。这意味着您无法确定哪些组件co_await操作失败了,也可以阻止您获得其他co_await操作的结果。
API摘要:
// <cppcoro/when_all_ready.hpp>
namespace cppcoro
{
// Concurrently await multiple awaitables.
//
// Returns an awaitable object that, when co_await'ed, will co_await each of the input
// awaitable objects and will resume the awaiting coroutine only when all of the
// component co_await operations complete.
//
// Result of co_await'ing the returned awaitable is a std::tuple of detail::when_all_task<T>,
// one for each input awaitable and where T is the result-type of the co_await expression
// on the corresponding awaitable.
//
// AWAITABLES must be awaitable types and must be movable (if passed as rvalue) or copyable
// (if passed as lvalue). The co_await expression will be executed on an rvalue of the
// copied awaitable.
template < typename ... AWAITABLES>
auto when_all_ready (AWAITABLES&&... awaitables)
-> Awaitable<std::tuple<detail::when_all_task<typename awaitable_traits<AWAITABLES>::await_result_t>...>>;
// Concurrently await each awaitable in a vector of input awaitables.
template <
typename AWAITABLE,
typename RESULT = typename awaitable_traits<AWAITABLE>:: await_result_t >
auto when_all_ready (std::vector<AWAITABLE> awaitables)
-> Awaitable<std::vector<detail::when_all_task<RESULT>>>;
}示例用法:
task<std::string> get_record ( int id);
task<> example1 ()
{
// Run 3 get_record() operations concurrently and wait until they're all ready.
// Returns a std::tuple of tasks that can be unpacked using structured bindings.
auto [task1, task2, task3] = co_await when_all_ready (
get_record ( 123 ),
get_record ( 456 ),
get_record ( 789 ));
// Unpack the result of each task
std::string& record1 = task1. result ();
std::string& record2 = task2. result ();
std::string& record3 = task3. result ();
// Use records....
}
task<> example2 ()
{
// Create the input tasks. They don't start executing yet.
std::vector<task<std::string>> tasks;
for ( int i = 0 ; i < 1000 ; ++i)
{
tasks. emplace_back ( get_record (i));
}
// Execute all tasks concurrently.
std::vector<detail::when_all_task<std::string>> resultTasks =
co_await when_all_ready ( std::move (tasks));
// Unpack and handle each result individually once they're all complete.
for ( int i = 0 ; i < 1000 ; ++i)
{
try
{
std::string& record = tasks[i]. result ();
std::cout << i << " = " << record << std::endl;
}
catch ( const std:: exception & ex)
{
std::cout << i << " : " << ex. what () << std::endl;
}
}
}when_all() when_all()函数可用于创建一个新的等待,当co_await Ed将同时co_await同时同时返回其各个结果的汇总。
当等待返回的等待返回时,它将在当前线程上co_await每个输入等待。一旦第一个等待的暂停,将开始第二任任务,依此类推。操作同时执行,直到它们全部完成为止。
一旦所有组件co_await操作都运行到完成,则将从每个单独的结果构建结果的汇总。如果任何输入任务都会抛出异常,或者骨料结果的构造会引发异常,则例外将从返回的等待期的co_await中传播。
如果多个co_await操作失败而例外失败,则co_await when_all()中传播,其他例外将被默默地忽略。未指定将选择哪个操作例外。
如果知道哪个组件co_await操作失败或保留获得其他操作结果的能力很重要,即使其中某些操作失败,您应该使用when_all_ready()而不是使用。
API摘要:
// <cppcoro/when_all.hpp>
namespace cppcoro
{
// Variadic version.
//
// Note that if the result of `co_await awaitable` yields a void-type
// for some awaitables then the corresponding component for that awaitable
// in the tuple will be an empty struct of type detail::void_value.
template < typename ... AWAITABLES>
auto when_all (AWAITABLES&&... awaitables)
-> Awaitable<std::tuple<typename awaitable_traits<AWAITABLES>::await_result_t...>>;
// Overload for vector<Awaitable<void>>.
template <
typename AWAITABLE,
typename RESULT = typename awaitable_traits<AWAITABLE>:: await_result_t ,
std:: enable_if_t <std::is_void_v<RESULT>, int > = 0 >
auto when_all (std::vector<AWAITABLE> awaitables)
-> Awaitable<void>;
// Overload for vector<Awaitable<NonVoid>> that yield a value when awaited.
template <
typename AWAITABLE,
typename RESULT = typename awaitable_traits<AWAITABLE>:: await_result_t ,
std:: enable_if_t <!std::is_void_v<RESULT>, int > = 0 >
auto when_all (std::vector<AWAITABLE> awaitables)
-> Awaitable<std::vector<std::conditional_t<
std::is_lvalue_reference_v<RESULT>,
std::reference_wrapper<std::remove_reference_t<RESULT>>,
std::remove_reference_t<RESULT>>>>;
}示例:
task<A> get_a ();
task<B> get_b ();
task<> example1 ()
{
// Run get_a() and get_b() concurrently.
// Task yields a std::tuple<A, B> which can be unpacked using structured bindings.
auto [a, b] = co_await when_all ( get_a (), get_b ());
// use a, b
}
task<std::string> get_record ( int id);
task<> example2 ()
{
std::vector<task<std::string>> tasks;
for ( int i = 0 ; i < 1000 ; ++i)
{
tasks. emplace_back ( get_record (i));
}
// Concurrently execute all get_record() tasks.
// If any of them fail with an exception then the exception will propagate
// out of the co_await expression once they have all completed.
std::vector<std::string> records = co_await when_all ( std::move (tasks));
// Process results
for ( int i = 0 ; i < 1000 ; ++i)
{
std::cout << i << " = " << records[i] << std::endl;
}
}fmap() fmap()函数可用于将可karble函数应用于容器类型中包含的值(s),并返回应用函数的结果的新容器类型的型函数。
fmap()函数可以将函数应用于类型generator<T> , recursive_generator<T>和async_generator<T>的值,以及任何支持Awaitable概念的值(例如task<T> )。
这些类型中的每一种都为fmap()提供了两个参数的过载。要应用的函数和容器值。有关支持的fmap()超载的每种类型的文档。
例如, fmap()函数可用于将函数应用于task<T>的最终结果,从而产生一个新task<U> ,该函数将与该函数的返回值一起完成。
// Given a function you want to apply that converts
// a value of type A to value of type B.
B a_to_b (A value);
// And a task that yields a value of type A
cppcoro::task<A> get_an_a ();
// We can apply the function to the result of the task using fmap()
// and obtain a new task yielding the result.
cppcoro::task<B> bTask = fmap(a_to_b, get_an_a());
// An alternative syntax is to use the pipe notation.
cppcoro::task<B> bTask = get_an_a() | cppcoro::fmap(a_to_b);API摘要:
// <cppcoro/fmap.hpp>
namespace cppcoro
{
template < typename FUNC>
struct fmap_transform
{
fmap_transform (FUNC&& func) noexcept (std::is_nothrow_move_constructible_v<FUNC>);
FUNC func;
};
// Type-deducing constructor for fmap_transform object that can be used
// in conjunction with operator|.
template < typename FUNC>
fmap_transform<FUNC> fmap (FUNC&& func);
// operator| overloads for providing pipe-based syntactic sugar for fmap()
// such that the expression:
// <value-expr> | cppcoro::fmap(<func-expr>)
// is equivalent to:
// fmap(<func-expr>, <value-expr>)
template < typename T, typename FUNC>
decltype ( auto ) operator |(T&& value, fmap_transform<FUNC>&& transform);
template < typename T, typename FUNC>
decltype ( auto ) operator |(T&& value, fmap_transform<FUNC>& transform);
template < typename T, typename FUNC>
decltype ( auto ) operator |(T&& value, const fmap_transform<FUNC>& transform);
// Generic overload for all awaitable types.
//
// Returns an awaitable that when co_awaited, co_awaits the specified awaitable
// and applies the specified func to the result of the 'co_await awaitable'
// expression as if by 'std::invoke(func, co_await awaitable)'.
//
// If the type of 'co_await awaitable' expression is 'void' then co_awaiting the
// returned awaitable is equivalent to 'co_await awaitable, func()'.
template <
typename FUNC,
typename AWAITABLE,
std:: enable_if_t <is_awaitable_v<AWAITABLE>, int > = 0 >
auto fmap (FUNC&& func, AWAITABLE&& awaitable)
-> Awaitable<std::invoke_result_t<FUNC, typename awaitable_traits<AWAITABLE>::await_result_t>>;
} fmap()函数设计为通过参数依赖性查找(ADL)查找正确的过载,因此通常应在没有cppcoro::前缀的情况下调用它。
resume_on() resume_on()函数可用于控制等待的执行上下文,即等待等待时会恢复等待的coroutine。当应用于async_generator时,它会控制哪个执行上下文co_await g.begin()和co_await ++it操作恢复等待的coroutines。
通常,正在等待的等待coroutine(例如, task )或async_generator将在操作完成的任何线程上恢复执行。在某些情况下,这可能不是您要继续执行的线程。在这些情况下,您可以使用resume_on()函数来创建一个新的等待或生成器,该函数将在与指定计划程序关联的线程上恢复执行。
resume_on()函数可以用作返回新的等待/发电机的普通函数。否则可以在管道辅助词中使用。
例子:
task<record> load_record ( int id);
ui_thread_scheduler uiThreadScheduler;
task<> example ()
{
// This will start load_record() on the current thread.
// Then when load_record() completes (probably on an I/O thread)
// it will reschedule execution onto thread pool and call to_json
// Once to_json completes it will transfer execution onto the
// ui thread before resuming this coroutine and returning the json text.
task<std::string> jsonTask =
load_record ( 123 )
| cppcoro::resume_on ( threadpool::default ())
| cppcoro::fmap (to_json)
| cppcoro::resume_on (uiThreadScheduler);
// At this point, all we've done is create a pipeline of tasks.
// The tasks haven't started executing yet.
// Await the result. Starts the pipeline of tasks.
std::string jsonText = co_await jsonTask;
// Guaranteed to be executing on ui thread here.
someUiControl. set_text (jsonText);
}API摘要:
// <cppcoro/resume_on.hpp>
namespace cppcoro
{
template < typename SCHEDULER, typename AWAITABLE>
auto resume_on (SCHEDULER& scheduler, AWAITABLE awaitable)
-> Awaitable<typename awaitable_traits<AWAITABLE>::await_traits_t>;
template < typename SCHEDULER, typename T>
async_generator<T> resume_on (SCHEDULER& scheduler, async_generator<T> source);
template < typename SCHEDULER>
struct resume_on_transform
{
explicit resume_on_transform (SCHEDULER& scheduler) noexcept ;
SCHEDULER& scheduler;
};
// Construct a transform/operation that can be applied to a source object
// using "pipe" notation (ie. operator|).
template < typename SCHEDULER>
resume_on_transform<SCHEDULER> resume_on (SCHEDULER& scheduler) noexcept ;
// Equivalent to 'resume_on(transform.scheduler, std::forward<T>(value))'
template < typename T, typename SCHEDULER>
decltype ( auto ) operator |(T&& value, resume_on_transform<SCHEDULER> transform)
{
return resume_on (transform. scheduler , std::forward<T>(value));
}
}schedule_on() schedule_on()函数可用于更改给定的已久或async_generator开始执行的执行上下文。
当应用于async_generator时,它还会影响co_yield语句之后恢复的执行上下文。
请注意, schedule_on变换并未指定所期待或async_generator将完成或产生结果的线程,这取决于所期待或生成器的实现。
有关控制操作完成的线程的转换,请参见resume_on()操作员。
例如:
task< int > get_value ();
io_service ioSvc;
task<> example ()
{
// Starts executing get_value() on the current thread.
int a = co_await get_value ();
// Starts executing get_value() on a thread associated with ioSvc.
int b = co_await schedule_on (ioSvc, get_value ());
}API摘要:
// <cppcoro/schedule_on.hpp>
namespace cppcoro
{
// Return a task that yields the same result as 't' but that
// ensures that 't' is co_await'ed on a thread associated with
// the specified scheduler. Resulting task will complete on
// whatever thread 't' would normally complete on.
template < typename SCHEDULER, typename AWAITABLE>
auto schedule_on (SCHEDULER& scheduler, AWAITABLE awaitable)
-> Awaitable<typename awaitable_traits<AWAITABLE>::await_result_t>;
// Return a generator that yields the same sequence of results as
// 'source' but that ensures that execution of the coroutine starts
// execution on a thread associated with 'scheduler' and resumes
// after a 'co_yield' on a thread associated with 'scheduler'.
template < typename SCHEDULER, typename T>
async_generator<T> schedule_on (SCHEDULER& scheduler, async_generator<T> source);
template < typename SCHEDULER>
struct schedule_on_transform
{
explicit schedule_on_transform (SCHEDULER& scheduler) noexcept ;
SCHEDULER& scheduler;
};
template < typename SCHEDULER>
schedule_on_transform<SCHEDULER> schedule_on (SCHEDULER& scheduler) noexcept ;
template < typename T, typename SCHEDULER>
decltype ( auto ) operator |(T&& value, schedule_on_transform<SCHEDULER> transform);
}awaitable_traits<T>如果应用于T型的表达式,则该模板元函数可用于确定co_await表达式所产生的类型。
请注意,这是假设T型的值正在等待中,在这种情况下,它不受Coroutine Promise对象应用的任何await_transform的影响。如果在这种情况下等待T型值的值,则结果可能会有所不同。
awaitable_traits<T>模板元功能不能定义awaiter_t或await_result_t nested typedefs如果类型T ,则不等待。这允许其在不等待T时禁用超载的Sfinae上下文中使用。
API摘要:
// <cppcoro/awaitable_traits.hpp>
namespace cppcoro
{
template < typename T>
struct awaitable_traits
{
// The type that results from applying `operator co_await()` to a value
// of type T, if T supports an `operator co_await()`, otherwise is type `T&&`.
typename awaiter_t = <unspecified>;
// The type of the result of co_await'ing a value of type T.
typename await_result_t = <unspecified>;
};
}is_awaitable<T> is_awaitable<T>模板元功能使您可以查询是否可以从Coroutine内部进行co_await ed。
API摘要:
// <cppcoro/is_awaitable.hpp>
namespace cppcoro
{
template < typename T>
struct is_awaitable : std::bool_constant<...>
{};
template < typename T>
constexpr bool is_awaitable_v = is_awaitable<T>::value;
}Awaitable<T>概念Awaitable<T>是一个概念,表明可以在没有await_transform过载的coroutine上下文中co_await类型,并且co_await表达式的结果具有类型, T 。
例如,类型task<T>实现了Awaitable<T&&>概念,而类型task<T>&实现了Awaitable<T&>概念。
Awaiter<T>概念Awaiter<T>是一个概念,指示某种类型包含await_ready await_suspend和await_resume方法,以实现暂停/恢复等待coroutine的协议所需的方法。
满足Awaiter<T>的类型必须具有awaiter的实例:
awaiter.await_ready() - > boolawaiter.await_suspend(std::experimental::coroutine_handle<void>{}) - > void or bool or bool or std::experimental::coroutine_handle<P> for P >awaiter.await_resume() - > T任何实现Awaiter<T>概念的类型也都实现了Awaitable<T>概念。
Scheduler概念Scheduler是一个概念,允许在某些执行上下文中调度Coroutines执行。
concept Scheduler
{
Awaitable< void > schedule ();
}给定类型的S ,可以实现Scheduler概念,而类型S的实例s :
s.schedule()方法返回等待类型的型号,使得co_await s.schedule()将无条件暂停当前的coroutine并将其安排在与调度s相关的执行上下文中。co_await s.schedule()表达的结果具有void 。 cppcoro::task<> f (Scheduler& scheduler)
{
// Execution of the coroutine is initially on the caller's execution context.
// Suspends execution of the coroutine and schedules it for resumption on
// the scheduler's execution context.
co_await scheduler. schedule ();
// At this point the coroutine is now executing on the scheduler's
// execution context.
}DelayedScheduler概念DelayedScheduler是一个概念,它允许Coroutine安排在指定的持续时间之后,在调度程序的执行上下文上进行执行。
concept DelayedScheduler : Scheduler
{
template < typename REP, typename RATIO>
Awaitable< void > schedule_after (std::chrono::duration<REP, RATIO> delay);
template < typename REP, typename RATIO>
Awaitable< void > schedule_after (
std::chrono::duration<REP, RATIO> delay,
cppcoro::cancellation_token cancellationToken);
}给定一种类型的S ,它实现了DelayedScheduler和一个实例S类型的s :
s.schedule_after(delay)方法返回一个可以等待的对象,以使co_await s.schedule_after(delay)暂停了当前的coroutine delay时间,然后再安排与s程序相关的执行上下文恢复执行式。co_await s.schedule_after(delay)表达式具有void 。CPPCORO库支持Visual Studio 2017和Linux在Windows下进行建筑,并带有Clang 5.0+的Linux。
该库利用蛋糕制造系统(不,而不是C#一个)。
蛋糕制造系统会自动检查为Git子模块,因此您无需单独下载或安装。
该库当前需要Visual Studio 2017或更高版本和Windows 10 SDK。
计划支持Clang(#3)和Linux(#15)。
蛋糕构建系统在Python中实施,要求安装Python 2.7。
确保Python 2.7口译员在您的路径中,并以“ Python”的形式使用。
确保安装了Visual Studio 2017更新3或更高版本。请注意,更新2或更早的Coroutines有一些已知问题已在Update 3中修复。
您还可以通过从https:///vcppdogfooding.azurewebsites.net/下载Nuget软件包,并将.nuget文件解压缩到目录中,从而使用Visual Studio编译器的实验版本。只需更新config.cake文件以通过修改和删除以下行来指向未拉链的位置:
nugetPath = None # r'C:PathToVisualCppTools.14.0.25224-Pre'确保已安装Windows 10 SDK。默认情况下,它将使用最新的Windows 10 SDK和通用C运行时版本。
CPPCORO存储库利用Git subsodules吸引了蛋糕制造系统的来源。
这意味着您需要将--recursive标志传递给git clone命令。例如。
c:Code> git clone --recursive https://github.com/lewissbaker/cppcoro.git
如果您已经克隆了cppcoro,则应在拉更改后更新子模型。
c:Codecppcoro> git submodule update --init --recursive
要从命令行构建,只需在工作区根中运行“ cake.bat”。
例如。
C:cppcoro> cake.bat
Building with C:cppcoroconfig.cake - Variant(release='debug', platform='windows', architecture='x86', compilerFamily='msvc', compiler='msvc14.10')
Building with C:cppcoroconfig.cake - Variant(release='optimised', platform='windows', architecture='x64', compilerFamily='msvc', compiler='msvc14.10')
Building with C:cppcoroconfig.cake - Variant(release='debug', platform='windows', architecture='x64', compilerFamily='msvc', compiler='msvc14.10')
Building with C:cppcoroconfig.cake - Variant(release='optimised', platform='windows', architecture='x86', compilerFamily='msvc', compiler='msvc14.10')
Compiling testmain.cpp
Compiling testmain.cpp
Compiling testmain.cpp
Compiling testmain.cpp
...
Linking buildwindows_x86_msvc14.10_debugtestrun.exe
Linking buildwindows_x64_msvc14.10_optimisedtestrun.exe
Linking buildwindows_x86_msvc14.10_optimisedtestrun.exe
Linking buildwindows_x64_msvc14.10_debugtestrun.exe
Generating code
Finished generating code
Generating code
Finished generating code
Build succeeded.
Build took 0:00:02.419.
默认情况下,没有参数的cake将建立所有具有所有构建变体的项目并执行单位测试。您可以通过传递其他命令行参数来缩小构建的内容。例如。
c:cppcoro> cake.bat release=debug architecture=x64 lib/build.cake
Building with C:UsersLewisCodecppcoroconfig.cake - Variant(release='debug', platform='windows', architecture='x64', compilerFamily='msvc', compiler='msvc14.10')
Archiving buildwindows_x64_msvc14.10_debuglibcppcoro.lib
Build succeeded.
Build took 0:00:00.321.
您可以运行cake --help来列出可用的命令行选项。
要从Visual Studio内开发您可以通过运行cake.bat -p构建.vcproj/.sln文件。
例如。
c:cppcoro> cake.bat -p
Building with C:cppcoroconfig.cake - Variant(release='debug', platform='windows', architecture='x86', compilerFamily='msvc', compiler='msvc14.10')
Building with C:cppcoroconfig.cake - Variant(release='optimised', platform='windows', architecture='x64', compilerFamily='msvc', compiler='msvc14.10')
Building with C:cppcoroconfig.cake - Variant(release='debug', platform='windows', architecture='x64', compilerFamily='msvc', compiler='msvc14.10')
Building with C:cppcoroconfig.cake - Variant(release='optimised', platform='windows', architecture='x86', compilerFamily='msvc', compiler='msvc14.10')
Generating Solution build/project/cppcoro.sln
Generating Project build/project/cppcoro_tests.vcxproj
Generating Filters build/project/cppcoro_tests.vcxproj.filters
Generating Project build/project/cppcoro.vcxproj
Generating Filters build/project/cppcoro.vcxproj.filters
Build succeeded.
Build took 0:00:00.247.
当您从Visual Studio内部构建这些项目时,它将召集Cake进行汇编。
CPPCORO项目也可以使用Clang+ Libc ++ 5.0或更高版本在Linux下构建。
Cppcoro建筑物已在Ubuntu 17.04下进行了测试。
确保已安装以下软件包:
这是假设您已经建立和安装了Clang和Libc ++。
如果您还没有配置clang,请参见以下各节,以获取有关使用CPPCORO构建的Clang的详细信息。
结帐cppcoro及其子模型:
git clone --recursive https://github.com/lewissbaker/cppcoro.git cppcoro
运行init.sh设置cake bash功能:
cd cppcoro
source init.sh
然后,您可以从工作区根开始运行cake以构建cppcoro并进行测试:
$ cake
您可以指定其他命令行参数来自定义构建:
--help将打印出命令行参数的帮助--debug=run将显示正在运行的构建命令线release=debug或release=optimised将限制构建变体进行调试或优化(默认情况下,它将同时构建)。lib/build.cake将仅构建CPPCORO库,而不是测试。test/build.cake@task_tests.cpp只需编译特定的源文件test/build.cake@testresult将建立和运行测试例如:
$ cake --debug=run release=debug lib/build.cake
如果您的clang编译器不在/usr/bin/clang ,则可以使用以下一个或多个cake的命令行选项来指定替代位置:
--clang-executable=<name> - 指定要使用的clang可执行名称而不是clang 。例如。强制使用clang 8.0 Pass --clang-executable=clang-8--clang-executable=<abspath> - 指定Clang可执行文件的完整路径。构建系统还将在同一目录中寻找其他可执行文件。如果此路径具有<prefix>/bin/<name>的形式,则将其设置为默认的clang-install-prefix <prefix> 。--clang-install-prefix=<path> - 指定已安装clang的路径。这将导致构建系统在<path>/bin下寻找叮当声(除非由--clang-executable覆盖)。--libcxx-install-prefix=<path> - 指定已安装LIBC ++的路径。默认情况下,构建系统将在与Clang同一位置寻找LIBC ++。如果将此命令行选项安装在其他位置,请使用此命令行。示例:使用默认位置中安装的clang的特定版本
$ cake --clang-executable=clang-8
示例:从自定义位置使用clang的默认版本
$ cake --clang-install-prefix=/path/to/clang-install
示例:在自定义位置使用clang的特定版本,来自不同位置的libc ++
$ cake --clang-executable=/path/to/clang-install/bin/clang-8 --libcxx-install-prefix=/path/to/libcxx-install
If your Linux distribution does not have a version of Clang 5.0 or later available, you can install a snapshot build from the LLVM project.
Follow instructions at http://apt.llvm.org/ to setup your package manager to support pulling from the LLVM package manager.
For example, for Ubuntu 17.04 Zesty:
Edit /etc/apt/sources.list and add the following lines:
deb http://apt.llvm.org/zesty/ llvm-toolchain-zesty main
deb-src http://apt.llvm.org/zesty/ llvm-toolchain-zesty main
Install the PGP key for those packages:
$ wget -O - https://apt.llvm.org/llvm-snapshot.gpg.key | sudo apt-key add -
Install Clang and LLD:
$ sudo apt-get install clang-6.0 lld-6.0
The LLVM snapshot builds do not include libc++ versions so you'll need to build that yourself.见下文。
You can also use the bleeding-edge Clang version by building Clang from source yourself.
See instructions here:
To do this you will need to install the following pre-requisites:
$ sudo apt-get install git cmake ninja-build clang lld
Note that we are using your distribution's version of clang to build clang from source. GCC could also be used here instead.
Checkout LLVM + Clang + LLD + libc++ repositories:
mkdir llvm
cd llvm
git clone --depth=1 https://github.com/llvm-mirror/llvm.git llvm
git clone --depth=1 https://github.com/llvm-mirror/clang.git llvm/tools/clang
git clone --depth=1 https://github.com/llvm-mirror/lld.git llvm/tools/lld
git clone --depth=1 https://github.com/llvm-mirror/libcxx.git llvm/projects/libcxx
ln -s llvm/tools/clang clang
ln -s llvm/tools/lld lld
ln -s llvm/projects/libcxx libcxx
Configure and build Clang:
mkdir clang-build
cd clang-build
cmake -GNinja
-DCMAKE_CXX_COMPILER=/usr/bin/clang++
-DCMAKE_C_COMPILER=/usr/bin/clang
-DCMAKE_BUILD_TYPE=MinSizeRel
-DCMAKE_INSTALL_PREFIX="/path/to/clang/install"
-DCMAKE_BUILD_WITH_INSTALL_RPATH="yes"
-DLLVM_TARGETS_TO_BUILD=X86
-DLLVM_ENABLE_PROJECTS="lld;clang"
../llvm
ninja install-clang
install-clang-headers
install-llvm-ar
install-lld
The cppcoro project requires libc++ as it contains the <experimental/coroutine> header required to use C++ coroutines under Clang.
Checkout libc++ + llvm :
mkdir llvm
cd llvm
git clone --depth=1 https://github.com/llvm-mirror/llvm.git llvm
git clone --depth=1 https://github.com/llvm-mirror/libcxx.git llvm/projects/libcxx
ln -s llvm/projects/libcxx libcxx
Build libc++ :
mkdir libcxx-build
cd libcxx-build
cmake -GNinja
-DCMAKE_CXX_COMPILER="/path/to/clang/install/bin/clang++"
-DCMAKE_C_COMPILER="/path/to/clang/install/bin/clang"
-DCMAKE_BUILD_TYPE=Release
-DCMAKE_INSTALL_PREFIX="/path/to/clang/install"
-DLLVM_PATH="../llvm"
-DLIBCXX_CXX_ABI=libstdc++
-DLIBCXX_CXX_ABI_INCLUDE_PATHS="/usr/include/c++/6.3.0/;/usr/include/x86_64-linux-gnu/c++/6.3.0/"
../libcxx
ninja cxx
ninja install
This will build and install libc++ into the same install directory where you have clang installed.
The cppcoro port in vcpkg is kept up to date by Microsoft team members and community contributors. The url of vcpkg is: https://github.com/Microsoft/vcpkg . You can download and install cppcoro using the vcpkg dependency manager:
git clone https://github.com/Microsoft/vcpkg.git
cd vcpkg
./bootstrap-vcpkg.sh # ./bootstrap-vcpkg.bat for Windows
./vcpkg integrate install
./vcpkg install cppcoro如果该版本已过时,请在VCPKG存储库上创建问题或拉出请求。
GitHub issues are the primary mechanism for support, bug reports and feature requests.
Contributions are welcome and pull-requests will be happily reviewed. I only ask that you agree to license any contributions that you make under the MIT license.
If you have general questions about C++ coroutines, you can generally find someone to help in the #coroutines channel on Cpplang Slack group.