
[SPARK-50854][SS] Make path fully qualified before passing it to FileStreamSink #49654


Closed · wants to merge 2 commits from vrozov/SPARK-50854 into master

Conversation

vrozov (Member) commented on Jan 24, 2025

What changes were proposed in this pull request?

  1. Ensure that if a relative path is used in `DataStreamWriter`, path resolution happens on the Spark driver and is not deferred to a Spark executor.
  2. Construct a fully qualified path in `DataSource` before it is passed to `FileStreamSink`, similar to how it is done for `DataFrameWriter`.
  3. Add a check to `FileStreamSink` asserting that `path` is an absolute path.

https://lists.apache.org/thread/ffzwn1y2fgyjw0j09cv4np9z00wymxwv
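
As a rough illustration of items 2 and 3, here is a minimal sketch of the qualification step, assuming Hadoop's `Path`/`FileSystem` APIs (the `qualifyOnDriver` helper is hypothetical; the actual code in `DataSource` and `FileStreamSink` may differ):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Hypothetical helper: qualify a possibly-relative sink path on the driver
// before it is handed to FileStreamSink.
def qualifyOnDriver(rawPath: String, hadoopConf: Configuration): Path = {
  val path = new Path(rawPath)
  val fs = path.getFileSystem(hadoopConf)
  // Resolve against the driver's working directory and attach a scheme,
  // e.g. "test.parquet" -> "file:/home/user/test.parquet".
  val qualified = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
  // Item 3: the sink can then assert it received an absolute path.
  require(qualified.isAbsolute, s"Expected an absolute path, got $qualified")
  qualified
}
```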

Why are the changes needed?

To properly support relative paths in Structured Streaming. The use case mostly applies to a single-node local Spark cluster.

Does this PR introduce any user-facing change?

The change applies only when a relative path is used in `DataStreamWriter`; data is now written to the correct directory. No change is expected for absolute paths (the most common production case).

How was this patch tested?

Added a new test case to `FileStreamSinkSuite`.
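
A sketch of the shape such a test could take (the actual test added by this PR may differ); it only checks the invariant the fix enforces, namely that a relative sink path qualified on the driver becomes absolute and scheme-qualified:

```scala
// Inside FileStreamSinkSuite, which provides a `spark` session via StreamTest:
test("SPARK-50854: relative sink path is qualified on the driver") {
  import org.apache.hadoop.fs.Path

  val relative = new Path("test.parquet") // relative, no scheme
  val fs = relative.getFileSystem(spark.sessionState.newHadoopConf())
  val qualified = relative.makeQualified(fs.getUri, fs.getWorkingDirectory)

  // Once qualified, executors can no longer resolve the path against
  // their own working directories.
  assert(qualified.isAbsolute)
  assert(qualified.toUri.getScheme != null)
}
```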

Was this patch authored or co-authored using generative AI tooling?

No

vrozov (Member, Author) commented on Jan 29, 2025

@dongjoon-hyun @HeartSaVioR Please review

vrozov (Member, Author) commented on Jan 31, 2025

@dongjoon-hyun Please review

HeartSaVioR (Contributor) commented

I'd like to make it super clear what scenario(s) we struggle with without this fix and how the fix resolves them.

> The use case mostly applies to a single-node local Spark cluster.

I don't think it's a very common scenario that people install Spark in multiple directories and run the driver and executor from separate directories (or otherwise set different working directories). Using a relative path that resolves to different directories per process doesn't seem like a common scenario, and I'd like to see the details.

HeartSaVioR (Contributor) commented

You can explain it the other way around: if you see that the file source/sink resolves the path on the driver in a batch query, please describe your setup, how you tested, and the result backing up your claim. That could be used to make a valid claim that we want consistency between batch and streaming.

vrozov (Member, Author) commented on Feb 4, 2025

> I'd like to make it super clear what scenario(s) we struggle with without this fix and how the fix resolves them.

Please see SPARK-50854. When a relative path is used in Structured Streaming (`DataStreamWriter`), the Parquet files are written to a location relative to the executor (for example /opt/homebrew/Cellar/apache-spark/3.5.4/libexec/work/app-20250203151257-0000/5/test.parquet/part-00000-54a33cfa-b34c-4e84-8589-cfe763b18ccd-c000.snappy.parquet) instead of a location relative to the driver, as in the case of batch processing.
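
For reference, a hypothetical repro along the lines described in SPARK-50854 (the actual code attached to the JIRA may differ); run it with spark-submit against a local standalone cluster:

```scala
import org.apache.spark.sql.SparkSession

object RelativePathRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("SPARK-50854-repro").getOrCreate()

    val query = spark.readStream
      .format("rate") // built-in test source, emits rows continuously
      .load()
      .writeStream
      .format("parquet")
      .option("path", "test.parquet")                  // relative sink path
      .option("checkpointLocation", "test.checkpoint") // relative checkpoint
      .start()

    query.awaitTermination(30000) // let a few batches run
    spark.stop()
    // Without the fix, the part files land under each executor's working
    // directory (e.g. .../work/app-.../5/test.parquet/...), not the driver's.
  }
}
```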

This PR does not address the validity and/or use cases of relative paths, which are likely limited to a single-node cluster where the driver, master, and executors have access to the same local file system, though there may be other setups where the same condition applies.

> I don't think it's a very common scenario that people install Spark in multiple directories and run the driver and executor from separate directories (or otherwise set different working directories). Using a relative path that resolves to different directories per process doesn't seem like a common scenario, and I'd like to see the details.

The PR does not target installation into different directories. The problem is reproducible on a single-node cluster with a default installation (for example, a dev build or brew install). The driver is likely executed from the root or from its own directory, as the driver code is separate from the Spark code.

> You can explain it the other way around: if you see that the file source/sink resolves the path on the driver in a batch query, please describe your setup, how you tested, and the result backing up your claim. That could be used to make a valid claim that we want consistency between batch and streaming.

There is no custom setup for Spark. It is a default single-node cluster.

HeartSaVioR (Contributor) commented

While I strongly suspect this is purely down to the setup of the cluster, I'm open to the change to make FileStreamSink work with a brew setup.

Though I don't like to make any behavior change, as it is not justified that this will help arbitrary setups. Could you please add a sink option to FileStreamSink, like "qualifyRelativePathInDriver", with default "false"?

vrozov (Member, Author) commented on Feb 6, 2025

> Though I don't like to make any behavior change, as it is not justified that this will help arbitrary setups. Could you please add a sink option to FileStreamSink, like "qualifyRelativePathInDriver", with default "false"?

I don't see why this is necessary or how it will help: the proposal is a fix for incorrect behavior, not a behavior change. Since the change impacts only relative paths and does not impact cases where an absolute path is specified, adding an extra option will only add confusion, IMO.

> While I strongly suspect this is purely down to the setup of the cluster, I'm open to the change to make FileStreamSink work with a brew setup.

No, it is not specific to the cluster setup. Please try running the code provided in the JIRA on your own cluster setup.

HeartSaVioR (Contributor) commented on Feb 10, 2025

I've finally found time to play with this. Sorry, I don't deal with this kind of setup in my daily job.

I see the default setup from the Spark distribution also sets the CWD differently between the driver and the worker. The metadata files correctly contain the data file paths (even though the data is stored elsewhere), but the Spark file reader still can't read the data files properly.

I also checked quickly with a batch query and saw that the batch file writer writes files into the driver's path. I have yet to read the batch code path in detail, but the correction probably happens when the temp file is moved to the final path.

I'll take a look at the PR, but I'd love to be conservative about behavioral changes. Spark is a project many companies and individuals rely on to make revenue, and many of them have claimed "bug as a spec" when an upgrade broke their workload.

Could you please add a sink option to FileStreamSink, like "doNotQualifyRelativePathInDriver", with default "false"? I'd also like to see this option documented in the SS docs. Please include the option in docs/streaming/apis-on-dataframes-and-datasets.md.

Thanks!

EDIT: I have thought about this a bit, and it is hard to imagine a valid case that leverages the prior behavior and works. I'm OK with the change without a flag, but let's do our due diligence by updating the migration guide (docs/streaming/ss-migration-guide.md) under Spark 4.0.

If we can't make it soon, i.e. before Feb 15, this fix will target Spark 4.1.

github-actions bot added the DOCS label on Feb 11, 2025
vrozov (Member, Author) commented on Feb 11, 2025

@HeartSaVioR please take a look

HeartSaVioR (Contributor) left a review comment

+1 pending CI

HeartSaVioR (Contributor) commented

Thanks! Merging to master/4.0.

HeartSaVioR pushed a commit that referenced this pull request on Feb 12, 2025:
[SPARK-50854][SS] Make path fully qualified before passing it to FileStreamSink

Closes #49654 from vrozov/SPARK-50854.

Authored-by: Vlad Rozov <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
(cherry picked from commit 16784fa)
Signed-off-by: Jungtaek Lim <[email protected]>
vrozov deleted the SPARK-50854 branch on February 12, 2025, 16:58