Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-1830033: Dataframe API improvements #2811

Draft
wants to merge 15 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,17 @@
- Fixed a bug in options sql generation that could cause multiple values to be formatted incorrectly.
- Fixed a bug in `Session.catalog` where empty strings for database or schema were not handled correctly and were generating erroneous sql statements.

#### Behavior Changes

- Added new methods in class `DataFrame`:
  - `col_regex`: Select columns that match the provided regex.
  - `map` and its alias `foreach`: A method to apply a user function to each row with a one-to-one mapping.
  - `flat_map`: A method to apply a user function to each row with a one-to-many mapping.
  - `toJSON` and its alias `to_json`: Convert each row of the dataframe into a JSON string.
  - `transform`: Chain multiple transformations on a dataframe.
- Removed Snowpark Python function `snowflake_cortex_summarize`. Users can install snowflake-ml-python and use the snowflake.cortex.summarize function instead.
- Removed Snowpark Python function `snowflake_cortex_sentiment`. Users can install snowflake-ml-python and use the snowflake.cortex.sentiment function instead.

#### Experimental Features

- Added support for writing pyarrow Tables to Snowflake tables.
Expand Down Expand Up @@ -103,6 +114,7 @@

#### New Features

- Added support for `DataFrame.summary()` to compute desired statistics of a DataFrame.
- Added support for the following functions in `functions.py`
- `array_reverse`
- `divnull`
Expand Down Expand Up @@ -235,6 +247,7 @@

#### New Features

- Added support for property `version` and class method `get_active_session` for `Session` class.
- Added new methods and variables to enhance data type handling and JSON serialization/deserialization:
- To `DataType`, its derived classes, and `StructField`:
Expand Down
8 changes: 7 additions & 1 deletion docs/source/snowpark/dataframe.rst
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ DataFrame
DataFrame.fillna
DataFrame.filter
DataFrame.first
DataFrame.flat_map
DataFrame.flatMap
DataFrame.flatten
DataFrame.groupBy
DataFrame.group_by
Expand All @@ -60,6 +62,7 @@ DataFrame
DataFrame.join
DataFrame.join_table_function
DataFrame.limit
DataFrame.map
DataFrame.minus
DataFrame.natural_join
DataFrame.orderBy
Expand All @@ -81,15 +84,19 @@ DataFrame
DataFrame.show
DataFrame.sort
DataFrame.subtract
DataFrame.summary
DataFrame.take
DataFrame.toDF
DataFrame.toJSON
DataFrame.toLocalIterator
DataFrame.toPandas
DataFrame.to_df
DataFrame.to_local_iterator
DataFrame.to_json
DataFrame.to_pandas
DataFrame.to_pandas_batches
DataFrame.to_snowpark_pandas
DataFrame.transform
DataFrame.union
DataFrame.unionAll
DataFrame.unionAllByName
Expand Down Expand Up @@ -119,7 +126,6 @@ DataFrame
DataFrameAnalyticsFunctions.compute_lag
DataFrameAnalyticsFunctions.compute_lead
DataFrameAnalyticsFunctions.time_series_agg
dataframe.map



Expand Down
2 changes: 0 additions & 2 deletions docs/source/snowpark/functions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,6 @@ Functions
sinh
size
skew
snowflake_cortex_sentiment
snowflake_cortex_summarize
sort_array
soundex
split
Expand Down
17 changes: 14 additions & 3 deletions src/snowflake/snowpark/_internal/analyzer/analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
SubfieldString,
UnresolvedAttribute,
WithinGroup,
UnresolvedColumnRegex,
)
from snowflake.snowpark._internal.analyzer.grouping_set import (
GroupingSet,
Expand Down Expand Up @@ -332,7 +333,7 @@ def analyze(
if isinstance(expr, WindowSpecDefinition):
return window_spec_expression(
[
self.analyze(
self.to_sql_try_avoid_cast(
x, df_aliased_col_name_to_real_col_name, parse_local_name
)
for x in expr.partition_spec
Expand Down Expand Up @@ -424,6 +425,14 @@ def analyze(
]
)

if isinstance(expr, UnresolvedColumnRegex):
return ",".join(
[
self.analyze(e, df_aliased_col_name_to_real_col_name)
for e in expr.expressions
]
)

if isinstance(expr, SnowflakeUDF):
if expr.api_call_source is not None:
self.session._conn._telemetry_client.send_function_usage_telemetry(
Expand Down Expand Up @@ -454,7 +463,7 @@ def analyze(
return table_function_partition_spec(
expr.over,
[
self.analyze(
self.to_sql_try_avoid_cast(
x, df_aliased_col_name_to_real_col_name, parse_local_name
)
for x in expr.partition_spec
Expand Down Expand Up @@ -621,7 +630,9 @@ def table_function_expression_extractor(
"NamedArgumentsTableFunction, GeneratorTableFunction, or FlattenFunction."
)
partition_spec_sql = (
self.analyze(expr.partition_spec, df_aliased_col_name_to_real_col_name)
self.to_sql_try_avoid_cast(
expr.partition_spec, df_aliased_col_name_to_real_col_name
)
if expr.partition_spec
else ""
)
Expand Down
19 changes: 19 additions & 0 deletions src/snowflake/snowpark/_internal/analyzer/expression.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,25 @@ def individual_node_complexity(self) -> Dict[PlanNodeCategory, int]:
)


class UnresolvedColumnRegex(Expression):
    """Expression holding the resolved set of columns selected by a regex match.

    Wraps one ``Attribute`` per matched column; the analyzer renders this as a
    plain comma-separated column list (``SELECT col1, col2, ... FROM child``).
    """

    def __init__(self, expressions: List[Attribute]) -> None:
        super().__init__()
        # A regex that matches nothing should be rejected before reaching here.
        assert expressions
        self.expressions = expressions

    def dependent_column_names(self) -> Optional[AbstractSet[str]]:
        """Set of column names this expression depends on (deduplicated)."""
        return derive_dependent_columns(*self.expressions)

    def dependent_column_names_with_duplication(self) -> List[str]:
        """Dependent column names, preserving duplicates and order."""
        return derive_dependent_columns_with_duplication(*self.expressions)

    @property
    def individual_node_complexity(self) -> Dict[PlanNodeCategory, int]:
        """Plan complexity: one COLUMN unit per matched column in the projection."""
        matched_column_count = len(self.expressions)
        return {PlanNodeCategory.COLUMN: matched_column_count}


class UnresolvedAttribute(Expression, NamedExpression):
def __init__(
self, name: str, is_sql_text: bool = False, df_alias: Optional[str] = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
Star,
UnresolvedAttribute,
derive_dependent_columns,
UnresolvedColumnRegex,
)
from snowflake.snowpark._internal.analyzer.schema_utils import analyze_attributes
from snowflake.snowpark._internal.analyzer.snowflake_plan import Query, SnowflakePlan
Expand Down Expand Up @@ -1801,11 +1802,14 @@ def derive_column_states_from_subquery(
analyzer = from_.analyzer
column_states = ColumnStateDict()
for c in cols:
if isinstance(c, UnresolvedAlias) and isinstance(c.child, Star):
if isinstance(c, UnresolvedAlias) and (
isinstance(c.child, Star) or isinstance(c.child, UnresolvedColumnRegex)
):
if c.child.expressions:
# df.select(df["*"]) will have child expressions. df.select("*") doesn't.
# df.select(df.colRegex(...)) will have a column expressions
columns_from_star = [copy(e) for e in c.child.expressions]
elif c.child.df_alias:
elif isinstance(c.child, Star) and c.child.df_alias:
if c.child.df_alias not in from_.df_aliased_col_name_to_real_col_name:
raise SnowparkClientExceptionMessages.DF_ALIAS_NOT_RECOGNIZED(
c.child.df_alias
Expand Down
Loading
Loading