From 3e228690bedef21d7f042efaabed3c7aaafeb098 Mon Sep 17 00:00:00 2001
From: Paul T
Date: Fri, 29 Sep 2023 14:56:44 -0400
Subject: [PATCH 01/12] feature: wip implementation of Chase-Lev deque

Work-in-progress implementation of a Chase-Lev, lock-free, work-stealing
deque.
---
 include/thread_pool/thread_pool.h         |  29 ++-
 include/thread_pool/work_stealing_deque.h | 194 ++++++++++++++++++++
 test/source/work_stealing_deque.cpp       | 206 ++++++++++++++++++++++
 3 files changed, 413 insertions(+), 16 deletions(-)
 create mode 100644 include/thread_pool/work_stealing_deque.h
 create mode 100644 test/source/work_stealing_deque.cpp

diff --git a/include/thread_pool/thread_pool.h b/include/thread_pool/thread_pool.h
index 08a4735..5a7bdd4 100644
--- a/include/thread_pool/thread_pool.h
+++ b/include/thread_pool/thread_pool.h
@@ -17,11 +17,13 @@
 #endif
 
 #include "thread_pool/thread_safe_queue.h"
+#include "thread_pool/work_stealing_deque.h"
 
 namespace dp {
     namespace details {
-#ifdef __cpp_lib_move_only_function
+        // TODO: use move only function, work stealing deque can't use move only types
+#if 0  // __cpp_lib_move_only_function
        using default_function_type = std::move_only_function<void()>;
 #else
        using default_function_type = std::function<void()>;
@@ -48,7 +50,7 @@ namespace dp {
 
                        do {
                            // invoke the task
-                           while (auto task = tasks_[id].tasks.pop_front()) {
+                           while (auto task = tasks_[id].tasks.pop_top()) {
                                try {
                                    pending_tasks_.fetch_sub(1, std::memory_order_release);
                                    std::invoke(std::move(task.value()));
@@ -56,16 +58,11 @@ namespace dp {
                                }
                            }
 
-                           // try to steal a task
-                           for (std::size_t j = 1; j < tasks_.size(); ++j) {
-                               const std::size_t index = (id + j) % tasks_.size();
-                               if (auto task = tasks_[index].tasks.steal()) {
-                                   // steal a task
-                                   pending_tasks_.fetch_sub(1, std::memory_order_release);
-                                   std::invoke(std::move(task.value()));
-                                   // stop stealing once we have invoked a stolen task
-                                   break;
-                               }
+                           // try to steal a task from our donor
+                           auto donor_index = (id + 1) % tasks_.size();
+                           if (auto task = tasks_[donor_index].tasks.pop_top()) {
+                               pending_tasks_.fetch_sub(1, std::memory_order_release);
+                               std::invoke(std::move(task.value()));
                            }
                        } while (pending_tasks_.load(std::memory_order_acquire) > 0);
@@ -116,8 +113,8 @@ namespace dp {
                  typename ReturnType = std::invoke_result_t<Function &&, Args &&...>>
            requires std::invocable<Function, Args...>
        [[nodiscard]] std::future<ReturnType> enqueue(Function f, Args... args) {
-#ifdef __cpp_lib_move_only_function
-            // we can do this in C++23 because we now have support for move only functions
+#if 0  // __cpp_lib_move_only_function
+            // we can do this in C++23 because we now have support for move only functions
            std::promise<ReturnType> promise;
            auto future = promise.get_future();
            auto task = [func = std::move(f), ... largs = std::move(args),
@@ -204,12 +201,12 @@ namespace dp {
            }
            auto i = *(i_opt);
            pending_tasks_.fetch_add(1, std::memory_order_relaxed);
-           tasks_[i].tasks.push_back(std::forward<Function>(f));
+           tasks_[i].tasks.push_bottom(std::forward<Function>(f));
            tasks_[i].signal.release();
        }
 
        struct task_item {
-           dp::thread_safe_queue<function_type> tasks{};
+           dp::work_stealing_deque<function_type> tasks{};
            std::binary_semaphore signal{0};
        };
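[Review note] The worker loop above now drains its own queue from one end while stealing from a donor at the other. A minimal sketch of the ownership contract the new container is meant to enforce — `push_bottom`/`take_bottom` only from the owning thread, `pop_top` from any thief — assuming the `work_stealing_deque` header introduced below. Note that a `std::nullopt` from either pop can mean "lost a race", not just "empty", hence the counting loop:

```cpp
#include <atomic>
#include <cstdint>
#include <iostream>
#include <thread>
#include <vector>

#include "thread_pool/work_stealing_deque.h"

int main() {
    dp::work_stealing_deque<std::uint64_t> deque{};
    constexpr std::uint64_t n = 1000;
    for (std::uint64_t i = 1; i <= n; ++i) deque.push_bottom(std::uint64_t{i});

    std::atomic_uint64_t consumed{0}, sum{0};
    {
        std::vector<std::jthread> thieves;
        for (int t = 0; t < 3; ++t) {
            thieves.emplace_back([&] {
                // thieves only ever touch the top of the deque
                while (consumed.load() < n) {
                    if (auto item = deque.pop_top()) {
                        sum.fetch_add(*item);
                        consumed.fetch_add(1);
                    }
                }
            });
        }
        // the owner is the only thread allowed to use the bottom end
        while (consumed.load() < n) {
            if (auto item = deque.take_bottom()) {
                sum.fetch_add(*item);
                consumed.fetch_add(1);
            }
        }
    }  // std::jthread joins on scope exit

    std::cout << sum.load() << '\n';  // 500500: each item is consumed exactly once
}
```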
diff --git a/include/thread_pool/work_stealing_deque.h b/include/thread_pool/work_stealing_deque.h
new file mode 100644
index 0000000..44ffaa1
--- /dev/null
+++ b/include/thread_pool/work_stealing_deque.h
@@ -0,0 +1,194 @@
+#pragma once
+
+#include <atomic>
+#include <cassert>
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <optional>
+#include <type_traits>
+#include <utility>
+#include <vector>
+
+namespace dp {
+
+#ifdef __cpp_lib_hardware_interference_size
+    using std::hardware_destructive_interference_size;
+#else
+    // 64 bytes on x86-64 │ L1_CACHE_BYTES │ L1_CACHE_SHIFT │ __cacheline_aligned │ ...
+    inline constexpr std::size_t hardware_destructive_interference_size =
+        2 * sizeof(std::max_align_t);
+#endif
+
+    /**
+     * @brief Chase-Lev work stealing deque
+     * @details Supports a single producer and multiple consumers. The producer owns the bottom
+     * of the queue; consumers steal from the top. The queue is "lock-free" in that it does not
+     * directly use mutexes or locks.
+     *
+     * This is an implementation of the deque described in "Correct and Efficient Work-Stealing for
+     * Weak Memory Models" and "Dynamic Circular Work-Stealing Deque" by Chase and Lev.
+     *
+     */
+    template <typename T>
+        requires std::is_destructible_v<T>
+    class work_stealing_deque final {
+        /**
+         * @brief Simple circular array buffer that can regrow
+         * TODO: Leverage std::pmr facilities to automatically allocate/reclaim memory?
+         */
+        class circular_buffer final {
+          public:
+            explicit circular_buffer(const std::int64_t size) : size_(size), mask_(size - 1) {
+                // size must be a power of 2
+                assert((size % 2) == 0);
+
+                buffer_ = std::make_unique_for_overwrite<T[]>(size_);
+                pointer_.store(buffer_.get(), release);
+            }
+
+            [[nodiscard]] std::int64_t capacity() const noexcept { return size_; }
+
+            void store(const std::size_t index, T value, std::memory_order order = acquire) noexcept
+                requires std::is_move_assignable_v<T>
+            {
+                auto buf = pointer_.load(order);
+                buf[index & mask_] = value;
+            }
+
+            T load(const std::size_t index, std::memory_order order = acquire) noexcept {
+                auto buf = pointer_.load(order);
+                return buf[index & mask_];
+            }
+
+            /**
+             * @brief Resize the internal buffer. Copies [start, end) to the new buffer.
+             * @param start The start index
+             * @param end The end index
+             */
+            circular_buffer* resize(const std::size_t start, const std::size_t end) {
+                auto temp = new circular_buffer(size_ * 2);
+                for (std::size_t i = start; i != end; ++i) {
+                    temp->store(i, load(i));
+                }
+                return temp;
+            }
+
+          private:
+            std::int64_t size_;
+            std::int64_t mask_;
+            std::atomic<T*> pointer_;
+            std::unique_ptr<T[]> buffer_;
+        };
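[Review note] `circular_buffer` depends on `index & mask_` behaving like `index % size_`, which only holds when `size_` is a power of two; the `assert((size % 2) == 0)` above merely checks evenness (patch 06 later tightens this to `std::has_single_bit`). A standalone check with hypothetical sizes:

```cpp
#include <cassert>
#include <cstdint>

int main() {
    // for a power of two, masking and modulo agree for every index
    constexpr std::int64_t size = 8, mask = size - 1;
    for (std::int64_t i = 0; i < 1000; ++i) assert((i & mask) == (i % size));

    // 12 is even (so it passes the patch's assert) but not a power of two
    constexpr std::int64_t bad = 12, bad_mask = bad - 1;  // mask is 0b1011
    static_assert((13 & bad_mask) != (13 % bad));  // the wrap-around goes wrong
}
```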
+        constexpr static std::size_t default_count = 1024;
+        alignas(hardware_destructive_interference_size) std::atomic_int64_t top_;
+        alignas(hardware_destructive_interference_size) std::atomic_int64_t bottom_;
+        alignas(hardware_destructive_interference_size) std::atomic<circular_buffer*> buffer_;
+
+        std::vector<std::unique_ptr<circular_buffer>> garbage_{32};
+
+        static constexpr std::memory_order relaxed = std::memory_order_relaxed;
+        static constexpr std::memory_order acquire = std::memory_order_acquire;
+        static constexpr std::memory_order consume = std::memory_order_consume;
+        static constexpr std::memory_order release = std::memory_order_release;
+        static constexpr std::memory_order seq_cst = std::memory_order_seq_cst;
+
+      public:
+        explicit work_stealing_deque(const std::size_t& capacity = default_count)
+            : top_(0), bottom_(0), buffer_(new circular_buffer(capacity)) {}
+
+        // queue is non-copyable
+        work_stealing_deque(work_stealing_deque&) = delete;
+        work_stealing_deque& operator=(work_stealing_deque&) = delete;
+
+        [[nodiscard]] std::size_t capacity() const { return buffer_.load(relaxed)->capacity(); }
+        [[nodiscard]] std::size_t size() const {
+            const auto bottom = bottom_.load(relaxed);
+            const auto top = top_.load(relaxed);
+            return static_cast<std::size_t>(bottom >= top ? bottom - top : 0);
+        }
+
+        [[nodiscard]] bool empty() const { return size() == 0; }
+
+        template <typename... Args>
+        void push_bottom(Args&&... args) {
+            // construct first in case it throws
+            T value(std::forward<Args>(args)...);
+            push_bottom(std::move(value));
+        }
+
+        void push_bottom(T value) {
+            auto bottom = bottom_.load(relaxed);
+            auto top = top_.load(acquire);
+            auto buffer = buffer_.load(relaxed);
+
+            if (buffer->capacity() < (bottom - top) + 1) {
+                garbage_.emplace_back(std::exchange(buffer, buffer->resize(top, bottom)));
+                buffer_.store(buffer, release);
+            }
+
+            buffer->store(bottom, std::move(value));
+
+            // this synchronizes with other acquire fences
+            // memory operations above this line cannot be reordered
+            std::atomic_thread_fence(release);
+
+            bottom_.store(bottom + 1, relaxed);
+        }
+
+        std::optional<T> take_bottom() {
+            auto bottom = bottom_.load(relaxed) - 1;
+            auto buffer = buffer_.load(relaxed);
+
+            // prevent stealing
+            bottom_.store(bottom, relaxed);
+
+            // this synchronizes with other release fences
+            // memory ops below this line cannot be reordered
+            std::atomic_thread_fence(acquire);
+
+            auto top = top_.load(relaxed);
+            if (top <= bottom) {
+                // queue isn't empty
+                if (top == bottom) {
+                    // there is only 1 item left in the queue, we need the CAS to succeed
+                    // since another thread may be trying to steal and could steal before we're able
+                    // to take the bottom
+                    if (!top_.compare_exchange_strong(top, top + 1, seq_cst, relaxed)) {
+                        // failed race
+                        bottom_.store(bottom + 1, relaxed);
+                        return std::nullopt;
+                    }
+                    bottom_.store(bottom + 1, relaxed);
+                }
+                // there is at least one item in the queue, we can take the bottom
+                return buffer->load(bottom);
+            }
+            // queue is empty, reset bottom
+            bottom_.store(bottom + 1, relaxed);
+            return std::nullopt;
+        }
+
+        std::optional<T> pop_top() {
+            auto top = top_.load(acquire);
+            // this synchronizes with other release fences
+            // memory ops below this line cannot be reordered with ops above this line
+            std::atomic_thread_fence(acquire);
+            const auto bottom = bottom_.load(acquire);
+
+            if (top < bottom) {
+                // non-empty queue
+                auto buffer = buffer_.load(release);
+                auto temp = buffer->load(top, acquire);
+                if (!top_.compare_exchange_strong(top, top + 1, seq_cst, relaxed)) {
+                    // failed the race
+                    return std::nullopt;
+                }
+                return temp;
+            } else {
+                // deque is empty
+                return std::nullopt;
+            }
+        }
+    };
+}  // namespace dp
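[Review note] Growth is driven entirely by the producer: when a push would exceed capacity, `resize` doubles the buffer, and the old buffer is parked in `garbage_` instead of being freed so a thief still reading through a stale pointer stays valid. The observable effect, as a small sketch against this header:

```cpp
#include <cassert>

#include "thread_pool/work_stealing_deque.h"

int main() {
    dp::work_stealing_deque<int> deque{4};
    for (int i = 0; i < 9; ++i) {
        deque.push_bottom(int{i});  // the 5th and 9th pushes each double the buffer
    }
    assert(deque.capacity() == 16);
    assert(deque.size() == 9);
}
```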
diff --git a/test/source/work_stealing_deque.cpp b/test/source/work_stealing_deque.cpp
new file mode 100644
index 0000000..8f11043
--- /dev/null
+++ b/test/source/work_stealing_deque.cpp
@@ -0,0 +1,206 @@
+#include <doctest/doctest.h>
+#include <thread_pool/work_stealing_deque.h>
+
+#include <deque>
+#include <functional>
+#include <semaphore>
+#include <thread>
+
+TEST_CASE("Construct queue") { dp::work_stealing_deque<int> queue{}; }
+
+TEST_CASE("Construct and grow queue") {
+    dp::work_stealing_deque<int> queue{2};
+
+    queue.push_bottom(1);
+    queue.push_bottom(2);
+    queue.push_bottom(3);
+
+    REQUIRE_EQ(queue.capacity(), 4);
+}
+
+TEST_CASE("Take bottom while queue is empty") {
+    dp::work_stealing_deque<int> queue{};
+
+    REQUIRE_EQ(queue.take_bottom(), std::nullopt);
+}
+
+TEST_CASE("Take bottom while queue is not empty") {
+    dp::work_stealing_deque<int> queue{};
+
+    queue.push_bottom(1);
+    queue.push_bottom(2);
+    queue.push_bottom(3);
+
+    REQUIRE_EQ(queue.take_bottom(), 3);
+    REQUIRE_EQ(queue.take_bottom(), 2);
+    REQUIRE_EQ(queue.take_bottom(), 1);
+    REQUIRE_EQ(queue.take_bottom(), std::nullopt);
+}
+
+TEST_CASE("Multiple thread steal single item") {
+    dp::work_stealing_deque<std::uint64_t> queue{};
+
+    queue.push_bottom(23567);
+    std::uint64_t value = 0;
+
+    {
+        auto thread_task = [&queue, &value]() {
+            if (const auto temp = queue.pop_top()) {
+                value = temp.value();
+            }
+        };
+        std::jthread t1{thread_task};
+        std::jthread t2{thread_task};
+        std::jthread t3{thread_task};
+        std::jthread t4{thread_task};
+    }
+
+    REQUIRE_EQ(value, 23567);
+}
+
+TEST_CASE("Steal std::function while pushing") {
+    dp::work_stealing_deque<std::function<void()>> deque{};
+    std::atomic_uint64_t count{0};
+    constexpr auto max = 64'000;
+    auto expected_sum = 0;
+    std::atomic_uint64_t pending_tasks{0};
+    std::deque<std::binary_semaphore> signals;
+    signals.emplace_back(0);
+    signals.emplace_back(0);
+    signals.emplace_back(0);
+    signals.emplace_back(0);
+
+    auto supply_task = [&] {
+        for (auto i = 0; i < max; i++) {
+            deque.push_bottom([&count, i]() { count += i; });
+            expected_sum += i;
+            pending_tasks.fetch_add(1, std::memory_order_release);
+            // wake all threads
+            if ((i + 1) % 8000 == 0) {
+                for (auto& signal : signals) signal.release();
+            }
+        }
+    };
+
+    auto task = [&](int id) {
+        signals[id].acquire();
+        while (pending_tasks.load(std::memory_order_acquire) > 0) {
+            auto value = deque.pop_top();
+            if (value.has_value()) {
+                auto temp = std::move(value.value());
+                std::invoke(temp);
+                pending_tasks.fetch_sub(1, std::memory_order_release);
+            }
+        }
+    };
+
+    {
+        std::jthread supplier(supply_task);
+        std::jthread t1(task, 0);
+        std::jthread t2(task, 1);
+        std::jthread t3(task, 2);
+        std::jthread t4(task, 3);
+    }
+
+    REQUIRE_EQ(count.load(), expected_sum);
+}
+
+// class move_only {
+//     int private_value_ = 2;
+//
+//   public:
+//     move_only() = default;
+//     ~move_only() = default;
+//     move_only(move_only&) = delete;
+//     move_only(move_only&& other) noexcept { private_value_ = other.private_value_ * 2; }
+//     move_only& operator=(move_only&) = delete;
+//     move_only& operator=(move_only&& other) noexcept {
+//         private_value_ = other.private_value_ * 2;
+//         return *this;
+//     }
+//     [[nodiscard]] int secret() const { return private_value_; }
+// };
+//
+// TEST_CASE("Store move only types") {
+//     move_only mv_only{};
+//     dp::work_stealing_deque<move_only> deque{};
+//     deque.push_bottom(std::move(mv_only));
+//
+//     const auto value = deque.take_bottom();
+//     REQUIRE(value.has_value());
+//     REQUIRE_NE(value->secret(), 2);
+// }
+//
+// TEST_CASE("Steal move only type") {
+//     move_only mv_only{};
+//     dp::work_stealing_deque<move_only> queue{};
+//     queue.push_bottom(std::move(mv_only));
+//     std::optional<move_only> value = std::nullopt;
+//     {
+//         auto thread_task = [&queue, &value]() {
+//             if (auto temp = queue.pop_top()) {
+//                 value.emplace(std::move(temp.value()));
+//             }
+//         };
+//
+//         std::jthread t1{thread_task};
+//         std::jthread t2{thread_task};
+//         std::jthread t3{thread_task};
+//         std::jthread t4{thread_task};
+//     }
+//
+//     REQUIRE(value.has_value());
+//     REQUIRE_NE(value->secret(), 2);
+// }
+//
+// #if __cpp_lib_move_only_function
+//
+// TEST_CASE("Steal std::move_only_function while pushing") {
+//     dp::work_stealing_deque<std::move_only_function<void()>> deque{};
+//     std::atomic_uint64_t count{0};
+//     constexpr auto max = 64'000;
+//     auto expected_sum = 0;
+//     std::atomic_uint64_t pending_tasks{0};
+//     std::deque<std::binary_semaphore> signals;
+//     signals.emplace_back(0);
+//     signals.emplace_back(0);
+//     signals.emplace_back(0);
+//     signals.emplace_back(0);
+//
+//     auto supply_task = [&] {
+//         for (auto i = 0; i < max; i++) {
+//             deque.push_bottom([&count, i]() { count += i; });
+//             expected_sum += i;
+//             pending_tasks.fetch_add(1, std::memory_order_release);
+//             // wake all threads
+//             if (i % 1000 == 0) {
+//                 for (auto& signal : signals) signal.release();
+//             }
+//         }
+//     };
+//
+//     auto task = [&](int id) {
+//         signals[id].acquire();
+//         while (pending_tasks.load(std::memory_order_acquire) > 0) {
+//             auto value = deque.pop_top();
+//             if (value.has_value()) {
+//                 auto temp = std::move(value.value());
+//                 if (temp) {
+//                     std::invoke(value.value());
+//                     pending_tasks.fetch_sub(1, std::memory_order_release);
+//                 }
+//             }
+//         }
+//     };
+//
+//     {
+//         std::jthread supplier(supply_task);
+//         std::jthread t1(task, 0);
+//         std::jthread t2(task, 1);
+//         std::jthread t3(task, 2);
+//         std::jthread t4(task, 3);
+//     }
+//
+//     REQUIRE_EQ(count.load(), expected_sum);
+// }
+// #endif
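[Review note] The tests above lean on a small idiom worth calling out: the braced scope around the `std::jthread`s guarantees every worker has joined before the `REQUIRE` checks run. Distilled:

```cpp
#include <iostream>
#include <thread>

int main() {
    int result = 0;
    {
        std::jthread worker{[&result] { result = 42; }};
    }  // std::jthread joins in its destructor, so the write is visible below

    std::cout << result << '\n';  // prints 42, with no explicit join()
}
```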
From 9e3d43b8a00a212960fae005d93d88f4bab08c4a Mon Sep 17 00:00:00 2001
From: Paul T
Date: Fri, 29 Sep 2023 14:56:55 -0400
Subject: [PATCH 02/12] chore: add brace init in example

---
 examples/mandelbrot/source/main.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/examples/mandelbrot/source/main.cpp b/examples/mandelbrot/source/main.cpp
index 38147ad..8d5c417 100644
--- a/examples/mandelbrot/source/main.cpp
+++ b/examples/mandelbrot/source/main.cpp
@@ -20,7 +20,7 @@ void mandelbrot_threadpool(int image_width, int image_height, int max_iterations
 
     std::cout << "calculating mandelbrot" << std::endl;
 
-    dp::thread_pool pool;
+    dp::thread_pool pool{};
     std::vector<std::future<std::vector<int>>> futures;
     futures.reserve(source.height());
     const auto start = std::chrono::steady_clock::now();

From 12f70911128305525fcf8dc627f2d858a64f81f1 Mon Sep 17 00:00:00 2001
From: Paul T
Date: Fri, 29 Sep 2023 15:02:38 -0400
Subject: [PATCH 03/12] fix: memory ordering

---
 include/thread_pool/work_stealing_deque.h | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/include/thread_pool/work_stealing_deque.h b/include/thread_pool/work_stealing_deque.h
index 44ffaa1..e3d9ab9 100644
--- a/include/thread_pool/work_stealing_deque.h
+++ b/include/thread_pool/work_stealing_deque.h
@@ -178,7 +178,7 @@ namespace dp {
 
            if (top < bottom) {
                // non-empty queue
-               auto buffer = buffer_.load(release);
+               auto buffer = buffer_.load(acquire);
                auto temp = buffer->load(top, acquire);
                if (!top_.compare_exchange_strong(top, top + 1, seq_cst, relaxed)) {
                    // failed the race
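[Review note] The one-line fix above is load-bearing: `std::memory_order_release` is not a valid ordering for an atomic load (the standard makes it a precondition violation), so `buffer_.load(release)` was undefined even though it compiles. For reference, the valid pairings:

```cpp
#include <atomic>

std::atomic<int*> ptr{nullptr};

void valid_pairings() {
    // loads may use: relaxed, consume, acquire, seq_cst
    int* p = ptr.load(std::memory_order_acquire);
    // stores may use: relaxed, release, seq_cst
    ptr.store(p, std::memory_order_release);
    // ptr.load(std::memory_order_release);  // precondition violation: UB
}

int main() { valid_pairings(); }
```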
From 07bcab84395e8546ad6f3ccf83131b9188b412f9 Mon Sep 17 00:00:00 2001
From: Paul T
Date: Fri, 19 Jul 2024 14:47:56 -0400
Subject: [PATCH 04/12] fix: minor issues with work stealing deque

Fixed minor bugs based on other implementations out there
---
 include/thread_pool/work_stealing_deque.h | 91 +++++++++++++----------
 1 file changed, 51 insertions(+), 40 deletions(-)

diff --git a/include/thread_pool/work_stealing_deque.h b/include/thread_pool/work_stealing_deque.h
index e3d9ab9..1bddb97 100644
--- a/include/thread_pool/work_stealing_deque.h
+++ b/include/thread_pool/work_stealing_deque.h
@@ -29,36 +29,44 @@ namespace dp {
     * This is an implementation of the deque described in "Correct and Efficient Work-Stealing for
     * Weak Memory Models" and "Dynamic Circular Work-Stealing Deque" by Chase and Lev.
     *
+     * This implementation is taken from the following implementations
+     * - https://github.com/ConorWilliams/ConcurrentDeque/blob/main/include/riften/deque.hpp
+     * - https://github.com/taskflow/work-stealing-queue/blob/master/wsq.hpp
+     *
+     * I've made some minor edits and changes based on new C++23 features and other personal
+     * coding style choices/preferences.
     */
    template <typename T>
        requires std::is_destructible_v<T>
    class work_stealing_deque final {
        /**
         * @brief Simple circular array buffer that can regrow
-         * TODO: Leverage std::pmr facilities to automatically allocate/reclaim memory?
         */
        class circular_buffer final {
+           std::int64_t size_;
+           std::int64_t mask_;
+           std::unique_ptr<T[]> buffer_ = std::make_unique_for_overwrite<T[]>(size_);
+
          public:
            explicit circular_buffer(const std::int64_t size) : size_(size), mask_(size - 1) {
                // size must be a power of 2
                assert((size % 2) == 0);
-
-               buffer_ = std::make_unique_for_overwrite<T[]>(size_);
-               pointer_.store(buffer_.get(), release);
            }
 
            [[nodiscard]] std::int64_t capacity() const noexcept { return size_; }
 
-           void store(const std::size_t index, T value, std::memory_order order = acquire) noexcept
+           void store(const std::size_t index, T&& value) noexcept
                requires std::is_move_assignable_v<T>
            {
-               auto buf = pointer_.load(order);
-               buf[index & mask_] = value;
+               buffer_[index & mask_] = std::move(value);
            }
 
-           T load(const std::size_t index, std::memory_order order = acquire) noexcept {
-               auto buf = pointer_.load(order);
-               return buf[index & mask_];
+           T&& load(const std::size_t index) noexcept {
+               if constexpr (std::is_move_constructible_v<T>) {
+                   return std::move(buffer_[index & mask_]);
+               } else {
+                   return buffer_[index & mask_];
+               }
            }
 
            /**
             * @brief Resize the internal buffer. Copies [start, end) to the new buffer.
             * @param start The start index
             * @param end The end index
             */
            circular_buffer* resize(const std::size_t start, const std::size_t end) {
                auto temp = new circular_buffer(size_ * 2);
                for (std::size_t i = start; i != end; ++i) {
                    temp->store(i, load(i));
                }
                return temp;
            }
-
-         private:
-           std::int64_t size_;
-           std::int64_t mask_;
-           std::atomic<T*> pointer_;
-           std::unique_ptr<T[]> buffer_;
        };
 
            return static_cast<std::size_t>(bottom >= top ? bottom - top : 0);
        }
 
-       [[nodiscard]] bool empty() const { return size() == 0; }
+       [[nodiscard]] bool empty() const { return !size(); }
+
        template <typename... Args>
-       void push_bottom(Args&&... args) {
+       void emplace(Args&&... args) {
            // construct first in case it throws
            T value(std::forward<Args>(args)...);
            push_bottom(std::move(value));
        }
-       void push_bottom(T value) {
+       void push_bottom(T&& value) {
            auto bottom = bottom_.load(relaxed);
            auto top = top_.load(acquire);
-           auto buffer = buffer_.load(relaxed);
+           auto* buffer = buffer_.load(relaxed);
 
-           if (buffer->capacity() < (bottom - top) + 1) {
+           // check if the buffer is full
+           if (buffer->capacity() - 1 < (bottom - top)) {
                garbage_.emplace_back(std::exchange(buffer, buffer->resize(top, bottom)));
-               buffer_.store(buffer, release);
+               buffer_.store(buffer, relaxed);
            }
 
-           buffer->store(bottom, std::move(value));
+           buffer->store(bottom, std::forward<T>(value));
 
            // this synchronizes with other acquire fences
            // memory operations above this line cannot be reordered
            std::atomic_thread_fence(release);
 
            bottom_.store(bottom + 1, relaxed);
        }
 
        std::optional<T> take_bottom() {
            auto bottom = bottom_.load(relaxed) - 1;
-           auto buffer = buffer_.load(relaxed);
+           auto* buffer = buffer_.load(relaxed);
 
            // prevent stealing
            bottom_.store(bottom, relaxed);
 
            // this synchronizes with other release fences
            // memory ops below this line cannot be reordered
-           std::atomic_thread_fence(acquire);
+           std::atomic_thread_fence(seq_cst);
+
+           std::optional<T> item = std::nullopt;
 
            auto top = top_.load(relaxed);
            if (top <= bottom) {
                // queue isn't empty
+               item = buffer->load(bottom);
                if (top == bottom) {
                    // there is only 1 item left in the queue, we need the CAS to succeed
                    // since another thread may be trying to steal and could steal before we're able
                    // to take the bottom
                    if (!top_.compare_exchange_strong(top, top + 1, seq_cst, relaxed)) {
                        // failed race
                        bottom_.store(bottom + 1, relaxed);
-                       return std::nullopt;
+                       item = std::nullopt;
                    }
                    bottom_.store(bottom + 1, relaxed);
                }
-               // there is at least one item in the queue, we can take the bottom
-               return buffer->load(bottom);
+           } else {
+               bottom_.store(bottom + 1, relaxed);
            }
-           // queue is empty, reset bottom
-           bottom_.store(bottom + 1, relaxed);
-           return std::nullopt;
+
+           return item;
        }
 
+       /**
+        * @brief Steal from the top of the queue
+        *
+        * @return std::optional<T>
+        */
        std::optional<T> pop_top() {
            auto top = top_.load(acquire);
            // this synchronizes with other release fences
            // memory ops below this line cannot be reordered with ops above this line
-           std::atomic_thread_fence(acquire);
-           const auto bottom = bottom_.load(acquire);
+           std::atomic_thread_fence(seq_cst);
+           auto bottom = bottom_.load(acquire);
+           std::optional<T> item;
 
            if (top < bottom) {
                // non-empty queue
-               auto buffer = buffer_.load(acquire);
-               auto temp = buffer->load(top, acquire);
+               auto* buffer = buffer_.load(consume);
+               item = buffer->load(top);
+
                if (!top_.compare_exchange_strong(top, top + 1, seq_cst, relaxed)) {
                    // failed the race
-                   return std::nullopt;
+                   item = std::nullopt;
                }
-               return temp;
-           } else {
-               // deque is empty
-               return std::nullopt;
            }
+           // empty queue
+           return item;
        }
    };
 }  // namespace dp
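[Review note] After this patch the producer has two entry points. A short sketch of how they differ, assuming an element type with a non-trivial constructor:

```cpp
#include <string>
#include <utility>

#include "thread_pool/work_stealing_deque.h"

int main() {
    dp::work_stealing_deque<std::string> deque{};

    // emplace constructs the element first (so a throwing constructor
    // cannot leave the deque half-updated), then moves it into the buffer
    deque.emplace(64, 'x');  // builds std::string(64, 'x') in place

    // push_bottom now takes an rvalue reference; lvalues must be moved
    std::string s(64, 'y');
    deque.push_bottom(std::move(s));
}
```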
From c161115e8f5dfd7acc05bb42ff8b67aae8f970f9 Mon Sep 17 00:00:00 2001
From: Paul T
Date: Fri, 19 Jul 2024 14:49:23 -0400
Subject: [PATCH 05/12] fix: usage of new work-stealing deque

---
 include/thread_pool/thread_pool.h | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/include/thread_pool/thread_pool.h b/include/thread_pool/thread_pool.h
index c9982e7..376daf6 100644
--- a/include/thread_pool/thread_pool.h
+++ b/include/thread_pool/thread_pool.h
@@ -22,7 +22,7 @@ namespace dp {
     namespace details {
        // TODO: use move only function, work stealing deque can't use move only types
-#if 0  // __cpp_lib_move_only_function
+#if __cpp_lib_move_only_function
        using default_function_type = std::move_only_function<void()>;
 #else
        using default_function_type = std::function<void()>;
@@ -61,7 +61,7 @@ namespace dp {
 
                        do {
                            // invoke the task
-                           while (auto task = tasks_[id].tasks.pop_front()) {
+                           while (auto task = tasks_[id].tasks.pop_top()) {
                                // decrement the unassigned tasks as the task is now going
                                // to be executed
                                unassigned_tasks_.fetch_sub(1, std::memory_order_release);
@@ -76,7 +76,7 @@ namespace dp {
                            // try to steal a task
                            for (std::size_t j = 1; j < tasks_.size(); ++j) {
                                const std::size_t index = (id + j) % tasks_.size();
-                               if (auto task = tasks_[index].tasks.steal()) {
+                               if (auto task = tasks_[index].tasks.pop_top()) {
                                    // steal a task
                                    unassigned_tasks_.fetch_sub(1, std::memory_order_release);
                                    std::invoke(std::move(task.value()));
@@ -144,7 +144,7 @@ namespace dp {
            requires std::invocable<Function, Args...>
        [[nodiscard]] std::future<ReturnType> enqueue(Function f, Args... args) {
 #if 0  // __cpp_lib_move_only_function
-            // we can do this in C++23 because we now have support for move only functions
+            // we can do this in C++23 because we now have support for move only functions
            std::promise<ReturnType> promise;
            auto future = promise.get_future();
            auto task = [func = std::move(f), ... largs = std::move(args),
@@ -264,7 +264,7 @@ namespace dp {
            }
 
            // assign work
-           tasks_[i].tasks.push_back(std::forward<Function>(f));
+           tasks_[i].tasks.push_bottom(std::forward<Function>(f));
            tasks_[i].signal.release();
        }

From 7c5c8fd58758b61dff2b5d9633cc34b32e7c3d0b Mon Sep 17 00:00:00 2001
From: Paul Tsouchlos
Date: Wed, 4 Sep 2024 16:53:49 -0400
Subject: [PATCH 06/12] fix: properly check that circular buffer size is a
 power of 2

---
 include/thread_pool/work_stealing_deque.h | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/include/thread_pool/work_stealing_deque.h b/include/thread_pool/work_stealing_deque.h
index 1bddb97..3abca77 100644
--- a/include/thread_pool/work_stealing_deque.h
+++ b/include/thread_pool/work_stealing_deque.h
@@ -1,6 +1,7 @@
 #pragma once
 
 #include <atomic>
+#include <bit>
 #include <cassert>
 #include <cstddef>
 #include <cstdint>
@@ -43,14 +44,14 @@ namespace dp {
     * @brief Simple circular array buffer that can regrow
     */
    class circular_buffer final {
-       std::int64_t size_;
-       std::int64_t mask_;
+       std::size_t size_;
+       std::size_t mask_;
        std::unique_ptr<T[]> buffer_ = std::make_unique_for_overwrite<T[]>(size_);
 
      public:
-       explicit circular_buffer(const std::int64_t size) : size_(size), mask_(size - 1) {
+       explicit circular_buffer(const std::size_t size) : size_(size), mask_(size - 1) {
            // size must be a power of 2
-           assert((size % 2) == 0);
+           assert(std::has_single_bit(size));
        }
 
        [[nodiscard]] std::int64_t capacity() const noexcept { return size_; }

From b443661354c940535a890e8d67c92cd4c23b9f86 Mon Sep 17 00:00:00 2001
From: Paul Tsouchlos
Date: Sun, 15 Sep 2024 17:15:50 -0400
Subject: [PATCH 07/12] fix: deprecation warnings when building unit tests

---
 test/CMakeLists.txt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 9e21faa..60d5d3f 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -64,7 +64,7 @@ endif()
 if(NOT TEST_INSTALLED_VERSION)
   target_compile_options(
     ${PROJECT_NAME} PRIVATE $<$<CXX_COMPILER_ID:GNU,Clang,AppleClang>:-Wall -Wpedantic -Wextra
-                            -Werror>
+                            -Werror -Wno-deprecated>
   )
   target_compile_options(${PROJECT_NAME} PRIVATE $<$<CXX_COMPILER_ID:MSVC>:/W4 /WX /wd4324>)
   target_compile_definitions(
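[Review note] `std::has_single_bit` from patch 06 above is the precise predicate the buffer needs — true exactly for powers of two:

```cpp
#include <bit>

static_assert(std::has_single_bit(1024u));  // power of two: accepted
static_assert(!std::has_single_bit(12u));   // even, but not a power of two
static_assert(!std::has_single_bit(0u));    // zero is rejected as well

int main() {}
```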
From fd12d9c927b4de75debb06744a1f813b1a603100 Mon Sep 17 00:00:00 2001
From: Paul Tsouchlos
Date: Sat, 21 Sep 2024 07:13:07 -0600
Subject: [PATCH 08/12] wip: trying to fix issues with work stealing deque

---
 include/thread_pool/work_stealing_deque.h |  75 ++++----
 test/source/work_stealing_deque.cpp       | 212 +++++++++++-----------
 2 files changed, 146 insertions(+), 141 deletions(-)

diff --git a/include/thread_pool/work_stealing_deque.h b/include/thread_pool/work_stealing_deque.h
index 3abca77..067ab09 100644
--- a/include/thread_pool/work_stealing_deque.h
+++ b/include/thread_pool/work_stealing_deque.h
@@ -57,13 +57,14 @@ namespace dp {
        [[nodiscard]] std::int64_t capacity() const noexcept { return size_; }
 
        void store(const std::size_t index, T&& value) noexcept
-           requires std::is_move_assignable_v<T>
+           requires std::is_nothrow_move_assignable_v<T>
        {
            buffer_[index & mask_] = std::move(value);
        }
 
        T&& load(const std::size_t index) noexcept {
-           if constexpr (std::is_move_constructible_v<T>) {
+           if constexpr (std::is_move_constructible_v<T> ||
+                         std::is_nothrow_move_constructible_v<T>) {
                return std::move(buffer_[index & mask_]);
            } else {
                return buffer_[index & mask_];
@@ -121,13 +122,18 @@ namespace dp {
            push_bottom(std::move(value));
        }
 
+       /**
+        * @brief Push data to the bottom of the queue.
+        * @details Only the producer thread can push data to the bottom. Consumers should take
+        * data from the top. See #pop_top.
+        */
        void push_bottom(T&& value) {
            auto bottom = bottom_.load(relaxed);
            auto top = top_.load(acquire);
            auto* buffer = buffer_.load(relaxed);
 
            // check if the buffer is full
-           if (buffer->capacity() - 1 < (bottom - top)) {
+           if (buffer->capacity() < (bottom - top) + 1) {
                garbage_.emplace_back(std::exchange(buffer, buffer->resize(top, bottom)));
                buffer_.store(buffer, relaxed);
            }
 
            buffer->store(bottom, std::forward<T>(value));
 
            // this synchronizes with other acquire fences
            // memory operations above this line cannot be reordered
            std::atomic_thread_fence(release);
 
            bottom_.store(bottom + 1, relaxed);
        }
 
+       /**
+        * @brief Steal from the top of the queue
+        *
+        * @return std::optional<T>
+        */
+       std::optional<T> pop_top() {
+           auto top = top_.load(acquire);
+           // this synchronizes with other release fences
+           // memory ops below this line cannot be reordered with ops above this line
+           std::atomic_thread_fence(seq_cst);
+           auto bottom = bottom_.load(acquire);
+
+           if (top < bottom) {
+               // non-empty queue
+               auto item = buffer_.load(consume)->load(top);
+
+               if (!top_.compare_exchange_strong(top, top + 1, seq_cst, relaxed)) {
+                   // failed the race
+                   return std::nullopt;
+               }
+
+               return item;
+           }
+           // empty queue
+           return std::nullopt;
+       }
+
        std::optional<T> take_bottom() {
            auto bottom = bottom_.load(relaxed) - 1;
            auto* buffer = buffer_.load(relaxed);
 
            // prevent stealing
            bottom_.store(bottom, relaxed);
 
            // this synchronizes with other release fences
            // memory ops below this line cannot be reordered
            std::atomic_thread_fence(seq_cst);
 
-           std::optional<T> item = std::nullopt;
-
            auto top = top_.load(relaxed);
            if (top <= bottom) {
                // queue isn't empty
-               item = buffer->load(bottom);
                if (top == bottom) {
                    // there is only 1 item left in the queue, we need the CAS to succeed
                    // since another thread may be trying to steal and could steal before we're able
                    // to take the bottom
                    if (!top_.compare_exchange_strong(top, top + 1, seq_cst, relaxed)) {
                        // failed race
                        bottom_.store(bottom + 1, relaxed);
-                       item = std::nullopt;
+                       return std::nullopt;
                    }
                    bottom_.store(bottom + 1, relaxed);
                }
+               return buffer->load(bottom);
            } else {
                bottom_.store(bottom + 1, relaxed);
+               return std::nullopt;
            }
-
-           return item;
        }
-
-       /**
-        * @brief Steal from the top of the queue
-        *
-        * @return std::optional<T>
-        */
-       std::optional<T> pop_top() {
-           auto top = top_.load(acquire);
-           // this synchronizes with other release fences
-           // memory ops below this line cannot be reordered with ops above this line
-           std::atomic_thread_fence(seq_cst);
-           auto bottom = bottom_.load(acquire);
-           std::optional<T> item;
-
-           if (top < bottom) {
-               // non-empty queue
-               auto* buffer = buffer_.load(consume);
-               item = buffer->load(top);
-
-               if (!top_.compare_exchange_strong(top, top + 1, seq_cst, relaxed)) {
-                   // failed the race
-                   item = std::nullopt;
-               }
-           }
-           // empty queue
-           return item;
-       }
    };
 }  // namespace dp
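[Review note] The tightened `requires` clause above means element types must now be nothrow move assignable. A compile-time check of what the deque accepts, with a hypothetical throwing type for contrast:

```cpp
#include <string>
#include <type_traits>

// store() now requires std::is_nothrow_move_assignable_v<T>
static_assert(std::is_nothrow_move_assignable_v<std::string>);

struct throwing_move {
    throwing_move& operator=(throwing_move&&) { return *this; }  // not noexcept
};
static_assert(!std::is_nothrow_move_assignable_v<throwing_move>);

int main() {}
```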
diff --git a/test/source/work_stealing_deque.cpp b/test/source/work_stealing_deque.cpp
index 8f11043..3d9b28a 100644
--- a/test/source/work_stealing_deque.cpp
+++ b/test/source/work_stealing_deque.cpp
@@ -6,9 +6,11 @@
 #include <deque>
 #include <functional>
 #include <semaphore>
 #include <thread>
 
-TEST_CASE("Construct queue") { dp::work_stealing_deque<int> queue{}; }
+TEST_CASE("work_stealing_deque: construct queue") {
+    dp::work_stealing_deque<int> queue{};
+}
 
-TEST_CASE("Construct and grow queue") {
+TEST_CASE("work_stealing_deque: construct and grow queue") {
     dp::work_stealing_deque<int> queue{2};
 
     queue.push_bottom(1);
     queue.push_bottom(2);
     queue.push_bottom(3);
 
     REQUIRE_EQ(queue.capacity(), 4);
 }
 
-TEST_CASE("Take bottom while queue is empty") {
+TEST_CASE("work_stealing_deque: take bottom while queue is empty") {
     dp::work_stealing_deque<int> queue{};
 
     REQUIRE_EQ(queue.take_bottom(), std::nullopt);
 }
 
-TEST_CASE("Take bottom while queue is not empty") {
+TEST_CASE("work_stealing_deque: take bottom while queue is not empty") {
     dp::work_stealing_deque<int> queue{};
 
     queue.push_bottom(1);
     queue.push_bottom(2);
     queue.push_bottom(3);
 
     REQUIRE_EQ(queue.take_bottom(), 3);
     REQUIRE_EQ(queue.take_bottom(), 2);
     REQUIRE_EQ(queue.take_bottom(), 1);
     REQUIRE_EQ(queue.take_bottom(), std::nullopt);
 }
 
-TEST_CASE("Multiple thread steal single item") {
+TEST_CASE("work_stealing_deque: multiple thread steal single item") {
     dp::work_stealing_deque<std::uint64_t> queue{};
 
     queue.push_bottom(23567);
     std::uint64_t value = 0;
 
     {
         auto thread_task = [&queue, &value]() {
             if (const auto temp = queue.pop_top()) {
                 value = temp.value();
             }
         };
         std::jthread t1{thread_task};
         std::jthread t2{thread_task};
         std::jthread t3{thread_task};
         std::jthread t4{thread_task};
     }
 
     REQUIRE_EQ(value, 23567);
 }
 
-TEST_CASE("Steal std::function while pushing") {
+TEST_CASE("work_stealing_deque: steal std::function while pushing") {
     dp::work_stealing_deque<std::function<void()>> deque{};
     std::atomic_uint64_t count{0};
     constexpr auto max = 64'000;
     auto expected_sum = 0;
     std::atomic_uint64_t pending_tasks{0};
     std::deque<std::binary_semaphore> signals;
     signals.emplace_back(0);
     signals.emplace_back(0);
     signals.emplace_back(0);
     signals.emplace_back(0);
 
     auto supply_task = [&] {
         for (auto i = 0; i < max; i++) {
             deque.push_bottom([&count, i]() { count += i; });
             expected_sum += i;
             pending_tasks.fetch_add(1, std::memory_order_release);
             // wake all threads
             if ((i + 1) % 8000 == 0) {
                 for (auto& signal : signals) signal.release();
             }
         }
     };
 
     auto task = [&](int id) {
         signals[id].acquire();
         while (pending_tasks.load(std::memory_order_acquire) > 0) {
             auto value = deque.pop_top();
             if (value.has_value()) {
                 auto temp = std::move(value.value());
                 std::invoke(temp);
                 pending_tasks.fetch_sub(1, std::memory_order_release);
             }
         }
     };
 
     {
         std::jthread supplier(supply_task);
         std::jthread t1(task, 0);
         std::jthread t2(task, 1);
         std::jthread t3(task, 2);
         std::jthread t4(task, 3);
     }
 
     REQUIRE_EQ(count.load(), expected_sum);
 }
+class move_only {
+    int private_value_ = 2;
+
+  public:
+    move_only() = default;
+    ~move_only() = default;
+    move_only(move_only&) = delete;
+    move_only(move_only&& other) noexcept { private_value_ = std::move(other.private_value_); }
+    move_only& operator=(move_only&) = delete;
+    move_only& operator=(move_only&& other) noexcept {
+        private_value_ = std::move(other.private_value_);
+        return *this;
+    }
+    [[nodiscard]] int secret() const { return private_value_; }
+};
+
+TEST_CASE("work_stealing_deque: store move only types") {
+    move_only mv_only{};
+    dp::work_stealing_deque<move_only> deque{};
+    deque.push_bottom(std::move(mv_only));
+
+    const auto value = deque.take_bottom();
+    REQUIRE(value.has_value());
+    REQUIRE_EQ(value->secret(), 2);  // the moved element arrives intact
+}
+
+TEST_CASE("work_stealing_deque: steal move only type") {
+    move_only mv_only{};
+    dp::work_stealing_deque<move_only> queue{};
+    queue.push_bottom(std::move(mv_only));
+    std::optional<move_only> value = std::nullopt;
+    {
+        auto thread_task = [&queue, &value]() {
+            if (auto temp = queue.pop_top()) {
+                value.emplace(std::move(temp.value()));
+            }
+        };
+
+        std::jthread t1{thread_task};
+        std::jthread t2{thread_task};
+        std::jthread t3{thread_task};
+        std::jthread t4{thread_task};
+    }
+
+    REQUIRE(value.has_value());
+    REQUIRE_EQ(value->secret(), 2);  // the moved element arrives intact
+}
+
+#if __cpp_lib_move_only_function
+
+TEST_CASE("work_stealing_deque: steal std::move_only_function while pushing") {
+    dp::work_stealing_deque<std::move_only_function<void()>> deque{};
+    std::atomic_uint64_t count{0};
+    constexpr auto max = 64'000;
+    auto expected_sum = 0;
+    std::atomic_uint64_t pending_tasks{0};
+    std::deque<std::binary_semaphore> signals;
+    signals.emplace_back(0);
+    signals.emplace_back(0);
+    signals.emplace_back(0);
+    signals.emplace_back(0);
+
+    auto supply_task = [&] {
+        for (auto i = 0; i < max; i++) {
+            deque.push_bottom([&count, i]() { count += i; });
+            expected_sum += i;
+            pending_tasks.fetch_add(1, std::memory_order_release);
+            // wake all threads
+            if (i % 1000 == 0) {
+                for (auto& signal : signals) signal.release();
+            }
+        }
+    };
+
+    auto task = [&](int id) {
+        signals[id].acquire();
+        while (pending_tasks.load(std::memory_order_acquire) > 0) {
+            auto value = deque.pop_top();
+            if (value.has_value()) {
+                auto temp = std::move(value.value());
+                if (temp) {
+                    std::invoke(temp);
+                    pending_tasks.fetch_sub(1, std::memory_order_release);
+                }
+            }
+        }
+    };
+
+    {
+        std::jthread supplier(supply_task);
+        std::jthread t1(task, 0);
+        std::jthread t2(task, 1);
+        std::jthread t3(task, 2);
+        std::jthread t4(task, 3);
+    }
+
+    REQUIRE_EQ(count.load(), expected_sum);
+}
+#endif
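[Review note] The newly enabled tests exercise move-only elements, which is exactly the gap `std::move_only_function` fills. A minimal standalone demonstration, guarded the same way as the test above:

```cpp
#include <version>
#ifdef __cpp_lib_move_only_function
#include <functional>
#include <memory>
#include <utility>

int main() {
    // a lambda capturing a unique_ptr is move-only: it fits in
    // std::move_only_function but would not compile with std::function
    auto ptr = std::make_unique<int>(7);
    std::move_only_function<int()> f = [p = std::move(ptr)] { return *p; };
    return f() == 7 ? 0 : 1;
}
#else
int main() {}  // toolchain predates C++23 std::move_only_function
#endif
```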
From 1ac31fcb5b13c71de97d49e0ae4fa446cfe6f43b Mon Sep 17 00:00:00 2001
From: Paul Tsouchlos
Date: Sat, 21 Sep 2024 07:13:24 -0600
Subject: [PATCH 09/12] chore: don't warn about hardware interference size

---
 test/CMakeLists.txt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 60d5d3f..6c9a7a3 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -64,7 +64,7 @@ endif()
 if(NOT TEST_INSTALLED_VERSION)
   target_compile_options(
     ${PROJECT_NAME} PRIVATE $<$<CXX_COMPILER_ID:GNU,Clang,AppleClang>:-Wall -Wpedantic -Wextra
-                            -Werror -Wno-deprecated>
+                            -Werror -Wno-deprecated -Wno-interference-size>
   )
   target_compile_options(${PROJECT_NAME} PRIVATE $<$<CXX_COMPILER_ID:MSVC>:/W4 /WX /wd4324>)
   target_compile_definitions(

From 1bedebb9ef465386f731cb1ac26ba8cf99ad73e7 Mon Sep 17 00:00:00 2001
From: Paul Tsouchlos
Date: Sat, 21 Sep 2024 07:14:14 -0600
Subject: [PATCH 10/12] chore: change strategy when using work stealing deque

Added a global task queue in addition to the queues that each thread
owns. This should allow us to properly use the SPMC queue we have
implemented.
---
 include/thread_pool/thread_pool.h | 84 +++++++++++++++++++++++++------
 1 file changed, 70 insertions(+), 14 deletions(-)

diff --git a/include/thread_pool/thread_pool.h b/include/thread_pool/thread_pool.h
index 376daf6..216046a 100644
--- a/include/thread_pool/thread_pool.h
+++ b/include/thread_pool/thread_pool.h
@@ -9,6 +9,7 @@
 #include <semaphore>
 #include <thread>
 #include <type_traits>
+#include <unordered_map>
 #ifdef __has_include
 #    if __has_include(<version>)
 #        include <version>
@@ -20,8 +21,6 @@
 
 namespace dp {
     namespace details {
-        // TODO: use move only function, work stealing deque can't use move only types
 #if __cpp_lib_move_only_function
        using default_function_type = std::move_only_function<void()>;
 #else
        using default_function_type = std::function<void()>;
@@ -42,12 +40,16 @@ namespace dp {
            const unsigned int &number_of_threads = std::thread::hardware_concurrency(),
            InitializationFunction init = [](std::size_t) {})
            : tasks_(number_of_threads) {
+           producer_id_ = std::this_thread::get_id();
            std::size_t current_id = 0;
            for (std::size_t i = 0; i < number_of_threads; ++i) {
                priority_queue_.push_back(size_t(current_id));
                try {
                    threads_.emplace_back([&, id = current_id,
                                           init](const std::stop_token &stop_tok) {
+                       tasks_[id].thread_id = std::this_thread::get_id();
+                       add_thread_id_to_map(tasks_[id].thread_id, id);
+
                        // invoke the init function on the thread
                        try {
                            std::invoke(init, id);
@@ -60,7 +63,16 @@ namespace dp {
                        tasks_[id].signal.acquire();
 
                        do {
-                           // invoke the task
+                           // execute work from the global queue
+                           // all threads can pull from the top, but the producer thread owns
+                           // the bottom
+                           while (auto task = global_tasks_.pop_top()) {
+                               unassigned_tasks_.fetch_sub(1, std::memory_order_release);
+                               std::invoke(std::move(task.value()));
+                               in_flight_tasks_.fetch_sub(1, std::memory_order_release);
+                           }
+
+                           // invoke any tasks from the queue that this thread owns
                            while (auto task = tasks_[id].tasks.pop_top()) {
                                // decrement the unassigned tasks as the task is now going
                                // to be executed
@@ -76,7 +88,7 @@ namespace dp {
                                in_flight_tasks_.fetch_sub(1, std::memory_order_release);
                            }
 
-                           // try to steal a task
+                           // try to steal a task from other threads
                            for (std::size_t j = 1; j < tasks_.size(); ++j) {
                                const std::size_t index = (id + j) % tasks_.size();
                                if (auto task = tasks_[index].tasks.pop_top()) {
@@ -89,6 +101,8 @@ namespace dp {
                            // front and waiting for more work
                        } while (unassigned_tasks_.load(std::memory_order_acquire) > 0);
 
+                       // the thread finished all its work, so we "notify" by putting this
+                       // thread in front in the priority queue
                        priority_queue_.rotate_to_front(id);
                        // check if all tasks are completed and release the barrier (binary
                        // semaphore)
@@ -144,8 +158,8 @@ namespace dp {
            requires std::invocable<Function, Args...>
        [[nodiscard]] std::future<ReturnType> enqueue(Function f, Args... args) {
-#if 0  // __cpp_lib_move_only_function
-            // we can do this in C++23 because we now have support for move only functions
+#if __cpp_lib_move_only_function
+            // we can do this in C++23 because we now have support for move only functions
            std::promise<ReturnType> promise;
            auto future = promise.get_future();
            auto task = [func = std::move(f), ... largs = std::move(args),
@@ -246,13 +260,45 @@ namespace dp {
      private:
        template <typename Function>
        void enqueue_task(Function &&f) {
-           auto i_opt = priority_queue_.copy_front_and_rotate_to_back();
-           if (!i_opt.has_value()) {
-               // would only be a problem if there are zero threads
-               return;
-           }
-           // get the index
-           auto i = *(i_opt);
+           // are we enqueuing from the producer thread? Or is a worker thread
+           // enqueuing to the pool?
+           auto current_id = std::this_thread::get_id();
+           auto is_producer = current_id == producer_id_;
+
+           // assign the work
+           if (is_producer) {
+               // we push to the global task queue
+               global_tasks_.emplace(std::forward<Function>(f));
+           } else {
+               // It is a pre-condition violation to enqueue from an arbitrary thread
+               // that is neither the root producer nor a worker owned by the pool.
+               assert(thread_id_to_index_.contains(current_id));
+               // assign the task
+               tasks_[thread_id_to_index_.at(current_id)].tasks.emplace(
+                   std::forward<Function>(f));
+           }
+
+           /**
+            * Now we need to wake up the correct thread. If the thread that is enqueuing the task
+            * is a worker from the pool, then that thread needs to execute the work. Otherwise we
+            * need to use the priority queue to pick the next available thread.
+            */
+
+           // immediately invoked lambda
+           auto thread_wakeup_index = [&]() -> std::size_t {
+               if (is_producer) {
+                   auto i_opt = priority_queue_.copy_front_and_rotate_to_back();
+                   if (!i_opt.has_value()) {
+                       // would only be a problem if there are zero threads
+                       return std::size_t{0};
+                   }
+                   // get the index
+                   return *(i_opt);
+               } else {
+                   // get the worker thread id index
+                   return thread_id_to_index_.at(current_id);
+               }
+           }();
 
            // increment the unassigned tasks and in flight tasks
            unassigned_tasks_.fetch_add(1, std::memory_order_release);
@@ -264,8 +310,7 @@ namespace dp {
            }
 
            // assign work
-           tasks_[i].tasks.push_bottom(std::forward<Function>(f));
-           tasks_[i].signal.release();
+           tasks_[thread_wakeup_index].signal.release();
        }
 
+       void add_thread_id_to_map(std::thread::id thread_id, std::size_t index) {
+           std::lock_guard lock(thread_id_map_mutex_);
+           thread_id_to_index_.insert_or_assign(thread_id, index);
+       }
+
        struct task_item {
            dp::work_stealing_deque<function_type> tasks{};
            std::binary_semaphore signal{0};
+           std::thread::id thread_id;
        };
 
        std::vector<std::jthread> threads_;
        std::deque<task_item> tasks_;
 
        // guarantee these get zero-initialized
        std::atomic_int_fast64_t unassigned_tasks_{0}, in_flight_tasks_{0};
        std::atomic_bool threads_complete_signal_{false};
+
+       std::thread::id producer_id_;
+       dp::work_stealing_deque<function_type> global_tasks_{};
+       std::mutex thread_id_map_mutex_{};
+       std::unordered_map<std::thread::id, std::size_t> thread_id_to_index_{};
    };
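[Review note] The routing rule the reworked `enqueue_task` implements can be stated in a few lines. This mirror (names hypothetical, not part of the patch) is how I read it: work submitted by the owning producer goes to the shared global deque, work submitted by a pool worker goes to that worker's own deque, and no other thread may submit at all:

```cpp
#include <cassert>
#include <cstdint>
#include <thread>
#include <unordered_map>

struct router {
    std::thread::id producer_id = std::this_thread::get_id();
    std::unordered_map<std::thread::id, std::size_t> worker_index;

    // SIZE_MAX stands in for "the global queue"; anything else is the
    // calling worker's own deque index
    std::size_t route() const {
        const auto current = std::this_thread::get_id();
        if (current == producer_id) return SIZE_MAX;  // producer -> global queue
        assert(worker_index.contains(current));       // precondition from the patch
        return worker_index.at(current);              // worker -> its own deque
    }
};

int main() {
    router r;
    assert(r.route() == SIZE_MAX);  // called from the producer thread here
}
```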
From 3fdee7ee43971bff977467facb86cea2b61e4abc Mon Sep 17 00:00:00 2001
From: Paul Tsouchlos
Date: Sat, 21 Sep 2024 07:14:25 -0600
Subject: [PATCH 11/12] wip: trying to address a build issue

---
 benchmark/source/matrix_multiplication.cpp | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/benchmark/source/matrix_multiplication.cpp b/benchmark/source/matrix_multiplication.cpp
index bcfa906..b1ba464 100644
--- a/benchmark/source/matrix_multiplication.cpp
+++ b/benchmark/source/matrix_multiplication.cpp
@@ -97,11 +97,12 @@ TEST_CASE("matrix_multiplication") {
     }
 #endif
     {
-        dp::thread_pool<fu2::unique_function<void()>> pool{};
+        dp::thread_pool<fu2::unique_function<void()>> pool{};
 
         run_benchmark(&bench, array_size, iterations, "dp::thread_pool - fu2::unique_function",
-                      [&](const std::vector<int>& a, const std::vector<int>& b) -> void {
-                          pool.enqueue_detach(thread_task, a, b);
+                      [&pool, task = thread_task](const std::vector<int>& a,
+                                                  const std::vector<int>& b) -> void {
+                          pool.enqueue_detach(std::move(task), a, b);
                       });
     }

From 3b0b88fe1e824bf90ec7f716fdd27ff2b7bcab8b Mon Sep 17 00:00:00 2001
From: Paul Tsouchlos
Date: Fri, 19 Sep 2025 14:55:46 -0400
Subject: [PATCH 12/12] chore: update CPM.cmake version

---
 cmake/CPM.cmake | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/cmake/CPM.cmake b/cmake/CPM.cmake
index 176748d..c961ea6 100644
--- a/cmake/CPM.cmake
+++ b/cmake/CPM.cmake
@@ -1,4 +1,4 @@
-set(CPM_DOWNLOAD_VERSION 0.38.1)
+set(CPM_DOWNLOAD_VERSION 0.42.0)
 
 if(CPM_SOURCE_CACHE)
   # Expand relative path. This is important if the provided path contains a tilde (~)