diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py
index 82246d8b3a..ea8bf9b78f 100644
--- a/sdk/python/feast/on_demand_feature_view.py
+++ b/sdk/python/feast/on_demand_feature_view.py
@@ -8,7 +8,7 @@
 import pyarrow
 from typeguard import typechecked
 
-from feast.base_feature_view import BaseFeatureView
+from feast.aggregation import Aggregation
 from feast.data_source import RequestSource
 from feast.entity import Entity
 from feast.errors import RegistryInferenceFailure, SpecifiedFeaturesNotPresentError
@@ -42,11 +42,12 @@
 @typechecked
-class OnDemandFeatureView(BaseFeatureView):
+class OnDemandFeatureView(FeatureView):
     """
-    [Experimental] An OnDemandFeatureView defines a logical group of features that are
+    An OnDemandFeatureView defines a logical group of features that are
     generated by applying a transformation on a set of input sources, such as feature
-    views and request data sources.
+    views and request data sources. Aggregations may optionally be applied to the
+    input sources before the transformation runs.
 
     Attributes:
         name: The unique name of the on demand feature view.
@@ -56,6 +57,7 @@ class OnDemandFeatureView(BaseFeatureView):
         source_request_sources: A map from input source names to the actual input
             sources with type RequestSource.
         feature_transformation: The user defined transformation.
+        aggregations: List of aggregations registered with the on demand feature view.
         description: A human-readable description.
         tags: A dictionary of key-value pairs to store arbitrary metadata.
         owner: The owner of the on demand feature view, typically the email of the primary
@@ -65,6 +67,7 @@ class OnDemandFeatureView(BaseFeatureView):
     name: str
     entities: Optional[List[str]]
     features: List[Field]
+    source: Optional[Union[OnDemandSourceType, List[OnDemandSourceType]]]
     source_feature_view_projections: dict[str, FeatureViewProjection]
     source_request_sources: dict[str, RequestSource]
     feature_transformation: Transformation
@@ -76,6 +79,7 @@ class OnDemandFeatureView(BaseFeatureView):
     singleton: bool
     udf: Optional[FunctionType]
     udf_string: Optional[str]
+    aggregations: List[Aggregation]
 
     def __init__(  # noqa: C901
         self,
@@ -83,7 +87,8 @@ def __init__(  # noqa: C901
         name: str,
         entities: Optional[List[Entity]] = None,
         schema: Optional[List[Field]] = None,
-        sources: List[OnDemandSourceType],
+        source: Optional[Union[OnDemandSourceType, List[OnDemandSourceType]]] = None,
+        sources: Optional[List[OnDemandSourceType]] = None,
         udf: Optional[FunctionType] = None,
         udf_string: Optional[str] = "",
         feature_transformation: Optional[Transformation] = None,
@@ -93,6 +98,7 @@ def __init__(  # noqa: C901
         owner: str = "",
         write_to_online_store: bool = False,
         singleton: bool = False,
+        aggregations: Optional[List[Aggregation]] = None,
     ):
         """
         Creates an OnDemandFeatureView object.
@@ -102,9 +108,9 @@ def __init__(  # noqa: C901
             entities (optional): The list of names of entities that this feature view is
                 associated with.
             schema: The list of features in the output of the on demand feature view, after
                 the transformation has been applied.
-            sources: A map from input source names to the actual input sources, which may be
-                feature views, or request data sources. These sources serve as inputs to the udf,
-                which will refer to them by name.
+            source: A single source, or a list of sources, for the on demand feature view.
+                Each source may be a feature view or a request data source.
+            sources: [DEPRECATED] Use 'source' instead. Maintained for backward compatibility.
             udf: The user defined transformation function, which must take pandas
                 dataframes as inputs.
             udf_string: The source code version of the udf (for diffing and displaying in Web UI)
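Review note: a minimal sketch of the new constructor contract. The request source, field names, and UDF below are hypothetical, and `mode="pandas"` is assumed to be a valid transformation mode as in existing Feast usage:

```python
from feast import Field, RequestSource
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.types import Float64

# Hypothetical request source and UDF, purely for illustration.
inputs = RequestSource(
    name="pricing_inputs",
    schema=[Field(name="price", dtype=Float64), Field(name="discount", dtype=Float64)],
)

def apply_discount(df):
    df["final_price"] = df["price"] * (1 - df["discount"])
    return df

# New style: a single source (or a list of sources) passed via `source`.
view = OnDemandFeatureView(
    name="final_price_view",
    source=inputs,
    schema=[Field(name="final_price", dtype=Float64)],
    udf=apply_discount,
    mode="pandas",
)

# Old style still works, but now emits a DeprecationWarning.
legacy_view = OnDemandFeatureView(
    name="final_price_view_legacy",
    sources=[inputs],
    schema=[Field(name="final_price", dtype=Float64)],
    udf=apply_discount,
    mode="pandas",
)
```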
@@ -118,24 +124,52 @@ def __init__(  # noqa: C901
                 the online store for faster retrieval.
             singleton (optional): A boolean that indicates whether the transformation is executed
                 on a singleton (only applicable when mode="python").
+            aggregations (optional): List of aggregations to apply before the transformation.
         """
+
+        # Handle backward compatibility: sources -> source
+        if sources is not None and source is not None:
+            raise ValueError(
+                "Cannot specify both 'source' and 'sources'. Use the 'source' parameter."
+            )
+
+        if sources is not None:
+            warnings.warn(
+                "The 'sources' parameter is deprecated. Use 'source' instead.",
+                DeprecationWarning,
+                stacklevel=2,
+            )
+            self.source = sources
+        elif source is not None:
+            # Normalize a single source to a list for internal processing
+            self.source = source if isinstance(source, list) else [source]
+        else:
+            raise ValueError("Must specify either 'source' or 'sources'.")
+
+        # Call the parent FeatureView constructor; the ODFV-specific source
+        # attributes are derived from self.source below.
         super().__init__(
             name=name,
-            features=schema,
+            source=self.source,
+            schema=schema,
+            entities=entities,
             description=description,
             tags=tags,
             owner=owner,
+            online=write_to_online_store,
+            offline=False,
         )
         schema = schema or []
         self.entities = [e.name for e in entities] if entities else [DUMMY_ENTITY_NAME]
-        self.sources = sources
         self.mode = mode.lower()
         self.udf = udf
         self.udf_string = udf_string
         self.source_feature_view_projections: dict[str, FeatureViewProjection] = {}
         self.source_request_sources: dict[str, RequestSource] = {}
-        for odfv_source in sources:
+        for odfv_source in self.source:
             if isinstance(odfv_source, RequestSource):
                 self.source_request_sources[odfv_source.name] = odfv_source
             elif isinstance(odfv_source, FeatureViewProjection):
@@ -185,6 +219,7 @@ def __init__(  # noqa: C901
             )
         self.write_to_online_store = write_to_online_store
         self.singleton = singleton
+        self.aggregations = aggregations or []
 
         if self.singleton and self.mode != "python":
             raise ValueError("Singleton is only supported for Python mode.")
@@ -224,6 +259,7 @@ def __copy__(self):
             owner=self.owner,
             write_to_online_store=self.write_to_online_store,
             singleton=self.singleton,
+            aggregations=self.aggregations,
         )
         fv.entities = self.entities
         fv.features = self.features
@@ -251,6 +287,7 @@ def __eq__(self, other):
             or self.write_to_online_store != other.write_to_online_store
             or sorted(self.entity_columns) != sorted(other.entity_columns)
             or self.singleton != other.singleton
+            or self.aggregations != other.aggregations
         ):
             return False
 
@@ -260,6 +297,19 @@ def join_keys(self) -> List[str]:
         """Returns a list of all the join keys."""
         return [entity.name for entity in self.entity_columns]
 
+    @property
+    def sources(self) -> List[OnDemandSourceType]:
+        """
+        Returns the list of sources for this OnDemandFeatureView.
+        Maintained for backward compatibility with existing call sites.
+        """
+        return self.source
+
+    @sources.setter
+    def sources(self, value: List[OnDemandSourceType]) -> None:
+        """Setter maintained for backward compatibility; writes through to `source`."""
+        self.source = value
 
     @property
     def schema(self) -> List[Field]:
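The alias property keeps `view.sources` and `view.source` in sync, so older call sites continue to work. Continuing the hypothetical sketch above:

```python
assert view.sources == view.source == [inputs]

view.sources = [inputs]  # the setter writes through to `source`
assert view.source == [inputs]
```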
+ """ + return self.source + + @sources.setter + def sources(self, value: List[OnDemandSourceType]) -> None: + """Setter for backward compatibility.""" + self.source = value @property def schema(self) -> List[Field]: @@ -512,11 +562,242 @@ def get_request_data_schema(self) -> dict[str, ValueType]: def _get_projected_feature_name(self, feature: str) -> str: return f"{self.projection.name_to_use()}__{feature}" + def _build_source_mappings(self) -> dict[str, str]: + """ + Build clean mapping from source columns to UDF expected names. + + Returns: + Dict mapping UDF expected column name -> actual source column name + """ + mappings = {} + + # Handle FeatureView projections + for source_name, projection in self.source_feature_view_projections.items(): + for feature in projection.features: + udf_expected_name = feature.name # UDF expects simple name like 'age' + full_source_name = f"{source_name}__{feature.name}" # Source might have 'customer_fv__age' + + # Map UDF expected name to the source column name it should come from + mappings[udf_expected_name] = full_source_name + + # Handle RequestSources - these typically use simple names already + for source_name, request_source in self.source_request_sources.items(): + for field in request_source.schema: + # RequestSource features usually already have simple names + mappings[field.name] = field.name + + return mappings + + def _apply_source_mappings(self, pa_table: pyarrow.Table) -> pyarrow.Table: + """ + Apply source column mappings to create a clean input table for UDF execution. + + Instead of duplicating columns, this creates a new table with only the columns + the UDF expects, mapped from the appropriate source columns. + + Args: + pa_table: Input table with mixed column naming conventions + + Returns: + Clean table with consistent column naming for UDF execution + """ + source_mappings = self._build_source_mappings() + + # Build new table with clean column mapping + mapped_columns = [] + mapped_names = [] + + for udf_column_name, source_column_name in source_mappings.items(): + # Try to find the source column in the input table + available_column = None + + # First, try the exact source column name + if source_column_name in pa_table.column_names: + available_column = source_column_name + # If not found, try the simple name (for backward compatibility) + elif udf_column_name in pa_table.column_names: + available_column = udf_column_name + # Finally, try variations (e.g., if source_column_name is full but table has simple) + else: + # Look for any column that ends with the expected name + for col_name in pa_table.column_names: + if col_name.endswith(f"__{udf_column_name}") or col_name == udf_column_name: + available_column = col_name + break + + # If we found a matching column, add it to the mapped table + if available_column: + mapped_columns.append(pa_table[available_column]) + mapped_names.append(udf_column_name) # Always use the UDF expected name + + # Create new table with clean, consistent naming + if mapped_columns: + return pyarrow.Table.from_arrays(mapped_columns, names=mapped_names) + else: + # Return empty table with empty schema if no columns found + return pyarrow.table({}) + + def _apply_source_mappings_dict(self, feature_dict: dict[str, Any]) -> dict[str, Any]: + """ + Apply source column mappings to create a clean input dict for UDF execution. 
+    def _apply_source_mappings_dict(self, feature_dict: dict[str, Any]) -> dict[str, Any]:
+        """
+        Apply source column mappings to create a clean input dict for UDF execution.
+
+        Args:
+            feature_dict: Input dict with mixed column naming conventions.
+
+        Returns:
+            Clean dict with consistent column naming for UDF execution.
+        """
+        source_mappings = self._build_source_mappings()
+        clean_dict = {}
+        missing = object()  # sentinel, so legitimate None values are preserved
+
+        for udf_column_name, source_column_name in source_mappings.items():
+            available_value = missing
+
+            # First, try the exact source column name
+            if source_column_name in feature_dict:
+                available_value = feature_dict[source_column_name]
+            # If not found, try the simple name (for backward compatibility)
+            elif udf_column_name in feature_dict:
+                available_value = feature_dict[udf_column_name]
+            # Finally, fall back to any key that ends with the expected name
+            else:
+                for key in feature_dict:
+                    if key.endswith(f"__{udf_column_name}"):
+                        available_value = feature_dict[key]
+                        break
+
+            # If we found a matching value (even None), add it to the clean dict
+            if available_value is not missing:
+                clean_dict[udf_column_name] = available_value
+
+        return clean_dict
+
+    def _apply_source_mappings_ibis(self, ibis_table):
+        """
+        Apply source column mappings to create a clean input table for UDF execution.
+
+        Args:
+            ibis_table: Input ibis table with mixed column naming conventions.
+
+        Returns:
+            Clean ibis table with consistent column naming for UDF execution.
+        """
+        source_mappings = self._build_source_mappings()
+
+        # Build the column selection and renaming for ibis
+        select_exprs = {}
+
+        for udf_column_name, source_column_name in source_mappings.items():
+            available_column = None
+
+            # First, try the exact source column name
+            if source_column_name in ibis_table.columns:
+                available_column = source_column_name
+            # If not found, try the simple name (for backward compatibility)
+            elif udf_column_name in ibis_table.columns:
+                available_column = udf_column_name
+            # Finally, fall back to any column that ends with the expected name
+            else:
+                for col_name in ibis_table.columns:
+                    if col_name.endswith(f"__{udf_column_name}"):
+                        available_column = col_name
+                        break
+
+            # If we found a matching column, add it to the selection
+            if available_column:
+                select_exprs[udf_column_name] = ibis_table[available_column]
+
+        # Create a new table with clean, consistent naming
+        if select_exprs:
+            return ibis_table.select(**select_exprs)
+        # Return a table with no columns if nothing matched
+        return ibis_table.select()
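The dict variant above probes `feature_dict` with a sentinel rather than `is not None`; otherwise a row whose feature value is legitimately None would silently lose that key. A tiny sketch of why the distinction matters:

```python
# "Key missing" and "value is None" must not be conflated: None can be a
# legitimate feature value in a request row.
missing = object()

row = {"customer_fv__age": None}
assert row.get("customer_fv__age", missing) is None        # present, value is None
assert row.get("customer_fv__income", missing) is missing  # genuinely absent
```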
+    def _apply_output_formatting_ibis(self, transformed_table, full_feature_names: bool = False):
+        """
+        Apply the final output column naming for ibis tables based on preference.
+
+        Args:
+            transformed_table: Ibis table output from UDF execution.
+            full_feature_names: Whether to use fully qualified names.
+
+        Returns:
+            Ibis table with appropriately named output columns.
+        """
+        if not full_feature_names:
+            return transformed_table
+
+        # Rename short feature names to their fully qualified form
+        feature_names = {feature.name for feature in self.features}
+        rename_mapping = {}
+        for feature in self.features:
+            if feature.name in transformed_table.columns:
+                long_name = self._get_projected_feature_name(feature.name)
+                rename_mapping[long_name] = transformed_table[feature.name]
+
+        if rename_mapping:
+            # Pass non-feature columns through unrenamed
+            for col in transformed_table.columns:
+                if col not in feature_names:
+                    rename_mapping[col] = transformed_table[col]
+            return transformed_table.select(**rename_mapping)
+
+        return transformed_table
+
+    def _apply_output_formatting(
+        self, transformed_table: pyarrow.Table, full_feature_names: bool = False
+    ) -> pyarrow.Table:
+        """
+        Apply the final output column naming based on preference.
+
+        Args:
+            transformed_table: Table output from UDF execution.
+            full_feature_names: Whether to use fully qualified names.
+
+        Returns:
+            Table with appropriately named output columns.
+        """
+        if not full_feature_names:
+            return transformed_table
+
+        # Rename short feature names to their fully qualified form
+        rename_mapping = {}
+        for feature in self.features:
+            if feature.name in transformed_table.column_names:
+                rename_mapping[feature.name] = self._get_projected_feature_name(feature.name)
+
+        if rename_mapping:
+            new_names = [rename_mapping.get(name, name) for name in transformed_table.column_names]
+            return transformed_table.rename_columns(new_names)
+
+        return transformed_table
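A sketch of the rename mechanics the PyArrow formatter relies on; the projection name `my_view` is hypothetical:

```python
import pyarrow

table = pyarrow.table({"final_price": [9.0], "request_id": [1]})
rename_mapping = {"final_price": "my_view__final_price"}  # short -> fully qualified

new_names = [rename_mapping.get(c, c) for c in table.column_names]
assert table.rename_columns(new_names).column_names == [
    "my_view__final_price",
    "request_id",  # non-feature columns pass through unrenamed
]
```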
     def transform_ibis(
         self,
         ibis_table,
         full_feature_names: bool = False,
     ):
+        """
+        Transform an ibis table using the staged source-mapping pipeline.
+
+        Args:
+            ibis_table: Input ibis table with potentially mixed column naming conventions.
+            full_feature_names: Whether to use fully qualified output names.
+
+        Returns:
+            Transformed ibis table with clean, consistent naming.
+        """
         from ibis.expr.types import Table
 
         if not isinstance(ibis_table, Table):
@@ -527,121 +808,67 @@ def transform_ibis(
                 "The feature_transformation is not SubstraitTransformation type while calling transform_ibis()."
             )
 
-        columns_to_cleanup = []
-        for source_fv_projection in self.source_feature_view_projections.values():
-            for feature in source_fv_projection.features:
-                full_feature_ref = f"{source_fv_projection.name}__{feature.name}"
-                if full_feature_ref in ibis_table.columns:
-                    # Make sure the partial feature name is always present
-                    ibis_table = ibis_table.mutate(
-                        **{feature.name: ibis_table[full_feature_ref]}
-                    )
-                    columns_to_cleanup.append(feature.name)
-                elif feature.name in ibis_table.columns:
-                    ibis_table = ibis_table.mutate(
-                        **{full_feature_ref: ibis_table[feature.name]}
-                    )
-                    columns_to_cleanup.append(full_feature_ref)
-
-        transformed_table = self.feature_transformation.transform_ibis(ibis_table)
-
-        transformed_table = transformed_table.drop(*columns_to_cleanup)
-
-        rename_columns: dict[str, str] = {}
-        for feature in self.features:
-            short_name = feature.name
-            long_name = self._get_projected_feature_name(feature.name)
-            if short_name in transformed_table.columns and full_feature_names:
-                rename_columns[short_name] = long_name
-            elif not full_feature_names:
-                rename_columns[long_name] = short_name
-
-        for rename_from, rename_to in rename_columns.items():
-            if rename_from in transformed_table.columns:
-                transformed_table = transformed_table.rename(**{rename_to: rename_from})
-
-        return transformed_table
+        # Stage 1: apply source mappings to build a clean input for the UDF
+        clean_input_table = self._apply_source_mappings_ibis(ibis_table)
+
+        # Stage 2: apply the UDF transformation
+        transformed_table = self.feature_transformation.transform_ibis(clean_input_table)
+
+        # Stage 3: apply output formatting based on the naming preference
+        return self._apply_output_formatting_ibis(transformed_table, full_feature_names)
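A rough usage sketch of the staged ibis path. It assumes a hypothetical `substrait_view` whose `feature_transformation` is a `SubstraitTransformation`, since `transform_ibis` rejects anything else:

```python
import ibis

t = ibis.memtable({"price": [10.0, 20.0], "discount": [0.1, 0.5]})

# Stage 1 selects/renames the input columns, stage 2 runs the Substrait
# transformation, stage 3 applies the naming preference.
result = substrait_view.transform_ibis(t, full_feature_names=False)
print(result.execute())
```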
     def transform_arrow(
         self,
         pa_table: pyarrow.Table,
         full_feature_names: bool = False,
     ) -> pyarrow.Table:
+        """
+        Transform a PyArrow table using the staged source-mapping pipeline.
+
+        Args:
+            pa_table: Input table with potentially mixed column naming conventions.
+            full_feature_names: Whether to use fully qualified output names.
+
+        Returns:
+            Transformed table with clean, consistent naming.
+        """
         if not isinstance(pa_table, pyarrow.Table):
             raise TypeError("transform_arrow only accepts pyarrow.Table")
-        columns_to_cleanup = []
-        for source_fv_projection in self.source_feature_view_projections.values():
-            for feature in source_fv_projection.features:
-                full_feature_ref = f"{source_fv_projection.name}__{feature.name}"
-                if full_feature_ref in pa_table.column_names:
-                    # Make sure the partial feature name is always present
-                    pa_table = pa_table.append_column(
-                        feature.name, pa_table[full_feature_ref]
-                    )
-                    columns_to_cleanup.append(feature.name)
-                elif feature.name in pa_table.column_names:
-                    # Make sure the full feature name is always present
-                    pa_table = pa_table.append_column(
-                        full_feature_ref, pa_table[feature.name]
-                    )
-                    columns_to_cleanup.append(full_feature_ref)
-
-        df_with_transformed_features: pyarrow.Table = (
-            self.feature_transformation.transform_arrow(pa_table, self.features)
-        )
-
-        # Work out whether the correct columns names are used.
-        rename_columns: dict[str, str] = {}
-        for feature in self.features:
-            short_name = feature.name
-            long_name = self._get_projected_feature_name(feature.name)
-            if (
-                short_name in df_with_transformed_features.column_names
-                and full_feature_names
-            ):
-                rename_columns[short_name] = long_name
-            elif not full_feature_names:
-                rename_columns[long_name] = short_name
-
-        # Cleanup extra columns used for transformation
-        for col in columns_to_cleanup:
-            if col in df_with_transformed_features.column_names:
-                df_with_transformed_features = df_with_transformed_features.drop(col)
-        return df_with_transformed_features.rename_columns(
-            [
-                rename_columns.get(c, c)
-                for c in df_with_transformed_features.column_names
-            ]
-        )
+        # Stage 1: apply source mappings to build a clean input for the UDF
+        clean_input_table = self._apply_source_mappings(pa_table)
+
+        # Stage 2: apply the UDF transformation
+        transformed_table = self.feature_transformation.transform_arrow(
+            clean_input_table, self.features
+        )
+
+        # Stage 3: apply output formatting based on the naming preference
+        return self._apply_output_formatting(transformed_table, full_feature_names)
 
     def transform_dict(
         self,
         feature_dict: dict[str, Any],  # type: ignore
     ) -> dict[str, Any]:
-        # we need a mapping from full feature name to short and back to do a renaming
-        # The simplest thing to do is to make the full reference, copy the columns with the short reference
-        # and rerun
-        columns_to_cleanup: list[str] = []
-        for source_fv_projection in self.source_feature_view_projections.values():
-            for feature in source_fv_projection.features:
-                full_feature_ref = f"{source_fv_projection.name}__{feature.name}"
-                if full_feature_ref in feature_dict.keys():
-                    # Make sure the partial feature name is always present
-                    feature_dict[feature.name] = feature_dict[full_feature_ref]
-                    columns_to_cleanup.append(str(feature.name))
-                elif feature.name in feature_dict.keys():
-                    # Make sure the full feature name is always present
-                    feature_dict[full_feature_ref] = feature_dict[feature.name]
-                    columns_to_cleanup.append(str(full_feature_ref))
-
+        """
+        Transform a dict using the staged source-mapping pipeline.
+
+        Args:
+            feature_dict: Input dict with potentially mixed column naming conventions.
+
+        Returns:
+            Transformed dict with clean, consistent naming.
+        """
+        # Stage 1: apply source mappings to build a clean input for the UDF
+        clean_input_dict = self._apply_source_mappings_dict(feature_dict)
+
+        # Stage 2: apply the UDF transformation
         if self.singleton and self.mode == "python":
-            output_dict: dict[str, Any] = (
-                self.feature_transformation.transform_singleton(feature_dict)
-            )
+            output_dict = self.feature_transformation.transform_singleton(clean_input_dict)
         else:
-            output_dict = self.feature_transformation.transform(feature_dict)
-        for feature_name in columns_to_cleanup:
-            del output_dict[feature_name]
+            output_dict = self.feature_transformation.transform(clean_input_dict)
+
+        # Stage 3: no output formatting is needed for dicts; the UDF already
+        # produces the expected names
        return output_dict
 
     def infer_features(self) -> None:
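And the dict path end to end, again with hypothetical names; a `mode="python"` view (`python_view` below) is assumed, since `transform_dict` dispatches to the python transformation:

```python
# Mixed naming conventions in the input row are normalized in stage 1;
# "pricing_inputs__price" resolves via the "__<name>" suffix fallback.
row = {"pricing_inputs__price": 10.0, "discount": 0.1}
out = python_view.transform_dict(row)
# `out` holds whatever the UDF returns under the short feature names,
# e.g. {"final_price": 9.0, ...} depending on the UDF body.
```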