Added task_region - a fork/join task implementation

This commit is contained in:
Lee Clagett
2016-11-21 14:48:42 -05:00
parent dbf2ab56c5
commit f025198f19
8 changed files with 620 additions and 140 deletions

View File

@@ -27,6 +27,7 @@
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "common/thread_group.h"
#include <boost/thread/locks.hpp>
#include <cassert>
#include <limits>
#include <stdexcept>
@@ -35,14 +36,20 @@
namespace tools
{
thread_group::thread_group(std::size_t count) : internal() {
std::size_t thread_group::optimal() {
static_assert(
std::numeric_limits<unsigned>::max() <= std::numeric_limits<std::size_t>::max(),
"unexpected truncation"
);
count = std::min<std::size_t>(count, get_max_concurrency());
count = count ? count - 1 : 0;
const std::size_t hardware = get_max_concurrency();
return hardware ? (hardware - 1) : 0;
}
std::size_t thread_group::optimal_with_max(std::size_t count) {
return count ? std::min(count - 1, optimal()) : 0;
}
thread_group::thread_group(std::size_t count) : internal() {
if (count) {
internal.emplace(count);
}
@@ -52,24 +59,21 @@ thread_group::data::data(std::size_t count)
: threads()
, head{nullptr}
, last(std::addressof(head))
, pending(count)
, mutex()
, has_work()
, finished_work()
, stop(false) {
threads.reserve(count);
while (count--) {
threads.push_back(std::thread(&thread_group::data::run, this));
threads.push_back(boost::thread(&thread_group::data::run, this));
}
}
thread_group::data::~data() noexcept {
{
const std::unique_lock<std::mutex> lock(mutex);
const boost::unique_lock<boost::mutex> lock(mutex);
stop = true;
}
has_work.notify_all();
finished_work.notify_all();
for (auto& worker : threads) {
try {
worker.join();
@@ -78,42 +82,6 @@ thread_group::data::~data() noexcept {
}
}
void thread_group::data::sync() noexcept {
/* This function and `run()` can both throw when acquiring the lock, or in
the dispatched function. It is tough to recover from either, particularly the
lock case. These functions are marked as noexcept so that if either call
throws, the entire process is terminated. Users of the `dispatch` call are
expected to make their functions noexcept, or use std::packaged_task to copy
exceptions so that the process will continue in all but the most pessimistic
cases (std::bad_alloc). This was the existing behavior;
`asio::io_service::run` propogates errors from dispatched calls, and uncaught
exceptions on threads result in process termination. */
assert(!threads.empty());
bool not_first = false;
while (true) {
std::unique_ptr<work> next = nullptr;
{
std::unique_lock<std::mutex> lock(mutex);
pending -= std::size_t(not_first);
not_first = true;
finished_work.notify_all();
if (stop) {
return;
}
next = get_next();
if (next == nullptr) {
finished_work.wait(lock, [this] { return pending == 0 || stop; });
return;
}
}
assert(next->f);
next->f();
}
}
std::unique_ptr<thread_group::data::work> thread_group::data::get_next() noexcept {
std::unique_ptr<work> rc = std::move(head.ptr);
if (rc != nullptr) {
@@ -125,14 +93,35 @@ std::unique_ptr<thread_group::data::work> thread_group::data::get_next() noexcep
return rc;
}
bool thread_group::data::try_run_one() noexcept {
/* This function and `run()` can both throw when acquiring the lock, or in
dispatched function. It is tough to recover from either, particularly the
lock case. These functions are marked as noexcept so that if either call
throws, the entire process is terminated. Users of the `dispatch` call are
expected to make their functions noexcept, or use std::packaged_task to copy
exceptions so that the process will continue in all but the most pessimistic
cases (std::bad_alloc). This was the existing behavior;
`asio::io_service::run` propogates errors from dispatched calls, and uncaught
exceptions on threads result in process termination. */
std::unique_ptr<work> next = nullptr;
{
const boost::unique_lock<boost::mutex> lock(mutex);
next = get_next();
}
if (next) {
assert(next->f);
next->f();
return true;
}
return false;
}
void thread_group::data::run() noexcept {
// see `sync()` source for additional information
// see `try_run_one()` source for additional information
while (true) {
std::unique_ptr<work> next = nullptr;
{
std::unique_lock<std::mutex> lock(mutex);
--pending;
finished_work.notify_all();
boost::unique_lock<boost::mutex> lock(mutex);
has_work.wait(lock, [this] { return head.ptr != nullptr || stop; });
if (stop) {
return;
@@ -149,15 +138,12 @@ void thread_group::data::dispatch(std::function<void()> f) {
std::unique_ptr<work> latest(new work{std::move(f), node{nullptr}});
node* const latest_node = std::addressof(latest->next);
{
const std::unique_lock<std::mutex> lock(mutex);
const boost::unique_lock<boost::mutex> lock(mutex);
assert(last != nullptr);
assert(last->ptr == nullptr);
if (pending == std::numeric_limits<std::size_t>::max()) {
throw std::overflow_error("thread_group exceeded max queue depth");
}
last->ptr = std::move(latest);
last = latest_node;
++pending;
}
has_work.notify_one();
}