diff --git a/examples/ripley_l_params.json b/examples/ripley_l_params.json
new file mode 100644
index 00000000..c666f59a
--- /dev/null
+++ b/examples/ripley_l_params.json
@@ -0,0 +1,15 @@
+{
+    "Upstream_Analysis": "data/mock_input.pickle",
+    "Radii": [0, 50, 100, 150, 200, 250, 300],
+    "Annotation": "renamed_phenotypes",
+    "Center_Phenotype": "B cells",
+    "Neighbor_Phenotype": "CD8 T cells",
+    "Stratify_By": "None",
+    "Number_of_Simulations": 100,
+    "Area": "None",
+    "Seed": 42,
+    "Spatial_Key": "spatial",
+    "Edge_Correction": true,
+    "Output_File": "data/ripley_output.pickle"
+}
+
diff --git a/src/spac/templates/__init__.py b/src/spac/templates/__init__.py
new file mode 100644
index 00000000..89c61771
--- /dev/null
+++ b/src/spac/templates/__init__.py
@@ -0,0 +1,13 @@
+"""
+Canonical SPAC template sub-package.
+
+Each template is a self-contained module that
+  • reads parameters from JSON/dict
+  • runs a SPAC analysis function
+  • returns / saves results
+
+Available templates
+-------------------
+- ripley_l_template.run_from_json
+"""
+
diff --git a/src/spac/templates/ripley_l_template.py b/src/spac/templates/ripley_l_template.py
new file mode 100644
index 00000000..f742f1f4
--- /dev/null
+++ b/src/spac/templates/ripley_l_template.py
@@ -0,0 +1,217 @@
+"""
+Platform-agnostic Ripley-L template converted from NIDAP.
+Maintains the exact logic from the NIDAP template.
+
+Usage
+-----
+>>> from spac.templates.ripley_l_template import run_from_json
+>>> run_from_json("examples/ripley_l_params.json")
+"""
+import json
+import sys
+from pathlib import Path
+from typing import Any, Dict, Union, List
+
+import numpy as np
+import pandas as pd
+
+# Add parent directory to path for imports
+sys.path.append(str(Path(__file__).parent.parent))
+
+from spac.spatial_analysis import ripley_l
+from spac.templates.template_utils import (
+    load_input,
+    save_outputs,
+    parse_params,
+    text_to_value,
+)
+
+
+def _prepare_ripley_uns_for_h5ad(adata) -> None:
+    """
+    Fix ripley_l results for serialization.
+    Ensures proper data types and structure for H5AD storage.
+ """ + if "ripley_l" not in adata.uns: + return + + rl = adata.uns.get("ripley_l") + + # Handle case where ripley_l might be a string (corrupted data) + if isinstance(rl, str): + print(f"Warning: ripley_l data appears to be corrupted (string): {rl[:100]}...") + # Try to reconstruct or remove corrupted data + del adata.uns["ripley_l"] + print("Removed corrupted ripley_l data from adata.uns") + return + + # Handle DataFrame case + if isinstance(rl, pd.DataFrame): + # Create a copy to avoid modifying the original + clean_df = rl.copy() + + # Process each object column + for col in clean_df.columns: + if clean_df[col].dtype == "object": + # Check if column contains dictionaries or other complex objects + sample_val = clean_df[col].iloc[0] if len(clean_df) > 0 else None + + if isinstance(sample_val, dict): + # Convert dict columns to JSON strings for H5AD compatibility + clean_df[col] = clean_df[col].apply(lambda x: json.dumps(x) if isinstance(x, dict) else str(x)) + else: + # Try to convert to numeric first + try: + clean_df[col] = pd.to_numeric(clean_df[col], errors='raise') + except (ValueError, TypeError): + # If that fails, convert to string + clean_df[col] = clean_df[col].astype(str) + + # Replace with cleaned version + adata.uns["ripley_l"] = clean_df + print(f"Cleaned ripley_l DataFrame with shape {clean_df.shape}") + + # Handle dictionary case + elif isinstance(rl, dict): + clean_dict = {} + for key, value in rl.items(): + if isinstance(value, (pd.DataFrame, pd.Series)): + # Convert pandas objects to dictionaries + clean_dict[key] = value.to_dict() if hasattr(value, 'to_dict') else str(value) + elif isinstance(value, (list, tuple)): + # Ensure lists contain serializable types + clean_dict[key] = [str(item) if not isinstance(item, (int, float, str, bool)) else item for item in value] + elif isinstance(value, (np.ndarray,)): + # Convert numpy arrays to lists + clean_dict[key] = value.tolist() + else: + clean_dict[key] = value + + adata.uns["ripley_l"] = clean_dict + print(f"Cleaned ripley_l dictionary with keys: {list(clean_dict.keys())}") + + else: + print(f"Warning: Unexpected ripley_l data type: {type(rl)}") + # Convert to string representation as fallback + adata.uns["ripley_l"] = str(rl) + + +def run_from_json( + json_path: Union[str, Path, Dict[str, Any]] +) -> Dict[str, str]: + """ + Execute Ripley-L analysis with parameters from JSON. + Replicates the NIDAP template functionality exactly. 
+
+    Parameters
+    ----------
+    json_path : str, Path, or dict
+        Path to JSON file, JSON string, or parameter dictionary
+
+    Returns
+    -------
+    dict
+        Dictionary of saved file paths
+    """
+    # Parse parameters from JSON
+    params = parse_params(json_path)
+
+    # Load the upstream analysis data
+    adata = load_input(params["Upstream_Analysis"])
+
+    # Extract parameters
+    radii = params["Radii"]
+    annotation = params["Annotation"]
+    phenotypes = [params["Center_Phenotype"], params["Neighbor_Phenotype"]]
+    regions = params.get("Stratify_By", "None")
+    n_simulations = params.get("Number_of_Simulations", 1)
+    area = params.get("Area", "None")
+    seed = params.get("Seed", 42)
+    spatial_key = params.get("Spatial_Key", "spatial")
+    edge_correction = params.get("Edge_Correction", True)
+
+    # Process parameters
+    regions = text_to_value(
+        regions,
+        default_none_text="None"
+    )
+
+    area = text_to_value(
+        area,
+        default_none_text="None",
+        value_to_convert_to=None,
+        to_float=True,
+        param_name='Area'
+    )
+
+    # Convert radii to floats
+    radii = _convert_to_floats(radii)
+
+    # Run the analysis
+    ripley_l(
+        adata,
+        annotation=annotation,
+        phenotypes=phenotypes,
+        distances=radii,
+        regions=regions,
+        n_simulations=n_simulations,
+        area=area,
+        seed=seed,
+        spatial_key=spatial_key,
+        edge_correction=edge_correction
+    )
+
+    # Fix ripley_l results before saving
+    _prepare_ripley_uns_for_h5ad(adata)
+
+    # Save outputs
+    outfile = params.get("Output_File", "transform_output.h5ad")
+    saved_files = save_outputs({outfile: adata})
+
+    print(f"Ripley-L completed → {saved_files[outfile]}")
+    print(adata)
+    return saved_files
+
+
+def _convert_to_floats(text_list: List[Any]) -> List[float]:
+    """
+    Convert list of text values to floats.
+    Exact copy from NIDAP template.
+
+    Parameters
+    ----------
+    text_list : list
+        List of values to convert
+
+    Returns
+    -------
+    list
+        List of float values
+
+    Raises
+    ------
+    ValueError
+        If any value cannot be converted to float
+    """
+    float_list = []
+    for value in text_list:
+        try:
+            float_list.append(float(value))
+        except ValueError:
+            msg = f"Failed to convert the radius: '{value}' to float."
+            raise ValueError(msg)
+    return float_list
+
+
+# CLI interface
+if __name__ == "__main__":
+    if len(sys.argv) != 2:
+        print("Usage: python ripley_l_template.py <params.json>")
+        sys.exit(1)
+
+    saved_files = run_from_json(sys.argv[1])
+
+    print("\nOutput files:")
+    for filename, filepath in saved_files.items():
+        print(f"  {filename}: {filepath}")
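run_from_json also accepts an in-memory dict (see parse_params in template_utils below), so the template can be driven without writing a parameter file. A minimal sketch, assuming an AnnData pickle at data/mock_input.pickle with a "renamed_phenotypes" annotation (both names are illustrative):

    from spac.templates.ripley_l_template import run_from_json

    params = {
        "Upstream_Analysis": "data/mock_input.pickle",
        "Radii": [0, 50, 100],
        "Annotation": "renamed_phenotypes",
        "Center_Phenotype": "B cells",
        "Neighbor_Phenotype": "CD8 T cells",
        "Stratify_By": "None",  # the text "None" is converted to None
        "Area": "None",
        "Output_File": "ripley_output.h5ad",
    }
    saved = run_from_json(params)  # {"ripley_output.h5ad": "/abs/path/..."}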
diff --git a/src/spac/templates/template_utils.py b/src/spac/templates/template_utils.py
new file mode 100644
index 00000000..d1b6511e
--- /dev/null
+++ b/src/spac/templates/template_utils.py
@@ -0,0 +1,316 @@
+from pathlib import Path
+import pickle
+from typing import Any, Dict, Union, Optional
+import json
+
+
+def load_input(file_path: Union[str, Path]):
+    """
+    Load input data from either h5ad or pickle file.
+
+    Parameters
+    ----------
+    file_path : str or Path
+        Path to input file (h5ad or pickle)
+
+    Returns
+    -------
+    Loaded data object (typically AnnData)
+    """
+    path = Path(file_path)
+
+    if not path.exists():
+        raise FileNotFoundError(f"Input file not found: {file_path}")
+
+    # Check file extension
+    suffix = path.suffix.lower()
+
+    if suffix in ['.h5ad', '.h5']:
+        # Load h5ad file
+        try:
+            import anndata as ad
+            return ad.read_h5ad(path)
+        except ImportError:
+            raise ImportError(
+                "anndata package required to read h5ad files"
+            )
+        except Exception as e:
+            raise ValueError(f"Error reading h5ad file: {e}")
+
+    elif suffix in ['.pickle', '.pkl', '.p']:
+        # Load pickle file
+        with path.open('rb') as fh:
+            return pickle.load(fh)
+
+    else:
+        # Try to detect file type by content
+        try:
+            # First try h5ad
+            import anndata as ad
+            return ad.read_h5ad(path)
+        except Exception:
+            # Fall back to pickle
+            try:
+                with path.open('rb') as fh:
+                    return pickle.load(fh)
+            except Exception as e:
+                raise ValueError(
+                    f"Unable to load file '{file_path}'. "
+                    f"Supported formats: h5ad, pickle. Error: {e}"
+                )
+
+
+def save_outputs(outputs: Dict[str, Any],
+                 output_dir: Union[str, Path] = ".") -> Dict[str, str]:
+    """
+    Save multiple outputs to files and return a dict {filename: absolute_path}.
+    (Always a dict, even if just one file.)
+
+    Parameters
+    ----------
+    outputs : dict
+        Dictionary where:
+        - key: filename (with extension)
+        - value: object to save
+    output_dir : str or Path
+        Directory to save files
+
+    Returns
+    -------
+    dict
+        Dictionary of saved file paths
+
+    Example
+    -------
+    >>> outputs = {
+    ...     "adata.h5ad": adata,
+    ...     "results.csv": results_df,
+    ...     "adata.pickle": adata
+    ... }
+    >>> saved = save_outputs(outputs, "results/")
+    """
+    output_dir = Path(output_dir)
+    output_dir.mkdir(parents=True, exist_ok=True)
+
+    saved_files = {}
+
+    for filename, obj in outputs.items():
+        filepath = output_dir / filename
+
+        # Save based on file extension
+        if filename.endswith('.csv'):
+            obj.to_csv(filepath, index=False)
+        elif filename.endswith('.h5ad'):
+            obj.write_h5ad(filepath)
+        elif filename.endswith(('.pickle', '.pkl')):
+            with open(filepath, 'wb') as f:
+                pickle.dump(obj, f)
+        elif hasattr(obj, "savefig"):
+            obj.savefig(filepath.with_suffix('.png'))
+            filepath = filepath.with_suffix('.png')
+        else:
+            # Default to pickle
+            with open(filepath, 'wb') as f:
+                pickle.dump(obj, f)
+
+        saved_files[filename] = str(filepath.resolve())
+        print(f"Saved: {filepath}")
+
+    return saved_files
+
+
+def parse_params(
+    json_input: Union[str, Path, Dict[str, Any]]
+) -> Dict[str, Any]:
+    """
+    Parse parameters from JSON file, string, or dict.
+
+    Parameters
+    ----------
+    json_input : str, Path, or dict
+        JSON file path, JSON string, or dictionary
+
+    Returns
+    -------
+    dict
+        Parsed parameters
+    """
+    if isinstance(json_input, dict):
+        return json_input
+
+    if isinstance(json_input, (str, Path)):
+        path = Path(json_input)
+
+        # Check if it's a file path
+        if path.exists() or str(json_input).endswith('.json'):
+            with open(path, 'r') as file:
+                return json.load(file)
+        else:
+            # It's a JSON string
+            return json.loads(str(json_input))
+
+    raise TypeError(
+        "json_input must be dict, JSON string, or path to JSON file"
+    )
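A quick sketch of the three input forms parse_params accepts; note that a string is treated as a file path when it exists on disk or ends in ".json", and is otherwise decoded as a JSON literal:

    from spac.templates.template_utils import parse_params

    parse_params({"Radii": [10, 20]})               # dict: passed through
    parse_params('{"Radii": [10, 20]}')             # JSON string: decoded
    parse_params("examples/ripley_l_params.json")   # path: read from disk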
+
+
+def text_to_value(
+    var: Any,
+    default_none_text: str = "None",
+    value_to_convert_to: Any = None,
+    to_float: bool = False,
+    to_int: bool = False,
+    param_name: str = ''
+):
+    """
+    Converts a string to a specified value or type. Handles conversion to
+    float or integer and provides a default value if the input string
+    matches a specified 'None' text.
+
+    Parameters
+    ----------
+    var : str
+        The input string to be converted.
+    default_none_text : str, optional
+        The string that represents a 'None' value. If `var` matches this
+        string, it will be converted to `value_to_convert_to`.
+        Default is "None".
+    value_to_convert_to : any, optional
+        The value to assign to `var` if it matches `default_none_text` or
+        is an empty string. Default is None.
+    to_float : bool, optional
+        If True, attempt to convert `var` to a float. Default is False.
+    to_int : bool, optional
+        If True, attempt to convert `var` to an integer. Default is False.
+    param_name : str, optional
+        The name of the parameter, used in error messages for conversion
+        failures. Default is ''.
+
+    Returns
+    -------
+    any
+        The converted value, which may be the original string, a float,
+        an integer, or the specified `value_to_convert_to`.
+
+    Raises
+    ------
+    ValueError
+        If `to_float` or `to_int` is set to True and conversion fails.
+
+    Notes
+    -----
+    - If both `to_float` and `to_int` are set to True, the function will
+      prioritize conversion to float.
+    - If the string `var` matches `default_none_text` or is an empty
+      string, `value_to_convert_to` is returned.
+
+    Examples
+    --------
+    Convert a string representing a float:
+
+    >>> text_to_value("3.14", to_float=True)
+    3.14
+
+    Handle a 'None' string:
+
+    >>> text_to_value("None", value_to_convert_to=None)
+    None
+
+    Convert a string to an integer:
+
+    >>> text_to_value("42", to_int=True)
+    42
+
+    Handle invalid conversion:
+
+    >>> text_to_value("abc", to_int=True, param_name="test_param")
+    Traceback (most recent call last):
+        ...
+    ValueError: Error: can't convert test_param to integer. Received:"abc"
+    """
+    # Non-string values (e.g. JSON null) are passed through unchanged.
+    if not isinstance(var, str):
+        return var
+
+    none_condition = (
+        var.lower().strip() == default_none_text.lower().strip() or
+        var.strip() == ''
+    )
+
+    if none_condition:
+        var = value_to_convert_to
+
+    elif to_float:
+        try:
+            var = float(var)
+        except ValueError:
+            error_msg = (
+                f'Error: can\'t convert {param_name} to float. '
+                f'Received:"{var}"'
+            )
+            raise ValueError(error_msg)
+
+    elif to_int:
+        try:
+            var = int(var)
+        except ValueError:
+            error_msg = (
+                f'Error: can\'t convert {param_name} to integer. '
+                f'Received:"{var}"'
+            )
+            raise ValueError(error_msg)
+
+    return var
+
+
+def convert_pickle_to_h5ad(
+    pickle_path: Union[str, Path],
+    h5ad_path: Optional[Union[str, Path]] = None
+) -> str:
+    """
+    Convert a pickle file containing AnnData to h5ad format.
+
+    Parameters
+    ----------
+    pickle_path : str or Path
+        Path to input pickle file
+    h5ad_path : str or Path, optional
+        Path for output h5ad file. If None, uses same name with .h5ad
+        extension
+
+    Returns
+    -------
+    str
+        Path to saved h5ad file
+    """
+    pickle_path = Path(pickle_path)
+
+    if not pickle_path.exists():
+        raise FileNotFoundError(f"Pickle file not found: {pickle_path}")
+
+    # Load from pickle
+    with pickle_path.open('rb') as fh:
+        adata = pickle.load(fh)
+
+    # Check if it's AnnData
+    try:
+        import anndata as ad
+        if not isinstance(adata, ad.AnnData):
+            raise TypeError(
+                f"Loaded object is not AnnData, got {type(adata)}"
+            )
+    except ImportError:
+        raise ImportError(
+            "anndata package required for conversion to h5ad"
+        )
+
+    # Determine output path
+    if h5ad_path is None:
+        h5ad_path = pickle_path.with_suffix('.h5ad')
+    else:
+        h5ad_path = Path(h5ad_path)
+
+    # Save as h5ad
+    adata.write_h5ad(h5ad_path)
+
+    return str(h5ad_path)
\ No newline at end of file
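A short usage sketch for the converter; the file names are illustrative and assume legacy.pickle holds a pickled AnnData object:

    from spac.templates.template_utils import convert_pickle_to_h5ad

    # Writes legacy.h5ad next to the input when no output path is given.
    out_path = convert_pickle_to_h5ad("legacy.pickle")

    # Or choose the destination explicitly.
    out_path = convert_pickle_to_h5ad("legacy.pickle", "converted/data.h5ad")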
diff --git a/src/spac/templates/visualize_ripley_template.py b/src/spac/templates/visualize_ripley_template.py
new file mode 100644
index 00000000..5085b039
--- /dev/null
+++ b/src/spac/templates/visualize_ripley_template.py
@@ -0,0 +1,98 @@
+"""
+Platform-agnostic Visualize Ripley L template converted from NIDAP.
+Maintains the exact logic from the NIDAP template.
+
+Usage
+-----
+>>> from spac.templates.visualize_ripley_template import run_from_json
+>>> run_from_json("examples/visualize_ripley_params.json")
+"""
+import sys
+from pathlib import Path
+from typing import Any, Dict, Union
+import matplotlib.pyplot as plt
+
+# Add parent directory to path for imports
+sys.path.append(str(Path(__file__).parent.parent))
+
+from spac.visualization import plot_ripley_l
+from spac.templates.template_utils import (
+    load_input,
+    save_outputs,
+    parse_params,
+)
+
+
+def run_from_json(
+    json_path: Union[str, Path, Dict[str, Any]]
+) -> Dict[str, str]:
+    """
+    Execute Visualize Ripley L analysis with parameters from JSON.
+    Replicates the NIDAP template functionality exactly.
+
+    Parameters
+    ----------
+    json_path : str, Path, or dict
+        Path to JSON file, JSON string, or parameter dictionary
+
+    Returns
+    -------
+    dict
+        Dictionary of saved file paths
+    """
+    # Parse parameters from JSON
+    params = parse_params(json_path)
+
+    # Load the upstream analysis data
+    adata = load_input(params["Upstream_Analysis"])
+
+    # Extract parameters
+    center_phenotype = params["Center_Phenotype"]
+    neighbor_phenotype = params["Neighbor_Phenotype"]
+    plot_specific_regions = params.get("Plot_Specific_Regions", False)
+    regions_labels = params.get("Regions_Labels", [])
+    plot_simulations = params.get("Plot_Simulations", True)
+
+    # Process regions parameter exactly as in NIDAP template
+    if plot_specific_regions:
+        if len(regions_labels) == 0:
+            raise ValueError(
+                'Please identify at least one region in the '
+                '"Regions Label(s)" parameter'
+            )
+    else:
+        regions_labels = None
+
+    # Run the visualization exactly as in NIDAP template
+    fig, plots_df = plot_ripley_l(
+        adata,
+        phenotypes=(center_phenotype, neighbor_phenotype),
+        regions=regions_labels,
+        sims=plot_simulations,
+        return_df=True
+    )
+
+    plt.show()
+
+    # Print the dataframe to console
+    print(plots_df.to_string())
+
+    # Save outputs
+    output_file = params.get("Output_File", "plots.csv")
+    saved_files = save_outputs({output_file: plots_df})
+
+    print(f"Visualize Ripley L completed → {saved_files[output_file]}")
+    return saved_files
+
+
+# CLI interface
+if __name__ == "__main__":
+    if len(sys.argv) != 2:
+        print("Usage: python visualize_ripley_template.py <params.json>")
+        sys.exit(1)
+
+    saved_files = run_from_json(sys.argv[1])
+
+    print("\nOutput files:")
+    for filename, filepath in saved_files.items():
+        print(f"  {filename}: {filepath}")
\ No newline at end of file
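The module docstring above references examples/visualize_ripley_params.json, which this diff does not add. Based on the keys read in run_from_json, a plausible sketch of that file (all values illustrative) would be:

    {
        "Upstream_Analysis": "data/ripley_output.h5ad",
        "Center_Phenotype": "B cells",
        "Neighbor_Phenotype": "CD8 T cells",
        "Plot_Specific_Regions": false,
        "Regions_Labels": [],
        "Plot_Simulations": true,
        "Output_File": "plots.csv"
    }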
diff --git a/tests/templates/__init__.py b/tests/templates/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/templates/test_ripley_l_template.py b/tests/templates/test_ripley_l_template.py
new file mode 100644
index 00000000..f89bfce0
--- /dev/null
+++ b/tests/templates/test_ripley_l_template.py
@@ -0,0 +1,118 @@
+# tests/templates/test_ripley_l_template.py
+"""Unit tests for the Ripley-L template."""
+
+import json
+import os
+import pickle
+import sys
+import tempfile
+import unittest
+import warnings
+
+import anndata as ad
+import numpy as np
+import pandas as pd
+from pathlib import Path
+
+sys.path.append(
+    os.path.dirname(os.path.realpath(__file__)) + "/../../src"
+)
+
+from spac.templates.ripley_l_template import (
+    run_from_json,
+    _convert_to_floats
+)
+
+
+def mock_adata(n_cells: int = 10) -> ad.AnnData:
+    """Return a minimal synthetic AnnData for fast tests."""
+    rng = np.random.default_rng(0)
+    obs = pd.DataFrame({
+        "phenotype": ["B cells", "CD8 T cells"] * (n_cells // 2)
+    })
+    x_mat = rng.normal(size=(n_cells, 2))
+    adata = ad.AnnData(X=x_mat, obs=obs)
+    adata.obsm["spatial"] = rng.random((n_cells, 2)) * 50.0
+    return adata
+
+
+class TestRipleyLTemplate(unittest.TestCase):
+    """Unit tests for the Ripley-L template."""
+
+    def setUp(self) -> None:
+        self.tmp_dir = tempfile.TemporaryDirectory()
+        self.in_file = os.path.join(self.tmp_dir.name, "input.h5ad")
+        self.out_file = "output.h5ad"
+
+        # Save minimal mock data
+        mock_adata().write_h5ad(self.in_file)
+
+        # Minimal parameters - match the exact parameter names from template
+        self.params = {
+            "Upstream_Analysis": self.in_file,
+            "Radii": [10, 20],
+            "Annotation": "phenotype",
+            "Center_Phenotype": "B cells",
+            "Neighbor_Phenotype": "CD8 T cells",
+            "Output_File": self.out_file,
+        }
+
+    def tearDown(self) -> None:
+        self.tmp_dir.cleanup()
+
+    def test_ripley_l_analysis_workflow(self) -> None:
+        """Test Ripley-L specific analysis workflow and output validation."""
+        # Suppress warnings for cleaner test output
+        with warnings.catch_warnings():
+            warnings.simplefilter("ignore")
+
+            # Test 1: Basic Ripley-L analysis
+            saved_files = run_from_json(self.params)
+            self.assertIn(self.out_file, saved_files)
+            self.assertTrue(os.path.exists(saved_files[self.out_file]))
+
+            # Load and verify Ripley-L specific output structure
+            adata = ad.read_h5ad(saved_files[self.out_file])
+
+            # Check that ripley_l results exist (key might vary)
+            ripley_keys = [
+                k for k in adata.uns.keys() if 'ripley' in k.lower()
+            ]
+            self.assertTrue(
+                len(ripley_keys) > 0, "No Ripley-L results found in uns"
+            )
+            self.assertEqual(adata.n_obs, 10)
+
+            # Test 2: JSON file input (Ripley-L specific parameters)
+            json_path = os.path.join(self.tmp_dir.name, "params.json")
+            with open(json_path, "w") as f:
+                json.dump(self.params, f)
+            saved_files_json = run_from_json(json_path)
+            self.assertIn(self.out_file, saved_files_json)
+
+            # Test 3: Parameter conversion (Ripley-L specific string
+            # parameters)
+            params_str = self.params.copy()
+            params_str["Radii"] = ["10", "20.5"]  # String radii
+            params_str["Area"] = "100.0"  # String area
+            params_str["Stratify_By"] = "None"  # Text none value
+            params_str["Number_of_Simulations"] = 100
+            params_str["Seed"] = 42
+            params_str["Edge_Correction"] = True
+            saved_files_str = run_from_json(params_str)
+            self.assertIn(self.out_file, saved_files_str)
+
+    def test_convert_to_floats_error_message(self) -> None:
+        """Test exact error message for invalid radius conversion."""
+        with self.assertRaises(ValueError) as context:
+            _convert_to_floats(["10", "invalid", "20"])
+
+        expected_msg = (
+            "Failed to convert the radius: 'invalid' to float."
+        )
+        actual_msg = str(context.exception)
+        self.assertEqual(expected_msg, actual_msg)
+
+
+if __name__ == "__main__":
+    unittest.main()
\ No newline at end of file
+ ) + actual_msg = str(context.exception) + self.assertEqual(expected_msg, actual_msg) + + +if __name__ == "__main__": + unittest.main() \ No newline at end of file diff --git a/tests/templates/test_ripley_l_template_2.py b/tests/templates/test_ripley_l_template_2.py new file mode 100644 index 00000000..389486d6 --- /dev/null +++ b/tests/templates/test_ripley_l_template_2.py @@ -0,0 +1,136 @@ +"""Unit tests for the Ripley-L template.""" + +import json +import os +import pickle +import sys +import tempfile +import unittest +import warnings + +import anndata as ad +import numpy as np +import pandas as pd + +sys.path.append( + os.path.dirname(os.path.realpath(__file__)) + "/../../src" +) + +from spac.templates.ripley_l_template import run_from_json, _convert_to_floats + + +def mock_adata(n_cells: int = 10) -> ad.AnnData: + """Return a minimal synthetic AnnData for fast tests.""" + rng = np.random.default_rng(0) + obs = pd.DataFrame({ + "phenotype": ["B cells", "CD8 T cells"] * (n_cells // 2) + }) + x_mat = rng.normal(size=(n_cells, 2)) + adata = ad.AnnData(X=x_mat, obs=obs) + adata.obsm["spatial"] = rng.random((n_cells, 2)) * 50.0 + return adata + + +class TestRipleyLTemplate(unittest.TestCase): + """Unit tests for the Ripley-L template.""" + + def setUp(self) -> None: + self.tmp_dir = tempfile.TemporaryDirectory() + self.in_file = os.path.join(self.tmp_dir.name, "input.h5ad") + # Change to pickle format to avoid h5ad serialization issues + self.out_file = "output.pickle" + + # Save minimal mock data + mock_adata().write_h5ad(self.in_file) + + # Minimal parameters - use pickle output to avoid serialization issues + self.params = { + "Upstream_Analysis": self.in_file, + "radii": [10, 20], + "annotation": "phenotype", + "center_phenotype": "B cells", + "neighbor_phenotype": "CD8 T cells", + "Output_File": self.out_file, + } + + def tearDown(self) -> None: + self.tmp_dir.cleanup() + + def test_complete_io_workflow(self) -> None: + """Single comprehensive I/O test covering all input/output scenarios.""" + # Suppress warnings for cleaner test output + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + + # Test 1: Dict input with h5ad file, pickle output + saved_files = run_from_json(self.params) + self.assertIn(self.out_file, saved_files) + self.assertTrue(os.path.exists(saved_files[self.out_file])) + + # Load and verify output structure from pickle + with open(saved_files[self.out_file], 'rb') as f: + adata = pickle.load(f) + # Check that ripley_l results exist (key might vary) + ripley_keys = [k for k in adata.uns.keys() if 'ripley' in k.lower()] + self.assertTrue(len(ripley_keys) > 0, "No Ripley-L results found in uns") + self.assertEqual(adata.n_obs, 10) + + # Test 2: JSON file input + json_path = os.path.join(self.tmp_dir.name, "params.json") + with open(json_path, "w") as f: + json.dump(self.params, f) + saved_files_json = run_from_json(json_path) + self.assertIn(self.out_file, saved_files_json) + + # Test 3: Pickle input file + pickle_file = os.path.join(self.tmp_dir.name, "input.pickle") + with open(pickle_file, "wb") as f: + pickle.dump(mock_adata(), f) + params_pickle = self.params.copy() + params_pickle["Upstream_Analysis"] = pickle_file + saved_files_pickle = run_from_json(params_pickle) + self.assertIn(self.out_file, saved_files_pickle) + + # Test 4: Parameter conversion (string radii, text values) + params_str = self.params.copy() + params_str["radii"] = ["10", "20.5"] + params_str["area"] = "100.0" + params_str["stratify_by"] = "None" + saved_files_str = 
diff --git a/tests/templates/test_template_utils.py b/tests/templates/test_template_utils.py
new file mode 100644
index 00000000..3472f8d9
--- /dev/null
+++ b/tests/templates/test_template_utils.py
@@ -0,0 +1,256 @@
+# tests/utils/test_template_utils.py
+"""Unit tests for template utilities."""
+
+import json
+import os
+import pickle
+import sys
+import tempfile
+import unittest
+import warnings
+
+import anndata as ad
+import numpy as np
+import pandas as pd
+from pathlib import Path
+
+sys.path.append(
+    os.path.dirname(os.path.realpath(__file__)) + "/../../src"
+)
+
+from spac.templates.template_utils import (
+    load_input,
+    save_outputs,
+    text_to_value,
+    convert_pickle_to_h5ad
+)
+
+
+def mock_adata(n_cells: int = 10) -> ad.AnnData:
+    """Return a minimal synthetic AnnData for fast tests."""
+    rng = np.random.default_rng(0)
+    obs = pd.DataFrame({
+        "cell_type": ["TypeA", "TypeB"] * (n_cells // 2)
+    })
+    x_mat = rng.normal(size=(n_cells, 2))
+    adata = ad.AnnData(X=x_mat, obs=obs)
+    return adata
+
+
+def mock_dataframe(n_rows: int = 5) -> pd.DataFrame:
+    """Return a minimal DataFrame for fast tests."""
+    return pd.DataFrame({
+        "col1": range(n_rows),
+        "col2": [f"value_{i}" for i in range(n_rows)]
+    })
+
+
+class TestTemplateUtils(unittest.TestCase):
+    """Unit tests for template utility functions."""
+
+    def setUp(self) -> None:
+        self.tmp_dir = tempfile.TemporaryDirectory()
+        self.test_adata = mock_adata()
+        self.test_df = mock_dataframe()
+
+    def tearDown(self) -> None:
+        self.tmp_dir.cleanup()
+
+    def test_complete_io_workflow(self) -> None:
+        """Single I/O test covering all input/output scenarios."""
+        # Suppress warnings for cleaner test output
+        with warnings.catch_warnings():
+            warnings.simplefilter("ignore")
+
+            # Test 1: Load h5ad file
+            h5ad_path = os.path.join(self.tmp_dir.name, "test.h5ad")
+            self.test_adata.write_h5ad(h5ad_path)
+            loaded_h5ad = load_input(h5ad_path)
+            self.assertEqual(loaded_h5ad.n_obs, 10)
+            self.assertIn("cell_type", loaded_h5ad.obs.columns)
+
+            # Test 2: Load pickle file
+            pickle_path = os.path.join(self.tmp_dir.name, "test.pickle")
+            with open(pickle_path, "wb") as f:
+                pickle.dump(self.test_adata, f)
+            loaded_pickle = load_input(pickle_path)
+            self.assertEqual(loaded_pickle.n_obs, 10)
+
+            # Test 3: Load .pkl extension
+            pkl_path = os.path.join(self.tmp_dir.name, "test.pkl")
+            with open(pkl_path, "wb") as f:
+                pickle.dump(self.test_adata, f)
+            loaded_pkl = load_input(pkl_path)
+            self.assertEqual(loaded_pkl.n_obs, 10)
+
+            # Test 4: Load .p extension
+            p_path = os.path.join(self.tmp_dir.name, "test.p")
+            with open(p_path, "wb") as f:
+                pickle.dump(self.test_adata, f)
+            loaded_p = load_input(p_path)
+            self.assertEqual(loaded_p.n_obs, 10)
+
+            # Test 5: Save outputs - multiple formats
+            outputs = {
+                "result.h5ad": self.test_adata,
+                "data.csv": self.test_df,
+                "adata.pickle": self.test_adata,
+                "adata.pkl": self.test_adata,
+                "other_data": {"key": "value"}  # Defaults to pickle
+            }
+            saved_files = save_outputs(outputs, self.tmp_dir.name)
+
+            # Verify all files were saved
+            self.assertEqual(len(saved_files), 5)
+            for filename, filepath in saved_files.items():
+                self.assertTrue(os.path.exists(filepath))
+                self.assertIn(filename, saved_files)
+
+            # Verify CSV content
+            csv_path = saved_files["data.csv"]
+            loaded_df = pd.read_csv(csv_path)
+            self.assertEqual(len(loaded_df), 5)
+            self.assertIn("col1", loaded_df.columns)
+
+            # Test 6: Convert pickle to h5ad
+            pickle_src = os.path.join(self.tmp_dir.name, "convert_src.pickle")
+            with open(pickle_src, "wb") as f:
+                pickle.dump(self.test_adata, f)
+
+            h5ad_dest = convert_pickle_to_h5ad(pickle_src)
+            self.assertTrue(os.path.exists(h5ad_dest))
+            self.assertTrue(h5ad_dest.endswith(".h5ad"))
+
+            # Test with custom output path
+            custom_dest = os.path.join(
+                self.tmp_dir.name, "custom_output.h5ad"
+            )
+            h5ad_custom = convert_pickle_to_h5ad(pickle_src, custom_dest)
+            self.assertEqual(h5ad_custom, custom_dest)
+            self.assertTrue(os.path.exists(custom_dest))
+
+            # Test 7: Load file with no extension (content detection)
+            no_ext_path = os.path.join(self.tmp_dir.name, "noextension")
+            with open(no_ext_path, "wb") as f:
+                pickle.dump(self.test_adata, f)
+            loaded_no_ext = load_input(no_ext_path)
+            self.assertEqual(loaded_no_ext.n_obs, 10)
+
+    def test_text_to_value_conversions(self) -> None:
+        """Test all text_to_value conversion scenarios."""
+        # Test 1: Convert to float
+        result = text_to_value("3.14", to_float=True)
+        self.assertEqual(result, 3.14)
+        self.assertIsInstance(result, float)
+
+        # Test 2: Convert to int
+        result = text_to_value("42", to_int=True)
+        self.assertEqual(result, 42)
+        self.assertIsInstance(result, int)
+
+        # Test 3: None text handling
+        result = text_to_value("None", value_to_convert_to=None)
+        self.assertIsNone(result)
+
+        # Test 4: Empty string handling
+        result = text_to_value("", value_to_convert_to=-1)
+        self.assertEqual(result, -1)
+
+        # Test 5: Case insensitive None
+        result = text_to_value("none", value_to_convert_to=0)
+        self.assertEqual(result, 0)
+
+        # Test 6: Custom none text
+        result = text_to_value(
+            "NA", default_none_text="NA", value_to_convert_to=999
+        )
+        self.assertEqual(result, 999)
+
+        # Test 7: No conversion
+        result = text_to_value("keep_as_string")
+        self.assertEqual(result, "keep_as_string")
+        self.assertIsInstance(result, str)
+
+        # Test 8: Whitespace handling
+        result = text_to_value("  None  ", value_to_convert_to=None)
+        self.assertIsNone(result)
+
+    def test_load_input_missing_file_error_message(self) -> None:
+        """Test exact error message for missing input file."""
+        missing_path = "/nonexistent/path/file.h5ad"
+
+        with self.assertRaises(FileNotFoundError) as context:
+            load_input(missing_path)
+
+        expected_msg = f"Input file not found: {missing_path}"
+        actual_msg = str(context.exception)
+        self.assertEqual(expected_msg, actual_msg)
+
+    def test_load_input_unsupported_format_error_message(self) -> None:
+        """Test exact error message for unsupported file format."""
+        # Create a text file with unsupported content
+        txt_path = os.path.join(self.tmp_dir.name, "test.txt")
+        with open(txt_path, "w") as f:
+            f.write("This is not a valid data file")
+
+        with self.assertRaises(ValueError) as context:
+            load_input(txt_path)
+
+        actual_msg = str(context.exception)
+        self.assertTrue(actual_msg.startswith("Unable to load file"))
+        self.assertIn("Supported formats: h5ad, pickle", actual_msg)
+
+    def test_text_to_value_float_conversion_error_message(self) -> None:
+        """Test exact error message for invalid float conversion."""
+        with self.assertRaises(ValueError) as context:
+            text_to_value(
+                "not_a_number", to_float=True, param_name="test_param"
+            )
+
+        expected_msg = (
+            'Error: can\'t convert test_param to float. '
+            'Received:"not_a_number"'
+        )
+        actual_msg = str(context.exception)
+        self.assertEqual(expected_msg, actual_msg)
+
+    def test_text_to_value_int_conversion_error_message(self) -> None:
+        """Test exact error message for invalid integer conversion."""
+        with self.assertRaises(ValueError) as context:
+            text_to_value("3.14", to_int=True, param_name="count")
+
+        expected_msg = (
+            'Error: can\'t convert count to integer. '
+            'Received:"3.14"'
+        )
+        actual_msg = str(context.exception)
+        self.assertEqual(expected_msg, actual_msg)
+
+    def test_convert_pickle_to_h5ad_missing_file_error_message(self) -> None:
+        """Test exact error message for missing pickle file."""
+        missing_pickle = "/nonexistent/file.pickle"
+
+        with self.assertRaises(FileNotFoundError) as context:
+            convert_pickle_to_h5ad(missing_pickle)
+
+        expected_msg = f"Pickle file not found: {missing_pickle}"
+        actual_msg = str(context.exception)
+        self.assertEqual(expected_msg, actual_msg)
+
+    def test_convert_pickle_to_h5ad_wrong_type_error_message(self) -> None:
+        """Test exact error message when pickle doesn't contain AnnData."""
+        # Create pickle with wrong type
+        wrong_pickle = os.path.join(self.tmp_dir.name, "wrong_type.pickle")
+        with open(wrong_pickle, "wb") as f:
+            pickle.dump({"not": "anndata"}, f)
+
+        with self.assertRaises(TypeError) as context:
+            convert_pickle_to_h5ad(wrong_pickle)
+
+        expected_msg = "Loaded object is not AnnData, got "
+        actual_msg = str(context.exception)
+        self.assertTrue(actual_msg.startswith(expected_msg))
+
+
+if __name__ == "__main__":
+    unittest.main()
\ No newline at end of file
diff --git a/tests/templates/test_visualize_ripley_template.py b/tests/templates/test_visualize_ripley_template.py
new file mode 100644
index 00000000..2b6037d3
--- /dev/null
+++ b/tests/templates/test_visualize_ripley_template.py
@@ -0,0 +1,189 @@
+# tests/templates/test_visualize_ripley_template.py
+"""Unit tests for the Visualize Ripley L template."""
+
+import json
+import os
+import pickle
+import sys
+import tempfile
+import unittest
+import warnings
+
+import matplotlib
+matplotlib.use("Agg")  # Headless backend for CI
+
+import anndata as ad
+import numpy as np
+import pandas as pd
+from pathlib import Path
+from unittest.mock import patch, MagicMock
+
+sys.path.append(
+    os.path.dirname(os.path.realpath(__file__)) + "/../../src"
+)
+
+from spac.templates.visualize_ripley_template import run_from_json
+
+
+def mock_adata_with_ripley(n_cells: int = 10) -> ad.AnnData:
+    """Return a minimal synthetic AnnData with Ripley L results for tests."""
+    rng = np.random.default_rng(0)
+    obs = pd.DataFrame({
+        "phenotype": ["B cells", "CD8 T cells"] * (n_cells // 2)
+    })
+    x_mat = rng.normal(size=(n_cells, 2))
+    adata = ad.AnnData(X=x_mat, obs=obs)
+    adata.obsm["spatial"] = rng.random((n_cells, 2)) * 50.0
+
+    # Add mock Ripley L results in the expected format
+    # The key format is important: "ripley_l_phenotype1_phenotype2"
+    adata.uns["ripley_l_B cells_CD8 T cells"] = {
+        "radius": [0, 50, 100],
+        "ripley_l": [0, 1.2, 2.5],
+        "simulations": np.array([
+            [0, 0.8, 1.9], [0, 1.1, 2.3], [0, 1.3, 2.7]
+        ])
+    }
+    return adata
+
+
+class TestVisualizeRipleyTemplate(unittest.TestCase):
+    """Unit tests for the Visualize Ripley L template."""
+
+    def setUp(self) -> None:
+        self.tmp_dir = tempfile.TemporaryDirectory()
+        self.in_file = os.path.join(
+            self.tmp_dir.name, "ripley_output.h5ad"
+        )
+        self.out_file = "plots.csv"
+
+        # Save minimal mock data with Ripley results
+        mock_adata_with_ripley().write_h5ad(self.in_file)
+
+        # Minimal parameters
+        self.params = {
+            "Upstream_Analysis": self.in_file,
+            "Center_Phenotype": "B cells",
+            "Neighbor_Phenotype": "CD8 T cells",
+            "Plot_Specific_Regions": False,
+            "Regions_Labels": [],
+            "Plot_Simulations": True,
+            "Output_File": self.out_file,
+        }
+
+    def tearDown(self) -> None:
+        self.tmp_dir.cleanup()
+
+    @patch('spac.templates.visualize_ripley_template.plot_ripley_l')
+    def test_complete_io_workflow(self, mock_plot_ripley) -> None:
+        """Single I/O test covering all input/output scenarios."""
+        # Mock the plot_ripley_l function to return a figure and dataframe
+        mock_fig = MagicMock()
+        mock_df = pd.DataFrame({
+            'radius': [0, 50, 100],
+            'ripley_l': [0, 1.2, 2.5],
+            'lower_ci': [0, 0.8, 1.9],
+            'upper_ci': [0, 1.6, 3.1]
+        })
+        mock_plot_ripley.return_value = (mock_fig, mock_df)
+
+        # Suppress warnings for cleaner test output
+        with warnings.catch_warnings():
+            warnings.simplefilter("ignore")
+
+            # Test 1: Basic workflow with dict input
+            saved_files = run_from_json(self.params)
+            self.assertIn(self.out_file, saved_files)
+            self.assertTrue(os.path.exists(saved_files[self.out_file]))
+
+            # Verify plot_ripley_l was called with correct parameters
+            mock_plot_ripley.assert_called_once()
+            call_args = mock_plot_ripley.call_args
+            # Check that adata was passed as first argument
+            self.assertEqual(call_args[0][0].n_obs, 10)
+            # Check keyword arguments
+            self.assertEqual(
+                call_args[1]['phenotypes'], ("B cells", "CD8 T cells")
+            )
+            self.assertEqual(call_args[1]['regions'], None)
+            self.assertEqual(call_args[1]['sims'], True)
+            self.assertEqual(call_args[1]['return_df'], True)
+
+            # Verify CSV output structure
+            df_output = pd.read_csv(saved_files[self.out_file])
+            self.assertEqual(len(df_output), 3)  # 3 radius points
+            self.assertIn('radius', df_output.columns)
+            self.assertIn('ripley_l', df_output.columns)
+
+            # Test 2: With specific regions enabled
+            params_regions = self.params.copy()
+            params_regions["Plot_Specific_Regions"] = True
+            params_regions["Regions_Labels"] = ["Region1", "Region2"]
+            mock_plot_ripley.reset_mock()
+            saved_files_regions = run_from_json(params_regions)
+            # Verify regions parameter was passed correctly
+            call_args = mock_plot_ripley.call_args
+            self.assertEqual(
+                call_args[1]['regions'], ["Region1", "Region2"]
+            )
+
+            # Test 3: Different output filename
+            params_custom = self.params.copy()
+            params_custom["Output_File"] = "custom_plots.csv"
+            mock_plot_ripley.reset_mock()
+            saved_files_custom = run_from_json(params_custom)
self.assertIn("custom_plots.csv", saved_files_custom) + self.assertTrue( + os.path.exists(saved_files_custom["custom_plots.csv"]) + ) + + def test_regions_validation_error_message(self) -> None: + """ + Test exact error message for empty regions + when Plot_Specific_Regions is True. + """ + params_bad = self.params.copy() + params_bad["Plot_Specific_Regions"] = True + params_bad["Regions_Label_s_"] = [] + + with self.assertRaises(ValueError) as context: + run_from_json(params_bad) + + expected_msg = ( + 'Please identify at least one region in the ' + '"Regions Label(s) parameter' + ) + actual_msg = str(context.exception) + self.assertEqual(expected_msg, actual_msg) + + @patch('spac.templates.visualize_ripley_template.plot_ripley_l') + def test_console_output(self, mock_plot_ripley) -> None: + """Test that dataframe is printed to console.""" + # Mock the plot_ripley_l function + mock_fig = MagicMock() + mock_df = pd.DataFrame({ + 'radius': [0, 50, 100], + 'ripley_l': [0, 1.2, 2.5] + }) + mock_plot_ripley.return_value = (mock_fig, mock_df) + + # Capture console output + with patch('builtins.print') as mock_print: + run_from_json(self.params) + + # Verify dataframe was printed + print_calls = mock_print.call_args_list + # Check that to_string() output was printed + df_printed = False + for call in print_calls: + if (len(call[0]) > 0 and + call[0][0] == mock_df.to_string()): + df_printed = True + break + self.assertTrue( + df_printed, "DataFrame was not printed to console" + ) + + +if __name__ == "__main__": + unittest.main() \ No newline at end of file