From 660899ed0789fe857f9a112a16c0419804a94ebb Mon Sep 17 00:00:00 2001 From: Jade Wang Date: Fri, 17 May 2024 14:56:16 -0700 Subject: [PATCH 01/23] use async api in delta sharing client for view query --- .../sharing/client/DeltaSharingClient.scala | 235 +++++++++++++++--- .../scala/io/delta/sharing/client/model.scala | 8 + .../delta/sharing/client/util/ConfUtils.scala | 50 ++++ .../DeltaSharingRestClientDeltaSuite.scala | 2 +- .../client/DeltaSharingRestClientSuite.scala | 138 +++++++--- server/src/main/protobuf/protocol.proto | 21 ++ .../sharing/server/DeltaSharingService.scala | 109 ++++++-- .../scala/io/delta/sharing/server/model.scala | 10 + .../server/DeltaSharingServiceSuite.scala | 56 ++++- 9 files changed, 528 insertions(+), 101 deletions(-) diff --git a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala index 11e9ad7b1..3dddb3f73 100644 --- a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala +++ b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala @@ -16,7 +16,7 @@ package io.delta.sharing.client -import java.io.{BufferedReader, InputStream, InputStreamReader} +import java.io.{BufferedReader, InputStreamReader} import java.net.{URL, URLEncoder} import java.nio.charset.StandardCharsets.UTF_8 import java.sql.Timestamp @@ -24,7 +24,6 @@ import java.time.LocalDateTime import java.time.format.DateTimeFormatter.ISO_DATE_TIME import java.util.UUID -import scala.collection.JavaConverters._ import scala.collection.mutable.{ArrayBuffer, ListBuffer} import org.apache.commons.io.IOUtils @@ -33,7 +32,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.util.VersionInfo import org.apache.http.{HttpHeaders, HttpHost, HttpStatus} import org.apache.http.client.config.RequestConfig -import org.apache.http.client.methods.{HttpGet, HttpHead, HttpPost, HttpRequestBase} +import org.apache.http.client.methods.{HttpGet, HttpPost, HttpRequestBase} import org.apache.http.client.protocol.HttpClientContext import org.apache.http.conn.ssl.{SSLConnectionSocketFactory, SSLContextBuilder, TrustSelfSignedStrategy} import org.apache.http.entity.StringEntity @@ -88,6 +87,16 @@ private[sharing] trait PaginationResponse { def nextPageToken: Option[String] } +private[sharing] trait PaginationRequest { + def maxFiles: Option[Int] + def pageToken: Option[String] + + def clone( + maxFiles: Option[Int], + pageToken: Option[String], + unsetRefreshToken: Boolean = false): PaginationRequest +} + private[sharing] case class QueryTableRequest( predicateHints: Seq[String], limitHint: Option[Long], @@ -99,8 +108,39 @@ private[sharing] case class QueryTableRequest( maxFiles: Option[Int], pageToken: Option[String], includeRefreshToken: Option[Boolean], - refreshToken: Option[String] -) + refreshToken: Option[String], + idempotencyKey: Option[String], + requestedColumns: Seq[String] +) extends PaginationRequest { + override def clone( + maxFiles: Option[Int], + pageToken: Option[String], + unsetRefreshToken: Boolean = false): PaginationRequest = { + if (unsetRefreshToken) { + this.copy( + maxFiles = maxFiles, + pageToken = pageToken, + refreshToken = None, + includeRefreshToken = None) + } + else { + this.copy(maxFiles = maxFiles, pageToken = pageToken) + } + } +} + +private[sharing] case class GetQueryInfoRequest( + queryId: String, + maxFiles: Option[Int], + pageToken: Option[String] + ) extends PaginationRequest { + override def clone( + maxFiles: Option[Int], + pageToken: Option[String], + unsetRefreshToken: Boolean = false): PaginationRequest = { + this.copy(maxFiles = maxFiles, pageToken = pageToken) + } +} private[sharing] case class ListSharesResponse( items: Seq[Share], @@ -112,16 +152,19 @@ private[sharing] case class ListAllTablesResponse( /** A REST client to fetch Delta metadata from remote server. */ class DeltaSharingRestClient( - profileProvider: DeltaSharingProfileProvider, - timeoutInSeconds: Int = 120, - numRetries: Int = 10, - maxRetryDuration: Long = Long.MaxValue, - sslTrustAll: Boolean = false, - forStreaming: Boolean = false, - responseFormat: String = DeltaSharingRestClient.RESPONSE_FORMAT_PARQUET, - readerFeatures: String = "", - queryTablePaginationEnabled: Boolean = false, - maxFilesPerReq: Int = 100000 + profileProvider: DeltaSharingProfileProvider, + timeoutInSeconds: Int = 120, + numRetries: Int = 10, + maxRetryDuration: Long = Long.MaxValue, + sslTrustAll: Boolean = false, + forStreaming: Boolean = false, + responseFormat: String = DeltaSharingRestClient.RESPONSE_FORMAT_PARQUET, + readerFeatures: String = "", + queryTablePaginationEnabled: Boolean = false, + maxFilesPerReq: Int = 100000, + enableAsyncQuery: Boolean = false, + asyncQueryPollIntervalMillis: Long = 1000L, + asyncQueryMaxDuration: Long = Long.MaxValue ) extends DeltaSharingClient with Logging { import DeltaSharingRestClient._ @@ -264,6 +307,7 @@ class DeltaSharingRestClient( table = s"${table.share}.${table.schema}.${table.name}" ) + if (respondedFormat == RESPONSE_FORMAT_DELTA) { return DeltaTableMetadata(version, lines = lines, respondedFormat = respondedFormat) } @@ -300,6 +344,13 @@ class DeltaSharingRestClient( val encodedTableName = URLEncoder.encode(table.name, "UTF-8") val target = getTargetUrl( s"/shares/$encodedShareName/schemas/$encodedSchemaName/tables/$encodedTableName/query") + + val idempotencyKey = if (enableAsyncQuery) { + Some(UUID.randomUUID().toString) + } else { + None + } + val request: QueryTableRequest = QueryTableRequest( predicateHints = predicates, limitHint = limit, @@ -311,16 +362,18 @@ class DeltaSharingRestClient( maxFiles = None, pageToken = None, includeRefreshToken = Some(includeRefreshToken), - refreshToken = refreshToken + refreshToken = refreshToken, + idempotencyKey = idempotencyKey, + requestedColumns = Seq.empty[String] ) val (version, respondedFormat, lines, refreshTokenOpt) = if (queryTablePaginationEnabled) { logInfo( s"Making paginated queryTable requests for table " + s"${table.share}.${table.schema}.${table.name} with maxFiles=$maxFilesPerReq" ) - getFilesByPage(target, request) + getFilesByPage(table, target, request) } else { - val (version, respondedFormat, lines) = getNDJson(target, request) + val (version, respondedFormat, lines, _) = getNDJsonWithAsync(table, target, request) val (filteredLines, endStreamAction) = maybeExtractEndStreamAction(lines) val refreshTokenOpt = endStreamAction.flatMap { e => Option(e.refreshToken).flatMap { token => @@ -391,14 +444,16 @@ class DeltaSharingRestClient( maxFiles = None, pageToken = None, includeRefreshToken = None, - refreshToken = None + refreshToken = None, + idempotencyKey = None, + requestedColumns = Seq.empty[String] ) val (version, respondedFormat, lines) = if (queryTablePaginationEnabled) { logInfo( s"Making paginated queryTable from version $startingVersion requests for table " + s"${table.share}.${table.schema}.${table.name} with maxFiles=$maxFilesPerReq" ) - val (version, respondedFormat, lines, _) = getFilesByPage(target, request) + val (version, respondedFormat, lines, _) = getFilesByPage(table, target, request) (version, respondedFormat, lines) } else { getNDJson(target, request) @@ -442,6 +497,7 @@ class DeltaSharingRestClient( // Send paginated queryTable requests. Loop internally to fetch and concatenate all pages, // then return (version, respondedFormat, actions, refreshToken) tuple. private def getFilesByPage( + table: Table, targetUrl: String, request: QueryTableRequest): (Long, String, Seq[String], Option[String]) = { val allLines = ArrayBuffer[String]() @@ -449,12 +505,17 @@ class DeltaSharingRestClient( var numPages = 1 // Fetch first page - var updatedRequest = request.copy(maxFiles = Some(maxFilesPerReq)) - val (version, respondedFormat, lines) = getNDJson(targetUrl, updatedRequest) + val updatedRequest = request.copy( + maxFiles = Some(maxFilesPerReq), + pageToken = request.pageToken) + + val (version, respondedFormat, lines, queryIdOpt) + = getNDJsonWithAsync(table, targetUrl, updatedRequest) var (filteredLines, endStreamAction) = maybeExtractEndStreamAction(lines) if (endStreamAction.isEmpty) { logWarning("EndStreamAction is not returned in the response for paginated query.") } + val protocol = filteredLines(0) val metadata = filteredLines(1) // Extract refresh token if available @@ -473,16 +534,32 @@ class DeltaSharingRestClient( endStreamAction.get.nextPageToken != null && endStreamAction.get.nextPageToken.nonEmpty) { numPages += 1 - updatedRequest = updatedRequest.copy( - pageToken = Some(endStreamAction.get.nextPageToken), - // Unset includeRefreshToken and refreshToken because they can only be used in - // the first page request. - includeRefreshToken = None, - refreshToken = None - ) + + val (pagingRequest, nextPageUrl) = if (queryIdOpt.isDefined) { + ( + GetQueryInfoRequest( + queryId = queryIdOpt.get, + maxFiles = Some(maxFilesPerReq), + pageToken = Some(endStreamAction.get.nextPageToken)), + getQueryInfoTargetUrl(table, queryIdOpt.get) + ) + } else { + ( + updatedRequest.clone( + maxFiles = Some(maxFilesPerReq), + pageToken = Some(endStreamAction.get.nextPageToken), + // Unset includeRefreshToken and refreshToken + // because they can only be used in + // the first page request. + unsetRefreshToken = true + ), + targetUrl + ) + } + val res = fetchNextPageFiles( - targetUrl = targetUrl, - requestBody = Some(updatedRequest), + targetUrl = nextPageUrl, + requestBody = Some(pagingRequest), expectedVersion = version, expectedRespondedFormat = respondedFormat, expectedProtocol = protocol, @@ -629,7 +706,7 @@ class DeltaSharingRestClient( // if it's not returned in the response. private def fetchNextPageFiles( targetUrl: String, - requestBody: Option[QueryTableRequest], + requestBody: Option[PaginationRequest], expectedVersion: Long, expectedRespondedFormat: String, expectedProtocol: String, @@ -719,6 +796,77 @@ class DeltaSharingRestClient( ) } + private def getQueryInfoTargetUrl(table: Table, queryId: String) = { + val shareName = URLEncoder.encode(table.share, "UTF-8") + val schemaName = URLEncoder.encode(table.schema, "UTF-8") + val tableName = URLEncoder.encode(table.name, "UTF-8") + val encodedQueryId = URLEncoder.encode(queryId, "UTF-8") + val target = getTargetUrl( + s"/shares/$shareName/schemas/$schemaName/tables/$tableName/queries/$encodedQueryId") + target + } + + private def getQueryInfo( + table: Table, + queryId: String, + maxFiles: Option[Int], + pageToken: Option[String]): (Long, String, Seq[String]) = { + val target: String = getQueryInfoTargetUrl(table, queryId) + val request: GetQueryInfoRequest = GetQueryInfoRequest( + queryId = queryId, + maxFiles = maxFiles, + pageToken = pageToken) + + getNDJson(target, request) + } + + private def checkQueryPending(lines: Seq[String]): (Seq[String], Option[String], Boolean) = { + val queryStatus = JsonUtils.fromJson[SingleAction](lines(0)).queryStatus + + if(queryStatus == null) { + (lines, None, false) + } else { + (lines.drop(1), Some(queryStatus.queryId), queryStatus.status == QUERY_PENDING_TRUE) + } + } + + private def getNDJsonWithAsync( + table: Table, + target: String, + request: QueryTableRequest): (Long, String, Seq[String], Option[String]) = { + // Initial query to get NDJson data + val (initialVersion, initialRespondedFormat, initialLines) = getNDJson(target, request) + + // Check if the query is still pending + var (lines, queryIdOpt, queryPending) = checkQueryPending(initialLines) + + var version = initialVersion + var respondedFormat = initialRespondedFormat + + val startTime = System.currentTimeMillis() + // Loop while the query is pending + while (queryPending) { + if (System.currentTimeMillis() - startTime > asyncQueryMaxDuration) { + throw new IllegalStateException("Query is still pending after " + + s"${asyncQueryMaxDuration} ms. Please try again later.") + } + Thread.sleep(asyncQueryPollIntervalMillis) + + val queryId = queryIdOpt.get + val (currentVersion, currentRespondedFormat, currentLines) + = getQueryInfo(table, queryId, request.maxFiles, request.pageToken) + val (newLines, _, newQueryPending) = checkQueryPending(currentLines) + + version = currentVersion + respondedFormat = currentRespondedFormat + lines = newLines + queryPending = newQueryPending + } + + (version, respondedFormat, lines, queryIdOpt) + } + + private def getNDJson[T: Manifest](target: String, data: T): (Long, String, Seq[String]) = { val httpPost = new HttpPost(target) val json = JsonUtils.toJson(data) @@ -910,7 +1058,13 @@ class DeltaSharingRestClient( if (responseFormatSet.contains(RESPONSE_FORMAT_DELTA) && readerFeatures.nonEmpty) { capabilities = capabilities :+ s"$READER_FEATURES=$readerFeatures" } - capabilities.mkString(DELTA_SHARING_CAPABILITIES_DELIMITER) + + if(enableAsyncQuery) { + capabilities = capabilities :+ s"$DELTA_SHARING_CAPABILITIES_ASYNC_READ=true" + } + + val cap = capabilities.mkString(DELTA_SHARING_CAPABILITIES_DELIMITER) + cap } def close(): Unit = { @@ -930,9 +1084,11 @@ object DeltaSharingRestClient extends Logging { val RESPONSE_TABLE_VERSION_HEADER_KEY = "Delta-Table-Version" val RESPONSE_FORMAT = "responseformat" val READER_FEATURES = "readerfeatures" + val DELTA_SHARING_CAPABILITIES_ASYNC_READ = "asyncquery" val RESPONSE_FORMAT_DELTA = "delta" val RESPONSE_FORMAT_PARQUET = "parquet" val DELTA_SHARING_CAPABILITIES_DELIMITER = ";" + val QUERY_PENDING_TRUE = "pending" lazy val USER_AGENT = { try { @@ -1018,6 +1174,9 @@ object DeltaSharingRestClient extends Logging { val timeoutInSeconds = ConfUtils.timeoutInSeconds(sqlConf) val queryTablePaginationEnabled = ConfUtils.queryTablePaginationEnabled(sqlConf) val maxFilesPerReq = ConfUtils.maxFilesPerQueryRequest(sqlConf) + val useAsyncQuery = ConfUtils.useAsyncQuery(sqlConf) + val asyncQueryMaxDurationMillis = ConfUtils.asyncQueryTimeout(sqlConf) + val asyncQueryPollDurationMillis = ConfUtils.asyncQueryPollIntervalMillis(sqlConf) val clientClass = ConfUtils.clientClass(sqlConf) Class.forName(clientClass) @@ -1031,7 +1190,10 @@ object DeltaSharingRestClient extends Logging { classOf[String], classOf[String], classOf[Boolean], - classOf[Int] + classOf[Int], + classOf[Boolean], + classOf[Long], + classOf[Long] ).newInstance(profileProvider, java.lang.Integer.valueOf(timeoutInSeconds), java.lang.Integer.valueOf(numRetries), @@ -1041,7 +1203,10 @@ object DeltaSharingRestClient extends Logging { responseFormat, readerFeatures, java.lang.Boolean.valueOf(queryTablePaginationEnabled), - java.lang.Integer.valueOf(maxFilesPerReq) + java.lang.Integer.valueOf(maxFilesPerReq), + java.lang.Boolean.valueOf(useAsyncQuery), + java.lang.Long.valueOf(asyncQueryPollDurationMillis), + java.lang.Long.valueOf(asyncQueryMaxDurationMillis) ).asInstanceOf[DeltaSharingClient] } } diff --git a/client/src/main/scala/io/delta/sharing/client/model.scala b/client/src/main/scala/io/delta/sharing/client/model.scala index a3b933bfa..d60e40d7f 100644 --- a/client/src/main/scala/io/delta/sharing/client/model.scala +++ b/client/src/main/scala/io/delta/sharing/client/model.scala @@ -70,6 +70,7 @@ private[sharing] case class SingleAction( remove: RemoveFile = null, metaData: Metadata = null, protocol: Protocol = null, + queryStatus: QueryStatus = null, endStreamAction: EndStreamAction = null) { def unwrap: Action = { @@ -93,6 +94,13 @@ private[sharing] case class SingleAction( } } +case class QueryStatus( + queryId: String = null, + status: String = null + ) extends Action { + override def wrap: SingleAction = SingleAction(queryStatus = this) +} + private[sharing] case class Format(provider: String = "parquet") private[sharing] case class Metadata( diff --git a/client/src/main/scala/io/delta/sharing/client/util/ConfUtils.scala b/client/src/main/scala/io/delta/sharing/client/util/ConfUtils.scala index 0b1b1eb4d..af94f9ded 100644 --- a/client/src/main/scala/io/delta/sharing/client/util/ConfUtils.scala +++ b/client/src/main/scala/io/delta/sharing/client/util/ConfUtils.scala @@ -30,6 +30,18 @@ object ConfUtils { val MAX_RETRY_DURATION_CONF = "spark.delta.sharing.network.maxRetryDuration" val MAX_RETRY_DURATION_DEFAULT_MILLIS = 10L * 60L* 1000L /* 10 mins */ + val ASYNC_QUERY_POLL_INTERVAL_CONF = "spark.delta.sharing.network.asyncQueryRetryInterval" + val ASYNC_QUERY_POLL_INTERVAL_DEFAULT_MILLIS = 10L * 1000L /* 10 seconds */ + + val ASYNC_QUERY_MAX_RETRY_COUNT_CONF = "spark.delta.sharing.network.asyncQueryMaxRetryCount" + val ASYNC_QUERY_MAX_RETRY_COUNT_DEFAULT = 10 + + val ASYNC_QUERY_TIMEOUT_CONF = "spark.delta.sharing.network.asyncQueryTimeout" + val ASYNC_QUERY_TIMEOUT_DEFAULT_MILLIS = 10L * 60L * 1000L /* 10 mins */ + + val USE_ASYNC_QUERY_CONF = "spark.delta.sharing.network.useAsyncQuery" + val USE_ASYNC_QUERY_DEFAULT = "false" + val TIMEOUT_CONF = "spark.delta.sharing.network.timeout" val TIMEOUT_DEFAULT = "320s" @@ -111,6 +123,44 @@ object ConfUtils { maxDur } + def asyncQueryPollIntervalMillis(conf: Configuration): Long = { + val interval = conf.getLong( + ASYNC_QUERY_POLL_INTERVAL_CONF, + ASYNC_QUERY_POLL_INTERVAL_DEFAULT_MILLIS) + validateNonNeg(interval, ASYNC_QUERY_POLL_INTERVAL_CONF) + interval + } + + def asyncQueryPollIntervalMillis(conf: SQLConf): Long = { + val interval = conf.getConfString( + ASYNC_QUERY_POLL_INTERVAL_CONF, + ASYNC_QUERY_POLL_INTERVAL_DEFAULT_MILLIS.toString).toLong + validateNonNeg(interval, ASYNC_QUERY_POLL_INTERVAL_CONF) + interval + } + + def asyncQueryTimeout(conf: Configuration): Long = { + val timeout = conf.getLong(ASYNC_QUERY_TIMEOUT_CONF, ASYNC_QUERY_TIMEOUT_DEFAULT_MILLIS) + validateNonNeg(timeout, ASYNC_QUERY_TIMEOUT_CONF) + timeout + } + + def asyncQueryTimeout(conf: SQLConf): Long = { + val timeout = conf.getConfString( + ASYNC_QUERY_TIMEOUT_CONF, + ASYNC_QUERY_TIMEOUT_DEFAULT_MILLIS.toString).toLong + validateNonNeg(timeout, ASYNC_QUERY_TIMEOUT_CONF) + timeout + } + + def useAsyncQuery(conf: Configuration): Boolean = { + conf.getBoolean(USE_ASYNC_QUERY_CONF, USE_ASYNC_QUERY_DEFAULT.toBoolean) + } + + def useAsyncQuery(conf: SQLConf): Boolean = { + conf.getConfString(USE_ASYNC_QUERY_CONF, USE_ASYNC_QUERY_DEFAULT).toBoolean + } + def timeoutInSeconds(conf: Configuration): Int = { val timeoutStr = conf.get(TIMEOUT_CONF, TIMEOUT_DEFAULT) toTimeInSeconds(timeoutStr, TIMEOUT_CONF) diff --git a/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientDeltaSuite.scala b/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientDeltaSuite.scala index 7adc6088a..2a4bb9fdf 100644 --- a/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientDeltaSuite.scala +++ b/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientDeltaSuite.scala @@ -93,7 +93,7 @@ class DeltaSharingRestClientDeltaSuite extends DeltaSharingIntegrationTest { } } - integrationTest("getFiles") { + integrationTest("getFiles include async") { def verifyTableFiles(tableFiles: DeltaTableFiles): Unit = { checkDeltaTableFilesBasics(tableFiles, expectedVersion = 2, expectedNumLines = 4, minWriterVersion = 2) assert(tableFiles.lines(1).contains("""{"deltaMetadata":{"id":"f8d5c169-3d01-4ca3-ad9e-7dc3355aedb2","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"eventTime\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}},{\"name\":\"date\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["date"],"configuration":{},"createdTime":1619652806049""")) diff --git a/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala b/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala index 2f3e6e3fc..4eb1d0680 100644 --- a/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala +++ b/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala @@ -293,7 +293,68 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest { assert(errorMessage.contains("is before the earliest version available")) } - integrationTest("getFiles") { + integrationTest("getFiles using async api") { + def verifyTableFiles(tableFiles: DeltaTableFiles): Unit = { + assert(tableFiles.version == 2) + assert(Protocol(minReaderVersion = 1) == tableFiles.protocol) + val expectedMetadata = Metadata( + id = "f8d5c169-3d01-4ca3-ad9e-7dc3355aedb2", + format = Format(), + schemaString = + """{"type":"struct","fields":[{"name":"eventTime","type":"timestamp","nullable":true,"metadata":{}},{"name":"date","type":"date","nullable":true,"metadata":{}}]}""", + partitionColumns = Seq("date") + ) + assert(expectedMetadata == tableFiles.metadata) + assert(tableFiles.files.size == 2) + val expectedFiles = Seq( + AddFile( + url = tableFiles.files(0).url, + expirationTimestamp = tableFiles.files(0).expirationTimestamp, + id = "9f1a49539c5cffe1ea7f9e055d5c003c", + partitionValues = Map("date" -> "2021-04-28"), + size = 573, + stats = + """{"numRecords":1,"minValues":{"eventTime":"2021-04-28T23:33:57.955Z"},"maxValues":{"eventTime":"2021-04-28T23:33:57.955Z"},"nullCount":{"eventTime":0}}""" + ), + AddFile( + url = tableFiles.files(1).url, + expirationTimestamp = tableFiles.files(1).expirationTimestamp, + id = "cd2209b32f5ed5305922dd50f5908a75", + partitionValues = Map("date" -> "2021-04-28"), + size = 573, + stats = + """{"numRecords":1,"minValues":{"eventTime":"2021-04-28T23:33:48.719Z"},"maxValues":{"eventTime":"2021-04-28T23:33:48.719Z"},"nullCount":{"eventTime":0}}""" + ) + ) + assert(expectedFiles == tableFiles.files.toList) + assert(tableFiles.files(0).expirationTimestamp > System.currentTimeMillis()) + } + + Seq(false, true).foreach { paginationEnabled => {} + val client = new DeltaSharingRestClient( + testProfileProvider, + sslTrustAll = true, + queryTablePaginationEnabled = paginationEnabled, + maxFilesPerReq = 1, + responseFormat = RESPONSE_FORMAT_PARQUET, + enableAsyncQuery = true + ) + val table = Table(name = "table2", schema = "default", share = "share2") + val tableFiles = + client.getFiles( + table, + predicates = Nil, + limit = None, + versionAsOf = None, + timestampAsOf = None, + jsonPredicateHints = None, + refreshToken = None + ) + verifyTableFiles(tableFiles) + } + } + + integrationTest("getFiles with Async") { def verifyTableFiles(tableFiles: DeltaTableFiles): Unit = { assert(tableFiles.version == 2) assert(Protocol(minReaderVersion = 1) == tableFiles.protocol) @@ -340,40 +401,47 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest { s"${RESPONSE_FORMAT_DELTA},${RESPONSE_FORMAT_PARQUET}", s"${RESPONSE_FORMAT_PARQUET},${RESPONSE_FORMAT_DELTA}" ).foreach { responseFormat => - Seq(true, false).foreach { paginationEnabled => - val client = new DeltaSharingRestClient( - testProfileProvider, - sslTrustAll = true, - queryTablePaginationEnabled = paginationEnabled, - maxFilesPerReq = 1, - responseFormat = responseFormat - ) - val table = Table(name = "table2", schema = "default", share = "share2") - try { - val tableFiles = - client.getFiles( - table, - predicates = Nil, - limit = None, - versionAsOf = None, - timestampAsOf = None, - jsonPredicateHints = None, - refreshToken = None - ) - verifyTableFiles(tableFiles) - val refreshedTableFiles = - client.getFiles( - table, - predicates = Nil, - limit = None, - versionAsOf = None, - timestampAsOf = None, - jsonPredicateHints = None, - refreshToken = tableFiles.refreshToken - ) - verifyTableFiles(refreshedTableFiles) - } finally { - client.close() + Seq(false).foreach { paginationEnabled => + Seq(true).foreach { asyncEnabled => { + val client = new DeltaSharingRestClient( + testProfileProvider, + sslTrustAll = true, + queryTablePaginationEnabled = paginationEnabled, + maxFilesPerReq = 1, + responseFormat = responseFormat, + enableAsyncQuery = asyncEnabled + ) + val table = Table(name = "table2", schema = "default", share = "share2") + try { + val tableFiles = + client.getFiles( + table, + predicates = Nil, + limit = None, + versionAsOf = None, + timestampAsOf = None, + jsonPredicateHints = None, + refreshToken = None + ) + verifyTableFiles(tableFiles) + + if(tableFiles.refreshToken.isDefined) { + val refreshedTableFiles = + client.getFiles( + table, + predicates = Nil, + limit = None, + versionAsOf = None, + timestampAsOf = None, + jsonPredicateHints = None, + refreshToken = tableFiles.refreshToken + ) + verifyTableFiles(refreshedTableFiles) + } + } finally { + client.close() + } + } } } } diff --git a/server/src/main/protobuf/protocol.proto b/server/src/main/protobuf/protocol.proto index d70361d27..70b1a49b1 100644 --- a/server/src/main/protobuf/protocol.proto +++ b/server/src/main/protobuf/protocol.proto @@ -82,6 +82,26 @@ message QueryTableRequest { // TODO: update this. The format of the response, supported formats: parquet, delta. optional bool queryDeltaLog = 8; + + // idempotency key for the query request, this is required for async query + optional string idempotency_key = 14; + + // The columns to be returned in the response for view query. + // If not specified, all columns will be returned. + repeated string requested_columns = 15; +} + +message GetQueryInfoRequest { + optional string query_id = 1; + + // The maximum number of files to return in one page. This is a hint for the server, + // and the server may not honor it. The server that supports pagination should return + // no more than this limit, but it can return fewer. If there are more files available, + // a nextPageToken will be returned to the user to retrieve the next page. + optional int32 max_files = 2; + + // The page token used to retrieve the subsequent page. + optional string page_token = 3; } message ListSharesResponse { @@ -108,6 +128,7 @@ message ListAllTablesResponse { optional string next_page_token = 2; } + // Define a special class to generate the page token for pagination. It includes the information we // need to know where we should start to query, and check whether the page token comes from the // right result. For example, we would like to throw an error when the user uses a page token diff --git a/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala b/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala index 769273120..0b0d853e8 100644 --- a/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala +++ b/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala @@ -43,7 +43,7 @@ import org.slf4j.LoggerFactory import scalapb.json4s.Printer import io.delta.sharing.server.config.ServerConfig -import io.delta.sharing.server.model.SingleAction +import io.delta.sharing.server.model.{QueryStatus, SingleAction} import io.delta.sharing.server.protocol._ import io.delta.sharing.server.util.JsonUtils @@ -325,6 +325,45 @@ class DeltaSharingService(serverConfig: ServerConfig) { streamingOutput(Some(queryResult.version), queryResult.responseFormat, queryResult.actions) } + + @Post("/shares/{share}/schemas/{schema}/tables/{table}/queries/{queryId}") + @ConsumesJson + def getQueryStatus( + req: HttpRequest, + @Param("share") share: String, + @Param("schema") schema: String, + @Param("table") table: String, + @Param("queryId") queryId: String, + request: GetQueryInfoRequest): HttpResponse = processRequest { + + // we are reusing the table here to simulate a view query result + val tableConfig = sharedTableManager.getTable(share, schema, table) + val capabilitiesMap = getDeltaSharingCapabilitiesMap( + req.headers().get(DELTA_SHARING_CAPABILITIES_HEADER)) + val responseFormatSet = getResponseFormatSet(capabilitiesMap) + val queryResult = deltaSharedTableLoader.loadTable(tableConfig).query( + includeFiles = true, + Seq.empty[String], + None, + None, + None, + None, + None, + None, + maxFiles = request.maxFiles, + pageToken = request.pageToken, + false, + None, + responseFormatSet = responseFormatSet) + if (queryResult.version < tableConfig.startVersion) { + throw new DeltaSharingIllegalArgumentException( + s"You can only query table data since version ${tableConfig.startVersion}." + ) + } + + streamingOutput(Some(queryResult.version), queryResult.responseFormat, queryResult.actions) + } + @Post("/shares/{share}/schemas/{schema}/tables/{table}/query") @ConsumesJson def listFiles( @@ -396,28 +435,47 @@ class DeltaSharingService(serverConfig: ServerConfig) { } } val responseFormatSet = getResponseFormatSet(capabilitiesMap) - val queryResult = deltaSharedTableLoader.loadTable(tableConfig).query( - includeFiles = true, - request.predicateHints, - request.jsonPredicateHints, - request.limitHint, - request.version, - request.timestamp, - request.startingVersion, - request.endingVersion, - request.maxFiles, - request.pageToken, - request.includeRefreshToken.getOrElse(false), - request.refreshToken, - responseFormatSet = responseFormatSet) - if (queryResult.version < tableConfig.startVersion) { - throw new DeltaSharingIllegalArgumentException( - s"You can only query table data since version ${tableConfig.startVersion}." - ) + + if(getAsyncQuery(capabilitiesMap)) { + if (!request.idempotencyKey.isDefined) { + throw new DeltaSharingIllegalArgumentException( + "idempotencyKey is required for async query." + ) + } + + val queryId = s"${share}_${schema}_${table}" + + streamingOutput( + Some(0), + "parquet", + Seq( + SingleAction(queryStatus = QueryStatus(queryId, "pending")) + )) + } + else { + val queryResult = deltaSharedTableLoader.loadTable(tableConfig).query( + includeFiles = true, + request.predicateHints, + request.jsonPredicateHints, + request.limitHint, + request.version, + request.timestamp, + request.startingVersion, + request.endingVersion, + request.maxFiles, + request.pageToken, + request.includeRefreshToken.getOrElse(false), + request.refreshToken, + responseFormatSet = responseFormatSet) + if (queryResult.version < tableConfig.startVersion) { + throw new DeltaSharingIllegalArgumentException( + s"You can only query table data since version ${tableConfig.startVersion}." + ) + } + logger.info(s"Took ${System.currentTimeMillis - start} ms to load the table " + + s"and sign ${queryResult.actions.length - 2} urls for table $share/$schema/$table") + streamingOutput(Some(queryResult.version), queryResult.responseFormat, queryResult.actions) } - logger.info(s"Took ${System.currentTimeMillis - start} ms to load the table " + - s"and sign ${queryResult.actions.length - 2} urls for table $share/$schema/$table") - streamingOutput(Some(queryResult.version), queryResult.responseFormat, queryResult.actions) } // scalastyle:off argcount @@ -503,6 +561,7 @@ object DeltaSharingService { val DELTA_TABLE_METADATA_CONTENT_TYPE = "application/x-ndjson; charset=utf-8" val DELTA_SHARING_CAPABILITIES_HEADER = "delta-sharing-capabilities" val DELTA_SHARING_RESPONSE_FORMAT = "responseformat" + val DELTA_SHARING_CAPABILITIES_ASYNC_QUERY = "asyncquery" private val parser = { val parser = ArgumentParsers @@ -625,7 +684,11 @@ object DeltaSharingService { private[server] def getResponseFormatSet(headerCapabilities: Map[String, String]): Set[String] = { headerCapabilities.get(DELTA_SHARING_RESPONSE_FORMAT).getOrElse( DeltaSharedTable.RESPONSE_FORMAT_PARQUET - ).split(",").toSet + ).split(";").toSet + } + + private[server] def getAsyncQuery(headerCapabilities: Map[String, String]): Boolean = { + headerCapabilities.get(DELTA_SHARING_CAPABILITIES_ASYNC_QUERY).exists(_.toBoolean) } def main(args: Array[String]): Unit = { diff --git a/server/src/main/scala/io/delta/sharing/server/model.scala b/server/src/main/scala/io/delta/sharing/server/model.scala index 4c58744de..b2540802a 100644 --- a/server/src/main/scala/io/delta/sharing/server/model.scala +++ b/server/src/main/scala/io/delta/sharing/server/model.scala @@ -26,6 +26,7 @@ case class SingleAction( remove: RemoveFile = null, metaData: Metadata = null, protocol: Protocol = null, + queryStatus: QueryStatus = null, endStreamAction: EndStreamAction = null) { def unwrap: Action = { @@ -43,6 +44,8 @@ case class SingleAction( protocol } else if (endStreamAction != null) { endStreamAction + } else if (queryStatus != null) { + queryStatus } else { null } @@ -64,6 +67,13 @@ case class Metadata( override def wrap: SingleAction = SingleAction(metaData = this) } +case class QueryStatus( + queryId: String = null, + status: String = null + ) extends Action { + override def wrap: SingleAction = SingleAction(queryStatus = this) +} + sealed trait Action { /** Turn this object to the [[SingleAction]] wrap object. */ def wrap: SingleAction diff --git a/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala b/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala index 22b504099..ebc1c2c19 100644 --- a/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala +++ b/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala @@ -105,14 +105,18 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll { } } - def readJson(url: String, expectedTableVersion: Option[Long] = None): String = { + def readJson( + url: String, + expectedTableVersion: Option[Long] = None, + asyncQuery: String = "false"): String = { readHttpContent( url, None, None, RESPONSE_FORMAT_PARQUET, expectedTableVersion, - "application/json; charset=utf-8" + "application/json; charset=utf-8", + asyncQuery = asyncQuery ) } @@ -121,30 +125,35 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll { method: Option[String] = None, data: Option[String] = None, expectedTableVersion: Option[Long] = None, - responseFormat: String = RESPONSE_FORMAT_PARQUET): String = { + responseFormat: String = RESPONSE_FORMAT_PARQUET, + asyncQuery: String = "false"): String = { readHttpContent( url, method, data, responseFormat, expectedTableVersion, - "application/x-ndjson; charset=utf-8" + "application/x-ndjson; charset=utf-8", + asyncQuery = asyncQuery ) } - def readHttpContent( url: String, method: Option[String], data: Option[String] = None, responseFormat: String, expectedTableVersion: Option[Long] = None, - expectedContentType: String): String = { + expectedContentType: String, + asyncQuery: String = "true"): String = { val connection = new URL(url).openConnection().asInstanceOf[HttpsURLConnection] connection.setRequestProperty("Authorization", s"Bearer ${TestResource.testAuthorizationToken}") + var deltaSharingCapabilities = s"asyncquery=$asyncQuery" if (responseFormat == RESPONSE_FORMAT_DELTA) { - connection.setRequestProperty("delta-sharing-capabilities", "responseFormat=delta") + deltaSharingCapabilities += s";responseFormat=$responseFormat" } + connection.setRequestProperty("delta-sharing-capabilities", deltaSharingCapabilities) + method.foreach(connection.setRequestMethod) data.foreach { d => connection.setDoOutput(true) @@ -620,6 +629,39 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll { } } + integrationTest("table1 async query") { + Seq(RESPONSE_FORMAT_PARQUET, RESPONSE_FORMAT_DELTA).foreach { responseFormat => + val response = readNDJson( + requestPath("/shares/share1/schemas/default/tables/table1/query"), + Some("POST"), + Some("""{"idempotency_key": "random id"}"""), + expectedTableVersion = None, + responseFormat, + asyncQuery = "true" + ) + + assert(JsonUtils.fromJson[SingleAction](response).queryStatus.status == "pending") + } + } + + integrationTest("table1 async query get status") { + Seq(RESPONSE_FORMAT_PARQUET, RESPONSE_FORMAT_DELTA).foreach { responseFormat => + val response = readNDJson( + requestPath("/shares/share1/schemas/default/tables/table1/queries/1234"), + method = Some("POST"), + Some("""{"maxFiles": 1}"""), + expectedTableVersion = None, + responseFormat, + asyncQuery = "true" + ) + + var lines = response.split("\n") + assert(lines.length == 4) + + print(s"response: $response") + } + } + integrationTest("table1 - non partitioned - paginated query") { Seq(RESPONSE_FORMAT_PARQUET, RESPONSE_FORMAT_DELTA).foreach { responseFormat => var response = readNDJson( From b6c9d9451d2769a6cd7095e4dd70b7d5e9e7f0b7 Mon Sep 17 00:00:00 2001 From: Jade Wang Date: Mon, 20 May 2024 16:44:16 -0700 Subject: [PATCH 02/23] address comments --- .../sharing/client/DeltaSharingClient.scala | 1 - .../client/DeltaSharingRestClientSuite.scala | 35 +++++++++---------- .../sharing/server/DeltaSharingService.scala | 15 ++++---- .../server/DeltaSharingServiceSuite.scala | 18 +++++++++- .../spark/TestDeltaSharingClient.scala | 5 ++- 5 files changed, 45 insertions(+), 29 deletions(-) diff --git a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala index 3dddb3f73..9daedd599 100644 --- a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala +++ b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala @@ -307,7 +307,6 @@ class DeltaSharingRestClient( table = s"${table.share}.${table.schema}.${table.name}" ) - if (respondedFormat == RESPONSE_FORMAT_DELTA) { return DeltaTableMetadata(version, lines = lines, respondedFormat = respondedFormat) } diff --git a/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala b/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala index 4eb1d0680..71f2e97ba 100644 --- a/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala +++ b/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala @@ -330,7 +330,7 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest { assert(tableFiles.files(0).expirationTimestamp > System.currentTimeMillis()) } - Seq(false, true).foreach { paginationEnabled => {} + Seq(false, true).foreach { paginationEnabled => { val client = new DeltaSharingRestClient( testProfileProvider, sslTrustAll = true, @@ -340,21 +340,22 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest { enableAsyncQuery = true ) val table = Table(name = "table2", schema = "default", share = "share2") - val tableFiles = - client.getFiles( - table, - predicates = Nil, - limit = None, - versionAsOf = None, - timestampAsOf = None, - jsonPredicateHints = None, - refreshToken = None - ) - verifyTableFiles(tableFiles) - } + val tableFiles = + client.getFiles( + table, + predicates = Nil, + limit = None, + versionAsOf = None, + timestampAsOf = None, + jsonPredicateHints = None, + refreshToken = None + ) + verifyTableFiles(tableFiles) + } + } } - integrationTest("getFiles with Async") { + integrationTest("getFiles with sync api") { def verifyTableFiles(tableFiles: DeltaTableFiles): Unit = { assert(tableFiles.version == 2) assert(Protocol(minReaderVersion = 1) == tableFiles.protocol) @@ -401,15 +402,14 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest { s"${RESPONSE_FORMAT_DELTA},${RESPONSE_FORMAT_PARQUET}", s"${RESPONSE_FORMAT_PARQUET},${RESPONSE_FORMAT_DELTA}" ).foreach { responseFormat => - Seq(false).foreach { paginationEnabled => - Seq(true).foreach { asyncEnabled => { + Seq(true, false).foreach { paginationEnabled => { val client = new DeltaSharingRestClient( testProfileProvider, sslTrustAll = true, queryTablePaginationEnabled = paginationEnabled, maxFilesPerReq = 1, responseFormat = responseFormat, - enableAsyncQuery = asyncEnabled + enableAsyncQuery = false ) val table = Table(name = "table2", schema = "default", share = "share2") try { @@ -442,7 +442,6 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest { client.close() } } - } } } } diff --git a/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala b/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala index 0b0d853e8..434cf712b 100644 --- a/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala +++ b/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala @@ -329,12 +329,12 @@ class DeltaSharingService(serverConfig: ServerConfig) { @Post("/shares/{share}/schemas/{schema}/tables/{table}/queries/{queryId}") @ConsumesJson def getQueryStatus( - req: HttpRequest, - @Param("share") share: String, - @Param("schema") schema: String, - @Param("table") table: String, - @Param("queryId") queryId: String, - request: GetQueryInfoRequest): HttpResponse = processRequest { + req: HttpRequest, + @Param("share") share: String, + @Param("schema") schema: String, + @Param("table") table: String, + @Param("queryId") queryId: String, + request: GetQueryInfoRequest): HttpResponse = processRequest { // we are reusing the table here to simulate a view query result val tableConfig = sharedTableManager.getTable(share, schema, table) @@ -451,8 +451,7 @@ class DeltaSharingService(serverConfig: ServerConfig) { Seq( SingleAction(queryStatus = QueryStatus(queryId, "pending")) )) - } - else { + } else { val queryResult = deltaSharedTableLoader.loadTable(tableConfig).query( includeFiles = true, request.predicateHints, diff --git a/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala b/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala index ebc1c2c19..07e9358c8 100644 --- a/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala +++ b/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala @@ -629,6 +629,19 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll { } } + integrationTest("table1 async query without idempotency_key") { + Seq(RESPONSE_FORMAT_PARQUET, RESPONSE_FORMAT_DELTA).foreach { responseFormat => + assertHttpError( + requestPath("/shares/share1/schemas/default/tables/table1/query"), + method = "POST", + data = Some("""{"maxFiles": 1}"""), + expectedErrorCode = 400, + expectedErrorMessage = "idempotency_key is required for async query", + headers = Map("delta-sharing-capabilities" -> s"asyncquery=true;responseFormat=$responseFormat") + ) + } + } + integrationTest("table1 async query") { Seq(RESPONSE_FORMAT_PARQUET, RESPONSE_FORMAT_DELTA).foreach { responseFormat => val response = readNDJson( @@ -3011,13 +3024,16 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll { method: String, data: Option[String], expectedErrorCode: Int, - expectedErrorMessage: String): Unit = { + expectedErrorMessage: String, + headers: Map[String, String] = Map.empty[String, String]): Unit = { val connection = new URL(url).openConnection().asInstanceOf[HttpsURLConnection] connection.setRequestProperty("Authorization", s"Bearer ${TestResource.testAuthorizationToken}") connection.setRequestMethod(method) data.foreach { d => connection.setDoOutput(true) connection.setRequestProperty("Content-Type", "application/json; charset=utf8") + headers.foreach((item) => connection.setRequestProperty(item._1, item._2)) + val output = connection.getOutputStream() try { output.write(d.getBytes(UTF_8)) diff --git a/spark/src/test/scala/io/delta/sharing/spark/TestDeltaSharingClient.scala b/spark/src/test/scala/io/delta/sharing/spark/TestDeltaSharingClient.scala index bdf63fc2a..caf91c11a 100644 --- a/spark/src/test/scala/io/delta/sharing/spark/TestDeltaSharingClient.scala +++ b/spark/src/test/scala/io/delta/sharing/spark/TestDeltaSharingClient.scala @@ -46,7 +46,10 @@ class TestDeltaSharingClient( responseFormat: String = DeltaSharingOptions.RESPONSE_FORMAT_PARQUET, readerFeatures: String = "", queryTablePaginationEnabled: Boolean = false, - maxFilesPerReq: Int = 10000 + maxFilesPerReq: Int = 10000, + enableAsyncQuery: Boolean = false, + asyncQueryPollIntervalMillis: Long = 1000L, + asyncQueryMaxDuration: Long = Long.MaxValue ) extends DeltaSharingClient { import DeltaSharingOptions.RESPONSE_FORMAT_PARQUET From af923e44a3824097a259d1a82f86134bf00e8c89 Mon Sep 17 00:00:00 2001 From: Jade Wang Date: Mon, 20 May 2024 18:04:40 -0700 Subject: [PATCH 03/23] addressing comments --- .../delta/sharing/client/DeltaSharingRestClientDeltaSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientDeltaSuite.scala b/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientDeltaSuite.scala index 2a4bb9fdf..7adc6088a 100644 --- a/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientDeltaSuite.scala +++ b/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientDeltaSuite.scala @@ -93,7 +93,7 @@ class DeltaSharingRestClientDeltaSuite extends DeltaSharingIntegrationTest { } } - integrationTest("getFiles include async") { + integrationTest("getFiles") { def verifyTableFiles(tableFiles: DeltaTableFiles): Unit = { checkDeltaTableFilesBasics(tableFiles, expectedVersion = 2, expectedNumLines = 4, minWriterVersion = 2) assert(tableFiles.lines(1).contains("""{"deltaMetadata":{"id":"f8d5c169-3d01-4ca3-ad9e-7dc3355aedb2","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"eventTime\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}},{\"name\":\"date\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["date"],"configuration":{},"createdTime":1619652806049""")) From 9df47e1231aaac1ca95a4d81bc01ca0e7aa23a07 Mon Sep 17 00:00:00 2001 From: Jade Wang Date: Tue, 21 May 2024 12:47:22 -0700 Subject: [PATCH 04/23] addressing comments. --- .../sharing/client/DeltaSharingClient.scala | 50 ++++++++++++++++--- .../client/DeltaSharingRestClientSuite.scala | 26 ++++++++++ .../sharing/server/DeltaSharingService.scala | 7 ++- .../sharing/spark/DeltaSharingSuite.scala | 12 +++++ 4 files changed, 88 insertions(+), 7 deletions(-) diff --git a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala index 9daedd599..75419f3e8 100644 --- a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala +++ b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala @@ -87,10 +87,18 @@ private[sharing] trait PaginationResponse { def nextPageToken: Option[String] } +/* +abstract the requirement into a trait so that we can +support pagination request in to get query results this +applies to both sync and async query. +*/ private[sharing] trait PaginationRequest { def maxFiles: Option[Int] def pageToken: Option[String] + // Clone the request object with new optional fields. + // this is used to generate a new request object when + // fetching next pages in paginated query. def clone( maxFiles: Option[Int], pageToken: Option[String], @@ -805,7 +813,7 @@ class DeltaSharingRestClient( target } - private def getQueryInfo( + private def getTableQueryInfo( table: Table, queryId: String, maxFiles: Option[Int], @@ -819,16 +827,40 @@ class DeltaSharingRestClient( getNDJson(target, request) } + /* + * Check if the query is still pending. The first line of the response will + * be a query status object if the query is still pending. + * The method return (queryResultLines, queryId, queryPending) tuple. If the queryPending + * is false it means the query is finished and the queryResultLines contains the result. + */ private def checkQueryPending(lines: Seq[String]): (Seq[String], Option[String], Boolean) = { val queryStatus = JsonUtils.fromJson[SingleAction](lines(0)).queryStatus if(queryStatus == null) { (lines, None, false) } else { - (lines.drop(1), Some(queryStatus.queryId), queryStatus.status == QUERY_PENDING_TRUE) + if (queryStatus.queryId == null) { + throw new IllegalStateException("QueryId is not returned in the response.") + } + + if( queryStatus.status != QUERY_PENDING_TRUE) { + throw new IllegalStateException("Unexpected query status: " + queryStatus.status) + } + + logInfo(s"Query is still pending. QueryId: ${queryStatus.queryId}") + (lines.drop(1), Some(queryStatus.queryId), true) } } + /* + * Get NDJson data with async query. + * This method is used when we get the result of table query + * If the query is async mode and still running, this method + * will keep polling the query id until the query is finished. + * and return the result back. + * If the query is in sync mode it will return the query result + * directly. + */ private def getNDJsonWithAsync( table: Table, target: String, @@ -849,12 +881,18 @@ class DeltaSharingRestClient( throw new IllegalStateException("Query is still pending after " + s"${asyncQueryMaxDuration} ms. Please try again later.") } - Thread.sleep(asyncQueryPollIntervalMillis) + + Thread.sleep(asyncQueryPollIntervalMillis) val queryId = queryIdOpt.get val (currentVersion, currentRespondedFormat, currentLines) - = getQueryInfo(table, queryId, request.maxFiles, request.pageToken) - val (newLines, _, newQueryPending) = checkQueryPending(currentLines) + = getTableQueryInfo(table, queryId, request.maxFiles, request.pageToken) + val (newLines, returnedQueryId, newQueryPending) = checkQueryPending(currentLines) + + if(newQueryPending && returnedQueryId.get != queryId) { + throw new IllegalStateException("QueryId is not consistent in the response. " + + s"Expected: $queryId, Actual: ${returnedQueryId.get}") + } version = currentVersion respondedFormat = currentRespondedFormat @@ -1058,7 +1096,7 @@ class DeltaSharingRestClient( capabilities = capabilities :+ s"$READER_FEATURES=$readerFeatures" } - if(enableAsyncQuery) { + if (enableAsyncQuery) { capabilities = capabilities :+ s"$DELTA_SHARING_CAPABILITIES_ASYNC_READ=true" } diff --git a/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala b/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala index 71f2e97ba..ef05c18e4 100644 --- a/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala +++ b/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala @@ -293,6 +293,32 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest { assert(errorMessage.contains("is before the earliest version available")) } + integrationTest("getFiles using async api error handling") { + val client = new DeltaSharingRestClient( + testProfileProvider, + sslTrustAll = true, + queryTablePaginationEnabled = true, + maxFilesPerReq = 1, + responseFormat = RESPONSE_FORMAT_PARQUET, + enableAsyncQuery = true + ) + val table = Table(name = "table1", schema = "default", share = "share1") + + var errorMessage = intercept[UnexpectedHttpStatus] { + client.getFiles( + table, + predicates = Nil, + limit = None, + versionAsOf = None, + timestampAsOf = None, + jsonPredicateHints = None, + refreshToken = None + ) + }.getMessage + + assert(errorMessage.contains("expected error")) + } + integrationTest("getFiles using async api") { def verifyTableFiles(tableFiles: DeltaTableFiles): Unit = { assert(tableFiles.version == 2) diff --git a/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala b/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala index 434cf712b..dcf1995fa 100644 --- a/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala +++ b/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala @@ -336,6 +336,10 @@ class DeltaSharingService(serverConfig: ServerConfig) { @Param("queryId") queryId: String, request: GetQueryInfoRequest): HttpResponse = processRequest { + if (table == "table1") { + throw new DeltaSharingIllegalArgumentException("expected error") + } + // we are reusing the table here to simulate a view query result val tableConfig = sharedTableManager.getTable(share, schema, table) val capabilitiesMap = getDeltaSharingCapabilitiesMap( @@ -372,6 +376,7 @@ class DeltaSharingService(serverConfig: ServerConfig) { @Param("schema") schema: String, @Param("table") table: String, request: QueryTableRequest): HttpResponse = processRequest { + val capabilitiesMap = getDeltaSharingCapabilitiesMap( req.headers().get(DELTA_SHARING_CAPABILITIES_HEADER) ) @@ -439,7 +444,7 @@ class DeltaSharingService(serverConfig: ServerConfig) { if(getAsyncQuery(capabilitiesMap)) { if (!request.idempotencyKey.isDefined) { throw new DeltaSharingIllegalArgumentException( - "idempotencyKey is required for async query." + "idempotency_key is required for async query." ) } diff --git a/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingSuite.scala b/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingSuite.scala index 293856c0b..bce8d5c02 100644 --- a/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingSuite.scala +++ b/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingSuite.scala @@ -469,6 +469,18 @@ class DeltaSharingSuite extends QueryTest with SharedSparkSession with DeltaShar ) } + integrationTest("async query") { + withSQLConf("spark.delta.sharing.network.useAsyncQuery" -> "true") { + for (azureTableName <- "table_wasb" :: "table_abfs" :: Nil) { + val tablePath = testProfileFile.getCanonicalPath + s"#share_azure.default.${azureTableName}" + checkAnswer( + spark.read.format("deltaSharing").load(tablePath), + Row("foo bar", "foo bar") :: Nil + ) + } + } + } + integrationTest("random access stream") { // Set maxConnections to 1 so that if we leak any connection, we will hang forever because any // further request won't be able to get a free connection from the pool. From 029403954e57e6dc075290993fe029d2df5d43b8 Mon Sep 17 00:00:00 2001 From: Jade Wang Date: Wed, 22 May 2024 11:53:35 -0700 Subject: [PATCH 05/23] address comments --- .../sharing/client/DeltaSharingClient.scala | 134 ++++++++---------- .../sharing/server/DeltaSharingService.scala | 7 +- 2 files changed, 60 insertions(+), 81 deletions(-) diff --git a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala index 75419f3e8..f1b583ea6 100644 --- a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala +++ b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala @@ -92,7 +92,7 @@ abstract the requirement into a trait so that we can support pagination request in to get query results this applies to both sync and async query. */ -private[sharing] trait PaginationRequest { +private[sharing] trait NextPageRequest { def maxFiles: Option[Int] def pageToken: Option[String] @@ -101,8 +101,7 @@ private[sharing] trait PaginationRequest { // fetching next pages in paginated query. def clone( maxFiles: Option[Int], - pageToken: Option[String], - unsetRefreshToken: Boolean = false): PaginationRequest + pageToken: Option[String]): NextPageRequest } private[sharing] case class QueryTableRequest( @@ -119,33 +118,26 @@ private[sharing] case class QueryTableRequest( refreshToken: Option[String], idempotencyKey: Option[String], requestedColumns: Seq[String] -) extends PaginationRequest { +) extends NextPageRequest { override def clone( maxFiles: Option[Int], - pageToken: Option[String], - unsetRefreshToken: Boolean = false): PaginationRequest = { - if (unsetRefreshToken) { + pageToken: Option[String]): NextPageRequest = { this.copy( maxFiles = maxFiles, pageToken = pageToken, refreshToken = None, includeRefreshToken = None) - } - else { - this.copy(maxFiles = maxFiles, pageToken = pageToken) - } } } -private[sharing] case class GetQueryInfoRequest( +private[sharing] case class GetQueryTableInfoRequest( queryId: String, maxFiles: Option[Int], pageToken: Option[String] - ) extends PaginationRequest { + ) extends NextPageRequest { override def clone( maxFiles: Option[Int], - pageToken: Option[String], - unsetRefreshToken: Boolean = false): PaginationRequest = { + pageToken: Option[String]): NextPageRequest = { this.copy(maxFiles = maxFiles, pageToken = pageToken) } } @@ -160,19 +152,19 @@ private[sharing] case class ListAllTablesResponse( /** A REST client to fetch Delta metadata from remote server. */ class DeltaSharingRestClient( - profileProvider: DeltaSharingProfileProvider, - timeoutInSeconds: Int = 120, - numRetries: Int = 10, - maxRetryDuration: Long = Long.MaxValue, - sslTrustAll: Boolean = false, - forStreaming: Boolean = false, - responseFormat: String = DeltaSharingRestClient.RESPONSE_FORMAT_PARQUET, - readerFeatures: String = "", - queryTablePaginationEnabled: Boolean = false, - maxFilesPerReq: Int = 100000, - enableAsyncQuery: Boolean = false, - asyncQueryPollIntervalMillis: Long = 1000L, - asyncQueryMaxDuration: Long = Long.MaxValue + profileProvider: DeltaSharingProfileProvider, + timeoutInSeconds: Int = 120, + numRetries: Int = 10, + maxRetryDuration: Long = Long.MaxValue, + sslTrustAll: Boolean = false, + forStreaming: Boolean = false, + responseFormat: String = DeltaSharingRestClient.RESPONSE_FORMAT_PARQUET, + readerFeatures: String = "", + queryTablePaginationEnabled: Boolean = false, + maxFilesPerReq: Int = 100000, + enableAsyncQuery: Boolean = false, + asyncQueryPollIntervalMillis: Long = 1000L, + asyncQueryMaxDuration: Long = Long.MaxValue ) extends DeltaSharingClient with Logging { import DeltaSharingRestClient._ @@ -373,25 +365,21 @@ class DeltaSharingRestClient( idempotencyKey = idempotencyKey, requestedColumns = Seq.empty[String] ) - val (version, respondedFormat, lines, refreshTokenOpt) = if (queryTablePaginationEnabled) { - logInfo( - s"Making paginated queryTable requests for table " + - s"${table.share}.${table.schema}.${table.name} with maxFiles=$maxFilesPerReq" - ) - getFilesByPage(table, target, request) - } else { - val (version, respondedFormat, lines, _) = getNDJsonWithAsync(table, target, request) - val (filteredLines, endStreamAction) = maybeExtractEndStreamAction(lines) - val refreshTokenOpt = endStreamAction.flatMap { e => - Option(e.refreshToken).flatMap { token => - if (token.isEmpty) None else Some(token) - } - } - if (includeRefreshToken && refreshTokenOpt.isEmpty) { - logWarning("includeRefreshToken=true but refresh token is not returned.") + + val updatedRequest = if (queryTablePaginationEnabled) { + request.copy( + maxFiles = Some(maxFilesPerReq)) + } else { + request } - (version, respondedFormat, filteredLines, refreshTokenOpt) - } + + logInfo( + s"Making paginated queryTable requests for table " + + s"${table.share}.${table.schema}.${table.name} with maxFiles=$maxFilesPerReq" + ) + + val (version, respondedFormat, lines, refreshTokenOpt) = + getFilesByPage(table, target, updatedRequest) checkRespondedFormat( respondedFormat, @@ -511,13 +499,8 @@ class DeltaSharingRestClient( val start = System.currentTimeMillis() var numPages = 1 - // Fetch first page - val updatedRequest = request.copy( - maxFiles = Some(maxFilesPerReq), - pageToken = request.pageToken) - val (version, respondedFormat, lines, queryIdOpt) - = getNDJsonWithAsync(table, targetUrl, updatedRequest) + = getNDJsonWithAsync(table, targetUrl, request) var (filteredLines, endStreamAction) = maybeExtractEndStreamAction(lines) if (endStreamAction.isEmpty) { logWarning("EndStreamAction is not returned in the response for paginated query.") @@ -544,7 +527,7 @@ class DeltaSharingRestClient( val (pagingRequest, nextPageUrl) = if (queryIdOpt.isDefined) { ( - GetQueryInfoRequest( + GetQueryTableInfoRequest( queryId = queryIdOpt.get, maxFiles = Some(maxFilesPerReq), pageToken = Some(endStreamAction.get.nextPageToken)), @@ -552,14 +535,9 @@ class DeltaSharingRestClient( ) } else { ( - updatedRequest.clone( + request.clone( maxFiles = Some(maxFilesPerReq), - pageToken = Some(endStreamAction.get.nextPageToken), - // Unset includeRefreshToken and refreshToken - // because they can only be used in - // the first page request. - unsetRefreshToken = true - ), + pageToken = Some(endStreamAction.get.nextPageToken)), targetUrl ) } @@ -712,13 +690,13 @@ class DeltaSharingRestClient( // (as original json string) with EndStreamAction. EndStreamAction might be null // if it's not returned in the response. private def fetchNextPageFiles( - targetUrl: String, - requestBody: Option[PaginationRequest], - expectedVersion: Long, - expectedRespondedFormat: String, - expectedProtocol: String, - expectedMetadata: String, - pageNumber: Int): (Seq[String], Option[EndStreamAction]) = { + targetUrl: String, + requestBody: Option[NextPageRequest], + expectedVersion: Long, + expectedRespondedFormat: String, + expectedProtocol: String, + expectedMetadata: String, + pageNumber: Int): (Seq[String], Option[EndStreamAction]) = { val start = System.currentTimeMillis() val (version, respondedFormat, lines) = if (requestBody.isDefined) { getNDJson(targetUrl, requestBody.get) @@ -814,12 +792,12 @@ class DeltaSharingRestClient( } private def getTableQueryInfo( - table: Table, - queryId: String, - maxFiles: Option[Int], - pageToken: Option[String]): (Long, String, Seq[String]) = { + table: Table, + queryId: String, + maxFiles: Option[Int], + pageToken: Option[String]): (Long, String, Seq[String]) = { val target: String = getQueryInfoTargetUrl(table, queryId) - val request: GetQueryInfoRequest = GetQueryInfoRequest( + val request: GetQueryTableInfoRequest = GetQueryTableInfoRequest( queryId = queryId, maxFiles = maxFiles, pageToken = pageToken) @@ -836,18 +814,18 @@ class DeltaSharingRestClient( private def checkQueryPending(lines: Seq[String]): (Seq[String], Option[String], Boolean) = { val queryStatus = JsonUtils.fromJson[SingleAction](lines(0)).queryStatus - if(queryStatus == null) { - (lines, None, false) - } else { + if (queryStatus == null) { + (lines, None, false) + } else { if (queryStatus.queryId == null) { throw new IllegalStateException("QueryId is not returned in the response.") } - if( queryStatus.status != QUERY_PENDING_TRUE) { + if (queryStatus.status != QUERY_PENDING_TRUE) { throw new IllegalStateException("Unexpected query status: " + queryStatus.status) } - logInfo(s"Query is still pending. QueryId: ${queryStatus.queryId}") + logInfo(s"Query is timed out. QueryId: ${queryStatus.queryId}") (lines.drop(1), Some(queryStatus.queryId), true) } } @@ -878,7 +856,7 @@ class DeltaSharingRestClient( // Loop while the query is pending while (queryPending) { if (System.currentTimeMillis() - startTime > asyncQueryMaxDuration) { - throw new IllegalStateException("Query is still pending after " + + throw new IllegalStateException("Query is timed out after " + s"${asyncQueryMaxDuration} ms. Please try again later.") } diff --git a/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala b/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala index dcf1995fa..943971b8d 100644 --- a/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala +++ b/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala @@ -454,8 +454,9 @@ class DeltaSharingService(serverConfig: ServerConfig) { Some(0), "parquet", Seq( - SingleAction(queryStatus = QueryStatus(queryId, "pending")) - )) + SingleAction(queryStatus = QueryStatus(queryId, "pending")) + ) + ) } else { val queryResult = deltaSharedTableLoader.loadTable(tableConfig).query( includeFiles = true, @@ -688,7 +689,7 @@ object DeltaSharingService { private[server] def getResponseFormatSet(headerCapabilities: Map[String, String]): Set[String] = { headerCapabilities.get(DELTA_SHARING_RESPONSE_FORMAT).getOrElse( DeltaSharedTable.RESPONSE_FORMAT_PARQUET - ).split(";").toSet + ).split(",").toSet } private[server] def getAsyncQuery(headerCapabilities: Map[String, String]): Boolean = { From d33fd6c40c8466bf92a657cbcba0ef8026eef576 Mon Sep 17 00:00:00 2001 From: Jade Wang Date: Thu, 23 May 2024 14:51:18 -0700 Subject: [PATCH 06/23] address comments --- .../sharing/client/DeltaSharingClient.scala | 16 +-- .../scala/io/delta/sharing/client/model.scala | 3 +- .../client/DeltaSharingRestClientSuite.scala | 62 +++++----- .../sharing/server/DeltaSharingService.scala | 113 ++++++++++-------- .../scala/io/delta/sharing/server/model.scala | 3 +- .../server/DeltaSharingServiceSuite.scala | 2 +- 6 files changed, 105 insertions(+), 94 deletions(-) diff --git a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala index f1b583ea6..0516eb321 100644 --- a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala +++ b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala @@ -21,7 +21,7 @@ import java.net.{URL, URLEncoder} import java.nio.charset.StandardCharsets.UTF_8 import java.sql.Timestamp import java.time.LocalDateTime -import java.time.format.DateTimeFormatter.ISO_DATE_TIME +import java.time.format.DateTimeFormatter.{ISO_DATE, ISO_DATE_TIME} import java.util.UUID import scala.collection.mutable.{ArrayBuffer, ListBuffer} @@ -499,8 +499,13 @@ class DeltaSharingRestClient( val start = System.currentTimeMillis() var numPages = 1 - val (version, respondedFormat, lines, queryIdOpt) - = getNDJsonWithAsync(table, targetUrl, request) + val (version, respondedFormat, lines, queryIdOpt) = if (enableAsyncQuery) { + getNDJsonWithAsync(table, targetUrl, request) + } else { + val (version, respondedFormat, lines) = getNDJson(targetUrl, request) + (version, respondedFormat, lines, None) + } + var (filteredLines, endStreamAction) = maybeExtractEndStreamAction(lines) if (endStreamAction.isEmpty) { logWarning("EndStreamAction is not returned in the response for paginated query.") @@ -821,11 +826,6 @@ class DeltaSharingRestClient( throw new IllegalStateException("QueryId is not returned in the response.") } - if (queryStatus.status != QUERY_PENDING_TRUE) { - throw new IllegalStateException("Unexpected query status: " + queryStatus.status) - } - - logInfo(s"Query is timed out. QueryId: ${queryStatus.queryId}") (lines.drop(1), Some(queryStatus.queryId), true) } } diff --git a/client/src/main/scala/io/delta/sharing/client/model.scala b/client/src/main/scala/io/delta/sharing/client/model.scala index d60e40d7f..11c661bab 100644 --- a/client/src/main/scala/io/delta/sharing/client/model.scala +++ b/client/src/main/scala/io/delta/sharing/client/model.scala @@ -95,8 +95,7 @@ private[sharing] case class SingleAction( } case class QueryStatus( - queryId: String = null, - status: String = null + queryId: String = null ) extends Action { override def wrap: SingleAction = SingleAction(queryStatus = this) } diff --git a/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala b/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala index ef05c18e4..ec5f356a3 100644 --- a/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala +++ b/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala @@ -302,9 +302,9 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest { responseFormat = RESPONSE_FORMAT_PARQUET, enableAsyncQuery = true ) - val table = Table(name = "table1", schema = "default", share = "share1") + val table = Table(name = "tableWithAsyncQueryError", schema = "default", share = "share1") - var errorMessage = intercept[UnexpectedHttpStatus] { + val errorMessage = intercept[UnexpectedHttpStatus] { client.getFiles( table, predicates = Nil, @@ -429,17 +429,30 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest { s"${RESPONSE_FORMAT_PARQUET},${RESPONSE_FORMAT_DELTA}" ).foreach { responseFormat => Seq(true, false).foreach { paginationEnabled => { - val client = new DeltaSharingRestClient( - testProfileProvider, - sslTrustAll = true, - queryTablePaginationEnabled = paginationEnabled, - maxFilesPerReq = 1, - responseFormat = responseFormat, - enableAsyncQuery = false - ) - val table = Table(name = "table2", schema = "default", share = "share2") - try { - val tableFiles = + val client = new DeltaSharingRestClient( + testProfileProvider, + sslTrustAll = true, + queryTablePaginationEnabled = paginationEnabled, + maxFilesPerReq = 1, + responseFormat = responseFormat, + enableAsyncQuery = false + ) + val table = Table(name = "table2", schema = "default", share = "share2") + try { + val tableFiles = + client.getFiles( + table, + predicates = Nil, + limit = None, + versionAsOf = None, + timestampAsOf = None, + jsonPredicateHints = None, + refreshToken = None + ) + verifyTableFiles(tableFiles) + + if (tableFiles.refreshToken.isDefined) { + val refreshedTableFiles = client.getFiles( table, predicates = Nil, @@ -447,28 +460,15 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest { versionAsOf = None, timestampAsOf = None, jsonPredicateHints = None, - refreshToken = None + refreshToken = tableFiles.refreshToken ) - verifyTableFiles(tableFiles) - - if(tableFiles.refreshToken.isDefined) { - val refreshedTableFiles = - client.getFiles( - table, - predicates = Nil, - limit = None, - versionAsOf = None, - timestampAsOf = None, - jsonPredicateHints = None, - refreshToken = tableFiles.refreshToken - ) - verifyTableFiles(refreshedTableFiles) - } - } finally { - client.close() + verifyTableFiles(refreshedTableFiles) } + } finally { + client.close() } } + } } } diff --git a/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala b/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala index 943971b8d..6c8985625 100644 --- a/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala +++ b/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala @@ -166,6 +166,8 @@ class DeltaSharingServiceExceptionHandler extends ExceptionHandlerFunction { class DeltaSharingService(serverConfig: ServerConfig) { import DeltaSharingService._ + private val rand = new scala.util.Random() + private val sharedTableManager = new SharedTableManager(serverConfig) private val deltaSharedTableLoader = new DeltaSharedTableLoader(serverConfig) @@ -336,36 +338,47 @@ class DeltaSharingService(serverConfig: ServerConfig) { @Param("queryId") queryId: String, request: GetQueryInfoRequest): HttpResponse = processRequest { - if (table == "table1") { + if (table == "tableWithAsyncQueryError") { throw new DeltaSharingIllegalArgumentException("expected error") } - // we are reusing the table here to simulate a view query result - val tableConfig = sharedTableManager.getTable(share, schema, table) - val capabilitiesMap = getDeltaSharingCapabilitiesMap( - req.headers().get(DELTA_SHARING_CAPABILITIES_HEADER)) - val responseFormatSet = getResponseFormatSet(capabilitiesMap) - val queryResult = deltaSharedTableLoader.loadTable(tableConfig).query( - includeFiles = true, - Seq.empty[String], - None, - None, - None, - None, - None, - None, - maxFiles = request.maxFiles, - pageToken = request.pageToken, - false, - None, - responseFormatSet = responseFormatSet) - if (queryResult.version < tableConfig.startVersion) { - throw new DeltaSharingIllegalArgumentException( - s"You can only query table data since version ${tableConfig.startVersion}." - ) - } + if(rand.nextInt(100) > 50) { + streamingOutput( + Some(0), + "parquet", + Seq( + SingleAction(queryStatus = QueryStatus(queryId)) + ) + ) + } else { - streamingOutput(Some(queryResult.version), queryResult.responseFormat, queryResult.actions) + // we are reusing the table here to simulate a view query result + val tableConfig = sharedTableManager.getTable(share, schema, table) + val capabilitiesMap = getDeltaSharingCapabilitiesMap( + req.headers().get(DELTA_SHARING_CAPABILITIES_HEADER)) + val responseFormatSet = getResponseFormatSet(capabilitiesMap) + val queryResult = deltaSharedTableLoader.loadTable(tableConfig).query( + includeFiles = true, + Seq.empty[String], + None, + None, + None, + None, + None, + None, + maxFiles = request.maxFiles, + pageToken = request.pageToken, + false, + None, + responseFormatSet = responseFormatSet) + if (queryResult.version < tableConfig.startVersion) { + throw new DeltaSharingIllegalArgumentException( + s"You can only query table data since version ${tableConfig.startVersion}." + ) + } + + streamingOutput(Some(queryResult.version), queryResult.responseFormat, queryResult.actions) + } } @Post("/shares/{share}/schemas/{schema}/tables/{table}/query") @@ -376,7 +389,6 @@ class DeltaSharingService(serverConfig: ServerConfig) { @Param("schema") schema: String, @Param("table") table: String, request: QueryTableRequest): HttpResponse = processRequest { - val capabilitiesMap = getDeltaSharingCapabilitiesMap( req.headers().get(DELTA_SHARING_CAPABILITIES_HEADER) ) @@ -418,28 +430,6 @@ class DeltaSharingService(serverConfig: ServerConfig) { } val start = System.currentTimeMillis - val tableConfig = sharedTableManager.getTable(share, schema, table) - if (numVersionParams > 0) { - if (!tableConfig.historyShared) { - throw new DeltaSharingIllegalArgumentException("Reading table by version or timestamp is" + - " not supported because history sharing is not enabled on table: " + - s"$share.$schema.$table") - } - if (request.version.exists(_ < tableConfig.startVersion) || - request.startingVersion.exists(_ < tableConfig.startVersion)) { - throw new DeltaSharingIllegalArgumentException( - s"You can only query table data since version ${tableConfig.startVersion}." - ) - } - if (request.endingVersion.isDefined && - request.startingVersion.exists(_ > request.endingVersion.get)) { - throw new DeltaSharingIllegalArgumentException( - s"startingVersion(${request.startingVersion.get}) must be smaller than or equal to " + - s"endingVersion(${request.endingVersion.get})." - ) - } - } - val responseFormatSet = getResponseFormatSet(capabilitiesMap) if(getAsyncQuery(capabilitiesMap)) { if (!request.idempotencyKey.isDefined) { @@ -454,10 +444,33 @@ class DeltaSharingService(serverConfig: ServerConfig) { Some(0), "parquet", Seq( - SingleAction(queryStatus = QueryStatus(queryId, "pending")) + SingleAction(queryStatus = QueryStatus(queryId)) ) ) } else { + val tableConfig = sharedTableManager.getTable(share, schema, table) + if (numVersionParams > 0) { + if (!tableConfig.historyShared) { + throw new DeltaSharingIllegalArgumentException( + "Reading table by version or " + + "timestamp is not supported because history sharing is not enabled on table: " + + s"$share.$schema.$table") + } + if (request.version.exists(_ < tableConfig.startVersion) || + request.startingVersion.exists(_ < tableConfig.startVersion)) { + throw new DeltaSharingIllegalArgumentException( + s"You can only query table data since version ${tableConfig.startVersion}." + ) + } + if (request.endingVersion.isDefined && + request.startingVersion.exists(_ > request.endingVersion.get)) { + throw new DeltaSharingIllegalArgumentException( + s"startingVersion(${request.startingVersion.get}) must be smaller than or equal to " + + s"endingVersion(${request.endingVersion.get})." + ) + } + } + val responseFormatSet = getResponseFormatSet(capabilitiesMap) val queryResult = deltaSharedTableLoader.loadTable(tableConfig).query( includeFiles = true, request.predicateHints, diff --git a/server/src/main/scala/io/delta/sharing/server/model.scala b/server/src/main/scala/io/delta/sharing/server/model.scala index b2540802a..1886b2679 100644 --- a/server/src/main/scala/io/delta/sharing/server/model.scala +++ b/server/src/main/scala/io/delta/sharing/server/model.scala @@ -68,8 +68,7 @@ case class Metadata( } case class QueryStatus( - queryId: String = null, - status: String = null + queryId: String = null ) extends Action { override def wrap: SingleAction = SingleAction(queryStatus = this) } diff --git a/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala b/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala index 07e9358c8..73502219f 100644 --- a/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala +++ b/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala @@ -653,7 +653,7 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll { asyncQuery = "true" ) - assert(JsonUtils.fromJson[SingleAction](response).queryStatus.status == "pending") + assert(JsonUtils.fromJson[SingleAction](response).queryStatus != null) } } From f7e842d0a7a73013594f260cc075d8bf125e3317 Mon Sep 17 00:00:00 2001 From: Jade Wang Date: Thu, 23 May 2024 14:53:51 -0700 Subject: [PATCH 07/23] address comments --- server/src/main/protobuf/protocol.proto | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/protobuf/protocol.proto b/server/src/main/protobuf/protocol.proto index 70b1a49b1..c72bd6fc7 100644 --- a/server/src/main/protobuf/protocol.proto +++ b/server/src/main/protobuf/protocol.proto @@ -128,7 +128,6 @@ message ListAllTablesResponse { optional string next_page_token = 2; } - // Define a special class to generate the page token for pagination. It includes the information we // need to know where we should start to query, and check whether the page token comes from the // right result. For example, we would like to throw an error when the user uses a page token From 763559ba46b1f84ba959a542f9cf7c3e8c5c37aa Mon Sep 17 00:00:00 2001 From: Jade Wang Date: Thu, 23 May 2024 15:01:46 -0700 Subject: [PATCH 08/23] update comments --- .../io/delta/sharing/client/DeltaSharingClient.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala index 0516eb321..b6b6a9664 100644 --- a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala +++ b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala @@ -87,10 +87,10 @@ private[sharing] trait PaginationResponse { def nextPageToken: Option[String] } -/* -abstract the requirement into a trait so that we can -support pagination request in to get query results this -applies to both sync and async query. +/** + * A trait to represent the request object for + * fetching next page in paginated query. + * This can be use in both sync and async query. */ private[sharing] trait NextPageRequest { def maxFiles: Option[Int] From 603aa8f2a1b7a1fdc04f722838d69bfd0fdaf8f5 Mon Sep 17 00:00:00 2001 From: Jade Wang Date: Thu, 23 May 2024 15:04:04 -0700 Subject: [PATCH 09/23] update indent --- .../delta/sharing/client/DeltaSharingClient.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala index b6b6a9664..f2e8db362 100644 --- a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala +++ b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala @@ -695,13 +695,13 @@ class DeltaSharingRestClient( // (as original json string) with EndStreamAction. EndStreamAction might be null // if it's not returned in the response. private def fetchNextPageFiles( - targetUrl: String, - requestBody: Option[NextPageRequest], - expectedVersion: Long, - expectedRespondedFormat: String, - expectedProtocol: String, - expectedMetadata: String, - pageNumber: Int): (Seq[String], Option[EndStreamAction]) = { + targetUrl: String, + requestBody: Option[NextPageRequest], + expectedVersion: Long, + expectedRespondedFormat: String, + expectedProtocol: String, + expectedMetadata: String, + pageNumber: Int): (Seq[String], Option[EndStreamAction]) = { val start = System.currentTimeMillis() val (version, respondedFormat, lines) = if (requestBody.isDefined) { getNDJson(targetUrl, requestBody.get) From 6f4e7e09dd64622001464251b6c2a4dc8f479e25 Mon Sep 17 00:00:00 2001 From: Jade Wang Date: Fri, 24 May 2024 14:54:28 -0700 Subject: [PATCH 10/23] address comments --- .../scala/io/delta/sharing/client/DeltaSharingClient.scala | 5 ----- .../main/scala/io/delta/sharing/client/util/ConfUtils.scala | 3 --- 2 files changed, 8 deletions(-) diff --git a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala index f2e8db362..2cfcaff99 100644 --- a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala +++ b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala @@ -373,11 +373,6 @@ class DeltaSharingRestClient( request } - logInfo( - s"Making paginated queryTable requests for table " + - s"${table.share}.${table.schema}.${table.name} with maxFiles=$maxFilesPerReq" - ) - val (version, respondedFormat, lines, refreshTokenOpt) = getFilesByPage(table, target, updatedRequest) diff --git a/client/src/main/scala/io/delta/sharing/client/util/ConfUtils.scala b/client/src/main/scala/io/delta/sharing/client/util/ConfUtils.scala index af94f9ded..f505e864e 100644 --- a/client/src/main/scala/io/delta/sharing/client/util/ConfUtils.scala +++ b/client/src/main/scala/io/delta/sharing/client/util/ConfUtils.scala @@ -33,9 +33,6 @@ object ConfUtils { val ASYNC_QUERY_POLL_INTERVAL_CONF = "spark.delta.sharing.network.asyncQueryRetryInterval" val ASYNC_QUERY_POLL_INTERVAL_DEFAULT_MILLIS = 10L * 1000L /* 10 seconds */ - val ASYNC_QUERY_MAX_RETRY_COUNT_CONF = "spark.delta.sharing.network.asyncQueryMaxRetryCount" - val ASYNC_QUERY_MAX_RETRY_COUNT_DEFAULT = 10 - val ASYNC_QUERY_TIMEOUT_CONF = "spark.delta.sharing.network.asyncQueryTimeout" val ASYNC_QUERY_TIMEOUT_DEFAULT_MILLIS = 10L * 60L * 1000L /* 10 mins */ From ac678441993955a0939a6064cc886fe2ea04be2c Mon Sep 17 00:00:00 2001 From: Jade Wang Date: Tue, 28 May 2024 10:36:44 -0700 Subject: [PATCH 11/23] address comments --- .../delta/sharing/client/DeltaSharingClient.scala | 9 +++++---- .../delta/sharing/server/DeltaSharingService.scala | 14 ++++++++------ 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala index 2cfcaff99..1e0649871 100644 --- a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala +++ b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala @@ -116,8 +116,7 @@ private[sharing] case class QueryTableRequest( pageToken: Option[String], includeRefreshToken: Option[Boolean], refreshToken: Option[String], - idempotencyKey: Option[String], - requestedColumns: Seq[String] + idempotencyKey: Option[String] ) extends NextPageRequest { override def clone( maxFiles: Option[Int], @@ -818,7 +817,8 @@ class DeltaSharingRestClient( (lines, None, false) } else { if (queryStatus.queryId == null) { - throw new IllegalStateException("QueryId is not returned in the response.") + throw new IllegalStateException( + "QueryId is not returned in the first line of the response." + lines(0)) } (lines.drop(1), Some(queryStatus.queryId), true) @@ -855,9 +855,10 @@ class DeltaSharingRestClient( s"${asyncQueryMaxDuration} ms. Please try again later.") } + val queryId = queryIdOpt.get + logInfo(s"Query is still pending. Polling queryId: ${queryId}") Thread.sleep(asyncQueryPollIntervalMillis) - val queryId = queryIdOpt.get val (currentVersion, currentRespondedFormat, currentLines) = getTableQueryInfo(table, queryId, request.maxFiles, request.pageToken) val (newLines, returnedQueryId, newQueryPending) = checkQueryPending(currentLines) diff --git a/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala b/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala index 6c8985625..de4ee13ce 100644 --- a/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala +++ b/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala @@ -342,6 +342,8 @@ class DeltaSharingService(serverConfig: ServerConfig) { throw new DeltaSharingIllegalArgumentException("expected error") } + // simulate async query with 50% chance of return + // asynchronously client should be able to hand both cases if(rand.nextInt(100) > 50) { streamingOutput( Some(0), @@ -429,15 +431,15 @@ class DeltaSharingService(serverConfig: ServerConfig) { ) } + if(getAsyncQuery(capabilitiesMap) && !request.idempotencyKey.isDefined) { + throw new DeltaSharingIllegalArgumentException( + "idempotency_key is required for async query." + ) + } + val start = System.currentTimeMillis if(getAsyncQuery(capabilitiesMap)) { - if (!request.idempotencyKey.isDefined) { - throw new DeltaSharingIllegalArgumentException( - "idempotency_key is required for async query." - ) - } - val queryId = s"${share}_${schema}_${table}" streamingOutput( From 9c564c7377fefc6354e89680da900c864ef0e646 Mon Sep 17 00:00:00 2001 From: Jade Wang Date: Tue, 28 May 2024 10:53:19 -0700 Subject: [PATCH 12/23] fix build issue --- .../scala/io/delta/sharing/client/DeltaSharingClient.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala index 1e0649871..215e92d8d 100644 --- a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala +++ b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala @@ -361,8 +361,7 @@ class DeltaSharingRestClient( pageToken = None, includeRefreshToken = Some(includeRefreshToken), refreshToken = refreshToken, - idempotencyKey = idempotencyKey, - requestedColumns = Seq.empty[String] + idempotencyKey = idempotencyKey ) val updatedRequest = if (queryTablePaginationEnabled) { @@ -434,8 +433,7 @@ class DeltaSharingRestClient( pageToken = None, includeRefreshToken = None, refreshToken = None, - idempotencyKey = None, - requestedColumns = Seq.empty[String] + idempotencyKey = None ) val (version, respondedFormat, lines) = if (queryTablePaginationEnabled) { logInfo( From ccf90e3ef205ccde40021781735bf9bd74d71c03 Mon Sep 17 00:00:00 2001 From: Jade Wang Date: Tue, 28 May 2024 13:52:41 -0700 Subject: [PATCH 13/23] fixing errors in integration testing --- .../io/delta/sharing/client/DeltaSharingClient.scala | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala index 215e92d8d..0b53f60b2 100644 --- a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala +++ b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala @@ -116,7 +116,7 @@ private[sharing] case class QueryTableRequest( pageToken: Option[String], includeRefreshToken: Option[Boolean], refreshToken: Option[String], - idempotencyKey: Option[String] + idempotency_key: Option[String] ) extends NextPageRequest { override def clone( maxFiles: Option[Int], @@ -166,6 +166,8 @@ class DeltaSharingRestClient( asyncQueryMaxDuration: Long = Long.MaxValue ) extends DeltaSharingClient with Logging { + logError(s"DeltaSharingRestClient with enableAsyncQuery $enableAsyncQuery") + import DeltaSharingRestClient._ @volatile private var created = false @@ -343,7 +345,7 @@ class DeltaSharingRestClient( val target = getTargetUrl( s"/shares/$encodedShareName/schemas/$encodedSchemaName/tables/$encodedTableName/query") - val idempotencyKey = if (enableAsyncQuery) { + val idempotency_key = if (enableAsyncQuery) { Some(UUID.randomUUID().toString) } else { None @@ -361,7 +363,7 @@ class DeltaSharingRestClient( pageToken = None, includeRefreshToken = Some(includeRefreshToken), refreshToken = refreshToken, - idempotencyKey = idempotencyKey + idempotency_key = idempotency_key ) val updatedRequest = if (queryTablePaginationEnabled) { @@ -433,7 +435,7 @@ class DeltaSharingRestClient( pageToken = None, includeRefreshToken = None, refreshToken = None, - idempotencyKey = None + idempotency_key = None ) val (version, respondedFormat, lines) = if (queryTablePaginationEnabled) { logInfo( From b26764ba868fedf272d37e53a9465766c1047a31 Mon Sep 17 00:00:00 2001 From: Jade Wang Date: Tue, 28 May 2024 14:09:54 -0700 Subject: [PATCH 14/23] address comments --- .../main/scala/io/delta/sharing/client/DeltaSharingClient.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala index 0b53f60b2..c1c3d2721 100644 --- a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala +++ b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala @@ -162,7 +162,7 @@ class DeltaSharingRestClient( queryTablePaginationEnabled: Boolean = false, maxFilesPerReq: Int = 100000, enableAsyncQuery: Boolean = false, - asyncQueryPollIntervalMillis: Long = 1000L, + asyncQueryPollIntervalMillis: Long = 10000L, asyncQueryMaxDuration: Long = Long.MaxValue ) extends DeltaSharingClient with Logging { From 5129e2214fa7847314cf2e8c82899fbba6c8a758 Mon Sep 17 00:00:00 2001 From: Jade Wang Date: Tue, 28 May 2024 17:38:27 -0700 Subject: [PATCH 15/23] address comments --- .../main/scala/io/delta/sharing/client/DeltaSharingClient.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala index c1c3d2721..d92f35aa3 100644 --- a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala +++ b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala @@ -163,7 +163,7 @@ class DeltaSharingRestClient( maxFilesPerReq: Int = 100000, enableAsyncQuery: Boolean = false, asyncQueryPollIntervalMillis: Long = 10000L, - asyncQueryMaxDuration: Long = Long.MaxValue + asyncQueryMaxDuration: Long = 600000L ) extends DeltaSharingClient with Logging { logError(s"DeltaSharingRestClient with enableAsyncQuery $enableAsyncQuery") From 0d460e64ec878e578a52dfa0be217335e6cdbffb Mon Sep 17 00:00:00 2001 From: Jade Wang Date: Wed, 29 May 2024 08:53:13 -0700 Subject: [PATCH 16/23] addressing comments --- .../main/scala/io/delta/sharing/server/DeltaSharingService.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala b/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala index de4ee13ce..0cd5ad1d1 100644 --- a/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala +++ b/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala @@ -344,6 +344,7 @@ class DeltaSharingService(serverConfig: ServerConfig) { // simulate async query with 50% chance of return // asynchronously client should be able to hand both cases + // this is for test purpose only and shouldn't cause flakiness if(rand.nextInt(100) > 50) { streamingOutput( Some(0), From 1cd7bb848843933b9132682aac21e21dd8f69604 Mon Sep 17 00:00:00 2001 From: Jade Wang Date: Wed, 29 May 2024 10:25:04 -0700 Subject: [PATCH 17/23] fix indentation --- .../scala/io/delta/sharing/client/DeltaSharingClient.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala index d92f35aa3..e84d2825c 100644 --- a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala +++ b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala @@ -835,9 +835,9 @@ class DeltaSharingRestClient( * directly. */ private def getNDJsonWithAsync( - table: Table, - target: String, - request: QueryTableRequest): (Long, String, Seq[String], Option[String]) = { + table: Table, + target: String, + request: QueryTableRequest): (Long, String, Seq[String], Option[String]) = { // Initial query to get NDJson data val (initialVersion, initialRespondedFormat, initialLines) = getNDJson(target, request) From 585e4879b11c6f9961133d539d03d3a5c6df1042 Mon Sep 17 00:00:00 2001 From: Jade Wang Date: Wed, 29 May 2024 16:33:49 -0700 Subject: [PATCH 18/23] Update Python connector version to 1.1.0 --- python/delta_sharing/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/delta_sharing/version.py b/python/delta_sharing/version.py index ae6cc89d2..f5d25472e 100644 --- a/python/delta_sharing/version.py +++ b/python/delta_sharing/version.py @@ -14,4 +14,4 @@ # limitations under the License. # -__version__ = "1.0.0" +__version__ = "1.1.0" From 2760a82dafb2060ac26139eecd1fe3b7a767b898 Mon Sep 17 00:00:00 2001 From: Jade Wang Date: Wed, 29 May 2024 16:36:01 -0700 Subject: [PATCH 19/23] Update Python connector version to 1.1.0 From db823aa557a6e0527c6daf9ebc1a63e23ef68847 Mon Sep 17 00:00:00 2001 From: Jade Wang Date: Wed, 29 May 2024 16:36:31 -0700 Subject: [PATCH 20/23] Setting version to 1.1.0 --- version.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/version.sbt b/version.sbt index 326584492..48af491ec 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -version in ThisBuild := "1.0.0" +version in ThisBuild := "1.1.0" From cb248f611d6816a0b186d5afcd6e515dc86bb231 Mon Sep 17 00:00:00 2001 From: Jade Wang Date: Wed, 29 May 2024 16:38:53 -0700 Subject: [PATCH 21/23] Update Python connector version to 1.1.0 From 0eb5981ddf17e7dcd9612b28b25c8c7362621d49 Mon Sep 17 00:00:00 2001 From: Jade Wang Date: Wed, 29 May 2024 16:46:22 -0700 Subject: [PATCH 22/23] Update Python connector version to 1.1.0 From 58f06e1e9074aa54f2aeb482c74dced22b3158c6 Mon Sep 17 00:00:00 2001 From: Jade Wang Date: Fri, 31 May 2024 13:23:36 -0700 Subject: [PATCH 23/23] Update Python connector version to 1.1.0