「Cppcoro」ライブラリは、N4680で説明されているCoroutines TS提案を利用するための汎用プリミティブの大規模なセットを提供します。
これらには以下が含まれます:
task<T>shared_task<T>generator<T>recursive_generator<T>async_generator<T>single_consumer_eventsingle_consumer_async_auto_reset_eventasync_mutexasync_manual_reset_eventasync_auto_reset_eventasync_latchsequence_barriermulti_producer_sequencersingle_producer_sequencersync_wait()when_all()when_all_ready()fmap()schedule_on()resume_on()cancellation_tokencancellation_sourcecancellation_registrationstatic_thread_poolio_serviceおよびio_work_scopefile 、 readable_file 、 writable_fileread_only_file 、 write_only_file 、 read_write_filesocketip_address 、 ipv4_address 、 ipv6_addressip_endpoint 、 ipv4_endpoint 、 ipv6_endpointis_awaitable<T>awaitable_traits<T>Awaitable<T>Awaiter<T>SchedulerDelayedSchedulerこのライブラリは、C ++ Coroutinesの提案の上に構築できる高性能でスケーラブルな非同期プログラミングの抽象化の空間を探索している実験ライブラリです。
他の人がそれが有用であると感じることを期待してオープンソーリングされており、C ++コミュニティがそれについてフィードバックとそれを改善する方法を提供できることを期待しています。
Coroutines TSをサポートするコンパイラが必要です。
Linuxバージョンは、Linux用にまだ実装されていないio_contextおよびFile I/O関連クラスを除いて機能的です(詳細については、問題#15を参照)。
task<T>タスクは、コルーチンの実行が待機するまで開始されないという点で怠lazで実行される非同期計算を表します。
例:
# include < cppcoro/read_only_file.hpp >
# include < cppcoro/task.hpp >
cppcoro::task< int > count_lines (std::string path)
{
auto file = co_await cppcoro::read_only_file::open (path);
int lineCount = 0 ;
char buffer[ 1024 ];
size_t bytesRead;
std:: uint64_t offset = 0 ;
do
{
bytesRead = co_await file. read (offset, buffer, sizeof (buffer));
lineCount += std::count (buffer, buffer + bytesRead, ' n ' );
offset += bytesRead;
} while (bytesRead > 0 );
co_return lineCount;
}
cppcoro::task<> usage_example ()
{
// Calling function creates a new task but doesn't start
// executing the coroutine yet.
cppcoro::task< int > countTask = count_lines ( " foo.txt " );
// ...
// Coroutine is only started when we later co_await the task.
int lineCount = co_await countTask;
std::cout << " line count = " << lineCount << std::endl;
}APIの概要:
// <cppcoro/task.hpp>
namespace cppcoro
{
template < typename T>
class task
{
public:
using promise_type = <unspecified>;
using value_type = T;
task () noexcept ;
task (task&& other) noexcept ;
task& operator =(task&& other);
// task is a move-only type.
task ( const task& other) = delete ;
task& operator =( const task& other) = delete ;
// Query if the task result is ready.
bool is_ready () const noexcept ;
// Wait for the task to complete and return the result or rethrow the
// exception if the operation completed with an unhandled exception.
//
// If the task is not yet ready then the awaiting coroutine will be
// suspended until the task completes. If the the task is_ready() then
// this operation will return the result synchronously without suspending.
Awaiter<T&> operator co_await () const & noexcept ;
Awaiter<T&&> operator co_await () const && noexcept ;
// Returns an awaitable that can be co_await'ed to suspend the current
// coroutine until the task completes.
//
// The 'co_await t.when_ready()' expression differs from 'co_await t' in
// that when_ready() only performs synchronization, it does not return
// the result or rethrow the exception.
//
// This can be useful if you want to synchronize with the task without
// the possibility of it throwing an exception.
Awaitable< void > when_ready () const noexcept ;
};
template < typename T>
void swap (task<T>& a, task<T>& b);
// Creates a task that yields the result of co_await'ing the specified awaitable.
//
// This can be used as a form of type-erasure of the concrete awaitable, allowing
// different awaitables that return the same await-result type to be stored in
// the same task<RESULT> type.
template <
typename AWAITABLE,
typename RESULT = typename awaitable_traits<AWAITABLE>:: await_result_t >
task<RESULT> make_task (AWAITABLE awaitable);
}タスクtask<T>を返すCoroutine関数を呼び出すことにより、 task<T>オブジェクトを作成できます。
Coroutineには、 co_awaitまたはco_returnの使用が含まれている必要があります。 task<T> Coroutineはco_yieldキーワードを使用できないことに注意してください。
task<T>を返すコルーチンが呼び出されると、必要に応じてコルーチンフレームが割り当てられ、パラメーターがコルーチンフレームにキャプチャされます。 CoroutineはCoroutineボディの開始時に停止され、実行が発信者に返され、非同期計算を表すtask<T>値が関数呼び出しから返されます。
Coroutineボディはtask<T>値がco_await edの場合に実行を開始します。これにより、待機中のコルーチンが一時停止され、待ち望まれているtask<T>値に関連するコルーチンの実行が開始されます。
待ち望まれているコルーチンは、待ち望まれているtask<T>のコルーチンの実行を完了するスレッドで後に再開されます。すなわち。 co_returnを実行するスレッド、またはCoroutineの実行を終了する未処理の例外をスローするスレッド。
タスクがすでに完了している場合、それを再び待つことで、待ち望んでいるコルーチンを一時停止することなく、すでに計算された結果が得られます。
taskオブジェクトが待機する前に破壊された場合、Coroutineは決して実行されず、Destructorはキャプチャされたパラメーターを破壊し、Coroutineフレームで使用されるメモリを解放します。
shared_task<T> shared_task<T>クラスは、単一の値を非同期に生成するCoroutineタイプです。
タスクの実行は、コルーチンによって待機されるまで開始されないという点で「怠zy」です。
タスク値をコピーできるという点で「共有」され、タスクの結果に複数の参照が作成される可能性があります。また、複数のコルーチンが結果を同時に待つことができます。
タスクは、最初のco_awaitタスクであるスレッドで実行を開始します。その後の待望は、タスクが完了したときに再開するために停止され、再開のために列に並ぶか、タスクが既に完了している場合に同期し続けます。
タスクが完了するのを待っている間に待望が中断された場合、タスクの実行を完了するスレッドで再開されます。すなわち。 co_returnを実行するスレッド、またはCoroutineの実行を終了する未処理の例外をスローするスレッド。
APIサマリー
namespace cppcoro
{
template < typename T = void >
class shared_task
{
public:
using promise_type = <unspecified>;
using value_type = T;
shared_task () noexcept ;
shared_task ( const shared_task& other) noexcept ;
shared_task (shared_task&& other) noexcept ;
shared_task& operator =( const shared_task& other) noexcept ;
shared_task& operator =(shared_task&& other) noexcept ;
void swap (shared_task& other) noexcept ;
// Query if the task has completed and the result is ready.
bool is_ready () const noexcept ;
// Returns an operation that when awaited will suspend the
// current coroutine until the task completes and the result
// is available.
//
// The type of the result of the 'co_await someTask' expression
// is an l-value reference to the task's result value (unless T
// is void in which case the expression has type 'void').
// If the task completed with an unhandled exception then the
// exception will be rethrown by the co_await expression.
Awaiter<T&> operator co_await () const noexcept ;
// Returns an operation that when awaited will suspend the
// calling coroutine until the task completes and the result
// is available.
//
// The result is not returned from the co_await expression.
// This can be used to synchronize with the task without the
// possibility of the co_await expression throwing an exception.
Awaiter< void > when_ready () const noexcept ;
};
template < typename T>
bool operator ==( const shared_task<T>& a, const shared_task<T>& b) noexcept ;
template < typename T>
bool operator !=( const shared_task<T>& a, const shared_task<T>& b) noexcept ;
template < typename T>
void swap (shared_task<T>& a, shared_task<T>& b) noexcept ;
// Wrap an awaitable value in a shared_task to allow multiple coroutines
// to concurrently await the result.
template <
typename AWAITABLE,
typename RESULT = typename awaitable_traits<AWAITABLE>:: await_result_t >
shared_task<RESULT> make_shared_task (AWAITABLE awaitable);
} shared_task<T>のすべてのconst-methodsは、複数のスレッドから同じインスタンスで他のconstメソッドと同時に呼び出すことができます。 shared_task<T> shared_task<T>の同じインスタンスで、他の方法と同時に共有の非Constメソッドを同時に呼び出すことは安全ではありません。
task<T>との比較shared_task<T>クラスは、タスクが呼び出されてすぐに実行を開始しないという点でtask<T>に似ています。タスクは、最初に待機されたときにのみ実行を開始します。
結果のタスクオブジェクトをコピーできるため、 task<T>とは異なり、複数のタスクオブジェクトが同じ非同期結果を参照できるようにします。また、タスクの結果を同時に待っている複数のコルーチンもサポートしています。
トレードオフは、結果が常に結果へのL値の参照であり、結果をローカル変数に移動する能力を制限するR値参照(結果が共有される可能性があるため)であることです。また、参照カウントを維持し、複数の待望をサポートする必要があるため、わずかにランタイムコストが高くなります。
generator<T> generator 、 Tの値のシーケンスを生成するコルーチンタイプを表します。ここで、値はゆっくりと同期して生成されます。
Coroutineボディは、 co_yieldキーワードを使用してタイプTの値を生成できます。ただし、Coroutineボディはco_awaitキーワードを使用できないことに注意してください。値は同期して生成する必要があります。
例えば:
cppcoro::generator< const std:: uint64_t > fibonacci ()
{
std:: uint64_t a = 0 , b = 1 ;
while ( true )
{
co_yield b;
auto tmp = a;
a = b;
b += tmp;
}
}
void usage ()
{
for ( auto i : fibonacci ())
{
if (i > 1'000'000 ) break ;
std::cout << i << std::endl;
}
} generator<T>が呼ばれる場合、コルーチンは最初に吊り下げられます。 Coroutineの実行は、 generator<T>::begin()メソッドが呼び出され、最初のco_yieldステートメントに到達するか、Coroutineが完了するまで実行されるまで続行します。
返されたイテレータがend() iteratorに等しくない場合、イテレーターの逆も繰り返しもco_yieldステートメントに渡された値への参照を返します。
Iteratorでoperator++()を呼び出して、コルーチンの実行を再開し、次のco_yieldポイントに到達するか、Coroutineが完了()に実行されるまで続きます。
Coroutineによって投げられた未処理の例外はbegin()またはoperator++()呼び出しから発信者への呼び出しから伝播します。
API概要:
namespace cppcoro
{
template < typename T>
class generator
{
public:
using promise_type = <unspecified>;
class iterator
{
public:
using iterator_category = std::input_iterator_tag;
using value_type = std:: remove_reference_t <T>;
using reference = value_type&;
using pointer = value_type*;
using difference_type = std:: size_t ;
iterator ( const iterator& other) noexcept ;
iterator& operator =( const iterator& other) noexcept ;
// If the generator coroutine throws an unhandled exception before producing
// the next element then the exception will propagate out of this call.
iterator& operator ++();
reference operator *() const noexcept ;
pointer operator ->() const noexcept ;
bool operator ==( const iterator& other) const noexcept ;
bool operator !=( const iterator& other) const noexcept ;
};
// Constructs to the empty sequence.
generator () noexcept ;
generator (generator&& other) noexcept ;
generator& operator =(generator&& other) noexcept ;
generator ( const generator& other) = delete ;
generator& operator =( const generator&) = delete ;
~generator ();
// Starts executing the generator coroutine which runs until either a value is yielded
// or the coroutine runs to completion or an unhandled exception propagates out of the
// the coroutine.
iterator begin ();
iterator end () noexcept ;
// Swap the contents of two generators.
void swap (generator& other) noexcept ;
};
template < typename T>
void swap (generator<T>& a, generator<T>& b) noexcept ;
// Apply function, func, lazily to each element of the source generator
// and yield a sequence of the results of calls to func().
template < typename FUNC, typename T>
generator<std:: invoke_result_t <FUNC, T&>> fmap (FUNC func, generator<T> source);
}recursive_generator<T> recursive_generator 、外部シーケンスの要素としてネストされたシーケンスの要素をより効率的にサポートするように設計されていることを除いて、 generatorに似ています。
タイプTの値をco_yieldにできることに加えて、タイプのrecursive_generator<T>の値co_yieldこともできます。
co_yield recursive_generator<T> value ricursive_generator <t>生成されたジェネレーターのすべての要素は、現在のジェネレーターの要素として生成されます。現在のコルーチンは、消費者がネストされたジェネレーターのすべての要素の消費を終了するまで中断されます。その後、現在のコルーチンのポイント実行は、次の要素を生成するために実行を再開します。
再帰的なデータ構造を繰り返すためにrecursive_generator<T> over generator<T>の利点は、 iterator::operator++()が、各要素のref/suspend o(depth)coroutinesを再開/停止するのではなく、次の要素を直接再開することができることです。ダウンサイドは、追加のオーバーヘッドがあることです
例えば:
// Lists the immediate contents of a directory.
cppcoro::generator<dir_entry> list_directory (std::filesystem::path path);
cppcoro::recursive_generator<dir_entry> list_directory_recursive (std::filesystem::path path)
{
for ( auto & entry : list_directory (path))
{
co_yield entry;
if (entry. is_directory ())
{
co_yield list_directory_recursive (entry. path ());
}
}
} fmap()演算子をrecursive_generator<T>に適用すると、 recursive_generator<U>ではなく、 generator<U>タイプが生成されることに注意してください。これは、 fmapの使用が一般的に再帰的なコンテキストでは使用されておらず、 recursive_generatorによって発生した追加のオーバーヘッドを避けようとしているためです。
async_generator<T> async_generator 、値が怠lazで生成され、値が非同期に生成される可能性のあるTの値のシーケンスを生成するコルーチンタイプを表します。
Coroutineボディは、 co_awaitとco_yield両方の式を使用できます。
ジェネレーターの消費者は、 for co_await Rangeベースのループ用Aを使用して値を消費できます。
例
cppcoro::async_generator< int > ticker ( int count, threadpool& tp)
{
for ( int i = 0 ; i < count; ++i)
{
co_await tp. delay ( std::chrono::seconds ( 1 ));
co_yield i;
}
}
cppcoro::task<> consumer (threadpool& tp)
{
auto sequence = ticker ( 10 , tp);
for co_await (std:: uint32_t i : sequence)
{
std::cout << " Tick " << i << std::endl;
}
}APIサマリー
// <cppcoro/async_generator.hpp>
namespace cppcoro
{
template < typename T>
class async_generator
{
public:
class iterator
{
public:
using iterator_tag = std::forward_iterator_tag;
using difference_type = std:: size_t ;
using value_type = std:: remove_reference_t <T>;
using reference = value_type&;
using pointer = value_type*;
iterator ( const iterator& other) noexcept ;
iterator& operator =( const iterator& other) noexcept ;
// Resumes the generator coroutine if suspended
// Returns an operation object that must be awaited to wait
// for the increment operation to complete.
// If the coroutine runs to completion then the iterator
// will subsequently become equal to the end() iterator.
// If the coroutine completes with an unhandled exception then
// that exception will be rethrown from the co_await expression.
Awaitable<iterator&> operator ++() noexcept ;
// Dereference the iterator.
pointer operator ->() const noexcept ;
reference operator *() const noexcept ;
bool operator ==( const iterator& other) const noexcept ;
bool operator !=( const iterator& other) const noexcept ;
};
// Construct to the empty sequence.
async_generator () noexcept ;
async_generator ( const async_generator&) = delete ;
async_generator (async_generator&& other) noexcept ;
~async_generator ();
async_generator& operator =( const async_generator&) = delete ;
async_generator& operator =(async_generator&& other) noexcept ;
void swap (async_generator& other) noexcept ;
// Starts execution of the coroutine and returns an operation object
// that must be awaited to wait for the first value to become available.
// The result of co_await'ing the returned object is an iterator that
// can be used to advance to subsequent elements of the sequence.
//
// This method is not valid to be called once the coroutine has
// run to completion.
Awaitable<iterator> begin () noexcept ;
iterator end () noexcept ;
};
template < typename T>
void swap (async_generator<T>& a, async_generator<T>& b);
// Apply 'func' to each element of the source generator, yielding a sequence of
// the results of calling 'func' on the source elements.
template < typename FUNC, typename T>
async_generator<std:: invoke_result_t <FUNC, T&>> fmap (FUNC func, async_generator<T> source);
}async_generatorオブジェクトが破壊されると、基礎となるコルーチンのキャンセルを要求します。 Coroutineがすでに完了しているか、現在co_yield式で吊り下げられている場合、Coroutineはすぐに破壊されます。それ以外の場合、Coroutineは完了まで実行されるか、次のco_yield式に到達するまで実行を継続します。
コルーチンフレームが破壊されると、その時点でのスコープ内のすべての変数の破壊者が実行され、発電機のリソースがクリーンアップされるようにします。
発信者は、消費者Coroutineが次のアイテムが生成されるのを待っているco_await式を実行している間、 async_generatorオブジェクトを破壊しないようにする必要があることに注意してください。
single_consumer_eventこれは、一度にそれを待っている単一のコルーチンのみをサポートするシンプルなマニュアルリセットイベントタイプです。これは使用できます
API概要:
// <cppcoro/single_consumer_event.hpp>
namespace cppcoro
{
class single_consumer_event
{
public:
single_consumer_event ( bool initiallySet = false ) noexcept ;
bool is_set () const noexcept ;
void set ();
void reset () noexcept ;
Awaiter< void > operator co_await () const noexcept ;
};
}例:
# include < cppcoro/single_consumer_event.hpp >
cppcoro::single_consumer_event event;
std::string value;
cppcoro::task<> consumer ()
{
// Coroutine will suspend here until some thread calls event.set()
// eg. inside the producer() function below.
co_await event;
std::cout << value << std::endl;
}
void producer ()
{
value = " foo " ;
// This will resume the consumer() coroutine inside the call to set()
// if it is currently suspended.
event. set ();
}single_consumer_async_auto_reset_eventこのクラスはset()メソッドへの呼び出しによってイベントが通知されるまで単一のコルーチンが待機できるようにする非同期同期プリミティブを提供します。
イベントを待っているCoroutineが、以前またはその後のCALLのいずれかによってリリースされるとset()イベントは「セットされていない」状態に自動的にリセットされます。
このクラスは、 async_auto_reset_eventのより効率的なバージョンであり、一度にイベントを待っている場合に単一のコルーチンのみが待っている場合に使用できます。イベントでCoroutinesを待っている複数の同時のサポートをサポートする必要がある場合は、代わりにasync_auto_reset_eventクラスを使用してください。
API概要:
// <cppcoro/single_consumer_async_auto_reset_event.hpp>
namespace cppcoro
{
class single_consumer_async_auto_reset_event
{
public:
single_consumer_async_auto_reset_event (
bool initiallySet = false ) noexcept ;
// Change the event to the 'set' state. If a coroutine is awaiting the
// event then the event is immediately transitioned back to the 'not set'
// state and the coroutine is resumed.
void set () noexcept ;
// Returns an Awaitable type that can be awaited to wait until
// the event becomes 'set' via a call to the .set() method. If
// the event is already in the 'set' state then the coroutine
// continues without suspending.
// The event is automatically reset back to the 'not set' state
// before resuming the coroutine.
Awaiter< void > operator co_await () const noexcept ;
};
}使用例:
std::atomic< int > value;
cppcoro::single_consumer_async_auto_reset_event valueDecreasedEvent;
cppcoro::task<> wait_until_value_is_below ( int limit)
{
while (value. load (std::memory_order_relaxed) >= limit)
{
// Wait until there has been some change that we're interested in.
co_await valueDecreasedEvent;
}
}
void change_value ( int delta)
{
value. fetch_add (delta, std::memory_order_relaxed);
// Notify the waiter if there has been some change.
if (delta < 0 ) valueDecreasedEvent. set ();
}async_mutexCoroutine内からMutexを「Co_Await」して、Mutex Lockが取得されるまでコルーチンを吊り下げることができるシンプルな相互除外抽象化を提供します。
ミューテックスを待つコルーチンはスレッドをブロックしないが、代わりにコルーチンを一時停止し、その後、前のロックホルダーがロックをunlock()コール内で再開するという点で、実装がロックされていません。
API概要:
// <cppcoro/async_mutex.hpp>
namespace cppcoro
{
class async_mutex_lock ;
class async_mutex_lock_operation ;
class async_mutex_scoped_lock_operation ;
class async_mutex
{
public:
async_mutex () noexcept ;
~async_mutex ();
async_mutex ( const async_mutex&) = delete ;
async_mutex& operator ( const async_mutex&) = delete;
bool try_lock () noexcept ;
async_mutex_lock_operation lock_async () noexcept ;
async_mutex_scoped_lock_operation scoped_lock_async () noexcept ;
void unlock ();
};
class async_mutex_lock_operation
{
public:
bool await_ready () const noexcept ;
bool await_suspend (std::experimental::coroutine_handle<> awaiter) noexcept ;
void await_resume () const noexcept ;
};
class async_mutex_scoped_lock_operation
{
public:
bool await_ready () const noexcept ;
bool await_suspend (std::experimental::coroutine_handle<> awaiter) noexcept ;
[[nodiscard]] async_mutex_lock await_resume () const noexcept ;
};
class async_mutex_lock
{
public:
// Takes ownership of the lock.
async_mutex_lock (async_mutex& mutex, std:: adopt_lock_t ) noexcept ;
// Transfer ownership of the lock.
async_mutex_lock (async_mutex_lock&& other) noexcept ;
async_mutex_lock ( const async_mutex_lock&) = delete ;
async_mutex_lock& operator =( const async_mutex_lock&) = delete ;
// Releases the lock by calling unlock() on the mutex.
~async_mutex_lock ();
};
}使用例:
# include < cppcoro/async_mutex.hpp >
# include < cppcoro/task.hpp >
# include < set >
# include < string >
cppcoro::async_mutex mutex;
std::set<std::string> values;
cppcoro::task<> add_item (std::string value)
{
cppcoro::async_mutex_lock lock = co_await mutex. scoped_lock_async ();
values. insert ( std::move (value));
}async_manual_reset_eventマニュアルレスセットイベントは、Coroutine/Thread-Synchronization Primitiveであり、1つ以上のスレッドがset()呼び出すスレッドでシグナルになるまで1つ以上のスレッドが待機できるようにします。
イベントは2つの州のいずれかにあります。 「セット」と「セットなし」 。
イベントがイベントを待っているときにイベントが「セット」状態にある場合、コルーチンは一時停止せずに継続します。ただし、Coroutineが「セットされていない」状態にある場合、Coroutineは、一部のスレッドがその後set()メソッドを呼び出すまで中断されます。
イベントが「セット」になるのを待っている間に中断されたスレッドは、いくつかのスレッドによってset()の次の呼び出し内で再開されます。
イベントが再開されないため、イベントが破壊されたときに「セットされていない」イベントを待っていることを確認する必要があることに注意してください。
例:
cppcoro::async_manual_reset_event event;
std::string value;
void producer ()
{
value = get_some_string_value ();
// Publish a value by setting the event.
event. set ();
}
// Can be called many times to create many tasks.
// All consumer tasks will wait until value has been published.
cppcoro::task<> consumer ()
{
// Wait until value has been published by awaiting event.
co_await event;
consume_value (value);
}API概要:
namespace cppcoro
{
class async_manual_reset_event_operation ;
class async_manual_reset_event
{
public:
async_manual_reset_event ( bool initiallySet = false ) noexcept ;
~async_manual_reset_event ();
async_manual_reset_event ( const async_manual_reset_event&) = delete ;
async_manual_reset_event (async_manual_reset_event&&) = delete ;
async_manual_reset_event& operator =( const async_manual_reset_event&) = delete ;
async_manual_reset_event& operator =(async_manual_reset_event&&) = delete ;
// Wait until the event becomes set.
async_manual_reset_event_operation operator co_await () const noexcept ;
bool is_set () const noexcept ;
void set () noexcept ;
void reset () noexcept ;
};
class async_manual_reset_event_operation
{
public:
async_manual_reset_event_operation (async_manual_reset_event& event) noexcept ;
bool await_ready () const noexcept ;
bool await_suspend (std::experimental::coroutine_handle<> awaiter) noexcept ;
void await_resume () const noexcept ;
};
}async_auto_reset_event自動リセットイベントは、Coroutine/Thread-Synchronization Primitiveであり、1つ以上のスレッドがset()を呼び出すことにより、スレッドでイベントが信号を送るまで待つことができます。
イベントを待っているコルーチンが、以前またはその後のset()への呼び出しによってリリースされると、イベントは自動的に「セットされていない」状態に戻ります。
API概要:
// <cppcoro/async_auto_reset_event.hpp>
namespace cppcoro
{
class async_auto_reset_event_operation ;
class async_auto_reset_event
{
public:
async_auto_reset_event ( bool initiallySet = false ) noexcept ;
~async_auto_reset_event ();
async_auto_reset_event ( const async_auto_reset_event&) = delete ;
async_auto_reset_event (async_auto_reset_event&&) = delete ;
async_auto_reset_event& operator =( const async_auto_reset_event&) = delete ;
async_auto_reset_event& operator =(async_auto_reset_event&&) = delete ;
// Wait for the event to enter the 'set' state.
//
// If the event is already 'set' then the event is set to the 'not set'
// state and the awaiting coroutine continues without suspending.
// Otherwise, the coroutine is suspended and later resumed when some
// thread calls 'set()'.
//
// Note that the coroutine may be resumed inside a call to 'set()'
// or inside another thread's call to 'operator co_await()'.
async_auto_reset_event_operation operator co_await () const noexcept ;
// Set the state of the event to 'set'.
//
// If there are pending coroutines awaiting the event then one
// pending coroutine is resumed and the state is immediately
// set back to the 'not set' state.
//
// This operation is a no-op if the event was already 'set'.
void set () noexcept ;
// Set the state of the event to 'not-set'.
//
// This is a no-op if the state was already 'not set'.
void reset () noexcept ;
};
class async_auto_reset_event_operation
{
public:
explicit async_auto_reset_event_operation (async_auto_reset_event& event) noexcept ;
async_auto_reset_event_operation ( const async_auto_reset_event_operation& other) noexcept ;
bool await_ready () const noexcept ;
bool await_suspend (std::experimental::coroutine_handle<> awaiter) noexcept ;
void await_resume () const noexcept ;
};
}async_latch非同期ラッチは、カウンターがゼロに減少するまでコルタインが非同期に待機できるようにする同期原始です。
ラッチは単一使用オブジェクトです。カウンターがゼロになると、ラッチが「準備ができている」になり、ラッチが破壊されるまで準備が整います。
API概要:
// <cppcoro/async_latch.hpp>
namespace cppcoro
{
class async_latch
{
public:
// Initialise the latch with the specified count.
async_latch (std:: ptrdiff_t initialCount) noexcept ;
// Query if the count has reached zero yet.
bool is_ready () const noexcept ;
// Decrement the count by n.
// This will resume any waiting coroutines if the count reaches zero
// as a result of this call.
// It is undefined behaviour to decrement the count below zero.
void count_down (std:: ptrdiff_t n = 1 ) noexcept ;
// Wait until the latch becomes ready.
// If the latch count is not yet zero then the awaiting coroutine will
// be suspended and later resumed by a call to count_down() that decrements
// the count to zero. If the latch count was already zero then the coroutine
// continues without suspending.
Awaiter< void > operator co_await () const noexcept ;
};
}sequence_barrier sequence_barrierは、単一プロデューサーと複数の消費者が単調に増加するシーケンス数に関して調整できる同期プリミティブです。
単一のプロデューサーは、単調に増加する順序で新しいシーケンス番号を公開することにより、シーケンス番号を進めます。 1人以上の消費者は、最後に公開されたシーケンス番号を照会でき、特定のシーケンス番号が公開されるまで待つことができます。
シーケンスバリアを使用して、スレッドセーフプロデューサー/コンシューマーリングバッファにカーソルを表すことができます
詳細については、lmax durruptorパターンを参照してください:https://lmax-exchange.github.io/disuptor/files/disuptor-1.0.pdf
API概要:
namespace cppcoro
{
template < typename SEQUENCE = std:: size_t ,
typename TRAITS = sequence_traits<SEQUENCE>>
class sequence_barrier
{
public:
sequence_barrier (SEQUENCE initialSequence = TRAITS::initial_sequence) noexcept ;
~sequence_barrier ();
SEQUENCE last_published () const noexcept ;
// Wait until the specified targetSequence number has been published.
//
// If the operation does not complete synchronously then the awaiting
// coroutine is resumed on the specified scheduler. Otherwise, the
// coroutine continues without suspending.
//
// The co_await expression resumes with the updated last_published()
// value, which is guaranteed to be at least 'targetSequence'.
template < typename SCHEDULER>
[[nodiscard]]
Awaitable<SEQUENCE> wait_until_published (SEQUENCE targetSequence,
SCHEDULER& scheduler) const noexcept ;
void publish (SEQUENCE sequence) noexcept ;
};
}single_producer_sequencer single_producer_sequencerは、単一の生産者と1人以上の消費者のリングバッファーへのアクセスを調整するために使用できる同期プリミティブです。
プロデューサーは最初にリングバッファで1つ以上のスロットを取得し、それらのスロットに対応するリングバッファ要素に書き込み、最後にそれらのスロットに書かれた値を公開します。プロデューサーは、消費者が消費した場所に先立って、「緩衝」以上の要素を生産することはできません。
その後、消費者は特定の要素が公開されるのを待って、アイテムを処理し、 sequence_barrierオブジェクトでの消費が終了したシーケンス番号を公開することでアイテムの処理が完了したときにプロデューサーに通知します。
API概要:
// <cppcoro/single_producer_sequencer.hpp>
namespace cppcoro
{
template <
typename SEQUENCE = std:: size_t ,
typename TRAITS = sequence_traits<SEQUENCE>>
class single_producer_sequencer
{
public:
using size_type = typename sequence_range<SEQUENCE, TRAITS>::size_type;
single_producer_sequencer (
const sequence_barrier<SEQUENCE, TRAITS>& consumerBarrier,
std:: size_t bufferSize,
SEQUENCE initialSequence = TRAITS::initial_sequence) noexcept ;
// Publisher API:
template < typename SCHEDULER>
[[nodiscard]]
Awaitable<SEQUENCE> claim_one (SCHEDULER& scheduler) noexcept ;
template < typename SCHEDULER>
[[nodiscard]]
Awaitable<sequence_range<SEQUENCE>> claim_up_to (
std:: size_t count,
SCHEDULER& scheduler) noexcept ;
void publish (SEQUENCE sequence) noexcept ;
// Consumer API:
SEQUENCE last_published () const noexcept ;
template < typename SCHEDULER>
[[nodiscard]]
Awaitable<SEQUENCE> wait_until_published (
SEQUENCE targetSequence,
SCHEDULER& scheduler) const noexcept ;
};
}使用例:
using namespace cppcoro ;
using namespace std ::chrono ;
struct message
{
int id;
steady_clock::time_point timestamp;
float data;
};
constexpr size_t bufferSize = 16384 ; // Must be power-of-two
constexpr size_t indexMask = bufferSize - 1 ;
message buffer[bufferSize];
task< void > producer (
io_service& ioSvc,
single_producer_sequencer< size_t >& sequencer)
{
auto start = steady_clock::now ();
for ( int i = 0 ; i < 1'000'000 ; ++i)
{
// Wait until a slot is free in the buffer.
size_t seq = co_await sequencer. claim_one (ioSvc);
// Populate the message.
auto & msg = buffer[seq & indexMask];
msg. id = i;
msg. timestamp = steady_clock::now ();
msg. data = 123 ;
// Publish the message.
sequencer. publish (seq);
}
// Publish a sentinel
auto seq = co_await sequencer. claim_one (ioSvc);
auto & msg = buffer[seq & indexMask];
msg. id = - 1 ;
sequencer. publish (seq);
}
task< void > consumer (
static_thread_pool& threadPool,
const single_producer_sequencer< size_t >& sequencer,
sequence_barrier< size_t >& consumerBarrier)
{
size_t nextToRead = 0 ;
while ( true )
{
// Wait until the next message is available
// There may be more than one available.
const size_t available = co_await sequencer. wait_until_published (nextToRead, threadPool);
do {
auto & msg = buffer[nextToRead & indexMask];
if (msg. id == - 1 )
{
consumerBarrier. publish (nextToRead);
co_return ;
}
processMessage (msg);
} while (nextToRead++ != available);
// Notify the producer that we've finished processing
// up to 'nextToRead - 1'.
consumerBarrier. publish (available);
}
}
task< void > example (io_service& ioSvc, static_thread_pool& threadPool)
{
sequence_barrier< size_t > barrier;
single_producer_sequencer< size_t > sequencer{barrier, bufferSize};
co_await when_all (
producer (tp, sequencer),
consumer (tp, sequencer, barrier));
}multi_producer_sequencer multi_producer_sequencerクラスは、複数の生産者と1人以上の消費者のリングバッファーへのアクセスを調整する同期プリミティブです。
単一プロデューサーのバリアントについては、 single_producer_sequencerクラスを参照してください。
リングバッファーには2つのパワーであるサイズが必要であることに注意してください。これは、実装が整数分割/Moduloの代わりにビットマスクを使用してバッファーにオフセットを計算するためです。また、これにより、シーケンス番号が32ビット/64ビット値を安全にラップすることができます。
API概要:
// <cppcoro/multi_producer_sequencer.hpp>
namespace cppcoro
{
template < typename SEQUENCE = std:: size_t ,
typename TRAITS = sequence_traits<SEQUENCE>>
class multi_producer_sequencer
{
public:
multi_producer_sequencer (
const sequence_barrier<SEQUENCE, TRAITS>& consumerBarrier,
SEQUENCE initialSequence = TRAITS::initial_sequence);
std:: size_t buffer_size () const noexcept ;
// Consumer interface
//
// Each consumer keeps track of their own 'lastKnownPublished' value
// and must pass this to the methods that query for an updated last-known
// published sequence number.
SEQUENCE last_published_after (SEQUENCE lastKnownPublished) const noexcept ;
template < typename SCHEDULER>
Awaitable<SEQUENCE> wait_until_published (
SEQUENCE targetSequence,
SEQUENCE lastKnownPublished,
SCHEDULER& scheduler) const noexcept ;
// Producer interface
// Query whether any slots available for claiming (approx.)
bool any_available () const noexcept ;
template < typename SCHEDULER>
Awaitable<SEQUENCE> claim_one (SCHEDULER& scheduler) noexcept ;
template < typename SCHEDULER>
Awaitable<sequence_range<SEQUENCE, TRAITS>> claim_up_to (
std:: size_t count,
SCHEDULER& scheduler) noexcept ;
// Mark the specified sequence number as published.
void publish (SEQUENCE sequence) noexcept ;
// Mark all sequence numbers in the specified range as published.
void publish ( const sequence_range<SEQUENCE, TRAITS>& range) noexcept ;
};
}cancellation_token 、発信者がその機能の操作をキャンセルするリクエストをその後通信できるようにする関数に渡すことができる値です。
キャンセルできるcancellation_tokenを取得するには、最初にcancellation_sourceオブジェクトを作成する必要があります。 cancellation_source::token()メソッドを使用して、そのcancellation_sourceオブジェクトにリンクされている新しいcancellation_token値を製造できます。
後で操作のキャンセルを要求したい場合は、 cancellation_tokenに合格した場合は、関連するcancellation_sourceオブジェクトでcancellation_source::request_cancellation()を呼び出すことができます。
関数は、2つの方法のいずれかでキャンセルのリクエストに応答できます。
cancellation_token::is_cancellation_requested()またはcancellation_token::throw_if_cancellation_requested()いずれかを呼び出すことにより、定期的にキャンセルの世論調査cancellation_registrationクラスを使用してキャンセルが要求されたときに実行されるコールバックを登録します。API概要:
namespace cppcoro
{
class cancellation_source
{
public:
// Construct a new, independently cancellable cancellation source.
cancellation_source ();
// Construct a new reference to the same cancellation state.
cancellation_source ( const cancellation_source& other) noexcept ;
cancellation_source (cancellation_source&& other) noexcept ;
~cancellation_source ();
cancellation_source& operator =( const cancellation_source& other) noexcept ;
cancellation_source& operator =(cancellation_source&& other) noexcept ;
bool is_cancellation_requested () const noexcept ;
bool can_be_cancelled () const noexcept ;
void request_cancellation ();
cancellation_token token () const noexcept ;
};
class cancellation_token
{
public:
// Construct a token that can't be cancelled.
cancellation_token () noexcept ;
cancellation_token ( const cancellation_token& other) noexcept ;
cancellation_token (cancellation_token&& other) noexcept ;
~cancellation_token ();
cancellation_token& operator =( const cancellation_token& other) noexcept ;
cancellation_token& operator =(cancellation_token&& other) noexcept ;
bool is_cancellation_requested () const noexcept ;
void throw_if_cancellation_requested () const ;
// Query if this token can ever have cancellation requested.
// Code can use this to take a more efficient code-path in cases
// that the operation does not need to handle cancellation.
bool can_be_cancelled () const noexcept ;
};
// RAII class for registering a callback to be executed if cancellation
// is requested on a particular cancellation token.
class cancellation_registration
{
public:
// Register a callback to be executed if cancellation is requested.
// Callback will be called with no arguments on the thread that calls
// request_cancellation() if cancellation is not yet requested, or
// called immediately if cancellation has already been requested.
// Callback must not throw an unhandled exception when called.
template < typename CALLBACK>
cancellation_registration (cancellation_token token, CALLBACK&& callback);
cancellation_registration ( const cancellation_registration& other) = delete ;
~cancellation_registration ();
};
class operation_cancelled : public std :: exception
{
public:
operation_cancelled ();
const char * what () const override ;
};
}例:ポーリングアプローチ
cppcoro::task<> do_something_async (cppcoro::cancellation_token token)
{
// Explicitly define cancellation points within the function
// by calling throw_if_cancellation_requested().
token. throw_if_cancellation_requested ();
co_await do_step_1 ();
token. throw_if_cancellation_requested ();
do_step_2 ();
// Alternatively, you can query if cancellation has been
// requested to allow yourself to do some cleanup before
// returning.
if (token. is_cancellation_requested ())
{
display_message_to_user ( " Cancelling operation... " );
do_cleanup ();
throw cppcoro::operation_cancelled{};
}
do_final_step ();
}例:コールバックアプローチ
// Say we already have a timer abstraction that supports being
// cancelled but it doesn't support cancellation_tokens natively.
// You can use a cancellation_registration to register a callback
// that calls the existing cancellation API. e.g.
cppcoro::task<> cancellable_timer_wait (cppcoro::cancellation_token token)
{
auto timer = create_timer (10s);
cppcoro::cancellation_registration registration (token, [&]
{
// Call existing timer cancellation API.
timer. cancel ();
});
co_await timer;
}static_thread_pool static_thread_poolクラスは、固定サイズのスレッドプールで作業をスケジュールできる抽象化を提供します。
このクラスは、スケジューラの概念を実装しています(以下を参照)。
co_await threadPool.schedule()を実行することにより、スレッドプールに作業をenceueできます。この操作は、現在のコルーチンを停止し、スレッドプールで実行するためにそれをenqueueし、スレッドプールがスレッドプールのスレッドが次にコルーチンを自由に実行できる場合にコルーチンを再開します。この操作は、投げることが保証されており、一般的な場合はメモリを割り当てません。
このクラスでは、ワークスチールアルゴリズムを使用して、複数のスレッドにわたってバランスを負います。スレッドプールのスレッドからスレッドプールに囲まれた作業は、LIFOキューの同じスレッドで実行される予定です。リモートスレッドからスレッドプールに囲まれた作業は、グローバルなFIFOキューに囲まれます。労働者のスレッドが地元のキューから仕事がなくなったとき、最初にグローバルキューから作業をdequeueしようとします。そのキューが空の場合、次に他のワーカースレッドのキューの背面から作業を盗もうとします。
API概要:
namespace cppcoro
{
class static_thread_pool
{
public:
// Initialise the thread-pool with a number of threads equal to
// std::thread::hardware_concurrency().
static_thread_pool ();
// Initialise the thread pool with the specified number of threads.
explicit static_thread_pool (std:: uint32_t threadCount);
std:: uint32_t thread_count () const noexcept ;
class schedule_operation
{
public:
schedule_operation (static_thread_pool* tp) noexcept ;
bool await_ready () noexcept ;
bool await_suspend (std::experimental::coroutine_handle<> h) noexcept ;
bool await_resume () noexcept ;
private:
// unspecified
};
// Return an operation that can be awaited by a coroutine.
//
//
[[nodiscard]]
schedule_operation schedule () noexcept ;
private:
// Unspecified
};
}使用例:シンプル
cppcoro::task<std::string> do_something_on_threadpool (cppcoro::static_thread_pool& tp)
{
// First schedule the coroutine onto the threadpool.
co_await tp. schedule ();
// When it resumes, this coroutine is now running on the threadpool.
do_something ();
}使用法の例:並行して物事を行う - static_thread_poolを使用してschedule_on()オペレーターを使用します。
cppcoro::task< double > dot_product (static_thread_pool& tp, double a[], double b[], size_t count)
{
if (count > 1000 )
{
// Subdivide the work recursively into two equal tasks
// The first half is scheduled to the thread pool so it can run concurrently
// with the second half which continues on this thread.
size_t halfCount = count / 2 ;
auto [first, second] = co_await when_all (
schedule_on (tp, dot_product (tp, a, b, halfCount),
dot_product (tp, a + halfCount, b + halfCount, count - halfCount));
co_return first + second;
}
else
{
double sum = 0.0 ;
for ( size_t i = 0 ; i < count; ++i)
{
sum += a[i] * b[i];
}
co_return sum;
}
}io_serviceおよびio_work_scope io_serviceクラスは、非同期I/O操作からI/O完了イベントを処理するための抽象化を提供します。
非同期I/O操作が完了すると、その操作がイベント処理方法のいずれかへの呼び出し内のI/Oスレッドで再開されるのを待っていたCoroutine: process_events() 、 process_pending_events() 、 process_one_event()またはprocess_one_pending_event() 。
io_serviceクラスは、I/Oスレッドを管理しません。一部のスレッドは、I/Oの完了イベントが発送されるのを待っているコルーチンのイベント加工方法の1つを呼び出すことを確認する必要があります。これは、 process_events()またはprocess_one_pending_event()を介して新しいイベントを定期的にポーリングすることにより、 process_pending_events()を呼び出すか、他のイベントループ(UIイベントループなど)と混合する専用のスレッドです。
これにより、 io_serviceイベントループと、ユーザーインターフェイスイベントループなどの他のイベントループと統合できます。
複数のスレッドにprocess_events()を呼び出すことにより、複数のスレッドでイベントのマルチプレックス処理ができます。オプションのio_serviceコンストラクターパラメーターを介してイベントをアクティブに処理するように、スレッドの最大数に関するヒントを指定できます。
Windowsでは、実装がWindows I/O完了ポート機能を使用して、イベントをスケーラブルな方法でI/Oスレッドに派遣します。
API概要:
namespace cppcoro
{
class io_service
{
public:
class schedule_operation ;
class timed_schedule_operation ;
io_service ();
io_service (std:: uint32_t concurrencyHint);
io_service (io_service&&) = delete ;
io_service ( const io_service&) = delete ;
io_service& operator =(io_service&&) = delete ;
io_service& operator =( const io_service&) = delete ;
~io_service ();
// Scheduler methods
[[nodiscard]]
schedule_operation schedule () noexcept ;
template < typename REP, typename RATIO>
[[nodiscard]]
timed_schedule_operation schedule_after (
std::chrono::duration<REP, RATIO> delay,
cppcoro::cancellation_token cancellationToken = {}) noexcept ;
// Event-loop methods
//
// I/O threads must call these to process I/O events and execute
// scheduled coroutines.
std:: uint64_t process_events ();
std:: uint64_t process_pending_events ();
std:: uint64_t process_one_event ();
std:: uint64_t process_one_pending_event ();
// Request that all threads processing events exit their event loops.
void stop () noexcept ;
// Query if some thread has called stop()
bool is_stop_requested () const noexcept ;
// Reset the event-loop after a call to stop() so that threads can
// start processing events again.
void reset ();
// Reference-counting methods for tracking outstanding references
// to the io_service.
//
// The io_service::stop() method will be called when the last work
// reference is decremented.
//
// Use the io_work_scope RAII class to manage calling these methods on
// entry-to and exit-from a scope.
void notify_work_started () noexcept ;
void notify_work_finished () noexcept ;
};
class io_service ::schedule_operation
{
public:
schedule_operation ( const schedule_operation&) noexcept ;
schedule_operation& operator =( const schedule_operation&) noexcept ;
bool await_ready () const noexcept ;
void await_suspend (std::experimental::coroutine_handle<> awaiter) noexcept ;
void await_resume () noexcept ;
};
class io_service ::timed_schedule_operation
{
public:
timed_schedule_operation (timed_schedule_operation&&) noexcept ;
timed_schedule_operation ( const timed_schedule_operation&) = delete ;
timed_schedule_operation& operator =( const timed_schedule_operation&) = delete ;
timed_schedule_operation& operator =(timed_schedule_operation&&) = delete ;
bool await_ready () const noexcept ;
void await_suspend (std::experimental::coroutine_handle<> awaiter);
void await_resume ();
};
class io_work_scope
{
public:
io_work_scope (io_service& ioService) noexcept ;
io_work_scope ( const io_work_scope& other) noexcept ;
io_work_scope (io_work_scope&& other) noexcept ;
~io_work_scope ();
io_work_scope& operator =( const io_work_scope& other) noexcept ;
io_work_scope& operator =(io_work_scope&& other) noexcept ;
io_service& service () const noexcept ;
};
}例:
# include < cppcoro/task.hpp >
# include < cppcoro/task.hpp >
# include < cppcoro/io_service.hpp >
# include < cppcoro/read_only_file.hpp >
# include < experimental/filesystem >
# include < memory >
# include < algorithm >
# include < iostream >
namespace fs = std::experimental::filesystem;
cppcoro::task<std:: uint64_t > count_lines (cppcoro::io_service& ioService, fs::path path)
{
auto file = cppcoro::read_only_file::open (ioService, path);
constexpr size_t bufferSize = 4096 ;
auto buffer = std::make_unique<std:: uint8_t []>(bufferSize);
std:: uint64_t newlineCount = 0 ;
for (std:: uint64_t offset = 0 , fileSize = file. size (); offset < fileSize;)
{
const auto bytesToRead = static_cast < size_t >(
std::min<std:: uint64_t >(bufferSize, fileSize - offset));
const auto bytesRead = co_await file. read (offset, buffer. get (), bytesToRead);
newlineCount += std::count (buffer. get (), buffer. get () + bytesRead, ' n ' );
offset += bytesRead;
}
co_return newlineCount;
}
cppcoro::task<> run (cppcoro::io_service& ioService)
{
cppcoro::io_work_scope ioScope (ioService);
auto lineCount = co_await count_lines (ioService, fs::path{ " foo.txt " });
std::cout << " foo.txt has " << lineCount << " lines. " << std::endl;;
}
cppcoro::task<> process_events (cppcoro::io_service& ioService)
{
// Process events until the io_service is stopped.
// ie. when the last io_work_scope goes out of scope.
ioService. process_events ();
co_return ;
}
int main ()
{
cppcoro::io_service ioService;
cppcoro::sync_wait ( cppcoro::when_all_ready (
run (ioService),
process_events (ioService)));
return 0 ;
}io_service io_serviceクラスは、 SchedulerとDelayedScheduler概念のインターフェイスを実装します。
これにより、Coroutineは現在のスレッドで実行を一時停止し、特定のio_serviceオブジェクトに関連付けられたI/Oスレッドの再開をスケジュールすることができます。
例:
cppcoro::task<> do_something (cppcoro::io_service& ioService)
{
// Coroutine starts execution on the thread of the task awaiter.
// A coroutine can transfer execution to an I/O thread by awaiting the
// result of io_service::schedule().
co_await ioService. schedule ();
// At this point, the coroutine is now executing on an I/O thread
// inside a call to one of the io_service event processing methods.
// A coroutine can also perform a delayed-schedule that will suspend
// the coroutine for a specified duration of time before scheduling
// it for resumption on an I/O thread.
co_await ioService. schedule_after (100ms);
// At this point, the coroutine is executing on a potentially different I/O thread.
}file 、 readable_file 、 writable_fileこれらのタイプは、コンクリートファイルI/Oを実行するための抽象的なベースクラスです。
API概要:
namespace cppcoro
{
class file_read_operation ;
class file_write_operation ;
class file
{
public:
virtual ~file ();
std:: uint64_t size () const ;
protected:
file (file&& other) noexcept ;
};
class readable_file : public virtual file
{
public:
[[nodiscard]]
file_read_operation read (
std:: uint64_t offset,
void * buffer,
std:: size_t byteCount,
cancellation_token ct = {}) const noexcept ;
};
class writable_file : public virtual file
{
public:
void set_size (std:: uint64_t fileSize);
[[nodiscard]]
file_write_operation write (
std:: uint64_t offset,
const void * buffer,
std:: size_t byteCount,
cancellation_token ct = {}) noexcept ;
};
class file_read_operation
{
public:
file_read_operation (file_read_operation&& other) noexcept ;
bool await_ready () const noexcept ;
bool await_suspend (std::experimental::coroutine_handle<> awaiter);
std:: size_t await_resume ();
};
class file_write_operation
{
public:
file_write_operation (file_write_operation&& other) noexcept ;
bool await_ready () const noexcept ;
bool await_suspend (std::experimental::coroutine_handle<> awaiter);
std:: size_t await_resume ();
};
}read_only_file 、 write_only_file 、 read_write_fileこれらのタイプは、コンクリートファイルI/Oクラスを表しています。
API概要:
namespace cppcoro
{
class read_only_file : public readable_file
{
public:
[[nodiscard]]
static read_only_file open (
io_service& ioService,
const std::experimental::filesystem::path& path,
file_share_mode shareMode = file_share_mode::read,
file_buffering_mode bufferingMode = file_buffering_mode::default_);
};
class write_only_file : public writable_file
{
public:
[[nodiscard]]
static write_only_file open (
io_service& ioService,
const std::experimental::filesystem::path& path,
file_open_mode openMode = file_open_mode::create_or_open,
file_share_mode shareMode = file_share_mode::none,
file_buffering_mode bufferingMode = file_buffering_mode::default_);
};
class read_write_file : public readable_file , public writable_file
{
public:
[[nodiscard]]
static read_write_file open (
io_service& ioService,
const std::experimental::filesystem::path& path,
file_open_mode openMode = file_open_mode::create_or_open,
file_share_mode shareMode = file_share_mode::none,
file_buffering_mode bufferingMode = file_buffering_mode::default_);
};
}すべてのopen()関数がstd::system_error障害時にスローします。
注:ネットワーキングの抽象化は現在、Windowsプラットフォームでのみサポートされています。 Linuxのサポートはまもなく登場します。
socketソケットクラスを使用して、ネットワーク上のデータを非同期に送信/受信できます。
現在、TCP/IP、UDP/IPオーバーIPv4およびIPv6のみをサポートしています。
API概要:
// <cppcoro/net/socket.hpp>
namespace cppcoro ::net
{
class socket
{
public:
static socket create_tcpv4 (ip_service& ioSvc);
static socket create_tcpv6 (ip_service& ioSvc);
static socket create_updv4 (ip_service& ioSvc);
static socket create_udpv6 (ip_service& ioSvc);
socket (socket&& other) noexcept ;
~socket ();
socket& operator =(socket&& other) noexcept ;
// Return the native socket handle for the socket
<platform-specific> native_handle () noexcept ;
const ip_endpoint& local_endpoint () const noexcept ;
const ip_endpoint& remote_endpoint () const noexcept ;
void bind ( const ip_endpoint& localEndPoint);
void listen ();
[[nodiscard]]
Awaitable< void > connect ( const ip_endpoint& remoteEndPoint) noexcept ;
[[nodiscard]]
Awaitable< void > connect ( const ip_endpoint& remoteEndPoint,
cancellation_token ct) noexcept ;
[[nodiscard]]
Awaitable< void > accept (socket& acceptingSocket) noexcept ;
[[nodiscard]]
Awaitable< void > accept (socket& acceptingSocket,
cancellation_token ct) noexcept ;
[[nodiscard]]
Awaitable< void > disconnect () noexcept ;
[[nodiscard]]
Awaitable< void > disconnect (cancellation_token ct) noexcept ;
[[nodiscard]]
Awaitable<std:: size_t > send ( const void * buffer, std:: size_t size) noexcept ;
[[nodiscard]]
Awaitable<std:: size_t > send ( const void * buffer,
std:: size_t size,
cancellation_token ct) noexcept ;
[[nodiscard]]
Awaitable<std:: size_t > recv ( void * buffer, std:: size_t size) noexcept ;
[[nodiscard]]
Awaitable<std:: size_t > recv ( void * buffer,
std:: size_t size,
cancellation_token ct) noexcept ;
[[nodiscard]]
socket_recv_from_operation recv_from (
void * buffer,
std:: size_t size) noexcept ;
[[nodiscard]]
socket_recv_from_operation_cancellable recv_from (
void * buffer,
std:: size_t size,
cancellation_token ct) noexcept ;
[[nodiscard]]
socket_send_to_operation send_to (
const ip_endpoint& destination,
const void * buffer,
std:: size_t size) noexcept ;
[[nodiscard]]
socket_send_to_operation_cancellable send_to (
const ip_endpoint& destination,
const void * buffer,
std:: size_t size,
cancellation_token ct) noexcept ;
void close_send ();
void close_recv ();
};
}例:エコーサーバー
# include < cppcoro/net/socket.hpp >
# include < cppcoro/io_service.hpp >
# include < cppcoro/cancellation_source.hpp >
# include < cppcoro/async_scope.hpp >
# include < cppcoro/on_scope_exit.hpp >
# include < memory >
# include < iostream >
cppcoro::task< void > handle_connection (socket s)
{
try
{
const size_t bufferSize = 16384 ;
auto buffer = std::make_unique< unsigned char []>(bufferSize);
size_t bytesRead;
do {
// Read some bytes
bytesRead = co_await s. recv (buffer. get (), bufferSize);
// Write some bytes
size_t bytesWritten = 0 ;
while (bytesWritten < bytesRead) {
bytesWritten += co_await s. send (
buffer. get () + bytesWritten,
bytesRead - bytesWritten);
}
} while (bytesRead != 0 );
s. close_send ();
co_await s. disconnect ();
}
catch (...)
{
std::cout << " connection failed " << std::
}
}
cppcoro::task< void > echo_server (
cppcoro::net::ipv4_endpoint endpoint,
cppcoro::io_service& ioSvc,
cancellation_token ct)
{
cppcoro::async_scope scope;
std::exception_ptr ex;
try
{
auto listeningSocket = cppcoro::net::socket::create_tcpv4 (ioSvc);
listeningSocket. bind (endpoint);
listeningSocket. listen ();
while ( true ) {
auto connection = cppcoro::net::socket::create_tcpv4 (ioSvc);
co_await listeningSocket. accept (connection, ct);
scope. spawn ( handle_connection ( std::move (connection)));
}
}
catch (cppcoro::operation_cancelled)
{
}
catch (...)
{
ex = std::current_exception ();
}
// Wait until all handle_connection tasks have finished.
co_await scope. join ();
if (ex) std::rethrow_exception (ex);
}
int main ( int argc, const char * argv[])
{
cppcoro::io_service ioSvc;
if (argc != 2 ) return - 1 ;
auto endpoint = cppcoro::ipv4_endpoint::from_string (argv[ 1 ]);
if (!endpoint) return - 1 ;
( void ) cppcoro::sync_wait ( cppcoro::when_all (
[&]() -> task<>
{
// Shutdown the event loop once finished.
auto stopOnExit = cppcoro::on_scope_exit ([&] { ioSvc. stop (); });
cppcoro::cancellation_source canceller;
co_await cppcoro::when_all (
[&]() -> task<>
{
// Run for 30s then stop accepting new connections.
co_await ioSvc. schedule_after ( std::chrono::seconds ( 30 ));
canceller. request_cancellation ();
}(),
echo_server (*endpoint, ioSvc, canceller. token ()));
}(),
[&]() -> task<>
{
ioSvc. process_events ();
}()));
return 0 ;
}ip_address 、 ipv4_address 、 ipv6_addressIPアドレスを表すためのヘルパークラス。
API概要:
namespace cppcoro ::net
{
class ipv4_address
{
using bytes_t = std:: uint8_t [ 4 ];
public:
constexpr ipv4_address ();
explicit constexpr ipv4_address (std:: uint32_t integer);
explicit constexpr ipv4_address ( const std::uint8_t (&bytes)[4]);
explicit constexpr ipv4_address (std:: uint8_t b0,
std:: uint8_t b1,
std:: uint8_t b2,
std:: uint8_t b3);
constexpr const bytes_t & bytes () const ;
constexpr std:: uint32_t to_integer () const ;
static constexpr ipv4_address loopback ();
constexpr bool is_loopback () const ;
constexpr bool is_private_network () const ;
constexpr bool operator ==(ipv4_address other) const ;
constexpr bool operator !=(ipv4_address other) const ;
constexpr bool operator <(ipv4_address other) const ;
constexpr bool operator >(ipv4_address other) const ;
constexpr bool operator <=(ipv4_address other) const ;
constexpr bool operator >=(ipv4_address other) const ;
std::string to_string ();
static std::optional<ipv4_address> from_string (std::string_view string) noexcept ;
};
class ipv6_address
{
using bytes_t = std:: uint8_t [ 16 ];
public:
constexpr ipv6_address ();
explicit constexpr ipv6_address (
std:: uint64_t subnetPrefix,
std:: uint64_t interfaceIdentifier);
constexpr ipv6_address (
std:: uint16_t part0,
std:: uint16_t part1,
std:: uint16_t part2,
std:: uint16_t part3,
std:: uint16_t part4,
std:: uint16_t part5,
std:: uint16_t part6,
std:: uint16_t part7);
explicit constexpr ipv6_address (
const std::uint16_t (&parts)[8]);
explicit constexpr ipv6_address (
const std::uint8_t (bytes)[16]);
constexpr const bytes_t & bytes () const ;
constexpr std:: uint64_t subnet_prefix () const ;
constexpr std:: uint64_t interface_identifier () const ;
static constexpr ipv6_address unspecified ();
static constexpr ipv6_address loopback ();
static std::optional<ipv6_address> from_string (std::string_view string) noexcept ;
std::string to_string () const ;
constexpr bool operator ==( const ipv6_address& other) const ;
constexpr bool operator !=( const ipv6_address& other) const ;
constexpr bool operator <( const ipv6_address& other) const ;
constexpr bool operator >( const ipv6_address& other) const ;
constexpr bool operator <=( const ipv6_address& other) const ;
constexpr bool operator >=( const ipv6_address& other) const ;
};
class ip_address
{
public:
// Constructs to IPv4 address 0.0.0.0
ip_address () noexcept ;
ip_address (ipv4_address address) noexcept ;
ip_address (ipv6_address address) noexcept ;
bool is_ipv4 () const noexcept ;
bool is_ipv6 () const noexcept ;
const ipv4_address& to_ipv4 () const ;
const ipv6_address& to_ipv6 () const ;
const std:: uint8_t * bytes () const noexcept ;
std::string to_string () const ;
static std::optional<ip_address> from_string (std::string_view string) noexcept ;
bool operator ==( const ip_address& rhs) const noexcept ;
bool operator !=( const ip_address& rhs) const noexcept ;
// ipv4_address sorts less than ipv6_address
bool operator <( const ip_address& rhs) const noexcept ;
bool operator >( const ip_address& rhs) const noexcept ;
bool operator <=( const ip_address& rhs) const noexcept ;
bool operator >=( const ip_address& rhs) const noexcept ;
};
}ip_endpoint 、 ipv4_endpoint ipv6_endpointIPアドレスとポート番号を表すためのヘルパークラス。
API概要:
namespace cppcoro ::net
{
class ipv4_endpoint
{
public:
ipv4_endpoint () noexcept ;
explicit ipv4_endpoint (ipv4_address address, std:: uint16_t port = 0 ) noexcept ;
const ipv4_address& address () const noexcept ;
std:: uint16_t port () const noexcept ;
std::string to_string () const ;
static std::optional<ipv4_endpoint> from_string (std::string_view string) noexcept ;
};
bool operator ==( const ipv4_endpoint& a, const ipv4_endpoint& b);
bool operator !=( const ipv4_endpoint& a, const ipv4_endpoint& b);
bool operator <( const ipv4_endpoint& a, const ipv4_endpoint& b);
bool operator >( const ipv4_endpoint& a, const ipv4_endpoint& b);
bool operator <=( const ipv4_endpoint& a, const ipv4_endpoint& b);
bool operator >=( const ipv4_endpoint& a, const ipv4_endpoint& b);
class ipv6_endpoint
{
public:
ipv6_endpoint () noexcept ;
explicit ipv6_endpoint (ipv6_address address, std:: uint16_t port = 0 ) noexcept ;
const ipv6_address& address () const noexcept ;
std:: uint16_t port () const noexcept ;
std::string to_string () const ;
static std::optional<ipv6_endpoint> from_string (std::string_view string) noexcept ;
};
bool operator ==( const ipv6_endpoint& a, const ipv6_endpoint& b);
bool operator !=( const ipv6_endpoint& a, const ipv6_endpoint& b);
bool operator <( const ipv6_endpoint& a, const ipv6_endpoint& b);
bool operator >( const ipv6_endpoint& a, const ipv6_endpoint& b);
bool operator <=( const ipv6_endpoint& a, const ipv6_endpoint& b);
bool operator >=( const ipv6_endpoint& a, const ipv6_endpoint& b);
class ip_endpoint
{
public:
// Constructs to IPv4 end-point 0.0.0.0:0
ip_endpoint () noexcept ;
ip_endpoint (ipv4_endpoint endpoint) noexcept ;
ip_endpoint (ipv6_endpoint endpoint) noexcept ;
bool is_ipv4 () const noexcept ;
bool is_ipv6 () const noexcept ;
const ipv4_endpoint& to_ipv4 () const ;
const ipv6_endpoint& to_ipv6 () const ;
ip_address address () const noexcept ;
std:: uint16_t port () const noexcept ;
std::string to_string () const ;
static std::optional<ip_endpoint> from_string (std::string_view string) noexcept ;
bool operator ==( const ip_endpoint& rhs) const noexcept ;
bool operator !=( const ip_endpoint& rhs) const noexcept ;
// ipv4_endpoint sorts less than ipv6_endpoint
bool operator <( const ip_endpoint& rhs) const noexcept ;
bool operator >( const ip_endpoint& rhs) const noexcept ;
bool operator <=( const ip_endpoint& rhs) const noexcept ;
bool operator >=( const ip_endpoint& rhs) const noexcept ;
};
}sync_wait() sync_wait()関数を使用して、指定されたawaitableが完了するまで同期的に待機できます。
指定されたAwaitableは、新しく作成されたCoroutine内の現在のスレッドでco_awaitになります。
sync_wait()コールは、操作が完了するまでブロックされ、 co_await式の結果が返されるか、 co_await式が未処理の例外で完了した場合に例外をrethrowします。
sync_wait()関数はmain()内からトップレベルのタスクを開始し、タスクが終了するまで待機するのに主に役立ちます。実際には、最初の/トップレベルtaskを開始する唯一の方法です。
API概要:
// <cppcoro/sync_wait.hpp>
namespace cppcoro
{
template < typename AWAITABLE>
auto sync_wait (AWAITABLE&& awaitable)
-> typename awaitable_traits<AWAITABLE&&>::await_result_t;
}例:
void example_task ()
{
auto makeTask = []() -> task<std::string>
{
co_return " foo " ;
};
auto task = makeTask ();
// start the lazy task and wait until it completes
sync_wait (task); // -> "foo"
sync_wait ( makeTask ()); // -> "foo"
}
void example_shared_task ()
{
auto makeTask = []() -> shared_task<std::string>
{
co_return " foo " ;
};
auto task = makeTask ();
// start the shared task and wait until it completes
sync_wait (task) == " foo " ;
sync_wait ( makeTask ()) == " foo " ;
}when_all_ready() when_all_ready()関数を使用して、すべての入力awaitablesが完了したときに完了する新しいawaitableを作成できます。
入力タスクは、あらゆる種類のおもしろいものにすることができます。
返されたAwaitableがco_await edになると、それぞれの入力が順番に順番に入力when_all_ready()れたスレッドを順番に順番にco_awaitます。これらのタスクが同期して完了しない場合、それらは同時に実行されます。
入力のすべてのco_await式が、Awaitablesのすべてが完了するまで実行されると、返されたAwaitableが完了し、待ち望まれているCoroutineを再開します。待ち望んでいるコルーチンは、最後に完了するために、入力の糸で再開されます。
返されたAwaitableは、 co_await EDの場合、未解決の例外で失敗したとしても、CO_AWAIT EDの場合、例外をスローしないことが保証されています。
ただし、 when_all_ready()コール自体は、各入力のwaitablesを待つために必要なコルーチンフレームにメモリを割り当てることができない場合std::bad_allocをスローする場合があることに注意してください。また、コピー/移動コンストラクターから待機可能なオブジェクトが投げられた場合、例外をスローすることもできます。
co_awaitが返されたawaitableの結果は、 when_all_task<RESULT>オブジェクトのstd::tupleまたはstd::vectorです。これらのオブジェクトを使用すると、対応する出力タスクのwhen_all_task<RESULT>::result()メソッドを呼び出すことにより、各入力の結果(または例外)を個別に取得できます。これにより、発信者は複数のAwaitablesを同時に待って、完成を同期することができ、その後、 co_await操作の各操作の結果を成功/失敗の結果を検査する能力を保持します。
これはwhen_all()個々のco_await操作の障害により、全体的な操作が例外なく失敗する場合とは異なります。これは、どのコンポーネントco_await操作が失敗したかを判断できず、他のco_await操作の結果を取得できないことを意味します。
API概要:
// <cppcoro/when_all_ready.hpp>
namespace cppcoro
{
// Concurrently await multiple awaitables.
//
// Returns an awaitable object that, when co_await'ed, will co_await each of the input
// awaitable objects and will resume the awaiting coroutine only when all of the
// component co_await operations complete.
//
// Result of co_await'ing the returned awaitable is a std::tuple of detail::when_all_task<T>,
// one for each input awaitable and where T is the result-type of the co_await expression
// on the corresponding awaitable.
//
// AWAITABLES must be awaitable types and must be movable (if passed as rvalue) or copyable
// (if passed as lvalue). The co_await expression will be executed on an rvalue of the
// copied awaitable.
template < typename ... AWAITABLES>
auto when_all_ready (AWAITABLES&&... awaitables)
-> Awaitable<std::tuple<detail::when_all_task<typename awaitable_traits<AWAITABLES>::await_result_t>...>>;
// Concurrently await each awaitable in a vector of input awaitables.
template <
typename AWAITABLE,
typename RESULT = typename awaitable_traits<AWAITABLE>:: await_result_t >
auto when_all_ready (std::vector<AWAITABLE> awaitables)
-> Awaitable<std::vector<detail::when_all_task<RESULT>>>;
}使用例:
task<std::string> get_record ( int id);
task<> example1 ()
{
// Run 3 get_record() operations concurrently and wait until they're all ready.
// Returns a std::tuple of tasks that can be unpacked using structured bindings.
auto [task1, task2, task3] = co_await when_all_ready (
get_record ( 123 ),
get_record ( 456 ),
get_record ( 789 ));
// Unpack the result of each task
std::string& record1 = task1. result ();
std::string& record2 = task2. result ();
std::string& record3 = task3. result ();
// Use records....
}
task<> example2 ()
{
// Create the input tasks. They don't start executing yet.
std::vector<task<std::string>> tasks;
for ( int i = 0 ; i < 1000 ; ++i)
{
tasks. emplace_back ( get_record (i));
}
// Execute all tasks concurrently.
std::vector<detail::when_all_task<std::string>> resultTasks =
co_await when_all_ready ( std::move (tasks));
// Unpack and handle each result individually once they're all complete.
for ( int i = 0 ; i < 1000 ; ++i)
{
try
{
std::string& record = tasks[i]. result ();
std::cout << i << " = " << record << std::endl;
}
catch ( const std:: exception & ex)
{
std::cout << i << " : " << ex. what () << std::endl;
}
}
}when_all() when_all()関数を使用して、 co_await edが入力のそれぞれを同時にco_await 、個々の結果の集合体を返すという新しいawaitableを作成できます。
返されたAwaitableが待っていると、現在のスレッド上の各入力がco_await 。最初の待望が一時停止すると、2番目のタスクが開始されます。オペレーションは、すべてが完了するまで実行されるまで同時に実行します。
すべてのコンポーネントco_await操作が完了するまで実行されると、結果の集合体は個々の結果から構築されます。入力タスクのいずれかによって例外がスローされる場合、または集計結果の構築が例外をスローする場合、例外は返されたAwaitableのco_awaitから伝播します。
複数のco_await操作が例外を除いて失敗した場合、例外の1つはco_await when_all()から伝播します。どの操作の例外が選択されるかは指定されていません。
どのコンポーネントco_await操作が失敗したかを知ることが重要である場合、または他の操作の結果を取得する能力を保持する場合、一部の操作が失敗した場合でも、代わりにwhen_all_ready()を使用する必要があります。
API概要:
// <cppcoro/when_all.hpp>
namespace cppcoro
{
// Variadic version.
//
// Note that if the result of `co_await awaitable` yields a void-type
// for some awaitables then the corresponding component for that awaitable
// in the tuple will be an empty struct of type detail::void_value.
template < typename ... AWAITABLES>
auto when_all (AWAITABLES&&... awaitables)
-> Awaitable<std::tuple<typename awaitable_traits<AWAITABLES>::await_result_t...>>;
// Overload for vector<Awaitable<void>>.
template <
typename AWAITABLE,
typename RESULT = typename awaitable_traits<AWAITABLE>:: await_result_t ,
std:: enable_if_t <std::is_void_v<RESULT>, int > = 0 >
auto when_all (std::vector<AWAITABLE> awaitables)
-> Awaitable<void>;
// Overload for vector<Awaitable<NonVoid>> that yield a value when awaited.
template <
typename AWAITABLE,
typename RESULT = typename awaitable_traits<AWAITABLE>:: await_result_t ,
std:: enable_if_t <!std::is_void_v<RESULT>, int > = 0 >
auto when_all (std::vector<AWAITABLE> awaitables)
-> Awaitable<std::vector<std::conditional_t<
std::is_lvalue_reference_v<RESULT>,
std::reference_wrapper<std::remove_reference_t<RESULT>>,
std::remove_reference_t<RESULT>>>>;
}例:
task<A> get_a ();
task<B> get_b ();
task<> example1 ()
{
// Run get_a() and get_b() concurrently.
// Task yields a std::tuple<A, B> which can be unpacked using structured bindings.
auto [a, b] = co_await when_all ( get_a (), get_b ());
// use a, b
}
task<std::string> get_record ( int id);
task<> example2 ()
{
std::vector<task<std::string>> tasks;
for ( int i = 0 ; i < 1000 ; ++i)
{
tasks. emplace_back ( get_record (i));
}
// Concurrently execute all get_record() tasks.
// If any of them fail with an exception then the exception will propagate
// out of the co_await expression once they have all completed.
std::vector<std::string> records = co_await when_all ( std::move (tasks));
// Process results
for ( int i = 0 ; i < 1000 ; ++i)
{
std::cout << i << " = " << records[i] << std::endl;
}
}fmap() fmap()関数を使用して、コンテナタイプ内に含まれる値に呼び出し可能な関数を適用し、関数を適用した結果の新しいコンテナ型を含む値に戻すことができます。
fmap()関数は、型generator<T> 、 recursive_generator<T> 、 async_generator<T> task<T>値に関数Awaitable適用できます。
これらの各タイプは、2つの引数を取得するfmap()のオーバーロードを提供します。適用する関数とコンテナ値。サポートされているfmap()オーバーロードの各タイプのドキュメントを参照してください。
たとえば、 fmap()関数を使用して、 task<T>の最終的な結果に関数を適用することができ、関数のreturn-valueに完全な新しいtask<U>を生成します。
// Given a function you want to apply that converts
// a value of type A to value of type B.
B a_to_b (A value);
// And a task that yields a value of type A
cppcoro::task<A> get_an_a ();
// We can apply the function to the result of the task using fmap()
// and obtain a new task yielding the result.
cppcoro::task<B> bTask = fmap(a_to_b, get_an_a());
// An alternative syntax is to use the pipe notation.
cppcoro::task<B> bTask = get_an_a() | cppcoro::fmap(a_to_b);API概要:
// <cppcoro/fmap.hpp>
namespace cppcoro
{
template < typename FUNC>
struct fmap_transform
{
fmap_transform (FUNC&& func) noexcept (std::is_nothrow_move_constructible_v<FUNC>);
FUNC func;
};
// Type-deducing constructor for fmap_transform object that can be used
// in conjunction with operator|.
template < typename FUNC>
fmap_transform<FUNC> fmap (FUNC&& func);
// operator| overloads for providing pipe-based syntactic sugar for fmap()
// such that the expression:
// <value-expr> | cppcoro::fmap(<func-expr>)
// is equivalent to:
// fmap(<func-expr>, <value-expr>)
template < typename T, typename FUNC>
decltype ( auto ) operator |(T&& value, fmap_transform<FUNC>&& transform);
template < typename T, typename FUNC>
decltype ( auto ) operator |(T&& value, fmap_transform<FUNC>& transform);
template < typename T, typename FUNC>
decltype ( auto ) operator |(T&& value, const fmap_transform<FUNC>& transform);
// Generic overload for all awaitable types.
//
// Returns an awaitable that when co_awaited, co_awaits the specified awaitable
// and applies the specified func to the result of the 'co_await awaitable'
// expression as if by 'std::invoke(func, co_await awaitable)'.
//
// If the type of 'co_await awaitable' expression is 'void' then co_awaiting the
// returned awaitable is equivalent to 'co_await awaitable, func()'.
template <
typename FUNC,
typename AWAITABLE,
std:: enable_if_t <is_awaitable_v<AWAITABLE>, int > = 0 >
auto fmap (FUNC&& func, AWAITABLE&& awaitable)
-> Awaitable<std::invoke_result_t<FUNC, typename awaitable_traits<AWAITABLE>::await_result_t>>;
} fmap()関数は、引数依存のルックアップ(ADL)によって正しい過負荷を検索するように設計されているため、通常、 cppcoro:: prefixなしで呼び出される必要があります。
resume_on() resume_on()関数を使用して、待ち望まれているときに待ち望まれているコルーチンを再開する実行コンテキストを制御できます。 async_generatorに適用すると、どの実行コンテキストco_await g.begin()およびco_await ++it操作が待機中のコルーチンを再開するかを制御します。
通常、待ち望まれているCoroutine of Awaitable( taskなど)またはasync_generator 、操作が完了したスレッドで実行を再開します。場合によっては、これが実行を続けたいスレッドではない場合があります。これらの場合resume_on()関数を使用して、指定されたスケジューラに関連付けられたスレッドで実行を再開する新しいwaitableまたはジェネレーターを作成できます。
resume_on()関数は、新しいawaitable/generatorを返す通常の関数として使用できます。または、パイプラインシンタックスで使用できます。
例:
task<record> load_record ( int id);
ui_thread_scheduler uiThreadScheduler;
task<> example ()
{
// This will start load_record() on the current thread.
// Then when load_record() completes (probably on an I/O thread)
// it will reschedule execution onto thread pool and call to_json
// Once to_json completes it will transfer execution onto the
// ui thread before resuming this coroutine and returning the json text.
task<std::string> jsonTask =
load_record ( 123 )
| cppcoro::resume_on ( threadpool::default ())
| cppcoro::fmap (to_json)
| cppcoro::resume_on (uiThreadScheduler);
// At this point, all we've done is create a pipeline of tasks.
// The tasks haven't started executing yet.
// Await the result. Starts the pipeline of tasks.
std::string jsonText = co_await jsonTask;
// Guaranteed to be executing on ui thread here.
someUiControl. set_text (jsonText);
}API概要:
// <cppcoro/resume_on.hpp>
namespace cppcoro
{
template < typename SCHEDULER, typename AWAITABLE>
auto resume_on (SCHEDULER& scheduler, AWAITABLE awaitable)
-> Awaitable<typename awaitable_traits<AWAITABLE>::await_traits_t>;
template < typename SCHEDULER, typename T>
async_generator<T> resume_on (SCHEDULER& scheduler, async_generator<T> source);
template < typename SCHEDULER>
struct resume_on_transform
{
explicit resume_on_transform (SCHEDULER& scheduler) noexcept ;
SCHEDULER& scheduler;
};
// Construct a transform/operation that can be applied to a source object
// using "pipe" notation (ie. operator|).
template < typename SCHEDULER>
resume_on_transform<SCHEDULER> resume_on (SCHEDULER& scheduler) noexcept ;
// Equivalent to 'resume_on(transform.scheduler, std::forward<T>(value))'
template < typename T, typename SCHEDULER>
decltype ( auto ) operator |(T&& value, resume_on_transform<SCHEDULER> transform)
{
return resume_on (transform. scheduler , std::forward<T>(value));
}
}schedule_on() schedule_on()関数を使用して、特定のawaitableまたはasync_generator実行を開始する実行コンテキストを変更できます。
async_generatorに適用されると、 co_yieldステートメントの後に再開する実行コンテキストにも影響します。
schedule_on Transformは、Awaitableまたはasync_generatorが結果を完了または獲得するスレッドを指定していないことに注意してください。
操作が完了するスレッドを制御する変換についてはresume_on()演算子を参照してください。
例えば:
task< int > get_value ();
io_service ioSvc;
task<> example ()
{
// Starts executing get_value() on the current thread.
int a = co_await get_value ();
// Starts executing get_value() on a thread associated with ioSvc.
int b = co_await schedule_on (ioSvc, get_value ());
}API概要:
// <cppcoro/schedule_on.hpp>
namespace cppcoro
{
// Return a task that yields the same result as 't' but that
// ensures that 't' is co_await'ed on a thread associated with
// the specified scheduler. Resulting task will complete on
// whatever thread 't' would normally complete on.
template < typename SCHEDULER, typename AWAITABLE>
auto schedule_on (SCHEDULER& scheduler, AWAITABLE awaitable)
-> Awaitable<typename awaitable_traits<AWAITABLE>::await_result_t>;
// Return a generator that yields the same sequence of results as
// 'source' but that ensures that execution of the coroutine starts
// execution on a thread associated with 'scheduler' and resumes
// after a 'co_yield' on a thread associated with 'scheduler'.
template < typename SCHEDULER, typename T>
async_generator<T> schedule_on (SCHEDULER& scheduler, async_generator<T> source);
template < typename SCHEDULER>
struct schedule_on_transform
{
explicit schedule_on_transform (SCHEDULER& scheduler) noexcept ;
SCHEDULER& scheduler;
};
template < typename SCHEDULER>
schedule_on_transform<SCHEDULER> schedule_on (SCHEDULER& scheduler) noexcept ;
template < typename T, typename SCHEDULER>
decltype ( auto ) operator |(T&& value, schedule_on_transform<SCHEDULER> transform);
}awaitable_traits<T>このテンプレートのメタ機能を使用して、結果のco_await式のタイプがタイプTの式に適用された場合に何をするかを判断できます。
これは、CoroutineのPromiseオブジェクトによって適用されたawait_transformの影響を受けないコンテキストで、タイプTの値が待機されていることに注意してください。そのようなコンテキストでタイプTの値が待機されている場合、結果は異なる場合があります。
awaitable_traits<T>テンプレートメタフェンションは、 awaiter_tまたはawait_result_t Tネストされたtypedefsを定義しません。これにより、 Tがお待ちしていない場合に過負荷を無効にするSFINAEコンテキストでの使用が可能になります。
API概要:
// <cppcoro/awaitable_traits.hpp>
namespace cppcoro
{
template < typename T>
struct awaitable_traits
{
// The type that results from applying `operator co_await()` to a value
// of type T, if T supports an `operator co_await()`, otherwise is type `T&&`.
typename awaiter_t = <unspecified>;
// The type of the result of co_await'ing a value of type T.
typename await_result_t = <unspecified>;
};
}is_awaitable<T> is_awaitable<T>テンプレートメタ機能を使用すると、特定のタイプをco_await EDにすることができるかどうかを照会できます。
API概要:
// <cppcoro/is_awaitable.hpp>
namespace cppcoro
{
template < typename T>
struct is_awaitable : std::bool_constant<...>
{};
template < typename T>
constexpr bool is_awaitable_v = is_awaitable<T>::value;
}Awaitable<T>コンセプトAwaitable<T>は、 await_transform co_await負荷がないコルーチンのコンテキストでタイプをco_await Tできることを示す概念です。
たとえば、タイプtask<T>概念Awaitable<T&&>に実装しますが、タイプtask<T>& concept Awaitable<T&>を実装します。
Awaiter<T>コンセプトAn Awaiter<T> 、待ち合わせコルーチンを一時停止/再開するためにプロトコルを実装するために必要なawait_ready 、 await_suspend 、 await_resumeメソッドを含むタイプを示す概念です。
Awaiter<T>を満足させるタイプは、タイプのインスタンスのために、 awaiter :
awaiter.await_ready() - > boolawaiter.await_suspend(std::experimental::coroutine_handle<void>{}) - > void or boolまたはstd::experimental::coroutine_handle<P> for P 。awaiter.await_resume() - > T Awaiter<T>コンセプトを実装する任意のタイプは、 Awaitable<T>概念も実装します。
SchedulerコンセプトScheduler 、いくつかの実行コンテキスト内でコルーチンのスケジューリングを実行できるようにする概念です。
concept Scheduler
{
Awaitable< void > schedule ();
} Scheduler概念を実装し、タイプSのインスタンスsを実装するSを与えられます。
s.schedule()メソッドは、 co_await s.schedule()現在のコルーチンを無条件に停止し、 sに関連付けられた実行コンテキストの再開をスケジュールするように、awaitable-typeを返します。co_await s.schedule()式の結果にはタイプがvoidにあります。 cppcoro::task<> f (Scheduler& scheduler)
{
// Execution of the coroutine is initially on the caller's execution context.
// Suspends execution of the coroutine and schedules it for resumption on
// the scheduler's execution context.
co_await scheduler. schedule ();
// At this point the coroutine is now executing on the scheduler's
// execution context.
}DelayedScheduler概念DelayedScheduler 、指定された期間が経過した後、コルーチンがスケジューラの実行コンテキストで実行のためにそれ自体をスケジュールできるようにする概念です。
concept DelayedScheduler : Scheduler
{
template < typename REP, typename RATIO>
Awaitable< void > schedule_after (std::chrono::duration<REP, RATIO> delay);
template < typename REP, typename RATIO>
Awaitable< void > schedule_after (
std::chrono::duration<REP, RATIO> delay,
cppcoro::cancellation_token cancellationToken);
} DelayedSchedulerとインスタンス、 Sのsを実装するタイプ、 Sが与えられます。
s.schedule_after(delay)メソッドは、Co_await s.schedule_after(遅延)がsに関連する実行コンテキストの再開のためにCoroutineをdelayする前に、 co_await s.schedule_after(delay)を停止するように待っているオブジェクトを返します。co_await s.schedule_after(delay)式にはタイプのvoidがあります。Cppcoro Libraryは、Visual Studio 2017のWindowsの下に建物とClang 5.0+を備えたLinuxをサポートしています。
このライブラリは、ケーキビルドシステムを使用しています(いいえ、C#ONEではなく)。
ケーキビルドシステムはGitサブモジュールとして自動的にチェックアウトされるため、個別にダウンロードまたはインストールする必要はありません。
このライブラリには現在、Visual Studio 2017以降とWindows 10 SDKが必要です。
Clang(#3)とLinux(#15)のサポートが計画されています。
ケーキビルドシステムはPythonに実装されており、Python 2.7をインストールする必要があります。
Python 2.7インタープリターがあなたのパスにあり、「Python」として利用可能であることを確認してください。
Visual Studio 2017アップデート3がインストールされていることを確認してください。アップデート2以前のコルーチンには、更新3で修正されたいくつかの既知の問題があることに注意してください。
また、https://vcppdogfooding.azurewabsites.net/からNugetパッケージをダウンロードし、.nyugetファイルをディレクトリに解凍することにより、Visual Studioコンパイラの実験バージョンを使用することもできます。 config.cakeファイルを更新して、次の行を変更および除外して、解凍した場所を指すようにします。
nugetPath = None # r'C:PathToVisualCppTools.14.0.25224-Pre'Windows 10 SDKがインストールされていることを確認してください。デフォルトでは、最新のWindows 10 SDKとUniversal C Runtimeバージョンを使用します。
CPPCOROリポジトリは、GITサブモジュールを使用して、ケーキビルドシステムのソースを引き込みます。
これはgit cloneコマンドに--recursiveフラグを渡す必要があることを意味します。例えば。
c:Code> git clone --recursive https://github.com/lewissbaker/cppcoro.git
すでにCPPCOROをクローニングしている場合は、変更を引いた後にサブモジュールを更新する必要があります。
c:Codecppcoro> git submodule update --init --recursive
コマンドラインから構築するには、ワークスペースルートで「cake.bat」を実行するだけです。
例えば。
C:cppcoro> cake.bat
Building with C:cppcoroconfig.cake - Variant(release='debug', platform='windows', architecture='x86', compilerFamily='msvc', compiler='msvc14.10')
Building with C:cppcoroconfig.cake - Variant(release='optimised', platform='windows', architecture='x64', compilerFamily='msvc', compiler='msvc14.10')
Building with C:cppcoroconfig.cake - Variant(release='debug', platform='windows', architecture='x64', compilerFamily='msvc', compiler='msvc14.10')
Building with C:cppcoroconfig.cake - Variant(release='optimised', platform='windows', architecture='x86', compilerFamily='msvc', compiler='msvc14.10')
Compiling testmain.cpp
Compiling testmain.cpp
Compiling testmain.cpp
Compiling testmain.cpp
...
Linking buildwindows_x86_msvc14.10_debugtestrun.exe
Linking buildwindows_x64_msvc14.10_optimisedtestrun.exe
Linking buildwindows_x86_msvc14.10_optimisedtestrun.exe
Linking buildwindows_x64_msvc14.10_debugtestrun.exe
Generating code
Finished generating code
Generating code
Finished generating code
Build succeeded.
Build took 0:00:02.419.
デフォルトでは、引数なしでcake実行すると、すべてのビルドバリアントを使用してすべてのプロジェクトを構築し、ユニットテストを実行します。追加のコマンドライン引数を渡すことにより、構築されたものを絞り込むことができます。例えば。
c:cppcoro> cake.bat release=debug architecture=x64 lib/build.cake
Building with C:UsersLewisCodecppcoroconfig.cake - Variant(release='debug', platform='windows', architecture='x64', compilerFamily='msvc', compiler='msvc14.10')
Archiving buildwindows_x64_msvc14.10_debuglibcppcoro.lib
Build succeeded.
Build took 0:00:00.321.
cake --help利用可能なコマンドラインオプションをリストします。
Visual Studio内から開発するには、 cake.bat -pを実行することで.vcproj/.slnファイルを構築できます。
例えば。
c:cppcoro> cake.bat -p
Building with C:cppcoroconfig.cake - Variant(release='debug', platform='windows', architecture='x86', compilerFamily='msvc', compiler='msvc14.10')
Building with C:cppcoroconfig.cake - Variant(release='optimised', platform='windows', architecture='x64', compilerFamily='msvc', compiler='msvc14.10')
Building with C:cppcoroconfig.cake - Variant(release='debug', platform='windows', architecture='x64', compilerFamily='msvc', compiler='msvc14.10')
Building with C:cppcoroconfig.cake - Variant(release='optimised', platform='windows', architecture='x86', compilerFamily='msvc', compiler='msvc14.10')
Generating Solution build/project/cppcoro.sln
Generating Project build/project/cppcoro_tests.vcxproj
Generating Filters build/project/cppcoro_tests.vcxproj.filters
Generating Project build/project/cppcoro.vcxproj
Generating Filters build/project/cppcoro.vcxproj.filters
Build succeeded.
Build took 0:00:00.247.
Visual Studio内からこれらのプロジェクトを構築すると、コンピレーションを実行するためにケーキに呼びかけます。
CPPCOROプロジェクトは、Clang+ LIBC ++ 5.0以降を使用してLinuxの下で構築することもできます。
CPPCOROの構築は、Ubuntu 17.04でテストされています。
次のパッケージをインストールしていることを確認してください。
これは、ClangとLibc ++が構築およびインストールされていると仮定しています。
Clangがまだ構成されていない場合は、Cppcoroで構築するためのClangのセットアップに関する詳細については、次のセクションを参照してください。
CPPCOROとそのサブモジュールをチェックアウトします。
git clone --recursive https://github.com/lewissbaker/cppcoro.git cppcoro
init.shを実行してcakeバッシュ機能をセットアップします。
cd cppcoro
source init.sh
次に、ワークスペースルートからcakeを実行してCPPCOROを構築し、テストを実行できます。
$ cake
追加のコマンドライン引数を指定して、ビルドをカスタマイズできます。
--help 、コマンドライン引数のためにヘルプを印刷します--debug=run実行中のビルドコマンドラインが表示されますrelease=debugまたはrelease=optimisedビルドバリアントがデバッグまたは最適化されたものに制限されます(デフォルトでは両方をビルドします)。lib/build.cakeテストではなく、cppcoroライブラリを構築するだけです。test/build.cake@task_tests.cpp 、特定のソースファイルをコンパイルするだけですtest/build.cake@testresult 、テストをビルドして実行します例えば:
$ cake --debug=run release=debug lib/build.cake
Clangコンパイラが/usr/bin/clangにない場合は、 cakeの次のコマンドラインオプションの1つ以上を使用して代替場所を指定できます。
--clang-executable=<name> - clangの代わりに使用するClang実行可能ファイル名を指定します。例えば。 Clang 8.0 Passの使用を強制するために--clang-executable=clang-8--clang-executable=<abspath> - Clang Execupableへのフルパスを指定します。ビルドシステムは、同じディレクトリ内の他の実行可能ファイルも探します。このパスにフォーム<prefix>/bin/<name>がある場合、これにより、デフォルトのClang-Install-Prefixを<prefix>に設定します。--clang-install-prefix=<path> -Clangがインストールされているパスを指定します。これにより、ビルドシステムは<path>/binの下でClangを探します( --clang-executableによってオーバーライドされない限り)。--libcxx-install-prefix=<path> -LIBC ++がインストールされているパスを指定します。デフォルトでは、ビルドシステムは、Clangと同じ場所にあるLIBC ++を探します。別の場所にインストールされている場合は、このコマンドラインオプションを使用します。例:デフォルトの場所にインストールされたClangの特定のバージョンを使用します
$ cake --clang-executable=clang-8
例:カスタム場所からClangのデフォルトバージョンを使用します
$ cake --clang-install-prefix=/path/to/clang-install
例:別の場所からLIBC ++を使用して、カスタムの場所でClangの特定のバージョンを使用します
$ cake --clang-executable=/path/to/clang-install/bin/clang-8 --libcxx-install-prefix=/path/to/libcxx-install
If your Linux distribution does not have a version of Clang 5.0 or later available, you can install a snapshot build from the LLVM project.
Follow instructions at http://apt.llvm.org/ to setup your package manager to support pulling from the LLVM package manager.
For example, for Ubuntu 17.04 Zesty:
Edit /etc/apt/sources.list and add the following lines:
deb http://apt.llvm.org/zesty/ llvm-toolchain-zesty main
deb-src http://apt.llvm.org/zesty/ llvm-toolchain-zesty main
Install the PGP key for those packages:
$ wget -O - https://apt.llvm.org/llvm-snapshot.gpg.key | sudo apt-key add -
Install Clang and LLD:
$ sudo apt-get install clang-6.0 lld-6.0
The LLVM snapshot builds do not include libc++ versions so you'll need to build that yourself.以下を参照してください。
You can also use the bleeding-edge Clang version by building Clang from source yourself.
See instructions here:
To do this you will need to install the following pre-requisites:
$ sudo apt-get install git cmake ninja-build clang lld
Note that we are using your distribution's version of clang to build clang from source. GCC could also be used here instead.
Checkout LLVM + Clang + LLD + libc++ repositories:
mkdir llvm
cd llvm
git clone --depth=1 https://github.com/llvm-mirror/llvm.git llvm
git clone --depth=1 https://github.com/llvm-mirror/clang.git llvm/tools/clang
git clone --depth=1 https://github.com/llvm-mirror/lld.git llvm/tools/lld
git clone --depth=1 https://github.com/llvm-mirror/libcxx.git llvm/projects/libcxx
ln -s llvm/tools/clang clang
ln -s llvm/tools/lld lld
ln -s llvm/projects/libcxx libcxx
Configure and build Clang:
mkdir clang-build
cd clang-build
cmake -GNinja
-DCMAKE_CXX_COMPILER=/usr/bin/clang++
-DCMAKE_C_COMPILER=/usr/bin/clang
-DCMAKE_BUILD_TYPE=MinSizeRel
-DCMAKE_INSTALL_PREFIX="/path/to/clang/install"
-DCMAKE_BUILD_WITH_INSTALL_RPATH="yes"
-DLLVM_TARGETS_TO_BUILD=X86
-DLLVM_ENABLE_PROJECTS="lld;clang"
../llvm
ninja install-clang
install-clang-headers
install-llvm-ar
install-lld
The cppcoro project requires libc++ as it contains the <experimental/coroutine> header required to use C++ coroutines under Clang.
Checkout libc++ + llvm :
mkdir llvm
cd llvm
git clone --depth=1 https://github.com/llvm-mirror/llvm.git llvm
git clone --depth=1 https://github.com/llvm-mirror/libcxx.git llvm/projects/libcxx
ln -s llvm/projects/libcxx libcxx
Build libc++ :
mkdir libcxx-build
cd libcxx-build
cmake -GNinja
-DCMAKE_CXX_COMPILER="/path/to/clang/install/bin/clang++"
-DCMAKE_C_COMPILER="/path/to/clang/install/bin/clang"
-DCMAKE_BUILD_TYPE=Release
-DCMAKE_INSTALL_PREFIX="/path/to/clang/install"
-DLLVM_PATH="../llvm"
-DLIBCXX_CXX_ABI=libstdc++
-DLIBCXX_CXX_ABI_INCLUDE_PATHS="/usr/include/c++/6.3.0/;/usr/include/x86_64-linux-gnu/c++/6.3.0/"
../libcxx
ninja cxx
ninja install
This will build and install libc++ into the same install directory where you have clang installed.
The cppcoro port in vcpkg is kept up to date by Microsoft team members and community contributors. The url of vcpkg is: https://github.com/Microsoft/vcpkg . You can download and install cppcoro using the vcpkg dependency manager:
git clone https://github.com/Microsoft/vcpkg.git
cd vcpkg
./bootstrap-vcpkg.sh # ./bootstrap-vcpkg.bat for Windows
./vcpkg integrate install
./vcpkg install cppcoroバージョンが古くなっている場合は、VCPKGリポジトリに問題を作成するか、リクエストをプルしてください。
GitHub issues are the primary mechanism for support, bug reports and feature requests.
Contributions are welcome and pull-requests will be happily reviewed. I only ask that you agree to license any contributions that you make under the MIT license.
If you have general questions about C++ coroutines, you can generally find someone to help in the #coroutines channel on Cpplang Slack group.