fix: Using BatchStatement instead of execute_concurrent_with_args (#163)
* fix: Using BatchStatement instead of execute_concurrent_with_args

* refactor: moving online store override configuration to materialization

---------

Co-authored-by: Bhargav Dodla <[email protected]>
EXPEbdodla and Bhargav Dodla authored Feb 11, 2025
1 parent 38be3e4 commit 5ba2afc
Showing 7 changed files with 154 additions and 256 deletions.
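The headline fix swaps execute_concurrent_with_args for BatchStatement, both DataStax Cassandra driver APIs, in the online store write path. As a minimal sketch of the general before/after pattern, not the Feast code changed in this commit (the keyspace, table, and rows below are purely illustrative):

# Sketch of the two write patterns with the DataStax cassandra-driver;
# keyspace, table, and row values are hypothetical, not Feast's schema.
from cassandra.cluster import Cluster
from cassandra.concurrent import execute_concurrent_with_args
from cassandra.query import BatchStatement, BatchType

session = Cluster(["127.0.0.1"]).connect("feast_keyspace")  # hypothetical keyspace
insert = session.prepare(
    "INSERT INTO online_features (entity_key, feature_name, value) VALUES (?, ?, ?)"
)
rows = [("user:1", "clicks", b"\x05"), ("user:1", "views", b"\x09")]

# Before: fan the rows out as concurrent single-row statements.
execute_concurrent_with_args(session, insert, rows, concurrency=100)

# After: collect the rows into a single unlogged batch so they are sent
# to the cluster, and applied to the partition, as one request.
batch = BatchStatement(batch_type=BatchType.UNLOGGED)
for row in rows:
    batch.add(insert, row)
session.execute(batch)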
26 changes: 13 additions & 13 deletions sdk/python/feast/feature_view.py
@@ -51,6 +51,8 @@
join_keys=[DUMMY_ENTITY_ID],
)

ONLINE_STORE_TAG_SUFFIX = "online_store_"


@typechecked
class FeatureView(BaseFeatureView):
@@ -496,20 +498,18 @@ def most_recent_end_time(self) -> Optional[datetime]:
return max([interval[1] for interval in self.materialization_intervals])

@property
def online_store_key_ttl_seconds(self) -> Optional[int]:
def get_online_store_tags(self) -> Dict[str, str]:
"""
Retrieves the online store TTL from the FeatureView's tags.
Retrieves online store specific tags which are prefixed with online_store_.
This helps to identify the overrides provided at feature view level. Not all
online store configurations are relevant for override.
Returns:
An integer representing the TTL in seconds, or None if not set.
A dictionary of tags. If no tags are found, returns an empty dictionary.
"""
ttl_str = self.tags.get("online_store_key_ttl_seconds")
if ttl_str:
try:
return int(ttl_str)
except ValueError:
raise ValueError(
f"Invalid online_store_key_ttl_seconds value '{ttl_str}' in tags. It must be an integer representing seconds."
)
else:
return None
tags = {
k.removeprefix(ONLINE_STORE_TAG_SUFFIX).lower(): v
for k, v in self.tags.items()
if k.lower().startswith(ONLINE_STORE_TAG_SUFFIX)
}
return tags
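With this change, per-feature-view overrides for the online store are expressed as tags prefixed with online_store_, and the filtered, prefix-stripped dictionary is exposed as the get_online_store_tags property. A small sketch of the convention, using hypothetical tag values:

ONLINE_STORE_TAG_SUFFIX = "online_store_"

# Hypothetical tags as they might be set on a FeatureView.
tags = {
    "online_store_key_ttl_seconds": "86400",  # picked up: prefix matches
    "team": "ranking",                        # ignored: no online_store_ prefix
}

# Same dict comprehension as the new property above.
online_store_tags = {
    k.removeprefix(ONLINE_STORE_TAG_SUFFIX).lower(): v
    for k, v in tags.items()
    if k.lower().startswith(ONLINE_STORE_TAG_SUFFIX)
}
print(online_store_tags)  # {'key_ttl_seconds': '86400'}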
109 changes: 2 additions & 107 deletions sdk/python/feast/infra/contrib/spark_kafka_processor.py
@@ -1,9 +1,7 @@
import time
from types import MethodType
from typing import List, Optional, Set, Union, no_type_check

import pandas as pd
import pyarrow
from pyspark import SparkContext
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.avro.functions import from_avro
@@ -25,7 +23,6 @@
)
from feast.infra.provider import get_provider
from feast.stream_feature_view import StreamFeatureView
from feast.utils import _convert_arrow_to_proto, _run_pyarrow_field_mapping


class SparkProcessorConfig(ProcessorConfig):
@@ -140,10 +137,7 @@ def ingest_stream_feature_view(
self._create_infra_if_necessary()
ingested_stream_df = self._ingest_stream_data()
transformed_df = self._construct_transformation_plan(ingested_stream_df)
if self.fs.config.provider == "expedia":
online_store_query = self._write_stream_data_expedia(transformed_df, to)
else:
online_store_query = self._write_stream_data(transformed_df, to)
online_store_query = self._write_stream_data(transformed_df, to)
return online_store_query

# In the line 116 of __init__(), the "data_source" is assigned a stream_source (and has to be KafkaSource as in line 80).
@@ -245,112 +239,14 @@ def _construct_transformation_plan(self, df: StreamTable) -> StreamTable:

if len(drop_list) > 0:
print(
f"INFO!!! Dropping extra columns in the DataFrame: {drop_list}. Avoid unnecessary columns in the dataframe."
f"INFO: Dropping extra columns in the DataFrame: {drop_list}. Avoid unnecessary columns in the dataframe."
)
return df.drop(*drop_list)
else:
raise Exception(f"Stream source is not defined for {self.sfv.name}")
elif isinstance(self.sfv, StreamFeatureView):
return self.sfv.udf.__call__(df) if self.sfv.udf else df

def _write_stream_data_expedia(self, df: StreamTable, to: PushMode):
"""
Ensures materialization logic in sync with stream ingestion.
Support only write to online store. No support for preprocess_fn also.
In Spark 3.2.2, toPandas() is throwing error when the dataframe has Boolean columns.
To fix this error, we need spark 3.4.0 or numpy < 1.20.0 but feast needs numpy >= 1.22.
Switching to use mapInPandas to solve the problem for boolean columns and
toPandas() also load all data into driver's memory.
Error Message:
AttributeError: module 'numpy' has no attribute 'bool'.
`np.bool` was a deprecated alias for the builtin `bool`.
To avoid this error in existing code, use `bool` by itself.
Doing this will not modify any behavior and is safe.
If you specifically wanted the numpy scalar type, use `np.bool_` here.
"""

# TODO: Support writing to offline store and preprocess_fn. Remove _write_stream_data method

# Validation occurs at the fs.write_to_online_store() phase against the stream feature view schema.
def batch_write_pandas_df(iterator, spark_serialized_artifacts, join_keys):
for pdf in iterator:
(
feature_view,
online_store,
repo_config,
) = spark_serialized_artifacts.unserialize()

if isinstance(feature_view, StreamFeatureView):
ts_field = feature_view.timestamp_field
else:
ts_field = feature_view.stream_source.timestamp_field

# Extract the latest feature values for each unique entity row (i.e. the join keys).
pdf = (
pdf.sort_values(by=[*join_keys, ts_field], ascending=False)
.groupby(join_keys)
.nth(0)
)

table = pyarrow.Table.from_pandas(pdf)
if feature_view.batch_source.field_mapping is not None:
table = _run_pyarrow_field_mapping(
table, feature_view.batch_source.field_mapping
)

join_key_to_value_type = {
entity.name: entity.dtype.to_value_type()
for entity in feature_view.entity_columns
}
rows_to_write = _convert_arrow_to_proto(
table, feature_view, join_key_to_value_type
)
online_store.online_write_batch(
repo_config,
feature_view,
rows_to_write,
lambda x: None,
)

yield pd.DataFrame([pd.Series(range(1, 2))]) # dummy result

def batch_write(
sdf: DataFrame,
batch_id: int,
spark_serialized_artifacts,
join_keys,
feature_view,
):
start_time = time.time()
sdf.mapInPandas(
lambda x: batch_write_pandas_df(
x, spark_serialized_artifacts, join_keys
),
"status int",
).count() # dummy action to force evaluation
print(
f"Time taken to write batch {batch_id} is: {(time.time() - start_time) * 1000:.2f} ms"
)

query = (
df.writeStream.outputMode("update")
.option("checkpointLocation", self.checkpoint_location)
.trigger(processingTime=self.processing_time)
.foreachBatch(
lambda df, batch_id: batch_write(
df,
batch_id,
self.spark_serialized_artifacts,
self.join_keys,
self.sfv,
)
)
.start()
)

query.awaitTermination(timeout=self.query_timeout)
return query

def _write_stream_data(self, df: StreamTable, to: PushMode) -> StreamingQuery:
# Validation occurs at the fs.write_to_online_store() phase against the stream feature view schema.
def batch_write(row: DataFrame, batch_id: int):
@@ -368,7 +264,6 @@ def batch_write(row: DataFrame, batch_id: int):
.nth(0)
)
# Created column is not used anywhere in the code, but it is added to the dataframe.
# Expedia provider drops the unused columns from dataframe
# Commenting this out as it is not used anywhere in the code
# rows["created"] = pd.to_datetime("now", utc=True)

(Diffs for the remaining 5 changed files are not expanded on this page.)
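The second bullet of the commit message moves the online store override handling into the materialization path, which lives in those unexpanded files. Purely as a hypothetical sketch of how such overrides could be consumed there, with illustrative helper and config names that are not the code in this commit:

from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class HypotheticalOnlineStoreConfig:
    # Stand-in for a real online store config; the field name matches the
    # tag key produced by get_online_store_tags above.
    key_ttl_seconds: Optional[int] = None


def apply_online_store_overrides(
    config: HypotheticalOnlineStoreConfig, tag_overrides: Dict[str, str]
) -> HypotheticalOnlineStoreConfig:
    # Tag values arrive as strings; coerce digit strings to int, keep the rest as-is.
    for key, value in tag_overrides.items():
        if hasattr(config, key):
            setattr(config, key, int(value) if value.isdigit() else value)
    return config


config = apply_online_store_overrides(
    HypotheticalOnlineStoreConfig(), {"key_ttl_seconds": "86400"}
)
print(config)  # HypotheticalOnlineStoreConfig(key_ttl_seconds=86400)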
