This repository has been archived by the owner on Mar 20, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathio.py
433 lines (357 loc) · 15.7 KB
/
io.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
from thetis import *
from firedrake.petsc import PETSc
import datetime
import os
from adapt_utils.options import CoupledOptions
__all__ = ["save_mesh", "load_mesh", "initialise_field", "export_field", "initialise_bathymetry",
"export_bathymetry", "initialise_hydrodynamics", "export_hydrodynamics",
"OuterLoopLogger", "TimeDependentAdaptationLogger", "readfile", "index_string",
"get_date", "create_directory", "print_output", "COMM_WORLD", "File"]
def get_filename(fname, index_str):
if index_str is None:
return fname
try:
return '_'.join([fname, index_str])
except TypeError:
return '_'.join([fname, index_string(index_str)])
# --- Input
def load_mesh(fname, fpath='.', delete=False):
"""
:arg fname: file name (without '.h5' extension).
:kwarg fpath: directory where the file is stored.
:kwarg delete: toggle deletion of the file.
:return: mesh loaded from DMPlex format.
"""
if COMM_WORLD.size > 1:
raise IOError("Loading a mesh from HDF5 only works in serial.")
newplex = PETSc.DMPlex().create()
newplex.createFromFile(os.path.join(fpath, fname + '.h5'))
# Optionally delete the HDF5 file
if delete:
os.remove(os.path.join(fpath, fname) + '.h5')
return Mesh(newplex)
def initialise_field(fs, name, fname, fpath='.', outputdir=None, op=CoupledOptions(), **kwargs):
"""
Initialise bathymetry field with results from a previous simulation.
:arg fs: field will live in this finite element space.
:arg name: name used internally for field.
:arg fname: file name (without '.h5' extension).
:kwarg fpath: directory to read the data from.
:kwarg op: :class:`Options` parameter object.
:kwarg index_str: optional five digit string.
:kwarg delete: toggle deletion of the file.
"""
delete = kwargs.get('delete', False)
index_str = kwargs.get('index_str', None)
fname = get_filename(fname, index_str)
with timed_stage('initialising {:s}'.format(name)):
f = Function(fs, name=name)
with DumbCheckpoint(os.path.join(fpath, fname), mode=FILE_READ) as chk:
chk.load(f)
# Optionally delete the HDF5 file
if delete:
os.remove(os.path.join(fpath, fname) + '.h5')
# Plot to PVD
if outputdir is not None and op.plot_pvd:
File(os.path.join(outputdir, '_'.join([name, 'imported.pvd']))).write(f)
return f
def initialise_bathymetry(mesh, fpath, op=CoupledOptions(), **kwargs):
"""
Initialise bathymetry field with results from a previous simulation.
:arg mesh: field will be defined in finite element space on this mesh.
:arg fpath: directory to read the data from.
:kwarg op: :class:`Options` parameter object.
"""
# TODO: Would be nice to have consistency: here mesh is an arg but below it is read from file
fs = FunctionSpace(mesh, op.bathymetry_family.upper(), 1)
return initialise_field(fs, 'bathymetry', 'bathymetry', fpath, **kwargs)
def initialise_hydrodynamics(inputdir='.', outputdir=None, op=CoupledOptions(), **kwargs):
"""
Initialise velocity and elevation with results from a previous simulation.
:kwarg inputdir: directory to read the data from.
:kwarg outputdir: directory to optionally plot the data in .pvd format.
:kwarg op: :class:`Options` parameter object.
:kwarg delete: toggle deletion of the file.
:kwarg plexname: file name used for the DMPlex data file.
:kwarg variant: relates to distribution of quadrature nodes in an element.
"""
plexname = kwargs.get('plexname', 'myplex')
variant = kwargs.get('variant', 'equispaced')
delete = kwargs.get('delete', False)
index_str = kwargs.get('index_str', None)
# Get finite element
if op.family == 'dg-dg':
uv_element = VectorElement("DG", triangle, 1)
if variant is None:
elev_element = FiniteElement("DG", triangle, 1)
else:
elev_element = FiniteElement("DG", triangle, 1, variant=variant)
elif op.family == 'dg-cg':
uv_element = VectorElement("DG", triangle, 1)
if variant is None:
elev_element = FiniteElement("CG", triangle, 2)
else:
elev_element = FiniteElement("CG", triangle, 2, variant=variant)
elif op.family == 'cg-cg':
uv_element = VectorElement("CG", triangle, 2)
if variant is None:
elev_element = FiniteElement("CG", triangle, 1)
else:
elev_element = FiniteElement("CG", triangle, 1, variant=variant)
# Load mesh
with timed_stage('mesh'):
mesh = op.default_mesh if plexname is None else load_mesh(plexname, inputdir, delete=delete)
# Load velocity
name = "velocity"
fname = get_filename(name, index_str)
U = FunctionSpace(mesh, uv_element)
with timed_stage('initialising {:s}'.format(name)):
with DumbCheckpoint(os.path.join(inputdir, fname), mode=FILE_READ) as chk:
uv_init = Function(U, name=name)
chk.load(uv_init)
# Optionally delete the velocity HDF5 file
if delete:
os.remove(os.path.join(inputdir, fname) + '.h5')
# Load elevation
name = "elevation"
fname = get_filename(name, index_str)
H = FunctionSpace(mesh, elev_element)
with timed_stage('initialising {:s}'.format(name)):
with DumbCheckpoint(os.path.join(inputdir, fname), mode=FILE_READ) as chk:
elev_init = Function(H, name=name)
chk.load(elev_init)
# Optionally delete the elevation HDF5 file
if delete:
os.remove(os.path.join(inputdir, fname) + '.h5')
# Plot to .pvd
if outputdir is not None and op.plot_pvd:
uv_proj = Function(VectorFunctionSpace(mesh, "CG", 1), name="Initial velocity")
uv_proj.project(uv_init)
File(os.path.join(outputdir, "velocity_imported.pvd")).write(uv_proj)
elev_proj = Function(FunctionSpace(mesh, "CG", 1), name="Initial elevation")
elev_proj.project(elev_init)
File(os.path.join(outputdir, "elevation_imported.pvd")).write(elev_proj)
return uv_init, elev_init
# --- Output
def save_mesh(mesh, fname, fpath='.'):
"""
:arg mesh: mesh to be saved in DMPlex format.
:kwarg fname: file name (without '.h5' extension).
:arg fpath: directory to store the file.
"""
if COMM_WORLD.size > 1:
raise IOError("Saving a mesh to HDF5 only works in serial.")
try:
plex = mesh.topology_dm
except AttributeError:
plex = mesh._topology_dm # Backwards compatability
viewer = PETSc.Viewer().createHDF5(os.path.join(fpath, fname + '.h5'), 'w')
viewer(plex)
def export_field(f, name, fname, fpath='.', plexname='myplex', op=CoupledOptions(), index_str=None):
"""
Export some field to be used in a subsequent simulation.
:arg f: field (Firedrake :class:`Function`) to be stored.
:arg name: name used internally for field.
:arg fname: filename to save the data to.
:kwarg fpath: directory to save the data to.
:kwarg plexname: file name to be used for the DMPlex data file.
:kwarg op: :class:`Options` parameter object.
:kwarg index_str: optional five digit string.
"""
if not os.path.exists(fpath):
os.makedirs(fpath)
op.print_debug("I/O: Exporting {:s} for subsequent simulation".format(name))
# Create checkpoint to HDF5
fname = get_filename(fname, index_str)
with DumbCheckpoint(os.path.join(fpath, fname), mode=FILE_CREATE) as chk:
chk.store(f, name=name)
# Plot to PVD
if op.plot_pvd:
File(os.path.join(fpath, '_'.join([name, 'out.pvd']))).write(f)
# Save mesh to DMPlex format
if plexname is not None:
save_mesh(f.function_space().mesh(), plexname, fpath)
def export_bathymetry(bathymetry, fpath, **kwargs):
"""
Export bathymetry field to be used in a subsequent simulation.
:arg bathymetry: field to be stored.
:arg fpath: directory to save the data to.
:kwarg plexname: file name to be used for the DMPlex data file.
:kwarg op: :class:`Options` parameter object.
:kwarg index_str: optional five digit string.
"""
export_field(bathymetry, 'bathymetry', 'bathymetry', fpath, **kwargs)
def export_hydrodynamics(uv, elev, fpath='.', plexname='myplex', op=CoupledOptions(), **kwargs):
"""
Export velocity and elevation to be used in a subsequent simulation
:arg uv: velocity field to be stored.
:arg elev: elevation field to be stored.
:kwarg fpath: directory to save the data to.
:kwarg plexname: file name to be used for the DMPlex data file.
:kwarg op: :class:`Options` parameter object.
:kwarg index_str: optional five digit string.
"""
index_str = kwargs.get('index_str', None)
if not os.path.exists(fpath):
os.makedirs(fpath)
op.print_debug("I/O: Exporting fields for subsequent simulation")
# Check consistency of meshes
mesh = elev.function_space().mesh()
assert mesh == uv.function_space().mesh()
# Export velocity
name = "velocity"
fname = get_filename(name, index_str)
with DumbCheckpoint(os.path.join(fpath, fname), mode=FILE_CREATE) as chk:
chk.store(uv, name=name)
# Export elevation
name = "elevation"
fname = get_filename(name, index_str)
with DumbCheckpoint(os.path.join(fpath, fname), mode=FILE_CREATE) as chk:
chk.store(elev, name=name)
# Plot to .pvd
if op.plot_pvd:
uv_proj = Function(VectorFunctionSpace(mesh, "CG", 1), name="Initial velocity")
uv_proj.project(uv)
File(os.path.join(fpath, 'velocity_out.pvd')).write(uv_proj)
elev_proj = Function(FunctionSpace(mesh, "CG", 1), name="Initial elevation")
elev_proj.project(elev)
File(os.path.join(fpath, 'elevation_out.pvd')).write(elev_proj)
# Export mesh
if plexname is not None:
save_mesh(mesh, plexname, fpath)
# --- Logging
class OuterLoopLogger(object):
"""
A simple logger for simulations which have an outer loop over meshes. This might be for
convergence analysis on a hierarchy of fixed meshes, or on sequences of time-dependent adapted
meshes.
"""
def __init__(self, prob, verbose=True, **known):
"""
:arg prob: :class:`AdaptiveProblem` solver object.
:kwarg verbose: print during logging.
:kwargs known: expanded dictionary of parameters.
"""
self.prob = prob
self.verbose = verbose
self.divider = 80*'*' + '\n'
self.msg = " {:34s}: {:}\n"
# Create a log string
self.logstr = self.divider + 33*' ' + 'PARAMETERS\n' + self.divider
# Log known parameters
for key in known:
self.logstr += self.msg.format(key, known[key])
# Print parameters to screen
if self.verbose:
print_output(self.logstr + self.divider)
def log_git_sha(self):
"""
Add a line to the log string which records the sha of the adapt_utils git commit in use.
"""
adapt_utils_home = os.environ.get('ADAPT_UTILS_HOME')
with open(os.path.join(adapt_utils_home, '.git', 'logs', 'HEAD'), 'r') as gitlog:
for line in gitlog:
words = line.split() # TODO: Just read last line
self.logstr += self.msg.format('adapt_utils git commit', words[1])
def create_log_dir(self, fpath):
"""
:arg fpath: directory to save log file in.
"""
date = get_date()
j = 0
while True:
self.di = os.path.join(fpath, '{:s}-run-{:d}'.format(date, j))
if not os.path.exists(self.di):
create_directory(self.di)
break
j += 1
def write(self, fpath, save_meshes=False):
"""
Write the log out to file.
:arg fpath: directory to save log file in.
:kwarg save_meshes: toggle whether to save meshes to file in DMPlex format.
"""
if fpath is None:
return
self.create_log_dir(fpath)
with open(os.path.join(self.di, 'log'), 'w') as logfile:
logfile.write(self.logstr)
if save_meshes:
self.prob.store_meshes(fpath=self.di)
if self.verbose:
print_output("logdir: {:s}".format(self.di))
# TODO: Allow logging during simulation
def log(self, fname='log', fpath=None, save_meshes=False):
"""
:args unknown: expanded list of unknown parsed arguments.
:kwarg fname: filename for log file.
:kwarg fpath: directory to save log file in. If `None`, the log is simply printed.
:kwarg save_meshes: save meshes to file in the same directory.
"""
self.log_git_sha()
# Log element count and QoI from each outer iteration
self.logstr += self.divider + 35*' ' + 'SUMMARY\n' + self.divider
self.logstr += "{:8s} {:8s} {:7s} {:8s}\n".format('Elements', 'Vertices', 'QoI', 'Time')
for num_cells, num_vertices, qoi, time in zip(self.prob.num_cells, self.prob.num_vertices, self.prob.qois, self.prob.times):
self.logstr += "{:8d} {:8d} {:7.4e} {:.2f}\n".format(num_cells, num_vertices, qoi, time)
if self.verbose:
print_output(self.logstr)
# Write out
self.write(fpath, save_meshes=save_meshes)
class TimeDependentAdaptationLogger(OuterLoopLogger):
"""
A simple logger for simulations which use time-dependent mesh adaptation with an outer loop.
Statistics on metrics and meshes are printed to screen, saved to a log file, or both.
"""
# TODO: Allow logging during simulation
def log(self, *unknown, fname='log', fpath=None, save_meshes=False):
"""
:args unknown: expanded list of unknown parsed arguments.
:kwarg fname: filename for log file.
:kwarg fpath: directory to save log file in. If `None`, the log is simply printed.
:kwarg save_meshes: save meshes to file in the same directory.
"""
self.log_git_sha()
# Log unknown parameters
for i in range(len(unknown)//2):
self.logstr += self.msg.format(unknown[2*i][1:], unknown[2*i+1])
# Log mesh and metric stats from each outer iteration
self.logstr += self.divider + 35*' ' + 'SUMMARY\n' + self.divider
for n, (qoi, complexity) in enumerate(zip(self.prob.qois, self.prob.st_complexities)):
self.logstr += "Mesh iteration {:2d}: qoi {:.8e}".format(n+1, qoi)
if n > 0:
self.logstr += " space-time complexity {:.8e}".format(complexity)
self.logstr += "\n"
# Log stats from last outer iteration
self.logstr += self.divider + 30*' ' + 'FINAL ELEMENT & VERTEX COUNTS\n' + self.divider
l = self.prob.op.end_time/self.prob.op.num_meshes
for i, (num_cells, num_vertices) in enumerate(zip(self.prob.num_cells[-1], self.prob.num_vertices[-1])):
self.logstr += "Time window ({:7.1f},{:7.1f}]: {:7d} {:7d}\n".format(i*l, (i+1)*l, num_cells, num_vertices)
self.logstr += self.divider
if self.verbose:
print_output(self.logstr)
# Write out
self.write(fpath)
def readfile(filename, reverse=False):
"""
Read a file line-by-line.
:kwarg reverse: read the lines in reverse order.
"""
with open(filename, 'r') as read_obj:
lines = read_obj.readlines()
lines = [line.strip() for line in lines]
if reverse:
lines = reversed(lines)
return lines
def index_string(index, n=5):
"""
:arg index: integer form of index.
:return: n-digit string form of index.
"""
return (n - len(str(index)))*'0' + str(index)
def get_date():
"""
Get the date in year-month-day format.
"""
today = datetime.date.today()
return '{:d}-{:d}-{:d}'.format(today.year, today.month, today.day)