From cb282f9bce377251595deb87317c2b71de0dec54 Mon Sep 17 00:00:00 2001 From: Shingo OKAWA Date: Wed, 19 Apr 2023 20:04:11 +0900 Subject: [PATCH 1/4] Add predicateHints to load_as_pandas Signed-off-by: Shingo OKAWA --- python/delta_sharing/delta_sharing.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/delta_sharing/delta_sharing.py b/python/delta_sharing/delta_sharing.py index e6f0b0f1e..2e297551e 100644 --- a/python/delta_sharing/delta_sharing.py +++ b/python/delta_sharing/delta_sharing.py @@ -53,6 +53,7 @@ def _parse_url(url: str) -> Tuple[str, str, str, str]: def load_as_pandas( url: str, + predicateHints: Optional[Sequence[str]] = None, limit: Optional[int] = None, version: Optional[int] = None, timestamp: Optional[str] = None @@ -72,6 +73,7 @@ def load_as_pandas( return DeltaSharingReader( table=Table(name=table, share=share, schema=schema), rest_client=DataSharingRestClient(profile), + predicateHints=predicateHints, limit=limit, version=version, timestamp=timestamp From 608e774f7c0eaa6151704667484ae45f8b6ace25 Mon Sep 17 00:00:00 2001 From: Shingo OKAWA Date: Thu, 20 Apr 2023 01:12:48 +0900 Subject: [PATCH 2/4] Add jsonPredicateHints to load_as_pandas Signed-off-by: Shingo OKAWA --- python/delta_sharing/delta_sharing.py | 4 +++- python/delta_sharing/protocol.py | 3 +++ python/delta_sharing/reader.py | 21 ++++++++++++++++++++- python/delta_sharing/rest_client.py | 4 ++++ python/delta_sharing/tests/test_reader.py | 14 +++++++++++++- 5 files changed, 43 insertions(+), 3 deletions(-) diff --git a/python/delta_sharing/delta_sharing.py b/python/delta_sharing/delta_sharing.py index 2e297551e..b1d0e87ed 100644 --- a/python/delta_sharing/delta_sharing.py +++ b/python/delta_sharing/delta_sharing.py @@ -26,7 +26,7 @@ except ImportError: pass -from delta_sharing.protocol import DeltaSharingProfile, Schema, Share, Table +from delta_sharing.protocol import DeltaSharingProfile, JsonPredicateHints, Schema, Share, Table from delta_sharing.reader import 
DeltaSharingReader from delta_sharing.rest_client import DataSharingRestClient @@ -53,6 +53,7 @@ def _parse_url(url: str) -> Tuple[str, str, str, str]: def load_as_pandas( url: str, + jsonPredicateHints: Optional[JsonPredicateHints] = None, predicateHints: Optional[Sequence[str]] = None, limit: Optional[int] = None, version: Optional[int] = None, @@ -73,6 +74,7 @@ def load_as_pandas( return DeltaSharingReader( table=Table(name=table, share=share, schema=schema), rest_client=DataSharingRestClient(profile), + jsonPredicateHints=jsonPredicateHints, predicateHints=predicateHints, limit=limit, version=version, diff --git a/python/delta_sharing/protocol.py b/python/delta_sharing/protocol.py index 289133864..f5baf16de 100644 --- a/python/delta_sharing/protocol.py +++ b/python/delta_sharing/protocol.py @@ -21,6 +21,9 @@ import fsspec +JsonPredicateHints = Dict[str, Union['JsonPredicateHints', str]] + + @dataclass(frozen=True) class DeltaSharingProfile: CURRENT: ClassVar[int] = 1 diff --git a/python/delta_sharing/reader.py b/python/delta_sharing/reader.py index 92dc8c949..e1d8807c3 100644 --- a/python/delta_sharing/reader.py +++ b/python/delta_sharing/reader.py @@ -22,7 +22,7 @@ from pyarrow.dataset import dataset from delta_sharing.converter import to_converters, get_empty_table -from delta_sharing.protocol import AddCdcFile, CdfOptions, FileAction, Table +from delta_sharing.protocol import AddCdcFile, CdfOptions, FileAction, JsonPredicateHints, Table from delta_sharing.rest_client import DataSharingRestClient @@ -32,6 +32,7 @@ def __init__( table: Table, rest_client: DataSharingRestClient, *, + jsonPredicateHints: Optional[JsonPredicateHints] = None, predicateHints: Optional[Sequence[str]] = None, limit: Optional[int] = None, version: Optional[int] = None, @@ -40,6 +41,7 @@ def __init__( self._table = table self._rest_client = rest_client + self._jsonPredicateHints = jsonPredicateHints if predicateHints is not None: assert isinstance(predicateHints, Sequence) assert 
all(isinstance(predicateHint, str) for predicateHint in predicateHints) @@ -55,8 +57,21 @@ def __init__( def table(self) -> Table: return self._table + def jsonPredicateHints( + self, + jsonPredicateHints: Optional[JsonPredicateHints] + ) -> "DeltaSharingReader": + return self._copy( + jsonPredicateHints=jsonPredicateHints, + predicateHints=self._predicateHints, + limit=self._limit, + version=self._version, + timestamp=self._timestamp + ) + def predicateHints(self, predicateHints: Optional[Sequence[str]]) -> "DeltaSharingReader": return self._copy( + jsonPredicateHints=self._jsonPredicateHints, predicateHints=predicateHints, limit=self._limit, version=self._version, @@ -65,6 +80,7 @@ def predicateHints(self, predicateHints: Optional[Sequence[str]]) -> "DeltaShari def limit(self, limit: Optional[int]) -> "DeltaSharingReader": return self._copy( + jsonPredicateHints=self._jsonPredicateHints, predicateHints=self._predicateHints, limit=limit, version=self._version, @@ -74,6 +90,7 @@ def limit(self, limit: Optional[int]) -> "DeltaSharingReader": def to_pandas(self) -> pd.DataFrame: response = self._rest_client.list_files_in_table( self._table, + jsonPredicateHints=self._jsonPredicateHints, predicateHints=self._predicateHints, limitHint=self._limit, version=self._version, @@ -131,6 +148,7 @@ def table_changes_to_pandas(self, cdfOptions: CdfOptions) -> pd.DataFrame: def _copy( self, *, + jsonPredicateHints: Optional[JsonPredicateHints], predicateHints: Optional[Sequence[str]], limit: Optional[int], version: Optional[int], @@ -139,6 +157,7 @@ def _copy( return DeltaSharingReader( table=self._table, rest_client=self._rest_client, + jsonPredicateHints=jsonPredicateHints, predicateHints=predicateHints, limit=limit, version=version, diff --git a/python/delta_sharing/rest_client.py b/python/delta_sharing/rest_client.py index 55067991d..0ca7bf16e 100644 --- a/python/delta_sharing/rest_client.py +++ b/python/delta_sharing/rest_client.py @@ -32,6 +32,7 @@ FileAction, CdfOptions, 
DeltaSharingProfile, + JsonPredicateHints, Metadata, Protocol, Share, @@ -272,12 +273,15 @@ def list_files_in_table( self, table: Table, *, + jsonPredicateHints: Optional[JsonPredicateHints] = None, predicateHints: Optional[Sequence[str]] = None, limitHint: Optional[int] = None, version: Optional[int] = None, timestamp: Optional[str] = None, ) -> ListFilesInTableResponse: data: Dict = {} + if jsonPredicateHints is not None: + data["jsonPredicateHints"] = jsonPredicateHints if predicateHints is not None: data["predicateHints"] = predicateHints if limitHint is not None: diff --git a/python/delta_sharing/tests/test_reader.py b/python/delta_sharing/tests/test_reader.py index 3c5f2f91e..54bf52917 100644 --- a/python/delta_sharing/tests/test_reader.py +++ b/python/delta_sharing/tests/test_reader.py @@ -20,7 +20,15 @@ import pandas as pd -from delta_sharing.protocol import AddFile, AddCdcFile, CdfOptions, Metadata, RemoveFile, Table +from delta_sharing.protocol import ( + AddFile, + AddCdcFile, + CdfOptions, + JsonPredicateHints, + Metadata, + RemoveFile, + Table, +) from delta_sharing.reader import DeltaSharingReader from delta_sharing.rest_client import ( ListFilesInTableResponse, @@ -42,6 +50,7 @@ def list_files_in_table( self, table: Table, *, + jsonPredicateHints: Optional[JsonPredicateHints] = None, predicateHints: Optional[Sequence[str]] = None, limitHint: Optional[int] = None, version: Optional[int] = None, @@ -97,6 +106,7 @@ def list_files_in_table( self, table: Table, *, + jsonPredicateHints: Optional[JsonPredicateHints] = None, predicateHints: Optional[Sequence[str]] = None, limitHint: Optional[int] = None, version: Optional[int] = None, @@ -156,6 +166,7 @@ def list_files_in_table( self, table: Table, *, + jsonPredicateHints: Optional[JsonPredicateHints] = None, predicateHints: Optional[Sequence[str]] = None, limitHint: Optional[int] = None, version: Optional[int] = None, @@ -211,6 +222,7 @@ def list_files_in_table( self, table: Table, *, + jsonPredicateHints: 
Optional[JsonPredicateHints] = None, predicateHints: Optional[Sequence[str]] = None, limitHint: Optional[int] = None, version: Optional[int] = None, From 2636f4a1fd529a848a6343149d9006c42cf0154d Mon Sep 17 00:00:00 2001 From: Shingo OKAWA Date: Thu, 27 Apr 2023 13:25:46 +0900 Subject: [PATCH 3/4] Replace JsonPredicateHints with Dict[str, Any] Signed-off-by: Shingo OKAWA --- python/delta_sharing/delta_sharing.py | 6 +++--- python/delta_sharing/protocol.py | 3 --- python/delta_sharing/reader.py | 8 ++++---- python/delta_sharing/rest_client.py | 3 +-- python/delta_sharing/tests/test_reader.py | 11 +++++------ 5 files changed, 13 insertions(+), 18 deletions(-) diff --git a/python/delta_sharing/delta_sharing.py b/python/delta_sharing/delta_sharing.py index b1d0e87ed..a8167d362 100644 --- a/python/delta_sharing/delta_sharing.py +++ b/python/delta_sharing/delta_sharing.py @@ -14,7 +14,7 @@ # limitations under the License. # from itertools import chain -from typing import BinaryIO, List, Optional, Sequence, TextIO, Tuple, Union +from typing import Any, BinaryIO, Dict, List, Optional, Sequence, TextIO, Tuple, Union from pathlib import Path import pandas as pd @@ -26,7 +26,7 @@ except ImportError: pass -from delta_sharing.protocol import DeltaSharingProfile, JsonPredicateHints, Schema, Share, Table +from delta_sharing.protocol import DeltaSharingProfile, Schema, Share, Table from delta_sharing.reader import DeltaSharingReader from delta_sharing.rest_client import DataSharingRestClient @@ -53,7 +53,7 @@ def _parse_url(url: str) -> Tuple[str, str, str, str]: def load_as_pandas( url: str, - jsonPredicateHints: Optional[JsonPredicateHints] = None, + jsonPredicateHints: Optional[Dict[str, Any]] = None, predicateHints: Optional[Sequence[str]] = None, limit: Optional[int] = None, version: Optional[int] = None, diff --git a/python/delta_sharing/protocol.py b/python/delta_sharing/protocol.py index f5baf16de..289133864 100644 --- a/python/delta_sharing/protocol.py +++ 
b/python/delta_sharing/protocol.py @@ -21,9 +21,6 @@ import fsspec -JsonPredicateHints = Dict[str, Union['JsonPredicateHints', str]] - - @dataclass(frozen=True) class DeltaSharingProfile: CURRENT: ClassVar[int] = 1 diff --git a/python/delta_sharing/reader.py b/python/delta_sharing/reader.py index e1d8807c3..9265c5480 100644 --- a/python/delta_sharing/reader.py +++ b/python/delta_sharing/reader.py @@ -22,7 +22,7 @@ from pyarrow.dataset import dataset from delta_sharing.converter import to_converters, get_empty_table -from delta_sharing.protocol import AddCdcFile, CdfOptions, FileAction, JsonPredicateHints, Table +from delta_sharing.protocol import AddCdcFile, CdfOptions, FileAction, Table from delta_sharing.rest_client import DataSharingRestClient @@ -32,7 +32,7 @@ def __init__( table: Table, rest_client: DataSharingRestClient, *, - jsonPredicateHints: Optional[JsonPredicateHints] = None, + jsonPredicateHints: Optional[Dict[str, Any]] = None, predicateHints: Optional[Sequence[str]] = None, limit: Optional[int] = None, version: Optional[int] = None, @@ -59,7 +59,7 @@ def table(self) -> Table: def jsonPredicateHints( self, - jsonPredicateHints: Optional[JsonPredicateHints] + jsonPredicateHints: Optional[Dict[str, Any]] ) -> "DeltaSharingReader": return self._copy( jsonPredicateHints=jsonPredicateHints, @@ -148,7 +148,7 @@ def table_changes_to_pandas(self, cdfOptions: CdfOptions) -> pd.DataFrame: def _copy( self, *, - jsonPredicateHints: Optional[JsonPredicateHints], + jsonPredicateHints: Optional[Dict[str, Any]], predicateHints: Optional[Sequence[str]], limit: Optional[int], version: Optional[int], diff --git a/python/delta_sharing/rest_client.py b/python/delta_sharing/rest_client.py index 0ca7bf16e..6435c56ef 100644 --- a/python/delta_sharing/rest_client.py +++ b/python/delta_sharing/rest_client.py @@ -32,7 +32,6 @@ FileAction, CdfOptions, DeltaSharingProfile, - JsonPredicateHints, Metadata, Protocol, Share, @@ -273,7 +272,7 @@ def list_files_in_table( self, table: 
Table, *, - jsonPredicateHints: Optional[JsonPredicateHints] = None, + jsonPredicateHints: Optional[Dict[str, Any]] = None, predicateHints: Optional[Sequence[str]] = None, limitHint: Optional[int] = None, version: Optional[int] = None, diff --git a/python/delta_sharing/tests/test_reader.py b/python/delta_sharing/tests/test_reader.py index 54bf52917..50d6b0072 100644 --- a/python/delta_sharing/tests/test_reader.py +++ b/python/delta_sharing/tests/test_reader.py @@ -16,7 +16,7 @@ import pytest from datetime import date -from typing import Optional, Sequence +from typing import Any, Dict, Optional, Sequence import pandas as pd @@ -24,7 +24,6 @@ AddFile, AddCdcFile, CdfOptions, - JsonPredicateHints, Metadata, RemoveFile, Table, @@ -50,7 +49,7 @@ def list_files_in_table( self, table: Table, *, - jsonPredicateHints: Optional[JsonPredicateHints] = None, + jsonPredicateHints: Optional[Dict[str, Any]] = None, predicateHints: Optional[Sequence[str]] = None, limitHint: Optional[int] = None, version: Optional[int] = None, @@ -106,7 +105,7 @@ def list_files_in_table( self, table: Table, *, - jsonPredicateHints: Optional[JsonPredicateHints] = None, + jsonPredicateHints: Optional[Dict[str, Any]] = None, predicateHints: Optional[Sequence[str]] = None, limitHint: Optional[int] = None, version: Optional[int] = None, @@ -166,7 +165,7 @@ def list_files_in_table( self, table: Table, *, - jsonPredicateHints: Optional[JsonPredicateHints] = None, + jsonPredicateHints: Optional[Dict[str, Any]] = None, predicateHints: Optional[Sequence[str]] = None, limitHint: Optional[int] = None, version: Optional[int] = None, @@ -222,7 +221,7 @@ def list_files_in_table( self, table: Table, *, - jsonPredicateHints: Optional[JsonPredicateHints] = None, + jsonPredicateHints: Optional[Dict[str, Any]] = None, predicateHints: Optional[Sequence[str]] = None, limitHint: Optional[int] = None, version: Optional[int] = None, From 45e016cd81fab47531a540fb9e6ee823479a720a Mon Sep 17 00:00:00 2001 From: Shingo OKAWA 
Date: Thu, 27 Apr 2023 13:45:54 +0900 Subject: [PATCH 4/4] Fix integration test parameters Signed-off-by: Shingo OKAWA --- .../delta_sharing/tests/test_delta_sharing.py | 66 +++++++++++++++++-- 1 file changed, 61 insertions(+), 5 deletions(-) diff --git a/python/delta_sharing/tests/test_delta_sharing.py b/python/delta_sharing/tests/test_delta_sharing.py index 030bf0e59..ab7499818 100644 --- a/python/delta_sharing/tests/test_delta_sharing.py +++ b/python/delta_sharing/tests/test_delta_sharing.py @@ -14,7 +14,7 @@ # limitations under the License. # from datetime import date, datetime -from typing import Optional, Sequence +from typing import Any, Dict, Optional, Sequence import pandas as pd import pytest @@ -149,12 +149,14 @@ def list_all_tables( @pytest.mark.skipif(not ENABLE_INTEGRATION, reason=SKIP_MESSAGE) @pytest.mark.parametrize( - "fragments,limit,version,expected", + "fragments,jsonPredicateHints,predicateHints,limit,version,expected", [ pytest.param( "share1.default.table1", None, None, + None, + None, pd.DataFrame( { "eventTime": [ @@ -170,6 +172,8 @@ def list_all_tables( "share2.default.table2", None, None, + None, + None, pd.DataFrame( { "eventTime": [ @@ -185,6 +189,8 @@ def list_all_tables( "share1.default.table3", None, None, + None, + None, pd.DataFrame( { "eventTime": [ @@ -200,6 +206,8 @@ def list_all_tables( ), pytest.param( "share1.default.table3", + None, + None, 0, None, pd.DataFrame( @@ -213,6 +221,8 @@ def list_all_tables( ), pytest.param( "share1.default.table3", + None, + None, 1, None, pd.DataFrame( @@ -226,6 +236,8 @@ def list_all_tables( ), pytest.param( "share1.default.table3", + None, + None, 2, None, pd.DataFrame( @@ -242,6 +254,8 @@ def list_all_tables( ), pytest.param( "share1.default.table3", + None, + None, 3, None, pd.DataFrame( @@ -259,6 +273,8 @@ def list_all_tables( ), pytest.param( "share1.default.table3", + None, + None, 4, None, pd.DataFrame( @@ -278,6 +294,8 @@ def list_all_tables( "share8.default.cdf_table_cdf_enabled", 
None, None, + None, + None, pd.DataFrame( { "name": ["1", "2"], @@ -290,6 +308,8 @@ def list_all_tables( pytest.param( "share8.default.cdf_table_cdf_enabled", None, + None, + None, 1, pd.DataFrame( { @@ -304,6 +324,8 @@ def list_all_tables( "share3.default.table4", None, None, + None, + None, pd.DataFrame( { "type": [None, None], @@ -320,6 +342,8 @@ def list_all_tables( "share4.default.test_gzip", None, None, + None, + None, pd.DataFrame({"a": [True], "b": pd.Series([1], dtype="int32"), "c": ["Hi"]}), id="table column order is not the same as parquet files", ), @@ -327,6 +351,8 @@ def list_all_tables( "share_azure.default.table_wasb", None, None, + None, + None, pd.DataFrame( { "c1": ["foo bar"], @@ -339,6 +365,8 @@ def list_all_tables( "share_azure.default.table_abfs", None, None, + None, + None, pd.DataFrame( { "c1": ["foo bar"], @@ -351,6 +379,8 @@ def list_all_tables( "share_gcp.default.table_gcs", None, None, + None, + None, pd.DataFrame( { "c1": ["foo bar"], @@ -364,20 +394,31 @@ def list_all_tables( def test_load_as_pandas_success( profile_path: str, fragments: str, + jsonPredicateHints: Optional[Dict[str, Any]], + predicateHints: Optional[Sequence[str]], limit: Optional[int], version: Optional[int], expected: pd.DataFrame ): - pdf = load_as_pandas(f"{profile_path}#{fragments}", limit, version, None) + pdf = load_as_pandas( + f"{profile_path}#{fragments}", + jsonPredicateHints, + predicateHints, + limit, + version, + None + ) pd.testing.assert_frame_equal(pdf, expected) @pytest.mark.skipif(not ENABLE_INTEGRATION, reason=SKIP_MESSAGE) @pytest.mark.parametrize( - "fragments,version,timestamp,error", + "fragments,jsonPredicateHints,predicateHints,version,timestamp,error", [ pytest.param( "share1.default.table1", + None, + None, 1, None, "Reading table by version or timestamp is not supported", @@ -386,12 +427,16 @@ def test_load_as_pandas_success( pytest.param( "share1.default.table1", None, + None, + None, "random_timestamp", "Reading table by version or 
timestamp is not supported", id="timestamp not supported", ), pytest.param( "share8.default.cdf_table_cdf_enabled", + None, + None, 1, "random_timestamp", "Please only provide one of", @@ -400,6 +445,8 @@ def test_load_as_pandas_success( pytest.param( "share8.default.cdf_table_cdf_enabled", None, + None, + None, "2000-01-01T00:00:00Z", "Please use a timestamp greater", id="timestamp too early ", @@ -409,12 +456,21 @@ def test_load_as_pandas_success( def test_load_as_pandas_exception( profile_path: str, fragments: str, + jsonPredicateHints: Optional[Dict[str, Any]], + predicateHints: Optional[Sequence[str]], version: Optional[int], timestamp: Optional[str], error: Optional[str] ): try: - load_as_pandas(f"{profile_path}#{fragments}", None, version, timestamp) + load_as_pandas( + f"{profile_path}#{fragments}", + jsonPredicateHints, + predicateHints, + None, + version, + timestamp + ) assert False except Exception as e: assert isinstance(e, HTTPError)