diff --git a/include/qt_threadpool.h b/include/qt_threadpool.h index 514853c19..183eb8d2d 100644 --- a/include/qt_threadpool.h +++ b/include/qt_threadpool.h @@ -18,6 +18,7 @@ typedef enum { hw_pool_init_status hw_pool_init(uint32_t num_threads); void hw_pool_destroy(); -void hw_pool_run_on_all(qt_threadpool_func_type func, void *arg); +void run_on_current_pool(qt_threadpool_func_type func, void *arg); +uint32_t get_num_delegated_threads(); #endif diff --git a/src/threadpool.c b/src/threadpool.c index 0af68d70d..c55956bcf 100644 --- a/src/threadpool.c +++ b/src/threadpool.c @@ -75,6 +75,8 @@ typedef struct { #endif } pool_header; +_Thread_local pool_header *delegated_pool; + typedef struct { // 16 byte aligned to allow loading it in one atomic instruction // on architectures where that makes sense (most of them). @@ -252,7 +254,6 @@ static int pooled_thread_func(void *void_arg) { API_FUNC hw_pool_init_status hw_pool_init(uint32_t num_threads) { if unlikely (!num_threads) return POOL_INIT_NO_THREADS_SPECIFIED; uint32_t old = 0u; - assert(num_threads < UINT32_MAX); if unlikely (!atomic_compare_exchange_strong_explicit(&hw_pool.num_threads, &old, num_threads, @@ -281,35 +282,38 @@ API_FUNC hw_pool_init_status hw_pool_init(uint32_t num_threads) { while (i < num_threads) { pooled_thread_control *thread_control = (pooled_thread_control *)(buffer + alignment * (size_t)i); - // Initialize the thread control struct in two 128b atomic writes. - // TODO: It's possible to just do this in a single 256b atomic write on most - // x86 platforms. That may also require increasing the alignment constraints - // for the control_slice. - // TODO: also ifdef in an implementation for platforms that can't do - // lock-free 128b writes or that don't handle mixed-size atomic writes. - // TODO: making some kind of ifunc to handle this initialization is probably - // actually the right way to do it because it's hard to know enough about - // the CPU at compile-time. init_thread_control(thread_control, i, &hw_pool); - int status; + if (i) { + int status; #ifdef QPOOL_USE_PTHREADS - status = pthread_create( - &thread_control->thread, &attr, pooled_thread_func, thread_control); - if unlikely (status) goto cleanup_threads; + status = pthread_create( + &thread_control->thread, &attr, pooled_thread_func, thread_control); + if unlikely (status) goto cleanup_threads; #else - status = - thrd_create(&thread_control->thread, pooled_thread_func, thread_control); - if unlikely (status != thrd_success) goto cleanup_threads; + status = thrd_create( + &thread_control->thread, pooled_thread_func, thread_control); + if unlikely (status != thrd_success) goto cleanup_threads; #endif + } + // Leave the thread object uninitialized for thread 0. + // It needs to be there for the sake of alignment, + // but other than that it's unused. ++i; } #ifdef QPOOL_USE_PTHREADS pthread_attr_destroy(&attr); #endif + delegated_pool = &hw_pool; return POOL_INIT_SUCCESS; cleanup_threads: if (i) { + // Last thread failed to launch, so no need to clean it up. + // If an error was raised it would have been at an iteration + // higher than 0 for the thread create loop since no thread is + // created at 0. uint32_t j = --i; + // current thread does the work of worker zero so + // no need to signal or join for that one. while (i) { // TODO: fix deinit to match new layout and interrupt mechanism. pooled_thread_control *thread_control = @@ -348,21 +352,22 @@ API_FUNC hw_pool_init_status hw_pool_init(uint32_t num_threads) { } API_FUNC __attribute__((no_sanitize("memory"))) void hw_pool_destroy() { + delegated_pool = NULL; uint32_t num_threads = atomic_load_explicit(&hw_pool.num_threads, memory_order_relaxed); char *buffer = atomic_load_explicit(&hw_pool.threads, memory_order_relaxed); size_t alignment = QTHREAD_MAX((size_t)64u, get_cache_line_size()); - uint32_t i = num_threads; + uint32_t i = num_threads - 1u; + // Current thread is thread 0 so no need to notify/join that one. while (i) { - --i; // TODO: fix deinit to match new layout and interrupt mechanism. pooled_thread_control *thread_control = (pooled_thread_control *)(buffer + alignment * (size_t)i); notify_worker_of_termination(thread_control); + --i; } - i = num_threads; + i = num_threads - 1u; while (i) { - --i; pooled_thread_control *thread_control = (pooled_thread_control *)(buffer + alignment * (size_t)i); // TODO: crash informatively if join fails somehow. @@ -371,6 +376,7 @@ API_FUNC __attribute__((no_sanitize("memory"))) void hw_pool_destroy() { #else thrd_join(thread_control->thread, NULL); #endif + --i; } atomic_store_explicit(&hw_pool.threads, NULL, memory_order_relaxed); @@ -378,29 +384,77 @@ API_FUNC __attribute__((no_sanitize("memory"))) void hw_pool_destroy() { atomic_store_explicit(&hw_pool.num_threads, 0, memory_order_release); } +API_FUNC uint32_t get_num_delegated_threads() { + if (delegated_pool) return delegated_pool->num_threads; + // Every thread at least has itself available for work. + return 1; +} + +// Note: current thread fills the role of thread zero in the pool. + API_FUNC void pool_run_on_all(pool_header *pool, qt_threadpool_func_type func, void *arg) { uint32_t num_threads = atomic_load_explicit(&pool->num_threads, memory_order_relaxed); assert(num_threads); - assert(num_threads < UINT32_MAX); - char *buffer = - (char *)atomic_load_explicit(&pool->threads, memory_order_relaxed); - atomic_store_explicit( - &pool->num_active_threads, num_threads, memory_order_relaxed); - init_main_sync(pool); - size_t alignment = QTHREAD_MAX((size_t)64u, get_cache_line_size()); - for (uint32_t i = 0u; - i < atomic_load_explicit(&pool->num_threads, memory_order_relaxed); - i++) { - pooled_thread_control *thread_control = - (pooled_thread_control *)(buffer + alignment * (size_t)i); - launch_work_on_thread(thread_control, func, arg); + if (num_threads > 1u) { + char *buffer = + (char *)atomic_load_explicit(&pool->threads, memory_order_relaxed); + atomic_store_explicit( + &pool->num_active_threads, num_threads - 1u, memory_order_relaxed); + init_main_sync(pool); + size_t alignment = QTHREAD_MAX((size_t)64u, get_cache_line_size()); + for (uint32_t i = 1u; + i < atomic_load_explicit(&pool->num_threads, memory_order_relaxed); + i++) { + pooled_thread_control *thread_control = + (pooled_thread_control *)(buffer + alignment * (size_t)i); + launch_work_on_thread(thread_control, func, arg); + } + } + uint32_t outer_index = context_index; + context_index = 0u; + pool_header *outer_delegated_pool = delegated_pool; + delegated_pool = NULL; + func(arg); + delegated_pool = outer_delegated_pool; + context_index = outer_index; + if (num_threads > 1u) { + // some loops may have threads that take dramatically longer + // so we still suspend, but it's potentially for much less time. + suspend_main_while_working(pool); + } +} + +API_FUNC void run_on_current_pool(qt_threadpool_func_type func, void *arg) { + if (delegated_pool) { + pool_run_on_all(delegated_pool, func, arg); + } else { + uint32_t outer_index = context_index; + context_index = 0; + func(arg); + context_index = outer_index; } - suspend_main_while_working(pool); } API_FUNC void hw_pool_run_on_all(qt_threadpool_func_type func, void *arg) { pool_run_on_all(&hw_pool, func, arg); } +API_FUNC void divide_pool(uint32_t num_groups, ...) { + // TODO: for each group: + // make a new threadpool header for the group + // wake the leader thread and have it: + // update its own thread-local thread pool and index + // re-wake and launch a new iteration loop on its delegated worker + // threads, having them: + // update their thread-local indices then launch their own iteration + // loops + // wait for the other threads in the group to finish (busy or futex?) + // restore its own thread-locals + // signal completion to main via the atomic on the outer pool + // have the main thread act as leader for the first group + // wait for the groups to finish (busy or futex?) + ; +} + diff --git a/test/internal/threadpool.c b/test/internal/threadpool.c index 4c9ac7e14..a27fd7264 100644 --- a/test/internal/threadpool.c +++ b/test/internal/threadpool.c @@ -2,15 +2,19 @@ #include "qt_threadpool.h" static int on_thread_test(void *arg) { + test_check(get_num_delegated_threads() == 1); printf("hello from thread\n"); return 0; } int main() { + test_check(get_num_delegated_threads() == 1); hw_pool_init(2); + test_check(get_num_delegated_threads() == 2); hw_pool_destroy(); + test_check(get_num_delegated_threads() == 1); hw_pool_init(2); - hw_pool_run_on_all(&on_thread_test, NULL); + run_on_current_pool(&on_thread_test, NULL); hw_pool_destroy(); printf("exited successfully\n"); fflush(stdout);