From 045d8d0f8bf7edfee51839948c6397d621972514 Mon Sep 17 00:00:00 2001 From: Hazem Elmeleegy Date: Fri, 14 Mar 2025 19:25:11 -0700 Subject: [PATCH 1/4] SNOW-1962180: Support relaxed pandas in pd.read_snowflake for query data sources --- CHANGELOG.md | 1 + .../snowpark/modin/plugin/_internal/utils.py | 23 +- .../io/test_read_snowflake_query_call.py | 67 +- .../modin/io/test_read_snowflake_query_cte.py | 352 +++++---- .../io/test_read_snowflake_query_order_by.py | 477 +++++++----- .../io/test_read_snowflake_select_query.py | 735 ++++++++++-------- 6 files changed, 939 insertions(+), 716 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 466591d8a1..da8b34d1b1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ #### Improvements - Support relaxed consistency and ordering guarantees in `pd.read_snowflake` for non-query data sources. +- Support relaxed consistency and ordering guarantees in `pd.read_snowflake` for query data sources. ## 1.29.0 (2025-03-05) diff --git a/src/snowflake/snowpark/modin/plugin/_internal/utils.py b/src/snowflake/snowpark/modin/plugin/_internal/utils.py index cc68a80ec0..3a11b95506 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/utils.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/utils.py @@ -350,7 +350,7 @@ def create_initial_ordered_dataframe( table_name_or_query ) is_query = not _is_table_name(table_name_or_query) - if not is_query: + if not is_query or relaxed_ordering: if not relaxed_ordering: try: readonly_table_name = _create_read_only_table( @@ -391,9 +391,24 @@ def create_initial_ordered_dataframe( error_code=SnowparkPandasErrorCode.GENERAL_SQL_EXCEPTION.value, ) from ex + if is_query: + # If the string passed in to `pd.read_snowflake` is a SQL query, we can simply create + # a Snowpark DataFrame, and convert that to a Snowpark pandas DataFrame, and extract + # the OrderedDataFrame and row_position_snowflake_quoted_identifier from there. + # If there is an ORDER BY in the query, we should log it. + contains_order_by = _check_if_sql_query_contains_order_by_and_warn_user( + table_name_or_query + ) + statement_params = get_default_snowpark_pandas_statement_params() + statement_params[STATEMENT_PARAMS.CONTAINS_ORDER_BY] = str( + contains_order_by + ).upper() + initial_ordered_dataframe = OrderedDataFrame( DataFrameReference(session.table(readonly_table_name, _emit_ast=False)) if not relaxed_ordering + else DataFrameReference(session.sql(table_name_or_query, _emit_ast=False)) + if is_query else DataFrameReference(session.table(table_name_or_query, _emit_ast=False)) ) # generate a snowflake quoted identifier for row position column that can be used for aliasing @@ -439,11 +454,7 @@ def create_initial_ordered_dataframe( row_position_snowflake_quoted_identifier=row_position_snowflake_quoted_identifier, ) else: - if relaxed_ordering: - raise NotImplementedError( - "The 'pd.read_snowflake' method does not currently support 'relaxed_ordering=True'" - " when 'name_or_query' is a query" - ) + assert is_query and not relaxed_ordering # If the string passed in to `pd.read_snowflake` is a SQL query, we can simply create # a Snowpark DataFrame, and convert that to a Snowpark pandas DataFrame, and extract diff --git a/tests/integ/modin/io/test_read_snowflake_query_call.py b/tests/integ/modin/io/test_read_snowflake_query_call.py index 67d4323ec6..aa7ace1717 100644 --- a/tests/integ/modin/io/test_read_snowflake_query_call.py +++ b/tests/integ/modin/io/test_read_snowflake_query_call.py @@ -4,20 +4,35 @@ import modin.pandas as pd import pandas as native_pd +import pytest import snowflake.snowpark.modin.plugin # noqa: F401 from snowflake.snowpark._internal.utils import TempObjectType from tests.integ.modin.utils import ( assert_snowpark_pandas_equals_to_pandas_without_dtypecheck, ) -from tests.integ.utils.sql_counter import sql_count_checker +from tests.integ.utils.sql_counter import SqlCounter from tests.utils import Utils -@sql_count_checker(query_count=7, sproc_count=1) -def test_read_snowflake_call_sproc(session): - session.sql( - """ +@pytest.mark.parametrize( + "relaxed_ordering", + [ + pytest.param( + True, + marks=pytest.mark.skip( + "Queries with CALL statements raise a SQL compilation " + "error when relaxed_ordering=True" + ), + ), + False, + ], +) +def test_read_snowflake_call_sproc(session, relaxed_ordering): + expected_query_count = 7 if not relaxed_ordering else 5 + with SqlCounter(query_count=expected_query_count, sproc_count=1): + session.sql( + """ CREATE OR REPLACE PROCEDURE filter_by_role(tableName VARCHAR, role VARCHAR) RETURNS TABLE(id NUMBER, name VARCHAR, role VARCHAR) LANGUAGE PYTHON @@ -29,23 +44,31 @@ def filter_by_role(session, table_name, role): df = session.table(table_name) return df.filter(col('role') == role) $$""" - ).collect() - try: - table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) - session.sql( - f"""CREATE OR REPLACE TEMPORARY TABLE {table_name}(id NUMBER, name VARCHAR, role VARCHAR) AS SELECT * FROM VALUES(1, 'Alice', 'op'), (2, 'Bob', 'dev')""" ).collect() - df = pd.read_snowflake(f"CALL filter_by_role('{table_name}', 'op')") - native_df = native_pd.DataFrame( - [[1, "Alice", "op"]], columns=["ID", "NAME", "ROLE"] - ) - assert_snowpark_pandas_equals_to_pandas_without_dtypecheck(df, native_df) - finally: - session.sql("DROP PROCEDURE filter_by_role(VARCHAR, VARCHAR)").collect() + try: + table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) + session.sql( + f"""CREATE OR REPLACE TEMPORARY TABLE {table_name}(id NUMBER, name VARCHAR, role VARCHAR) AS SELECT * FROM VALUES(1, 'Alice', 'op'), (2, 'Bob', 'dev')""" + ).collect() + df = pd.read_snowflake( + f"CALL filter_by_role('{table_name}', 'op')", + relaxed_ordering=relaxed_ordering, + ) + native_df = native_pd.DataFrame( + [[1, "Alice", "op"]], columns=["ID", "NAME", "ROLE"] + ) + assert_snowpark_pandas_equals_to_pandas_without_dtypecheck(df, native_df) + finally: + session.sql("DROP PROCEDURE filter_by_role(VARCHAR, VARCHAR)").collect() -@sql_count_checker(query_count=4) -def test_read_snowflake_system_function(session): - df = pd.read_snowflake("SELECT SYSTEM$TYPEOF(TRUE)") - native_df = session.sql("SELECT SYSTEM$TYPEOF(TRUE)").to_pandas() - assert_snowpark_pandas_equals_to_pandas_without_dtypecheck(df, native_df) +@pytest.mark.parametrize("relaxed_ordering", [True, False]) +def test_read_snowflake_system_function(session, relaxed_ordering): + expected_query_count = 4 if not relaxed_ordering else 2 + with SqlCounter(query_count=expected_query_count): + df = pd.read_snowflake( + "SELECT SYSTEM$TYPEOF(TRUE)", + relaxed_ordering=relaxed_ordering, + ) + native_df = session.sql("SELECT SYSTEM$TYPEOF(TRUE)").to_pandas() + assert_snowpark_pandas_equals_to_pandas_without_dtypecheck(df, native_df) diff --git a/tests/integ/modin/io/test_read_snowflake_query_cte.py b/tests/integ/modin/io/test_read_snowflake_query_cte.py index baa35f01e5..87eb29d0dc 100644 --- a/tests/integ/modin/io/test_read_snowflake_query_cte.py +++ b/tests/integ/modin/io/test_read_snowflake_query_cte.py @@ -11,167 +11,207 @@ from tests.integ.modin.utils import ( assert_snowpark_pandas_equals_to_pandas_without_dtypecheck, ) -from tests.integ.utils.sql_counter import sql_count_checker +from tests.integ.utils.sql_counter import SqlCounter from tests.utils import Utils -@sql_count_checker(query_count=4, union_count=2) -def test_read_snowflake_query_basic_cte(session): - # create table - table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) - session.create_dataframe( - native_pd.DataFrame([[1, 2], [3, 7], [6, 7]], columns=["A", "B"]) - ).write.save_as_table(table_name, table_type="temp") - SQL_QUERY = f"""WITH CTE1 AS (SELECT SQUARE(A) AS A2, SQUARE(B) AS B2 FROM {table_name} WHERE A % 2 = 1), - CTE2 AS (SELECT SQUARE(A2) as A2, SQUARE(B2) AS B4 FROM CTE1 WHERE B2 % 2 = 0) (SELECT * FROM {table_name}) - UNION ALL (SELECT * FROM CTE1) UNION ALL (SELECT * FROM CTE2)""" - df = pd.read_snowflake(SQL_QUERY) - pdf = native_pd.DataFrame( - [[1, 2], [3, 7], [6, 7], [1, 4], [9, 49], [1, 16]], columns=["A", "B"] - ) - assert_snowpark_pandas_equals_to_pandas_without_dtypecheck(df, pdf) +@pytest.mark.parametrize("relaxed_ordering", [True, False]) +def test_read_snowflake_query_basic_cte(session, relaxed_ordering): + expected_query_count = 4 if not relaxed_ordering else 2 + with SqlCounter(query_count=expected_query_count, union_count=2): + # create table + table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) + session.create_dataframe( + native_pd.DataFrame([[1, 2], [3, 7], [6, 7]], columns=["A", "B"]) + ).write.save_as_table(table_name, table_type="temp") + SQL_QUERY = f"""WITH CTE1 AS (SELECT SQUARE(A) AS A2, SQUARE(B) AS B2 FROM {table_name} WHERE A % 2 = 1), + CTE2 AS (SELECT SQUARE(A2) as A2, SQUARE(B2) AS B4 FROM CTE1 WHERE B2 % 2 = 0) (SELECT * FROM {table_name}) + UNION ALL (SELECT * FROM CTE1) UNION ALL (SELECT * FROM CTE2)""" + df = pd.read_snowflake(SQL_QUERY, relaxed_ordering=relaxed_ordering) + pdf = native_pd.DataFrame( + [[1, 2], [3, 7], [6, 7], [1, 4], [9, 49], [1, 16]], columns=["A", "B"] + ) + assert_snowpark_pandas_equals_to_pandas_without_dtypecheck(df, pdf) -@sql_count_checker(query_count=3, union_count=1) -def test_read_snowflake_query_recursive_cte(): - SQL_QUERY = """WITH RECURSIVE current_f (current_val, previous_val) AS - ( - SELECT 0, 1 - UNION ALL - SELECT current_val + previous_val, current_val FROM current_f - WHERE current_val + previous_val < 100 - ) - SELECT current_val FROM current_f ORDER BY current_val""" - df = pd.read_snowflake(SQL_QUERY) - native_df = ( - native_pd.DataFrame( - [[0], [1], [1], [2], [3], [5], [8], [13], [21], [34], [55], [89]], - columns=["CURRENT_VAL"], +@pytest.mark.parametrize("relaxed_ordering", [True, False]) +def test_read_snowflake_query_recursive_cte(relaxed_ordering): + expected_query_count = 3 if not relaxed_ordering else 1 + with SqlCounter(query_count=expected_query_count, union_count=1): + SQL_QUERY = """WITH RECURSIVE current_f (current_val, previous_val) AS + ( + SELECT 0, 1 + UNION ALL + SELECT current_val + previous_val, current_val FROM current_f + WHERE current_val + previous_val < 100 + ) + SELECT current_val FROM current_f ORDER BY current_val""" + df = pd.read_snowflake(SQL_QUERY, relaxed_ordering=relaxed_ordering) + native_df = ( + native_pd.DataFrame( + [[0], [1], [1], [2], [3], [5], [8], [13], [21], [34], [55], [89]], + columns=["CURRENT_VAL"], + ) + .sort_values("CURRENT_VAL") + .reset_index(drop=True) + ) + assert_snowpark_pandas_equals_to_pandas_without_dtypecheck( + df.sort_values("CURRENT_VAL").reset_index(drop=True), native_df ) - .sort_values("CURRENT_VAL") - .reset_index(drop=True) - ) - assert_snowpark_pandas_equals_to_pandas_without_dtypecheck( - df.sort_values("CURRENT_VAL").reset_index(drop=True), native_df - ) -@sql_count_checker(query_count=4, join_count=1, union_count=1) -def test_read_snowflake_query_complex_recursive_cte(session): - # create table - table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) - session.sql( - f""" - -- The components of a car. - CREATE OR REPLACE TEMPORARY TABLE {table_name} ( - description VARCHAR, - component_ID INTEGER, - quantity INTEGER, - parent_component_ID INTEGER - ) - """ - ).collect() - session.sql( - f""" - INSERT INTO {table_name} (description, quantity, component_ID, parent_component_ID) VALUES - ('car', 1, 1, 0), - ('wheel', 4, 11, 1), - ('tire', 1, 111, 11), - ('#112 bolt', 5, 112, 11), - ('brake', 1, 113, 11), - ('brake pad', 1, 1131, 113), - ('engine', 1, 12, 1), - ('piston', 4, 121, 12), - ('cylinder block', 1, 122, 12), - ('#112 bolt', 16, 112, 12) -- Can use same type of bolt in multiple places - """ - ) - SQL_QUERY = f""" - WITH RECURSIVE current_layer (indent, layer_ID, parent_component_ID, component_id, description, sort_key) AS ( - SELECT - '...', - 1, - parent_component_ID, +@pytest.mark.parametrize("relaxed_ordering", [True, False]) +def test_read_snowflake_query_complex_recursive_cte(session, relaxed_ordering): + expected_query_count = 4 if not relaxed_ordering else 2 + with SqlCounter(query_count=expected_query_count, join_count=1, union_count=1): + # create table + table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) + session.sql( + f""" + -- The components of a car. + CREATE OR REPLACE TEMPORARY TABLE {table_name} ( + description VARCHAR, + component_ID INTEGER, + quantity INTEGER, + parent_component_ID INTEGER + ) + """ + ).collect() + session.sql( + f""" + INSERT INTO {table_name} (description, quantity, component_ID, parent_component_ID) VALUES + ('car', 1, 1, 0), + ('wheel', 4, 11, 1), + ('tire', 1, 111, 11), + ('#112 bolt', 5, 112, 11), + ('brake', 1, 113, 11), + ('brake pad', 1, 1131, 113), + ('engine', 1, 12, 1), + ('piston', 4, 121, 12), + ('cylinder block', 1, 122, 12), + ('#112 bolt', 16, 112, 12) -- Can use same type of bolt in multiple places + """ + ) + SQL_QUERY = f""" + WITH RECURSIVE current_layer (indent, layer_ID, parent_component_ID, component_id, description, sort_key) AS ( + SELECT + '...', + 1, + parent_component_ID, + component_id, + description, + '0001' + FROM {table_name} WHERE component_id = 1 + UNION ALL + SELECT indent || '...', + layer_ID + 1, + {table_name}.parent_component_ID, + {table_name}.component_id, + {table_name}.description, + sort_key || SUBSTRING('000' || {table_name}.component_ID, -4) + FROM current_layer JOIN {table_name} + ON ({table_name}.parent_component_id = current_layer.component_id) + ) + SELECT + -- The indentation gives us a sort of "side-ways tree" view, with + -- sub-{table_name} indented under their respective {table_name}. + indent || description AS description, component_id, - description, - '0001' - FROM {table_name} WHERE component_id = 1 - UNION ALL - SELECT indent || '...', - layer_ID + 1, - {table_name}.parent_component_ID, - {table_name}.component_id, - {table_name}.description, - sort_key || SUBSTRING('000' || {table_name}.component_ID, -4) - FROM current_layer JOIN {table_name} - ON ({table_name}.parent_component_id = current_layer.component_id) - ) - SELECT - -- The indentation gives us a sort of "side-ways tree" view, with - -- sub-{table_name} indented under their respective {table_name}. - indent || description AS description, - component_id, - parent_component_ID, - sort_key - -- The layer_ID and sort_key are useful for debugging, but not - -- needed in the report. - -- , layer_ID, sort_key - FROM current_layer - ORDER BY sort_key - """ - cur = session.connection.cursor() - cur.execute(SQL_QUERY) - native_df = cur.fetch_pandas_all().sort_values("SORT_KEY").reset_index(drop=True) - snow_df = ( - pd.read_snowflake(SQL_QUERY).sort_values("SORT_KEY").reset_index(drop=True) - ) - assert_snowpark_pandas_equals_to_pandas_without_dtypecheck(snow_df, native_df) + parent_component_ID, + sort_key + -- The layer_ID and sort_key are useful for debugging, but not + -- needed in the report. + -- , layer_ID, sort_key + FROM current_layer + ORDER BY sort_key + """ + cur = session.connection.cursor() + cur.execute(SQL_QUERY) + native_df = ( + cur.fetch_pandas_all().sort_values("SORT_KEY").reset_index(drop=True) + ) + snow_df = ( + pd.read_snowflake(SQL_QUERY, relaxed_ordering=relaxed_ordering) + .sort_values("SORT_KEY") + .reset_index(drop=True) + ) + assert_snowpark_pandas_equals_to_pandas_without_dtypecheck(snow_df, native_df) -@sql_count_checker(query_count=5, sproc_count=1) -def test_read_snowflake_query_cte_with_cross_language_sproc(session): - # create table name - table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) - SPROC_CREATION = """ - WITH filter_by_role AS PROCEDURE (table_name VARCHAR, role VARCHAR) - RETURNS TABLE("id" NUMBER, "name" VARCHAR, "role" VARCHAR) - LANGUAGE SCALA - RUNTIME_VERSION = '2.12' - PACKAGES = ('com.snowflake:snowpark:latest') - HANDLER = 'Filter.filterByRole' - AS - $$ - import com.snowflake.snowpark.functions._ - import com.snowflake.snowpark._ +@pytest.mark.parametrize( + "relaxed_ordering", + [ + pytest.param( + True, + marks=pytest.mark.skip( + "Queries with CALL statements raise a SQL compilation " + "error when relaxed_ordering=True" + ), + ), + False, + ], +) +def test_read_snowflake_query_cte_with_cross_language_sproc(session, relaxed_ordering): + expected_query_count = 5 if not relaxed_ordering else 3 + with SqlCounter(query_count=expected_query_count, sproc_count=1): + # create table name + table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) + SPROC_CREATION = """ + WITH filter_by_role AS PROCEDURE (table_name VARCHAR, role VARCHAR) + RETURNS TABLE("id" NUMBER, "name" VARCHAR, "role" VARCHAR) + LANGUAGE SCALA + RUNTIME_VERSION = '2.12' + PACKAGES = ('com.snowflake:snowpark:latest') + HANDLER = 'Filter.filterByRole' + AS + $$ + import com.snowflake.snowpark.functions._ + import com.snowflake.snowpark._ - object Filter { - def filterByRole(session: Session, tableName: String, role: String): DataFrame = { - val table = session.table(tableName) - val filteredRows = table.filter(col("\\"role\\"") === role) - return filteredRows + object Filter { + def filterByRole(session: Session, tableName: String, role: String): DataFrame = { + val table = session.table(tableName) + val filteredRows = table.filter(col("\\"role\\"") === role) + return filteredRows + } } - } - $$ - """ - SQL_QUERY = f"{SPROC_CREATION} CALL filter_by_role('{table_name}', 'op')" - native_df = native_pd.DataFrame( - [[1, "Alice", "op"], [2, "Bob", "dev"], [3, "Cindy", "dev"]], - columns=["id", "name", "role"], - ) - session.create_dataframe(native_df).write.save_as_table( - table_name, table_type="temp" - ) - native_df = native_df.iloc[0:1] - snow_df = pd.read_snowflake(SQL_QUERY) - assert_snowpark_pandas_equals_to_pandas_without_dtypecheck(snow_df, native_df) + $$ + """ + SQL_QUERY = f"{SPROC_CREATION} CALL filter_by_role('{table_name}', 'op')" + native_df = native_pd.DataFrame( + [[1, "Alice", "op"], [2, "Bob", "dev"], [3, "Cindy", "dev"]], + columns=["id", "name", "role"], + ) + session.create_dataframe(native_df).write.save_as_table( + table_name, table_type="temp" + ) + native_df = native_df.iloc[0:1] + snow_df = pd.read_snowflake(SQL_QUERY, relaxed_ordering=relaxed_ordering) + assert_snowpark_pandas_equals_to_pandas_without_dtypecheck(snow_df, native_df) @pytest.mark.modin_sp_precommit -@sql_count_checker(query_count=5, sproc_count=1) -def test_read_snowflake_query_cte_with_python_anonymous_sproc(session): - # create table name - table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) - SPROC_CREATION = """ +@pytest.mark.parametrize( + "relaxed_ordering", + [ + pytest.param( + True, + marks=pytest.mark.skip( + "Queries with CALL statements raise a SQL compilation " + "error when relaxed_ordering=True" + ), + ), + False, + ], +) +def test_read_snowflake_query_cte_with_python_anonymous_sproc( + session, relaxed_ordering +): + expected_query_count = 5 if not relaxed_ordering else 3 + with SqlCounter(query_count=expected_query_count, sproc_count=1): + # create table name + table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) + SPROC_CREATION = """ WITH filterByRole AS PROCEDURE (tableName VARCHAR, role VARCHAR) RETURNS TABLE("id" NUMBER, "name" VARCHAR, "role" VARCHAR) LANGUAGE PYTHON @@ -185,14 +225,14 @@ def filter_by_role(session, table_name, role): return df.filter(col('"role"') == role) $$ """ - SQL_QUERY = f"{SPROC_CREATION} CALL filterByRole('{table_name}', 'op')" - native_df = native_pd.DataFrame( - [[1, "Alice", "op"], [2, "Bob", "dev"], [3, "Cindy", "dev"]], - columns=["id", "name", "role"], - ) - session.create_dataframe(native_df).write.save_as_table( - table_name, table_type="temp" - ) - native_df = native_df.iloc[0:1] - snow_df = pd.read_snowflake(SQL_QUERY) - assert_snowpark_pandas_equals_to_pandas_without_dtypecheck(snow_df, native_df) + SQL_QUERY = f"{SPROC_CREATION} CALL filterByRole('{table_name}', 'op')" + native_df = native_pd.DataFrame( + [[1, "Alice", "op"], [2, "Bob", "dev"], [3, "Cindy", "dev"]], + columns=["id", "name", "role"], + ) + session.create_dataframe(native_df).write.save_as_table( + table_name, table_type="temp" + ) + native_df = native_df.iloc[0:1] + snow_df = pd.read_snowflake(SQL_QUERY, relaxed_ordering=relaxed_ordering) + assert_snowpark_pandas_equals_to_pandas_without_dtypecheck(snow_df, native_df) diff --git a/tests/integ/modin/io/test_read_snowflake_query_order_by.py b/tests/integ/modin/io/test_read_snowflake_query_order_by.py index 5c3cb13d9b..09310445bb 100644 --- a/tests/integ/modin/io/test_read_snowflake_query_order_by.py +++ b/tests/integ/modin/io/test_read_snowflake_query_order_by.py @@ -18,248 +18,293 @@ from tests.integ.modin.utils import ( assert_snowpark_pandas_equals_to_pandas_without_dtypecheck, ) -from tests.integ.utils.sql_counter import sql_count_checker +from tests.integ.utils.sql_counter import SqlCounter from tests.utils import Utils -@sql_count_checker(query_count=4) -def test_select_star_with_order_by(session, caplog): - # This test ensures that the presence of an ORDER BY causes us not to take the fastpath - # of select * from table, where we just do `pd.read_snowflake("table")` instead. - table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) - # Want random permutation, but need to make sure that there are no duplicates in the sorting column - # as otherwise ties may be broken differently between us and vanilla pandas. - native_df = native_pd.DataFrame( - np.random.choice(10_000, size=(1_000, 10), replace=False), - columns=[f"col{i}" for i in range(10)], - ) - session.create_dataframe(native_df).write.save_as_table( - table_name, table_type="temp" - ) - caplog.clear() - WarningMessage.printed_warnings = set() - with caplog.at_level(logging.DEBUG): - snow_df = pd.read_snowflake(f'SELECT * FROM {table_name} ORDER BY "col8"') - assert ORDER_BY_IN_SQL_QUERY_NOT_GUARANTEED_WARNING in caplog.text - native_df = native_df.reset_index(drop=True) - assert_snowpark_pandas_equals_to_pandas_without_dtypecheck( - snow_df.sort_values("col0").reset_index(drop=True), - native_df.sort_values("col0").reset_index(drop=True), - ) +@pytest.mark.parametrize("relaxed_ordering", [True, False]) +def test_select_star_with_order_by(session, caplog, relaxed_ordering): + expected_query_count = 4 if not relaxed_ordering else 2 + with SqlCounter(query_count=expected_query_count): + # This test ensures that the presence of an ORDER BY causes us not to take the fastpath + # of select * from table, where we just do `pd.read_snowflake("table")` instead. + table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) + # Want random permutation, but need to make sure that there are no duplicates in the sorting column + # as otherwise ties may be broken differently between us and vanilla pandas. + native_df = native_pd.DataFrame( + np.random.choice(10_000, size=(1_000, 10), replace=False), + columns=[f"col{i}" for i in range(10)], + ) + session.create_dataframe(native_df).write.save_as_table( + table_name, table_type="temp" + ) + caplog.clear() + WarningMessage.printed_warnings = set() + with caplog.at_level(logging.DEBUG): + snow_df = pd.read_snowflake( + f'SELECT * FROM {table_name} ORDER BY "col8"', + relaxed_ordering=relaxed_ordering, + ) + assert ORDER_BY_IN_SQL_QUERY_NOT_GUARANTEED_WARNING in caplog.text + native_df = native_df.reset_index(drop=True) + assert_snowpark_pandas_equals_to_pandas_without_dtypecheck( + snow_df.sort_values("col0").reset_index(drop=True), + native_df.sort_values("col0").reset_index(drop=True), + ) -@sql_count_checker(query_count=3) -def test_no_order_by_but_column_name_shadows(session, caplog): - table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) - session.create_dataframe( - native_pd.DataFrame( - [[1, 2, 3], [4, 5, 6], [7, 8, 9]], columns=["A", "B", "ORDER BY"] - ) - ).write.save_as_table(table_name, table_type="temp") - caplog.clear() - WarningMessage.printed_warnings = set() - with caplog.at_level(logging.DEBUG): - df = pd.read_snowflake(f'SELECT A, B, "ORDER BY" FROM {table_name}') - # verify no temporary table is materialized for regular table - assert "Materialize temporary table" not in caplog.text - assert ORDER_BY_IN_SQL_QUERY_NOT_GUARANTEED_WARNING not in caplog.text - assert df.columns.tolist() == ["A", "B", "ORDER BY"] +@pytest.mark.parametrize("relaxed_ordering", [True, False]) +def test_no_order_by_but_column_name_shadows(session, caplog, relaxed_ordering): + expected_query_count = 3 if not relaxed_ordering else 1 + with SqlCounter(query_count=expected_query_count): + table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) + session.create_dataframe( + native_pd.DataFrame( + [[1, 2, 3], [4, 5, 6], [7, 8, 9]], columns=["A", "B", "ORDER BY"] + ) + ).write.save_as_table(table_name, table_type="temp") + caplog.clear() + WarningMessage.printed_warnings = set() + with caplog.at_level(logging.DEBUG): + df = pd.read_snowflake( + f'SELECT A, B, "ORDER BY" FROM {table_name}', + relaxed_ordering=relaxed_ordering, + ) + # verify no temporary table is materialized for regular table + assert "Materialize temporary table" not in caplog.text + assert ORDER_BY_IN_SQL_QUERY_NOT_GUARANTEED_WARNING not in caplog.text + assert df.columns.tolist() == ["A", "B", "ORDER BY"] @pytest.mark.parametrize("order_by_col", [1, '"ORDER BY"', '"ORDER BY 1"', "A"]) -@sql_count_checker(query_count=4) -def test_order_by_and_column_name_shadows(session, caplog, order_by_col): - table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) - # Want random permutation, but need to make sure that there are no duplicates in the sorting column - # as otherwise ties may be broken differently between us and vanilla pandas. - native_df = native_pd.DataFrame( - np.random.choice(3_000, size=(1_000, 3), replace=False), - columns=["ORDER BY", "A", "ORDER BY 1"], - ) - session.create_dataframe(native_df).write.save_as_table( - table_name, table_type="temp" - ) - caplog.clear() - WarningMessage.printed_warnings = set() - with caplog.at_level(logging.DEBUG): - snow_df = pd.read_snowflake( - f'SELECT "ORDER BY", A, "ORDER BY 1" FROM {table_name} ORDER BY {order_by_col}' +@pytest.mark.parametrize("relaxed_ordering", [True, False]) +def test_order_by_and_column_name_shadows( + session, caplog, order_by_col, relaxed_ordering +): + expected_query_count = 4 if not relaxed_ordering else 2 + with SqlCounter(query_count=expected_query_count): + table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) + # Want random permutation, but need to make sure that there are no duplicates in the sorting column + # as otherwise ties may be broken differently between us and vanilla pandas. + native_df = native_pd.DataFrame( + np.random.choice(3_000, size=(1_000, 3), replace=False), + columns=["ORDER BY", "A", "ORDER BY 1"], + ) + session.create_dataframe(native_df).write.save_as_table( + table_name, table_type="temp" + ) + caplog.clear() + WarningMessage.printed_warnings = set() + with caplog.at_level(logging.DEBUG): + snow_df = pd.read_snowflake( + f'SELECT "ORDER BY", A, "ORDER BY 1" FROM {table_name} ORDER BY {order_by_col}', + relaxed_ordering=relaxed_ordering, + ) + # verify warning issued since we are sorting. + assert ORDER_BY_IN_SQL_QUERY_NOT_GUARANTEED_WARNING in caplog.text + assert_snowpark_pandas_equals_to_pandas_without_dtypecheck( + snow_df.sort_values("A").reset_index(drop=True), + native_df.sort_values("A").reset_index(drop=True), ) - # verify warning issued since we are sorting. - assert ORDER_BY_IN_SQL_QUERY_NOT_GUARANTEED_WARNING in caplog.text - assert_snowpark_pandas_equals_to_pandas_without_dtypecheck( - snow_df.sort_values("A").reset_index(drop=True), - native_df.sort_values("A").reset_index(drop=True), - ) -@sql_count_checker(query_count=4) -def test_order_by_as_column_name_should_not_warn_negative(session, caplog): - table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) - session.create_dataframe( - native_pd.DataFrame( - [[1, 2, 3], [4, 5, 6], [7, 8, 9]], columns=["A", "B", " ORDER BY "] - ) - ).write.save_as_table(table_name, table_type="temp") - caplog.clear() - WarningMessage.printed_warnings = set() - with caplog.at_level(logging.DEBUG): - df = pd.read_snowflake(f'SELECT " ORDER BY " FROM {table_name}') - df.to_pandas() # Force materialization of snowpark dataframe backing this dataframe. - # In this case, there is no ORDER BY, but since we use string matching, we will get a false - # positive here. - assert ORDER_BY_IN_SQL_QUERY_NOT_GUARANTEED_WARNING in caplog.text - assert df.columns.tolist() == [" ORDER BY "] +@pytest.mark.parametrize("relaxed_ordering", [True, False]) +def test_order_by_as_column_name_should_not_warn_negative( + session, caplog, relaxed_ordering +): + expected_query_count = 4 if not relaxed_ordering else 2 + with SqlCounter(query_count=expected_query_count): + table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) + session.create_dataframe( + native_pd.DataFrame( + [[1, 2, 3], [4, 5, 6], [7, 8, 9]], columns=["A", "B", " ORDER BY "] + ) + ).write.save_as_table(table_name, table_type="temp") + caplog.clear() + WarningMessage.printed_warnings = set() + with caplog.at_level(logging.DEBUG): + df = pd.read_snowflake( + f'SELECT " ORDER BY " FROM {table_name}', + relaxed_ordering=relaxed_ordering, + ) + df.to_pandas() # Force materialization of snowpark dataframe backing this dataframe. + # In this case, there is no ORDER BY, but since we use string matching, we will get a false + # positive here. + assert ORDER_BY_IN_SQL_QUERY_NOT_GUARANTEED_WARNING in caplog.text + assert df.columns.tolist() == [" ORDER BY "] -@sql_count_checker(query_count=4) +@pytest.mark.parametrize("relaxed_ordering", [True, False]) def test_inner_order_by_should_be_ignored_and_no_outer_order_by_negative( - session, caplog + session, caplog, relaxed_ordering ): - table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) - session.create_dataframe( - native_pd.DataFrame( - [[1, 2, 3], [4, 5, 6], [7, 8, 9]], columns=["A", "B", "ORDER BY"] - ) - ).write.save_as_table(table_name, table_type="temp") - caplog.clear() - WarningMessage.printed_warnings = set() - with caplog.at_level(logging.DEBUG): - df = pd.read_snowflake(f"SELECT * FROM (SELECT * FROM {table_name} ORDER BY 1)") - df.to_pandas() # Force materialization of snowpark dataframe backing this dataframe. - # Ideally, in this case, we would optimize away the ORDER BY, since it has no bearing - # on the final result; however, we use the logical plan of a SQL Query to determine if - # its got an ORDER BY, and the logical plan bubbles up nested inner ORDER BY's - # (for context as to why, see Thierry's message here: - # https://snowflake.slack.com/archives/C02BTC3HY/p1708032327090439?thread_ts=1708025496.641369&cid=C02BTC3HY) - # so we still include the sort in our code. - assert ORDER_BY_IN_SQL_QUERY_NOT_GUARANTEED_WARNING in caplog.text - assert df.columns.tolist() == ["A", "B", "ORDER BY"] + expected_query_count = 4 if not relaxed_ordering else 2 + with SqlCounter(query_count=expected_query_count): + table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) + session.create_dataframe( + native_pd.DataFrame( + [[1, 2, 3], [4, 5, 6], [7, 8, 9]], columns=["A", "B", "ORDER BY"] + ) + ).write.save_as_table(table_name, table_type="temp") + caplog.clear() + WarningMessage.printed_warnings = set() + with caplog.at_level(logging.DEBUG): + df = pd.read_snowflake( + f"SELECT * FROM (SELECT * FROM {table_name} ORDER BY 1)", + relaxed_ordering=relaxed_ordering, + ) + df.to_pandas() # Force materialization of snowpark dataframe backing this dataframe. + # Ideally, in this case, we would optimize away the ORDER BY, since it has no bearing + # on the final result; however, we use the logical plan of a SQL Query to determine if + # its got an ORDER BY, and the logical plan bubbles up nested inner ORDER BY's + # (for context as to why, see Thierry's message here: + # https://snowflake.slack.com/archives/C02BTC3HY/p1708032327090439?thread_ts=1708025496.641369&cid=C02BTC3HY) + # so we still include the sort in our code. + assert ORDER_BY_IN_SQL_QUERY_NOT_GUARANTEED_WARNING in caplog.text + assert df.columns.tolist() == ["A", "B", "ORDER BY"] -@sql_count_checker(query_count=4) -def test_order_by_with_no_limit_but_colname_shadows(session, caplog): - table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) - native_df = native_pd.DataFrame( - [[1, 2, 4], [4, 5, 6], [7, 8, 3]], columns=["A", "B", "LIMIT 1"] - ) - session.create_dataframe(native_df).write.save_as_table( - table_name, table_type="temp" - ) - WarningMessage.printed_warnings = set() - caplog.clear() - with caplog.at_level(logging.DEBUG): - df = pd.read_snowflake(f'SELECT * FROM {table_name} ORDER BY "LIMIT 1"') - assert_snowpark_pandas_equals_to_pandas_without_dtypecheck( - df.sort_values("A").reset_index(drop=True), - native_df.sort_values("A").reset_index(drop=True), +@pytest.mark.parametrize("relaxed_ordering", [True, False]) +def test_order_by_with_no_limit_but_colname_shadows(session, caplog, relaxed_ordering): + expected_query_count = 4 if not relaxed_ordering else 2 + with SqlCounter(query_count=expected_query_count): + table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) + native_df = native_pd.DataFrame( + [[1, 2, 4], [4, 5, 6], [7, 8, 3]], columns=["A", "B", "LIMIT 1"] + ) + session.create_dataframe(native_df).write.save_as_table( + table_name, table_type="temp" ) - assert ORDER_BY_IN_SQL_QUERY_NOT_GUARANTEED_WARNING in caplog.text + WarningMessage.printed_warnings = set() + caplog.clear() + with caplog.at_level(logging.DEBUG): + df = pd.read_snowflake( + f'SELECT * FROM {table_name} ORDER BY "LIMIT 1"', + relaxed_ordering=relaxed_ordering, + ) + assert_snowpark_pandas_equals_to_pandas_without_dtypecheck( + df.sort_values("A").reset_index(drop=True), + native_df.sort_values("A").reset_index(drop=True), + ) + assert ORDER_BY_IN_SQL_QUERY_NOT_GUARANTEED_WARNING in caplog.text -@sql_count_checker(query_count=4) -def test_order_by_with_limit_and_name_shadows(session, caplog): - table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) - native_df = native_pd.DataFrame( - [[1, 2, 4], [4, 5, 6], [7, 8, 3]], columns=["A", "B", "LIMIT 1"] - ) - session.create_dataframe(native_df).write.save_as_table( - table_name, table_type="temp" - ) - WarningMessage.printed_warnings = set() - with caplog.at_level(logging.DEBUG): - df = pd.read_snowflake(f'SELECT * FROM {table_name} ORDER BY "LIMIT 1" LIMIT 2') - assert len(df) == 2 - assert ORDER_BY_IN_SQL_QUERY_NOT_GUARANTEED_WARNING in caplog.text +@pytest.mark.parametrize("relaxed_ordering", [True, False]) +def test_order_by_with_limit_and_name_shadows(session, caplog, relaxed_ordering): + expected_query_count = 4 if not relaxed_ordering else 2 + with SqlCounter(query_count=expected_query_count): + table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) + native_df = native_pd.DataFrame( + [[1, 2, 4], [4, 5, 6], [7, 8, 3]], columns=["A", "B", "LIMIT 1"] + ) + session.create_dataframe(native_df).write.save_as_table( + table_name, table_type="temp" + ) + WarningMessage.printed_warnings = set() + with caplog.at_level(logging.DEBUG): + df = pd.read_snowflake( + f'SELECT * FROM {table_name} ORDER BY "LIMIT 1" LIMIT 2', + relaxed_ordering=relaxed_ordering, + ) + assert len(df) == 2 + assert ORDER_BY_IN_SQL_QUERY_NOT_GUARANTEED_WARNING in caplog.text -@sql_count_checker(query_count=5, join_count=1) -def test_read_snowflake_query_complex_query_with_join_and_order_by(session, caplog): - # create table - table_name1 = Utils.random_name_for_temp_object(TempObjectType.TABLE) - session.create_dataframe( - native_pd.DataFrame( - [[10, "car"], [3, "bus"], [6, "train"]], - columns=["price to consumer", "mode of transportation"], - ) - ).write.save_as_table(table_name1, table_type="temp") - table_name2 = Utils.random_name_for_temp_object(TempObjectType.TABLE) - session.create_dataframe( - native_pd.DataFrame( - [[5, "car"], [0.5, "bus"], [2, "train"]], - columns=["cost to operator", "mode of transportation"], +@pytest.mark.parametrize("relaxed_ordering", [True, False]) +def test_read_snowflake_query_complex_query_with_join_and_order_by( + session, caplog, relaxed_ordering +): + expected_query_count = 5 if not relaxed_ordering else 3 + with SqlCounter(query_count=expected_query_count, join_count=1): + # create table + table_name1 = Utils.random_name_for_temp_object(TempObjectType.TABLE) + session.create_dataframe( + native_pd.DataFrame( + [[10, "car"], [3, "bus"], [6, "train"]], + columns=["price to consumer", "mode of transportation"], + ) + ).write.save_as_table(table_name1, table_type="temp") + table_name2 = Utils.random_name_for_temp_object(TempObjectType.TABLE) + session.create_dataframe( + native_pd.DataFrame( + [[5, "car"], [0.5, "bus"], [2, "train"]], + columns=["cost to operator", "mode of transportation"], + ) + ).write.save_as_table(table_name2, table_type="temp") + WarningMessage.printed_warnings = set() + with caplog.at_level(logging.DEBUG): + df = pd.read_snowflake( + f'SELECT "price to consumer" - "cost to operator" as "profit", "mode of transportation" FROM {table_name1} NATURAL JOIN {table_name2} ORDER BY "profit"', + relaxed_ordering=relaxed_ordering, + ) + assert ORDER_BY_IN_SQL_QUERY_NOT_GUARANTEED_WARNING in caplog.text + pdf = native_pd.DataFrame( + [[5, "car"], [2.5, "bus"], [4, "train"]], + columns=["profit", "mode of transportation"], ) - ).write.save_as_table(table_name2, table_type="temp") - WarningMessage.printed_warnings = set() - with caplog.at_level(logging.DEBUG): - df = pd.read_snowflake( - f'SELECT "price to consumer" - "cost to operator" as "profit", "mode of transportation" FROM {table_name1} NATURAL JOIN {table_name2} ORDER BY "profit"' + assert_snowpark_pandas_equals_to_pandas_without_dtypecheck( + df.sort_values("profit").reset_index(drop=True), + pdf.sort_values("profit").reset_index(drop=True), ) - assert ORDER_BY_IN_SQL_QUERY_NOT_GUARANTEED_WARNING in caplog.text - pdf = native_pd.DataFrame( - [[5, "car"], [2.5, "bus"], [4, "train"]], - columns=["profit", "mode of transportation"], - ) - assert_snowpark_pandas_equals_to_pandas_without_dtypecheck( - df.sort_values("profit").reset_index(drop=True), - pdf.sort_values("profit").reset_index(drop=True), - ) @pytest.mark.parametrize("ordinal", [1, 2, 28]) -@sql_count_checker(query_count=4) -def test_order_by_with_position_key(session, ordinal, caplog): - column_order = [ - "col12", - "col1", - "col10", - "col11", - "col16", - "col24", - "col22", - "col20", - "col28", - "col26", - "col13", - "col15", - "col23", - "col14", - "col5", - "col18", - "col3", - "col6", - "col2", - "col4", - "col19", - "col0", - "col7", - "col8", - "col27", - "col29", - "col17", - "col9", - "col25", - "col21", - ] - table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) - # Want random permutation, but need to make sure that there are no duplicates in the sorting column - # as otherwise ties may be broken differently between us and vanilla pandas. - native_df = native_pd.DataFrame( - np.arange(60).reshape((2, 30)), - columns=[f"col{i}" for i in range(30)], - ) - native_df[column_order[ordinal]].iloc[0] = np.nan - session.create_dataframe(native_df).write.save_as_table( - table_name, table_type="temp" - ) - columns = ", ".join([f'"{col_name}"' for col_name in column_order]) - WarningMessage.printed_warnings = set() - with caplog.at_level(logging.DEBUG): - snow_df = pd.read_snowflake( - f"SELECT * from (SELECT {columns} FROM {table_name}) ORDER BY {ordinal + 1} ASC NULLS LAST" +@pytest.mark.parametrize("relaxed_ordering", [True, False]) +def test_order_by_with_position_key(session, ordinal, caplog, relaxed_ordering): + expected_query_count = 4 if not relaxed_ordering else 2 + with SqlCounter(query_count=expected_query_count): + column_order = [ + "col12", + "col1", + "col10", + "col11", + "col16", + "col24", + "col22", + "col20", + "col28", + "col26", + "col13", + "col15", + "col23", + "col14", + "col5", + "col18", + "col3", + "col6", + "col2", + "col4", + "col19", + "col0", + "col7", + "col8", + "col27", + "col29", + "col17", + "col9", + "col25", + "col21", + ] + table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) + # Want random permutation, but need to make sure that there are no duplicates in the sorting column + # as otherwise ties may be broken differently between us and vanilla pandas. + native_df = native_pd.DataFrame( + np.arange(60).reshape((2, 30)), + columns=[f"col{i}" for i in range(30)], ) - assert_snowpark_pandas_equals_to_pandas_without_dtypecheck( - snow_df.sort_values("col12").reset_index(drop=True), - native_df[column_order].sort_values("col12").reset_index(drop=True), + native_df[column_order[ordinal]].iloc[0] = np.nan + session.create_dataframe(native_df).write.save_as_table( + table_name, table_type="temp" ) - assert ORDER_BY_IN_SQL_QUERY_NOT_GUARANTEED_WARNING in caplog.text + columns = ", ".join([f'"{col_name}"' for col_name in column_order]) + WarningMessage.printed_warnings = set() + with caplog.at_level(logging.DEBUG): + snow_df = pd.read_snowflake( + f"SELECT * from (SELECT {columns} FROM {table_name}) ORDER BY {ordinal + 1} ASC NULLS LAST", + relaxed_ordering=relaxed_ordering, + ) + assert_snowpark_pandas_equals_to_pandas_without_dtypecheck( + snow_df.sort_values("col12").reset_index(drop=True), + native_df[column_order].sort_values("col12").reset_index(drop=True), + ) + assert ORDER_BY_IN_SQL_QUERY_NOT_GUARANTEED_WARNING in caplog.text diff --git a/tests/integ/modin/io/test_read_snowflake_select_query.py b/tests/integ/modin/io/test_read_snowflake_select_query.py index dacd509058..37c153c2ac 100644 --- a/tests/integ/modin/io/test_read_snowflake_select_query.py +++ b/tests/integ/modin/io/test_read_snowflake_select_query.py @@ -12,6 +12,7 @@ import snowflake.snowpark.modin.plugin # noqa: F401 from snowflake.snowpark._internal.analyzer.analyzer_utils import quote_name from snowflake.snowpark._internal.utils import TempObjectType +from snowflake.snowpark.exceptions import SnowparkSQLException from snowflake.snowpark.modin.plugin._internal.utils import ( READ_ONLY_TABLE_SUFFIX, extract_pandas_label_from_snowflake_quoted_identifier, @@ -26,248 +27,318 @@ VALID_SNOWFLAKE_COLUMN_NAMES_AND_ALIASES, assert_snowpark_pandas_equals_to_pandas_without_dtypecheck, ) -from tests.integ.utils.sql_counter import sql_count_checker +from tests.integ.utils.sql_counter import SqlCounter, sql_count_checker from tests.utils import Utils -@sql_count_checker(query_count=4) -def test_read_snowflake_basic_query_with_weird_formatting(session): - # create table - table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) - session.create_dataframe([BASIC_TYPE_DATA1, BASIC_TYPE_DATA2]).write.save_as_table( - table_name, table_type="temp" - ) - df = pd.read_snowflake(f"(((SELECT * FROM {table_name})))") - - # test if the snapshot is created - # the table name should match the following reg expression - # "^SNOWPARK_TEMP_TABLE_[0-9A-Z]+$") - sql = df._query_compiler._modin_frame.ordered_dataframe.queries["queries"][-1] - temp_table_pattern = ".*SNOWPARK_TEMP_TABLE_[0-9A-Z]+.*$" - assert re.match(temp_table_pattern, sql) is not None - assert READ_ONLY_TABLE_SUFFIX in sql - - # check the row position snowflake quoted identifier is set - assert ( - df._query_compiler._modin_frame.row_position_snowflake_quoted_identifier - is not None - ) +@pytest.mark.parametrize("relaxed_ordering", [True, False]) +def test_read_snowflake_basic_query_with_weird_formatting(session, relaxed_ordering): + expected_query_count = 4 if not relaxed_ordering else 2 + with SqlCounter(query_count=expected_query_count): + # create table + table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) + session.create_dataframe( + [BASIC_TYPE_DATA1, BASIC_TYPE_DATA2] + ).write.save_as_table(table_name, table_type="temp") + df = pd.read_snowflake( + f"(((SELECT * FROM {table_name})))", relaxed_ordering=relaxed_ordering + ) - pdf = df.to_pandas() - assert pdf.values[0].tolist() == BASIC_TYPE_DATA1 - assert pdf.values[1].tolist() == BASIC_TYPE_DATA2 + if not relaxed_ordering: + # test if the snapshot is created + # the table name should match the following reg expression + # "^SNOWPARK_TEMP_TABLE_[0-9A-Z]+$") + sql = df._query_compiler._modin_frame.ordered_dataframe.queries["queries"][ + -1 + ] + temp_table_pattern = ".*SNOWPARK_TEMP_TABLE_[0-9A-Z]+.*$" + assert re.match(temp_table_pattern, sql) is not None + assert READ_ONLY_TABLE_SUFFIX in sql + + # check the row position snowflake quoted identifier is set + assert ( + df._query_compiler._modin_frame.row_position_snowflake_quoted_identifier + is not None + ) + pdf = df.to_pandas() + assert pdf.values[0].tolist() == BASIC_TYPE_DATA1 + assert pdf.values[1].tolist() == BASIC_TYPE_DATA2 -@sql_count_checker(query_count=5) -def test_read_snowflake_basic_query_with_comment_preceding_sql_inline_string(session): - # create table - table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) - session.create_dataframe([BASIC_TYPE_DATA1, BASIC_TYPE_DATA2]).write.save_as_table( - table_name, table_type="temp" - ) - df = pd.read_snowflake(f"--SQL Comment\nSELECT * FROM {table_name}") - - # test if the snapshot is created - # the table name should match the following reg expression - # "^SNOWPARK_TEMP_TABLE_[0-9A-Z]+$") - sql = df._query_compiler._modin_frame.ordered_dataframe.queries["queries"][-1] - temp_table_pattern = ".*SNOWPARK_TEMP_TABLE_[0-9A-Z]+.*$" - assert re.match(temp_table_pattern, sql) is not None - assert READ_ONLY_TABLE_SUFFIX in sql - - # check the row position snowflake quoted identifier is set - assert ( - df._query_compiler._modin_frame.row_position_snowflake_quoted_identifier - is not None - ) - pdf = df.to_pandas() - assert pdf.values[0].tolist() == BASIC_TYPE_DATA1 - assert pdf.values[1].tolist() == BASIC_TYPE_DATA2 +@pytest.mark.parametrize("relaxed_ordering", [True, False]) +def test_read_snowflake_basic_query_with_comment_preceding_sql_inline_string( + session, relaxed_ordering +): + expected_query_count = 5 if not relaxed_ordering else 2 + with SqlCounter(query_count=expected_query_count): + # create table + table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) + session.create_dataframe( + [BASIC_TYPE_DATA1, BASIC_TYPE_DATA2] + ).write.save_as_table(table_name, table_type="temp") + df = pd.read_snowflake( + f"--SQL Comment\nSELECT * FROM {table_name}", + relaxed_ordering=relaxed_ordering, + ) + + if not relaxed_ordering: + # test if the snapshot is created + # the table name should match the following reg expression + # "^SNOWPARK_TEMP_TABLE_[0-9A-Z]+$") + sql = df._query_compiler._modin_frame.ordered_dataframe.queries["queries"][ + -1 + ] + temp_table_pattern = ".*SNOWPARK_TEMP_TABLE_[0-9A-Z]+.*$" + assert re.match(temp_table_pattern, sql) is not None + assert READ_ONLY_TABLE_SUFFIX in sql + + # check the row position snowflake quoted identifier is set + assert ( + df._query_compiler._modin_frame.row_position_snowflake_quoted_identifier + is not None + ) + + pdf = df.to_pandas() + assert pdf.values[0].tolist() == BASIC_TYPE_DATA1 + assert pdf.values[1].tolist() == BASIC_TYPE_DATA2 -@sql_count_checker(query_count=5) +@pytest.mark.parametrize( + "relaxed_ordering", + [ + pytest.param( + True, + marks=pytest.mark.skip( + "Queries with comment preceding sql multiline string " + "raise a SQL compilation error when relaxed_ordering=True" + ), + ), + False, + ], +) def test_read_snowflake_basic_query_with_comment_preceding_sql_multiline_string( - session, + session, relaxed_ordering ): - # create table - table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) - session.create_dataframe([BASIC_TYPE_DATA1, BASIC_TYPE_DATA2]).write.save_as_table( - table_name, table_type="temp" - ) - df = pd.read_snowflake( - f"""--SQL Comment 1 - -- SQL Comment 2 - SELECT * FROM {table_name} - -- SQL Comment 3""" - ) + expected_query_count = 5 if not relaxed_ordering else 2 + with SqlCounter(query_count=expected_query_count): + # create table + table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) + session.create_dataframe( + [BASIC_TYPE_DATA1, BASIC_TYPE_DATA2] + ).write.save_as_table(table_name, table_type="temp") + df = pd.read_snowflake( + f"""--SQL Comment 1 + -- SQL Comment 2 + SELECT * FROM {table_name} + -- SQL Comment 3""", + relaxed_ordering=relaxed_ordering, + ) - # test if the snapshot is created - # the table name should match the following reg expression - # "^SNOWPARK_TEMP_TABLE_[0-9A-Z]+$") - sql = df._query_compiler._modin_frame.ordered_dataframe.queries["queries"][-1] - temp_table_pattern = ".*SNOWPARK_TEMP_TABLE_[0-9A-Z]+.*$" - assert re.match(temp_table_pattern, sql) is not None - assert READ_ONLY_TABLE_SUFFIX in sql - - # check the row position snowflake quoted identifier is set - assert ( - df._query_compiler._modin_frame.row_position_snowflake_quoted_identifier - is not None - ) + if not relaxed_ordering: + # test if the snapshot is created + # the table name should match the following reg expression + # "^SNOWPARK_TEMP_TABLE_[0-9A-Z]+$") + sql = df._query_compiler._modin_frame.ordered_dataframe.queries["queries"][ + -1 + ] + temp_table_pattern = ".*SNOWPARK_TEMP_TABLE_[0-9A-Z]+.*$" + assert re.match(temp_table_pattern, sql) is not None + assert READ_ONLY_TABLE_SUFFIX in sql + + # check the row position snowflake quoted identifier is set + assert ( + df._query_compiler._modin_frame.row_position_snowflake_quoted_identifier + is not None + ) - pdf = df.to_pandas() - assert pdf.values[0].tolist() == BASIC_TYPE_DATA1 - assert pdf.values[1].tolist() == BASIC_TYPE_DATA2 + pdf = df.to_pandas() + assert pdf.values[0].tolist() == BASIC_TYPE_DATA1 + assert pdf.values[1].tolist() == BASIC_TYPE_DATA2 -@sql_count_checker(query_count=4) @pytest.mark.parametrize("only_nulls", [True, False]) -def test_read_snowflake_query_none_nan_condition(session, only_nulls): - # create table - table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) - session.create_dataframe( - native_pd.DataFrame( - [[1, 2, None], [3, 4, 5], [6, 7, float("nan")]], columns=["A", "B", "C"] +@pytest.mark.parametrize("relaxed_ordering", [True, False]) +def test_read_snowflake_query_none_nan_condition(session, only_nulls, relaxed_ordering): + expected_query_count = 4 if not relaxed_ordering else 2 + with SqlCounter(query_count=expected_query_count): + # create table + table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) + session.create_dataframe( + native_pd.DataFrame( + [[1, 2, None], [3, 4, 5], [6, 7, float("nan")]], columns=["A", "B", "C"] + ) + ).write.save_as_table(table_name, table_type="temp") + + # crate snowpark pandas dataframe + df = pd.read_snowflake( + f"SELECT * FROM {table_name} WHERE C IS {'NOT' if not only_nulls else ''} NULL", + relaxed_ordering=relaxed_ordering, ) - ).write.save_as_table(table_name, table_type="temp") - - # crate snowpark pandas dataframe - df = pd.read_snowflake( - f"SELECT * FROM {table_name} WHERE C IS {'NOT' if not only_nulls else ''} NULL" - ) - if not only_nulls: - pdf = native_pd.DataFrame([[3, 4, 5]], columns=["A", "B", "C"]) - else: - pdf = native_pd.DataFrame( - [[1, 2, None], [6, 7, float("nan")]], columns=["A", "B", "C"] - ) - assert_snowpark_pandas_equals_to_pandas_without_dtypecheck(df, pdf) + if not only_nulls: + pdf = native_pd.DataFrame([[3, 4, 5]], columns=["A", "B", "C"]) + else: + pdf = native_pd.DataFrame( + [[1, 2, None], [6, 7, float("nan")]], columns=["A", "B", "C"] + ) + assert_snowpark_pandas_equals_to_pandas_without_dtypecheck(df, pdf) @pytest.mark.parametrize( "col_name_and_alias_tuple", VALID_SNOWFLAKE_COLUMN_NAMES_AND_ALIASES ) -@sql_count_checker(query_count=4) -def test_read_snowflake_query_aliased_columns(session, col_name_and_alias_tuple): - # create table - col_name, alias = col_name_and_alias_tuple - table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) - Utils.create_table(session, table_name, f"{col_name} int", is_temporary=True) - - # create snowpark pandas dataframe - df = pd.read_snowflake(f"SELECT {col_name} AS {alias} FROM {table_name}") - pdf = df.to_pandas() - assert pdf.index.dtype == np.int64 - assert pdf.columns[0] == extract_pandas_label_from_snowflake_quoted_identifier( - quote_name(alias) - ) +@pytest.mark.parametrize("relaxed_ordering", [True, False]) +def test_read_snowflake_query_aliased_columns( + session, col_name_and_alias_tuple, relaxed_ordering +): + expected_query_count = 4 if not relaxed_ordering else 2 + with SqlCounter(query_count=expected_query_count): + # create table + col_name, alias = col_name_and_alias_tuple + table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) + Utils.create_table(session, table_name, f"{col_name} int", is_temporary=True) + + # create snowpark pandas dataframe + df = pd.read_snowflake( + f"SELECT {col_name} AS {alias} FROM {table_name}", + relaxed_ordering=relaxed_ordering, + ) + pdf = df.to_pandas() + assert pdf.index.dtype == np.int64 + assert pdf.columns[0] == extract_pandas_label_from_snowflake_quoted_identifier( + quote_name(alias) + ) @pytest.mark.parametrize( "col_name_and_alias_tuple", VALID_SNOWFLAKE_COLUMN_NAMES_AND_ALIASES ) -@sql_count_checker(query_count=4) +@pytest.mark.parametrize("relaxed_ordering", [True, False]) def test_read_snowflake_query_aliased_columns_and_columns_kwarg_specified( - session, col_name_and_alias_tuple + session, col_name_and_alias_tuple, relaxed_ordering ): - # create table - col_name, alias = col_name_and_alias_tuple - table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) - Utils.create_table(session, table_name, f"{col_name} int", is_temporary=True) - - # create snowpark pandas dataframe - pandas_col_name = extract_pandas_label_from_snowflake_quoted_identifier( - quote_name(alias) - ) - df = pd.read_snowflake( - f"SELECT {col_name} AS {alias}, {col_name} FROM {table_name}", - columns=[pandas_col_name], - ) - pdf = df.to_pandas() - assert pdf.index.dtype == np.int64 - assert pdf.columns[0] == pandas_col_name - assert len(pdf.columns) == 1 - - -@sql_count_checker(query_count=3) -def test_read_snowflake_query_with_columns(session): - # create table - table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) - Utils.create_table(session, table_name, '"col0" int, "col1" int', is_temporary=True) - - # create snowpark pandas dataframe - df = pd.read_snowflake(f"SELECT * FROM {table_name}", columns=["col0"]) - pdf = df.to_pandas() - assert pdf.index.dtype == np.int64 - assert len(pdf.columns) == 1 - assert pdf.columns[0] == "col0" - - -@sql_count_checker(query_count=3) -def test_read_snowflake_query_with_index_col_and_columns(session): - # create table - table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) - Utils.create_table( - session, - table_name, - '"index_col" int, "col0" int, "col1" int', - is_temporary=True, - ) + expected_query_count = 4 if not relaxed_ordering else 2 + with SqlCounter(query_count=expected_query_count): + # create table + col_name, alias = col_name_and_alias_tuple + table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) + Utils.create_table(session, table_name, f"{col_name} int", is_temporary=True) - # create snowpark pandas dataframe - df = pd.read_snowflake( - f"SELECT * FROM {table_name}", columns=["col0"], index_col="index_col" - ) - pdf = df.to_pandas() - assert pdf.index.dtype == np.int64 - assert len(pdf.columns) == 1 - assert pdf.columns[0] == "col0" - assert pdf.index.name == "index_col" - - -@sql_count_checker(query_count=3) -def test_read_snowflake_query_with_index_col_and_columns_overlap(session): - # create table - table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) - Utils.create_table( - session, - table_name, - '"index_col" int, "col0" int, "col1" int', - is_temporary=True, - ) + # create snowpark pandas dataframe + pandas_col_name = extract_pandas_label_from_snowflake_quoted_identifier( + quote_name(alias) + ) + df = pd.read_snowflake( + f"SELECT {col_name} AS {alias}, {col_name} FROM {table_name}", + columns=[pandas_col_name], + relaxed_ordering=relaxed_ordering, + ) + pdf = df.to_pandas() + assert pdf.index.dtype == np.int64 + assert pdf.columns[0] == pandas_col_name + assert len(pdf.columns) == 1 + + +@pytest.mark.parametrize("relaxed_ordering", [True, False]) +def test_read_snowflake_query_with_columns(session, relaxed_ordering): + expected_query_count = 3 if not relaxed_ordering else 2 + with SqlCounter(query_count=expected_query_count): + # create table + table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) + Utils.create_table( + session, table_name, '"col0" int, "col1" int', is_temporary=True + ) - # create snowpark pandas dataframe - df = pd.read_snowflake( - f"SELECT * FROM {table_name}", - columns=["col0", "index_col"], - index_col="index_col", - ) - pdf = df.to_pandas() - assert pdf.index.dtype == np.int64 - assert pdf.columns.equals(native_pd.Index(["col0", "index_col"])) - assert pdf.index.name == "index_col" - - -@sql_count_checker(query_count=4) -def test_read_snowflake_query_additional_derived_column(session): - # create table - table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) - session.create_dataframe( - native_pd.DataFrame([[1, 2], [3, 4], [6, 7]], columns=["A", "B"]) - ).write.save_as_table(table_name, table_type="temp") - - df = pd.read_snowflake( - f"SELECT A, B, SQUARE(A) + SQUARE(B) as C FROM {table_name}", index_col="C" - ) - pdf = native_pd.DataFrame( - [[1, 2], [3, 4], [6, 7]], - index=native_pd.Index([5, 25, 85], name="C"), - columns=["A", "B"], - ) - assert_snowpark_pandas_equals_to_pandas_without_dtypecheck( - df, pdf, check_index_type=False - ) + # create snowpark pandas dataframe + df = pd.read_snowflake( + f"SELECT * FROM {table_name}", + columns=["col0"], + relaxed_ordering=relaxed_ordering, + ) + pdf = df.to_pandas() + assert pdf.index.dtype == np.int64 + assert len(pdf.columns) == 1 + assert pdf.columns[0] == "col0" + + +@pytest.mark.parametrize("relaxed_ordering", [True, False]) +def test_read_snowflake_query_with_index_col_and_columns(session, relaxed_ordering): + expected_query_count = 3 if not relaxed_ordering else 2 + with SqlCounter(query_count=expected_query_count): + # create table + table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) + Utils.create_table( + session, + table_name, + '"index_col" int, "col0" int, "col1" int', + is_temporary=True, + ) + + # create snowpark pandas dataframe + df = pd.read_snowflake( + f"SELECT * FROM {table_name}", + columns=["col0"], + index_col="index_col", + relaxed_ordering=relaxed_ordering, + ) + pdf = df.to_pandas() + assert pdf.index.dtype == np.int64 + assert len(pdf.columns) == 1 + assert pdf.columns[0] == "col0" + assert pdf.index.name == "index_col" + + +@pytest.mark.parametrize("relaxed_ordering", [True, False]) +def test_read_snowflake_query_with_index_col_and_columns_overlap( + session, relaxed_ordering +): + expected_query_count = 3 if not relaxed_ordering else 2 + with SqlCounter(query_count=expected_query_count): + # create table + table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) + Utils.create_table( + session, + table_name, + '"index_col" int, "col0" int, "col1" int', + is_temporary=True, + ) + + # create snowpark pandas dataframe + df = pd.read_snowflake( + f"SELECT * FROM {table_name}", + columns=["col0", "index_col"], + index_col="index_col", + relaxed_ordering=relaxed_ordering, + ) + pdf = df.to_pandas() + assert pdf.index.dtype == np.int64 + assert pdf.columns.equals(native_pd.Index(["col0", "index_col"])) + assert pdf.index.name == "index_col" + + +@pytest.mark.parametrize("relaxed_ordering", [True, False]) +def test_read_snowflake_query_additional_derived_column(session, relaxed_ordering): + expected_query_count = 4 if not relaxed_ordering else 2 + with SqlCounter(query_count=expected_query_count): + # create table + table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) + session.create_dataframe( + native_pd.DataFrame([[1, 2], [3, 4], [6, 7]], columns=["A", "B"]) + ).write.save_as_table(table_name, table_type="temp") + + df = pd.read_snowflake( + f"SELECT A, B, SQUARE(A) + SQUARE(B) as C FROM {table_name}", + index_col="C", + relaxed_ordering=relaxed_ordering, + ) + pdf = native_pd.DataFrame( + [[1, 2], [3, 4], [6, 7]], + index=native_pd.Index([5, 25, 85], name="C"), + columns=["A", "B"], + ) + assert_snowpark_pandas_equals_to_pandas_without_dtypecheck( + df, pdf, check_index_type=False + ) @pytest.mark.parametrize( @@ -283,131 +354,163 @@ def test_read_snowflake_query_additional_derived_column(session): ('"COL"', "col"), ), ) -@sql_count_checker(query_count=2) +@pytest.mark.parametrize("relaxed_ordering", [True, False]) def test_read_snowflake_query_non_existing( session, col_name, non_existing_index_col, + relaxed_ordering, ): - # create table - table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) - Utils.create_table(session, table_name, f"{col_name} int", is_temporary=True) - with pytest.raises( - KeyError, - match="is not in existing snowflake columns", - ): - pd.read_snowflake( - f"SELECT * FROM {table_name}", index_col=non_existing_index_col + expected_query_count = 2 if not relaxed_ordering else 1 + with SqlCounter(query_count=expected_query_count): + # create table + table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) + Utils.create_table(session, table_name, f"{col_name} int", is_temporary=True) + with pytest.raises( + KeyError, + match="is not in existing snowflake columns", + ): + pd.read_snowflake( + f"SELECT * FROM {table_name}", + index_col=non_existing_index_col, + relaxed_ordering=relaxed_ordering, + ) + + +@pytest.mark.parametrize("relaxed_ordering", [True, False]) +def test_read_snowflake_query_duplicate_columns(session, relaxed_ordering): + expected_query_count = 5 if not relaxed_ordering else 3 + with SqlCounter(query_count=expected_query_count): + # create table + table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) + Utils.create_table(session, table_name, '"X" int, Y int', is_temporary=True) + + df = pd.read_snowflake( + f"SELECT * FROM {table_name}", + index_col=["X", "X"], + relaxed_ordering=relaxed_ordering, ) + assert df.index.names == ["X", "X"] + assert df.columns.tolist() == ["Y"] - -@sql_count_checker(query_count=5) -def test_read_snowflake_query_duplicate_columns(session): - # create table - table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) - Utils.create_table(session, table_name, '"X" int, Y int', is_temporary=True) - - df = pd.read_snowflake(f"SELECT * FROM {table_name}", index_col=["X", "X"]) - assert df.index.names == ["X", "X"] - assert df.columns.tolist() == ["Y"] - - df = pd.read_snowflake(f"SELECT * FROM {table_name}", index_col=["X", "Y"]) - assert df.index.names == ["X", "Y"] - assert df.columns.tolist() == [] + df = pd.read_snowflake( + f"SELECT * FROM {table_name}", + index_col=["X", "Y"], + relaxed_ordering=relaxed_ordering, + ) + assert df.index.names == ["X", "Y"] + assert df.columns.tolist() == [] +@pytest.mark.parametrize("relaxed_ordering", [True, False]) @sql_count_checker(query_count=0) -def test_read_snowflake_query_table_not_exist_negative() -> None: +def test_read_snowflake_query_table_not_exist_negative(relaxed_ordering) -> None: table_name = "non_exist_table_error" - with pytest.raises(SnowparkPandasException) as ex: - pd.read_snowflake(f"SELECT * FROM {table_name}") + expected_exception_type = ( + SnowparkPandasException if not relaxed_ordering else SnowparkSQLException + ) + with pytest.raises(expected_exception_type) as ex: + pd.read_snowflake( + f"SELECT * FROM {table_name}", + relaxed_ordering=relaxed_ordering, + ) - assert ex.value.error_code == SnowparkPandasErrorCode.GENERAL_SQL_EXCEPTION.value + expected_error_code = ( + SnowparkPandasErrorCode.GENERAL_SQL_EXCEPTION.value + if not relaxed_ordering + else "1304" + ) + assert ex.value.error_code == expected_error_code @sql_count_checker(query_count=0) @pytest.mark.parametrize( "bad_sql", ["SELET * FROM A", "WITH T1 as (SELECT * FROM A), SELECT * FROM T1"] ) -def test_read_snowflake_query_table_bad_sql_negative(bad_sql) -> None: - with pytest.raises(SnowparkPandasException) as ex: - pd.read_snowflake(bad_sql) - - assert ex.value.error_code == SnowparkPandasErrorCode.GENERAL_SQL_EXCEPTION.value - +@pytest.mark.parametrize("relaxed_ordering", [True, False]) +def test_read_snowflake_query_table_bad_sql_negative(bad_sql, relaxed_ordering) -> None: + expected_exception_type = ( + SnowparkPandasException if not relaxed_ordering else SnowparkSQLException + ) + with pytest.raises(expected_exception_type) as ex: + pd.read_snowflake(bad_sql, relaxed_ordering=relaxed_ordering) -@sql_count_checker(query_count=5, join_count=1) -def test_read_snowflake_query_complex_query_with_join(session): - # create table - table_name1 = Utils.random_name_for_temp_object(TempObjectType.TABLE) - session.create_dataframe( - native_pd.DataFrame( - [[10, "car"], [3, "bus"], [6, "train"]], - columns=["price to consumer", "mode of transportation"], + expected_error_code = ( + SnowparkPandasErrorCode.GENERAL_SQL_EXCEPTION.value + if not relaxed_ordering + else "1304" + ) + assert ex.value.error_code == expected_error_code + + +@pytest.mark.parametrize("relaxed_ordering", [True, False]) +def test_read_snowflake_query_complex_query_with_join(session, relaxed_ordering): + expected_query_count = 5 if not relaxed_ordering else 3 + with SqlCounter(query_count=expected_query_count, join_count=1): + # create table + table_name1 = Utils.random_name_for_temp_object(TempObjectType.TABLE) + session.create_dataframe( + native_pd.DataFrame( + [[10, "car"], [3, "bus"], [6, "train"]], + columns=["price to consumer", "mode of transportation"], + ) + ).write.save_as_table(table_name1, table_type="temp") + table_name2 = Utils.random_name_for_temp_object(TempObjectType.TABLE) + session.create_dataframe( + native_pd.DataFrame( + [[5, "car"], [0.5, "bus"], [2, "train"]], + columns=["cost to operator", "mode of transportation"], + ) + ).write.save_as_table(table_name2, table_type="temp") + df = ( + pd.read_snowflake( + f"""SELECT "price to consumer" - "cost to operator" as "profit", + "mode of transportation" FROM {table_name1} NATURAL JOIN {table_name2}""", + relaxed_ordering=relaxed_ordering, + ) + .sort_values("profit") + .reset_index(drop=True) ) - ).write.save_as_table(table_name1, table_type="temp") - table_name2 = Utils.random_name_for_temp_object(TempObjectType.TABLE) - session.create_dataframe( - native_pd.DataFrame( - [[5, "car"], [0.5, "bus"], [2, "train"]], - columns=["cost to operator", "mode of transportation"], + pdf = ( + native_pd.DataFrame( + [[5, "car"], [2.5, "bus"], [4, "train"]], + columns=["profit", "mode of transportation"], + ) + .sort_values("profit") + .reset_index(drop=True) ) - ).write.save_as_table(table_name2, table_type="temp") - df = pd.read_snowflake( - f"""SELECT "price to consumer" - "cost to operator" as "profit", - "mode of transportation" FROM {table_name1} NATURAL JOIN {table_name2}""" - ) - pdf = native_pd.DataFrame( - [[5, "car"], [2.5, "bus"], [4, "train"]], - columns=["profit", "mode of transportation"], - ) - assert_snowpark_pandas_equals_to_pandas_without_dtypecheck(df, pdf) - - -@sql_count_checker(query_count=6) -def test_read_snowflake_query_connect_by(session): - # create table - table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) - session.sql( - f"CREATE OR REPLACE TABLE {table_name} (title VARCHAR, employee_ID INTEGER, manager_ID INTEGER)" - ).collect() - session.sql( - f"""INSERT INTO {table_name} (title, employee_ID, manager_ID) VALUES - ('President', 1, NULL), -- The President has no manager. - ('Vice President Engineering', 10, 1), - ('Programmer', 100, 10), - ('QA Engineer', 101, 10), - ('Vice President HR', 20, 1), - ('Health Insurance Analyst', 200, 20)""" - ).collect() - SQL_QUERY = f"""SELECT employee_ID, manager_ID, title - FROM {table_name} - START WITH title = 'President' - CONNECT BY - manager_ID = PRIOR employee_id - ORDER BY employee_ID""" - native_df = session.sql(SQL_QUERY).to_pandas() - snow_df = ( - pd.read_snowflake(SQL_QUERY).sort_values("EMPLOYEE_ID").reset_index(drop=True) - ) - assert_snowpark_pandas_equals_to_pandas_without_dtypecheck(snow_df, native_df) - - -@sql_count_checker(query_count=1) -def test_read_snowflake_query_with_relaxed_ordering_neg(session): - # create table - table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) - session.create_dataframe( - native_pd.DataFrame([[1, 2], [3, 4], [6, 7]], columns=["A", "B"]) - ).write.save_as_table(table_name, table_type="temp") - - with pytest.raises( - NotImplementedError, - match="does not currently support 'relaxed_ordering=True'", - ): - # create snowpark pandas dataframe - pd.read_snowflake( - f"SELECT A, B, SQUARE(A) + SQUARE(B) as C FROM {table_name}", - relaxed_ordering=True, + assert_snowpark_pandas_equals_to_pandas_without_dtypecheck(df, pdf) + + +@pytest.mark.parametrize("relaxed_ordering", [True, False]) +def test_read_snowflake_query_connect_by(session, relaxed_ordering): + expected_query_count = 6 if not relaxed_ordering else 4 + with SqlCounter(query_count=expected_query_count): + # create table + table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) + session.sql( + f"CREATE OR REPLACE TABLE {table_name} (title VARCHAR, employee_ID INTEGER, manager_ID INTEGER)" + ).collect() + session.sql( + f"""INSERT INTO {table_name} (title, employee_ID, manager_ID) VALUES + ('President', 1, NULL), -- The President has no manager. + ('Vice President Engineering', 10, 1), + ('Programmer', 100, 10), + ('QA Engineer', 101, 10), + ('Vice President HR', 20, 1), + ('Health Insurance Analyst', 200, 20)""" + ).collect() + SQL_QUERY = f"""SELECT employee_ID, manager_ID, title + FROM {table_name} + START WITH title = 'President' + CONNECT BY + manager_ID = PRIOR employee_id + ORDER BY employee_ID""" + native_df = session.sql(SQL_QUERY).to_pandas() + snow_df = ( + pd.read_snowflake(SQL_QUERY, relaxed_ordering=relaxed_ordering) + .sort_values("EMPLOYEE_ID") + .reset_index(drop=True) ) + assert_snowpark_pandas_equals_to_pandas_without_dtypecheck(snow_df, native_df) From 90b5250571ff0173e903f560d10287ccc03de0d0 Mon Sep 17 00:00:00 2001 From: Hazem Elmeleegy Date: Tue, 18 Mar 2025 13:58:31 -0700 Subject: [PATCH 2/4] address comments --- CHANGELOG.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6a5b5df7ac..81cdeae953 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,8 +21,7 @@ #### Improvements -- Support relaxed consistency and ordering guarantees in `pd.read_snowflake` for non-query data sources. -- Support relaxed consistency and ordering guarantees in `pd.read_snowflake` for query data sources. +- Support relaxed consistency and ordering guarantees in `pd.read_snowflake` for both named data sources (e.g., tables and views) and query data sources. ## 1.29.1 (2025-03-12) From b5bae4b9fd9bb041a24540b13810aaf8bea27cb1 Mon Sep 17 00:00:00 2001 From: Hazem Elmeleegy Date: Tue, 18 Mar 2025 14:21:11 -0700 Subject: [PATCH 3/4] address comments --- .../snowpark/modin/plugin/extensions/pd_extensions.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/snowflake/snowpark/modin/plugin/extensions/pd_extensions.py b/src/snowflake/snowpark/modin/plugin/extensions/pd_extensions.py index b86dc147b3..ec117c0d8a 100644 --- a/src/snowflake/snowpark/modin/plugin/extensions/pd_extensions.py +++ b/src/snowflake/snowpark/modin/plugin/extensions/pd_extensions.py @@ -74,8 +74,7 @@ def read_snowflake( Snowpark pandas provides two modes of consistency and ordering semantics. * When `relaxed_ordering` is set to True, Snowpark pandas provides relaxed consistency and ordering guarantees. In particular, the returned DataFrame object will be - directly based on the source given by `name_or_query` as long as it is not a query. Setting `relaxed_ordering` to True along with passing a query in `name_or_query` - is not currently supported. Consistency and isolation guarantees are relaxed in this case because any changes that happen to the source will be reflected in the + directly based on the source given by `name_or_query`. Consistency and isolation guarantees are relaxed in this case because any changes that happen to the source will be reflected in the DataFrame object returned by `pd.read_snowflake`. Ordering guarantees will also be relaxed in the sense that each time an operation is run on the returned DataFrame object, the underlying ordering of rows maybe @@ -85,6 +84,9 @@ def read_snowflake( With this mode, it is still possible to switch to strict ordering guarantees by explicitly calling `df.sort_values()` and providing a custom sort key. This will ensure that future operations will consistently experience the same sort order, but the consistency guarantees will remain relaxed. + Note that when `name_or_query` is a query with an ORDER BY clause, this will only guarantee that the immediate results of the input query are sorted. But it still gives no guarantees + on the order of the final results (after applying a sequence of pandas operations to those initial results). + * When `relaxed_ordering` is set to False, Snowpark pandas provides the same consistency and ordering guarantees for `read_snowflake` as if local files were read. For example, calling `df.head(5)` two consecutive times is guaranteed to result in the exact same set of 5 rows each time and with the same ordering. Depending on the type of source, `pd.read_snowflake` will do one of the following From 7194232df8398cc6be347d4c6b8d9fc5951971eb Mon Sep 17 00:00:00 2001 From: Hazem Elmeleegy Date: Tue, 18 Mar 2025 15:01:11 -0700 Subject: [PATCH 4/4] address comments --- .../io/test_read_snowflake_query_call.py | 36 ++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/tests/integ/modin/io/test_read_snowflake_query_call.py b/tests/integ/modin/io/test_read_snowflake_query_call.py index aa7ace1717..e4bd60f699 100644 --- a/tests/integ/modin/io/test_read_snowflake_query_call.py +++ b/tests/integ/modin/io/test_read_snowflake_query_call.py @@ -8,10 +8,11 @@ import snowflake.snowpark.modin.plugin # noqa: F401 from snowflake.snowpark._internal.utils import TempObjectType +from snowflake.snowpark.exceptions import SnowparkSQLException from tests.integ.modin.utils import ( assert_snowpark_pandas_equals_to_pandas_without_dtypecheck, ) -from tests.integ.utils.sql_counter import SqlCounter +from tests.integ.utils.sql_counter import SqlCounter, sql_count_checker from tests.utils import Utils @@ -62,6 +63,39 @@ def filter_by_role(session, table_name, role): session.sql("DROP PROCEDURE filter_by_role(VARCHAR, VARCHAR)").collect() +@sql_count_checker(query_count=3) +def test_read_snowflake_call_sproc_relaxed_ordering_neg(session): + session.sql( + """ + CREATE OR REPLACE PROCEDURE filter_by_role(tableName VARCHAR, role VARCHAR) + RETURNS TABLE(id NUMBER, name VARCHAR, role VARCHAR) + LANGUAGE PYTHON + RUNTIME_VERSION = '3.8' + PACKAGES = ('snowflake-snowpark-python') + HANDLER = 'filter_by_role' + AS $$from snowflake.snowpark.functions import col +def filter_by_role(session, table_name, role): + df = session.table(table_name) + return df.filter(col('role') == role) + $$""" + ).collect() + try: + table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) + session.sql( + f"""CREATE OR REPLACE TEMPORARY TABLE {table_name}(id NUMBER, name VARCHAR, role VARCHAR) AS SELECT * FROM VALUES(1, 'Alice', 'op'), (2, 'Bob', 'dev')""" + ).collect() + with pytest.raises( + SnowparkSQLException, + match="unexpected 'CALL'", + ): + pd.read_snowflake( + f"CALL filter_by_role('{table_name}', 'op')", + relaxed_ordering=True, + ).head() + finally: + session.sql("DROP PROCEDURE filter_by_role(VARCHAR, VARCHAR)").collect() + + @pytest.mark.parametrize("relaxed_ordering", [True, False]) def test_read_snowflake_system_function(session, relaxed_ordering): expected_query_count = 4 if not relaxed_ordering else 2