Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

parallel load with no attributes #253

Merged
merged 8 commits into from
Feb 6, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
@@ -26,7 +26,7 @@ jobs:

- name: Install pip dependencies
run: |
pip install ruff pytest black mpi4py pyinstrument psutil pytest-lazy-fixture
pip install ruff pytest black mpi4py pyinstrument psutil pytest-lazy-fixtures
pip install -r requirements.txt
pip install -e .[compression]

@@ -62,7 +62,7 @@ jobs:
pip install -r requirements.txt
pip install zarr==2.11.3
pip install mpi4py numcodecs>=0.7.3 bitshuffle
pip install pytest pytest-lazy-fixture
pip install pytest pytest-lazy-fixtures
pip install -e .

- name: Run serial tests
2 changes: 1 addition & 1 deletion .readthedocs.yml
Original file line number Diff line number Diff line change
@@ -14,7 +14,7 @@ build:
# Build documentation in the docs/ directory with Sphinx
sphinx:
configuration: doc/conf.py
fail_on_warning: true
fail_on_warning: false

python:
install:
1 change: 1 addition & 0 deletions caput/__init__.py
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@
tod
weighted_median
"""

from . import _version

__version__ = _version.get_versions()["version"]
1 change: 1 addition & 0 deletions caput/cache.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Tools for caching expensive calculations."""

import weakref

import numpy as np
1 change: 1 addition & 0 deletions caput/config.py
Original file line number Diff line number Diff line change
@@ -49,6 +49,7 @@
Richard 40.0 Sooty

"""

from __future__ import annotations

from typing import TYPE_CHECKING
1 change: 1 addition & 0 deletions caput/fileformats.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Interface for file formats supported by caput: HDF5 and Zarr."""

import logging
import os
import shutil
1 change: 0 additions & 1 deletion caput/interferometry.py
Original file line number Diff line number Diff line change
@@ -11,7 +11,6 @@
- :py:meth:`fringestop_phase`
"""


import numpy as np


55 changes: 41 additions & 14 deletions caput/memh5.py
Original file line number Diff line number Diff line change
@@ -58,13 +58,18 @@
from ast import literal_eval
from collections.abc import Mapping
from copy import deepcopy
from typing import Any
from pathlib import Path
from typing import TYPE_CHECKING, Any

import h5py
import numpy as np

from . import fileformats, misc, mpiarray, mpiutil, tools

if TYPE_CHECKING:
from mpi4py import MPI


logger = logging.getLogger(__name__)

try:
@@ -1856,6 +1861,10 @@ def from_file(
if distributed and file_format == fileformats.Zarr:
lockfile = f"{file_}.sync"
kwargs["synchronizer"] = zarr.ProcessSynchronizer(lockfile)

# NOTE: `hints` is not supported for on-disk files; remove the argument in
# case it has been passed indirectly
kwargs.pop("hints", None)
data = file_format.open(file_, **kwargs)
toclose = file_format == fileformats.HDF5

@@ -2863,25 +2872,35 @@ def _write_distributed_datasets(dest):


def _distributed_group_from_file(
fname,
comm=None,
_=True, # usually `hints`, but hints do not do anything in this method
convert_dataset_strings=False,
convert_attribute_strings=True,
file_format=fileformats.HDF5,
fname: str | Path,
comm: MPI.Comm | None = None,
hints: bool | dict = True,
convert_dataset_strings: bool = False,
convert_attribute_strings: bool = True,
file_format: type[fileformats.FileFormat] = fileformats.HDF5,
**kwargs,
):
"""Restore full tree from an HDF5 file or Zarr group into a distributed memh5 object.
"""Restore full tree from an HDF5 or Zarr into a distributed memh5 object.

A `selections=` parameter may be supplied as part of `kwargs`. See
`_deep_group_copy` for a description.

`hints` may be a dictionary that overrides the settings in the file itself. Each
key should be the path to a dataset and each value a dictionary with keys
`distributed` (boolean, required) and `axis` (integer, optional).
"""
# Create root group
group = MemGroup(distributed=True, comm=comm)
comm = group.comm

selections = kwargs.pop("selections", None)

# Fill the hints dict if set
hints_dict = {}
if isinstance(hints, dict):
hints_dict = hints
hints = True

# == Create some internal functions for doing the read ==
# Copy over attributes with a broadcast from rank = 0
def _copy_attrs_bcast(h5item, memitem, **kwargs):
@@ -2912,16 +2931,24 @@ def _copy_from_file(h5group, memgroup, selections=None):

# If dataset, create dataset
else:
dset_hints = hints_dict.get(item.name, {})

distributed = hints and (
dset_hints.get("distributed", False)
or item.attrs.get("__memh5_distributed_dset", False)
)
# Check if we are in a distributed dataset
if ("__memh5_distributed_dset" in item.attrs) and item.attrs[
"__memh5_distributed_dset"
]:
distributed_axis = item.attrs.get("__memh5_distributed_axis", 0)
if distributed:
distributed_axis = (
dset_hints["axis"]
if "axis" in dset_hints
else item.attrs.get("__memh5_distributed_axis", 0)
)

# Read from file into MPIArray
pdata = mpiarray.MPIArray.from_file(
h5group,
key,
fname,
item.name,
axis=distributed_axis,
comm=comm,
sel=selection,
21 changes: 21 additions & 0 deletions caput/misc.py
Original file line number Diff line number Diff line change
@@ -2,9 +2,15 @@

import importlib
import os
from pathlib import Path
from typing import TYPE_CHECKING, Optional, Union, overload

import h5py
import numpy as np

if TYPE_CHECKING:
from mpi4py import MPI


def vectorize(**base_kwargs):
"""Improved vectorization decorator.
@@ -182,6 +188,21 @@ def __get__(self, obj, objtype=None):
return _listize_desc


@overload
def open_h5py_mpi(
f: Union[str, Path, h5py.File],
mode: str,
use_mpi: bool = True,
comm: Optional["MPI.Comm"] = None,
) -> h5py.File: ...


@overload
def open_h5py_mpi(
f: h5py.Group, mode: str, use_mpi: bool = True, comm: Optional["MPI.Comm"] = None
) -> h5py.Group: ...


def open_h5py_mpi(f, mode, use_mpi=True, comm=None):
"""Ensure that we have an h5py File object.

1 change: 1 addition & 0 deletions caput/mpiarray.py
Original file line number Diff line number Diff line change
@@ -236,6 +236,7 @@
intermediate pickling process, which can lead to malformed arrays.

"""

import logging
import os
import time
2 changes: 1 addition & 1 deletion caput/scripts/runner.py
Original file line number Diff line number Diff line change
@@ -484,7 +484,7 @@ def queue(
if sfile != dfile:
shutil.copyfile(sfile, dfile)

if "modules" in rconf and rconf["modules"]:
if rconf.get("modules"):
modules = rconf["modules"]
modules = (modules,) if isinstance(modules, str) else modules
modstr = "module purge\nmodule load "
60 changes: 30 additions & 30 deletions caput/tests/test_memh5.py
Original file line number Diff line number Diff line change
@@ -10,7 +10,7 @@
import h5py
import numpy as np
import pytest
from pytest_lazyfixture import lazy_fixture
from pytest_lazy_fixtures import lf
import zarr
import copy

@@ -183,8 +183,8 @@ def assertAttrsEqual(a, b):
@pytest.mark.parametrize(
"test_file,file_open_function",
[
(lazy_fixture("filled_h5_file"), h5py.File),
(lazy_fixture("filled_zarr_file"), zarr.open_group),
(lf("filled_h5_file"), h5py.File),
(lf("filled_zarr_file"), zarr.open_group),
],
)
def test_file_sanity(test_file, file_open_function):
@@ -196,12 +196,12 @@ def test_file_sanity(test_file, file_open_function):
@pytest.mark.parametrize(
"test_file,file_open_function,file_format",
[
(lazy_fixture("filled_h5_file"), h5py.File, None),
(lazy_fixture("filled_zarr_file"), zarr.open_group, None),
(lazy_fixture("filled_zarrzip_file"), zarr.open_group, None),
(lazy_fixture("filled_h5_file"), h5py.File, fileformats.HDF5),
(lazy_fixture("filled_zarr_file"), zarr.open_group, fileformats.Zarr),
(lazy_fixture("filled_zarrzip_file"), zarr.open_group, fileformats.Zarr),
(lf("filled_h5_file"), h5py.File, None),
(lf("filled_zarr_file"), zarr.open_group, None),
(lf("filled_zarrzip_file"), zarr.open_group, None),
(lf("filled_h5_file"), h5py.File, fileformats.HDF5),
(lf("filled_zarr_file"), zarr.open_group, fileformats.Zarr),
(lf("filled_zarrzip_file"), zarr.open_group, fileformats.Zarr),
],
)
def test_to_from_file(test_file, file_open_function, file_format):
@@ -223,8 +223,8 @@ def test_to_from_file(test_file, file_open_function, file_format):
@pytest.mark.parametrize(
"test_file,file_format",
[
(lazy_fixture("filled_h5_file"), fileformats.HDF5),
(lazy_fixture("filled_zarr_file"), fileformats.Zarr),
(lf("filled_h5_file"), fileformats.HDF5),
(lf("filled_zarr_file"), fileformats.Zarr),
],
)
def test_memdisk(test_file, file_format):
@@ -253,8 +253,8 @@ def test_memdisk(test_file, file_format):
@pytest.mark.parametrize(
"test_file,file_format",
[
(lazy_fixture("filled_h5_file"), fileformats.HDF5),
(lazy_fixture("filled_zarr_file"), fileformats.Zarr),
(lf("filled_h5_file"), fileformats.HDF5),
(lf("filled_zarr_file"), fileformats.Zarr),
],
)
def test_compression(test_file, file_format, compression, compression_opts, chunks):
@@ -302,10 +302,10 @@ class TempSubClass(memh5.MemDiskGroup):
@pytest.mark.parametrize(
"test_file,file_format",
[
(lazy_fixture("h5_file"), fileformats.HDF5),
(lazy_fixture("zarr_file"), fileformats.Zarr),
(lazy_fixture("h5_file"), None),
(lazy_fixture("zarr_file"), None),
(lf("h5_file"), fileformats.HDF5),
(lf("zarr_file"), fileformats.Zarr),
(lf("h5_file"), None),
(lf("zarr_file"), None),
],
)
def test_io(test_file, file_format):
@@ -395,9 +395,9 @@ def zarrzip_basiccont_file(zarr_basiccont_file):
@pytest.mark.parametrize(
"test_file,file_format",
[
(lazy_fixture("h5_basiccont_file"), fileformats.HDF5),
(lazy_fixture("zarr_basiccont_file"), fileformats.Zarr),
(lazy_fixture("zarrzip_basiccont_file"), fileformats.Zarr),
(lf("h5_basiccont_file"), fileformats.HDF5),
(lf("zarr_basiccont_file"), fileformats.Zarr),
(lf("zarrzip_basiccont_file"), fileformats.Zarr),
],
)
def test_access(test_file, file_format):
@@ -420,8 +420,8 @@ def test_access(test_file, file_format):
@pytest.mark.parametrize(
"test_file,file_format",
[
(lazy_fixture("h5_basiccont_file"), fileformats.HDF5),
(lazy_fixture("zarr_basiccont_file"), fileformats.Zarr),
(lf("h5_basiccont_file"), fileformats.HDF5),
(lf("zarr_basiccont_file"), fileformats.Zarr),
],
)
def test_history(test_file, file_format):
@@ -459,8 +459,8 @@ def test_history(test_file, file_format):
@pytest.mark.parametrize(
"test_file,file_format",
[
(lazy_fixture("h5_file"), fileformats.HDF5),
(lazy_fixture("zarr_file"), fileformats.Zarr),
(lf("h5_file"), fileformats.HDF5),
(lf("zarr_file"), fileformats.Zarr),
],
)
def test_to_from__file_unicode(test_file, file_format):
@@ -519,8 +519,8 @@ def test_to_from__file_unicode(test_file, file_format):
@pytest.mark.parametrize(
"test_file,file_format",
[
(lazy_fixture("h5_file"), fileformats.HDF5),
(lazy_fixture("zarr_file"), fileformats.Zarr),
(lf("h5_file"), fileformats.HDF5),
(lf("zarr_file"), fileformats.Zarr),
],
)
def test_failure(test_file, file_format):
@@ -537,8 +537,8 @@ def test_failure(test_file, file_format):
@pytest.mark.parametrize(
"test_file,file_format",
[
(lazy_fixture("h5_file"), fileformats.HDF5),
(lazy_fixture("zarr_file"), fileformats.Zarr),
(lf("h5_file"), fileformats.HDF5),
(lf("zarr_file"), fileformats.Zarr),
],
)
def test_to_from_hdf5(test_file, file_format):
@@ -567,8 +567,8 @@ def test_to_from_hdf5(test_file, file_format):
@pytest.mark.parametrize(
"test_file,file_format",
[
(lazy_fixture("h5_file"), fileformats.HDF5),
(lazy_fixture("zarr_file"), fileformats.Zarr),
(lf("h5_file"), fileformats.HDF5),
(lf("zarr_file"), fileformats.Zarr),
],
)
def test_json_failure(test_file, file_format):
10 changes: 5 additions & 5 deletions caput/tests/test_memh5_parallel.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Unit tests for the parallel features of the memh5 module."""

import pytest
from pytest_lazyfixture import lazy_fixture
from pytest_lazy_fixtures import lf
import numpy as np
import h5py
import zarr
@@ -62,9 +62,9 @@ def test_create_dataset():
@pytest.mark.parametrize(
"test_file,file_open_function,file_format",
[
(lazy_fixture("h5_file_distributed"), h5py.File, fileformats.HDF5),
(lf("h5_file_distributed"), h5py.File, fileformats.HDF5),
(
lazy_fixture("zarr_file_distributed"),
lf("zarr_file_distributed"),
zarr.open_group,
fileformats.Zarr,
),
@@ -173,9 +173,9 @@ def test_io(
@pytest.mark.parametrize(
"test_file,file_open_function,file_format",
[
(lazy_fixture("h5_file_distributed"), h5py.File, fileformats.HDF5),
(lf("h5_file_distributed"), h5py.File, fileformats.HDF5),
(
lazy_fixture("zarr_file_distributed"),
lf("zarr_file_distributed"),
zarr.open_group,
fileformats.Zarr,
),
Loading