Enhancement 8277989680: symbol concatenation poc #2142


Open · wants to merge 9 commits into master from enhancement/8277989680/symbol-concatenation-poc

Conversation


@alexowens90 (Collaborator) commented Jan 27, 2025

Reference Issues/PRs

8277989680

What does this implement or fix?

Implements symbol concatenation. Both inner and outer joins over columns are supported. Expected usage:

# Read requests can contain the usual as_of, date_range, columns, etc. arguments
lazy_dfs = lib.read_batch([read_request_1, read_request_2, ...])
# Optionally apply processing to all (or individual) constituent lazy dataframes here; it will be applied before the join
lazy_dfs = lazy_dfs[lazy_dfs["col"].notnull()]
# Join here
lazy_df = adb.concat(lazy_dfs)
# Perform more processing if desired
lazy_df = lazy_df.resample("15min").agg({"col": "mean"})
# Collect result
res = lazy_df.collect()
# res contains a list of VersionedItems (with data=None) from the constituent symbols that went into the join, and a data member with the joined Series/DataFrame

See test_symbol_concatenation.py for thorough examples of how the API works.
For outer joins, if a column is not present in one of the input symbols, the missing values are backfilled with the same type-specific behaviour used for dynamic schema.
Not all symbols can be concatenated together. Attempting to concatenate any of the following will throw an exception:

  • a Series with a DataFrame
  • Different index types, including multiindexes with different numbers of levels
  • Incompatible column types, e.g. if col has type INT64 in one symbol and is a string column in another. This only applies if the column would appear in the result, which is always the case for all columns with an outer join, but may not always be for inner joins.

Where possible, the implementation is permissive about what can be joined, producing as sensible an output as possible:

  • Joining two or more Series with different names that are otherwise compatible will produce a Series with no name
  • Joining two or more timeseries where the indexes have different names will produce a timeseries with an unnamed index
  • Joining two or more timeseries where the indexes have different timezones will produce a timeseries with a UTC index
  • Joining two or more multiindexed Series/DataFrames where the levels have compatible types but different names will produce a multiindexed Series/DataFrame with unnamed levels where they differed between some of the inputs.
  • Joining two or more Series/DataFrames that all have a RangeIndex is supported. If the index step does not match across all of the inputs, the output will have a RangeIndex with start=0 and step=1. This differs from Pandas, which converts to an Int64 index in this case, so a warning is logged when it happens.

The only known major limitation is that all of the symbols being joined together (after any pre-join processing) must fit into memory. Relaxing this constraint would require much more sophisticated query planning than we currently support, in which the per-symbol pre-join clauses, the join itself, and any post-join clauses are all taken into account when scheduling both IO and individual processing tasks.

@alexowens90 alexowens90 marked this pull request as draft January 27, 2025 10:02
@alexowens90 alexowens90 self-assigned this Jan 27, 2025
@alexowens90 alexowens90 added the "enhancement (New feature or request)" label Jan 27, 2025
@alexowens90 alexowens90 force-pushed the enhancement/8277989680/symbol-concatenation-poc branch from 831b364 to 6b32843 on March 17, 2025
@alexowens90 alexowens90 force-pushed the enhancement/8277989680/symbol-concatenation-poc branch from 4001a25 to b5843c0 on April 16, 2025
@alexowens90 alexowens90 force-pushed the enhancement/8277989680/symbol-concatenation-poc branch from b5843c0 to 0c81995 on April 17, 2025
@alexowens90 alexowens90 added the "minor (Feature change, should increase minor version)" label Apr 17, 2025
@alexowens90 alexowens90 changed the title from "WIP Enhancement 8277989680: symbol concatenation poc" to "Enhancement 8277989680: symbol concatenation poc" Apr 17, 2025
@alexowens90 alexowens90 marked this pull request as ready for review April 17, 2025 15:00
@@ -76,8 +83,14 @@ inline ReadResult create_python_read_result(
util::print_total_mem_usage(__FILE__, __LINE__, __FUNCTION__);

const auto& desc_proto = result.desc_.proto();
std::variant<arcticdb::proto::descriptors::UserDefinedMetadata, std::vector<arcticdb::proto::descriptors::UserDefinedMetadata>> metadata;
if (user_meta.has_value()) {
metadata = *user_meta;

Collaborator:

Suggested change
metadata = *user_meta;
metadata = *std::move(user_meta);
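
For context, a minimal self-contained sketch of why the suggested change avoids a copy: dereferencing an rvalue std::optional yields T&&, so the contained value is moved rather than copied.

#include <cassert>
#include <optional>
#include <string>

int main() {
    std::optional<std::string> user_meta{std::string(1000, 'x')};
    // *user_meta yields std::string& (assignment copies);
    // *std::move(user_meta) yields std::string&& (assignment moves).
    std::string metadata = *std::move(user_meta);
    assert(metadata.size() == 1000);
    // The optional still reports a value, but its string is moved-from.
    assert(user_meta.has_value());
    return 0;
}
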

return {version, std::move(python_frame), desc_proto.normalization(),
desc_proto.user_meta(), desc_proto.multi_key_meta(), std::move(result.keys_)};
metadata, desc_proto.multi_key_meta(), std::move(result.keys_)};

Collaborator:

I think we can also std::move(metadata), but for it to make sense we would have to change the ReadResult constructor to take an rvalue reference, a forwarding reference, or the parameter by value.
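
A minimal sketch of the by-value sink pattern this comment describes, with simplified stand-in types (the real ReadResult has more members):

#include <string>
#include <utility>
#include <variant>
#include <vector>

using UserDefinedMetadata = std::string;  // stand-in for the proto type
using MetadataVariant = std::variant<UserDefinedMetadata, std::vector<UserDefinedMetadata>>;

struct ReadResult {
    // Taking the variant by value lets the caller std::move into it, so the
    // payload is moved rather than copied all the way into the member.
    explicit ReadResult(MetadataVariant metadata) : metadata_(std::move(metadata)) {}
    MetadataVariant metadata_;
};

int main() {
    MetadataVariant metadata = UserDefinedMetadata(1000, 'x');
    ReadResult result{std::move(metadata)};  // moved, not copied
    return 0;
}
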

@@ -22,7 +22,14 @@ struct FrameSliceMap {

FrameSliceMap(std::shared_ptr<PipelineContext> context, bool dynamic_schema) :
context_(std::move(context)) {

const entity::StreamDescriptor& descriptor = context_->descriptor();
const auto required_fields_count = [&]() {

Collaborator:

Was there any reason not to use a ternary here?
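
For illustration, a sketch of the ternary form, assuming the lambda just selects between two counts (the exact operands below are hypothetical):

const auto required_fields_count = dynamic_schema
        ? descriptor.field_count()            // hypothetical operand
        : descriptor.index().field_count();   // hypothetical operand
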

const bool first_col_slice = first_col == 0;
// Skip the "true" index fields (i.e. those stored in every column slice) if we are not in the first column slice
// Second condition required to avoid underflow when subtracting one unsigned value from another
const bool required_field =

Collaborator:

I think this will be a bit easier to read if the expression is split into two variables.

((first_col_slice ? 0 : descriptor.index().field_count()) <= field.index) &&
(required_fields_count >= first_col) &&
(field.index < required_fields_count - first_col);
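// For illustration, the two-variable split suggested above might look like
// this (names are suggestions; the logic is unchanged):
// const bool past_index_fields =
//     (first_col_slice ? 0 : descriptor.index().field_count()) <= field.index;
// // Underflow guard first, then the upper bound on the field index.
// const bool within_required_range =
//     required_fields_count >= first_col &&
//     field.index < required_fields_count - first_col;
// const bool required_field = past_index_fields && within_required_range;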
// If required_field is true, this is a required column in the output. The name in slice stream

Collaborator:

I'm a bit confused.

  1. If a field is not required why do we insert it?
  2. Are name mismatches allowed only for index columns?

@@ -657,6 +660,37 @@ std::shared_ptr<std::vector<folly::Future<std::vector<EntityId>>>> schedule_firs
return futures;
}

folly::Future<std::vector<EntityId>> schedule_remaining_iterations(
std::vector<std::vector<EntityId>>&& entity_ids_vec,
std::shared_ptr<std::vector<std::shared_ptr<Clause>>> clauses

Collaborator:

Should we check that clauses != nullptr?
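
A minimal sketch of the guard being asked about (using a plain exception; the codebase's own check utilities may be more appropriate):

// clauses is dereferenced below via clauses->front(), so fail fast here.
if (!clauses || clauses->empty()) {
    throw std::invalid_argument("schedule_remaining_iterations: clauses must be non-null and non-empty");
}
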

auto next_units_of_work = clauses->front()->structure_for_processing(std::move(entity_id_vectors));

std::vector<folly::Future<std::vector<EntityId>>> work_futures;
for(auto&& unit_of_work : next_units_of_work) {

Collaborator:

The && will have no effect here; & is enough.
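
For context, a self-contained sketch of the point: when iterating an lvalue container in a range-for, auto&& deduces to an lvalue reference anyway, so auto& expresses the same thing more directly.

#include <vector>

int main() {
    std::vector<int> next_units_of_work{1, 2, 3};
    // auto&& deduces to int& here because the elements are lvalues...
    for (auto&& unit_of_work : next_units_of_work) { unit_of_work += 1; }
    // ...so auto& is equivalent and clearer about intent.
    for (auto& unit_of_work : next_units_of_work) { unit_of_work += 1; }
    return 0;
}
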

});
}

return std::move(entity_ids_vec_fut).thenValueInline([](std::vector<std::vector<EntityId>>&& entity_id_vectors) {

Collaborator:

nit: I think there's no need for a lambda here; flatten_entities can be passed directly:

std::move(entity_ids_vec_fut).thenValueInline(flatten_entities);

std::shared_ptr<RowRange>,
std::shared_ptr<ColRange>>(*component_manager, processed_entity_ids);

if (std::any_of(read_query->clauses_.begin(),

Collaborator:

We can use std::ranges::any_of to avoid explicitly passing begin/end iterators.
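
A sketch of the ranges form (C++20); the predicate below is a hypothetical stand-in for whatever the original lambda checks:

#include <algorithm>  // std::ranges::any_of

// Equivalent to std::any_of(read_query->clauses_.begin(), read_query->clauses_.end(), pred),
// without spelling out the iterators.
const bool any_matching_clause = std::ranges::any_of(
    read_query->clauses_,
    [](const auto& clause) { return clause != nullptr; });
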

* processing unit collected into a single ProcessingUnit. Slices contained within a single ProcessingUnit are processed
* within a single thread.
*
* The processing of a ProcessingUnit is scheduled via the Async Store. Within a single thread, the

Collaborator:

Does this mean that a single thread will read all slices? Wouldn't it be better to try to make this read parallel?

Labels: enhancement (New feature or request), minor (Feature change, should increase minor version)
2 participants