
Commit 6f4a833

Dask: Array (#952)
Add bindings for all record components to dask arrays (ND mesh record components and 1D particle record components).
1 parent 760c274 commit 6f4a833
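
In practice, the new binding hangs off every record component. Below is a minimal sketch of the added API, not part of the diff itself; it assumes Dask is installed and uses the repository's sample series path samples/git-sample/data%T.h5 (substitute any openPMD series):

import openpmd_api as io

s = io.Series("samples/git-sample/data%T.h5", io.Access.read_only)
E_x = s.iterations[400].meshes["E"]["x"]

# new in this commit: read the component lazily as a chunked dask.array
darr = E_x.to_dask_array()
print(darr.mean().compute())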

File tree

4 files changed: +158 −1 lines changed


README.md

Lines changed: 1 addition & 1 deletion

@@ -365,7 +365,7 @@ It was initially started by the [Computational Radiation Physics Group](https://
 The following people and institutions [contributed](https://github.com/openPMD/openPMD-api/graphs/contributors) to openPMD-api:

 * [Axel Huebl (HZDR, now LBNL)](https://github.com/ax3l):
-  project lead, releases, documentation, automated CI/CD, Python bindings, installation & packaging, prior reference implementations
+  project lead, releases, documentation, automated CI/CD, Python bindings, Dask, installation & packaging, prior reference implementations
 * [Fabian Koller (HZDR)](https://github.com/C0nsultant):
   initial library design and implementation with HDF5 & ADIOS1 backend
 * [Franz Poeschel (CASUS)](https://github.com/franzpoeschel):

examples/11_particle_dataframe.py

Lines changed: 15 additions & 0 deletions

@@ -57,3 +57,18 @@
 # example 3: save all data to parquet files
 delayed_save = delayed(df.to_parquet("electrons.parquet"))
 delayed_save.compute()
+
+# Dask example 2: meshes
+if found_dask:
+    E_x = s.iterations[400].meshes["E"]["x"]
+    E_y = s.iterations[400].meshes["E"]["y"]
+    E_z = s.iterations[400].meshes["E"]["z"]
+    darr_x = E_x.to_dask_array()
+    darr_y = E_y.to_dask_array()
+    darr_z = E_z.to_dask_array()
+
+    # example: compute intensity
+    Intensity = darr_x * darr_x + darr_y * darr_y + darr_z * darr_z
+    Intensity_max = Intensity.max().compute()
+    idx_max = da.argwhere(Intensity == Intensity_max).compute()[0]
+    print("maximum intensity I={} at index={}".format(Intensity_max, idx_max))
src/binding/python/openpmd_api/DaskArray.py

Lines changed: 140 additions & 0 deletions

@@ -0,0 +1,140 @@
+"""
+This file is part of the openPMD-api.
+
+Copyright 2021 openPMD contributors
+Authors: Axel Huebl
+License: LGPLv3+
+"""
+import math
+import numpy as np
+try:
+    from dask.array import from_array
+    found_dask = True
+except ImportError:
+    found_dask = False
+
+
+class DaskRecordComponent:
+    # provides .shape, .ndim, .dtype and supports numpy-style slicing
+    def __init__(self, record_component):
+        self.rc = record_component
+
+    @property
+    def shape(self):
+        # fixme: https://github.com/openPMD/openPMD-api/issues/808
+        return tuple(self.rc.shape)
+
+    @property
+    def ndim(self):
+        return self.rc.ndim
+
+    @property
+    def dtype(self):
+        return self.rc.dtype
+
+    def __getitem__(self, slices):
+        """here we support what Record_Component implements: a tuple of
+        slices, a slice or an index; we do not support fancy indexing
+        """
+        # FIXME: implement handling of zero-slices in Record_Component
+        #        https://github.com/openPMD/openPMD-api/issues/957
+        all_zero = True
+        for s in slices:
+            if s != np.s_[0:0]:
+                all_zero = False
+        if all_zero:
+            return np.array([], dtype=self.dtype)
+
+        data = self.rc[slices]
+        self.rc.series_flush()
+        if not math.isclose(1.0, self.rc.unit_SI):
+            data = np.multiply(data, self.rc.unit_SI)
+
+        return data
+
+
+def record_component_to_daskarray(record_component):
+    """
+    Load a RecordComponent into a Dask.array.
+
+    Parameters
+    ----------
+    record_component : openpmd_api.Record_Component
+        A record component class in openPMD-api.
+
+    Returns
+    -------
+    dask.array
+        A dask array.
+
+    Raises
+    ------
+    ImportError
+        Raises an exception if dask is not installed.
+
+    See Also
+    --------
+    openpmd_api.BaseRecordComponent.available_chunks : available chunks that
+        are used internally to parallelize reading
+    dask.array : the (potentially distributed) array object created here
+    """
+    if not found_dask:
+        raise ImportError("dask NOT found. Install dask for Dask array "
+                          "support.")
+
+    # get optimal chunks
+    chunks = record_component.available_chunks()
+
+    # sort and prepare the chunks for Dask's array API
+    #   https://docs.dask.org/en/latest/array-chunks.html
+    #   https://docs.dask.org/en/latest/array-api.html?highlight=from_array#other-functions
+    # sorted and unique
+    offsets_per_dim = list(map(list, zip(*[chunk.offset for chunk in chunks])))
+    offsets_sorted_unique_per_dim = [sorted(set(o)) for o in offsets_per_dim]
+
+    # print("offsets_sorted_unique_per_dim=",
+    #       list(offsets_sorted_unique_per_dim))
+
+    # case 1: PIConGPU static load balancing (works with Dask assumptions,
+    #         chunk option no. 3)
+    #   all chunks in the same column have the same column width, although
+    #   individual columns have different widths
+    # case 2: AMReX boxes
+    #   all chunks are multiples of a common block size; offsets are a
+    #   multiple of a common block size
+    #   problem: too limited a chunk description in Dask
+    #     https://github.com/dask/dask/issues/7475
+    #   work-around: create smaller chunks (this incurs a read cost) by
+    #     forcing into case 1
+    #     (this can lead to larger blocks than using the gcd of the
+    #     extents, aka the AMReX block size)
+    common_chunk_widths_per_dim = list()
+    for d, offsets_in_dim in enumerate(offsets_sorted_unique_per_dim):
+        # print("d=", d, offsets_in_dim, record_component.shape[d])
+        offsets_in_dim_arr = np.array(offsets_in_dim)
+        # note: this is in the right order of rows/columns, contrary to a
+        #       sorted extent list from chunks
+        extents_in_dim = np.zeros_like(offsets_in_dim_arr)
+        extents_in_dim[:-1] = offsets_in_dim_arr[1:]
+        extents_in_dim[-1] = record_component.shape[d]
+        if len(extents_in_dim) > 1:
+            extents_in_dim[:-1] -= offsets_in_dim_arr[:-1]
+        extents_in_dim[-1] -= offsets_in_dim_arr[-1]
+        # print("extents_in_dim=", extents_in_dim)
+        common_chunk_widths_per_dim.append(tuple(extents_in_dim))
+
+    common_chunk_widths_per_dim = tuple(common_chunk_widths_per_dim)
+    # print("common_chunk_widths_per_dim=", common_chunk_widths_per_dim)
+
+    da = from_array(
+        DaskRecordComponent(record_component),
+        chunks=common_chunk_widths_per_dim,
+        # name=None,
+        asarray=True,
+        fancy=False,
+        # getitem=None,
+        # meta=None,
+        # inline_array=False
+    )
+
+    return da
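
To make the case-1 normalization above concrete, here is a small standalone sketch with invented values: a 1D record component of extent 50 stored as chunks starting at offsets 0, 10 and 30. Each chunk's width is the distance to the next offset, and the last chunk runs to the end of the record, giving Dask chunk widths (10, 20, 20) for that dimension:

import numpy as np

# invented example: 1D record of extent 50, chunks start at these offsets
offsets_in_dim_arr = np.array([0, 10, 30])
record_extent = 50

extents_in_dim = np.zeros_like(offsets_in_dim_arr)
extents_in_dim[:-1] = offsets_in_dim_arr[1:]    # each chunk ends where the next begins
extents_in_dim[-1] = record_extent              # the last chunk ends at the record's end
extents_in_dim[:-1] -= offsets_in_dim_arr[:-1]  # end - start = width
extents_in_dim[-1] -= offsets_in_dim_arr[-1]

print(tuple(extents_in_dim))  # (10, 20, 20) -> usable as chunks= for this dimension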

src/binding/python/openpmd_api/__init__.py

Lines changed: 2 additions & 0 deletions

@@ -2,6 +2,7 @@
 from .openpmd_api_cxx import *  # noqa
 from .DataFrame import particles_to_dataframe
 from .DaskDataFrame import particles_to_daskdataframe
+from .DaskArray import record_component_to_daskarray


 __version__ = cxx.__version__
@@ -12,6 +13,7 @@
 # extend CXX classes with extra methods
 ParticleSpecies.to_df = particles_to_dataframe  # noqa
 ParticleSpecies.to_dask = particles_to_daskdataframe  # noqa
+Record_Component.to_dask_array = record_component_to_daskarray  # noqa

 # TODO remove in future versions (deprecated)
 Access_Type = Access  # noqa
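
The two added lines expose the helper both as a free function and as a method on Record_Component. A quick sketch, reusing the series s and the import io from the first sketch above:

rc = s.iterations[400].meshes["E"]["x"]
darr_a = rc.to_dask_array()                    # method attached at import time
darr_b = io.record_component_to_daskarray(rc)  # the same function, called directly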
