BS::thread_pool :快速,轻巧且易于使用的C ++ 17线程池库由Barak Shoshany
电子邮件:[email protected]
网站:https://baraksh.com/
github:https://github.com/bshoshany
这是图书馆的v4.1.0的完整文档,于2024-03-22发布。
BS::multi_future<T>更多信息BS::synced_stream同步打印到流BS::timer测量执行时间BS::signaller在线程之间发送简单信号BS_thread_pool.hpp )BS::thread_pool类BS::thread_pool类的可选功能BS::this_thread名称空间BS::multi_future<T>类BS_thread_pool_utils.hpp )BS::signaller课程BS::synced_stream类BS::timer课多线程对于现代高性能计算至关重要。自C ++ 11以来,C ++标准库包括使用std::thread等构造的内置低级多线程支持。但是, std::thread每次称为新线程都会创建一个新线程,该线程可以在开销中具有重要的性能。此外,可以创建比硬件可以同时处理更多的线程,这可能导致大幅放缓。
此处介绍的库包含一个C ++线程池类BS::thread_pool ,该类别通过一劳永逸地创建固定的线程池来避免这些问题,然后在程序的整个生命周期内连续重复使用相同的线程来执行不同的任务。默认情况下,池中的线程数等于硬件可以并行运行的最大线程数。
用户将要执行为队列执行的任务提交。每当线程可用时,它都会从队列中检索下一个任务并执行它。池自动为每个任务生成一个std::future ,允许用户等待任务完成执行和/或获得其最终返回值(如果适用)。线程和任务在后台由池自主管理,而无需除了提交所需的任务以外的用户输入。
该图书馆的设计以四个重要原则为指导。首先,紧凑度:整个库仅包含一个独立的标头文件,没有其他组件或依赖项,除了带有可选公用事业的小型独立标头文件。其次,可移植性:该库仅利用C ++ 17标准库,而无需依赖任何编译器扩展程序或第三方库,因此与任何平台上的任何现代标准配合的C ++ 17编译器兼容。第三,易用性:库有广泛的记录,任何级别的程序员都应该能够直接使用它。
第四个也是最后一个指导原则是性能:该库中的每一条代码均经过精心设计,并在各种编译器和平台上进行了测试和验证。确实,该库最初是为作者自己的计算密集型科学计算项目而设计的,该项目都在高端台式机/笔记本电脑和高性能计算节点上运行。
其他更高级的多线程库可能会提供更多功能和/或更高的性能。但是,它们通常由具有多个组件和依赖项的庞大代码库组成,并且涉及需要大量时间投资才能学习的复杂API。该库无意取代这些更高级的库;取而代之的是,它是为不需要非常高级功能的用户而设计的,并且喜欢一个易于学习和使用的简单且轻巧的库,并且可以轻松地将其纳入现有项目或新项目中。
#include "BS_thread_pool.hpp" ,您都设置了!submit_task()成员函数提交给队列的每个任务都会自动生成std::future ,可用于等待任务完成执行和/或获得其最终返回值。submit_loop()成员函数自动将循环并行在任意数量的任务中,该函数返回BS::multi_future ,可用于一次跟踪所有并行任务的执行。detach_task()提交任务,并且可以使用detach_loop()并行循环 - 牺牲便利性以提高性能。在这种情况下,可以使用wait() , wait_for()和wait_until()等待队列中的所有任务要完成。BS_thread_pool_test.cpp可用于执行详尽的自动化测试和基准,并作为如何正确使用库的全面示例。随附的PowerShell脚本BS_thread_pool_test.ps1提供了一种使用多个编译器运行测试的便携式方法。BS_thread_pool_utils.hpp包含几个有用的实用程序类。BS::signaller实用程序类在线程之间发送简单信号。BS::synced_stream实用程序类同步来自多个线程的流量。BS::timer实用程序类轻松测量用于基准目的的执行时间。detach_sequence()和submit_sequence()提交一系列由索引列举的任务列出的任务。reset()成员函数,可以安全地更改池中的线程数。get_tasks_queued() , get_tasks_running()和get_tasks_total()成员函数监视排队和/或运行任务的数量。get_thread_count()获取池的当前线程计数。pause() , unpause()和is_paused()成员函数自由暂停并恢复池;暂停时,线程不会从队列中检索新任务。purge()成员函数等待的所有任务。submit_task()或从主线程通过其期货submit_loop()的任务提交的任务捕获异常。BS::this_thread::get_index()获取当前线程的池索引,并使用BS::this_thread::get_pool()使用该线程的池指针。get_thread_ids()或使用可选的get_native_handles()成员函数获取池中所有线程的唯一线程ID。该库应在任何C ++ 17标准编译器上成功编译,并在所有可用的编译器的操作系统和架构上进行编译。使用以下编译器和平台通过24核(8p+16e) / 32-thear Read Intel I9-13900K CPU验证兼容性:
此外,该库在加拿大的数字研究联盟中进行了测试,该节点配备了两个20核 / 40杆Intel Xeon Gold 6148 CPU(总共40个核心和80个线程),使用GCC v13.2.0,运行Centos Linux 7.9.2009。
测试程序BS_thread_pool_test.cpp是没有警告的(带警告标志-Wall -Wextra -Wconversion -Wsign-conversion -Wpedantic -Weffc++ -Wshadow在MSVC中的GCC /Clang和/W4中的),使用和成功完成了所有自动化的测试,并完成了所有自动化的测试。
由于该库需要C ++ 17个功能,因此必须使用C ++ 17的支持编译代码:
-std=c++17标志。在Linux上,您还需要使用-pthread标志来启用POSIX线程库。/std:c++17 ,以及/permissive-确保标准符合度。为了获得最高性能,建议对所有可用编译器优化进行编译:
-O3标志。/O2 。例如,要使用警告和优化编译测试程序BS_thread_pool_test.cpp ,建议使用以下命令:
g++ BS_thread_pool_test.cpp -std=c++17 -O3 -Wall -Wextra -Wconversion -Wsign-conversion -Wpedantic -Weffc++ -Wshadow -pthread -o BS_thread_pool_testclang++替换g++ 。-o BS_thread_pool_test ,用-o BS_thread_pool_test.exe替换-O bs_thread_pool_test和remove -pthread 。cl BS_thread_pool_test.cpp /std:c++17 /permissive- /O2 /W4 /EHsc /Fo:BS_thread_pool_test.obj /Fe:BS_thread_pool_test.exe 要安装BS::thread_pool ,只需从GitHub存储库中下载最新版本,请将标题文件BS_thread_pool.hpp放在所需文件夹中的include文件夹中,然后将其包括在您的程序中:
# include " BS_thread_pool.hpp "现在,将可以通过BS::thread_pool类访问线程池。要进行更快的安装,您可以直接在此URL下下载标头文件本身。
该库还带有一个独立的实用程序标题文件BS_thread_pool_utils.hpp ,该文件不需要使用线程池,而是提供一些可能有助于多线程的实用程序类。该标头文件还位于include文件夹中。它可以直接在此URL下下载。
该库还可以在各种软件包管理器和构建系统上使用,包括VCPKG,Conan,Meson和CMAKE,并使用CPM。请参阅下面有关更多详细信息。
默认的构造函数可以使用std::thread::hardware_concurrency()的实现报告,创建了一个线程池,用硬件可以同时处理的线程池。这通常取决于CPU中的核心数。如果核心被超线程,则将其视为两个线程。例如:
// Constructs a thread pool with as many threads as available in the hardware.
BS::thread_pool pool;可选地,可以将许多与硬件并发不同的线程指定为构造函数的参数。但是,请注意,添加超过硬件可以处理的更多线程不会提高性能,实际上很可能会阻碍它。存在此选项,以便在您希望留下一些可用于其他进程的线程的情况下使用少于硬件并发的线程。例如:
// Constructs a thread pool with only 12 threads.
BS::thread_pool pool ( 12 );通常,当使用线程池时,程序的主线程只能将任务提交到线程池并等待它们完成,并且不应单独执行任何计算密集的任务。在这种情况下,建议将默认值用于线程数。这样可以确保在主线程等待时,硬件中可用的所有线程都可以正常工作。
成员函数get_thread_count()返回池中的线程数。如果使用默认构造函数,这将等于std::thread::hardware_concurrency() 。
通常不需要更改池创建池中的线程数,因为线程池的整个点是您只能创建一次线程。但是,如果需要,可以使用reset()成员函数安全,可以安全地完成此操作。
reset()将等待所有当前运行的任务要完成,但将将其余的任务留在队列中。然后,它将破坏线程池并创建一个新的,并具有所需的新数量线程,如函数参数中指定的(或如果不给出参数,硬件并发)。然后,新线程池将恢复执行排队中保留的任务和任何新提交的任务。
如果需要,则可以从以下三个宏中读取此库的版本:
BS_THREAD_POOL_VERSION_MAJOR指示主要版本。BS_THREAD_POOL_VERSION_MINOR指示次要版本。BS_THREAD_POOL_VERSION_PATCH指示补丁版。 std::cout << " Thread pool library version is " << BS_THREAD_POOL_VERSION_MAJOR << ' . ' << BS_THREAD_POOL_VERSION_MINOR << ' . ' << BS_THREAD_POOL_VERSION_PATCH << " . n " ;样本输出:
Thread pool library version is 4.1.0.
例如,可以使用这允许相同的代码库使用#if指令与库的多个不兼容版本一起使用。
注意:此功能仅从v4.0.1开始。该库的较早版本并未定义这些宏。
在本节中,我们将学习如何在没有参数的情况下提交任务,而可能会带有回报值。一旦提交任务,将在线程可用后立即执行。除非启用任务优先级,否则以提交任务的顺序执行任务(请参见下文)。
例如,如果池有8个线程和一个空的队列,我们提交了16个任务,那么我们应该期望并行执行前8个任务,其余的任务由线程逐一拾取,因为每个线程都完成执行第一个任务,直到任何任务都保留在排队中。
成员函数submit_task()用于将任务提交给队列。它完全需要一个输入,即提交的任务。此任务必须是一个没有参数的函数,但可以具有返回值。返回值是与任务相关的std::future 。
如果提交的函数具有T型的返回值,则未来将是类型std::future<T> ,并且当功能完成其执行时,将设置为返回值。如果提交的函数没有返回值,那么未来将是一个std::future<void> ,它将不会返回任何值,但仍可以用来等待该函数完成。
使用auto进行submit_task()的返回值表示编译器将自动检测到模板std::future的实例。但是,如下示例中,指定特定类型的std::future<T>以提高可读性。
要等到任务完成,请使用未来的成员函数wait() 。要获取返回值,请使用成员函数get() ,如果尚未完成任务,该任务也将自动等待完成。这是一个简单的例子:
# include " BS_thread_pool.hpp " // BS::thread_pool
# include < future > // std::future
# include < iostream > // std::cout
int the_answer ()
{
return 42 ;
}
int main ()
{
BS::thread_pool pool;
std::future< int > my_future = pool. submit_task (the_answer);
std::cout << my_future. get () << ' n ' ;
}在此示例中,我们提交了函数the_answer() ,该函数返回int 。因此,池的成员函数submit_task()返回了一个std::future<int> 。然后,我们使用了未来的get()成员函数来获取返回值,并将其打印出来。
除了提交预定义的功能外,我们还可以使用lambda表达式快速定义任务。根据lambda表达式重写上一个示例,我们得到:
# include " BS_thread_pool.hpp " // BS::thread_pool
# include < future > // std::future
# include < iostream > // std::cout
int main ()
{
BS::thread_pool pool;
std::future< int > my_future = pool. submit_task ([]{ return 42 ; });
std::cout << my_future. get () << ' n ' ;
}在这里,lambda表达式[]{ return 42; }有两个部分:
[]表示。这意味着编译器正在定义lambda表达式。{ return 42; }仅返回值42 。提交lambda表达式而不是预定义的功能通常更简单,更快,尤其是由于捕获本地变量的能力,我们将在下一部分中进行讨论。
当然,任务不必返回值。在下面的示例中,我们提交一个无返回值的函数,然后使用未来等待它完成执行:
# include " BS_thread_pool.hpp " // BS::thread_pool
# include < chrono > // std::chrono
# include < future > // std::future
# include < iostream > // std::cout
# include < thread > // std::this_thread
int main ()
{
BS::thread_pool pool;
const std::future< void > my_future = pool. submit_task (
[]
{
std::this_thread::sleep_for ( std::chrono::milliseconds ( 500 ));
});
std::cout << " Waiting for the task to complete... " ;
my_future. wait ();
std::cout << " Done. " << ' n ' ;
}在这里,我们将lambda分为多行,使其更可读。命令std::this_thread::sleep_for(std::chrono::milliseconds(500))指示任务简单地睡500毫秒,模拟计算密集型任务。
如上一节所述,使用submit_task()提交的任务不能有任何参数。但是,很容易通过将函数包装在lambda中或直接使用lambda捕获来提交用参数提交任务。这是两个例子。
以下是通过用lambda包装来提交带有参数的预定义函数的示例:
# include " BS_thread_pool.hpp " // BS::thread_pool
# include < future > // std::future
# include < iostream > // std::cout
double multiply ( const double lhs, const double rhs)
{
return lhs * rhs;
}
int main ()
{
BS::thread_pool pool;
std::future< double > my_future = pool. submit_task (
[]
{
return multiply ( 6 , 7 );
});
std::cout << my_future. get () << ' n ' ;
}如您所见,要将参数传递给multiply ,我们简单地将multiply(6, 7)明确地称为lambda。如果参数不是文字,我们需要使用lambda捕获条款来捕获本地范围的参数:
# include " BS_thread_pool.hpp " // BS::thread_pool
# include < future > // std::future
# include < iostream > // std::cout
double multiply ( const double lhs, const double rhs)
{
return lhs * rhs;
}
int main ()
{
BS::thread_pool pool;
constexpr double first = 6 ;
constexpr double second = 7 ;
std::future< double > my_future = pool. submit_task (
[first, second]
{
return multiply (first, second);
});
std::cout << my_future. get () << ' n ' ;
}如果需要,我们甚至可以完全摆脱multiply功能,并将所有东西放入lambda中:
# include " BS_thread_pool.hpp " // BS::thread_pool
# include < future > // std::future
# include < iostream > // std::cout
int main ()
{
BS::thread_pool pool;
constexpr double first = 6 ;
constexpr double second = 7 ;
std::future< double > my_future = pool. submit_task (
[first, second]
{
return first * second;
});
std::cout << my_future. get () << ' n ' ;
}通常,最好使用submit_task()向队列提交任务。这使您可以等待任务完成和/或以后获得其返回值。但是,有时不需要未来,例如,当您只想“设置并忘记”某个任务时,或者任务已经与主线程或其他任务通信而不使用未来(例如通过条件变量)。
在这种情况下,您可能希望避免为任务分配未来的间接费用,以提高绩效。这称为“分离”任务,因为任务从主线程脱离并独立运行。
分离任务是使用detach_task()成员函数完成的,该功能使您可以将任务分离为队列而不为其生成未来。该任务可以具有多数参数,但不能具有返回值,因为主线程无法检索该值。
由于detach_task()不会返回未来,因此没有内置的方式让用户知道任务何时完成执行。您必须在尝试使用任何取决于其输出的内容之前手动确保任务完成执行。否则,坏事将会发生!
BS::thread_pool提供成员函数wait() ,以促进等待队列中的所有任务完成,无论是分离还是在未来中脱离或提交。 wait()成员函数的工作原理与std::future的wait()成员函数相似。例如,考虑以下代码:
# include " BS_thread_pool.hpp " // BS::thread_pool
# include < chrono > // std::chrono
# include < iostream > // std::cout
# include < thread > // std::this_thread
int main ()
{
BS::thread_pool pool;
int result = 0 ;
pool. detach_task (
[&result]
{
std::this_thread::sleep_for ( std::chrono::milliseconds ( 100 ));
result = 42 ;
});
std::cout << result << ' n ' ;
}该程序首先定义一个局部变量命名result ,并将其初始化为0 。然后,它以lambda表达式的形式分离任务。请注意,lambda通过引用result ,如&在其前面所示。这意味着任务可以修改result ,任何此类修改都将反映在主线程中。任务的变化result为42 ,但首先入睡100毫秒。当主线程打印出result的值时,任务还没有时间修改其值,因为它仍在睡觉。因此,程序将打印出初始值0 。
要等待任务完成,我们必须在分离后使用wait()成员函数:
# include " BS_thread_pool.hpp " // BS::thread_pool
# include < chrono > // std::chrono
# include < iostream > // std::cout
# include < thread > // std::this_thread
int main ()
{
BS::thread_pool pool;
int result = 0 ;
pool. detach_task (
[&result]
{
std::this_thread::sleep_for ( std::chrono::milliseconds ( 100 ));
result = 42 ;
});
pool. wait ();
std::cout << result << ' n ' ;
}现在,该程序将按预期打印出值42 。但是请注意, wait()将等待队列中的所有任务,包括我们关心的任何其他任务。如果我们只等待一个任务, submit_task()将是一个更好的选择。
有时,您可能希望等待任务完成,但只需要一定时间,或直到特定时间点。例如,如果一段时间后任务尚未完成,则可能希望让用户知道有延迟。
对于使用submit_task()提交期货的任务,可以使用std::future的两个成员功能来实现这一点:
wait_for()等待完成任务的完成,但是在指定的持续时间之后停止等待,作为std::chrono::duration的参数,已通过。wait_until()等待完成任务的完成,但是已经达到了指定的时间点之后的等待,该任务已作为类型std::chrono::time_point的参数。在这两种情况下,如果未来准备就绪,功能都将返回future_status::ready ,这意味着已完成任务,并且已获得其返回值(如果有)。但是,如果未来尚未准备就绪时,它将返回std::future_status::timeout 。
这是一个示例:
# include " BS_thread_pool.hpp " // BS::thread_pool
# include < chrono > // std::chrono
# include < future > // std::future
# include < iostream > // std::cout
# include < thread > // std::this_thread
int main ()
{
BS::thread_pool pool;
const std::future< void > my_future = pool. submit_task (
[]
{
std::this_thread::sleep_for ( std::chrono::milliseconds ( 1000 ));
std::cout << " Task done! n " ;
});
while ( true )
{
if (my_future. wait_for ( std::chrono::milliseconds ( 200 )) != std::future_status::ready)
std::cout << " Sorry, the task is not done yet. n " ;
else
break ;
}
}输出应该看起来与此相似:
Sorry, the task is not done yet.
Sorry, the task is not done yet.
Sorry, the task is not done yet.
Sorry, the task is not done yet.
Task done!
对于独立的任务,由于我们没有未来的未来,因此我们无法使用此方法。但是, BS::thread_pool具有两个成员函数,也命名为wait_for()和wait_until() ,它们类似地等待指定的持续时间或直到指定的时间点,但对于所有任务(无论已提交还是分离)。线程池的等待功能不是std::future_status ,如果完成所有任务,则返回true ,或者如果持续时间到期或达到时间点,则false返回true,但某些任务仍在运行。
这是与上述相同的示例,使用detach_task()和pool.wait_for() :
# include " BS_thread_pool.hpp " // BS::thread_pool
# include < chrono > // std::chrono
# include < iostream > // std::cout
# include < thread > // std::this_thread
int main ()
{
BS::thread_pool pool;
pool. detach_task (
[]
{
std::this_thread::sleep_for ( std::chrono::milliseconds ( 1000 ));
std::cout << " Task done! n " ;
});
while ( true )
{
if (!pool. wait_for ( std::chrono::milliseconds ( 200 )))
std::cout << " Sorry, the task is not done yet. n " ;
else
break ;
}
}让我们考虑以下程序:
# include < iostream > // std::cout, std::boolalpha
class flag_class
{
public:
[[nodiscard]] bool get_flag () const
{
return flag;
}
void set_flag ( const bool arg)
{
flag = arg;
}
private:
bool flag = false ;
};
int main ()
{
flag_class flag_object;
flag_object. set_flag ( true );
std::cout << std::boolalpha << flag_object. get_flag () << ' n ' ;
}该程序创建一个新的对象flag_object的类flag_class ,使用setter成员函数set_flag()将标志设置为true ,然后使用getter成员函数get_flag()打印出标志的值。
如果我们要将成员函数set_flag()作为任务提交到线程池怎么办?我们只需包装整个语句flag_object.set_flag(true);从lambda中的行,然后通过引用将flag_object传递到lambda,如此示例:
# include " BS_thread_pool.hpp " // BS::thread_pool
# include < iostream > // std::cout, std::boolalpha
class flag_class
{
public:
[[nodiscard]] bool get_flag () const
{
return flag;
}
void set_flag ( const bool arg)
{
flag = arg;
}
private:
bool flag = false ;
};
int main ()
{
BS::thread_pool pool;
flag_class flag_object;
pool. submit_task (
[&flag_object]
{
flag_object. set_flag ( true );
})
. wait ();
std::cout << std::boolalpha << flag_object. get_flag () << ' n ' ;
}当然,如果我们在池本身上调用wait() ,这也将与detach_task()一起使用。
请注意,在此示例中,我们没有从submit_task()获得未来,然后等待那个未来,而只是直接在未来的未来中调用wait() 。如果我们在此期间无事可做,这是一种等待任务完成的常见方法。还要注意,我们通过引用lambda传递了flag_object ,因为我们想在同一对象上设置标志,而不是它的副本(无论如何通过value都不会工作,因为值按值捕获的变量隐含是const )。
您可能要做的另一件事是从对象本身中调用成员函数,即来自另一个成员函数。这遵循类似的语法,除了您还必须在lambda中捕获this (即指向当前对象的指针)。这是一个示例:
# include " BS_thread_pool.hpp " // BS::thread_pool
# include < iostream > // std::cout, std::boolalpha
BS::thread_pool pool;
class flag_class
{
public:
[[nodiscard]] bool get_flag () const
{
return flag;
}
void set_flag ( const bool arg)
{
flag = arg;
}
void set_flag_to_true ()
{
pool. submit_task (
[ this ]
{
set_flag ( true );
})
. wait ();
}
private:
bool flag = false ;
};
int main ()
{
flag_class flag_object;
flag_object. set_flag_to_true ();
std::cout << std::boolalpha << flag_object. get_flag () << ' n ' ;
}请注意,在此示例中,我们将线程池定义为全局对象,因此在main()函数之外可以访问它。
并行的最常见和有效的方法之一是将环将循环拆分为较小的循环并并行运行。它在“令人尴尬的并行”计算中最有效,例如向量或矩阵操作,其中循环的每次迭代都完全独立于其他所有迭代。
例如,如果我们要将每个1000个元素的两个向量总结,并且我们有10个线程,则可以将求和分为每个100个元素的10个块,并并行运行所有块,可能会增加性能,最高为10倍。
BS::thread_pool可以自动并行化循环。要查看其工作原理,请考虑以下通用循环:
for (T i = start; i < end; ++i)
loop (i);在哪里:
T是任何签名或未签名的整数类型。[start, end)范围,即包括start ,但不包括end 。loop()是针对每个循环索引i执行的操作,例如修改具有end - start元素的数组。该循环可以自动并行化并使用成员函数submit_loop()提交到线程池的队列,该曲线具有以下语法:
pool.submit_loop(start, end, loop, num_blocks);在哪里:
start是该范围内的第一个索引。end是该范围内最后一个索引之后的索引,因此完整范围是[start, end) 。换句话说,如果start和end是相同的,则循环将等效于上面的循环。start和end都必须是相同的整数T 。请参阅下面的示例,说明当它们不是相同类型时该怎么做。end <= start ,则不会发生任何事情。loop()是应在循环的每一个迭代中运行的函数,并采用一个参数,即循环索引。num_blocks是形式[a, b)的块数量,以将循环拆分为。例如,如果范围为[0, 9)并且有3个块,则块将是范围[0, 3) , [3, 6)和[6, 9) 。[0, 100)分为15个块,则结果将是大小7的10个块,该块将首先执行,大小为6的5个块。每个块将作为单独的任务提交到线程池的队列。因此,分为3个块的循环将分为3个单独的任务,该任务可能并行运行。如果只有一个块,则整个循环将作为一个任务运行,并且不会进行并行化。
为了并行化上面的通用循环,我们使用以下命令:
BS::multi_future< void > loop_future = pool.submit_loop(start, end, loop, num_blocks);
loop_future.wait(); submit_loop()返回辅助类模板BS::multi_future的对象。这本质上是std::vector<std::future<T>>具有其他成员功能的专业化。每个num_blocks块都会有一个std::future分配给它,并且所有这些期货都将存储在返回的BS::multi_future中。当调用loop_future.wait()时,主线程将等到submit_loop()完成执行的所有任务,并且只有这些任务 - 没有其他任何任务也恰好在排队中。从本质上讲,这是BS::multi_future类的角色:等待特定的任务组,在这种情况下,运行循环块的任务。
您应该将num_blocks使用什么值?省略此参数,以便块数量将等于池中的线程数,通常是一个不错的选择。为了获得最佳性能,建议您进行自己的基准测试,以找到每个循环的最佳块数(您可以使用BS::timer Utility类)。如果您还同时运行其他任务,则使用少于线程的任务少。在某些情况下,使用比线程更多的任务可能会提高性能,但是与多个任务的并行化将遭受回报减少。
作为一个简单的示例,以下代码计算并打印了所有整数的正方形表,从0到99:
# include < iomanip > // std::setw
# include < iostream > // std::cout
int main ()
{
constexpr unsigned int max = 100 ;
unsigned int squares[max];
for ( unsigned int i = 0 ; i < max; ++i)
squares[i] = i * i;
for ( unsigned int i = 0 ; i < max; ++i)
std::cout << std::setw ( 2 ) << i << " ^2 = " << std::setw ( 4 ) << squares[i] << ((i % 5 != 4 ) ? " | " : " n " );
}我们可以按以下方式进行并行化:
# include " BS_thread_pool.hpp " // BS::thread_pool
# include < iomanip > // std::setw
# include < iostream > // std::cout
int main ()
{
BS::thread_pool pool ( 10 );
constexpr unsigned int max = 100 ;
unsigned int squares[max];
const BS::multi_future< void > loop_future = pool. submit_loop < unsigned int >( 0 , max,
[&squares]( const unsigned int i)
{
squares[i] = i * i;
});
loop_future. wait ();
for ( unsigned int i = 0 ; i < max; ++i)
std::cout << std::setw ( 2 ) << i << " ^2 = " << std::setw ( 4 ) << squares[i] << ((i % 5 != 4 ) ? " | " : " n " );
}由于有10个线程,并且我们省略了num_blocks参数,因此循环将分为10个块,每个块计算10个正方形。
请注意,用显式模板参数<unsigned int>执行submit_loop() 。原因是两个循环指数必须是相同类型的。但是,这里的max是一个unsigned int ,而0是(签名) int ,因此类型不匹配,并且除非我们强迫0是正确的类型,否则代码不会编译。可以通过使用模板参数明确指定索引的类型来最优雅地完成。
之所以自动完成此操作(例如,使用std::common_type ,可能导致将负面指数施加到无符号类型,或者将整数索引变成太狭窄的整数类型,这可能会导致不正确的环路范围。
我们也可以将0明确地施放给未签名的int,但这看起来不太好:
pool.submit_loop( static_cast < unsigned int >( 0 ), max, /* ... */ );或者我们可以使用C风格的演员:
pool.submit_loop(( unsigned int )( 0 ), max, /* ... */ );或者我们可以使用整数字面的后缀:
pool.submit_loop< size_t >( 0U , max, ...);附带说明,请注意,我们在这里平行地计算正方形,但我们不同步打印结果。这有两个原因:
就像在detach_task() vs.sump_task submit_task()的情况下一样,有时您可能需要并行化一个循环,但是您不需要它返回BS::multi_future 。在这种情况下,您可以通过使用detach_loop()而不是submit_loop()来保存生成期货的开销(这可能是重要的,取决于块数),并具有相同的参数。
例如,我们可以按以下方式分离上面的正方形示例:
# include " BS_thread_pool.hpp " // BS::thread_pool
# include < iomanip > // std::setw
# include < iostream > // std::cout
int main ()
{
BS::thread_pool pool ( 10 );
constexpr unsigned int max = 100 ;
unsigned int squares[max];
pool. detach_loop < unsigned int >( 0 , max,
[&squares]( const unsigned int i)
{
squares[i] = i * i;
});
pool. wait ();
for ( unsigned int i = 0 ; i < max; ++i)
std::cout << std::setw ( 2 ) << i << " ^2 = " << std::setw ( 4 ) << squares[i] << ((i % 5 != 4 ) ? " | " : " n " );
}警告:由于detach_loop()没有返回BS::multi_future ,因此没有内置的方式让用户知道循环完成执行时。 You must use either wait() as we did here, or some other method such as condition variables, to ensure that the loop finishes executing before trying to use anything that depends on its output. Otherwise, bad things will happen!
We have seen that detach_loop() and submit_loop() execute the function loop(i) for each index i in the loop. However, behind the scenes, the loop is split into blocks, and each block executes the loop() function multiple times. Each block has an internal loop of the form (where T is the type of the indices):
for (T i = start; i < end; ++i)
loop (i); The start and end indices of each block are determined automatically by the pool. For example, in the previous section, the loop from 0 to 100 was split into 10 blocks of 10 indices each: start = 0 to end = 10 , start = 10 to end = 20 , and so on; the blocks are not inclusive of the last index, since the for loop has the condition i < end and not i <= end .
However, this also means that the loop() function is executed multiple times per block. This generates additional overhead due to the multiple function calls. For short loops, this should not affect performance. However, for very long loops, with millions of indices, the performance cost may be significate.
For this reason, the thread pool library provides two additional member functions for parallelizing loops: detach_blocks() and submit_blocks() . While detach_loop() and submit_loop() execute a function loop(i) once per index but multiple times per block, detach_blocks() and submit_blocks() execute a function block(start, end) once per block.
The main advantage of this method is increased performance, but the main disadvantage is slightly more complicated code. In particular, the user must define the loop from start to end manually within each block. Here is the previous example using detach_blocks() :
# include " BS_thread_pool.hpp " // BS::thread_pool
# include < iomanip > // std::setw
# include < iostream > // std::cout
int main ()
{
BS::thread_pool pool ( 10 );
constexpr unsigned int max = 100 ;
unsigned int squares[max];
pool. detach_blocks < unsigned int >( 0 , max,
[&squares]( const unsigned int start, const unsigned int end)
{
for ( unsigned int i = start; i < end; ++i)
squares[i] = i * i;
});
pool. wait ();
for ( unsigned int i = 0 ; i < max; ++i)
std::cout << std::setw ( 2 ) << i << " ^2 = " << std::setw ( 4 ) << squares[i] << ((i % 5 != 4 ) ? " | " : " n " );
}Note how the block function takes two arguments, and includes the internal loop.
Generally, compiler optimizations should be able to make detach_loop() and submit_loop() perform roughly the same as detach_blocks() and submit_blocks() . However, you should perform your own benchmarks to see which option works best for your particular use case.
Unlike submit_task() , the member function submit_loop() only takes loop functions with no return values. The reason is that it wouldn't make sense to return a future for every single index of the loop. However, submit_blocks() does allow the block function to have a return value, as the number of blocks will generally not be too large, unlike the number of indices.
The block function will be executed once for each block, but the blocks are managed by the thread pool, with the user only able to select the number of blocks, but not the range of each block. Therefore, there is limited usability in returning one value per block. However, for cases where this is desired, such as for summation or some sorting algorithms, submit_blocks() does accept functions with return values, in which case it returns a BS::multi_future<T> object where T is the type of the return values.
Here's an example of a function template summing all elements of type T in a given range:
# include " BS_thread_pool.hpp " // BS::thread_pool
# include < cstdint > // std::uint64_t
# include < future > // std::future
# include < iostream > // std::cout
BS::thread_pool pool;
template < typename T>
T sum (T min, T max)
{
BS::multi_future<T> loop_future = pool. submit_blocks <T>(
min, max + 1 ,
[]( const T start, const T end)
{
T block_total = 0 ;
for (T i = start; i < end; ++i)
block_total += i;
return block_total;
},
100 );
T result = 0 ;
for (std::future<T>& future : loop_future)
result += future. get ();
return result;
}
int main ()
{
std::cout << sum<std:: uint64_t >( 1 , 1'000'000 );
} Here we used the fact that BS::multi_future<T> is a specialization of std::vector<std::future<T>> , so we can use a range-based for loop to iterate over the futures, and use the get() member function of each future to get its value. The values of the futures will be the partial sums from each block, so when we add them up, we will get the total sum. Note that we divided the loop into 100 blocks, so there will be 100 futures in total, each with the partial sum of 10,000 numbers.
The range-based for loop will likely start before the loop finished executing, and each time it calls a future, it will get the value of that future if it is ready, or it will wait until the future is ready and then get the value. This increases performance, since we can start summing the results without waiting for the entire loop to finish executing first - we only need to wait for individual blocks.
If we did want to wait until the entire loop finishes before summing the results, we could have used the get() member function of the BS::multi_future<T> object itself, which returns an std::vector<T> with the values obtained from each future. In that case, the sum could be obtained after calling submit_blocks() as follows:
std::vector<T> partial_sums = loop_future.get();
T result = std::reduce(partial_sums.begin(), partial_sums.end());
return result; The member functions detach_loop() , submit_loop() , detach_blocks() , and submit_blocks() parallelize a loop by splitting it into blocks, and submitting each block as an individual task to the queue, with each such task iterating over all the indices in the corresponding block's range, which can be numerous. However, sometimes we have loops with few indices, or more generally, a sequence of tasks enumerated by some index. In such cases, we can avoid the overhead of splitting into blocks and simply submit each individual index as its own independent task to the pool's queue.
This can be done with detach_sequence() and submit_sequence() . The syntax of these functions is similar to detach_loop() and submit_loop() , except that they don't have the num_blocks argument at the end. The sequence function must take only one argument, the index. As usual, detach_sequence() detaches the tasks and does not return a future, while submit_sequence() returns a BS::multi_future . If the tasks in the sequence return values, then the futures will contain those values, otherwise they will be void futures.
Here is a simple example:
# include " BS_thread_pool.hpp " // BS::thread_pool
# include < cstdint > // std::uint64_t
# include < iostream > // std::cout
# include < vector > // std::vector
using ui64 = std:: uint64_t ;
ui64 factorial ( const ui64 n)
{
ui64 result = 1 ;
for (ui64 i = 2 ; i <= n; ++i)
result *= i;
return result;
}
int main ()
{
BS::thread_pool pool;
constexpr ui64 max = 20 ;
BS::multi_future<ui64> sequence_future = pool. submit_sequence <ui64>( 0 , max + 1 , factorial);
std::vector<ui64> factorials = sequence_future. get ();
for (ui64 i = 0 ; i < max + 1 ; ++i)
std::cout << i << " ! = " << factorials[i] << ' n ' ;
}BS::multi_future<T> The helper class template BS::multi_future<T> , which we have been using throughout this section, provides a convenient way to collect and access groups of futures. This class is a specialization of std::vector<T> , so it should be used in a similar way:
[] operator to access the future at a specific index, or the push_back() member function to append a new future to the list.size() member function tells you how many futures are currently stored in the object. However, BS::multi_future<T> also has additional member functions that are aimed specifically at handling futures:
wait() to wait for all of them at once or get() to get an std::vector<T> with the results from all of them.ready_count() .valid() .wait_for() or wait until a specific time with wait_until() . These functions return true if all futures have been waited for before the duration expired or the time point was reached, and false otherwise. Aside from using BS::multi_future<T> to track the execution of parallelized loops, it can also be used, for example, whenever you have several different groups of tasks and you want to track the execution of each group individually.
The optional header file BS_thread_pool_utils.hpp contains several useful utility classes. These are not necessary for using the thread pool itself; BS_thread_pool.hpp is the only header file required. However, the utility classes can make writing multithreading code more convenient.
As with the main header file, the version of the utilities header file can be found by checking three macros:
BS_THREAD_POOL_UTILS_VERSION_MAJOR - indicates the major version.BS_THREAD_POOL_UTILS_VERSION_MINOR - indicates the minor version.BS_THREAD_POOL_UTILS_VERSION_PATCH - indicates the patch version.BS::synced_streamWhen printing to an output stream from multiple threads in parallel, the output may become garbled. For example, consider this code:
# include " BS_thread_pool.hpp " // BS::thread_pool
# include < iostream > // std::cout
BS::thread_pool pool;
int main ()
{
pool. detach_sequence ( 0 , 5 ,
[]( int i)
{
std::cout << " Task no. " << i << " executing. n " ;
});
}The output will be a mess similar to this:
Task no. Task no. Task no. 3 executing.
0 executing.
Task no. 41 executing.
Task no. 2 executing.
executing.
The reason is that, although each individual insertion to std::cout is thread-safe, there is no mechanism in place to ensure subsequent insertions from the same thread are printed contiguously.
The utility class BS::synced_stream is designed to eliminate such synchronization issues. The constructor takes one optional argument, specifying the output stream to print to. If no argument is supplied, std::cout will be used:
// Construct a synced stream that will print to std::cout.
BS::synced_stream sync_out;
// Construct a synced stream that will print to the output stream my_stream.
BS::synced_stream sync_out (my_stream); The member function print() takes an arbitrary number of arguments, which are inserted into the stream one by one, in the order they were given. println() does the same, but also prints a newline character n at the end, for convenience. A mutex is used to synchronize this process, so that any other calls to print() or println() using the same BS::synced_stream object must wait until the previous call has finished.
As an example, this code:
# include " BS_thread_pool.hpp " // BS::thread_pool
# include " BS_thread_pool_utils.hpp " // BS::synced_stream
BS::synced_stream sync_out;
BS::thread_pool pool;
int main ()
{
pool. detach_sequence ( 0 , 5 ,
[]( int i)
{
sync_out. println ( " Task no. " , i, " executing. " );
});
}Will print out:
Task no. 0 executing.
Task no. 1 executing.
Task no. 2 executing.
Task no. 3 executing.
Task no. 4 executing.
Warning: Always create the BS::synced_stream object before the BS::thread_pool object, as we did in this example. When the BS::thread_pool object goes out of scope, it waits for the remaining tasks to be executed. If the BS::synced_stream object goes out of scope before the BS::thread_pool object, then any tasks using the BS::synced_stream will crash. Since objects are destructed in the opposite order of construction, creating the BS::synced_stream object before the BS::thread_pool object ensures that the BS::synced_stream is always available to the tasks, even while the pool is destructing.
Most stream manipulators defined in the headers <ios> and <iomanip> , such as std::setw (set the character width of the next output), std::setprecision (set the precision of floating point numbers), and std::fixed (display floating point numbers with a fixed number of digits), can be passed to print() and println() just as you would pass them to a stream.
The only exceptions are the flushing manipulators std::endl and std::flush , which will not work because the compiler will not be able to figure out which template specializations to use. Instead, use BS::synced_stream::endl and BS::synced_stream::flush .这是一个示例:
# include " BS_thread_pool.hpp " // BS::thread_pool
# include " BS_thread_pool_utils.hpp " // BS::synced_stream
# include < cmath > // std::sqrt
# include < iomanip > // std::setprecision, std::setw
# include < ios > // std::fixed
BS::synced_stream sync_out;
BS::thread_pool pool;
int main ()
{
sync_out. print ( std::setprecision ( 10 ), std::fixed);
pool. detach_sequence ( 0 , 16 ,
[]( int i)
{
sync_out. print ( " The square root of " , std::setw ( 2 ), i, " is " , std::sqrt (i), " . " , BS::synced_stream::endl);
});
} Note, however, that BS::synced_stream::endl should only be used if flushing is desired; otherwise, a newline character should be used instead.
BS::timerIf you are using a thread pool, then your code is most likely performance-critical. Achieving maximum performance requires performing a considerable amount of benchmarking to determine the optimal settings and algorithms. Therefore, it is important to be able to measure the execution time of various computations and operations under different conditions.
The utility class BS::timer provides a simple way to measure execution time. It is very straightforward to use:
BS::timer object.start() member function.stop() member function.ms() to obtain the elapsed time for the computation in milliseconds.current_ms() to obtain the elapsed time so far but keep the timer ticking.例如:
BS::timer tmr;
tmr.start();
do_something ();
tmr.stop();
std::cout << " The elapsed time was " << tmr.ms() << " ms. n " ; A practical application of the BS::timer class can be found in the benchmark portion of the test program BS_thread_pool_test.cpp .
BS::signaller BS::signaller is a utility class which can be used to allow simple signalling between threads. To use it, construct an object and then pass it to the different threads. Multiple threads can call the wait() member function of the signaller. When another thread calls the ready() member function, the waiting threads will stop waiting.
That's really all there is to it; BS::signaller is really just a convenient wrapper around std::promise , which contains both the promise and its future. For usage examples, please see the test program BS_thread_pool_test.cpp .
Sometimes you may wish to monitor what is happening with the tasks you submitted to the pool. This may be done using three member functions:
get_tasks_queued() gets the number of tasks currently waiting in the queue to be executed by the threads.get_tasks_running() gets the number of tasks currently being executed by the threads.get_tasks_total() gets the total number of unfinished tasks: either still in the queue, or running in a thread.get_tasks_total() == get_tasks_queued() + get_tasks_running() .These functions are demonstrated in the following program:
# include " BS_thread_pool.hpp " // BS::thread_pool
# include " BS_thread_pool_utils.hpp " // BS::synced_stream
# include < chrono > // std::chrono
# include < thread > // std::this_thread
BS::synced_stream sync_out;
BS::thread_pool pool ( 4 );
void sleep_half_second ( const int i)
{
std::this_thread::sleep_for ( std::chrono::milliseconds ( 500 ));
sync_out. println ( " Task " , i, " done. " );
}
void monitor_tasks ()
{
sync_out. println (pool. get_tasks_total (), " tasks total, " , pool. get_tasks_running (), " tasks running, " , pool. get_tasks_queued (), " tasks queued. " );
}
int main ()
{
pool. wait ();
pool. detach_sequence ( 0 , 12 , sleep_half_second);
monitor_tasks ();
std::this_thread::sleep_for ( std::chrono::milliseconds ( 750 ));
monitor_tasks ();
std::this_thread::sleep_for ( std::chrono::milliseconds ( 500 ));
monitor_tasks ();
std::this_thread::sleep_for ( std::chrono::milliseconds ( 500 ));
monitor_tasks ();
}Assuming you have at least 4 hardware threads (so that 4 tasks can run concurrently), the output should be similar to:
12 tasks total, 0 tasks running, 12 tasks queued.
Task 0 done.
Task 1 done.
Task 2 done.
Task 3 done.
8 tasks total, 4 tasks running, 4 tasks queued.
Task 4 done.
Task 5 done.
Task 6 done.
Task 7 done.
4 tasks total, 4 tasks running, 0 tasks queued.
Task 8 done.
Task 9 done.
Task 10 done.
Task 11 done.
0 tasks total, 0 tasks running, 0 tasks queued.
The reason we called pool.wait() in the beginning is that when the thread pool is created, an initialization task runs in each thread, so if we don't wait, the first line will say there are 16 tasks in total, including the 4 initialization tasks. See below for more details.
Consider a situation where the user cancels a multithreaded operation while it is still ongoing. Perhaps the operation was split into multiple tasks, and half of the tasks are currently being executed by the pool's threads, but the other half are still waiting in the queue.
The thread pool cannot terminate the tasks that are already running, as the C++17 standard does not provide that functionality (and in any case, abruptly terminating a task while it's running could have extremely bad consequences, such as memory leaks and data corruption). However, the tasks that are still waiting in the queue can be purged using the purge() member function.
Once purge() is called, any tasks still waiting in the queue will be discarded, and will never be executed by the threads. Please note that there is no way to restore the purged tasks; they are gone forever.
Consider for example the following program:
# include " BS_thread_pool.hpp " // BS::thread_pool
# include " BS_thread_pool_utils.hpp " // BS::synced_stream
# include < chrono > // std::chrono
# include < thread > // std::this_thread
BS::synced_stream sync_out;
BS::thread_pool pool ( 4 );
int main ()
{
for ( size_t i = 0 ; i < 8 ; ++i)
{
pool. detach_task (
[i]
{
std::this_thread::sleep_for ( std::chrono::milliseconds ( 100 ));
sync_out. println ( " Task " , i, " done. " );
});
}
std::this_thread::sleep_for ( std::chrono::milliseconds ( 50 ));
pool. purge ();
pool. wait ();
} The program submit 8 tasks to the queue. Each task waits 100 milliseconds and then prints a message. The thread pool has 4 threads, so it will execute the first 4 tasks in parallel, and then the remaining 4. We wait 50 milliseconds, to ensure that the first 4 tasks have all started running. Then we call purge() to purge the remaining 4 tasks. As a result, these tasks never get executed. However, since the first 4 tasks are still running when purge() is called, they will finish uninterrupted; purge() only discards tasks that have not yet started running. The output of the program therefore only contains the messages from the first 4 tasks:
Task 0 done.
Task 1 done.
Task 2 done.
Task 3 done.
submit_task() catches any exceptions thrown by the submitted task and forwards them to the corresponding future. They can then be caught when invoking the get() member function of the future.例如:
# include " BS_thread_pool.hpp "
BS::synced_stream sync_out;
BS::thread_pool pool;
double inverse ( const double x)
{
if (x == 0 )
throw std::runtime_error ( " Division by zero! " );
else
return 1 / x;
}
int main ()
{
constexpr double num = 0 ;
std::future< double > my_future = pool. submit_task (inverse, num);
try
{
const double result = my_future. get ();
sync_out. println ( " The inverse of " , num, " is " , result, " . " );
}
catch ( const std:: exception & e)
{
sync_out. println ( " Caught exception: " , e. what ());
}
}The output will be:
Caught exception: Division by zero!
However, if you change num to any non-zero number, no exceptions will be thrown and the inverse will be printed.
It is important to note that wait() does not throw any exceptions; only get() does. Therefore, even if your task does not return anything, ie your future is an std::future<void> , you must still use get() on the future obtained from it if you want to catch exceptions thrown by it.这是一个示例:
# include " BS_thread_pool.hpp "
BS::synced_stream sync_out;
BS::thread_pool pool;
void print_inverse ( const double x)
{
if (x == 0 )
throw std::runtime_error ( " Division by zero! " );
else
sync_out. println ( " The inverse of " , x, " is " , 1 / x, " . " );
}
int main ()
{
constexpr double num = 0 ;
std::future< void > my_future = pool. submit_task (print_inverse, num);
try
{
my_future. get ();
}
catch ( const std:: exception & e)
{
sync_out. println ( " Caught exception: " , e. what ());
}
} When using BS::multi_future to handle multiple futures at once, exception handling works the same way: if any of the futures may throw exceptions, you may catch these exceptions when calling get() , even in the case of BS::multi_future<void> .
If you do not require exception handling, or if exceptions are explicitly disabled in your codebase, you can define the macro BS_THREAD_POOL_DISABLE_EXCEPTION_HANDLING before including BS_thread_pool.hpp , which will disable exception handling in submit_task() . Note that if the feature-test macro __cpp_exceptions is undefined, BS_THREAD_POOL_DISABLE_EXCEPTION_HANDLING will be automatically defined.
BS::thread_pool comes with a variety of methods to obtain information about the threads in the pool:
BS::this_thread provides functionality similar to std::this_thread . If the current thread belongs to a BS::thread_pool object, then BS::this_thread::get_index() can be used to get the index of the current thread, and BS::this_thread::get_pool() can be used to get the pointer to the thread pool that owns the current thread. Please see the reference below for more details.get_thread_ids() returns a vector containing the unique identifiers for each of the pool's threads, as obtained by std::thread::get_id() . These values are not so useful on their own, but can be used for whatever the user wants to use them for.get_native_handles() , if enabled, returns a vector containing the underlying implementation-defined thread handles for each of the pool's threads, as obtained by std::thread::native_handle() . For more information, see the relevant section below. Sometimes, it is necessary to initialize the threads before they run any tasks. This can be done by submitting a proper initialization function to the constructor or to reset() , either as the only argument or as the second argument after the desired number of threads. The thread initialization must take no arguments and have no return value. However, if needed, the function can use BS::this_thread::get_index() and BS::this_thread::get_pool() to figure out which thread and pool it belongs to.
The thread initialization function is submitted as a set of special tasks, one per thread, which bypass the queue, but still count towards the number of running tasks, which means get_tasks_total() and get_tasks_running() will report that these tasks are running if they are checked immediately after the pool is initialized.
This is done so that the user has the option to either wait for the initialization tasks to finish, by calling wait() on the pool, or just keep going. In either case, the initialization tasks will always finish executing before any tasks are picked out of the queue, so there is no reason to wait for them to finish unless they have some side-effects that affect the main thread.
Here is a simple example:
# include " BS_thread_pool.hpp " // BS::thread_pool
# include " BS_thread_pool_utils.hpp " // BS::synced_stream
# include < random > // std::mt19937_64, std::random_device
BS::synced_stream sync_out;
thread_local std::mt19937_64 twister;
int main ()
{
BS::thread_pool pool (
[]
{
twister. seed ( std::random_device ()());
});
pool. submit_sequence ( 0 , 4 ,
[]( int )
{
sync_out. println ( " I generated a random number: " , twister ());
})
. wait ();
} In this example, we create a thread_local Mersenne twister engine, meaning that each thread has its own independent engine. However, we did not seed the engine, so each thread will generate the exact same sequence of pseudo-random numbers. To remedy this, we pass an initialization function to the BS::thread_pool constructor which seeds the twister in each thread with the (hopefully) non-deterministic random number generator std::random_device .
In C++, it is often crucial to pass function arguments by reference or constant reference, instead of by value. This allows the function to access the object being passed directly, rather than creating a new copy of the object. We have already seen that submitting an argument by reference is a simple matter of capturing it with a & in the lambda capture list. To submit as constant reference, we can use std::as_const as in the following example:
# include " BS_thread_pool.hpp " // BS::thread_pool
# include " BS_thread_pool_utils.hpp " // BS::synced_stream
# include < utility > // std::as_const
BS::synced_stream sync_out;
void increment ( int & x)
{
++x;
}
void print ( const int & x)
{
sync_out. println (x);
}
int main ()
{
BS::thread_pool pool;
int n = 0 ;
pool. submit_task (
[&n]
{
increment (n);
})
. wait ();
pool. submit_task (
[&n = std::as_const (n)]
{
print (n);
})
. wait ();
} The increment() function takes a reference to an integer, and increments that integer. Passing the argument by reference guarantees that n itself, in the scope of main() , will be incremented - rather than a copy of it in the scope of increment() .
Similarly, the print() function takes a constant reference to an integer, and prints that integer. Passing the argument by constant reference guarantees that the variable will not be accidentally modified by the function, even though we are accessing n itself, rather than a copy. If we replace print with increment , the program won't compile, as increment cannot take constant references.
Generally, it is not really necessary to pass arguments by constant reference, but it is more "correct" to do so, if we would like to guarantee that the variable being referenced is indeed never modified. This section is therefore included here for completeness.
Sometimes you may wish to temporarily pause the execution of tasks, or perhaps you want to submit tasks to the queue in advance and only start executing them at a later time. You can do this using the member functions pause() , unpause() , and is_paused() .
However, these functions are disabled by default, and must be explicitly enabled by defining the macro BS_THREAD_POOL_ENABLE_PAUSE before including BS_thread_pool.hpp . The reason is that pausing the pool adds additional checks to the waiting and worker functions, which have a very small but non-zero overhead.
When you call pause() , the workers will temporarily stop retrieving new tasks out of the queue. However, any tasks already executed will keep running until they are done, since the thread pool has no control over the internal code of your tasks. If you need to pause a task in the middle of its execution, you must do that manually by programming your own pause mechanism into the task itself. To resume retrieving tasks, call unpause() . To check whether the pool is currently paused, call is_paused() .
这是一个示例:
# define BS_THREAD_POOL_ENABLE_PAUSE
# include " BS_thread_pool.hpp " // BS::thread_pool
# include " BS_thread_pool_utils.hpp " // BS::synced_stream
# include < chrono > // std::chrono
# include < thread > // std::this_thread
BS::synced_stream sync_out;
BS::thread_pool pool ( 4 );
void sleep_half_second ( const int i)
{
std::this_thread::sleep_for ( std::chrono::milliseconds ( 500 ));
sync_out. println ( " Task " , i, " done. " );
}
void check_if_paused ()
{
if (pool. is_paused ())
sync_out. println ( " Pool paused. " );
else
sync_out. println ( " Pool unpaused. " );
}
int main ()
{
pool. detach_sequence ( 0 , 8 , sleep_half_second);
sync_out. println ( " Submitted 8 tasks. " );
std::this_thread::sleep_for ( std::chrono::milliseconds ( 250 ));
pool. pause ();
check_if_paused ();
std::this_thread::sleep_for ( std::chrono::milliseconds ( 1000 ));
sync_out. println ( " Still paused... " );
std::this_thread::sleep_for ( std::chrono::milliseconds ( 1000 ));
pool. detach_sequence ( 8 , 12 , sleep_half_second);
sync_out. println ( " Submitted 4 more tasks. " );
sync_out. println ( " Still paused... " );
std::this_thread::sleep_for ( std::chrono::milliseconds ( 1000 ));
pool. unpause ();
check_if_paused ();
}Assuming you have at least 4 hardware threads, the output should be similar to:
Submitted 8 tasks.
Pool paused.
Task 0 done.
Task 1 done.
Task 2 done.
Task 3 done.
Still paused...
Submitted 4 more tasks.
Still paused...
Pool unpaused.
Task 4 done.
Task 5 done.
Task 6 done.
Task 7 done.
Task 8 done.
Task 9 done.
Task 10 done.
Task 11 done.
Here is what happened. We initially submitted a total of 8 tasks to the queue. Since we waited for 250ms before pausing, the first 4 tasks have already started running, so they kept running until they finished. While the pool was paused, we submitted 4 more tasks to the queue, but they just waited at the end of the queue. When we unpaused, the remaining 4 initial tasks were executed, followed by the 4 new tasks.
While the workers are paused, wait() will wait for the running tasks instead of all tasks (otherwise it would wait forever). This is demonstrated by the following program:
# define BS_THREAD_POOL_ENABLE_PAUSE
# include " BS_thread_pool.hpp " // BS::thread_pool
# include " BS_thread_pool_utils.hpp " // BS::synced_stream
# include < chrono > // std::chrono
# include < thread > // std::this_thread
BS::synced_stream sync_out;
BS::thread_pool pool ( 4 );
void sleep_half_second ( const int i)
{
std::this_thread::sleep_for ( std::chrono::milliseconds ( 500 ));
sync_out. println ( " Task " , i, " done. " );
}
void check_if_paused ()
{
if (pool. is_paused ())
sync_out. println ( " Pool paused. " );
else
sync_out. println ( " Pool unpaused. " );
}
int main ()
{
pool. detach_sequence ( 0 , 8 , sleep_half_second);
sync_out. println ( " Submitted 8 tasks. Waiting for them to complete. " );
pool. wait ();
pool. detach_sequence ( 8 , 20 , sleep_half_second);
sync_out. println ( " Submitted 12 more tasks. " );
std::this_thread::sleep_for ( std::chrono::milliseconds ( 250 ));
pool. pause ();
check_if_paused ();
sync_out. println ( " Waiting for the " , pool. get_tasks_running (), " running tasks to complete. " );
pool. wait ();
sync_out. println ( " All running tasks completed. " , pool. get_tasks_queued (), " tasks still queued. " );
std::this_thread::sleep_for ( std::chrono::milliseconds ( 1000 ));
sync_out. println ( " Still paused... " );
std::this_thread::sleep_for ( std::chrono::milliseconds ( 1000 ));
sync_out. println ( " Still paused... " );
std::this_thread::sleep_for ( std::chrono::milliseconds ( 1000 ));
pool. unpause ();
check_if_paused ();
std::this_thread::sleep_for ( std::chrono::milliseconds ( 250 ));
sync_out. println ( " Waiting for the remaining " , pool. get_tasks_total (), " tasks ( " , pool. get_tasks_running (), " running and " , pool. get_tasks_queued (), " queued) to complete. " );
pool. wait ();
sync_out. println ( " All tasks completed. " );
}The output should be similar to:
Submitted 8 tasks. Waiting for them to complete.
Task 0 done.
Task 1 done.
Task 2 done.
Task 3 done.
Task 4 done.
Task 5 done.
Task 6 done.
Task 7 done.
Submitted 12 more tasks.
Pool paused.
Waiting for the 4 running tasks to complete.
Task 8 done.
Task 9 done.
Task 10 done.
Task 11 done.
All running tasks completed. 8 tasks still queued.
Still paused...
Still paused...
Pool unpaused.
Waiting for the remaining 8 tasks (4 running and 4 queued) to complete.
Task 12 done.
Task 13 done.
Task 14 done.
Task 15 done.
Task 16 done.
Task 17 done.
Task 18 done.
Task 19 done.
All tasks completed.
The first wait() , which was called while the pool was not paused, waited for all 8 tasks, both running and queued. The second wait() , which was called after pausing the pool, only waited for the 4 running tasks, while the other 8 tasks remained queued, and were not executed since the pool was paused. Finally, the third wait() , which was called after unpausing the pool, waited for the remaining 8 tasks, both running and queued.
Warning: If the thread pool is destroyed while paused, any tasks still in the queue will never be executed!
Consider the following program:
# include " BS_thread_pool.hpp " // BS::thread_pool
# include < iostream > // std::cout
int main ()
{
BS::thread_pool pool;
pool. detach_task (
[&pool]
{
pool. wait ();
std::cout << " Done waiting. n " ;
});
}This program creates a thread pool, and then detaches a task that waits for tasks in the same thread pool to complete. If you run this program, it will never print the message "Done waiting", because the task will wait for itself to complete. This causes a deadlock , and the program will wait forever.
Usually, in simple programs, this will never happen. However, in more complicated programs, perhaps ones running multiple thread pools in parallel, wait deadlocks could potentially occur. In such cases, the macro BS_THREAD_POOL_ENABLE_WAIT_DEADLOCK_CHECK can be defined before including BS_thread_pool.hpp . wait() will then check whether the user tried to call it from within a thread of the same pool, and if so, it will throw the exception BS::thread_pool::wait_deadlock instead of waiting. This check is disabled by default because wait deadlocks are not something that happens often, and the check adds a small but non-zero overhead every time wait() is called.
这是一个示例:
# define BS_THREAD_POOL_ENABLE_WAIT_DEADLOCK_CHECK
# include " BS_thread_pool.hpp " // BS::thread_pool
# include < iostream > // std::cout
int main ()
{
BS::thread_pool pool;
pool. detach_task (
[&pool]
{
try
{
pool. wait ();
std::cout << " Done waiting. n " ;
}
catch ( const BS::thread_pool::wait_deadlock&)
{
std::cout << " Error: Deadlock! n " ;
}
});
} This time, wait() will detect the deadlock, and will throw an exception, causing the output to be "Error: Deadlock!" 。
Note that if the feature-test macro __cpp_exceptions is undefined, BS_THREAD_POOL_ENABLE_WAIT_DEADLOCK_CHECK will be automatically undefined.
The BS::thread_pool member function get_native_handles() returns a vector containing the underlying implementation-defined thread handles for each of the pool's threads. These can then be used in an implementation-specific way to manage the threads at the OS level
However, note that this will generally not be portable code. Furthermore, this feature uses std::thread::native_handle(), which is in the C++ standard library, but is not guaranteed to be present on all systems. Therefore, this feature is turned off by default, and must be turned on by defining the macro BS_THREAD_POOL_ENABLE_NATIVE_HANDLES before including BS_thread_pool.hpp .
这是一个示例:
# define BS_THREAD_POOL_ENABLE_NATIVE_HANDLES
# include " BS_thread_pool.hpp " // BS::thread_pool
# include " BS_thread_pool_utils.hpp " // BS::synced_stream
# include < thread > // std::thread
# include < vector > // std::vector
BS::synced_stream sync_out;
BS::thread_pool pool ( 4 );
int main ()
{
std::vector<std::thread::native_handle_type> handles = pool. get_native_handles ();
for (BS:: concurrency_t i = 0 ; i < handles. size (); ++i)
sync_out. println ( " Thread " , i, " native handle: " , handles[i]);
}The output will depend on your compiler and operating system.这是一个示例:
Thread 0 native handle: 000000F4
Thread 1 native handle: 000000F8
Thread 2 native handle: 000000EC
Thread 3 native handle: 000000FC
Defining the macro BS_THREAD_POOL_ENABLE_PRIORITY before including BS_thread_pool.hpp enables task priority. The priority of a task or group of tasks may then be specified as an additional argument (at the end of the argument list) to detach_task() , submit_task() , detach_blocks() , submit_blocks() , detach_loop() , submit_loop() , detach_sequence() , and submit_sequence() . If the priority is not specified, the default value will be 0.
The priority is a number of type BS::priority_t , which is a signed 16-bit integer, so it can have any value between -32,768 and 32,767. The tasks will be executed in priority order from highest to lowest. If priority is assigned to the block/loop/sequence parallelization functions, which submit multiple tasks, then all of these tasks will have the same priority.
The namespace BS::pr contains some pre-defined priorities for users who wish to avoid magic numbers and enjoy better future-proofing. In order of decreasing priority, the pre-defined priorities are: BS::pr::highest , BS::pr::high , BS::pr::normal , BS::pr::low , and BS::pr::lowest .
Here is a simple example:
# define BS_THREAD_POOL_ENABLE_PRIORITY
# include " BS_thread_pool.hpp " // BS::thread_pool
# include " BS_thread_pool_utils.hpp " // BS::synced_stream
BS::synced_stream sync_out;
BS::thread_pool pool ( 1 );
int main ()
{
pool. detach_task ([] { sync_out. println ( " This task will execute third. " ); }, BS::pr:: normal );
pool. detach_task ([] { sync_out. println ( " This task will execute fifth. " ); }, BS::pr::lowest);
pool. detach_task ([] { sync_out. println ( " This task will execute second. " ); }, BS::pr::high);
pool. detach_task ([] { sync_out. println ( " This task will execute first. " ); }, BS::pr::highest);
pool. detach_task ([] { sync_out. println ( " This task will execute fourth. " ); }, BS::pr::low);
}This program will print out the tasks in the correct priority order. Note that for simplicity, we used a pool with just one thread, so the tasks will run one at a time. In a pool with 5 or more threads, all 5 tasks will actually run more or less at the same time, because, for example, the task with the second-highest priority will be picked up by another thread while the task with the highest priority is still running.
Of course, this is just a pedagogical example. In a realistic use case we may want, for example, to submit tasks that must be completed immediately with high priority so they skip over other tasks already in the queue, or background non-urgent tasks with low priority so they evaluate only after higher-priority tasks are done.
Here are some subtleties to note when using task priority:
std::priority_queue , which has O(log n) complexity for storing new tasks, but only O(1) complexity for retrieving the next (ie highest-priority) task. This is in contrast with std::queue , used if priority is disabled, which both stores and retrieves with O(1) complexity.std::priority_queue as a binary heap, which means tasks are stored as a binary tree instead of sequentially. To execute tasks in submission order, give them monotonically decreasing priorities.BS::priority_t is defined to be ( std::int_least16_t ), since this type is guaranteed to be present on all systems, rather than std::int16_t , which is optional in the C++ standard. This means that on some exotic systems BS::priority_t may actually have more than 16 bits. However, the pre-defined priorities are 100% portable, and will always have the same values (eg: BS::pr::highest = 32767 ) regardless of the actual bit width. The file BS_thread_pool_test.cpp in the tests folder of the GitHub repository will perform automated tests of all aspects of the library. The output will be printed both to std::cout and to a file with the same name as the executable and the suffix -yyyy-mm-dd_hh.mm.ss.log based on the current date and time. In addition, the code is meant to serve as an extensive example of how to properly use the library.
Please make sure to:
BS_thread_pool_test.cpp with optimization flags enabled (eg -O3 on GCC / Clang or /O2 on MSVC).The test program also takes command line arguments for automation purposes:
help : Show a help message and exit. Any other arguments will be ignored.log : Create a log file.tests : Perform standard tests.deadlock Perform long deadlock tests.benchmarks : Perform benchmarks. If no options are entered, the default is: log tests benchmarks .
By default, the test program enables all the optional features by defining the suitable macros, so it can test them. However, if the macro BS_THREAD_POOL_LIGHT_TEST is defined during compilation, the optional features will not be tested.
A PowerShell script, BS_thread_pool_test.ps1 , is provided for your convenience in the tests folder to make running the test on multiple compilers and operating systems easier. Since it is written in PowerShell, it is fully portable and works on Windows, Linux, and macOS. The script will automatically detect if Clang, GCC, and/or MSVC are available, and compile the test program using each available compiler twice - with and without all the optional features. It will then run each compiled test program and report on any errors.
If any of the tests fail, please submit a bug report including the exact specifications of your system (OS, CPU, compiler, etc.) and the generated log file.
If all checks passed, BS_thread_pool_test.cpp performs simple benchmarks by filling a very large vector with values using detach_blocks() . The program decides what the size of the vector should be by testing how many elements are needed to reach a certain target duration when parallelizing using a number of blocks equal to the number of threads. This ensures that the test takes approximately the same amount of time on all systems, and is thus more consistent and portable.
Once the appropriate size of the vector has been determined, the program allocates the vector and fills it with values, calculated according to a fixed prescription. This operation is performed both single-threaded and multithreaded, with the multithreaded computation spread across multiple tasks submitted to the pool.
Several different multithreaded tests are performed, with the number of tasks either equal to, smaller than, or larger than the pool's thread count. Each test is repeated multiple times, with the run times averaged over all runs of the same test. The program keeps increasing the number of blocks by a factor of 2 until diminishing returns are encountered. The run times of the tests are compared, and the maximum speedup obtained is calculated.
As an example, here are the results of the benchmarks from a Digital Research Alliance of Canada node equipped with two 20-core / 40-thread Intel Xeon Gold 6148 CPUs (for a total of 40 cores and 80 threads), running CentOS Linux 7.9.2009. The tests were compiled using GCC v13.2.0 with the -O3 and -march=native flags. The output was as follows:
======================
Performing benchmarks:
======================
Using 80 threads.
Determining the number of elements to generate in order to achieve an approximate mean execution time of 50 ms with 80 tasks...
Each test will be repeated up to 30 times to collect reliable statistics.
Generating 27962000 elements:
[......]
Single-threaded, mean execution time was 2815.2 ms with standard deviation 3.5 ms.
[......]
With 2 tasks, mean execution time was 1431.3 ms with standard deviation 10.1 ms.
[.......]
With 4 tasks, mean execution time was 722.1 ms with standard deviation 11.4 ms.
[..............]
With 8 tasks, mean execution time was 364.9 ms with standard deviation 10.9 ms.
[............................]
With 16 tasks, mean execution time was 181.9 ms with standard deviation 8.0 ms.
[..............................]
With 32 tasks, mean execution time was 110.6 ms with standard deviation 1.8 ms.
[..............................]
With 64 tasks, mean execution time was 64.0 ms with standard deviation 6.3 ms.
[..............................]
With 128 tasks, mean execution time was 59.8 ms with standard deviation 0.8 ms.
[..............................]
With 256 tasks, mean execution time was 59.0 ms with standard deviation 0.0 ms.
[..............................]
With 512 tasks, mean execution time was 52.8 ms with standard deviation 0.4 ms.
[..............................]
With 1024 tasks, mean execution time was 50.7 ms with standard deviation 0.9 ms.
[..............................]
With 2048 tasks, mean execution time was 50.0 ms with standard deviation 0.5 ms.
[..............................]
With 4096 tasks, mean execution time was 49.4 ms with standard deviation 0.5 ms.
[..............................]
With 8192 tasks, mean execution time was 50.2 ms with standard deviation 0.4 ms.
Maximum speedup obtained by multithreading vs. single-threading: 56.9x, using 4096 tasks.
+++++++++++++++++++++++++++++++++++++++
Thread pool performance test completed!
+++++++++++++++++++++++++++++++++++++++
These two CPUs have 40 physical cores in total, with each core providing two separate logical cores via hyperthreading, for a total of 80 threads. Without hyperthreading, we would expect a maximum theoretical speedup of 40x. With hyperthreading, one might naively expect to achieve up to an 80x speedup, but this is in fact impossible, as each pair of hyperthreaded logical cores share the same physical core's resources. However, generally we would expect at most an estimated 30% additional speedup from hyperthreading, which amounts to around 52x in this case. The speedup of 56.9x in our performance test exceeds this estimate.
If you are using the vcpkg C/C++ package manager, you can easily install BS::thread_pool with the following commands:
On Linux/macOS:
./vcpkg install bshoshany-thread-pool
在Windows上:
.vcpkg install bshoshany-thread-pool:x86-windows bshoshany-thread-pool:x64-windows
To update the package to the latest version, run:
vcpkg upgrade
If you are using the Conan C/C++ package manager, you can easily integrate BS::thread_pool into your project by adding the following lines to your conanfile.txt :
[requires]
bshoshany-thread-pool/4.1.0To update the package to the latest version, simply change the version number. Please refer to this package's page on ConanCenter for more information.
If you are using the Meson build system, you can install BS::thread_pool from WrapDB. To do so, create a subprojects folder in your project (if it does not already exist) and run the following command:
meson wrap install bshoshany-thread-pool
Then, use dependency('bshoshany-thread-pool') in your meson.build file to include the package. To update the package to the latest version, run:
meson wrap update bshoshany-thread-pool
If you are using CMake, you can install BS::thread_pool with CPM. If CPM is already installed, simply add the following to your project's CMakeLists.txt :
CPMAddPackage(
NAME BS_thread_pool
GITHUB_REPOSITORY bshoshany/thread-pool
VERSION 4.1.0)
add_library (BS_thread_pool INTERFACE )
target_include_directories (BS_thread_pool INTERFACE ${BS_thread_pool_SOURCE_DIR} / include )This will automatically download the indicated version of the package from the GitHub repository and include it in your project.
It is also possible to use CPM without installing it first, by adding the following lines to CMakeLists.txt before CPMAddPackage :
set (CPM_DOWNLOAD_LOCATION " ${CMAKE_BINARY_DIR} /cmake/CPM.cmake" )
if ( NOT ( EXISTS ${CPM_DOWNLOAD_LOCATION} ))
message ( STATUS "Downloading CPM.cmake" )
file (DOWNLOAD https://github.com/cpm-cmake/CPM.cmake/releases/latest/download/CPM.cmake ${CPM_DOWNLOAD_LOCATION} )
endif ()
include ( ${CPM_DOWNLOAD_LOCATION} ) Here is an example of a complete CMakeLists.txt for a project named my_project consisting of a single source file main.cpp which uses BS_thread_pool.hpp :
cmake_minimum_required ( VERSION 3.19)
project (my_project LANGUAGES CXX)
set (CMAKE_CXX_STANDARD 17)
set (CMAKE_CXX_STANDARD_REQUIRED ON )
set (CMAKE_CXX_EXTENSIONS OFF )
set (CPM_DOWNLOAD_LOCATION " ${CMAKE_BINARY_DIR} /cmake/CPM.cmake" )
if ( NOT ( EXISTS ${CPM_DOWNLOAD_LOCATION} ))
message ( STATUS "Downloading CPM.cmake" )
file (DOWNLOAD https://github.com/cpm-cmake/CPM.cmake/releases/latest/download/CPM.cmake ${CPM_DOWNLOAD_LOCATION} )
endif ()
include ( ${CPM_DOWNLOAD_LOCATION} )
CPMAddPackage(
NAME BS_thread_pool
GITHUB_REPOSITORY bshoshany/thread-pool
VERSION 4.1.0)
add_library (BS_thread_pool INTERFACE )
target_include_directories (BS_thread_pool INTERFACE ${BS_thread_pool_SOURCE_DIR} / include )
add_executable (my_project main.cpp)
target_link_libraries (my_project BS_thread_pool) With both CMakeLists.txt and main.cpp in the same folder, type the following commands to build the project:
cmake -S . -B build
cmake --build build
This section provides a complete reference to classes, member functions, objects, and macros available in this library, along with other important information. Member functions are given here with simplified prototypes (eg removing const ) for ease of reading.
More information can be found in the provided Doxygen comments. Any modern IDE, such as Visual Studio Code, can use the Doxygen comments to provide automatic documentation for any class and member function in this library when hovering over code with the mouse or using auto-complete.
BS_thread_pool.hpp ) BS::thread_pool class The class BS::thread_pool is the main thread pool class. It can be used to create a pool of threads and submit tasks to a queue. When a thread becomes available, it takes a task from the queue and executes it. The member functions that are available by default, when no macros are defined, are:
thread_pool() : Construct a new thread pool. The number of threads will be the total number of hardware threads available, as reported by the implementation. This is usually determined by the number of cores in the CPU. If a core is hyperthreaded, it will count as two threads.thread_pool(BS::concurrency_t num_threads) : Construct a new thread pool with the specified number of threads.thread_pool(std::function<void()>& init_task) : Construct a new thread pool with the specified initialization function.thread_pool(BS::concurrency_t num_threads, std::function<void()>& init_task) : Construct a new thread pool with the specified number of threads and initialization function.void reset() : Reset the pool with the total number of hardware threads available, as reported by the implementation. Waits for all currently running tasks to be completed, then destroys all threads in the pool and creates a new thread pool with the new number of threads. Any tasks that were waiting in the queue before the pool was reset will then be executed by the new threads. If the pool was paused before resetting it, the new pool will be paused as well.void reset(BS::concurrency_t num_threads) : Reset the pool with a new number of threads.void reset(std::function<void()>& init_task) Reset the pool with the total number of hardware threads available, as reported by the implementation, and a new initialization function.void reset(BS::concurrency_t num_threads, std::function<void()>& init_task) : Reset the pool with a new number of threads and a new initialization function.size_t get_tasks_queued() : Get the number of tasks currently waiting in the queue to be executed by the threads.size_t get_tasks_running() : Get the number of tasks currently being executed by the threads.size_t get_tasks_total() : Get the total number of unfinished tasks: either still waiting in the queue, or running in a thread. Note that get_tasks_total() == get_tasks_queued() + get_tasks_running() .BS::concurrency_t get_thread_count() : Get the number of threads in the pool.std::vector<std::thread::id> get_thread_ids() : Get a vector containing the unique identifiers for each of the pool's threads, as obtained by std::thread::get_id() .T and F are template parameters):void detach_task(F&& task) : Submit a function with no arguments and no return value into the task queue. To push a function with arguments, enclose it in a lambda expression. Does not return a future, so the user must use wait() or some other method to ensure that the task finishes executing, otherwise bad things will happen.void detach_blocks(T first_index, T index_after_last, F&& block, size_t num_blocks = 0) : Parallelize a loop by automatically splitting it into blocks and submitting each block separately to the queue. The block function takes two arguments, the start and end of the block, so that it is only called only once per block, but it is up to the user make sure the block function correctly deals with all the indices in each block. Does not return a BS::multi_future , so the user must use wait() or some other method to ensure that the loop finishes executing, otherwise bad things will happen.void detach_loop(T first_index, T index_after_last, F&& loop, size_t num_blocks = 0) : Parallelize a loop by automatically splitting it into blocks and submitting each block separately to the queue. The loop function takes one argument, the loop index, so that it is called many times per block. Does not return a BS::multi_future , so the user must use wait() or some other method to ensure that the loop finishes executing, otherwise bad things will happen.void detach_sequence(T first_index, T index_after_last, F&& sequence) : Submit a sequence of tasks enumerated by indices to the queue. Does not return a BS::multi_future , so the user must use wait() or some other method to ensure that the sequence finishes executing, otherwise bad things will happen.T , F , and R are template parameters):std::future<R> submit_task(F&& task) : Submit a function with no arguments into the task queue. To submit a function with arguments, enclose it in a lambda expression. If the function has a return value, get a future for the eventual returned value. If the function has no return value, get an std::future<void> which can be used to wait until the task finishes.BS::multi_future<R> submit_blocks(T first_index, T index_after_last, F&& block, size_t num_blocks = 0) : Parallelize a loop by automatically splitting it into blocks and submitting each block separately to the queue. The block function takes two arguments, the start and end of the block, so that it is only called only once per block, but it is up to the user make sure the block function correctly deals with all the indices in each block. Returns a BS::multi_future that contains the futures for all of the blocks.BS::multi_future<void> submit_loop(T first_index, T index_after_last, F&& loop, size_t num_blocks = 0) : Parallelize a loop by automatically splitting it into blocks and submitting each block separately to the queue. The loop function takes one argument, the loop index, so that it is called many times per block. It must have no return value. Returns a BS::multi_future that contains the futures for all of the blocks.BS::multi_future<R> submit_sequence(T first_index, T index_after_last, F&& sequence) : Submit a sequence of tasks enumerated by indices to the queue. Returns a BS::multi_future that contains the futures for all of the tasks.void purge() : Purge all the tasks waiting in the queue. Tasks that are currently running will not be affected, but any tasks still waiting in the queue will be discarded, and will never be executed by the threads. Please note that there is no way to restore the purged tasks.R and P , C , and D are template parameters):void wait() : Wait for tasks to be completed. Normally, this function waits for all tasks, both those that are currently running in the threads and those that are still waiting in the queue. However, if the pool is paused, this function only waits for the currently running tasks (otherwise it would wait forever). Note: To wait for just one specific task, use submit_task() instead, and call the wait() member function of the generated future.bool wait_for(std::chrono::duration<R, P>& duration) : Wait for tasks to be completed, but stop waiting after the specified duration has passed. Returns true if all tasks finished running, false if the duration expired but some tasks are still running.bool wait_until(std::chrono::time_point<C, D>& timeout_time) : Wait for tasks to be completed, but stop waiting after the specified time point has been reached. Returns true if all tasks finished running, false if the time point was reached but some tasks are still running. When a BS::thread_pool object goes out of scope, the destructor first waits for all tasks to complete, then destroys all threads. Note that if the pool is paused, then any tasks still in the queue will never be executed.
BS::thread_pool classThe thread pool has several optional features that must be explicitly enabled using macros.
BS_THREAD_POOL_ENABLE_PRIORITY .detach_task() , submit_task() , detach_blocks() , submit_blocks() , detach_loop() , submit_loop() , detach_sequence() , and submit_sequence() . If the priority is not specified, the default value will be 0.BS::priority_t , which is a signed 16-bit integer, so it can have any value between -32,768 and 32,767. The tasks will be executed in priority order from highest to lowest.BS::pr contains some pre-defined priorities: BS::pr::highest , BS::pr::high , BS::pr::normal , BS::pr::low , and BS::pr::lowest .BS_THREAD_POOL_ENABLE_PAUSE . Adds the following member functions:void pause() : Pause the pool. The workers will temporarily stop retrieving new tasks out of the queue, although any tasks already executed will keep running until they are finished.void unpause() : Unpause the pool. The workers will resume retrieving new tasks out of the queue.bool is_paused() : Check whether the pool is currently paused.BS_THREAD_POOL_ENABLE_NATIVE_HANDLES . Adds the following member function:std::vector<std::thread::native_handle_type> get_native_handles() : Get a vector containing the underlying implementation-defined thread handles for each of the pool's threads.BS_THREAD_POOL_ENABLE_WAIT_DEADLOCK_CHECK .wait() , wait_for() , and wait_until() will check whether the user tried to call them from within a thread of the same pool, which would result in a deadlock. If so, they will throw the exception BS::thread_pool::wait_deadlock instead of waiting.BS_THREAD_POOL_DISABLE_EXCEPTION_HANDLING .submit_task() if it is not needed, or if exceptions are explicitly disabled in the codebase.BS_THREAD_POOL_ENABLE_WAIT_DEADLOCK_CHECK . Disabling exception handling removes the try - catch block from submit_task() , while enabling wait deadlock checks adds a throw expression to wait() , wait_for() , and wait_until() .__cpp_exceptions is undefined, BS_THREAD_POOL_DISABLE_EXCEPTION_HANDLING is automatically defined, and BS_THREAD_POOL_ENABLE_WAIT_DEADLOCK_CHECK is automatically undefined. BS::this_thread namespace The namespace BS::this_thread provides functionality similar to std::this_thread . It contains the following function objects:
BS::this_thread::get_index() can be used to get the index of the current thread. If this thread belongs to a BS::thread_pool object, it will have an index from 0 to BS::thread_pool::get_thread_count() - 1 . Otherwise, for example if this thread is the main thread or an independent std::thread , std::nullopt will be returned.BS::this_thread::get_pool() can be used to get the pointer to the thread pool that owns the current thread. If this thread belongs to a BS::thread_pool object, a pointer to that object will be returned. Otherwise, std::nullopt will be returned.std::optional object will be returned, of type BS::this_thread::optional_index or BS::this_thread::optional_pool respectively. Unless you are 100% sure this thread is in a pool, first use std::optional::has_value() to check if it contains a value, and if so, use std::optional::value() to obtain that value. BS::multi_future<T> class BS::multi_future<T> is a helper class used to facilitate waiting for and/or getting the results of multiple futures at once. It is defined as a specialization of std::vector<std::future<T>> . This means that all of the member functions that can be used on an std::vector can also be used on a BS::multi_future . For example, you may use a range-based for loop with a BS::multi_future , since it has iterators.
In addition to inherited member functions, BS::multi_future has the following specialized member functions ( R and P , C , and D are template parameters):
[void or std::vector<T>] get() : Get the results from all the futures stored in this BS::multi_future , rethrowing any stored exceptions. If the futures return void , this function returns void as well. If the futures return a type T , this function returns a vector containing the results.size_t ready_count() : Check how many of the futures stored in this BS::multi_future are ready.bool valid() : Check if all the futures stored in this BS::multi_future are valid.void wait() : Wait for all the futures stored in this BS::multi_future .bool wait_for(std::chrono::duration<R, P>& duration) : Wait for all the futures stored in this BS::multi_future , but stop waiting after the specified duration has passed. Returns true if all futures have been waited for before the duration expired, false otherwise.bool wait_until(std::chrono::time_point<C, D>& timeout_time) : Wait for all the futures stored in this multi_future object, but stop waiting after the specified time point has been reached. Returns true if all futures have been waited for before the time point was reached, false otherwise.BS_thread_pool_utils.hpp ) BS::signaller class BS::signaller is a utility class which can be used to allow simple signalling between threads. This class is really just a convenient wrapper around std::promise , which contains both the promise and its future. It has the following member functions:
signaller() : Construct a new signaller.void wait() : Wait until the signaller is ready.void ready() : Inform any waiting threads that the signaller is ready. BS::synced_stream class BS::synced_stream is a utility class which can be used to synchronize printing to an output stream by different threads. It has the following member functions ( T is a template parameter pack):
synced_stream(std::ostream& stream = std::cout) : Construct a new synced stream which prints to the given output stream.void print(T&&... items) : Print any number of items into the output stream. Ensures that no other threads print to this stream simultaneously, as long as they all exclusively use the same synced_stream object to print.void println(T&&... items) : Print any number of items into the output stream, followed by a newline character.In addition, the class comes with two stream manipulators, which are meant to help the compiler figure out which template specializations to use with the class:
BS::synced_stream::endl : An explicit cast of std::endl . Prints a newline character to the stream, and then flushes it. Should only be used if flushing is desired, otherwise a newline character should be used instead.BS::synced_stream::flush : An explicit cast of std::flush . Used to flush the stream. BS::timer class BS::timer is a utility class which can be used to measure execution time for benchmarking purposes. It has the following member functions:
timer() : Construct a new timer and immediately start measuring time.void start() : Start (or restart) measuring time. Note that the timer starts ticking as soon as the object is created, so this is only necessary if we want to restart the clock later.void stop() : Stop measuring time and store the elapsed time since the object was constructed or since start() was last called.std::chrono::milliseconds::rep current_ms() : Get the number of milliseconds that have elapsed since the object was constructed or since start() was last called, but keep the timer ticking.std::chrono::milliseconds::rep ms() : Get the number of milliseconds stored when stop() was last called. This library is under continuous and active development. If you encounter any bugs, or if you would like to request any additional features, please feel free to open a new issue on GitHub and I will look into it as soon as I can.
总是欢迎捐款。 However, I release my projects in cumulative updates after editing and testing them locally on my system, so my policy is not to accept any pull requests. If you open a pull request, and I decide to incorporate your suggestion into the project, I will first modify your code to comply with the project's coding conventions (formatting, syntax, naming, comments, programming practices, etc.), and perform some tests to ensure that the change doesn't break anything. I will then merge it into the next release of the project, possibly together with some other changes. The new release will also include a note in CHANGELOG.md with a link to your pull request, and modifications to the documentation in README.md as needed.
Many GitHub users have helped improve this project, directly or indirectly, via issues, pull requests, comments, and/or personal correspondence. Please see CHANGELOG.md for links to specific issues and pull requests that have been the most helpful. Thank you all for your contribution! :)
If you found this project useful, please consider starring it on GitHub! This allows me to see how many people are using my code, and motivates me to keep working to improve it.
Copyright (c) 2024 Barak Shoshany.根据MIT许可获得许可。
If you use this C++ thread pool library in software of any kind, please provide a link to the GitHub repository in the source code and documentation.
If you use this library in published research, please cite it as follows:
You can use the following BibTeX entry:
@article { Shoshany2024_ThreadPool ,
archiveprefix = { arXiv } ,
author = { Barak Shoshany } ,
doi = { 10.1016/j.softx.2024.101687 } ,
eprint = { 2105.00613 } ,
journal = { SoftwareX } ,
pages = { 101687 } ,
title = { {A C++17 Thread Pool for High-Performance Scientific Computing} } ,
url = { https://www.sciencedirect.com/science/article/pii/S235271102400058X } ,
volume = { 26 } ,
year = { 2024 }
} Please note that the papers on SoftwareX and arXiv are not up to date with the latest version of the library. These publications are only intended to facilitate discovery of this library by scientists, and to enable citing it in scientific research. Documentation for the latest version is provided only by the README.md file in the GitHub repository.
Beginner C++ programmers may be interested in my lecture notes for a course taught at McMaster University, which teach modern C and C++ from scratch, including some of the advanced techniques and programming practices used in developing this library.