Investigate connect_hub performance when used against large, S3-based hubs #37

Closed
2 tasks done
bsweger opened this issue Jun 13, 2024 · 10 comments · Fixed by #47
@bsweger
Contributor

bsweger commented Jun 13, 2024

Background

We've previously identified non-uniform parquet schemas as a performance culprit when using hubData (because you have to do a collect() before you can filter). That issue is logged here: hubverse-org/hubverse-transform#14

However, now that we have a larger hub on S3 (bucket: temp-flusight-archive-hub), we've found that even the initial connect_hub call (prior to doing any manipulations or collects) takes a prohibitively long time. @nickreich reported ~20 minutes, and I was getting ~8 min.

@annakrystalli previously ran some thorough benchmarks against a copy of the 2023-24 CDC FluSight hub, which had a baseline of 613 .csv model-output files at the time. This analysis found significant performance gains from converting the files to parquet and keeping them in the same place (i.e., same folder name, same filename).

The flusight archives have 4,217 parquet files in the model-output folder.

Definition of done

  • Determine cause(s) of the poor connect_hub performance against a large set of model-output files on S3
  • Make recommendations for next steps and open any corresponding issues
@bsweger bsweger converted this from a draft issue Jun 13, 2024
@bsweger bsweger self-assigned this Jun 13, 2024
@bsweger
Contributor Author

bsweger commented Jun 13, 2024

After spending some time digging in, I believe that the increased number of "small" model-output files is causing two classes of performance problems for cloud-based hubs.

1. Those related to specific connect_hub checks/validations

connect_hub is a robust function that does a lot of work to shield users from various issues and provide helpful messages when something isn't right.

For two of these checks (check_file_format and arrow's exclude_invalid_files option), the trade-off is poorer performance as the number of files increases, especially on cloud-based hubs. The arrow documentation for exclude_invalid_files says as much:

> Default is FALSE because checking all files up front incurs I/O and thus will be slower, especially on remote filesystems. If false and there are invalid files, there will be an error at scan time

These checks make sense, but they incur a significant cost on larger cloud-based hubs (the latter especially). From my ~8 min connect_hub baseline:

  • Removing check_file_format: ~7.5 min
  • Setting exclude_invalid_files=FALSE: 6.9 seconds
  • Doing both of the above: 4.5 seconds

2. Those related to data access methods and the "small files" problem

This set of tests is less definitive (I didn't do repeated, thorough benchmarks), but it's clear that reducing the number of parquet files on S3 reduces the elapsed time of arrow's open_dataset function, used in connect_hub (which makes sense).

To test fewer files, I partitioned the temp-flusight-archive-hub model-output files by target/output_type/location (based on the viz we were working on during the Hubverse retreat). The partitioned data is located at s3://temp-flusight-archive-hub/model-output-partitioned.
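To make the target/output_type/location layout concrete, here is a minimal sketch (in Python, for illustration only; hubData itself is R) of how one row could map to a partition directory. It assumes hive-style `key=value` directory names, which this thread does not confirm, and the function name and example values are hypothetical.

```python
from pathlib import PurePosixPath

# Partition keys used in the experiment described above.
PARTITION_KEYS = ("target", "output_type", "location")


def partition_dir(row: dict, root: str = "model-output-partitioned") -> str:
    """Return the hive-style directory (key=value segments) for one row.

    Assumption: hive-style naming. Real writers may also escape special
    characters in the values, which this sketch does not do.
    """
    segments = [f"{key}={row[key]}" for key in PARTITION_KEYS]
    return str(PurePosixPath(root, *segments))


# Hypothetical model-output row; only the partition keys affect the path.
example_row = {
    "target": "wk inc flu hosp",
    "output_type": "quantile",
    "location": "US",
    "value": 1234.5,
}
print(partition_dir(example_row))
```

Every row sharing the same three key values lands in the same directory, which is why the file count drops compared with one file per round and model.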

In the past, we worried about prematurely choosing the wrong partitioning strategy due to a lack of information about query patterns.

While we should choose a partitioning strategy carefully, even filtering by non-partitioned columns was faster than equivalent filters on the non-partitioned data in my limited tests. In other words, it's possible that the benefits of fewer files outweigh the drawbacks of a query that doesn't align with the chosen partition keys.

After copying connect_hub and making changes to accommodate the above partitioning strategy, I saw significantly reduced times for running the function:

  • Targeting partitioned model-output: 15 seconds
  • Targeting partitioned model-output + removing check_file_format: 11.8 seconds
  • Targeting partitioned model-output + setting exclude_invalid_files=FALSE: 1.3 seconds
  • All of the above: 0.98 seconds
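For a rough sense of scale, the timings reported in this comment can be turned into speedup factors relative to the ~8 min baseline. This is a small Python sketch of that arithmetic only; the numbers come from the two lists above, the labels are shorthand, and since these were single, informal runs the factors should be read as approximate.

```python
BASELINE_SECONDS = 8 * 60  # the "~8 min" connect_hub baseline

# Elapsed times reported above, converted to seconds.
timings_seconds = {
    "no check_file_format": 7.5 * 60,
    "exclude_invalid_files=FALSE": 6.9,
    "both changes": 4.5,
    "partitioned": 15.0,
    "partitioned, no check_file_format": 11.8,
    "partitioned, exclude_invalid_files=FALSE": 1.3,
    "partitioned, both changes": 0.98,
}


def speedup(seconds: float, baseline: float = BASELINE_SECONDS) -> float:
    """How many times faster than the baseline a run was."""
    return baseline / seconds


speedups = {label: speedup(s) for label, s in timings_seconds.items()}

for label, factor in speedups.items():
    print(f"{label:42s} {timings_seconds[label]:7.2f} s  ~{factor:.1f}x")
```

On these figures, dropping check_file_format alone barely moves the needle, while setting exclude_invalid_files=FALSE or partitioning each cut the time by well over an order of magnitude.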

For those who have access to Confluence, working notes here (not organized or intended as a definitive resource)

@bsweger
Contributor Author

bsweger commented Jun 13, 2024

Based on the above, I'd recommend looking for an alternate way or place for doing the non-performant data quality checks in connect_hub (i.e., the first item in the above comment).

Partitioning the data is almost certainly something we should do, not only to speed up connect_hub but also to ensure more performant data access for downstream applications like dashboards and visualizations.

However, partitioning is perhaps a good second step, since implementing it involves more work with more moving pieces. For example:

  • connect_hub doesn't work "out of the box" with alternate partitions (or maybe I missed something, entirely possible!)
  • would anything downstream break if we publish model-output files that aren't named in the round_id-model_id format?
  • for active hubs, we'd need a way to ensure periodic compaction of the model-output files in S3 (so we don't accumulate "many small files" over time)
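As one way to picture the compaction step in the last bullet, the sketch below (Python, illustrative only) plans which small files could be merged together. It only groups a listing of object keys and sizes; the actual read/merge/write is left out. The `FileInfo` type, the function name, and the 128 MiB target are assumptions, not anything decided in this thread.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FileInfo:
    key: str         # object key within the bucket
    size_bytes: int  # object size as reported by a bucket listing


def plan_compaction(files, target_bytes=128 * 1024 * 1024):
    """Greedily group files (in key order) into batches to merge.

    A batch is closed before adding a file that would push its total
    size past target_bytes. Batches containing a single file are
    dropped, since rewriting them would gain nothing.
    """
    batches, current, current_size = [], [], 0
    for f in sorted(files, key=lambda f: f.key):
        if current and current_size + f.size_bytes > target_bytes:
            batches.append(current)
            current, current_size = [], 0
        current.append(f)
        current_size += f.size_bytes
    if current:
        batches.append(current)
    return [batch for batch in batches if len(batch) > 1]


# Hypothetical listing: four small files and one that is already large.
MB = 1024 * 1024
listing = [FileInfo(f"part-{name}.parquet", size * MB)
           for name, size in [("a", 40), ("b", 40), ("c", 40), ("d", 40), ("e", 200)]]
plan = plan_compaction(listing, target_bytes=100 * MB)
for batch in plan:
    print([f.key for f in batch])
```

In a partitioned layout this would presumably run once per partition directory, so that merged files never mix partition values.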

@bsweger bsweger moved this from In Progress to Ready for Review in hubverse Development overview Jun 13, 2024
@annakrystalli
Member

Note that excluding invalid files is needed if the directory contains any file type other than the one being opened (e.g., mixed file types or READMEs). As these are common in standard hubs, the default is to remove them on opening; otherwise open_dataset just fails.

If the cloud transformation can guarantee these conditions, we can surface arguments to turn those checks off (probably a good idea to surface them anyways).

It would be nicer, however, for the function to detect that it's dealing with a cloud hub where the above conditions are guaranteed and turn those checks off by default in that case. Perhaps by including a .cloud-hub file in the root of the hub that the function could use to detect such a hub?

Re: the repartitioning, it will be easy to adapt hubData to handle this, but more work to do the actual repartitioning.

Also it might be useful to compare to how easy it would be to just add files to a duckdb database instead of repartitioning.

@bsweger
Contributor Author

bsweger commented Jun 14, 2024

Thanks for the 👀 @annakrystalli! Am splitting the response into 2 comments, one for validation defaults and one for repartitioning/duckdb.

> Note that excluding invalid files is needed if the directory contains any file type other than the one being opened (e.g., mixed file types or READMEs). As these are common in standard hubs, the default is to remove them on opening; otherwise open_dataset just fails.
>
> If the cloud transformation can guarantee these conditions, we can surface arguments to turn those checks off (probably a good idea to surface them anyways).
>
> It would be nicer, however, for the function to detect that it's dealing with a cloud hub where the above conditions are guaranteed and turn those checks off by default in that case. Perhaps by including a .cloud-hub file in the root of the hub that the function could use to detect such a hub?

Good call--love the idea of implementing sensible default behaviors on users' behalf depending on where the data is. Currently, the transform function won't transform anything other than a .csv or .parquet file, so I believe we can safely default to skipping checks for cloud-based hubs.

Rather than adding an explicit .cloud-hub file to hub repos, could we programmatically determine the default behavior? For example, looking at the attributes of Arrow's SubTreeFileSystem when in connect_hub.SubTreeFileSystem (e.g., if hub_path[["url_scheme"]] == "s3")? Or even just set the default to skip checks anytime the UseMethod dispatches to connect_hub.SubTreeFileSystem?

(While still surfacing the option to run or skip so people can override)
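A minimal sketch of that "programmatic default plus explicit override" idea, written in Python purely for illustration (hubData is R, and the real check would live in the connect_hub.SubTreeFileSystem method). The function name, the `skip_checks` argument name, and the use of the URI scheme as the signal are all assumptions here.

```python
from typing import Optional
from urllib.parse import urlparse

# Assumption: only S3 is treated as a "cloud" location in this sketch.
CLOUD_SCHEMES = {"s3"}


def resolve_skip_checks(hub_uri: str, skip_checks: Optional[bool] = None) -> bool:
    """Decide whether to skip the up-front file checks.

    An explicit True/False from the caller always wins. Otherwise the
    default depends on where the hub lives: skip for cloud URIs, run
    the checks for everything else (e.g. local paths).
    """
    if skip_checks is not None:
        return skip_checks
    return urlparse(hub_uri).scheme in CLOUD_SCHEMES


print(resolve_skip_checks("s3://temp-flusight-archive-hub"))
print(resolve_skip_checks("s3://temp-flusight-archive-hub", skip_checks=False))
print(resolve_skip_checks("/path/to/local/hub"))
```

Keeping the override as a three-state argument (unset, True, False) lets the function distinguish "the user asked for checks" from "the user said nothing", which is what makes a location-dependent default possible.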

@annakrystalli
Member

annakrystalli commented Jun 14, 2024

> Rather than adding an explicit .cloud-hub file to hub repos, could we programmatically determine the default behavior? For example, looking at the attributes of Arrow's SubTreeFileSystem when in connect_hub.SubTreeFileSystem (e.g., if hub_path[["url_scheme"]] == "s3")? Or even just set the default to skip checks anytime the UseMethod dispatches to connect_hub.SubTreeFileSystem?

I initially considered that, but the files being on S3 does not guarantee that someone has not, for example, just dumped a hub as-is on S3 or mirrored it in some other way (not through our transformation action). What I was trying to get at with the file is some sort of signature that a cloud hub has been generated by our transformation function and is therefore guaranteed to work without the checks. Very open to other suggestions!

@bsweger
Contributor Author

bsweger commented Jun 14, 2024

> Re: the repartitioning, it will be easy to adapt hubData to handle this, but more work to do the actual repartitioning.
>
> Also it might be useful to compare to how easy it would be to just add files to a duckdb database instead of repartitioning.

> > Rather than adding an explicit .cloud-hub file to hub repos, could we programmatically determine the default behavior? For example, looking at the attributes of Arrow's SubTreeFileSystem when in connect_hub.SubTreeFileSystem (e.g., if hub_path[["url_scheme"]] == "s3")? Or even just set the default to skip checks anytime the UseMethod dispatches to connect_hub.SubTreeFileSystem?
>
> I initially considered that, but the files being on S3 does not guarantee that someone has not, for example, just dumped a hub as-is on S3 or mirrored it in some other way (not through our transformation action). What I was trying to get at with the file is some sort of signature that a cloud hub has been generated by our transformation function and is therefore guaranteed to work without the checks. Very open to other suggestions!

That's true, but as long as we're planning to let people override the default and run the checks/exclusions when needed, I'd vote for "fewer moving parts" and checking programmatically instead of requiring a separate file in the repo (and if there's an error, provide a message suggesting to try again with "check=TRUE" or whatever we call it).

I'm not averse to the transform function writing metadata to a hub's S3 bucket, but the transform operates on a file-by-file basis and doesn't inspect a hub's contents holistically (that's part of how it runs so quickly).

@bsweger
Contributor Author

bsweger commented Jun 14, 2024

> Re: the repartitioning, it will be easy to adapt hubData to handle this, but more work to do the actual repartitioning.

Oh that's great! Re: actual repartitioning, I was thinking that the transform function could write out the data according to the partitioning strategy we decide on. The bigger challenge, I think, is merging/compacting the small parquet files that will accumulate over time (again, because model-output files are being written to S3 one at a time).

Sounds like you were thinking something else, so maybe worth a chat next week?

> Also it might be useful to compare to how easy it would be to just add files to a duckdb database instead of repartitioning.

Ah, I didn't have much luck with DuckDB for this purpose and should have noted that (updated). I did think about a persistent DuckDB file on S3, but there are a few challenges:

  • Persistent DuckDB database files on S3 are read-only--you can't update them
  • I got intermittent errors when reading large amounts of data using the R DuckDB client
  • (caveat, this is based on limited testing on a laptop with a lot of memory): I had better performance using DuckDB's read_parquet function to create an in-memory table of model-output data than using the data file hosted on S3

Relying on clients like Arrow and DuckDB that can spin through a directory of parquet files (vs using a database) has the advantage of returning information about submissions as they arrive, even if we'd need to do a scheduled compaction.

Arrow works pretty well for multi-parquet file reads, especially when you're filtering on the partition columns. Since hubData is already based on Arrow and you've already smoothed out so many of the edge cases, we could stick with it for reading data, and try it out for partitioning data if we decide to do that.

@nickreich
Contributor

One question I have about a potential future partitioning standard: the strategy employed above (target/output_type/location) relies on task ID fields that are project-specific. That is, output_type is a standard field, but target and location might not be present in a given hub. Would the partitioning need to be specified somewhere by the hub prior to creating the S3 bucket, so that the transform code could programmatically adapt?

@annakrystalli
Member

I feel partitioning should not be enforced by us but should be configurable by hub admins. We can of course recommend sensible defaults in hubTemplate (or wherever we host examples of configuring a hub for cloud).

@bsweger
Contributor Author

bsweger commented Jun 20, 2024

I opened an issue to allow skipping data checks in connect_hub and connect_model_output: #43

As for the second question of data partitioning, agree with all of the above that there are additional considerations. And after a recent discussion, I've been thinking more about DuckDB...

DuckDB files hosted on S3 are read-only, which I'd originally considered a deal-breaker. However, based on a recent conversation, I think it's worth another look for these reasons:

  • our current dashboard priorities (viz, scoring) do not require real-time data updates: their backend data needs to be updated only after each round closes
  • if we consider scoring (which wasn't part of our retreat conversation), we'd ideally want a place to store model scores after computing them, so we don't have to run computations on the fly...once we enter the territory of computing and storing additional model/round-related information, a database option becomes more attractive

If there's a world where we can get reasonable DuckDB performance and can work out a way to recreate the database and update the S3 .db file as part of a "round closing" process, it might be worth considering as a stand-alone data access solution or as part of a data partitioning/compaction strategy.

bsweger added a commit to bsweger/hubData that referenced this issue Jun 27, 2024
This changeset adds an optional skip_checks parameter to
connect_hub.R and connect_model_output.R per the requirements
outlined in hubverse-org#37.

When working with hub data on a local filesystem, the behavior
is unchanged. When working with hub data in an S3 bucket, the
connect functions will now skip data checks by default to
improve performance. The former connection behavior for
S3-based hubs can be obtained by explicitly setting
skip_checks=FALSE.

This commit fixes the test suite to work when using
skip_checks=FALSE to force the previous behavior. The
next commit will add new tests to ensure the new behavior
works as intended.
bsweger added a commit to bsweger/hubData that referenced this issue Jul 19, 2024
bsweger added a commit to bsweger/hubData that referenced this issue Jul 25, 2024
@github-project-automation github-project-automation bot moved this from Ready for Review to Done in hubverse Development overview Jul 26, 2024