From dce6eae89c4370e6d2433953ba180b2d73ddc165 Mon Sep 17 00:00:00 2001 From: Ivo Dilov Date: Wed, 12 Mar 2025 16:24:01 +0200 Subject: [PATCH] TODO: lazy-collect-schema --- cpp/arcticdb/entity/descriptor_item.hpp | 9 +- cpp/arcticdb/entity/timeseries_descriptor.hpp | 8 + .../pipeline/index_segment_reader.cpp | 1 + .../pipeline/index_segment_reader.hpp | 11 ++ cpp/arcticdb/pipeline/query.hpp | 11 ++ .../processing/operation_dispatch_binary.hpp | 1 - cpp/arcticdb/storage/file/file_store.hpp | 2 +- .../version/local_versioned_engine.cpp | 137 ++++++++++------ .../version/local_versioned_engine.hpp | 13 +- cpp/arcticdb/version/python_bindings.cpp | 11 +- cpp/arcticdb/version/version_core.cpp | 112 +++++++++---- cpp/arcticdb/version/version_core.hpp | 22 ++- cpp/arcticdb/version/version_store_api.cpp | 9 + cpp/arcticdb/version/version_store_api.hpp | 6 + python/arcticdb/version_store/_store.py | 63 +++++-- python/arcticdb/version_store/library.py | 63 ++++++- .../version_store/test_lazy_dataframe.py | 154 +++++++++++++++++- 17 files changed, 525 insertions(+), 108 deletions(-) diff --git a/cpp/arcticdb/entity/descriptor_item.hpp b/cpp/arcticdb/entity/descriptor_item.hpp index 6defea51f3..db5e9b9179 100644 --- a/cpp/arcticdb/entity/descriptor_item.hpp +++ b/cpp/arcticdb/entity/descriptor_item.hpp @@ -18,12 +18,15 @@ struct DescriptorItem { entity::AtomKey &&key, std::optional start_index, std::optional end_index, - std::optional timeseries_descriptor) : + std::optional timeseries_descriptor, + std::optional> cached_index + ) : key_(std::move(key)), start_index_(start_index), end_index_(end_index), - timeseries_descriptor_(std::move(timeseries_descriptor)) { + timeseries_descriptor_(std::move(timeseries_descriptor)), + cached_index_(std::move(cached_index)) { } DescriptorItem() = delete; @@ -32,6 +35,7 @@ struct DescriptorItem { std::optional start_index_; std::optional end_index_; std::optional timeseries_descriptor_; + std::optional> cached_index_; std::string symbol() const { return fmt::format("{}", key_.id()); } uint64_t version() const { return key_.version_id(); } @@ -39,5 +43,6 @@ struct DescriptorItem { std::optional start_index() const { return start_index_; } std::optional end_index() const { return end_index_; } std::optional timeseries_descriptor() const { return timeseries_descriptor_; } + std::optional> cached_index() const { return cached_index_; } }; } \ No newline at end of file diff --git a/cpp/arcticdb/entity/timeseries_descriptor.hpp b/cpp/arcticdb/entity/timeseries_descriptor.hpp index 76a2f34c25..275b7afec3 100644 --- a/cpp/arcticdb/entity/timeseries_descriptor.hpp +++ b/cpp/arcticdb/entity/timeseries_descriptor.hpp @@ -96,6 +96,14 @@ struct TimeseriesDescriptor { *proto_->mutable_multi_key_meta() = std::move(multi_key_meta); } + void set_output_schema(OutputSchema&& output_schema) { + this->set_normalization_metadata(std::move(output_schema.norm_metadata_)); + const auto& stream_descriptor = output_schema.stream_descriptor(); + segment_desc_ = stream_descriptor.segment_desc_; + fields_ = stream_descriptor.fields_; + stream_id_ = stream_descriptor.stream_id_; + } + [[nodiscard]] std::shared_ptr fields_ptr() const { return fields_; } diff --git a/cpp/arcticdb/pipeline/index_segment_reader.cpp b/cpp/arcticdb/pipeline/index_segment_reader.cpp index 5a8a718efc..bb20465d0c 100644 --- a/cpp/arcticdb/pipeline/index_segment_reader.cpp +++ b/cpp/arcticdb/pipeline/index_segment_reader.cpp @@ -42,6 +42,7 @@ bool IndexSegmentReader::empty() const { return seg_.empty(); } + // TODO: 
Remove IndexRange get_index_segment_range( const AtomKey& prev_index, const std::shared_ptr& store) { diff --git a/cpp/arcticdb/pipeline/index_segment_reader.hpp b/cpp/arcticdb/pipeline/index_segment_reader.hpp index edfb10b803..9e473a0d60 100644 --- a/cpp/arcticdb/pipeline/index_segment_reader.hpp +++ b/cpp/arcticdb/pipeline/index_segment_reader.hpp @@ -142,4 +142,15 @@ IndexRange get_index_segment_range( void check_column_and_date_range_filterable(const IndexSegmentReader& index_segment_reader, const ReadQuery& read_query); +// TODO: Consider renaming to IndexKeyAndSegment or something. It is not used only as a cache but as intermediate steps in some calls e.g. get_descriptor. +// Can be used to cache index key and index segment between runs, to avoid re-reading from storage +struct CachedIndex { + AtomKey index_key; + IndexSegmentReader isr; + + CachedIndex(AtomKey&& index_key, SegmentInMemory&& seg) : index_key(std::move(index_key)), isr(std::move(seg)) { + util::check(index_key.type() == KeyType::TABLE_INDEX, "Expected an index key in CachedIndex but got {}", index_key.type()); + } +}; + } // namespace arcticdb::pipelines::index \ No newline at end of file diff --git a/cpp/arcticdb/pipeline/query.hpp b/cpp/arcticdb/pipeline/query.hpp index d0229ec93f..56f129d41c 100644 --- a/cpp/arcticdb/pipeline/query.hpp +++ b/cpp/arcticdb/pipeline/query.hpp @@ -52,17 +52,28 @@ using VersionQueryType = std::variant< struct VersionQuery { VersionQueryType content_; + // TODO: Doc explaining this + // We can't escape from the shared pointer because this needs to be able to live in python + // TODO: Think about const + std::optional> cached_index_; void set_snap_name(const std::string& snap_name) { content_ = SnapshotVersionQuery{snap_name}; + cached_index_ = std::nullopt; } void set_timestamp(timestamp ts, bool iterate_snapshots_if_tombstoned) { content_ = TimestampVersionQuery{ts, iterate_snapshots_if_tombstoned}; + cached_index_ = std::nullopt; } void set_version(SignedVersionId version, bool iterate_snapshots_if_tombstoned) { content_ = SpecificVersionQuery{version, iterate_snapshots_if_tombstoned}; + cached_index_ = std::nullopt; + } + + void set_cached_index(std::shared_ptr cached_index) { + cached_index_ = cached_index; } }; diff --git a/cpp/arcticdb/processing/operation_dispatch_binary.hpp b/cpp/arcticdb/processing/operation_dispatch_binary.hpp index f26882bf8d..19613e4951 100644 --- a/cpp/arcticdb/processing/operation_dispatch_binary.hpp +++ b/cpp/arcticdb/processing/operation_dispatch_binary.hpp @@ -382,7 +382,6 @@ VariantData binary_operator(const ColumnWithStrings& col, const Value& val, Func details::visit_type(val.type().data_type(), [&](auto val_tag) { using val_type_info = ScalarTypeInfo; if constexpr(!is_numeric_type(col_type_info::data_type) || !is_numeric_type(val_type_info::data_type)) { - std::string error_message; user_input::raise("Non-numeric type provided to binary operation: {}", binary_operation_with_types_to_string( col.column_name_, diff --git a/cpp/arcticdb/storage/file/file_store.hpp b/cpp/arcticdb/storage/file/file_store.hpp index 51a5234236..f49f5549eb 100644 --- a/cpp/arcticdb/storage/file/file_store.hpp +++ b/cpp/arcticdb/storage/file/file_store.hpp @@ -125,6 +125,6 @@ version_store::ReadVersionOutput read_dataframe_from_file_internal( const auto header_offset = key_data.key_offset_ + key_data.key_size_; ARCTICDB_DEBUG(log::storage(), "Got header offset at {}", header_offset); single_file_storage->load_header(header_offset, data_end - header_offset); - return 
version_store::read_frame_for_version(store, versioned_item, read_query, read_options, handler_data).get(); + return version_store::read_frame_for_version(store, version_store::StoredIndexKey{versioned_item}, read_query, read_options, handler_data).get(); } } //namespace arcticdb \ No newline at end of file diff --git a/cpp/arcticdb/version/local_versioned_engine.cpp b/cpp/arcticdb/version/local_versioned_engine.cpp index 851d3d91f6..55170dcf20 100644 --- a/cpp/arcticdb/version/local_versioned_engine.cpp +++ b/cpp/arcticdb/version/local_versioned_engine.cpp @@ -376,6 +376,10 @@ std::optional LocalVersionedEngine::get_version_to_read( const StreamId &stream_id, const VersionQuery &version_query ) { + if (version_query.cached_index_.has_value()) { + auto index_key = version_query.cached_index_.value()->index_key; + return VersionedItem{std::move(index_key)}; + } return util::variant_match(version_query.content_, [&stream_id, &version_query, this](const SpecificVersionQuery &specific) { return get_specific_version(stream_id, specific.version_id_, version_query); @@ -392,6 +396,7 @@ std::optional LocalVersionedEngine::get_version_to_read( ); } +// TODO: Remove this, it is unused IndexRange LocalVersionedEngine::get_index_range( const StreamId &stream_id, const VersionQuery& version_query) { @@ -402,24 +407,28 @@ IndexRange LocalVersionedEngine::get_index_range( return index::get_index_segment_range(version->key_, store()); } -std::variant get_version_identifier( + +IndexSource LocalVersionedEngine::get_index_source( const StreamId& stream_id, const VersionQuery& version_query, - const ReadOptions& read_options, - const std::optional& version) { + const ReadOptions& read_options) { + auto version = get_version_to_read(stream_id, version_query); if (!version) { if (opt_false(read_options.incompletes())) { log::version().warn("No index: Key not found for {}, will attempt to use incomplete segments.", stream_id); - return stream_id; + return Incompletes{stream_id}; } else { missing_data::raise( - "read_dataframe_version: version matching query '{}' not found for symbol '{}'", + "get_index_source: version matching query '{}' not found for symbol '{}'", version_query, stream_id ); } } - return *version; + if (version_query.cached_index_.has_value()) { + return CachedIndexKey{std::move(version.value()), version_query.cached_index_.value()}; + } + return StoredIndexKey{std::move(version.value())}; } ReadVersionOutput LocalVersionedEngine::read_dataframe_version_internal( @@ -429,43 +438,53 @@ ReadVersionOutput LocalVersionedEngine::read_dataframe_version_internal( const ReadOptions& read_options, std::any& handler_data) { py::gil_scoped_release release_gil; - auto version = get_version_to_read(stream_id, version_query); - const auto identifier = get_version_identifier(stream_id, version_query, read_options, version); - return read_frame_for_version(store(), identifier, read_query, read_options, handler_data).get(); + auto index_source = get_index_source(stream_id, version_query, read_options); + return read_frame_for_version(store(), std::move(index_source), read_query, read_options, handler_data).get(); } -folly::Future LocalVersionedEngine::get_descriptor( - AtomKey&& k){ - const auto key = std::move(k); - return store()->read(key) - .thenValue([](auto&& key_seg_pair) -> DescriptorItem { - auto key = to_atom(std::move(key_seg_pair.first)); - auto seg = std::move(key_seg_pair.second); - std::optional timeseries_descriptor; - if (seg.has_index_descriptor()) - 
timeseries_descriptor.emplace(seg.index_descriptor()); - - std::optional start_index; - std::optional end_index; - if (seg.row_count() > 0) { - const auto& start_index_column = seg.column(position_t(index::Fields::start_index)); - details::visit_type(start_index_column.type().data_type(), [&start_index_column, &start_index](auto column_desc_tag) { - using type_info = ScalarTypeInfo; - if constexpr (is_time_type(type_info::data_type)) { - start_index = start_index_column.template scalar_at(0); - } - }); +DescriptorItem get_descriptor_from_index(std::shared_ptr index) { + const auto& isr = index->isr; + auto timeseries_descriptor = isr.tsd(); - const auto& end_index_column = seg.column(position_t(index::Fields::end_index)); - details::visit_type(end_index_column.type().data_type(), [&end_index_column, &end_index, row_count=seg.row_count()](auto column_desc_tag) { - using type_info = ScalarTypeInfo; - if constexpr (is_time_type(type_info::data_type)) { - // -1 as the end timestamp in the data keys is one nanosecond greater than the last value in the index column - end_index = *end_index_column.template scalar_at(row_count - 1) - 1; - } - }); + std::optional start_index; + std::optional end_index; + if (isr.size() > 0) { + auto start_index_value = isr.begin()->key().start_index(); + if (std::holds_alternative(start_index_value)) { + start_index = std::get(start_index_value); + } + auto last_index_value = isr.last()->key().end_index(); + if (std::holds_alternative(last_index_value)) { + end_index = std::get(last_index_value); } - return DescriptorItem{std::move(key), start_index, end_index, std::move(timeseries_descriptor)}; + } + auto key = index->index_key; + return DescriptorItem{std::move(key), start_index, end_index, std::move(timeseries_descriptor), index}; +} + +folly::Future LocalVersionedEngine::get_descriptor( + IndexSource&& index_source) { + auto index = util::variant_match( + index_source, + [this] (StoredIndexKey& source) -> folly::Future> { + auto key = source.versioned_item.key_; + return this->store()->read(std::move(key)) + .thenValue([](auto&& key_seg_pair) { + auto key = to_atom(std::move(key_seg_pair.first)); + auto seg = std::move(key_seg_pair.second); + return std::make_shared(std::move(key), std::move(seg)); + }); + }, + [] (CachedIndexKey& source) -> folly::Future> { + return {std::move(source.cached_index)}; + }, + [] (Incompletes& source) -> folly::Future> { + // TODO: Currently there is no user facing API to get the schema with incomplete=True. If we decide to + // introduce this we'll need to deduplicate this logic with `read_incompletes_to_pipeline` + internal::raise("User reading only the desriptor of incompletes is not implemented for {}", source.stream_id); + }); + return std::move(index).thenValue([](auto&& index) -> DescriptorItem { + return get_descriptor_from_index(index); }); } @@ -477,7 +496,7 @@ folly::Future LocalVersionedEngine::get_descriptor_async( .thenValue([this, &stream_id, &version_query](std::optional&& opt_index_key){ missing_data::check(opt_index_key.has_value(), "Unable to retrieve descriptor data. 
{}@{}: version not found", stream_id, version_query); - return get_descriptor(std::move(*opt_index_key)); + return get_descriptor(StoredIndexKey{VersionedItem{std::move(*opt_index_key)}}); }).via(&async::cpu_executor()); } @@ -486,10 +505,11 @@ DescriptorItem LocalVersionedEngine::read_descriptor_internal( const VersionQuery& version_query ) { ARCTICDB_SAMPLE(ReadDescriptor, 0) + // if (version_query.cached_index_) auto version = get_version_to_read(stream_id, version_query); missing_data::check(version.has_value(), "Unable to retrieve descriptor data. {}@{}: version not found", stream_id, version_query); - return get_descriptor(std::move(version->key_)).get(); + return get_descriptor(StoredIndexKey{std::move(version.value())}).get(); } @@ -497,7 +517,6 @@ std::vector> LocalVersionedEngine::batch const std::vector& stream_ids, const std::vector& version_queries, const ReadOptions& read_options) { - internal::check(read_options.batch_throw_on_error().has_value(), "ReadOptions::batch_throw_on_error_ should always be set here"); @@ -513,6 +532,30 @@ std::vector> LocalVersionedEngine::batch return transform_batch_items_or_throw(std::move(descriptors), stream_ids, flags, version_queries); } +// TODO: Maybe should live in a different place +TimeseriesDescriptor compute_output_timeseries_descriptor(const TimeseriesDescriptor& input_tsd, const ReadQuery& read_query) { + auto tsd = input_tsd.clone(); + auto input_schema = OutputSchema(tsd.as_stream_descriptor(), tsd.normalization()); + auto output_schema = compute_output_schema(std::move(input_schema), read_query); + tsd.set_output_schema(std::move(output_schema)); + return tsd; +} + +// TODO: Think whether this doesn't make sense to become part of read_descriptor with optional ReadQuery argument. +DescriptorItem LocalVersionedEngine::read_output_schema_internal( + const StreamId& stream_id, + const VersionQuery& version_query, + [[maybe_unused]] const ReadQuery& read_query, + const ReadOptions& read_options) { + auto index_source = get_index_source(stream_id, version_query, read_options); + auto descriptor = get_descriptor(std::move(index_source)).get(); + if (descriptor.timeseries_descriptor_.has_value()) { + auto output_tsd = compute_output_timeseries_descriptor(descriptor.timeseries_descriptor_.value(), read_query); + descriptor.timeseries_descriptor_ = output_tsd; + } + return descriptor; +} + void LocalVersionedEngine::flush_version_map() { version_map()->flush(); } @@ -1106,7 +1149,7 @@ std::vector LocalVersionedEngine::batch_read_keys(const std:: std::vector> res; res.reserve(keys.size()); for (const auto& index_key: keys) { - res.emplace_back(read_frame_for_version(store(), {index_key}, std::make_shared(), ReadOptions{}, handler_data)); + res.emplace_back(read_frame_for_version(store(), StoredIndexKey{VersionedItem{index_key}}, std::make_shared(), ReadOptions{}, handler_data)); } Allocator::instance()->trim(); return folly::collect(res).get(); @@ -1139,19 +1182,19 @@ std::vector> LocalVersionedEngine::ba read_query = read_queries.empty() ? 
std::make_shared(): read_queries[idx], &read_options, &handler_data](auto&& opt_index_key) { - std::variant version_info; + IndexSource index_source; if (opt_index_key.has_value()) { - version_info = VersionedItem(std::move(*opt_index_key)); + index_source = StoredIndexKey{VersionedItem(std::move(*opt_index_key))}; } else { if (opt_false(read_options.incompletes())) { log::version().warn("No index: Key not found for {}, will attempt to use incomplete segments.", stream_ids[idx]); - version_info = stream_ids[idx]; + index_source = Incompletes{stream_ids[idx]}; } else { missing_data::raise( "batch_read_internal: version matching query '{}' not found for symbol '{}'", version_queries[idx], stream_ids[idx]); } } - return read_frame_for_version(store, version_info, read_query, read_options, handler_data); + return read_frame_for_version(store, std::move(index_source), read_query, read_options, handler_data); }) ); if(++batch_count == static_cast(max_batch_size)) { diff --git a/cpp/arcticdb/version/local_versioned_engine.hpp b/cpp/arcticdb/version/local_versioned_engine.hpp index dd421782e9..7bb041c4cb 100644 --- a/cpp/arcticdb/version/local_versioned_engine.hpp +++ b/cpp/arcticdb/version/local_versioned_engine.hpp @@ -137,6 +137,11 @@ class LocalVersionedEngine : public VersionedEngine { const VersionQuery& version_query ); + IndexSource get_index_source( + const StreamId& stream_id, + const VersionQuery& version_query, + const ReadOptions& read_options); + ReadVersionOutput read_dataframe_version_internal( const StreamId &stream_id, const VersionQuery& version_query, @@ -148,6 +153,12 @@ class LocalVersionedEngine : public VersionedEngine { const StreamId& stream_id, const VersionQuery& version_query); + DescriptorItem read_output_schema_internal( + const StreamId& stream_id, + const VersionQuery& version_query, + const ReadQuery& read_query, + const ReadOptions& read_options); + void write_parallel_frame( const StreamId& stream_id, const std::shared_ptr& frame, @@ -196,7 +207,7 @@ class LocalVersionedEngine : public VersionedEngine { const VersionQuery& version_query); folly::Future get_descriptor( - AtomKey&& key); + IndexSource&& index_source); folly::Future get_descriptor_async( folly::Future>&& opt_index_key_fut, diff --git a/cpp/arcticdb/version/python_bindings.cpp b/cpp/arcticdb/version/python_bindings.cpp index abca754ba6..3b64102237 100644 --- a/cpp/arcticdb/version/python_bindings.cpp +++ b/cpp/arcticdb/version/python_bindings.cpp @@ -209,7 +209,8 @@ void register_bindings(py::module &version, py::exception(version, "OutputFormat") .value("PANDAS", OutputFormat::PANDAS) @@ -311,13 +312,16 @@ void register_bindings(py::module &version, py::exception>(version, "CachedIndex"); + py::class_(version, "DescriptorItem") .def_property_readonly("symbol", &DescriptorItem::symbol) .def_property_readonly("version", &DescriptorItem::version) .def_property_readonly("start_index", &DescriptorItem::start_index) .def_property_readonly("end_index", &DescriptorItem::end_index) .def_property_readonly("creation_ts", &DescriptorItem::creation_ts) - .def_property_readonly("timeseries_descriptor", &DescriptorItem::timeseries_descriptor); + .def_property_readonly("timeseries_descriptor", &DescriptorItem::timeseries_descriptor) + .def_property_readonly("cached_index", &DescriptorItem::cached_index); py::class_>(version, "FrameSlice") .def_property_readonly("col_range", &pipelines::FrameSlice::columns) @@ -724,6 +728,9 @@ void register_bindings(py::module &version, py::exception(), "Get back the descriptor of 
a list of symbols.") + .def("_read_output_schema", + &PythonVersionStore::read_output_schema + ) .def("restore_version", [&](PythonVersionStore& v, StreamId sid, const VersionQuery& version_query, const ReadOptions& read_options) { auto [vit, tsd] = v.restore_version(sid, version_query); diff --git a/cpp/arcticdb/version/version_core.cpp b/cpp/arcticdb/version/version_core.cpp index 405b175baf..0dcd98a088 100644 --- a/cpp/arcticdb/version/version_core.cpp +++ b/cpp/arcticdb/version/version_core.cpp @@ -516,7 +516,7 @@ folly::Future read_multi_key( VersionedItem versioned_item{std::move(dup)}; TimeseriesDescriptor multi_key_desc{index_key_seg.index_descriptor()}; - return read_frame_for_version(store, versioned_item, std::make_shared(), ReadOptions{}, handler_data) + return read_frame_for_version(store, StoredIndexKey{versioned_item}, std::make_shared(), ReadOptions{}, handler_data) .thenValue([multi_key_desc=std::move(multi_key_desc), keys=std::move(keys), key=std::move(key)](ReadVersionOutput&& read_version_output) mutable { multi_key_desc.mutable_proto().mutable_normalization()->CopyFrom(read_version_output.frame_and_descriptor_.desc_.proto().normalization()); read_version_output.frame_and_descriptor_.desc_ = std::move(multi_key_desc); @@ -526,6 +526,34 @@ folly::Future read_multi_key( }); } +OutputSchema columns_filter_output_schema(OutputSchema&& output_schema, const std::vector& columns_filter) { + auto columns_set = std::unordered_set(columns_filter.begin(), columns_filter.end()); + // TODO: Inefficient, copies instead of move + const auto& input_stream_desc = output_schema.stream_descriptor(); + StreamDescriptor stream_desc(input_stream_desc.id()); + stream_desc.set_index(input_stream_desc.index()); + + for (auto idx=0u; idx < input_stream_desc.fields().size(); ++idx) { + const auto& field = input_stream_desc.fields()[idx]; + if (idx < stream_desc.index().field_count() || columns_set.contains(std::string(field.name()))) { + stream_desc.add_field(field); + } + } + output_schema.set_stream_descriptor(std::move(stream_desc)); + return output_schema; +} + +OutputSchema compute_output_schema(OutputSchema&& input_schema, const ReadQuery& read_query) { + OutputSchema output_schema = std::move(input_schema); + for (auto clause : read_query.clauses_) { + output_schema = clause->modify_schema(std::move(output_schema)); + } + if (read_query.columns.has_value()) { + output_schema = columns_filter_output_schema(std::move(output_schema), read_query.columns.value()); + } + return output_schema; +} + void add_slice_to_component_manager( EntityId entity_id, pipelines::SegmentAndSlice& segment_and_slice, @@ -1016,16 +1044,11 @@ void check_multi_key_is_not_index_only( } void read_indexed_keys_to_pipeline( - const std::shared_ptr& store, + const index::IndexSegmentReader& index_segment_reader, const std::shared_ptr& pipeline_context, const VersionedItem& version_info, ReadQuery& read_query, const ReadOptions& read_options) { - auto maybe_reader = get_index_segment_reader(store, pipeline_context, version_info); - if(!maybe_reader) - return; - - auto index_segment_reader = std::move(*maybe_reader); ARCTICDB_DEBUG(log::version(), "Read index segment with {} keys", index_segment_reader.size()); check_can_read_index_only_if_required(index_segment_reader, read_query); check_column_and_date_range_filterable(index_segment_reader, read_query); @@ -1046,12 +1069,26 @@ void read_indexed_keys_to_pipeline( pipeline_context->slice_and_keys_ = filter_index(index_segment_reader, combine_filter_functions(queries)); 
pipeline_context->total_rows_ = pipeline_context->calc_rows(); pipeline_context->rows_ = index_segment_reader.tsd().total_rows(); - pipeline_context->norm_meta_ = std::make_unique(std::move(*index_segment_reader.mutable_tsd().mutable_proto().mutable_normalization())); - pipeline_context->user_meta_ = std::make_unique(std::move(*index_segment_reader.mutable_tsd().mutable_proto().mutable_user_meta())); + // TODO: Think about not moving metadata out of index segment reader. Previously we used to move which can prove expensive if metadata is very large + pipeline_context->norm_meta_ = std::make_unique(index_segment_reader.tsd().proto().normalization()); + pipeline_context->user_meta_ = std::make_unique(index_segment_reader.tsd().proto().user_meta()); pipeline_context->bucketize_dynamic_ = bucketize_dynamic; ARCTICDB_DEBUG(log::version(), "read_indexed_keys_to_pipeline: Symbol {} Found {} keys with {} total rows", pipeline_context->slice_and_keys_.size(), pipeline_context->total_rows_, version_info.symbol()); } +void read_indexed_keys_to_pipeline( + const std::shared_ptr& store, + const std::shared_ptr& pipeline_context, + const VersionedItem& version_info, + ReadQuery& read_query, + const ReadOptions& read_options) { + auto maybe_reader = get_index_segment_reader(store, pipeline_context, version_info); + if(!maybe_reader) + return; + + read_indexed_keys_to_pipeline(maybe_reader.value(), pipeline_context, version_info, read_query, read_options); +} + // Returns true if there are staged segments bool read_incompletes_to_pipeline( const std::shared_ptr& store, @@ -2026,34 +2063,43 @@ void set_row_id_if_index_only( // part of a dataframe as-is, or transforms it via a processing pipeline folly::Future read_frame_for_version( const std::shared_ptr& store, - const std::variant& version_info, + IndexSource&& index_source, const std::shared_ptr& read_query , const ReadOptions& read_options, std::any& handler_data) { using namespace arcticdb::pipelines; auto pipeline_context = std::make_shared(); - VersionedItem res_versioned_item; - - if(std::holds_alternative(version_info)) { - pipeline_context->stream_id_ = std::get(version_info); - // This isn't ideal. It would be better if the version() and timestamp() methods on the C++ VersionedItem class - // returned optionals, but this change would bubble up to the Python VersionedItem class defined in _store.py. - // This class is very hard to change at this point, as users do things like pickling them to pass them around. - // This at least gets the symbol attribute of VersionedItem correct. The creation timestamp will be zero, which - // corresponds to 1970, and so with this obviously ridiculous version ID, it should be clear to users that these - // values are meaningless before an indexed version exists. - res_versioned_item = VersionedItem(AtomKeyBuilder() - .version_id(std::numeric_limits::max()) - .build(std::get(version_info))); - } else { - pipeline_context->stream_id_ = std::get(version_info).key_.id(); - read_indexed_keys_to_pipeline(store, pipeline_context, std::get(version_info), *read_query, read_options); - res_versioned_item = std::get(version_info); - } + auto read_index_from_incompletes = false; + auto versioned_item = util::variant_match( + index_source, + [&](Incompletes& source) { + util::check(read_options.incompletes() == std::make_optional(true), "Expected TODO"); + read_index_from_incompletes = true; + // This isn't ideal. 
It would be better if the version() and timestamp() methods on the C++ VersionedItem class + // returned optionals, but this change would bubble up to the Python VersionedItem class defined in _store.py. + // This class is very hard to change at this point, as users do things like pickling them to pass them around. + // This at least gets the symbol attribute of VersionedItem correct. The creation timestamp will be zero, which + // corresponds to 1970, and so with this obviously ridiculous version ID, it should be clear to users that these + // values are meaningless before an indexed version exists. + return VersionedItem(AtomKeyBuilder() + .version_id(std::numeric_limits::max()) + .build(source.stream_id)); + }, + [&](StoredIndexKey& source){ + read_indexed_keys_to_pipeline(store, pipeline_context, source.versioned_item, *read_query, read_options); + return std::move(source.versioned_item); + }, + [&](CachedIndexKey& source){ + read_indexed_keys_to_pipeline(source.cached_index->isr, pipeline_context, source.versioned_item, *read_query, read_options); + return std::move(source.versioned_item); + } + // TODO: Add a variant for multi-key + ); + pipeline_context->stream_id_ = versioned_item.symbol(); if(pipeline_context->multi_key_) { check_multi_key_is_not_index_only(*pipeline_context, *read_query); - return read_multi_key(store, *pipeline_context->multi_key_, handler_data, std::move(res_versioned_item.key_)); + return read_multi_key(store, *pipeline_context->multi_key_, handler_data, std::move(versioned_item.key_)); } if(opt_false(read_options.incompletes())) { @@ -2064,7 +2110,7 @@ folly::Future read_frame_for_version( read_incompletes_to_pipeline(store, pipeline_context, *read_query, read_options, false, false, false, opt_false(read_options.dynamic_schema())); } - if(std::holds_alternative(version_info) && !pipeline_context->incompletes_after_) { + if(read_index_from_incompletes && !pipeline_context->incompletes_after_) { missing_data::raise( "read_dataframe_impl: read returned no data for symbol {} (found no versions or append data)", pipeline_context->stream_id_); } @@ -2075,13 +2121,13 @@ folly::Future read_frame_for_version( DecodePathData shared_data; return do_direct_read_or_process(store, read_query, read_options, pipeline_context, shared_data, handler_data) - .thenValue([res_versioned_item, pipeline_context, read_options, &handler_data, read_query, shared_data](auto&& frame) mutable { + .thenValue([versioned_item, pipeline_context, read_options, &handler_data, read_query, shared_data](auto&& frame) mutable { ARCTICDB_DEBUG(log::version(), "Reduce and fix columns"); return reduce_and_fix_columns(pipeline_context, frame, read_options, handler_data) .via(&async::cpu_executor()) - .thenValue([res_versioned_item, pipeline_context, frame, read_query, shared_data](auto&&) mutable { + .thenValue([versioned_item, pipeline_context, frame, read_query, shared_data](auto&&) mutable { set_row_id_if_index_only(*pipeline_context, frame, *read_query); - return ReadVersionOutput{std::move(res_versioned_item), + return ReadVersionOutput{std::move(versioned_item), {frame, timeseries_descriptor_from_pipeline_context(pipeline_context, {}, pipeline_context->bucketize_dynamic_), {}}}; diff --git a/cpp/arcticdb/version/version_core.hpp b/cpp/arcticdb/version/version_core.hpp index 76605b6f3a..0df98811c9 100644 --- a/cpp/arcticdb/version/version_core.hpp +++ b/cpp/arcticdb/version/version_core.hpp @@ -140,6 +140,8 @@ folly::Future read_multi_key( const SegmentInMemory& index_key_seg, std::any& 
handler_data); +OutputSchema compute_output_schema(OutputSchema&& input_schema, const ReadQuery& read_query); + folly::Future> schedule_clause_processing( std::shared_ptr component_manager, std::vector>&& segment_and_slice_futures, @@ -200,6 +202,13 @@ void modify_descriptor( const std::shared_ptr& pipeline_context, const ReadOptions& read_options); +void read_indexed_keys_to_pipeline( + const index::IndexSegmentReader& index_segment_reader, + const std::shared_ptr& pipeline_context, + const VersionedItem& version_info, + ReadQuery& read_query, + const ReadOptions& read_options); + void read_indexed_keys_to_pipeline( const std::shared_ptr& store, const std::shared_ptr& pipeline_context, @@ -211,9 +220,20 @@ void add_index_columns_to_query( const ReadQuery& read_query, const TimeseriesDescriptor& desc); +// TODO: It would make more sense to use `AtomKey` instead of a `VersionedItem` but that would require a slightly bigger +// refactor of `get_version_to_read` +struct StoredIndexKey {VersionedItem versioned_item;}; +struct CachedIndexKey { + VersionedItem versioned_item; + // TODO: Maybe const? + std::shared_ptr cached_index; +}; +struct Incompletes {StreamId stream_id;}; +using IndexSource = std::variant; + folly::Future read_frame_for_version( const std::shared_ptr& store, - const std::variant& version_info, + IndexSource&& index_source, const std::shared_ptr& read_query, const ReadOptions& read_options, std::any& handler_data diff --git a/cpp/arcticdb/version/version_store_api.cpp b/cpp/arcticdb/version/version_store_api.cpp index d8cbd24daf..859710c769 100644 --- a/cpp/arcticdb/version/version_store_api.cpp +++ b/cpp/arcticdb/version/version_store_api.cpp @@ -1099,6 +1099,15 @@ std::vector> PythonVersionStore::batch_r return batch_read_descriptor_internal(stream_ids, version_queries, read_options); } +DescriptorItem PythonVersionStore::read_output_schema( + const StreamId& stream_id, + const VersionQuery& version_query, + const ReadQuery& read_query, + const ReadOptions& read_options + ) { + return read_output_schema_internal(stream_id, version_query, read_query, read_options); +} + ReadResult PythonVersionStore::read_index( const StreamId& stream_id, const VersionQuery& version_query diff --git a/cpp/arcticdb/version/version_store_api.hpp b/cpp/arcticdb/version/version_store_api.hpp index 4855f970b6..2f8483e41f 100644 --- a/cpp/arcticdb/version/version_store_api.hpp +++ b/cpp/arcticdb/version/version_store_api.hpp @@ -184,6 +184,12 @@ class PythonVersionStore : public LocalVersionedEngine { const StreamId& stream_id, const VersionQuery& version_query); + DescriptorItem read_output_schema( + const StreamId& stream_id, + const VersionQuery& version_query, + const ReadQuery& read_query, + const ReadOptions& read_options); + ReadResult read_index( const StreamId& stream_id, const VersionQuery& version_query); diff --git a/python/arcticdb/version_store/_store.py b/python/arcticdb/version_store/_store.py index 6d86dcdde7..8037e1e9f0 100644 --- a/python/arcticdb/version_store/_store.py +++ b/python/arcticdb/version_store/_store.py @@ -40,19 +40,22 @@ ) from arcticdb_ext.types import IndexKind from arcticdb.version_store.read_result import ReadResult -from arcticdb_ext.version_store import IndexRange as _IndexRange -from arcticdb_ext.version_store import RowRange as _RowRange -from arcticdb_ext.version_store import SignedRowRange as _SignedRowRange -from arcticdb_ext.version_store import PythonVersionStore as _PythonVersionStore -from arcticdb_ext.version_store import 
PythonVersionStoreReadQuery as _PythonVersionStoreReadQuery -from arcticdb_ext.version_store import PythonVersionStoreUpdateQuery as _PythonVersionStoreUpdateQuery -from arcticdb_ext.version_store import PythonVersionStoreReadOptions as _PythonVersionStoreReadOptions -from arcticdb_ext.version_store import PythonVersionStoreVersionQuery as _PythonVersionStoreVersionQuery -from arcticdb_ext.version_store import ColumnStats as _ColumnStats -from arcticdb_ext.version_store import StreamDescriptorMismatch -from arcticdb_ext.version_store import DataError -from arcticdb_ext.version_store import sorted_value_name -from arcticdb_ext.version_store import OutputFormat +from arcticdb_ext.version_store import ( + IndexRange as _IndexRange, + RowRange as _RowRange, + SignedRowRange as _SignedRowRange, + PythonVersionStore as _PythonVersionStore, + PythonVersionStoreReadQuery as _PythonVersionStoreReadQuery, + PythonVersionStoreUpdateQuery as _PythonVersionStoreUpdateQuery, + PythonVersionStoreReadOptions as _PythonVersionStoreReadOptions, + PythonVersionStoreVersionQuery as _PythonVersionStoreVersionQuery, + ColumnStats as _ColumnStats, + StreamDescriptorMismatch, + DataError, + sorted_value_name, + OutputFormat, + CachedIndex, +) from arcticdb.authorization.permissions import OpenMode from arcticdb.exceptions import ArcticDbNotYetImplemented, ArcticNativeException from arcticdb.flattener import Flattener @@ -1588,7 +1591,7 @@ def batch_restore_version( for result, meta in zip(read_results, metadatas) ] - def _get_version_query(self, as_of: VersionQueryInput, **kwargs): + def _get_version_query(self, as_of: VersionQueryInput, cached_index: Optional[CachedIndex] = None, **kwargs): version_query = _PythonVersionStoreVersionQuery() iterate_snapshots_if_tombstoned = _assume_true("iterate_snapshots_if_tombstoned", kwargs) @@ -1601,6 +1604,9 @@ def _get_version_query(self, as_of: VersionQueryInput, **kwargs): elif as_of is not None: raise ArcticNativeException("Unexpected combination of read parameters") + if cached_index is not None: + version_query._set_cached_index(cached_index) + return version_query def _get_version_queries(self, num_symbols: int, as_ofs: Optional[List[VersionQueryInput]], **kwargs): @@ -1721,8 +1727,8 @@ def _get_read_options(self, **kwargs): read_options.set_incompletes(self.resolve_defaults("incomplete", proto_cfg, global_default=False, **kwargs)) return read_options - def _get_queries(self, as_of, date_range, row_range, columns=None, query_builder=None, **kwargs): - version_query = self._get_version_query(as_of, **kwargs) + def _get_queries(self, as_of, date_range, row_range, columns=None, query_builder=None, cached_index=None, **kwargs): + version_query = self._get_version_query(as_of, cached_index, **kwargs) read_options = self._get_read_options(**kwargs) read_query = self._get_read_query( date_range=date_range, row_range=row_range, columns=columns, query_builder=query_builder @@ -2954,6 +2960,31 @@ def _batch_read_descriptor(self, symbols, as_ofs, throw_on_error, date_range_ns_ description_results.append(self._process_info(symbol, dit, as_of, date_range_ns_precision)) return description_results + def _read_output_schema( + self, + symbol, + as_of, + date_range, + row_range, + columns, + query_builder, + **kwargs, + ): + # TODO: Think about the empty columns munging + # implement_read_index = kwargs.get("implement_read_index", False) + # columns = self._resolve_empty_columns(columns, implement_read_index) + # Take a copy as _get_queries can modify the input argument, which makes 
reusing the input counter-intuitive + query_builder = copy.deepcopy(query_builder) + version_query, read_options, read_query = self._get_queries( + as_of=as_of, + date_range=date_range, + row_range=row_range, + columns=columns, + query_builder=query_builder, + **kwargs + ) + return self.version_store._read_output_schema(symbol, version_query, read_query, read_options) + def write_metadata( self, symbol: str, metadata: Any, prune_previous_version: Optional[bool] = None ) -> VersionedItem: diff --git a/python/arcticdb/version_store/library.py b/python/arcticdb/version_store/library.py index 3d34855145..4ba2ea7d10 100644 --- a/python/arcticdb/version_store/library.py +++ b/python/arcticdb/version_store/library.py @@ -22,7 +22,7 @@ from arcticdb.version_store.processing import ExpressionNode, QueryBuilder from arcticdb.version_store._store import NativeVersionStore, VersionedItem, VersionQueryInput from arcticdb_ext.exceptions import ArcticException -from arcticdb_ext.version_store import DataError, OutputFormat +from arcticdb_ext.version_store import DataError, CachedIndex, DescriptorItem import pandas as pd import numpy as np import logging @@ -258,6 +258,8 @@ class ReadRequest(NamedTuple): See `read` method. query_builder: Optional[Querybuilder], default=none See `read` method. + cached_index: Optional[CachedIndex], default=none + See `read` method. See Also -------- @@ -270,6 +272,7 @@ class ReadRequest(NamedTuple): row_range: Optional[Tuple[int, int]] = None columns: Optional[List[str]] = None query_builder: Optional[QueryBuilder] = None + cached_index: Optional[CachedIndex] = None def __repr__(self): res = f"ReadRequest(symbol={self.symbol}" @@ -365,6 +368,28 @@ def __init__( self.lib = lib self.read_request = read_request._replace(query_builder=None) + def select_columns(self, columns): + self.read_request = ReadRequest( + symbol=self.read_request.symbol, + as_of=self.read_request.as_of, + date_range=self.read_request.date_range, + row_range=self.read_request.row_range, + columns=columns, + query_builder=self.read_request.query_builder, + cached_index=self.read_request.cached_index, + ) + + def _set_cached_index(self, cached_index): + self.read_request = ReadRequest( + symbol=self.read_request.symbol, + as_of=self.read_request.as_of, + date_range=self.read_request.date_range, + row_range=self.read_request.row_range, + columns=self.read_request.columns, + query_builder=self.read_request.query_builder, + cached_index=cached_index, + ) + def _to_read_request(self) -> ReadRequest: """ Convert this object into a ReadRequest, including any queries applied to this object since the read call. @@ -383,18 +408,50 @@ def _to_read_request(self) -> ReadRequest: row_range=self.read_request.row_range, columns=self.read_request.columns, query_builder=q, + cached_index=self.read_request.cached_index ) - def collect(self) -> VersionedItem: + def collect(self, use_latest_version=False) -> VersionedItem: """ Read the data and execute any queries applied to this object since the read call. + Parameters + ---------- + use_latest_version : `bool`, default=False + Whether to overwrite the cached version to read and collect the latest version of the symbol. + Returns ------- VersionedItem Object that contains a .data and .metadata element. 
""" - return self.lib.read(**self._to_read_request()._asdict()) + if use_latest_version: + # This is kinda broken to modify in place + self._set_cached_index(None) + vit = self.lib._nvs.read(**self._to_read_request()._asdict()) + # TODO: We probably also want to cache index for multiple collects + return vit + + + def collect_schema(self, use_latest_version=False) -> DescriptorItem: + """ + Read only the schema of the output dataframe. + + Parameters + ---------- + use_latest_version : `bool`, default=False + Whether to overwrite the cached version to read and collect the latest version of the symbol. + + Returns + ------- + DescriptorItem + """ + if use_latest_version: + self._set_cached_index(None) + # TODO: We need to parse output_format and use a variant type for schema (pyarrow.Schema / TimeseriesDescriptor?). + dit = self.lib._nvs._read_output_schema(**self._to_read_request()._asdict()) + self._set_cached_index(dit.cached_index) + return dit def __str__(self) -> str: query_builder_repr = super().__str__() diff --git a/python/tests/unit/arcticdb/version_store/test_lazy_dataframe.py b/python/tests/unit/arcticdb/version_store/test_lazy_dataframe.py index c9d1ee7cbd..5ab7c14995 100644 --- a/python/tests/unit/arcticdb/version_store/test_lazy_dataframe.py +++ b/python/tests/unit/arcticdb/version_store/test_lazy_dataframe.py @@ -12,7 +12,7 @@ from arcticdb import col, LazyDataFrame, LazyDataFrameCollection, QueryBuilder, ReadRequest from arcticdb.util.test import assert_frame_equal - +from arcticdb_ext.types import FieldDescriptor, TypeDescriptor, DataType, Dimension pytestmark = pytest.mark.pipeline @@ -455,3 +455,155 @@ def test_lazy_batch_pickling(lmdb_library): received_roundtripped = roundtripped.collect() for vit in received_roundtripped: assert_frame_equal(expected, vit.data) + + +def to_pd_dtype(type_descriptor): + # TODO: Extend with more types + typ = type_descriptor.data_type() + if typ == DataType.INT64: + return np.int64 + if typ == DataType.FLOAT64: + return np.float64 + if typ == DataType.FLOAT32: + return np.float32 + if typ == DataType.NANOSECONDS_UTC64: + return np.dtype("datetime64[ns]") + if typ == DataType.UTF_DYNAMIC64: + return np.dtype("object") + raise ValueError("Unexpected data type") + + +def assert_lazy_frame_schema_equal(lazy_df, expected_types_and_names, expect_equal_column_order=True, check_against_collected=True): + tsd = lazy_df.collect_schema().timeseries_descriptor + lazy_fields = tsd.fields + + if not expect_equal_column_order: + lazy_fields[1:] = sorted(lazy_fields[1:], key=lambda x: x.name) + expected_types_and_names[1:] = sorted(expected_types_and_names[1:], key=lambda x: x[1]) + + # Assert lazy_fields are same as expected + assert len(lazy_fields) == len(expected_types_and_names) + for lazy_field, (expected_type, expected_name) in zip(lazy_fields, expected_types_and_names): + assert lazy_field.type == TypeDescriptor(expected_type, Dimension.Dim0) + assert lazy_field.name == expected_name + + # Assert lazy_fields are same as collected + if check_against_collected: + collected_df = lazy_df.collect().data + if not expect_equal_column_order: + collected_df = collected_df.reindex(sorted(collected_df.columns), axis=1) + + field_names = [field.name for field in lazy_fields] + field_dtypes = [to_pd_dtype(field.type) for field in lazy_fields] + # TODO: The index name assertion should probably be better. 
Also add tests for dataframes with index name and other index types + assert field_names[0] == collected_df.index.name or (field_names[0] == "index" and collected_df.index.name is None) + assert field_names[1:] == list(collected_df.columns) + assert field_dtypes[0] == collected_df.index.dtype + print(type(collected_df.dtypes[0]), type(collected_df.index.dtype)) + assert field_dtypes[1:] == list(collected_df.dtypes) + + +def test_lazy_collect_schema_basic(lmdb_library): + lib = lmdb_library + sym = "sym" + num_rows = 10 + df = pd.DataFrame({ + "col_int": np.arange(num_rows, dtype=np.int64), + "col_float": np.arange(num_rows, dtype=np.float64), + "col_float_2": np.arange(num_rows, dtype=np.float32), + "col_str": [f"str_{i//5}" for i in range(num_rows)], + # TODO: Add more types + }, index=pd.date_range(pd.Timestamp(2025, 1, 1), periods=num_rows, freq="s")) + lib.write(sym, df) + + # No query_builder + lazy_df = lib.read(sym, lazy=True) + expected_fields = [ + (DataType.NANOSECONDS_UTC64, "index"), + (DataType.INT64, "col_int"), + (DataType.FLOAT64, "col_float"), + (DataType.FLOAT32, "col_float_2"), + (DataType.UTF_DYNAMIC64, "col_str"), + ] + assert_lazy_frame_schema_equal(lazy_df, expected_fields) + + # With a filter we don't change the expected schema + lazy_df = lazy_df[(lazy_df["col_int"] < 20) & (lazy_df["col_float"] >= 0.4) & (lazy_df["col_str"].isin(["str_0", "str_1"]))] + assert_lazy_frame_schema_equal(lazy_df, expected_fields) + + # With a projection + lazy_df.apply("col_sum", lazy_df["col_int"] + lazy_df["col_float"] / lazy_df["col_float_2"]) + expected_fields = [ + (DataType.NANOSECONDS_UTC64, "index"), + (DataType.INT64, "col_int"), + (DataType.FLOAT64, "col_float"), + (DataType.FLOAT32, "col_float_2"), + (DataType.UTF_DYNAMIC64, "col_str"), + (DataType.FLOAT64, "col_sum"), + ] + assert_lazy_frame_schema_equal(lazy_df, expected_fields) + + # With a columns filter + lazy_df.select_columns(["col_int", "col_float", "col_sum"]) + expected_fields = [ + (DataType.NANOSECONDS_UTC64, "index"), + (DataType.INT64, "col_int"), + (DataType.FLOAT64, "col_float"), + (DataType.FLOAT64, "col_sum"), + ] + # TODO: We should check against collected. 
Problem will be fixed by 8774208809
+    assert_lazy_frame_schema_equal(lazy_df, expected_fields, check_against_collected=False)
+
+    # With resampling
+    lazy_df.resample("3s").agg({"col_int": "max", "col_float_2": "sum", "col_sum": "mean"})
+    lazy_df.select_columns(None) # Remove column filter
+    expected_fields = [
+        (DataType.NANOSECONDS_UTC64, "index"),
+        (DataType.INT64, "col_int"),
+        (DataType.FLOAT64, "col_float_2"),
+        (DataType.FLOAT64, "col_sum"),
+    ]
+    assert_lazy_frame_schema_equal(lazy_df, expected_fields, expect_equal_column_order=False)
+
+    # With group by
+    lazy_df = lib.read(sym, lazy=True)
+    lazy_df.groupby("col_str").agg({"col_int": "sum", "col_float": "mean", "col_float_2": "mean"})
+    expected_fields = [
+        (DataType.UTF_DYNAMIC64, "col_str"),
+        (DataType.INT64, "col_int"),
+        (DataType.FLOAT64, "col_float"),
+        (DataType.FLOAT64, "col_float_2"),
+    ]
+    assert_lazy_frame_schema_equal(lazy_df, expected_fields, expect_equal_column_order=False)
+
+
+def test_lazy_index_caching(lmdb_library_dynamic_schema):
+    lib = lmdb_library_dynamic_schema
+    sym = "sym"
+    dfs = [
+        pd.DataFrame({"col_1": np.arange(3, dtype=np.int32)}, index=pd.date_range(pd.Timestamp(2025, 1, 1), periods=3)),
+        pd.DataFrame({"col_1": np.arange(3, dtype=np.int64), "col_2": np.arange(3, dtype=np.int64)}, index=pd.date_range(pd.Timestamp(2025, 1, 4), periods=3)),
+        pd.DataFrame({"col_2": np.arange(3, dtype=np.int32), "col_3": np.arange(3, dtype=np.float32)}, index=pd.date_range(pd.Timestamp(2025, 1, 7), periods=3)),
+    ]
+
+    lib.write(sym, dfs[0])
+
+    lazy_df = lib.read(sym, lazy=True)
+    expected_fields = [
+        (DataType.NANOSECONDS_UTC64, "index"),
+        (DataType.INT32, "col_1"),
+    ]
+    assert_lazy_frame_schema_equal(lazy_df, expected_fields, check_against_collected=False) # We don't call collect
+
+    lib.append(sym, dfs[1])
+    # The lazy_df should have cached the index and still return the old schema and dataframe
+    assert_lazy_frame_schema_equal(lazy_df, expected_fields, check_against_collected=False) # We don't call collect
+    vit = lazy_df.collect()
+    assert vit.version == 0
+    assert_frame_equal(vit.data, dfs[0])
+
+    # But if we opt in to the latest version, the appended data is returned
+    vit = lazy_df.collect(use_latest_version=True)
+    assert vit.version == 1
+    assert_frame_equal(vit.data, pd.concat(dfs[:2]))
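
Reviewer note: a usage sketch of the workflow this patch is driving at, pieced together from the tests above. collect_schema() resolves the version once, caches the index key and segment on the LazyDataFrame, and a later collect() reuses that cached index, so both calls stay pinned to the same version unless use_latest_version=True is passed. The snippet is illustrative only; `lib` is assumed to be an existing arcticdb Library instance and the symbol/column names are made up.

import numpy as np
import pandas as pd

# `lib` is assumed to be an arcticdb Library (e.g. obtained via Arctic.get_library).
df = pd.DataFrame(
    {"price": np.arange(5, dtype=np.float64)},
    index=pd.date_range(pd.Timestamp(2025, 1, 1), periods=5),
)
lib.write("prices", df)

lazy_df = lib.read("prices", lazy=True)
lazy_df.apply("double_price", lazy_df["price"] * 2)  # projection should add a FLOAT64 column

# Resolve only the output schema; as a side effect the index is cached on lazy_df.
schema = lazy_df.collect_schema()
print([(field.name, field.type) for field in schema.timeseries_descriptor.fields])

lib.write("prices", df)  # a newer version of the symbol is written elsewhere

vit = lazy_df.collect()                            # still reads the version pinned by collect_schema()
latest = lazy_df.collect(use_latest_version=True)  # discards the cached index and reads the latest version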
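
The to_pd_dtype helper above carries a TODO to extend it with more types; a lookup table keeps that extension cheap. The sketch below only lists DataType members that already appear elsewhere in this patch (INT32 comes from the dynamic-schema test); anything beyond these would be an assumption about the arcticdb_ext.types.DataType enum.

import numpy as np
from arcticdb_ext.types import DataType

# Maps ArcticDB data types to the pandas/numpy dtypes the tests compare against.
# Restricted to members already used in this patch.
_DATA_TYPE_TO_NUMPY = {
    DataType.INT32: np.int32,
    DataType.INT64: np.int64,
    DataType.FLOAT32: np.float32,
    DataType.FLOAT64: np.float64,
    DataType.NANOSECONDS_UTC64: np.dtype("datetime64[ns]"),
    DataType.UTF_DYNAMIC64: np.dtype("object"),
}


def to_pd_dtype(type_descriptor):
    typ = type_descriptor.data_type()
    try:
        return _DATA_TYPE_TO_NUMPY[typ]
    except KeyError:
        raise ValueError(f"Unexpected data type {typ}") from None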
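
A small note on the Python plumbing: select_columns and _set_cached_index rebuild the ReadRequest field by field, which will silently drop any field added to the tuple later. Since ReadRequest is a NamedTuple (and LazyDataFrame.__init__ already uses _replace for query_builder), the same effect could be had more defensively. A possible sketch, not part of the patch:

    # Drop-in bodies for the two LazyDataFrame helpers added in this patch
    # (shown without the surrounding class for brevity).
    def select_columns(self, columns):
        # NamedTuple._replace copies the request and overrides only the named
        # field, so future ReadRequest fields are carried over automatically.
        self.read_request = self.read_request._replace(columns=columns)

    def _set_cached_index(self, cached_index):
        self.read_request = self.read_request._replace(cached_index=cached_index)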