“ CPPCORO”庫提供了一系列通用原語,用於利用N4680中描述的Coroutines TS提案。
其中包括:
task<T>shared_task<T>generator<T>recursive_generator<T>async_generator<T>single_consumer_eventsingle_consumer_async_auto_reset_eventasync_mutexasync_manual_reset_eventasync_auto_reset_eventasync_latchsequence_barriermulti_producer_sequencersingle_producer_sequencersync_wait()when_all()when_all_ready()fmap()schedule_on()resume_on()cancellation_tokencancellation_sourcecancellation_registrationstatic_thread_poolio_service和io_work_scopefile , readable_file , writable_fileread_only_file , write_only_file , read_write_filesocketip_address , ipv4_address , ipv6_addressip_endpoint , ipv4_endpoint , ipv6_endpointis_awaitable<T>awaitable_traits<T>Awaitable<T>Awaiter<T>SchedulerDelayedScheduler該庫是一個實驗庫,正在探索可以在C ++ Coroutines建議頂部構建的高性能,可擴展異步編程摘要的空間。
它是開源的,希望其他人會發現它有用,並且C ++社區可以提供有關它的反饋和改善它的方法。
它需要一個支持Coroutines ts的編譯器:
Linux版本具有功能性,除了io_context和File I/O相關類,該類尚未針對Linux實施(有關更多信息,請參見第15期)。
task<T>一項任務代表了一個異步計算,該計算懶惰地執行,因為在等待任務之前,coroutine的執行不會啟動。
例子:
# include < cppcoro/read_only_file.hpp >
# include < cppcoro/task.hpp >
cppcoro::task< int > count_lines (std::string path)
{
auto file = co_await cppcoro::read_only_file::open (path);
int lineCount = 0 ;
char buffer[ 1024 ];
size_t bytesRead;
std:: uint64_t offset = 0 ;
do
{
bytesRead = co_await file. read (offset, buffer, sizeof (buffer));
lineCount += std::count (buffer, buffer + bytesRead, ' n ' );
offset += bytesRead;
} while (bytesRead > 0 );
co_return lineCount;
}
cppcoro::task<> usage_example ()
{
// Calling function creates a new task but doesn't start
// executing the coroutine yet.
cppcoro::task< int > countTask = count_lines ( " foo.txt " );
// ...
// Coroutine is only started when we later co_await the task.
int lineCount = co_await countTask;
std::cout << " line count = " << lineCount << std::endl;
}API概述:
// <cppcoro/task.hpp>
namespace cppcoro
{
template < typename T>
class task
{
public:
using promise_type = <unspecified>;
using value_type = T;
task () noexcept ;
task (task&& other) noexcept ;
task& operator =(task&& other);
// task is a move-only type.
task ( const task& other) = delete ;
task& operator =( const task& other) = delete ;
// Query if the task result is ready.
bool is_ready () const noexcept ;
// Wait for the task to complete and return the result or rethrow the
// exception if the operation completed with an unhandled exception.
//
// If the task is not yet ready then the awaiting coroutine will be
// suspended until the task completes. If the the task is_ready() then
// this operation will return the result synchronously without suspending.
Awaiter<T&> operator co_await () const & noexcept ;
Awaiter<T&&> operator co_await () const && noexcept ;
// Returns an awaitable that can be co_await'ed to suspend the current
// coroutine until the task completes.
//
// The 'co_await t.when_ready()' expression differs from 'co_await t' in
// that when_ready() only performs synchronization, it does not return
// the result or rethrow the exception.
//
// This can be useful if you want to synchronize with the task without
// the possibility of it throwing an exception.
Awaitable< void > when_ready () const noexcept ;
};
template < typename T>
void swap (task<T>& a, task<T>& b);
// Creates a task that yields the result of co_await'ing the specified awaitable.
//
// This can be used as a form of type-erasure of the concrete awaitable, allowing
// different awaitables that return the same await-result type to be stored in
// the same task<RESULT> type.
template <
typename AWAITABLE,
typename RESULT = typename awaitable_traits<AWAITABLE>:: await_result_t >
task<RESULT> make_task (AWAITABLE awaitable);
}您可以通過調用返回task<T>的coroutine函數來創建task<T>對象。
Coroutine必須包含co_await或co_return的用法。請注意, task<T> coroutine可能不使用co_yield關鍵字。
當調用返回task<T>的Coroutine時,必要時會分配Coroutine框架,並在Coroutine框架中捕獲參數。 Coroutine在Coroutine主體的開頭懸掛,並將執行返回給呼叫者,並且一個代表異步計算的task<T>值將從函數調用返回。
當task<T>值為co_await ed時,Coroutine主體將開始執行。這將暫停等待的Coroutine,並開始執行與已久的task<T>值相關的Coroutine。
等待的coroutine將在完成已久task<T>的Coroutine執行的線程上恢復。 IE。執行co_return線程或拋出未經處理的異常,以終止執行coroutine。
如果任務已經完成完成,那麼再次等待它將獲得已經計算的結果,而無需暫停等待Coroutine。
如果task對像在等待之前被銷毀,那麼Coroutine將永遠不會執行,而破壞者只是破壞了捕獲的參數,並釋放了Coroutine框架使用的任何內存。
shared_task<T> shared_task<T>類是一種coroutine類型,可異步產生單個值。
這是“懶惰的”,因為任務的執行才能啟動,直到某些Coroutine等待它。
它是“共享”的,因為可以復制任務值,從而可以對創建任務的結果進行多個引用。它還允許多個Coroutines同時等待結果。
該任務將開始在首次co_await線程上執行任務。隨後的等待者將被暫停,並在任務完成後排隊以恢復,或者如果任務已經完成完成,則將繼續同步。
如果等待任務完成時暫停了等待者,則將在完成任務執行的線程上恢復。 IE。執行co_return線程或拋出終止執行coroutine的未經處理的異常。
API摘要
namespace cppcoro
{
template < typename T = void >
class shared_task
{
public:
using promise_type = <unspecified>;
using value_type = T;
shared_task () noexcept ;
shared_task ( const shared_task& other) noexcept ;
shared_task (shared_task&& other) noexcept ;
shared_task& operator =( const shared_task& other) noexcept ;
shared_task& operator =(shared_task&& other) noexcept ;
void swap (shared_task& other) noexcept ;
// Query if the task has completed and the result is ready.
bool is_ready () const noexcept ;
// Returns an operation that when awaited will suspend the
// current coroutine until the task completes and the result
// is available.
//
// The type of the result of the 'co_await someTask' expression
// is an l-value reference to the task's result value (unless T
// is void in which case the expression has type 'void').
// If the task completed with an unhandled exception then the
// exception will be rethrown by the co_await expression.
Awaiter<T&> operator co_await () const noexcept ;
// Returns an operation that when awaited will suspend the
// calling coroutine until the task completes and the result
// is available.
//
// The result is not returned from the co_await expression.
// This can be used to synchronize with the task without the
// possibility of the co_await expression throwing an exception.
Awaiter< void > when_ready () const noexcept ;
};
template < typename T>
bool operator ==( const shared_task<T>& a, const shared_task<T>& b) noexcept ;
template < typename T>
bool operator !=( const shared_task<T>& a, const shared_task<T>& b) noexcept ;
template < typename T>
void swap (shared_task<T>& a, shared_task<T>& b) noexcept ;
// Wrap an awaitable value in a shared_task to allow multiple coroutines
// to concurrently await the result.
template <
typename AWAITABLE,
typename RESULT = typename awaitable_traits<AWAITABLE>:: await_result_t >
shared_task<RESULT> make_shared_task (AWAITABLE awaitable);
} shared_task<T>上的所有const方法都是安全地與來自多個線程的同一實例上的其他const方法同時調用的。在shared_task<T>同一實例上同時調用共享shared_task<T>的非const方法是不安全的。
task<T> shared_task<T>類類似於task<T> ,因為該任務在調用coroutine函數時不會立即啟動執行。該任務首先在等待時才開始執行。
它與task<T>不同,因為可以復制所得的任務對象,從而允許多個任務對象引用相同的異步結果。它還支持多個珊瑚酸,同時等待任務的結果。
權衡取捨是結果始終是對結果的L值參考,從來沒有R值參考(由於結果可以共享),這可能會限制將結果構造到局部變量中的能力。由於需要維持參考數量並支持多個等待者,因此它的運行時間成本也略高。
generator<T> generator代表一種產生類型T值序列的Coroutine類型,其中值懶惰和同步產生。
Coroutine主體能夠使用co_yield關鍵字產生T型的值。但是請注意,Coroutine主體無法使用co_await關鍵字。值必須同步產生。
例如:
cppcoro::generator< const std:: uint64_t > fibonacci ()
{
std:: uint64_t a = 0 , b = 1 ;
while ( true )
{
co_yield b;
auto tmp = a;
a = b;
b += tmp;
}
}
void usage ()
{
for ( auto i : fibonacci ())
{
if (i > 1'000'000 ) break ;
std::cout << i << std::endl;
}
}當Coroutine函數返回generator<T>時,將創建Coroutine最初懸掛。當generator<T>::begin()方法被調用並繼續進行直到到達第一個co_yield語句或完成Coroutine運行到完成時,執行Coroutine進入Coroutine主體。
如果返回的迭代器不等於end()迭代器,則指定迭代器將返回對傳遞給co_yield語句的值的引用。
在迭代器上調用operator++()將恢復執行coroutine,並繼續執行,直到達到下一個co_yield點或coroutine運行到完成()。
Coroutine拋出的任何未經處理的異常都會從begin()或operator++()調用對呼叫者中傳播。
API摘要:
namespace cppcoro
{
template < typename T>
class generator
{
public:
using promise_type = <unspecified>;
class iterator
{
public:
using iterator_category = std::input_iterator_tag;
using value_type = std:: remove_reference_t <T>;
using reference = value_type&;
using pointer = value_type*;
using difference_type = std:: size_t ;
iterator ( const iterator& other) noexcept ;
iterator& operator =( const iterator& other) noexcept ;
// If the generator coroutine throws an unhandled exception before producing
// the next element then the exception will propagate out of this call.
iterator& operator ++();
reference operator *() const noexcept ;
pointer operator ->() const noexcept ;
bool operator ==( const iterator& other) const noexcept ;
bool operator !=( const iterator& other) const noexcept ;
};
// Constructs to the empty sequence.
generator () noexcept ;
generator (generator&& other) noexcept ;
generator& operator =(generator&& other) noexcept ;
generator ( const generator& other) = delete ;
generator& operator =( const generator&) = delete ;
~generator ();
// Starts executing the generator coroutine which runs until either a value is yielded
// or the coroutine runs to completion or an unhandled exception propagates out of the
// the coroutine.
iterator begin ();
iterator end () noexcept ;
// Swap the contents of two generators.
void swap (generator& other) noexcept ;
};
template < typename T>
void swap (generator<T>& a, generator<T>& b) noexcept ;
// Apply function, func, lazily to each element of the source generator
// and yield a sequence of the results of calls to func().
template < typename FUNC, typename T>
generator<std:: invoke_result_t <FUNC, T&>> fmap (FUNC func, generator<T> source);
}recursive_generator<T> recursive_generator類似於generator ,除了它被設計為更有效地支持將嵌套序列的元素作為外序元素的元素的元素。
除了能夠co_yield二型T型值外,您還可以將recursive_generator<T>類型的值co_yield 。
當您co_yield時, recursive_generator<T>值<t value,屈服發生器的所有元素作為當前發電機的元素產生。當前的Coroutine被暫停,直到消費者消耗了嵌套發生器的所有元素為止,此後,當前Coroutine的點執行將恢復執行以產生下一個元素。
在遞歸數據結構上迭代的遞歸generator<T> recursive_generator<T>的好處是, iterator::operator++()能夠直接恢復葉片的coroutine以產生下一個元素,而不必為每個元素恢復/懸浮o(deptens o(Depth)Coroutines。側面是還有其他開銷
例如:
// Lists the immediate contents of a directory.
cppcoro::generator<dir_entry> list_directory (std::filesystem::path path);
cppcoro::recursive_generator<dir_entry> list_directory_recursive (std::filesystem::path path)
{
for ( auto & entry : list_directory (path))
{
co_yield entry;
if (entry. is_directory ())
{
co_yield list_directory_recursive (entry. path ());
}
}
}請注意,將fmap()運算符應用於recursive_generator<T>將產生generator<U>類型而不是recursive_generator<U> 。這是因為fmap的用途通常在遞歸上下文中不使用,我們試圖避免遞歸recursive_generator產生的額外開銷。
async_generator<T> async_generator代表一種產生類型T值序列的Coroutine類型,其中懶惰產生值,並且可能異步產生值。
Coroutine主體能夠同時使用co_await和co_yield表達式。
發電機的消費者可以將基於for co_await範圍的for-loop使用來消耗這些值。
例子
cppcoro::async_generator< int > ticker ( int count, threadpool& tp)
{
for ( int i = 0 ; i < count; ++i)
{
co_await tp. delay ( std::chrono::seconds ( 1 ));
co_yield i;
}
}
cppcoro::task<> consumer (threadpool& tp)
{
auto sequence = ticker ( 10 , tp);
for co_await (std:: uint32_t i : sequence)
{
std::cout << " Tick " << i << std::endl;
}
}API摘要
// <cppcoro/async_generator.hpp>
namespace cppcoro
{
template < typename T>
class async_generator
{
public:
class iterator
{
public:
using iterator_tag = std::forward_iterator_tag;
using difference_type = std:: size_t ;
using value_type = std:: remove_reference_t <T>;
using reference = value_type&;
using pointer = value_type*;
iterator ( const iterator& other) noexcept ;
iterator& operator =( const iterator& other) noexcept ;
// Resumes the generator coroutine if suspended
// Returns an operation object that must be awaited to wait
// for the increment operation to complete.
// If the coroutine runs to completion then the iterator
// will subsequently become equal to the end() iterator.
// If the coroutine completes with an unhandled exception then
// that exception will be rethrown from the co_await expression.
Awaitable<iterator&> operator ++() noexcept ;
// Dereference the iterator.
pointer operator ->() const noexcept ;
reference operator *() const noexcept ;
bool operator ==( const iterator& other) const noexcept ;
bool operator !=( const iterator& other) const noexcept ;
};
// Construct to the empty sequence.
async_generator () noexcept ;
async_generator ( const async_generator&) = delete ;
async_generator (async_generator&& other) noexcept ;
~async_generator ();
async_generator& operator =( const async_generator&) = delete ;
async_generator& operator =(async_generator&& other) noexcept ;
void swap (async_generator& other) noexcept ;
// Starts execution of the coroutine and returns an operation object
// that must be awaited to wait for the first value to become available.
// The result of co_await'ing the returned object is an iterator that
// can be used to advance to subsequent elements of the sequence.
//
// This method is not valid to be called once the coroutine has
// run to completion.
Awaitable<iterator> begin () noexcept ;
iterator end () noexcept ;
};
template < typename T>
void swap (async_generator<T>& a, async_generator<T>& b);
// Apply 'func' to each element of the source generator, yielding a sequence of
// the results of calling 'func' on the source elements.
template < typename FUNC, typename T>
async_generator<std:: invoke_result_t <FUNC, T&>> fmap (FUNC func, async_generator<T> source);
}當破壞async_generator對象時,請求取消基礎Coroutine。如果Coroutine已經運行到完成或目前被暫停在co_yield表達式中,則立即將Coroutine銷毀。否則,Coroutine將繼續執行,直到完成完成或到達下一個co_yield表達式為止。
當Coroutine框架被銷毀時,將執行所有變量範圍中所有變量的破壞者,以確保清理髮電機的資源。
請注意,呼叫者必須確保在消費者coroutine執行co_await表達式等待下一個項目時,必須確保不得破壞async_generator對象。
single_consumer_event這是一種簡單的手動遞歸事件類型,一次僅支持一個coroutine等待它。這可以用來
API摘要:
// <cppcoro/single_consumer_event.hpp>
namespace cppcoro
{
class single_consumer_event
{
public:
single_consumer_event ( bool initiallySet = false ) noexcept ;
bool is_set () const noexcept ;
void set ();
void reset () noexcept ;
Awaiter< void > operator co_await () const noexcept ;
};
}例子:
# include < cppcoro/single_consumer_event.hpp >
cppcoro::single_consumer_event event;
std::string value;
cppcoro::task<> consumer ()
{
// Coroutine will suspend here until some thread calls event.set()
// eg. inside the producer() function below.
co_await event;
std::cout << value << std::endl;
}
void producer ()
{
value = " foo " ;
// This will resume the consumer() coroutine inside the call to set()
// if it is currently suspended.
event. set ();
}single_consumer_async_auto_reset_event該類提供了異步同步原始的原始性,它允許單個coroutine等到事件通過呼叫對set()方法發出信號。
一旦等待事件的coroutine由以前或後續調用set()釋放()事件將自動重置回“不設置”狀態。
該類是async_auto_reset_event的更有效版本,在一次只能等待事件的情況下,可以使用。如果您需要支持在事件上的多個同時等待coroutines,請改用async_auto_reset_event類。
API摘要:
// <cppcoro/single_consumer_async_auto_reset_event.hpp>
namespace cppcoro
{
class single_consumer_async_auto_reset_event
{
public:
single_consumer_async_auto_reset_event (
bool initiallySet = false ) noexcept ;
// Change the event to the 'set' state. If a coroutine is awaiting the
// event then the event is immediately transitioned back to the 'not set'
// state and the coroutine is resumed.
void set () noexcept ;
// Returns an Awaitable type that can be awaited to wait until
// the event becomes 'set' via a call to the .set() method. If
// the event is already in the 'set' state then the coroutine
// continues without suspending.
// The event is automatically reset back to the 'not set' state
// before resuming the coroutine.
Awaiter< void > operator co_await () const noexcept ;
};
}示例用法:
std::atomic< int > value;
cppcoro::single_consumer_async_auto_reset_event valueDecreasedEvent;
cppcoro::task<> wait_until_value_is_below ( int limit)
{
while (value. load (std::memory_order_relaxed) >= limit)
{
// Wait until there has been some change that we're interested in.
co_await valueDecreasedEvent;
}
}
void change_value ( int delta)
{
value. fetch_add (delta, std::memory_order_relaxed);
// Notify the waiter if there has been some change.
if (delta < 0 ) valueDecreasedEvent. set ();
}async_mutex提供了一個簡單的相互排除抽象,該抽象使呼叫者可以從Coroutine內“ co_await”“ co_await”,以暫停Coroutine,直到獲得靜音鎖。
該實現是無鎖的,因為等待Mutex的Coroutine不會阻止線程,而是將暫停Coroutine,然後將其恢復到以前的鎖定器中的呼叫unlock()中。
API摘要:
// <cppcoro/async_mutex.hpp>
namespace cppcoro
{
class async_mutex_lock ;
class async_mutex_lock_operation ;
class async_mutex_scoped_lock_operation ;
class async_mutex
{
public:
async_mutex () noexcept ;
~async_mutex ();
async_mutex ( const async_mutex&) = delete ;
async_mutex& operator ( const async_mutex&) = delete;
bool try_lock () noexcept ;
async_mutex_lock_operation lock_async () noexcept ;
async_mutex_scoped_lock_operation scoped_lock_async () noexcept ;
void unlock ();
};
class async_mutex_lock_operation
{
public:
bool await_ready () const noexcept ;
bool await_suspend (std::experimental::coroutine_handle<> awaiter) noexcept ;
void await_resume () const noexcept ;
};
class async_mutex_scoped_lock_operation
{
public:
bool await_ready () const noexcept ;
bool await_suspend (std::experimental::coroutine_handle<> awaiter) noexcept ;
[[nodiscard]] async_mutex_lock await_resume () const noexcept ;
};
class async_mutex_lock
{
public:
// Takes ownership of the lock.
async_mutex_lock (async_mutex& mutex, std:: adopt_lock_t ) noexcept ;
// Transfer ownership of the lock.
async_mutex_lock (async_mutex_lock&& other) noexcept ;
async_mutex_lock ( const async_mutex_lock&) = delete ;
async_mutex_lock& operator =( const async_mutex_lock&) = delete ;
// Releases the lock by calling unlock() on the mutex.
~async_mutex_lock ();
};
}示例用法:
# include < cppcoro/async_mutex.hpp >
# include < cppcoro/task.hpp >
# include < set >
# include < string >
cppcoro::async_mutex mutex;
std::set<std::string> values;
cppcoro::task<> add_item (std::string value)
{
cppcoro::async_mutex_lock lock = co_await mutex. scoped_lock_async ();
values. insert ( std::move (value));
}async_manual_reset_event手動遞歸事件是一個coroutine/螺紋同步原始化,它允許一個或多個線程等待,直到事件通過調用set()線程發出信號。
該活動在兩個州之一。 “設置”和“未設置” 。
如果事件處於“集合”狀態時,當Coroutine等待事件時,那麼Coroutine繼續而不會暫停。但是,如果Coroutine處於“未集”狀態,則暫停Coroutine,直到某個線程隨後調用set()方法為止。
等待事件變為“集合”時懸掛的任何線程都將通過某個線程恢復到set()的下一個呼叫中。
請注意,您必須確保在事件被破壞時不在等待“不設置”事件,因為它們不會恢復。
例子:
cppcoro::async_manual_reset_event event;
std::string value;
void producer ()
{
value = get_some_string_value ();
// Publish a value by setting the event.
event. set ();
}
// Can be called many times to create many tasks.
// All consumer tasks will wait until value has been published.
cppcoro::task<> consumer ()
{
// Wait until value has been published by awaiting event.
co_await event;
consume_value (value);
}API摘要:
namespace cppcoro
{
class async_manual_reset_event_operation ;
class async_manual_reset_event
{
public:
async_manual_reset_event ( bool initiallySet = false ) noexcept ;
~async_manual_reset_event ();
async_manual_reset_event ( const async_manual_reset_event&) = delete ;
async_manual_reset_event (async_manual_reset_event&&) = delete ;
async_manual_reset_event& operator =( const async_manual_reset_event&) = delete ;
async_manual_reset_event& operator =(async_manual_reset_event&&) = delete ;
// Wait until the event becomes set.
async_manual_reset_event_operation operator co_await () const noexcept ;
bool is_set () const noexcept ;
void set () noexcept ;
void reset () noexcept ;
};
class async_manual_reset_event_operation
{
public:
async_manual_reset_event_operation (async_manual_reset_event& event) noexcept ;
bool await_ready () const noexcept ;
bool await_suspend (std::experimental::coroutine_handle<> awaiter) noexcept ;
void await_resume () const noexcept ;
};
}async_auto_reset_event一個自動收集事件是一個coroutine/螺紋同步原始事件,它允許一個或多個線程等到通過調用set()通過線程發出事件的信號,直到事件發出信號()。
一旦在等待事件的Coroutine被以前或後續的調用set()發布()事件會自動重置回“不設置”狀態。
API摘要:
// <cppcoro/async_auto_reset_event.hpp>
namespace cppcoro
{
class async_auto_reset_event_operation ;
class async_auto_reset_event
{
public:
async_auto_reset_event ( bool initiallySet = false ) noexcept ;
~async_auto_reset_event ();
async_auto_reset_event ( const async_auto_reset_event&) = delete ;
async_auto_reset_event (async_auto_reset_event&&) = delete ;
async_auto_reset_event& operator =( const async_auto_reset_event&) = delete ;
async_auto_reset_event& operator =(async_auto_reset_event&&) = delete ;
// Wait for the event to enter the 'set' state.
//
// If the event is already 'set' then the event is set to the 'not set'
// state and the awaiting coroutine continues without suspending.
// Otherwise, the coroutine is suspended and later resumed when some
// thread calls 'set()'.
//
// Note that the coroutine may be resumed inside a call to 'set()'
// or inside another thread's call to 'operator co_await()'.
async_auto_reset_event_operation operator co_await () const noexcept ;
// Set the state of the event to 'set'.
//
// If there are pending coroutines awaiting the event then one
// pending coroutine is resumed and the state is immediately
// set back to the 'not set' state.
//
// This operation is a no-op if the event was already 'set'.
void set () noexcept ;
// Set the state of the event to 'not-set'.
//
// This is a no-op if the state was already 'not set'.
void reset () noexcept ;
};
class async_auto_reset_event_operation
{
public:
explicit async_auto_reset_event_operation (async_auto_reset_event& event) noexcept ;
async_auto_reset_event_operation ( const async_auto_reset_event_operation& other) noexcept ;
bool await_ready () const noexcept ;
bool await_suspend (std::experimental::coroutine_handle<> awaiter) noexcept ;
void await_resume () const noexcept ;
};
}async_latch異步閂鎖是一種同步原始性,它允許旋ou等待,直到將計數器降低到零為止。
閂鎖是一次使用對象。一旦計數器達到零,閂鎖就會“準備就緒”,並將保持準備,直到閂鎖被摧毀為止。
API摘要:
// <cppcoro/async_latch.hpp>
namespace cppcoro
{
class async_latch
{
public:
// Initialise the latch with the specified count.
async_latch (std:: ptrdiff_t initialCount) noexcept ;
// Query if the count has reached zero yet.
bool is_ready () const noexcept ;
// Decrement the count by n.
// This will resume any waiting coroutines if the count reaches zero
// as a result of this call.
// It is undefined behaviour to decrement the count below zero.
void count_down (std:: ptrdiff_t n = 1 ) noexcept ;
// Wait until the latch becomes ready.
// If the latch count is not yet zero then the awaiting coroutine will
// be suspended and later resumed by a call to count_down() that decrements
// the count to zero. If the latch count was already zero then the coroutine
// continues without suspending.
Awaiter< void > operator co_await () const noexcept ;
};
}sequence_barrier sequence_barrier是一種同步原始化,它允許單個生產者和多個消費者相對於單調增加的序列數進行協調。
單個生產者通過以單調增加的順序發布新序列編號來推進序列編號。一個或多個消費者可以查詢最後發布的序列編號,並且可以等到發布特定序列編號為止。
序列屏障可以用來將光標表示為線程安全/消費者環形彈跳器
有關更多背景:https://lmax-exchange.github.io/disruptor/files/files/disruptor-1.0.pdf
API摘要:
namespace cppcoro
{
template < typename SEQUENCE = std:: size_t ,
typename TRAITS = sequence_traits<SEQUENCE>>
class sequence_barrier
{
public:
sequence_barrier (SEQUENCE initialSequence = TRAITS::initial_sequence) noexcept ;
~sequence_barrier ();
SEQUENCE last_published () const noexcept ;
// Wait until the specified targetSequence number has been published.
//
// If the operation does not complete synchronously then the awaiting
// coroutine is resumed on the specified scheduler. Otherwise, the
// coroutine continues without suspending.
//
// The co_await expression resumes with the updated last_published()
// value, which is guaranteed to be at least 'targetSequence'.
template < typename SCHEDULER>
[[nodiscard]]
Awaitable<SEQUENCE> wait_until_published (SEQUENCE targetSequence,
SCHEDULER& scheduler) const noexcept ;
void publish (SEQUENCE sequence) noexcept ;
};
}single_producer_sequencer single_producer_sequencer是一種同步原始化,可用於協調單個生產商和一個或多個消費者的彈跳器的訪問。
生產者首先在彈跳器中獲取一個或多個插槽,將與這些插槽相對應的環形彈跳器元素寫入,然後最終發布寫入這些插槽的值。在消費者消費之前,生產商永遠不會生產更多的“緩衝”元素。
然後,消費者等待發布某些元素,處理項目,然後通過發布sequence_barrier對象完成的序列編號完成處理項目時通知生產商。
API摘要:
// <cppcoro/single_producer_sequencer.hpp>
namespace cppcoro
{
template <
typename SEQUENCE = std:: size_t ,
typename TRAITS = sequence_traits<SEQUENCE>>
class single_producer_sequencer
{
public:
using size_type = typename sequence_range<SEQUENCE, TRAITS>::size_type;
single_producer_sequencer (
const sequence_barrier<SEQUENCE, TRAITS>& consumerBarrier,
std:: size_t bufferSize,
SEQUENCE initialSequence = TRAITS::initial_sequence) noexcept ;
// Publisher API:
template < typename SCHEDULER>
[[nodiscard]]
Awaitable<SEQUENCE> claim_one (SCHEDULER& scheduler) noexcept ;
template < typename SCHEDULER>
[[nodiscard]]
Awaitable<sequence_range<SEQUENCE>> claim_up_to (
std:: size_t count,
SCHEDULER& scheduler) noexcept ;
void publish (SEQUENCE sequence) noexcept ;
// Consumer API:
SEQUENCE last_published () const noexcept ;
template < typename SCHEDULER>
[[nodiscard]]
Awaitable<SEQUENCE> wait_until_published (
SEQUENCE targetSequence,
SCHEDULER& scheduler) const noexcept ;
};
}示例用法:
using namespace cppcoro ;
using namespace std ::chrono ;
struct message
{
int id;
steady_clock::time_point timestamp;
float data;
};
constexpr size_t bufferSize = 16384 ; // Must be power-of-two
constexpr size_t indexMask = bufferSize - 1 ;
message buffer[bufferSize];
task< void > producer (
io_service& ioSvc,
single_producer_sequencer< size_t >& sequencer)
{
auto start = steady_clock::now ();
for ( int i = 0 ; i < 1'000'000 ; ++i)
{
// Wait until a slot is free in the buffer.
size_t seq = co_await sequencer. claim_one (ioSvc);
// Populate the message.
auto & msg = buffer[seq & indexMask];
msg. id = i;
msg. timestamp = steady_clock::now ();
msg. data = 123 ;
// Publish the message.
sequencer. publish (seq);
}
// Publish a sentinel
auto seq = co_await sequencer. claim_one (ioSvc);
auto & msg = buffer[seq & indexMask];
msg. id = - 1 ;
sequencer. publish (seq);
}
task< void > consumer (
static_thread_pool& threadPool,
const single_producer_sequencer< size_t >& sequencer,
sequence_barrier< size_t >& consumerBarrier)
{
size_t nextToRead = 0 ;
while ( true )
{
// Wait until the next message is available
// There may be more than one available.
const size_t available = co_await sequencer. wait_until_published (nextToRead, threadPool);
do {
auto & msg = buffer[nextToRead & indexMask];
if (msg. id == - 1 )
{
consumerBarrier. publish (nextToRead);
co_return ;
}
processMessage (msg);
} while (nextToRead++ != available);
// Notify the producer that we've finished processing
// up to 'nextToRead - 1'.
consumerBarrier. publish (available);
}
}
task< void > example (io_service& ioSvc, static_thread_pool& threadPool)
{
sequence_barrier< size_t > barrier;
single_producer_sequencer< size_t > sequencer{barrier, bufferSize};
co_await when_all (
producer (tp, sequencer),
consumer (tp, sequencer, barrier));
}multi_producer_sequencer multi_producer_sequencer類是一種同步原始化,可協調多個生產商和一個或多個消費者的彈跳器的訪問。
有關單生產商變體,請參見single_producer_sequencer類。
請注意,彈跳器必須具有兩個尺寸的兩個尺寸。這是因為實現使用BitMasks代替整數除法/模塊來計算緩衝區中的偏移量。另外,這允許序列編號安全地包裹在32位/64位值周圍。
API摘要:
// <cppcoro/multi_producer_sequencer.hpp>
namespace cppcoro
{
template < typename SEQUENCE = std:: size_t ,
typename TRAITS = sequence_traits<SEQUENCE>>
class multi_producer_sequencer
{
public:
multi_producer_sequencer (
const sequence_barrier<SEQUENCE, TRAITS>& consumerBarrier,
SEQUENCE initialSequence = TRAITS::initial_sequence);
std:: size_t buffer_size () const noexcept ;
// Consumer interface
//
// Each consumer keeps track of their own 'lastKnownPublished' value
// and must pass this to the methods that query for an updated last-known
// published sequence number.
SEQUENCE last_published_after (SEQUENCE lastKnownPublished) const noexcept ;
template < typename SCHEDULER>
Awaitable<SEQUENCE> wait_until_published (
SEQUENCE targetSequence,
SEQUENCE lastKnownPublished,
SCHEDULER& scheduler) const noexcept ;
// Producer interface
// Query whether any slots available for claiming (approx.)
bool any_available () const noexcept ;
template < typename SCHEDULER>
Awaitable<SEQUENCE> claim_one (SCHEDULER& scheduler) noexcept ;
template < typename SCHEDULER>
Awaitable<sequence_range<SEQUENCE, TRAITS>> claim_up_to (
std:: size_t count,
SCHEDULER& scheduler) noexcept ;
// Mark the specified sequence number as published.
void publish (SEQUENCE sequence) noexcept ;
// Mark all sequence numbers in the specified range as published.
void publish ( const sequence_range<SEQUENCE, TRAITS>& range) noexcept ;
};
}cancellation_token是一個值,它可以傳遞給函數,該功能允許呼叫者隨後將請求傳達以取消操作的請求。
要獲得能夠取消的cancellation_token ,必須首先創建一個cancellation_source對象。 cancellation_source::token()方法可用於製造與cancellation_source對象鏈接的新cancellation_token值。
當您稍後請求取消操作時,您已傳遞了cancellation_token ,您可以在關聯的cancellation_source對像上調用cancellation_source::request_cancellation() 。
功能可以通過兩種方式之一響應取消請求:
cancellation_token::is_cancellation_requested()或cancellation_token::throw_if_cancellation_requested() ,以定期取消進行投票。cancellation_registration類請求取消時,註冊要執行的回調。API摘要:
namespace cppcoro
{
class cancellation_source
{
public:
// Construct a new, independently cancellable cancellation source.
cancellation_source ();
// Construct a new reference to the same cancellation state.
cancellation_source ( const cancellation_source& other) noexcept ;
cancellation_source (cancellation_source&& other) noexcept ;
~cancellation_source ();
cancellation_source& operator =( const cancellation_source& other) noexcept ;
cancellation_source& operator =(cancellation_source&& other) noexcept ;
bool is_cancellation_requested () const noexcept ;
bool can_be_cancelled () const noexcept ;
void request_cancellation ();
cancellation_token token () const noexcept ;
};
class cancellation_token
{
public:
// Construct a token that can't be cancelled.
cancellation_token () noexcept ;
cancellation_token ( const cancellation_token& other) noexcept ;
cancellation_token (cancellation_token&& other) noexcept ;
~cancellation_token ();
cancellation_token& operator =( const cancellation_token& other) noexcept ;
cancellation_token& operator =(cancellation_token&& other) noexcept ;
bool is_cancellation_requested () const noexcept ;
void throw_if_cancellation_requested () const ;
// Query if this token can ever have cancellation requested.
// Code can use this to take a more efficient code-path in cases
// that the operation does not need to handle cancellation.
bool can_be_cancelled () const noexcept ;
};
// RAII class for registering a callback to be executed if cancellation
// is requested on a particular cancellation token.
class cancellation_registration
{
public:
// Register a callback to be executed if cancellation is requested.
// Callback will be called with no arguments on the thread that calls
// request_cancellation() if cancellation is not yet requested, or
// called immediately if cancellation has already been requested.
// Callback must not throw an unhandled exception when called.
template < typename CALLBACK>
cancellation_registration (cancellation_token token, CALLBACK&& callback);
cancellation_registration ( const cancellation_registration& other) = delete ;
~cancellation_registration ();
};
class operation_cancelled : public std :: exception
{
public:
operation_cancelled ();
const char * what () const override ;
};
}示例:投票方法
cppcoro::task<> do_something_async (cppcoro::cancellation_token token)
{
// Explicitly define cancellation points within the function
// by calling throw_if_cancellation_requested().
token. throw_if_cancellation_requested ();
co_await do_step_1 ();
token. throw_if_cancellation_requested ();
do_step_2 ();
// Alternatively, you can query if cancellation has been
// requested to allow yourself to do some cleanup before
// returning.
if (token. is_cancellation_requested ())
{
display_message_to_user ( " Cancelling operation... " );
do_cleanup ();
throw cppcoro::operation_cancelled{};
}
do_final_step ();
}示例:回調方法
// Say we already have a timer abstraction that supports being
// cancelled but it doesn't support cancellation_tokens natively.
// You can use a cancellation_registration to register a callback
// that calls the existing cancellation API. e.g.
cppcoro::task<> cancellable_timer_wait (cppcoro::cancellation_token token)
{
auto timer = create_timer (10s);
cppcoro::cancellation_registration registration (token, [&]
{
// Call existing timer cancellation API.
timer. cancel ();
});
co_await timer;
}static_thread_pool static_thread_pool類提供了一個抽象,使您可以安排在固定尺寸的線程池上工作。
該課程實現了調度程序概念(見下文)。
您可以通過執行co_await threadPool.schedule()來加入線程池。此操作將暫停當前的coroutine,在螺紋池上執行它,然後在接下來免費運行螺紋池中的線程時,線程池將恢復coroutine。保證不投擲此操作,在常見情況下,不會分配任何內存。
該課程利用偷竊算法來負載跨多個線程的負載平衡工作。將從線程池螺紋匯到線程池的工作將安排在LIFO隊列中的同一線程上執行。從遠程線程中加入線程池的工作將被納入全局FIFO隊列。當工人線程從其本地隊列中用完工作用盡時,它首先嘗試從全球隊列中脫離工作。如果該隊列為空,那麼接下來它會試圖從其他工人線程的隊列的背面竊取工作。
API摘要:
namespace cppcoro
{
class static_thread_pool
{
public:
// Initialise the thread-pool with a number of threads equal to
// std::thread::hardware_concurrency().
static_thread_pool ();
// Initialise the thread pool with the specified number of threads.
explicit static_thread_pool (std:: uint32_t threadCount);
std:: uint32_t thread_count () const noexcept ;
class schedule_operation
{
public:
schedule_operation (static_thread_pool* tp) noexcept ;
bool await_ready () noexcept ;
bool await_suspend (std::experimental::coroutine_handle<> h) noexcept ;
bool await_resume () noexcept ;
private:
// unspecified
};
// Return an operation that can be awaited by a coroutine.
//
//
[[nodiscard]]
schedule_operation schedule () noexcept ;
private:
// Unspecified
};
}示例用法:簡單
cppcoro::task<std::string> do_something_on_threadpool (cppcoro::static_thread_pool& tp)
{
// First schedule the coroutine onto the threadpool.
co_await tp. schedule ();
// When it resumes, this coroutine is now running on the threadpool.
do_something ();
}示例用法:並行執行操作 - 使用static_thread_pool使用schedule_on()運算符。
cppcoro::task< double > dot_product (static_thread_pool& tp, double a[], double b[], size_t count)
{
if (count > 1000 )
{
// Subdivide the work recursively into two equal tasks
// The first half is scheduled to the thread pool so it can run concurrently
// with the second half which continues on this thread.
size_t halfCount = count / 2 ;
auto [first, second] = co_await when_all (
schedule_on (tp, dot_product (tp, a, b, halfCount),
dot_product (tp, a + halfCount, b + halfCount, count - halfCount));
co_return first + second;
}
else
{
double sum = 0.0 ;
for ( size_t i = 0 ; i < count; ++i)
{
sum += a[i] * b[i];
}
co_return sum;
}
}io_service和io_work_scope io_service類為從異步I/O操作處理I/O完成事件提供了一個抽象。
當異步I/O操作完成時,等待該操作的Coroutine將在呼叫中的I/O線程上恢復到一個事件處理方法之一: process_events() , process_pending_events() ,process_one_event(), process_one_event()或process_one_pending_event() 。
io_service類不管理任何I/O線程。您必須確保某些線程調用一種用於派遣I/O完成事件的事件處理方法之一。這可以是一個專用線程,通過通過調用process_events()或process_one_pending_event()定期輪詢新事件來調用process_pending_events()或與其他事件循環(例如UI事件循環)混合。
這允許將io_service事件環與其他事件循環(例如用戶界面事件循環)集成。
您可以通過擁有多個線程call process_events()來對事件進行多路復用處理。您可以指定通過可選的io_service構造函數參數積極處理事件的最大線程數量的提示。
在Windows上,該實現利用Windows I/O完成端口工廠以可擴展的方式將事件分配給I/O線程。
API摘要:
namespace cppcoro
{
class io_service
{
public:
class schedule_operation ;
class timed_schedule_operation ;
io_service ();
io_service (std:: uint32_t concurrencyHint);
io_service (io_service&&) = delete ;
io_service ( const io_service&) = delete ;
io_service& operator =(io_service&&) = delete ;
io_service& operator =( const io_service&) = delete ;
~io_service ();
// Scheduler methods
[[nodiscard]]
schedule_operation schedule () noexcept ;
template < typename REP, typename RATIO>
[[nodiscard]]
timed_schedule_operation schedule_after (
std::chrono::duration<REP, RATIO> delay,
cppcoro::cancellation_token cancellationToken = {}) noexcept ;
// Event-loop methods
//
// I/O threads must call these to process I/O events and execute
// scheduled coroutines.
std:: uint64_t process_events ();
std:: uint64_t process_pending_events ();
std:: uint64_t process_one_event ();
std:: uint64_t process_one_pending_event ();
// Request that all threads processing events exit their event loops.
void stop () noexcept ;
// Query if some thread has called stop()
bool is_stop_requested () const noexcept ;
// Reset the event-loop after a call to stop() so that threads can
// start processing events again.
void reset ();
// Reference-counting methods for tracking outstanding references
// to the io_service.
//
// The io_service::stop() method will be called when the last work
// reference is decremented.
//
// Use the io_work_scope RAII class to manage calling these methods on
// entry-to and exit-from a scope.
void notify_work_started () noexcept ;
void notify_work_finished () noexcept ;
};
class io_service ::schedule_operation
{
public:
schedule_operation ( const schedule_operation&) noexcept ;
schedule_operation& operator =( const schedule_operation&) noexcept ;
bool await_ready () const noexcept ;
void await_suspend (std::experimental::coroutine_handle<> awaiter) noexcept ;
void await_resume () noexcept ;
};
class io_service ::timed_schedule_operation
{
public:
timed_schedule_operation (timed_schedule_operation&&) noexcept ;
timed_schedule_operation ( const timed_schedule_operation&) = delete ;
timed_schedule_operation& operator =( const timed_schedule_operation&) = delete ;
timed_schedule_operation& operator =(timed_schedule_operation&&) = delete ;
bool await_ready () const noexcept ;
void await_suspend (std::experimental::coroutine_handle<> awaiter);
void await_resume ();
};
class io_work_scope
{
public:
io_work_scope (io_service& ioService) noexcept ;
io_work_scope ( const io_work_scope& other) noexcept ;
io_work_scope (io_work_scope&& other) noexcept ;
~io_work_scope ();
io_work_scope& operator =( const io_work_scope& other) noexcept ;
io_work_scope& operator =(io_work_scope&& other) noexcept ;
io_service& service () const noexcept ;
};
}例子:
# include < cppcoro/task.hpp >
# include < cppcoro/task.hpp >
# include < cppcoro/io_service.hpp >
# include < cppcoro/read_only_file.hpp >
# include < experimental/filesystem >
# include < memory >
# include < algorithm >
# include < iostream >
namespace fs = std::experimental::filesystem;
cppcoro::task<std:: uint64_t > count_lines (cppcoro::io_service& ioService, fs::path path)
{
auto file = cppcoro::read_only_file::open (ioService, path);
constexpr size_t bufferSize = 4096 ;
auto buffer = std::make_unique<std:: uint8_t []>(bufferSize);
std:: uint64_t newlineCount = 0 ;
for (std:: uint64_t offset = 0 , fileSize = file. size (); offset < fileSize;)
{
const auto bytesToRead = static_cast < size_t >(
std::min<std:: uint64_t >(bufferSize, fileSize - offset));
const auto bytesRead = co_await file. read (offset, buffer. get (), bytesToRead);
newlineCount += std::count (buffer. get (), buffer. get () + bytesRead, ' n ' );
offset += bytesRead;
}
co_return newlineCount;
}
cppcoro::task<> run (cppcoro::io_service& ioService)
{
cppcoro::io_work_scope ioScope (ioService);
auto lineCount = co_await count_lines (ioService, fs::path{ " foo.txt " });
std::cout << " foo.txt has " << lineCount << " lines. " << std::endl;;
}
cppcoro::task<> process_events (cppcoro::io_service& ioService)
{
// Process events until the io_service is stopped.
// ie. when the last io_work_scope goes out of scope.
ioService. process_events ();
co_return ;
}
int main ()
{
cppcoro::io_service ioService;
cppcoro::sync_wait ( cppcoro::when_all_ready (
run (ioService),
process_events (ioService)));
return 0 ;
}io_service作為調度程序io_service類實現Scheduler和DelayedScheduler概念的接口。
這允許Coroutine暫停當前線程上的執行,並安排與特定io_service對象關聯的I/O線程恢復。
例子:
cppcoro::task<> do_something (cppcoro::io_service& ioService)
{
// Coroutine starts execution on the thread of the task awaiter.
// A coroutine can transfer execution to an I/O thread by awaiting the
// result of io_service::schedule().
co_await ioService. schedule ();
// At this point, the coroutine is now executing on an I/O thread
// inside a call to one of the io_service event processing methods.
// A coroutine can also perform a delayed-schedule that will suspend
// the coroutine for a specified duration of time before scheduling
// it for resumption on an I/O thread.
co_await ioService. schedule_after (100ms);
// At this point, the coroutine is executing on a potentially different I/O thread.
}file , readable_file , writable_file這些類型是用於執行混凝土文件I/O的抽象基類。
API摘要:
namespace cppcoro
{
class file_read_operation ;
class file_write_operation ;
class file
{
public:
virtual ~file ();
std:: uint64_t size () const ;
protected:
file (file&& other) noexcept ;
};
class readable_file : public virtual file
{
public:
[[nodiscard]]
file_read_operation read (
std:: uint64_t offset,
void * buffer,
std:: size_t byteCount,
cancellation_token ct = {}) const noexcept ;
};
class writable_file : public virtual file
{
public:
void set_size (std:: uint64_t fileSize);
[[nodiscard]]
file_write_operation write (
std:: uint64_t offset,
const void * buffer,
std:: size_t byteCount,
cancellation_token ct = {}) noexcept ;
};
class file_read_operation
{
public:
file_read_operation (file_read_operation&& other) noexcept ;
bool await_ready () const noexcept ;
bool await_suspend (std::experimental::coroutine_handle<> awaiter);
std:: size_t await_resume ();
};
class file_write_operation
{
public:
file_write_operation (file_write_operation&& other) noexcept ;
bool await_ready () const noexcept ;
bool await_suspend (std::experimental::coroutine_handle<> awaiter);
std:: size_t await_resume ();
};
}read_only_file , write_only_file , read_write_file這些類型代表具體文件I/O類。
API摘要:
namespace cppcoro
{
class read_only_file : public readable_file
{
public:
[[nodiscard]]
static read_only_file open (
io_service& ioService,
const std::experimental::filesystem::path& path,
file_share_mode shareMode = file_share_mode::read,
file_buffering_mode bufferingMode = file_buffering_mode::default_);
};
class write_only_file : public writable_file
{
public:
[[nodiscard]]
static write_only_file open (
io_service& ioService,
const std::experimental::filesystem::path& path,
file_open_mode openMode = file_open_mode::create_or_open,
file_share_mode shareMode = file_share_mode::none,
file_buffering_mode bufferingMode = file_buffering_mode::default_);
};
class read_write_file : public readable_file , public writable_file
{
public:
[[nodiscard]]
static read_write_file open (
io_service& ioService,
const std::experimental::filesystem::path& path,
file_open_mode openMode = file_open_mode::create_or_open,
file_share_mode shareMode = file_share_mode::none,
file_buffering_mode bufferingMode = file_buffering_mode::default_);
};
}所有open()函數投擲std::system_error失敗。
注意:當前僅在Windows平台上支持網絡抽象。 Linux支持即將推出。
socket插座類可用於異步通過網絡發送/接收數據。
當前僅支持IPv4和IPv6上的TCP/IP,UDP/IP。
API摘要:
// <cppcoro/net/socket.hpp>
namespace cppcoro ::net
{
class socket
{
public:
static socket create_tcpv4 (ip_service& ioSvc);
static socket create_tcpv6 (ip_service& ioSvc);
static socket create_updv4 (ip_service& ioSvc);
static socket create_udpv6 (ip_service& ioSvc);
socket (socket&& other) noexcept ;
~socket ();
socket& operator =(socket&& other) noexcept ;
// Return the native socket handle for the socket
<platform-specific> native_handle () noexcept ;
const ip_endpoint& local_endpoint () const noexcept ;
const ip_endpoint& remote_endpoint () const noexcept ;
void bind ( const ip_endpoint& localEndPoint);
void listen ();
[[nodiscard]]
Awaitable< void > connect ( const ip_endpoint& remoteEndPoint) noexcept ;
[[nodiscard]]
Awaitable< void > connect ( const ip_endpoint& remoteEndPoint,
cancellation_token ct) noexcept ;
[[nodiscard]]
Awaitable< void > accept (socket& acceptingSocket) noexcept ;
[[nodiscard]]
Awaitable< void > accept (socket& acceptingSocket,
cancellation_token ct) noexcept ;
[[nodiscard]]
Awaitable< void > disconnect () noexcept ;
[[nodiscard]]
Awaitable< void > disconnect (cancellation_token ct) noexcept ;
[[nodiscard]]
Awaitable<std:: size_t > send ( const void * buffer, std:: size_t size) noexcept ;
[[nodiscard]]
Awaitable<std:: size_t > send ( const void * buffer,
std:: size_t size,
cancellation_token ct) noexcept ;
[[nodiscard]]
Awaitable<std:: size_t > recv ( void * buffer, std:: size_t size) noexcept ;
[[nodiscard]]
Awaitable<std:: size_t > recv ( void * buffer,
std:: size_t size,
cancellation_token ct) noexcept ;
[[nodiscard]]
socket_recv_from_operation recv_from (
void * buffer,
std:: size_t size) noexcept ;
[[nodiscard]]
socket_recv_from_operation_cancellable recv_from (
void * buffer,
std:: size_t size,
cancellation_token ct) noexcept ;
[[nodiscard]]
socket_send_to_operation send_to (
const ip_endpoint& destination,
const void * buffer,
std:: size_t size) noexcept ;
[[nodiscard]]
socket_send_to_operation_cancellable send_to (
const ip_endpoint& destination,
const void * buffer,
std:: size_t size,
cancellation_token ct) noexcept ;
void close_send ();
void close_recv ();
};
}示例:迴聲服務器
# include < cppcoro/net/socket.hpp >
# include < cppcoro/io_service.hpp >
# include < cppcoro/cancellation_source.hpp >
# include < cppcoro/async_scope.hpp >
# include < cppcoro/on_scope_exit.hpp >
# include < memory >
# include < iostream >
cppcoro::task< void > handle_connection (socket s)
{
try
{
const size_t bufferSize = 16384 ;
auto buffer = std::make_unique< unsigned char []>(bufferSize);
size_t bytesRead;
do {
// Read some bytes
bytesRead = co_await s. recv (buffer. get (), bufferSize);
// Write some bytes
size_t bytesWritten = 0 ;
while (bytesWritten < bytesRead) {
bytesWritten += co_await s. send (
buffer. get () + bytesWritten,
bytesRead - bytesWritten);
}
} while (bytesRead != 0 );
s. close_send ();
co_await s. disconnect ();
}
catch (...)
{
std::cout << " connection failed " << std::
}
}
cppcoro::task< void > echo_server (
cppcoro::net::ipv4_endpoint endpoint,
cppcoro::io_service& ioSvc,
cancellation_token ct)
{
cppcoro::async_scope scope;
std::exception_ptr ex;
try
{
auto listeningSocket = cppcoro::net::socket::create_tcpv4 (ioSvc);
listeningSocket. bind (endpoint);
listeningSocket. listen ();
while ( true ) {
auto connection = cppcoro::net::socket::create_tcpv4 (ioSvc);
co_await listeningSocket. accept (connection, ct);
scope. spawn ( handle_connection ( std::move (connection)));
}
}
catch (cppcoro::operation_cancelled)
{
}
catch (...)
{
ex = std::current_exception ();
}
// Wait until all handle_connection tasks have finished.
co_await scope. join ();
if (ex) std::rethrow_exception (ex);
}
int main ( int argc, const char * argv[])
{
cppcoro::io_service ioSvc;
if (argc != 2 ) return - 1 ;
auto endpoint = cppcoro::ipv4_endpoint::from_string (argv[ 1 ]);
if (!endpoint) return - 1 ;
( void ) cppcoro::sync_wait ( cppcoro::when_all (
[&]() -> task<>
{
// Shutdown the event loop once finished.
auto stopOnExit = cppcoro::on_scope_exit ([&] { ioSvc. stop (); });
cppcoro::cancellation_source canceller;
co_await cppcoro::when_all (
[&]() -> task<>
{
// Run for 30s then stop accepting new connections.
co_await ioSvc. schedule_after ( std::chrono::seconds ( 30 ));
canceller. request_cancellation ();
}(),
echo_server (*endpoint, ioSvc, canceller. token ()));
}(),
[&]() -> task<>
{
ioSvc. process_events ();
}()));
return 0 ;
}ip_address , ipv4_address , ipv6_address用於表示IP地址的輔助類。
API摘要:
namespace cppcoro ::net
{
class ipv4_address
{
using bytes_t = std:: uint8_t [ 4 ];
public:
constexpr ipv4_address ();
explicit constexpr ipv4_address (std:: uint32_t integer);
explicit constexpr ipv4_address ( const std::uint8_t (&bytes)[4]);
explicit constexpr ipv4_address (std:: uint8_t b0,
std:: uint8_t b1,
std:: uint8_t b2,
std:: uint8_t b3);
constexpr const bytes_t & bytes () const ;
constexpr std:: uint32_t to_integer () const ;
static constexpr ipv4_address loopback ();
constexpr bool is_loopback () const ;
constexpr bool is_private_network () const ;
constexpr bool operator ==(ipv4_address other) const ;
constexpr bool operator !=(ipv4_address other) const ;
constexpr bool operator <(ipv4_address other) const ;
constexpr bool operator >(ipv4_address other) const ;
constexpr bool operator <=(ipv4_address other) const ;
constexpr bool operator >=(ipv4_address other) const ;
std::string to_string ();
static std::optional<ipv4_address> from_string (std::string_view string) noexcept ;
};
class ipv6_address
{
using bytes_t = std:: uint8_t [ 16 ];
public:
constexpr ipv6_address ();
explicit constexpr ipv6_address (
std:: uint64_t subnetPrefix,
std:: uint64_t interfaceIdentifier);
constexpr ipv6_address (
std:: uint16_t part0,
std:: uint16_t part1,
std:: uint16_t part2,
std:: uint16_t part3,
std:: uint16_t part4,
std:: uint16_t part5,
std:: uint16_t part6,
std:: uint16_t part7);
explicit constexpr ipv6_address (
const std::uint16_t (&parts)[8]);
explicit constexpr ipv6_address (
const std::uint8_t (bytes)[16]);
constexpr const bytes_t & bytes () const ;
constexpr std:: uint64_t subnet_prefix () const ;
constexpr std:: uint64_t interface_identifier () const ;
static constexpr ipv6_address unspecified ();
static constexpr ipv6_address loopback ();
static std::optional<ipv6_address> from_string (std::string_view string) noexcept ;
std::string to_string () const ;
constexpr bool operator ==( const ipv6_address& other) const ;
constexpr bool operator !=( const ipv6_address& other) const ;
constexpr bool operator <( const ipv6_address& other) const ;
constexpr bool operator >( const ipv6_address& other) const ;
constexpr bool operator <=( const ipv6_address& other) const ;
constexpr bool operator >=( const ipv6_address& other) const ;
};
class ip_address
{
public:
// Constructs to IPv4 address 0.0.0.0
ip_address () noexcept ;
ip_address (ipv4_address address) noexcept ;
ip_address (ipv6_address address) noexcept ;
bool is_ipv4 () const noexcept ;
bool is_ipv6 () const noexcept ;
const ipv4_address& to_ipv4 () const ;
const ipv6_address& to_ipv6 () const ;
const std:: uint8_t * bytes () const noexcept ;
std::string to_string () const ;
static std::optional<ip_address> from_string (std::string_view string) noexcept ;
bool operator ==( const ip_address& rhs) const noexcept ;
bool operator !=( const ip_address& rhs) const noexcept ;
// ipv4_address sorts less than ipv6_address
bool operator <( const ip_address& rhs) const noexcept ;
bool operator >( const ip_address& rhs) const noexcept ;
bool operator <=( const ip_address& rhs) const noexcept ;
bool operator >=( const ip_address& rhs) const noexcept ;
};
}ip_endpoint , ipv4_endpoint ipv6_endpoint用於表示IP地址和端口數的幫助課程。
API摘要:
namespace cppcoro ::net
{
class ipv4_endpoint
{
public:
ipv4_endpoint () noexcept ;
explicit ipv4_endpoint (ipv4_address address, std:: uint16_t port = 0 ) noexcept ;
const ipv4_address& address () const noexcept ;
std:: uint16_t port () const noexcept ;
std::string to_string () const ;
static std::optional<ipv4_endpoint> from_string (std::string_view string) noexcept ;
};
bool operator ==( const ipv4_endpoint& a, const ipv4_endpoint& b);
bool operator !=( const ipv4_endpoint& a, const ipv4_endpoint& b);
bool operator <( const ipv4_endpoint& a, const ipv4_endpoint& b);
bool operator >( const ipv4_endpoint& a, const ipv4_endpoint& b);
bool operator <=( const ipv4_endpoint& a, const ipv4_endpoint& b);
bool operator >=( const ipv4_endpoint& a, const ipv4_endpoint& b);
class ipv6_endpoint
{
public:
ipv6_endpoint () noexcept ;
explicit ipv6_endpoint (ipv6_address address, std:: uint16_t port = 0 ) noexcept ;
const ipv6_address& address () const noexcept ;
std:: uint16_t port () const noexcept ;
std::string to_string () const ;
static std::optional<ipv6_endpoint> from_string (std::string_view string) noexcept ;
};
bool operator ==( const ipv6_endpoint& a, const ipv6_endpoint& b);
bool operator !=( const ipv6_endpoint& a, const ipv6_endpoint& b);
bool operator <( const ipv6_endpoint& a, const ipv6_endpoint& b);
bool operator >( const ipv6_endpoint& a, const ipv6_endpoint& b);
bool operator <=( const ipv6_endpoint& a, const ipv6_endpoint& b);
bool operator >=( const ipv6_endpoint& a, const ipv6_endpoint& b);
class ip_endpoint
{
public:
// Constructs to IPv4 end-point 0.0.0.0:0
ip_endpoint () noexcept ;
ip_endpoint (ipv4_endpoint endpoint) noexcept ;
ip_endpoint (ipv6_endpoint endpoint) noexcept ;
bool is_ipv4 () const noexcept ;
bool is_ipv6 () const noexcept ;
const ipv4_endpoint& to_ipv4 () const ;
const ipv6_endpoint& to_ipv6 () const ;
ip_address address () const noexcept ;
std:: uint16_t port () const noexcept ;
std::string to_string () const ;
static std::optional<ip_endpoint> from_string (std::string_view string) noexcept ;
bool operator ==( const ip_endpoint& rhs) const noexcept ;
bool operator !=( const ip_endpoint& rhs) const noexcept ;
// ipv4_endpoint sorts less than ipv6_endpoint
bool operator <( const ip_endpoint& rhs) const noexcept ;
bool operator >( const ip_endpoint& rhs) const noexcept ;
bool operator <=( const ip_endpoint& rhs) const noexcept ;
bool operator >=( const ip_endpoint& rhs) const noexcept ;
};
}sync_wait() sync_wait()函數可用於同步等待,直到指定的awaitable完成。
指定的等待期將在新創建的coroutine內部的當前線程上co_await 。
sync_wait()調用將阻塞直到操作完成,並返回co_await表達式的結果,或者如果co_await表達式完成,則以未手動異常而重新歸還例外。
sync_wait()函數對於從main()內部啟動頂級任務並等到任務完成為止非常有用,實際上,這是啟動第一個/頂級task唯一方法。
API摘要:
// <cppcoro/sync_wait.hpp>
namespace cppcoro
{
template < typename AWAITABLE>
auto sync_wait (AWAITABLE&& awaitable)
-> typename awaitable_traits<AWAITABLE&&>::await_result_t;
}示例:
void example_task ()
{
auto makeTask = []() -> task<std::string>
{
co_return " foo " ;
};
auto task = makeTask ();
// start the lazy task and wait until it completes
sync_wait (task); // -> "foo"
sync_wait ( makeTask ()); // -> "foo"
}
void example_shared_task ()
{
auto makeTask = []() -> shared_task<std::string>
{
co_return " foo " ;
};
auto task = makeTask ();
// start the shared task and wait until it completes
sync_wait (task) == " foo " ;
sync_wait ( makeTask ()) == " foo " ;
}when_all_ready() when_all_ready()函數可用於創建一個新的等待,該函數在所有輸入等待設備完成後完成。
輸入任務可以是任何類型的期待。
當返回的等待期被co_await使用時,它將co_await其傳遞給when_all_ready()函數的順序依次在等待線程的每個輸入等待線索中。如果這些任務不同步完成,則它們將同時執行。
一旦輸入就診的所有co_await表達式已經完成完成,返回的等待功能將完成並恢復等待的Coroutine。等待的Coroutine將恢復在最後要完成的輸入等待的線程上。
當co_await Ed時,即使某些輸入等待因素失敗而沒有例外,也可以保證返回的等待期。
但是,請注意,如果when_all_ready()呼叫本身可能會拋出std::bad_alloc如果它無法為等待每個輸入的apotables所需的coroutine幀分配內存。如果任何輸入等待的對像都會從其複制/移動構造函數中拋出任何輸入的對象,也可能會引發異常。
返回的等待co_await的結果是std::tuple或std::vector的when_all_task<RESULT>結果>對象的vector。這些對象允許您通過調用相應輸出任務的when_all_task<RESULT>::result()方法分別獲得每個輸入等待的結果(或異常)。這使呼叫者可以同時等待多個等待材料並同步其完成,同時仍保留隨後檢查每個co_await操作的結果的能力,以獲得成功/失敗。
這不同於when_all()任何單個co_await操作的故障導致整體操作失敗,而異常。這意味著您無法確定哪些組件co_await操作失敗了,也可以阻止您獲得其他co_await操作的結果。
API摘要:
// <cppcoro/when_all_ready.hpp>
namespace cppcoro
{
// Concurrently await multiple awaitables.
//
// Returns an awaitable object that, when co_await'ed, will co_await each of the input
// awaitable objects and will resume the awaiting coroutine only when all of the
// component co_await operations complete.
//
// Result of co_await'ing the returned awaitable is a std::tuple of detail::when_all_task<T>,
// one for each input awaitable and where T is the result-type of the co_await expression
// on the corresponding awaitable.
//
// AWAITABLES must be awaitable types and must be movable (if passed as rvalue) or copyable
// (if passed as lvalue). The co_await expression will be executed on an rvalue of the
// copied awaitable.
template < typename ... AWAITABLES>
auto when_all_ready (AWAITABLES&&... awaitables)
-> Awaitable<std::tuple<detail::when_all_task<typename awaitable_traits<AWAITABLES>::await_result_t>...>>;
// Concurrently await each awaitable in a vector of input awaitables.
template <
typename AWAITABLE,
typename RESULT = typename awaitable_traits<AWAITABLE>:: await_result_t >
auto when_all_ready (std::vector<AWAITABLE> awaitables)
-> Awaitable<std::vector<detail::when_all_task<RESULT>>>;
}示例用法:
task<std::string> get_record ( int id);
task<> example1 ()
{
// Run 3 get_record() operations concurrently and wait until they're all ready.
// Returns a std::tuple of tasks that can be unpacked using structured bindings.
auto [task1, task2, task3] = co_await when_all_ready (
get_record ( 123 ),
get_record ( 456 ),
get_record ( 789 ));
// Unpack the result of each task
std::string& record1 = task1. result ();
std::string& record2 = task2. result ();
std::string& record3 = task3. result ();
// Use records....
}
task<> example2 ()
{
// Create the input tasks. They don't start executing yet.
std::vector<task<std::string>> tasks;
for ( int i = 0 ; i < 1000 ; ++i)
{
tasks. emplace_back ( get_record (i));
}
// Execute all tasks concurrently.
std::vector<detail::when_all_task<std::string>> resultTasks =
co_await when_all_ready ( std::move (tasks));
// Unpack and handle each result individually once they're all complete.
for ( int i = 0 ; i < 1000 ; ++i)
{
try
{
std::string& record = tasks[i]. result ();
std::cout << i << " = " << record << std::endl;
}
catch ( const std:: exception & ex)
{
std::cout << i << " : " << ex. what () << std::endl;
}
}
}when_all() when_all()函數可用於創建一個新的等待,當co_await Ed將同時co_await同時同時返回其各個結果的匯總。
當等待返回的等待返回時,它將在當前線程上co_await每個輸入等待。一旦第一個等待的暫停,將開始第二任任務,依此類推。操作同時執行,直到它們全部完成為止。
一旦所有組件co_await操作都運行到完成,則將從每個單獨的結果構建結果的匯總。如果任何輸入任務都會拋出異常,或者骨料結果的構造會引發異常,則例外將從返回的等待期的co_await中傳播。
如果多個co_await操作失敗而例外失敗,則co_await when_all()中傳播,其他例外將被默默地忽略。未指定將選擇哪個操作例外。
如果知道哪個組件co_await操作失敗或保留獲得其他操作結果的能力很重要,即使其中某些操作失敗,您應該使用when_all_ready()而不是使用。
API摘要:
// <cppcoro/when_all.hpp>
namespace cppcoro
{
// Variadic version.
//
// Note that if the result of `co_await awaitable` yields a void-type
// for some awaitables then the corresponding component for that awaitable
// in the tuple will be an empty struct of type detail::void_value.
template < typename ... AWAITABLES>
auto when_all (AWAITABLES&&... awaitables)
-> Awaitable<std::tuple<typename awaitable_traits<AWAITABLES>::await_result_t...>>;
// Overload for vector<Awaitable<void>>.
template <
typename AWAITABLE,
typename RESULT = typename awaitable_traits<AWAITABLE>:: await_result_t ,
std:: enable_if_t <std::is_void_v<RESULT>, int > = 0 >
auto when_all (std::vector<AWAITABLE> awaitables)
-> Awaitable<void>;
// Overload for vector<Awaitable<NonVoid>> that yield a value when awaited.
template <
typename AWAITABLE,
typename RESULT = typename awaitable_traits<AWAITABLE>:: await_result_t ,
std:: enable_if_t <!std::is_void_v<RESULT>, int > = 0 >
auto when_all (std::vector<AWAITABLE> awaitables)
-> Awaitable<std::vector<std::conditional_t<
std::is_lvalue_reference_v<RESULT>,
std::reference_wrapper<std::remove_reference_t<RESULT>>,
std::remove_reference_t<RESULT>>>>;
}示例:
task<A> get_a ();
task<B> get_b ();
task<> example1 ()
{
// Run get_a() and get_b() concurrently.
// Task yields a std::tuple<A, B> which can be unpacked using structured bindings.
auto [a, b] = co_await when_all ( get_a (), get_b ());
// use a, b
}
task<std::string> get_record ( int id);
task<> example2 ()
{
std::vector<task<std::string>> tasks;
for ( int i = 0 ; i < 1000 ; ++i)
{
tasks. emplace_back ( get_record (i));
}
// Concurrently execute all get_record() tasks.
// If any of them fail with an exception then the exception will propagate
// out of the co_await expression once they have all completed.
std::vector<std::string> records = co_await when_all ( std::move (tasks));
// Process results
for ( int i = 0 ; i < 1000 ; ++i)
{
std::cout << i << " = " << records[i] << std::endl;
}
}fmap() fmap()函數可用於將可karble函數應用於容器類型中包含的值(s),並返回應用函數的結果的新容器類型的型函數。
fmap()函數可以將函數應用於類型generator<T> , recursive_generator<T>和async_generator<T>的值,以及任何支持Awaitable概念的值(例如task<T> )。
這些類型中的每一種都為fmap()提供了兩個參數的過載。要應用的函數和容器值。有關支持的fmap()超載的每種類型的文檔。
例如, fmap()函數可用於將函數應用於task<T>的最終結果,從而產生一個新task<U> ,該函數將與該函數的返回值一起完成。
// Given a function you want to apply that converts
// a value of type A to value of type B.
B a_to_b (A value);
// And a task that yields a value of type A
cppcoro::task<A> get_an_a ();
// We can apply the function to the result of the task using fmap()
// and obtain a new task yielding the result.
cppcoro::task<B> bTask = fmap(a_to_b, get_an_a());
// An alternative syntax is to use the pipe notation.
cppcoro::task<B> bTask = get_an_a() | cppcoro::fmap(a_to_b);API摘要:
// <cppcoro/fmap.hpp>
namespace cppcoro
{
template < typename FUNC>
struct fmap_transform
{
fmap_transform (FUNC&& func) noexcept (std::is_nothrow_move_constructible_v<FUNC>);
FUNC func;
};
// Type-deducing constructor for fmap_transform object that can be used
// in conjunction with operator|.
template < typename FUNC>
fmap_transform<FUNC> fmap (FUNC&& func);
// operator| overloads for providing pipe-based syntactic sugar for fmap()
// such that the expression:
// <value-expr> | cppcoro::fmap(<func-expr>)
// is equivalent to:
// fmap(<func-expr>, <value-expr>)
template < typename T, typename FUNC>
decltype ( auto ) operator |(T&& value, fmap_transform<FUNC>&& transform);
template < typename T, typename FUNC>
decltype ( auto ) operator |(T&& value, fmap_transform<FUNC>& transform);
template < typename T, typename FUNC>
decltype ( auto ) operator |(T&& value, const fmap_transform<FUNC>& transform);
// Generic overload for all awaitable types.
//
// Returns an awaitable that when co_awaited, co_awaits the specified awaitable
// and applies the specified func to the result of the 'co_await awaitable'
// expression as if by 'std::invoke(func, co_await awaitable)'.
//
// If the type of 'co_await awaitable' expression is 'void' then co_awaiting the
// returned awaitable is equivalent to 'co_await awaitable, func()'.
template <
typename FUNC,
typename AWAITABLE,
std:: enable_if_t <is_awaitable_v<AWAITABLE>, int > = 0 >
auto fmap (FUNC&& func, AWAITABLE&& awaitable)
-> Awaitable<std::invoke_result_t<FUNC, typename awaitable_traits<AWAITABLE>::await_result_t>>;
} fmap()函數設計為通過參數依賴性查找(ADL)查找正確的過載,因此通常應在沒有cppcoro::前綴的情況下調用它。
resume_on() resume_on()函數可用於控制等待的執行上下文,即等待等待時會恢復等待的coroutine。當應用於async_generator時,它會控制哪個執行上下文co_await g.begin()和co_await ++it操作恢復等待的coroutines。
通常,正在等待的等待coroutine(例如, task )或async_generator將在操作完成的任何線程上恢復執行。在某些情況下,這可能不是您要繼續執行的線程。在這些情況下,您可以使用resume_on()函數來創建一個新的等待或生成器,該函數將在與指定計劃程序關聯的線程上恢復執行。
resume_on()函數可以用作返回新的等待/發電機的普通函數。否則可以在管道輔助詞中使用。
例子:
task<record> load_record ( int id);
ui_thread_scheduler uiThreadScheduler;
task<> example ()
{
// This will start load_record() on the current thread.
// Then when load_record() completes (probably on an I/O thread)
// it will reschedule execution onto thread pool and call to_json
// Once to_json completes it will transfer execution onto the
// ui thread before resuming this coroutine and returning the json text.
task<std::string> jsonTask =
load_record ( 123 )
| cppcoro::resume_on ( threadpool::default ())
| cppcoro::fmap (to_json)
| cppcoro::resume_on (uiThreadScheduler);
// At this point, all we've done is create a pipeline of tasks.
// The tasks haven't started executing yet.
// Await the result. Starts the pipeline of tasks.
std::string jsonText = co_await jsonTask;
// Guaranteed to be executing on ui thread here.
someUiControl. set_text (jsonText);
}API摘要:
// <cppcoro/resume_on.hpp>
namespace cppcoro
{
template < typename SCHEDULER, typename AWAITABLE>
auto resume_on (SCHEDULER& scheduler, AWAITABLE awaitable)
-> Awaitable<typename awaitable_traits<AWAITABLE>::await_traits_t>;
template < typename SCHEDULER, typename T>
async_generator<T> resume_on (SCHEDULER& scheduler, async_generator<T> source);
template < typename SCHEDULER>
struct resume_on_transform
{
explicit resume_on_transform (SCHEDULER& scheduler) noexcept ;
SCHEDULER& scheduler;
};
// Construct a transform/operation that can be applied to a source object
// using "pipe" notation (ie. operator|).
template < typename SCHEDULER>
resume_on_transform<SCHEDULER> resume_on (SCHEDULER& scheduler) noexcept ;
// Equivalent to 'resume_on(transform.scheduler, std::forward<T>(value))'
template < typename T, typename SCHEDULER>
decltype ( auto ) operator |(T&& value, resume_on_transform<SCHEDULER> transform)
{
return resume_on (transform. scheduler , std::forward<T>(value));
}
}schedule_on() schedule_on()函數可用於更改給定的已久或async_generator開始執行的執行上下文。
當應用於async_generator時,它還會影響co_yield語句之後恢復的執行上下文。
請注意, schedule_on變換並未指定所期待或async_generator將完成或產生結果的線程,這取決於所期待或生成器的實現。
有關控制操作完成的線程的轉換,請參見resume_on()操作員。
例如:
task< int > get_value ();
io_service ioSvc;
task<> example ()
{
// Starts executing get_value() on the current thread.
int a = co_await get_value ();
// Starts executing get_value() on a thread associated with ioSvc.
int b = co_await schedule_on (ioSvc, get_value ());
}API摘要:
// <cppcoro/schedule_on.hpp>
namespace cppcoro
{
// Return a task that yields the same result as 't' but that
// ensures that 't' is co_await'ed on a thread associated with
// the specified scheduler. Resulting task will complete on
// whatever thread 't' would normally complete on.
template < typename SCHEDULER, typename AWAITABLE>
auto schedule_on (SCHEDULER& scheduler, AWAITABLE awaitable)
-> Awaitable<typename awaitable_traits<AWAITABLE>::await_result_t>;
// Return a generator that yields the same sequence of results as
// 'source' but that ensures that execution of the coroutine starts
// execution on a thread associated with 'scheduler' and resumes
// after a 'co_yield' on a thread associated with 'scheduler'.
template < typename SCHEDULER, typename T>
async_generator<T> schedule_on (SCHEDULER& scheduler, async_generator<T> source);
template < typename SCHEDULER>
struct schedule_on_transform
{
explicit schedule_on_transform (SCHEDULER& scheduler) noexcept ;
SCHEDULER& scheduler;
};
template < typename SCHEDULER>
schedule_on_transform<SCHEDULER> schedule_on (SCHEDULER& scheduler) noexcept ;
template < typename T, typename SCHEDULER>
decltype ( auto ) operator |(T&& value, schedule_on_transform<SCHEDULER> transform);
}awaitable_traits<T>如果應用於T型的表達式,則該模板元函數可用於確定co_await表達式所產生的類型。
請注意,這是假設T型的值正在等待中,在這種情況下,它不受Coroutine Promise對象應用的任何await_transform的影響。如果在這種情況下等待T型值的值,則結果可能會有所不同。
awaitable_traits<T>模板元功能不能定義awaiter_t或await_result_t nested typedefs如果類型T ,則不等待。這允許其在不等待T時禁用超載的Sfinae上下文中使用。
API摘要:
// <cppcoro/awaitable_traits.hpp>
namespace cppcoro
{
template < typename T>
struct awaitable_traits
{
// The type that results from applying `operator co_await()` to a value
// of type T, if T supports an `operator co_await()`, otherwise is type `T&&`.
typename awaiter_t = <unspecified>;
// The type of the result of co_await'ing a value of type T.
typename await_result_t = <unspecified>;
};
}is_awaitable<T> is_awaitable<T>模板元功能使您可以查詢是否可以從Coroutine內部進行co_await ed。
API摘要:
// <cppcoro/is_awaitable.hpp>
namespace cppcoro
{
template < typename T>
struct is_awaitable : std::bool_constant<...>
{};
template < typename T>
constexpr bool is_awaitable_v = is_awaitable<T>::value;
}Awaitable<T>概念Awaitable<T>是一個概念,表明可以在沒有await_transform過載的coroutine上下文中co_await類型,並且co_await表達式的結果具有類型, T 。
例如,類型task<T>實現了Awaitable<T&&>概念,而類型task<T>&實現了Awaitable<T&>概念。
Awaiter<T>概念Awaiter<T>是一個概念,指示某種類型包含await_ready await_suspend和await_resume方法,以實現暫停/恢復等待coroutine的協議所需的方法。
滿足Awaiter<T>的類型必須具有awaiter的實例:
awaiter.await_ready() - > boolawaiter.await_suspend(std::experimental::coroutine_handle<void>{}) - > void or bool or bool or std::experimental::coroutine_handle<P> for P >awaiter.await_resume() - > T任何實現Awaiter<T>概念的類型也都實現了Awaitable<T>概念。
Scheduler概念Scheduler是一個概念,允許在某些執行上下文中調度Coroutines執行。
concept Scheduler
{
Awaitable< void > schedule ();
}給定類型的S ,可以實現Scheduler概念,而類型S的實例s :
s.schedule()方法返回等待類型的型號,使得co_await s.schedule()將無條件暫停當前的coroutine並將其安排在與調度s相關的執行上下文中。co_await s.schedule()表達的結果具有void 。 cppcoro::task<> f (Scheduler& scheduler)
{
// Execution of the coroutine is initially on the caller's execution context.
// Suspends execution of the coroutine and schedules it for resumption on
// the scheduler's execution context.
co_await scheduler. schedule ();
// At this point the coroutine is now executing on the scheduler's
// execution context.
}DelayedScheduler概念DelayedScheduler是一個概念,它允許Coroutine安排在指定的持續時間之後,在調度程序的執行上下文上進行執行。
concept DelayedScheduler : Scheduler
{
template < typename REP, typename RATIO>
Awaitable< void > schedule_after (std::chrono::duration<REP, RATIO> delay);
template < typename REP, typename RATIO>
Awaitable< void > schedule_after (
std::chrono::duration<REP, RATIO> delay,
cppcoro::cancellation_token cancellationToken);
}給定一種類型的S ,它實現了DelayedScheduler和一個實例S類型的s :
s.schedule_after(delay)方法返回一個可以等待的對象,以使co_await s.schedule_after(delay)暫停了當前的coroutine delay時間,然後再安排與s程序相關的執行上下文恢復執行式。co_await s.schedule_after(delay)表達式具有void 。CPPCORO庫支持Visual Studio 2017和Linux在Windows下進行建築,並帶有Clang 5.0+的Linux。
該庫利用蛋糕製造系統(不,而不是C#一個)。
蛋糕製造系統會自動檢查為Git子模塊,因此您無需單獨下載或安裝。
該庫當前需要Visual Studio 2017或更高版本和Windows 10 SDK。
計劃支持Clang(#3)和Linux(#15)。
蛋糕構建系統在Python中實施,要求安裝Python 2.7。
確保Python 2.7口譯員在您的路徑中,並以“ Python”的形式使用。
確保安裝了Visual Studio 2017更新3或更高版本。請注意,更新2或更早的Coroutines有一些已知問題已在Update 3中修復。
您還可以通過從https:///vcppdogfooding.azurewebsites.net/下載Nuget軟件包,並將.nuget文件解壓縮到目錄中,從而使用Visual Studio編譯器的實驗版本。只需更新config.cake文件以通過修改和刪除以下行來指向未拉鍊的位置:
nugetPath = None # r'C:PathToVisualCppTools.14.0.25224-Pre'確保已安裝Windows 10 SDK。默認情況下,它將使用最新的Windows 10 SDK和通用C運行時版本。
CPPCORO存儲庫利用Git subsodules吸引了蛋糕製造系統的來源。
這意味著您需要將--recursive標誌傳遞給git clone命令。例如。
c:Code> git clone --recursive https://github.com/lewissbaker/cppcoro.git
如果您已經克隆了cppcoro,則應在拉更改後更新子模型。
c:Codecppcoro> git submodule update --init --recursive
要從命令行構建,只需在工作區根中運行“ cake.bat”。
例如。
C:cppcoro> cake.bat
Building with C:cppcoroconfig.cake - Variant(release='debug', platform='windows', architecture='x86', compilerFamily='msvc', compiler='msvc14.10')
Building with C:cppcoroconfig.cake - Variant(release='optimised', platform='windows', architecture='x64', compilerFamily='msvc', compiler='msvc14.10')
Building with C:cppcoroconfig.cake - Variant(release='debug', platform='windows', architecture='x64', compilerFamily='msvc', compiler='msvc14.10')
Building with C:cppcoroconfig.cake - Variant(release='optimised', platform='windows', architecture='x86', compilerFamily='msvc', compiler='msvc14.10')
Compiling testmain.cpp
Compiling testmain.cpp
Compiling testmain.cpp
Compiling testmain.cpp
...
Linking buildwindows_x86_msvc14.10_debugtestrun.exe
Linking buildwindows_x64_msvc14.10_optimisedtestrun.exe
Linking buildwindows_x86_msvc14.10_optimisedtestrun.exe
Linking buildwindows_x64_msvc14.10_debugtestrun.exe
Generating code
Finished generating code
Generating code
Finished generating code
Build succeeded.
Build took 0:00:02.419.
默認情況下,沒有參數的cake將建立所有具有所有構建變體的項目並執行單位測試。您可以通過傳遞其他命令行參數來縮小構建的內容。例如。
c:cppcoro> cake.bat release=debug architecture=x64 lib/build.cake
Building with C:UsersLewisCodecppcoroconfig.cake - Variant(release='debug', platform='windows', architecture='x64', compilerFamily='msvc', compiler='msvc14.10')
Archiving buildwindows_x64_msvc14.10_debuglibcppcoro.lib
Build succeeded.
Build took 0:00:00.321.
您可以運行cake --help來列出可用的命令行選項。
要從Visual Studio內開發您可以通過運行cake.bat -p構建.vcproj/.sln文件。
例如。
c:cppcoro> cake.bat -p
Building with C:cppcoroconfig.cake - Variant(release='debug', platform='windows', architecture='x86', compilerFamily='msvc', compiler='msvc14.10')
Building with C:cppcoroconfig.cake - Variant(release='optimised', platform='windows', architecture='x64', compilerFamily='msvc', compiler='msvc14.10')
Building with C:cppcoroconfig.cake - Variant(release='debug', platform='windows', architecture='x64', compilerFamily='msvc', compiler='msvc14.10')
Building with C:cppcoroconfig.cake - Variant(release='optimised', platform='windows', architecture='x86', compilerFamily='msvc', compiler='msvc14.10')
Generating Solution build/project/cppcoro.sln
Generating Project build/project/cppcoro_tests.vcxproj
Generating Filters build/project/cppcoro_tests.vcxproj.filters
Generating Project build/project/cppcoro.vcxproj
Generating Filters build/project/cppcoro.vcxproj.filters
Build succeeded.
Build took 0:00:00.247.
當您從Visual Studio內部構建這些項目時,它將召集Cake進行彙編。
CPPCORO項目也可以使用Clang+ Libc ++ 5.0或更高版本在Linux下構建。
Cppcoro建築物已在Ubuntu 17.04下進行了測試。
確保已安裝以下軟件包:
這是假設您已經建立和安裝了Clang和Libc ++。
如果您還沒有配置clang,請參見以下各節,以獲取有關使用CPPCORO構建的Clang的詳細信息。
結帳cppcoro及其子模型:
git clone --recursive https://github.com/lewissbaker/cppcoro.git cppcoro
運行init.sh設置cake bash功能:
cd cppcoro
source init.sh
然後,您可以從工作區根開始運行cake以構建cppcoro並進行測試:
$ cake
您可以指定其他命令行參數來自定義構建:
--help將打印出命令行參數的幫助--debug=run將顯示正在運行的構建命令線release=debug或release=optimised將限制構建變體進行調試或優化(默認情況下,它將同時構建)。lib/build.cake將僅構建CPPCORO庫,而不是測試。test/build.cake@task_tests.cpp只需編譯特定的源文件test/build.cake@testresult將建立和運行測試例如:
$ cake --debug=run release=debug lib/build.cake
如果您的clang編譯器不在/usr/bin/clang ,則可以使用以下一個或多個cake的命令行選項來指定替代位置:
--clang-executable=<name> - 指定要使用的clang可執行名稱而不是clang 。例如。強制使用clang 8.0 Pass --clang-executable=clang-8--clang-executable=<abspath> - 指定Clang可執行文件的完整路徑。構建系統還將在同一目錄中尋找其他可執行文件。如果此路徑具有<prefix>/bin/<name>的形式,則將其設置為默認的clang-install-prefix <prefix> 。--clang-install-prefix=<path> - 指定已安裝clang的路徑。這將導致構建系統在<path>/bin下尋找叮噹聲(除非由--clang-executable覆蓋)。--libcxx-install-prefix=<path> - 指定已安裝LIBC ++的路徑。默認情況下,構建系統將在與Clang同一位置尋找LIBC ++。如果將此命令行選項安裝在其他位置,請使用此命令行。示例:使用默認位置中安裝的clang的特定版本
$ cake --clang-executable=clang-8
示例:從自定義位置使用clang的默認版本
$ cake --clang-install-prefix=/path/to/clang-install
示例:在自定義位置使用clang的特定版本,來自不同位置的libc ++
$ cake --clang-executable=/path/to/clang-install/bin/clang-8 --libcxx-install-prefix=/path/to/libcxx-install
如果您的Linux發行版沒有Clang 5.0或以後的版本,則可以從LLVM項目安裝快照構建。
在http://apt.llvm.org/上按照說明進行操作,以設置您的軟件包管理器,以支持從LLVM軟件包管理器中拉出。
例如,對於Ubuntu 17.04 Zesty:
edit /etc/apt/sources.list sources.list,添加以下行:
deb http://apt.llvm.org/zesty/ llvm-toolchain-zesty main
deb-src http://apt.llvm.org/zesty/ llvm-toolchain-zesty main
為這些軟件包安裝PGP密鑰:
$ wget -O - https://apt.llvm.org/llvm-snapshot.gpg.key | sudo apt-key add -
安裝Clang和LLD:
$ sudo apt-get install clang-6.0 lld-6.0
LLVM快照構建不包括LIBC ++版本,因此您需要自己構建。見下文。
您還可以通過自己構建clang來使用出血 - 邊緣版本。
請參閱此處的說明:
為此,您需要安裝以下先決條件:
$ sudo apt-get install git cmake ninja-build clang lld
請注意,我們正在使用您的發行版本的clang版本來構建clang source。 GCC也可以在這裡使用。
結帳LLVM + CLANG + LLD + LIBC ++存儲庫:
mkdir llvm
cd llvm
git clone --depth=1 https://github.com/llvm-mirror/llvm.git llvm
git clone --depth=1 https://github.com/llvm-mirror/clang.git llvm/tools/clang
git clone --depth=1 https://github.com/llvm-mirror/lld.git llvm/tools/lld
git clone --depth=1 https://github.com/llvm-mirror/libcxx.git llvm/projects/libcxx
ln -s llvm/tools/clang clang
ln -s llvm/tools/lld lld
ln -s llvm/projects/libcxx libcxx
配置和構建clang:
mkdir clang-build
cd clang-build
cmake -GNinja
-DCMAKE_CXX_COMPILER=/usr/bin/clang++
-DCMAKE_C_COMPILER=/usr/bin/clang
-DCMAKE_BUILD_TYPE=MinSizeRel
-DCMAKE_INSTALL_PREFIX="/path/to/clang/install"
-DCMAKE_BUILD_WITH_INSTALL_RPATH="yes"
-DLLVM_TARGETS_TO_BUILD=X86
-DLLVM_ENABLE_PROJECTS="lld;clang"
../llvm
ninja install-clang
install-clang-headers
install-llvm-ar
install-lld
CPPCORO項目需要LIBC ++,因為它包含在Clang下使用C ++ Coroutines所需的<experimental/coroutine>標頭。
結帳libc++ llvm :
mkdir llvm
cd llvm
git clone --depth=1 https://github.com/llvm-mirror/llvm.git llvm
git clone --depth=1 https://github.com/llvm-mirror/libcxx.git llvm/projects/libcxx
ln -s llvm/projects/libcxx libcxx
構建libc++ :
mkdir libcxx-build
cd libcxx-build
cmake -GNinja
-DCMAKE_CXX_COMPILER="/path/to/clang/install/bin/clang++"
-DCMAKE_C_COMPILER="/path/to/clang/install/bin/clang"
-DCMAKE_BUILD_TYPE=Release
-DCMAKE_INSTALL_PREFIX="/path/to/clang/install"
-DLLVM_PATH="../llvm"
-DLIBCXX_CXX_ABI=libstdc++
-DLIBCXX_CXX_ABI_INCLUDE_PATHS="/usr/include/c++/6.3.0/;/usr/include/x86_64-linux-gnu/c++/6.3.0/"
../libcxx
ninja cxx
ninja install
這將構建並安裝LIBC ++到您安裝clang的同一安裝目錄中。
Microsoft團隊成員和社區貢獻者保持最新的VCPKG CPCORO港口。 VCPKG的URL為:https://github.com/microsoft/vcpkg。您可以使用VCPKG依賴項管理器下載並安裝CPPCORO:
git clone https://github.com/Microsoft/vcpkg.git
cd vcpkg
./bootstrap-vcpkg.sh # ./bootstrap-vcpkg.bat for Windows
./vcpkg integrate install
./vcpkg install cppcoro如果該版本已過時,請在VCPKG存儲庫上創建問題或拉出請求。
GitHub問題是支持,錯誤報告和功能請求的主要機制。
歡迎捐款,並會愉快地審查拉裝。我只要求您同意根據MIT許可證做出的任何捐款許可。
如果您對C ++ Coroutines有一般性疑問,通常可以在Cpplang Slack組上的#coroutines頻道中找到某人。