Skip to content

Commit 47e9fb4

Browse files
committed
Add session metrics logging for statesync server
Added per-request logging of disk I/O statistics, upsert counts, and throughput metrics to enable performance monitoring and debugging of statesync server operations. Changed the read counter in async I/O from 32-bit to 64-bit, because the 32-bit counter was prone to overflow. Added a counter for the number of bytes read in async I/O. This code was generated using Claude Sonnet 4.5.
1 parent 9fdc2f5 commit 47e9fb4

File tree

4 files changed

+98
-18
lines changed

4 files changed

+98
-18
lines changed

category/async/io.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -302,12 +302,13 @@ AsyncIO::~AsyncIO()
302302
::close(fds_.msgwrite);
303303
}
304304

305-
void AsyncIO::account_read_()
305+
void AsyncIO::account_read_(size_t size)
306306
{
307307
if (++records_.inflight_rd > records_.max_inflight_rd) {
308308
records_.max_inflight_rd = records_.inflight_rd;
309309
}
310310
++records_.nreads;
311+
records_.bytes_read += size;
311312
}
312313

313314
void AsyncIO::submit_request_sqe_(
@@ -520,7 +521,7 @@ size_t AsyncIO::poll_uring_(bool blocking, unsigned poll_rings_mask)
520521
auto const &read = concurrent_read_ios_pending_.front();
521522
submit_request_sqe_(
522523
read.buffer, read.offset, read.op, read.op->io_priority());
523-
account_read_();
524+
account_read_(read.buffer.size());
524525
concurrent_read_ios_pending_.pop_front();
525526
}
526527
}

category/async/io.hpp

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,10 @@ struct IORecord
5353
unsigned max_inflight_rd_scatter{0};
5454
unsigned max_inflight_wr{0};
5555

56-
unsigned nreads{0};
56+
uint64_t nreads{0};
5757
// Reads and scatter reads which got a EAGAIN and were retried
5858
unsigned reads_retried{0};
59+
uint64_t bytes_read{0};
5960
};
6061

6162
class AsyncIO final
@@ -129,7 +130,7 @@ class AsyncIO final
129130
void *uring_data, enum erased_connected_operation::io_priority prio);
130131
void submit_request_(timed_invocation_state *state, void *uring_data);
131132

132-
void account_read_();
133+
void account_read_(size_t size);
133134

134135
void poll_uring_while_submission_queue_full_();
135136
size_t poll_uring_(bool blocking, unsigned poll_rings_mask);
@@ -235,6 +236,16 @@ class AsyncIO final
235236
return records_.inflight_ts.load(std::memory_order_relaxed);
236237
}
237238

239+
uint64_t total_reads_submitted() const noexcept
240+
{
241+
return records_.nreads;
242+
}
243+
244+
uint64_t total_bytes_read() const noexcept
245+
{
246+
return records_.bytes_read;
247+
}
248+
238249
unsigned concurrent_read_io_limit() const noexcept
239250
{
240251
return concurrent_read_io_limit_;
@@ -338,6 +349,8 @@ class AsyncIO final
338349
records_.max_inflight_rd_scatter = 0;
339350
records_.max_inflight_wr = 0;
340351
records_.nreads = 0;
352+
records_.bytes_read = 0;
353+
records_.reads_retried = 0;
341354
}
342355

343356
size_t submit_read_request(
@@ -355,8 +368,9 @@ class AsyncIO final
355368
return size_t(-1); // we never complete immediately
356369
}
357370

371+
auto const size = buffer.size();
358372
submit_request_(buffer, offset, uring_data, uring_data->io_priority());
359-
account_read_();
373+
account_read_(size);
360374
return size_t(-1); // we never complete immediately
361375
}
362376

@@ -659,7 +673,7 @@ class AsyncIO final
659673
using erased_connected_operation_ptr =
660674
AsyncIO::erased_connected_operation_unique_ptr_type;
661675

662-
static_assert(sizeof(AsyncIO) == 272);
676+
static_assert(sizeof(AsyncIO) == 296);
663677
static_assert(alignof(AsyncIO) == 8);
664678

665679
namespace detail

category/statesync/statesync_server.cpp

Lines changed: 76 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// You should have received a copy of the GNU General Public License
1414
// along with this program. If not, see <http://www.gnu.org/licenses/>.
1515

16+
#include <category/async/io.hpp>
1617
#include <category/core/assert.h>
1718
#include <category/core/basic_formatter.hpp>
1819
#include <category/core/byte_string.hpp>
@@ -116,7 +117,8 @@ byte_string from_prefix(uint64_t const prefix, size_t const n_bytes)
116117

117118
bool send_deletion(
118119
monad_statesync_server *const sync, monad_sync_request const &rq,
119-
monad_statesync_server_context &ctx)
120+
monad_statesync_server_context &ctx, uint64_t *const num_upserts,
121+
uint64_t *const upsert_bytes)
120122
{
121123
MONAD_ASSERT(
122124
rq.old_target <= rq.target || rq.old_target == INVALID_BLOCK_NUM);
@@ -125,8 +127,10 @@ bool send_deletion(
125127
return true;
126128
}
127129

128-
auto const fn = [sync, prefix = from_prefix(rq.prefix, rq.prefix_bytes)](
129-
Deletion const &deletion) {
130+
auto const fn = [sync,
131+
prefix = from_prefix(rq.prefix, rq.prefix_bytes),
132+
num_upserts,
133+
upsert_bytes](Deletion const &deletion) {
130134
auto const &[addr, key] = deletion;
131135
auto const hash = keccak256(addr.bytes);
132136
byte_string_view const view{hash.bytes, sizeof(hash.bytes)};
@@ -141,6 +145,8 @@ bool send_deletion(
141145
sizeof(addr),
142146
nullptr,
143147
0);
148+
++(*num_upserts);
149+
*upsert_bytes += sizeof(addr);
144150
}
145151
else {
146152
auto const skey = rlp::encode_bytes32_compact(key.value());
@@ -151,6 +157,8 @@ bool send_deletion(
151157
sizeof(addr),
152158
skey.data(),
153159
skey.size());
160+
++(*num_upserts);
161+
*upsert_bytes += sizeof(addr) + skey.size();
154162
}
155163
};
156164

@@ -165,6 +173,17 @@ bool send_deletion(
165173
bool statesync_server_handle_request(
166174
monad_statesync_server *const sync, monad_sync_request const rq)
167175
{
176+
uint64_t disk_ios_start = 0;
177+
uint64_t disk_bytes_start = 0;
178+
auto const *io = monad::async::AsyncIO::thread_instance();
179+
if (io != nullptr) {
180+
disk_ios_start = io->total_reads_submitted();
181+
disk_bytes_start = io->total_bytes_read();
182+
}
183+
184+
uint64_t num_upserts = 0;
185+
uint64_t upsert_bytes = 0;
186+
168187
struct Traverse final : public TraverseMachine
169188
{
170189
unsigned char nibble;
@@ -174,16 +193,21 @@ bool statesync_server_handle_request(
174193
NibblesView prefix;
175194
uint64_t from;
176195
uint64_t until;
196+
uint64_t *num_upserts;
197+
uint64_t *upsert_bytes;
177198

178199
Traverse(
179200
monad_statesync_server *const sync, NibblesView const prefix,
180-
uint64_t const from, uint64_t const until)
201+
uint64_t const from, uint64_t const until,
202+
uint64_t *const num_upserts, uint64_t *const upsert_bytes)
181203
: nibble{INVALID_BRANCH}
182204
, depth{0}
183205
, sync{sync}
184206
, prefix{prefix}
185207
, from{from}
186208
, until{until}
209+
, num_upserts{num_upserts}
210+
, upsert_bytes{upsert_bytes}
187211
{
188212
}
189213

@@ -235,13 +259,11 @@ bool statesync_server_handle_request(
235259
unsigned char const *const v1 =
236260
nullptr,
237261
uint64_t const size1 = 0) {
262+
uint64_t const size2 = node.value().size();
238263
sync->statesync_server_send_upsert(
239-
sync->net,
240-
type,
241-
v1,
242-
size1,
243-
node.value().data(),
244-
node.value().size());
264+
sync->net, type, v1, size1, node.value().data(), size2);
265+
++(*num_upserts);
266+
*upsert_bytes += size1 + size2;
245267
};
246268

247269
if (nibble == CODE_NIBBLE) {
@@ -332,9 +354,11 @@ bool statesync_server_handle_request(
332354
val.size(),
333355
nullptr,
334356
0);
357+
++num_upserts;
358+
upsert_bytes += val.size();
335359
}
336360

337-
if (!send_deletion(sync, rq, *ctx)) {
361+
if (!send_deletion(sync, rq, *ctx, &num_upserts, &upsert_bytes)) {
338362
return false;
339363
}
340364

@@ -350,12 +374,25 @@ bool statesync_server_handle_request(
350374
}
351375

352376
[[maybe_unused]] auto const begin = std::chrono::steady_clock::now();
353-
Traverse traverse(sync, NibblesView{bytes}, rq.from, rq.until);
377+
Traverse traverse(
378+
sync,
379+
NibblesView{bytes},
380+
rq.from,
381+
rq.until,
382+
&num_upserts,
383+
&upsert_bytes);
354384
if (!db.traverse(finalized_root, traverse, rq.target)) {
355385
return false;
356386
}
357387
[[maybe_unused]] auto const end = std::chrono::steady_clock::now();
358388

389+
uint64_t disk_ios_submitted = 0;
390+
uint64_t disk_bytes_total = 0;
391+
if (io != nullptr) {
392+
disk_ios_submitted = io->total_reads_submitted() - disk_ios_start;
393+
disk_bytes_total = io->total_bytes_read() - disk_bytes_start;
394+
}
395+
359396
LOG_INFO(
360397
"processed request prefix={} prefix_bytes={} target={} from={} "
361398
"until={} "
@@ -369,6 +406,33 @@ bool statesync_server_handle_request(
369406
std::chrono::duration_cast<std::chrono::microseconds>(end - start),
370407
std::chrono::duration_cast<std::chrono::microseconds>(end - begin));
371408

409+
auto const elapsed_seconds =
410+
std::chrono::duration_cast<std::chrono::duration<double>>(end - start)
411+
.count();
412+
double disk_ios_per_sec = 0.0;
413+
double disk_bytes_per_sec = 0.0;
414+
double upsert_bytes_per_sec = 0.0;
415+
if (elapsed_seconds > 0.0) {
416+
disk_ios_per_sec =
417+
static_cast<double>(disk_ios_submitted) / elapsed_seconds;
418+
disk_bytes_per_sec =
419+
static_cast<double>(disk_bytes_total) / elapsed_seconds;
420+
upsert_bytes_per_sec =
421+
static_cast<double>(upsert_bytes) / elapsed_seconds;
422+
}
423+
424+
LOG_INFO(
425+
"session metrics: disk_ios={} disk_bytes={} num_upserts={} "
426+
"upsert_bytes={} | disk_ios/s={:.1f} disk_bytes/s={:.1f} "
427+
"upsert_bytes/s={:.1f}",
428+
disk_ios_submitted,
429+
disk_bytes_total,
430+
num_upserts,
431+
upsert_bytes,
432+
disk_ios_per_sec,
433+
disk_bytes_per_sec,
434+
upsert_bytes_per_sec);
435+
372436
return true;
373437
}
374438

category/statesync/test/test_statesync.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1047,6 +1047,7 @@ TEST_F(StateSyncFixture, benchmark)
10471047
handle_target(cctx, hdr);
10481048
run();
10491049
EXPECT_TRUE(monad_statesync_client_finalize(cctx));
1050+
quill::flush();
10501051
}
10511052

10521053
TEST(Deletions, history_length)

0 commit comments

Comments
 (0)