Skip to content

TODO: lazy-collect-schema #2270

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions cpp/arcticdb/entity/descriptor_item.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@ struct DescriptorItem {
entity::AtomKey &&key,
std::optional<timestamp> start_index,
std::optional<timestamp> end_index,
std::optional<TimeseriesDescriptor> timeseries_descriptor) :
std::optional<TimeseriesDescriptor> timeseries_descriptor,
std::optional<std::shared_ptr<index::CachedIndex>> cached_index
) :

key_(std::move(key)),
start_index_(start_index),
end_index_(end_index),
timeseries_descriptor_(std::move(timeseries_descriptor)) {
timeseries_descriptor_(std::move(timeseries_descriptor)),
cached_index_(std::move(cached_index)) {
}

DescriptorItem() = delete;
Expand All @@ -32,12 +35,14 @@ struct DescriptorItem {
std::optional<timestamp> start_index_;
std::optional<timestamp> end_index_;
std::optional<TimeseriesDescriptor> timeseries_descriptor_;
std::optional<std::shared_ptr<index::CachedIndex>> cached_index_;

std::string symbol() const { return fmt::format("{}", key_.id()); }
uint64_t version() const { return key_.version_id(); }
timestamp creation_ts() const { return key_.creation_ts(); }
std::optional<timestamp> start_index() const { return start_index_; }
std::optional<timestamp> end_index() const { return end_index_; }
std::optional<TimeseriesDescriptor> timeseries_descriptor() const { return timeseries_descriptor_; }
std::optional<std::shared_ptr<index::CachedIndex>> cached_index() const { return cached_index_; }
};
}
8 changes: 8 additions & 0 deletions cpp/arcticdb/entity/timeseries_descriptor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@ struct TimeseriesDescriptor {
*proto_->mutable_multi_key_meta() = std::move(multi_key_meta);
}

void set_output_schema(OutputSchema&& output_schema) {
this->set_normalization_metadata(std::move(output_schema.norm_metadata_));
const auto& stream_descriptor = output_schema.stream_descriptor();
segment_desc_ = stream_descriptor.segment_desc_;
fields_ = stream_descriptor.fields_;
stream_id_ = stream_descriptor.stream_id_;
}

[[nodiscard]] std::shared_ptr<FieldCollection> fields_ptr() const {
return fields_;
}
Expand Down
1 change: 1 addition & 0 deletions cpp/arcticdb/pipeline/index_segment_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ bool IndexSegmentReader::empty() const {
return seg_.empty();
}

// TODO: Remove
IndexRange get_index_segment_range(
const AtomKey& prev_index,
const std::shared_ptr<Store>& store) {
Expand Down
11 changes: 11 additions & 0 deletions cpp/arcticdb/pipeline/index_segment_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,4 +142,15 @@ IndexRange get_index_segment_range(

void check_column_and_date_range_filterable(const IndexSegmentReader& index_segment_reader, const ReadQuery& read_query);

// TODO: Consider renaming to IndexKeyAndSegment or something. It is not used only as a cache but as intermediate steps in some calls e.g. get_descriptor.
// Can be used to cache index key and index segment between runs, to avoid re-reading from storage
struct CachedIndex {
AtomKey index_key;
IndexSegmentReader isr;

CachedIndex(AtomKey&& index_key, SegmentInMemory&& seg) : index_key(std::move(index_key)), isr(std::move(seg)) {
util::check(index_key.type() == KeyType::TABLE_INDEX, "Expected an index key in CachedIndex but got {}", index_key.type());
}
};

} // namespace arcticdb::pipelines::index
11 changes: 11 additions & 0 deletions cpp/arcticdb/pipeline/query.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,28 @@ using VersionQueryType = std::variant<

struct VersionQuery {
VersionQueryType content_;
// TODO: Doc explaining this
// We can't escape from the shared pointer because this needs to be able to live in python
// TODO: Think about const
std::optional<std::shared_ptr<index::CachedIndex>> cached_index_;

void set_snap_name(const std::string& snap_name) {
content_ = SnapshotVersionQuery{snap_name};
cached_index_ = std::nullopt;
}

void set_timestamp(timestamp ts, bool iterate_snapshots_if_tombstoned) {
content_ = TimestampVersionQuery{ts, iterate_snapshots_if_tombstoned};
cached_index_ = std::nullopt;
}

void set_version(SignedVersionId version, bool iterate_snapshots_if_tombstoned) {
content_ = SpecificVersionQuery{version, iterate_snapshots_if_tombstoned};
cached_index_ = std::nullopt;
}

void set_cached_index(std::shared_ptr<index::CachedIndex> cached_index) {
cached_index_ = cached_index;
}
};

Expand Down
1 change: 0 additions & 1 deletion cpp/arcticdb/processing/operation_dispatch_binary.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,6 @@ VariantData binary_operator(const ColumnWithStrings& col, const Value& val, Func
details::visit_type(val.type().data_type(), [&](auto val_tag) {
using val_type_info = ScalarTypeInfo<decltype(val_tag)>;
if constexpr(!is_numeric_type(col_type_info::data_type) || !is_numeric_type(val_type_info::data_type)) {
std::string error_message;
user_input::raise<ErrorCode::E_INVALID_USER_ARGUMENT>("Non-numeric type provided to binary operation: {}",
binary_operation_with_types_to_string(
col.column_name_,
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/storage/file/file_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,6 @@ version_store::ReadVersionOutput read_dataframe_from_file_internal(
const auto header_offset = key_data.key_offset_ + key_data.key_size_;
ARCTICDB_DEBUG(log::storage(), "Got header offset at {}", header_offset);
single_file_storage->load_header(header_offset, data_end - header_offset);
return version_store::read_frame_for_version(store, versioned_item, read_query, read_options, handler_data).get();
return version_store::read_frame_for_version(store, version_store::StoredIndexKey{versioned_item}, read_query, read_options, handler_data).get();
}
} //namespace arcticdb
137 changes: 90 additions & 47 deletions cpp/arcticdb/version/local_versioned_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,10 @@ std::optional<VersionedItem> LocalVersionedEngine::get_version_to_read(
const StreamId &stream_id,
const VersionQuery &version_query
) {
if (version_query.cached_index_.has_value()) {
auto index_key = version_query.cached_index_.value()->index_key;
return VersionedItem{std::move(index_key)};
}
return util::variant_match(version_query.content_,
[&stream_id, &version_query, this](const SpecificVersionQuery &specific) {
return get_specific_version(stream_id, specific.version_id_, version_query);
Expand All @@ -392,6 +396,7 @@ std::optional<VersionedItem> LocalVersionedEngine::get_version_to_read(
);
}

// TODO: Remove this, it is unused
IndexRange LocalVersionedEngine::get_index_range(
const StreamId &stream_id,
const VersionQuery& version_query) {
Expand All @@ -402,24 +407,28 @@ IndexRange LocalVersionedEngine::get_index_range(
return index::get_index_segment_range(version->key_, store());
}

std::variant<VersionedItem, StreamId> get_version_identifier(

IndexSource LocalVersionedEngine::get_index_source(
const StreamId& stream_id,
const VersionQuery& version_query,
const ReadOptions& read_options,
const std::optional<VersionedItem>& version) {
const ReadOptions& read_options) {
auto version = get_version_to_read(stream_id, version_query);
if (!version) {
if (opt_false(read_options.incompletes())) {
log::version().warn("No index: Key not found for {}, will attempt to use incomplete segments.", stream_id);
return stream_id;
return Incompletes{stream_id};
} else {
missing_data::raise<ErrorCode::E_NO_SUCH_VERSION>(
"read_dataframe_version: version matching query '{}' not found for symbol '{}'",
"get_index_source: version matching query '{}' not found for symbol '{}'",
version_query,
stream_id
);
}
}
return *version;
if (version_query.cached_index_.has_value()) {
return CachedIndexKey{std::move(version.value()), version_query.cached_index_.value()};
}
return StoredIndexKey{std::move(version.value())};
}

ReadVersionOutput LocalVersionedEngine::read_dataframe_version_internal(
Expand All @@ -429,43 +438,53 @@ ReadVersionOutput LocalVersionedEngine::read_dataframe_version_internal(
const ReadOptions& read_options,
std::any& handler_data) {
py::gil_scoped_release release_gil;
auto version = get_version_to_read(stream_id, version_query);
const auto identifier = get_version_identifier(stream_id, version_query, read_options, version);
return read_frame_for_version(store(), identifier, read_query, read_options, handler_data).get();
auto index_source = get_index_source(stream_id, version_query, read_options);
return read_frame_for_version(store(), std::move(index_source), read_query, read_options, handler_data).get();
}

folly::Future<DescriptorItem> LocalVersionedEngine::get_descriptor(
AtomKey&& k){
const auto key = std::move(k);
return store()->read(key)
.thenValue([](auto&& key_seg_pair) -> DescriptorItem {
auto key = to_atom(std::move(key_seg_pair.first));
auto seg = std::move(key_seg_pair.second);
std::optional<TimeseriesDescriptor> timeseries_descriptor;
if (seg.has_index_descriptor())
timeseries_descriptor.emplace(seg.index_descriptor());

std::optional<timestamp> start_index;
std::optional<timestamp> end_index;
if (seg.row_count() > 0) {
const auto& start_index_column = seg.column(position_t(index::Fields::start_index));
details::visit_type(start_index_column.type().data_type(), [&start_index_column, &start_index](auto column_desc_tag) {
using type_info = ScalarTypeInfo<decltype(column_desc_tag)>;
if constexpr (is_time_type(type_info::data_type)) {
start_index = start_index_column.template scalar_at<timestamp>(0);
}
});
DescriptorItem get_descriptor_from_index(std::shared_ptr<index::CachedIndex> index) {
const auto& isr = index->isr;
auto timeseries_descriptor = isr.tsd();

const auto& end_index_column = seg.column(position_t(index::Fields::end_index));
details::visit_type(end_index_column.type().data_type(), [&end_index_column, &end_index, row_count=seg.row_count()](auto column_desc_tag) {
using type_info = ScalarTypeInfo<decltype(column_desc_tag)>;
if constexpr (is_time_type(type_info::data_type)) {
// -1 as the end timestamp in the data keys is one nanosecond greater than the last value in the index column
end_index = *end_index_column.template scalar_at<timestamp>(row_count - 1) - 1;
}
});
std::optional<timestamp> start_index;
std::optional<timestamp> end_index;
if (isr.size() > 0) {
auto start_index_value = isr.begin()->key().start_index();
if (std::holds_alternative<timestamp>(start_index_value)) {
start_index = std::get<timestamp>(start_index_value);
}
auto last_index_value = isr.last()->key().end_index();
if (std::holds_alternative<timestamp>(last_index_value)) {
end_index = std::get<timestamp>(last_index_value);
}
return DescriptorItem{std::move(key), start_index, end_index, std::move(timeseries_descriptor)};
}
auto key = index->index_key;
return DescriptorItem{std::move(key), start_index, end_index, std::move(timeseries_descriptor), index};
}

folly::Future<DescriptorItem> LocalVersionedEngine::get_descriptor(
IndexSource&& index_source) {
auto index = util::variant_match(
index_source,
[this] (StoredIndexKey& source) -> folly::Future<std::shared_ptr<index::CachedIndex>> {
auto key = source.versioned_item.key_;
return this->store()->read(std::move(key))
.thenValue([](auto&& key_seg_pair) {
auto key = to_atom(std::move(key_seg_pair.first));
auto seg = std::move(key_seg_pair.second);
return std::make_shared<index::CachedIndex>(std::move(key), std::move(seg));
});
},
[] (CachedIndexKey& source) -> folly::Future<std::shared_ptr<index::CachedIndex>> {
return {std::move(source.cached_index)};
},
[] (Incompletes& source) -> folly::Future<std::shared_ptr<index::CachedIndex>> {
// TODO: Currently there is no user facing API to get the schema with incomplete=True. If we decide to
// introduce this we'll need to deduplicate this logic with `read_incompletes_to_pipeline`
internal::raise<ErrorCode::E_ASSERTION_FAILURE>("User reading only the desriptor of incompletes is not implemented for {}", source.stream_id);
});
return std::move(index).thenValue([](auto&& index) -> DescriptorItem {
return get_descriptor_from_index(index);
});
}

Expand All @@ -477,7 +496,7 @@ folly::Future<DescriptorItem> LocalVersionedEngine::get_descriptor_async(
.thenValue([this, &stream_id, &version_query](std::optional<AtomKey>&& opt_index_key){
missing_data::check<ErrorCode::E_NO_SUCH_VERSION>(opt_index_key.has_value(),
"Unable to retrieve descriptor data. {}@{}: version not found", stream_id, version_query);
return get_descriptor(std::move(*opt_index_key));
return get_descriptor(StoredIndexKey{VersionedItem{std::move(*opt_index_key)}});
}).via(&async::cpu_executor());
}

Expand All @@ -486,18 +505,18 @@ DescriptorItem LocalVersionedEngine::read_descriptor_internal(
const VersionQuery& version_query
) {
ARCTICDB_SAMPLE(ReadDescriptor, 0)
// if (version_query.cached_index_)
auto version = get_version_to_read(stream_id, version_query);
missing_data::check<ErrorCode::E_NO_SUCH_VERSION>(version.has_value(),
"Unable to retrieve descriptor data. {}@{}: version not found", stream_id, version_query);
return get_descriptor(std::move(version->key_)).get();
return get_descriptor(StoredIndexKey{std::move(version.value())}).get();
}


std::vector<std::variant<DescriptorItem, DataError>> LocalVersionedEngine::batch_read_descriptor_internal(
const std::vector<StreamId>& stream_ids,
const std::vector<VersionQuery>& version_queries,
const ReadOptions& read_options) {

internal::check<ErrorCode::E_ASSERTION_FAILURE>(read_options.batch_throw_on_error().has_value(),
"ReadOptions::batch_throw_on_error_ should always be set here");

Expand All @@ -513,6 +532,30 @@ std::vector<std::variant<DescriptorItem, DataError>> LocalVersionedEngine::batch
return transform_batch_items_or_throw(std::move(descriptors), stream_ids, flags, version_queries);
}

// TODO: Maybe should live in a different place
TimeseriesDescriptor compute_output_timeseries_descriptor(const TimeseriesDescriptor& input_tsd, const ReadQuery& read_query) {
auto tsd = input_tsd.clone();
auto input_schema = OutputSchema(tsd.as_stream_descriptor(), tsd.normalization());
auto output_schema = compute_output_schema(std::move(input_schema), read_query);
tsd.set_output_schema(std::move(output_schema));
return tsd;
}

// TODO: Think whether this doesn't make sense to become part of read_descriptor with optional ReadQuery argument.
DescriptorItem LocalVersionedEngine::read_output_schema_internal(
const StreamId& stream_id,
const VersionQuery& version_query,
[[maybe_unused]] const ReadQuery& read_query,
const ReadOptions& read_options) {
auto index_source = get_index_source(stream_id, version_query, read_options);
auto descriptor = get_descriptor(std::move(index_source)).get();
if (descriptor.timeseries_descriptor_.has_value()) {
auto output_tsd = compute_output_timeseries_descriptor(descriptor.timeseries_descriptor_.value(), read_query);
descriptor.timeseries_descriptor_ = output_tsd;
}
return descriptor;
}

void LocalVersionedEngine::flush_version_map() {
version_map()->flush();
}
Expand Down Expand Up @@ -1106,7 +1149,7 @@ std::vector<ReadVersionOutput> LocalVersionedEngine::batch_read_keys(const std::
std::vector<folly::Future<ReadVersionOutput>> res;
res.reserve(keys.size());
for (const auto& index_key: keys) {
res.emplace_back(read_frame_for_version(store(), {index_key}, std::make_shared<ReadQuery>(), ReadOptions{}, handler_data));
res.emplace_back(read_frame_for_version(store(), StoredIndexKey{VersionedItem{index_key}}, std::make_shared<ReadQuery>(), ReadOptions{}, handler_data));
}
Allocator::instance()->trim();
return folly::collect(res).get();
Expand Down Expand Up @@ -1139,19 +1182,19 @@ std::vector<std::variant<ReadVersionOutput, DataError>> LocalVersionedEngine::ba
read_query = read_queries.empty() ? std::make_shared<ReadQuery>(): read_queries[idx],
&read_options,
&handler_data](auto&& opt_index_key) {
std::variant<VersionedItem, StreamId> version_info;
IndexSource index_source;
if (opt_index_key.has_value()) {
version_info = VersionedItem(std::move(*opt_index_key));
index_source = StoredIndexKey{VersionedItem(std::move(*opt_index_key))};
} else {
if (opt_false(read_options.incompletes())) {
log::version().warn("No index: Key not found for {}, will attempt to use incomplete segments.", stream_ids[idx]);
version_info = stream_ids[idx];
index_source = Incompletes{stream_ids[idx]};
} else {
missing_data::raise<ErrorCode::E_NO_SUCH_VERSION>(
"batch_read_internal: version matching query '{}' not found for symbol '{}'", version_queries[idx], stream_ids[idx]);
}
}
return read_frame_for_version(store, version_info, read_query, read_options, handler_data);
return read_frame_for_version(store, std::move(index_source), read_query, read_options, handler_data);
})
);
if(++batch_count == static_cast<size_t>(max_batch_size)) {
Expand Down
13 changes: 12 additions & 1 deletion cpp/arcticdb/version/local_versioned_engine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,11 @@ class LocalVersionedEngine : public VersionedEngine {
const VersionQuery& version_query
);

IndexSource get_index_source(
const StreamId& stream_id,
const VersionQuery& version_query,
const ReadOptions& read_options);

ReadVersionOutput read_dataframe_version_internal(
const StreamId &stream_id,
const VersionQuery& version_query,
Expand All @@ -148,6 +153,12 @@ class LocalVersionedEngine : public VersionedEngine {
const StreamId& stream_id,
const VersionQuery& version_query);

DescriptorItem read_output_schema_internal(
const StreamId& stream_id,
const VersionQuery& version_query,
const ReadQuery& read_query,
const ReadOptions& read_options);

void write_parallel_frame(
const StreamId& stream_id,
const std::shared_ptr<InputTensorFrame>& frame,
Expand Down Expand Up @@ -196,7 +207,7 @@ class LocalVersionedEngine : public VersionedEngine {
const VersionQuery& version_query);

folly::Future<DescriptorItem> get_descriptor(
AtomKey&& key);
IndexSource&& index_source);

folly::Future<DescriptorItem> get_descriptor_async(
folly::Future<std::optional<AtomKey>>&& opt_index_key_fut,
Expand Down
Loading
Loading