concurrencpp brings the power of concurrent tasks to the C++ world, allowing developers to write highly concurrent applications easily and safely by using tasks, executors and coroutines. With concurrencpp, applications can break down big procedures that need to be processed asynchronously into smaller tasks that run concurrently and work in a cooperative manner to achieve the wanted result. concurrencpp also allows applications to write parallel algorithms easily by using parallel coroutines.

concurrencpp's main advantages are:
- Writing modern concurrent code using higher-level tasks instead of low-level primitives like std::thread and std::mutex.
- Achieving non-blocking, synchronous-like code easily by using the co_await keyword.

Table of contents:

- executor API
- thread_pool_executor API
- manual_executor API
- result type
- result API
- lazy_result type
- lazy_result API
- result_promise API
- result_promise example
- shared_result API
- shared_result example
- make_ready_result
- make_exceptional_result
- when_all
- when_any
- resume_on
- timer_queue API
- timer API
- generator API
- generator example
- async_lock API
- scoped_async_lock API
- async_lock example
- async_condition_variable API
- async_condition_variable example
- runtime API
- task object
- task API

concurrencpp is built around the concept of concurrent tasks. A task is an asynchronous operation. Tasks offer a higher level of abstraction for concurrent code than the traditional thread-centric approach. Tasks can be chained together, meaning that tasks pass their asynchronous result from one to another, where the result of one task is used as if it were a parameter or an intermediate value of another ongoing task. Tasks allow applications to utilize available hardware resources better and scale much more than using raw threads, since tasks can be suspended, waiting for other tasks to produce results, without blocking underlying OS threads. Tasks bring much more productivity to developers by allowing them to focus on business logic and less on low-level concepts like thread management and inter-thread synchronization.
While tasks specify what actions have to be executed, executors are worker objects that specify where and how tasks are executed. Executors spare applications the tedious management of thread pools and task queues. Executors also decouple those concepts from application code by providing a unified API for creating and scheduling tasks.

Tasks communicate with each other using result objects. A result object is an asynchronous pipe that passes the asynchronous result of one task to another ongoing task. Results can be awaited in a non-blocking manner.

These three concepts - the task, the executor and the associated result - are the building blocks of concurrencpp. Executors run tasks that communicate with each other by sending results through result objects. Tasks, executors and result objects work together symbiotically to produce concurrent code which is fast and clean.

concurrencpp is built around the RAII concept. In order to use tasks and executors, applications create a runtime instance at the beginning of the main function. The runtime is then used to acquire existing executors and register new user-defined executors. Executors are used to create and schedule tasks to run, and they can return a result object that can be used to pass the asynchronous result to another task that acts as its consumer. When the runtime is destroyed, it iterates over every stored executor and calls its shutdown method. Every executor then exits gracefully. Unscheduled tasks are destroyed, and attempts to create new tasks will throw an exception.
#include "concurrencpp/concurrencpp.h"
#include <iostream>

int main() {
    concurrencpp::runtime runtime;
    auto result = runtime.thread_executor()->submit([] {
        std::cout << "hello world" << std::endl;
    });
    result.get();
    return 0;
}

In this basic example, we created a runtime object and acquired the thread executor from the runtime. We used submit to pass a lambda as our given callable. This lambda returns void, hence the executor returns a result<void> object that passes the asynchronous result back to the caller. main calls get, which blocks the main thread until the result becomes ready. If no exception was thrown, get returns void. If an exception was thrown, get rethrows it. Asynchronously, thread_executor launches a new thread of execution and runs the given lambda. It implicitly co_returns void and the task is finished. main is then unblocked.
#include "concurrencpp/concurrencpp.h"

#include <iostream>
#include <vector>
#include <algorithm>
#include <ctime>

using namespace concurrencpp;

std::vector<int> make_random_vector() {
    std::vector<int> vec(64 * 1'024);
    std::srand(std::time(nullptr));
    for (auto& i : vec) {
        i = ::rand();
    }
    return vec;
}

result<size_t> count_even(std::shared_ptr<thread_pool_executor> tpe, const std::vector<int>& vector) {
    const auto vector_size = vector.size();
    const auto concurrency_level = tpe->max_concurrency_level();
    const auto chunk_size = vector_size / concurrency_level;

    std::vector<result<size_t>> chunk_count;

    for (auto i = 0; i < concurrency_level; i++) {
        const auto chunk_begin = i * chunk_size;
        const auto chunk_end = chunk_begin + chunk_size;
        auto result = tpe->submit([&vector, chunk_begin, chunk_end]() -> size_t {
            return std::count_if(vector.begin() + chunk_begin, vector.begin() + chunk_end, [](auto i) {
                return i % 2 == 0;
            });
        });
        chunk_count.emplace_back(std::move(result));
    }

    size_t total_count = 0;

    for (auto& result : chunk_count) {
        total_count += co_await result;
    }

    co_return total_count;
}

int main() {
    concurrencpp::runtime runtime;
    const auto vector = make_random_vector();
    auto result = count_even(runtime.thread_pool_executor(), vector);
    const auto total_count = result.get();
    std::cout << "there are " << total_count << " even numbers in the vector" << std::endl;
    return 0;
}

In this example, we start the program by creating a runtime object. We create a vector filled with random numbers, then acquire the thread_pool_executor from the runtime and call count_even. count_even is a coroutine that spawns more tasks and joins them internally using co_await. max_concurrency_level returns the maximum number of workers the executor supports. In the thread-pool executor's case, the number of workers is calculated from the number of cores. We then partition the array into chunks to match the number of workers and send every chunk to be processed in its own task. Asynchronously, the workers count how many even numbers each chunk contains and co_return the result. count_even acquires each count by using co_await, and when all of the results have been summed, the final result is co_returned. The main thread, which was blocked by calling get, is unblocked and the total count is returned. main prints the number of even numbers and the program terminates gracefully.
Every big or complex operation can be broken down into smaller and chainable steps. Tasks are asynchronous operations implementing those computational steps. Tasks can run anywhere with the help of executors. While tasks can be created from regular callables (such as functions and lambdas), tasks are mostly used with coroutines, which allow smooth suspension and resumption. In concurrencpp, the task concept is represented by the concurrencpp::task class. Although the task concept is central to concurrencpp, applications rarely have to create and manipulate task objects themselves, as task objects are created and scheduled by the runtime with no external help.
concurrencpp allows applications to produce and consume coroutines as the main way of creating tasks. It supports both eager and lazy tasks.

Eager tasks start to run the moment they are invoked. This type of execution is recommended when applications need to fire an asynchronous action and consume its result later on (fire and consume later), or to completely ignore the asynchronous result (fire and forget).

Eager tasks can return either result or null_result. The result return type tells the coroutine to pass the returned value or the thrown exception on (fire and consume later), while the null_result return type tells the coroutine to drop and ignore either of them (fire and forget).
Eager coroutines can start to run synchronously, in the caller thread. This kind of coroutine is called a "regular coroutine". concurrencpp eager coroutines can also start to run in parallel, inside a given executor; this kind of coroutine is called a "parallel coroutine".

Lazy tasks, on the other hand, start to run only when co_awaited. This type of task is recommended when the result of the task is meant to be consumed right after its creation. Lazy tasks, being deferred, are a bit more optimized for the case of immediate consumption, as they do not need special thread synchronization in order to pass the asynchronous result back to the consumer. The compiler might also optimize away some of the memory allocations needed to form the underlying coroutine promise. It is not possible to fire a lazy task and execute something else meanwhile - firing a lazy callee-coroutine necessarily means suspending the caller-coroutine. The caller-coroutine is resumed only when the lazy callee-coroutine completes. Lazy tasks can only return lazy_result.

Lazy tasks can be converted to eager tasks by calling lazy_result::run. This method runs the lazy task inline and returns a result object that monitors the newly started task. If developers are unsure which result type to use, they are encouraged to use lazy results, as they can be converted to regular (eager) results if needed.
If a function returns lazy_result, result or null_result and contains at least one co_await or co_return in its body, then this function is a concurrencpp coroutine. Every valid concurrencpp coroutine is a valid task. In our count-even example above, count_even is such a coroutine. We first created count_even, and inside it the thread-pool executor spawned more child tasks (created from regular callables), which were eventually joined using co_await.
A concurrencpp executor is an object that is able to schedule and run tasks. Executors simplify the work of managing resources such as threads, thread pools and task queues by decoupling them from application code. Executors provide a unified way of scheduling and executing tasks, since they all extend concurrencpp::executor.
executor API

class executor {
/*
Initializes a new executor and gives it a name.
*/
executor (std::string_view name);
/*
Destroys this executor.
*/
virtual ~executor () noexcept = default ;
/*
The name of the executor, used for logging and debugging.
*/
const std::string name;
/*
Schedules a task to run in this executor.
Throws concurrencpp::errors::runtime_shutdown exception if shutdown was called before.
*/
virtual void enqueue (concurrencpp::task task) = 0;
/*
Schedules a range of tasks to run in this executor.
Throws concurrencpp::errors::runtime_shutdown exception if shutdown was called before.
*/
virtual void enqueue (std::span<concurrencpp::task> tasks) = 0;
/*
Returns the maximum count of real OS threads this executor supports.
The actual count of threads this executor is running might be smaller than this number.
Returns std::numeric_limits<int>::max if the executor does not have a limit for OS threads.
*/
virtual int max_concurrency_level () const noexcept = 0;
/*
Returns true if shutdown was called before, false otherwise.
*/
virtual bool shutdown_requested () const noexcept = 0;
/*
Shuts down the executor:
- Tells underlying threads to exit their work loop and joins them.
- Destroys unexecuted coroutines.
- Makes subsequent calls to enqueue, post, submit, bulk_post and
bulk_submit to throw concurrencpp::errors::runtime_shutdown exception.
- Makes shutdown_requested return true.
*/
virtual void shutdown () noexcept = 0;
/*
Turns a callable and its arguments into a task object and
schedules it to run in this executor using enqueue.
Arguments are passed to the task by decaying them first.
Throws errors::runtime_shutdown exception if shutdown has been called before.
*/
template < class callable_type , class ... argument_types>
void post (callable_type&& callable, argument_types&& ... arguments);
/*
Like post, but returns a result object that passes the asynchronous result.
Throws errors::runtime_shutdown exception if shutdown has been called before.
*/
template < class callable_type , class ... argument_types>
result<type> submit (callable_type&& callable, argument_types&& ... arguments);
/*
Turns an array of callables into an array of tasks and
schedules them to run in this executor using enqueue.
Throws errors::runtime_shutdown exception if shutdown has been called before.
*/
template < class callable_type >
void bulk_post (std::span<callable_type> callable_list);
/*
Like bulk_post, but returns an array of result objects that passes the asynchronous results.
Throws errors::runtime_shutdown exception if shutdown has been called before.
*/
template < class callable_type >
std::vector<concurrencpp::result<type>> bulk_submit (std::span<callable_type> callable_list);
};

As mentioned above, concurrencpp provides a set of commonly used executors. These executor types are:
Thread pool executor - a general-purpose executor that maintains a pool of threads. The thread pool executor is suitable for short, CPU-bound tasks that don't block. Applications are encouraged to use this executor as the default executor for non-blocking tasks. The concurrencpp thread pool provides dynamic thread injection and dynamic work balancing.

Background executor - a thread pool executor with a larger pool of threads. Suitable for launching short blocking tasks like file IO and DB queries. Important note: when consuming results that were returned by calling submit and bulk_submit on this executor, it is important to switch execution back to a CPU-bound executor using resume_on, in order to prevent CPU-bound tasks from being processed inside background_executor.

Example:
auto result = background_executor.submit([] { /* some blocking action */ });
auto done_result = co_await result.resolve();
co_await resume_on(some_cpu_executor);
auto val = co_await done_result; // runs inside some_cpu_executor

Thread executor - an executor that launches each enqueued task on a new thread of execution. Threads are not reused. This executor is good for long-running tasks, like objects that run a work loop, or long blocking operations.
Worker thread executor - a single-thread executor that maintains a single task queue. Suitable when applications want a dedicated thread that executes many related tasks.

Manual executor - an executor that does not execute coroutines by itself. Application code can execute previously enqueued tasks by manually invoking its execution methods.
Derivable executor - a base class for user-defined executors. Although inheriting directly from concurrencpp::executor is possible, derivable_executor uses the CRTP pattern, which provides some optimization opportunities for the compiler.

Inline executor - mainly used to override the behavior of other executors. Enqueuing a task is equivalent to invoking it inline.
The bare mechanism of an executor is encapsulated in its enqueue method. This method enqueues a task for execution and has two overloads: one overload receives a single task object as an argument, and the other receives a span of task objects. The second overload is used to enqueue a batch of tasks. This allows better scheduling heuristics and decreased contention.

Applications don't have to rely on enqueue alone; concurrencpp::executor provides an API for scheduling user callables by converting them to task objects behind the scenes. Applications can request executors to return a result object that passes the asynchronous result of the provided callable. This is done by calling executor::submit and executor::bulk_submit. submit gets a callable and returns a result object. executor::bulk_submit gets a span of callables and returns a vector of result objects in a way similar to how submit works. In many cases, applications are not interested in the asynchronous value or exception. In such cases, applications can use executor::post and executor::bulk_post to schedule a callable or a span of callables to be executed, while instructing the task objects to drop any returned value or thrown exception. Not passing the asynchronous result is faster than passing it, but then there is no way of knowing the status or the result of the ongoing task.

post, bulk_post, submit and bulk_submit use enqueue behind the scenes for the underlying scheduling mechanism.
thread_pool_executor API

Along with post, bulk_post, submit and bulk_submit, the thread_pool_executor provides these additional methods:
class thread_pool_executor {
/*
Returns the number of milliseconds each thread-pool worker
remains idle (lacks any task to execute) before exiting.
This constant can be set by passing a runtime_options object
to the constructor of the runtime class.
*/
std::chrono::milliseconds max_worker_idle_time () const noexcept ;
};

manual_executor API

Along with post, submit, bulk_post and bulk_submit, the manual_executor provides these additional methods:
class manual_executor {
/*
Destructor. Equivalent to clear.
*/
~manual_executor () noexcept ;
/*
Returns the number of enqueued tasks at the moment of invocation.
This number can change quickly by the time the application handles it, it should be used as a hint.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
size_t size () const noexcept ;
/*
Queries whether the executor is empty from tasks at the moment of invocation.
This value can change quickly by the time the application handles it, it should be used as a hint.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
bool empty () const noexcept ;
/*
Clears the executor from any enqueued but yet to-be-executed tasks,
and returns the number of cleared tasks.
Tasks enqueued to this executor by (post_)submit method are resumed
and errors::broken_task exception is thrown inside them.
Ongoing tasks that are being executed by loop_once(_XXX) or loop(_XXX) are unaffected.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
size_t clear ();
/*
Tries to execute a single task. If at the moment of invocation the executor
is empty, the method does nothing.
Returns true if a task was executed, false otherwise.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
bool loop_once ();
/*
Tries to execute a single task.
This method returns when either a task was executed or max_waiting_time
(in milliseconds) has reached.
If max_waiting_time is 0, the method is equivalent to loop_once.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
bool loop_once_for (std::chrono::milliseconds max_waiting_time);
/*
Tries to execute a single task.
This method returns when either a task was executed or timeout_time has reached.
If timeout_time has already expired, this method is equivalent to loop_once.
If shutdown is called from another thread, this method
returns and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
template < class clock_type , class duration_type >
bool loop_once_until (std::chrono::time_point<clock_type, duration_type> timeout_time);
/*
Tries to execute max_count enqueued tasks and returns the number of tasks that were executed.
This method does not wait: it returns when the executor
becomes empty from tasks or max_count tasks have been executed.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
size_t loop ( size_t max_count);
/*
Tries to execute max_count tasks.
This method returns when either max_count tasks were executed or a
total amount of max_waiting_time has passed.
If max_waiting_time is 0, the method is equivalent to loop.
Returns the actual amount of tasks that were executed.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
size_t loop_for ( size_t max_count, std::chrono::milliseconds max_waiting_time);
/*
Tries to execute max_count tasks.
This method returns when either max_count tasks were executed or timeout_time has reached.
If timeout_time has already expired, the method is equivalent to loop.
Returns the actual amount of tasks that were executed.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
template < class clock_type , class duration_type >
size_t loop_until ( size_t max_count, std::chrono::time_point<clock_type, duration_type> timeout_time);
/*
Waits for at least one task to be available for execution.
This method should be used as a hint,
as other threads (calling loop, for example) might empty the executor,
before this thread has a chance to do something with the newly enqueued tasks.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
void wait_for_task ();
/*
This method returns when one or more tasks are available for
execution or max_waiting_time has passed.
Returns true if at least one task is available for execution, false otherwise.
This method should be used as a hint, as other threads (calling loop, for example)
might empty the executor, before this thread has a chance to do something
with the newly enqueued tasks.
If shutdown is called from another thread, this method
returns and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
bool wait_for_task_for (std::chrono::milliseconds max_waiting_time);
/*
This method returns when one or more tasks are available for execution or timeout_time has reached.
Returns true if at least one task is available for execution, false otherwise.
This method should be used as a hint,
as other threads (calling loop, for example) might empty the executor,
before this thread has a chance to do something with the newly enqueued tasks.
If shutdown is called from another thread, this method
returns and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
template < class clock_type , class duration_type >
bool wait_for_task_until (std::chrono::time_point<clock_type, duration_type> timeout_time);
/*
This method returns when max_count or more tasks are available for execution.
This method should be used as a hint, as other threads
(calling loop, for example) might empty the executor,
before this thread has a chance to do something with the newly enqueued tasks.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
void wait_for_tasks ( size_t max_count);
/*
This method returns when max_count or more tasks are available for execution
or max_waiting_time (in milliseconds) has passed.
Returns the number of tasks available for execution when the method returns.
This method should be used as a hint, as other
threads (calling loop, for example) might empty the executor,
before this thread has a chance to do something with the newly enqueued tasks.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
size_t wait_for_tasks_for ( size_t count, std::chrono::milliseconds max_waiting_time);
/*
This method returns when max_count or more tasks are available for execution
or timeout_time is reached.
Returns the number of tasks available for execution when the method returns.
This method should be used as a hint, as other threads
(calling loop, for example) might empty the executor,
before this thread has a chance to do something with the newly enqueued tasks.
If shutdown is called from another thread, this method returns
and throws errors::shutdown_exception.
This method is thread safe.
Might throw std::system_error if one of the underlying synchronization primitives throws.
Throws errors::shutdown_exception if shutdown was called before.
*/
template < class clock_type , class duration_type >
size_t wait_for_tasks_until ( size_t count, std::chrono::time_point<clock_type, duration_type> timeout_time);
};

concurrencpp allows applications to consume asynchronous values and exceptions using result objects. The result type represents the asynchronous result of an eager task, while lazy_result represents the deferred result of a lazy task.
When a task (eager or lazy) completes, it either returns a valid value or throws an exception. In either case, this asynchronous result is passed to the consumer of the result object.

result objects form asymmetric coroutines - the execution of the caller-coroutine is not affected by the execution of the callee-coroutine. Both coroutines can run independently. Only when consuming the result of the callee-coroutine might the caller-coroutine be suspended, awaiting the callee to complete. Up until that point, both coroutines run independently. The callee-coroutine runs whether its result is consumed or not.

lazy_result objects form symmetric coroutines - the execution of the callee-coroutine happens only after the suspension of the caller-coroutine. When awaiting a lazy result, the current coroutine is suspended and the lazy task associated with the lazy result starts to run. After the callee-coroutine completes and produces a result, the caller-coroutine is resumed. If a lazy result is never consumed, its associated lazy task never starts to run.

All result objects are a move-only type, and as such they cannot be used after their content has been moved into another result object. In that case, the result object is considered empty, and trying to call any method other than operator bool and operator = will throw an exception.

After the asynchronous result has been pulled out of the result object (for example by calling get or operator co_await), the result object becomes empty. Emptiness can be tested with operator bool.

Awaiting a result means suspending the current coroutine until the result object is ready. If a valid value was returned from the associated task, it is returned from the result object. If the associated task threw an exception, it is re-thrown. At the moment of awaiting, if the result is already ready, the current coroutine resumes immediately. Otherwise, it is resumed by the thread that sets the asynchronous result or exception.

Resolving a result is similar to awaiting it. The difference is that the co_await expression returns the result object itself, in a non-empty form, in a ready state. The asynchronous result can then be pulled by using get or co_await.

Every result object has a status indicating the state of the asynchronous result. The result status varies from result_status::idle (the asynchronous result or exception has not been produced yet), through result_status::value (the associated task terminated gracefully by returning a valid value), to result_status::exception (the task terminated by throwing an exception). The status can be queried by calling (lazy_)result::status.
result type

The result type represents the result of an ongoing, asynchronous task, similar to std::future.

Aside from being awaited and resolved, result objects can also be waited on by calling any of result::wait, result::wait_for, result::wait_until or result::get. Waiting for a result to finish is a blocking operation (in case the asynchronous result is not ready) and suspends the entire thread of execution waiting for the asynchronous result to become available. Waiting operations are generally discouraged and are only allowed in root-level tasks or in contexts which allow it, like blocking the main thread waiting for the rest of the application to finish gracefully, or when using concurrencpp::blocking_executor or concurrencpp::thread_executor.

Awaiting a result object by using co_await (and, by doing so, turning the current function/task into a coroutine as well) is the preferred way of consuming result objects, as it does not block the underlying thread.
result API

class result {
/*
Creates an empty result that isn't associated with any task.
*/
result () noexcept = default ;
/*
Destroys the result. Associated tasks are not cancelled.
The destructor does not block waiting for the asynchronous result to become ready.
*/
~result () noexcept = default ;
/*
Moves the content of rhs to *this. After this call, rhs is empty.
*/
result (result&& rhs) noexcept = default ;
/*
Moves the content of rhs to *this. After this call, rhs is empty. Returns *this.
*/
result& operator = (result&& rhs) noexcept = default ;
/*
Returns true if this is a non-empty result.
Applications must not use this object if this->operator bool() is false.
*/
explicit operator bool () const noexcept ;
/*
Queries the status of *this.
The returned value is any of result_status::idle, result_status::value or result_status::exception.
Throws errors::empty_result if *this is empty.
*/
result_status status () const ;
/*
Blocks the current thread of execution until this result is ready,
when status() != result_status::idle.
Throws errors::empty_result if *this is empty.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
void wait ();
/*
Blocks until this result is ready or duration has passed. Returns the status
of this result after unblocking.
Throws errors::empty_result if *this is empty.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
template < class duration_unit , class ratio >
result_status wait_for (std::chrono::duration<duration_unit, ratio> duration);
/*
Blocks until this result is ready or timeout_time has reached. Returns the status
of this result after unblocking.
Throws errors::empty_result if *this is empty.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
template < class clock , class duration >
result_status wait_until (std::chrono::time_point<clock, duration> timeout_time);
/*
Blocks the current thread of execution until this result is ready,
when status() != result_status::idle.
If the result is a valid value, it is returned, otherwise, get rethrows the asynchronous exception.
Throws errors::empty_result if *this is empty.
Might throw std::bad_alloc if fails to allocate memory.
Might throw std::system_error if one of the underlying synchronization primitives throws.
*/
type get ();
/*
Returns an awaitable used to await this result.
If the result is already ready - the current coroutine resumes
immediately in the calling thread of execution.
If the result is not ready yet, the current coroutine is suspended
and resumed when the asynchronous result is ready,
by the thread which had set the asynchronous value or exception.
In either way, after resuming, if the result is a valid value, it is returned.
Otherwise, operator co_await rethrows the asynchronous exception.
Throws errors::empty_result if *this is empty.
*/
auto operator co_await ();
/*
Returns an awaitable used to resolve this result.
After co_await expression finishes, *this is returned in a non-empty form, in a ready state.
Throws errors::empty_result if *this is empty.
*/
auto resolve ();
};

lazy_result type

A lazy result object represents the deferred result of a lazy task.
lazy_result is used to start the associated lazy task and to pass the deferred result back to its consumer. When awaited or resolved, the lazy result suspends the current coroutine and starts the associated lazy task. When the associated task completes, its asynchronous value is passed to the caller task, which is then resumed.

Sometimes an API returns a lazy result, but the application needs its associated task to run eagerly, without suspending the caller task. In this case, the lazy task can be converted to an eager task by calling run on its associated lazy result. The associated task then starts to run inline, without suspending the caller task. The original lazy result is emptied, and a valid result object that monitors the newly started task is returned instead.
lazy_result API

class lazy_result {
/*
Creates an empty lazy result that isn't associated with any task.
*/
lazy_result () noexcept = default ;
/*
Moves the content of rhs to *this. After this call, rhs is empty.
*/
lazy_result (lazy_result&& rhs) noexcept ;
/*
Destroys the result. If not empty, the destructor destroys the associated task without resuming it.
*/
~lazy_result () noexcept ;
/*
Moves the content of rhs to *this. After this call, rhs is empty. Returns *this.
If *this is not empty, then operator= destroys the associated task without resuming it.
*/
lazy_result& operator =(lazy_result&& rhs) noexcept ;
/*
Returns true if this is a non-empty result.
Applications must not use this object if this->operator bool() is false.
*/
explicit operator bool () const noexcept ;
/*
Queries the status of *this.
The returned value is any of result_status::idle, result_status::value or result_status::exception.
Throws errors::empty_result if *this is empty.
*/
result_status status () const ;
/*
Returns an awaitable used to start the associated task and await this result.
If the result is already ready - the current coroutine resumes immediately
in the calling thread of execution.
If the result is not ready yet, the current coroutine is suspended and
resumed when the asynchronous result is ready,
by the thread which had set the asynchronous value or exception.
In either way, after resuming, if the result is a valid value, it is returned.
Otherwise, operator co_await rethrows the asynchronous exception.
Throws errors::empty_result if *this is empty.
*/
auto operator co_await ();
/*
Returns an awaitable used to start the associated task and resolve this result.
If the result is already ready - the current coroutine resumes immediately
in the calling thread of execution.
If the result is not ready yet, the current coroutine is suspended and resumed
when the asynchronous result is ready, by the thread which
had set the asynchronous value or exception.
After co_await expression finishes, *this is returned in a non-empty form, in a ready state.
Throws errors::empty_result if *this is empty.
*/
auto resolve ();
/*
Runs the associated task inline and returns a result object that monitors the newly started task.
After this call, *this is empty.
Throws errors::empty_result if *this is empty.
Might throw std::bad_alloc if fails to allocate memory.
*/
result<type> run ();
};

A regular eager coroutine starts to run synchronously, in the calling thread of execution. Execution might shift to another thread of execution if the coroutine undergoes a rescheduling, for example by awaiting an unready result object inside it. concurrencpp also provides parallel coroutines, which start to run inside a given executor, not in the invoking thread of execution. This style of scheduling coroutines is especially helpful when writing parallel algorithms, recursive algorithms and concurrent algorithms that use the fork-join model.
Every parallel coroutine must meet the following preconditions:

1. It returns any of result / null_result.
2. It gets executor_tag as its first argument.
3. It gets any of type* / type& / std::shared_ptr<type>, where type is a concrete class of executor, as its second argument.
4. It contains at least one co_await or co_return.

If all the above applies, the function is a parallel coroutine: concurrencpp will start the coroutine suspended and immediately reschedule it to run in the provided executor. concurrencpp::executor_tag is a dummy placeholder that tells the concurrencpp runtime that this function is not a regular function - it needs to start running inside the given executor. If a null executor is passed to a parallel coroutine, the coroutine will not start to run and a std::invalid_argument exception will be thrown synchronously. If all the preconditions are met, applications can consume the result of the parallel coroutine by using the returned result object.
In this example, we calculate the 30th member of the Fibonacci sequence in a parallel manner. We launch each Fibonacci step in its own parallel coroutine. The first argument is a dummy executor_tag and the second argument is the thread-pool executor. Every recursive step invokes a new parallel coroutine that runs in parallel. Each result is co_returned to the parent task and acquired by using co_await.

When we deem the input small enough to be calculated synchronously (when curr <= 10), we stop executing each recursive step in its own task and solve the algorithm synchronously.
#include "concurrencpp/concurrencpp.h"
#include <iostream>

using namespace concurrencpp;

int fibonacci_sync(int i) {
    if (i == 0) {
        return 0;
    }

    if (i == 1) {
        return 1;
    }

    return fibonacci_sync(i - 1) + fibonacci_sync(i - 2);
}

result<int> fibonacci(executor_tag, std::shared_ptr<thread_pool_executor> tpe, const int curr) {
    if (curr <= 10) {
        co_return fibonacci_sync(curr);
    }

    auto fib_1 = fibonacci({}, tpe, curr - 1);
    auto fib_2 = fibonacci({}, tpe, curr - 2);

    co_return co_await fib_1 + co_await fib_2;
}

int main() {
    concurrencpp::runtime runtime;
    auto fibb_30 = fibonacci({}, runtime.thread_pool_executor(), 30).get();
    std::cout << "fibonacci(30) = " << fibb_30 << std::endl;
    return 0;
}

For comparison, this is how the same code is written without using parallel coroutines, relying instead on executor::submit. Since fibonacci returns result<int>, submitting it recursively via executor::submit yields result<result<int>>.
#include "concurrencpp/concurrencpp.h"
#include <iostream>

using namespace concurrencpp;

int fibonacci_sync(int i) {
    if (i == 0) {
        return 0;
    }

    if (i == 1) {
        return 1;
    }

    return fibonacci_sync(i - 1) + fibonacci_sync(i - 2);
}

result<int> fibonacci(std::shared_ptr<thread_pool_executor> tpe, const int curr) {
    if (curr <= 10) {
        co_return fibonacci_sync(curr);
    }

    auto fib_1 = tpe->submit(fibonacci, tpe, curr - 1);
    auto fib_2 = tpe->submit(fibonacci, tpe, curr - 2);

    co_return co_await co_await fib_1 +
        co_await co_await fib_2;
}

int main() {
    concurrencpp::runtime runtime;
    auto fibb_30 = fibonacci(runtime.thread_pool_executor(), 30).get();
    std::cout << "fibonacci(30) = " << fibb_30 << std::endl;
    return 0;
}

Result objects are the main way to pass data between tasks in concurrencpp, and we have seen how executors and coroutines produce such objects. Sometimes we want to use the capabilities of result objects with non-tasks, for example when working with a third-party library. In this case, we can complete a result object by using a result_promise. result_promise resembles a std::promise object - applications can manually set the asynchronous result or exception and make the associated result object become ready.
Just like result objects, result-promises are a move-only type that becomes empty after move. Similarly, after setting a result or an exception, the result promise becomes empty as well. If a result promise gets out of scope and no result or exception has been set, the result-promise destructor sets a concurrencpp::errors::broken_task exception using the set_exception method. Suspended and blocked tasks waiting for the associated result object are then resumed/unblocked.
Result promises can convert a callback style of code into an async/await style of code: whenever a component requires a callback to pass its asynchronous result, we can pass a result promise whose callback calls set_result or set_exception, and return the associated result object to the consumer.
result_promise API

template <class type>
class result_promise {
    /*
        Constructs a valid result_promise.
        Might throw std::bad_alloc if fails to allocate memory.
    */
    result_promise();

    /*
        Moves the content of rhs to *this. After this call, rhs is empty.
    */
    result_promise(result_promise&& rhs) noexcept;

    /*
        Destroys *this, possibly setting an errors::broken_task exception
        by calling set_exception if *this is not empty at the time of destruction.
    */
    ~result_promise() noexcept;

    /*
        Moves the content of rhs to *this. After this call, rhs is empty.
    */
    result_promise& operator=(result_promise&& rhs) noexcept;

    /*
        Returns true if this is a non-empty result-promise.
        Applications must not use this object if this->operator bool() is false.
    */
    explicit operator bool() const noexcept;

    /*
        Sets a value by constructing <<type>> from arguments... in-place.
        Makes the associated result object become ready - tasks waiting for it
        to become ready are unblocked.
        Suspended tasks are resumed inline.
        After this call, *this becomes empty.
        Throws errors::empty_result_promise exception if *this is empty.
        Might throw any exception that the constructor
        of type(std::forward<argument_types>(arguments)...) throws.
    */
    template <class ... argument_types>
    void set_result(argument_types&& ... arguments);

    /*
        Sets an exception.
        Makes the associated result object become ready - tasks waiting for it
        to become ready are unblocked.
        Suspended tasks are resumed inline.
        After this call, *this becomes empty.
        Throws errors::empty_result_promise exception if *this is empty.
        Throws std::invalid_argument exception if exception_ptr is null.
    */
    void set_exception(std::exception_ptr exception_ptr);

    /*
        A convenience method that invokes a callable with arguments... and calls set_result
        with the result of the invocation.
        If an exception is thrown, the thrown exception is caught and set instead by calling set_exception.
        After this call, *this becomes empty.
        Throws errors::empty_result_promise exception if *this is empty.
        Might throw any exception that callable(std::forward<argument_types>(arguments)...)
        or the constructor of type(type&&) throw.
    */
    template <class callable_type, class ... argument_types>
    void set_from_function(callable_type&& callable, argument_types&& ... arguments);

    /*
        Gets the associated result object.
        Throws errors::empty_result_promise exception if *this is empty.
        Throws errors::result_already_retrieved exception if this method had been called before.
    */
    result<type> get_result();
};

result_promise example: In this example, a result_promise is used to push data from one thread, which can then be pulled from the associated result object on another thread.
#include "concurrencpp/concurrencpp.h"

#include <iostream>

int main() {
    concurrencpp::result_promise<std::string> promise;
    auto result = promise.get_result();

    std::thread my_3_party_executor([promise = std::move(promise)]() mutable {
        std::this_thread::sleep_for(std::chrono::seconds(1)); // Imitate real work
        promise.set_result("hello world");
    });

    auto asynchronous_string = result.get();
    std::cout << "result promise returned string: " << asynchronous_string << std::endl;

    my_3_party_executor.join();
}

In this example, we use std::thread as a third-party executor. This represents a scenario where a non-concurrencpp executor is used as part of the application's life-cycle. We extract the result object before moving the promise into the thread, and block the main thread until the result becomes ready. Inside my_3_party_executor, we set a result as if we co_return-ed it.
Shared results are a special kind of result objects that allow multiple consumers to access the asynchronous result, similar to std::shared_future. Different consumers from different threads can call functions like await, get and resolve in a thread-safe manner.
Shared results are built from regular result objects and, unlike regular result objects, they are both copyable and movable. As such, shared_result behaves similarly to the std::shared_ptr type. If a shared-result instance is moved to another instance, the moved-from instance becomes empty, and trying to access it will throw an exception.
In order to support multiple consumers, shared results return a reference to the asynchronous value instead of moving it, as a regular result does. For example, get on a shared_result<int> returns an int&, and so does awaiting it. If the underlying type of the shared_result is void or a reference type (like int&), it is returned as usual. If the asynchronous result is a thrown exception, it is re-thrown.
While acquiring the asynchronous result through a shared_result from multiple threads is thread-safe, the actual value might not be. For example, multiple threads can acquire an asynchronous integer by receiving its reference (int&); this does not make the integer itself thread-safe. It is only alright to mutate the asynchronous value if the value is already thread-safe. Alternatively, applications are encouraged to use a const underlying type to begin with (like const int), and acquire constant references (like const int&) that prevent mutation.
shared_result API

class shared_result {
    /*
        Creates an empty shared-result that isn't associated with any task.
    */
    shared_result() noexcept = default;

    /*
        Destroys the shared-result. Associated tasks are not cancelled.
        The destructor does not block waiting for the asynchronous result to become ready.
    */
    ~shared_result() noexcept = default;

    /*
        Converts a regular result object to a shared-result object.
        After this call, rhs is empty.
        Might throw std::bad_alloc if fails to allocate memory.
    */
    shared_result(result<type> rhs);

    /*
        Copy constructor. Creates a copy of the shared result object that monitors the same task.
    */
    shared_result(const shared_result&) noexcept = default;

    /*
        Move constructor. Moves rhs to *this. After this call, rhs is empty.
    */
    shared_result(shared_result&& rhs) noexcept = default;

    /*
        Copy assignment operator. Copies rhs to *this and monitors the same task that rhs monitors.
    */
    shared_result& operator=(const shared_result& rhs) noexcept;

    /*
        Move assignment operator. Moves rhs to *this. After this call, rhs is empty.
    */
    shared_result& operator=(shared_result&& rhs) noexcept;

    /*
        Returns true if this is a non-empty shared-result.
        Applications must not use this object if this->operator bool() is false.
    */
    explicit operator bool() const noexcept;

    /*
        Queries the status of *this.
        The return value is any of result_status::idle, result_status::value or result_status::exception.
        Throws errors::empty_result if *this is empty.
    */
    result_status status() const;

    /*
        Blocks the current thread of execution until this shared-result is ready,
        when status() != result_status::idle.
        Throws errors::empty_result if *this is empty.
        Might throw std::system_error if one of the underlying synchronization primitives throws.
    */
    void wait();

    /*
        Blocks until this shared-result is ready or duration has passed.
        Returns the status of this shared-result after unblocking.
        Throws errors::empty_result if *this is empty.
        Might throw std::system_error if one of the underlying synchronization primitives throws.
    */
    template <class duration_type, class ratio_type>
    result_status wait_for(std::chrono::duration<duration_type, ratio_type> duration);

    /*
        Blocks until this shared-result is ready or timeout_time has been reached.
        Returns the status of this result after unblocking.
        Throws errors::empty_result if *this is empty.
        Might throw std::system_error if one of the underlying synchronization primitives throws.
    */
    template <class clock_type, class duration_type>
    result_status wait_until(std::chrono::time_point<clock_type, duration_type> timeout_time);

    /*
        Blocks the current thread of execution until this shared-result is ready,
        when status() != result_status::idle.
        If the result is a valid value, a reference to it is returned,
        otherwise, get rethrows the asynchronous exception.
        Throws errors::empty_result if *this is empty.
        Might throw std::system_error if one of the underlying synchronization primitives throws.
    */
    std::add_lvalue_reference_t<type> get();

    /*
        Returns an awaitable used to await this shared-result.
        If the shared-result is already ready - the current coroutine resumes
        immediately in the calling thread of execution.
        If the shared-result is not ready yet, the current coroutine is
        suspended and resumed when the asynchronous result is ready,
        by the thread which had set the asynchronous value or exception.
        Either way, after resuming, if the result is a valid value, a reference to it is returned.
        Otherwise, operator co_await rethrows the asynchronous exception.
        Throws errors::empty_result if *this is empty.
    */
    auto operator co_await();

    /*
        Returns an awaitable used to resolve this shared-result.
        After the co_await expression finishes, *this is returned in a non-empty form, in a ready state.
        Throws errors::empty_result if *this is empty.
    */
    auto resolve();
};

shared_result example: In this example, a result object is converted into a shared_result object, and a reference to the asynchronous int result is acquired by many tasks spawned with the thread-pool executor.
#include "concurrencpp/concurrencpp.h"

#include <chrono>
#include <iostream>

concurrencpp::result<void> consume_shared_result(concurrencpp::shared_result<int> shared_result,
                                                 std::shared_ptr<concurrencpp::executor> resume_executor) {
    std::cout << "Awaiting shared_result to have a value" << std::endl;

    const auto& async_value = co_await shared_result;

    co_await concurrencpp::resume_on(resume_executor);

    std::cout << "In thread id " << std::this_thread::get_id() << ", got: " << async_value
              << ", memory address: " << &async_value << std::endl;
}

int main() {
    concurrencpp::runtime runtime;
    auto result = runtime.background_executor()->submit([] {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        return 100;
    });

    concurrencpp::shared_result<int> shared_result(std::move(result));
    concurrencpp::result<void> results[8];

    for (size_t i = 0; i < 8; i++) {
        results[i] = consume_shared_result(shared_result, runtime.thread_pool_executor());
    }

    std::cout << "Main thread waiting for all consumers to finish" << std::endl;

    auto tpe = runtime.thread_pool_executor();
    auto all_consumed = concurrencpp::when_all(tpe, std::begin(results), std::end(results)).run();
    all_consumed.get();

    std::cout << "All consumers are done, exiting" << std::endl;
    return 0;
}

When the runtime object gets out of scope of main, it iterates over its stored executors and calls their shutdown method. Trying to access an executor or the timer queue after that throws an errors::runtime_shutdown exception. When an executor shuts down, it clears its inner task queues, destroying un-executed task objects. If a task object stores a concurrencpp coroutine, that coroutine is resumed inline and an errors::broken_task exception is thrown inside it. In any case where a runtime_shutdown or broken_task exception is thrown, applications should terminate the current code-flow as soon as possible; those exceptions should not be ignored. Both runtime_shutdown and broken_task inherit from the errors::interrupted_task base class, which can be used in a catch clause to handle shutdown in a unified manner.
Many concurrencpp asynchronous actions require an executor instance as their resume executor. If an asynchronous action (implemented as a coroutine) can complete synchronously, it resumes immediately in the calling thread of execution. If it cannot complete synchronously, it is resumed inside the given resume-executor when it eventually completes. For example, the when_any utility function requires a resume-executor instance as its first argument. when_any returns a lazy_result which becomes ready when at least one of the given results becomes ready. If one of the results is already ready at the moment of calling when_any, the calling coroutine resumes synchronously in the calling thread of execution. Otherwise, the calling coroutine is resumed when at least one result completes, inside the given resume-executor. Resume executors matter because they dictate where coroutines are resumed - both in cases where it is important to control where a coroutine resumes (as with when_any and when_all), and in cases where the asynchronous action is processed inside one of the concurrencpp workers, which should only process concurrencpp internals and not application code.
The make_ready_result function: make_ready_result creates a ready result object from the given arguments. Awaiting such a result causes the current coroutine to resume immediately. get and operator co_await return the constructed value.
/*
    Creates a ready result object by building <<type>> from arguments&&... in-place.
    Might throw any exception that the constructor
    of type(std::forward<argument_types>(arguments)...) throws.
    Might throw std::bad_alloc exception if fails to allocate memory.
*/
template <class type, class ... argument_types>
result<type> make_ready_result(argument_types&& ... arguments);

/*
    An overload for the void type.
    Might throw std::bad_alloc exception if fails to allocate memory.
*/
result<void> make_ready_result();

The make_exceptional_result function: make_exceptional_result creates a ready result object from a given exception. Awaiting such a result causes the current coroutine to resume immediately. get and operator co_await re-throw the given exception.
/*
    Creates a ready result object from an exception pointer.
    The returned result object will re-throw exception_ptr when calling get or await.
    Throws std::invalid_argument if exception_ptr is null.
    Might throw std::bad_alloc exception if fails to allocate memory.
*/
template <class type>
result<type> make_exceptional_result(std::exception_ptr exception_ptr);

/*
    Overload. Similar to make_exceptional_result(std::exception_ptr),
    but gets an exception object directly.
    Might throw any exception that the constructor of exception_type(std::move(exception)) might throw.
    Might throw std::bad_alloc exception if fails to allocate memory.
*/
template <class type, class exception_type>
result<type> make_exceptional_result(exception_type exception);

The when_all function: when_all is a utility function that creates a lazy result object which becomes ready when all input results are completed. Awaiting this lazy result returns all input result objects in a ready state, ready to be consumed.
The when_all function comes in three flavors: one that accepts a heterogeneous range of result objects, one that accepts a pair of iterators to a range of result objects of the same type, and one that accepts no result objects at all. In the last case, the function returns a ready result object of an empty tuple.
If one of the passed result objects is empty, an exception is thrown. In that case, the input result objects are unaffected by the function and can be re-used after the exception is handled. If all input result objects are valid, they are emptied by this function, and returned as valid and ready results in the output.
Currently, when_all only accepts result objects.
Every overload accepts a resume-executor as its first parameter. When awaiting a result returned by when_all, the caller coroutine is resumed by the given resume-executor.
/*
    Creates a result object that becomes ready when all the input results become ready.
    Passed result objects are emptied and returned as a tuple.
    Throws std::invalid_argument if any of the passed result objects is empty.
    Might throw an std::bad_alloc exception if no memory is available.
*/
template <class ... result_types>
lazy_result<std::tuple<typename std::decay<result_types>::type...>>
when_all(std::shared_ptr<executor_type> resume_executor,
         result_types&& ... results);

/*
    Overload. Similar to when_all(result_types&& ...) but receives a pair of iterators referencing a range.
    Passed result objects are emptied and returned as a vector.
    If begin == end, the function returns immediately with an empty vector.
    Throws std::invalid_argument if any of the passed result objects is empty.
    Might throw an std::bad_alloc exception if no memory is available.
*/
template <class iterator_type>
lazy_result<std::vector<typename std::iterator_traits<iterator_type>::value_type>>
when_all(std::shared_ptr<executor_type> resume_executor,
         iterator_type begin, iterator_type end);

/*
    Overload. Returns a ready result object that doesn't monitor any asynchronous result.
    Might throw an std::bad_alloc exception if no memory is available.
*/
lazy_result<std::tuple<>> when_all(std::shared_ptr<executor_type> resume_executor);

The when_any function: when_any is a utility function that creates a lazy result object which becomes ready when at least one input result is completed. Awaiting this result returns a helper struct containing all input result objects plus the index of the completed task. By the time the ready result is consumed, other results may have already completed asynchronously. Applications can call when_any repeatedly to consume ready results as they complete, until all results are consumed.
The when_any function comes in two flavors: one that accepts a heterogeneous range of result objects, and one that accepts a pair of iterators to a range of result objects of the same type. Unlike when_all, there is no meaning in waiting for at least one task to complete when the result range is completely empty, so there is no overload that accepts no arguments. Moreover, the iterator-pair overload throws an exception if the iterators reference an empty range (begin == end).
If one of the passed result objects is empty, an exception is thrown. In any case an exception is thrown, the input result objects are unaffected by the function and can be re-used after the exception is handled. If all input result objects are valid, they are emptied by this function, and returned in a valid state in the output.
Currently, when_any only accepts result objects.
Every overload accepts a resume-executor as its first parameter. When awaiting a result returned by when_any, the caller coroutine is resumed by the given resume-executor.
/*
    Helper struct returned from when_any.
    index is the position of the ready result in the results sequence.
    results is either an std::tuple or an std::vector of the results that were passed to when_any.
*/
template <class sequence_type>
struct when_any_result {
    std::size_t index;
    sequence_type results;
};

/*
    Creates a result object that becomes ready when at least one of the input results is ready.
    Passed result objects are emptied and returned as a tuple.
    Throws std::invalid_argument if any of the passed result objects is empty.
    Might throw an std::bad_alloc exception if no memory is available.
*/
template <class ... result_types>
lazy_result<when_any_result<std::tuple<result_types...>>>
when_any(std::shared_ptr<executor_type> resume_executor,
         result_types&& ... results);

/*
    Overload. Similar to when_any(result_types&& ...) but receives a pair of iterators referencing a range.
    Passed result objects are emptied and returned as a vector.
    Throws std::invalid_argument if begin == end.
    Throws std::invalid_argument if any of the passed result objects is empty.
    Might throw an std::bad_alloc exception if no memory is available.
*/
template <class iterator_type>
lazy_result<when_any_result<std::vector<typename std::iterator_traits<iterator_type>::value_type>>>
when_any(std::shared_ptr<executor_type> resume_executor,
         iterator_type begin, iterator_type end);

The resume_on function: resume_on returns an awaitable that suspends the current coroutine and resumes it inside the given executor. This is an important function that ensures code runs on the right executor. For example, an application might schedule a background task using the background_executor and await the returned result object; in that case, the awaiting coroutine resumes inside the background executor. Calling resume_on with another, CPU-bound executor ensures that CPU-bound lines of code do not run on the background executor once the background task completes. If a task re-schedules itself to run on another executor using resume_on, but that executor shuts down before it can resume the suspended task, the task is resumed immediately and an errors::broken_task exception is thrown. In this case, applications need to quit the current code-flow as gracefully as possible.
/*
    Returns an awaitable that suspends the current coroutine and resumes it inside executor.
    Might throw any exception that executor_type::enqueue throws.
*/
template <class executor_type>
auto resume_on(std::shared_ptr<executor_type> executor);

concurrencpp also provides timers and timer queues. A timer is an object that defines an asynchronous action running on an executor within a well-defined interval of time. There are three types of timers - regular timers, oneshot timers and delay objects.
Regular timers have four properties that define them: a callable (scheduled to run as a task periodically), an associated executor (on which the callable is scheduled to run), a due time (the interval, in milliseconds, after creation at which the callable is scheduled to run for the first time) and a frequency (the interval, in milliseconds, at which the callable is rescheduled to run periodically after its first invocation, until the timer is destructed or cancelled).
Like other objects in concurrencpp, timers are a move-only type that can be empty. When a timer is destructed or timer::cancel is called, the timer cancels its scheduled but not yet executed tasks. Ongoing tasks are unaffected. The timer callable must be thread-safe. It is recommended to set the due time and the frequency of timers to a granularity of 50 milliseconds.
A timer queue is a concurrencpp worker that manages a collection of timers and processes them in just one thread of execution. It is also the agent used to create new timers. When a timer deadline (whether it is the timer's due-time or frequency) has reached, the timer queue "fires" the timer by scheduling its callable to run on the associated executor as a task.
Just like executors, timer queues also adhere to the RAII concept. When the runtime object gets out of scope, it shuts down the timer queue, cancelling all pending timers. After a timer queue has been shut down, any subsequent call to make_timer, make_one_shot_timer and make_delay_object will throw an errors::runtime_shutdown exception. Applications must not try to shut down timer queues by themselves.
timer_queue API:

class timer_queue {
    /*
        Destroys this timer_queue.
    */
    ~timer_queue() noexcept;

    /*
        Shuts down this timer_queue:
        Tells the underlying thread of execution to quit and joins it.
        Cancels all pending timers.
        After this call, invocation of any method besides shutdown
        and shutdown_requested will throw an errors::runtime_shutdown.
        If shutdown had been called before, this method has no effect.
    */
    void shutdown() noexcept;

    /*
        Returns true if shutdown had been called before, false otherwise.
    */
    bool shutdown_requested() const noexcept;

    /*
        Creates a new running timer where *this is the associated timer_queue.
        Throws std::invalid_argument if executor is null.
        Throws errors::runtime_shutdown if shutdown had been called before.
        Might throw std::bad_alloc if fails to allocate memory.
        Might throw std::system_error if one of the underlying synchronization primitives throws.
    */
    template <class callable_type, class ... argument_types>
    timer make_timer(
        std::chrono::milliseconds due_time,
        std::chrono::milliseconds frequency,
        std::shared_ptr<concurrencpp::executor> executor,
        callable_type&& callable,
        argument_types&& ... arguments);

    /*
        Creates a new one-shot timer where *this is the associated timer_queue.
        Throws std::invalid_argument if executor is null.
        Throws errors::runtime_shutdown if shutdown had been called before.
        Might throw std::bad_alloc if fails to allocate memory.
        Might throw std::system_error if one of the underlying synchronization primitives throws.
    */
    template <class callable_type, class ... argument_types>
    timer make_one_shot_timer(
        std::chrono::milliseconds due_time,
        std::shared_ptr<concurrencpp::executor> executor,
        callable_type&& callable,
        argument_types&& ... arguments);

    /*
        Creates a new delay object where *this is the associated timer_queue.
        Throws std::invalid_argument if executor is null.
        Throws errors::runtime_shutdown if shutdown had been called before.
        Might throw std::bad_alloc if fails to allocate memory.
        Might throw std::system_error if one of the underlying synchronization primitives throws.
    */
    result<void> make_delay_object(
        std::chrono::milliseconds due_time,
        std::shared_ptr<concurrencpp::executor> executor);
};

timer API:

class timer {
    /*
        Creates an empty timer.
    */
    timer() noexcept = default;

    /*
        Cancels the timer, if not empty.
    */
    ~timer() noexcept;

    /*
        Moves the content of rhs to *this.
        rhs is empty after this call.
    */
    timer(timer&& rhs) noexcept = default;

    /*
        Moves the content of rhs to *this.
        rhs is empty after this call.
        Returns *this.
    */
    timer& operator=(timer&& rhs) noexcept;

    /*
        Cancels this timer.
        After this call, the associated timer_queue will not schedule *this
        to run again and *this becomes empty.
        Scheduled, but not yet executed tasks are cancelled.
        Ongoing tasks are unaffected.
        This method has no effect if *this is empty or the associated timer_queue has already expired.
        Might throw std::system_error if one of the underlying synchronization primitives throws.
    */
    void cancel();

    /*
        Returns the associated executor of this timer.
        Throws concurrencpp::errors::empty_timer if *this is empty.
    */
    std::shared_ptr<executor> get_executor() const;

    /*
        Returns the associated timer_queue of this timer.
        Throws concurrencpp::errors::empty_timer if *this is empty.
    */
    std::weak_ptr<timer_queue> get_timer_queue() const;

    /*
        Returns the due time of this timer.
        Throws concurrencpp::errors::empty_timer if *this is empty.
    */
    std::chrono::milliseconds get_due_time() const;

    /*
        Returns the frequency of this timer.
        Throws concurrencpp::errors::empty_timer if *this is empty.
    */
    std::chrono::milliseconds get_frequency() const;

    /*
        Sets a new frequency for this timer.
        Callables already scheduled to run at the time of invocation are not affected.
        Throws concurrencpp::errors::empty_timer if *this is empty.
    */
    void set_frequency(std::chrono::milliseconds new_frequency);

    /*
        Returns true if *this is not an empty timer, false otherwise.
        The timer should not be used if this->operator bool() is false.
    */
    explicit operator bool() const noexcept;
};

In this example we create a regular timer by using the timer queue. The timer schedules its callable to run after 1.5 seconds, then fires its callable every 2 seconds. The given callable runs on the threadpool executor.
#include "concurrencpp/concurrencpp.h"

#include <iostream>

using namespace std::chrono_literals;

int main() {
    concurrencpp::runtime runtime;
    std::atomic_size_t counter = 1;
    concurrencpp::timer timer = runtime.timer_queue()->make_timer(
        1500ms,
        2000ms,
        runtime.thread_pool_executor(),
        [&] {
            const auto c = counter.fetch_add(1);
            std::cout << "timer was invoked for the " << c << "th time" << std::endl;
        });

    std::this_thread::sleep_for(12s);
    return 0;
}

A oneshot timer is a one-time timer with only a due time - after it schedules its callable to run once, it never reschedules it to run again.
In this example, we create a timer that runs only once - after 3 seconds from its creation, the timer will schedule its callable to run on a new thread of execution (using thread_executor ).
#include "concurrencpp/concurrencpp.h"

#include <iostream>

using namespace std::chrono_literals;

int main() {
    concurrencpp::runtime runtime;
    concurrencpp::timer timer = runtime.timer_queue()->make_one_shot_timer(
        3000ms,
        runtime.thread_executor(),
        [&] {
            std::cout << "hello and goodbye" << std::endl;
        });

    std::this_thread::sleep_for(4s);
    return 0;
}

A delay object is a lazy result object that becomes ready when it's co_awaited and its due time is reached. Applications can co_await this result object to delay the current coroutine in a non-blocking way. The current coroutine is resumed by the executor that was passed to make_delay_object.
In this example, we spawn a task (that does not return any result or thrown exception), which delays itself in a loop by calling co_await on a delay object.
#include "concurrencpp/concurrencpp.h"

#include <iostream>

using namespace std::chrono_literals;

concurrencpp::null_result delayed_task(
    std::shared_ptr<concurrencpp::timer_queue> tq,
    std::shared_ptr<concurrencpp::thread_pool_executor> ex) {
    size_t counter = 1;

    while (true) {
        std::cout << "task was invoked " << counter << " times." << std::endl;
        counter++;

        co_await tq->make_delay_object(1500ms, ex);
    }
}

int main() {
    concurrencpp::runtime runtime;
    delayed_task(runtime.timer_queue(), runtime.thread_pool_executor());

    std::this_thread::sleep_for(10s);
    return 0;
}

A generator is a lazy, synchronous coroutine that is able to produce a stream of values to consume. Generators use the co_yield keyword to yield values back to their consumers.
Generators are meant to be used synchronously - they can only use the co_yield keyword and must not use the co_await keyword. A generator will continue to produce values as long as the co_yield keyword is called. If the co_return keyword is called (explicitly or implicitly), then the generator will stop producing values. Similarly, if an exception is thrown then the generator will stop producing values and the thrown exception will be re-thrown to the consumer of the generator.
Generators are meant to be used in a range-for loop: Generators implicitly produce two iterators - begin and end which control the execution of the for loop. These iterators should not be handled or accessed manually.
When a generator is created, it starts as a lazy task. When its begin method is called, the generator is resumed for the first time and an iterator is returned. The lazy task is resumed repeatedly by calling operator++ on the returned iterator. The returned iterator will be equal to end iterator when the generator finishes execution either by exiting gracefully or throwing an exception. As mentioned earlier, this happens behind the scenes by the inner mechanism of the loop and the generator, and should not be called directly.
Like other objects in concurrencpp, Generators are a move-only type. After a generator was moved, it is considered empty and trying to access its inner methods (other than operator bool ) will throw an exception. The emptiness of a generator should not generally occur - it is advised to consume generators upon their creation in a for loop and not to try to call their methods individually.
generator API

class generator {
    /*
        Move constructor. After this call, rhs is empty.
    */
    generator(generator&& rhs) noexcept;

    /*
        Destructor. Invalidates existing iterators.
    */
    ~generator() noexcept;

    generator(const generator& rhs) = delete;
    generator& operator=(generator&& rhs) = delete;
    generator& operator=(const generator& rhs) = delete;

    /*
        Returns true if this generator is not empty.
        Applications must not use this object if this->operator bool() is false.
    */
    explicit operator bool() const noexcept;

    /*
        Starts running this generator and returns an iterator.
        Throws errors::empty_generator if *this is empty.
        Re-throws any exception that is thrown inside the generator code.
    */
    iterator begin();

    /*
        Returns an end iterator.
    */
    static generator_end_iterator end() noexcept;
};

class generator_iterator {
    using value_type = std::remove_reference_t<type>;
    using reference = value_type&;
    using pointer = value_type*;
    using iterator_category = std::input_iterator_tag;
    using difference_type = std::ptrdiff_t;

    /*
        Resumes the suspended generator and returns *this.
        Re-throws any exception that was thrown inside the generator code.
    */
    generator_iterator& operator++();

    /*
        Post-increment version of operator++.
    */
    void operator++(int);

    /*
        Returns the latest value produced by the associated generator.
    */
    reference operator*() const noexcept;

    /*
        Returns a pointer to the latest value produced by the associated generator.
    */
    pointer operator->() const noexcept;

    /*
        Comparison operators.
    */
    friend bool operator==(const generator_iterator& it0, const generator_iterator& it1) noexcept;
    friend bool operator==(const generator_iterator& it, generator_end_iterator) noexcept;
    friend bool operator==(generator_end_iterator end_it, const generator_iterator& it) noexcept;
    friend bool operator!=(const generator_iterator& it, generator_end_iterator end_it) noexcept;
    friend bool operator!=(generator_end_iterator end_it, const generator_iterator& it) noexcept;
};

generator example: In this example, we will write a generator that yields the n-th member of the sequence S(n) = 1 + 2 + 3 + ... + n where n <= 100:
#include "concurrencpp/concurrencpp.h"

#include <iostream>

concurrencpp::generator<int> sequence() {
    int i = 1;
    int sum = 0;
    while (i <= 100) {
        sum += i;
        ++i;
        co_yield sum;
    }
}

int main() {
    for (auto value : sequence()) {
        std::cout << value << std::endl;
    }
    return 0;
}

Regular synchronous locks cannot be used safely inside tasks for a number of reasons:
Synchronous locks, like std::mutex, are expected to be locked and unlocked in the same thread of execution. Unlocking a synchronous lock in a thread which had not locked it is undefined behavior. Since tasks can be suspended and resumed in any thread of execution, synchronous locks will break when used inside tasks. concurrencpp::async_lock solves those issues by providing an API similar to std::mutex, with the main difference being that calls to concurrencpp::async_lock return a lazy-result that can be co_awaited safely inside tasks. If a task tries to lock an async-lock and fails, the task is suspended, and is resumed when the lock is unlocked and acquired by the suspended task. This allows executors to process a huge amount of tasks waiting to acquire a lock without expensive context switching and expensive kernel calls.
Similar to how std::mutex works, only one task can acquire async_lock at any given time, and a read barrier is placed at the moment of acquiring. Releasing an async lock places a write barrier and allows the next task to acquire it, creating a chain of one modifier at a time, each of which sees the changes earlier modifiers had made and posts its own modifications for the next modifiers to see.
Like std::mutex, concurrencpp::async_lock is not recursive. Extra attention must be given when acquiring such a lock - a lock must not be acquired again in a task that has been spawned by another task which had already acquired the lock. In such a case, an unavoidable deadlock will occur. Unlike other objects in concurrencpp, async_lock is neither copyable nor movable.
Like standard locks, concurrencpp::async_lock is meant to be used with scoped wrappers which leverage the C++ RAII idiom to ensure locks are always unlocked upon function return or thrown exception. async_lock::lock returns a lazy-result of a scoped wrapper that calls async_lock::unlock on destruction. Raw uses of async_lock::unlock are discouraged. concurrencpp::scoped_async_lock acts as the scoped wrapper and provides an API which is almost identical to std::unique_lock. concurrencpp::scoped_async_lock is movable, but not copyable.
async_lock::lock and scoped_async_lock::lock require a resume-executor as their parameter. Upon calling those methods, if the lock is available for locking, then it is locked and the current task is resumed immediately. If not, then the current task is suspended, and will be resumed inside the given resume-executor when the lock is finally acquired.
concurrencpp::scoped_async_lock wraps an async_lock and ensure it's properly unlocked. like std::unique_lock , there are cases it does not wrap any lock, and in this case it's considered to be empty. An empty scoped_async_lock can happen when it's defaultly constructed, moved, or scoped_async_lock::release method is called. An empty scoped-async-lock will not unlock any lock on destruction.
Even if a scoped-async-lock is not empty, it does not necessarily own the underlying async-lock or unlock it on destruction. A non-empty, non-owning scoped-async-lock can occur if scoped_async_lock::unlock was called, or if the wrapper was constructed using the scoped_async_lock(async_lock&, std::defer_lock_t) constructor.
async_lock API

class async_lock {
    /*
        Constructs an async lock object.
    */
    async_lock() noexcept;

    /*
        Destructs an async lock object.
        *this is not automatically unlocked at the moment of destruction.
    */
    ~async_lock() noexcept;

    /*
        Asynchronously acquires the async lock.
        If *this has already been locked by another non-parent task, the current task will be suspended
        and will be resumed inside resume_executor when *this is acquired.
        If *this has not been locked by another task, then *this will be acquired and the current task will be resumed
        immediately in the calling thread of execution.
        If *this has already been locked by a parent task, an unavoidable deadlock will occur.
        Throws std::invalid_argument if resume_executor is null.
        Throws std::system_error if one of the underlying synchronization primitives throws.
    */
    lazy_result<scoped_async_lock> lock(std::shared_ptr<executor> resume_executor);

    /*
        Tries to acquire *this in the calling thread of execution.
        Returns true if *this is acquired, false otherwise.
        In any case, the current task is resumed immediately in the calling thread of execution.
        Throws std::system_error if one of the underlying synchronization primitives throws.
    */
    lazy_result<bool> try_lock();

    /*
        Releases *this and allows other tasks (including suspended tasks waiting for *this) to acquire it.
        Throws std::system_error if *this is not locked at the moment of calling this method.
        Throws std::system_error if one of the underlying synchronization primitives throws.
    */
    void unlock();
};

scoped_async_lock API

class scoped_async_lock {
    /*
        Constructs an async lock wrapper that does not wrap any async lock.
    */
    scoped_async_lock() noexcept = default;

    /*
        If *this wraps an async_lock, this method releases the wrapped lock.
    */
    ~scoped_async_lock() noexcept;

    /*
        Moves rhs to *this.
        After this call, rhs does not wrap any async lock.
    */
    scoped_async_lock(scoped_async_lock&& rhs) noexcept;

    /*
        Wraps an unlocked lock.
        lock must not be in acquired mode when calling this method.
    */
    scoped_async_lock(async_lock& lock, std::defer_lock_t) noexcept;

    /*
        Wraps a locked lock.
        lock must already be acquired when calling this method.
    */
    scoped_async_lock(async_lock& lock, std::adopt_lock_t) noexcept;

    /*
        Calls async_lock::lock on the wrapped lock, using resume_executor as a parameter.
        Throws std::invalid_argument if resume_executor is null.
        Throws std::system_error if *this does not wrap any lock.
        Throws std::system_error if the wrapped lock is already locked.
        Throws any exception async_lock::lock throws.
    */
    lazy_result<void> lock(std::shared_ptr<executor> resume_executor);

    /*
        Calls async_lock::try_lock on the wrapped lock.
        Throws std::system_error if *this does not wrap any lock.
        Throws std::system_error if the wrapped lock is already locked.
        Throws any exception async_lock::try_lock throws.
    */
    lazy_result<bool> try_lock();

    /*
        Calls async_lock::unlock on the wrapped lock.
        If *this does not wrap any lock, this method does nothing.
        Throws std::system_error if *this wraps a lock and it is not locked.
    */
    void unlock();

    /*
        Checks whether *this wraps a locked async_lock or not.
        Returns true if the wrapped lock is in acquired state, false otherwise.
    */
    bool owns_lock() const noexcept;

    /*
        Equivalent to owns_lock.
    */
    explicit operator bool() const noexcept;

    /*
        Swaps the contents of *this and rhs.
    */
    void swap(scoped_async_lock& rhs) noexcept;

    /*
        Empties *this and returns a pointer to the previously wrapped lock.
        After a call to this method, *this doesn't wrap any lock.
        The previously wrapped lock is not released;
        it must be released by either unlocking it manually through the returned pointer or by
        capturing the pointer with another scoped_async_lock which will take ownership over it.
    */
    async_lock* release() noexcept;

    /*
        Returns a pointer to the wrapped async_lock, or a null pointer if there is no wrapped async_lock.
    */
    async_lock* mutex() const noexcept;
};

async_lock example:

In this example we push 10,000,000 integers to an std::vector object from different tasks concurrently, while using async_lock to make sure no data race occurs and that the internal state of the vector object is preserved.
# include " concurrencpp/concurrencpp.h "
# include < vector >
# include < iostream >
std::vector< size_t > numbers;
concurrencpp::async_lock lock;
concurrencpp::result< void > add_numbers (concurrencpp::executor_tag,
std::shared_ptr<concurrencpp::executor> executor,
size_t begin,
size_t end) {
for ( auto i = begin; i < end; i++) {
concurrencpp::scoped_async_lock raii_wrapper = co_await lock. lock (executor);
numbers. push_back (i);
}
}
int main () {
concurrencpp::runtime runtime;
constexpr size_t range = 10'000'000 ;
constexpr size_t sections = 4 ;
concurrencpp::result< void > results[sections];
for ( size_t i = 0 ; i < 4 ; i++) {
const auto range_start = i * range / sections;
const auto range_end = (i + 1 ) * range / sections;
results[i] = add_numbers ({}, runtime. thread_pool_executor (), range_start, range_end);
}
for ( auto & result : results) {
result. get ();
}
std::cout << " vector size is " << numbers. size () << std::endl;
// make sure the vector state has not been corrupted by unprotected concurrent accesses
std::sort (numbers. begin (), numbers. end ());
for ( size_t i = 0 ; i < range; i++) {
if (numbers[i] != i) {
std::cerr << " vector state is corrupted. " << std::endl;
return - 1 ;
}
}
std::cout << " succeeded pushing range [0 - 10,000,000] concurrently to the vector! " << std::endl;
return 0 ;
}

async_condition_variable imitates the standard condition_variable and can be used safely with tasks alongside async_lock. async_condition_variable works with async_lock to suspend a task until some shared memory (protected by the lock) has changed. Tasks that want to monitor shared-memory changes lock an instance of async_lock and call async_condition_variable::await. This atomically unlocks the lock and suspends the current task until some modifier task notifies the condition variable. A modifier task acquires the lock, modifies the shared memory, unlocks the lock and calls either notify_one or notify_all. When a suspended task is resumed (using the resume executor that was given to await), it locks the lock again, allowing the task to continue from the point of suspension seamlessly. Like async_lock, async_condition_variable is neither movable nor copiable - it is meant to be created in one place and accessed by multiple tasks.
async_condition_variable::await overloads require a resume-executor, which will be used to resume the task, and a locked scoped_async_lock. async_condition_variable::await comes with two overloads - one that accepts a predicate and one that doesn't. The overload that does not accept a predicate suspends the calling task immediately upon invocation, until it is resumed by a call to notify_*. The overload that does accept a predicate lets the predicate inspect the shared memory and suspends the task repeatedly until the shared memory has reached its wanted state. Schematically, it works like calling
while (!pred()) { // pred() inspects the shared memory and returns true or false
    co_await await(resume_executor, lock); // suspend the current task until another task calls notify_xxx
}

Just like the standard condition variable, applications are encouraged to use the predicate overload, as it allows more fine-grained control over suspensions and resumptions. async_condition_variable can be used to write concurrent collections and data-structures like concurrent queues and channels.
Internally, async_condition_variable holds a suspension-queue, in which tasks enqueue themselves when they await the condition variable to be notified. When any of the notify_* methods is called, the notifying task dequeues either one task or all of the tasks, depending on the invoked method. Tasks are dequeued from the suspension-queue in a FIFO manner. For example, if task A calls await, then task B calls await, and then task C calls notify_one, then internally task A will be dequeued and resumed. Task B will remain suspended until notify_one or notify_all is called again. If task A and task B are suspended and task C calls notify_all, then both tasks will be dequeued and resumed.
async_condition_variable API

class async_condition_variable {
    /*
        Constructor.
    */
    async_condition_variable() noexcept;

    /*
        Atomically releases lock and suspends the current task by adding it to *this suspension-queue.
        Throws std::invalid_argument if resume_executor is null.
        Throws std::invalid_argument if lock is not locked at the moment of calling this method.
        Might throw std::system_error if the underlying std::mutex throws.
    */
    lazy_result<void> await(std::shared_ptr<executor> resume_executor, scoped_async_lock& lock);

    /*
        Equivalent to:
        while (!pred()) {
            co_await await(resume_executor, lock);
        }
        Might throw any exception that await(resume_executor, lock) might throw.
        Might throw any exception that pred might throw.
    */
    template<class predicate_type>
    lazy_result<void> await(std::shared_ptr<executor> resume_executor, scoped_async_lock& lock, predicate_type pred);

    /*
        Dequeues one task from *this suspension-queue and resumes it, if any is available at the moment of calling this method.
        The suspended task is resumed by scheduling it to run on the executor given when await was called.
        Might throw std::system_error if the underlying std::mutex throws.
    */
    void notify_one();

    /*
        Dequeues all tasks from *this suspension-queue and resumes them, if any are available at the moment of calling this method.
        The suspended tasks are resumed by scheduling them to run on the executors given when await was called.
        Might throw std::system_error if the underlying std::mutex throws.
    */
    void notify_all();
};

async_condition_variable example:

In this example, async_lock and async_condition_variable work together to implement a concurrent queue that can be used to send data (in this example, integers) between tasks. Note that some methods return a result while others return a lazy_result, showing how both eager and lazy tasks can work together.
# include " concurrencpp/concurrencpp.h "
# include < queue >
# include < iostream >
using namespace concurrencpp ;
class concurrent_queue {
private:
async_lock _lock;
async_condition_variable _cv;
std::queue< int > _queue;
bool _abort = false ;
public:
concurrent_queue () = default ;
result< void > shutdown (std::shared_ptr<executor> resume_executor) {
{
auto guard = co_await _lock. lock (resume_executor);
_abort = true ;
}
_cv. notify_all ();
}
lazy_result< void > push (std::shared_ptr<executor> resume_executor, int i) {
{
auto guard = co_await _lock. lock (resume_executor);
_queue. push (i);
}
_cv. notify_one ();
}
lazy_result< int > pop (std::shared_ptr<executor> resume_executor) {
auto guard = co_await _lock. lock (resume_executor);
co_await _cv. await (resume_executor, guard, [ this ] {
return _abort || !_queue. empty ();
});
if (!_queue. empty ()) {
auto result = _queue. front ();
_queue. pop ();
co_return result;
}
assert (_abort);
throw std::runtime_error ( " queue has been shut down. " );
}
};
result< void > producer_loop (executor_tag,
std::shared_ptr<thread_pool_executor> tpe,
concurrent_queue& queue,
int range_start,
int range_end) {
for (; range_start < range_end; ++range_start) {
co_await queue. push (tpe, range_start);
}
}
result< void > consumer_loop (executor_tag, std::shared_ptr<thread_pool_executor> tpe, concurrent_queue& queue) {
try {
while ( true ) {
std::cout << co_await queue. pop (tpe) << std::endl;
}
} catch ( const std:: exception & e) {
std::cerr << e. what () << std::endl;
}
}
int main () {
runtime runtime;
const auto thread_pool_executor = runtime. thread_pool_executor ();
concurrent_queue queue;
result< void > producers[ 4 ];
result< void > consumers[ 4 ];
for ( int i = 0 ; i < 4 ; i++) {
producers[i] = producer_loop ({}, thread_pool_executor, queue, i * 5 , (i + 1 ) * 5 );
}
for ( int i = 0 ; i < 4 ; i++) {
consumers[i] = consumer_loop ({}, thread_pool_executor, queue);
}
for ( int i = 0 ; i < 4 ; i++) {
producers[i]. get ();
}
queue. shutdown (thread_pool_executor). get ();
for ( int i = 0 ; i < 4 ; i++) {
consumers[i]. get ();
}
return 0 ;
}

The concurrencpp runtime object is the agent used to acquire, store and create new executors.
The runtime must be created as a value type as soon as the main function starts to run. When the concurrencpp runtime goes out of scope, it iterates over its stored executors and shuts them down one by one by calling executor::shutdown. Executors then exit their inner work loop, and any subsequent attempt to schedule a new task will throw a concurrencpp::errors::runtime_shutdown exception. The runtime also contains the global timer queue used to create timers and delay objects. Upon destruction, stored executors destroy unexecuted tasks and wait for ongoing tasks to finish. If an ongoing task tries to use an executor to spawn new tasks or schedule its own task continuation, an exception will be thrown. In this case, ongoing tasks need to quit as soon as possible, allowing their underlying executors to quit. The timer queue will also be shut down, cancelling all running timers. With this RAII style of code, no tasks can be processed before the creation of the runtime object, or while/after the runtime goes out of scope. This frees concurrent applications from needing to communicate termination messages explicitly. Tasks are free to use executors as long as the runtime object is alive.
runtime API

class runtime {
    /*
        Creates a runtime object with default options.
    */
    runtime();

    /*
        Creates a runtime object with user defined options.
    */
    runtime(const concurrencpp::runtime_options& options);

    /*
        Destroys this runtime object.
        Calls executor::shutdown on each monitored executor.
        Calls timer_queue::shutdown on the global timer queue.
    */
    ~runtime() noexcept;

    /*
        Returns this runtime's timer queue, used to create new timers.
    */
    std::shared_ptr<concurrencpp::timer_queue> timer_queue() const noexcept;

    /*
        Returns this runtime's concurrencpp::inline_executor
    */
    std::shared_ptr<concurrencpp::inline_executor> inline_executor() const noexcept;

    /*
        Returns this runtime's concurrencpp::thread_pool_executor
    */
    std::shared_ptr<concurrencpp::thread_pool_executor> thread_pool_executor() const noexcept;

    /*
        Returns this runtime's concurrencpp::background_executor
    */
    std::shared_ptr<concurrencpp::thread_pool_executor> background_executor() const noexcept;

    /*
        Returns this runtime's concurrencpp::thread_executor
    */
    std::shared_ptr<concurrencpp::thread_executor> thread_executor() const noexcept;

    /*
        Creates a new concurrencpp::worker_thread_executor and registers it in this runtime.
        Might throw std::bad_alloc or std::system_error if any underlying memory or system resource could not have been acquired.
    */
    std::shared_ptr<concurrencpp::worker_thread_executor> make_worker_thread_executor();

    /*
        Creates a new concurrencpp::manual_executor and registers it in this runtime.
        Might throw std::bad_alloc or std::system_error if any underlying memory or system resource could not have been acquired.
    */
    std::shared_ptr<concurrencpp::manual_executor> make_manual_executor();

    /*
        Creates a new user defined executor and registers it in this runtime.
        executor_type must be a valid concrete class of concurrencpp::executor.
        Might throw std::bad_alloc if no memory is available.
        Might throw any exception that the constructor of <<executor_type>> might throw.
    */
    template<class executor_type, class... argument_types>
    std::shared_ptr<executor_type> make_executor(argument_types&&... arguments);

    /*
        Returns the version of concurrencpp that the library was built with.
    */
    static std::tuple<unsigned int, unsigned int, unsigned int> version() noexcept;
};

In some cases, applications are interested in monitoring thread creation and termination. For example, some memory allocators require new threads to be registered and unregistered upon their creation and termination. The concurrencpp runtime allows setting a thread-creation callback and a thread-termination callback. Those callbacks will be called whenever one of the concurrencpp workers creates a new thread and when that thread is terminating. Those callbacks are always called from inside the created/terminating thread, so std::this_thread::get_id will always return the relevant thread ID. The signature of those callbacks is void callback(std::string_view thread_name). thread_name is a concurrencpp-specific title that is given to the thread and can be observed in some debuggers that present the thread name. The thread name is not guaranteed to be unique and should only be used for logging and debugging purposes.
In order to set a thread-creation callback and/or a thread termination callback, applications can set the thread_started_callback and/or thread_terminated_callback members of the runtime_options which is passed to the runtime constructor. Since those callbacks are copied to each concurrencpp worker that might create threads, those callbacks have to be copiable.
# include " concurrencpp/concurrencpp.h "
# include < iostream >
int main () {
concurrencpp::runtime_options options;
options. thread_started_callback = [](std::string_view thread_name) {
std::cout << " A new thread is starting to run, name: " << thread_name << " , thread id: " << std::this_thread::get_id ()
<< std::endl;
};
options. thread_terminated_callback = [](std::string_view thread_name) {
std::cout << " A thread is terminating, name: " << thread_name << " , thread id: " << std::this_thread::get_id () << std::endl;
};
concurrencpp::runtime runtime (options);
const auto timer_queue = runtime. timer_queue ();
const auto thread_pool_executor = runtime. thread_pool_executor ();
concurrencpp::timer timer =
timer_queue-> make_timer ( std::chrono::milliseconds ( 100 ), std::chrono::milliseconds ( 500 ), thread_pool_executor, [] {
std::cout << " A timer callable is executing " << std::endl;
});
std::this_thread::sleep_for ( std::chrono::seconds ( 3 ));
return 0 ;
}Possible output:
A new thread is starting to run, name: concurrencpp::timer_queue worker, thread id: 7496
A new thread is starting to run, name: concurrencpp::thread_pool_executor worker, thread id: 21620
A timer callable is executing
A timer callable is executing
A timer callable is executing
A timer callable is executing
A timer callable is executing
A timer callable is executing
A thread is terminating, name: concurrencpp::timer_queue worker, thread id: 7496
A thread is terminating, name: concurrencpp::thread_pool_executor worker, thread id: 21620
Applications can create their own custom executor type by inheriting the derivable_executor class. There are a few points to consider when implementing user defined executors: The most important thing is to remember that executors are used from multiple threads, so implemented methods must be thread-safe.
New executors can be created using runtime::make_executor . Applications must not create new executors with plain instantiation (such as std::make_shared or plain new ), only by using runtime::make_executor . Also, applications must not try to re-instantiate the built-in concurrencpp executors, like the thread_pool_executor or the thread_executor , those executors must only be accessed through their existing instances in the runtime object.
Another important point is to handle shutdown correctly: shutdown, shutdown_requested and enqueue should all monitor the executor state and behave accordingly when invoked:
shutdown should tell underlying threads to quit and then join them. shutdown might be called multiple times, and the method must handle this scenario by ignoring any subsequent call after the first invocation. enqueue must throw a concurrencpp::errors::runtime_shutdown exception if shutdown had already been called.

task objects

Implementing executors is one of the rare cases where applications need to work with the concurrencpp::task class directly. concurrencpp::task is an std::function-like object, but with a few differences. Like std::function, the task object stores a callable that acts as the asynchronous operation. Unlike std::function, task is a move-only type. On invocation, task objects receive no parameters and return void. Moreover, every task object can be invoked only once. After the first invocation, the task object becomes empty. Invoking an empty task object is equivalent to invoking an empty lambda ([]{}), and will not throw any exception. Task objects receive their callable as a forwarding reference (type&& where type is a template parameter), and not by copy (like std::function). Construction of the stored callable happens in-place. This allows task objects to contain callables that are move-only types (like std::unique_ptr and concurrencpp::result). Task objects use different methods to optimize the usage of the stored types; for example, task objects apply the short-buffer-optimization (sbo) for regular, small callables, and will inline calls to std::coroutine_handle<void> by calling them directly without virtual dispatch.
task API

class task {
    /*
        Creates an empty task object.
    */
    task() noexcept;

    /*
        Creates a task object by moving the stored callable of rhs to *this.
        If rhs is empty, then *this will also be empty after construction.
        After this call, rhs is empty.
    */
    task(task&& rhs) noexcept;

    /*
        Creates a task object by storing callable in *this.
        <<typename std::decay<callable_type>::type>> will be in-place-constructed
        inside *this by perfectly forwarding callable.
    */
    template<class callable_type>
    task(callable_type&& callable);

    /*
        Destroys stored callable, does nothing if empty.
    */
    ~task() noexcept;

    /*
        If *this is empty, does nothing.
        Invokes stored callable, and immediately destroys it.
        After this call, *this is empty.
        May throw any exception that the invoked callable may throw.
    */
    void operator()();

    /*
        Moves the stored callable of rhs to *this.
        If rhs is empty, then *this will also be empty after this call.
        If *this already contains a stored callable, operator = destroys it first.
    */
    task& operator=(task&& rhs) noexcept;

    /*
        If *this is not empty, task::clear destroys the stored callable and empties *this.
        If *this is empty, clear does nothing.
    */
    void clear() noexcept;

    /*
        Returns true if *this stores a callable, false otherwise.
    */
    explicit operator bool() const noexcept;

    /*
        Returns true if *this stores a callable,
        and that stored callable has the same type as <<typename std::decay<callable_type>::type>>
    */
    template<class callable_type>
    bool contains() const noexcept;
};

When implementing user-defined executors, it is up to the implementation to store task objects (when enqueue is called) and to execute them according to the executor's inner mechanism.
In this example, we create an executor which logs actions like enqueuing tasks or executing them. We implement the executor interface, and we request the runtime to create and store an instance of it by calling runtime::make_executor . The rest of the application behaves exactly the same as if we were to use non user-defined executors.
# include " concurrencpp/concurrencpp.h "
# include < iostream >
# include < queue >
# include < thread >
# include < mutex >
# include < condition_variable >
class logging_executor : public concurrencpp ::derivable_executor<logging_executor> {
private:
mutable std::mutex _lock;
std::queue<concurrencpp::task> _queue;
std::condition_variable _condition;
bool _shutdown_requested;
std::thread _thread;
const std::string _prefix;
void work_loop () {
while ( true ) {
std::unique_lock<std::mutex> lock (_lock);
if (_shutdown_requested) {
return ;
}
if (!_queue. empty ()) {
auto task = std::move (_queue. front ());
_queue. pop ();
lock. unlock ();
std::cout << _prefix << " A task is being executed " << std::endl;
task ();
continue ;
}
_condition. wait (lock, [ this ] {
return !_queue. empty () || _shutdown_requested;
});
}
}
public:
logging_executor (std::string_view prefix) :
derivable_executor<logging_executor>( " logging_executor " ),
_shutdown_requested ( false ),
_prefix (prefix) {
_thread = std::thread ([ this ] {
work_loop ();
});
}
void enqueue (concurrencpp::task task) override {
std::cout << _prefix << " A task is being enqueued! " << std::endl;
std::unique_lock<std::mutex> lock (_lock);
if (_shutdown_requested) {
throw concurrencpp::errors::runtime_shutdown ( " logging executor - executor was shutdown. " );
}
_queue. emplace ( std::move (task));
_condition. notify_one ();
}
void enqueue (std::span<concurrencpp::task> tasks) override {
std::cout << _prefix << tasks. size () << " tasks are being enqueued! " << std::endl;
std::unique_lock<std::mutex> lock (_lock);
if (_shutdown_requested) {
throw concurrencpp::errors::runtime_shutdown ( " logging executor - executor was shutdown. " );
}
for ( auto & task : tasks) {
_queue. emplace ( std::move (task));
}
_condition. notify_one ();
}
int max_concurrency_level () const noexcept override {
return 1 ;
}
bool shutdown_requested () const noexcept override {
std::unique_lock<std::mutex> lock (_lock);
return _shutdown_requested;
}
void shutdown () noexcept override {
std::cout << _prefix << " shutdown requested " << std::endl;
std::unique_lock<std::mutex> lock (_lock);
if (_shutdown_requested) return ; // nothing to do.
_shutdown_requested = true ;
lock. unlock ();
_condition. notify_one ();
_thread. join ();
}
};
int main () {
concurrencpp::runtime runtime;
auto logging_ex = runtime. make_executor <logging_executor>( " Session #1234 " );
for ( size_t i = 0 ; i < 10 ; i++) {
logging_ex-> post ([] {
std::cout << " hello world " << std::endl;
});
}
std::getchar ();
return 0 ;
}

$ git clone https://github.com/David-Haim/concurrencpp.git
$ cd concurrencpp
$ cmake -S . -B build/lib
$ cmake --build build/lib --config Release

$ git clone https://github.com/David-Haim/concurrencpp.git
$ cd concurrencpp
$ cmake -S test -B build/test
$ cmake --build build/test
<# for release mode: cmake --build build/test --config Release #>
$ cd build/test
$ ctest . -V -C Debug
<# for release mode: ctest . -V -C Release #>

$ git clone https://github.com/David-Haim/concurrencpp.git
$ cd concurrencpp
$ cmake -DCMAKE_BUILD_TYPE=Release -S . -B build/lib
$ cmake --build build/lib
#optional, install the library: sudo cmake --install build/lib

With clang and gcc, it is also possible to run the tests with TSAN (thread sanitizer) support.
$ git clone https://github.com/David-Haim/concurrencpp.git
$ cd concurrencpp
$ cmake -S test -B build/test
#for release mode: cmake -DCMAKE_BUILD_TYPE=Release -S test -B build/test
#for TSAN mode: cmake -DCMAKE_BUILD_TYPE=Release -DENABLE_THREAD_SANITIZER=Yes -S test -B build/test
$ cmake --build build/test
$ cd build/test
$ ctest . -V

When compiling on Linux, the library tries to use libstdc++ by default. If you intend to use libc++ as your standard library implementation, the CMAKE_TOOLCHAIN_FILE flag should be specified as below:

$ cmake -DCMAKE_TOOLCHAIN_FILE=../cmake/libc++.cmake -DCMAKE_BUILD_TYPE=Release -S . -B build/lib

As an alternative to building and installing the library manually, developers may get stable releases of concurrencpp via the vcpkg and Conan package managers:

vcpkg:

$ vcpkg install concurrencpp

Conan: concurrencpp on ConanCenter
concurrencpp comes with a built-in sandbox program which developers can modify and experiment with, without having to install or link the compiled library against a different code-base. In order to play with the sandbox, developers can modify sandbox/main.cpp and compile the application using the following commands:
$ cmake -S sandbox -B build/sandbox
$ cmake --build build/sandbox
<# for release mode: cmake --build build/sandbox --config Release #>
$ ./build/sandbox <# runs the sandbox #>

$ cmake -S sandbox -B build/sandbox
#for release mode: cmake -DCMAKE_BUILD_TYPE=Release -S sandbox -B build/sandbox
$ cmake --build build/sandbox
$ ./build/sandbox #runs the sandbox