Skip to content

Commit b3be0ce

Browse files
authored
Merge pull request #25 from CMB-S4/data_convert
Data conversion tools
2 parents 9725175 + 9d6b284 commit b3be0ce

File tree

3 files changed

+461
-0
lines changed

3 files changed

+461
-0
lines changed

dc1/noise_sim/compress_hdf5.py

+173
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
#!/usr/bin/env python3
2+
3+
"""
4+
This script loads v0 uncompressed TOAST observations and writes v1 compressed files.
5+
"""
6+
7+
import os
8+
import sys
9+
import shutil
10+
import re
11+
import glob
12+
13+
import datetime
14+
15+
import argparse
16+
17+
import numpy as np
18+
19+
from astropy import units as u
20+
21+
import toast
22+
import toast.ops
23+
24+
from toast.timing import gather_timers, dump, Timer
25+
26+
from toast.observation import default_values as defaults
27+
28+
29+
def parse_arguments():
    """
    Build the command-line parser and parse the job configuration.

    Returns:
        (config, args, jobargs):  The operator configuration, the parsed
        script arguments, and the job-related arguments, exactly as
        produced by toast.parse_config().
    """
    parser = argparse.ArgumentParser(description="Compress CMB-S4 simulation data")

    parser.add_argument(
        "--verify",
        required=False,
        action="store_true",
        default=False,
        help="Re-load the converted data and verify consistency",
    )
    parser.add_argument(
        "--obs",
        type=str,
        required=False,
        nargs="+",
        help="One or more observation files",
    )

    # No operators are configured from the command line yet; pass an empty
    # list so toast still handles config files and job arguments uniformly.
    ops = []

    # toast parses the operator configuration along with the script options.
    return toast.parse_config(parser, operators=ops)
60+
61+
62+
def main():
    """
    Convert each uncompressed v0 observation file to a compressed v1 file.

    For every path in --obs:  load the observation, rename the original to
    "<root>_uncompressed.h5" as a backup, re-save it with FLAC / gzip
    compression under the original name, and (optionally, with --verify)
    re-load the result and compare it to the in-memory observation.
    Raises RuntimeError on a name mismatch or failed verification.
    """
    env = toast.utils.Environment.get()
    log = toast.utils.Logger.get()
    env.enable_function_timers()
    global_timer = toast.timing.GlobalTimers.get()
    global_timer.start("compress HDF5 (total)")

    config, args, jobargs = parse_arguments()

    # Default group size
    comm = toast.Comm()

    # Process each observation
    for obs_path in args.obs:
        obs_dir = os.path.dirname(obs_path)
        # Path without extension; used to build the backup file name.
        file_root = os.path.splitext(obs_path)[0]
        if comm.world_rank == 0:
            print(f"Working on {obs_path}:")
        backup = f"{file_root}_uncompressed.h5"
        timer = Timer()
        timer.start()
        # Load the uncompressed observation across the process grid.
        obs = toast.io.load_hdf5(
            obs_path,
            comm,
            process_rows=comm.group_size,
            meta=None,
            detdata=None,
            shared=None,
            intervals=None,
            detectors=None,
            force_serial=False,
        )

        # Synchronize so the reported time covers the slowest process.
        if comm.comm_world is not None:
            comm.comm_world.barrier()
        timer.stop()
        if comm.world_rank == 0:
            print(f" Load {obs_path} in {timer.seconds()} s", flush=True)

        # Only one process renames the original out of the way; the new
        # compressed file will be written under the original name.
        # NOTE(review): if the save below fails, the backup is not restored
        # automatically — the operator must rename it back by hand.
        if comm.world_rank == 0:
            os.rename(obs_path, backup)

        if comm.comm_world is not None:
            comm.comm_world.barrier()

        # NOTE(review): the same Timer is restarted without clearing —
        # confirm toast's Timer resets elapsed time on start(); otherwise
        # the Save / Re-load timings accumulate earlier phases.
        timer.start()
        # Re-save with compressed detector data: FLAC for the signal,
        # gzip for the flags.
        obf = toast.io.save_hdf5(
            obs,
            obs_dir,
            meta=None,
            detdata=[
                (defaults.det_data, {"type": "flac"}),
                (defaults.det_flags, {"type": "gzip"}),
            ],
            shared=None,
            intervals=None,
            config=None,
            times=defaults.times,
            force_serial=False,
        )
        if comm.comm_world is not None:
            comm.comm_world.barrier()
        timer.stop()
        if comm.world_rank == 0:
            print(f" Save {obs_path} in {timer.seconds()} s", flush=True)

        # The save path is derived from the observation metadata; it must
        # land exactly where the original file was.
        if obf != obs_path:
            msg = f"Generated HDF5 ({obf}) does not match original "
            msg += f"file name ({obs_path})"
            raise RuntimeError(msg)

        if args.verify:
            timer.start()
            # Re-load the compressed file and compare against the
            # observation still held in memory.
            compare = toast.io.load_hdf5(
                obs_path,
                comm,
                process_rows=comm.group_size,
            )
            if comm.comm_world is not None:
                comm.comm_world.barrier()
            timer.stop()
            if comm.world_rank == 0:
                print(
                    f" Re-load {obs_path} for verification in {timer.seconds()} s",
                    flush=True
                )

            # Observation equality is defined by toast; a mismatch means
            # the compression round-trip lost information.
            if compare != obs:
                msg = f"Observation HDF5 verify failed:\n"
                msg += f"Input = {obs}\n"
                msg += f"Loaded = {compare}"
                log.error(msg)
                raise RuntimeError(msg)
            elif comm.world_rank == 0:
                print(f" Verification PASS", flush=True)
        else:
            if comm.world_rank == 0:
                print(f" Skipping verification", flush=True)

    # Dump all the timing information to the output dir

    global_timer.stop("compress HDF5 (total)")
    # Gather timers from all processes; only rank 0 writes the report.
    alltimers = gather_timers(comm=comm.comm_world)
    if comm.world_rank == 0:
        out = os.path.join(".", "timing")
        dump(alltimers, out)
168+
169+
170+
if __name__ == "__main__":
    # Initialize MPI (world may be None when MPI is not available).
    world, procs, rank = toast.mpi.get_world()
    # Run under toast's guard so an uncaught exception on any process
    # terminates the whole MPI job instead of leaving ranks hanging.
    with toast.mpi.exception_guard(comm=world):
        main()

dc1/noise_sim/compress_hdf5.slurm

+68
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
#!/bin/bash
#SBATCH --qos=regular
#SBATCH --time=05:00:00
#SBATCH --nodes=1
#SBATCH --job-name=CMBS4_todcompress
#SBATCH --licenses=SCRATCH
#SBATCH --constraint=cpu
#SBATCH --account=mp107

# Batch driver: run compress_hdf5.py over a slice of the observation
# directories of one frequency band, deleting the uncompressed backups
# only when compression (with verification) succeeded.

# set parent directory corresponding to one frequency band
PARENT_DIRECTORY="/global/cfs/cdirs/cmbs4/dc/dc0/staging/noise_sim/outputs_rk/LAT0_CHLAT/f090"

# set range of observations to compress by this script (inclusive, 1-based)
START_INDEX=1
END_INDEX=30

echo "Listing all observations in $PARENT_DIRECTORY"

# list all observations in parent directory and save to a variable
# (sorted so the START/END index range is stable between runs)
SUBDIR_LIST=$(find "$PARENT_DIRECTORY" -mindepth 1 -maxdepth 1 -type d | sort)
# extract observations names for printing to console
SUBDIR_NAMES=$(echo "$SUBDIR_LIST" | xargs -I{} basename {})
echo "Observations found: "
echo "$SUBDIR_NAMES"

echo "Proceeding to compress observations indexed in range: $START_INDEX-$END_INDEX"
# select subset of observations (subdirectories) based on range and save to a variable
SELECTED_SUBDIRS=$(echo "$SUBDIR_LIST" | sed -n "${START_INDEX},${END_INDEX}p")
# extract selected observations names for printing to console
SELECTED_SUBDIR_NAMES=$(echo "$SELECTED_SUBDIRS" | xargs -I{} basename {})
echo "Selected observations: "
echo "$SELECTED_SUBDIR_NAMES"

# loop through selected subdirectories and process each one
# (paths are assumed to contain no whitespace; word splitting is intended)
for subdir in $SELECTED_SUBDIRS; do
    echo "Processing observation: $(basename "$subdir")"
    # search for files with the expected starting keywords : 'RISING' or 'SETTING'
    FILE_LIST=$(find "$subdir" -type f \( -name "RISING*" -o -name "SETTING*" \) -printf "%p ")
    # extract file names for printing to console
    echo "Files to compress: "
    for filename in $FILE_LIST; do
        basename "$filename"
    done

    # call compression script on the list of files
    date
    echo "Calling flac compression script ..."
    # Test the srun exit status directly in the `if`: checking $? on a
    # later line is fragile, since any command inserted in between would
    # silently overwrite it.
    if srun -n 128 python compress_hdf5.py --verify --obs $FILE_LIST > "log_$(basename "$subdir").txt" 2>&1; then
        # python script ran without error: delete backup files
        echo "FLAC compression script ran successfully. Deleting backup files..."
        if find "$subdir" -type f -name "*uncompressed.h5" -delete; then
            echo "Backup files deleted successfully."
            date
        else
            # If backup file deletion fails for some reason we stop everything
            # to avoid any risk of running out of disk space.
            echo "Error deleting backup files. Exiting loop over observations."
            date
            break
        fi
    else
        echo "FLAC compression script encountered an error. Not deleting any files."
        date
    fi
done

echo "Observation batch $START_INDEX-$END_INDEX processing in $(basename $PARENT_DIRECTORY) band done. Please verify log files."

0 commit comments

Comments
 (0)