Concurrencpp bringt die Kraft von gleichzeitigen Aufgaben in die C ++ - Welt, sodass Entwickler mithilfe von Aufgaben, Ausführern und Coroutinen einfach übereinstimmende Anwendungen schreiben können. Durch die Verwendung von ConcurrenCPP-Anwendungen können große Verfahren aufschlüsseln, die asynchron in kleinere Aufgaben verarbeitet werden müssen, die gleichzeitig ausgeführt werden, und auf kooperative Weise zusammenarbeiten, um das gewünschte Ergebnis zu erzielen. Mit Concurrencpp können Anwendungen auch parallele Coroutinen parallele Algorithmen schreiben.
Die Hauptvorteile von Concurrencpp sind:
std::thread und std::mutex .co_await .executor APIthread_pool_executor apimanual_executor APIresultresult APIlazy_result Typlazy_result APIresult_promise apiresult_promise Beispielshared_result APIshared_result Beispielmake_ready_resultmake_exceptional_resultwhen_allwhen_anyresume_ontimer_queue -APItimer -APIgenerator -APIgeneratorasync_lock APIscoped_async_lock apiasync_lockasync_condition_variable APIasync_condition_variable Beispielruntime -APItasktask -APIConcurrencpp basiert auf dem Konzept der gleichzeitigen Aufgaben. Eine Aufgabe ist eine asynchrone Operation. Aufgaben bieten ein höheres Abstraktionsniveau für den gleichzeitigen Code als herkömmliche thread-zentrierte Ansätze. Aufgaben können miteinander gekettet werden, was bedeutet, dass Aufgaben ihr asynchrones Ergebnis von einer zur anderen übergeben, wobei das Ergebnis einer Aufgabe so verwendet wird, als wäre es ein Parameter oder ein Zwischenwert einer anderen laufenden Aufgabe. Mit Aufgaben können Anwendungen die verfügbaren Hardware-Ressourcen besser verwenden und viel mehr skalieren als die Verwendung von RAW-Threads, da Aufgaben suspendiert werden können und auf eine andere Aufgabe warten, um ein Ergebnis zu erzielen, ohne die zugrunde liegenden OS-Threads zu blockieren. Aufgaben bringen den Entwicklern viel mehr Produktivität, indem sie sich mehr auf geschäftsführende und weniger auf Konzepte auf niedriger Ebene wie Thread Management und Inter-Thread-Synchronisation konzentrieren können.
Während Aufgaben angeben, welche Aktionen ausgeführt werden müssen, sind Executoren Arbeiter-Objekte, die angeben , wo und wie Aufgaben ausgeführt werden. Ausführende ersparen Anwendungen die mühsame Verwaltung von Threadpools und Task -Warteschlangen. Die Executoren entkoppeln diese Konzepte außerdem vom Anwendungscode ab, indem sie eine einheitliche API zum Erstellen und Planungsaufgaben bereitstellen.
Aufgaben kommunizieren unter Verwendung von Ergebnisobjekten miteinander. Ein Ergebnisobjekt ist ein asynchrones Rohr, das das asynchrone Ergebnis einer Aufgabe an eine andere fortlaufende Aufgabe übergibt. Die Ergebnisse können auf nicht blockierende Weise erwartet und gelöst werden.
Diese drei Konzepte - die Aufgabe, der Testamentsvollstrecker und das zugehörige Ergebnis sind die Bausteine von Concurrencpp. Ausführende führen Aufgaben aus, die miteinander kommunizieren, indem sie Ergebnisse über Ergebnis-Objekte senden. Aufgaben, Testamentszeiten und Ergebnisobjekte arbeiten symbiotisch zusammen, um einen gleichzeitigen Code zu erzeugen, der schnell und sauber ist.
Concurrencpp basiert auf dem Raii -Konzept. Um Aufgaben und Ausführende zu verwenden, erstellen Anwendungen zu Beginn der main eine runtime . Die Laufzeit wird dann verwendet, um vorhandene Ausführende zu erwerben und neue benutzerdefinierte Ausführende zu registrieren. Ausführende werden verwendet, um Aufgaben zum Ausführen zu erstellen und zu planen, und sie können ein result zurückgeben, mit dem das asynchrone Ergebnis an eine andere Aufgabe geleitet werden kann, die als Verbraucher fungiert. Wenn die Laufzeit zerstört wird, iteriert sie jeden gespeicherten Testamentsvollstrecker und ruft seine shutdown auf. Jeder Testamentsvollstrecker verlässt dann anmutig. Außerplanmäßige Aufgaben werden zerstört, und Versuche, neue Aufgaben zu erstellen, werden eine Ausnahme machen.
# include " concurrencpp/concurrencpp.h "
# include < iostream >
int main () {
concurrencpp::runtime runtime;
auto result = runtime. thread_executor ()-> submit ([] {
std::cout << " hello world " << std::endl;
});
result. get ();
return 0 ;
} In diesem grundlegenden Beispiel haben wir ein Laufzeitobjekt erstellt, dann haben wir den Thread Executor aus der Laufzeit erfasst. Wir haben uns submit , um eine Lambda als unseren gegebenen Anruf zu bestehen. Diese Lambda kehrt void zurück, daher gibt der Testamentsvollstrecker ein result<void> Objekt zurück, das das asynchrone Ergebnis an den Anrufer zurückgibt. main get , die den Hauptfaden blockiert, bis das Ergebnis fertig ist. Wenn keine Ausnahme geworfen wurde, get die Rückkehr void . Wenn eine Ausnahme ausgelöst wurde, get sie erneut. Asynchron startet thread_executor einen neuen Ausführungsthread und führt die angegebene Lambda aus. Es ist implizit co_return void und die Aufgabe ist abgeschlossen. main wird dann nicht blockiert.
# include " concurrencpp/concurrencpp.h "
# include < iostream >
# include < vector >
# include < algorithm >
# include < ctime >
using namespace concurrencpp ;
std::vector< int > make_random_vector () {
std::vector< int > vec ( 64 * 1'024 );
std::srand ( std::time ( nullptr ));
for ( auto & i : vec) {
i = :: rand ();
}
return vec;
}
result< size_t > count_even (std::shared_ptr<thread_pool_executor> tpe, const std::vector< int >& vector) {
const auto vecor_size = vector. size ();
const auto concurrency_level = tpe-> max_concurrency_level ();
const auto chunk_size = vecor_size / concurrency_level;
std::vector<result< size_t >> chunk_count;
for ( auto i = 0 ; i < concurrency_level; i++) {
const auto chunk_begin = i * chunk_size;
const auto chunk_end = chunk_begin + chunk_size;
auto result = tpe-> submit ([&vector, chunk_begin, chunk_end]() -> size_t {
return std::count_if (vector. begin () + chunk_begin, vector. begin () + chunk_end, []( auto i) {
return i % 2 == 0 ;
});
});
chunk_count. emplace_back ( std::move (result));
}
size_t total_count = 0 ;
for ( auto & result : chunk_count) {
total_count += co_await result;
}
co_return total_count;
}
int main () {
concurrencpp::runtime runtime;
const auto vector = make_random_vector ();
auto result = count_even (runtime. thread_pool_executor (), vector);
const auto total_count = result. get ();
std::cout << " there are " << total_count << " even numbers in the vector " << std::endl;
return 0 ;
} In diesem Beispiel starten wir das Programm, indem wir ein Laufzeitobjekt erstellen. Wir erstellen einen Vektor, der mit Zufallszahlen gefüllt ist, dann erwerben wir den thread_pool_executor aus der Laufzeit und rufen Sie count_even an. count_even ist eine Coroutine, die mehr Aufgaben und co_await erzeugt, damit sie im Inneren fertig werden können. max_concurrency_level gibt die maximale Anzahl von Arbeitnehmern zurück, die der Testamentsvollstrecker im Fall von ThreadPool -Testamentszeiten unterstützt, die Anzahl der Arbeitnehmer wird aus der Anzahl der Kerne berechnet. Anschließend partitionieren wir das Array, um die Anzahl der Arbeitnehmer zu entsprechen, und senden jeden Chunk, der in seiner eigenen Aufgabe verarbeitet werden soll. Asynchron zählen die Arbeiter, wie viele gleiche Zahlen jeder Chunk enthält, und co_return das Ergebnis. count_even fasst jedes Ergebnis zusammen, indem die Anzahl mit co_await gezogen wird. Das Endergebnis wird dann co_return ed. Der Haupt -Thread, der durch Aufrufen get blockiert wurde, ist entsperrt und die Gesamtzahl wird zurückgegeben. Das Hauptgrad druckt die Anzahl der gleichmäßigen Zahlen und das Programm endet anmutig.
Jeder große oder komplexe Betrieb kann zu kleineren und kettenbaren Schritten unterteilt werden. Aufgaben sind asynchrone Operationen, die diese Rechenschritte implementieren. Aufgaben können überall mit Hilfe von Testamentszeiten ausgeführt werden. Während Aufgaben aus regulären Anrufleuten (z. B. Functors und Lambdas) erstellt werden können, werden Aufgaben hauptsächlich mit Coroutinen verwendet, die eine reibungslose Suspension und Wiederaufnahme ermöglichen. In Concurrencpp wird das Task -Konzept durch die concurrencpp::task -Klasse dargestellt. Obwohl das Aufgabenkonzept für übereinstimmende Übereinstimmung von zentraler Bedeutung ist, müssen Anwendungen selten selbst Taskobjekte erstellen und manipulieren, da die Taskobjekte von der Laufzeit ohne externe Hilfe erstellt und geplant werden.
Mit Concurrencpp können Anwendungen Coroutinen als Hauptmethode zum Erstellen von Aufgaben erstellen und konsumieren. Concurrencpp unterstützt sowohl eifrige als auch faule Aufgaben.
Die eifrigen Aufgaben fangen in dem Moment an, in dem sie aufgerufen werden. Diese Art der Ausführung wird empfohlen, wenn Anwendungen eine asynchrone Aktion abfeuern und später sein Ergebnis konsumieren (Feuer und später konsumieren) oder das asynchrone Ergebnis (Feuer und Vergessen) vollständig ignorieren.
Eifrige Aufgaben können result oder null_result zurückgeben. result -Typ fordert die Coroutine an, den zurückgegebenen Wert oder die geworfene Ausnahme (Feuer und später konsumieren) zu bestehen, während null_result die Coroutine an sagt, sie solle fallen und ignorieren (Feuer und vergessen).
Eager Coroutinen können im Anrufer -Thread synchron ausgeführt werden. Diese Art von Coroutinen heißt "reguläre Coroutinen". Concurrencpp Eager Coroutines können auch parallel laufen. In einem bestimmten Testamentsvollstrecker wird diese Art von Coroutinen als "parallele Coroutinen" bezeichnet.
Faule Aufgaben dagegen beginnen nur dann zu laufen, wenn co_await ed. Diese Art von Aufgaben wird empfohlen, wenn das Ergebnis der Aufgabe unmittelbar nach dem Erstellen der Aufgabe konsumiert werden soll. Aufgeschobene faule Aufgaben sind für den Fall des sofortigen Konsums etwas mehr optimiert, da sie keine spezielle Threadsynchronisation benötigen, um das asynchrone Ergebnis an seinen Verbraucher zurückzugeben. Der Compiler könnte auch einige Speicherzuweisungen optimieren, die zur Bildung des zugrunde liegenden Coroutine -Versprechens erforderlich sind. Es ist nicht möglich, eine faule Aufgabe zu entlassen und in der Zwischenzeit etwas anderes auszuführen-das Schießen einer faulen Callee-Coroutine bedeutet notwendigerweise die Aufhängung der Anrufer-Coroutine. Der Anrufer Coroutine wird nur wieder aufgenommen, wenn die Faulpallee Coroutine abgeschlossen ist. Faule Aufgaben können nur lazy_result zurückgeben.
Faule Aufgaben können in eifrige Aufgaben konvertiert werden, indem sie lazy_result::run anrufen. Diese Methode führt die faule Aufgabe aus und gibt ein result zurück, das die neu gestartete Aufgabe überwacht. Wenn Entwickler nicht sicher sind, welchen Ergebnistyp sie verwenden sollen, werden sie aufgefordert, faule Ergebnisse zu verwenden, da sie bei Bedarf in reguläre (eifrige) Ergebnisse umgewandelt werden können.
Wenn eine Funktion eine von lazy_result , result oder null_result zurückgibt und mindestens einen co_await oder co_return in ihrem Körper enthält, ist die Funktion eine Concurrencpp -Coroutine. Jede gültige Concurrencpp Coroutine ist eine gültige Aufgabe. In unserem obigen Beispiel für COUNT-Even-Beispiel ist count_even eine solche Korutine. Wir haben zuerst count_even hervorgebracht, dann haben der Threadpool -Testamentsvollstrecker mehr untergeordnete Aufgaben (die aus regulären Anrufleuten erstellt werden) erzeugt, die schließlich mit co_await verbunden wurden.
Ein Concurrencpp -Executor ist ein Objekt, das Aufgaben planen und ausführen kann. Die Manager vereinfachen die Arbeit von Ressourcen wie Threads, Thread -Pools und Aufgabenwarteschlangen, indem sie sie vom Anwendungscode wegkoppeln. Die Manager bieten eine einheitliche Möglichkeit zur Planung und Ausführung von Aufgaben, da sie alle concurrencpp::executor erweitern.
executor API class executor {
/*
Initializes a new executor and gives it a name.
*/
executor (std::string_view name);
/*
Destroys this executor.
*/
virtual ~executor () noexcept = default ;
/*
The name of the executor, used for logging and debugging.
*/
const std::string name;
/*
Schedules a task to run in this executor.
Throws concurrencpp::errors::runtime_shutdown exception if shutdown was called before.
*/
virtual void enqueue (concurrencpp::task task) = 0;
/*
Schedules a range of tasks to run in this executor.
Throws concurrencpp::errors::runtime_shutdown exception if shutdown was called before.
*/
virtual void enqueue (std::span<concurrencpp::task> tasks) = 0;
/*
Returns the maximum count of real OS threads this executor supports.
The actual count of threads this executor is running might be smaller than this number.
returns numeric_limits<int>::max if the executor does not have a limit for OS threads.
*/
virtual int max_concurrency_level () const noexcept = 0;
/*
Returns true if shutdown was called before, false otherwise.
*/
virtual bool shutdown_requested () const noexcept = 0;
/*
Shuts down the executor:
- Tells underlying threads to exit their work loop and joins them.
- Destroys unexecuted coroutines.
- Makes subsequent calls to enqueue, post, submit, bulk_post and
bulk_submit to throw concurrencpp::errors::runtime_shutdown exception.
- Makes shutdown_requested return true.
*/
virtual void shutdown () noexcept = 0;
/*
Turns a callable and its arguments into a task object and
schedules it to run in this executor using enqueue.
Arguments are passed to the task by decaying them first.
Throws errors::runtime_shutdown exception if shutdown has been called before.
*/
template < class callable_type , class ... argument_types>
void post (callable_type&& callable, argument_types&& ... arguments);
/*
Like post, but returns a result object that passes the asynchronous result.
Throws errors::runtime_shutdown exception if shutdown has been called before.
*/
template < class callable_type , class ... argument_types>
result<type> submit (callable_type&& callable, argument_types&& ... arguments);
/*
Turns an array of callables into an array of tasks and
schedules them to run in this executor using enqueue.
Throws errors::runtime_shutdown exception if shutdown has been called before.
*/
template < class callable_type >
void bulk_post (std::span<callable_type> callable_list);
/*
Like bulk_post, but returns an array of result objects that passes the asynchronous results.
Throws errors::runtime_shutdown exception if shutdown has been called before.
*/
template < class callable_type >
std::vector<concurrencpp::result<type>> bulk_submit (std::span<callable_type> callable_list);
};Wie oben erwähnt, bietet Concurrencpp häufig verwendete Ausführende. Diese Testamentsvollstrecker sind:
Thread Pool Executor - Ein Testamentsvollstrecker, der einen Threadpool unterhält. Der Thread Pool Executor eignet sich für kurze CPU-gebundene Aufgaben, die nicht blockieren. Bewerbungen werden aufgefordert, diesen Testamentsvollstrecker als Standard-Ausführungskraft für nicht blockierende Aufgaben zu verwenden. Der Concurrencpp -Thread -Pool bietet eine dynamische Fadeninjektion und ein dynamisches Arbeitsausgleich.
Hintergrund -Executor - Ein Threadpool -Executor mit einem größeren Pool von Threads. Geeignet, kurze Blockierungsaufgaben wie Datei -IO- und DB -Abfragen zu starten. Wichtiger Hinweis: Wenn die Ergebnisse konsumiert werden, hat dieser Testamentsvollstrecker mit dem Aufrufen von submit und bulk_submit zurückgegeben, es ist wichtig, die Ausführung mit resume_on zu einem CPU-gebundenen Ausführern zu wechseln, um zu verhindern, dass die CPU-gebundenen Aufgaben in Backboard_executor verarbeitet werden.
Beispiel:
auto result = background_executor.submit([] { /* some blocking action */ });
auto done_result = co_await result.resolve();
co_await resume_on (some_cpu_executor);
auto val = co_await done_result; // runs inside some_cpu_executorThread Executor - Ein Testamentsvollstrecker, der jede auftretende Aufgabe auf den Markt bringt, um einen neuen Ausführungsthread auszuführen. Themen werden nicht wiederverwendet. Dieser Testamentsvollstrecker ist gut für langjährige Aufgaben, wie Objekte, die eine Arbeitsschleife ausführen, oder lange Blockiervorgänge.
Worker Thread Executor - Ein einzelner Thread -Executor, der eine einzelne Task -Warteschlange verwaltet. Geeignet, wenn Anwendungen einen speziellen Thread wünschen, der viele verwandte Aufgaben ausführt.
Manual Executor - Ein Testamentsvollstrecker, der keine Coroutinen selbst ausführt. Der Anwendungscode kann bisherige Aufgaben ausführen, indem sie seine Ausführungsmethoden manuell aufrufen.
Ableitbarer Ausführender - Eine Basisklasse für benutzerdefinierte Executoren. Obwohl die Erben direkt von concurrencpp::executor möglich ist, verwendet derivable_executor das CRTP -Muster, das dem Compiler einige Optimierungsmöglichkeiten bietet.
Inline -Executor - hauptsächlich verwendet, um das Verhalten anderer Exekutoren zu überschreiben. Eine Aufgabe zu ergreifen, ist gleichbedeutend mit der Inline -Aufgabe.
Der nackte Mechanismus eines Testamentsvollstreckers ist in seiner enqueue -Methode eingekapselt. Diese Methode befasst sich mit einer Aufgabe für die Ausführung und hat zwei Überladungen: Eine Überladung empfängt ein einzelnes Task -Objekt als Argument und eine andere, die eine Zeitspanne von Aufgabenobjekten empfängt. Die zweite Überlastung wird verwendet, um eine Aufgabenstapel zu unterteilen. Dies ermöglicht eine bessere Planung von Heuristiken und eine verminderte Konkurrenz.
Anwendungen müssen sich nicht allein auf enqueue verlassen. concurrencpp::executor bietet eine API für die Planung von Benutzern Calables, indem sie sie in Aufgabenobjekte hinter den Kulissen konvertieren. Anträge können die Ausführende auffordern, ein Ergebnisobjekt zurückzugeben, das das asynchrone Ergebnis des bereitgestellten Anrufs übergibt. Dies erfolgt durch den Aufruf von executor::submit und executor::bulk_submit . submit erhält ein Callable und gibt ein Ergebnisobjekt zurück. executor::bulk_submit erhält eine span von Anrufleuten und gibt einen vector von submit auf ähnliche Weise zurück. In vielen Fällen interessieren sich Anwendungen nicht für den asynchronen Wert oder die Ausnahme. In diesem Fall können Bewerbungen executor:::post und executor::bulk_post verwenden, um eine aufrufbare oder eine span von Anrufleuten zu planen, die ausgeführt werden sollen, aber auch die Aufgabe anweisen, einen zurückgegebenen Wert oder eine Ausnahme auszugeben. Das asynchrone Ergebnis nicht zu bestehen ist schneller als vorübergehend, aber dann haben wir keine Möglichkeit, den Status oder das Ergebnis der laufenden Aufgabe zu kennen.
post , bulk_post , submit und bulk_submit verwenden Sie enqueue hinter den Kulissen für den zugrunde liegenden Planungsmechanismus.
thread_pool_executor api Abgesehen von post , submit , bulk_post und bulk_submit liefert der thread_pool_executor diese zusätzlichen Methoden.
class thread_pool_executor {
/*
Returns the number of milliseconds each thread-pool worker
remains idle (lacks any task to execute) before exiting.
This constant can be set by passing a runtime_options object
to the constructor of the runtime class.
*/
std::chrono::milliseconds max_worker_idle_time () const noexcept ;
};manual_executor API Abgesehen von post , submit , bulk_post und bulk_submit liefert der manual_executor diese zusätzlichen Methoden.
class manual_executor {
/*
Destructor. Equivalent to clear.
*/
~manual_executor () noexcept ;
/*
Returns the number of enqueued tasks at the moment of invocation.
This number can change quickly by the time the application handles it, it should be used as a hint.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
size_t size () const noexcept ;
/*
Queries whether the executor is empty from tasks at the moment of invocation.
This value can change quickly by the time the application handles it, it should be used as a hint.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
bool empty () const noexcept ;
/*
Clears the executor from any enqueued but yet to-be-executed tasks,
and returns the number of cleared tasks.
Tasks enqueued to this executor by (post_)submit method are resumed
and errors::broken_task exception is thrown inside them.
Ongoing tasks that are being executed by loop_once(_XXX) or loop(_XXX) are uneffected.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
size_t clear ();
/*
Tries to execute a single task. If at the moment of invocation the executor
is empty, the method does nothing.
Returns true if a task was executed, false otherwise.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
bool loop_once ();
/*
Tries to execute a single task.
This method returns when either a task was executed or max_waiting_time
(in milliseconds) has reached.
If max_waiting_time is 0, the method is equivalent to loop_once.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
bool loop_once_for (std::chrono::milliseconds max_waiting_time);
/*
Tries to execute a single task.
This method returns when either a task was executed or timeout_time has reached.
If timeout_time has already expired, this method is equivalent to loop_once.
If shutdown is called from another thread, this method
returns and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
template < class clock_type , class duration_type >
bool loop_once_until (std::chrono::time_point<clock_type, duration_type> timeout_time);
/*
Tries to execute max_count enqueued tasks and returns the number of tasks that were executed.
This method does not wait: it returns when the executor
becomes empty from tasks or max_count tasks have been executed.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
size_t loop ( size_t max_count);
/*
Tries to execute max_count tasks.
This method returns when either max_count tasks were executed or a
total amount of max_waiting_time has passed.
If max_waiting_time is 0, the method is equivalent to loop.
Returns the actual amount of tasks that were executed.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
size_t loop_for ( size_t max_count, std::chrono::milliseconds max_waiting_time);
/*
Tries to execute max_count tasks.
This method returns when either max_count tasks were executed or timeout_time has reached.
If timeout_time has already expired, the method is equivalent to loop.
Returns the actual amount of tasks that were executed.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
template < class clock_type , class duration_type >
size_t loop_until ( size_t max_count, std::chrono::time_point<clock_type, duration_type> timeout_time);
/*
Waits for at least one task to be available for execution.
This method should be used as a hint,
as other threads (calling loop, for example) might empty the executor,
before this thread has a chance to do something with the newly enqueued tasks.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
void wait_for_task ();
/*
This method returns when one or more tasks are available for
execution or max_waiting_time has passed.
Returns true if at at least one task is available for execution, false otherwise.
This method should be used as a hint, as other threads (calling loop, for example)
might empty the executor, before this thread has a chance to do something
with the newly enqueued tasks.
If shutdown is called from another thread, this method
returns and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
bool wait_for_task_for (std::chrono::milliseconds max_waiting_time);
/*
This method returns when one or more tasks are available for execution or timeout_time has reached.
Returns true if at at least one task is available for execution, false otherwise.
This method should be used as a hint,
as other threads (calling loop, for example) might empty the executor,
before this thread has a chance to do something with the newly enqueued tasks.
If shutdown is called from another thread, this method
returns and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
template < class clock_type , class duration_type >
bool wait_for_task_until (std::chrono::time_point<clock_type, duration_type> timeout_time);
/*
This method returns when max_count or more tasks are available for execution.
This method should be used as a hint, as other threads
(calling loop, for example) might empty the executor,
before this thread has a chance to do something with the newly enqueued tasks.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
void wait_for_tasks ( size_t max_count);
/*
This method returns when max_count or more tasks are available for execution
or max_waiting_time (in milliseconds) has passed.
Returns the number of tasks available for execution when the method returns.
This method should be used as a hint, as other
threads (calling loop, for example) might empty the executor,
before this thread has a chance to do something with the newly enqueued tasks.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
size_t wait_for_tasks_for ( size_t count, std::chrono::milliseconds max_waiting_time);
/*
This method returns when max_count or more tasks are available for execution
or timeout_time is reached.
Returns the number of tasks available for execution when the method returns.
This method should be used as a hint, as other threads
(calling loop, for example) might empty the executor,
before this thread has a chance to do something with the newly enqueued tasks.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
template < class clock_type , class duration_type >
size_t wait_for_tasks_until ( size_t count, std::chrono::time_point<clock_type, duration_type> timeout_time);
}; Asynchrone Werte und Ausnahmen können mithilfe von Concurrencpp -Ergebnisobjekten konsumiert werden. Der result repräsentiert das asynchrone Ergebnis einer eifrigen Aufgabe, während lazy_result das aufgeschobene Ergebnis einer faulen Aufgabe darstellt.
Wenn eine Aufgabe (eifrig oder faul) abgeschlossen ist, gibt sie entweder einen gültigen Wert zurück oder legt eine Ausnahme aus. In beiden Fällen wird dieses asynchrone Ergebnis an den Verbraucher des Ergebnisobjekts weitergegeben.
result bilden asymmetrische Coroutinen-die Ausführung einer Anrufer-Coroutine erfolgt nicht durch die Ausführung einer Callee-Coroutine, beide Coroutinen können unabhängig ausgeführt werden. Erst wenn das Ergebnis der Callee-Coroutine verbraucht wird, kann die Anrufer-Korutine aufgehängt werden, die auf die Callee wartet, um sie zu vervollständigen. Bis zu diesem Zeitpunkt laufen beide Coroutinen unabhängig. Die Callee-Coroutine läuft, ob sein Ergebnis verzehrt wird oder nicht.
lazy_result Objekte bilden symmetrische Coroutinen-Ausführung einer Callee-Coroutine erfolgt erst nach der Aufhängung der Anrufer-Coroutine. Wenn Sie auf ein faules Ergebnis warten, wird die aktuelle Coroutine suspendiert und die faule Aufgabe, die dem faulen Ergebnis zugeordnet ist, beginnt zu laufen. Nachdem die Callee-Coroutine abgeschlossen und ein Ergebnis erzielt wird, wird die Anrufer-Coroutine wieder aufgenommen. Wenn ein fauler Ergebnis nicht verzehrt wird, beginnt die damit verbundene faule Aufgabe nie zu läuft.
Alle Ergebnisobjekte sind nur ein Bewegungstyp, und als solche können sie nicht verwendet werden, nachdem ihr Inhalt auf ein anderes Ergebnisobjekt verschoben wurde. In diesem Fall wird das Ergebnisobjekt als leer angesehen und versucht, eine andere Methode als operator bool aufzurufen und operator = wird eine Ausnahme ausgeben.
Nachdem das asynchrone Ergebnis aus dem Ergebnisobjekt herausgezogen wurde (zum Beispiel durch Aufrufen von get oder operator co_await ), wird das Ergebnisobjekt leer. Die Leere kann mit operator bool getestet werden.
Warten auf ein Ergebnis bedeutet, die aktuelle Coroutine zu suspendieren, bis das Ergebnisobjekt fertig ist. Wenn ein gültiger Wert aus der zugehörigen Aufgabe zurückgegeben wurde, wird er aus dem Ergebnisobjekt zurückgegeben. Wenn die zugehörige Aufgabe eine Ausnahme ausgelöst hat, wird sie wiedergeworfen. Zum Zeitpunkt des Wartens wird die aktuelle Coroutine sofort wieder aufgenommen, wenn das Ergebnis bereits fertig ist. Andernfalls wird es von dem Thread wieder aufgenommen, der das asynchrone Ergebnis oder die Ausnahme festlegt.
Das Auflösen eines Ergebnisses ähnelt dem Warten auf das, es zu erwarten. Der Unterschied besteht darin, dass der Ausdruck co_await das Ergebnisobjekt selbst in einer nicht leeren Form in einem fertigen Zustand zurückgibt. Das asynchrone Ergebnis kann dann durch Verwendung get oder co_await gezogen werden.
Jedes Ergebnisobjekt hat einen Status, der den Zustand des asynchronen Ergebniss anzeigt. Der Ergebnisstatus variiert von result_status::idle (das asynchrone Ergebnis oder die Ausnahme wurde noch nicht produziert) bis result_status::value (die zugeordnete Aufgabe, die anmutig beendet wird, indem ein gültiger Wert zurückgegeben wird) an result_status::exception (die Aufgabe, die durch Ausschalten einer Ausnahme beendet wurde). Der Status kann durch Anruf (lazy_)result::status abgefragt werden.
result Der result repräsentiert das Ergebnis einer laufenden, asynchronen Aufgabe, ähnlich wie std::future .
Abgesehen davon, dass sie Ergebnis-Objekte warten und auflösen können, können sie auch darauf gewartet werden, indem sie eines von result::wait , result::wait_for , result::wait_until oder result::get anrufen. Das Warten auf ein Ergebnis zu beenden ist eine Blockierungsoperation (für den Fall ist das asynchrone Ergebnis nicht fertig) und wird den gesamten Ausführungsfaden aussetzen, der darauf wartet, dass das asynchrone Ergebnis verfügbar wird. Wartevorgänge sind im Allgemeinen entmutigt und nur in Aufgaben auf Wurzelebene oder in Kontexten zugelassen, die es ermöglichen, wie den Hauptfaden zu blockieren, der darauf wartet, dass der Rest der Anwendung anmutig fertig ist, oder die Verwendung von concurrencpp::blocking_executor oder concurrencpp::thread_executor .
Warten auf Ergebnisobjekte mit co_await (und damit die aktuelle Funktion/Aufgabe auch in eine Coroutine verwandelt), ist die bevorzugte Methode, um Ergebnisobjekte zu konsumieren, da sie nicht zugrunde liegende Threads blockiert.
result API class result {
/*
Creates an empty result that isn't associated with any task.
*/
result () noexcept = default ;
/*
Destroys the result. Associated tasks are not cancelled.
The destructor does not block waiting for the asynchronous result to become ready.
*/
~result () noexcept = default ;
/*
Moves the content of rhs to *this. After this call, rhs is empty.
*/
result (result&& rhs) noexcept = default ;
/*
Moves the content of rhs to *this. After this call, rhs is empty. Returns *this.
*/
result& operator = (result&& rhs) noexcept = default ;
/*
Returns true if this is a non-empty result.
Applications must not use this object if this->operator bool() is false.
*/
explicit operator bool () const noexcept ;
/*
Queries the status of *this.
The returned value is any of result_status::idle, result_status::value or result_status::exception.
Throws errors::empty_result if *this is empty.
*/
result_status status () const ;
/*
Blocks the current thread of execution until this result is ready,
when status() != result_status::idle.
Throws errors::empty_result if *this is empty.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
void wait ();
/*
Blocks until this result is ready or duration has passed. Returns the status
of this result after unblocking.
Throws errors::empty_result if *this is empty.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
template < class duration_unit , class ratio >
result_status wait_for (std::chrono::duration<duration_unit, ratio> duration);
/*
Blocks until this result is ready or timeout_time has reached. Returns the status
of this result after unblocking.
Throws errors::empty_result if *this is empty.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
template < class clock , class duration >
result_status wait_until (std::chrono::time_point<clock, duration> timeout_time);
/*
Blocks the current thread of execution until this result is ready,
when status() != result_status::idle.
If the result is a valid value, it is returned, otherwise, get rethrows the asynchronous exception.
Throws errors::empty_result if *this is empty.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
type get ();
/*
Returns an awaitable used to await this result.
If the result is already ready - the current coroutine resumes
immediately in the calling thread of execution.
If the result is not ready yet, the current coroutine is suspended
and resumed when the asynchronous result is ready,
by the thread which had set the asynchronous value or exception.
In either way, after resuming, if the result is a valid value, it is returned.
Otherwise, operator co_await rethrows the asynchronous exception.
Throws errors::empty_result if *this is empty.
*/
auto operator co_await ();
/*
Returns an awaitable used to resolve this result.
After co_await expression finishes, *this is returned in a non-empty form, in a ready state.
Throws errors::empty_result if *this is empty.
*/
auto resolve ();
};lazy_result TypEin faules Ergebnisobjekt repräsentiert das Ergebnis einer aufgeschobenen faulen Aufgabe.
lazy_result hat die Verantwortung, sowohl die damit verbundene faule Aufgabe zu starten als auch ihr aufgeschobenes Ergebnis an seinen Verbraucher zu übergeben. Wenn Sie erwartet oder gelöst werden, setzt das faule Ergebnis die aktuelle Coroutine aus und startet die zugehörige faule Aufgabe. Wenn die zugehörige Aufgabe abgeschlossen ist, wird ihr asynchroner Wert an die Caller -Aufgabe übergeben, die dann wieder aufgenommen wird.
Manchmal kann eine API ein faules Ergebnis zurückgeben, aber die Anwendungen benötigen ihre zugehörige Aufgabe, um eifrig auszuführen (ohne die Anruferaufgabe auszusetzen). In diesem Fall können faule Aufgaben in eifrige Aufgaben konvertiert werden, indem er den zugehörigen faulen Ergebnis run . In diesem Fall beginnt die zugehörige Aufgabe, Inline auszuführen, ohne die Anruferaufgabe auszusetzen. Das ursprüngliche faule Ergebnis wird geleert und ein gültiges result , das die neu gestartete Aufgabe überwacht, wird stattdessen zurückgegeben.
lazy_result API class lazy_result {
/*
Creates an empty lazy result that isn't associated with any task.
*/
lazy_result () noexcept = default ;
/*
Moves the content of rhs to *this. After this call, rhs is empty.
*/
lazy_result (lazy_result&& rhs) noexcept ;
/*
Destroys the result. If not empty, the destructor destroys the associated task without resuming it.
*/
~lazy_result () noexcept ;
/*
Moves the content of rhs to *this. After this call, rhs is empty. Returns *this.
If *this is not empty, then operator= destroys the associated task without resuming it.
*/
lazy_result& operator =(lazy_result&& rhs) noexcept ;
/*
Returns true if this is a non-empty result.
Applications must not use this object if this->operator bool() is false.
*/
explicit operator bool () const noexcept ;
/*
Queries the status of *this.
The returned value is any of result_status::idle, result_status::value or result_status::exception.
Throws errors::empty_result if *this is empty.
*/
result_status status () const ;
/*
Returns an awaitable used to start the associated task and await this result.
If the result is already ready - the current coroutine resumes immediately
in the calling thread of execution.
If the result is not ready yet, the current coroutine is suspended and
resumed when the asynchronous result is ready,
by the thread which had set the asynchronous value or exception.
In either way, after resuming, if the result is a valid value, it is returned.
Otherwise, operator co_await rethrows the asynchronous exception.
Throws errors::empty_result if *this is empty.
*/
auto operator co_await ();
/*
Returns an awaitable used to start the associated task and resolve this result.
If the result is already ready - the current coroutine resumes immediately
in the calling thread of execution.
If the result is not ready yet, the current coroutine is suspended and resumed
when the asynchronous result is ready, by the thread which
had set the asynchronous value or exception.
After co_await expression finishes, *this is returned in a non-empty form, in a ready state.
Throws errors::empty_result if *this is empty.
*/
auto resolve ();
/*
Runs the associated task inline and returns a result object that monitors the newly started task.
After this call, *this is empty.
Throws errors::empty_result if *this is empty.
Might throw std::bad_alloc if fails to allocate memory.
*/
result<type> run ();
};Regelmäßige eifrige Coroutinen werden im Rufnetz der Ausführung synchron ausgeführt. Die Ausführung kann sich auf einen anderen Ausführungsthread verlagern, wenn eine Coroutine beispielsweise eine Umschleppung durchläuft, indem auf ein unreweriertes Ergebnisobjekt darin wartet. Concurrencpp bietet auch parallele Coroutinen, die in einem bestimmten Testamentsvollstrecker ausgeführt werden und nicht im aufgerufenen Ausführungsthread. Diese Art der Planung von Coroutinen ist besonders hilfreich beim Schreiben paralleler Algorithmen, rekursiven Algorithmen und gleichzeitigen Algorithmen, die das Fork-Join-Modell verwenden.
Jede parallele Coroutine muss die folgenden Voraussetzungen erfüllen:
result / null_result zurück.executor_tag als erstes Argument.type* / type& / std::shared_ptr<type> , wobei type eine konkrete Klasse von executor als zweites Argument ist.co_await oder co_return in seinem Körper. Wenn alle oben genannten gilt, ist die Funktion eine parallele Coroutine: Concurrencpp startet die Coroutine, die suspendiert ist, und planen Sie sie sofort um, um sie im bereitgestellten Testamentsvollstrecker auszuführen. concurrencpp::executor_tag ist ein Dummy -Platzhalter, der die Concurrencpp -Laufzeit mitteilt, dass diese Funktion keine regelmäßige Funktion ist, sondern in dem gegebenen Ausführenden ausführen muss. Wenn der Testamentsvollstrecker an die parallele Coroutine übergeht, wird die Coroutine nicht ausgeführt und eine Ausnahme std::invalid_argument wird synchron ausgelöst. Wenn alle Voraussetzungen erfüllt sind, können Anwendungen das Ergebnis der parallelen Coroutine mithilfe des zurückgegebenen Ergebnisobjekts konsumieren.
In diesem Beispiel berechnen wir das 30-te Mitglied der Fibonacci-Sequenz parallel. Wir beginnen, jeden Fibonacci -Schritt in einer eigenen parallelen Korutine zu starten. Das erste Argument ist ein Dummy executor_tag und das zweite Argument ist der Threadpool -Ausführende. Jeder rekursive Schritt beruft sich auf eine neue parallele Coroutine, die parallel läuft. Jedes Ergebnis wird an seine übergeordnete Aufgabe co_return und unter Verwendung von co_await erfasst.
Wenn wir den Eingang als klein genug halten, um synchron zu berechnen (wenn curr <= 10 ), hören wir nicht mehr jeden rekursiven Schritt in seiner eigenen Aufgabe aus und lösen nur den Algorithmus synchron.
# include " concurrencpp/concurrencpp.h "
# include < iostream >
using namespace concurrencpp ;
int fibonacci_sync ( int i) {
if (i == 0 ) {
return 0 ;
}
if (i == 1 ) {
return 1 ;
}
return fibonacci_sync (i - 1 ) + fibonacci_sync (i - 2 );
}
result< int > fibonacci (executor_tag, std::shared_ptr<thread_pool_executor> tpe, const int curr) {
if (curr <= 10 ) {
co_return fibonacci_sync (curr);
}
auto fib_1 = fibonacci ({}, tpe, curr - 1 );
auto fib_2 = fibonacci ({}, tpe, curr - 2 );
co_return co_await fib_1 + co_await fib_2;
}
int main () {
concurrencpp::runtime runtime;
auto fibb_30 = fibonacci ({}, runtime. thread_pool_executor (), 30 ). get ();
std::cout << " fibonacci(30) = " << fibb_30 << std::endl;
return 0 ;
} Um zu vergleichen, wird der gleiche Code geschrieben, ohne parallele Coroutinen zu verwenden und sich auf executor::submit allein zu verlassen. Da fibonacci ein result<int> zurückgibt, wird das Senden von Sendern über executor::submit ein result<result<int>> .
# include " concurrencpp/concurrencpp.h "
# include < iostream >
using namespace concurrencpp ;
int fibonacci_sync ( int i) {
if (i == 0 ) {
return 0 ;
}
if (i == 1 ) {
return 1 ;
}
return fibonacci_sync (i - 1 ) + fibonacci_sync (i - 2 );
}
result< int > fibonacci (std::shared_ptr<thread_pool_executor> tpe, const int curr) {
if (curr <= 10 ) {
co_return fibonacci_sync (curr);
}
auto fib_1 = tpe-> submit (fibonacci, tpe, curr - 1 );
auto fib_2 = tpe-> submit (fibonacci, tpe, curr - 2 );
co_return co_await co_await fib_1 +
co_await co_await fib_2;
}
int main () {
concurrencpp::runtime runtime;
auto fibb_30 = fibonacci (runtime. thread_pool_executor (), 30 ). get ();
std::cout << " fibonacci(30) = " << fibb_30 << std::endl;
return 0 ;
} Ergebnisobjekte sind die Hauptmethode, um Daten zwischen Aufgaben in Concurrencpp zu übergeben, und wir haben gesehen, wie Executoren und Coroutinen solche Objekte erzeugen. Manchmal möchten wir die Funktionen von Ergebnisobjekten mit Nicht-Tasks verwenden, beispielsweise bei der Verwendung einer Bibliothek von Drittanbietern. In diesem Fall können wir ein Ergebnisobjekt mithilfe eines result_promise vervollständigen. result_promise ähnelt result std::promise .
Genau wie Ergebnisobjekte sind Ergebnisversprechen ein nur Bewegungsart, der nach dem Umzug leer wird. In ähnlicher Weise wird das Ergebnisversprechen nach dem Einstellen eines Ergebniss oder einer Ausnahme ebenfalls leer. Wenn ein Ergebnisverständnis aus dem Umfang herauskommt und kein Ergebnis/Ausnahme festgelegt wurde, legt die Ergebnis-Promotion-Destruktorin eine concurrencpp::errors::broken_task CRAKTACE_TASK mithilfe der Methode set_exception fest. Suspendierte und blockierte Aufgaben, die auf das zugehörige Ergebnisobjekt warten, werden wieder aufgenommen/entsperrt.
Ergebnisversprechungen können den CODE -CODE -Stil in async/await : Immer wenn eine Komponente einen Rückruf benötigt, um das asynchrone Ergebnis zu übergeben, können wir einen Rückruf übergeben, set_result oder set_exception (abhängig vom asynchronen Ergebnis selbst) auf das übergebene Ergebnisversprechen und zurückgeben und das zugehörige Ergebnis zurückgeben.
result_promise api template < class type >
class result_promise {
/*
Constructs a valid result_promise.
Might throw std::bad_alloc if fails to allocate memory.
*/
result_promise ();
/*
Moves the content of rhs to *this. After this call, rhs is empty.
*/
result_promise (result_promise&& rhs) noexcept ;
/*
Destroys *this, possibly setting an errors::broken_task exception
by calling set_exception if *this is not empty at the time of destruction.
*/
~result_promise () noexcept ;
/*
Moves the content of rhs to *this. After this call, rhs is empty.
*/
result_promise& operator = (result_promise&& rhs) noexcept ;
/*
Returns true if this is a non-empty result-promise.
Applications must not use this object if this->operator bool() is false.
*/
explicit operator bool () const noexcept ;
/*
Sets a value by constructing <<type>> from arguments... in-place.
Makes the associated result object become ready - tasks waiting for it
to become ready are unblocked.
Suspended tasks are resumed inline.
After this call, *this becomes empty.
Throws errors::empty_result_promise exception If *this is empty.
Might throw any exception that the constructor
of type(std::forward<argument_types>(arguments)...) throws.
*/
template < class ... argument_types>
void set_result (argument_types&& ... arguments);
/*
Sets an exception.
Makes the associated result object become ready - tasks waiting for it
to become ready are unblocked.
Suspended tasks are resumed inline.
After this call, *this becomes empty.
Throws errors::empty_result_promise exception If *this is empty.
Throws std::invalid_argument exception if exception_ptr is null.
*/
void set_exception (std::exception_ptr exception_ptr);
/*
A convenience method that invokes a callable with arguments... and calls set_result
with the result of the invocation.
If an exception is thrown, the thrown exception is caught and set instead by calling set_exception.
After this call, *this becomes empty.
Throws errors::empty_result_promise exception If *this is empty.
Might throw any exception that callable(std::forward<argument_types>(arguments)...)
or the contructor of type(type&&) throw.
*/
template < class callable_type , class ... argument_types>
void set_from_function (callable_type&& callable, argument_types&& ... arguments);
/*
Gets the associated result object.
Throws errors::empty_result_promise exception If *this is empty.
Throws errors::result_already_retrieved exception if this method had been called before.
*/
result<type> get_result ();
};result_promise Beispiel: In diesem Beispiel wird result_promise verwendet, um Daten aus einem Thread zu drücken, und es kann aus seinem zugehörigen result aus einem anderen Thread gezogen werden.
# include " concurrencpp/concurrencpp.h "
# include < iostream >
int main () {
concurrencpp::result_promise<std::string> promise;
auto result = promise. get_result ();
std::thread my_3_party_executor ([promise = std::move (promise)] () mutable {
std::this_thread::sleep_for ( std::chrono::seconds ( 1 )); // Imitate real work
promise. set_result ( " hello world " );
});
auto asynchronous_string = result. get ();
std::cout << " result promise returned string: " << asynchronous_string << std::endl;
my_3_party_executor. join ();
} In diesem Beispiel verwenden wir std::thread als Drittanbieter. Dies stellt ein Szenario dar, wenn ein nicht-konkrencpp-Executor als Teil des Anwendungslebenszyklus verwendet wird. Wir extrahieren das Ergebnisobjekt, bevor wir das Versprechen übergeben und den Hauptfaden blockieren, bis das Ergebnis fertig ist. In my_3_party_executor setzen wir ein Ergebnis so, als ob wir es co_return ed es haben.
Shared -Ergebnisse sind eine spezielle Art von Ergebnisobjekten, mit denen mehrere Verbraucher auf das asynchrone Ergebnis zugreifen können, ähnlich wie bei std::shared_future . Verschiedene Verbraucher aus verschiedenen Themen können Funktionen wie await , get und resolve auf sicherer Weise aufrufen.
Shared -Ergebnisse werden aus regulären Ergebnisobjekten erstellt und sind im Gegensatz zu regulären Ergebnisobjekten sowohl kopierbar als auch beweglich. Als solches verhält sich shared_result wie std::shared_ptr Typ. Wenn eine gemeinsame Ergebnisinstanz in eine andere Instanz verschoben wird, wird die Instanz leer, und der Versuch, darauf zuzugreifen, wird eine Ausnahme ausgeben.
Um mehrere Verbraucher zu unterstützen, geben gemeinsame Ergebnisse einen Verweis auf den asynchronen Wert zurück, anstatt ihn zu verschieben (wie reguläre Ergebnisse). Beispielsweise gibt ein shared_result<int> ein int& wenn get , await usw. aufgerufen. Wenn der zugrunde liegende Typ des shared_result void oder ein Referenztyp (wie int& ) ist, werden sie wie gewohnt zurückgegeben. Wenn das asynchrone Ergebnis eine geworfene Ausnahme ist, wird es wiedergeworfen.
Beachten Sie, dass der tatsächliche Wert beim Erwerb des asynchronen Ergebniss mit shared_result aus mehreren Threads nicht sicher ist. Beispielsweise können mehrere Threads eine asynchrone Ganzzahl erwerben, indem sie seine Referenz ( int& ) erhalten. Es macht die Ganzzahl selbst nicht sicher. Es ist in Ordnung, den asynchronen Wert zu mutieren, wenn der asynchrone Wert bereits fadensicher ist. Alternativ werden Anwendungen ermutigt, const Typen zunächst (wie const int ) zu verwenden und konstante Referenzen (wie const int& ) zu erwerben, die Mutation verhindern.
shared_result API class share_result {
/*
Creates an empty shared-result that isn't associated with any task.
*/
shared_result () noexcept = default ;
/*
Destroys the shared-result. Associated tasks are not cancelled.
The destructor does not block waiting for the asynchronous result to become ready.
*/
~shared_result () noexcept = default ;
/*
Converts a regular result object to a shared-result object.
After this call, rhs is empty.
Might throw std::bad_alloc if fails to allocate memory.
*/
shared_result (result<type> rhs);
/*
Copy constructor. Creates a copy of the shared result object that monitors the same task.
*/
shared_result ( const shared_result&) noexcept = default ;
/*
Move constructor. Moves rhs to *this. After this call, rhs is empty.
*/
shared_result (shared_result&& rhs) noexcept = default ;
/*
Copy assignment operator. Copies rhs to *this and monitors the same task that rhs monitors.
*/
shared_result& operator =( const shared_result& rhs) noexcept ;
/*
Move assignment operator. Moves rhs to *this. After this call, rhs is empty.
*/
shared_result& operator =(shared_result&& rhs) noexcept ;
/*
Returns true if this is a non-empty shared-result.
Applications must not use this object if this->operator bool() is false.
*/
explicit operator bool () const noexcept ;
/*
Queries the status of *this.
The return value is any of result_status::idle, result_status::value or result_status::exception.
Throws errors::empty_result if *this is empty.
*/
result_status status () const ;
/*
Blocks the current thread of execution until this shared-result is ready,
when status() != result_status::idle.
Throws errors::empty_result if *this is empty.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
void wait ();
/*
Blocks until this shared-result is ready or duration has passed.
Returns the status of this shared-result after unblocking.
Throws errors::empty_result if *this is empty.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
template < class duration_type , class ratio_type >
result_status wait_for (std::chrono::duration<duration_type, ratio_type> duration);
/*
Blocks until this shared-result is ready or timeout_time has reached.
Returns the status of this result after unblocking.
Throws errors::empty_result if *this is empty.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
template < class clock_type , class duration_type >
result_status wait_until (std::chrono::time_point<clock_type, duration_type> timeout_time);
/*
Blocks the current thread of execution until this shared-result is ready,
when status() != result_status::idle.
If the result is a valid value, a reference to it is returned,
otherwise, get rethrows the asynchronous exception.
Throws errors::empty_result if *this is empty.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
std:: add_lvalue_reference_t <type> get ();
/*
Returns an awaitable used to await this shared-result.
If the shared-result is already ready - the current coroutine resumes
immediately in the calling thread of execution.
If the shared-result is not ready yet, the current coroutine is
suspended and resumed when the asynchronous result is ready,
by the thread which had set the asynchronous value or exception.
In either way, after resuming, if the result is a valid value, a reference to it is returned.
Otherwise, operator co_await rethrows the asynchronous exception.
Throws errors::empty_result if *this is empty.
*/
auto operator co_await ();
/*
Returns an awaitable used to resolve this shared-result.
After co_await expression finishes, *this is returned in a non-empty form, in a ready state.
Throws errors::empty_result if *this is empty.
*/
auto resolve ();
};shared_result Beispiel: In diesem Beispiel wird ein result in ein shared_result -Objekt umgewandelt und ein Verweis auf ein asynchrones int -Ergebnis wird durch viele mit thread_executor hervorgebrachte Aufgaben erfasst.
# include " concurrencpp/concurrencpp.h "
# include < iostream >
# include < chrono >
concurrencpp::result< void > consume_shared_result (concurrencpp::shared_result< int > shared_result,
std::shared_ptr<concurrencpp::executor> resume_executor) {
std::cout << " Awaiting shared_result to have a value " << std::endl;
const auto & async_value = co_await shared_result;
concurrencpp::resume_on (resume_executor);
std::cout << " In thread id " << std::this_thread::get_id () << " , got: " << async_value << " , memory address: " << &async_value << std::endl;
}
int main () {
concurrencpp::runtime runtime;
auto result = runtime. background_executor ()-> submit ([] {
std::this_thread::sleep_for ( std::chrono::seconds ( 1 ));
return 100 ;
});
concurrencpp::shared_result< int > shared_result ( std::move (result));
concurrencpp::result< void > results[ 8 ];
for ( size_t i = 0 ; i < 8 ; i++) {
results[i] = consume_shared_result (shared_result, runtime. thread_pool_executor ());
}
std::cout << " Main thread waiting for all consumers to finish " << std::endl;
auto tpe = runtime. thread_pool_executor ();
auto all_consumed = concurrencpp::when_all (tpe, std::begin (results), std::end (results)). run ();
all_consumed. get ();
std::cout << " All consumers are done, exiting " << std::endl;
return 0 ;
} Wenn das Laufzeitobjekt aus dem Umfang der main herauskommt, iteriert es jeden gespeicherten Ausführern und ruft seine shutdown auf. Der Versuch, auf die Timer-Queue oder einen Ausführenden zuzugreifen, wirft eine errors::runtime_shutdown -Ausnahme aus. Wenn ein Testamentsvollstrecker ausgeschaltet wird, löscht es seine inneren Aufgabenwarteschlangen und zerstört nicht ausgeführte task . Wenn ein Task-Objekt eine Concurrencpp-Coroutine speichert, wird diese Coroutine wieder aufgenommen, und eine errors::broken_task wird in ihn geworfen. In jedem Fall, in dem eine Ausnahme runtime_shutdown oder eine broken_task ausgelöst wird, sollten Anwendungen ihren aktuellen Code-Flow so schnell wie möglich ordnungsgemäß beenden. Diese Ausnahmen sollten nicht ignoriert werden. Sowohl runtime_shutdown als auch broken_task erben von errors::interrupted_task base class, und dieser Typ kann auch in einer catch verwendet werden, um die Beendigung auf einheitliche Weise zu verarbeiten.
Viele asynchrone Aktionen von Concurrencpp erfordern eine Instanz eines Vollstreckers als Lebenslauf -Testamentsvollstrecker . Wenn eine asynchrone Aktion (implementiert als Coroutine) synchron abgeschlossen werden kann, wird im aufrufenden Ausführungsthread unmittelbar fortgesetzt. Wenn die asynchrone Aktion nicht synchron fertig ist, wird sie nach Abschluss des gegebenen Lebenslauf-Exezuters wieder aufgenommen. Wenn beispielsweise when_any Nützlichkeitsfunktion der Nutzung eine Instanz eines Lebenslauf-Exezuters als erstes Argument erfordert. when_any zurückgibt eine lazy_result , die bereit wird, wenn mindestens ein angegebenes Ergebnis fertig wird. Wenn eines der Ergebnisse bereits zum Zeitpunkt der Anrufe, when_any , bereit ist, wird die Coroutine in der Anrufkoroutine in dem aufrufenden Ausführungsthread synchron wieder aufgenommen. Wenn nicht, wird die Coroutine auf dem Laufenden wieder aufgenommen, wenn zumindest das Ergebnis abgeschlossen ist, innerhalb des angegebenen Lebenslauf-Executors. Lebenslauf -Executoren sind wichtig, da sie vorschreiben, wo Coroutinen in Fällen wieder aufgenommen werden, in denen nicht klar ist, wo eine Coroutine wieder aufgenommen werden soll (z. B. bei der Zeit, when_any und when_all ), oder in Fällen, in denen die asynchrone Aktion in einem der Concurencpp -Arbeiter verarbeitet wird, die nur verwendet werden, die nur angewendet werden, und nicht angewendet.
make_ready_result -Funktion make_ready_result erstellt ein bereites Ergebnisobjekt aus angegebenen Argumenten. Wenn Sie auf ein solches Ergebnis warten, wird die aktuelle Coroutine sofort wieder aufgenommen. get und operator co_await gibt den konstruierten Wert zurück.
/*
Creates a ready result object by building <<type>> from arguments&&... in-place.
Might throw any exception that the constructor
of type(std::forward<argument_types>(arguments)...) throws.
Might throw std::bad_alloc exception if fails to allocate memory.
*/
template < class type , class ... argument_types>
result<type> make_ready_result (argument_types&& ... arguments);
/*
An overload for void type.
Might throw std::bad_alloc exception if fails to allocate memory.
*/
result< void > make_ready_result ();make_exceptional_result -Funktion make_exceptional_result erstellt ein bereites Ergebnisobjekt aus einer bestimmten Ausnahme. Wenn Sie auf ein solches Ergebnis warten, wird die aktuelle Coroutine sofort wieder aufgenommen. get und operator co_await werden die angegebene Ausnahme wiederwägen.
/*
Creates a ready result object from an exception pointer.
The returned result object will re-throw exception_ptr when calling get or await.
Throws std::invalid_argument if exception_ptr is null.
Might throw std::bad_alloc exception if fails to allocate memory.
*/
template < class type >
result<type> make_exceptional_result (std::exception_ptr exception_ptr);
/*
Overload. Similar to make_exceptional_result(std::exception_ptr),
but gets an exception object directly.
Might throw any exception that the constructor of exception_type(std::move(exception)) might throw.
Might throw std::bad_alloc exception if fails to allocate memory.
*/
template < class type , class exception_type >
result<type> make_exceptional_result (exception_type exception );when_all Funktion when_all ist eine Dienstprogrammfunktion, die ein faules Ergebnisobjekt erstellt, das bei Fertigstellung aller Eingabeergebnisse fertig ist. Das Warten auf dieses faule Ergebnis gibt alle Input-Reseult-Objekte in einem fertigen Zustand zurück, der bereit ist, konsumiert zu werden.
when_all die Funktion mit drei Geschmacksrichtungen ausgestattet ist - eine, die einen heterogenen Bereich von Ergebnisobjekten akzeptiert, ein anderes, das ein Paar Iteratoren zu einem Bereich von Ergebnisobjekten desselben Typs bringt, und zuletzt eine Überlastung, die überhaupt keine Ergebnisobjekte akzeptiert. Im Fall ohne Eingabeergebnisobjekte gibt die Funktion ein bereites Ergebnisobjekt eines leeren Tupels zurück.
Wenn einer der übergebenen Ergebnis-Objekte leer ist, wird eine Ausnahme ausgelöst. In diesem Fall werden Input-Result-Objekte von der Funktion nicht beeinflusst und können erneut verwendet werden, nachdem die Ausnahme behandelt wurde. Wenn alle Eingabeergebnisobjekte gültig sind, werden sie von dieser Funktion geleert und als Ausgabeergebnis in einem gültigen und bereiten Zustand zurückgegeben.
Derzeit akzeptiert die, when_all nur result .
Alle Überlastungen akzeptieren einen Lebenslauf -Executor als erster Parameter. Wenn Sie auf ein Ergebnis warten, die von when_all zurückgegeben wird, wird der Anrufer -Coroutine von dem gegebenen Lebenslauf -Testamentsvollstrecker wieder aufgenommen.
/*
Creates a result object that becomes ready when all the input results become ready.
Passed result objects are emptied and returned as a tuple.
Throws std::invalid_argument if any of the passed result objects is empty.
Might throw an std::bad_alloc exception if no memory is available.
*/
template < class ... result_types>
lazy_result<std::tuple< typename std::decay<result_types>::type...>>
when_all (std::shared_ptr<executor_type> resume_executor,
result_types&& ... results);
/*
Overload. Similar to when_all(result_types&& ...) but receives a pair of iterators referencing a range.
Passed result objects are emptied and returned as a vector.
If begin == end, the function returns immediately with an empty vector.
Throws std::invalid_argument if any of the passed result objects is empty.
Might throw an std::bad_alloc exception if no memory is available.
*/
template < class iterator_type >
lazy_result<std::vector< typename std::iterator_traits<iterator_type>::value_type>>
when_all (std::shared_ptr<executor_type> resume_executor,
iterator_type begin, iterator_type end);
/*
Overload. Returns a ready result object that doesn't monitor any asynchronous result.
Might throw an std::bad_alloc exception if no memory is available.
*/
lazy_result<std::tuple<>> when_all (std::shared_ptr<executor_type> resume_executor);when_any Funktion when_any ist eine Dienstprogrammfunktion, die ein faules Ergebnisobjekt erstellt, das bereit ist, wenn mindestens ein Eingabeergebnis abgeschlossen ist. Wenn Sie dieses Ergebnis erwarten, gibt es eine Helferstruktur zurück, die alle Eingabe-Result-Objekte sowie den Index der ausgefüllten Aufgabe enthält. Es könnte sein, dass zum Zeitpunkt des Verzehrs des Bereitschaftsergebnisses andere Ergebnisse möglicherweise bereits asynchron abgeschlossen worden sein. Anwendungen können anrufen, when_any wiederholt, um die Bereitschaftsergebnisse zu konsumieren, wenn sie abgeschlossen sind, bis alle Ergebnisse konsumiert sind.
when_any die Funktion mit nur zwei Geschmacksrichtungen ausgestattet ist - eine, die einen heterogenen Bereich von Ergebnisobjekten und ein anderes akzeptiert, das ein Paar Iteratoren zu einem Bereich von Ergebnisobjekten desselben Typs bringt. Im Gegensatz zu when_all ist es keine Bedeutung, auf mindestens eine Aufgabe zu warten, die er erledigen muss, wenn der Ergebnisbereich vollständig leer ist. Daher gibt es keine Überlastung ohne Argumente. Außerdem wird die Überlastung von zwei Iteratoren eine Ausnahme ausgelöst, wenn diese Iteratoren auf einen leeren Bereich verweisen (wenn begin == end ).
Wenn einer der übergebenen Ergebnis-Objekte leer ist, wird eine Ausnahme ausgelöst. In jedem Fall wird eine Ausnahme ausgelöst, Eingabe-Result-Objekte werden von der Funktion nicht beeinflusst und können erneut verwendet werden, nachdem die Ausnahme behandelt wurde. Wenn alle Eingabeergebnisobjekte gültig sind, werden sie von dieser Funktion geleert und in einem gültigen Zustand als Ausgabeergebnis zurückgegeben.
Derzeit when_any nur result .
Alle Überlastungen akzeptieren einen Lebenslauf -Executor als erster Parameter. Wenn Sie auf ein Ergebnis warten, die von when_any zurückgegeben wird, wird der Anrufer -Coroutine vom gegebenen Lebenslauf -Testamentsvollstrecker wieder aufgenommen.
/*
Helper struct returned from when_any.
index is the position of the ready result in results sequence.
results is either an std::tuple or an std::vector of the results that were passed to when_any.
*/
template < class sequence_type >
struct when_any_result {
std:: size_t index;
sequence_type results;
};
/*
Creates a result object that becomes ready when at least one of the input results is ready.
Passed result objects are emptied and returned as a tuple.
Throws std::invalid_argument if any of the passed result objects is empty.
Might throw an std::bad_alloc exception if no memory is available.
*/
template < class ... result_types>
lazy_result<when_any_result<std::tuple<result_types...>>>
when_any (std::shared_ptr<executor_type> resume_executor,
result_types&& ... results);
/*
Overload. Similar to when_any(result_types&& ...) but receives a pair of iterators referencing a range.
Passed result objects are emptied and returned as a vector.
Throws std::invalid_argument if begin == end.
Throws std::invalid_argument if any of the passed result objects is empty.
Might throw an std::bad_alloc exception if no memory is available.
*/
template < class iterator_type >
lazy_result<when_any_result<std::vector< typename std::iterator_traits<iterator_type>::value_type>>>
when_any (std::shared_ptr<executor_type> resume_executor,
iterator_type begin, iterator_type end);resume_on -Funktion resume_on gibt ein erwartbares zurück, das die aktuelle Korutine ausnutzt und es im Inneren gegebener executor wieder aufnimmt. Dies ist eine wichtige Funktion, die sicherstellt, dass eine Coroutine im richtigen Testamentsvollstrecker ausgeführt wird. Beispielsweise können Anwendungen eine Hintergrundaufgabe unter Verwendung des background_executor planen und auf das zurückgegebene Ergebnisobjekt warten. In diesem Fall wird die wartende Coroutine im Hintergrund -Testamentsvollstrecker wieder aufgenommen. Ein Aufruf an resume_on mit einem anderen CPU-gebundenen Executor stellt sicher, dass CPU-gebundene Codezeilen nicht auf dem Hintergrund-Executor ausgeführt werden, sobald die Hintergrundaufgabe abgeschlossen ist. Wenn eine Aufgabe neu geplant wird, um mit resume_on auf einem anderen Testamentsvollstrecker auszuführen, aber dieser Ausführende wird geschlossen, bevor sie die suspendierte Aufgabe wieder aufnehmen kann, wird diese Aufgabe sofort wieder aufgenommen und eine Ausnahme erros::broken_task wird ausgeworfen. In diesem Fall müssen Bewerbungen ziemlich anmutig.
/*
Returns an awaitable that suspends the current coroutine and resumes it inside executor.
Might throw any exception that executor_type::enqueue throws.
*/
template < class executor_type >
auto resume_on (std::shared_ptr<executor_type> executor);Concurrencpp bietet auch Timer und Timerwarteschlangen. Timer sind Objekte, die asynchrone Aktionen definieren, die auf einem Testamentsvollstrecker in einem gut definierten Zeitraum ausgeführt werden. Es gibt drei Arten von Timern - reguläre Timer , Onshot -Timer und Verzögerungsobjekte .
Regelmäßige Timer haben vier Eigenschaften, die sie definieren:
Wie andere Objekte in ConcurrencPP sind Timer nur ein bewegender Typ, der leer sein kann. Wenn ein Timer zerstört wird oder timer::cancel aufgerufen wird, storniert der Timer seine geplanten, aber noch nicht ausgeführten Aufgaben. Die laufenden Aufgaben sind nicht deaktiviert. Der Timer -Callable muss fadensicher sein. Es wird empfohlen, die ordnungsgemäße Zeit und die Häufigkeit von Timern auf eine Granularität von 50 Millisekunden zu setzen.
Eine Timer -Warteschlange ist ein Concurrencpp -Mitarbeiter, der eine Sammlung von Timern verwaltet und in nur einem Ausführungsthread verarbeitet. Es ist auch der Agent, der zum Erstellen neuer Timer verwendet wird. Wenn eine Timer-Frist (unabhängig davon, ob es sich um die Fälligkeit oder die Frequenz des Timers handelt), "feuert" die Timer-Warteschlange durch, indem er den Timer plant, indem er als Aufgabe auf dem zugehörigen Testamentsvollstrecker ausgeführt wird.
Genau wie ausführende Timer -Warteschlangen halten sich auch an das Raii -Konzept. Wenn das Laufzeitobjekt aus dem Umfang herauskommt, schaltet es die Timerwarteschlange ab und storniert alle anstehenden Timer. Nachdem eine Timer -Warteschlange geschlossen wurde, wird jeder nachfolgende Anruf zu make_timer , make_onshot_timer und make_delay_object eine errors::runtime_shutdown -Ausnahme werfen. Bewerbungen dürfen nicht versuchen, die Timer -Warteschlangen selbst zu schließen.
timer_queue -API: class timer_queue {
/*
Destroys this timer_queue.
*/
~timer_queue () noexcept ;
/*
Shuts down this timer_queue:
Tells the underlying thread of execution to quit and joins it.
Cancels all pending timers.
After this call, invocation of any method besides shutdown
and shutdown_requested will throw an errors::runtime_shutdown.
If shutdown had been called before, this method has no effect.
*/
void shutdown () noexcept ;
/*
Returns true if shutdown had been called before, false otherwise.
*/
bool shutdown_requested () const noexcept ;
/*
Creates a new running timer where *this is the associated timer_queue.
Throws std::invalid_argument if executor is null.
Throws errors::runtime_shutdown if shutdown had been called before.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if the one of the underlying synchronization primitives throws.
*/
template < class callable_type , class ... argumet_types>
timer make_timer (
std::chrono::milliseconds due_time,
std::chrono::milliseconds frequency,
std::shared_ptr<concurrencpp::executor> executor,
callable_type&& callable,
argumet_types&& ... arguments);
/*
Creates a new one-shot timer where *this is the associated timer_queue.
Throws std::invalid_argument if executor is null.
Throws errors::runtime_shutdown if shutdown had been called before.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if the one of the underlying synchronization primitives throws.
*/
template < class callable_type , class ... argumet_types>
timer make_one_shot_timer (
std::chrono::milliseconds due_time,
std::shared_ptr<concurrencpp::executor> executor,
callable_type&& callable,
argumet_types&& ... arguments);
/*
Creates a new delay object where *this is the associated timer_queue.
Throws std::invalid_argument if executor is null.
Throws errors::runtime_shutdown if shutdown had been called before.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if the one of the underlying synchronization primitives throws.
*/
result< void > make_delay_object (
std::chrono::milliseconds due_time,
std::shared_ptr<concurrencpp::executor> executor);
};timer -API: class timer {
/*
Creates an empty timer.
*/
timer () noexcept = default ;
/*
Cancels the timer, if not empty.
*/
~timer () noexcept ;
/*
Moves the content of rhs to *this.
rhs is empty after this call.
*/
timer (timer&& rhs) noexcept = default ;
/*
Moves the content of rhs to *this.
rhs is empty after this call.
Returns *this.
*/
timer& operator = (timer&& rhs) noexcept ;
/*
Cancels this timer.
After this call, the associated timer_queue will not schedule *this
to run again and *this becomes empty.
Scheduled, but not yet executed tasks are cancelled.
Ongoing tasks are uneffected.
This method has no effect if *this is empty or the associated timer_queue has already expired.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
void cancel ();
/*
Returns the associated executor of this timer.
Throws concurrencpp::errors::empty_timer is *this is empty.
*/
std::shared_ptr<executor> get_executor () const ;
/*
Returns the associated timer_queue of this timer.
Throws concurrencpp::errors::empty_timer is *this is empty.
*/
std::weak_ptr<timer_queue> get_timer_queue () const ;
/*
Returns the due time of this timer.
Throws concurrencpp::errors::empty_timer is *this is empty.
*/
std::chrono::milliseconds get_due_time () const ;
/*
Returns the frequency of this timer.
Throws concurrencpp::errors::empty_timer is *this is empty.
*/
std::chrono::milliseconds get_frequency () const ;
/*
Sets new frequency for this timer.
Callables already scheduled to run at the time of invocation are not affected.
Throws concurrencpp::errors::empty_timer is *this is empty.
*/
void set_frequency (std::chrono::milliseconds new_frequency);
/*
Returns true is *this is not an empty timer, false otherwise.
The timer should not be used if this->operator bool() is false.
*/
explicit operator bool () const noexcept ;
};In diesem Beispiel erstellen wir einen regulären Timer mit der Timer -Warteschlange. Der Timer plant, dass es nach 1,5 Sekunden dauert, und feuert dann alle 2 Sekunden auf. Der angegebene Anruf wird auf dem Threadpool -Ausführenden ausgeführt.
# include " concurrencpp/concurrencpp.h "
# include < iostream >
using namespace std ::chrono_literals ;
int main () {
concurrencpp::runtime runtime;
std:: atomic_size_t counter = 1 ;
concurrencpp::timer timer = runtime. timer_queue ()-> make_timer (
1500ms,
2000ms,
runtime. thread_pool_executor (),
[&] {
const auto c = counter. fetch_add ( 1 );
std::cout << " timer was invoked for the " << c << " th time " << std::endl;
});
std::this_thread::sleep_for (12s);
return 0 ;
}Ein OneShot -Timer ist ein einmaliger Timer mit nur einer ordnungsgemäßen Zeit - nachdem er plant, dass er aufgeregt hat, um auszuführen, sobald er nie wieder geplant ist, um es wieder zu laufen.
In diesem Beispiel erstellen wir einen Timer, der nur einmal ausgeführt wird - nach 3 Sekunden nach seiner Erstellung plant der Timer seinen auf einem neuen Ausführungsthread ausgeführten Timer (mit thread_executor ).
# include " concurrencpp/concurrencpp.h "
# include < iostream >
using namespace std ::chrono_literals ;
int main () {
concurrencpp::runtime runtime;
concurrencpp::timer timer = runtime. timer_queue ()-> make_one_shot_timer (
3000ms,
runtime. thread_executor (),
[&] {
std::cout << " hello and goodbye " << std::endl;
});
std::this_thread::sleep_for (4s);
return 0 ;
} Ein Verzögerungsobjekt ist ein faules Ergebnisobjekt, das bereit wird, wenn es co_await ED ist und seine zuständige Zeit erreicht ist. Anwendungen können dieses Ergebnisobjekt co_await können, um die aktuelle Coroutine auf nicht blockierende Weise zu verzögern. Die aktuelle Coroutine wird von dem Testamentsvollstrecker wieder aufgenommen, der an make_delay_object übergeben wurde.
In diesem Beispiel erzeugen wir eine Aufgabe (die keine Ergebnisse oder Ausnahme zurückgeben), die sich in einer Schleife verzögert, indem sie co_await auf einem Verzögerungsobjekt aufruft.
# include " concurrencpp/concurrencpp.h "
# include < iostream >
using namespace std ::chrono_literals ;
concurrencpp::null_result delayed_task (
std::shared_ptr<concurrencpp::timer_queue> tq,
std::shared_ptr<concurrencpp::thread_pool_executor> ex) {
size_t counter = 1 ;
while ( true ) {
std::cout << " task was invoked " << counter << " times. " << std::endl;
counter++;
co_await tq-> make_delay_object (1500ms, ex);
}
}
int main () {
concurrencpp::runtime runtime;
delayed_task (runtime. timer_queue (), runtime. thread_pool_executor ());
std::this_thread::sleep_for (10s);
return 0 ;
} Ein Generator ist eine faule, synchrone Korutine, die einen Strom von Werten erzeugen kann. Generatoren verwenden das Keyword co_yield , um ihren Verbrauchern Werte zurückzugeben.
Generatoren sollen synchron verwendet werden - sie können nur das Schlüsselwort co_yield verwenden und dürfen das Schlüsselwort co_await nicht verwenden. Ein Generator erzeugt weiterhin Werte, solange das Schlüsselwort co_yield aufgerufen wird. Wenn das Schlüsselwort co_return aufgerufen wird (explizit oder implizit), hört der Generator auf, Werte zu produzieren. Wenn eine Ausnahme ausgelöst wird, hört der Generator auf, Werte zu produzieren, und die geworfene Ausnahme wird an den Verbraucher des Generators zurückgeführt.
Generatoren sollen in einer range-for Schleife verwendet werden: Generatoren produzieren implizit zwei Iteratoren - begin und end die Ausführung der for -Loop. Diese Iteratoren sollten nicht manuell behandelt oder abgerufen werden.
Wenn ein Generator erstellt wird, beginnt er als faule Aufgabe. Wenn seine begin aufgerufen wird, wird der Generator zum ersten Mal wieder aufgenommen und ein Iterator zurückgegeben. Die faule Aufgabe wird wiederholt wieder aufgenommen, indem operator++ auf dem zurückgegebenen Iterator aufgerufen wird. Der zurückgegebene Iterator ist gleich dem end -Iterator, wenn der Generator die Ausführung entweder durch anmutiger Ausgang oder eine Ausnahme abschließt. Wie bereits erwähnt, geschieht dies hinter den Kulissen des inneren Mechanismus der Schleife und des Generators und sollte nicht direkt aufgerufen werden.
Wie andere Objekte in Concurrencpp sind Generatoren ein nur Bewegungstyp. Nachdem ein Generator bewegt worden war, wird er als leer angesehen und der Versuch, auf seine inneren Methoden (außer operator bool ) zuzugreifen, wird eine Ausnahme machen. Die Leere eines Generators sollte im Allgemeinen nicht auftreten - es wird empfohlen, Generatoren bei seiner Erstellung in einer for Schleife zu konsumieren und nicht zu versuchen, ihre Methoden einzeln aufzurufen.
generator -API class generator {
/*
Move constructor. After this call, rhs is empty.
*/
generator (generator&& rhs) noexcept ;
/*
Destructor. Invalidates existing iterators.
*/
~generator () noexcept ;
generator ( const generator& rhs) = delete ;
generator& operator =(generator&& rhs) = delete ;
generator& operator =( const generator& rhs) = delete ;
/*
Returns true if this generator is not empty.
Applications must not use this object if this->operator bool() is false.
*/
explicit operator bool () const noexcept ;
/*
Starts running this generator and returns an iterator.
Throws errors::empty_generator if *this is empty.
Re-throws any exception that is thrown inside the generator code.
*/
iterator begin ();
/*
Returns an end iterator.
*/
static generator_end_iterator end () noexcept ;
};
class generator_iterator {
using value_type = std:: remove_reference_t <type>;
using reference = value_type&;
using pointer = value_type*;
using iterator_category = std::input_iterator_tag;
using difference_type = std:: ptrdiff_t ;
/*
Resumes the suspended generator and returns *this.
Re-throws any exception that was thrown inside the generator code.
*/
generator_iterator& operator ++();
/*
Post-increment version of operator++.
*/
void operator ++( int );
/*
Returns the latest value produced by the associated generator.
*/
reference operator *() const noexcept ;
/*
Returns a pointer to the latest value produced by the associated generator.
*/
pointer operator ->() const noexcept ;
/*
Comparision operators.
*/
friend bool operator ==( const generator_iterator& it0, const generator_iterator& it1) noexcept ;
friend bool operator ==( const generator_iterator& it, generator_end_iterator) noexcept ;
friend bool operator ==(generator_end_iterator end_it, const generator_iterator& it) noexcept ;
friend bool operator !=( const generator_iterator& it, generator_end_iterator end_it) noexcept ;
friend bool operator !=(generator_end_iterator end_it, const generator_iterator& it) noexcept ;
};generator : In diesem Beispiel schreiben wir einen Generator, der das n-te Mitglied der Sequenz S(n) = 1 + 2 + 3 + ... + n ergibt, wobei n <= 100 :
concurrencpp::generator< int > sequence () {
int i = 1 ;
int sum = 0 ;
while (i <= 100 ) {
sum += i;
++i;
co_yield sum;
}
}
int main () {
for ( auto value : sequence ()) {
std::cout << value << std::end;
}
return 0 ;
} Regelmäßige synchrone Schlösser können aus mehreren Gründen nicht sicher in Aufgaben verwendet werden:
std::mutex in demselben Ausführungsthread gesperrt und entsperrt werden. Das Entsperren eines synchronen Schloss in einem Faden, das nicht gesperrt war, ist ein undefiniertes Verhalten. Da Aufgaben in einem beliebigen Ausführungsthread suspendiert und wieder aufgenommen werden können, brechen synchrone Schlösser bei Verwendung in Aufgaben. concurrencpp::async_lock löst diese Probleme, indem sie std::mutex eine ähnliche API bereitstellt. Der Hauptunterschied, der auf concurrencpp::async_lock aufruft, wird ein faules Result zurückgeben, das in Aufgaben sicher co_awaited werden kann. Wenn eine Aufgabe versucht, einen Async-Lock zu sperren und fehlschlägt, wird die Aufgabe suspendiert und wird wieder aufgenommen, wenn das Schloss von der suspendierten Aufgabe entsperrt und erworben wird. Dies ermöglicht es den Ausführern, eine große Anzahl von Aufgaben zu erledigen, die darauf warten, ein Schloss ohne teure Kontextabschalt- und teure Kernelanrufe zu erwerben.
Ähnlich wie std::mutex funktioniert, kann zu einem bestimmten Zeitpunkt nur eine Aufgabe async_lock erwerben, und eine Lesebarriere ist zum Zeitpunkt der Erwerben. Durch die Veröffentlichung eines asynchronisierten Sperrens wird eine Schreibbarriere gelegt und ermöglicht es der nächsten Aufgabe, sie zu erwerben, wobei eine Kette von Einmodifier zu einer Zeit erstellt wird, in der die Änderungen, die andere Modifikatoren vorgenommen hatten, und ihre Modifikationen für die nächsten Modifikatoren veröffentlichen.
Wie std::mutex ist concurrencpp::async_lock nicht rekursiv . Beim Erwerb eines solchen Schlosses muss zusätzliche Aufmerksamkeit geschenkt werden - ein Schloss darf in einer Aufgabe, die durch eine andere Aufgabe hervorgerufen wurde, die das Schloss bereits erworben hat, nicht erneut erworben werden. In diesem Fall tritt ein unvermeidbarer toter Lock auf. Im Gegensatz zu anderen Objekten in Concurrencpp ist async_lock weder kopierbar noch beweglich.
Wie bei Standard -Sperren soll concurrencpp::async_lock mit Scoped -Wrappern verwendet werden, die die C ++ -Raii -Idiom nutzen, um sicherzustellen, dass Schlösser bei der Funktionsrendite immer freigeschaltet werden oder eine Ausnahme ausgelöst werden. async_lock::lock gibt eine faule Reseulte eines Scoped-Wrappers zurück, async_lock::unlock auf die Zerstörung aufruft. RAW -Verwendungen von async_lock::unlock sind entmutigt. concurrencpp::scoped_async_lock fungiert als Scoped -Wrapper und bietet eine API, die fast identisch mit std::unique_lock ist. concurrencpp::scoped_async_lock ist beweglich, aber nicht kopierbar.
async_lock::lock UND scoped_async_lock::lock benötigen einen Lebenslauf-Executor als Parameter. Wenn das Schloss aufgerufen wird, ist es gesperrt und die aktuelle Aufgabe wird sofort wieder aufgenommen. Wenn nicht, wird die aktuelle Aufgabe ausgesetzt und wird in dem gegebenen Lebenslauf-Exezutor wieder aufgenommen, wenn das Schloss endgültig erworben wird.
concurrencpp::scoped_async_lock wickelt einen async_lock und sorgt dafür, dass es ordnungsgemäß entsperrt ist. Wie std::unique_lock gibt es Fälle, in denen es kein Schloss einpackt, und in diesem Fall wird es als leer angesehen. Ein leerer scoped_async_lock kann auftreten, wenn es sich bei der standardmäßigen konstruierten, verschobenen oder scoped_async_lock::release -Methode aufgerufen hat. Ein leerer Scoped-Async-Lock schaltet keine Schloss für die Zerstörung frei.
Auch wenn der Scoped-Async-Lock nicht leer ist, bedeutet dies nicht, dass er das zugrunde liegende Async-Lock besitzt und sie bei der Zerstörung entsperren wird. Nicht-leere und nicht besitzende Scoped-async-Sperren können auftreten, wenn scoped_async_lock::unlock genannt wurde oder der Scoped-async-Lock mit scoped_async_lock(async_lock&, std::defer_lock_t) konstruktor konstruiert wurde.
async_lock API class async_lock {
/*
Constructs an async lock object.
*/
async_lock () noexcept ;
/*
Destructs an async lock object.
*this is not automatically unlocked at the moment of destruction.
*/
~async_lock () noexcept ;
/*
Asynchronously acquires the async lock.
If *this has already been locked by another non-parent task, the current task will be suspended
and will be resumed when *this is acquired, inside resume_executor.
If *this has not been locked by another task, then *this will be acquired and the current task will be resumed
immediately in the calling thread of execution.
If *this has already been locked by a parent task, then unavoidable dead-lock will occur.
Throws std::invalid_argument if resume_executor is null.
Throws std::system error if one of the underlying synhchronization primitives throws.
*/
lazy_result<scoped_async_lock> lock (std::shared_ptr<executor> resume_executor);
/*
Tries to acquire *this in the calling thread of execution.
Returns true if *this is acquired, false otherwise.
In any case, the current task is resumed immediately in the calling thread of execution.
Throws std::system error if one of the underlying synhchronization primitives throws.
*/
lazy_result< bool > try_lock ();
/*
Releases *this and allows other tasks (including suspended tasks waiting for *this) to acquire it.
Throws std::system error if *this is not locked at the moment of calling this method.
Throws std::system error if one of the underlying synhchronization primitives throws.
*/
void unlock ();
};scoped_async_lock api class scoped_async_lock {
/*
Constructs an async lock wrapper that does not wrap any async lock.
*/
scoped_async_lock () noexcept = default ;
/*
If *this wraps async_lock, this method releases the wrapped lock.
*/
~scoped_async_lock () noexcept ;
/*
Moves rhs to *this.
After this call, *rhs does not wrap any async lock.
*/
scoped_async_lock (scoped_async_lock&& rhs) noexcept ;
/*
Wrapps unlocked lock.
lock must not be in acquired mode when calling this method.
*/
scoped_async_lock (async_lock& lock, std:: defer_lock_t ) noexcept ;
/*
Wrapps locked lock.
lock must be already acquired when calling this method.
*/
scoped_async_lock (async_lock& lock, std:: adopt_lock_t ) noexcept ;
/*
Calls async_lock::lock on the wrapped locked, using resume_executor as a parameter.
Throws std::invalid_argument if resume_executor is nulll.
Throws std::system_error if *this does not wrap any lock.
Throws std::system_error if wrapped lock is already locked.
Throws any exception async_lock::lock throws.
*/
lazy_result< void > lock (std::shared_ptr<executor> resume_executor);
/*
Calls async_lock::try_lock on the wrapped lock.
Throws std::system_error if *this does not wrap any lock.
Throws std::system_error if wrapped lock is already locked.
Throws any exception async_lock::try_lock throws.
*/
lazy_result< bool > try_lock ();
/*
Calls async_lock::unlock on the wrapped lock.
If *this does not wrap any lock, this method does nothing.
Throws std::system_error if *this wraps a lock and it is not locked.
*/
void unlock ();
/*
Checks whether *this wraps a locked mutex or not.
Returns true if wrapped locked is in acquired state, false otherwise.
*/
bool owns_lock () const noexcept ;
/*
Equivalent to owns_lock.
*/
explicit operator bool () const noexcept ;
/*
Swaps the contents of *this and rhs.
*/
void swap (scoped_async_lock& rhs) noexcept ;
/*
Empties *this and returns a pointer to the previously wrapped lock.
After a call to this method, *this doesn't wrap any lock.
The previously wrapped lock is not released,
it must be released by either unlocking it manually through the returned pointer or by
capturing the pointer with another scoped_async_lock which will take ownerwhip over it.
*/
async_lock* release () noexcept ;
/*
Returns a pointer to the wrapped async_lock, or a null pointer if there is no wrapped async_lock.
*/
async_lock* mutex () const noexcept ;
};async_lock -Beispiel: In diesem Beispiel schieben wir 10.000.000 Ganzzahlen gleichzeitig aus verschiedenen Aufgaben zu einem std::vector Vektorobjekt, während wir async_lock verwenden, um sicherzustellen, dass keine Datenrennen auftreten und die Richtigkeit des internen Zustands dieses Vektorobjekts erhalten bleibt.
# include " concurrencpp/concurrencpp.h "
# include < vector >
# include < iostream >
std::vector< size_t > numbers;
concurrencpp::async_lock lock;
concurrencpp::result< void > add_numbers (concurrencpp::executor_tag,
std::shared_ptr<concurrencpp::executor> executor,
size_t begin,
size_t end) {
for ( auto i = begin; i < end; i++) {
concurrencpp::scoped_async_lock raii_wrapper = co_await lock. lock (executor);
numbers. push_back (i);
}
}
int main () {
concurrencpp::runtime runtime;
constexpr size_t range = 10'000'000 ;
constexpr size_t sections = 4 ;
concurrencpp::result< void > results[sections];
for ( size_t i = 0 ; i < 4 ; i++) {
const auto range_start = i * range / sections;
const auto range_end = (i + 1 ) * range / sections;
results[i] = add_numbers ({}, runtime. thread_pool_executor (), range_start, range_end);
}
for ( auto & result : results) {
result. get ();
}
std::cout << " vector size is " << numbers. size () << std::endl;
// make sure the vector state has not been corrupted by unprotected concurrent accesses
std::sort (numbers. begin (), numbers. end ());
for ( size_t i = 0 ; i < range; i++) {
if (numbers[i] != i) {
std::cerr << " vector state is corrupted. " << std::endl;
return - 1 ;
}
}
std::cout << " succeeded pushing range [0 - 10,000,000] concurrently to the vector! " << std::endl;
return 0 ;
} async_condition_variable imitiert die condition_variable und kann mit Aufgaben neben async_lock sicher verwendet werden. async_condition_variable funktioniert mit async_lock um eine Aufgabe zu suspendieren, bis sich ein gemeinsamer Speicher (durch die Sperre geschützt) geändert hat. Aufgaben, die gemeinsame Speicheränderungen überwachen möchten, sperren eine Instanz von async_lock und rufen Sie async_condition_variable::await . Dadurch wird die Sperre atomisch entsperren und die aktuelle Aufgabe suspendieren, bis eine Modifikator -Aufgabe die Bedingungsvariable benachrichtigt. Eine Modifikator -Aufgabe erfasst die Sperre, verändert den freigegebenen Speicher, entsperren die Sperre und rufen Sie entweder notify_one oder notify_all auf. Wenn eine suspendierte Aufgabe wieder aufgenommen wird (unter Verwendung des Lebenslauf -Ausführers, der await wurde), sperrt sie das Schloss erneut ab, sodass die Aufgabe vom Sperrungspunkt nahtlos fortgesetzt werden kann. async_condition_variable ist wie async_lock weder beweglich noch kopierbar - es soll an einem Ort erstellt und durch mehrere Aufgaben zugegriffen werden.
async_condition_variable::await überlaste überlast erfordern einen Lebenslauf-Exezutor, mit dem die Aufgabe wieder aufgenommen wird, und einen gesperrten scoped_async_lock . async_condition_variable::await wird mit zwei Überladungen geliefert - eine, die ein Prädikat akzeptiert und eine, die dies nicht tut. Die Überlastung, die kein Prädikat akzeptiert, wird die Anrufaufgabe sofort nach dem Aufruf aussetzen, bis sie durch einen Anruf auf notify_* wieder aufgenommen wird. Die Überlastung, die ein Prädikat akzeptiert, funktioniert, indem das Prädikat den gemeinsam genutzten Speicher inspizieren und die Aufgabe wiederholt suspendiert, bis das gemeinsame Speicher seinen gesuchten Zustand erreicht hat. Schematisch funktioniert es wie Anrufe
while (!pred()) { // pred() inspects the shared memory and returns true or false
co_await await (resume_executor, lock); // suspend the current task until another task calls `notify_xxx`
} Genau wie bei der Standardbedingungsvariable werden Anwendungen aufgefordert, die Prädikat-Überladung zu verwenden, da sie eine feinkörnige Steuerung über Suspensionen und Wiederversuche ermöglicht. async_condition_variable kann verwendet werden, um gleichzeitige Sammlungen und Datenstrukturen wie gleichzeitige Warteschlangen und Kanäle zu schreiben.
Innen hält async_condition_variable eine Suspensionsqueue, in der sich die Aufgaben selbst erfüllen, wenn sie auf die zu benachbarte Bedingungsvariable warten. Wenn eine von notify_* -Methoden aufgerufen wird, wird je nach aufgerufener Methode entweder eine Aufgabe oder alle Aufgaben bezeichnet. Aufgaben werden auf fifo aus der Suspensionsqueue entleert. Wenn beispielsweise Aufgabe A Anrufe await und dann auf Task -B -Aufrufe await , wird die Aufgabe C -Anrufe notify_one , dann die interne Aufgabe A dequed und wieder aufgenommen. Aufgabe B bleibt suspendiert, bis ein anderer Anruf bei notify_one oder notify_all aufgerufen wird. Wenn Aufgabe A und Aufgabe B suspendiert sind und Task C -Aufrufe notify_all sind, werden beide Aufgaben dequed und wieder aufgenommen.
async_condition_variable API class async_condition_variable {
/*
Constructor.
*/
async_condition_variable () noexcept ;
/*
Atomically releases lock and suspends the current task by adding it to *this suspension-queue.
Throws std::invalid_argument if resume_executor is null.
Throws std::invalid_argument if lock is not locked at the moment of calling this method.
Might throw std::system_error if the underlying std::mutex throws.
*/
lazy_result< void > await (std::shared_ptr<executor> resume_executor, scoped_async_lock& lock);
/*
Equivalent to:
while (!pred()) {
co_await await(resume_executor, lock);
}
Might throw any exception that await(resume_executor, lock) might throw.
Might throw any exception that pred might throw.
*/
template < class predicate_type >
lazy_result< void > await (std::shared_ptr<executor> resume_executor, scoped_async_lock& lock, predicate_type pred);
/*
Dequeues one task from *this suspension-queue and resumes it, if any available at the moment of calling this method.
The suspended task is resumed by scheduling it to run on the executor given when await was called.
Might throw std::system_error if the underlying std::mutex throws.
*/
void notify_one ();
/*
Dequeues all tasks from *this suspension-queue and resumes them, if any available at the moment of calling this method.
The suspended tasks are resumed by scheduling them to run on the executors given when await was called.
Might throw std::system_error if the underlying std::mutex throws.
*/
void notify_all ();
};async_condition_variable Beispiel: In diesem Beispiel arbeiten async_lock und async_condition_variable zusammen, um eine gleichzeitige Warteschlange zu implementieren, mit der Daten (in diesem Beispiel Ganzzahlen) zwischen Aufgaben gesendet werden können. Beachten Sie, dass einige Methoden ein result zurückgeben, während eine andere Rückgabe lazy_result und zeigt, wie sowohl eifrige als auch faule Aufgaben zusammenarbeiten können.
# include " concurrencpp/concurrencpp.h "
# include < queue >
# include < iostream >
using namespace concurrencpp ;
class concurrent_queue {
private:
async_lock _lock;
async_condition_variable _cv;
std::queue< int > _queue;
bool _abort = false ;
public:
concurrent_queue () = default ;
result< void > shutdown (std::shared_ptr<executor> resume_executor) {
{
auto guard = co_await _lock. lock (resume_executor);
_abort = true ;
}
_cv. notify_all ();
}
lazy_result< void > push (std::shared_ptr<executor> resume_executor, int i) {
{
auto guard = co_await _lock. lock (resume_executor);
_queue. push (i);
}
_cv. notify_one ();
}
lazy_result< int > pop (std::shared_ptr<executor> resume_executor) {
auto guard = co_await _lock. lock (resume_executor);
co_await _cv. await (resume_executor, guard, [ this ] {
return _abort || !_queue. empty ();
});
if (!_queue. empty ()) {
auto result = _queue. front ();
_queue. pop ();
co_return result;
}
assert (_abort);
throw std::runtime_error ( " queue has been shut down. " );
}
};
result< void > producer_loop (executor_tag,
std::shared_ptr<thread_pool_executor> tpe,
concurrent_queue& queue,
int range_start,
int range_end) {
for (; range_start < range_end; ++range_start) {
co_await queue. push (tpe, range_start);
}
}
result< void > consumer_loop (executor_tag, std::shared_ptr<thread_pool_executor> tpe, concurrent_queue& queue) {
try {
while ( true ) {
std::cout << co_await queue. pop (tpe) << std::endl;
}
} catch ( const std:: exception & e) {
std::cerr << e. what () << std::endl;
}
}
int main () {
runtime runtime;
const auto thread_pool_executor = runtime. thread_pool_executor ();
concurrent_queue queue;
result< void > producers[ 4 ];
result< void > consumers[ 4 ];
for ( int i = 0 ; i < 4 ; i++) {
producers[i] = producer_loop ({}, thread_pool_executor, queue, i * 5 , (i + 1 ) * 5 );
}
for ( int i = 0 ; i < 4 ; i++) {
consumers[i] = consumer_loop ({}, thread_pool_executor, queue);
}
for ( int i = 0 ; i < 4 ; i++) {
producers[i]. get ();
}
queue. shutdown (thread_pool_executor). get ();
for ( int i = 0 ; i < 4 ; i++) {
consumers[i]. get ();
}
return 0 ;
} Das Concurrencpp Runtime -Objekt ist der Agent, der zum Erwerb, Speichern und Erstellen neuer Führungskräfte verwendet wird.
Die Laufzeit muss als Werttyp erstellt werden, sobald die Hauptfunktion ausgeführt wird. Wenn die Runtime Concurrencpp aus dem Zielfernrohr herauskommt, iteriert sie über die gespeicherten Testamentsvollstrecker und schließt sie einzeln, indem sie executor::shutdown anruft. Die Executoren beenden dann ihre innere Arbeitsschleife, und jeder nachfolgende Versuch, eine neue Aufgabe zu planen, wird eine Ausnahme von concurrencpp::runtime_shutdown ausführen. Die Laufzeit enthält auch die globale Timer -Warteschlange zum Erstellen von Timern und Verzögerung von Objekten. Bei der Zerstörung zerstören gespeicherte Testamentsvollstrecker unerkannte Aufgaben und warten auf die laufenden Aufgaben. Wenn eine laufende Aufgabe versucht, einen Testamentsvollstrecker zu verwenden, um neue Aufgaben zu erzeugen oder eine eigene Aufgabe -Fortsetzung zu planen, wird eine Ausnahme ausgelöst. In diesem Fall müssen laufende Aufgaben so schnell wie möglich aufhören, damit deren zugrunde liegenden Testamentsvollstrecker aufhören können. Die Timer -Warteschlange wird ebenfalls heruntergefahren und storniert alle laufenden Timer. Mit diesem Raii -Codestil können vor der Erstellung des Laufzeitobjekts keine Aufgaben bearbeitet werden, und während/nach der Laufzeit aus dem Spielraum herauskommt. Dies befreit gleichzeitige Anwendungen, um die Beendigung von Nachrichten explizit zu kommunizieren. Aufgaben sind freie Nutzungsparanten, solange das Laufzeitobjekt lebt.
runtime -API class runtime {
/*
Creates a runtime object with default options.
*/
runtime ();
/*
Creates a runtime object with user defined options.
*/
runtime ( const concurrencpp::runtime_options& options);
/*
Destroys this runtime object.
Calls executor::shutdown on each monitored executor.
Calls timer_queue::shutdown on the global timer queue.
*/
~runtime () noexcept ;
/*
Returns this runtime timer queue used to create new times.
*/
std::shared_ptr<concurrencpp::timer_queue> timer_queue () const noexcept ;
/*
Returns this runtime concurrencpp::inline_executor
*/
std::shared_ptr<concurrencpp::inline_executor> inline_executor () const noexcept ;
/*
Returns this runtime concurrencpp::thread_pool_executor
*/
std::shared_ptr<concurrencpp::thread_pool_executor> thread_pool_executor () const noexcept ;
/*
Returns this runtime concurrencpp::background_executor
*/
std::shared_ptr<concurrencpp::thread_pool_executor> background_executor () const noexcept ;
/*
Returns this runtime concurrencpp::thread_executor
*/
std::shared_ptr<concurrencpp::thread_executor> thread_executor () const noexcept ;
/*
Creates a new concurrencpp::worker_thread_executor and registers it in this runtime.
Might throw std::bad_alloc or std::system_error if any underlying memory or system resource could not have been acquired.
*/
std::shared_ptr<concurrencpp::worker_thread_executor> make_worker_thread_executor ();
/*
Creates a new concurrencpp::manual_executor and registers it in this runtime.
Might throw std::bad_alloc or std::system_error if any underlying memory or system resource could not have been acquired.
*/
std::shared_ptr<concurrencpp::manual_executor> make_manual_executor ();
/*
Creates a new user defined executor and registers it in this runtime.
executor_type must be a valid concrete class of concurrencpp::executor.
Might throw std::bad_alloc if no memory is available.
Might throw any exception that the constructor of <<executor_type>> might throw.
*/
template < class executor_type , class ... argument_types>
std::shared_ptr<executor_type> make_executor (argument_types&& ... arguments);
/*
returns the version of concurrencpp that the library was built with.
*/
static std::tuple< unsigned int , unsigned int , unsigned int > version () noexcept ;
}; In einigen Fällen sind Anwendungen daran interessiert, die Erstellung und Beendigung der Threads zu überwachen. Einige Speicherallocatoren benötigen beispielsweise neue Threads, die bei ihrer Erstellung und Beendigung registriert und nicht registriert werden müssen. Die Concurrencpp -Laufzeit ermöglicht das Einstellen eines Rückrufs von Thread -Erstellungen und einen Rückruf von Thread -Beendigungen. Diese Rückrufe werden aufgerufen, wenn einer der Concurrencpp -Mitarbeiter einen neuen Thread erstellt und wenn dieser Thread endet. Diese Rückrufe werden immer aus dem erstellten/ std::this_thread::get_id Thread aufgerufen. Die Signatur dieser Rückrufe ist void callback (std::string_view thread_name) . thread_name ist ein Concurrencpp -spezifischer Titel, der dem Thread angegeben wird und bei einigen Debuggern, die den Threadnamen präsentieren, beobachtet werden kann. Der Thread -Name ist garantiert nicht einzigartig und sollte zum Protokollieren und Debuggen verwendet werden.
Um einen Thread-Creation-Rückruf und/oder einen Thread-Abschluss-Rückruf festzulegen, können Anwendungen die thread_started_callback und/oder thread_terminated_callback Mitglieder der runtime_options festlegen, die an den Laufzeitkonstruktor übergeben werden. Da diese Rückrufe an jeden Concurrencpp -Arbeiter kopiert werden, der Threads erzeugt, müssen diese Rückrufe kopierbar sein.
# include " concurrencpp/concurrencpp.h "
# include < iostream >
int main () {
concurrencpp::runtime_options options;
options. thread_started_callback = [](std::string_view thread_name) {
std::cout << " A new thread is starting to run, name: " << thread_name << " , thread id: " << std::this_thread::get_id ()
<< std::endl;
};
options. thread_terminated_callback = [](std::string_view thread_name) {
std::cout << " A thread is terminating, name: " << thread_name << " , thread id: " << std::this_thread::get_id () << std::endl;
};
concurrencpp::runtime runtime (options);
const auto timer_queue = runtime. timer_queue ();
const auto thread_pool_executor = runtime. thread_pool_executor ();
concurrencpp::timer timer =
timer_queue-> make_timer ( std::chrono::milliseconds ( 100 ), std::chrono::milliseconds ( 500 ), thread_pool_executor, [] {
std::cout << " A timer callable is executing " << std::endl;
});
std::this_thread::sleep_for ( std::chrono::seconds ( 3 ));
return 0 ;
}Mögliche Ausgabe:
A new thread is starting to run, name: concurrencpp::timer_queue worker, thread id: 7496
A new thread is starting to run, name: concurrencpp::thread_pool_executor worker, thread id: 21620
A timer callable is executing
A timer callable is executing
A timer callable is executing
A timer callable is executing
A timer callable is executing
A timer callable is executing
A thread is terminating, name: concurrencpp::timer_queue worker, thread id: 7496
A thread is terminating, name: concurrencpp::thread_pool_executor worker, thread id: 21620
Anwendungen können ihren eigenen benutzerdefinierten Ausführungs -Typ erstellen, indem die Klasse derivable_executor erbt. Bei der Implementierung von benutzerdefinierten Ausführern sind einige Punkte zu berücksichtigen: Das Wichtigste ist, sich daran zu erinnern, dass Executoren aus mehreren Threads verwendet werden, sodass implementierte Methoden mit Thread-sicher sein müssen.
Neue Executoren können mit runtime::make_executor erstellt werden. Bewerbungen dürfen keine neuen Executoren mit einfacher Instanziierung erstellen (z. B. std::make_shared oder einfach new ), nur mit runtime::make_executor . Anwendungen dürfen auch nicht versuchen, die integrierten ConcurRencpp-Ausführende wie der thread_pool_executor oder den thread_executor neu zu stantieren, auf die diese Executoren nur über ihre vorhandenen Instanzen im Laufzeitobjekt zugegriffen werden müssen.
Ein weiterer wichtiger Punkt besteht darin, das Abschalten korrekt zu verarbeiten: shutdown , shutdown_requested und enqueue sollten den Ausführungsstatus überwachen und sich beim Aufrufen entsprechend verhalten:
shutdown sollte die zugrunde liegenden Threads angeben, um zu beenden und sich ihnen dann anzuschließen.shutdown kann mehrmals aufgerufen werden, und die Methode muss dieses Szenario verarbeiten, indem nachfolgende Anrufe nach dem ersten shutdown ignoriert werden.enqueue muss eine concurrencpp::errors::runtime_shutdown -Ausnahme auswerfen, wenn shutdown zuvor angerufen worden war. task Die Implementierung von Ausführern ist einer der seltenen Fälle, in denen Anwendungen direkt mit concurrencpp::task -Klasse arbeiten müssen. concurrencpp::task ist eine std::function wie Objekt, jedoch mit einigen Unterschieden. Wie std::function speichert das Task -Objekt einen Callable, der als asynchrone Operation fungiert. Im Gegensatz zu std::function ist task nur ein Bewegungstyp. Auf dem Aufruf erhalten Aufgabenobjekte keine Parameter und geben void zurück. Darüber hinaus kann jedes Task -Objekt nur einmal aufgerufen werden. Nach dem ersten Aufruf wird das Aufgabenobjekt leer. Das Aufrufen eines leeren Aufgabenobjekts entspricht dem Aufrufen eines leeren Lambda ( []{} ) und wird keine Ausnahme ausgeben. Taskobjekte empfangen ihre Callable als Weiterleitungsreferenz ( type&& wobei type ein Vorlagenparameter ist) und nicht durch Kopie (wie std::function ). Der Bau des gespeicherten Anrufs erfolgt an Ort und Stelle. Auf diese Weise können Aufgabenobjekte Anrufbel enthalten, die nur für Bewegungsstoffe enthält (wie std::unique_ptr und concurrencpp::result ). Taskobjekte versuchen, verschiedene Methoden zu verwenden, um die Verwendung der gespeicherten Typen zu optimieren. Auf Taskobjekte wenden die Kurz-Buffer-Optimierung (SBO) für normale, kleine Anrufläte an und werden Anrufe an std::coroutine_handle<void> indem sie sie direkt ohne virtuelle Dispatch anrufen.
task -API class task {
/*
Creates an empty task object.
*/
task () noexcept ;
/*
Creates a task object by moving the stored callable of rhs to *this.
If rhs is empty, then *this will also be empty after construction.
After this call, rhs is empty.
*/
task (task&& rhs) noexcept ;
/*
Creates a task object by storing callable in *this.
<<typename std::decay<callable_type>::type>> will be in-place-
constructed inside *this by perfect forwarding callable.
*/
template < class callable_type >
task (callable_type&& callable);
/*
Destroys stored callable, does nothing if empty.
*/
~task () noexcept ;
/*
If *this is empty, does nothing.
Invokes stored callable, and immediately destroys it.
After this call, *this is empty.
May throw any exception that the invoked callable may throw.
*/
void operator ()();
/*
Moves the stored callable of rhs to *this.
If rhs is empty, then *this will also be empty after this call.
If *this already contains a stored callable, operator = destroys it first.
*/
task& operator =(task&& rhs) noexcept ;
/*
If *this is not empty, task::clear destroys the stored callable and empties *this.
If *this is empty, clear does nothing.
*/
void clear () noexcept ;
/*
Returns true if *this stores a callable. false otherwise.
*/
explicit operator bool () const noexcept ;
/*
Returns true if *this stores a callable,
and that stored callable has the same type as <<typename std::decay<callable_type>::type>>
*/
template < class callable_type >
bool contains () const noexcept ;
}; Bei der Implementierung von benutzerdefinierten Ausführern liegt es in der Implementierung, task zu speichern (wenn enqueue aufgerufen wird), und sie gemäß dem Inner-Mechanismus des Testamentsausführers ausführen.
In diesem Beispiel erstellen wir einen Executor, der Aktionen wie Enqueuching -Aufgaben oder ausführen. Wir implementieren die executor -Schnittstelle und bitten die Laufzeit, um eine Instanz davon zu erstellen und zu speichern, indem wir runtime::make_executor anrufen. Der Rest der Anwendung verhält sich genauso, als ob wir nicht benutzerdefinierte Ausführende verwenden würden.
# include " concurrencpp/concurrencpp.h "
# include < iostream >
# include < queue >
# include < thread >
# include < mutex >
# include < condition_variable >
class logging_executor : public concurrencpp ::derivable_executor<logging_executor> {
private:
mutable std::mutex _lock;
std::queue<concurrencpp::task> _queue;
std::condition_variable _condition;
bool _shutdown_requested;
std::thread _thread;
const std::string _prefix;
void work_loop () {
while ( true ) {
std::unique_lock<std::mutex> lock (_lock);
if (_shutdown_requested) {
return ;
}
if (!_queue. empty ()) {
auto task = std::move (_queue. front ());
_queue. pop ();
lock. unlock ();
std::cout << _prefix << " A task is being executed " << std::endl;
task ();
continue ;
}
_condition. wait (lock, [ this ] {
return !_queue. empty () || _shutdown_requested;
});
}
}
public:
logging_executor (std::string_view prefix) :
derivable_executor<logging_executor>( " logging_executor " ),
_shutdown_requested ( false ),
_prefix (prefix) {
_thread = std::thread ([ this ] {
work_loop ();
});
}
void enqueue (concurrencpp::task task) override {
std::cout << _prefix << " A task is being enqueued! " << std::endl;
std::unique_lock<std::mutex> lock (_lock);
if (_shutdown_requested) {
throw concurrencpp::errors::runtime_shutdown ( " logging executor - executor was shutdown. " );
}
_queue. emplace ( std::move (task));
_condition. notify_one ();
}
void enqueue (std::span<concurrencpp::task> tasks) override {
std::cout << _prefix << tasks. size () << " tasks are being enqueued! " << std::endl;
std::unique_lock<std::mutex> lock (_lock);
if (_shutdown_requested) {
throw concurrencpp::errors::runtime_shutdown ( " logging executor - executor was shutdown. " );
}
for ( auto & task : tasks) {
_queue. emplace ( std::move (task));
}
_condition. notify_one ();
}
int max_concurrency_level () const noexcept override {
return 1 ;
}
bool shutdown_requested () const noexcept override {
std::unique_lock<std::mutex> lock (_lock);
return _shutdown_requested;
}
void shutdown () noexcept override {
std::cout << _prefix << " shutdown requested " << std::endl;
std::unique_lock<std::mutex> lock (_lock);
if (_shutdown_requested) return ; // nothing to do.
_shutdown_requested = true ;
lock. unlock ();
_condition. notify_one ();
_thread. join ();
}
};
int main () {
concurrencpp::runtime runtime;
auto logging_ex = runtime. make_executor <logging_executor>( " Session #1234 " );
for ( size_t i = 0 ; i < 10 ; i++) {
logging_ex-> post ([] {
std::cout << " hello world " << std::endl;
});
}
std::getchar ();
return 0 ;
}$ git clone https://github.com/David-Haim/concurrencpp.git
$ cd concurrencpp
$ cmake -S . -B build /lib
$ cmake -- build build /lib --config Release$ git clone https://github.com/David-Haim/concurrencpp.git
$ cd concurrencpp
$ cmake -S test -B build / test
$ cmake -- build build / test
< # for release mode: cmake --build build/test --config Release #>
$ cd build / test
$ ctest . -V -C Debug
< # for release mode: ctest . -V -C Release #> $ git clone https://github.com/David-Haim/concurrencpp.git
$ cd concurrencpp
$ cmake -DCMAKE_BUILD_TYPE=Release -S . -B build /lib
$ cmake -- build build /lib
#optional, install the library: sudo cmake --install build/lib Mit Clang und GCC ist es auch möglich, die Tests mit TSAN (Thread Desinfizierer) zu unterstützen.
$ git clone https://github.com/David-Haim/concurrencpp.git
$ cd concurrencpp
$ cmake -S test -B build / test
#for release mode: cmake -DCMAKE_BUILD_TYPE=Release -S test -B build/test
#for TSAN mode: cmake -DCMAKE_BUILD_TYPE=Release -DENABLE_THREAD_SANITIZER=Yes -S test -B build/test
$ cmake -- build build / test
$ cd build / test
$ ctest . -V Beim Kompilieren unter Linux versucht die Bibliothek standardmäßig libstdc++ . Wenn Sie libc++ als Standardbibliotheksimplementierung verwenden möchten, sollte CMAKE_TOOLCHAIN_FILE -Flag wie unten angegeben werden:
$ cmake -DCMAKE_TOOLCHAIN_FILE=../cmake/libc++.cmake -DCMAKE_BUILD_TYPE=Release -S . -B build /libAlternativ zum manuellen Aufbau und die Installation der Bibliothek erhalten Entwickler möglicherweise über die VCPKG- und Conan -Paketverwalter stabile Veröffentlichungen von Concurrencpp:
vcpkg:
$ vcpkg install concurrencppConan: Concurrencpp auf Conancenter
Concurrencpp verfügt über ein integriertes Sandbox-Programm, das Entwickler ändern und experimentieren können, ohne die kompilierte Bibliothek installieren oder mit einer anderen Code-Basis verknüpfen zu müssen. Um mit der Sandbox zu spielen, können Entwickler sandbox/main.cpp ändern und die Anwendung mit den folgenden Befehlen kompilieren:
$ cmake -S sandbox -B build /sandbox
$ cmake -- build build /sandbox
< # for release mode: cmake --build build/sandbox --config Release #>
$ ./ build /sandbox < # runs the sandbox> $ cmake -S sandbox -B build /sandbox
#for release mode: cmake -DCMAKE_BUILD_TYPE=Release -S sandbox -B build/sandbox
$ cmake -- build build /sandbox
$ ./ build /sandbox #runs the sandbox