
[SPARK-51136][CORE] Set CallerContext for History Server #49858

Closed · wants to merge 1 commit

Conversation

Contributor

@cnauroth cnauroth commented Feb 9, 2025

### What changes were proposed in this pull request?

Initialize the Hadoop RPC `CallerContext` during History Server startup, before `FileSystem` access. Calls to HDFS will get tagged in the audit log as originating from the History Server.
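As a rough self-contained sketch of the mechanism (illustrative only: `CallerContextSketch` is a hypothetical stand-in for Hadoop's thread-local `org.apache.hadoop.ipc.CallerContext`, which is what Spark actually sets):

```scala
// Hypothetical stand-in for Hadoop's CallerContext, which keeps the
// current context in a thread-local on the RPC client side.
object CallerContextSketch {
  private val current = new ThreadLocal[String]

  // Spark prefixes the supplied context with "SPARK_" before setting it.
  def setCurrentContext(context: String): Unit =
    current.set(s"SPARK_$context")

  def getCurrent: String = current.get
}

// What this PR does conceptually: register the context once at History
// Server startup, before any FileSystem access, so every subsequent HDFS
// call from this process carries the tag in the NameNode audit log.
CallerContextSketch.setCurrentContext("HISTORY")
println(CallerContextSketch.getCurrent) // SPARK_HISTORY
```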

### Why are the changes needed?

Other YARN-based Spark processes set the `CallerContext`, so that additional auditing context propagates in Hadoop RPC calls. This PR provides auditing context for calls from the History Server. Other callers provide additional information like app ID, attempt ID, etc.; we don't provide that here, because the History Server serves multiple apps/attempts.

### Does this PR introduce _any_ user-facing change?

Yes. In environments that configure `hadoop.caller.context.enabled=true`, users will now see additional information in the HDFS audit logs explicitly stating that calls originated from the History Server.
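For context, this Hadoop property is enabled in the cluster configuration, typically `core-site.xml` (the property name is Hadoop's; the snippet below is just an example):

```xml
<property>
  <name>hadoop.caller.context.enabled</name>
  <value>true</value>
</property>
```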

### How was this patch tested?

A new unit test has been added. All tests pass in the history package.

```
build/mvn -pl core test -Dtest=none -DmembersOnlySuites=org.apache.spark.deploy.history
```

When the changes are deployed to a running cluster, the new caller context is visible in the HDFS audit logs.

```
2025-02-07 23:00:54,657 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit: allowed=true	ugi=spark (auth:SIMPLE)	ip=/10.240.5.205	cmd=open	src=/133bcb94-52b8-4356-ad9b-7358c78ce7fd/spark-job-history/application_1738779819434_0012	dst=null	perm=null	proto=rpc	callerContext=SPARK_HISTORY
2025-02-07 23:00:54,683 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit: allowed=true	ugi=spark (auth:SIMPLE)	ip=/10.240.5.205	cmd=open	src=/133bcb94-52b8-4356-ad9b-7358c78ce7fd/spark-job-history/application_1738779819434_0011	dst=null	perm=null	proto=rpc	callerContext=SPARK_HISTORY
2025-02-07 23:00:54,699 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit: allowed=true	ugi=spark (auth:SIMPLE)	ip=/10.240.5.205	cmd=open	src=/133bcb94-52b8-4356-ad9b-7358c78ce7fd/spark-job-history/application_1738779819434_0011	dst=null	perm=null	proto=rpc	callerContext=SPARK_HISTORY
2025-02-07 23:00:54,715 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit: allowed=true	ugi=spark (auth:SIMPLE)	ip=/10.240.5.205	cmd=open	src=/133bcb94-52b8-4356-ad9b-7358c78ce7fd/spark-job-history/application_1738779819434_0010	dst=null	perm=null	proto=rpc	callerContext=SPARK_HISTORY
2025-02-07 23:00:54,729 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit: allowed=true	ugi=spark (auth:SIMPLE)	ip=/10.240.5.205	cmd=open	src=/133bcb94-52b8-4356-ad9b-7358c78ce7fd/spark-job-history/application_1738779819434_0010	dst=null	perm=null	proto=rpc	callerContext=SPARK_HISTORY
2025-02-07 23:00:54,743 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit: allowed=true	ugi=spark (auth:SIMPLE)	ip=/10.240.5.205	cmd=open	src=/133bcb94-52b8-4356-ad9b-7358c78ce7fd/spark-job-history/application_1738779819434_0009	dst=null	perm=null	proto=rpc	callerContext=SPARK_HISTORY
2025-02-07 23:00:54,755 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit: allowed=true	ugi=spark (auth:SIMPLE)	ip=/10.240.5.205	cmd=open	src=/133bcb94-52b8-4356-ad9b-7358c78ce7fd/spark-job-history/application_1738779819434_0009	dst=null	perm=null	proto=rpc	callerContext=SPARK_HISTORY
2025-02-07 23:00:54,767 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit: allowed=true	ugi=spark (auth:SIMPLE)	ip=/10.240.5.205	cmd=open	src=/133bcb94-52b8-4356-ad9b-7358c78ce7fd/spark-job-history/application_1738779819434_0008	dst=null	perm=null	proto=rpc	callerContext=SPARK_HISTORY
2025-02-07 23:00:54,779 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit: allowed=true	ugi=spark (auth:SIMPLE)	ip=/10.240.5.205	cmd=open	src=/133bcb94-52b8-4356-ad9b-7358c78ce7fd/spark-job-history/application_1738779819434_0008	dst=null	perm=null	proto=rpc	callerContext=SPARK_HISTORY
2025-02-07 23:01:04,160 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit: allowed=true	ugi=spark (auth:SIMPLE)	ip=/10.240.5.205	cmd=listStatus	src=/133bcb94-52b8-4356-ad9b-7358c78ce7fd/spark-job-history	dst=null	perm=null	proto=rpc	callerContext=SPARK_HISTORY
```

### Was this patch authored or co-authored using generative AI tooling?

No.

@github-actions github-actions bot added the CORE label Feb 9, 2025
```
@@ -3151,9 +3152,31 @@ private[spark] object Utils
  }
}

private[util] object CallerContext extends Logging {
```
Contributor Author

I needed to relax visibility on things here to facilitate unit testing with caller context enabled. LMK if a different approach is preferred (reflection?).

Member

This change looks okay to me.

@cnauroth
Contributor Author

cnauroth commented Feb 9, 2025

If approved, can this also go into branch-3.5 please? The cherry-pick would need a minor merge conflict resolution in FsHistoryProvider import statements, or I can send a separate pull request.

@dongjoon-hyun dongjoon-hyun changed the title [SPARK-51136][HISTORYSERVER] Set CallerContext for History Server [SPARK-51136][CORE] Set CallerContext for History Server Feb 10, 2025
```scala
    SparkHadoopUtil.get.conf.getBoolean("hadoop.caller.context.enabled", false)
private[spark] object CallerContext extends Logging {
  var callerContextEnabled: Boolean = SparkHadoopUtil.get.conf.getBoolean(
    HADOOP_CALLER_CONTEXT_ENABLED_KEY, HADOOP_CALLER_CONTEXT_ENABLED_DEFAULT)
```
Member

If you don't mind, please revert this change. :)

Contributor Author

No problem, I can definitely do that. :)

Can you please help me understand the reasoning though? Does the Spark codebase prefer not to reference Hadoop's configuration constants?

Member

Yes, right. We prefer to avoid those compilation dependencies, not only for Hadoop but also for Hive.

Contributor Author

Thank you, and I will keep it in mind in future patches.

```scala
 *
 * VisibleForTesting
 */
def withCallerContextEnabled[T](enabled: Boolean)(func: => T): T = {
```
Member

This looks like a pure test-helper utility rather than something `VisibleForTesting`. Is there any benefit to main code?

Contributor Author

There is no benefit to main code, other than keeping all access to `callerContextEnabled` in the same object. Otherwise, people reading the code might be confused about why it's a `var` instead of a `val`.

If you prefer, I can just leave a comment explaining why it's a `var` and move this to a new `CallerContextTestUtils` object under test (or some existing test utils file if you have another suggestion).

I would potentially like to reuse this in tests covering other usage of caller context, which is why I didn't put it directly in `FsHistoryProviderSuite`.
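For readers following the thread, a save/override/restore helper of this shape might look like the following self-contained sketch (with a hypothetical `CallerContextFlag` object standing in for Spark's `CallerContext`, whose `callerContextEnabled` the PR temporarily made a `var`):

```scala
object CallerContextFlag {
  // In Spark this is initialized from the Hadoop configuration; it is a
  // var here so tests can temporarily override it.
  var callerContextEnabled: Boolean = false
}

// Temporarily override the flag, run the test body, and restore the
// original value even if the body throws.
def withCallerContextEnabled[T](enabled: Boolean)(func: => T): T = {
  val original = CallerContextFlag.callerContextEnabled
  CallerContextFlag.callerContextEnabled = enabled
  try {
    func
  } finally {
    CallerContextFlag.callerContextEnabled = original
  }
}
```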

Member

@dongjoon-hyun dongjoon-hyun left a comment

Could you confirm whether this PR description's claim holds for non-YARN Spark daemons like Spark Master, Spark Worker, Spark ThriftServer, and so on? Or is it only referring to YARN ApplicationMaster and Client-related code?

> Other Spark processes set the CallerContext, so that additional auditing context propagates in Hadoop RPC calls.

@cnauroth
Contributor Author

cnauroth commented Feb 10, 2025

> Could you confirm whether this PR description's claim holds for non-YARN Spark daemons like Spark Master, Spark Worker, Spark ThriftServer, and so on? Or is it only referring to YARN ApplicationMaster and Client-related code?
>
> Other Spark processes set the CallerContext, so that additional auditing context propagates in Hadoop RPC calls.

@dongjoon-hyun , thank you for the review. That's a good point. I have only confirmed existing support for YARN-based workloads, so I will update the PR description.

We can continue to check support in the other processes.

@dongjoon-hyun
Member

dongjoon-hyun commented Feb 10, 2025

Thank you for the replies.

BTW, regarding the following question: we cannot backport SPARK-51136 as-is, because it's filed as an Improvement.

> If approved, can this also go into branch-3.5 please? The cherry-pick would need a minor merge conflict resolution in FsHistoryProvider import statements, or I can send a separate pull request.

[Screenshot: the SPARK-51136 JIRA issue, filed with type Improvement]

If you want this in Apache Spark 3.5.5, please convert the JIRA from Improvement to Bug and describe the rationale for why this is a bug fix. Then I can help you.

@cnauroth
Contributor Author

@dongjoon-hyun , regarding backport to 3.5, I don't think I can justify calling it a bug. It's providing an improvement, not fixing existing functionality that has been broken. I'll retract my request for the backport.

I pushed up a change to stop referencing the Hadoop constants. LMK your preference on placement of the test helper. Happy to push up another change.

#49858 (comment)

@dongjoon-hyun
Member

dongjoon-hyun commented Feb 11, 2025

BTW, I'm investigating the relevant code at this chance while reviewing this PR, because the existing test coverage also looks suspicious due to `val callerContextEnabled`. The code of this PR itself looks okay, but let me figure out the best way to test Spark's Hadoop caller context support. Sorry for the delay.

```scala
test("Set Spark CallerContext") {
  val context = "test"
  new CallerContext(context).setCurrentContext()
  if (CallerContext.callerContextEnabled) {
    assert(s"SPARK_$context" === HadoopCallerContext.getCurrent.toString)
  }
}
```

@cnauroth
Contributor Author

> BTW, I'm investigating the relevant code at this chance while reviewing this PR, because the existing test coverage also looks suspicious due to `val callerContextEnabled`. The code of this PR itself looks okay, but let me figure out the best way to test Spark's Hadoop caller context support. Sorry for the delay.

```scala
test("Set Spark CallerContext") {
  val context = "test"
  new CallerContext(context).setCurrentContext()
  if (CallerContext.callerContextEnabled) {
    assert(s"SPARK_$context" === HadoopCallerContext.getCurrent.toString)
  }
}
```

If coverage looks suspicious, it might be that the flag is only initialized once per JVM startup, and test runs probably don't have `hadoop.caller.context.enabled=true` configured. Steve hinted at this in #49779. My test helper tries to work around this (with the downside of compromising the visibility and mutability of the flag).
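The once-per-JVM pitfall described above can be illustrated with a tiny sketch (hypothetical `Config` and `Flag` objects, not Spark code): a `val` in a Scala `object` is evaluated once, when the object is first referenced, so flipping the setting later in the same JVM has no effect on the flag.

```scala
import scala.collection.mutable

object Config {
  // Stand-in for the Hadoop Configuration the flag is read from.
  val settings: mutable.Map[String, String] =
    mutable.Map("hadoop.caller.context.enabled" -> "false")
}

object Flag {
  // Evaluated exactly once, when Flag is first referenced.
  val callerContextEnabled: Boolean =
    Config.settings("hadoop.caller.context.enabled").toBoolean
}

val before = Flag.callerContextEnabled                     // forces initialization
Config.settings("hadoop.caller.context.enabled") = "true"  // too late to matter
val after = Flag.callerContextEnabled                      // still the cached value
```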

No worries on the delay. Committers are busy! 😄

Member

@dongjoon-hyun dongjoon-hyun left a comment

I made a PR to always enable `hadoop.caller.context.enabled` during testing, @cnauroth.

@dongjoon-hyun
Member

Could you rebase this PR to the master branch, @cnauroth ?

@cnauroth
Contributor Author

@dongjoon-hyun , I rebased to current master and removed my test helper. Thank you.

Member

@dongjoon-hyun dongjoon-hyun left a comment

+1, LGTM. (Pending CIs).

@dongjoon-hyun
Member

All core module tests passed. Merged to master for Apache Spark 4.1.0.

@cnauroth
Contributor Author

Great, really appreciate your efforts on testing strategy for this change! Thank you, @dongjoon-hyun .
