mpiutil.py
import warnings
import sys

import numpy as np

rank = 0
size = 1
_comm = None
world = None
rank0 = True

## Try to setup MPI and get the comm, rank and size.
## If not they should end up as rank=0, size=1.
try:
    from mpi4py import MPI

    _comm = MPI.COMM_WORLD
    world = _comm

    rank = _comm.Get_rank()
    size = _comm.Get_size()

    if rank:
        print "MPI process %i of %i." % (rank, size)

    rank0 = True if rank == 0 else False

    # Make uncaught exceptions abort the whole MPI job rather than leaving
    # the other ranks hanging.
    sys_excepthook = sys.excepthook

    def mpi_excepthook(type, value, traceback):
        sys_excepthook(type, value, traceback)
        MPI.COMM_WORLD.Abort(1)

    sys.excepthook = mpi_excepthook

except ImportError:
    warnings.warn("Warning: mpi4py not installed.")
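# Example sketch: the module-level `rank`, `size` and `rank0` variables can be
# used directly by client code, e.g.
#
#     import mpiutil
#     if mpiutil.rank0:
#         print "Running on %i MPI processes." % mpiutil.size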
def partition_list_alternate(full_list, i, n):
    """Partition a list into `n` pieces. Return the `i`th partition."""
    return full_list[i::n]


def partition_list_mpi(full_list):
    """Return the partition of a list specific to the current MPI process."""
    return partition_list_alternate(full_list, rank, size)


def mpirange(*args):
    """An MPI aware version of `range`; each process gets its own sub-section.
    """
    full_list = range(*args)

    #if alternate:
    return partition_list_alternate(full_list, rank, size)
    #else:
    #    return np.array_split(full_list, size)[rank]
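# Example sketch: with 4 processes, `mpirange(8)` hands out the alternating
# sub-lists [0, 4], [1, 5], [2, 6] and [3, 7] to ranks 0-3, e.g.
#
#     for i in mpirange(8):
#         process_item(i)   # `process_item` is a hypothetical per-item task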
def barrier():
    """Call comm.Barrier() if running under MPI, otherwise do nothing."""
    if size > 1:
        _comm.Barrier()
def parallel_map(func, glist):
    """Apply a parallel map using MPI.

    Should be called collectively on the same list. All ranks return the full
    set of results.

    Parameters
    ----------
    func : function
        Function to apply.
    glist : list
        List to map over. Must be globally defined.

    Returns
    -------
    results : list
        Global list of results.
    """

    # Synchronize
    barrier()

    # If we're only on a single node, then just perform without MPI
    if size == 1 and rank == 0:
        return [func(item) for item in glist]

    # Pair up each list item with its position.
    zlist = list(enumerate(glist))

    # Partition list based on MPI rank
    llist = partition_list_mpi(zlist)

    # Operate on sublist
    flist = [(ind, func(item)) for ind, item in llist]

    barrier()

    # Gather all results onto all ranks
    rlist = _comm.allgather(flist)

    # Flatten the list of results
    flatlist = [item for sublist in rlist for item in sublist]

    # Sort into original order
    sortlist = sorted(flatlist, key=(lambda item: item[0]))

    # Synchronize
    barrier()

    # Zip to remove indices and extract the return values into a list
    return list(zip(*sortlist)[1])
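# Example sketch: square a list of numbers across all ranks; every rank gets
# back the full, ordered list of results. Typically launched with something
# like `mpirun -np 4 python script.py` (launcher and script name illustrative).
#
#     results = parallel_map(lambda x: x ** 2, range(16))
#     # results == [0, 1, 4, ..., 225] on every rank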
def typemap(dtype):
    """Map a numpy dtype into an MPI_Datatype.

    Parameters
    ----------
    dtype : np.dtype
        The numpy datatype.

    Returns
    -------
    mpitype : MPI.Datatype
        The MPI datatype.
    """
    return MPI.__TypeDict__[np.dtype(dtype).char]
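# Example sketch: `typemap(np.float64)` returns MPI.DOUBLE, which can then be
# used in buffer-based calls such as comm.Isend / comm.Irecv.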
def split_m(n, m):
    """
    Split a range (0, n-1) into m sub-ranges of similar length.

    Parameters
    ----------
    n : integer
        Length of range to split.
    m : integer
        Number of sub-ranges to split into.

    Returns
    -------
    num : np.ndarray[m]
        Number in each sub-range.
    start : np.ndarray[m]
        Start of each sub-range.
    end : np.ndarray[m]
        End of each sub-range.

    See Also
    --------
    `split_all`, `split_local`
    """
    base = n // m
    rem = n % m

    part = base * np.ones(m, dtype=np.int) + (np.arange(m) < rem).astype(np.int)

    bound = np.cumsum(np.insert(part, 0, 0))

    return np.array([part, bound[:m], bound[1:(m + 1)]])
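# Example sketch: splitting a range of length 10 into 3 pieces gives sizes
# [4, 3, 3], starts [0, 4, 7] and ends [4, 7, 10]:
#
#     >>> split_m(10, 3)
#     array([[ 4,  3,  3],
#            [ 0,  4,  7],
#            [ 4,  7, 10]])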
def split_all(n, comm=None):
    """
    Split a range (0, n-1) into sub-ranges for each MPI Process.

    Parameters
    ----------
    n : integer
        Length of range to split.
    comm : MPI Communicator, optional
        MPI Communicator to use (default COMM_WORLD).

    Returns
    -------
    num : np.ndarray[m]
        Number for each rank.
    start : np.ndarray[m]
        Start of each sub-range on a given rank.
    end : np.ndarray[m]
        End of each sub-range.

    See Also
    --------
    `split_m`, `split_local`
    """
    m = size if comm is None else comm.size

    return split_m(n, m)
def split_local(n, comm=None):
    """
    Split a range (0, n-1) into sub-ranges for each MPI Process. This returns
    the parameters only for the current rank.

    Parameters
    ----------
    n : integer
        Length of range to split.
    comm : MPI Communicator, optional
        MPI Communicator to use (default COMM_WORLD).

    Returns
    -------
    num : integer
        Number on this rank.
    start : integer
        Start of the sub-range for this rank.
    end : integer
        End of the sub-range for this rank.

    See Also
    --------
    `split_m`, `split_all`
    """
    pse = split_all(n, comm=comm)
    m = rank if comm is None else comm.rank

    return pse[:, m]
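# Example sketch: distribute 10 rows over the communicator. On 3 processes,
# `split_all(10)` returns the counts/starts/ends for every rank, while
# `split_local(10)` returns just this rank's triple, e.g. [4, 0, 4] on rank 0
# and [3, 4, 7] on rank 1.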
def transpose_blocks(row_array, shape, comm=None):
    """
    Take a 2D matrix which is split between processes row-wise and
    redistribute it so that it is split column-wise.

    Parameters
    ----------
    row_array : np.ndarray
        The local section of the global array (split row wise).
    shape : 2-tuple
        The shape of the global array.
    comm : MPI communicator
        MPI communicator that the array is distributed over. Default is
        MPI.COMM_WORLD.

    Returns
    -------
    col_array : np.ndarray
        Local section of the global array (split column wise).
    """
    if not comm:
        try:
            comm = MPI.COMM_WORLD
        except NameError:
            if row_array.shape[:-1] == shape[:-1]:
                # We are working on a single node and being asked to do
                # a trivial transpose.
                # Note that to mimic the mpi behaviour we have to allow the
                # last index to be trimmed.
                return row_array[..., :shape[-1]].copy()
            else:
                raise

    nr = shape[0]
    nc = shape[-1]
    nm = 1 if len(shape) <= 2 else np.prod(shape[1:-1])

    pr, sr, er = split_local(nr, comm=comm) * nm
    pc, sc, ec = split_local(nc, comm=comm)

    par, sar, ear = split_all(nr, comm=comm) * nm
    pac, sac, eac = split_all(nc, comm=comm)

    #print pr, nc, shape, row_array.shape
    row_array = row_array[:nr, ..., :nc].reshape(pr, nc)

    requests_send = []
    requests_recv = []

    recv_buffer = np.empty((nr * nm, pc), dtype=row_array.dtype)

    mpitype = typemap(row_array.dtype)

    # Iterate over all processes row wise
    for ir in range(comm.size):

        # Get the start and end of each set of rows
        sir, eir = sar[ir], ear[ir]

        # Iterate over all processes column wise
        for ic in range(comm.size):

            # Get the start and end of each set of columns
            sic, eic = sac[ic], eac[ic]

            # Construct a unique tag
            tag = ir * comm.size + ic

            # Send and receive the messages as non-blocking passes
            if comm.rank == ir:

                # Construct the block to send by cutting out the correct
                # columns
                block = row_array[:, sic:eic].copy()
                #print ir, ic, comm.rank, block.shape

                # Send the message
                request = comm.Isend([block, mpitype], dest=ic, tag=tag)
                requests_send.append([ir, ic, request])

            if comm.rank == ic:

                # Receive the message into the correct set of rows of recv_buffer
                request = comm.Irecv([recv_buffer[sir:eir], mpitype], source=ir, tag=tag)
                requests_recv.append([ir, ic, request])
                #print ir, ic, comm.rank, recv_buffer[sir:eir].shape

    # Wait for all processes to have started their messages
    comm.Barrier()

    # For each node iterate over all sends and wait until completion
    for ir, ic, request in requests_send:

        stat = MPI.Status()

        #try:
        request.Wait(status=stat)
        #except MPI.Exception:
        #    print comm.rank, ir, ic, sar[ir], ear[ir], sac[ic], eac[ic], shape

        if stat.error != MPI.SUCCESS:
            print "**** ERROR in MPI SEND (r: %i c: %i rank: %i) *****" % (ir, ic, comm.rank)

    #print "rank %i: Done waiting on MPI SEND" % comm.rank

    comm.Barrier()

    # For each rank iterate over all receives and wait until completion
    for ir, ic, request in requests_recv:

        stat = MPI.Status()

        #try:
        request.Wait(status=stat)
        #except MPI.Exception:
        #    print comm.rank, (ir, ic), (ear[ir]-sar[ir], eac[ic]-sac[ic]),
        #    shape, recv_buffer[sar[ir]:ear[ir]].shape, recv_buffer.dtype, row_array.dtype

        if stat.error != MPI.SUCCESS:
            print "**** ERROR in MPI RECV (r: %i c: %i rank: %i) *****" % (ir, ic, comm.rank)

    return recv_buffer.reshape(shape[:-1] + (pc,))
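# Example sketch: redistribute an (8, 6) array that is split row-wise so that
# it ends up split column-wise. Each rank passes only its local rows plus the
# global shape (`global_array` is a hypothetical full array).
#
#     nr_local, sr, er = split_local(8)
#     local_rows = global_array[sr:er]
#     local_cols = transpose_blocks(local_rows, (8, 6))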
def allocate_hdf5_dataset(fname, dsetname, shape, dtype, comm=None):
    """Create a hdf5 dataset and return its offset and size.

    The dataset will be created contiguously and immediately allocated,
    however it will not be filled.

    Parameters
    ----------
    fname : string
        Name of the file to write.
    dsetname : string
        Name of the dataset to write (must be at root level).
    shape : tuple
        Shape of the dataset.
    dtype : numpy datatype
        Type of the dataset.
    comm : MPI communicator
        Communicator over which to broadcast results.

    Returns
    -------
    offset : integer
        Offset into the file at which the dataset starts (in bytes).
    size : integer
        Size of the dataset in bytes.
    """
    import h5py

    if not comm:
        comm = MPI.COMM_WORLD

    state = None

    if comm.rank == 0:

        # Create/open file
        f = h5py.File(fname, 'a')

        # Create dataspace and HDF5 datatype
        sp = h5py.h5s.create_simple(shape, shape)
        tp = h5py.h5t.py_create(dtype)

        # Create a new plist and tell it to allocate the space for dataset
        # immediately, but don't fill the file with zeros.
        plist = h5py.h5p.create(h5py.h5p.DATASET_CREATE)
        plist.set_alloc_time(h5py.h5d.ALLOC_TIME_EARLY)
        plist.set_fill_time(h5py.h5d.FILL_TIME_NEVER)

        # Create the dataset
        dset = h5py.h5d.create(f.id, dsetname, tp, sp, plist)

        # Get the offset of the dataset into the file.
        state = dset.get_offset(), dset.get_storage_size()

        f.close()

    state = comm.bcast(state, root=0)

    return state
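# Example sketch: collectively allocate space for a (1000, 64) float64 dataset
# and find out where it lives in the file (file and dataset names illustrative).
#
#     offset, nbytes = allocate_hdf5_dataset('out.h5', 'data', (1000, 64),
#                                            np.float64, comm=_comm)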
def lock_and_write_buffer(obj, fname, offset, size):
    """Write the contents of a buffer to disk at a given offset, and explicitly
    lock the region of the file whilst doing so.

    Parameters
    ----------
    obj : buffer
        Data to write to disk.
    fname : string
        Filename to write.
    offset : integer
        Offset into the file to start writing at.
    size : integer
        Size of the region to write to (and lock).
    """
    import os
    import fcntl

    buf = buffer(obj)

    if len(buf) > size:
        raise Exception("Size doesn't match array length.")

    fd = os.open(fname, os.O_RDWR | os.O_CREAT)

    # Take an exclusive lock on the region we are about to write.
    fcntl.lockf(fd, fcntl.LOCK_EX, size, offset, os.SEEK_SET)

    # Seek to the target offset before writing.
    os.lseek(fd, offset, os.SEEK_SET)

    nb = os.write(fd, buf)

    if nb != len(buf):
        raise Exception("Something funny happened with the write.")

    fcntl.lockf(fd, fcntl.LOCK_UN)

    os.close(fd)
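# Example sketch: write a small numpy array into 'out.h5' at a byte offset
# obtained from allocate_hdf5_dataset (names and sizes are illustrative).
#
#     arr = np.zeros(64, dtype=np.float64)
#     lock_and_write_buffer(arr, 'out.h5', offset, arr.nbytes)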
def parallel_rows_write_hdf5(fname, dsetname, local_data, shape, comm=None):
    """Write an array, distributed across processes row-wise, into an HDF5
    file in parallel.
    """
    if not comm:
        comm = MPI.COMM_WORLD

    offset, size = allocate_hdf5_dataset(fname, dsetname, shape, local_data.dtype, comm=comm)

    lr, sr, er = split_local(shape[0], comm=comm)

    nc = np.prod(shape[1:])

    # Offsets and sizes passed to lock_and_write_buffer are in bytes, so scale
    # the element counts by the itemsize of the data.
    nb = local_data.dtype.itemsize

    lock_and_write_buffer(local_data, fname, offset + sr * nc * nb, lr * nc * nb)
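# Example sketch: each rank writes its own rows of a (100, 32) float64 array
# into a single HDF5 file (file and dataset names are illustrative).
#
#     n_local, s, e = split_local(100)
#     local_data = np.random.standard_normal((n_local, 32))
#     parallel_rows_write_hdf5('out.h5', 'data', local_data, (100, 32))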