Skip to content

Commit e845e46

Browse files
committed
Do not throw exceptions from PythonHandlerData's destructor. Move the
call to apply_global_refcounts to a single place in adapt_read_df and adapt_read_dfs. Create all handler data in the Python bindings.
1 parent 2d03dba commit e845e46

18 files changed

+105
-102
lines changed

cpp/arcticdb/column_store/test/ingestion_stress_test.cpp

+3-4
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
#include <gtest/gtest.h>
99
#include <string>
1010
#include <algorithm>
11-
#include <fmt/format.h>
1211

1312
#include <arcticdb/util/random.h>
1413
#include <arcticdb/util/timer.hpp>
@@ -128,7 +127,7 @@ TEST_F(IngestionStressStore, ScalarIntAppend) {
128127
auto read_query = std::make_shared<ReadQuery>();
129128
read_query->row_filter = universal_range();
130129
register_native_handler_data_factory();
131-
auto handler_data = get_type_handler_data(OutputFormat::NATIVE);
130+
auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE);
132131
auto read_result = test_store_->read_dataframe_version(symbol, VersionQuery{}, read_query, ro, handler_data);
133132
GTEST_COUT << "columns in res: " << read_result.frame_data.index_columns().size();
134133
}
@@ -216,7 +215,7 @@ TEST_F(IngestionStressStore, ScalarIntDynamicSchema) {
216215
auto read_query = std::make_shared<ReadQuery>();
217216
read_query->row_filter = universal_range();
218217
register_native_handler_data_factory();
219-
auto handler_data = get_type_handler_data(OutputFormat::NATIVE);
218+
auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE);
220219
auto read_result = test_store_->read_dataframe_version_internal(symbol, VersionQuery{}, read_query, read_options, handler_data);
221220
}
222221

@@ -269,7 +268,7 @@ TEST_F(IngestionStressStore, DynamicSchemaWithStrings) {
269268
auto read_query = std::make_shared<ReadQuery>();
270269
read_query->row_filter = universal_range();
271270
register_native_handler_data_factory();
272-
auto handler_data = get_type_handler_data(OutputFormat::NATIVE);
271+
auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE);
273272
auto read_result = test_store_->read_dataframe_version(symbol, VersionQuery{}, read_query, read_options, handler_data);
274273
ARCTICDB_DEBUG(log::version(), "result columns: {}", read_result.frame_data.names());
275274
}

cpp/arcticdb/entity/output_format.hpp

-2
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@
77

88
#pragma once
99

10-
#include <cstdint>
11-
1210
namespace arcticdb {
1311
enum class OutputFormat : uint8_t {
1412
NATIVE,

cpp/arcticdb/pipeline/pipeline_utils.hpp

+7-5
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,7 @@
1515
#include <arcticdb/pipeline/column_mapping.hpp>
1616
#include <arcticdb/column_store/memory_segment.hpp>
1717
#include <arcticdb/util/type_handler.hpp>
18-
19-
#include "python/python_handler_data.hpp"
18+
#include <arcticdb/python/python_handler_data.hpp>
2019

2120
namespace arcticdb::pipelines {
2221

@@ -37,7 +36,12 @@ inline void apply_type_handlers(SegmentInMemory seg, std::any& handler_data, Out
3736
}
3837
}
3938

40-
inline ReadResult read_result_from_single_frame(FrameAndDescriptor& frame_and_desc, const AtomKey& key, OutputFormat output_format) {
39+
inline ReadResult read_result_from_single_frame(
40+
FrameAndDescriptor& frame_and_desc,
41+
const AtomKey& key,
42+
std::any& handler_data,
43+
OutputFormat output_format
44+
) {
4145
auto pipeline_context = std::make_shared<PipelineContext>(frame_and_desc.frame_.descriptor());
4246
SliceAndKey sk{FrameSlice{frame_and_desc.frame_},key};
4347
pipeline_context->slice_and_keys_.emplace_back(std::move(sk));
@@ -50,10 +54,8 @@ inline ReadResult read_result_from_single_frame(FrameAndDescriptor& frame_and_de
5054
pipeline_context->begin()->set_string_pool(frame_and_desc.frame_.string_pool_ptr());
5155
auto descriptor = std::make_shared<StreamDescriptor>(frame_and_desc.frame_.descriptor());
5256
pipeline_context->begin()->set_descriptor(std::move(descriptor));
53-
auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(output_format);
5457
reduce_and_fix_columns(pipeline_context, frame_and_desc.frame_, ReadOptions{}, handler_data).get();
5558
apply_type_handlers(frame_and_desc.frame_, handler_data, output_format);
56-
apply_global_refcounts(handler_data, output_format);
5759
return create_python_read_result(VersionedItem{key}, output_format, std::move(frame_and_desc));
5860
}
5961

cpp/arcticdb/python/adapt_read_dataframe.hpp

+5-1
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,14 @@
99

1010
#include <arcticdb/python/python_utils.hpp>
1111
#include <arcticdb/entity/read_result.hpp>
12+
#include <arcticdb/python/python_handler_data.hpp>
1213

1314
namespace arcticdb {
1415

15-
inline auto adapt_read_df = [](ReadResult && ret) -> py::tuple{
16+
inline py::tuple adapt_read_df(ReadResult&& ret, std::pair<std::any&, OutputFormat>* const handler) {
17+
if (handler) {
18+
apply_global_refcounts(handler->first, handler->second);
19+
}
1620
auto pynorm = python_util::pb_to_python(ret.norm_meta);
1721
auto pyuser_meta = python_util::pb_to_python(ret.user_meta);
1822
auto multi_key_meta = python_util::pb_to_python(ret.multi_key_meta);

cpp/arcticdb/python/python_handler_data.hpp

-10
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
#include <pybind11/pybind11.h>
44
#include <folly/ThreadCachedInt.h>
5-
#include <arcticdb/python/python_utils.hpp>
65

76
#include <memory>
87

@@ -59,15 +58,6 @@ struct PythonHandlerData {
5958
Py_INCREF(py_nan_->ptr());
6059
}
6160
}
62-
63-
~PythonHandlerData() {
64-
internal::check<ErrorCode::E_ASSERTION_FAILURE>(none_refcount_, "None refcount must not be null");
65-
const size_t none_count = none_refcount_->readFull();
66-
internal::check<ErrorCode::E_ASSERTION_FAILURE>(none_count == 0, "None refcount not applied. {} more to be applied", none_count);
67-
internal::check<ErrorCode::E_ASSERTION_FAILURE>(nan_refcount_, "None refcount must not be null");
68-
const size_t nan_count = none_refcount_->readFull();
69-
internal::check<ErrorCode::E_ASSERTION_FAILURE>(nan_count == 0, "NaN refcount not applied. {} more to be applied", nan_count);
70-
}
7161
private:
7262
std::shared_ptr<folly::ThreadCachedInt<uint64_t>> none_refcount_ = std::make_shared<folly::ThreadCachedInt<uint64_t>>();
7363
std::shared_ptr<folly::ThreadCachedInt<uint64_t>> nan_refcount_ = std::make_shared<folly::ThreadCachedInt<uint64_t>>();

cpp/arcticdb/python/python_utils.hpp

+5-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include <arcticdb/stream/stream_reader.hpp>
1818
#include <arcticdb/util/variant.hpp>
1919
#include <arcticdb/python/python_utils.hpp>
20+
#include <arcticdb/python/python_handler_data.hpp>
2021

2122
namespace py = pybind11;
2223

@@ -238,7 +239,7 @@ class PyTimestampRange {
238239
timestamp end_;
239240
};
240241

241-
inline py::list adapt_read_dfs(std::vector<std::variant<ReadResult, DataError>>&& r) {
242+
inline py::list adapt_read_dfs(std::vector<std::variant<ReadResult, DataError>>&& r, std::pair<std::any&, OutputFormat>* const handler) {
242243
auto ret = std::move(r);
243244
py::list lst;
244245
for (auto &res: ret) {
@@ -256,6 +257,9 @@ inline py::list adapt_read_dfs(std::vector<std::variant<ReadResult, DataError>>&
256257
}
257258
);
258259
}
260+
if (handler) {
261+
apply_global_refcounts(handler->first, handler->second);
262+
}
259263
return lst;
260264
}
261265

cpp/arcticdb/storage/test/test_memory_storage.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ TEST(InMemory, ReadTwice) {
2929

3030
auto read_query = std::make_shared<ReadQuery>();
3131
register_native_handler_data_factory();
32-
auto handler_data = get_type_handler_data(OutputFormat::NATIVE);
32+
auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE);
3333
auto read_result1 = version_store.read_dataframe_version_internal(symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data);
3434
auto read_result2 = version_store.read_dataframe_version_internal(symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data);
3535
}

cpp/arcticdb/toolbox/library_tool.cpp

+4-5
Original file line numberDiff line numberDiff line change
@@ -34,18 +34,17 @@ async::AsyncStore<>& LibraryTool::async_store() {
3434
return dynamic_cast<async::AsyncStore<>&>(*store());
3535
}
3636

37-
ReadResult LibraryTool::read(const VariantKey& key) {
37+
ReadResult LibraryTool::read(const VariantKey& key, std::any& handler_data, OutputFormat output_format) {
3838
auto segment = read_to_segment(key);
3939
auto segment_in_memory = decode_segment(segment);
4040
auto frame_and_descriptor = frame_and_descriptor_from_segment(std::move(segment_in_memory));
41-
auto atom_key = util::variant_match(
41+
const auto& atom_key = util::variant_match(
4242
key,
4343
[](const AtomKey& key){return key;},
4444
// We construct a dummy atom key in case of a RefKey to be able to build the read_result
45-
[](const RefKey& key){return AtomKeyBuilder().build<KeyType::VERSION_REF>(key.id());},
46-
[](const auto&){});
45+
[](const RefKey& key){return AtomKeyBuilder().build<KeyType::VERSION_REF>(key.id());});
4746

48-
return pipelines::read_result_from_single_frame(frame_and_descriptor, atom_key, OutputFormat::PANDAS);
47+
return pipelines::read_result_from_single_frame(frame_and_descriptor, atom_key, handler_data, output_format);
4948
}
5049

5150
Segment LibraryTool::read_to_segment(const VariantKey& key) {

cpp/arcticdb/toolbox/library_tool.hpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ class LibraryTool {
3434
public:
3535
explicit LibraryTool(std::shared_ptr<storage::Library> lib);
3636

37-
ReadResult read(const VariantKey& key);
37+
ReadResult read(const VariantKey& key, std::any& handler_data, OutputFormat output_format);
3838

3939
Segment read_to_segment(const VariantKey& key);
4040

cpp/arcticdb/toolbox/python_bindings.cpp

+4-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,10 @@ void register_bindings(py::module &m, py::exception<arcticdb::ArcticException>&
6363
.def("batch_key_exists", &LibraryTool::batch_key_exists, py::call_guard<SingleThreadMutexHolder>())
6464
.def("read_to_read_result",
6565
[&](LibraryTool& lt, const VariantKey& key){
66-
return adapt_read_df(lt.read(key));
66+
constexpr OutputFormat output_format = OutputFormat::PANDAS;
67+
auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(output_format);
68+
std::pair<std::any&, OutputFormat> handler{handler_data, output_format};
69+
return adapt_read_df(lt.read(key, handler_data, output_format), &handler);
6770
},
6871
"Read the most recent dataframe from the store")
6972
.def("inspect_env_variable", &LibraryTool::inspect_env_variable)

cpp/arcticdb/util/type_handler.hpp

-4
Original file line numberDiff line numberDiff line change
@@ -152,9 +152,5 @@ inline std::shared_ptr<TypeHandler> get_type_handler(OutputFormat output_format,
152152
return TypeHandlerRegistry::instance()->get_handler(output_format, target);
153153
}
154154

155-
inline std::any get_type_handler_data(OutputFormat output_format) {
156-
return TypeHandlerRegistry::instance()->get_handler_data(output_format);
157-
}
158-
159155

160156
}

cpp/arcticdb/version/local_versioned_engine.cpp

+8-14
Original file line numberDiff line numberDiff line change
@@ -1109,22 +1109,16 @@ VersionedItem LocalVersionedEngine::defragment_symbol_data(const StreamId& strea
11091109
return versioned_item;
11101110
}
11111111

1112-
std::vector<ReadVersionOutput> LocalVersionedEngine::batch_read_keys(const std::vector<AtomKey> &keys) {
1113-
auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::PANDAS);
1112+
std::vector<ReadVersionOutput> LocalVersionedEngine::batch_read_keys(const std::vector<AtomKey> &keys, std::any& handler_data) {
11141113
std::vector<ReadVersionOutput> result;
1115-
{
1116-
py::gil_scoped_release release_gil;
1117-
std::vector<folly::Future<ReadVersionOutput>> read_futures;
1118-
read_futures.reserve(keys.size());
1119-
for (const auto& index_key: keys) {
1120-
read_futures.emplace_back(read_frame_for_version(store(), {index_key}, std::make_shared<ReadQuery>(), ReadOptions{}, handler_data));
1121-
}
1122-
Allocator::instance()->trim();
1123-
result = folly::collect(read_futures).get();
1114+
std::vector<folly::Future<ReadVersionOutput>> read_futures;
1115+
read_futures.reserve(keys.size());
1116+
py::gil_scoped_release release_gil;
1117+
for (const auto& index_key: keys) {
1118+
read_futures.emplace_back(read_frame_for_version(store(), {index_key}, std::make_shared<ReadQuery>(), ReadOptions{}, handler_data));
11241119
}
1125-
apply_global_refcounts(handler_data, OutputFormat::PANDAS);
1126-
return result;
1127-
1120+
Allocator::instance()->trim();
1121+
return folly::collect(read_futures).get();
11281122
}
11291123

11301124
std::vector<std::variant<ReadVersionOutput, DataError>> LocalVersionedEngine::batch_read_internal(

cpp/arcticdb/version/local_versioned_engine.hpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,7 @@ class LocalVersionedEngine : public VersionedEngine {
288288
bool prune_previous_versions,
289289
bool upsert);
290290

291-
std::vector<ReadVersionOutput> batch_read_keys(const std::vector<AtomKey> &keys);
291+
std::vector<ReadVersionOutput> batch_read_keys(const std::vector<AtomKey> &keys, std::any& handler_data);
292292

293293
std::vector<std::variant<ReadVersionOutput, DataError>> batch_read_internal(
294294
const std::vector<StreamId>& stream_ids,

cpp/arcticdb/version/python_bindings.cpp

+25-16
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,7 @@
2222
#include <arcticdb/python/adapt_read_dataframe.hpp>
2323
#include <arcticdb/version/schema_checks.hpp>
2424
#include <arcticdb/util/pybind_mutex.hpp>
25-
26-
#include "python/python_handler_data.hpp"
25+
#include <arcticdb/python/python_handler_data.hpp>
2726

2827

2928
namespace arcticdb::version_store {
@@ -233,7 +232,10 @@ void register_bindings(py::module &version, py::exception<arcticdb::ArcticExcept
233232
version.def("write_dataframe_to_file", &write_dataframe_to_file);
234233
version.def("read_dataframe_from_file",
235234
[] (StreamId sid, std::string path, std::shared_ptr<ReadQuery>& read_query, const ReadOptions& read_options){
236-
return adapt_read_df(read_dataframe_from_file(sid, path, read_query, read_options));
235+
const OutputFormat output_format = read_options.output_format();
236+
auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(output_format);
237+
std::pair<std::any&, OutputFormat> handler{handler_data, output_format};
238+
return adapt_read_df(read_dataframe_from_file(sid, path, read_query, read_options, handler_data), &handler);
237239
});
238240

239241
using FrameDataWrapper = arcticdb::pipelines::FrameDataWrapper;
@@ -572,7 +574,9 @@ void register_bindings(py::module &version, py::exception<arcticdb::ArcticExcept
572574
py::call_guard<SingleThreadMutexHolder>(), "Drop column stats")
573575
.def("read_column_stats_version",
574576
[&](PythonVersionStore& v, StreamId sid, const VersionQuery& version_query){
575-
return adapt_read_df(v.read_column_stats_version(sid, version_query));
577+
auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::PANDAS);
578+
std::pair<std::any&, OutputFormat> handler{handler_data, OutputFormat::PANDAS};
579+
return adapt_read_df(v.read_column_stats_version(sid, version_query, handler_data), &handler);
576580
},
577581
py::call_guard<SingleThreadMutexHolder>(), "Read the column stats")
578582
.def("get_column_stats_info_version",
@@ -679,17 +683,19 @@ void register_bindings(py::module &version, py::exception<arcticdb::ArcticExcept
679683
&PythonVersionStore::write_dataframe_specific_version,
680684
py::call_guard<SingleThreadMutexHolder>(), "Write a specific version of this dataframe to the store")
681685
.def("read_dataframe_version",
682-
[&](PythonVersionStore& v, StreamId sid, const VersionQuery& version_query, const std::shared_ptr<ReadQuery>& read_query, const ReadOptions& read_options) {
683-
auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(read_options.output_format());
684-
py::list result = adapt_read_df(v.read_dataframe_version(sid, version_query, read_query, read_options, handler_data));
685-
apply_global_refcounts(handler_data, read_options.output_format());
686-
return result;
686+
[&](PythonVersionStore& v, StreamId sid, const VersionQuery& version_query, const std::shared_ptr<ReadQuery>& read_query, const ReadOptions& read_options) {
687+
const OutputFormat output_format = read_options.output_format();
688+
auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(output_format);
689+
std::pair<std::any&, OutputFormat> handler{handler_data, output_format};
690+
return adapt_read_df(v.read_dataframe_version(sid, version_query, read_query, read_options, handler_data), &handler);
687691
},
688692
py::call_guard<SingleThreadMutexHolder>(),
689693
"Read the specified version of the dataframe from the store")
690694
.def("read_index",
691695
[&](PythonVersionStore& v, StreamId sid, const VersionQuery& version_query){
692-
return adapt_read_df(v.read_index(sid, version_query));
696+
auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::PANDAS);
697+
std::pair<std::any&, OutputFormat> handler{handler_data, OutputFormat::PANDAS};
698+
return adapt_read_df(v.read_index(sid, version_query, handler_data), &handler);
693699
},
694700
py::call_guard<SingleThreadMutexHolder>(), "Read the most recent dataframe from the store")
695701
.def("get_update_time",
@@ -747,7 +753,7 @@ void register_bindings(py::module &version, py::exception<arcticdb::ArcticExcept
747753
tsd_proto.multi_key_meta(),
748754
std::vector<entity::AtomKey>{}
749755
};
750-
return adapt_read_df(std::move(res)); },
756+
return adapt_read_df(std::move(res), nullptr); },
751757
py::call_guard<SingleThreadMutexHolder>(), "Restore a previous version of a symbol.")
752758
.def("check_ref_key",
753759
&PythonVersionStore::check_ref_key,
@@ -782,15 +788,18 @@ void register_bindings(py::module &version, py::exception<arcticdb::ArcticExcept
782788
const std::vector<VersionQuery>& version_queries,
783789
std::vector<std::shared_ptr<ReadQuery>>& read_queries,
784790
const ReadOptions& read_options){
791+
const OutputFormat output_format = read_options.output_format();
785792
auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(read_options.output_format());
786-
py::list result = python_util::adapt_read_dfs(v.batch_read(stream_ids, version_queries, read_queries, read_options));
787-
apply_global_refcounts(handler_data, read_options.output_format());
788-
return result;
793+
std::pair<std::any&, OutputFormat> handler{handler_data, output_format};
794+
return python_util::adapt_read_dfs(v.batch_read(stream_ids, version_queries, read_queries, read_options, handler_data), &handler);
789795
},
790796
py::call_guard<SingleThreadMutexHolder>(), "Read a dataframe from the store")
791797
.def("batch_read_keys",
792798
[&](PythonVersionStore& v, std::vector<AtomKey> atom_keys) {
793-
return python_util::adapt_read_dfs(frame_to_read_result(v.batch_read_keys(atom_keys)));
799+
constexpr OutputFormat output_format = OutputFormat::PANDAS;
800+
auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(output_format);
801+
std::pair<std::any&, OutputFormat> handler{handler_data, output_format};
802+
return python_util::adapt_read_dfs(frame_to_read_result(v.batch_read_keys(atom_keys, handler_data)), &handler);
794803
},
795804
py::call_guard<SingleThreadMutexHolder>(), "Read a specific version of a dataframe from the store")
796805
.def("batch_write",
@@ -821,7 +830,7 @@ void register_bindings(py::module &version, py::exception<arcticdb::ArcticExcept
821830
tsd_proto.user_meta(),
822831
tsd_proto.multi_key_meta(), {}};
823832

824-
output.emplace_back(adapt_read_df(std::move(res)));
833+
output.emplace_back(adapt_read_df(std::move(res), nullptr));
825834
}
826835
return output;
827836
},

0 commit comments

Comments
 (0)