BS::thread_pool: a fast, lightweight, and easy-to-use C++17 thread pool library
By Barak Shoshany
Email: [email protected]
Website: https://baraksh.com/
GitHub: https://github.com/bshoshany
This is the complete documentation for v4.1.0 of the library, released on 2024-03-22.
Contents:
- The BS::thread_pool class (BS_thread_pool.hpp)
- Optional features of the BS::thread_pool class
- The BS::this_thread namespace
- The BS::multi_future<T> class
- Utility classes (BS_thread_pool_utils.hpp)
- The BS::signaller class: sending simple signals between threads
- The BS::synced_stream class: synchronizing printing to streams
- The BS::timer class: measuring execution time

Multithreading is essential for modern high-performance computing. Since C++11, the C++ standard library has included built-in low-level multithreading support using constructs such as std::thread. However, std::thread creates a new thread each time it is called, which can have a significant performance overhead. Furthermore, it is possible to create more threads than the hardware can handle simultaneously, potentially resulting in a substantial slowdown.
The library presented here contains a C++ thread pool class, BS::thread_pool, which avoids these issues by creating a fixed pool of threads once and for all, and then continuously reusing the same threads to perform different tasks throughout the lifetime of the program. By default, the number of threads in the pool is equal to the maximum number of threads that the hardware can run in parallel.
The user submits tasks to be executed into a queue. Whenever a thread becomes available, it retrieves the next task from the queue and executes it. The pool automatically produces an std::future for each task, which allows the user to wait for the task to finish executing and/or obtain its eventual return value. Threads and tasks are autonomously managed by the pool in the background, without requiring any input from the user aside from submitting the desired tasks.
The design of this library is guided by four important principles. First, compactness: the entire library consists of just one small self-contained header file, with no other components or dependencies, aside from a second small self-contained header file with optional utilities. Second, portability: the library only utilizes the C++17 standard library, without relying on any compiler extensions or 3rd-party libraries, and is therefore compatible with any modern standards-conforming C++17 compiler on any platform. Third, ease of use: the library is extensively documented, and programmers of any level should be able to use it right out of the box.
The fourth and final guiding principle is performance: each and every line of code in this library was carefully designed with maximum performance in mind, and performance was tested and verified on a variety of compilers and platforms. Indeed, the library was originally designed for use in the author's own computationally-intensive scientific computing projects, running both on high-end desktop/laptop computers and on high-performance computing nodes.
Other, more advanced multithreading libraries may offer more features and/or higher performance. However, they typically consist of a vast codebase with multiple components and dependencies, and involve complex APIs that require a substantial time investment to learn. This library is not intended to replace these more advanced libraries; instead, it was designed for users who do not require very advanced features and prefer a simple and lightweight library that is easy to learn and use and can be readily incorporated into existing or new projects.
- Fast, lightweight, and easy-to-use: just #include "BS_thread_pool.hpp" and you're all set!
- Every task submitted to the queue using the submit_task() member function automatically generates an std::future, which can be used to wait for the task to finish executing and/or obtain its eventual return value.
- Loops can be automatically parallelized into any number of tasks using the submit_loop() member function, which returns a BS::multi_future that can be used to track the execution of all the parallel tasks at once.
- If futures are not needed, tasks can be submitted using detach_task(), and loops can be parallelized using detach_loop(); in that case, wait(), wait_for(), and wait_until() can be used to wait for all the tasks in the queue to complete.
- The included test program BS_thread_pool_test.cpp can be used to perform exhaustive automated tests and benchmarks, and also serves as a comprehensive example of how to properly use the library. The included PowerShell script BS_thread_pool_test.ps1 provides a portable way to run the tests with multiple compilers.
- The optional header file BS_thread_pool_utils.hpp contains several useful utility classes:
  - Use the BS::signaller utility class to send simple signals between threads.
  - Use the BS::synced_stream utility class to synchronize output to a stream from multiple threads in parallel.
  - Use the BS::timer utility class to easily measure execution time for benchmarking purposes.
- Submit a sequence of tasks enumerated by indices to the queue using detach_sequence() and submit_sequence().
- Change the number of threads in the pool safely and on the fly as needed using the reset() member function.
- Monitor the number of queued and/or running tasks using the get_tasks_queued(), get_tasks_running(), and get_tasks_total() member functions.
- Get the current number of threads in the pool using get_thread_count().
- Freely pause and resume the pool using the pause(), unpause(), and is_paused() member functions; when paused, threads do not retrieve new tasks out of the queue.
- Remove all tasks currently waiting in the queue with the purge() member function.
- Exceptions thrown by tasks submitted with submit_task() or loops parallelized with submit_loop() are caught and forwarded to the corresponding futures.
- Get the pool index of the current thread with BS::this_thread::get_index(), or a pointer to the pool that owns it with BS::this_thread::get_pool().
- Get the unique thread IDs of all the pool's threads with get_thread_ids(), or their implementation-defined thread handles with the optional get_native_handles() member function.

This library should successfully compile on any C++17 standard-compliant compiler, on all operating systems and architectures for which such a compiler is available. Compatibility was verified on a 24-core (8P+16E) / 32-thread Intel i9-13900K CPU with multiple compilers and platforms.
In addition, the library was tested on a Digital Research Alliance of Canada node equipped with two 20-core / 40-thread Intel Xeon Gold 6148 CPUs (for a total of 40 cores and 80 threads), running CentOS Linux 7.9.2009, using GCC v13.2.0.
The test program BS_thread_pool_test.cpp compiled without warnings (with the warning flags -Wall -Wextra -Wconversion -Wsign-conversion -Wpedantic -Weffc++ -Wshadow on GCC/Clang and /W4 on MSVC), ran, and successfully completed all of the automated tests.
As this library requires C++17 features, the code must be compiled with C++17 support:
- For GCC or Clang, use the -std=c++17 flag. On Linux, you will also need the -pthread flag to enable the POSIX threads library.
- For MSVC, use /std:c++17, along with /permissive- to ensure standards conformance.
For maximum performance, it is recommended to compile with all available compiler optimizations:
- For GCC or Clang, use the -O3 flag.
- For MSVC, use /O2.
For example, to compile the test program BS_thread_pool_test.cpp with warnings and optimizations, the following commands are recommended:
g++ BS_thread_pool_test.cpp -std=c++17 -O3 -Wall -Wextra -Wconversion -Wsign-conversion -Wpedantic -Weffc++ -Wshadow -pthread -o BS_thread_pool_test
For Clang, replace g++ with clang++. On Windows, replace -o BS_thread_pool_test with -o BS_thread_pool_test.exe and remove -pthread. For MSVC, use:
cl BS_thread_pool_test.cpp /std:c++17 /permissive- /O2 /W4 /EHsc /Fo:BS_thread_pool_test.obj /Fe:BS_thread_pool_test.exe
To install BS::thread_pool, simply download the latest release from the GitHub repository, place the header file BS_thread_pool.hpp from the include folder in the desired folder, and include it in your program:
#include "BS_thread_pool.hpp"
The thread pool will now be accessible via the BS::thread_pool class. For a quicker installation, the header file itself can also be downloaded directly from the repository.
This library also comes with a standalone utilities header file, BS_thread_pool_utils.hpp, which is not required in order to use the thread pool, but provides utility classes that may be helpful for multithreading. This header file is likewise located in the include folder, and can also be downloaded directly from the repository.
The library is also available through various package managers and build systems, including vcpkg, Conan, Meson, and CMake with CPM. Please see below for more details.
The default constructor creates a thread pool with as many threads as the hardware can handle concurrently, as reported by the implementation via std::thread::hardware_concurrency(). This is usually determined by the number of cores in the CPU; if a core is hyperthreaded, it will count as two threads. For example:
// Constructs a thread pool with as many threads as available in the hardware.
BS::thread_pool pool;
Optionally, a number of threads different from the hardware concurrency can be specified as an argument to the constructor. However, note that adding more threads than the hardware can handle will not improve performance, and in fact will most likely hinder it. This option exists in order to allow using fewer threads than the hardware concurrency, in cases where you wish to leave some threads available for other processes. For example:
// Constructs a thread pool with only 12 threads.
BS::thread_pool pool(12);
Usually, when a thread pool is used, the main thread of the program should only submit tasks to the pool and wait for them to finish, and should not perform any computationally intensive tasks on its own. In that case, it is recommended to use the default value for the number of threads, which ensures that all the threads available in the hardware will be put to work while the main thread waits.
The member function get_thread_count() returns the number of threads in the pool. If the default constructor was used, this will be equal to std::thread::hardware_concurrency().
Since the whole point of a thread pool is that the threads are created only once, it is generally unnecessary to change the number of threads in the pool after it has been constructed. However, if needed, this can be done safely and on the fly using the reset() member function.
reset() will wait for all currently running tasks to be completed, but will leave the rest of the tasks in the queue. Then it will destroy the thread pool and create a new one with the desired new number of threads, as specified in the function's argument (or the hardware concurrency, if no argument is given). The new thread pool will then resume executing the tasks that remained in the queue, as well as any newly submitted tasks.
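For illustration, here is a minimal sketch (the thread counts are arbitrary, and only the member functions described above are used) of resizing a pool with reset() and checking the result with get_thread_count():
#include "BS_thread_pool.hpp" // BS::thread_pool
#include <iostream> // std::cout

int main()
{
    BS::thread_pool pool(2);
    std::cout << pool.get_thread_count() << '\n'; // Prints 2.
    pool.reset(4); // Waits for running tasks, then recreates the pool with 4 threads.
    std::cout << pool.get_thread_count() << '\n'; // Prints 4.
    pool.reset(); // With no argument, resets to the hardware concurrency.
    std::cout << pool.get_thread_count() << '\n';
}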
If desired, the version of this library can be read at compilation time from the following three macros:
- BS_THREAD_POOL_VERSION_MAJOR - indicates the major version.
- BS_THREAD_POOL_VERSION_MINOR - indicates the minor version.
- BS_THREAD_POOL_VERSION_PATCH - indicates the patch version.
For example:
std::cout << "Thread pool library version is " << BS_THREAD_POOL_VERSION_MAJOR << '.' << BS_THREAD_POOL_VERSION_MINOR << '.' << BS_THREAD_POOL_VERSION_PATCH << ".\n";
Sample output:
Thread pool library version is 4.1.0.
This allows, for example, the same codebase to work with multiple versions of the library by using #if directives (see the sketch below).
Note: This feature is only available since v4.0.1. Earlier releases of this library did not define these macros.
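As a hypothetical sketch (the version threshold chosen here is arbitrary), such a compile-time check might look like this:
#include "BS_thread_pool.hpp" // BS::thread_pool

#if defined(BS_THREAD_POOL_VERSION_MAJOR) && (BS_THREAD_POOL_VERSION_MAJOR >= 4)
    // Code that relies on the v4 API.
#else
    // Fallback for older releases, which do not define the version macros.
#endif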
In this section we will learn how to submit tasks with no arguments, but possibly with a return value, to the queue. Once a task is submitted, it will be executed as soon as a thread becomes available. Tasks are executed in the order that they were submitted (first-in, first-out), unless task priority is enabled (see below).
For example, if the pool has 8 threads and an empty queue, and we submit 16 tasks, we should expect the first 8 tasks to be executed in parallel, with the remaining tasks picked up one by one by each thread that finishes its first task, until none remain in the queue.
The member function submit_task() is used to submit tasks to the queue. It takes exactly one input: the task to submit. The task must be a function with no arguments, but it may have a return value. The output of submit_task() is an std::future associated with the task.
If the submitted function has a return value of type T, the future will be of type std::future<T>, and it will be set to the function's return value when the function finishes executing. If the submitted function does not have a return value, the future will be an std::future<void>, which does not contain a value, but can still be used to wait for the function to finish.
Using auto for the return value of submit_task() means the compiler will automatically detect which instance of the template std::future to use. However, specifying the particular type std::future<T>, as in the examples below, is recommended for increased readability.
To wait until the task finishes, use the future's member function wait(). To obtain the return value, use the member function get(), which will also automatically wait for the task to finish if it has not finished yet. Here is a simple example:
#include "BS_thread_pool.hpp" // BS::thread_pool
#include <future>   // std::future
#include <iostream> // std::cout

int the_answer()
{
    return 42;
}

int main()
{
    BS::thread_pool pool;
    std::future<int> my_future = pool.submit_task(the_answer);
    std::cout << my_future.get() << '\n';
}
In this example, we submitted the function the_answer(), which returns an int; therefore, the pool's member function submit_task() returned an std::future<int>. We then used the future's get() member function to obtain the return value and print it.
In addition to submitting pre-defined functions, we can also use lambda expressions to quickly define tasks on the fly. Rewriting the previous example in terms of a lambda expression, we get:
#include "BS_thread_pool.hpp" // BS::thread_pool
#include <future>   // std::future
#include <iostream> // std::cout

int main()
{
    BS::thread_pool pool;
    std::future<int> my_future = pool.submit_task([]{ return 42; });
    std::cout << my_future.get() << '\n';
}
Here, the lambda expression []{ return 42; } has two parts:
- The empty capture clause, [], which indicates to the compiler that a lambda expression is being defined.
- The body, { return 42; }, which simply returns the value 42.
Submitting a lambda expression rather than a pre-defined function is usually simpler and faster, especially due to the ability to capture local variables, which we will discuss in the next section.
Of course, tasks need not return a value. In the following example, we submit a function with no return value, and then use the future to wait for it to finish executing:
#include "BS_thread_pool.hpp" // BS::thread_pool
#include <chrono>   // std::chrono
#include <future>   // std::future
#include <iostream> // std::cout
#include <thread>   // std::this_thread

int main()
{
    BS::thread_pool pool;
    const std::future<void> my_future = pool.submit_task(
        []
        {
            std::this_thread::sleep_for(std::chrono::milliseconds(500));
        });
    std::cout << "Waiting for the task to complete... ";
    my_future.wait();
    std::cout << "Done." << '\n';
}
Here we split the lambda into multiple lines to make it more readable. The command std::this_thread::sleep_for(std::chrono::milliseconds(500)) instructs the task to sleep for 500 milliseconds, simulating a computationally intensive task.
As mentioned in the previous section, tasks submitted using submit_task() cannot have any arguments. However, we can easily submit tasks with arguments by wrapping the function in a lambda, or by using lambda captures directly. Here are two examples.
First, here is an example of submitting a pre-defined function with arguments by wrapping it in a lambda:
#include "BS_thread_pool.hpp" // BS::thread_pool
#include <future>   // std::future
#include <iostream> // std::cout

double multiply(const double lhs, const double rhs)
{
    return lhs * rhs;
}

int main()
{
    BS::thread_pool pool;
    std::future<double> my_future = pool.submit_task(
        []
        {
            return multiply(6, 7);
        });
    std::cout << my_future.get() << '\n';
}
As you can see, to pass the arguments to multiply, we simply called multiply(6, 7) explicitly inside the lambda. If the arguments are not literals, we need to capture them from the local scope using the lambda's capture clause:
#include "BS_thread_pool.hpp" // BS::thread_pool
#include <future>   // std::future
#include <iostream> // std::cout

double multiply(const double lhs, const double rhs)
{
    return lhs * rhs;
}

int main()
{
    BS::thread_pool pool;
    constexpr double first = 6;
    constexpr double second = 7;
    std::future<double> my_future = pool.submit_task(
        [first, second]
        {
            return multiply(first, second);
        });
    std::cout << my_future.get() << '\n';
}
We could also get rid of the multiply() function entirely and just put everything inside the lambda, if desired:
#include "BS_thread_pool.hpp" // BS::thread_pool
#include <future>   // std::future
#include <iostream> // std::cout

int main()
{
    BS::thread_pool pool;
    constexpr double first = 6;
    constexpr double second = 7;
    std::future<double> my_future = pool.submit_task(
        [first, second]
        {
            return first * second;
        });
    std::cout << my_future.get() << '\n';
}
Usually, submitting a task to the queue using submit_task() is the preferred mode of operation, as it allows waiting for the task to finish and/or obtaining its return value later. However, sometimes a future is not needed; for example, when you want to "set and forget" a certain task, or when the task already communicates with the main thread or with other tasks without using futures, such as via condition variables.
In such cases, you can avoid the overhead involved in assigning a future to the task, in order to increase performance. We refer to this as "detaching" the task, since it is detached from the main thread and runs independently.
Detaching tasks is done using the detach_task() member function, which allows you to detach a task to the queue without generating a future for it. The task can have any number of arguments, but it cannot have a return value, as there would be no way for the main thread to retrieve that value.
Since detach_task() does not return a future, there is no built-in way to know when the task finishes executing. You must manually ensure that the task has finished executing before trying to use anything that depends on its output; otherwise, bad things will happen!
To facilitate this, BS::thread_pool provides the member function wait(), which waits for all the tasks in the queue to complete, whether they were detached or submitted with futures. The pool's wait() member function works similarly to the wait() member function of std::future. For example, consider the following code:
#include "BS_thread_pool.hpp" // BS::thread_pool
#include <chrono>   // std::chrono
#include <iostream> // std::cout
#include <thread>   // std::this_thread

int main()
{
    BS::thread_pool pool;
    int result = 0;
    pool.detach_task(
        [&result]
        {
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
            result = 42;
        });
    std::cout << result << '\n';
}
This program first defines a local variable named result and initializes it to 0. It then detaches a task in the form of a lambda expression. The lambda captures result by reference, as indicated by the & in front of it, meaning the task can modify result and the modification will be reflected in the main thread. The task first sleeps for 100 milliseconds and then changes the value of result to 42. However, when the main thread prints the value of result, the task is still sleeping and has not yet had time to modify it; therefore, the program prints the initial value 0.
To wait for the task to finish, we need to use the pool's wait() member function after detaching the task:
#include "BS_thread_pool.hpp" // BS::thread_pool
#include <chrono>   // std::chrono
#include <iostream> // std::cout
#include <thread>   // std::this_thread

int main()
{
    BS::thread_pool pool;
    int result = 0;
    pool.detach_task(
        [&result]
        {
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
            result = 42;
        });
    pool.wait();
    std::cout << result << '\n';
}
Now the program prints the value 42, as expected. Note, however, that wait() will wait for all the tasks in the queue, including any other tasks that were submitted before or after the one we are interested in. If we only want to wait for one specific task, submit_task() would be a better choice.
Sometimes you may want to wait for tasks to finish, but only for a certain amount of time, or only until a specific point in time. For example, if the tasks have not finished after some time, you may want to let the user know that there is a delay.
For tasks submitted with futures using submit_task(), this can be achieved using two member functions of std::future:
- wait_for() waits for the task to finish, but stops waiting after the specified duration, given as an argument of type std::chrono::duration, has passed.
- wait_until() waits for the task to finish, but stops waiting after the specified point in time, given as an argument of type std::chrono::time_point, has been reached.
In both cases, the function returns std::future_status::ready if the future is ready, that is, the task has finished executing and its return value has been obtained; if the future is not yet ready when the timeout expires, the function instead returns std::future_status::timeout.
Here is an example:
#include "BS_thread_pool.hpp" // BS::thread_pool
#include <chrono>   // std::chrono
#include <future>   // std::future
#include <iostream> // std::cout
#include <thread>   // std::this_thread

int main()
{
    BS::thread_pool pool;
    const std::future<void> my_future = pool.submit_task(
        []
        {
            std::this_thread::sleep_for(std::chrono::milliseconds(1000));
            std::cout << "Task done!\n";
        });
    while (true)
    {
        if (my_future.wait_for(std::chrono::milliseconds(200)) != std::future_status::ready)
            std::cout << "Sorry, the task is not done yet.\n";
        else
            break;
    }
}
The output will look similar to this:
Sorry, the task is not done yet.
Sorry, the task is not done yet.
Sorry, the task is not done yet.
Sorry, the task is not done yet.
Task done!
For detached tasks, this method cannot be used, since there is no future. However, BS::thread_pool has two member functions of its own, wait_for() and wait_until(), which similarly wait for a specified duration or until a specified point in time, but do so for all tasks, whether submitted or detached. Instead of an std::future_status, the pool's waiting functions return true if all tasks finished executing, or false if the duration expired or the time point was reached while some tasks were still running.
Here is the same example as above, using detach_task() and pool.wait_for():
#include "BS_thread_pool.hpp" // BS::thread_pool
#include <chrono>   // std::chrono
#include <iostream> // std::cout
#include <thread>   // std::this_thread

int main()
{
    BS::thread_pool pool;
    pool.detach_task(
        []
        {
            std::this_thread::sleep_for(std::chrono::milliseconds(1000));
            std::cout << "Task done!\n";
        });
    while (true)
    {
        if (!pool.wait_for(std::chrono::milliseconds(200)))
            std::cout << "Sorry, the task is not done yet.\n";
        else
            break;
    }
}
Consider the following program:
#include <iostream> // std::cout, std::boolalpha

class flag_class
{
public:
    [[nodiscard]] bool get_flag() const
    {
        return flag;
    }
    void set_flag(const bool arg)
    {
        flag = arg;
    }
private:
    bool flag = false;
};

int main()
{
    flag_class flag_object;
    flag_object.set_flag(true);
    std::cout << std::boolalpha << flag_object.get_flag() << '\n';
}
This program creates a new object flag_object of the class flag_class, sets the flag to true using the setter member function set_flag(), and then prints the value of the flag using the getter member function get_flag().
What if we want to submit the member function set_flag() to the thread pool as a task? We simply wrap the entire statement flag_object.set_flag(true); in a lambda, and pass flag_object to the lambda, as in this example:
#include "BS_thread_pool.hpp" // BS::thread_pool
#include <iostream> // std::cout, std::boolalpha

class flag_class
{
public:
    [[nodiscard]] bool get_flag() const
    {
        return flag;
    }
    void set_flag(const bool arg)
    {
        flag = arg;
    }
private:
    bool flag = false;
};

int main()
{
    BS::thread_pool pool;
    flag_class flag_object;
    pool.submit_task(
        [&flag_object]
        {
            flag_object.set_flag(true);
        })
        .wait();
    std::cout << std::boolalpha << flag_object.get_flag() << '\n';
}
Of course, this also works with detach_task(), provided that we call wait() on the pool itself rather than on a returned future.
In this example, instead of storing the future returned by submit_task() in a variable and then waiting on it, we called wait() directly on the returned future. This is a common way to wait for a task to finish when there is nothing else to do in the meantime. Note also that we passed flag_object to the lambda by reference, since we want to set the flag on that same object rather than on a copy of it; a copy would not have worked anyway, since variables captured by value are implicitly const.
Another thing you might want to do is call one member function of an object from another member function of the same object. This follows a similar syntax, except that we need to capture this (that is, a pointer to the current object) in the lambda. Here is an example:
#include "BS_thread_pool.hpp" // BS::thread_pool
#include <iostream> // std::cout, std::boolalpha

BS::thread_pool pool;

class flag_class
{
public:
    [[nodiscard]] bool get_flag() const
    {
        return flag;
    }
    void set_flag(const bool arg)
    {
        flag = arg;
    }
    void set_flag_to_true()
    {
        pool.submit_task(
            [this]
            {
                set_flag(true);
            })
            .wait();
    }
private:
    bool flag = false;
};

int main()
{
    flag_class flag_object;
    flag_object.set_flag_to_true();
    std::cout << std::boolalpha << flag_object.get_flag() << '\n';
}
In this example, we defined the thread pool as a global object, so that it can be accessed outside the main() function.
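If a global pool is undesirable, an alternative sketch (not taken from the library's own examples) is to pass the pool to the member function by reference:
#include "BS_thread_pool.hpp" // BS::thread_pool
#include <iostream> // std::cout, std::boolalpha

class flag_class
{
public:
    [[nodiscard]] bool get_flag() const
    {
        return flag;
    }
    void set_flag(const bool arg)
    {
        flag = arg;
    }
    void set_flag_to_true(BS::thread_pool& pool)
    {
        pool.submit_task(
            [this]
            {
                set_flag(true);
            })
            .wait();
    }
private:
    bool flag = false;
};

int main()
{
    BS::thread_pool pool;
    flag_class flag_object;
    flag_object.set_flag_to_true(pool);
    std::cout << std::boolalpha << flag_object.get_flag() << '\n';
}
This avoids global state, at the cost of threading a reference to the pool through every function that needs it.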
One of the most common and effective methods of parallelization is splitting a loop into smaller loops and running them in parallel. This works best for "embarrassingly parallel" computations, such as vector or matrix operations, where each iteration of the loop is completely independent of every other iteration.
For example, if we are summing two vectors of 1000 elements each, and we have 10 threads, we could split the summation into 10 blocks of 100 elements each and run all the blocks in parallel, potentially increasing performance by up to a factor of 10.
BS::thread_pool can automatically parallelize loops. To see how this works, consider the following generic loop:
for (T i = start; i < end; ++i)
    loop(i);
where:
- T is any signed or unsigned integer type.
- The loop goes over the range [start, end), that is, including start but excluding end.
- loop() is an operation performed for each loop index i, such as modifying an array with end - start elements.
This loop can be automatically parallelized and submitted to the thread pool's queue using the member function submit_loop(), which has the following syntax:
pool.submit_loop(start, end, loop, num_blocks);
where:
- start is the first index in the range.
- end is the index after the last index in the range, so that the full range is [start, end). In other words, the parallelized loop will be equivalent to the generic loop above if start and end are the same.
- start and end must both be of the same integer type T. See below for an example of what to do when they are not.
- If end <= start, nothing will happen; the loop cannot go backwards.
- loop() is the function that should run on every iteration of the loop, taking exactly one argument: the loop index.
- num_blocks is the number of blocks of the form [a, b) to split the loop into. For example, if the range is [0, 9) and there are 3 blocks, the blocks will be [0, 3), [3, 6), and [6, 9).
- If the number of indices does not divide evenly by the number of blocks, some blocks will contain one more index than the others; for example, if [0, 100) is split into 15 blocks, the result will be 10 blocks of size 7 followed by 5 blocks of size 6.
Each block is submitted to the thread pool's queue as a separate task. Therefore, a loop that is split into 3 blocks is split into 3 individual tasks, which may run in parallel. If there is only one block, the entire loop will run as one task, and no parallelization will take place.
To parallelize the generic loop above, we would use the following commands:
BS::multi_future<void> loop_future = pool.submit_loop(start, end, loop, num_blocks);
loop_future.wait();
submit_loop() returns an object of the helper class template BS::multi_future. This is essentially a specialization of std::vector<std::future<T>> with some additional member functions. Each of the num_blocks blocks is assigned an std::future, and all of these futures are stored inside the returned BS::multi_future. When loop_future.wait() is called, the main thread waits for all the tasks generated by this call to submit_loop() - and only those tasks, not any other tasks that happen to also be in the queue - to finish. This is essentially the role of the BS::multi_future class: to wait for a specific group of tasks, in this case the tasks running the loop's blocks.
What value should you use for num_blocks? It is usually a good choice to omit this argument, in which case the number of blocks will be equal to the number of threads in the pool. For best performance, it is recommended to do your own benchmarks to find the optimal number of blocks for each loop (you can use the BS::timer utility class, described below). Using fewer tasks than there are threads may be preferred if you are also running other tasks in parallel; using more tasks than there are threads can improve performance in some cases, but parallelizing with too many tasks has diminishing returns.
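As an illustration, here is a minimal benchmarking sketch (the workload and the block counts are arbitrary placeholders) that uses the BS::timer utility class, described later in this document, to time the same loop with different values of num_blocks:
#include "BS_thread_pool.hpp"       // BS::thread_pool
#include "BS_thread_pool_utils.hpp" // BS::timer
#include <cstddef>  // std::size_t
#include <iostream> // std::cout
#include <vector>   // std::vector

int main()
{
    BS::thread_pool pool;
    std::vector<double> data(10'000'000);
    for (const std::size_t num_blocks : {1, 4, 16, 64, 256})
    {
        BS::timer tmr;
        tmr.start();
        pool.submit_loop<std::size_t>(0, data.size(),
            [&data](const std::size_t i)
            {
                data[i] = static_cast<double>(i) * 0.5; // Placeholder workload.
            },
            num_blocks)
            .wait();
        tmr.stop();
        std::cout << num_blocks << " blocks took " << tmr.ms() << " ms.\n";
    }
}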
As a simple example, the following code calculates and prints a table of the squares of all integers from 0 to 99:
#include <iomanip>  // std::setw
#include <iostream> // std::cout

int main()
{
    constexpr unsigned int max = 100;
    unsigned int squares[max];
    for (unsigned int i = 0; i < max; ++i)
        squares[i] = i * i;
    for (unsigned int i = 0; i < max; ++i)
        std::cout << std::setw(2) << i << "^2 = " << std::setw(4) << squares[i] << ((i % 5 != 4) ? " | " : "\n");
}
We can parallelize it as follows:
#include "BS_thread_pool.hpp" // BS::thread_pool
#include <iomanip>  // std::setw
#include <iostream> // std::cout

int main()
{
    BS::thread_pool pool(10);
    constexpr unsigned int max = 100;
    unsigned int squares[max];
    const BS::multi_future<void> loop_future = pool.submit_loop<unsigned int>(0, max,
        [&squares](const unsigned int i)
        {
            squares[i] = i * i;
        });
    loop_future.wait();
    for (unsigned int i = 0; i < max; ++i)
        std::cout << std::setw(2) << i << "^2 = " << std::setw(4) << squares[i] << ((i % 5 != 4) ? " | " : "\n");
}
Since there are 10 threads, and the num_blocks argument was omitted, the loop is divided into 10 blocks, each calculating 10 squares.
Note that submit_loop() was executed with the explicit template parameter <unsigned int>. The reason is that the two loop indices must be of the same type; here, however, max is an unsigned int while 0 is a (signed) int, so the types do not match, and the code will not compile unless we coerce 0 to the correct type. The most elegant way to do this is to specify the index type explicitly using the template parameter.
The reason this is not done automatically (e.g. using std::common_type) is that doing so could accidentally cast a negative index to an unsigned type, or cast an index to a narrower integer type, leading to an incorrect loop range.
We could also explicitly cast 0 to unsigned int, although that is not as elegant:
pool.submit_loop(static_cast<unsigned int>(0), max, /* ... */);
Or we could use a C-style cast:
pool.submit_loop((unsigned int)(0), max, /* ... */);
Or we could use an integer literal suffix:
pool.submit_loop(0U, max, /* ... */);
As a side note, observe that here we parallelized the calculation of the squares, but not the printing of the results. This is for two reasons: first, we want the squares to be printed in ascending order, while a parallelized loop would execute in an indeterminate order; and second, printing to std::cout from multiple threads in parallel needs to be synchronized, as discussed below.
As in the case of detach_task() vs. submit_task(), sometimes you may want to parallelize a loop, but you do not need the BS::multi_future it returns. In that case, you can save the overhead of generating the futures (which can be significant, depending on the number of blocks) by using detach_loop() instead of submit_loop(), with the same arguments.
For example, the squares example loop above can be detached as follows:
#include "BS_thread_pool.hpp" // BS::thread_pool
#include <iomanip>  // std::setw
#include <iostream> // std::cout

int main()
{
    BS::thread_pool pool(10);
    constexpr unsigned int max = 100;
    unsigned int squares[max];
    pool.detach_loop<unsigned int>(0, max,
        [&squares](const unsigned int i)
        {
            squares[i] = i * i;
        });
    pool.wait();
    for (unsigned int i = 0; i < max; ++i)
        std::cout << std::setw(2) << i << "^2 = " << std::setw(4) << squares[i] << ((i % 5 != 4) ? " | " : "\n");
}
Warning: Since detach_loop() does not return a BS::multi_future, there is no built-in way for the user to know when the loop finishes executing. You must use either wait(), as we did here, or some other method such as condition variables, to ensure that the loop finishes executing before trying to use anything that depends on its output. Otherwise, bad things will happen!
We have seen that detach_loop() and submit_loop() execute the function loop(i) for each index i in the loop. However, behind the scenes, the loop is split into blocks, and each block executes the loop() function multiple times. Each block has an internal loop of the form (where T is the type of the indices):
for (T i = start; i < end; ++i)
    loop(i);
The start and end indices of each block are determined automatically by the pool. For example, in the previous section, the loop from 0 to 100 was split into 10 blocks of 10 indices each: start = 0 to end = 10, start = 10 to end = 20, and so on; the blocks are not inclusive of the last index, since the for loop has the condition i < end and not i <= end.
However, this also means that the loop() function is executed multiple times per block. This generates additional overhead due to the multiple function calls. For short loops, this should not affect performance. However, for very long loops, with millions of indices, the performance cost may be significant.
For this reason, the thread pool library provides two additional member functions for parallelizing loops: detach_blocks() and submit_blocks() . While detach_loop() and submit_loop() execute a function loop(i) once per index but multiple times per block, detach_blocks() and submit_blocks() execute a function block(start, end) once per block.
The main advantage of this method is increased performance, but the main disadvantage is slightly more complicated code. In particular, the user must define the loop from start to end manually within each block. Here is the previous example using detach_blocks() :
#include "BS_thread_pool.hpp" // BS::thread_pool
#include <iomanip>  // std::setw
#include <iostream> // std::cout

int main()
{
    BS::thread_pool pool(10);
    constexpr unsigned int max = 100;
    unsigned int squares[max];
    pool.detach_blocks<unsigned int>(0, max,
        [&squares](const unsigned int start, const unsigned int end)
        {
            for (unsigned int i = start; i < end; ++i)
                squares[i] = i * i;
        });
    pool.wait();
    for (unsigned int i = 0; i < max; ++i)
        std::cout << std::setw(2) << i << "^2 = " << std::setw(4) << squares[i] << ((i % 5 != 4) ? " | " : "\n");
}
Note how the block function takes two arguments, and includes the internal loop.
Generally, compiler optimizations should be able to make detach_loop() and submit_loop() perform roughly the same as detach_blocks() and submit_blocks() . However, you should perform your own benchmarks to see which option works best for your particular use case.
Unlike submit_task() , the member function submit_loop() only takes loop functions with no return values. The reason is that it wouldn't make sense to return a future for every single index of the loop. However, submit_blocks() does allow the block function to have a return value, as the number of blocks will generally not be too large, unlike the number of indices.
The block function will be executed once for each block, but the blocks are managed by the thread pool, with the user only able to select the number of blocks, but not the range of each block. Therefore, there is limited usability in returning one value per block. However, for cases where this is desired, such as for summation or some sorting algorithms, submit_blocks() does accept functions with return values, in which case it returns a BS::multi_future<T> object where T is the type of the return values.
Here's an example of a function template summing all elements of type T in a given range:
#include "BS_thread_pool.hpp" // BS::thread_pool
#include <cstdint>  // std::uint64_t
#include <future>   // std::future
#include <iostream> // std::cout

BS::thread_pool pool;

template <typename T>
T sum(T min, T max)
{
    BS::multi_future<T> loop_future = pool.submit_blocks<T>(
        min, max + 1,
        [](const T start, const T end)
        {
            T block_total = 0;
            for (T i = start; i < end; ++i)
                block_total += i;
            return block_total;
        },
        100);
    T result = 0;
    for (std::future<T>& future : loop_future)
        result += future.get();
    return result;
}

int main()
{
    std::cout << sum<std::uint64_t>(1, 1'000'000);
}
Here we used the fact that BS::multi_future<T> is a specialization of std::vector<std::future<T>>, so we can use a range-based for loop to iterate over the futures, and use the get() member function of each future to get its value. The values of the futures will be the partial sums from each block, so when we add them up, we will get the total sum. Note that we divided the loop into 100 blocks, so there will be 100 futures in total, each with the partial sum of 10,000 numbers.
The range-based for loop will most likely start before the loop has finished executing; each time it reaches a future, it will get the value of that future if it is ready, or wait until the future is ready and then get the value. This increases performance, since we can start summing the results without waiting for the entire loop to finish executing first - we only need to wait for individual blocks.
If we did want to wait until the entire loop finishes before summing the results, we could have used the get() member function of the BS::multi_future<T> object itself, which returns an std::vector<T> with the values obtained from each future. In that case, the sum could be obtained after calling submit_blocks() as follows:
std::vector<T> partial_sums = loop_future.get();
T result = std::reduce(partial_sums.begin(), partial_sums.end()); // std::reduce requires the <numeric> header.
return result;
The member functions detach_loop(), submit_loop(), detach_blocks(), and submit_blocks() parallelize a loop by splitting it into blocks, and submitting each block as an individual task to the queue, with each such task iterating over all the indices in the corresponding block's range, which can be numerous. However, sometimes we have loops with few indices, or more generally, a sequence of tasks enumerated by some index. In such cases, we can avoid the overhead of splitting into blocks and simply submit each individual index as its own independent task to the pool's queue.
This can be done with detach_sequence() and submit_sequence() . The syntax of these functions is similar to detach_loop() and submit_loop() , except that they don't have the num_blocks argument at the end. The sequence function must take only one argument, the index. As usual, detach_sequence() detaches the tasks and does not return a future, while submit_sequence() returns a BS::multi_future . If the tasks in the sequence return values, then the futures will contain those values, otherwise they will be void futures.
Here is a simple example:
#include "BS_thread_pool.hpp" // BS::thread_pool
#include <cstdint>  // std::uint64_t
#include <iostream> // std::cout
#include <vector>   // std::vector

using ui64 = std::uint64_t;

ui64 factorial(const ui64 n)
{
    ui64 result = 1;
    for (ui64 i = 2; i <= n; ++i)
        result *= i;
    return result;
}

int main()
{
    BS::thread_pool pool;
    constexpr ui64 max = 20;
    BS::multi_future<ui64> sequence_future = pool.submit_sequence<ui64>(0, max + 1, factorial);
    std::vector<ui64> factorials = sequence_future.get();
    for (ui64 i = 0; i < max + 1; ++i)
        std::cout << i << "! = " << factorials[i] << '\n';
}
BS::multi_future<T>
The helper class template BS::multi_future<T>, which we have been using throughout this section, provides a convenient way to collect and access groups of futures. This class is a specialization of std::vector<std::future<T>>, so it can be used in a similar way:
- Use the [] operator to access the future at a specific index, or the push_back() member function to append a new future to the list.
- The size() member function tells you how many futures are currently stored in the object.
However, BS::multi_future<T> also has additional member functions that are aimed specifically at handling futures:
- Use wait() to wait for all of the futures at once, or get() to get an std::vector<T> with the results from all of them.
- Check how many of the futures are ready with ready_count().
- Check whether all of the futures are valid with valid().
- Wait for a specific duration with wait_for(), or wait until a specific time with wait_until(). These functions return true if all futures have been waited for before the duration expired or the time point was reached, and false otherwise.
Aside from using BS::multi_future<T> to track the execution of parallelized loops, it can also be used whenever you have several different groups of tasks and you want to track the execution of each group individually, as in the sketch below.
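For example, the following sketch (the tasks themselves are arbitrary placeholders) collects the futures of two separate groups of tasks into two BS::multi_future<void> objects, so that each group can be waited for independently:
#include "BS_thread_pool.hpp" // BS::thread_pool, BS::multi_future
#include <chrono>   // std::chrono
#include <iostream> // std::cout
#include <thread>   // std::this_thread

int main()
{
    BS::thread_pool pool;
    BS::multi_future<void> group_a;
    BS::multi_future<void> group_b;
    for (int i = 0; i < 4; ++i)
        group_a.push_back(pool.submit_task([] { std::this_thread::sleep_for(std::chrono::milliseconds(100)); }));
    for (int i = 0; i < 4; ++i)
        group_b.push_back(pool.submit_task([] { std::this_thread::sleep_for(std::chrono::milliseconds(200)); }));
    group_a.wait(); // Waits only for group A's tasks; group B's may still be running.
    std::cout << "Group A done.\n";
    group_b.wait();
    std::cout << "Group B done.\n";
}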
The optional header file BS_thread_pool_utils.hpp contains several useful utility classes. These are not necessary for using the thread pool itself; BS_thread_pool.hpp is the only header file required. However, the utility classes can make writing multithreading code more convenient.
As with the main header file, the version of the utilities header file can be found by checking three macros:
- BS_THREAD_POOL_UTILS_VERSION_MAJOR - indicates the major version.
- BS_THREAD_POOL_UTILS_VERSION_MINOR - indicates the minor version.
- BS_THREAD_POOL_UTILS_VERSION_PATCH - indicates the patch version.
BS::synced_stream
When printing to an output stream from multiple threads in parallel, the output may become garbled. For example, consider this code:
#include "BS_thread_pool.hpp" // BS::thread_pool
#include <iostream> // std::cout

BS::thread_pool pool;

int main()
{
    pool.detach_sequence(0, 5,
        [](int i)
        {
            std::cout << "Task no. " << i << " executing.\n";
        });
}
The output will be a mess similar to this:
Task no. Task no. Task no. 3 executing.
0 executing.
Task no. 41 executing.
Task no. 2 executing.
executing.
The reason is that, although each individual insertion to std::cout is thread-safe, there is no mechanism in place to ensure subsequent insertions from the same thread are printed contiguously.
The utility class BS::synced_stream is designed to eliminate such synchronization issues. The constructor takes one optional argument, specifying the output stream to print to. If no argument is supplied, std::cout will be used:
// Construct a synced stream that will print to std::cout.
BS::synced_stream sync_out;
// Construct a synced stream that will print to the output stream my_stream.
BS::synced_stream sync_out(my_stream);
The member function print() takes an arbitrary number of arguments, which are inserted into the stream one by one, in the order they were given. println() does the same, but also prints a newline character \n at the end, for convenience. A mutex is used to synchronize this process, so that any other calls to print() or println() using the same BS::synced_stream object must wait until the previous call has finished.
As an example, this code:
#include "BS_thread_pool.hpp"       // BS::thread_pool
#include "BS_thread_pool_utils.hpp" // BS::synced_stream

BS::synced_stream sync_out;
BS::thread_pool pool;

int main()
{
    pool.detach_sequence(0, 5,
        [](int i)
        {
            sync_out.println("Task no. ", i, " executing.");
        });
}
Will print out:
Task no. 0 executing.
Task no. 1 executing.
Task no. 2 executing.
Task no. 3 executing.
Task no. 4 executing.
Warning: Always create the BS::synced_stream object before the BS::thread_pool object, as we did in this example. When the BS::thread_pool object goes out of scope, it waits for the remaining tasks to be executed. If the BS::synced_stream object goes out of scope before the BS::thread_pool object, then any tasks using the BS::synced_stream will crash. Since objects are destructed in the opposite order of construction, creating the BS::synced_stream object before the BS::thread_pool object ensures that the BS::synced_stream is always available to the tasks, even while the pool is destructing.
Most stream manipulators defined in the headers <ios> and <iomanip> , such as std::setw (set the character width of the next output), std::setprecision (set the precision of floating point numbers), and std::fixed (display floating point numbers with a fixed number of digits), can be passed to print() and println() just as you would pass them to a stream.
The only exceptions are the flushing manipulators std::endl and std::flush , which will not work because the compiler will not be able to figure out which template specializations to use. Instead, use BS::synced_stream::endl and BS::synced_stream::flush . Here is an example:
#include "BS_thread_pool.hpp"       // BS::thread_pool
#include "BS_thread_pool_utils.hpp" // BS::synced_stream
#include <cmath>   // std::sqrt
#include <iomanip> // std::setprecision, std::setw
#include <ios>     // std::fixed

BS::synced_stream sync_out;
BS::thread_pool pool;

int main()
{
    sync_out.print(std::setprecision(10), std::fixed);
    pool.detach_sequence(0, 16,
        [](int i)
        {
            sync_out.print("The square root of ", std::setw(2), i, " is ", std::sqrt(i), ".", BS::synced_stream::endl);
        });
}
Note, however, that BS::synced_stream::endl should only be used if flushing is desired; otherwise, a newline character should be used instead.
BS::timer
If you are using a thread pool, then your code is most likely performance-critical. Achieving maximum performance requires performing a considerable amount of benchmarking to determine the optimal settings and algorithms. Therefore, it is important to be able to measure the execution time of various computations and operations under different conditions.
The utility class BS::timer provides a simple way to measure execution time. It is very straightforward to use:
- Construct a BS::timer object.
- Start measuring by calling the start() member function.
- Stop measuring by calling the stop() member function.
- Call ms() to obtain the elapsed time for the computation in milliseconds.
- To check the elapsed time so far without stopping, call current_ms(), which obtains the elapsed time but keeps the timer ticking.
For example:
BS::timer tmr;
tmr.start();
do_something();
tmr.stop();
std::cout << "The elapsed time was " << tmr.ms() << " ms.\n";
A practical application of the BS::timer class can be found in the benchmark portion of the test program BS_thread_pool_test.cpp.
BS::signaller
BS::signaller is a utility class which can be used to allow simple signalling between threads. To use it, construct an object and then pass it to the different threads. Multiple threads can call the wait() member function of the signaller. When another thread calls the ready() member function, the waiting threads will stop waiting.
That's really all there is to it; BS::signaller is really just a convenient wrapper around std::promise , which contains both the promise and its future. For usage examples, please see the test program BS_thread_pool_test.cpp .
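For completeness, here is a minimal sketch (not taken from the test program; it uses only the wait() and ready() member functions described above) in which several tasks block until the main thread sends the signal:
#include "BS_thread_pool.hpp"       // BS::thread_pool
#include "BS_thread_pool_utils.hpp" // BS::signaller, BS::synced_stream
#include <chrono> // std::chrono
#include <thread> // std::this_thread

BS::synced_stream sync_out;

int main()
{
    BS::signaller signal;
    BS::thread_pool pool;
    for (int i = 0; i < 3; ++i)
    {
        pool.detach_task(
            [i, &signal]
            {
                signal.wait(); // Block until another thread calls ready().
                sync_out.println("Task ", i, " received the signal.");
            });
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    signal.ready(); // Release all waiting tasks at once.
    pool.wait();
}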
Sometimes you may wish to monitor what is happening with the tasks you submitted to the pool. This may be done using three member functions:
- get_tasks_queued() gets the number of tasks currently waiting in the queue to be executed by the threads.
- get_tasks_running() gets the number of tasks currently being executed by the threads.
- get_tasks_total() gets the total number of unfinished tasks: either still waiting in the queue, or running in a thread.
- Note that get_tasks_total() == get_tasks_queued() + get_tasks_running().
These functions are demonstrated in the following program:
#include "BS_thread_pool.hpp"       // BS::thread_pool
#include "BS_thread_pool_utils.hpp" // BS::synced_stream
#include <chrono> // std::chrono
#include <thread> // std::this_thread

BS::synced_stream sync_out;
BS::thread_pool pool(4);

void sleep_half_second(const int i)
{
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
    sync_out.println("Task ", i, " done.");
}

void monitor_tasks()
{
    sync_out.println(pool.get_tasks_total(), " tasks total, ", pool.get_tasks_running(), " tasks running, ", pool.get_tasks_queued(), " tasks queued.");
}

int main()
{
    pool.wait();
    pool.detach_sequence(0, 12, sleep_half_second);
    monitor_tasks();
    std::this_thread::sleep_for(std::chrono::milliseconds(750));
    monitor_tasks();
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
    monitor_tasks();
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
    monitor_tasks();
}
Assuming you have at least 4 hardware threads (so that 4 tasks can run concurrently), the output should be similar to:
12 tasks total, 0 tasks running, 12 tasks queued.
Task 0 done.
Task 1 done.
Task 2 done.
Task 3 done.
8 tasks total, 4 tasks running, 4 tasks queued.
Task 4 done.
Task 5 done.
Task 6 done.
Task 7 done.
4 tasks total, 4 tasks running, 0 tasks queued.
Task 8 done.
Task 9 done.
Task 10 done.
Task 11 done.
0 tasks total, 0 tasks running, 0 tasks queued.
The reason we called pool.wait() in the beginning is that when the thread pool is created, an initialization task runs in each thread, so if we don't wait, the first line will say there are 16 tasks in total, including the 4 initialization tasks. See below for more details.
Consider a situation where the user cancels a multithreaded operation while it is still ongoing. Perhaps the operation was split into multiple tasks, and half of the tasks are currently being executed by the pool's threads, but the other half are still waiting in the queue.
The thread pool cannot terminate the tasks that are already running, as the C++17 standard does not provide that functionality (and in any case, abruptly terminating a task while it's running could have extremely bad consequences, such as memory leaks and data corruption). However, the tasks that are still waiting in the queue can be purged using the purge() member function.
Once purge() is called, any tasks still waiting in the queue will be discarded, and will never be executed by the threads. Please note that there is no way to restore the purged tasks; they are gone forever.
Consider for example the following program:
#include "BS_thread_pool.hpp"       // BS::thread_pool
#include "BS_thread_pool_utils.hpp" // BS::synced_stream
#include <chrono> // std::chrono
#include <thread> // std::this_thread

BS::synced_stream sync_out;
BS::thread_pool pool(4);

int main()
{
    for (size_t i = 0; i < 8; ++i)
    {
        pool.detach_task(
            [i]
            {
                std::this_thread::sleep_for(std::chrono::milliseconds(100));
                sync_out.println("Task ", i, " done.");
            });
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
    pool.purge();
    pool.wait();
}
The program submits 8 tasks to the queue. Each task waits 100 milliseconds and then prints a message. The thread pool has 4 threads, so it will execute the first 4 tasks in parallel, and then the remaining 4. We wait 50 milliseconds, to ensure that the first 4 tasks have all started running. Then we call purge() to purge the remaining 4 tasks. As a result, these tasks never get executed. However, since the first 4 tasks are still running when purge() is called, they will finish uninterrupted; purge() only discards tasks that have not yet started running. The output of the program therefore only contains the messages from the first 4 tasks:
Task 0 done.
Task 1 done.
Task 2 done.
Task 3 done.
submit_task() catches any exceptions thrown by the submitted task and forwards them to the corresponding future. They can then be caught when invoking the get() member function of the future. For example:
#include "BS_thread_pool.hpp"       // BS::thread_pool
#include "BS_thread_pool_utils.hpp" // BS::synced_stream
#include <exception> // std::exception
#include <future>    // std::future
#include <stdexcept> // std::runtime_error

BS::synced_stream sync_out;
BS::thread_pool pool;

double inverse(const double x)
{
    if (x == 0)
        throw std::runtime_error("Division by zero!");
    else
        return 1 / x;
}

int main()
{
    constexpr double num = 0;
    std::future<double> my_future = pool.submit_task(
        []
        {
            return inverse(num);
        });
    try
    {
        const double result = my_future.get();
        sync_out.println("The inverse of ", num, " is ", result, ".");
    }
    catch (const std::exception& e)
    {
        sync_out.println("Caught exception: ", e.what());
    }
}
The output will be:
Caught exception: Division by zero!
However, if you change num to any non-zero number, no exceptions will be thrown and the inverse will be printed.
It is important to note that wait() does not throw any exceptions; only get() does. Therefore, even if your task does not return anything, i.e. your future is an std::future<void>, you must still use get() on that future if you want to catch exceptions thrown by the task. Here is an example:
#include "BS_thread_pool.hpp"       // BS::thread_pool
#include "BS_thread_pool_utils.hpp" // BS::synced_stream
#include <exception> // std::exception
#include <future>    // std::future
#include <stdexcept> // std::runtime_error

BS::synced_stream sync_out;
BS::thread_pool pool;

void print_inverse(const double x)
{
    if (x == 0)
        throw std::runtime_error("Division by zero!");
    else
        sync_out.println("The inverse of ", x, " is ", 1 / x, ".");
}

int main()
{
    constexpr double num = 0;
    std::future<void> my_future = pool.submit_task(
        []
        {
            print_inverse(num);
        });
    try
    {
        my_future.get();
    }
    catch (const std::exception& e)
    {
        sync_out.println("Caught exception: ", e.what());
    }
}
When using BS::multi_future to handle multiple futures at once, exception handling works the same way: if any of the futures may throw exceptions, you may catch these exceptions when calling get(), even in the case of BS::multi_future<void>.
If you do not require exception handling, or if exceptions are explicitly disabled in your codebase, you can define the macro BS_THREAD_POOL_DISABLE_EXCEPTION_HANDLING before including BS_thread_pool.hpp , which will disable exception handling in submit_task() . Note that if the feature-test macro __cpp_exceptions is undefined, BS_THREAD_POOL_DISABLE_EXCEPTION_HANDLING will be automatically defined.
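For example, a minimal sketch of disabling exception handling (using nothing beyond the macro described above):
// Define the macro before including the header, e.g. in a codebase compiled with -fno-exceptions.
#define BS_THREAD_POOL_DISABLE_EXCEPTION_HANDLING
#include "BS_thread_pool.hpp" // BS::thread_pool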
BS::thread_pool comes with a variety of methods to obtain information about the threads in the pool:
- The namespace BS::this_thread provides functionality similar to std::this_thread. If the current thread belongs to a BS::thread_pool object, then BS::this_thread::get_index() can be used to get the index of the current thread, and BS::this_thread::get_pool() can be used to get the pointer to the thread pool that owns the current thread. Please see the reference below for more details.
- get_thread_ids() returns a vector containing the unique identifiers for each of the pool's threads, as obtained by std::thread::get_id(). These values are not so useful on their own, but can be used for whatever the user wants to use them for.
- get_native_handles(), if enabled, returns a vector containing the underlying implementation-defined thread handles for each of the pool's threads, as obtained by std::thread::native_handle(). For more information, see the relevant section below.
Sometimes, it is necessary to initialize the threads before they run any tasks. This can be done by submitting a suitable initialization function to the constructor or to reset(), either as the only argument or as the second argument after the desired number of threads. The thread initialization function must take no arguments and have no return value. However, if needed, the function can use BS::this_thread::get_index() and BS::this_thread::get_pool() to figure out which thread and pool it belongs to.
The thread initialization function is submitted as a set of special tasks, one per thread, which bypass the queue, but still count towards the number of running tasks, which means get_tasks_total() and get_tasks_running() will report that these tasks are running if they are checked immediately after the pool is initialized.
This is done so that the user has the option to either wait for the initialization tasks to finish, by calling wait() on the pool, or just keep going. In either case, the initialization tasks will always finish executing before any tasks are picked out of the queue, so there is no reason to wait for them to finish unless they have some side-effects that affect the main thread.
Here is a simple example:
#include "BS_thread_pool.hpp"       // BS::thread_pool
#include "BS_thread_pool_utils.hpp" // BS::synced_stream
#include <random> // std::mt19937_64, std::random_device

BS::synced_stream sync_out;
thread_local std::mt19937_64 twister;

int main()
{
    BS::thread_pool pool(
        []
        {
            twister.seed(std::random_device()());
        });
    pool.submit_sequence(0, 4,
        [](int)
        {
            sync_out.println("I generated a random number: ", twister());
        })
        .wait();
}
In this example, we create a thread_local Mersenne twister engine, meaning that each thread has its own independent engine. However, we did not seed the engine, so each thread will generate the exact same sequence of pseudo-random numbers. To remedy this, we pass an initialization function to the BS::thread_pool constructor which seeds the twister in each thread with the (hopefully) non-deterministic random number generator std::random_device.
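As another sketch, a thread initialization function can use BS::this_thread::get_index() to report which thread it belongs to. Here we assume, based on the description above and pending the reference below, that get_index() returns an optional value, which is empty if the calling thread does not belong to any pool:
#include "BS_thread_pool.hpp"       // BS::thread_pool, BS::this_thread
#include "BS_thread_pool_utils.hpp" // BS::synced_stream

BS::synced_stream sync_out;

int main()
{
    BS::thread_pool pool(4,
        []
        {
            // Assumption: inside an initialization function, the optional returned
            // by get_index() always contains the current thread's index in the pool.
            sync_out.println("Thread ", BS::this_thread::get_index().value_or(0), " is initialized.");
        });
    pool.wait(); // Wait for the initialization tasks to finish.
}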
In C++, it is often crucial to pass function arguments by reference or constant reference, instead of by value. This allows the function to access the object being passed directly, rather than creating a new copy of the object. We have already seen that submitting an argument by reference is a simple matter of capturing it with a & in the lambda capture list. To submit as constant reference, we can use std::as_const as in the following example:
#include "BS_thread_pool.hpp"       // BS::thread_pool
#include "BS_thread_pool_utils.hpp" // BS::synced_stream
#include <utility> // std::as_const

BS::synced_stream sync_out;

void increment(int& x)
{
    ++x;
}

void print(const int& x)
{
    sync_out.println(x);
}

int main()
{
    BS::thread_pool pool;
    int n = 0;
    pool.submit_task(
        [&n]
        {
            increment(n);
        })
        .wait();
    pool.submit_task(
        [&n = std::as_const(n)]
        {
            print(n);
        })
        .wait();
}
The increment() function takes a reference to an integer, and increments that integer. Passing the argument by reference guarantees that n itself, in the scope of main(), will be incremented - rather than a copy of it in the scope of increment().
Similarly, the print() function takes a constant reference to an integer, and prints that integer. Passing the argument by constant reference guarantees that the variable will not be accidentally modified by the function, even though we are accessing n itself, rather than a copy. If we replace print with increment , the program won't compile, as increment cannot take constant references.
Generally, it is not really necessary to pass arguments by constant reference, but it is more "correct" to do so, if we would like to guarantee that the variable being referenced is indeed never modified. This section is therefore included here for completeness.
Sometimes you may wish to temporarily pause the execution of tasks, or perhaps you want to submit tasks to the queue in advance and only start executing them at a later time. You can do this using the member functions pause() , unpause() , and is_paused() .
However, these functions are disabled by default, and must be explicitly enabled by defining the macro BS_THREAD_POOL_ENABLE_PAUSE before including BS_thread_pool.hpp . The reason is that pausing the pool adds additional checks to the waiting and worker functions, which have a very small but non-zero overhead.
When you call pause() , the workers will temporarily stop retrieving new tasks out of the queue. However, any tasks already executed will keep running until they are done, since the thread pool has no control over the internal code of your tasks. If you need to pause a task in the middle of its execution, you must do that manually by programming your own pause mechanism into the task itself. To resume retrieving tasks, call unpause() . To check whether the pool is currently paused, call is_paused() .
Here is an example:
#define BS_THREAD_POOL_ENABLE_PAUSE
#include "BS_thread_pool.hpp"       // BS::thread_pool
#include "BS_thread_pool_utils.hpp" // BS::synced_stream
#include <chrono> // std::chrono
#include <thread> // std::this_thread

BS::synced_stream sync_out;
BS::thread_pool pool(4);

void sleep_half_second(const int i)
{
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
    sync_out.println("Task ", i, " done.");
}

void check_if_paused()
{
    if (pool.is_paused())
        sync_out.println("Pool paused.");
    else
        sync_out.println("Pool unpaused.");
}

int main()
{
    pool.detach_sequence(0, 8, sleep_half_second);
    sync_out.println("Submitted 8 tasks.");
    std::this_thread::sleep_for(std::chrono::milliseconds(250));
    pool.pause();
    check_if_paused();
    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    sync_out.println("Still paused...");
    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    pool.detach_sequence(8, 12, sleep_half_second);
    sync_out.println("Submitted 4 more tasks.");
    sync_out.println("Still paused...");
    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    pool.unpause();
    check_if_paused();
}
Assuming you have at least 4 hardware threads, the output should be similar to:
Submitted 8 tasks.
Pool paused.
Task 0 done.
Task 1 done.
Task 2 done.
Task 3 done.
Still paused...
Submitted 4 more tasks.
Still paused...
Pool unpaused.
Task 4 done.
Task 5 done.
Task 6 done.
Task 7 done.
Task 8 done.
Task 9 done.
Task 10 done.
Task 11 done.
Here is what happened. We initially submitted a total of 8 tasks to the queue. Since we waited for 250ms before pausing, the first 4 tasks have already started running, so they kept running until they finished. While the pool was paused, we submitted 4 more tasks to the queue, but they just waited at the end of the queue. When we unpaused, the remaining 4 initial tasks were executed, followed by the 4 new tasks.
While the workers are paused, wait() will wait for the running tasks instead of all tasks (otherwise it would wait forever). This is demonstrated by the following program:
#define BS_THREAD_POOL_ENABLE_PAUSE
#include "BS_thread_pool.hpp"       // BS::thread_pool
#include "BS_thread_pool_utils.hpp" // BS::synced_stream
#include <chrono> // std::chrono
#include <thread> // std::this_thread

BS::synced_stream sync_out;
BS::thread_pool pool(4);

void sleep_half_second(const int i)
{
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
    sync_out.println("Task ", i, " done.");
}

void check_if_paused()
{
    if (pool.is_paused())
        sync_out.println("Pool paused.");
    else
        sync_out.println("Pool unpaused.");
}

int main()
{
    pool.detach_sequence(0, 8, sleep_half_second);
    sync_out.println("Submitted 8 tasks. Waiting for them to complete.");
    pool.wait();
    pool.detach_sequence(8, 20, sleep_half_second);
    sync_out.println("Submitted 12 more tasks.");
    std::this_thread::sleep_for(std::chrono::milliseconds(250));
    pool.pause();
    check_if_paused();
    sync_out.println("Waiting for the ", pool.get_tasks_running(), " running tasks to complete.");
    pool.wait();
    sync_out.println("All running tasks completed. ", pool.get_tasks_queued(), " tasks still queued.");
    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    sync_out.println("Still paused...");
    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    sync_out.println("Still paused...");
    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    pool.unpause();
    check_if_paused();
    std::this_thread::sleep_for(std::chrono::milliseconds(250));
    sync_out.println("Waiting for the remaining ", pool.get_tasks_total(), " tasks (", pool.get_tasks_running(), " running and ", pool.get_tasks_queued(), " queued) to complete.");
    pool.wait();
    sync_out.println("All tasks completed.");
}
The output should be similar to:
Submitted 8 tasks. Waiting for them to complete.
Task 0 done.
Task 1 done.
Task 2 done.
Task 3 done.
Task 4 done.
Task 5 done.
Task 6 done.
Task 7 done.
Submitted 12 more tasks.
Pool paused.
Waiting for the 4 running tasks to complete.
Task 8 done.
Task 9 done.
Task 10 done.
Task 11 done.
All running tasks completed. 8 tasks still queued.
Still paused...
Still paused...
Pool unpaused.
Waiting for the remaining 8 tasks (4 running and 4 queued) to complete.
Task 12 done.
Task 13 done.
Task 14 done.
Task 15 done.
Task 16 done.
Task 17 done.
Task 18 done.
Task 19 done.
All tasks completed.
The first wait() , which was called while the pool was not paused, waited for all 8 tasks, both running and queued. The second wait() , which was called after pausing the pool, only waited for the 4 running tasks, while the other 8 tasks remained queued, and were not executed since the pool was paused. Finally, the third wait() , which was called after unpausing the pool, waited for the remaining 8 tasks, both running and queued.
Warning: If the thread pool is destroyed while paused, any tasks still in the queue will never be executed!
Consider the following program:
#include "BS_thread_pool.hpp" // BS::thread_pool
#include <iostream> // std::cout

int main()
{
    BS::thread_pool pool;
    pool.detach_task(
        [&pool]
        {
            pool.wait();
            std::cout << "Done waiting.\n";
        });
}
This program creates a thread pool, and then detaches a task that waits for tasks in the same thread pool to complete. If you run this program, it will never print the message "Done waiting", because the task will wait for itself to complete. This causes a deadlock, and the program will wait forever.
Usually, in simple programs, this will never happen. However, in more complicated programs, perhaps ones running multiple thread pools in parallel, wait deadlocks could potentially occur. In such cases, the macro BS_THREAD_POOL_ENABLE_WAIT_DEADLOCK_CHECK can be defined before including BS_thread_pool.hpp . wait() will then check whether the user tried to call it from within a thread of the same pool, and if so, it will throw the exception BS::thread_pool::wait_deadlock instead of waiting. This check is disabled by default because wait deadlocks are not something that happens often, and the check adds a small but non-zero overhead every time wait() is called.
Here is an example:
#define BS_THREAD_POOL_ENABLE_WAIT_DEADLOCK_CHECK
#include "BS_thread_pool.hpp" // BS::thread_pool
#include <iostream> // std::cout

int main()
{
    BS::thread_pool pool;
    pool.detach_task(
        [&pool]
        {
            try
            {
                pool.wait();
                std::cout << "Done waiting.\n";
            }
            catch (const BS::thread_pool::wait_deadlock&)
            {
                std::cout << "Error: Deadlock!\n";
            }
        });
}
This time, wait() will detect the deadlock, and will throw an exception, causing the output to be "Error: Deadlock!".
Note that if the feature-test macro __cpp_exceptions is undefined, BS_THREAD_POOL_ENABLE_WAIT_DEADLOCK_CHECK will be automatically undefined.
The BS::thread_pool member function get_native_handles() returns a vector containing the underlying implementation-defined thread handles for each of the pool's threads. These can then be used in an implementation-specific way to manage the threads at the OS level.
However, note that this will generally not be portable code. Furthermore, this feature uses std::thread::native_handle(), which is in the C++ standard library, but is not guaranteed to be present on all systems. Therefore, this feature is turned off by default, and must be turned on by defining the macro BS_THREAD_POOL_ENABLE_NATIVE_HANDLES before including BS_thread_pool.hpp .
Here is an example:
#define BS_THREAD_POOL_ENABLE_NATIVE_HANDLES
#include "BS_thread_pool.hpp"       // BS::thread_pool
#include "BS_thread_pool_utils.hpp" // BS::synced_stream
#include <thread>                   // std::thread
#include <vector>                   // std::vector

BS::synced_stream sync_out;
BS::thread_pool pool(4);

int main()
{
    std::vector<std::thread::native_handle_type> handles = pool.get_native_handles();
    for (BS::concurrency_t i = 0; i < handles.size(); ++i)
        sync_out.println("Thread ", i, " native handle: ", handles[i]);
}

The output will depend on your compiler and operating system. Here is an example:
Thread 0 native handle: 000000F4
Thread 1 native handle: 000000F8
Thread 2 native handle: 000000EC
Thread 3 native handle: 000000FC
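As a concrete (but inherently non-portable) illustration, here is a minimal sketch of using the native handles at the OS level. It assumes a Linux system with glibc, where std::thread::native_handle_type is pthread_t and the GNU extension pthread_setname_np() is available; on other platforms the handle type and the available OS calls will differ:

#define BS_THREAD_POOL_ENABLE_NATIVE_HANDLES
#include "BS_thread_pool.hpp" // BS::thread_pool
#include <cstddef>            // std::size_t
#include <pthread.h>          // pthread_setname_np (GNU extension, assumed available)
#include <string>             // std::string, std::to_string
#include <thread>             // std::thread
#include <vector>             // std::vector

BS::thread_pool pool(4);

int main()
{
    const std::vector<std::thread::native_handle_type> handles = pool.get_native_handles();
    for (std::size_t i = 0; i < handles.size(); ++i)
    {
        // Name each pool thread so it shows up in debuggers and tools such as `top -H`.
        // On Linux, thread names are limited to 15 characters plus the null terminator.
        const std::string name = "pool_" + std::to_string(i);
        pthread_setname_np(handles[i], name.c_str());
    }
}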
Defining the macro BS_THREAD_POOL_ENABLE_PRIORITY before including BS_thread_pool.hpp enables task priority. The priority of a task or group of tasks may then be specified as an additional argument (at the end of the argument list) to detach_task() , submit_task() , detach_blocks() , submit_blocks() , detach_loop() , submit_loop() , detach_sequence() , and submit_sequence() . If the priority is not specified, the default value will be 0.
The priority is a number of type BS::priority_t , which is a signed 16-bit integer, so it can have any value between -32,768 and 32,767. The tasks will be executed in priority order from highest to lowest. If priority is assigned to the block/loop/sequence parallelization functions, which submit multiple tasks, then all of these tasks will have the same priority.
The namespace BS::pr contains some pre-defined priorities for users who wish to avoid magic numbers and enjoy better future-proofing. In order of decreasing priority, the pre-defined priorities are: BS::pr::highest , BS::pr::high , BS::pr::normal , BS::pr::low , and BS::pr::lowest .
Here is a simple example:
#define BS_THREAD_POOL_ENABLE_PRIORITY
#include "BS_thread_pool.hpp"       // BS::thread_pool
#include "BS_thread_pool_utils.hpp" // BS::synced_stream

BS::synced_stream sync_out;
BS::thread_pool pool(1);

int main()
{
    pool.detach_task([] { sync_out.println("This task will execute third."); }, BS::pr::normal);
    pool.detach_task([] { sync_out.println("This task will execute fifth."); }, BS::pr::lowest);
    pool.detach_task([] { sync_out.println("This task will execute second."); }, BS::pr::high);
    pool.detach_task([] { sync_out.println("This task will execute first."); }, BS::pr::highest);
    pool.detach_task([] { sync_out.println("This task will execute fourth."); }, BS::pr::low);
}

This program will print out the tasks in the correct priority order. Note that for simplicity, we used a pool with just one thread, so the tasks run one at a time. In a pool with 5 or more threads, all 5 tasks will actually run more or less at the same time, because, for example, the task with the second-highest priority will be picked up by another thread while the task with the highest priority is still running.
Of course, this is just a pedagogical example. In a realistic use case we may want, for example, to submit tasks that must be completed immediately with high priority, so they skip over other tasks already in the queue, or non-urgent background tasks with low priority, so they execute only after higher-priority tasks are done.
Here are some subtleties to note when using task priority:
- When priority is enabled, the task queue becomes an std::priority_queue, which has O(log n) complexity for storing new tasks, but only O(1) complexity for retrieving the next (i.e. highest-priority) task. This is in contrast with std::queue, used if priority is disabled, which both stores and retrieves with O(1) complexity.
- The C++ standard library typically implements std::priority_queue as a binary heap, which means tasks are stored as a binary tree instead of sequentially. To execute tasks in submission order, give them monotonically decreasing priorities (see the sketch below).
- BS::priority_t is defined as std::int_least16_t, since this type is guaranteed to be present on all systems, rather than std::int16_t, which is optional in the C++ standard. This means that on some exotic systems BS::priority_t may actually have more than 16 bits. However, the pre-defined priorities are 100% portable, and will always have the same values (e.g. BS::pr::highest = 32767) regardless of the actual bit width.
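For instance, here is a minimal sketch (the printed messages are made up for illustration) of enforcing strict submission order with monotonically decreasing priorities:

#define BS_THREAD_POOL_ENABLE_PRIORITY
#include "BS_thread_pool.hpp"       // BS::thread_pool
#include "BS_thread_pool_utils.hpp" // BS::synced_stream

BS::synced_stream sync_out;
BS::thread_pool pool(1);

int main()
{
    for (int i = 0; i < 5; ++i)
        // Each task gets a strictly lower priority than the previous one, so the
        // single-threaded pool pops them from the queue in submission (FIFO) order.
        pool.detach_task([i] { sync_out.println("Task ", i); }, static_cast<BS::priority_t>(-i));
    pool.wait();
}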
The file BS_thread_pool_test.cpp in the tests folder of the GitHub repository will perform automated tests of all aspects of the library. The output will be printed both to std::cout and to a file with the same name as the executable and the suffix -yyyy-mm-dd_hh.mm.ss.log based on the current date and time. In addition, the code is meant to serve as an extensive example of how to properly use the library.

Please make sure to compile BS_thread_pool_test.cpp with optimization flags enabled (e.g. -O3 on GCC/Clang or /O2 on MSVC).

The test program also takes command line arguments for automation purposes:

- help: Show a help message and exit. Any other arguments will be ignored.
- log: Create a log file.
- tests: Perform standard tests.
- deadlock: Perform long deadlock tests.
- benchmarks: Perform benchmarks.

If no options are entered, the default is: log tests benchmarks.
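For example, assuming the compiled test executable is named BS_thread_pool_test (the actual name depends on how you compiled it), the following command would run only the standard tests, without logging or benchmarks:

./BS_thread_pool_test tests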
By default, the test program enables all the optional features by defining the suitable macros, so it can test them. However, if the macro BS_THREAD_POOL_LIGHT_TEST is defined during compilation, the optional features will not be tested.
A PowerShell script, BS_thread_pool_test.ps1, is provided for your convenience in the tests folder to make running the test on multiple compilers and operating systems easier. Since it is written in PowerShell, it is fully portable and works on Windows, Linux, and macOS. The script will automatically detect if Clang, GCC, and/or MSVC are available, and compile the test program using each available compiler twice: once with and once without all the optional features. It will then run each compiled test program and report any errors.
If any of the tests fail, please submit a bug report including the exact specifications of your system (OS, CPU, compiler, etc.) and the generated log file.
If all checks passed, BS_thread_pool_test.cpp performs simple benchmarks by filling a very large vector with values using detach_blocks() . The program decides what the size of the vector should be by testing how many elements are needed to reach a certain target duration when parallelizing using a number of blocks equal to the number of threads. This ensures that the test takes approximately the same amount of time on all systems, and is thus more consistent and portable.
Once the appropriate size of the vector has been determined, the program allocates the vector and fills it with values, calculated according to a fixed prescription. This operation is performed both single-threaded and multithreaded, with the multithreaded computation spread across multiple tasks submitted to the pool.
Several different multithreaded tests are performed, with the number of tasks either equal to, smaller than, or larger than the pool's thread count. Each test is repeated multiple times, with the run times averaged over all runs of the same test. The program keeps increasing the number of blocks by a factor of 2 until diminishing returns are encountered. The run times of the tests are compared, and the maximum speedup obtained is calculated.
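To illustrate the pattern the benchmarks use, here is a minimal sketch (the vector size and filling formula are made up; the real test determines the size dynamically) of filling a large vector using detach_blocks():

#include "BS_thread_pool.hpp" // BS::thread_pool
#include <cstddef>            // std::size_t
#include <vector>             // std::vector

BS::thread_pool pool;

int main()
{
    std::vector<double> vec(10'000'000);
    // The block function receives the half-open range [start, end) and must
    // handle every index in it. With num_blocks omitted, the library uses a
    // number of blocks equal to the number of threads in the pool.
    pool.detach_blocks(std::size_t{0}, vec.size(),
        [&vec](const std::size_t start, const std::size_t end)
        {
            for (std::size_t i = start; i < end; ++i)
                vec[i] = 0.5 * i;
        });
    pool.wait();
}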
As an example, here are the results of the benchmarks from a Digital Research Alliance of Canada node equipped with two 20-core / 40-thread Intel Xeon Gold 6148 CPUs (for a total of 40 cores and 80 threads), running CentOS Linux 7.9.2009. The tests were compiled using GCC v13.2.0 with the -O3 and -march=native flags. The output was as follows:
======================
Performing benchmarks:
======================
Using 80 threads.
Determining the number of elements to generate in order to achieve an approximate mean execution time of 50 ms with 80 tasks...
Each test will be repeated up to 30 times to collect reliable statistics.
Generating 27962000 elements:
[......]
Single-threaded, mean execution time was 2815.2 ms with standard deviation 3.5 ms.
[......]
With 2 tasks, mean execution time was 1431.3 ms with standard deviation 10.1 ms.
[.......]
With 4 tasks, mean execution time was 722.1 ms with standard deviation 11.4 ms.
[..............]
With 8 tasks, mean execution time was 364.9 ms with standard deviation 10.9 ms.
[............................]
With 16 tasks, mean execution time was 181.9 ms with standard deviation 8.0 ms.
[..............................]
With 32 tasks, mean execution time was 110.6 ms with standard deviation 1.8 ms.
[..............................]
With 64 tasks, mean execution time was 64.0 ms with standard deviation 6.3 ms.
[..............................]
With 128 tasks, mean execution time was 59.8 ms with standard deviation 0.8 ms.
[..............................]
With 256 tasks, mean execution time was 59.0 ms with standard deviation 0.0 ms.
[..............................]
With 512 tasks, mean execution time was 52.8 ms with standard deviation 0.4 ms.
[..............................]
With 1024 tasks, mean execution time was 50.7 ms with standard deviation 0.9 ms.
[..............................]
With 2048 tasks, mean execution time was 50.0 ms with standard deviation 0.5 ms.
[..............................]
With 4096 tasks, mean execution time was 49.4 ms with standard deviation 0.5 ms.
[..............................]
With 8192 tasks, mean execution time was 50.2 ms with standard deviation 0.4 ms.
Maximum speedup obtained by multithreading vs. single-threading: 56.9x, using 4096 tasks.
+++++++++++++++++++++++++++++++++++++++
Thread pool performance test completed!
+++++++++++++++++++++++++++++++++++++++
These two CPUs have 40 physical cores in total, with each core providing two separate logical cores via hyperthreading, for a total of 80 threads. Without hyperthreading, we would expect a maximum theoretical speedup of 40x. With hyperthreading, one might naively expect to achieve up to an 80x speedup, but this is in fact impossible, as each pair of hyperthreaded logical cores share the same physical core's resources. However, generally we would expect at most an estimated 30% additional speedup from hyperthreading, which amounts to around 52x in this case. The speedup of 56.9x in our performance test exceeds this estimate.
If you are using the vcpkg C/C++ package manager, you can easily install BS::thread_pool with the following commands:
On Linux/macOS:
./vcpkg install bshoshany-thread-pool
On Windows:
.\vcpkg install bshoshany-thread-pool:x86-windows bshoshany-thread-pool:x64-windows
To update the package to the latest version, run:
vcpkg upgrade
If you are using the Conan C/C++ package manager, you can easily integrate BS::thread_pool into your project by adding the following lines to your conanfile.txt :
[requires]
bshoshany-thread-pool/4.1.0

To update the package to the latest version, simply change the version number. Please refer to this package's page on ConanCenter for more information.
If you are using the Meson build system, you can install BS::thread_pool from WrapDB. To do so, create a subprojects folder in your project (if it does not already exist) and run the following command:
meson wrap install bshoshany-thread-pool
Then, use dependency('bshoshany-thread-pool') in your meson.build file to include the package. To update the package to the latest version, run:
meson wrap update bshoshany-thread-pool
If you are using CMake, you can install BS::thread_pool with CPM. If CPM is already installed, simply add the following to your project's CMakeLists.txt :
CPMAddPackage(
    NAME BS_thread_pool
    GITHUB_REPOSITORY bshoshany/thread-pool
    VERSION 4.1.0)
add_library(BS_thread_pool INTERFACE)
target_include_directories(BS_thread_pool INTERFACE ${BS_thread_pool_SOURCE_DIR}/include)

This will automatically download the indicated version of the package from the GitHub repository and include it in your project.
It is also possible to use CPM without installing it first, by adding the following lines to CMakeLists.txt before CPMAddPackage :
set(CPM_DOWNLOAD_LOCATION "${CMAKE_BINARY_DIR}/cmake/CPM.cmake")
if(NOT (EXISTS ${CPM_DOWNLOAD_LOCATION}))
    message(STATUS "Downloading CPM.cmake")
    file(DOWNLOAD https://github.com/cpm-cmake/CPM.cmake/releases/latest/download/CPM.cmake ${CPM_DOWNLOAD_LOCATION})
endif()
include(${CPM_DOWNLOAD_LOCATION})

Here is an example of a complete CMakeLists.txt for a project named my_project consisting of a single source file main.cpp which uses BS_thread_pool.hpp:
cmake_minimum_required(VERSION 3.19)
project(my_project LANGUAGES CXX)
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_CXX_EXTENSIONS OFF)
set(CPM_DOWNLOAD_LOCATION "${CMAKE_BINARY_DIR}/cmake/CPM.cmake")
if(NOT (EXISTS ${CPM_DOWNLOAD_LOCATION}))
    message(STATUS "Downloading CPM.cmake")
    file(DOWNLOAD https://github.com/cpm-cmake/CPM.cmake/releases/latest/download/CPM.cmake ${CPM_DOWNLOAD_LOCATION})
endif()
include(${CPM_DOWNLOAD_LOCATION})
CPMAddPackage(
    NAME BS_thread_pool
    GITHUB_REPOSITORY bshoshany/thread-pool
    VERSION 4.1.0)
add_library(BS_thread_pool INTERFACE)
target_include_directories(BS_thread_pool INTERFACE ${BS_thread_pool_SOURCE_DIR}/include)
add_executable(my_project main.cpp)
target_link_libraries(my_project BS_thread_pool)

With both CMakeLists.txt and main.cpp in the same folder, type the following commands to build the project:
cmake -S . -B build
cmake --build build
This section provides a complete reference to the classes, member functions, objects, and macros available in this library, along with other important information. Member functions are given here with simplified prototypes (e.g. removing const) for ease of reading.
More information can be found in the provided Doxygen comments. Any modern IDE, such as Visual Studio Code, can use the Doxygen comments to provide automatic documentation for any class and member function in this library when hovering over code with the mouse or using auto-complete.
The main module (BS_thread_pool.hpp)

The BS::thread_pool class

The class BS::thread_pool is the main thread pool class. It can be used to create a pool of threads and submit tasks to a queue. When a thread becomes available, it takes a task from the queue and executes it. The member functions that are available by default, when no macros are defined, are listed below.
Constructors and pool management:

- thread_pool(): Construct a new thread pool. The number of threads will be the total number of hardware threads available, as reported by the implementation. This is usually determined by the number of cores in the CPU. If a core is hyperthreaded, it will count as two threads.
- thread_pool(BS::concurrency_t num_threads): Construct a new thread pool with the specified number of threads.
- thread_pool(std::function<void()>& init_task): Construct a new thread pool with the specified initialization function.
- thread_pool(BS::concurrency_t num_threads, std::function<void()>& init_task): Construct a new thread pool with the specified number of threads and initialization function.
- void reset(): Reset the pool with the total number of hardware threads available, as reported by the implementation. Waits for all currently running tasks to be completed, then destroys all threads in the pool and creates a new thread pool with the new number of threads. Any tasks that were waiting in the queue before the pool was reset will then be executed by the new threads. If the pool was paused before resetting it, the new pool will be paused as well.
- void reset(BS::concurrency_t num_threads): Reset the pool with a new number of threads.
- void reset(std::function<void()>& init_task): Reset the pool with the total number of hardware threads available, as reported by the implementation, and a new initialization function.
- void reset(BS::concurrency_t num_threads, std::function<void()>& init_task): Reset the pool with a new number of threads and a new initialization function.

Status monitoring:

- size_t get_tasks_queued(): Get the number of tasks currently waiting in the queue to be executed by the threads.
- size_t get_tasks_running(): Get the number of tasks currently being executed by the threads.
- size_t get_tasks_total(): Get the total number of unfinished tasks: either still waiting in the queue, or running in a thread. Note that get_tasks_total() == get_tasks_queued() + get_tasks_running().
- BS::concurrency_t get_thread_count(): Get the number of threads in the pool.
- std::vector<std::thread::id> get_thread_ids(): Get a vector containing the unique identifiers for each of the pool's threads, as obtained by std::thread::get_id().

Detaching tasks, without futures (T and F are template parameters):

- void detach_task(F&& task): Submit a function with no arguments and no return value into the task queue. To push a function with arguments, enclose it in a lambda expression. Does not return a future, so the user must use wait() or some other method to ensure that the task finishes executing, otherwise bad things will happen.
- void detach_blocks(T first_index, T index_after_last, F&& block, size_t num_blocks = 0): Parallelize a loop by automatically splitting it into blocks and submitting each block separately to the queue. The block function takes two arguments, the start and end of the block, so that it is only called once per block, but it is up to the user to make sure the block function correctly deals with all the indices in each block. Does not return a BS::multi_future, so the user must use wait() or some other method to ensure that the loop finishes executing, otherwise bad things will happen.
- void detach_loop(T first_index, T index_after_last, F&& loop, size_t num_blocks = 0): Parallelize a loop by automatically splitting it into blocks and submitting each block separately to the queue. The loop function takes one argument, the loop index, so that it is called many times per block. Does not return a BS::multi_future, so the user must use wait() or some other method to ensure that the loop finishes executing, otherwise bad things will happen.
- void detach_sequence(T first_index, T index_after_last, F&& sequence): Submit a sequence of tasks enumerated by indices to the queue. Does not return a BS::multi_future, so the user must use wait() or some other method to ensure that the sequence finishes executing, otherwise bad things will happen.

Submitting tasks, with futures (T, F, and R are template parameters):

- std::future<R> submit_task(F&& task): Submit a function with no arguments into the task queue. To submit a function with arguments, enclose it in a lambda expression. If the function has a return value, get a future for the eventual returned value. If the function has no return value, get an std::future<void> which can be used to wait until the task finishes.
- BS::multi_future<R> submit_blocks(T first_index, T index_after_last, F&& block, size_t num_blocks = 0): Parallelize a loop by automatically splitting it into blocks and submitting each block separately to the queue. The block function takes two arguments, the start and end of the block, so that it is only called once per block, but it is up to the user to make sure the block function correctly deals with all the indices in each block. Returns a BS::multi_future that contains the futures for all of the blocks.
- BS::multi_future<void> submit_loop(T first_index, T index_after_last, F&& loop, size_t num_blocks = 0): Parallelize a loop by automatically splitting it into blocks and submitting each block separately to the queue. The loop function takes one argument, the loop index, so that it is called many times per block. It must have no return value. Returns a BS::multi_future that contains the futures for all of the blocks.
- BS::multi_future<R> submit_sequence(T first_index, T index_after_last, F&& sequence): Submit a sequence of tasks enumerated by indices to the queue. Returns a BS::multi_future that contains the futures for all of the tasks.

Queue management:

- void purge(): Purge all the tasks waiting in the queue. Tasks that are currently running will not be affected, but any tasks still waiting in the queue will be discarded, and will never be executed by the threads. Please note that there is no way to restore the purged tasks.

Waiting for tasks (R, P, C, and D are template parameters):

- void wait(): Wait for tasks to be completed. Normally, this function waits for all tasks, both those that are currently running in the threads and those that are still waiting in the queue. However, if the pool is paused, this function only waits for the currently running tasks (otherwise it would wait forever). Note: To wait for just one specific task, use submit_task() instead, and call the wait() member function of the generated future.
- bool wait_for(std::chrono::duration<R, P>& duration): Wait for tasks to be completed, but stop waiting after the specified duration has passed. Returns true if all tasks finished running, false if the duration expired but some tasks are still running.
- bool wait_until(std::chrono::time_point<C, D>& timeout_time): Wait for tasks to be completed, but stop waiting after the specified time point has been reached. Returns true if all tasks finished running, false if the time point was reached but some tasks are still running.

When a BS::thread_pool object goes out of scope, the destructor first waits for all tasks to complete, then destroys all threads. Note that if the pool is paused, then any tasks still in the queue will never be executed.
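As a quick illustration of the prototypes above, here is a minimal sketch (the squaring task is made up for illustration) using submit_sequence() and the resulting BS::multi_future:

#include "BS_thread_pool.hpp"       // BS::thread_pool, BS::multi_future
#include "BS_thread_pool_utils.hpp" // BS::synced_stream
#include <vector>                   // std::vector

BS::synced_stream sync_out;
BS::thread_pool pool;

int main()
{
    // Submit 4 tasks enumerated by the indices 0, 1, 2, 3; each returns its index squared.
    BS::multi_future<int> mf = pool.submit_sequence(0, 4, [](const int i) { return i * i; });
    // get() waits for all the futures and collects the results into a vector.
    const std::vector<int> squares = mf.get();
    for (const int s : squares)
        sync_out.println(s);
}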
Optional features of the BS::thread_pool class

The thread pool has several optional features that must be explicitly enabled using macros:

- Task priority: enabled by defining the macro BS_THREAD_POOL_ENABLE_PRIORITY. The priority of a task or group of tasks may then be specified as an additional argument to detach_task(), submit_task(), detach_blocks(), submit_blocks(), detach_loop(), submit_loop(), detach_sequence(), and submit_sequence(). If the priority is not specified, the default value will be 0. The priority is a number of type BS::priority_t, which is a signed 16-bit integer, so it can have any value between -32,768 and 32,767. The tasks will be executed in priority order from highest to lowest. The namespace BS::pr contains some pre-defined priorities: BS::pr::highest, BS::pr::high, BS::pr::normal, BS::pr::low, and BS::pr::lowest.
- Pausing: enabled by defining the macro BS_THREAD_POOL_ENABLE_PAUSE. Adds the following member functions:
    - void pause(): Pause the pool. The workers will temporarily stop retrieving new tasks out of the queue, although any tasks currently being executed will keep running until they are finished.
    - void unpause(): Unpause the pool. The workers will resume retrieving new tasks out of the queue.
    - bool is_paused(): Check whether the pool is currently paused.
- Native handles: enabled by defining the macro BS_THREAD_POOL_ENABLE_NATIVE_HANDLES. Adds the following member function:
    - std::vector<std::thread::native_handle_type> get_native_handles(): Get a vector containing the underlying implementation-defined thread handles for each of the pool's threads.
- Wait deadlock checks: enabled by defining the macro BS_THREAD_POOL_ENABLE_WAIT_DEADLOCK_CHECK. wait(), wait_for(), and wait_until() will then check whether the user tried to call them from within a thread of the same pool, which would result in a deadlock. If so, they will throw the exception BS::thread_pool::wait_deadlock instead of waiting.
- Disabling exception handling: the macro BS_THREAD_POOL_DISABLE_EXCEPTION_HANDLING removes exception handling from submit_task() if it is not needed, or if exceptions are explicitly disabled in the codebase. This macro is independent of BS_THREAD_POOL_ENABLE_WAIT_DEADLOCK_CHECK: disabling exception handling removes the try-catch block from submit_task(), while enabling wait deadlock checks adds a throw expression to wait(), wait_for(), and wait_until(). If the feature-test macro __cpp_exceptions is undefined, BS_THREAD_POOL_DISABLE_EXCEPTION_HANDLING is automatically defined, and BS_THREAD_POOL_ENABLE_WAIT_DEADLOCK_CHECK is automatically undefined.
The BS::this_thread namespace

The namespace BS::this_thread provides functionality similar to std::this_thread. It contains the following function objects:

- BS::this_thread::get_index() can be used to get the index of the current thread. If this thread belongs to a BS::thread_pool object, it will have an index from 0 to BS::thread_pool::get_thread_count() - 1. Otherwise, for example if this thread is the main thread or an independent std::thread, std::nullopt will be returned.
- BS::this_thread::get_pool() can be used to get the pointer to the thread pool that owns the current thread. If this thread belongs to a BS::thread_pool object, a pointer to that object will be returned. Otherwise, std::nullopt will be returned.

In both cases, an std::optional object will be returned, of type BS::this_thread::optional_index or BS::this_thread::optional_pool respectively. Unless you are 100% sure this thread is in a pool, first use std::optional::has_value() to check if it contains a value, and if so, use std::optional::value() to obtain that value.
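For example, here is a minimal sketch (the printed messages are made up for illustration) of safely checking the optional returned by BS::this_thread::get_index():

#include "BS_thread_pool.hpp"       // BS::thread_pool, BS::this_thread
#include "BS_thread_pool_utils.hpp" // BS::synced_stream

BS::synced_stream sync_out;
BS::thread_pool pool;

int main()
{
    pool.detach_task(
        []
        {
            // get_index() returns a BS::this_thread::optional_index; check it first.
            const BS::this_thread::optional_index idx = BS::this_thread::get_index();
            if (idx.has_value())
                sync_out.println("This task runs on pool thread ", idx.value(), ".");
            else
                sync_out.println("This thread does not belong to a pool.");
        });
    pool.wait();
}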
The BS::multi_future<T> class

BS::multi_future<T> is a helper class used to facilitate waiting for and/or getting the results of multiple futures at once. It is defined as a specialization of std::vector<std::future<T>>. This means that all of the member functions that can be used on an std::vector can also be used on a BS::multi_future. For example, you may use a range-based for loop with a BS::multi_future, since it has iterators.

In addition to the inherited member functions, BS::multi_future has the following specialized member functions (R, P, C, and D are template parameters):

- [void or std::vector<T>] get(): Get the results from all the futures stored in this BS::multi_future, rethrowing any stored exceptions. If the futures return void, this function returns void as well. If the futures return a type T, this function returns a vector containing the results.
- size_t ready_count(): Check how many of the futures stored in this BS::multi_future are ready.
- bool valid(): Check if all the futures stored in this BS::multi_future are valid.
- void wait(): Wait for all the futures stored in this BS::multi_future.
- bool wait_for(std::chrono::duration<R, P>& duration): Wait for all the futures stored in this BS::multi_future, but stop waiting after the specified duration has passed. Returns true if all futures have been waited for before the duration expired, false otherwise.
- bool wait_until(std::chrono::time_point<C, D>& timeout_time): Wait for all the futures stored in this multi_future object, but stop waiting after the specified time point has been reached. Returns true if all futures have been waited for before the time point was reached, false otherwise.

The utilities module (BS_thread_pool_utils.hpp)

The BS::signaller class

BS::signaller is a utility class which can be used to allow simple signalling between threads. This class is really just a convenient wrapper around std::promise, which contains both the promise and its future. It has the following member functions:
- signaller(): Construct a new signaller.
- void wait(): Wait until the signaller is ready.
- void ready(): Inform any waiting threads that the signaller is ready.
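For example, here is a minimal sketch (the printed messages are made up for illustration) in which a detached task blocks on a BS::signaller until the main thread signals readiness:

#include "BS_thread_pool.hpp"       // BS::thread_pool
#include "BS_thread_pool_utils.hpp" // BS::signaller, BS::synced_stream

BS::synced_stream sync_out;
BS::thread_pool pool;

int main()
{
    BS::signaller signal;
    pool.detach_task(
        [&signal]
        {
            signal.wait(); // Block until ready() is called.
            sync_out.println("Got the signal!");
        });
    sync_out.println("Sending the signal...");
    signal.ready(); // Release any threads waiting on the signaller.
    pool.wait();
}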
The BS::synced_stream class

BS::synced_stream is a utility class which can be used to synchronize printing to an output stream by different threads. It has the following member functions (T is a template parameter pack):

- synced_stream(std::ostream& stream = std::cout): Construct a new synced stream which prints to the given output stream.
- void print(T&&... items): Print any number of items into the output stream. Ensures that no other threads print to this stream simultaneously, as long as they all exclusively use the same synced_stream object to print.
- void println(T&&... items): Print any number of items into the output stream, followed by a newline character.

In addition, the class comes with two stream manipulators, which are meant to help the compiler figure out which template specializations to use with the class:
- BS::synced_stream::endl: An explicit cast of std::endl. Prints a newline character to the stream, and then flushes it. Should only be used if flushing is desired, otherwise a newline character should be used instead.
- BS::synced_stream::flush: An explicit cast of std::flush. Used to flush the stream.

The BS::timer class

BS::timer is a utility class which can be used to measure execution time for benchmarking purposes. It has the following member functions:
- timer(): Construct a new timer and immediately start measuring time.
- void start(): Start (or restart) measuring time. Note that the timer starts ticking as soon as the object is created, so this is only necessary if we want to restart the clock later.
- void stop(): Stop measuring time and store the elapsed time since the object was constructed or since start() was last called.
- std::chrono::milliseconds::rep current_ms(): Get the number of milliseconds that have elapsed since the object was constructed or since start() was last called, but keep the timer ticking.
- std::chrono::milliseconds::rep ms(): Get the number of milliseconds stored when stop() was last called.
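For example, here is a minimal sketch (the vector size and computation are made up for illustration) of timing a parallelized loop with BS::timer:

#include "BS_thread_pool.hpp"       // BS::thread_pool
#include "BS_thread_pool_utils.hpp" // BS::synced_stream, BS::timer
#include <cmath>                    // std::sqrt
#include <cstddef>                  // std::size_t
#include <vector>                   // std::vector

BS::synced_stream sync_out;
BS::thread_pool pool;

int main()
{
    std::vector<double> data(10'000'000);
    BS::timer tmr; // The timer starts ticking as soon as it is constructed.
    pool.detach_loop(std::size_t{0}, data.size(),
        [&data](const std::size_t i) { data[i] = std::sqrt(i); });
    pool.wait();
    tmr.stop(); // Stop the timer and store the elapsed time.
    sync_out.println("The loop took ", tmr.ms(), " ms.");
}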
This library is under continuous and active development. If you encounter any bugs, or if you would like to request any additional features, please feel free to open a new issue on GitHub and I will look into it as soon as I can.

Contributions are always welcome. However, I release my projects in cumulative updates after editing and testing them locally on my system, so my policy is not to accept any pull requests. If you open a pull request, and I decide to incorporate your suggestion into the project, I will first modify your code to comply with the project's coding conventions (formatting, syntax, naming, comments, programming practices, etc.), and perform some tests to ensure that the change doesn't break anything. I will then merge it into the next release of the project, possibly together with some other changes. The new release will also include a note in CHANGELOG.md with a link to your pull request, and modifications to the documentation in README.md as needed.
Many GitHub users have helped improve this project, directly or indirectly, via issues, pull requests, comments, and/or personal correspondence. Please see CHANGELOG.md for links to specific issues and pull requests that have been the most helpful. Thank you all for your contribution! :)
If you found this project useful, please consider starring it on GitHub! This allows me to see how many people are using my code, and motivates me to keep working to improve it.
Copyright (c) 2024 Barak Shoshany. Licensed under the MIT license.
If you use this C++ thread pool library in software of any kind, please provide a link to the GitHub repository in the source code and documentation.
If you use this library in published research, please cite it as follows:
You can use the following BibTeX entry:
@article{Shoshany2024_ThreadPool,
    archiveprefix = {arXiv},
    author = {Barak Shoshany},
    doi = {10.1016/j.softx.2024.101687},
    eprint = {2105.00613},
    journal = {SoftwareX},
    pages = {101687},
    title = {{A C++17 Thread Pool for High-Performance Scientific Computing}},
    url = {https://www.sciencedirect.com/science/article/pii/S235271102400058X},
    volume = {26},
    year = {2024}
}

Please note that the papers on SoftwareX and arXiv are not up to date with the latest version of the library. These publications are only intended to facilitate discovery of this library by scientists, and to enable citing it in scientific research. Documentation for the latest version is provided only by the README.md file in the GitHub repository.
Beginner C++ programmers may be interested in my lecture notes for a course taught at McMaster University, which teach modern C and C++ from scratch, including some of the advanced techniques and programming practices used in developing this library.