Skip to content

Commit

Permalink
[Feature] Support setting concurrency for thread pool token (#6237)
Browse files Browse the repository at this point in the history
Now we can submit a group of tasks using thread pool token, and limit
the max concurrency of this task group
  • Loading branch information
morningman authored Jul 21, 2021
1 parent 7592f52 commit 327e31c
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 44 deletions.
47 changes: 41 additions & 6 deletions be/src/util/threadpool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,20 @@ Status ThreadPoolBuilder::build(std::unique_ptr<ThreadPool>* pool) const {
return Status::OK();
}

ThreadPoolToken::ThreadPoolToken(ThreadPool* pool, ThreadPool::ExecutionMode mode)
ThreadPoolToken::ThreadPoolToken(ThreadPool* pool, ThreadPool::ExecutionMode mode, int max_concurrency)
: _mode(mode),
_pool(pool),
_state(State::IDLE),
_not_running_cond(&pool->_lock),
_active_threads(0) {}
_active_threads(0),
_max_concurrency(max_concurrency),
_num_submitted_tasks(0),
_num_unsubmitted_tasks(0) {

if (max_concurrency == 1 && mode != ThreadPool::ExecutionMode::SERIAL) {
_mode = ThreadPool::ExecutionMode::SERIAL;
}
}

ThreadPoolToken::~ThreadPoolToken() {
shutdown();
Expand Down Expand Up @@ -240,6 +248,11 @@ const char* ThreadPoolToken::state_to_string(State s) {
return "<cannot reach here>";
}

bool ThreadPoolToken::need_dispatch() {
return _state == ThreadPoolToken::State::IDLE
|| (_mode == ThreadPool::ExecutionMode::CONCURRENT && _num_submitted_tasks < _max_concurrency);
}

ThreadPool::ThreadPool(const ThreadPoolBuilder& builder)
: _name(builder._name),
_min_threads(builder._min_threads),
Expand Down Expand Up @@ -294,6 +307,7 @@ void ThreadPool::shutdown() {
// wanting to access the ThreadPool. The task's destructors may acquire
// locks, etc, so this also prevents lock inversions.
_queue.clear();

std::deque<std::deque<Task>> to_release;
for (auto* t : _tokens) {
if (!t->_entries.empty()) {
Expand Down Expand Up @@ -336,9 +350,9 @@ void ThreadPool::shutdown() {
}
}

std::unique_ptr<ThreadPoolToken> ThreadPool::new_token(ExecutionMode mode) {
std::unique_ptr<ThreadPoolToken> ThreadPool::new_token(ExecutionMode mode, int max_concurrency) {
MutexLock unique_lock(&_lock);
std::unique_ptr<ThreadPoolToken> t(new ThreadPoolToken(this, mode));
std::unique_ptr<ThreadPoolToken> t(new ThreadPoolToken(this, mode, max_concurrency));
InsertOrDie(&_tokens, t.get());
return t;
}
Expand Down Expand Up @@ -416,11 +430,22 @@ Status ThreadPool::do_submit(std::shared_ptr<Runnable> r, ThreadPoolToken* token
ThreadPoolToken::State state = token->state();
DCHECK(state == ThreadPoolToken::State::IDLE || state == ThreadPoolToken::State::RUNNING);
token->_entries.emplace_back(std::move(task));
if (state == ThreadPoolToken::State::IDLE || token->mode() == ExecutionMode::CONCURRENT) {
// When we need to execute the task in the token, we submit the token object to the queue.
// There are currently two places where tokens will be submitted to the queue:
// 1. When submitting a new task, if the token is still in the IDLE state,
// or the concurrency of the token has not reached the online level, it will be added to the queue.
// 2. When the dispatch thread finishes executing a task:
// 1. If it is a SERIAL token, and there are unsubmitted tasks, submit them to the queue.
// 2. If it is a CONCURRENT token, and there are still unsubmitted tasks, and the upper limit of concurrency is not reached,
// then submitted to the queue.
if (token->need_dispatch()) {
_queue.emplace_back(token);
++token->_num_submitted_tasks;
if (state == ThreadPoolToken::State::IDLE) {
token->transition(ThreadPoolToken::State::RUNNING);
}
} else {
++token->_num_unsubmitted_tasks;
}
_total_queued_tasks++;

Expand Down Expand Up @@ -563,16 +588,26 @@ void ThreadPool::dispatch_thread() {
ThreadPoolToken::State state = token->state();
DCHECK(state == ThreadPoolToken::State::RUNNING ||
state == ThreadPoolToken::State::QUIESCING);
if (--token->_active_threads == 0) {
--token->_active_threads;
--token->_num_submitted_tasks;
if (token->_active_threads == 0) {
if (state == ThreadPoolToken::State::QUIESCING) {
DCHECK(token->_entries.empty());
token->transition(ThreadPoolToken::State::QUIESCED);
} else if (token->_entries.empty()) {
token->transition(ThreadPoolToken::State::IDLE);
} else if (token->mode() == ExecutionMode::SERIAL) {
_queue.emplace_back(token);
++token->_num_submitted_tasks;
--token->_num_unsubmitted_tasks;
}
} else if (token->mode() == ExecutionMode::CONCURRENT && token->_num_submitted_tasks < token->_max_concurrency
&& token->_num_unsubmitted_tasks > 0) {
_queue.emplace_back(token);
++token->_num_submitted_tasks;
--token->_num_unsubmitted_tasks;
}

if (--_active_threads == 0) {
_idle_cond.notify_all();
}
Expand Down
22 changes: 18 additions & 4 deletions be/src/util/threadpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,9 @@ class ThreadPool {
SERIAL,

// Tasks submitted via this token may be executed concurrently.
CONCURRENT,
CONCURRENT
};
std::unique_ptr<ThreadPoolToken> new_token(ExecutionMode mode);
std::unique_ptr<ThreadPoolToken> new_token(ExecutionMode mode, int max_concurrency = INT_MAX);

// Return the number of threads currently running (or in the process of starting up)
// for this thread pool.
Expand Down Expand Up @@ -362,6 +362,13 @@ class ThreadPoolToken {
// Returns true if all submissions are complete, false otherwise.
bool wait_for(const MonoDelta& delta);

bool need_dispatch();

size_t num_tasks() {
MutexLock l(&_pool->_lock);
return _entries.size();
}

private:
// All possible token states. Legal state transitions:
// IDLE -> RUNNING: task is submitted via token
Expand Down Expand Up @@ -400,7 +407,7 @@ class ThreadPoolToken {
// Constructs a new token.
//
// The token may not outlive its thread pool ('pool').
ThreadPoolToken(ThreadPool* pool, ThreadPool::ExecutionMode mode);
ThreadPoolToken(ThreadPool* pool, ThreadPool::ExecutionMode mode, int max_concurrency = INT_MAX);

// Changes this token's state to 'new_state' taking actions as needed.
void transition(State new_state);
Expand All @@ -418,7 +425,7 @@ class ThreadPoolToken {
ThreadPool::ExecutionMode mode() const { return _mode; }

// Token's configured execution mode.
const ThreadPool::ExecutionMode _mode;
ThreadPool::ExecutionMode _mode;

// Pointer to the token's thread pool.
ThreadPool* _pool;
Expand All @@ -436,6 +443,13 @@ class ThreadPoolToken {
// Number of worker threads currently executing tasks belonging to this
// token.
int _active_threads;
// The max number of tasks that can be ran concurrenlty. This is to limit
// the concurrency of a thread pool token, and default is INT_MAX(no limited)
int _max_concurrency;
// Number of tasks which has been submitted to the thread pool's queue.
int _num_submitted_tasks;
// Number of tasks which has not been submitted to the thread pool's queue.
int _num_unsubmitted_tasks;

DISALLOW_COPY_AND_ASSIGN(ThreadPoolToken);
};
Expand Down
85 changes: 51 additions & 34 deletions be/test/util/threadpool_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -739,44 +739,61 @@ TEST_F(ThreadPoolTest, TestTokenConcurrency) {
total_num_tokens_submitted.load());
}

/*
TEST_F(ThreadPoolTest, TestLIFOThreadWakeUps) {
const int kNumThreads = 10;
static void MyFunc(int idx, int n) {
std::cout << idx << ", " << std::this_thread::get_id() << " before sleep " << n << " seconds" << std::endl;
sleep(n);
std::cout << idx << ", " << std::this_thread::get_id() << " after sleep " << n << " seconds" << std::endl;
}

// Test with a pool that allows for kNumThreads concurrent threads.
ASSERT_OK(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName)
.set_max_threads(kNumThreads)).ok());
TEST_F(ThreadPoolTest, TestNormal) {
std::unique_ptr<ThreadPool> thread_pool;
ThreadPoolBuilder("my_pool")
.set_min_threads(0)
.set_max_threads(5)
.set_max_queue_size(10)
.set_idle_timeout(MonoDelta::FromMilliseconds(2000))
.build(&thread_pool);

std::unique_ptr<ThreadPoolToken> token1 = thread_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT, 2);
for (int i = 0; i < 10; i++) {
token1->submit_func(std::bind(&MyFunc, i, 1));
}
std::cout << "after submit 1" << std::endl;
token1->wait();
ASSERT_EQ(0, token1->num_tasks());

// Submit kNumThreads slow tasks and unblock them, in order to produce
// kNumThreads worker threads.
CountDownLatch latch(1);
SCOPED_CLEANUP({
latch.CountDown();
});
for (int i = 0; i < kNumThreads; i++) {
ASSERT_OK(pool_->submit(SlowTask::new_slow_task(&latch)).ok());
std::unique_ptr<ThreadPoolToken> token2 = thread_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT, 20);
for (int i = 0; i < 10; i++) {
token2->submit_func(std::bind(&MyFunc, i, 1));
}
ASSERT_EQ(kNumThreads, _pool->num_threads());
latch.count_down();
pool_->wait();
// The kNumThreads threads are idle and waiting for the idle timeout.
// Submit a slow trickle of lightning fast tasks.
//
// If the threads are woken up in FIFO order, this trickle is enough to
// prevent all of them from idling and the AssertEventually will time out.
//
// If LIFO order is used, the same thread will be reused for each task and
// the other threads will eventually time out.
AssertEventually([&]() {
ASSERT_OK(_pool->submit_func([](){}).ok());
SleepFor(MonoDelta::FromMilliseconds(10));
ASSERT_EQ(1, _pool->num_threads());
}, MonoDelta::FromSeconds(10), AssertBackoff::NONE);
NO_PENDING_FATALS();
std::cout << "after submit 2" << std::endl;
token2->wait();
ASSERT_EQ(0, token2->num_tasks());

std::unique_ptr<ThreadPoolToken> token3 = thread_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT, 1);
for (int i = 0; i < 10; i++) {
token3->submit_func(std::bind(&MyFunc, i, 1));
}
std::cout << "after submit 3" << std::endl;
token3->wait();
ASSERT_EQ(0, token3->num_tasks());

std::unique_ptr<ThreadPoolToken> token4 = thread_pool->new_token(ThreadPool::ExecutionMode::SERIAL);
for (int i = 0; i < 10; i++) {
token4->submit_func(std::bind(&MyFunc, i, 1));
}
std::cout << "after submit 4" << std::endl;
token4->wait();
ASSERT_EQ(0, token4->num_tasks());

std::unique_ptr<ThreadPoolToken> token5 = thread_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT, 20);
for (int i = 0; i < 10; i++) {
token5->submit_func(std::bind(&MyFunc, i, 1));
}
std::cout << "after submit 5" << std::endl;
token5->shutdown();
ASSERT_EQ(0, token5->num_tasks());
}
*/

} // namespace doris

Expand Down

0 comments on commit 327e31c

Please sign in to comment.