Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Enable Support for Custom Session+Proxy Configurations #638

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion python/delta_sharing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@
# limitations under the License.
#

from delta_sharing.delta_sharing import SharingClient, load_as_pandas, load_as_spark
from delta_sharing.delta_sharing import load_as_pandas, load_as_spark
from delta_sharing.delta_sharing import get_table_metadata, get_table_protocol, get_table_version
from delta_sharing.delta_sharing import load_table_changes_as_pandas, load_table_changes_as_spark
from delta_sharing.protocol import Share, Schema, Table
from delta_sharing.sharing_client import SharingClient
from delta_sharing.version import __version__


Expand Down
104 changes: 10 additions & 94 deletions python/delta_sharing/delta_sharing.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
from itertools import chain
from typing import BinaryIO, List, Optional, Sequence, TextIO, Tuple, Union
from pathlib import Path
from typing import Optional, Tuple

import pandas as pd

Expand All @@ -26,11 +24,11 @@
except ImportError:
pass

from delta_sharing.protocol import DeltaSharingProfile, Schema, Share, Table
from delta_sharing.sharing_client import SharingClient
from delta_sharing.protocol import DeltaSharingProfile, Table
from delta_sharing.reader import DeltaSharingReader
from delta_sharing.rest_client import DataSharingRestClient

from requests.exceptions import HTTPError


def _parse_url(url: str) -> Tuple[str, str, str, str]:
Expand Down Expand Up @@ -105,6 +103,7 @@ def load_as_pandas(
timestamp: Optional[str] = None,
jsonPredicateHints: Optional[str] = None,
use_delta_format: Optional[bool] = None,
sharing_client: Optional[SharingClient] = None,
) -> pd.DataFrame:
"""
Load the shared table using the given url as a pandas DataFrame.
Expand All @@ -120,9 +119,10 @@ def load_as_pandas(
"""
profile_json, share, schema, table = _parse_url(url)
profile = DeltaSharingProfile.read_from_file(profile_json)
rest_client = (sharing_client and sharing_client.rest_client) or DataSharingRestClient(profile)
return DeltaSharingReader(
table=Table(name=table, share=share, schema=schema),
rest_client=DataSharingRestClient(profile),
rest_client=rest_client,
jsonPredicateHints=jsonPredicateHints,
limit=limit,
version=version,
Expand Down Expand Up @@ -216,7 +216,8 @@ def load_table_changes_as_pandas(
ending_version: Optional[int] = None,
starting_timestamp: Optional[str] = None,
ending_timestamp: Optional[str] = None,
use_delta_format: Optional[bool] = None
use_delta_format: Optional[bool] = None,
sharing_client: Optional[SharingClient] = None,
) -> pd.DataFrame:
"""
Load the table changes of shared table as a pandas DataFrame using the given url.
Expand All @@ -233,9 +234,10 @@ def load_table_changes_as_pandas(
"""
profile_json, share, schema, table = _parse_url(url)
profile = DeltaSharingProfile.read_from_file(profile_json)
rest_client = (sharing_client and sharing_client.rest_client) or DataSharingRestClient(profile)
return DeltaSharingReader(
table=Table(name=table, share=share, schema=schema),
rest_client=DataSharingRestClient(profile),
rest_client=rest_client,
use_delta_format=use_delta_format
).table_changes_to_pandas(CdfOptions(
starting_version=starting_version,
Expand All @@ -246,89 +248,3 @@ def load_table_changes_as_pandas(
# handle them properly when replaying the delta log
include_historical_metadata=use_delta_format
))


class SharingClient:
    """
    A Delta Sharing client to query shares/schemas/tables from a Delta Sharing Server.

    :param profile: a ``DeltaSharingProfile``, or anything
        ``DeltaSharingProfile.read_from_file`` accepts (path or file object).
    """

    def __init__(self, profile: Union[str, BinaryIO, TextIO, Path, DeltaSharingProfile]):
        if not isinstance(profile, DeltaSharingProfile):
            profile = DeltaSharingProfile.read_from_file(profile)
        self._profile = profile
        self._rest_client = DataSharingRestClient(profile)

    @staticmethod
    def _drain_pages(fetch_page, items_attr: str) -> list:
        """
        Collect every item of a paginated listing into one list.

        :param fetch_page: callable taking the current page token (``None`` for
            the first page) and returning a response object.
        :param items_attr: name of the response attribute holding that page's items.
        :return: the items of all pages, in the order they were returned.
        """
        items: list = []
        page_token: Optional[str] = None
        while True:
            response = fetch_page(page_token)
            items.extend(getattr(response, items_attr))
            page_token = response.next_page_token
            # A missing or empty token means there is no further page.
            if page_token is None or page_token == "":
                return items

    def __list_all_tables_in_share(self, share: Share) -> Sequence[Table]:
        return self._drain_pages(
            lambda token: self._rest_client.list_all_tables(share=share, page_token=token),
            "tables",
        )

    def list_shares(self) -> Sequence[Share]:
        """
        List shares that can be accessed by you in a Delta Sharing Server.

        :return: the shares that can be accessed.
        """
        return self._drain_pages(
            lambda token: self._rest_client.list_shares(page_token=token),
            "shares",
        )

    def list_schemas(self, share: Share) -> Sequence[Schema]:
        """
        List schemas in a share that can be accessed by you in a Delta Sharing Server.

        :param share: the share to list.
        :return: the schemas in a share.
        """
        return self._drain_pages(
            lambda token: self._rest_client.list_schemas(share=share, page_token=token),
            "schemas",
        )

    def list_tables(self, schema: Schema) -> Sequence[Table]:
        """
        List tables in a schema that can be accessed by you in a Delta Sharing Server.

        :param schema: the schema to list.
        :return: the tables in a schema.
        """
        return self._drain_pages(
            lambda token: self._rest_client.list_tables(schema=schema, page_token=token),
            "tables",
        )

    def list_all_tables(self) -> Sequence[Table]:
        """
        List all tables that can be accessed by you in a Delta Sharing Server.

        :return: all tables that can be accessed.
        :raises HTTPError: any HTTP failure other than a 404 from the all-tables API.
        """
        shares = self.list_shares()
        try:
            return list(
                chain.from_iterable(self.__list_all_tables_in_share(share) for share in shares)
            )
        except HTTPError as e:
            # An HTTPError may carry no response; never mask it with an AttributeError.
            if e.response is not None and e.response.status_code == 404:
                # The server doesn't support all-tables API. Fallback to the old APIs instead.
                schemas = chain.from_iterable(self.list_schemas(share) for share in shares)
                return list(chain.from_iterable(self.list_tables(schema) for schema in schemas))
            raise
28 changes: 20 additions & 8 deletions python/delta_sharing/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import os
import pandas as pd
import pyarrow as pa
import requests
import tempfile
from pyarrow.dataset import dataset

Expand Down Expand Up @@ -172,16 +173,17 @@ def to_pandas(self) -> pd.DataFrame:

converters = to_converters(schema_json)

session = self._rest_client.get_session()
if self._limit is None:
pdfs = [
DeltaSharingReader._to_pandas(
file, converters, False, None) for file in response.add_files
file, converters, False, None, session=session) for file in response.add_files
]
else:
left = self._limit
pdfs = []
for file in response.add_files:
pdf = DeltaSharingReader._to_pandas(file, converters, False, left)
pdf = DeltaSharingReader._to_pandas(file, converters, False, left, session=session)
pdfs.append(pdf)
left -= len(pdf)
assert (
Expand Down Expand Up @@ -387,10 +389,11 @@ def table_changes_to_pandas(self, cdfOptions: CdfOptions) -> pd.DataFrame:
if len(response.actions) == 0:
return get_empty_table(self._add_special_cdf_schema(schema_json))

session = self._rest_client.get_session()
converters = to_converters(schema_json)
pdfs = []
for action in response.actions:
pdf = DeltaSharingReader._to_pandas(action, converters, True, None)
pdf = DeltaSharingReader._to_pandas(action, converters, True, None, session=session)
pdfs.append(pdf)

return pd.concat(pdfs, axis=0, ignore_index=True, copy=False)
Expand Down Expand Up @@ -418,19 +421,28 @@ def _to_pandas(
action: FileAction,
converters: Dict[str, Callable[[str], Any]],
for_cdf: bool,
limit: Optional[int]
limit: Optional[int],
session: Optional[requests.Session]
) -> pd.DataFrame:
url = urlparse(action.url)
if "storage.googleapis.com" in (url.netloc.lower()):
# Apply the yarl patch for GCS pre-signed urls
import delta_sharing._yarl_patch # noqa: F401

protocol = url.scheme
proxy = getproxies()
if len(proxy) != 0:
filesystem = fsspec.filesystem(protocol, client_kwargs={"trust_env":True})

if session is not None:
proxies = session.proxies
client_kwargs = {}
if proxies:
client_kwargs = {'proxies': proxies}
filesystem = fsspec.filesystem(protocol, **client_kwargs)
else:
filesystem = fsspec.filesystem(protocol)
proxy = getproxies()
if len(proxy) != 0:
filesystem = fsspec.filesystem(protocol, client_kwargs={"trust_env":True})
else:
filesystem = fsspec.filesystem(protocol)

pa_dataset = dataset(source=action.url, format="parquet", filesystem=filesystem)
pa_table = pa_dataset.head(limit) if limit is not None else pa_dataset.to_table()
Expand Down
14 changes: 9 additions & 5 deletions python/delta_sharing/rest_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,25 +149,29 @@ class DataSharingRestClient:
DELTA_FORMAT = "delta"
PARQUET_FORMAT = "parquet"

def __init__(self, profile: DeltaSharingProfile, num_retries=10):
def __init__(self, profile: DeltaSharingProfile, num_retries=10, session: Optional[requests.Session] = None):
self._profile = profile
self._num_retries = num_retries
self._sleeper = lambda sleep_ms: time.sleep(sleep_ms / 1000)
self.__auth_session(profile)
self.__auth_session(profile, session)

self._session.headers.update(
{
"User-Agent": DataSharingRestClient.USER_AGENT,
}
)

def __auth_session(self, profile):
self._session = requests.Session()
def __auth_session(self, profile, session: Optional[requests.Session] = None):
self._session = session or requests.Session()
self._auth_credential_provider = (
AuthCredentialProviderFactory.create_auth_credential_provider(profile))
AuthCredentialProviderFactory.create_auth_credential_provider(profile)
)
if urlparse(profile.endpoint).hostname == "localhost":
self._session.verify = False

def get_session(self) -> requests.Session:
    """Return the ``requests.Session`` stored on this client as ``self._session``."""
    return self._session

def set_sharing_capabilities_header(self):
delta_sharing_capabilities = (
DataSharingRestClient.DELTA_AND_PARQUET_RESPONSE_FORMAT + ';' +
Expand Down
108 changes: 108 additions & 0 deletions python/delta_sharing/sharing_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
from itertools import chain
from typing import BinaryIO, List, Optional, Sequence, TextIO, Union
from pathlib import Path
import requests

from delta_sharing.protocol import DeltaSharingProfile, Schema, Share, Table
from delta_sharing.reader import DeltaSharingReader
from delta_sharing.rest_client import DataSharingRestClient

from requests.exceptions import HTTPError


class SharingClient:
    """
    A Delta Sharing client to query shares/schemas/tables from a Delta Sharing Server.

    :param profile: The path to the profile file or a DeltaSharingProfile object.
    :param session: An optional requests.Session object to use for HTTP requests.
                    You can use this to customize proxy settings, authentication, etc.
    """

    def __init__(
        self,
        profile: Union[str, BinaryIO, TextIO, Path, DeltaSharingProfile],
        session: Optional[requests.Session] = None,
    ):
        if not isinstance(profile, DeltaSharingProfile):
            profile = DeltaSharingProfile.read_from_file(profile)
        self._profile = profile
        self._rest_client = DataSharingRestClient(profile, session=session)

    @property
    def rest_client(self) -> DataSharingRestClient:
        """
        Get the underlying DataSharingRestClient used by this SharingClient.

        :return: The DataSharingRestClient instance.
        """
        return self._rest_client

    @staticmethod
    def _drain_pages(fetch_page, items_attr: str) -> list:
        """
        Collect every item of a paginated listing into one list.

        :param fetch_page: callable taking the current page token (``None`` for
            the first page) and returning a response object.
        :param items_attr: name of the response attribute holding that page's items.
        :return: the items of all pages, in the order they were returned.
        """
        items: list = []
        page_token: Optional[str] = None
        while True:
            response = fetch_page(page_token)
            items.extend(getattr(response, items_attr))
            page_token = response.next_page_token
            # A missing or empty token means there is no further page.
            if page_token is None or page_token == "":
                return items

    def __list_all_tables_in_share(self, share: Share) -> Sequence[Table]:
        return self._drain_pages(
            lambda token: self._rest_client.list_all_tables(share=share, page_token=token),
            "tables",
        )

    def list_shares(self) -> Sequence[Share]:
        """
        List shares that can be accessed by you in a Delta Sharing Server.

        :return: the shares that can be accessed.
        """
        return self._drain_pages(
            lambda token: self._rest_client.list_shares(page_token=token),
            "shares",
        )

    def list_schemas(self, share: Share) -> Sequence[Schema]:
        """
        List schemas in a share that can be accessed by you in a Delta Sharing Server.

        :param share: the share to list.
        :return: the schemas in a share.
        """
        return self._drain_pages(
            lambda token: self._rest_client.list_schemas(share=share, page_token=token),
            "schemas",
        )

    def list_tables(self, schema: Schema) -> Sequence[Table]:
        """
        List tables in a schema that can be accessed by you in a Delta Sharing Server.

        :param schema: the schema to list.
        :return: the tables in a schema.
        """
        return self._drain_pages(
            lambda token: self._rest_client.list_tables(schema=schema, page_token=token),
            "tables",
        )

    def list_all_tables(self) -> Sequence[Table]:
        """
        List all tables that can be accessed by you in a Delta Sharing Server.

        :return: all tables that can be accessed.
        :raises HTTPError: any HTTP failure other than a 404 from the all-tables API.
        """
        shares = self.list_shares()
        try:
            return list(
                chain.from_iterable(self.__list_all_tables_in_share(share) for share in shares)
            )
        except HTTPError as e:
            # An HTTPError may carry no response; never mask it with an AttributeError.
            if e.response is not None and e.response.status_code == 404:
                # The server doesn't support all-tables API. Fallback to the old APIs instead.
                schemas = chain.from_iterable(self.list_schemas(share) for share in shares)
                return list(chain.from_iterable(self.list_tables(schema) for schema in schemas))
            raise
Loading
Loading