Hold the GIL when incrementing None's refcount to prevent race conditions when there are multiple Python threads #2334
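
This change addresses a race on the interpreter-wide Py_None object: ArcticDB's decode threads fill result columns with Py_None and previously bumped its refcount under an internal SpinLock rather than the GIL, which does not protect against other Python threads touching None's refcount at the same time. The sketch below is a minimal illustration of the approach taken here, not the ArcticDB code itself: pending increments are counted per handler and applied in one batch on a thread that holds the GIL. The class and member names are invented for the illustration; the actual implementation (see python_handler_data.hpp below) uses folly::ThreadCachedInt inside PythonHandlerData.

    #include <Python.h>
    #include <atomic>
    #include <cstddef>

    // Deferred refcounting for a shared Python object such as Py_None.
    class DeferredNoneRefcount {
    public:
        // Safe to call from any thread, GIL held or not: only touches our counter.
        void record(std::size_t count) {
            pending_.fetch_add(count, std::memory_order_relaxed);
        }

        // Must run while holding the GIL, because Py_None's refcount is shared
        // with every other Python thread in the process.
        void apply() {
            const std::size_t count = pending_.exchange(0, std::memory_order_relaxed);
            for (std::size_t i = 0; i < count; ++i)
                Py_INCREF(Py_None);
        }

    private:
        std::atomic<std::size_t> pending_{0};
    };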

Merged
merged 22 commits from vasil.pashov/hold-gil-on-none-incref into master on May 2, 2025

Changes from all commits (22 commits)
8d88aaa
Add a function to test if object is py none
Apr 9, 2025
1e6b92c
Fix double mutex unlock
Apr 10, 2025
60f4da4
Get rid of gil safe py none
Apr 10, 2025
06fe25e
Refine taking of the spinlock
Apr 11, 2025
5393e2c
Address review comments
vasil-pashov Apr 14, 2025
7818c95
Create a single function incrementing None's refcount
vasil-pashov Apr 14, 2025
62cc298
Keep None refcount in python handler data
vasil-pashov Apr 16, 2025
e4f46d8
Fix C++ unit tests
vasil-pashov Apr 16, 2025
97670c9
Add stress test for None/NaN multithreaded handling
vasil-pashov Apr 17, 2025
4e7f3ec
Use std::fill_n instead of a for loop
vasil-pashov Apr 17, 2025
7d3c580
Merge branch 'master' into vasil.pashov/hold-gil-on-none-incref
vasil-pashov Apr 17, 2025
5680e2e
Move fill_with_none in python_util
vasil-pashov Apr 17, 2025
5b6b145
Do not increase None's refcount after calling python_util::fill_with_…
vasil-pashov Apr 17, 2025
4b6fa31
Address review comments
vasil-pashov Apr 23, 2025
e896668
Use np.nan instead of np.NaN as the latter is deprecated
vasil-pashov Apr 23, 2025
a1e6d15
Do not use np.asfortranarray in None stress test
vasil-pashov Apr 23, 2025
2c5ee55
Bring back asfortranarray for none GIL stresstest
vasil-pashov Apr 24, 2025
e443396
Change None GIL stress test column generation
vasil-pashov Apr 24, 2025
8646f9d
Add read_batch stress test for Py_None GIL handling
vasil-pashov Apr 25, 2025
a337d12
Add test for QueryBuilder reads because it's a different codepath for…
vasil-pashov Apr 25, 2025
2d03dba
Extend unit tests for GIL handling of None refcounts to library tool …
vasil-pashov Apr 28, 2025
a72f055
Do not throw exception from PythonHandlerData's destructor. Move the
vasil-pashov May 2, 2025
3 changes: 0 additions & 3 deletions cpp/arcticdb/async/test/test_async.cpp
@@ -9,14 +9,11 @@
#include <arcticdb/storage/common.hpp>
#include <arcticdb/storage/config_resolvers.hpp>
#include <arcticdb/storage/library_index.hpp>
#include <arcticdb/storage/storage_factory.hpp>
#include <arcticdb/async/async_store.hpp>
#include <arcticdb/util/test/config_common.hpp>
#include <arcticdb/pipeline/frame_slice.hpp>
#include <arcticdb/stream/test/stream_test_common.hpp>
#include <arcticdb/util/random.h>

#include <fmt/format.h>

#include <string>
#include <vector>
7 changes: 3 additions & 4 deletions cpp/arcticdb/column_store/test/ingestion_stress_test.cpp
@@ -8,7 +8,6 @@
#include <gtest/gtest.h>
#include <string>
#include <algorithm>
#include <fmt/format.h>

#include <arcticdb/util/random.h>
#include <arcticdb/util/timer.hpp>
@@ -128,7 +127,7 @@ TEST_F(IngestionStressStore, ScalarIntAppend) {
auto read_query = std::make_shared<ReadQuery>();
read_query->row_filter = universal_range();
register_native_handler_data_factory();
auto handler_data = get_type_handler_data(OutputFormat::NATIVE);
auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE);
auto read_result = test_store_->read_dataframe_version(symbol, VersionQuery{}, read_query, ro, handler_data);
GTEST_COUT << "columns in res: " << read_result.frame_data.index_columns().size();
}
@@ -216,7 +215,7 @@ TEST_F(IngestionStressStore, ScalarIntDynamicSchema) {
auto read_query = std::make_shared<ReadQuery>();
read_query->row_filter = universal_range();
register_native_handler_data_factory();
auto handler_data = get_type_handler_data(OutputFormat::NATIVE);
auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE);
auto read_result = test_store_->read_dataframe_version_internal(symbol, VersionQuery{}, read_query, read_options, handler_data);
}

@@ -269,7 +268,7 @@ TEST_F(IngestionStressStore, DynamicSchemaWithStrings) {
auto read_query = std::make_shared<ReadQuery>();
read_query->row_filter = universal_range();
register_native_handler_data_factory();
auto handler_data = get_type_handler_data(OutputFormat::NATIVE);
auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::NATIVE);
auto read_result = test_store_->read_dataframe_version(symbol, VersionQuery{}, read_query, read_options, handler_data);
ARCTICDB_DEBUG(log::version(), "result columns: {}", read_result.frame_data.names());
}
2 changes: 0 additions & 2 deletions cpp/arcticdb/entity/output_format.hpp
@@ -7,8 +7,6 @@

#pragma once

#include <cstdint>

namespace arcticdb {
enum class OutputFormat : uint8_t {
NATIVE,
3 changes: 1 addition & 2 deletions cpp/arcticdb/entity/types.hpp
@@ -10,9 +10,8 @@
#include <arcticdb/util/preconditions.hpp>
#include <arcticdb/util/constructors.hpp>
#include <arcticdb/entity/output_format.hpp>
#include "arcticdb/storage/memory_layout.hpp"
#include <arcticdb/storage/memory_layout.hpp>

#include <cstdint>
#include <vector>
#include <string>
#include <type_traits>
11 changes: 8 additions & 3 deletions cpp/arcticdb/pipeline/pipeline_utils.hpp
@@ -15,6 +15,7 @@
#include <arcticdb/pipeline/column_mapping.hpp>
#include <arcticdb/column_store/memory_segment.hpp>
#include <arcticdb/util/type_handler.hpp>
#include <arcticdb/python/python_handler_data.hpp>

namespace arcticdb::pipelines {

@@ -35,7 +36,12 @@ inline void apply_type_handlers(SegmentInMemory seg, std::any& handler_data, Out
}
}

inline ReadResult read_result_from_single_frame(FrameAndDescriptor& frame_and_desc, const AtomKey& key, OutputFormat output_format) {
inline ReadResult read_result_from_single_frame(
FrameAndDescriptor& frame_and_desc,
const AtomKey& key,
std::any& handler_data,
OutputFormat output_format
) {
auto pipeline_context = std::make_shared<PipelineContext>(frame_and_desc.frame_.descriptor());
SliceAndKey sk{FrameSlice{frame_and_desc.frame_},key};
pipeline_context->slice_and_keys_.emplace_back(std::move(sk));
@@ -48,10 +54,9 @@ inline ReadResult read_result_from_single_frame(FrameAndDescriptor& frame_and_de
pipeline_context->begin()->set_string_pool(frame_and_desc.frame_.string_pool_ptr());
auto descriptor = std::make_shared<StreamDescriptor>(frame_and_desc.frame_.descriptor());
pipeline_context->begin()->set_descriptor(std::move(descriptor));
auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(output_format);
reduce_and_fix_columns(pipeline_context, frame_and_desc.frame_, ReadOptions{}, handler_data).get();
apply_type_handlers(frame_and_desc.frame_, handler_data, output_format);
return create_python_read_result(VersionedItem{key}, output_format, std::move(frame_and_desc));
}

}
}
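
With the extra std::any& parameter above, read_result_from_single_frame no longer creates its own handler data; the caller owns it, so the None/NaN refcount increments it accumulates can still be applied once the GIL is available. A hypothetical call site might look like the fragment below (the surrounding variables and the choice of output format are assumptions, not taken from this PR):

    // frame_and_desc and key are assumed to exist in the calling code.
    auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(OutputFormat::PANDAS);
    auto read_result = read_result_from_single_frame(frame_and_desc, key, handler_data, OutputFormat::PANDAS);
    // Later, with the GIL held, the deferred Py_None/NaN increments are settled:
    apply_global_refcounts(handler_data, OutputFormat::PANDAS);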
3 changes: 1 addition & 2 deletions cpp/arcticdb/pipeline/read_frame.cpp
@@ -24,12 +24,11 @@
#include <arcticdb/pipeline/column_mapping.hpp>
#include <arcticdb/util/magic_num.hpp>
#include <arcticdb/codec/segment_identifier.hpp>
#include <arcticdb/util/spinlock.hpp>
#include <arcticdb/pipeline/string_reducers.hpp>
#include <arcticdb/pipeline/read_query.hpp>

#include <ankerl/unordered_dense.h>
#include <folly/gen/Base.h>


namespace arcticdb::pipelines {

6 changes: 5 additions & 1 deletion cpp/arcticdb/python/adapt_read_dataframe.hpp
@@ -9,10 +9,14 @@

#include <arcticdb/python/python_utils.hpp>
#include <arcticdb/entity/read_result.hpp>
#include <arcticdb/python/python_handler_data.hpp>

namespace arcticdb {

inline auto adapt_read_df = [](ReadResult && ret) -> py::tuple{
inline py::tuple adapt_read_df(ReadResult&& ret, std::pair<std::any&, OutputFormat>* const handler) {
if (handler) {
apply_global_refcounts(handler->first, handler->second);
}
auto pynorm = python_util::pb_to_python(ret.norm_meta);
auto pyuser_meta = python_util::pb_to_python(ret.user_meta);
auto multi_key_meta = python_util::pb_to_python(ret.multi_key_meta);
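
adapt_read_df is no longer a lambda: it now takes an optional (handler data, output format) pair and calls apply_global_refcounts before building the Python tuple. pybind11-bound functions run with the GIL held, so this is a safe point to settle the deferred Py_None increments. A sketch of a binding-layer caller, assuming the surrounding ArcticDB types; read_to_python is an invented name:

    #include <any>
    #include <utility>
    #include <pybind11/pybind11.h>
    // ReadResult, OutputFormat, adapt_read_df and apply_global_refcounts come
    // from the ArcticDB headers shown in this diff.

    namespace py = pybind11;

    py::tuple read_to_python(arcticdb::ReadResult&& result, std::any& handler_data, arcticdb::OutputFormat format) {
        // The GIL is held inside a pybind11 call, so adapt_read_df can apply the
        // batched refcount increments before the result is handed to Python.
        std::pair<std::any&, arcticdb::OutputFormat> handler{handler_data, format};
        return arcticdb::adapt_read_df(std::move(result), &handler);
    }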
51 changes: 45 additions & 6 deletions cpp/arcticdb/python/python_handler_data.hpp
@@ -1,7 +1,7 @@
#pragma once

#include <arcticdb/util/spinlock.hpp>
#include <pybind11/pybind11.h>
#include <folly/ThreadCachedInt.h>

#include <memory>

@@ -24,19 +24,58 @@ struct PythonHandlerData {
})) {
}

SpinLock& spin_lock() {
util::check(spin_lock_, "Spinlock not set on python handler data");
return *spin_lock_;
void increment_none_refcount(size_t increment) {
none_refcount_->increment(increment);
}

std::shared_ptr<SpinLock> spin_lock_ = std::make_shared<SpinLock>();
void increment_nan_refcount(size_t increment) {
nan_refcount_->increment(increment);
}

bool is_nan_initialized() const {
return static_cast<bool>(py_nan_);
}

PyObject* non_owning_nan_handle() const {
return py_nan_->ptr();
}

/// The GIL must be acquired when this is called as it changes the refcount of the global static None variable which
/// can be used by other Python threads
void apply_none_refcount() {
const size_t cnt = none_refcount_->readFullAndReset();
internal::check<ErrorCode::E_ASSERTION_FAILURE>(PyGILState_Check(), "The thread incrementing None refcount must hold the GIL");
for(size_t i = 0; i < cnt; ++i) {
Py_INCREF(Py_None);
}
}

/// There is no need to hold the GIL for this operation as this python object was created by the
/// PythonHandlerData object on a read/read_batch/etc... operation and has not been handed to python yet.
void apply_nan_refcount() {
const size_t count = nan_refcount_->readFullAndReset();
for (size_t i = 0; i < count; ++i) {
Py_INCREF(py_nan_->ptr());
}
}
private:
std::shared_ptr<folly::ThreadCachedInt<uint64_t>> none_refcount_ = std::make_shared<folly::ThreadCachedInt<uint64_t>>();
std::shared_ptr<folly::ThreadCachedInt<uint64_t>> nan_refcount_ = std::make_shared<folly::ThreadCachedInt<uint64_t>>();
std::shared_ptr<py::handle> py_nan_;
};

inline void apply_global_refcounts(std::any& handler_data, OutputFormat output_format) {
if (output_format == OutputFormat::PANDAS) {
PythonHandlerData& python_handler_data = std::any_cast<PythonHandlerData&>(handler_data);
python_handler_data.apply_nan_refcount();
python_handler_data.apply_none_refcount();
}
}

struct PythonHandlerDataFactory : public TypeHandlerDataFactory {
std::any get_data() const override {
return {PythonHandlerData{}};
}
};

}
}
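
The rewritten PythonHandlerData drops the shared SpinLock in favour of two folly::ThreadCachedInt counters: decode threads only record how many slots will reference Py_None or the shared NaN object, and the real Py_INCREF calls happen once, under the GIL, in apply_global_refcounts. A fragment illustrating the intended flow; handler_data, none_slots_written and nan_slots_written are assumptions for the illustration:

    // Decode worker, no GIL: count the references instead of incrementing them.
    auto& data = std::any_cast<PythonHandlerData&>(handler_data);
    data.increment_none_refcount(none_slots_written);  // thread-cached, no contention
    data.increment_nan_refcount(nan_slots_written);

    // Python-facing thread, GIL held (e.g. just before returning to pybind11):
    apply_global_refcounts(handler_data, OutputFormat::PANDAS);
    // apply_none_refcount() asserts PyGILState_Check() and performs the batched
    // Py_INCREF(Py_None) calls; the NaN object is private to this read result,
    // so apply_nan_refcount() does not need the GIL.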
40 changes: 16 additions & 24 deletions cpp/arcticdb/python/python_handlers.cpp
@@ -16,17 +16,9 @@

namespace arcticdb {

static inline PyObject** fill_with_none(PyObject** ptr_dest, size_t count, SpinLock& spin_lock) {
for(auto i = 0U; i < count; ++i) {
*ptr_dest++ = Py_None;
}
python_util::increment_none_refcount(count, spin_lock);
return ptr_dest;
}

static inline PyObject** fill_with_none(ChunkedBuffer& buffer, size_t offset, size_t count, SpinLock& spin_lock) {
static PyObject** fill_with_none(ChunkedBuffer& buffer, size_t offset, size_t count, PythonHandlerData& handler_data) {
auto dest = buffer.ptr_cast<PyObject*>(offset, count * sizeof(PyObject*));
return fill_with_none(dest, count, spin_lock);
return python_util::fill_with_none(dest, count, handler_data);
}

void PythonEmptyHandler::handle_type(
@@ -108,8 +100,8 @@ void PythonEmptyHandler::default_initialize(
size_t byte_size,
const DecodePathData&,
std::any& any) const {
auto& handler_data = get_handler_data(any);
fill_with_none(buffer, bytes_offset, byte_size / type_size(), handler_data.spin_lock());
auto& handler_data = cast_handler_data(any);
fill_with_none(buffer, bytes_offset, byte_size / type_size(), handler_data);
}

void PythonBoolHandler::handle_type(
@@ -152,16 +144,16 @@ void PythonBoolHandler::convert_type(
util::check(dest_data != nullptr, "Got null destination pointer");
auto ptr_dest = reinterpret_cast<PyObject**>(dest_data);
if (sparse_map.has_value()) {
auto& handler_data = get_handler_data(any);
auto& handler_data = cast_handler_data(any);
ARCTICDB_TRACE(log::codec(), "Bool handler using a sparse map");
unsigned last_row = 0u;
for (auto en = sparse_map->first(); en < sparse_map->end(); ++en, ++last_row) {
const auto current_pos = *en;
ptr_dest = fill_with_none(ptr_dest, current_pos - last_row, handler_data.spin_lock());
ptr_dest = python_util::fill_with_none(ptr_dest, current_pos - last_row, handler_data);
last_row = current_pos;
*ptr_dest++ = py::bool_(static_cast<bool>(*ptr_src++)).release().ptr();
}
fill_with_none(ptr_dest, mapping.num_rows_ - last_row, handler_data.spin_lock());
python_util::fill_with_none(ptr_dest, mapping.num_rows_ - last_row, handler_data);
} else {
ARCTICDB_TRACE(log::codec(), "Bool handler didn't find a sparse map. Assuming dense array.");
std::transform(ptr_src, ptr_src + num_bools, ptr_dest, [](uint8_t value) {
@@ -179,8 +171,8 @@ TypeDescriptor PythonBoolHandler::output_type(const TypeDescriptor&) const {
}

void PythonBoolHandler::default_initialize(ChunkedBuffer& buffer, size_t bytes_offset, size_t byte_size, const DecodePathData&, std::any& any) const {
auto& handler_data = get_handler_data(any);
fill_with_none(buffer, bytes_offset, byte_size / type_size(), handler_data.spin_lock());
auto& handler_data = cast_handler_data(any);
fill_with_none(buffer, bytes_offset, byte_size / type_size(), handler_data);
}

void PythonStringHandler::handle_type(
@@ -230,7 +222,7 @@ void PythonStringHandler::convert_type(
const std::shared_ptr<StringPool>& string_pool) const {
auto dest_data = dest_column.bytes_at(mapping.offset_bytes_, mapping.num_rows_ * sizeof(PyObject*));
auto ptr_dest = reinterpret_cast<PyObject**>(dest_data);
DynamicStringReducer string_reducer{shared_data, get_handler_data(handler_data), ptr_dest, mapping.num_rows_};
DynamicStringReducer string_reducer{shared_data, cast_handler_data(handler_data), ptr_dest, mapping.num_rows_};
string_reducer.reduce(source_column, mapping.source_type_desc_, mapping.dest_type_desc_, mapping.num_rows_, *string_pool, source_column.opt_sparse_map());
string_reducer.finalize();
}
@@ -244,8 +236,8 @@ TypeDescriptor PythonStringHandler::output_type(const TypeDescriptor& input_type
}

void PythonStringHandler::default_initialize(ChunkedBuffer& buffer, size_t bytes_offset, size_t byte_size, const DecodePathData&, std::any& any) const {
auto& handler_data = get_handler_data(any);
fill_with_none(buffer, bytes_offset, byte_size / type_size(), handler_data.spin_lock());
auto& handler_data = cast_handler_data(any);
fill_with_none(buffer, bytes_offset, byte_size / type_size(), handler_data);
}

[[nodiscard]] static inline py::dtype generate_python_dtype(const TypeDescriptor &td, stride_t type_byte_size) {
@@ -313,8 +305,8 @@ void PythonArrayHandler::convert_type(

auto column_data = source_column.data();
if (source_column.is_sparse()) {
auto& handler_data = get_handler_data(any);
python_util::prefill_with_none(ptr_dest, mapping.num_rows_, source_column.sparse_map().count(), handler_data.spin_lock());
auto& handler_data = cast_handler_data(any);
python_util::prefill_with_none(ptr_dest, mapping.num_rows_, source_column.sparse_map().count(), handler_data);

auto en = sparse_map->first();

@@ -361,8 +353,8 @@ int PythonArrayHandler::type_size() const {
}

void PythonArrayHandler::default_initialize(ChunkedBuffer& buffer, size_t offset, size_t byte_size, const DecodePathData&, std::any& any) const {
auto& handler_data = get_handler_data(any);
fill_with_none(buffer, offset, byte_size / type_size(), handler_data.spin_lock());
auto& handler_data = cast_handler_data(any);
fill_with_none(buffer, offset, byte_size / type_size(), handler_data);
}

} //namespace arcticdb
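
The PyObject** overload of fill_with_none that used to live here was moved to python_util (commit 5680e2e) and now takes the PythonHandlerData instead of a SpinLock. A sketch of what that helper presumably looks like, inferred from the removed code above and the new counter API; the real implementation in python_utils.hpp may differ in detail:

    // Assumed shape of python_util::fill_with_none after this PR.
    inline PyObject** fill_with_none(PyObject** ptr_dest, size_t count, PythonHandlerData& handler_data) {
        // Point every destination slot at the shared Py_None object...
        std::fill_n(ptr_dest, count, Py_None);
        // ...and record the owed refcount increments; apply_none_refcount() pays
        // them back in one batch while the GIL is held.
        handler_data.increment_none_refcount(count);
        return ptr_dest + count;
    }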
10 changes: 4 additions & 6 deletions cpp/arcticdb/python/python_strings.cpp
@@ -67,8 +67,8 @@ DynamicStringReducer::DynamicStringReducer(
handler_data_(handler_data),
ptr_dest_(ptr_dest),
total_rows_(total_rows) {
util::check(static_cast<bool>(handler_data_.py_nan_), "Got null nan in string reducer");
util::check(is_py_nan(handler_data_.py_nan_->ptr()), "Got the wrong value in global nan");
util::check(handler_data_.is_nan_initialized(), "Got null nan in string reducer");
util::check(is_py_nan(handler_data_.non_owning_nan_handle()), "Got the wrong value in global nan");
}

void DynamicStringReducer::reduce(const Column& source_column,
@@ -93,10 +93,8 @@ void DynamicStringReducer::reduce(const Column& source_column,
void DynamicStringReducer::finalize() {
if (row_ != total_rows_) {
const auto diff = total_rows_ - row_;
for (; row_ < total_rows_; ++row_, ++ptr_dest_) {
*ptr_dest_ = Py_None;
}
python_util::increment_none_refcount(diff, handler_data_.spin_lock());
ptr_dest_ = python_util::fill_with_none(ptr_dest_, diff, handler_data_);
row_ = total_rows_;
}
}
