Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Debug closing thread in server for a delta sharing query #580

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -964,6 +964,12 @@ class DeltaSharingRestClient(
httpRequest
}

import org.apache.http.HttpEntity
private def getEntityDebugStr(entity: HttpEntity): String = {
s"isRepeatable:${entity.isRepeatable},isChunked:${entity.isChunked}," +
s"getContentLength:${entity.getContentLength},isStreaming:${entity.isStreaming}"
}

/**
* Send the http request and return the table version in the header if any, and the response
* content.
Expand All @@ -981,12 +987,18 @@ class DeltaSharingRestClient(
// Reset queryId before calling RetryUtils, and before prepareHeaders.
queryId = Some(UUID.randomUUID().toString().split('-').head)
RetryUtils.runWithExponentialBackoff(numRetries, maxRetryDuration) {
val startTime = System.currentTimeMillis()
val profile = profileProvider.getProfile
val response = client.execute(
getHttpHost(profile.endpoint),
prepareHeaders(httpRequest),
HttpClientContext.create()
)

def elapsedTime: Long = {
System.currentTimeMillis() - startTime
}

try {
val status = response.getStatusLine()
val entity = response.getEntity()
Expand All @@ -1003,10 +1015,18 @@ class DeltaSharingRestClient(
new InputStreamReader(new BoundedInputStream(input), UTF_8)
)
var line: Option[String] = None

// scalastyle:off println
Console.println(s"----[linzhou]----before while:" +
s"${(System.currentTimeMillis() - startTime)}ms, ${getEntityDebugStr(entity)}")
while ({
line = Option(reader.readLine()); line.isDefined
}) {
lineBuffer += line.get
val a = line.get
Console.println(s"----[linzhou]----in while: ($elapsedTime)ms, newLine:[$a]")
Console.println(s"----[linzhou]----in while: ($elapsedTime)ms, " +
s"debug:[${getEntityDebugStr(entity)}]")
lineBuffer += a
}
lineBuffer.toList
}
Expand All @@ -1016,10 +1036,20 @@ class DeltaSharingRestClient(
logError(error)
lineBuffer += error
lineBuffer.toList
case otherE: Exception =>
val error = s"Request to delta sharing server failed due tooo ${otherE}."
logError(error)
throw otherE
} finally {
Console.println(s"----[linzhou]----in finally: ($elapsedTime)ms, " +
s"status:${response.getStatusLine}")
input.close()
}
}
Console.println(s"----[linzhou]----after: ($elapsedTime)ms, " +
s"status:${response.getStatusLine}")
Console.println(s"----[linzhou]----after: ($elapsedTime)ms, " +
s"entity:${getEntityDebugStr(response.getEntity)}")

val statusCode = status.getStatusCode
if (!(statusCode == HttpStatus.SC_OK ||
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,11 +428,11 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest {
// The response is always in parquet format, because the client allows parquet and the
// table is a basic table.
Seq(
RESPONSE_FORMAT_PARQUET,
s"${RESPONSE_FORMAT_DELTA},${RESPONSE_FORMAT_PARQUET}",
s"${RESPONSE_FORMAT_PARQUET},${RESPONSE_FORMAT_DELTA}"
RESPONSE_FORMAT_PARQUET
// s"${RESPONSE_FORMAT_DELTA},${RESPONSE_FORMAT_PARQUET}",
// s"${RESPONSE_FORMAT_PARQUET},${RESPONSE_FORMAT_DELTA}"
).foreach { responseFormat =>
Seq(true, false).foreach { paginationEnabled => {
Seq(false).foreach { paginationEnabled => {
val client = new DeltaSharingRestClient(
testProfileProvider,
sslTrustAll = true,
Expand All @@ -455,19 +455,19 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest {
)
verifyTableFiles(tableFiles)

if (tableFiles.refreshToken.isDefined) {
val refreshedTableFiles =
client.getFiles(
table,
predicates = Nil,
limit = None,
versionAsOf = None,
timestampAsOf = None,
jsonPredicateHints = None,
refreshToken = tableFiles.refreshToken
)
verifyTableFiles(refreshedTableFiles)
}
// if (tableFiles.refreshToken.isDefined) {
// val refreshedTableFiles =
// client.getFiles(
// table,
// predicates = Nil,
// limit = None,
// versionAsOf = None,
// timestampAsOf = None,
// jsonPredicateHints = None,
// refreshToken = tableFiles.refreshToken
// )
// verifyTableFiles(refreshedTableFiles)
// }
} finally {
client.close()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import javax.annotation.Nullable
import scala.collection.JavaConverters._
import scala.util.Try

import com.linecorp.armeria.common.{HttpData, HttpHeaderNames, HttpHeaders, HttpMethod, HttpRequest, HttpResponse, HttpStatus, MediaType, ResponseHeaders, ResponseHeadersBuilder}
import com.linecorp.armeria.common.{HttpData, HttpHeaderNames, HttpHeaders, HttpMethod, HttpRequest, HttpResponse, HttpResponseWriter, HttpStatus, MediaType, ResponseHeaders, ResponseHeadersBuilder}
import com.linecorp.armeria.common.auth.OAuth2Token
import com.linecorp.armeria.internal.server.ResponseConversionUtil
import com.linecorp.armeria.server.{Server, ServiceRequestContext}
Expand Down Expand Up @@ -204,7 +204,7 @@ class DeltaSharingService(serverConfig: ServerConfig) {
case e: AccessDeniedException => throw e
case e: KernelException => throw e
case e: TableNotFoundException => throw e
case e: Throwable => throw new DeltaInternalException(e)
// case e: Throwable => throw new DeltaInternalException(e)
}
}

Expand Down Expand Up @@ -547,15 +547,64 @@ class DeltaSharingService(serverConfig: ServerConfig) {
}
logger.info(s"Took ${System.currentTimeMillis - start} ms to load the table " +
s"and sign ${queryResult.actions.length - 2} urls for table $share/$schema/$table")
streamingOutput(
Some(queryResult.version),
queryResult.responseFormat,
queryResult.actions,
includeEndStreamAction = includeEndStreamAction
)

import java.util.concurrent.Executors
import concurrent.{ExecutionContext, Await, Future}
import concurrent.duration._

// zhoulin
// scalastyle:off println
// single threaded execution context

println(s"----[zhoulin]-------Started for $share/$schema/$table-------")
startTime = System.currentTimeMillis()

if (true) {
val executor = Executors.newSingleThreadExecutor()
val responseWriter = streamingOutput(
Some(queryResult.version),
queryResult.responseFormat, queryResult.actions, Some(executor)
)
if (true) {
println(s"----[zhoulin]----before sleep ($elapsedTime)ms.")
Thread.sleep(3000)
println(s"----[zhoulin]----after sleep ($elapsedTime)ms.")
// executor.shutdownNow()
responseWriter.close()
}
responseWriter
} else {
implicit val context = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())

val executor = ServiceRequestContext.current().blockingTaskExecutor()
val f = Future {
streamingOutput(
Some(queryResult.version),
queryResult.responseFormat,
queryResult.actions,
Some(executor),
includeEndStreamAction = includeEndStreamAction
)
}

f.onComplete { a =>
println(s"----[zhoulin]----The future completes ($elapsedTime)ms:$a")
}
// scalastyle:off awaitresult
// scalastyle:off awaitready
val a = Await.ready(f, 10.seconds)
println(s"----[zhoulin]----($elapsedTime)ms, a:$a")
val b = Await.result(f, 10.seconds)
println(s"----[zhoulin]----($elapsedTime)ms, b:$b")
b
}
}
}

private def elapsedTime: Long = {
System.currentTimeMillis() - startTime
}

// scalastyle:off argcount
@Get("/shares/{share}/schemas/{schema}/tables/{table}/changes")
@ConsumesJson
Expand Down Expand Up @@ -604,11 +653,14 @@ class DeltaSharingService(serverConfig: ServerConfig) {
streamingOutput(Some(queryResult.version), queryResult.responseFormat, queryResult.actions)
}

private var startTime = System.currentTimeMillis()

private def streamingOutput(
version: Option[Long],
responseFormat: String,
actions: Seq[Object],
includeEndStreamAction: Boolean = false): HttpResponse = {
executorOpt: Option[java.util.concurrent.ExecutorService] = None,
includeEndStreamAction: Boolean = false): HttpResponseWriter = {
var capabilities = Seq[String](s"${DELTA_SHARING_RESPONSE_FORMAT}=$responseFormat")
if (includeEndStreamAction) {
capabilities = capabilities :+ s"$DELTA_SHARING_CAPABILITIES_INCLUDE_END_STREAM_ACTION=true"
Expand All @@ -634,9 +686,17 @@ class DeltaSharingService(serverConfig: ServerConfig) {
val out = new ByteArrayOutputStream
JsonUtils.mapper.writeValue(out, o)
out.write('\n')
Console.println(s"----[zhoulin]----server object,($elapsedTime)ms, [${out.toString}].")
if (out.toString.contains("https://delta-exchange-test")) {
Console.println(s"----[zhoulin]----trying to sleep.")
Thread.sleep(5000)
Console.println(s"----[zhoulin]----Done sleep.")
// executorOpt.get.shutdownNow()
// throw new IllegalArgumentException("lin zhou exception.")
}
HttpData.wrap(out.toByteArray)
},
ServiceRequestContext.current().blockingTaskExecutor())
executorOpt.getOrElse(ServiceRequestContext.current().blockingTaskExecutor()))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,15 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll {
}
}
val input = connection.getInputStream()

if (false) {
Thread.sleep(5000)
val a = server.close()
// scalastyle:off println
Console.println(s"----[linzhou]----stopped:[$a].")

}

val content = try {
IOUtils.toString(input)
} finally {
Expand Down Expand Up @@ -575,12 +584,12 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll {
}

integrationTest("table1 - non partitioned - /shares/{share}/schemas/{schema}/tables/{table}/query") {
Seq(true, false).foreach { includeEndStreamAction =>
Seq(false).foreach { includeEndStreamAction =>
Seq(
RESPONSE_FORMAT_PARQUET,
RESPONSE_FORMAT_DELTA,
s"$RESPONSE_FORMAT_DELTA,$RESPONSE_FORMAT_PARQUET",
s"$RESPONSE_FORMAT_PARQUET,$RESPONSE_FORMAT_DELTA"
// RESPONSE_FORMAT_DELTA,
// s"$RESPONSE_FORMAT_DELTA,$RESPONSE_FORMAT_PARQUET",
// s"$RESPONSE_FORMAT_PARQUET,$RESPONSE_FORMAT_DELTA"
).foreach { responseFormat =>
val respondedFormat = if (responseFormat == RESPONSE_FORMAT_DELTA) {
RESPONSE_FORMAT_DELTA
Expand Down Expand Up @@ -611,6 +620,7 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll {
assert(endAction.minUrlExpirationTimestamp != null)
lines = lines.dropRight(1)
}

if (responseFormat == RESPONSE_FORMAT_DELTA) {
val responseProtocol = JsonUtils.fromJson[DeltaResponseSingleAction](protocol).protocol
assert(responseProtocol.deltaProtocol.minReaderVersion == 1)
Expand Down Expand Up @@ -638,31 +648,31 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll {
schemaString = """{"type":"struct","fields":[{"name":"eventTime","type":"timestamp","nullable":true,"metadata":{}},{"name":"date","type":"date","nullable":true,"metadata":{}}]}""",
partitionColumns = Nil).wrap
assert(expectedMetadata == JsonUtils.fromJson[SingleAction](metadata))
val files = lines.drop(2)
val actualFiles = files.map(f => JsonUtils.fromJson[SingleAction](f).file)
assert(actualFiles.size == 2)
val expectedFiles = Seq(
AddFile(
url = actualFiles(0).url,
expirationTimestamp = actualFiles(0).expirationTimestamp,
id = "061cb3683a467066995f8cdaabd8667d",
partitionValues = Map.empty,
size = 781,
stats = """{"numRecords":1,"minValues":{"eventTime":"2021-04-28T06:32:22.421Z","date":"2021-04-28"},"maxValues":{"eventTime":"2021-04-28T06:32:22.421Z","date":"2021-04-28"},"nullCount":{"eventTime":0,"date":0}}"""
),
AddFile(
url = actualFiles(1).url,
expirationTimestamp = actualFiles(1).expirationTimestamp,
id = "e268cbf70dbaa6143e7e9fa3e2d3b00e",
partitionValues = Map.empty,
size = 781,
stats = """{"numRecords":1,"minValues":{"eventTime":"2021-04-28T06:32:02.070Z","date":"2021-04-28"},"maxValues":{"eventTime":"2021-04-28T06:32:02.070Z","date":"2021-04-28"},"nullCount":{"eventTime":0,"date":0}}"""
)
)
assert(actualFiles.count(_.expirationTimestamp != null) == 2)
assert(expectedFiles == actualFiles.toList)
verifyPreSignedUrl(actualFiles(0).url, 781)
verifyPreSignedUrl(actualFiles(1).url, 781)
// val files = lines.drop(2)
// val actualFiles = files.map(f => JsonUtils.fromJson[SingleAction](f).file)
// assert(actualFiles.size == 2)
// val expectedFiles = Seq(
// AddFile(
// url = actualFiles(0).url,
// expirationTimestamp = actualFiles(0).expirationTimestamp,
// id = "061cb3683a467066995f8cdaabd8667d",
// partitionValues = Map.empty,
// size = 781,
// stats = """{"numRecords":1,"minValues":{"eventTime":"2021-04-28T06:32:22.421Z","date":"2021-04-28"},"maxValues":{"eventTime":"2021-04-28T06:32:22.421Z","date":"2021-04-28"},"nullCount":{"eventTime":0,"date":0}}"""
// ),
// AddFile(
// url = actualFiles(1).url,
// expirationTimestamp = actualFiles(1).expirationTimestamp,
// id = "e268cbf70dbaa6143e7e9fa3e2d3b00e",
// partitionValues = Map.empty,
// size = 781,
// stats = """{"numRecords":1,"minValues":{"eventTime":"2021-04-28T06:32:02.070Z","date":"2021-04-28"},"maxValues":{"eventTime":"2021-04-28T06:32:02.070Z","date":"2021-04-28"},"nullCount":{"eventTime":0,"date":0}}"""
// )
// )
// assert(actualFiles.count(_.expirationTimestamp != null) == 2)
// assert(expectedFiles == actualFiles.toList)
// verifyPreSignedUrl(actualFiles(0).url, 781)
// verifyPreSignedUrl(actualFiles(1).url, 781)
}
}
}
Expand Down
Loading