Manifest Splitting #767

Draft: wants to merge 58 commits into main

Conversation

@dcherian (Contributor) commented on Feb 21, 2025:

  • does the config get serialized properly?
  • real-world benchmark; test with ERA5
  • add ndim based condition (3D vs 4D)

pub struct ManifestShards(Vec<ManifestExtents>);

impl ManifestShards {
    pub fn default(ndim: usize) -> Self {
dcherian (Contributor Author):

I don't like this, but it is certainly tied to ndim.

Collaborator:

Maybe ManifestSplits is an enum to avoid this?

enum ManifestSplits {
    Single,
    Multiple(Vec<ManifestExtents>),
}

What I don't like is the empty vector. I wonder if Rust has a NonEmptyVec type; otherwise, a trick people use is:

...
    Multiple { first: ManifestExtents, rest: Vec<ManifestExtents> }
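As a sketch, the non-empty `first`/`rest` variant could be consumed like this. ManifestExtents is stubbed here as a vector of per-axis ranges purely for illustration; the real type lives in this PR.

```rust
use std::ops::Range;

// `ManifestExtents` stubbed for illustration; the real type is in the PR.
#[derive(Clone, Debug, PartialEq)]
pub struct ManifestExtents(Vec<Range<u32>>);

pub enum ManifestSplits {
    Single,
    // The first/rest split makes "at least one extent" hold by construction,
    // so no empty vector can ever be observed.
    Multiple { first: ManifestExtents, rest: Vec<ManifestExtents> },
}

impl ManifestSplits {
    // Number of manifest shards.
    pub fn len(&self) -> usize {
        match self {
            ManifestSplits::Single => 1,
            ManifestSplits::Multiple { rest, .. } => 1 + rest.len(),
        }
    }
}
```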

dcherian (Contributor Author):

OK, I don't need the default any more. It was an artifact that appeared because I implemented the core logic before wiring up the config. Now the default gets set when parsing the config using the array metadata.

@@ -37,9 +33,77 @@ impl ManifestExtents {
        Self(v)
    }

    pub fn contains(&self, coord: &[u32]) -> bool {
        self.iter().zip(coord.iter()).all(|(range, that)| range.contains(that))
Collaborator:

We need to start checking on writes that indices have the proper size for the metadata.
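A minimal sketch of what such a write-time check could look like, with ManifestExtents again stubbed as a vector of per-axis ranges and a hypothetical checked_contains name (not the PR's actual API):

```rust
use std::ops::Range;

// Stubbed `ManifestExtents` for illustration.
pub struct ManifestExtents(Vec<Range<u32>>);

impl ManifestExtents {
    // Write-time guard: reject coordinates whose dimensionality does not
    // match the extents instead of silently zipping the shorter of the two.
    pub fn checked_contains(&self, coord: &[u32]) -> Result<bool, String> {
        if coord.len() != self.0.len() {
            return Err(format!(
                "coordinate has {} axes, expected {}",
                coord.len(),
                self.0.len()
            ));
        }
        Ok(self.0.iter().zip(coord).all(|(range, c)| range.contains(c)))
    }
}
```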

@dcherian force-pushed the split-manifests branch 2 times, most recently from e7d9221 to 09476a4 on March 6, 2025, 23:02
Comment on lines 1394 to 1400
for chunk in chunks {
    let shard_index = shards.which(&chunk.coord)?;
    sharded_refs
        .entry(shard_index)
        .or_insert_with(|| Vec::with_capacity(ref_capacity))
        .push(chunk);
}
dcherian (Contributor Author):

I am attempting to convert this method to accept an impl Stream<Item = SessionResult<ChunkInfo>>, but I don't see how to convert this group-by logic.
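One way to express the group-by over a fallible sequence is a try_fold that threads the accumulator through each step; the same shape ports to a stream via the futures crate's TryStreamExt::try_fold. All types below (Shards, ChunkInfo, the which rule) are simplified stand-ins invented for illustration, not the PR's actual API:

```rust
use std::collections::HashMap;

// Stand-ins for the PR's types, invented for illustration.
type SessionResult<T> = Result<T, String>;

#[derive(Debug)]
pub struct ChunkInfo {
    pub coord: Vec<u32>,
}

pub struct Shards {
    pub shard_size: u32,
}

impl Shards {
    // Hypothetical shard lookup: first coordinate divided by shard size.
    pub fn which(&self, coord: &[u32]) -> SessionResult<usize> {
        coord
            .first()
            .map(|c| (c / self.shard_size) as usize)
            .ok_or_else(|| "empty coordinate".to_string())
    }
}

// The `for` loop expressed as a fallible fold: instead of mutating a map in
// the loop body, the accumulator is passed through each step and returned.
pub fn group_by_shard(
    shards: &Shards,
    chunks: impl IntoIterator<Item = SessionResult<ChunkInfo>>,
) -> SessionResult<HashMap<usize, Vec<ChunkInfo>>> {
    chunks.into_iter().try_fold(HashMap::new(), |mut acc, chunk| {
        let chunk = chunk?;
        let shard_index = shards.which(&chunk.coord)?;
        acc.entry(shard_index).or_insert_with(Vec::new).push(chunk);
        Ok(acc)
    })
}
```

With a stream, the fold body stays the same; only the combinator changes to an async TryStreamExt::try_fold.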

// - 0: 120
// - path: ./temperature # 4D variable: (time, level, latitude, longitude)
// manifest-split-sizes:
// - "level": 1 # alternatively 0: 1
dcherian (Contributor Author):

Needs validation, e.g.: do these dimensions exist? Does that axis number make sense?
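A hedged sketch of what that validation could look like; the function name and input shapes are invented for illustration and do not reflect the PR's actual config types:

```rust
// Hypothetical validation pass for `manifest-split-sizes`.
pub fn validate_split_sizes(
    dim_names: &[&str],          // e.g. ["time", "level", "latitude", "longitude"]
    split_sizes: &[(&str, u32)], // (dimension name or axis number, split size)
) -> Result<(), String> {
    let ndim = dim_names.len();
    for (key, size) in split_sizes {
        if *size == 0 {
            return Err(format!("split size for {key:?} must be at least 1"));
        }
        match key.parse::<usize>() {
            // Axis numbers must fall inside the array's dimensionality.
            Ok(axis) if axis >= ndim => {
                return Err(format!(
                    "axis {axis} out of range for a {ndim}-dimensional array"
                ));
            }
            Ok(_) => {}
            // Otherwise the key must be a known dimension name.
            Err(_) if !dim_names.contains(key) => {
                return Err(format!("unknown dimension {key:?}"));
            }
            Err(_) => {}
        }
    }
    Ok(())
}
```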

}

static DEFAULT_MANIFEST_PRELOAD_CONFIG: OnceLock<ManifestPreloadConfig> = OnceLock::new();
static DEFAULT_MANIFEST_SHARDING_CONFIG: OnceLock<ManifestShardingConfig> =
    OnceLock::new();

impl ManifestConfig {
    pub fn merge(&self, other: Self) -> Self {
dcherian (Contributor Author):

Merging combines a user-defined value with the library's default value.

        Self {
            preload: other.preload.or(self.preload.clone()),
            // FIXME: why prioritize one over the other?
            sharding: other.sharding.or(self.sharding.clone()),
dcherian (Contributor Author):

TODO: this could be overwrite instead. We need to be careful about ordering after merge.
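For reference, Option::or keeps the receiver when it is Some, so other.preload.or(self.preload.clone()) lets the caller-supplied other win and only falls back to self; a tiny sanity check:

```rust
fn main() {
    let user: Option<u32> = Some(5);
    let default: Option<u32> = Some(10);
    // The receiver (left side) wins when it is `Some`...
    assert_eq!(user.or(default), Some(5));
    // ...and the argument is used only as a fallback.
    assert_eq!(None.or(default), Some(10));
}
```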

@dcherian (Contributor Author) commented on Apr 2, 2025:

Local benchmarks as of late last night:

Read: 4x speedup

python benchmarks/runner.py --pytest '-m read_benchmark' main 137f2834

Benchmark setup: one array with shape 500_000_000 and chunks=1000 (500_000 chunk refs), shard_size=100_000, so 5 shards with 100_000 chunk refs each. Reading one element (so really one chunk request).

    def fn():
        repo = ic.Repository.open(
            storage=synth_dataset.storage,
            config=ic.RepositoryConfig(manifest=ic.ManifestConfig(preload=preload)),
        )
        ds = xr.open_zarr(
            repo.readonly_session("main").store,
            group=synth_dataset.group,
            chunks=None,
            consolidated=False,
        )
        subset = ds.isel(synth_dataset.chunk_selector)
        subset[synth_dataset.load_variables].compute()

Parameterized over:

  • sharding : either unsharded or sharded (5 shards)
  • preload: off or default (makes no difference)
------------------ benchmark 'xarray-read test_time_xarray_read_chunks_cold_cache': 6 tests ------------------
Name (time in ms)                                                                       Median
--------------------------------------------------------------------------------------------------------------
test_time_xarray_read_chunks_cold_cache[large-manifest-sharded-default] (this PR)       10.1482 (1.0)
test_time_xarray_read_chunks_cold_cache[large-manifest-sharded-off] (this PR)           10.2042 (1.01)
test_time_xarray_read_chunks_cold_cache[large-manifest-unsharded-default] (main)        43.5519 (4.29)
test_time_xarray_read_chunks_cold_cache[large-manifest-unsharded-default] (this PR)     45.1441 (4.45)
test_time_xarray_read_chunks_cold_cache[large-manifest-unsharded-off] (main)            43.3678 (4.27)
test_time_xarray_read_chunks_cold_cache[large-manifest-unsharded-off] (this PR)         45.2304 (4.46)
--------------------------------------------------------------------------------------------------------------

Write: 10% slowdown on commit.

python benchmarks/runner.py --pytest '-k write_sharded_refs' main 137f2834

Benchmark setup: this measures only session.commit after setting 500_000 virtual chunk refs.

sharding: None (default) or shard size = 10_000 refs, so 50 shards in total.

The slowdown shows up even for the default case of writing a single shard. But this is only about 0.5 s, so I don't think it matters much.

---------------- benchmark 'refs-write test_write_sharded_refs': 3 tests ----------------
Name (time in ms)                                                    Median
-----------------------------------------------------------------------------------------
test_write_sharded_refs[no-sharding-large-1d] (main)                 491.2220 (1.0)
test_write_sharded_refs[no-sharding-large-1d] (this PR)              545.9192 (1.11)
test_write_sharded_refs[shard-size-10_000-large-1d] (this PR)        555.0974 (1.13)
-----------------------------------------------------------------------------------------

dcherian added 6 commits April 2, 2025 13:43
Local
-----

S3
--
* main:
  Fix `Diff` python typing (#890)
  Fail when creating Storage for Tigris using s3_compatible (#889)
  Disallow tag/branch creation with non-existing snapshot (#888)
  Log errors during listing and deleting of objects (#886)
  Rust integration tests can run in more object stores. (#884)
  Update pyo3. (#885)
  Add expiration to stateful test (#868)
* main:
  Better `Debug` instances and __repr__ methods. (#891)
  Add chunk container repr, fix test dataset (#893)
dcherian added 2 commits April 4, 2025 13:59
* main:
  Release version v0.2.12 (#894)
  Use dask array native reduction (#864)
  Update sample-datasets page (#887)
@dcherian changed the title from "Manifest Sharding" to "Manifest Splitting" on Apr 4, 2025

/// ]
/// );
/// assert_eq!(actual, expected);
/// ```
Collaborator:

would you be willing to write some property tests for this function?

@@ -87,6 +100,15 @@ impl ArrayShape {
        }
    }

// Implement indexing for immutable access
impl Index<usize> for ArrayShape {
Collaborator:

What I don't like about this: panics. Maybe it should return an Option? Not sure if that's something people do.

dcherian (Contributor Author):

I think that's what the trait requires, so our hands are tied, no?

I could add a get method instead that returns an Option.

Collaborator:

Yes, that is what I was trying to say; a get method returning an Option would be safer.
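A sketch of that safer accessor, with ArrayShape stubbed as a plain wrapper for illustration:

```rust
// Stubbed `ArrayShape` for illustration; the real type is in the PR.
pub struct ArrayShape(Vec<u64>);

impl ArrayShape {
    // Panic-free counterpart to `Index<usize>`: out-of-range axes yield `None`
    // instead of aborting, mirroring `slice::get`.
    pub fn get(&self, axis: usize) -> Option<u64> {
        self.0.get(axis).copied()
    }
}
```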

@@ -1708,6 +1899,7 @@ async fn fetch_manifest(
/// available in `from` and `to` arguments.
///
/// Yes, this is horrible.
#[allow(dead_code)]
Collaborator:

we no longer use this shit?

dcherian (Contributor Author):

Yeah, I do a pass to group the references into a manifest shard, so I just accumulate in that pass. I can delete it.

* main:
  Bump the rust-dependencies group with 2 updates (#909)
  Release version 0.2.13 (#907)
  Skip bytes logging in object_store (#906)
  More randomness for test repo prefixes (#905)
  S3 Storage supports setting storage class (#903)
  Update configuration.md (#899)
  Add example to exercise high read concurrency (#896)
  Bump the rust-dependencies group with 2 updates (#897)