Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-51072][CORE] CallerContext to set Hadoop cloud audit context #49779

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 25 additions & 4 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import scala.util.control.{ControlThrowable, NonFatal}
import scala.util.matching.Regex

import _root_.io.netty.channel.unix.Errors.NativeIoException
import com.google.common.annotations.VisibleForTesting
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
import com.google.common.collect.Interners
import com.google.common.io.{ByteStreams, Files => GFiles}
Expand All @@ -54,6 +55,7 @@ import org.apache.commons.io.IOUtils
import org.apache.commons.lang3.{JavaVersion, SystemUtils}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.hadoop.fs.audit.CommonAuditContext.currentAuditContext
import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}
import org.apache.hadoop.ipc.{CallerContext => HadoopCallerContext}
import org.apache.hadoop.ipc.CallerContext.{Builder => HadoopCallerContextBuilder}
Expand Down Expand Up @@ -3164,6 +3166,9 @@ private[util] object CallerContext extends Logging {
* specific applications impacting parts of the Hadoop system and potential problems they may be
* creating (e.g. overloading NN). As HDFS mentioned in HDFS-9184, for a given HDFS operation, it's
* very helpful to track which upper level job issues it.
 * The context information is also set in the audit context for cloud storage
 * connectors. If supported, this is marshalled as part of the HTTP "Referer" header
 * or a similar field, and so ends up in the storage service logs themselves.
*
* @param from who sets up the caller context (TASK, CLIENT, APPMASTER)
*
Expand Down Expand Up @@ -3214,12 +3219,28 @@ private[spark] class CallerContext(

/**
* Set up the caller context [[context]] by invoking Hadoop CallerContext API of
* [[HadoopCallerContext]].
 * [[HadoopCallerContext]], which is included in IPC calls,
 * and the Hadoop audit context, which may be included in cloud storage
 * requests for collection in cloud service logs.
*/
def setCurrentContext(): Unit = if (CallerContext.callerContextEnabled) {
val hdfsContext = new HadoopCallerContextBuilder(context).build()
HadoopCallerContext.setCurrent(hdfsContext)
def setCurrentContext(): Unit = {
setCurrentContext(CallerContext.callerContextEnabled)
}

/**
* Inner method to set the context.
* @param enabled should the thread-level contexts be updated?
*/
@VisibleForTesting
private[util] def setCurrentContext(enabled: Boolean): Unit = {
if (enabled) {
val hdfsContext = new HadoopCallerContextBuilder(context).build()
HadoopCallerContext.setCurrent(hdfsContext)
// Set the audit context passed down to object stores, under the key "spark".
currentAuditContext.put("spark", context)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for making a PR. This one line seems to be the actual change. Did I understand correctly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes! Rest of it is test related

}
}

}

/**
Expand Down
24 changes: 19 additions & 5 deletions core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import org.apache.commons.lang3.SystemUtils
import org.apache.commons.math3.stat.inference.ChiSquareTest
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.audit.CommonAuditContext.currentAuditContext
import org.apache.hadoop.ipc.{CallerContext => HadoopCallerContext}
import org.apache.logging.log4j.Level

Expand Down Expand Up @@ -1003,11 +1004,24 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties {
}

test("Set Spark CallerContext") {
val context = "test"
new CallerContext(context).setCurrentContext()
if (CallerContext.callerContextEnabled) {
assert(s"SPARK_$context" === HadoopCallerContext.getCurrent.toString)
}
currentAuditContext.reset
new CallerContext("test",
Some("upstream"),
Some("app"),
Some("attempt"),
Some(1),
Some(2),
Some(3),
Some(4),
Some(5),
).setCurrentContext(true)
val expected = s"SPARK_test_app_attempt_JId_1_SId_2_3_TId_4_5_upstream"
assert(expected === HadoopCallerContext.getCurrent.toString)
assert(expected === currentAuditContext.get("spark"))
// false invocations do not update the context
new CallerContext("context2").setCurrentContext(false)
assert(expected === HadoopCallerContext.getCurrent.toString)
assert(expected === currentAuditContext.get("spark"))
}

test("encodeFileNameToURIRawPath") {
Expand Down