Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
340 changes: 240 additions & 100 deletions src/cli/astrograms.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,116 +14,256 @@
import logging
logging.basicConfig(level=logging.INFO)

def main(infile=None, outfile=None, memory=24, cores=4, jobs=8, npartitions=None,
astrogram=True, is_long=False,
# fps=12, window=60 * 12, hop=10 * 12,
test=False, queue='test.short', binary=True, local=False, **kwargs):
assert infile is not None
assert outfile is not None
def astrogram_meta(num_bins):
    """Return an empty, typed DataFrame describing the astrogram output schema.

    The frame has three identifier columns (``site_id``, ``date``, ``dddn``)
    followed by one float column per histogram bin. Bin columns are keyed by
    the *integer* bin index ``0 .. num_bins - 1``.

    Args:
        num_bins: Number of bin columns to append after the identifier columns.

    Returns:
        A zero-row ``pd.DataFrame`` carrying only column names and dtypes.
    """
    schema = {
        "site_id": pd.Series(dtype="string"),
        "date": pd.Series(dtype="datetime64[ns]"),
        "dddn": pd.Series(dtype='string'),
    }
    # One float64 column per bin, labelled with the integer bin index.
    for bin_index in range(num_bins):
        schema[bin_index] = pd.Series(dtype="f8")
    return pd.DataFrame(schema)

outfile = Path(outfile)
def astrogram(
    files_ddf: dd.DataFrame,
    features_ddf: dd.DataFrame,
    outfile: Path,
    num_bins: int = 10,
    *,
    compute: bool = False,
    **kwargs
) -> Tuple[dd.DataFrame, dd.Scalar | None] | Tuple[pd.DataFrame, None]:
    """Bin per-feature values into astrograms and write them to parquet.

    The files table is left-joined to the features table on ``file_id``, a
    ``date`` column is derived from ``timestamp``, the result is melted to
    long form and ``bin_and_cut`` is applied per ``feature`` group.

    Args:
        files_ddf: Files table; must provide the columns in ``files_columns``
            plus ``timestamp``.
        features_ddf: Features table; every column not listed in
            ``non_feature_columns`` below is treated as a feature column.
        outfile: Parquet destination for the astrogram table.
        num_bins: Number of bins forwarded to ``bin_and_cut`` and used to
            build the output ``meta``.
        compute: When truthy, execute the parquet write immediately and
            return the written table re-read with pandas. When falsy, return
            the lazy write handle instead.
        **kwargs: Accepted and ignored, so callers may forward a full CLI
            namespace.

    Returns:
        ``(pandas_frame, None)`` when ``compute`` is truthy, otherwise
        ``(melted_ddf, write_future)``.
    """
    non_feature_columns = [
        "sr", "segment_id", "segment_idx", "file_id", "duration", "offset",
        "frame_length", "hop_length", "n_fft", "feature_length",
    ]
    is_feature = ~features_ddf.columns.isin(non_feature_columns)
    feature_columns = features_ddf.columns[is_feature].tolist()

    # NOTE(review): `files_columns` is not defined in this function; it is
    # presumably a module-level constant -- confirm it exists.
    # NOTE(review): the melt `id_vars` list names `non_feature_columns`, but
    # only `file_id` and the feature columns are selected from `features_ddf`
    # before the merge, and the derived `date` column is not listed in
    # `id_vars` -- verify the melted schema is what `bin_and_cut` expects.
    ddf = (
        files_ddf[files_columns]
        .merge(features_ddf[["file_id", *feature_columns]], on="file_id", how="left")
        .assign(date=lambda df: df.timestamp.dt.date.astype("datetime64[ns]"))
        .melt(id_vars=[*files_columns, *non_feature_columns], var_name='frame', value_name='value')
    )

    # NOTE(review): grouping is on 'feature' while the melt above names its
    # variable column 'frame' -- confirm a 'feature' column is present.
    astro_ddf = (
        ddf
        .groupby('feature')
        .apply(
            bin_and_cut,
            column='value',
            groupby=["site_id", "date", "dddn"],
            upper=0.99,
            lower=0.01,
            num_bins=num_bins,
            reset_index=True,  # ??
            meta=astrogram_meta(num_bins)
        )
        # Parquet column names are written as strings, so relabel the
        # integer bin columns declared by the meta frame.
        .rename(columns={
            i: str(i) for i in range(num_bins)
        })
    )
    # CHECK: normalisation??
    # ddf_astro.loc[:,'0':] = ddf_astro.loc[:,'0':].div(ddf_astro.loc[:,'0':].sum(axis=1), axis=0)

    logging.info('Astrogram')
    logging.debug(astro_ddf.columns)
    # astro_ddf = astro_ddf.drop(columns='date')

    outfile = Path(outfile)
    # compute=False makes to_parquet return a lazy handle instead of writing.
    future: dd.Scalar = astro_ddf.to_parquet(
        outfile,
        version=PYARROW_VERSION,
        allow_truncated_timestamps=True,
        write_index=False,
        compute=False,
    )

    log.info(f"Building astrograms... Will persist to {outfile}")

    if compute:
        dask.compute(future)
        return pd.read_parquet(outfile), None

    # NOTE(review): this returns the melted input frame, not `astro_ddf`;
    # kept as in the original -- confirm which one callers need.
    return ddf, future

def main(
    files_path: Path,
    features_path: Path,
    cluster: str | None,
    memory: int,
    cores: int,
    jobs: int,
    queue: str,
    local: bool,
    threads_per_worker: int,
    debug: bool,
    **kwargs: Any,
) -> dd.DataFrame | None:
    """Start a dask client and build astrograms from the files/features tables.

    Args:
        files_path: Parquet path loaded with ``dd.read_parquet`` as the files
            table.
        features_path: Parquet path loaded with ``dd.read_parquet`` as the
            features table.
        cluster: Key into ``clusters`` selecting the cluster class to
            instantiate. Only used when ``local`` is falsy.
        memory: Forwarded to the cluster constructor, or formatted as
            ``"<memory>GiB"`` for the local per-worker memory limit.
        cores: Forwarded to the cluster constructor, or used as the number of
            local workers.
        jobs: Number of jobs the cluster is scaled to (cluster mode only).
        queue: Queue name forwarded to the cluster constructor.
        local: When truthy, start a local ``Client``; otherwise start the
            cluster selected by ``cluster``.
        threads_per_worker: Threads per local worker (local mode only).
        debug: In local mode, when truthy, ``cfg.set(scheduler='synchronous')``
            is applied before the client is created.
        **kwargs: Forwarded unchanged to ``astrogram`` (for example
            ``outfile``, ``num_bins``, ``compute``).

    Returns:
        None. The value returned by ``astrogram`` is not propagated.
    """
    if not local:
        # Cluster mode: instantiate the selected cluster class and scale it.
        cluster = clusters[cluster](
            cores=cores,
            memory=memory,
            queue=queue,
            name=None
        )
        log.info(cluster.job_script())
        cluster.scale(jobs=jobs)
        client = Client(cluster)
    else:
        if debug:
            # presumably `cfg` is dask.config -- confirm against the imports
            cfg.set(scheduler='synchronous')
        client = Client(
            n_workers=cores,
            threads_per_worker=threads_per_worker,
            memory_limit=f'{memory}GiB'
        )
        log.info(client)

    start_time = time.time()

    # The original call was missing the commas between keyword arguments.
    astrogram(
        files_ddf=dd.read_parquet(files_path),
        features_ddf=dd.read_parquet(features_path),
        **kwargs,
    )

    log.info("Astrograms complete")
    log.info(f"Time taken: {str(dt.timedelta(seconds=time.time() - start_time))}")

astro_histo = parser.add_mutually_exclusive_group()
astro_histo.add_argument('--histogram', dest='astrogram', default=True, action='store_false')
astro_histo.add_argument('--astrogram', dest='astrogram', default=True, action='store_true')

parser.add_argument('--long', dest='is_long', default=False, action='store_true')
def get_base_parser():
    """Build the argument parser shared by the script and sub-command entry points.

    The parser is created with ``add_help=False`` so it can be passed as a
    ``parents=[...]`` entry to another parser. Parser-level defaults are set
    last via ``set_defaults`` and therefore replace the per-argument
    ``default=`` values for the same destinations; several of them are read
    from environment variables (``LOCAL``, ``MEM_PER_CPU``, ``CORES``,
    ``THREADS_PER_WORKER``).

    Returns:
        The configured ``argparse.ArgumentParser``; its ``func`` default is
        ``main``.
    """
    def str_to_bool(value):
        # `type=bool` would turn every non-empty string (including "False")
        # into True, so parse the text explicitly instead.
        if isinstance(value, bool):
            return value
        text = str(value).strip().lower()
        if text in {"1", "true", "t", "yes", "y", "on"}:
            return True
        if text in {"", "0", "false", "f", "no", "n", "off"}:
            return False
        raise argparse.ArgumentTypeError(
            f"expected a boolean value, got {value!r}"
        )

    def expanded_path(raw):
        return Path(raw).expanduser()

    # NOTE(review): this description looks copied from the feature-extraction
    # command; left unchanged because it is user-facing text.
    parser = argparse.ArgumentParser(
        description='Extract acoustic features from audio files',
        add_help=False,
    )
    parser.add_argument(
        '--files-path',
        type=expanded_path,
        default=None,
        help='File index parquet'
    )
    parser.add_argument(
        '--features-path',
        type=expanded_path,
        default=None,
        help='Acoustic features parquet'
    )
    parser.add_argument(
        '--outfile',
        type=expanded_path,
        default=None,
        help='Parquet file to save results'
    )
    parser.add_argument(
        '--cluster',
        default='artemis',
        help='Which cluster to use?'
    )
    parser.add_argument(
        '--memory',
        default=16,
        type=int,
        help='Amount of memory required in GB (total per node)'
    )
    parser.add_argument(
        '--cores',
        default=1,
        type=int,
        help='Number of cores per node'
    )
    parser.add_argument(
        '--jobs',
        default=4,
        type=int,
        help='Number of simultaneous jobs'
    )
    parser.add_argument(
        '--queue',
        default="general",
        type=str,
        help='SLURM job queue name',
    )
    parser.add_argument(
        '--local',
        type=str_to_bool,
        default=True,
        help="When true, run locally, when false, run on the cluster"
    )
    parser.add_argument(
        "--threads-per-worker",
        type=int,
        help="Threads per worker",
    )
    parser.add_argument(
        '--debug',
        default=False,
        action='store_true',
        help='Sets single-threaded for debugging.'
    )
    parser.add_argument(
        '--npartitions',
        default=None,
        type=int,
        help='Number of dask partitions for the data'
    )
    parser.add_argument(
        '--compute',
        type=int,
        default=1,
        help='Execute the program immediately'
    )
    # String defaults (including values read from the environment) are run
    # through each argument's `type` converter by argparse at parse time.
    # NOTE(review): the fallbacks of 0 for "memory" and "cores" replace the
    # per-argument defaults (16 and 1) when the variables are unset --
    # confirm that is intended.
    parser.set_defaults(func=main, **{
        "root_dir": "/data",
        "files_path": "/results/files_table.parquet",
        "features_path": "/results/recording_acoustic_features_table.parquet",
        "outfile": "/results/astrograms.parquet",
        'local': os.environ.get("LOCAL", True),
        "memory": os.environ.get("MEM_PER_CPU", 0),
        "cores": os.environ.get("CORES", 0),
        "threads_per_worker": os.environ.get("THREADS_PER_WORKER", 1),
    })
    return parser

def register_subparser(subparsers):
    """Register the ``astrograms`` sub-command on a sub-parser collection.

    Args:
        subparsers: Object exposing ``add_parser`` (for example the value
            returned by ``ArgumentParser.add_subparsers()``). The new
            sub-parser inherits every argument from ``get_base_parser()``.
    """
    options = {
        "help": "Build astrograms from acoustic features",
        "parents": [get_base_parser()],
        "add_help": True,
    }
    subparsers.add_parser("astrograms", **options)

main(**vars(args))
if __name__ == '__main__':
    # Script entry point: parse the command line, log the resolved namespace,
    # then dispatch to the handler stored under `func`, passing every parsed
    # value (including `func` itself) as a keyword argument.
    parser = get_base_parser()
    args = parser.parse_args()
    log.info(args)
    call_kwargs = vars(args)
    args.func(**call_kwargs)
Loading