diff --git a/CHANGELOG.md b/CHANGELOG.md
index 485b063..421c3dc 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,6 +3,15 @@
 This file documents the changes to the features for supported feature versions.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
 
+## [0.3.0]
+
+### Added
+- Added SDSC utils for generating task files
+- Added luigi dependency for workflow management
+- Added bad alpha filtering in the aggregation pipeline to improve data quality
+
+### Fixed
+- `features.voltage_features_set` returns features by category, sorted according to the pydantic model definitions
 
 ## [0.2.2] - 2025-09-25
 
 ### Added
diff --git a/FEATURES_CHANGELOG.md b/FEATURES_CHANGELOG.md
index cbe525d..245212b 100644
--- a/FEATURES_CHANGELOG.md
+++ b/FEATURES_CHANGELOG.md
@@ -6,6 +6,8 @@ The features are released when necessary with the following tag: yyyy_Www
 This file documents the changes to the features for supported feature versions.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
 
+## [2025_W43]
+
+- Added bad alpha filtering in the aggregation pipeline to improve data quality
+
 ## [2025_W39]
diff --git a/examples/train_region_predictor_gradient_boosting.py b/examples/train_region_predictor_gradient_boosting.py
index 6cc7749..15200e5 100644
--- a/examples/train_region_predictor_gradient_boosting.py
+++ b/examples/train_region_predictor_gradient_boosting.py
@@ -7,24 +7,24 @@
 import sklearn.metrics
 from xgboost import XGBClassifier  # pip install xgboost
 # https://xgboost.readthedocs.io/en/stable/prediction.html
-
+import xgboost
 import iblutil.numerical
 
 import ephysatlas.anatomy
 import ephysatlas.data
 import ephysatlas.fixtures
 import ephysatlas.regionclassifier
 
-VINTAGE = '2024_W50'
-VINTAGE = '2025_W28'
-path_features = Path(f'/Users/olivier/Documents/datadisk/ephys-atlas-decoding/features/{VINTAGE}')  # mac
-path_features = Path(f'/mnt/s0/ephys-atlas-decoding/features/{VINTAGE}')  # parede
-path_features = Path(f'/datadisk/Data/paper-ephys-atlas/ephys-atlas-decoding/features/{VINTAGE}')  # ferret
+PROJECT = 'ea_active'
+VINTAGE = '2025_W39'
+LOWQ = ephysatlas.fixtures.misaligned_pids
+
+root_path_features = Path('/datadisk/Data/paper-ephys-atlas/ephys-atlas-decoding/features')  # ferret
+path_features = root_path_features.joinpath(PROJECT, VINTAGE, 'agg_full')
 
 if not path_features.exists():
     from one.api import ONE
     one = ONE()
-    ephysatlas.data.download_tables(path_features.parent, label=VINTAGE, one=one)
-LOWQ = ephysatlas.fixtures.misaligned_pids
+    ephysatlas.data.download_tables(path_features, label=VINTAGE, one=one)
 
 brain_atlas = ephysatlas.anatomy.ClassifierAtlas()
@@ -42,6 +42,26 @@
 #     "micro-manipulator",
 ]
 x_list = ephysatlas.features.voltage_features_set(FEATURE_SET)
+# x_list = list(set(x_list) - set(
+#     ['aperiodic_exponent',
+#      'aperiodic_offset',
+#      'decay_fit_error',
+#      'decay_fit_r_squared',
+#      'decay_n_peaks',
+#      'psd_residual_alpha',
+#      'psd_residual_beta',
+#      'psd_residual_delta',
+#      'psd_residual_gamma',
+#      'psd_residual_lfp',
+#      'psd_residual_theta']))
+
+
+# x_list = list(set(x_list) - set(
+#     ['decay_fit_error',
+#      'decay_fit_r_squared'
+#      ]))
+
+
 x_list.append("outside")
 
 TRAIN_LABEL = "Cosmos_id"  # ['Beryl_id', 'Cosmos_id']
@@ -56,10 +76,10 @@ def train(test_idx, fold_label=''):
     train_idx = ~test_idx
     print(f"{fold_label}: {df_features.shape[0]} channels", f'training set {np.sum(test_idx) / test_idx.size}')
     df_features.loc[train_idx, :].groupby(TRAIN_LABEL).count()
-    x_train = df_features.loc[train_idx, x_list].values
-    x_test = df_features.loc[test_idx, x_list].values
-    y_train = df_features.loc[train_idx, TRAIN_LABEL].values
-    y_test = df_features.loc[test_idx, TRAIN_LABEL].values
+    x_train = df_features.loc[train_idx, x_list].values.astype(float)
+    x_test = df_features.loc[test_idx, x_list].values.astype(float)
+    y_train = df_features.loc[train_idx, TRAIN_LABEL].values.astype(float)
+    y_test = df_features.loc[test_idx, TRAIN_LABEL].values.astype(float)
     df_test = df_features.loc[test_idx, :].copy()
 
     classes = np.unique(df_features.loc[train_idx, TRAIN_LABEL])
@@ -82,6 +102,7 @@ def train(test_idx, fold_label=''):
     print(f"{fold_label} Accuracy: {accuracy}")
 
     np.testing.assert_array_equal(classes, rids)
+
     return classifier.predict_proba(x_test), classifier, accuracy, confusion_matrix
 
 
@@ -124,7 +145,7 @@ def train(test_idx, fold_label=''):
 accuracy = sklearn.metrics.accuracy_score(df_features[TRAIN_LABEL].values, df_predictions['prediction'].values.astype(int))
 
-sklearn.metrics.ConfusionMatrixDisplay.from_predictions(df_features[TRAIN_LABEL].values, df_predictions['prediction'].values.astype(int), normalize='true', cmap='Blues')
+sklearn.metrics.ConfusionMatrixDisplay.from_predictions(df_features[TRAIN_LABEL].values, df_predictions['prediction'].values.astype(int), normalize='true', cmap='Blues', im_kw=dict(vmax=.75))
 
 _, classifier, _, _ = train(test_idx=np.zeros(df_features.shape[0], dtype=bool))
 meta = dict(
     RANDOM_SEED=rs,
diff --git a/pyproject.toml b/pyproject.toml
index e8289b6..79f83ba 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -28,7 +28,8 @@ dependencies = [
     "spikeinterface>=0.101.2",
     "xgboost>=3.0.1",
     "joblib==1.5.1",
-    "iblscripts @ git+https://github.com/int-brain-lab/iblscripts.git"
+    "iblscripts @ git+https://github.com/int-brain-lab/iblscripts.git",
+    "luigi>=3.6.0",
 ]
 
 [build-system]
diff --git a/requirements.txt b/requirements.txt
deleted file mode 100644
index c13e426..0000000
--- a/requirements.txt
+++ /dev/null
@@ -1,25 +0,0 @@
---extra-index-url https://download.pytorch.org/whl/cpu
-dartsort @ git+https://github.com/cwindolf/dartsort@iblsorter
-dredge @ git+https://github.com/evarol/dredge@v0.2.2
-h5py
-hdbscan
-iblatlas >= 0.3.0
-ibllib
-ibl_style
-linear_operator
-matplotlib
-mock
-numba
-numpy >= 2.0
-opt-einsum
-pandas
-pandera
-pydantic
-pytest
-scikit-learn
-scipy >= 1.13
-seaborn
-spikeinterface
-torch >= 2.0
-tqdm
-xgboost
\ No newline at end of file
diff --git a/src/ephysatlas/__init__.py b/src/ephysatlas/__init__.py
index 35d5975..344c967 100644
--- a/src/ephysatlas/__init__.py
+++ b/src/ephysatlas/__init__.py
@@ -10,4 +10,4 @@
 >>> from ephysatlas import features, aggregation, anatomy
 """
 
-__version__ = "0.2.2"
+__version__ = "0.3.0"
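The aggregation change that follows flags channels whose alpha statistics blow up relative to the rest of the probe, then masks them with NaN. A minimal sketch of the rule, assuming a per-probe dataframe with `alpha_mean` and `alpha_std` columns (the 1e3 factor is the one used in `get_aggregated_features_per_pid` below):

    import numpy as np
    import pandas as pd

    def flag_bad_alpha(df: pd.DataFrame, factor: float = 1e3) -> np.ndarray:
        # A channel is flagged when either alpha statistic exceeds
        # factor x the probe-wide median of that statistic
        return np.logical_or(
            df["alpha_mean"].values >= factor * np.nanmedian(df["alpha_mean"]),
            df["alpha_std"].values >= factor * np.nanmedian(df["alpha_std"]),
        )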
""" # Ensure only one PID is present in the DataFrame @@ -319,6 +320,16 @@ def get_aggregated_features_per_pid(snippet_df_per_pid: pd.DataFrame): # Reset the index to make pid and channel regular columns agg_df_per_pid = agg_df_per_pid.reset_index() + #Filter out the channels with bad alpha + ialpha_bad = np.logical_or( + agg_df_per_pid['alpha_mean'].values >= (1e3 * np.nanmedian(agg_df_per_pid['alpha_mean'])), + agg_df_per_pid['alpha_std'].values >= 1e3 * np.nanmedian(agg_df_per_pid['alpha_std']) + ) + # then we join with the channel information to get coordinates and anatomical information + logger.info(f"Number of channels with bad alpha: {np.sum(ialpha_bad)}") + agg_df_per_pid.loc[ialpha_bad, 'alpha_mean'] = np.nan + agg_df_per_pid.loc[ialpha_bad, 'alpha_std'] = np.nan + # Load channel metadata (axial and lateral positions) from channels.pqt # Construct the path to the snippet directory to find its parent snippet_level_dir = Path(snippet_df_per_pid["base_level_dir"].iloc[0]) / Path( diff --git a/src/ephysatlas/data.py b/src/ephysatlas/data.py index ac9d23f..68f4ce7 100644 --- a/src/ephysatlas/data.py +++ b/src/ephysatlas/data.py @@ -410,6 +410,7 @@ def read_features_from_disk( if strict: df_features = pd.DataFrame(ephysatlas.features.ModelRawFeatures(df_features)) + return df_features diff --git a/src/ephysatlas/features.py b/src/ephysatlas/features.py index 0884c01..c6f0750 100644 --- a/src/ephysatlas/features.py +++ b/src/ephysatlas/features.py @@ -133,7 +133,7 @@ logger = logging.getLogger(__name__) __features_version__ = ( - "2025.09.18" # this is the version of this feature extractor code + "2025.10.24" # this is the version of this feature extractor code ) @@ -284,24 +284,23 @@ class ModelLfFeatures(BaseChannelFeatures): rms_lf: Series[float] = pa.Field( coerce=True, metadata={"transform": lambda x: 20 * np.log10(x)} ) + psd_lfp: Series[float] = pa.Field(coerce=True) psd_delta: Series[float] = pa.Field(coerce=True) psd_theta: Series[float] = pa.Field(coerce=True) psd_alpha: Series[float] = pa.Field(coerce=True) psd_beta: Series[float] = pa.Field(coerce=True) psd_gamma: Series[float] = pa.Field(coerce=True) - psd_lfp: Series[float] = pa.Field(coerce=True) - aperiodic_offset: Optional[Series[float]] = pa.Field(coerce=True) - aperiodic_exponent: Optional[Series[float]] = pa.Field(coerce=True) - decay_fit_error: Optional[Series[float]] = pa.Field(coerce=True) - decay_fit_r_squared: Optional[Series[float]] = pa.Field(coerce=True) - decay_n_peaks: Optional[Series[int]] = pa.Field(coerce=True) + psd_residual_lfp: Optional[Series[float]] = pa.Field(coerce=True, nullable=True) psd_residual_delta: Optional[Series[float]] = pa.Field(coerce=True, nullable=True) psd_residual_theta: Optional[Series[float]] = pa.Field(coerce=True, nullable=True) psd_residual_alpha: Optional[Series[float]] = pa.Field(coerce=True, nullable=True) psd_residual_beta: Optional[Series[float]] = pa.Field(coerce=True, nullable=True) psd_residual_gamma: Optional[Series[float]] = pa.Field(coerce=True, nullable=True) - psd_residual_lfp: Optional[Series[float]] = pa.Field(coerce=True, nullable=True) - + aperiodic_offset: Optional[Series[float]] = pa.Field(coerce=True) + aperiodic_exponent: Optional[Series[float]] = pa.Field(coerce=True) + decay_fit_error: Optional[Series[float]] = pa.Field(coerce=True) + decay_fit_r_squared: Optional[Series[float]] = pa.Field(coerce=True) + decay_n_peaks: Optional[Series[int]] = pa.Field(coerce=True) class ModelCsdFeatures(BaseChannelFeatures): """Schema for current source density features. 
@@ -491,40 +490,15 @@ def voltage_features_set(features_list=FEATURES_LIST):
     for feature_group in features_list:
         match feature_group:
             case "raw_ap":
-                x_list += sorted(
-                    list(
-                        set(ModelApFeatures.to_schema().columns.keys())
-                        - set(["channel"])
-                    )
-                )
+                x_list += list(ModelApFeatures.to_schema().columns.keys())
             case "raw_lf":
-                x_list += sorted(
-                    list(
-                        set(ModelLfFeatures.to_schema().columns.keys())
-                        - set(["channel"])
-                    )
-                )
+                x_list += list(ModelLfFeatures.to_schema().columns.keys())
             case "raw_lf_csd":
-                x_list += sorted(
-                    list(
-                        set(ModelCsdFeatures.to_schema().columns.keys())
-                        - set(["channel"])
-                    )
-                )
+                x_list += list(ModelCsdFeatures.to_schema().columns.keys())
             case "waveforms":
-                x_list += sorted(
-                    list(
-                        set(ModelSpikeFeatures.to_schema().columns.keys())
-                        - set(["channel"])
-                    )
-                )
+                x_list += list(ModelSpikeFeatures.to_schema().columns.keys())
             case "micro-manipulator":
-                x_list += sorted(
-                    list(
-                        set(ModelHistologyPlanned.to_schema().columns.keys())
-                        - set(["channel"])
-                    )
-                )
+                x_list += list(ModelHistologyPlanned.to_schema().columns.keys())
     return x_list
diff --git a/src/ephysatlas/sdsc/Runprogram_template.sh b/src/ephysatlas/sdsc/Runprogram_template.sh
new file mode 100644
index 0000000..3822952
--- /dev/null
+++ b/src/ephysatlas/sdsc/Runprogram_template.sh
@@ -0,0 +1,15 @@
+#!/bin/bash
+
+pid=$2
+start_time=$8
+duration=${10}
+
+module purge
+
+source {VENV_PATH}
+
+pid_dir={OUTPUT_DIR}/logs/${pid}
+# Create the pid directory if it doesn't exist
+mkdir -p ${pid_dir}
+
+python {OUTPUT_DIR}/computation.py "$@" > ${pid_dir}/${start_time}_${duration}.log 2>&1
diff --git a/src/ephysatlas/sdsc/__init__.py b/src/ephysatlas/sdsc/__init__.py
new file mode 100644
index 0000000..e69de29
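For orientation, each line that `create_task_file` (further down) writes to Full_Task_file sources the wrapper above with one snippet's parameters, along the lines of (placeholder ids):

    source {OUTPUT_DIR}/Runprogram.sh --pid <pid> --eid <eid> --probe_name probe00 --start_time 300.0 --duration 5.0

With this argument order, `$2` inside the wrapper is the pid, `$8` the start time and `${10}` the duration; the full argument list is forwarded to computation.py via `"$@"`, and its stdout/stderr land in {OUTPUT_DIR}/logs/<pid>/<start_time>_<duration>.log.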
diff --git a/src/ephysatlas/sdsc/computation_template.py b/src/ephysatlas/sdsc/computation_template.py
new file mode 100644
index 0000000..badd1e7
--- /dev/null
+++ b/src/ephysatlas/sdsc/computation_template.py
@@ -0,0 +1,46 @@
+# Script to launch feature calculations for one pid.
+
+from pathlib import Path
+from deploy.iblsdsc import OneSdsc
+import argparse
+from ephysatlas.feature_computation import compute_features_from_pid
+
+import logging
+
+logging.basicConfig(level=logging.INFO)
+
+# Constants
+DURATION = 5  # Duration in seconds
+OUTPUT_DIR = Path("/mnt/sdceph/users/prai1/data/projects/psychedlics/output/")
+
+
+def calc_features(probe_dict):
+    print(f"Processing probe {probe_dict['pid']} at t_start {probe_dict['t_start']}")
+    one = OneSdsc(mode='local', cache_rest=None)
+    # assert isinstance(one, OneSdsc), "one must be an instance of OneSdsc"
+    compute_features_from_pid(**probe_dict, one=one, output_dir=OUTPUT_DIR, scratch_dir="/scratch/prai1/dartsort")
+
+
+def main():
+    parser = argparse.ArgumentParser(description='Compute features for one probe insertion')
+    parser.add_argument('--pid', type=str, required=True, help='Probe ID')
+    parser.add_argument('--eid', type=str, required=True, help='Experiment ID')
+    parser.add_argument('--probe_name', type=str, required=True, help='Probe name')
+    parser.add_argument('--start_time', type=float, required=True, help='Start time of the passive period')
+    parser.add_argument('--duration', type=float, default=DURATION, help=f'Duration of each snippet (default: {DURATION})')
+
+    args = parser.parse_args()
+
+    # Create probe dictionary
+    probe_dict = {
+        'pid': args.pid,
+        'eid': args.eid,
+        'probe_name': args.probe_name.lower(),
+        't_start': args.start_time,
+        'duration': args.duration
+    }
+    # Calculate features
+    calc_features(probe_dict)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/src/ephysatlas/sdsc/sdsc_utils.py b/src/ephysatlas/sdsc/sdsc_utils.py
new file mode 100644
index 0000000..681c4d8
--- /dev/null
+++ b/src/ephysatlas/sdsc/sdsc_utils.py
@@ -0,0 +1,314 @@
+import luigi
+import pandas as pd
+from pathlib import Path
+import os
+import logging
+import numpy as np
+
+# Meta variables for the workflow
+PROJECT_NAME = "test_project"  # Change this to your project name
+OUTPUT_DIR = Path(f"/mnt/sdceph/users/prai1/data/projects/{PROJECT_NAME}")
+VENV_PATH = "/mnt/home/prai1/projects/passive_ephys/.venv/bin/activate"
+TEMPLATE_COMPUTATION_PATH = Path(__file__).parent / "computation_template.py"
+TEMPLATE_RUNPROGRAM_PATH = Path(__file__).parent / "Runprogram_template.sh"
+
+# Configure logging
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+
+class GetPidList(luigi.Task):
+    """Task to get the list of pids, eids and probe_names and save it as CSV."""
+
+    def output(self):
+        return luigi.LocalTarget(str(OUTPUT_DIR / "pids_eids_probes.csv"))
+
+    def run(self):
+        """Get the list of pids, eids and probe_names."""
+        logger.info("Getting PID list...")
+
+        # Call the project-specific implementation below
+        df = get_pid_list()
+
+        # Validate output columns
+        required_columns = ['pid', 'eid', 'probe_name']
+        if not all(col in df.columns for col in required_columns):
+            raise ValueError(f"Missing required columns: {required_columns}")
+
+        # Validate data types
+        # TODO
+
+        # Ensure output directory exists
+        OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
+
+        # Save to CSV
+        df.to_csv(self.output().path, index=False)
+        logger.info(f"PID list saved to {self.output().path}")
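Every task in this module writes a `luigi.LocalTarget`, and luigi treats an existing output file as completed work, so re-running the pipeline skips finished steps. A minimal sketch of running a single task in isolation (same `local_scheduler` flag as `run_workflow` at the bottom of the file):

    import luigi

    # Builds only GetPidList; returns True on success.
    # Delete OUTPUT_DIR / "pids_eids_probes.csv" to force a re-run.
    ok = luigi.build([GetPidList()], local_scheduler=True)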
+
+
+class CreateSnippetsFile(luigi.Task):
+    """Task to create the snippets dataframe with t_starts and duration."""
+
+    def requires(self):
+        return GetPidList()
+
+    def output(self):
+        return luigi.LocalTarget(str(OUTPUT_DIR / f"{PROJECT_NAME}_snippets_df.csv"))
+
+    def run(self):
+        """Create snippets dataframe from PID list."""
+        logger.info("Creating snippets file...")
+
+        # Read the input CSV
+        df = pd.read_csv(self.input().path)
+
+        # Call the project-specific implementation below
+        snippets_df = create_snippets_file(df)
+
+        # Validate output columns
+        required_columns = ['pid', 'eid', 'probe_name', 't_start', 'duration']
+        if not all(col in snippets_df.columns for col in required_columns):
+            raise ValueError(f"Missing required columns: {required_columns}")
+
+        # Validate data types
+        # TODO
+
+        # Save to CSV
+        snippets_df.to_csv(self.output().path, index=False)
+        logger.info(f"Snippets file saved to {self.output().path}")
+
+
+class CreateComputationFile(luigi.Task):
+    """Task to create the computation.py file from its template."""
+
+    def output(self):
+        return luigi.LocalTarget(str(OUTPUT_DIR / "computation.py"))
+
+    def run(self):
+        """Create computation.py file from template."""
+        logger.info("Creating computation file...")
+
+        # Call the template-rendering function below
+        computation_content = create_computation_file()
+
+        # Validate output
+        if not isinstance(computation_content, str):
+            raise ValueError("Computation file content should be a string")
+
+        # Ensure OUTPUT_DIR exists
+        OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
+
+        # Write the file
+        with open(self.output().path, 'w') as f:
+            f.write(computation_content)
+
+        logger.info(f"Computation file created at {self.output().path}")
+
+
+class CreateRunProgramFile(luigi.Task):
+    """Task to create the Runprogram.sh file."""
+
+    def requires(self):
+        return CreateComputationFile()
+
+    def output(self):
+        return luigi.LocalTarget(str(OUTPUT_DIR / "Runprogram.sh"))
+
+    def run(self):
+        """Create Runprogram.sh file from template."""
+        logger.info("Creating Runprogram.sh file...")
+
+        # Read the template file
+        template_path = TEMPLATE_RUNPROGRAM_PATH
+        if not template_path.exists():
+            raise FileNotFoundError(f"Runprogram template file not found at {template_path}")
+
+        with open(template_path, 'r') as f:
+            run_program_template = f.read()
+
+        # Replace placeholders in the template
+        run_program_content = run_program_template.replace(
+            '{VENV_PATH}', VENV_PATH
+        ).replace(
+            '{OUTPUT_DIR}', str(OUTPUT_DIR)
+        )
+
+        # Ensure OUTPUT_DIR exists
+        OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
+
+        # Write the file
+        with open(self.output().path, 'w') as f:
+            f.write(run_program_content)
+
+        # Make it executable
+        os.chmod(self.output().path, 0o755)
+
+        logger.info(f"Runprogram.sh created at {self.output().path}")
+
+
+class CreateTaskFile(luigi.Task):
+    """Task to create the final task file."""
+
+    def requires(self):
+        return {
+            'snippets': CreateSnippetsFile(),
+            'run_program': CreateRunProgramFile()
+        }
+
+    def output(self):
+        return luigi.LocalTarget(str(OUTPUT_DIR / "Full_Task_file"))
+
+    def run(self):
+        """Create the task file."""
+        logger.info("Creating task file...")
+
+        # Read snippets file
+        snippets_df = pd.read_csv(self.input()['snippets'].path)
+
+        # Get the run program path as a POSIX string
+        run_program_path = Path(self.input()['run_program'].path).as_posix()
+
+        outp_file = OUTPUT_DIR / 'Full_Task_file'
+
+        # Call the task-file writer below
+        _ = create_task_file(snippets_df, outp_file, run_program_path)
+
+        # Validate output
+        if not Path(outp_file).exists():
+            raise ValueError("Task file was not created")
+
+        logger.info(f"Task file created at {self.output().path}")
"workflow_complete.txt")) + + def run(self): + """Run the complete workflow.""" + logger.info("Workflow pipeline completed successfully!") + + # Create a completion marker + with open(self.output().path, 'w') as f: + f.write("Workflow completed successfully\n") + f.write(f"Project: {PROJECT_NAME}\n") + f.write(f"Output directory: {OUTPUT_DIR}\n") + f.write("Generated files:\n") + f.write(" - pids_eids_probes.csv\n") + f.write(f" - {PROJECT_NAME}_snippets_df.csv\n") + f.write(" - computation.py\n") + f.write(" - Runprogram.sh\n") + f.write(" - Full_Task_file\n") + + +# Placeholder functions - implement these with your actual logic +def get_pid_list() -> pd.DataFrame: + """ + Get the list of pids, eids and probe_names. + + Returns: + pd.DataFrame: DataFrame with columns ['pid', 'eid', 'probe_name'] + """ + + from one.api import ONE + + one = ONE() + # Rest query for getting psychedelics insertions + insertions = one.alyx.rest('insertions', 'list', django='session__projects__name__icontains,psychedelics') + + # Get pids from insertions + _, alyx_pids = [item["id"] for item in insertions], insertions + + # Get the corresponding eids, and probe_names + df= pd.DataFrame([{'pid':val['id'],'eid':val['session'],'probe_name':val['name']} for val in alyx_pids],columns=['pid','eid','probe_name']) + + #TODO - Add exclude pids + return df + + +def create_snippets_file(df: pd.DataFrame) -> pd.DataFrame: + """ + Create snippets dataframe from PID list with t_starts and duration. + """ + DURATION = 5 + list_df = [] + for index, row in df.iterrows(): + df_temp = pd.DataFrame(columns=['pid', 'eid', 'probe_name', 'snippet_index', 't_start', 'duration']) + # Set the start times for the snippets. + start_times = np.array([300,600,900,1200,1500]) + df_temp['snippet_index'] = np.arange(len(start_times)) + df_temp['t_start'] = start_times + df_temp['pid'] = row['pid'] + df_temp['eid'] = row['eid'] + df_temp['probe_name'] = row['probe_name'] + df_temp['duration'] = DURATION + list_df.append(df_temp) + + df = pd.concat(list_df) + return df + # df.to_csv(OUTPUT_DIR / 'psychedlics_snippets_df.csv', index=False) + + +def create_computation_file() -> str: + """ + Create computation.py file content from template. 
+
+
+def create_computation_file() -> str:
+    """
+    Create computation.py file content from template.
+
+    Returns:
+        str: Content of the computation.py file
+    """
+    # Read the template file
+    template_path = TEMPLATE_COMPUTATION_PATH
+    if not template_path.exists():
+        raise FileNotFoundError(f"Template file not found at {template_path}")
+
+    with open(template_path, 'r') as f:
+        template_content = f.read()
+
+    # Replace the default OUTPUT_DIR in the template with the actual OUTPUT_DIR
+    computation_content = template_content.replace(
+        'OUTPUT_DIR = Path("/mnt/sdceph/users/prai1/data/projects/psychedlics/output/")',
+        f'OUTPUT_DIR = Path("{OUTPUT_DIR}/output/")'
+    )
+    return computation_content
+
+
+def create_task_file(inp_file, outp_file, run_program_path="Runprogram.sh"):
+    """Write one `source Runprogram.sh ...` command line per snippet row."""
+    # Accept either a dataframe or a path to a CSV file
+    if isinstance(inp_file, pd.DataFrame):
+        df = inp_file
+    elif isinstance(inp_file, (str, Path)):
+        df = pd.read_csv(inp_file)
+    else:
+        raise ValueError(f"Invalid input type: {type(inp_file)}")
+
+    # Create the task file
+    task_file_path = outp_file
+
+    # Generate command lines for each row
+    with open(task_file_path, 'w') as f:
+        for _, row in df.iterrows():
+            command = f"source {run_program_path} --pid {row['pid']} --eid {row['eid']} --probe_name {row['probe_name']} --start_time {row['t_start']} --duration {row['duration']}\n"
+            f.write(command)
+
+    print(f"Task file created at: {task_file_path}")
+
+
+# Function to run the workflow
+def run_workflow():
+    """
+    Run the complete workflow pipeline.
+    """
+    luigi.build([WorkflowPipeline()], local_scheduler=True)
+
+
+if __name__ == "__main__":
+    run_workflow()
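Taken together, a minimal sketch of driving the pipeline end to end, assuming `PROJECT_NAME`, `OUTPUT_DIR` and `VENV_PATH` at the top of sdsc_utils.py have been edited for the project at hand:

    from ephysatlas.sdsc.sdsc_utils import run_workflow

    # Generates pids_eids_probes.csv, the snippets CSV, computation.py,
    # Runprogram.sh and Full_Task_file under OUTPUT_DIR, then writes the
    # workflow_complete.txt completion marker.
    run_workflow()

Each line of the resulting Full_Task_file is an independent shell command, so the file can be handed to whatever batch submission mechanism the cluster provides.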