diff --git a/clifpy/utils/__init__.py b/clifpy/utils/__init__.py index b920b5c4..3b4b1295 100644 --- a/clifpy/utils/__init__.py +++ b/clifpy/utils/__init__.py @@ -15,6 +15,7 @@ from .stitching_encounters import stitch_encounters from .sofa import compute_sofa, _compute_sofa_from_extremal_values, _agg_extremal_values_by_id from .ase import compute_ase +from .imv_episodes import detect_imv_episodes, calculate_ventilator_free_days __all__ = [ # io @@ -42,6 +43,9 @@ '_agg_extremal_values_by_id', # ase 'compute_ase', + # imv_episodes + 'detect_imv_episodes', + 'calculate_ventilator_free_days', # validator (add main functions) 'validate_dataframe', 'validate_table', diff --git a/clifpy/utils/imv_episodes.py b/clifpy/utils/imv_episodes.py new file mode 100644 index 00000000..3d7ce04e --- /dev/null +++ b/clifpy/utils/imv_episodes.py @@ -0,0 +1,375 @@ +""" +IMV Episode Detection for CLIF Respiratory Support Data. + +This module provides functions to identify discrete invasive mechanical ventilation +(IMV) episodes from respiratory_support table data. +""" + +import pandas as pd +import numpy as np +from typing import Optional, List + + +# Observed variables that indicate active ventilation (excluding resp_rate_obs) +IMV_OBS_COLUMNS = [ + "tidal_volume_obs", + "plateau_pressure_obs", + "peak_inspiratory_pressure_obs", + "peep_obs", + "minute_vent_obs", + "mean_airway_pressure_obs", +] + +# Non-IMV device categories +NON_IMV_DEVICE_CATEGORIES = [ + "NIPPV", + "CPAP", + "High Flow NC", + "Face Mask", + "Trach Collar", + "Nasal Cannula", + "Room Air", + "Other", +] + + +def detect_imv_episodes( + respiratory_support: pd.DataFrame, + min_gap_hours: float = 1.0, + include_tracheostomy: bool = True, +) -> pd.DataFrame: + """ + Detect invasive mechanical ventilation (IMV) episodes from respiratory support data. + + This function identifies discrete IMV episodes by leveraging the fact that observed + ventilator measurements (tidal volume, pressures, etc.) don't occur when the + ventilator is in standby or disconnected. + + Episode Logic: + - IMV_start: First documented *_obs variable (OTHER than resp_rate_obs) + where device_category = 'IMV' + - IMV_end: Last *_obs followed by ANY documentation of non-IMV respiratory + data (any other device_category OR lpm_set > 0) + + Parameters + ---------- + respiratory_support : pd.DataFrame + Respiratory support table with required columns: + - hospitalization_id + - recorded_dttm + - device_category + - lpm_set + - Plus at least one *_obs column (tidal_volume_obs, peep_obs, etc.) + + min_gap_hours : float, default=1.0 + Minimum gap in hours between IMV observations to consider as separate episodes. + Shorter gaps are treated as continuous ventilation. + + include_tracheostomy : bool, default=True + If True, include tracheostomy status in output if available. + + Returns + ------- + pd.DataFrame + DataFrame with one row per IMV episode containing: + - hospitalization_id : str + Patient hospitalization identifier + - imv_episode_id : int + Sequential episode number within hospitalization (1, 2, 3, ...) + - imv_start_dttm : datetime + Start of IMV episode (first *_obs measurement) + - imv_end_dttm : datetime + End of IMV episode (last *_obs before non-IMV documentation) + - duration_hours : float + Episode duration in hours + - has_tracheostomy : int (0/1), optional + Whether tracheostomy was documented during episode + + Raises + ------ + ValueError + If required columns are missing from input DataFrame + + Examples + -------- + >>> from clifpy.utils import detect_imv_episodes + >>> episodes = detect_imv_episodes(respiratory_support_df) + >>> print(episodes.head()) + hospitalization_id imv_episode_id imv_start_dttm imv_end_dttm duration_hours + 0 H00001 1 2024-01-01 08:00:00 2024-01-03 14:00:00 54.0 + 1 H00001 2 2024-01-05 10:00:00 2024-01-07 16:00:00 54.0 + 2 H00002 1 2024-01-02 12:00:00 2024-01-02 18:00:00 6.0 + + Notes + ----- + The logic excludes resp_rate_obs from IMV detection because respiratory rate + can be documented even when the ventilator is on standby or the patient is + breathing spontaneously. + + See Also + -------- + stitch_encounters : For linking related hospitalizations + create_wide_dataset : For creating hourly respiratory data + """ + # Validate required columns + required_cols = ["hospitalization_id", "recorded_dttm", "device_category", "lpm_set"] + missing_cols = [col for col in required_cols if col not in respiratory_support.columns] + if missing_cols: + raise ValueError(f"Missing required columns: {missing_cols}") + + # Check for at least one *_obs column + available_obs_cols = [col for col in IMV_OBS_COLUMNS if col in respiratory_support.columns] + if not available_obs_cols: + raise ValueError( + f"At least one observation column required. Expected one of: {IMV_OBS_COLUMNS}" + ) + + # Work with a copy and ensure datetime + df = respiratory_support.copy() + df["recorded_dttm"] = pd.to_datetime(df["recorded_dttm"]) + + # Sort by hospitalization and time + df = df.sort_values(["hospitalization_id", "recorded_dttm"]).reset_index(drop=True) + + # Create flag for any *_obs value present (excluding resp_rate_obs) + df["has_imv_obs"] = df[available_obs_cols].notna().any(axis=1) + + # Create flag for IMV device + df["is_imv"] = df["device_category"].str.upper() == "IMV" + + # Create flag for non-IMV documentation (other device OR lpm > 0) + df["is_non_imv"] = ( + df["device_category"].isin(NON_IMV_DEVICE_CATEGORIES) | + (df["lpm_set"].notna() & (df["lpm_set"] > 0)) + ) + + # Active IMV = IMV device with observed measurements + df["active_imv"] = df["is_imv"] & df["has_imv_obs"] + + # Process each hospitalization + episodes_list = [] + + for hosp_id, hosp_df in df.groupby("hospitalization_id"): + hosp_episodes = _detect_episodes_for_hospitalization( + hosp_df, + hosp_id, + min_gap_hours=min_gap_hours, + include_tracheostomy=include_tracheostomy + ) + episodes_list.extend(hosp_episodes) + + # Create output DataFrame + if not episodes_list: + # Return empty DataFrame with correct schema + cols = ["hospitalization_id", "imv_episode_id", "imv_start_dttm", + "imv_end_dttm", "duration_hours"] + if include_tracheostomy and "tracheostomy" in respiratory_support.columns: + cols.append("has_tracheostomy") + return pd.DataFrame(columns=cols) + + episodes_df = pd.DataFrame(episodes_list) + + # Calculate duration + episodes_df["duration_hours"] = ( + (episodes_df["imv_end_dttm"] - episodes_df["imv_start_dttm"]) + .dt.total_seconds() / 3600 + ).round(2) + + return episodes_df + + +def _detect_episodes_for_hospitalization( + hosp_df: pd.DataFrame, + hosp_id: str, + min_gap_hours: float, + include_tracheostomy: bool, +) -> List[dict]: + """ + Detect IMV episodes within a single hospitalization. + + Parameters + ---------- + hosp_df : pd.DataFrame + Respiratory support data for one hospitalization, sorted by time. + hosp_id : str + Hospitalization identifier. + min_gap_hours : float + Minimum gap to consider separate episodes. + include_tracheostomy : bool + Whether to track tracheostomy status. + + Returns + ------- + List[dict] + List of episode dictionaries. + """ + episodes = [] + + # Get active IMV rows + imv_rows = hosp_df[hosp_df["active_imv"]].copy() + + if imv_rows.empty: + return episodes + + # Calculate time gaps between consecutive IMV observations + imv_rows = imv_rows.sort_values("recorded_dttm").reset_index(drop=True) + imv_rows["time_gap_hours"] = ( + imv_rows["recorded_dttm"].diff().dt.total_seconds() / 3600 + ) + + # Also check if there's non-IMV documentation between IMV rows + # by looking at the original dataframe + all_times = hosp_df[["recorded_dttm", "is_non_imv", "active_imv"]].copy() + + # Identify episode boundaries + # New episode starts when: + # 1. First IMV observation, OR + # 2. Gap > min_gap_hours, OR + # 3. Non-IMV documentation occurred since last IMV obs + + episode_id = 0 + current_episode_start = None + current_episode_end = None + current_has_trach = 0 + + for idx, row in imv_rows.iterrows(): + start_new_episode = False + + if current_episode_start is None: + # First IMV observation + start_new_episode = True + elif row["time_gap_hours"] > min_gap_hours: + # Gap too large + start_new_episode = True + else: + # Check for non-IMV documentation between last IMV and current + between_mask = ( + (all_times["recorded_dttm"] > current_episode_end) & + (all_times["recorded_dttm"] < row["recorded_dttm"]) & + (all_times["is_non_imv"]) + ) + if between_mask.any(): + start_new_episode = True + + if start_new_episode: + # Save previous episode if exists + if current_episode_start is not None: + episode_id += 1 + episode_dict = { + "hospitalization_id": hosp_id, + "imv_episode_id": episode_id, + "imv_start_dttm": current_episode_start, + "imv_end_dttm": current_episode_end, + } + if include_tracheostomy and "tracheostomy" in hosp_df.columns: + episode_dict["has_tracheostomy"] = current_has_trach + episodes.append(episode_dict) + + # Start new episode + current_episode_start = row["recorded_dttm"] + current_episode_end = row["recorded_dttm"] + current_has_trach = 0 + + # Update current episode end + current_episode_end = row["recorded_dttm"] + + # Track tracheostomy + if include_tracheostomy and "tracheostomy" in hosp_df.columns: + if pd.notna(row.get("tracheostomy")) and row.get("tracheostomy") == 1: + current_has_trach = 1 + + # Save final episode + if current_episode_start is not None: + episode_id += 1 + episode_dict = { + "hospitalization_id": hosp_id, + "imv_episode_id": episode_id, + "imv_start_dttm": current_episode_start, + "imv_end_dttm": current_episode_end, + } + if include_tracheostomy and "tracheostomy" in hosp_df.columns: + episode_dict["has_tracheostomy"] = current_has_trach + episodes.append(episode_dict) + + return episodes + + +def calculate_ventilator_free_days( + imv_episodes: pd.DataFrame, + hospitalization: pd.DataFrame, + observation_window_days: int = 28, + mortality_column: Optional[str] = "discharge_category", + death_values: Optional[List[str]] = None, +) -> pd.DataFrame: + """ + Calculate ventilator-free days (VFDs) from IMV episodes. + + Ventilator-free days are defined as days alive and free of mechanical + ventilation during the observation window. Patients who die receive 0 VFDs. + + Parameters + ---------- + imv_episodes : pd.DataFrame + Output from detect_imv_episodes(). + hospitalization : pd.DataFrame + Hospitalization table with admission_dttm and discharge info. + observation_window_days : int, default=28 + Number of days for VFD calculation (typically 28). + mortality_column : str, default="discharge_category" + Column name containing mortality/discharge information. + death_values : List[str], optional + Values in mortality_column that indicate death. + Default: ["Expired", "Dead", "Deceased", "Death"] + + Returns + ------- + pd.DataFrame + DataFrame with hospitalization_id and ventilator_free_days. + + Examples + -------- + >>> episodes = detect_imv_episodes(resp_df) + >>> vfds = calculate_ventilator_free_days(episodes, hosp_df) + >>> print(vfds.head()) + hospitalization_id ventilator_free_days total_imv_days died + 0 H00001 22.5 5.5 0 + 1 H00002 0.0 3.2 1 + """ + if death_values is None: + death_values = ["Expired", "Dead", "Deceased", "Death"] + + # Get hospitalization info + hosp_info = hospitalization[["hospitalization_id", "admission_dttm"]].copy() + if mortality_column in hospitalization.columns: + hosp_info["died"] = hospitalization[mortality_column].isin(death_values).astype(int) + else: + hosp_info["died"] = 0 + + hosp_info["admission_dttm"] = pd.to_datetime(hosp_info["admission_dttm"]) + hosp_info["window_end"] = hosp_info["admission_dttm"] + pd.Timedelta(days=observation_window_days) + + # Calculate total IMV days per hospitalization + if imv_episodes.empty: + hosp_info["total_imv_days"] = 0.0 + else: + imv_by_hosp = imv_episodes.groupby("hospitalization_id")["duration_hours"].sum() / 24 + imv_by_hosp = imv_by_hosp.reset_index() + imv_by_hosp.columns = ["hospitalization_id", "total_imv_days"] + hosp_info = hosp_info.merge(imv_by_hosp, on="hospitalization_id", how="left") + hosp_info["total_imv_days"] = hosp_info["total_imv_days"].fillna(0) + + # Cap IMV days at observation window + hosp_info["total_imv_days"] = hosp_info["total_imv_days"].clip(upper=observation_window_days) + + # Calculate VFDs (0 if died) + hosp_info["ventilator_free_days"] = np.where( + hosp_info["died"] == 1, + 0, + observation_window_days - hosp_info["total_imv_days"] + ) + + # Round to 1 decimal + hosp_info["ventilator_free_days"] = hosp_info["ventilator_free_days"].round(1) + hosp_info["total_imv_days"] = hosp_info["total_imv_days"].round(1) + + return hosp_info[["hospitalization_id", "ventilator_free_days", "total_imv_days", "died"]] diff --git a/tests/test_imv_episodes.py b/tests/test_imv_episodes.py new file mode 100644 index 00000000..1b3bd613 --- /dev/null +++ b/tests/test_imv_episodes.py @@ -0,0 +1,273 @@ +""" +Tests for IMV episode detection functionality. +""" + +import pytest +import pandas as pd +import numpy as np +from datetime import datetime, timedelta + +from clifpy.utils.imv_episodes import ( + detect_imv_episodes, + calculate_ventilator_free_days, + IMV_OBS_COLUMNS, + NON_IMV_DEVICE_CATEGORIES, +) + + +@pytest.fixture +def sample_respiratory_data(): + """Create sample respiratory support data with known IMV episodes.""" + base_time = datetime(2024, 1, 1, 8, 0, 0) + + data = [] + + # Episode 1: IMV from hour 0-6 + for hour in range(7): + data.append({ + "hospitalization_id": "H001", + "recorded_dttm": base_time + timedelta(hours=hour), + "device_category": "IMV", + "mode_category": "Assist Control-Volume Control", + "lpm_set": None, + "tidal_volume_obs": 450 + np.random.randint(-20, 20), + "peep_obs": 5.0, + "resp_rate_obs": 14, + "tracheostomy": 0, + }) + + # Extubation: Non-IMV documentation at hour 7 + data.append({ + "hospitalization_id": "H001", + "recorded_dttm": base_time + timedelta(hours=7), + "device_category": "High Flow NC", + "mode_category": "Other", + "lpm_set": 40.0, + "tidal_volume_obs": None, + "peep_obs": None, + "resp_rate_obs": 18, + "tracheostomy": 0, + }) + + # Continue on high flow for a few hours + for hour in range(8, 12): + data.append({ + "hospitalization_id": "H001", + "recorded_dttm": base_time + timedelta(hours=hour), + "device_category": "High Flow NC", + "mode_category": "Other", + "lpm_set": 40.0, + "tidal_volume_obs": None, + "peep_obs": None, + "resp_rate_obs": 16 + np.random.randint(-2, 2), + "tracheostomy": 0, + }) + + # Episode 2: Re-intubation at hour 12 + for hour in range(12, 20): + data.append({ + "hospitalization_id": "H001", + "recorded_dttm": base_time + timedelta(hours=hour), + "device_category": "IMV", + "mode_category": "Pressure Control", + "lpm_set": None, + "tidal_volume_obs": 420 + np.random.randint(-20, 20), + "peep_obs": 8.0, + "resp_rate_obs": 16, + "tracheostomy": 1 if hour >= 18 else 0, # Trach placed at hour 18 + }) + + return pd.DataFrame(data) + + +@pytest.fixture +def sample_hospitalization_data(): + """Create sample hospitalization data.""" + return pd.DataFrame([ + { + "hospitalization_id": "H001", + "patient_id": "P001", + "admission_dttm": datetime(2024, 1, 1, 6, 0, 0), + "discharge_dttm": datetime(2024, 1, 5, 12, 0, 0), + "discharge_category": "Home", + }, + { + "hospitalization_id": "H002", + "patient_id": "P002", + "admission_dttm": datetime(2024, 1, 2, 10, 0, 0), + "discharge_dttm": datetime(2024, 1, 4, 8, 0, 0), + "discharge_category": "Expired", + }, + ]) + + +class TestDetectIMVEpisodes: + """Tests for detect_imv_episodes function.""" + + def test_basic_episode_detection(self, sample_respiratory_data): + """Test that basic IMV episodes are detected correctly.""" + episodes = detect_imv_episodes(sample_respiratory_data) + + assert len(episodes) == 2, "Should detect 2 IMV episodes" + assert episodes.iloc[0]["imv_episode_id"] == 1 + assert episodes.iloc[1]["imv_episode_id"] == 2 + + def test_episode_start_times(self, sample_respiratory_data): + """Test that episode start times are correct.""" + episodes = detect_imv_episodes(sample_respiratory_data) + + base_time = datetime(2024, 1, 1, 8, 0, 0) + + # Episode 1 starts at hour 0 + assert episodes.iloc[0]["imv_start_dttm"] == base_time + + # Episode 2 starts at hour 12 + assert episodes.iloc[1]["imv_start_dttm"] == base_time + timedelta(hours=12) + + def test_episode_end_times(self, sample_respiratory_data): + """Test that episode end times are correct.""" + episodes = detect_imv_episodes(sample_respiratory_data) + + base_time = datetime(2024, 1, 1, 8, 0, 0) + + # Episode 1 ends at hour 6 (last IMV obs before HFNC at hour 7) + assert episodes.iloc[0]["imv_end_dttm"] == base_time + timedelta(hours=6) + + # Episode 2 ends at hour 19 (last observation) + assert episodes.iloc[1]["imv_end_dttm"] == base_time + timedelta(hours=19) + + def test_duration_calculation(self, sample_respiratory_data): + """Test that episode durations are calculated correctly.""" + episodes = detect_imv_episodes(sample_respiratory_data) + + # Episode 1: 6 hours (hour 0 to hour 6) + assert episodes.iloc[0]["duration_hours"] == 6.0 + + # Episode 2: 7 hours (hour 12 to hour 19) + assert episodes.iloc[1]["duration_hours"] == 7.0 + + def test_tracheostomy_tracking(self, sample_respiratory_data): + """Test that tracheostomy is tracked during episodes.""" + episodes = detect_imv_episodes(sample_respiratory_data, include_tracheostomy=True) + + # Episode 1: No trach + assert episodes.iloc[0]["has_tracheostomy"] == 0 + + # Episode 2: Trach placed during episode + assert episodes.iloc[1]["has_tracheostomy"] == 1 + + def test_missing_columns_raises_error(self): + """Test that missing required columns raises ValueError.""" + df = pd.DataFrame({"hospitalization_id": ["H001"], "recorded_dttm": [datetime.now()]}) + + with pytest.raises(ValueError, match="Missing required columns"): + detect_imv_episodes(df) + + def test_no_obs_columns_raises_error(self): + """Test that missing all *_obs columns raises ValueError.""" + df = pd.DataFrame({ + "hospitalization_id": ["H001"], + "recorded_dttm": [datetime.now()], + "device_category": ["IMV"], + "lpm_set": [None], + }) + + with pytest.raises(ValueError, match="observation column required"): + detect_imv_episodes(df) + + def test_empty_result_for_no_imv(self): + """Test that empty DataFrame is returned when no IMV data exists.""" + df = pd.DataFrame({ + "hospitalization_id": ["H001", "H001"], + "recorded_dttm": [datetime.now(), datetime.now() + timedelta(hours=1)], + "device_category": ["High Flow NC", "Nasal Cannula"], + "lpm_set": [40.0, 4.0], + "tidal_volume_obs": [None, None], + }) + + episodes = detect_imv_episodes(df) + assert len(episodes) == 0 + assert "hospitalization_id" in episodes.columns + + def test_min_gap_hours_parameter(self): + """Test that min_gap_hours correctly splits episodes.""" + base_time = datetime(2024, 1, 1, 8, 0, 0) + + # Create data with 3-hour gap between IMV observations + df = pd.DataFrame({ + "hospitalization_id": ["H001", "H001"], + "recorded_dttm": [base_time, base_time + timedelta(hours=3)], + "device_category": ["IMV", "IMV"], + "lpm_set": [None, None], + "tidal_volume_obs": [450, 460], + }) + + # With 2-hour min gap, should be 2 episodes + episodes_2hr = detect_imv_episodes(df, min_gap_hours=2.0) + assert len(episodes_2hr) == 2 + + # With 4-hour min gap, should be 1 episode + episodes_4hr = detect_imv_episodes(df, min_gap_hours=4.0) + assert len(episodes_4hr) == 1 + + +class TestCalculateVentilatorFreeDays: + """Tests for calculate_ventilator_free_days function.""" + + def test_basic_vfd_calculation(self, sample_respiratory_data, sample_hospitalization_data): + """Test basic VFD calculation.""" + episodes = detect_imv_episodes(sample_respiratory_data) + vfds = calculate_ventilator_free_days(episodes, sample_hospitalization_data) + + # H001 should have VFDs (not dead) + h001_vfd = vfds[vfds["hospitalization_id"] == "H001"].iloc[0] + assert h001_vfd["ventilator_free_days"] > 0 + assert h001_vfd["died"] == 0 + + def test_vfd_zero_for_death(self, sample_hospitalization_data): + """Test that VFD is 0 for patients who died.""" + # Empty episodes + episodes = pd.DataFrame(columns=["hospitalization_id", "imv_episode_id", + "imv_start_dttm", "imv_end_dttm", "duration_hours"]) + + vfds = calculate_ventilator_free_days(episodes, sample_hospitalization_data) + + # H002 died, so VFD should be 0 + h002_vfd = vfds[vfds["hospitalization_id"] == "H002"].iloc[0] + assert h002_vfd["ventilator_free_days"] == 0 + assert h002_vfd["died"] == 1 + + def test_vfd_capped_at_observation_window(self, sample_hospitalization_data): + """Test that IMV days are capped at observation window.""" + # Create episode with 30 days of IMV + episodes = pd.DataFrame({ + "hospitalization_id": ["H001"], + "imv_episode_id": [1], + "imv_start_dttm": [datetime(2024, 1, 1)], + "imv_end_dttm": [datetime(2024, 1, 31)], + "duration_hours": [30 * 24], # 30 days + }) + + vfds = calculate_ventilator_free_days( + episodes, + sample_hospitalization_data, + observation_window_days=28 + ) + + h001_vfd = vfds[vfds["hospitalization_id"] == "H001"].iloc[0] + assert h001_vfd["total_imv_days"] == 28.0 # Capped at 28 + assert h001_vfd["ventilator_free_days"] == 0.0 + + +class TestConstants: + """Tests for module constants.""" + + def test_imv_obs_columns_excludes_resp_rate(self): + """Verify resp_rate_obs is not in IMV_OBS_COLUMNS.""" + assert "resp_rate_obs" not in IMV_OBS_COLUMNS + + def test_non_imv_device_categories(self): + """Verify non-IMV categories are complete.""" + expected = {"NIPPV", "CPAP", "High Flow NC", "Face Mask", + "Trach Collar", "Nasal Cannula", "Room Air", "Other"} + assert set(NON_IMV_DEVICE_CATEGORIES) == expected