Skip to content

Commit

Permalink
Merge branch 'main' into feature/delta-credentials-as-opts
Browse files Browse the repository at this point in the history
  • Loading branch information
stevenayers authored Feb 5, 2025
2 parents 1b23a84 + af3128a commit 1fe4165
Show file tree
Hide file tree
Showing 22 changed files with 1,285 additions and 161 deletions.
41 changes: 28 additions & 13 deletions .github/workflows/build-kernel-wheels.yml
Original file line number Diff line number Diff line change
@@ -1,32 +1,30 @@
name: Build wheels for 4 os for delta-kernel-rust-sharing-wrapper
name: Build delta-kernel-rust-sharing-wrapper wheels for 4 OS and 2 architectures

on:
push:
paths:
- python/delta-sharing-kernel/**
- python/delta-kernel-rust-sharing-wrapper/**
- .github/workflows/**
pull_request:
paths:
- python/delta-sharing-kernel/**
- python/delta-kernel-rust-sharing-wrapper/**
- .github/workflows/**

jobs:
build:
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-latest, macos-latest, windows-latest]
os: [ubuntu-latest, ubuntu-20.04, macos-latest, windows-latest]
python-version: [3.8]
arch: [x86_64, arm64]
include:
- os: macos-latest
arch: x86_64
- os: macos-latest
arch: arm64
exclude:
- os: ubuntu-latest
arch: x86_64
arch: arm64
- os: ubuntu-20.04
arch: arm64
- os: windows-latest
arch: x86_64
arch: arm64

steps:
- uses: actions/checkout@v2
Expand Down Expand Up @@ -69,8 +67,25 @@ jobs:
maturin build --release
shell: bash

- name: Build wheel (x86_64 Linux Ubuntu 20.04)
if: matrix.os == 'ubuntu-20.04'
run: |
cd python/delta-kernel-rust-sharing-wrapper
maturin build --release
shell: bash

- name: Upload wheels
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v4
with:
name: wheels
name: wheel-${{ matrix.os }}-${{ matrix.arch }}
path: python/delta-kernel-rust-sharing-wrapper/target/wheels/*.whl

merge:
runs-on: ubuntu-latest
needs: build
steps:
- name: Merge Artifacts
uses: actions/upload-artifact/merge@v4
with:
name: all-wheels
pattern: wheel-*
2 changes: 1 addition & 1 deletion PROTOCOL.md
Original file line number Diff line number Diff line change
Expand Up @@ -2077,7 +2077,7 @@ delta-table-version: 123
### Read Change Data Feed from a Table
This is the API for clients to read change data feed from a table.

The API supports a start parameter and and an end parameter. The start/end parameter can either be a version or a timestamp. The start parameter must be provided. If the end parameter is not provided, the API will use the latest table version for it. The parameter range is inclusive in the query.
The API supports a start parameter and an end parameter. The start/end parameter can either be a version or a timestamp. The start parameter must be provided. If the end parameter is not provided, the API will use the latest table version for it. The parameter range is inclusive in the query.

You can specify a version as a Long or a timestamp as a string in the [Timestamp Format](#timestamp-format).

Expand Down
14 changes: 10 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,18 @@ If this doesn’t work because of an issue downloading delta-kernel-rust-sharing
- Check python3 version >= 3.8
- Upgrade your pip3 to the latest version
- Check the linux glibc version >= 2.31
- [Install Rust](https://www.rust-lang.org/tools/install)

If you cannot upgrade glibc or your OS is not supported by the delta-kernel-rust-sharing-wrapper pypi install you will need to install the package manually.
See https://pypi.org/project/delta-kernel-rust-sharing-wrapper/0.1.0/#files for supported OS.
If you cannot upgrade glibc or PyPI does not have a pre-built wheel for delta-kernel-rust-sharing-wrapper for your environment, pip will have to build the package from source, which requires Rust to be installed.
See https://pypi.org/project/delta-kernel-rust-sharing-wrapper/0.2.1/#files for environments that have a pre-built wheel.

To install the delta-kernel-rust-sharing-wrapper package manually:
You can also use an older version of the delta-sharing package which did not bake delta-kernel-rust-sharing-wrapper into the installation with the following:
```
# you need to use the older version of the delta-sharing package which did not bake delta-kernel-rust-sharing-wrapper into the installation
pip3 install delta-sharing==1.1.0
```

You can also install the delta-kernel-rust-sharing-wrapper package manually:
```
cd [delta-sharing-root]/python/delta-kernel-rust-sharing-wrapper
python3 -m venv .venv
source .venv/bin/activate
Expand Down Expand Up @@ -108,6 +111,9 @@ If the table supports history sharing(`tableConfig.cdfEnabled=true` in the OSS D
# Load table changes from version 0 to version 5, as a Pandas DataFrame.
delta_sharing.load_table_changes_as_pandas(table_url, starting_version=0, ending_version=5)

# Load table changes from version 0 to version 5 as a Pandas DataFrame, explicitly using Delta Format.
delta_sharing.load_table_changes_as_pandas(table_url, starting_version=0, ending_version=5, use_delta_format=True)

# If the code is running with PySpark, you can load table changes as Spark DataFrame.
delta_sharing.load_table_changes_as_spark(table_url, starting_version=0, ending_version=5)
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,20 +77,47 @@ private[client] class OAuthClient(httpClient:
}

private def parseOAuthTokenResponse(response: String): OAuthClientCredentials = {
// Parsing the response per oauth spec
// https://datatracker.ietf.org/doc/html/rfc6749#section-5.1
if (response == null || response.isEmpty) {
throw new RuntimeException("Empty response from OAuth token endpoint")
}
val jsonNode = JsonUtils.readTree(response)
if (!jsonNode.has("access_token") || !jsonNode.get("access_token").isTextual) {
throw new RuntimeException("Missing 'access_token' field in OAuth token response")
}
if (!jsonNode.has("expires_in") || !jsonNode.get("expires_in").isNumber) {
if (!jsonNode.has("expires_in")) {
throw new RuntimeException("Missing 'expires_in' field in OAuth token response")
}

// OAuth spec requires 'expires_in' to be an integer, e.g., 3600.
// See https://datatracker.ietf.org/doc/html/rfc6749#section-5.1
// But some token endpoints return `expires_in` as a string e.g., "3600".
// This ensures that we support both integer and string values for 'expires_in' field.
// Example request resulting in 'expires_in' as a string:
// curl -X POST \
// https://login.windows.net/$TENANT_ID/oauth2/token \
// -H "Content-Type: application/x-www-form-urlencoded" \
// -d "grant_type=client_credentials" \
// -d "client_id=$CLIENT_ID" \
// -d "client_secret=$CLIENT_SECRET" \
// -d "scope=https://graph.microsoft.com/.default"
val expiresIn : Long = jsonNode.get("expires_in") match {
case n if n.isNumber => n.asLong()
case n if n.isTextual =>
try {
n.asText().toLong
} catch {
case _: NumberFormatException =>
throw new RuntimeException("Invalid 'expires_in' field in OAuth token response")
}
case _ =>
throw new RuntimeException("Invalid 'expires_in' field in OAuth token response")
}

OAuthClientCredentials(
jsonNode.get("access_token").asText(),
jsonNode.get("expires_in").asLong(),
expiresIn,
System.currentTimeMillis()
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ object ConfUtils {
val NEVER_USE_HTTPS = "spark.delta.sharing.network.never.use.https"
val NEVER_USE_HTTPS_DEFAULT = "false"

val STRUCTURAL_SCHEMA_MATCH_CONF = "spark.delta.sharing.client.useStructuralSchemaMatch"
val STRUCTURAL_SCHEMA_MATCH_DEFAULT = "false"

def getProxyConfig(conf: Configuration): Option[ProxyConfig] = {
val proxyHost = conf.get(PROXY_HOST, null)
val proxyPortAsString = conf.get(PROXY_PORT, null)
Expand Down Expand Up @@ -286,6 +289,9 @@ object ConfUtils {
maxDur
}

def structuralSchemaMatchingEnabled(conf: SQLConf): Boolean =
conf.getConfString(STRUCTURAL_SCHEMA_MATCH_CONF, STRUCTURAL_SCHEMA_MATCH_DEFAULT).toBoolean

private def toTimeInSeconds(timeStr: String, conf: String): Int = {
val timeInSeconds = JavaUtils.timeStringAs(timeStr, TimeUnit.SECONDS)
validateNonNeg(timeInSeconds, conf)
Expand Down
21 changes: 17 additions & 4 deletions client/src/main/scala/io/delta/sharing/spark/RemoteDeltaLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import io.delta.sharing.client.model.{
}
import io.delta.sharing.client.util.ConfUtils
import io.delta.sharing.spark.perf.DeltaSharingLimitPushDown
import io.delta.sharing.spark.util.SchemaUtils

/**
* Used to query the current state of the transaction logs of a remote shared Delta table.
Expand Down Expand Up @@ -229,11 +230,23 @@ class RemoteSnapshot(
}

private def checkSchemaNotChange(newMetadata: Metadata): Unit = {
if (newMetadata.schemaString != metadata.schemaString ||
val schemaChangedException = new SparkException(
s"""The schema or partition columns of your Delta table has changed since your
|DataFrame was created. Please redefine your DataFrame""".stripMargin)

if (ConfUtils.structuralSchemaMatchingEnabled(spark.sessionState.conf)) {
val newSchema = DataType.fromJson(newMetadata.schemaString).asInstanceOf[StructType]
val currentSchema = DataType.fromJson(metadata.schemaString).asInstanceOf[StructType]

if (
metadata.partitionColumns != newMetadata.partitionColumns ||
!SchemaUtils.isReadCompatible(currentSchema, newSchema)
) {
throw schemaChangedException
}
} else if (newMetadata.schemaString != metadata.schemaString ||
newMetadata.partitionColumns != metadata.partitionColumns) {
throw new SparkException(
s"""The schema or partition columns of your Delta table has changed since your
|DataFrame was created. Please redefine your DataFrame""")
throw schemaChangedException
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ import org.apache.http.impl.bootstrap.{HttpServer, ServerBootstrap}
import org.apache.http.impl.client.{CloseableHttpClient, HttpClients}
import org.apache.http.protocol.{HttpContext, HttpRequestHandler}
import org.apache.spark.SparkFunSuite
import org.scalatest.prop.TableDrivenPropertyChecks

class OAuthClientSuite extends SparkFunSuite {
class OAuthClientSuite extends SparkFunSuite with TableDrivenPropertyChecks {
var server: HttpServer = _

def startServer(handler: HttpRequestHandler): Unit = {
Expand Down Expand Up @@ -58,40 +59,66 @@ class OAuthClientSuite extends SparkFunSuite {
throw new RuntimeException(s"Port $port is not released after $timeoutMillis milliseconds")
}

test("OAuthClient should parse token response correctly") {
val handler = new HttpRequestHandler {
@throws[HttpException]
@throws[IOException]
override def handle(request: HttpRequest,
response: HttpResponse,
context: HttpContext): Unit = {
val responseBody =
"""{
| "access_token": "test-access-token",
| "expires_in": 3600,
| "token_type": "bearer"
|}""".stripMargin
response.setEntity(new StringEntity(responseBody, ContentType.APPLICATION_JSON))
response.setStatusCode(200)
case class TokenExchangeSuccessScenario(responseBody: String,
expectedAccessToken: String,
expectedExpiresIn: Long)

// OAuth spec requires 'expires_in' to be an integer, e.g., 3600.
// See https://datatracker.ietf.org/doc/html/rfc6749#section-5.1
// But some token endpoints return `expires_in` as a string e.g., "3600".
// This test ensures the client can handle such cases.
// The test case ensures that we support both integer and string values for 'expires_in' field.
private val tokenExchangeSuccessScenarios = Table(
"testScenario",
TokenExchangeSuccessScenario(
responseBody = """{
| "access_token": "test-access-token",
| "expires_in": 3600,
| "token_type": "bearer"
|}""".stripMargin,
expectedAccessToken = "test-access-token",
expectedExpiresIn = 3600
),
TokenExchangeSuccessScenario(
responseBody = """{
| "access_token": "test-access-token",
| "expires_in": "3600",
| "token_type": "bearer"
|}""".stripMargin,
expectedAccessToken = "test-access-token",
expectedExpiresIn = 3600
)
)

forAll(tokenExchangeSuccessScenarios) { testScenario =>
test(s"OAuthClient should parse token response correctly for ${testScenario.responseBody}") {
val handler = new HttpRequestHandler {
@throws[HttpException]
@throws[IOException]
override def handle(request: HttpRequest,
response: HttpResponse,
context: HttpContext): Unit = {
response.setEntity(
new StringEntity(testScenario.responseBody, ContentType.APPLICATION_JSON))
response.setStatusCode(200)
}
}
}
startServer(handler)
startServer(handler)

val httpClient: CloseableHttpClient = HttpClients.createDefault()
val oauthClient = new OAuthClient(httpClient, AuthConfig(),
"http://localhost:1080/token", "client-id", "client-secret")

val start = System.currentTimeMillis()
val httpClient: CloseableHttpClient = HttpClients.createDefault()
val oauthClient = new OAuthClient(httpClient, AuthConfig(),
"http://localhost:1080/token", "client-id", "client-secret")

val token = oauthClient.clientCredentials()
val start = System.currentTimeMillis()
val token = oauthClient.clientCredentials()
val end = System.currentTimeMillis()

val end = System.currentTimeMillis()
assert(token.accessToken == testScenario.expectedAccessToken)
assert(token.expiresIn == testScenario.expectedExpiresIn)
assert(token.creationTimestamp >= start && token.creationTimestamp <= end)

assert(token.accessToken == "test-access-token")
assert(token.expiresIn == 3600)
assert(token.creationTimestamp >= start && token.creationTimestamp <= end)

stopServer()
stopServer()
}
}

test("OAuthClient should handle 401 Unauthorized response") {
Expand Down
1 change: 1 addition & 0 deletions python/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ This is the Python client library for Delta Sharing, which lets you load shared
## Installation and Usage

1. Install using `pip install delta-sharing`.
a. On some environments, you may also need to [install Rust](https://www.rust-lang.org/tools/install). This is because the `delta-sharing` package depends on the `delta-kernel-rust-sharing-wrapper` package, which does not have a pre-built Python wheel for all environments. As a result, pip will have to build `delta-kernel-rust-sharing-wrapper` from source.
2. To use the Python Connector, see [the project docs](https://github.com/delta-io/delta-sharing) for details.

## Documentation
Expand Down
1 change: 1 addition & 0 deletions python/delta-kernel-rust-sharing-wrapper/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Cargo.lock
8 changes: 4 additions & 4 deletions python/delta-kernel-rust-sharing-wrapper/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,20 @@
name = "delta-kernel-rust-sharing-wrapper"
edition = "2021"
license = "Apache-2.0"
version = "0.1.0"
version = "0.2.2"

[lib]
name = "delta_kernel_rust_sharing_wrapper"
# "cdylib" is necessary to produce a shared library for Python to import from.
crate-type = ["cdylib"]

[dependencies]
arrow = { version = "53.3.0", features = ["pyarrow"] }
delta_kernel = {version = "0.5", features = ["cloud", "default", "default-engine"]}
arrow = { version = "54.0.0", features = ["pyarrow"] }
delta_kernel = { version = "0.6.1", features = ["cloud", "default-engine"]}
openssl = { version = "0.10", features = ["vendored"] }
url = "2"

[dependencies.pyo3]
version = "0.22.4"
version = "0.23.3"
# "abi3-py38" tells pyo3 (and maturin) to build using the stable ABI with minimum Python version 3.8
features = ["abi3-py38"]
Loading

0 comments on commit 1fe4165

Please sign in to comment.