Skip to content

Commit

Permalink
Add json predicate hints to load_as_pandas. (#454)
Browse files Browse the repository at this point in the history
* Add json predicate hints to load_as_pandas.

* remote bogus print.

* fix lint.
  • Loading branch information
chakankardb authored Jan 25, 2024
1 parent b3af8b9 commit 951d91d
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 1 deletion.
6 changes: 5 additions & 1 deletion python/delta_sharing/delta_sharing.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ def load_as_pandas(
url: str,
limit: Optional[int] = None,
version: Optional[int] = None,
timestamp: Optional[str] = None
timestamp: Optional[str] = None,
jsonPredicateHints: Optional[str] = None,
) -> pd.DataFrame:
"""
Load the shared table using the given url as a pandas DataFrame.
Expand All @@ -112,13 +113,16 @@ def load_as_pandas(
Use this optional parameter to explore the shared table without loading the entire table to
the memory.
:param version: an optional non-negative int. Load the snapshot of table at version
:param jsonPredicateHints: Predicate hints to be applied to the table. For more details see:
https://github.com/delta-io/delta-sharing/blob/main/PROTOCOL.md#json-predicates-for-filtering
:return: A pandas DataFrame representing the shared table.
"""
profile_json, share, schema, table = _parse_url(url)
profile = DeltaSharingProfile.read_from_file(profile_json)
return DeltaSharingReader(
table=Table(name=table, share=share, schema=schema),
rest_client=DataSharingRestClient(profile),
jsonPredicateHints=jsonPredicateHints,
limit=limit,
version=version,
timestamp=timestamp
Expand Down
101 changes: 101 additions & 0 deletions python/delta_sharing/tests/test_delta_sharing.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,107 @@ def test_load_as_pandas_success(
pd.testing.assert_frame_equal(pdf, expected)


# We will test predicates with the table share8.default.cdf_table_with_partition
# This table is partitioned by birthday column of type date.
# There are two partitions: 2020-02-02, and 2020-01-01.
# Each partition has one row.
@pytest.mark.skipif(not ENABLE_INTEGRATION, reason=SKIP_MESSAGE)
@pytest.mark.parametrize(
"fragments,jsonPredicateHints,expected",
[
# No predicates specified, so both rows are returned.
pytest.param(
"share8.default.cdf_table_with_partition",
None,
pd.DataFrame(
{
"name": ["2", "1"],
"age": pd.Series([2, 1], dtype="int32"),
"birthday": [date(2020, 2, 2), date(2020, 1, 1)],
}
),
id="no predicates",
),
# Equality predicate returns only one row.
pytest.param(
"share8.default.cdf_table_with_partition",
(
'{"op":"equal", "children":['
' {"op":"column","name":"birthday","valueType":"date"},'
' {"op":"literal","value":"2020-02-02","valueType":"date"}]}'
),
pd.DataFrame(
{
"name": ["2"],
"age": pd.Series([2], dtype="int32"),
"birthday": [date(2020, 2, 2)],
}
),
id="equal 2020-02-02",
),
# Equality predicate returns the other row.
pytest.param(
"share8.default.cdf_table_with_partition",
(
'{"op":"equal", "children":['
' {"op":"column","name":"birthday","valueType":"date"},'
' {"op":"literal","value":"2020-01-01","valueType":"date"}]}'
),
pd.DataFrame(
{
"name": ["1"],
"age": pd.Series([1], dtype="int32"),
"birthday": [date(2020, 1, 1)],
}
),
id="equal 2020-01-01",
),
# Equality predicate returns zero rows.
pytest.param(
"share8.default.cdf_table_with_partition",
(
'{"op":"equal", "children":['
' {"op":"column","name":"birthday","valueType":"date"},'
' {"op":"literal","value":"2022-02-02","valueType":"date"}]}'
),
pd.DataFrame(
{
"name": pd.Series([], dtype="str"),
"age": pd.Series([], dtype="int32"),
"birthday": pd.Series([], dtype="object"),
}
),
id="equal 2022-02-02",
),
# GT predicate returns all rows.
pytest.param(
"share8.default.cdf_table_with_partition",
(
'{"op":"greaterThan", "children":['
' {"op":"column","name":"birthday","valueType":"date"},'
' {"op":"literal","value":"2019-01-01","valueType":"date"}]}'
),
pd.DataFrame(
{
"name": ["2", "1"],
"age": pd.Series([2, 1], dtype="int32"),
"birthday": [date(2020, 2, 2), date(2020, 1, 1)],
}
),
id="greatherThan 2019-01-01",
),
],
)
def test_load_as_pandas_with_json_predicates(
profile_path: str,
fragments: str,
jsonPredicateHints: Optional[str],
expected: pd.DataFrame
):
pdf = load_as_pandas(f"{profile_path}#{fragments}", None, None, None, jsonPredicateHints)
pd.testing.assert_frame_equal(pdf, expected)


@pytest.mark.skipif(not ENABLE_INTEGRATION, reason=SKIP_MESSAGE)
@pytest.mark.parametrize(
"fragments,version,timestamp,error",
Expand Down

0 comments on commit 951d91d

Please sign in to comment.