Skip to content

Commit 4c2780b

Browse files
committed
Avoid deadlocks because of reusing same pool for creating fibers
Since submit_eth_call_to_pool() is called from rust thread, blocking on fiber promises is not appropriate and can lead to performance issues. Move recovert_authorities() logic inside lambda executed in fiber pool. Since eth call txn has single authority, parallelization logic inside recover_authorities() only intoduces overhead without benefits. Replace parallelization with single call to recover_authority(). Modify recover_senders_and_authorities() to recover senders and authorities directly using recover_sender() and recover_authority() instead of using the pool-based monad::recover_senders() and monad::recover_authorities(). This avoids potential deadlock if all fibers in the pool are occupied, since the function is called from within a fiber pool lambda context." Refactor fiber pools to enable thread sharing between fiber groups. Introduce FiberThreadPool and FiberGroup abstractions that allow multiple fiber groups to share OS threads while maintaining separate task queues and fiber limits. Architecture: PriorityPool = FiberThreadPool + FiberGroup (isolated, dedicated threads) Shared setup = FiberThreadPool + multiple FiberGroups (shared threads) Thread 0 runs a bootstrap fiber that handles fiber creation requests to ensure fibers are created on threads with proper scheduler configuration. Use trace_tx_exec_poo to parallelize trace transaction execution to avoid deadlock because of using same pool.
1 parent 00ecacc commit 4c2780b

19 files changed

+769
-169
lines changed

category/core/CMakeLists.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,10 @@ add_library(
126126
"event/test_event_ctypes_metadata.c"
127127
# fiber
128128
"fiber/config.hpp"
129+
"fiber/fiber_group.cpp"
130+
"fiber/fiber_group.hpp"
131+
"fiber/fiber_thread_pool.cpp"
132+
"fiber/fiber_thread_pool.hpp"
129133
"fiber/priority_algorithm.cpp"
130134
"fiber/priority_algorithm.hpp"
131135
"fiber/priority_pool.cpp"
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
// Copyright (C) 2025 Category Labs, Inc.
2+
//
3+
// This program is free software: you can redistribute it and/or modify
4+
// it under the terms of the GNU General Public License as published by
5+
// the Free Software Foundation, either version 3 of the License, or
6+
// (at your option) any later version.
7+
//
8+
// This program is distributed in the hope that it will be useful,
9+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
10+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11+
// GNU General Public License for more details.
12+
//
13+
// You should have received a copy of the GNU General Public License
14+
// along with this program. If not, see <http://www.gnu.org/licenses/>.
15+
16+
#include <category/core/fiber/fiber_group.hpp>
17+
18+
#include <category/core/assert.h>
19+
#include <category/core/fiber/config.hpp>
20+
#include <category/core/fiber/fiber_thread_pool.hpp>
21+
#include <category/core/fiber/priority_properties.hpp>
22+
#include <category/core/fiber/priority_task.hpp>
23+
24+
#include <boost/fiber/channel_op_status.hpp>
25+
#include <boost/fiber/fiber.hpp>
26+
#include <boost/fiber/operations.hpp>
27+
#include <boost/fiber/protected_fixedsize_stack.hpp>
28+
29+
#include <cstddef>
30+
#include <future>
31+
#include <memory>
32+
#include <utility>
33+
34+
MONAD_FIBER_NAMESPACE_BEGIN
35+
36+
FiberGroup::FiberGroup(FiberThreadPool &pool, unsigned const n_fibers)
37+
: pool_{pool}
38+
{
39+
MONAD_ASSERT(n_fibers);
40+
41+
pool_.register_group();
42+
fibers_.reserve(n_fibers);
43+
44+
// Create fibers via bootstrap channel so they're created on a thread with
45+
// the proper scheduler configured.
46+
pool_.submit_bootstrap_task([this, n_fibers] {
47+
for (unsigned i = 0; i < n_fibers; ++i) {
48+
auto *const properties = new PriorityProperties{nullptr};
49+
boost::fibers::fiber fiber{
50+
static_cast<boost::fibers::fiber_properties *>(properties),
51+
std::allocator_arg,
52+
boost::fibers::protected_fixedsize_stack{
53+
static_cast<size_t>(8 * 1024 * 1024)},
54+
[this, properties] {
55+
PriorityTask task;
56+
while (channel_.pop(task) ==
57+
boost::fibers::channel_op_status::success) {
58+
properties->set_priority(task.priority);
59+
boost::this_fiber::yield();
60+
task.task();
61+
properties->set_priority(0);
62+
}
63+
}};
64+
fibers_.push_back(std::move(fiber));
65+
}
66+
start_.set_value();
67+
});
68+
69+
start_.get_future().wait();
70+
}
71+
72+
FiberGroup::~FiberGroup()
73+
{
74+
channel_.close();
75+
76+
while (fibers_.size()) {
77+
auto &fiber = fibers_.back();
78+
fiber.join();
79+
fibers_.pop_back();
80+
}
81+
82+
pool_.unregister_group();
83+
}
84+
85+
MONAD_FIBER_NAMESPACE_END
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
// Copyright (C) 2025 Category Labs, Inc.
2+
//
3+
// This program is free software: you can redistribute it and/or modify
4+
// it under the terms of the GNU General Public License as published by
5+
// the Free Software Foundation, either version 3 of the License, or
6+
// (at your option) any later version.
7+
//
8+
// This program is distributed in the hope that it will be useful,
9+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
10+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11+
// GNU General Public License for more details.
12+
//
13+
// You should have received a copy of the GNU General Public License
14+
// along with this program. If not, see <http://www.gnu.org/licenses/>.
15+
16+
#pragma once
17+
18+
#include <category/core/fiber/config.hpp>
19+
#include <category/core/fiber/priority_task.hpp>
20+
21+
#include <boost/fiber/buffered_channel.hpp>
22+
#include <boost/fiber/fiber.hpp>
23+
24+
#include <functional>
25+
#include <future>
26+
#include <vector>
27+
28+
MONAD_FIBER_NAMESPACE_BEGIN
29+
30+
class FiberThreadPool;
31+
32+
/// FiberGroup represents a group of fibers that execute on a shared
33+
/// FiberThreadPool. Multiple FiberGroup instances can coexist on the same
34+
/// thread pool, allowing different components to have their own task queues
35+
/// and fiber limits while sharing the underlying OS threads.
36+
///
37+
/// Each FiberGroup has:
38+
/// - Its own buffered_channel for task submission
39+
/// - Its own set of fibers that process tasks from the channel
40+
/// - Reference to a shared FiberThreadPool for thread resources
41+
///
42+
/// The fibers in this group can migrate to any thread in the pool via
43+
/// work-stealing through the shared PriorityQueue.
44+
///
45+
/// FiberGroup must be destroyed before the FiberThreadPool it references.
46+
class FiberGroup final
47+
{
48+
FiberThreadPool &pool_;
49+
50+
boost::fibers::buffered_channel<PriorityTask> channel_{1024};
51+
52+
std::vector<boost::fibers::fiber> fibers_{};
53+
54+
std::promise<void> start_{};
55+
56+
friend class FiberThreadPool;
57+
58+
FiberGroup(FiberThreadPool &pool, unsigned n_fibers);
59+
60+
public:
61+
FiberGroup(FiberGroup const &) = delete;
62+
FiberGroup &operator=(FiberGroup const &) = delete;
63+
64+
~FiberGroup();
65+
66+
void submit(uint64_t const priority, std::function<void()> task)
67+
{
68+
channel_.push({priority, std::move(task)});
69+
}
70+
71+
unsigned num_fibers() const
72+
{
73+
return static_cast<unsigned>(fibers_.size());
74+
}
75+
};
76+
77+
MONAD_FIBER_NAMESPACE_END
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
// Copyright (C) 2025 Category Labs, Inc.
2+
//
3+
// This program is free software: you can redistribute it and/or modify
4+
// it under the terms of the GNU General Public License as published by
5+
// the Free Software Foundation, either version 3 of the License, or
6+
// (at your option) any later version.
7+
//
8+
// This program is distributed in the hope that it will be useful,
9+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
10+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11+
// GNU General Public License for more details.
12+
//
13+
// You should have received a copy of the GNU General Public License
14+
// along with this program. If not, see <http://www.gnu.org/licenses/>.
15+
16+
#include <category/core/fiber/fiber_thread_pool.hpp>
17+
18+
#include <category/core/assert.h>
19+
#include <category/core/fiber/config.hpp>
20+
#include <category/core/fiber/fiber_group.hpp>
21+
#include <category/core/fiber/priority_algorithm.hpp>
22+
#include <category/core/fiber/priority_properties.hpp>
23+
24+
#include <boost/fiber/channel_op_status.hpp>
25+
#include <boost/fiber/fiber.hpp>
26+
#include <boost/fiber/operations.hpp>
27+
#include <boost/fiber/protected_fixedsize_stack.hpp>
28+
29+
#include <atomic>
30+
#include <cstddef>
31+
#include <cstdio>
32+
#include <functional>
33+
#include <memory>
34+
#include <mutex>
35+
#include <thread>
36+
#include <utility>
37+
#include <vector>
38+
39+
#include <pthread.h>
40+
41+
MONAD_FIBER_NAMESPACE_BEGIN
42+
43+
FiberThreadPool::FiberThreadPool(
44+
unsigned const n_threads, bool const prevent_spin)
45+
: prevent_spin_{prevent_spin}
46+
{
47+
MONAD_ASSERT(n_threads);
48+
49+
threads_.reserve(n_threads);
50+
51+
// Create worker threads (1 through n_threads-1) that wait for work via
52+
// the shared queue with priority-based work-stealing scheduler.
53+
for (unsigned i = n_threads - 1; i > 0; --i) {
54+
auto thread = std::thread([this, i] {
55+
char name[16];
56+
std::snprintf(name, 16, "ftpool %u", i);
57+
pthread_setname_np(pthread_self(), name);
58+
59+
boost::fibers::use_scheduling_algorithm<PriorityAlgorithm>(
60+
queue_, prevent_spin_);
61+
62+
std::unique_lock<boost::fibers::mutex> lock{mutex_};
63+
cv_.wait(lock, [this] { return done_; });
64+
});
65+
threads_.push_back(std::move(thread));
66+
}
67+
68+
// Thread 0 runs a bootstrap fiber that handles fiber creation requests
69+
// from FiberGroup constructors.
70+
auto thread = std::thread([this] {
71+
pthread_setname_np(pthread_self(), "ftpool 0");
72+
73+
boost::fibers::use_scheduling_algorithm<PriorityAlgorithm>(
74+
queue_, prevent_spin_);
75+
76+
auto *const properties = new PriorityProperties{nullptr};
77+
boost::fibers::fiber bootstrap_fiber{
78+
static_cast<boost::fibers::fiber_properties *>(properties),
79+
std::allocator_arg,
80+
boost::fibers::protected_fixedsize_stack{
81+
static_cast<size_t>(8 * 1024 * 1024)},
82+
[this] {
83+
std::function<void()> task;
84+
while (bootstrap_channel_.pop(task) ==
85+
boost::fibers::channel_op_status::success) {
86+
task();
87+
}
88+
}};
89+
90+
bootstrap_fiber.detach();
91+
92+
std::unique_lock<boost::fibers::mutex> lock{mutex_};
93+
cv_.wait(lock, [this] { return done_; });
94+
});
95+
threads_.push_back(std::move(thread));
96+
}
97+
98+
FiberThreadPool::~FiberThreadPool()
99+
{
100+
MONAD_ASSERT(
101+
active_groups_.load(std::memory_order_relaxed) == 0 &&
102+
"All FiberGroup instances must be destroyed before FiberThreadPool");
103+
104+
bootstrap_channel_.close();
105+
106+
{
107+
std::unique_lock<boost::fibers::mutex> const lock{mutex_};
108+
done_ = true;
109+
}
110+
111+
cv_.notify_all();
112+
113+
while (threads_.size()) {
114+
auto &thread = threads_.back();
115+
thread.join();
116+
threads_.pop_back();
117+
}
118+
}
119+
120+
std::unique_ptr<FiberGroup>
121+
FiberThreadPool::create_fiber_group(unsigned const n_fibers)
122+
{
123+
MONAD_ASSERT(n_fibers);
124+
MONAD_ASSERT(
125+
!done_ && "Cannot create FiberGroup after FiberThreadPool shutdown");
126+
127+
return std::unique_ptr<FiberGroup>(new FiberGroup{*this, n_fibers});
128+
}
129+
130+
MONAD_FIBER_NAMESPACE_END

0 commit comments

Comments
 (0)