diff --git a/python/delta_sharing/delta_sharing.py b/python/delta_sharing/delta_sharing.py index 9301bc6f6..e65f68939 100644 --- a/python/delta_sharing/delta_sharing.py +++ b/python/delta_sharing/delta_sharing.py @@ -14,7 +14,7 @@ # limitations under the License. # from itertools import chain -from typing import BinaryIO, List, Optional, Sequence, TextIO, Tuple, Union +from typing import Any, BinaryIO, Dict, List, Optional, Sequence, TextIO, Tuple, Union from pathlib import Path import pandas as pd @@ -100,6 +100,8 @@ def get_table_metadata(url: str) -> Metadata: def load_as_pandas( url: str, + jsonPredicateHints: Optional[Dict[str, Any]] = None, + predicateHints: Optional[Sequence[str]] = None, limit: Optional[int] = None, version: Optional[int] = None, timestamp: Optional[str] = None @@ -119,6 +121,8 @@ def load_as_pandas( return DeltaSharingReader( table=Table(name=table, share=share, schema=schema), rest_client=DataSharingRestClient(profile), + jsonPredicateHints=jsonPredicateHints, + predicateHints=predicateHints, limit=limit, version=version, timestamp=timestamp diff --git a/python/delta_sharing/reader.py b/python/delta_sharing/reader.py index 92dc8c949..9265c5480 100644 --- a/python/delta_sharing/reader.py +++ b/python/delta_sharing/reader.py @@ -32,6 +32,7 @@ def __init__( table: Table, rest_client: DataSharingRestClient, *, + jsonPredicateHints: Optional[Dict[str, Any]] = None, predicateHints: Optional[Sequence[str]] = None, limit: Optional[int] = None, version: Optional[int] = None, @@ -40,6 +41,7 @@ def __init__( self._table = table self._rest_client = rest_client + self._jsonPredicateHints = jsonPredicateHints if predicateHints is not None: assert isinstance(predicateHints, Sequence) assert all(isinstance(predicateHint, str) for predicateHint in predicateHints) @@ -55,8 +57,21 @@ def __init__( def table(self) -> Table: return self._table + def jsonPredicateHints( + self, + jsonPredicateHints: Optional[Dict[str, Any]] + ) -> "DeltaSharingReader": + return self._copy( + jsonPredicateHints=jsonPredicateHints, + predicateHints=self._predicateHints, + limit=self._limit, + version=self._version, + timestamp=self._timestamp + ) + def predicateHints(self, predicateHints: Optional[Sequence[str]]) -> "DeltaSharingReader": return self._copy( + jsonPredicateHints=self._jsonPredicateHints, predicateHints=predicateHints, limit=self._limit, version=self._version, @@ -65,6 +80,7 @@ def predicateHints(self, predicateHints: Optional[Sequence[str]]) -> "DeltaShari def limit(self, limit: Optional[int]) -> "DeltaSharingReader": return self._copy( + jsonPredicateHints=self._jsonPredicateHints, predicateHints=self._predicateHints, limit=limit, version=self._version, @@ -74,6 +90,7 @@ def limit(self, limit: Optional[int]) -> "DeltaSharingReader": def to_pandas(self) -> pd.DataFrame: response = self._rest_client.list_files_in_table( self._table, + jsonPredicateHints=self._jsonPredicateHints, predicateHints=self._predicateHints, limitHint=self._limit, version=self._version, @@ -131,6 +148,7 @@ def table_changes_to_pandas(self, cdfOptions: CdfOptions) -> pd.DataFrame: def _copy( self, *, + jsonPredicateHints: Optional[Dict[str, Any]], predicateHints: Optional[Sequence[str]], limit: Optional[int], version: Optional[int], @@ -139,6 +157,7 @@ def _copy( return DeltaSharingReader( table=self._table, rest_client=self._rest_client, + jsonPredicateHints=jsonPredicateHints, predicateHints=predicateHints, limit=limit, version=version, diff --git a/python/delta_sharing/rest_client.py b/python/delta_sharing/rest_client.py index 55067991d..6435c56ef 100644 --- a/python/delta_sharing/rest_client.py +++ b/python/delta_sharing/rest_client.py @@ -272,12 +272,15 @@ def list_files_in_table( self, table: Table, *, + jsonPredicateHints: Optional[Dict[str, Any]] = None, predicateHints: Optional[Sequence[str]] = None, limitHint: Optional[int] = None, version: Optional[int] = None, timestamp: Optional[str] = None, ) -> ListFilesInTableResponse: data: Dict = {} + if jsonPredicateHints is not None: + data["jsonPredicateHints"] = jsonPredicateHints if predicateHints is not None: data["predicateHints"] = predicateHints if limitHint is not None: diff --git a/python/delta_sharing/tests/test_delta_sharing.py b/python/delta_sharing/tests/test_delta_sharing.py index e0c199ff7..216622584 100644 --- a/python/delta_sharing/tests/test_delta_sharing.py +++ b/python/delta_sharing/tests/test_delta_sharing.py @@ -14,7 +14,7 @@ # limitations under the License. # from datetime import date, datetime -from typing import Optional, Sequence +from typing import Any, Dict, Optional, Sequence import pandas as pd import pytest @@ -486,20 +486,31 @@ def test_get_table_protocol(profile_path: str): def test_load_as_pandas_success( profile_path: str, fragments: str, + jsonPredicateHints: Optional[Dict[str, Any]], + predicateHints: Optional[Sequence[str]], limit: Optional[int], version: Optional[int], expected: pd.DataFrame ): - pdf = load_as_pandas(f"{profile_path}#{fragments}", limit, version, None) + pdf = load_as_pandas( + f"{profile_path}#{fragments}", + jsonPredicateHints, + predicateHints, + limit, + version, + None + ) pd.testing.assert_frame_equal(pdf, expected) @pytest.mark.skipif(not ENABLE_INTEGRATION, reason=SKIP_MESSAGE) @pytest.mark.parametrize( - "fragments,version,timestamp,error", + "fragments,jsonPredicateHints,predicateHints,version,timestamp,error", [ pytest.param( "share1.default.table1", + None, + None, 1, None, "Reading table by version or timestamp is not supported", @@ -508,12 +519,16 @@ def test_load_as_pandas_success( pytest.param( "share1.default.table1", None, + None, + None, "random_timestamp", "Reading table by version or timestamp is not supported", id="timestamp not supported", ), pytest.param( "share8.default.cdf_table_cdf_enabled", + None, + None, 1, "random_timestamp", "Please only provide one of", @@ -522,6 +537,8 @@ def test_load_as_pandas_success( pytest.param( "share8.default.cdf_table_cdf_enabled", None, + None, + None, "2000-01-01T00:00:00Z", "Please use a timestamp greater", id="timestamp too early ", @@ -531,12 +548,21 @@ def test_load_as_pandas_success( def test_load_as_pandas_exception( profile_path: str, fragments: str, + jsonPredicateHints: Optional[Dict[str, Any]], + predicateHints: Optional[Sequence[str]], version: Optional[int], timestamp: Optional[str], error: Optional[str] ): try: - load_as_pandas(f"{profile_path}#{fragments}", None, version, timestamp) + load_as_pandas( + f"{profile_path}#{fragments}", + jsonPredicateHints, + predicateHints, + None, + version, + timestamp + ) assert False except Exception as e: assert isinstance(e, HTTPError) diff --git a/python/delta_sharing/tests/test_reader.py b/python/delta_sharing/tests/test_reader.py index 3c5f2f91e..50d6b0072 100644 --- a/python/delta_sharing/tests/test_reader.py +++ b/python/delta_sharing/tests/test_reader.py @@ -16,11 +16,18 @@ import pytest from datetime import date -from typing import Optional, Sequence +from typing import Any, Dict, Optional, Sequence import pandas as pd -from delta_sharing.protocol import AddFile, AddCdcFile, CdfOptions, Metadata, RemoveFile, Table +from delta_sharing.protocol import ( + AddFile, + AddCdcFile, + CdfOptions, + Metadata, + RemoveFile, + Table, +) from delta_sharing.reader import DeltaSharingReader from delta_sharing.rest_client import ( ListFilesInTableResponse, @@ -42,6 +49,7 @@ def list_files_in_table( self, table: Table, *, + jsonPredicateHints: Optional[Dict[str, Any]] = None, predicateHints: Optional[Sequence[str]] = None, limitHint: Optional[int] = None, version: Optional[int] = None, @@ -97,6 +105,7 @@ def list_files_in_table( self, table: Table, *, + jsonPredicateHints: Optional[Dict[str, Any]] = None, predicateHints: Optional[Sequence[str]] = None, limitHint: Optional[int] = None, version: Optional[int] = None, @@ -156,6 +165,7 @@ def list_files_in_table( self, table: Table, *, + jsonPredicateHints: Optional[Dict[str, Any]] = None, predicateHints: Optional[Sequence[str]] = None, limitHint: Optional[int] = None, version: Optional[int] = None, @@ -211,6 +221,7 @@ def list_files_in_table( self, table: Table, *, + jsonPredicateHints: Optional[Dict[str, Any]] = None, predicateHints: Optional[Sequence[str]] = None, limitHint: Optional[int] = None, version: Optional[int] = None,