From 2b92b44f7210395cb47012226a703be08b94e985 Mon Sep 17 00:00:00 2001 From: Qutaiba Al-Nuaimy Date: Fri, 24 Jan 2025 01:48:08 +0400 Subject: [PATCH 1/4] enable support for custom session configurations Fix the description of "Read Change Data Feed from a Table" --- .../client/DeltaSharingFileSystem.scala | 55 ++++++-- .../delta/sharing/client/util/ConfUtils.scala | 54 +++++++- .../client/DeltaSharingFileSystemSuite.scala | 4 +- .../sharing/client/util/ConfUtilsSuite.scala | 11 +- .../sharing/client/util/RetryUtilsSuite.scala | 1 - python/delta_sharing/delta_sharing.py | 105 ++------------- python/delta_sharing/reader.py | 28 ++-- python/delta_sharing/rest_client.py | 19 ++- python/delta_sharing/sharing_client.py | 126 ++++++++++++++++++ python/delta_sharing/tests/test_reader.py | 54 +++++++- 10 files changed, 326 insertions(+), 131 deletions(-) create mode 100644 python/delta_sharing/sharing_client.py diff --git a/client/src/main/scala/io/delta/sharing/client/DeltaSharingFileSystem.scala b/client/src/main/scala/io/delta/sharing/client/DeltaSharingFileSystem.scala index 73b2e969a..a7be6c985 100644 --- a/client/src/main/scala/io/delta/sharing/client/DeltaSharingFileSystem.scala +++ b/client/src/main/scala/io/delta/sharing/client/DeltaSharingFileSystem.scala @@ -16,6 +16,7 @@ package io.delta.sharing.client +import java.io.File import java.net.{URI, URLDecoder, URLEncoder} import java.util.concurrent.TimeUnit @@ -23,13 +24,15 @@ import org.apache.hadoop.fs._ import org.apache.hadoop.fs.permission.FsPermission import org.apache.hadoop.util.Progressable import org.apache.http.{HttpClientConnection, HttpHost, HttpRequest, HttpResponse} -import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials} import org.apache.http.client.config.RequestConfig import org.apache.http.client.utils.URIBuilder import org.apache.http.conn.routing.HttpRoute -import org.apache.http.impl.client.{BasicCredentialsProvider, HttpClientBuilder, RequestWrapper} +import org.apache.http.conn.ssl.NoopHostnameVerifier +import org.apache.http.conn.ssl.TrustSelfSignedStrategy +import org.apache.http.impl.client.{CloseableHttpClient, HttpClientBuilder, RequestWrapper} import org.apache.http.impl.conn.{DefaultRoutePlanner, DefaultSchemePortResolver} import org.apache.http.protocol.{HttpContext, HttpRequestExecutor} +import org.apache.http.ssl.SSLContextBuilder import org.apache.spark.SparkEnv import org.apache.spark.delta.sharing.{PreSignedUrlCache, PreSignedUrlFetcher} import org.apache.spark.internal.Logging @@ -44,18 +47,20 @@ private[sharing] class DeltaSharingFileSystem extends FileSystem with Logging { lazy private val numRetries = ConfUtils.numRetries(getConf) lazy private val maxRetryDurationMillis = ConfUtils.maxRetryDurationMillis(getConf) - lazy private val timeoutInSeconds = ConfUtils.timeoutInSeconds(getConf) + lazy private val timeoutInMillis = ConfUtils.getTimeoutInMillis(getConf) lazy private val httpClient = createHttpClient() - private[sharing] def createHttpClient() = { + private[sharing] def createHttpClient(): CloseableHttpClient = { val proxyConfigOpt = ConfUtils.getProxyConfig(getConf) val maxConnections = ConfUtils.maxConnections(getConf) + val customHeadersOpt = ConfUtils.getCustomHeaders(getConf) + val config = RequestConfig.custom() - .setConnectTimeout(timeoutInSeconds * 1000) - .setConnectionRequestTimeout(timeoutInSeconds * 1000) - .setSocketTimeout(timeoutInSeconds * 1000).build() + .setConnectTimeout(timeoutInMillis) + .setConnectionRequestTimeout(timeoutInMillis) + .setSocketTimeout(timeoutInMillis).build() - logDebug(s"Creating delta sharing httpClient with timeoutInSeconds: $timeoutInSeconds.") + logDebug(s"Creating delta sharing httpClient with timeoutInMillis: $timeoutInMillis.") val clientBuilder = HttpClientBuilder.create() .setMaxConnTotal(maxConnections) .setMaxConnPerRoute(maxConnections) @@ -66,10 +71,15 @@ private[sharing] class DeltaSharingFileSystem extends FileSystem with Logging { // Set proxy if provided. proxyConfigOpt.foreach { proxyConfig => - val proxy = new HttpHost(proxyConfig.host, proxyConfig.port) clientBuilder.setProxy(proxy) + if (proxyConfig.authToken.nonEmpty) { + clientBuilder.addInterceptorFirst((request: HttpRequest, _: HttpContext) => { + request.addHeader("Proxy-Authorization", s"Bearer ${proxyConfig.authToken}") + }) + } + val neverUseHttps = ConfUtils.getNeverUseHttps(getConf) if (neverUseHttps) { val httpRequestDowngradeExecutor = new HttpRequestExecutor { @@ -94,6 +104,7 @@ private[sharing] class DeltaSharingFileSystem extends FileSystem with Logging { } clientBuilder.setRequestExecutor(httpRequestDowngradeExecutor) } + if (proxyConfig.noProxyHosts.nonEmpty || neverUseHttps) { val routePlanner = new DefaultRoutePlanner(DefaultSchemePortResolver.INSTANCE) { override def determineRoute(target: HttpHost, @@ -110,6 +121,28 @@ private[sharing] class DeltaSharingFileSystem extends FileSystem with Logging { } clientBuilder.setRoutePlanner(routePlanner) } + + if (proxyConfig.sslTrustAll) { + clientBuilder.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE) + clientBuilder.setSSLContext( + new SSLContextBuilder() + .loadTrustMaterial(null, new TrustSelfSignedStrategy) + .build() + ) + } else if (proxyConfig.caCertPath.nonEmpty) { + clientBuilder.setSSLContext( + new SSLContextBuilder() + .loadTrustMaterial(new File(proxyConfig.caCertPath.getOrElse("")), null) + .build() + ) + } + + customHeadersOpt.foreach { headers => + ConfUtils.validateCustomHeaders(headers) + clientBuilder.addInterceptorFirst((request: HttpRequest, _: HttpContext) => { + headers.foreach { case (key, value) => request.addHeader(key, value) } + }) + } } clientBuilder.build() } @@ -122,7 +155,7 @@ private[sharing] class DeltaSharingFileSystem extends FileSystem with Logging { override def getScheme: String = SCHEME - override def getUri(): URI = URI.create(s"$SCHEME:///") + override def getUri: URI = URI.create(s"$SCHEME:///") // open a file path with the format below: // ``` @@ -204,7 +237,7 @@ private[sharing] class DeltaSharingFileSystem extends FileSystem with Logging { private[sharing] object DeltaSharingFileSystem { - val SCHEME = "delta-sharing" + private val SCHEME = "delta-sharing" case class DeltaSharingPath(tablePath: String, fileId: String, fileSize: Long) { diff --git a/client/src/main/scala/io/delta/sharing/client/util/ConfUtils.scala b/client/src/main/scala/io/delta/sharing/client/util/ConfUtils.scala index fcff6e580..32922e9df 100644 --- a/client/src/main/scala/io/delta/sharing/client/util/ConfUtils.scala +++ b/client/src/main/scala/io/delta/sharing/client/util/ConfUtils.scala @@ -16,8 +16,8 @@ package io.delta.sharing.client.util +import com.fasterxml.jackson.databind.ObjectMapper import java.util.concurrent.TimeUnit - import org.apache.hadoop.conf.Configuration import org.apache.spark.network.util.JavaUtils import org.apache.spark.sql.internal.SQLConf @@ -81,6 +81,10 @@ object ConfUtils { val PROXY_PORT = "spark.delta.sharing.network.proxyPort" val NO_PROXY_HOSTS = "spark.delta.sharing.network.noProxyHosts" + val CUSTOM_HEADERS = "spark.delta.sharing.network.customHeaders" + val PROXY_AUTH_TOKEN = "spark.delta.sharing.network.proxyAuthToken" + val CA_CERT_PATH = "spark.delta.sharing.network.caCertPath" + val OAUTH_RETRIES_CONF = "spark.delta.sharing.oauth.tokenExchangeMaxRetries" val OAUTH_RETRIES_DEFAULT = 5 @@ -98,6 +102,10 @@ object ConfUtils { def getProxyConfig(conf: Configuration): Option[ProxyConfig] = { val proxyHost = conf.get(PROXY_HOST, null) val proxyPortAsString = conf.get(PROXY_PORT, null) + val noProxyList = conf.getTrimmedStrings(NO_PROXY_HOSTS).toSeq + val authToken = Option(conf.get(PROXY_AUTH_TOKEN, null)) + val caCertPath = Option(conf.get(CA_CERT_PATH, null)) + val sslTrustAll = conf.getBoolean(SSL_TRUST_ALL_CONF, SSL_TRUST_ALL_DEFAULT.toBoolean) if (proxyHost == null && proxyPortAsString == null) { return None @@ -105,11 +113,44 @@ object ConfUtils { validateNonEmpty(proxyHost, PROXY_HOST) validateNonEmpty(proxyPortAsString, PROXY_PORT) + validateNonEmpty(sslTrustAll.toString, SSL_TRUST_ALL_CONF) + val proxyPort = proxyPortAsString.toInt validatePortNumber(proxyPort, PROXY_PORT) - val noProxyList = conf.getTrimmedStrings(NO_PROXY_HOSTS).toSeq - Some(ProxyConfig(proxyHost, proxyPort, noProxyHosts = noProxyList)) + Some(ProxyConfig( + host = proxyHost, + port = proxyPort, + noProxyHosts = noProxyList, + authToken = authToken, + caCertPath = caCertPath, + sslTrustAll = sslTrustAll + )) + } + + def getCustomHeaders(conf: Configuration): Option[Map[String, String]] = { + val headersString = conf.get(CUSTOM_HEADERS, null) + if (headersString != null && headersString.nonEmpty) { + val mapper = new ObjectMapper() + val headers = mapper.readValue(headersString, classOf[Map[String, String]]) + Some(headers) + } else { + None + } + } + + def validateCustomHeaders(headers: Map[String, String]): Unit = { + headers.foreach { case (key, value) => + require(key != null && key.nonEmpty, "Custom header name must not be null or empty") + require(value != null, s"Custom header value for '$key' must not be null") + } + } + + def getTimeoutInMillis(conf: Configuration): Int = { + val timeoutStr = conf.get(TIMEOUT_CONF, TIMEOUT_DEFAULT) + val timeoutMillis = JavaUtils.timeStringAs(timeoutStr, TimeUnit.MILLISECONDS) + validateNonNeg(timeoutMillis, TIMEOUT_CONF) + timeoutMillis.toInt } def getNeverUseHttps(conf: Configuration): Boolean = { @@ -325,7 +366,10 @@ object ConfUtils { } case class ProxyConfig(host: String, - port: Int, - noProxyHosts: Seq[String] = Seq.empty + port: Int, + noProxyHosts: Seq[String] = Seq.empty, + authToken: Option[String] = None, + caCertPath: Option[String] = None, + sslTrustAll: Boolean = false ) } diff --git a/client/src/test/scala/io/delta/sharing/client/DeltaSharingFileSystemSuite.scala b/client/src/test/scala/io/delta/sharing/client/DeltaSharingFileSystemSuite.scala index f576d540f..0b535b0c4 100644 --- a/client/src/test/scala/io/delta/sharing/client/DeltaSharingFileSystemSuite.scala +++ b/client/src/test/scala/io/delta/sharing/client/DeltaSharingFileSystemSuite.scala @@ -92,11 +92,13 @@ class DeltaSharingFileSystemSuite extends SparkFunSuite { proxyServer.initialize() try { - // Create a ProxyConfig with the host and port of the local proxy server. val conf = new Configuration conf.set(ConfUtils.PROXY_HOST, proxyServer.getHost()) conf.set(ConfUtils.PROXY_PORT, proxyServer.getPort().toString) + conf.set(ConfUtils.PROXY_AUTH_TOKEN, "testAuthToken") + conf.set(ConfUtils.CA_CERT_PATH, "/path/to/ca_cert.pem") + conf.set(ConfUtils.SSL_TRUST_ALL_CONF, "true") // Configure the httpClient to use the ProxyConfig. val fs = new DeltaSharingFileSystem() { diff --git a/client/src/test/scala/io/delta/sharing/client/util/ConfUtilsSuite.scala b/client/src/test/scala/io/delta/sharing/client/util/ConfUtilsSuite.scala index 239f950ea..160cc0ca6 100644 --- a/client/src/test/scala/io/delta/sharing/client/util/ConfUtilsSuite.scala +++ b/client/src/test/scala/io/delta/sharing/client/util/ConfUtilsSuite.scala @@ -113,13 +113,19 @@ class ConfUtilsSuite extends SparkFunSuite { val conf = newConf(Map( PROXY_HOST -> "1.2.3.4", PROXY_PORT -> "8080", - NO_PROXY_HOSTS -> "localhost,127.0.0.1" + NO_PROXY_HOSTS -> "localhost,127.0.0.1", + PROXY_AUTH_TOKEN -> "testAuthToken", + CA_CERT_PATH -> "/path/to/ca_cert.pem", + SSL_TRUST_ALL_CONF -> "true" )) val proxyConfig = getProxyConfig(conf) assert(proxyConfig.isDefined) assert(proxyConfig.get.host == "1.2.3.4") assert(proxyConfig.get.port == 8080) assert(proxyConfig.get.noProxyHosts == Seq("localhost", "127.0.0.1")) + assert(proxyConfig.get.authToken.contains("testAuthToken")) + assert(proxyConfig.get.caCertPath.contains("/path/to/ca_cert.pem")) + assert(proxyConfig.get.sslTrustAll) } test("getProxyConfig with only host and port") { @@ -132,6 +138,9 @@ class ConfUtilsSuite extends SparkFunSuite { assert(proxyConfig.get.host == "1.2.3.4") assert(proxyConfig.get.port == 8080) assert(proxyConfig.get.noProxyHosts.isEmpty) + assert(proxyConfig.get.authToken.isEmpty) + assert(proxyConfig.get.caCertPath.isEmpty) + assert(!proxyConfig.get.sslTrustAll) } test("getProxyConfig with no proxy settings") { diff --git a/client/src/test/scala/io/delta/sharing/client/util/RetryUtilsSuite.scala b/client/src/test/scala/io/delta/sharing/client/util/RetryUtilsSuite.scala index 934147233..4a2136053 100644 --- a/client/src/test/scala/io/delta/sharing/client/util/RetryUtilsSuite.scala +++ b/client/src/test/scala/io/delta/sharing/client/util/RetryUtilsSuite.scala @@ -22,7 +22,6 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.SparkFunSuite -import io.delta.sharing.client.util.{RetryUtils, UnexpectedHttpStatus} import io.delta.sharing.client.util.RetryUtils._ import io.delta.sharing.spark.MissingEndStreamActionException diff --git a/python/delta_sharing/delta_sharing.py b/python/delta_sharing/delta_sharing.py index 5269a6fd6..863749344 100644 --- a/python/delta_sharing/delta_sharing.py +++ b/python/delta_sharing/delta_sharing.py @@ -13,9 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # -from itertools import chain -from typing import BinaryIO, List, Optional, Sequence, TextIO, Tuple, Union -from pathlib import Path +from typing import Optional, Tuple import pandas as pd @@ -26,12 +24,11 @@ except ImportError: pass -from delta_sharing.protocol import DeltaSharingProfile, Schema, Share, Table +from delta_sharing.sharing_client import SharingClient +from delta_sharing.protocol import DeltaSharingProfile, Table from delta_sharing.reader import DeltaSharingReader from delta_sharing.rest_client import DataSharingRestClient -from requests.exceptions import HTTPError - def _parse_url(url: str) -> Tuple[str, str, str, str]: """ @@ -105,6 +102,7 @@ def load_as_pandas( timestamp: Optional[str] = None, jsonPredicateHints: Optional[str] = None, use_delta_format: Optional[bool] = None, + sharing_client: Optional[SharingClient] = None, ) -> pd.DataFrame: """ Load the shared table using the given url as a pandas DataFrame. @@ -120,9 +118,10 @@ def load_as_pandas( """ profile_json, share, schema, table = _parse_url(url) profile = DeltaSharingProfile.read_from_file(profile_json) + rest_client = (sharing_client and sharing_client.rest_client) or DataSharingRestClient(profile) return DeltaSharingReader( table=Table(name=table, share=share, schema=schema), - rest_client=DataSharingRestClient(profile), + rest_client=rest_client, jsonPredicateHints=jsonPredicateHints, limit=limit, version=version, @@ -216,7 +215,8 @@ def load_table_changes_as_pandas( ending_version: Optional[int] = None, starting_timestamp: Optional[str] = None, ending_timestamp: Optional[str] = None, - use_delta_format: Optional[bool] = None + use_delta_format: Optional[bool] = None, + sharing_client: Optional[SharingClient] = None, ) -> pd.DataFrame: """ Load the table changes of shared table as a pandas DataFrame using the given url. @@ -233,9 +233,10 @@ def load_table_changes_as_pandas( """ profile_json, share, schema, table = _parse_url(url) profile = DeltaSharingProfile.read_from_file(profile_json) + rest_client = (sharing_client and sharing_client.rest_client) or DataSharingRestClient(profile) return DeltaSharingReader( table=Table(name=table, share=share, schema=schema), - rest_client=DataSharingRestClient(profile), + rest_client=rest_client, use_delta_format=use_delta_format ).table_changes_to_pandas(CdfOptions( starting_version=starting_version, @@ -246,89 +247,3 @@ def load_table_changes_as_pandas( # handle them properly when replaying the delta log include_historical_metadata=use_delta_format )) - - -class SharingClient: - """ - A Delta Sharing client to query shares/schemas/tables from a Delta Sharing Server. - """ - - def __init__(self, profile: Union[str, BinaryIO, TextIO, Path, DeltaSharingProfile]): - if not isinstance(profile, DeltaSharingProfile): - profile = DeltaSharingProfile.read_from_file(profile) - self._profile = profile - self._rest_client = DataSharingRestClient(profile) - - def __list_all_tables_in_share(self, share: Share) -> Sequence[Table]: - tables: List[Table] = [] - page_token: Optional[str] = None - while True: - response = self._rest_client.list_all_tables(share=share, page_token=page_token) - tables.extend(response.tables) - page_token = response.next_page_token - if page_token is None or page_token == "": - return tables - - def list_shares(self) -> Sequence[Share]: - """ - List shares that can be accessed by you in a Delta Sharing Server. - - :return: the shares that can be accessed. - """ - shares: List[Share] = [] - page_token: Optional[str] = None - while True: - response = self._rest_client.list_shares(page_token=page_token) - shares.extend(response.shares) - page_token = response.next_page_token - if page_token is None or page_token == "": - return shares - - def list_schemas(self, share: Share) -> Sequence[Schema]: - """ - List schemas in a share that can be accessed by you in a Delta Sharing Server. - - :param share: the share to list. - :return: the schemas in a share. - """ - schemas: List[Schema] = [] - page_token: Optional[str] = None - while True: - response = self._rest_client.list_schemas(share=share, page_token=page_token) - schemas.extend(response.schemas) - page_token = response.next_page_token - if page_token is None or page_token == "": - return schemas - - def list_tables(self, schema: Schema) -> Sequence[Table]: - """ - List tables in a schema that can be accessed by you in a Delta Sharing Server. - - :param schema: the schema to list. - :return: the tables in a schema. - """ - tables: List[Table] = [] - page_token: Optional[str] = None - while True: - response = self._rest_client.list_tables(schema=schema, page_token=page_token) - tables.extend(response.tables) - page_token = response.next_page_token - if page_token is None or page_token == "": - return tables - - def list_all_tables(self) -> Sequence[Table]: - """ - List all tables that can be accessed by you in a Delta Sharing Server. - - :return: all tables that can be accessed. - """ - shares = self.list_shares() - try: - return list(chain(*(self.__list_all_tables_in_share(share) for share in shares))) - except HTTPError as e: - if e.response.status_code == 404: - # The server doesn't support all-tables API. Fallback to the old APIs instead. - schemas = chain(*(self.list_schemas(share) for share in shares)) - return list(chain(*(self.list_tables(schema) for schema in schemas))) - else: - raise e diff --git a/python/delta_sharing/reader.py b/python/delta_sharing/reader.py index d98b7b1cf..73c283e20 100644 --- a/python/delta_sharing/reader.py +++ b/python/delta_sharing/reader.py @@ -24,6 +24,7 @@ import os import pandas as pd import pyarrow as pa +import requests import tempfile from pyarrow.dataset import dataset @@ -172,16 +173,17 @@ def to_pandas(self) -> pd.DataFrame: converters = to_converters(schema_json) + session = self._rest_client.get_session() if self._limit is None: pdfs = [ DeltaSharingReader._to_pandas( - file, converters, False, None) for file in response.add_files + file, converters, False, None, session=session) for file in response.add_files ] else: left = self._limit pdfs = [] for file in response.add_files: - pdf = DeltaSharingReader._to_pandas(file, converters, False, left) + pdf = DeltaSharingReader._to_pandas(file, converters, False, left, session=session) pdfs.append(pdf) left -= len(pdf) assert ( @@ -387,10 +389,11 @@ def table_changes_to_pandas(self, cdfOptions: CdfOptions) -> pd.DataFrame: if len(response.actions) == 0: return get_empty_table(self._add_special_cdf_schema(schema_json)) + session = self._rest_client.get_session() converters = to_converters(schema_json) pdfs = [] for action in response.actions: - pdf = DeltaSharingReader._to_pandas(action, converters, True, None) + pdf = DeltaSharingReader._to_pandas(action, converters, True, None, session=session) pdfs.append(pdf) return pd.concat(pdfs, axis=0, ignore_index=True, copy=False) @@ -418,7 +421,8 @@ def _to_pandas( action: FileAction, converters: Dict[str, Callable[[str], Any]], for_cdf: bool, - limit: Optional[int] + limit: Optional[int], + session: Optional[requests.Session] ) -> pd.DataFrame: url = urlparse(action.url) if "storage.googleapis.com" in (url.netloc.lower()): @@ -426,11 +430,19 @@ def _to_pandas( import delta_sharing._yarl_patch # noqa: F401 protocol = url.scheme - proxy = getproxies() - if len(proxy) != 0: - filesystem = fsspec.filesystem(protocol, client_kwargs={"trust_env":True}) + + if session is not None: + proxies = session.proxies + client_kwargs = {} + if proxies: + client_kwargs = {'proxies': proxies} + filesystem = fsspec.filesystem(protocol, **client_kwargs) else: - filesystem = fsspec.filesystem(protocol) + proxy = getproxies() + if len(proxy) != 0: + filesystem = fsspec.filesystem(protocol, client_kwargs={"trust_env":True}) + else: + filesystem = fsspec.filesystem(protocol) pa_dataset = dataset(source=action.url, format="parquet", filesystem=filesystem) pa_table = pa_dataset.head(limit) if limit is not None else pa_dataset.to_table() diff --git a/python/delta_sharing/rest_client.py b/python/delta_sharing/rest_client.py index b9638f8fc..2aab07694 100644 --- a/python/delta_sharing/rest_client.py +++ b/python/delta_sharing/rest_client.py @@ -149,11 +149,16 @@ class DataSharingRestClient: DELTA_FORMAT = "delta" PARQUET_FORMAT = "parquet" - def __init__(self, profile: DeltaSharingProfile, num_retries=10): + def __init__( + self, + profile: DeltaSharingProfile, + num_retries=10, + session: Optional[requests.Session] = None + ): self._profile = profile self._num_retries = num_retries self._sleeper = lambda sleep_ms: time.sleep(sleep_ms / 1000) - self.__auth_session(profile) + self.__auth_session(profile, session) self._session.headers.update( { @@ -161,13 +166,17 @@ def __init__(self, profile: DeltaSharingProfile, num_retries=10): } ) - def __auth_session(self, profile): - self._session = requests.Session() + def __auth_session(self, profile, session: Optional[requests.Session] = None): + self._session = session or requests.Session() self._auth_credential_provider = ( - AuthCredentialProviderFactory.create_auth_credential_provider(profile)) + AuthCredentialProviderFactory.create_auth_credential_provider(profile) + ) if urlparse(profile.endpoint).hostname == "localhost": self._session.verify = False + def get_session(self) -> requests.Session: + return self._session + def set_sharing_capabilities_header(self): delta_sharing_capabilities = ( DataSharingRestClient.DELTA_AND_PARQUET_RESPONSE_FORMAT + ';' + diff --git a/python/delta_sharing/sharing_client.py b/python/delta_sharing/sharing_client.py new file mode 100644 index 000000000..6525ef5a2 --- /dev/null +++ b/python/delta_sharing/sharing_client.py @@ -0,0 +1,126 @@ +# +# Copyright (C) 2021 The Delta Lake Project Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from itertools import chain +from typing import BinaryIO, List, Optional, Sequence, TextIO, Union +from pathlib import Path +import requests + +from delta_sharing.protocol import DeltaSharingProfile, Schema, Share, Table +from delta_sharing.rest_client import DataSharingRestClient + +from requests.exceptions import HTTPError + + +class SharingClient: + """ + A Delta Sharing client to query shares/schemas/tables from a Delta Sharing Server. + + :param profile: The path to the profile file or a DeltaSharingProfile object. + :param session: An optional requests.Session object to use for HTTP requests. + You can use this to customize proxy settings, authentication, etc. + """ + def __init__( + self, + profile: Union[str, BinaryIO, TextIO, Path, DeltaSharingProfile], + session: Optional[requests.Session] = None + ): + if not isinstance(profile, DeltaSharingProfile): + profile = DeltaSharingProfile.read_from_file(profile) + self._profile = profile + self._rest_client = DataSharingRestClient(profile, session=session) + + @property + def rest_client(self) -> DataSharingRestClient: + """ + Get the underlying DataSharingRestClient used by this SharingClient. + + :return: The DataSharingRestClient instance. + """ + return self._rest_client + + def __list_all_tables_in_share(self, share: Share) -> Sequence[Table]: + tables: List[Table] = [] + page_token: Optional[str] = None + while True: + response = self._rest_client.list_all_tables(share=share, page_token=page_token) + tables.extend(response.tables) + page_token = response.next_page_token + if page_token is None or page_token == "": + return tables + + def list_shares(self) -> Sequence[Share]: + """ + List shares that can be accessed by you in a Delta Sharing Server. + + :return: the shares that can be accessed. + """ + shares: List[Share] = [] + page_token: Optional[str] = None + while True: + response = self._rest_client.list_shares(page_token=page_token) + shares.extend(response.shares) + page_token = response.next_page_token + if page_token is None or page_token == "": + return shares + + def list_schemas(self, share: Share) -> Sequence[Schema]: + """ + List schemas in a share that can be accessed by you in a Delta Sharing Server. + + :param share: the share to list. + :return: the schemas in a share. + """ + schemas: List[Schema] = [] + page_token: Optional[str] = None + while True: + response = self._rest_client.list_schemas(share=share, page_token=page_token) + schemas.extend(response.schemas) + page_token = response.next_page_token + if page_token is None or page_token == "": + return schemas + + def list_tables(self, schema: Schema) -> Sequence[Table]: + """ + List tables in a schema that can be accessed by you in a Delta Sharing Server. + + :param schema: the schema to list. + :return: the tables in a schema. + """ + tables: List[Table] = [] + page_token: Optional[str] = None + while True: + response = self._rest_client.list_tables(schema=schema, page_token=page_token) + tables.extend(response.tables) + page_token = response.next_page_token + if page_token is None or page_token == "": + return tables + + def list_all_tables(self) -> Sequence[Table]: + """ + List all tables that can be accessed by you in a Delta Sharing Server. + + :return: all tables that can be accessed. + """ + shares = self.list_shares() + try: + return list(chain(*(self.__list_all_tables_in_share(share) for share in shares))) + except HTTPError as e: + if e.response.status_code == 404: + # The server doesn't support all-tables API. Fallback to the old APIs instead. + schemas = chain(*(self.list_schemas(share) for share in shares)) + return list(chain(*(self.list_tables(schema) for schema in schemas))) + else: + raise e diff --git a/python/delta_sharing/tests/test_reader.py b/python/delta_sharing/tests/test_reader.py index 90571ec1d..4b190c1d4 100644 --- a/python/delta_sharing/tests/test_reader.py +++ b/python/delta_sharing/tests/test_reader.py @@ -14,12 +14,10 @@ # limitations under the License. # import pytest - +import pandas as pd from datetime import date from typing import Optional, Sequence -import pandas as pd - from delta_sharing.protocol import AddFile, AddCdcFile, CdfOptions, Metadata, RemoveFile, Table from delta_sharing.reader import DeltaSharingReader from delta_sharing.rest_client import ( @@ -38,6 +36,9 @@ def test_to_pandas_non_partitioned(tmp_path): pdf2.to_parquet(tmp_path / "pdf2.parquet") class RestClientMock: + def __init__(self, session=None): + self._session = session + def list_files_in_table( self, table: Table, @@ -91,6 +92,9 @@ def list_files_in_table( def autoresolve_query_format(self, table: Table): return "parquet" + def get_session(self): + return self._session + reader = DeltaSharingReader(Table("table_name", "share_name", "schema_name"), RestClientMock()) pdf = reader.to_pandas() expected = pd.concat([pdf1, pdf2]).reset_index(drop=True) @@ -123,6 +127,9 @@ def test_to_pandas_partitioned(tmp_path): pdf2.to_parquet(tmp_path / "pdf2.parquet") class RestClientMock: + def __init__(self, session=None): + self._session = session + def list_files_in_table( self, table: Table, @@ -170,6 +177,9 @@ def list_files_in_table( def autoresolve_query_format(self, table: Table): return "parquet" + def get_session(self): + return self._session + reader = DeltaSharingReader(Table("table_name", "share_name", "schema_name"), RestClientMock()) pdf = reader.to_pandas() @@ -190,6 +200,9 @@ def test_to_pandas_partitioned_different_schemas(tmp_path): pdf2.to_parquet(tmp_path / "pdf2.parquet") class RestClientMock: + def __init__(self, session=None): + self._session = session + def list_files_in_table( self, table: Table, @@ -238,6 +251,9 @@ def list_files_in_table( def autoresolve_query_format(self, table: Table): return "parquet" + def get_session(self): + return self._session + reader = DeltaSharingReader(Table("table_name", "share_name", "schema_name"), RestClientMock()) pdf = reader.to_pandas() @@ -253,6 +269,9 @@ def autoresolve_query_format(self, table: Table): @pytest.mark.skipif(not ENABLE_INTEGRATION, reason=SKIP_MESSAGE) def test_to_pandas_empty(rest_client: DataSharingRestClient): class RestClientMock: + def __init__(self, session=None): + self._session = session + def list_files_in_table( self, table: Table, @@ -302,6 +321,9 @@ def list_files_in_table( def autoresolve_query_format(self, table: Table): return "parquet" + def get_session(self): + return self._session + reader = DeltaSharingReader( Table("table_name", "share_name", "schema_name"), RestClientMock() # type: ignore ) @@ -356,6 +378,9 @@ def test_table_changes_to_pandas_non_partitioned(tmp_path): pdf4[DeltaSharingReader._commit_timestamp_col_name()] = timestamp4 class RestClientMock: + def __init__(self, session=None): + self._session = session + def list_table_changes( self, table: Table, cdfOptions: CdfOptions ) -> ListTableChangesResponse: @@ -414,6 +439,9 @@ def list_table_changes( def autoresolve_query_format(self, table: Table): return "parquet" + def get_session(self): + return self._session + reader = DeltaSharingReader(Table("table_name", "share_name", "schema_name"), RestClientMock()) pdf = reader.table_changes_to_pandas(CdfOptions()) @@ -442,6 +470,9 @@ def test_table_changes_to_pandas_partitioned(tmp_path): pdf2[DeltaSharingReader._commit_timestamp_col_name()] = timestamp class RestClientMock: + def __init__(self, session=None): + self._session = session + def list_table_changes( self, table: Table, @@ -482,6 +513,9 @@ def list_table_changes( lines=None ) + def get_session(self): + return self._session + reader = DeltaSharingReader(Table("table_name", "share_name", "schema_name"), RestClientMock()) pdf = reader.table_changes_to_pandas(CdfOptions()) @@ -491,6 +525,9 @@ def list_table_changes( def test_table_changes_empty(tmp_path): class RestClientMock: + def __init__(self, session=None): + self._session = session + def list_table_changes( self, table: Table, cdfOptions: CdfOptions ) -> ListTableChangesResponse: @@ -511,6 +548,9 @@ def list_table_changes( lines=None ) + def get_session(self): + return self._session + reader = DeltaSharingReader(Table("table_name", "share_name", "schema_name"), RestClientMock()) pdf = reader.table_changes_to_pandas(CdfOptions()) assert pdf.empty @@ -565,6 +605,9 @@ def test_table_changes_to_pandas_non_partitioned_delta(tmp_path): pdf4[DeltaSharingReader._commit_timestamp_col_name()] = timestamp4 class RestClientMock: + def __init__(self, session=None): + self._session = session + def list_table_changes( self, table: Table, cdfOptions: CdfOptions ) -> ListTableChangesResponse: @@ -575,7 +618,7 @@ def list_table_changes( '{"metadata":{},"name":"a","nullable":true,"type":"long"},' '{"metadata":{},"name":"b","nullable":true,"type":"string"}' '],"type":"struct"}' - ).replace('"',r'\"') + ).replace('"', r'\"') lines = [ f'''{{ "protocol": {{ @@ -675,6 +718,9 @@ def set_delta_format_header(self): def remove_delta_format_header(self): return + def get_session(self): + return self._session + reader = DeltaSharingReader( Table("table_name", "share_name", "schema_name"), RestClientMock(), From d1aacf94e753b101e7d2a0e2aef2916cc2c4464c Mon Sep 17 00:00:00 2001 From: Qutaiba Al-Nuaimy Date: Sat, 25 Jan 2025 21:08:54 +0400 Subject: [PATCH 2/4] fix proxyconfig --- .../io/delta/sharing/client/DeltaSharingFileSystem.scala | 2 +- .../scala/io/delta/sharing/client/util/ConfUtils.scala | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/client/src/main/scala/io/delta/sharing/client/DeltaSharingFileSystem.scala b/client/src/main/scala/io/delta/sharing/client/DeltaSharingFileSystem.scala index a7be6c985..5b30f3c5c 100644 --- a/client/src/main/scala/io/delta/sharing/client/DeltaSharingFileSystem.scala +++ b/client/src/main/scala/io/delta/sharing/client/DeltaSharingFileSystem.scala @@ -132,7 +132,7 @@ private[sharing] class DeltaSharingFileSystem extends FileSystem with Logging { } else if (proxyConfig.caCertPath.nonEmpty) { clientBuilder.setSSLContext( new SSLContextBuilder() - .loadTrustMaterial(new File(proxyConfig.caCertPath.getOrElse("")), null) + .loadTrustMaterial(new File(proxyConfig.caCertPath), null) .build() ) } diff --git a/client/src/main/scala/io/delta/sharing/client/util/ConfUtils.scala b/client/src/main/scala/io/delta/sharing/client/util/ConfUtils.scala index 32922e9df..3f073694a 100644 --- a/client/src/main/scala/io/delta/sharing/client/util/ConfUtils.scala +++ b/client/src/main/scala/io/delta/sharing/client/util/ConfUtils.scala @@ -103,8 +103,8 @@ object ConfUtils { val proxyHost = conf.get(PROXY_HOST, null) val proxyPortAsString = conf.get(PROXY_PORT, null) val noProxyList = conf.getTrimmedStrings(NO_PROXY_HOSTS).toSeq - val authToken = Option(conf.get(PROXY_AUTH_TOKEN, null)) - val caCertPath = Option(conf.get(CA_CERT_PATH, null)) + val authToken = conf.get(PROXY_AUTH_TOKEN, null) + val caCertPath = conf.get(CA_CERT_PATH, null) val sslTrustAll = conf.getBoolean(SSL_TRUST_ALL_CONF, SSL_TRUST_ALL_DEFAULT.toBoolean) if (proxyHost == null && proxyPortAsString == null) { @@ -368,8 +368,8 @@ object ConfUtils { case class ProxyConfig(host: String, port: Int, noProxyHosts: Seq[String] = Seq.empty, - authToken: Option[String] = None, - caCertPath: Option[String] = None, + authToken: String, + caCertPath: String = None, sslTrustAll: Boolean = false ) } From b2893f3cfbbbc030ffbea72bf6e241b016e15f92 Mon Sep 17 00:00:00 2001 From: Qutaiba Al-Nuaimy Date: Sat, 25 Jan 2025 21:58:32 +0400 Subject: [PATCH 3/4] refactor for clarity and readability --- .../client/DeltaSharingFileSystem.scala | 173 ++++++++++-------- .../delta/sharing/client/util/ConfUtils.scala | 47 ++--- 2 files changed, 119 insertions(+), 101 deletions(-) diff --git a/client/src/main/scala/io/delta/sharing/client/DeltaSharingFileSystem.scala b/client/src/main/scala/io/delta/sharing/client/DeltaSharingFileSystem.scala index 5b30f3c5c..bf619ccfc 100644 --- a/client/src/main/scala/io/delta/sharing/client/DeltaSharingFileSystem.scala +++ b/client/src/main/scala/io/delta/sharing/client/DeltaSharingFileSystem.scala @@ -23,10 +23,10 @@ import java.util.concurrent.TimeUnit import org.apache.hadoop.fs._ import org.apache.hadoop.fs.permission.FsPermission import org.apache.hadoop.util.Progressable -import org.apache.http.{HttpClientConnection, HttpHost, HttpRequest, HttpResponse} +import org.apache.http.{HttpClientConnection, HttpHost, HttpRequest, HttpRequestInterceptor, HttpResponse} import org.apache.http.client.config.RequestConfig import org.apache.http.client.utils.URIBuilder -import org.apache.http.conn.routing.HttpRoute +import org.apache.http.conn.routing.{HttpRoute, HttpRoutePlanner} import org.apache.http.conn.ssl.NoopHostnameVerifier import org.apache.http.conn.ssl.TrustSelfSignedStrategy import org.apache.http.impl.client.{CloseableHttpClient, HttpClientBuilder, RequestWrapper} @@ -39,6 +39,7 @@ import org.apache.spark.internal.Logging import io.delta.sharing.client.model.FileAction import io.delta.sharing.client.util.ConfUtils +import io.delta.sharing.client.util.ConfUtils.ProxyConfig /** Read-only file system for delta paths. */ private[sharing] class DeltaSharingFileSystem extends FileSystem with Logging { @@ -47,104 +48,128 @@ private[sharing] class DeltaSharingFileSystem extends FileSystem with Logging { lazy private val numRetries = ConfUtils.numRetries(getConf) lazy private val maxRetryDurationMillis = ConfUtils.maxRetryDurationMillis(getConf) - lazy private val timeoutInMillis = ConfUtils.getTimeoutInMillis(getConf) lazy private val httpClient = createHttpClient() private[sharing] def createHttpClient(): CloseableHttpClient = { - val proxyConfigOpt = ConfUtils.getProxyConfig(getConf) - val maxConnections = ConfUtils.maxConnections(getConf) - val customHeadersOpt = ConfUtils.getCustomHeaders(getConf) - - val config = RequestConfig.custom() + val conf = getConf + val timeoutInMillis = ConfUtils.getTimeoutInMillis(conf) + val proxyConfigOpt = ConfUtils.getProxyConfig(conf) + val maxConnections = ConfUtils.maxConnections(conf) + val customHeadersOpt = ConfUtils.getCustomHeaders(conf) + val neverUseHttps = ConfUtils.getNeverUseHttps(conf) + + val requestConfig = RequestConfig.custom() .setConnectTimeout(timeoutInMillis) .setConnectionRequestTimeout(timeoutInMillis) - .setSocketTimeout(timeoutInMillis).build() + .setSocketTimeout(timeoutInMillis) + .build() + + logDebug(s"Creating HTTP client with timeoutInMillis: $timeoutInMillis") - logDebug(s"Creating delta sharing httpClient with timeoutInMillis: $timeoutInMillis.") val clientBuilder = HttpClientBuilder.create() .setMaxConnTotal(maxConnections) .setMaxConnPerRoute(maxConnections) - .setDefaultRequestConfig(config) - // Disable the default retry behavior because we have our own retry logic. - // See `RetryUtils.runWithExponentialBackoff`. + .setDefaultRequestConfig(requestConfig) .disableAutomaticRetries() - // Set proxy if provided. proxyConfigOpt.foreach { proxyConfig => - val proxy = new HttpHost(proxyConfig.host, proxyConfig.port) - clientBuilder.setProxy(proxy) + configureProxy(clientBuilder, proxyConfig, neverUseHttps) + } - if (proxyConfig.authToken.nonEmpty) { - clientBuilder.addInterceptorFirst((request: HttpRequest, _: HttpContext) => { - request.addHeader("Proxy-Authorization", s"Bearer ${proxyConfig.authToken}") - }) - } + customHeadersOpt.foreach { headers => + addCustomHeaders(clientBuilder, headers) + } - val neverUseHttps = ConfUtils.getNeverUseHttps(getConf) - if (neverUseHttps) { - val httpRequestDowngradeExecutor = new HttpRequestExecutor { - override def execute( - request: HttpRequest, - connection: HttpClientConnection, - context: HttpContext): HttpResponse = { - try { - val modifiedUri: URI = { - new URIBuilder(request.getRequestLine.getUri).setScheme("http").build() - } - val wrappedRequest = new RequestWrapper(request) - wrappedRequest.setURI(modifiedUri) - - return super.execute(wrappedRequest, connection, context) - } catch { - case e: Exception => - logInfo("Failed to downgrade the request to http", e) - } - super.execute(request, connection, context) - } - } - clientBuilder.setRequestExecutor(httpRequestDowngradeExecutor) - } + clientBuilder.build() + } - if (proxyConfig.noProxyHosts.nonEmpty || neverUseHttps) { - val routePlanner = new DefaultRoutePlanner(DefaultSchemePortResolver.INSTANCE) { - override def determineRoute(target: HttpHost, - request: HttpRequest, - context: HttpContext): HttpRoute = { - if (proxyConfig.noProxyHosts.contains(target.getHostName)) { - // Direct route (no proxy) - new HttpRoute(target) - } else { - // Route via proxy - new HttpRoute(target, proxy) - } - } + private def configureProxy(clientBuilder: HttpClientBuilder, proxyConfig: ProxyConfig, + neverUseHttps: Boolean): Unit = { + + val proxy = new HttpHost(proxyConfig.host, proxyConfig.port) + clientBuilder.setProxy(proxy) + + proxyConfig.authToken.foreach { token => + clientBuilder.addInterceptorFirst(new HttpRequestInterceptor { + override def process(request: HttpRequest, context: HttpContext): Unit = { + request.addHeader("Proxy-Authorization", s"Bearer $token") } - clientBuilder.setRoutePlanner(routePlanner) - } + }) + } + + configureSSL(clientBuilder, proxyConfig) + + if (neverUseHttps) { + clientBuilder.setRequestExecutor(createHttpRequestDowngradeExecutor()) + } + + if (proxyConfig.noProxyHosts.nonEmpty || neverUseHttps) { + clientBuilder.setRoutePlanner(createRoutePlanner(proxy, proxyConfig.noProxyHosts)) + } + } - if (proxyConfig.sslTrustAll) { - clientBuilder.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE) + private def configureSSL(clientBuilder: HttpClientBuilder, proxyConfig: ProxyConfig): Unit = { + if (proxyConfig.sslTrustAll) { + clientBuilder.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE) + clientBuilder.setSSLContext( + new SSLContextBuilder() + .loadTrustMaterial(null, new TrustSelfSignedStrategy) + .build() + ) + } else { + proxyConfig.caCertPath.foreach { path => clientBuilder.setSSLContext( new SSLContextBuilder() - .loadTrustMaterial(null, new TrustSelfSignedStrategy) + .loadTrustMaterial(new File(path), null) .build() ) - } else if (proxyConfig.caCertPath.nonEmpty) { - clientBuilder.setSSLContext( - new SSLContextBuilder() - .loadTrustMaterial(new File(proxyConfig.caCertPath), null) + } + } + } + + private def createHttpRequestDowngradeExecutor(): HttpRequestExecutor = { + new HttpRequestExecutor() { + override def execute(request: HttpRequest, conn: HttpClientConnection, + context: HttpContext): HttpResponse = { + try { + val modifiedUri = new URIBuilder(request.getRequestLine.getUri) + .setScheme("http") .build() - ) + val wrappedRequest = new RequestWrapper(request) + wrappedRequest.setURI(modifiedUri) + super.execute(wrappedRequest, conn, context) + } catch { + case e: Exception => + logInfo("Failed to downgrade the request to HTTP", e) + super.execute(request, conn, context) + } } + } + } - customHeadersOpt.foreach { headers => - ConfUtils.validateCustomHeaders(headers) - clientBuilder.addInterceptorFirst((request: HttpRequest, _: HttpContext) => { - headers.foreach { case (key, value) => request.addHeader(key, value) } - }) + private def createRoutePlanner(proxy: HttpHost, noProxyHosts: Seq[String]): HttpRoutePlanner = { + new DefaultRoutePlanner(DefaultSchemePortResolver.INSTANCE) { + override def determineRoute(target: HttpHost, request: HttpRequest, + context: HttpContext): HttpRoute = { + if (noProxyHosts.contains(target.getHostName)) { + // Direct route (no proxy) + new HttpRoute(target) + } else { + // Route via proxy + new HttpRoute(target, proxy) + } } } - clientBuilder.build() + } + + private def addCustomHeaders(clientBuilder: HttpClientBuilder, + headers: Map[String, String]): Unit = { + ConfUtils.validateCustomHeaders(headers) + clientBuilder.addInterceptorFirst(new HttpRequestInterceptor { + override def process(request: HttpRequest, context: HttpContext): Unit = { + headers.foreach { case (key, value) => request.addHeader(key, value) } + } + }) } private lazy val refreshThresholdMs = getConf.getLong( diff --git a/client/src/main/scala/io/delta/sharing/client/util/ConfUtils.scala b/client/src/main/scala/io/delta/sharing/client/util/ConfUtils.scala index 3f073694a..64c44a50f 100644 --- a/client/src/main/scala/io/delta/sharing/client/util/ConfUtils.scala +++ b/client/src/main/scala/io/delta/sharing/client/util/ConfUtils.scala @@ -102,32 +102,25 @@ object ConfUtils { def getProxyConfig(conf: Configuration): Option[ProxyConfig] = { val proxyHost = conf.get(PROXY_HOST, null) val proxyPortAsString = conf.get(PROXY_PORT, null) - val noProxyList = conf.getTrimmedStrings(NO_PROXY_HOSTS).toSeq - val authToken = conf.get(PROXY_AUTH_TOKEN, null) - val caCertPath = conf.get(CA_CERT_PATH, null) - val sslTrustAll = conf.getBoolean(SSL_TRUST_ALL_CONF, SSL_TRUST_ALL_DEFAULT.toBoolean) - if (proxyHost == null && proxyPortAsString == null) { - return None + if (proxyHost != null && proxyPortAsString != null) { + val proxyPort = proxyPortAsString.toInt + validatePortNumber(proxyPort, PROXY_PORT) + + Some(ProxyConfig( + host = proxyHost, + port = proxyPort, + noProxyHosts = conf.getTrimmedStrings(NO_PROXY_HOSTS).toSeq, + authToken = Option(conf.get(PROXY_AUTH_TOKEN, null)), + caCertPath = Option(conf.get(CA_CERT_PATH, null)), + sslTrustAll = conf.getBoolean(SSL_TRUST_ALL_CONF, SSL_TRUST_ALL_DEFAULT.toBoolean) + )) + } else { + None } - - validateNonEmpty(proxyHost, PROXY_HOST) - validateNonEmpty(proxyPortAsString, PROXY_PORT) - validateNonEmpty(sslTrustAll.toString, SSL_TRUST_ALL_CONF) - - val proxyPort = proxyPortAsString.toInt - validatePortNumber(proxyPort, PROXY_PORT) - - Some(ProxyConfig( - host = proxyHost, - port = proxyPort, - noProxyHosts = noProxyList, - authToken = authToken, - caCertPath = caCertPath, - sslTrustAll = sslTrustAll - )) } + def getCustomHeaders(conf: Configuration): Option[Map[String, String]] = { val headersString = conf.get(CUSTOM_HEADERS, null) if (headersString != null && headersString.nonEmpty) { @@ -366,10 +359,10 @@ object ConfUtils { } case class ProxyConfig(host: String, - port: Int, - noProxyHosts: Seq[String] = Seq.empty, - authToken: String, - caCertPath: String = None, - sslTrustAll: Boolean = false + port: Int, + noProxyHosts: Seq[String] = Seq.empty, + authToken: Option[String] = None, + caCertPath: Option[String] = None, + sslTrustAll: Boolean = false ) } From f833042639e6f6c50bd02a9b5d7aef634b30276e Mon Sep 17 00:00:00 2001 From: Qutaiba Al-Nuaimy Date: Sat, 25 Jan 2025 22:12:17 +0400 Subject: [PATCH 4/4] fix confutils check failure --- .../delta/sharing/client/util/ConfUtils.scala | 30 ++++++++++--------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/client/src/main/scala/io/delta/sharing/client/util/ConfUtils.scala b/client/src/main/scala/io/delta/sharing/client/util/ConfUtils.scala index 64c44a50f..78b8d9492 100644 --- a/client/src/main/scala/io/delta/sharing/client/util/ConfUtils.scala +++ b/client/src/main/scala/io/delta/sharing/client/util/ConfUtils.scala @@ -103,21 +103,23 @@ object ConfUtils { val proxyHost = conf.get(PROXY_HOST, null) val proxyPortAsString = conf.get(PROXY_PORT, null) - if (proxyHost != null && proxyPortAsString != null) { - val proxyPort = proxyPortAsString.toInt - validatePortNumber(proxyPort, PROXY_PORT) - - Some(ProxyConfig( - host = proxyHost, - port = proxyPort, - noProxyHosts = conf.getTrimmedStrings(NO_PROXY_HOSTS).toSeq, - authToken = Option(conf.get(PROXY_AUTH_TOKEN, null)), - caCertPath = Option(conf.get(CA_CERT_PATH, null)), - sslTrustAll = conf.getBoolean(SSL_TRUST_ALL_CONF, SSL_TRUST_ALL_DEFAULT.toBoolean) - )) - } else { - None + if (proxyHost == null && proxyPortAsString == null) { + return None } + + validateNonEmpty(proxyHost, PROXY_HOST) + validateNonEmpty(proxyPortAsString, PROXY_PORT) + val proxyPort = proxyPortAsString.toInt + validatePortNumber(proxyPort, PROXY_PORT) + + Some(ProxyConfig( + host = proxyHost, + port = proxyPort, + noProxyHosts = conf.getTrimmedStrings(NO_PROXY_HOSTS).toSeq, + authToken = Option(conf.get(PROXY_AUTH_TOKEN, null)), + caCertPath = Option(conf.get(CA_CERT_PATH, null)), + sslTrustAll = conf.getBoolean(SSL_TRUST_ALL_CONF, SSL_TRUST_ALL_DEFAULT.toBoolean) + )) }