Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable Support for Custom Session+Proxy Configurations #644

Open
wants to merge 4 commits into
base: main
Choose a base branch
from

Conversation

qxtaiba
Copy link

@qxtaiba qxtaiba commented Jan 23, 2025

Description

This PR introduces the ability for users to pass custom requests.Session objects to the SharingClient in the Delta Sharing Python library. This enhancement allows users to configure more complex session settings that cannot be achieved using environment variables alone, such as authenticated proxies, custom headers, SSL configurations, timeout settings, and other session-related configurations. This provides users with greater flexibility when working in complex network environments or when specific session configurations are required.

This PR also updates the Delta Sharing File System in Spark to support proxy configurations. This means users can now define proxy settings, including authenticated proxies, custom headers, SSL configurations, and timeout settings, through the Spark configuration.

Key Changes

1. SharingClient Class Update (Python)

The SharingClient class now accepts an optional session parameter in its constructor. This allows users to pass a custom requests.Session object when creating a SharingClient. If no session is provided, a new requests.Session will be created as before:

class SharingClient:
    def __init__(
        self,
        profile: Union[str, BinaryIO, TextIO, Path, DeltaSharingProfile],
        session: Optional[requests.Session] = None
    ):
        if not isinstance(profile, DeltaSharingProfile):
            profile = DeltaSharingProfile.read_from_file(profile)
        self._profile = profile
        self._rest_client = DataSharingRestClient(profile, session=session)

2. DataSharingRestClient Class Update (Python)

The DataSharingRestClient class now accepts an optional session parameter. The custom session is passed from SharingClient to DataSharingRestClient, ensuring all HTTP requests utilize the custom session.

class DataSharingRestClient:
    def __init__(
        self,
        profile: DeltaSharingProfile,
        num_retries=10,
        session: Optional[requests.Session] = None
    ):
        self._profile = profile
        self._num_retries = num_retries
        self._sleeper = lambda sleep_ms: time.sleep(sleep_ms / 1000)
        self.__auth_session(profile, session)

        self._session.headers.update(
            {
                "User-Agent": DataSharingRestClient.USER_AGENT,
            }
        )

The __auth_session method uses the provided session or creates a new one if none is provided:

def __auth_session(self, profile, session: Optional[requests.Session] = None):
    self._session = session or requests.Session()
    self._auth_credential_provider = (
        AuthCredentialProviderFactory.create_auth_credential_provider(profile)
    )
    if urlparse(profile.endpoint).hostname == "localhost":
        self._session.verify = False

3. High-Level Function Updates (Python)

The load_as_pandas and load_table_changes_as_pandas functions now accept an optional sharing_client parameter. If a sharing_client is provided, these functions will use its rest_client for making HTTP requests, ensuring the custom session is used.

def load_as_pandas(
  url: str,
  limit: Optional[int] = None,
  version: Optional[int] = None,
  timestamp: Optional[str] = None,
  jsonPredicateHints: Optional[str] = None,
  use_delta_format: Optional[bool] = None,
  sharing_client: Optional[SharingClient] = None,
) -> pd.DataFrame:
  profile_json, share, schema, table = _parse_url(url)
  profile = DeltaSharingProfile.read_from_file(profile_json)
  rest_client = (sharing_client and sharing_client.rest_client) or DataSharingRestClient(profile)
  return DeltaSharingReader(
      table=Table(name=table, share=share, schema=schema),
      rest_client=rest_client,
      jsonPredicateHints=jsonPredicateHints,
      limit=limit,
      version=version,
      timestamp=timestamp,
      use_delta_format=use_delta_format
  ).to_pandas()

3. Proxy Configuration Support (Spark)

The DeltaSharingFileSystem class now supports proxy configurations through its configuration settings. Users can define proxy hosts, ports, and other related settings in the Spark configuration. In order to use this, you can configure the following properties:

spark.delta.sharing.network.proxyHost=your.proxy.host
spark.delta.sharing.network.proxyPort=your_proxy_port
spark.delta.sharing.network.proxyAuthToken=your_proxy_auth_token
spark.delta.sharing.network.sslTrustAll=true
spark.delta.sharing.network.caCertPath=/path/to/your/ca_cert.pem

5. ConfUtils Updates (Spark)

The ConfUtils utility object has been updated to handle the retrieval and validation of proxy-related configurations, including custom headers and SSL configurations.

6. HTTP Client Configuration (Spark)

The DeltaSharingFileSystem.createHttpClient method has been enhanced to configure the HTTP client with proxy settings, custom headers, SSL configurations, and timeout settings.

Example Usage

Python

This is a simplified example of how to use the updated SharingClient with a custom requests.Session to configure an authenticated proxy, custom headers, and other settings:

import requests
from delta_sharing import SharingClient, load_as_pandas

custom_session = requests.Session()

custom_session.proxies = {
    'http': 'http://username:password@proxyserver:port',
    'https': 'https://username:password@proxyserver:port',
}

custom_session.headers.update({'Custom-Header': 'CustomValue'})
custom_session.timeout = 30  # Timeout in seconds
custom_session.verify = '/path/to/custom/cert.pem'  # Path to SSL certificate

profile_path = '/path/to/delta_sharing_profile.share'
client = SharingClient(profile_path, session=custom_session)

shares = client.list_shares()

table_url = f"{profile_path}#share_name.schema_name.table_name"
df = load_as_pandas(table_url, sharing_client=client)

print(df.head())

Spark

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("DeltaSharingExample")
  .config("spark.delta.sharing.network.proxyHost", "proxy.example.com")
  .config("spark.delta.sharing.network.proxyPort", "8080")
  .config("spark.delta.sharing.network.noProxyHosts", "noproxy.example.com")
  .config("spark.delta.sharing.network.proxyAuthToken", "my_proxy_auth_token")
  .config("spark.delta.sharing.network.sslTrustAll", "true")
  .config("spark.delta.sharing.network.caCertPath", "/path/to/ca_cert.pem")
  .getOrCreate()

@qxtaiba
Copy link
Author

qxtaiba commented Jan 23, 2025

@qinghaowu-db, @linzhou-db, @moderakh : I noticed you guys seem to be pretty active in this repo. I'd appreciate a review, sanity check, and thoughts on the above changes please! I'm keen to get this merged in and rolled out

@qxtaiba qxtaiba force-pushed the qalnuaimy/session-support branch 3 times, most recently from 7ae84c5 to f5c7c63 Compare January 25, 2025 10:34
Fix the description of "Read Change Data Feed from a Table"
@qxtaiba qxtaiba force-pushed the qalnuaimy/session-support branch from f5c7c63 to 2b92b44 Compare January 25, 2025 10:34
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant