Skip to content

Query Stat Azure v3 #2310

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 57 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
8f1cbd4
Foundation of stats query
phoebusm Feb 25, 2025
b914eb3
More C++ changes
phoebusm Feb 27, 2025
9b2a3eb
More polishing
phoebusm Feb 28, 2025
463b9ba
Some boilerplate code to beautify the output
phoebusm Feb 28, 2025
e6b654b
Slight performace improvement and remove useless function
phoebusm Feb 28, 2025
d6694f7
Beautify the output and clear some python warning
phoebusm Feb 28, 2025
9a6867a
Simplification
phoebusm Feb 28, 2025
c6ba469
Address PR comments
phoebusm Mar 4, 2025
1651d6e
Add support of filtering stats with object
phoebusm Mar 4, 2025
bae315b
Support context manager usage
phoebusm Mar 4, 2025
9755f09
More tests
phoebusm Mar 4, 2025
b5f05fc
Address some PR comments
phoebusm Mar 4, 2025
d0deeb7
Update docstring
phoebusm Mar 5, 2025
da31689
Tests on stats reset
phoebusm Mar 5, 2025
c30f185
In-place stats storage C++ changes
phoebusm Mar 10, 2025
3bd8142
Some docstrings
phoebusm Mar 10, 2025
48d05d3
More docstring and checking
phoebusm Mar 10, 2025
310cc3a
Support python multithread
phoebusm Mar 10, 2025
696dc62
Some bug fix
phoebusm Mar 11, 2025
04fe32b
Better chart and naming
phoebusm Mar 11, 2025
48bb964
Remove obsolete log
phoebusm Mar 11, 2025
4791497
Add C++ test for demo
phoebusm Mar 11, 2025
76f1e34
Better naming
phoebusm Mar 12, 2025
dc73c93
Addressing PR comment snapshot
phoebusm Mar 13, 2025
d8d4147
Better printing
phoebusm Mar 13, 2025
b240011
Better printing
phoebusm Mar 13, 2025
682ce3f
Bug fix
phoebusm Mar 13, 2025
9ad4ff5
typo
phoebusm Mar 13, 2025
44f87d9
typo
phoebusm Mar 13, 2025
cadd55e
typo
phoebusm Mar 13, 2025
aab9219
Bug fix
phoebusm Mar 13, 2025
58c4b66
python test update
phoebusm Mar 13, 2025
fc14727
Fix cpp test
phoebusm Mar 13, 2025
dedf0f0
Set qs' function free in python!
phoebusm Mar 13, 2025
ac87910
Update some tests
phoebusm Mar 13, 2025
911e9ce
Update docstring
phoebusm Mar 13, 2025
2a5e374
Update some comments
phoebusm Mar 13, 2025
8e70769
Fix minor issues
phoebusm Mar 13, 2025
7127c18
Update time limit
phoebusm Mar 14, 2025
3eaafb6
Make the limit losser
phoebusm Mar 14, 2025
2b46548
More static schema
phoebusm Apr 7, 2025
e216376
Fix linking error
phoebusm Apr 7, 2025
b0721bf
Remove universal consturctor
phoebusm Apr 8, 2025
32d2228
Raw pointer to shared_ptr
phoebusm Apr 8, 2025
d9a1c17
correct C++ test sync
phoebusm Apr 8, 2025
e41acd7
Minor move update and add mutex
phoebusm Apr 8, 2025
2c76810
Indent
phoebusm Apr 8, 2025
88aab78
Remove query stats map
phoebusm Apr 8, 2025
5583265
Fix C++ tests
phoebusm Apr 8, 2025
d78ba4c
Add python bindings
phoebusm Apr 8, 2025
cc4f271
Remove comment
phoebusm Apr 9, 2025
95b7a16
Fix python test
phoebusm Apr 9, 2025
4b3f74f
Remove obsolete import
phoebusm Apr 9, 2025
93091f5
Simplify time calculation
phoebusm Apr 9, 2025
6bcb690
Add s3 storage ops stat
phoebusm Apr 9, 2025
6329be3
Add support of logical keys and compressed sizes
phoebusm Apr 9, 2025
d420a4b
Support query stat on azure
phoebusm Apr 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cpp/arcticdb/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,7 @@ set(arcticdb_srcs
util/type_traits.hpp
util/variant.hpp
util/gil_safe_py_none.hpp
util/query_stats.hpp
version/de_dup_map.hpp
version/op_log.hpp
version/schema_checks.hpp
Expand Down Expand Up @@ -520,9 +521,10 @@ set(arcticdb_srcs
util/timer.cpp
util/trace.cpp
util/type_handler.cpp
util/gil_safe_py_none.cpp
util/query_stats.cpp
version/key_block.hpp
version/key_block.cpp
util/gil_safe_py_none.cpp
version/local_versioned_engine.cpp
version/schema_checks.cpp
version/op_log.cpp
Expand Down
7 changes: 7 additions & 0 deletions cpp/arcticdb/async/task_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ void TaskScheduler::init(){
TaskScheduler::instance_ = std::make_shared<TaskSchedulerPtrWrapper>(new TaskScheduler);
}

// Reports whether either executor currently has work in flight.
// One getPoolStats() snapshot is taken per executor (CPU first, then IO); the
// result is true when either snapshot shows a non-zero activeThreadCount or a
// non-zero pendingTaskCount. The two snapshots are taken by separate calls, so
// the answer reflects two nearby points in time rather than a single instant.
bool TaskScheduler::tasks_pending() {
    const auto pool_is_busy = [](const auto& pool_stats) {
        return pool_stats.activeThreadCount != 0 || pool_stats.pendingTaskCount != 0;
    };
    const auto cpu_pool_stats = cpu_exec_.getPoolStats();
    const auto io_pool_stats = io_exec_.getPoolStats();
    return pool_is_busy(cpu_pool_stats) || pool_is_busy(io_pool_stats);
}

// Releases the object held in ptr_ with delete.
// NOTE(review): presumably ptr_ is the TaskScheduler allocated with `new` in
// TaskScheduler::init() and handed to this wrapper — confirm against the class definition.
TaskSchedulerPtrWrapper::~TaskSchedulerPtrWrapper() {
delete ptr_;
}
Expand Down
2 changes: 2 additions & 0 deletions cpp/arcticdb/async/task_scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ class TaskScheduler {
return io_exec_.addFuture(std::move(task));
}

// Returns true if the CPU or IO executor reports any active thread or pending task
// (defined in task_scheduler.cpp).
bool tasks_pending();

static std::shared_ptr<TaskSchedulerPtrWrapper> instance_;
static std::once_flag init_flag_;
static std::once_flag shutdown_flag_;
Expand Down
32 changes: 30 additions & 2 deletions cpp/arcticdb/async/tasks.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <arcticdb/util/constructors.hpp>
#include <arcticdb/codec/codec.hpp>
#include <arcticdb/util/test/random_throw.hpp>
#include <arcticdb/util/query_stats.hpp>

#include <type_traits>
#include <ranges>
Expand Down Expand Up @@ -93,7 +94,12 @@ struct EncodeAtomTask : BaseTask {
storage::KeySegmentPair encode() {
ARCTICDB_DEBUG(log::codec(), "Encoding object with partial key {}", partial_key_);
ARCTICDB_DEBUG_THROW(5)
QUERY_STATS_SET_KEY_TYPE(partial_key_.key_type)
QUERY_STATS_SET_TASK_TYPE(Encode)
QUERY_STATS_ADD_LOGICAL_KEYS(segment_);
QUERY_STATS_ADD(uncompressed_size_bytes, segment_.num_bytes());
auto enc_seg = ::arcticdb::encode_dispatch(std::move(segment_), *codec_meta_, encoding_version_);
QUERY_STATS_ADD(compressed_size_bytes, enc_seg.calculate_size());
auto content_hash = get_segment_hash(enc_seg);

AtomKey k = partial_key_.build_key(creation_ts_, content_hash);
Expand Down Expand Up @@ -125,7 +131,12 @@ struct EncodeSegmentTask : BaseTask {
ARCTICDB_MOVE_ONLY_DEFAULT(EncodeSegmentTask)

// Encodes segment_ via encode_dispatch and returns it paired with key_.
// The QUERY_STATS_* macros (from util/query_stats.hpp, definitions not visible here)
// are invoked with the key type of key_, the Encode task type, the segment itself,
// and the segment's size before and after encoding.
// Both key_ and segment_ are moved from, so the task's members are left in a
// moved-from state after this call.
storage::KeySegmentPair encode() {
QUERY_STATS_SET_KEY_TYPE(variant_key_type(key_))
QUERY_STATS_SET_TASK_TYPE(Encode)
QUERY_STATS_ADD_LOGICAL_KEYS(segment_)
// Sample the uncompressed size now: segment_ is moved into encode_dispatch on the next statement.
QUERY_STATS_ADD(uncompressed_size_bytes, segment_.num_bytes())
auto enc_seg = ::arcticdb::encode_dispatch(std::move(segment_), *codec_meta_, encoding_version_);
// Size of the encoded segment as reported by calculate_size().
QUERY_STATS_ADD(compressed_size_bytes, enc_seg.calculate_size())
return {std::move(key_), std::move(enc_seg)};
}

Expand Down Expand Up @@ -160,7 +171,12 @@ struct EncodeRefTask : BaseTask {
ARCTICDB_MOVE_ONLY_DEFAULT(EncodeRefTask)

// Encodes segment_ via encode_dispatch and returns it paired with a RefKey
// built from id_ and key_type_.
// The QUERY_STATS_* macros (from util/query_stats.hpp, definitions not visible here)
// are invoked with key_type_, the Encode task type, the segment itself, and the
// segment's size before and after encoding.
// segment_ is moved from, so it is left in a moved-from state after this call.
[[nodiscard]] storage::KeySegmentPair encode() {
QUERY_STATS_SET_KEY_TYPE(key_type_)
QUERY_STATS_SET_TASK_TYPE(Encode)
QUERY_STATS_ADD_LOGICAL_KEYS(segment_)
// Sample the uncompressed size now: segment_ is moved into encode_dispatch on the next statement.
QUERY_STATS_ADD(uncompressed_size_bytes, segment_.num_bytes());
auto enc_seg = ::arcticdb::encode_dispatch(std::move(segment_), *codec_meta_, encoding_version_);
// Size of the encoded segment as reported by calculate_size().
QUERY_STATS_ADD(compressed_size_bytes, enc_seg.calculate_size());
auto k = RefKey{id_, key_type_};
return {std::move(k), std::move(enc_seg)};
}
Expand Down Expand Up @@ -430,8 +446,15 @@ struct DecodeSegmentTask : BaseTask {
auto key_seg = std::move(ks);
ARCTICDB_DEBUG(log::storage(), "ReadAndDecodeAtomTask decoding segment with key {}",
variant_key_view(key_seg.variant_key()));
QUERY_STATS_SET_KEY_TYPE(variant_key_type(key_seg.variant_key()))
QUERY_STATS_SET_TASK_TYPE(Decode)
QUERY_STATS_ADD(compressed_size_bytes, key_seg.segment_ptr()->size());
auto segment = decode_segment(*key_seg.segment_ptr());
QUERY_STATS_ADD(uncompressed_size_bytes, segment.num_bytes());

return {key_seg.variant_key(), decode_segment(*key_seg.segment_ptr())};
QUERY_STATS_ADD_LOGICAL_KEYS(segment);

return {key_seg.variant_key(), std::move(segment)};
}
};

Expand All @@ -451,7 +474,12 @@ struct DecodeSliceTask : BaseTask {
pipelines::SegmentAndSlice operator()(storage::KeySegmentPair&& key_segment_pair) {
ARCTICDB_SAMPLE(DecodeSliceTask, 0)
ARCTICDB_DEBUG(log::memory(), "Decode into slice {}", key_segment_pair.variant_key());
return decode_into_slice(std::move(key_segment_pair));
QUERY_STATS_SET_KEY_TYPE(variant_key_type(key_segment_pair.variant_key()))
QUERY_STATS_SET_TASK_TYPE(Decode)
QUERY_STATS_ADD(compressed_size_bytes, key_segment_pair.segment_ptr()->size());
auto result = decode_into_slice(std::move(key_segment_pair));
QUERY_STATS_ADD(uncompressed_size_bytes, result.segment_in_memory_.num_bytes());
return result;
}

private:
Expand Down
64 changes: 63 additions & 1 deletion cpp/arcticdb/async/test/test_async.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@
#include <arcticdb/storage/library_index.hpp>
#include <arcticdb/storage/storage_factory.hpp>
#include <arcticdb/async/async_store.hpp>
#include <arcticdb/util/test/config_common.hpp>
#include <arcticdb/pipeline/frame_slice.hpp>
#include <arcticdb/stream/test/stream_test_common.hpp>
#include <arcticdb/util/test/config_common.hpp>
#include <arcticdb/util/random.h>
#include <arcticdb/util/query_stats.hpp>

#include <fmt/format.h>

Expand Down Expand Up @@ -141,6 +142,67 @@ TEST(Async, CollectWithThrow) {
ARCTICDB_DEBUG(log::version(), "Collect returned");
}

// Demonstrates query-stats collection across folly future chains submitted from
// two threads. Each thread runs `work`, which submits one CPU-task chain and one
// IO-task chain; the test then checks the totals recorded under
// (KeyType::SYMBOL_LIST, TaskType::S3_ListObjectsV2).
TEST(Async, StatsQueryDemo) {
using namespace arcticdb::util::query_stats;
// Scope guard: enables query stats on construction; disables and resets them on
// destruction so the state does not carry over past this test.
class EnableQueryStatsRAII {
public:
EnableQueryStatsRAII() {
QueryStats::instance().enable();
}
~EnableQueryStatsRAII() {
QueryStats::instance().disable();
QueryStats::instance().reset_stats();
}
};
EnableQueryStatsRAII enable_query_stats;
async::TaskScheduler sched{20, 20};
// One unit of work: two future chains, waited on together with collectAll.
auto work = [&]() {
std::vector<folly::Future<folly::Unit>> stuff;
{
// Chain 1 (CPU task): adds count twice (2 in total) and result_count 123.
stuff.push_back(sched.submit_cpu_task(MaybeThrowTask(false))
.thenValue([](auto) {
QUERY_STATS_SET_KEY_TYPE(KeyType::SYMBOL_LIST)
QUERY_STATS_SET_TASK_TYPE(S3_ListObjectsV2)
std::this_thread::sleep_for(std::chrono::milliseconds(1)); // For verifying call duration calculation
QUERY_STATS_ADD(count, 1)
QUERY_STATS_ADD(count, 1)
QUERY_STATS_ADD(result_count, 123)
return folly::Unit{};
})
.via(&async::io_executor()) // switching executor in the chain won't create a new child level
);
// Chain 2 (IO task): adds count 1 and result_count 456, then throws; the
// continuation after the throw is expected not to contribute anything.
stuff.push_back(sched.submit_io_task(MaybeThrowTask(false))
.thenValue([](auto) {
QUERY_STATS_SET_KEY_TYPE(KeyType::SYMBOL_LIST)
QUERY_STATS_SET_TASK_TYPE(S3_ListObjectsV2)
QUERY_STATS_ADD(count, 1)
QUERY_STATS_ADD(result_count, 456)
return folly::Unit{};
})
.thenValue([](auto) {
throw std::runtime_error("Test exception"); // Exception will not affect query stats
}).thenValue([](auto) {
// Below won't be logged as the preceding task throws
QUERY_STATS_SET_KEY_TYPE(KeyType::SYMBOL_LIST)
QUERY_STATS_SET_TASK_TYPE(S3_ListObjectsV2)
QUERY_STATS_ADD(count, 1)
QUERY_STATS_ADD(result_count, 9999)
return folly::Unit{};
})
);
// collectAll gathers every chain's outcome, so the exception thrown in
// chain 2 is captured in its result rather than propagated by get().
folly::collectAll(stuff).get();
}
};
std::thread t1(work), t2(work); // mimic multithreading at python level
t1.join();
t2.join();
auto& result = QueryStats::instance().get_stats(&sched);
auto& op_stats = result.keys_stats_[static_cast<size_t>(KeyType::SYMBOL_LIST)][static_cast<size_t>(TaskType::S3_ListObjectsV2)];
// Chain 1 sleeps for 1 ms, so some elapsed time should have been recorded.
ASSERT_TRUE(op_stats.total_time_ms_ > 0);
// Per thread: count = 2 (chain 1) + 1 (chain 2) = 3; two threads => 6.
ASSERT_EQ(op_stats.count_, 6);
// Per thread: result_count = 123 + 456 = 579; two threads => 1158.
ASSERT_EQ(op_stats.result_count_, 1158);
}

using IndexSegmentReader = int;

int get_index_segment_reader_impl(arcticdb::StreamId id) {
Expand Down
5 changes: 5 additions & 0 deletions cpp/arcticdb/pipeline/read_frame.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <arcticdb/util/magic_num.hpp>
#include <arcticdb/codec/segment_identifier.hpp>
#include <arcticdb/util/spinlock.hpp>
#include <arcticdb/util/query_stats.hpp>
#include <arcticdb/pipeline/string_reducers.hpp>
#include <arcticdb/pipeline/read_query.hpp>

Expand Down Expand Up @@ -884,11 +885,15 @@ folly::Future<SegmentInMemory> fetch_data(
keys_and_continuations.emplace_back(row.slice_and_key().key(),
[row=row, frame=frame, dynamic_schema=dynamic_schema, shared_data, &handler_data, read_query, read_options](auto &&ks) mutable {
auto key_seg = std::forward<storage::KeySegmentPair>(ks);
QUERY_STATS_SET_KEY_TYPE(variant_key_type(key_seg.variant_key()))
QUERY_STATS_SET_TASK_TYPE(Decode)
QUERY_STATS_ADD(compressed_size_bytes, key_seg.segment_ptr()->size());
if(dynamic_schema) {
decode_into_frame_dynamic(frame, row, key_seg.segment(), shared_data, handler_data, read_query, read_options);
} else {
decode_into_frame_static(frame, row, key_seg.segment(), shared_data, handler_data, read_query, read_options);
}
QUERY_STATS_ADD(uncompressed_size_bytes, frame.num_bytes());

return key_seg.variant_key();
});
Expand Down
Loading
Loading