diff --git a/src/cli/astrograms.py b/src/cli/astrograms.py index 3df583b..8aea4c5 100644 --- a/src/cli/astrograms.py +++ b/src/cli/astrograms.py @@ -14,116 +14,256 @@ import logging logging.basicConfig(level=logging.INFO) -def main(infile=None, outfile=None, memory=24, cores=4, jobs=8, npartitions=None, - astrogram=True, is_long=False, - # fps=12, window=60 * 12, hop=10 * 12, - test=False, queue='test.short', binary=True, local=False, **kwargs): - assert infile is not None - assert outfile is not None +def astrogram_meta(num_bins): + return pd.DataFrame({ + "site_id": pd.Series(dtype="string"), + "date": pd.Series(dtype="datetime64[ns]"), + "dddn": pd.Series(dtype='string'), + **{ + i: pd.Series(dtype="f8") + for i in range(num_bins) + } + }) - outfile = Path(outfile) +def astrogram( + files_ddf: dd.DataFrame, + features_ddf: dd.DataFrame, + outfile: Path, + num_bins: int = 10, + **kwargs +) -> Tuple[dd.DataFrame, dd.Scalar | None] | Tuple[pd.DataFrame, None]: + group_by = ["file_id", "timestamp", "dddn"] + non_feature_columns = ["sr", "segment_id", "segment_idx", "file_id", "duration", "offset", "frame_length", "hop_length", "n_fft", "feature_length"] + feature_columns = features_ddf.columns[~features_ddf.columns.isin(non_feature_columns)].tolist() + ddf = ( + files_ddf[files_columns] + .merge(features_ddf[["file_id", *feature_columns]], on="file_id", how="left") + .assign(date=lambda df: df.timestamp.dt.date.astype("datetime64[ns]")) + .melt(id_vars=[*files_columns, *non_feature_columns], var_name='frame', value_name='value') + ) + + astro_ddf = ( + ddf + .groupby('feature') + .apply( + bin_and_cut, + column='value', + groupby=["site_id", "date", "dddn"], + upper=0.99, + lower=0.01, + num_bins=num_bins, + reset_index=True, # ?? + meta=astrogram_meta(num_bins) + ) + .rename(columns={ + i: str(i) for i in range(num_bins) + }) + ) + # CHECK: normalisation?? + # ddf_astro.loc[:,'0':] = ddf_astro.loc[:,'0':].div(ddf_astro.loc[:,'0':].sum(axis=1), axis=0) + + logging.info('Astrogram') + logging.debug(astro_ddf.columns) + # astro_ddf = astro_ddf.drop(columns='date') + + future: dd.Scalar = astro_ddf.to_parquet( + Path(outfile), + version=PYARROW_VERSION, + allow_truncated_timestamps=True, + write_index=False, + compute=False, + ) + + log.info(f"Building astrograms... Will persist to {outfile}") + + if compute: + dask.compute(future) + return pd.read_parquet(Path(outfile)), None + + return ddf, future + +def main( + files_path: Path, + features_path: Path, + cluster: str | None, + memory: int, + cores: int, + jobs: int, + queue: str, + local: bool, + threads_per_worker: int, + debug: bool, + **kwargs: Any, +) -> dd.DataFrame | None: + """ + Process audio files using the specified parameters. + + Args: + root_dir: Path to the audio root directory + infile(str, required): Path to a file index generated by 'index_audio' command. + outfile (str, required): Output file path. + cluster (str, optional): Name of the cluster to use. 'arc' or 'altair' or None if local==True. Defaults to None. + memory (int, optional): Memory limit for each worker in GB. Defaults to 32. + cores (int, optional): Number of CPU cores per worker. Defaults to 8. + jobs (int, optional): Number of worker jobs to start. Defaults to 12. + segment_duration (float, optional): Segment duration for audio. Defaults to 60s. + frame (int, optional): Frame size for feature extraction. Defaults to 16000. + hop (int, optional): Hop size for feature extraction. Defaults to 4000. + n_fft (int, optional): Number of FFT points for feature extraction. Defaults to 16000. + npartitions (int, optional): Number of partitions for Dask DataFrame. Defaults to 2000. + local (bool, optional): Flag indicating whether to use a local cluster for computation. + compute (bool, optional): Flag indicating whether to persist parquet eagerly. Defaults to false. + debug (bool, optional): Flag indicating whether to run synchronously. Defaults to false. + + Returns: + dd.DataFrame + + Raises: + ValueError: If an error occurs during processing. + + Examples: + >>> main(indir='./data/ecolistening', outfile='./data/processed/ecolistening/features.parquet', + ... frame=2048, hop=512, n_fft=1024, + ... local=True, compute=True) + np.ndarray: + Generate bin edges for a given column of data in a pandas DataFrame. + +cut(r, column, bins) -> pd.Series: + Bin values in a column of data in a pandas DataFrame and count them. + +bin_and_cut(r, column, groupby=['country', 'location', 'dddn'], upper=0.95, lower=0.05, nbins=10, reset_index=False) -> pd.DataFrame: + Apply the bin and cut functions to multiple groups in a pandas DataFrame. + +time_of_day_heatmap(drop='feature', index='dddn', *args, **kwargs) -> None: + Create a heatmap of data grouped by time of day. + +""" + +from typing import List + +import matplotlib.pyplot as plt +import numpy as np +import pandas as pd +import seaborn as sns + + +def bin(r: pd.DataFrame, column: str, upper: float = 0.95, lower: float = 0.05, nbins: int = 10) -> np.ndarray: + """ + Compute the bin edges based on even spacing between upper and lower quantiles of the entire dataframe. + + Parameters + ---------- + r : pd.DataFrame + Input data to compute the bins for. + column : str + Column name in the input data frame to compute the bins for. + upper : float, optional + The upper quantile to consider for binning (default is 0.95). + lower : float, optional + The lower quantile to consider for binning (default is 0.05). + nbins : int, optional + The number of bins to compute (default is 10). + + Returns + ------- + np.ndarray + The computed bin edges. + + Examples + -------- + >>> import pandas as pd + >>> data = pd.DataFrame({'col': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]}) + >>> bin(data, 'col', upper=0.9, lower=0.1, nbins=3) + array([-inf, 2.6, 5.2, inf]) + >>> bin(data, 'col', upper=0.8, lower=0.2, nbins=4) + array([-inf, 3.4, 5.8, 8.2, inf]) + """ + bins = np.linspace(*r[column].quantile([lower, upper]), nbins + 1) + bins[0] = -np.inf + bins[-1] = np.inf + + return bins + + +def cut(r: pd.DataFrame, column: str, bins: np.ndarray) -> pd.DataFrame: + """ + Cut the data into bins based on the computed bin edges. + + Parameters + ---------- + r : pd.DataFrame + Input data to cut into bins. + column : str + Column name in the input data frame to cut. + bins : np.ndarray + The bin edges computed using the `bin` function. + + Returns + ------- + pd.DataFrame + The data binned into the specified bins. + """ + r = pd.cut(x=r[column], bins=bins).value_counts().sort_index().reset_index(drop=True) + r.index.rename('bin') + return r + + +def bin_and_cut(r: pd.DataFrame, column: str, groupby: List[str] = ['country', 'location', 'dddn'], + upper: float = 0.95, lower: float = 0.05, nbins: int = 10, reset_index: bool = False) -> pd.DataFrame: + """ + Compute the bins and cut the data into the bins based on the computed bin edges. + + Parameters + ---------- + r : pd.DataFrame + Input data to compute the bins and cut into bins. + column : str + Column name in the input data frame to compute the bins and cut into bins. + groupby : List[str], optional + The columns to group the input data by before computing the bins (default is ['country','location','dddn']). + upper : float, optional + The upper quantile to consider for binning (default is 0.95). + lower : float, optional + The lower quantile to consider for binning (default is 0.05). + nbins : int, optional + The number of bins to compute (default is 10). + reset_index : bool, optional + Whether to reset the index of the resulting data frame (default is False). + + Returns + ------- + pd.DataFrame + The data binned into the specified bins. + """ + try: + # Compute the bins based on the upper and lower values of the entire (ungrouped) dataframe. + bins = bin(r, column, upper=upper, lower=lower, nbins=nbins) + binned = r.groupby(by=groupby).apply(cut, column=column, bins=bins) + binned = binned.div(binned.sum(axis=1), axis=0) + if reset_index: + return binned.reset_index() + return binned + except ValueError as e: + print(e) + + +def time_of_day_heatmap(drop: str = 'feature', index: str = 'dddn', *args, **kwargs) -> None: + """ + Create a heatmap plot of time series data aggregated by hour of the day. + + Parameters + ---------- + drop : str, optional + The column to drop from the plot, by default 'feature' + index : str, optional + The column to use as the index, by default 'dddn' + args, kwargs + Additional arguments and keyword arguments passed to `sns.heatmap()`. + + Returns + ------- + None + + Raises + ------ + TypeError + If `data` argument is not provided. + + Example + ------- + >>> import pandas as pd + >>> import numpy as np + >>> import seaborn as sns + >>> from matplotlib import pyplot as plt + >>> from typing import Tuple + >>> + >>> def generate_data() -> Tuple[pd.DataFrame, str]: + ... index = pd.date_range('2022-01-01', '2022-01-02', freq='30T') + ... df = pd.DataFrame({ + ... 'date': index, + ... 'value': np.random.rand(len(index)), + ... 'category': np.random.choice(['A', 'B', 'C'], len(index)) + ... }) + ... return df, 'date' + >>> + >>> data, index = generate_data() + >>> time_of_day_heatmap(data=data, index=index, cmap='YlOrRd', annot=True) + + """ + data = kwargs.pop('data', None) + if data is None: + raise TypeError("Missing required keyword argument: 'data'") + + heatmap_data = data.drop(columns=drop).set_index(index).mul(100).transpose().iloc[::-1] + sns.heatmap(heatmap_data, ax=plt.gca(), square=True, cbar=False, *args, **kwargs)