Commit

Merge remote-tracking branch 'apache/master' into SPARK-49428
hvanhovell committed Jan 30, 2025
2 parents 248fb93 + ecf6851 commit 2fde7e1
Showing 49 changed files with 718 additions and 302 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/publish_snapshot.yml
@@ -28,7 +28,7 @@ on:
description: 'list of branches to publish (JSON)'
required: true
# keep in sync with default value of strategy matrix 'branch'
default: '["master", "branch-3.5"]'
default: '["master", "branch-4.0", "branch-3.5"]'

jobs:
publish-snapshot:
@@ -38,7 +38,7 @@ jobs:
fail-fast: false
matrix:
# keep in sync with default value of workflow_dispatch input 'branch'
branch: ${{ fromJSON( inputs.branch || '["master", "branch-3.5"]' ) }}
branch: ${{ fromJSON( inputs.branch || '["master", "branch-4.0", "branch-3.5"]' ) }}
steps:
- name: Checkout Spark repository
uses: actions/checkout@v4
1 change: 0 additions & 1 deletion LICENSE-binary
@@ -296,7 +296,6 @@ jakarta.inject:jakarta.inject-api
jakarta.validation:jakarta.validation-api
javax.jdo:jdo-api
joda-time:joda-time
net.java.dev.jna:jna
net.sf.opencsv:opencsv
net.sf.supercsv:super-csv
net.sf.jpam:jpam
1 change: 1 addition & 0 deletions assembly/pom.xml
@@ -323,6 +323,7 @@
<id>hive-provided</id>
<properties>
<hive.deps.scope>provided</hive.deps.scope>
<hive.llap.scope>provided</hive.llap.scope>
<hive.jackson.scope>provided</hive.jackson.scope>
</properties>
</profile>
7 changes: 7 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -117,6 +117,13 @@
},
"sqlState" : "42604"
},
"AVRO_CANNOT_WRITE_NULL_FIELD" : {
"message" : [
"Cannot write null value for field <name> defined as non-null Avro data type <dataType>.",
"To allow null value for this field, specify its avro schema as a union type with \"null\" using `avroSchema` option."
],
"sqlState" : "22004"
},
"AVRO_INCOMPATIBLE_READ_TYPE" : {
"message" : [
"Cannot convert Avro <avroPath> to SQL <sqlPath> because the original encoded data type is <avroType>, however you're trying to read the field as <sqlType>, which would lead to an incorrect answer.",
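
The new AVRO_CANNOT_WRITE_NULL_FIELD entry tells users to declare the field's Avro schema as a union with "null" via the `avroSchema` write option. Below is a minimal sketch of that fix, not taken from this commit, assuming the spark-avro data source is on the classpath; the record name, column, and output path are illustrative.

import org.apache.spark.sql.SparkSession

object AvroNullableWriteExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("avro-null").getOrCreate()
    import spark.implicits._

    // A nullable string column; the second row is null.
    val df = Seq("alice", null).toDF("name")

    // Declare the field as a union with "null" so the Avro writer accepts null values
    // instead of raising AVRO_CANNOT_WRITE_NULL_FIELD.
    val avroSchema =
      """{
        |  "type": "record",
        |  "name": "Person",
        |  "fields": [ {"name": "name", "type": ["null", "string"], "default": null} ]
        |}""".stripMargin

    df.write.format("avro").option("avroSchema", avroSchema).save("/tmp/people_avro")
    spark.stop()
  }
}
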
@@ -549,6 +549,7 @@ private[spark] object LogKeys {
case object NUM_ROWS extends LogKey
case object NUM_RULE_OF_RUNS extends LogKey
case object NUM_SEQUENCES extends LogKey
case object NUM_SKIPPED extends LogKey
case object NUM_SLOTS extends LogKey
case object NUM_SPILLS extends LogKey
case object NUM_SPILL_WRITERS extends LogKey
157 changes: 157 additions & 0 deletions common/utils/src/main/scala/org/apache/spark/internal/Logging.scala
@@ -17,6 +17,7 @@

package org.apache.spark.internal

import scala.concurrent.duration._
import scala.jdk.CollectionConverters._

import org.apache.logging.log4j.{CloseableThreadContext, Level, LogManager}
@@ -27,6 +28,7 @@ import org.apache.logging.log4j.core.filter.AbstractFilter
import org.slf4j.{Logger, LoggerFactory}

import org.apache.spark.internal.Logging.SparkShellLoggingFilter
import org.apache.spark.internal.LogKeys
import org.apache.spark.util.SparkClassUtils

/**
@@ -531,3 +533,158 @@ private[spark] object Logging {
override def isStopped: Boolean = status == LifeCycle.State.STOPPED
}
}

/**
* A thread-safe token bucket-based throttler implementation with nanosecond accuracy.
*
* Each instance must be shared across all scopes it should throttle.
* For global throttling, that means either extending this class in an `object` or
* creating the instance as a field of an `object`.
*
* @param bucketSize This corresponds to the largest possible burst without throttling,
* in number of executions.
* @param tokenRecoveryInterval Time between two tokens being added back to the bucket.
* This is reciprocal of the long-term average unthrottled rate.
*
* Example: With a bucket size of 100 and a recovery interval of 1s, we could log up to 100 events
* in under a second without throttling, but at that point the bucket is exhausted and we only
* regain the ability to log more events at 1 event per second. If we log less than 1 event/s
* the bucket will slowly refill until it's back at 100.
* Either way, we can always log at least 1 event/s.
*/
class LogThrottler(
val bucketSize: Int = 100,
val tokenRecoveryInterval: FiniteDuration = 1.second,
val timeSource: NanoTimeTimeSource = SystemNanoTimeSource) extends Logging {

private var remainingTokens = bucketSize
private var nextRecovery: DeadlineWithTimeSource =
DeadlineWithTimeSource.now(timeSource) + tokenRecoveryInterval
private var numSkipped: Long = 0

/**
* Run `thunk` as long as there are tokens remaining in the bucket,
* otherwise skip and remember number of skips.
*
* The argument to `thunk` is how many previous invocations have been skipped since the last time
* an invocation actually ran.
*
* Note: This method is `synchronized`, so it is concurrency safe.
* However, that also means no heavy-lifting should be done as part of this
* if the throttler is shared between concurrent threads.
* This also means that the synchronized block of the `thunk` that *does* execute will still
* hold up concurrent `thunk`s that will actually get rejected once they hold the lock.
* This is fine at low concurrency/low recovery rates. But if we need this to be more efficient at
* some point, we will need to decouple the check from the `thunk` execution.
*/
def throttled(thunk: Long => Unit): Unit = this.synchronized {
tryRecoverTokens()
if (remainingTokens > 0) {
thunk(numSkipped)
numSkipped = 0
remainingTokens -= 1
} else {
numSkipped += 1L
}
}

/**
* Same as [[throttled]] but turns the number of skipped invocations into a logging message
* that can be appended to the item being logged in `thunk`.
*/
def throttledWithSkippedLogMessage(thunk: MessageWithContext => Unit): Unit = {
this.throttled { numSkipped =>
val skippedStr = if (numSkipped != 0L) {
log"[${MDC(LogKeys.NUM_SKIPPED, numSkipped)} similar messages were skipped.]"
} else {
log""
}
thunk(skippedStr)
}
}

/**
* Try to recover tokens, if the rate allows.
*
* Only call from within a `this.synchronized` block!
*/
private[spark] def tryRecoverTokens(): Unit = {
try {
// Doing it one-by-one is a bit inefficient for long periods, but it's easy to avoid jumps
// and rounding errors this way. The inefficiency shouldn't matter as long as the bucketSize
// isn't huge.
while (remainingTokens < bucketSize && nextRecovery.isOverdue()) {
remainingTokens += 1
nextRecovery += tokenRecoveryInterval
}

val currentTime = DeadlineWithTimeSource.now(timeSource)
if (remainingTokens == bucketSize &&
(currentTime - nextRecovery) > tokenRecoveryInterval) {
// Reset the recovery time, so we don't accumulate infinite recovery while nothing is
// going on.
nextRecovery = currentTime + tokenRecoveryInterval
}
} catch {
case _: IllegalArgumentException =>
// Adding FiniteDuration throws IllegalArgumentException instead of wrapping on overflow.
// Given that this happens every ~300 years, we can afford some non-linearity here,
// rather than taking the effort to properly work around that.
nextRecovery = DeadlineWithTimeSource(Duration(-Long.MaxValue, NANOSECONDS), timeSource)
}
}

/**
* Resets throttler state to initial state.
* Visible for testing.
*/
def reset(): Unit = this.synchronized {
remainingTokens = bucketSize
nextRecovery = DeadlineWithTimeSource.now(timeSource) + tokenRecoveryInterval
numSkipped = 0
}
}

/**
* This is essentially the same as Scala's [[Deadline]],
* just with a custom source of nanoTime so it can actually be tested properly.
*/
case class DeadlineWithTimeSource(
time: FiniteDuration,
timeSource: NanoTimeTimeSource = SystemNanoTimeSource) {
// Only implemented the methods LogThrottler actually needs for now.

/**
* Return a deadline advanced (i.e., moved into the future) by the given duration.
*/
def +(other: FiniteDuration): DeadlineWithTimeSource = copy(time = time + other)

/**
* Calculate time difference between this and the other deadline, where the result is directed
* (i.e., may be negative).
*/
def -(other: DeadlineWithTimeSource): FiniteDuration = time - other.time

/**
* Determine whether the deadline lies in the past at the point where this method is called.
*/
def isOverdue(): Boolean = (time.toNanos - timeSource.nanoTime()) <= 0
}

object DeadlineWithTimeSource {
/**
* Construct a deadline due exactly at the point where this method is called. Useful for then
* advancing it to obtain a future deadline, or for sampling the current time exactly once and
* then comparing it to multiple deadlines (using subtraction).
*/
def now(timeSource: NanoTimeTimeSource = SystemNanoTimeSource): DeadlineWithTimeSource =
DeadlineWithTimeSource(Duration(timeSource.nanoTime(), NANOSECONDS), timeSource)
}

/** Generalisation of [[System.nanoTime()]]. */
private[spark] trait NanoTimeTimeSource {
def nanoTime(): Long
}
private[spark] object SystemNanoTimeSource extends NanoTimeTimeSource {
override def nanoTime(): Long = System.nanoTime()
}
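
For orientation, here is a small usage sketch of the new throttler. It is not part of the commit and assumes code living inside the Spark source tree, since `NanoTimeTimeSource` is a `private[spark]` API; the package, object name, message text, and bucket parameters are illustrative.

package org.apache.spark.example  // illustrative location; needed to access the private[spark] APIs

import scala.concurrent.duration._

import org.apache.spark.internal.{Logging, LogThrottler, NanoTimeTimeSource}

// Shared via an `object`, as the class comment recommends, so every call site of this
// log statement draws from the same token bucket.
object SlowTaskLogging extends Logging {
  // Allow bursts of up to 10 messages, then at most one per second on average.
  private val throttler = new LogThrottler(bucketSize = 10, tokenRecoveryInterval = 1.second)

  def warnSlowTask(taskId: Long): Unit = {
    throttler.throttled { numSkipped =>
      // numSkipped counts invocations dropped since the last message that got through.
      val note = if (numSkipped > 0) s" ($numSkipped similar warnings were skipped)" else ""
      logWarning(s"Task $taskId is running slowly.$note")
    }
  }
}

// A deterministic time source for tests, using the NanoTimeTimeSource hook, e.g.
// new LogThrottler(timeSource = clock) followed by clock.advance(5.seconds).
class ManualNanoTime(private var now: Long = 0L) extends NanoTimeTimeSource {
  override def nanoTime(): Long = now
  def advance(d: FiniteDuration): Unit = { now += d.toNanos }
}

The `throttledWithSkippedLogMessage` variant works the same way but hands the thunk a pre-built structured-logging fragment ("[N similar messages were skipped.]") instead of a raw count.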