Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-1962180: Support relaxed pandas in pd.read_snowflake for query data sources #3164

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#### Improvements

- Support relaxed consistency and ordering guarantees in `pd.read_snowflake` for non-query data sources.
- Support relaxed consistency and ordering guarantees in `pd.read_snowflake` for query data sources.

## 1.29.0 (2025-03-05)

Expand Down
23 changes: 17 additions & 6 deletions src/snowflake/snowpark/modin/plugin/_internal/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ def create_initial_ordered_dataframe(
table_name_or_query
)
is_query = not _is_table_name(table_name_or_query)
if not is_query:
if not is_query or relaxed_ordering:
if not relaxed_ordering:
try:
readonly_table_name = _create_read_only_table(
Expand Down Expand Up @@ -391,9 +391,24 @@ def create_initial_ordered_dataframe(
error_code=SnowparkPandasErrorCode.GENERAL_SQL_EXCEPTION.value,
) from ex

if is_query:
# If the string passed in to `pd.read_snowflake` is a SQL query, we can simply create
# a Snowpark DataFrame, and convert that to a Snowpark pandas DataFrame, and extract
# the OrderedDataFrame and row_position_snowflake_quoted_identifier from there.
# If there is an ORDER BY in the query, we should log it.
contains_order_by = _check_if_sql_query_contains_order_by_and_warn_user(
table_name_or_query
)
statement_params = get_default_snowpark_pandas_statement_params()
statement_params[STATEMENT_PARAMS.CONTAINS_ORDER_BY] = str(
contains_order_by
).upper()

initial_ordered_dataframe = OrderedDataFrame(
DataFrameReference(session.table(readonly_table_name, _emit_ast=False))
if not relaxed_ordering
else DataFrameReference(session.sql(table_name_or_query, _emit_ast=False))
if is_query
else DataFrameReference(session.table(table_name_or_query, _emit_ast=False))
)
# generate a snowflake quoted identifier for row position column that can be used for aliasing
Expand Down Expand Up @@ -439,11 +454,7 @@ def create_initial_ordered_dataframe(
row_position_snowflake_quoted_identifier=row_position_snowflake_quoted_identifier,
)
else:
if relaxed_ordering:
raise NotImplementedError(
"The 'pd.read_snowflake' method does not currently support 'relaxed_ordering=True'"
" when 'name_or_query' is a query"
)
assert is_query and not relaxed_ordering

# If the string passed in to `pd.read_snowflake` is a SQL query, we can simply create
# a Snowpark DataFrame, and convert that to a Snowpark pandas DataFrame, and extract
Expand Down
67 changes: 45 additions & 22 deletions tests/integ/modin/io/test_read_snowflake_query_call.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,35 @@

import modin.pandas as pd
import pandas as native_pd
import pytest

import snowflake.snowpark.modin.plugin # noqa: F401
from snowflake.snowpark._internal.utils import TempObjectType
from tests.integ.modin.utils import (
assert_snowpark_pandas_equals_to_pandas_without_dtypecheck,
)
from tests.integ.utils.sql_counter import sql_count_checker
from tests.integ.utils.sql_counter import SqlCounter
from tests.utils import Utils


@sql_count_checker(query_count=7, sproc_count=1)
def test_read_snowflake_call_sproc(session):
session.sql(
"""
@pytest.mark.parametrize(
"relaxed_ordering",
[
pytest.param(
True,
marks=pytest.mark.skip(
"Queries with CALL statements raise a SQL compilation "
"error when relaxed_ordering=True"
),
),
False,
],
)
def test_read_snowflake_call_sproc(session, relaxed_ordering):
expected_query_count = 7 if not relaxed_ordering else 5
with SqlCounter(query_count=expected_query_count, sproc_count=1):
session.sql(
"""
CREATE OR REPLACE PROCEDURE filter_by_role(tableName VARCHAR, role VARCHAR)
RETURNS TABLE(id NUMBER, name VARCHAR, role VARCHAR)
LANGUAGE PYTHON
Expand All @@ -29,23 +44,31 @@ def filter_by_role(session, table_name, role):
df = session.table(table_name)
return df.filter(col('role') == role)
$$"""
).collect()
try:
table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE)
session.sql(
f"""CREATE OR REPLACE TEMPORARY TABLE {table_name}(id NUMBER, name VARCHAR, role VARCHAR) AS SELECT * FROM VALUES(1, 'Alice', 'op'), (2, 'Bob', 'dev')"""
).collect()
df = pd.read_snowflake(f"CALL filter_by_role('{table_name}', 'op')")
native_df = native_pd.DataFrame(
[[1, "Alice", "op"]], columns=["ID", "NAME", "ROLE"]
)
assert_snowpark_pandas_equals_to_pandas_without_dtypecheck(df, native_df)
finally:
session.sql("DROP PROCEDURE filter_by_role(VARCHAR, VARCHAR)").collect()
try:
table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE)
session.sql(
f"""CREATE OR REPLACE TEMPORARY TABLE {table_name}(id NUMBER, name VARCHAR, role VARCHAR) AS SELECT * FROM VALUES(1, 'Alice', 'op'), (2, 'Bob', 'dev')"""
).collect()
df = pd.read_snowflake(
f"CALL filter_by_role('{table_name}', 'op')",
relaxed_ordering=relaxed_ordering,
)
native_df = native_pd.DataFrame(
[[1, "Alice", "op"]], columns=["ID", "NAME", "ROLE"]
)
assert_snowpark_pandas_equals_to_pandas_without_dtypecheck(df, native_df)
finally:
session.sql("DROP PROCEDURE filter_by_role(VARCHAR, VARCHAR)").collect()


@sql_count_checker(query_count=4)
def test_read_snowflake_system_function(session):
df = pd.read_snowflake("SELECT SYSTEM$TYPEOF(TRUE)")
native_df = session.sql("SELECT SYSTEM$TYPEOF(TRUE)").to_pandas()
assert_snowpark_pandas_equals_to_pandas_without_dtypecheck(df, native_df)
@pytest.mark.parametrize("relaxed_ordering", [True, False])
def test_read_snowflake_system_function(session, relaxed_ordering):
expected_query_count = 4 if not relaxed_ordering else 2
with SqlCounter(query_count=expected_query_count):
df = pd.read_snowflake(
"SELECT SYSTEM$TYPEOF(TRUE)",
relaxed_ordering=relaxed_ordering,
)
native_df = session.sql("SELECT SYSTEM$TYPEOF(TRUE)").to_pandas()
assert_snowpark_pandas_equals_to_pandas_without_dtypecheck(df, native_df)
Loading
Loading