"""
Adult Sepsis Event (ASE) Calculation Utility

This module implements the CDC Adult Sepsis Event criteria for identifying
sepsis cases in hospitalized patients based on the CDC surveillance toolkit.

Reference: https://www.cdc.gov/sepsis/pdfs/sepsis-surveillance-toolkit-mar-2018_508.pdf

ASE requires BOTH:
A. Presumed Infection (blood culture + qualifying antibiotic days)
AND
B. Organ Dysfunction (vasopressors, mechanical ventilation, or lab criteria)

Implementation note: every SQL statement below refers to pandas DataFrames by
the name of the local Python variable that holds them (DuckDB "replacement
scans"). Renaming a parameter or local therefore requires renaming it inside
the SQL text as well.
"""

import pandas as pd
from typing import Dict, Optional, List
import duckdb
import logging

# Set up logging - use centralized logger
logger = logging.getLogger('clifpy.utils.sepsis')


def _validate_window_days(window_days) -> int:
    """
    Return ``window_days`` as a non-negative ``int`` that is safe to splice into SQL.

    Parameters:
        window_days: Integer-like number of days (int, numpy integer, "2", 2.0, ...)

    Returns:
        The value converted to a plain ``int``

    Raises:
        ValueError: If the value is not integer-like or is negative
    """
    try:
        days = int(window_days)
        is_integral = float(window_days) == days
    except (TypeError, ValueError) as exc:
        raise ValueError(
            f"window_days must be a non-negative integer, got {window_days!r}"
        ) from exc
    if not is_integral or days < 0:
        raise ValueError(
            f"window_days must be a non-negative integer, got {window_days!r}"
        )
    return days


def _identify_presumed_infection(
    blood_cultures: pd.DataFrame,
    antibiotics: pd.DataFrame,
    hospitalization: pd.DataFrame,
    patient: Optional[pd.DataFrame] = None,
    window_days: int = 2
) -> pd.DataFrame:
    """
    Identify presumed infection based on CDC ASE criteria.

    Presumed infection requires:
    1. Blood culture obtained (any result); the earliest culture of each
       hospitalization is used as day 0
    2. A run of at least 4 consecutive Qualifying Antibiotic Days (QAD) within
       -window_days to +6 days of that blood culture
       OR 1+ QAD if the patient died/was transferred less than 6 days after it

    Parameters:
        blood_cultures: DataFrame with columns [hospitalization_id, collect_dttm, fluid_category]
                       filtered to blood/buffy coat cultures
        antibiotics: DataFrame with columns [hospitalization_id, admin_dttm, med_group]
                    filtered to CMS_sepsis_qualifying_antibiotics
        hospitalization: DataFrame with columns [hospitalization_id, discharge_dttm, discharge_category]
                        (plus patient_id when ``patient`` is supplied)
        patient: Optional DataFrame with columns [patient_id, death_dttm] for censoring
        window_days: Number of days before the blood culture to look (default: 2)

    Returns:
        DataFrame with columns [hospitalization_id, presumed_infection_time, total_QAD],
        one row per qualifying hospitalization

    Raises:
        ValueError: If ``window_days`` is not a non-negative integer
    """
    days_before = _validate_window_days(window_days)

    # Candidate censoring time per hospitalization row: an in-hospital death
    # time when patient data is available, otherwise the discharge time for
    # dispositions that end follow-up early.
    if patient is not None:
        raw_censoring_sql = """
            SELECT
                h.hospitalization_id,
                CASE
                    WHEN p.death_dttm IS NOT NULL
                         AND p.death_dttm <= COALESCE(h.discharge_dttm, p.death_dttm)
                        THEN p.death_dttm
                    WHEN h.discharge_category IN ('Expired', 'Acute Care Hospital', 'Hospice')
                        THEN h.discharge_dttm
                    ELSE NULL
                END AS censoring_time
            FROM hospitalization h
            LEFT JOIN patient p ON h.patient_id = p.patient_id
        """
    else:
        raw_censoring_sql = """
            SELECT
                hospitalization_id,
                CASE
                    WHEN discharge_category IN ('Expired', 'Acute Care Hospital', 'Hospice')
                        THEN discharge_dttm
                    ELSE NULL
                END AS censoring_time
            FROM hospitalization
        """

    # NOTE(review): the CDC toolkit also requires the antibiotic run to continue
    # until the day of (or the day before) death/transfer for the shortened
    # rule; that check is not implemented here - confirm whether it is needed.
    presumed_query = f"""
        WITH first_culture AS (
            -- Day 0 anchor: earliest blood culture of the hospitalization
            SELECT
                hospitalization_id,
                MIN(collect_dttm) AS blood_culture_time
            FROM blood_cultures
            GROUP BY hospitalization_id
        ),
        antibiotic_days AS (
            -- Relative day of every administration (day 0 = first 24h after culture)
            SELECT
                a.hospitalization_id,
                bc.blood_culture_time,
                FLOOR(DATE_DIFF('hour', bc.blood_culture_time::TIMESTAMP, a.admin_dttm::TIMESTAMP) / 24.0) AS relative_day
            FROM antibiotics a
            INNER JOIN first_culture bc ON a.hospitalization_id = bc.hospitalization_id
        ),
        distinct_days AS (
            SELECT DISTINCT
                hospitalization_id,
                blood_culture_time,
                relative_day
            FROM antibiotic_days
            WHERE relative_day >= -{days_before} AND relative_day <= 6
        ),
        consecutive_runs AS (
            -- Gaps-and-islands: consecutive days share the same (day - row_number)
            SELECT
                hospitalization_id,
                blood_culture_time,
                relative_day - ROW_NUMBER() OVER (
                    PARTITION BY hospitalization_id
                    ORDER BY relative_day
                ) AS run_group
            FROM distinct_days
        ),
        run_lengths AS (
            SELECT
                hospitalization_id,
                blood_culture_time,
                run_group,
                COUNT(*) AS run_length
            FROM consecutive_runs
            GROUP BY hospitalization_id, blood_culture_time, run_group
        ),
        qad AS (
            -- Longest consecutive run is the QAD count used by the criteria
            SELECT
                hospitalization_id,
                blood_culture_time,
                MAX(run_length) AS total_QAD
            FROM run_lengths
            GROUP BY hospitalization_id, blood_culture_time
        ),
        censoring AS (
            -- Collapse to one row per hospitalization so that duplicated
            -- patient rows cannot duplicate presumed infections
            SELECT
                hospitalization_id,
                MIN(censoring_time) AS censoring_time
            FROM ({raw_censoring_sql}) AS raw_censoring
            WHERE censoring_time IS NOT NULL
            GROUP BY hospitalization_id
        )
        SELECT
            q.hospitalization_id,
            q.blood_culture_time AS presumed_infection_time,
            q.total_QAD
        FROM qad q
        LEFT JOIN censoring c ON q.hospitalization_id = c.hospitalization_id
        WHERE
            q.total_QAD >= 4
            OR (q.total_QAD >= 1 AND c.censoring_time IS NOT NULL
                AND c.censoring_time < q.blood_culture_time + INTERVAL 6 DAYS)
    """
    return duckdb.sql(presumed_query).df()


def _identify_organ_dysfunction_vasopressors(
    continuous_meds: pd.DataFrame,
    presumed_infection: pd.DataFrame,
    window_days: int = 2
) -> pd.DataFrame:
    """
    Identify vasopressor administration as organ dysfunction.

    The earliest administration with a positive dose inside the window around
    the presumed infection time is reported. No check is made that the
    vasopressor is newly started.

    Parameters:
        continuous_meds: DataFrame with [hospitalization_id, admin_dttm, med_category, med_dose]
                        filtered to vasoactive medications
        presumed_infection: DataFrame with [hospitalization_id, presumed_infection_time]
        window_days: Days before/after infection time (default: 2)

    Returns:
        DataFrame with [hospitalization_id, vasopressor_time]

    Raises:
        ValueError: If ``window_days`` is not a non-negative integer
    """
    days = _validate_window_days(window_days)
    query = f"""
        SELECT
            c.hospitalization_id,
            MIN(c.admin_dttm) AS vasopressor_time
        FROM continuous_meds c
        INNER JOIN presumed_infection p ON c.hospitalization_id = p.hospitalization_id
        WHERE
            c.med_dose > 0
            AND c.admin_dttm >= p.presumed_infection_time - INTERVAL {days} DAYS
            AND c.admin_dttm <= p.presumed_infection_time + INTERVAL {days} DAYS
        GROUP BY c.hospitalization_id
    """
    return duckdb.sql(query).df()


def _identify_organ_dysfunction_ventilation(
    respiratory_support: pd.DataFrame,
    presumed_infection: pd.DataFrame,
    window_days: int = 2
) -> pd.DataFrame:
    """
    Identify invasive mechanical ventilation as organ dysfunction.

    Every row of ``respiratory_support`` is treated as an IMV observation; the
    caller is responsible for the device filter.

    Parameters:
        respiratory_support: DataFrame with [hospitalization_id, recorded_dttm, device_category]
                            filtered to IMV
        presumed_infection: DataFrame with [hospitalization_id, presumed_infection_time]
        window_days: Days before/after infection time (default: 2)

    Returns:
        DataFrame with [hospitalization_id, imv_time]

    Raises:
        ValueError: If ``window_days`` is not a non-negative integer
    """
    days = _validate_window_days(window_days)
    query = f"""
        SELECT
            r.hospitalization_id,
            MIN(r.recorded_dttm) AS imv_time
        FROM respiratory_support r
        INNER JOIN presumed_infection p ON r.hospitalization_id = p.hospitalization_id
        WHERE
            r.recorded_dttm >= p.presumed_infection_time - INTERVAL {days} DAYS
            AND r.recorded_dttm <= p.presumed_infection_time + INTERVAL {days} DAYS
        GROUP BY r.hospitalization_id
    """
    return duckdb.sql(query).df()


def _identify_organ_dysfunction_labs(
    labs: pd.DataFrame,
    presumed_infection: pd.DataFrame,
    window_days: int = 2,
    include_lactate: bool = True
) -> pd.DataFrame:
    """
    Identify lab-based organ dysfunction criteria.

    Lab criteria (evaluated on results inside the window):
    - Creatinine: Doubling from baseline
    - Bilirubin: ≥2.0 mg/dL AND doubled from baseline
    - Platelets: <100 AND ≥50% decline from baseline (baseline must be ≥100)
    - Lactate: ≥2.0 mmol/L (optional, needs no baseline)

    Baselines are taken over ALL rows of ``labs`` for the hospitalization, not
    only those in the window: the minimum value for creatinine and bilirubin,
    the first recorded value for platelets. A criterion whose baseline is
    missing is simply not met; the other criteria are still evaluated.

    Parameters:
        labs: DataFrame with [hospitalization_id, lab_category, lab_value_numeric, lab_result_dttm]
        presumed_infection: DataFrame with [hospitalization_id, presumed_infection_time]
        window_days: Days before/after infection time (default: 2)
        include_lactate: Whether to include lactate criterion (default: True)

    Returns:
        DataFrame with [hospitalization_id, aki_time, hyperbilirubinemia_time,
                       thrombocytopenia_time] plus elevated_lactate_time when
        ``include_lactate`` is True; only hospitalizations meeting at least one
        criterion are returned

    Raises:
        ValueError: If ``window_days`` is not a non-negative integer
    """
    days = _validate_window_days(window_days)

    result_columns = [
        'hospitalization_id', 'aki_time', 'hyperbilirubinemia_time', 'thrombocytopenia_time'
    ]
    if include_lactate:
        result_columns.append('elevated_lactate_time')

    # Nothing to evaluate; also avoids asking DuckDB to infer column types
    # from an empty, untyped frame.
    if labs is None or len(labs) == 0 or len(presumed_infection) == 0:
        return pd.DataFrame(columns=result_columns)

    lactate_select = ""
    lactate_filter = ""
    if include_lactate:
        lactate_select = """,
                MIN(CASE
                        WHEN lab_category = 'lactate'
                             AND lab_value_numeric >= 2
                        THEN lab_result_dttm
                    END) AS elevated_lactate_time"""
        lactate_filter = "OR elevated_lactate_time IS NOT NULL"

    # Baselines are LEFT JOINed so that a missing baseline yields NULL (the
    # criterion evaluates to "not met") instead of aborting the whole
    # computation - lactate in particular must work without any baseline.
    query = f"""
        WITH creatinine_baseline AS (
            SELECT
                hospitalization_id,
                MIN(lab_value_numeric) AS baseline_creatinine
            FROM labs
            WHERE lab_category = 'creatinine'
              AND lab_value_numeric IS NOT NULL
            GROUP BY hospitalization_id
        ),
        bilirubin_baseline AS (
            SELECT
                hospitalization_id,
                MIN(lab_value_numeric) AS baseline_bilirubin
            FROM labs
            WHERE lab_category = 'bilirubin_total'
              AND lab_value_numeric IS NOT NULL
            GROUP BY hospitalization_id
        ),
        platelet_baseline AS (
            SELECT
                hospitalization_id,
                FIRST(lab_value_numeric ORDER BY lab_result_dttm) AS baseline_platelet
            FROM labs
            WHERE lab_category = 'platelet_count'
              AND lab_value_numeric IS NOT NULL
            GROUP BY hospitalization_id
        ),
        windowed_labs AS (
            SELECT
                l.hospitalization_id,
                l.lab_category,
                l.lab_value_numeric,
                l.lab_result_dttm,
                c.baseline_creatinine,
                b.baseline_bilirubin,
                t.baseline_platelet
            FROM labs l
            INNER JOIN presumed_infection p ON l.hospitalization_id = p.hospitalization_id
            LEFT JOIN creatinine_baseline c ON l.hospitalization_id = c.hospitalization_id
            LEFT JOIN bilirubin_baseline b ON l.hospitalization_id = b.hospitalization_id
            LEFT JOIN platelet_baseline t ON l.hospitalization_id = t.hospitalization_id
            WHERE l.lab_result_dttm >= p.presumed_infection_time - INTERVAL {days} DAYS
              AND l.lab_result_dttm <= p.presumed_infection_time + INTERVAL {days} DAYS
        ),
        first_times AS (
            -- Earliest qualifying result per criterion
            SELECT
                hospitalization_id,
                MIN(CASE
                        WHEN lab_category = 'creatinine'
                             AND baseline_creatinine IS NOT NULL
                             AND lab_value_numeric >= 2 * baseline_creatinine
                        THEN lab_result_dttm
                    END) AS aki_time,
                MIN(CASE
                        WHEN lab_category = 'bilirubin_total'
                             AND baseline_bilirubin IS NOT NULL
                             AND lab_value_numeric >= 2.0
                             AND lab_value_numeric >= 2 * baseline_bilirubin
                        THEN lab_result_dttm
                    END) AS hyperbilirubinemia_time,
                MIN(CASE
                        WHEN lab_category = 'platelet_count'
                             AND baseline_platelet >= 100
                             AND lab_value_numeric < 100
                             AND lab_value_numeric <= 0.5 * baseline_platelet
                        THEN lab_result_dttm
                    END) AS thrombocytopenia_time{lactate_select}
            FROM windowed_labs
            GROUP BY hospitalization_id
        )
        SELECT *
        FROM first_times
        WHERE aki_time IS NOT NULL
           OR hyperbilirubinemia_time IS NOT NULL
           OR thrombocytopenia_time IS NOT NULL
           {lactate_filter}
    """
    return duckdb.sql(query).df()


def compute_sepsis(
    blood_cultures: pd.DataFrame,
    antibiotics: pd.DataFrame,
    hospitalization: pd.DataFrame,
    labs: pd.DataFrame,
    continuous_meds: Optional[pd.DataFrame] = None,
    respiratory_support: Optional[pd.DataFrame] = None,
    patient: Optional[pd.DataFrame] = None,
    window_days: int = 2,
    include_lactate: bool = True
) -> pd.DataFrame:
    """
    Compute Adult Sepsis Event (ASE) flags using CDC criteria.

    ASE requires BOTH:
    A. Presumed Infection (blood culture + qualifying antibiotic days)
    AND
    B. Organ Dysfunction (at least one of: vasopressors, mechanical ventilation, labs)

    Parameters:
        blood_cultures: DataFrame with [hospitalization_id, collect_dttm, fluid_category]
                       Should be pre-filtered to fluid_category == 'blood/buffy coat'
        antibiotics: DataFrame with [hospitalization_id, admin_dttm, med_group]
                    Should be pre-filtered to med_group == 'CMS_sepsis_qualifying_antibiotics'
        hospitalization: DataFrame with [hospitalization_id, patient_id, discharge_dttm, discharge_category]
        labs: DataFrame with [hospitalization_id, lab_category, lab_value_numeric, lab_result_dttm]
             Should include: creatinine, bilirubin_total, platelet_count, lactate
        continuous_meds: Optional DataFrame with [hospitalization_id, admin_dttm, med_category, med_dose]
                        Should be pre-filtered to vasoactive medications
        respiratory_support: Optional DataFrame with [hospitalization_id, recorded_dttm, device_category]
                            Should be pre-filtered to device_category == 'IMV'
        patient: Optional DataFrame with [patient_id, death_dttm] for censoring logic
        window_days: Number of days before/after blood culture (default: 2)
        include_lactate: Whether to include lactate as organ dysfunction criterion (default: True)

    Returns:
        DataFrame with columns:
        - hospitalization_id
        - ase_flag: 1 if sepsis criteria met, 0 otherwise
        - presumed_infection_time: Time of blood culture
        - first_organ_dysfunction_time: Earliest organ dysfunction time
        - organ_dysfunction_type: Type of first organ dysfunction

        Normally only hospitalizations meeting ASE are returned (ase_flag == 1).
        When no hospitalization at all shows organ dysfunction, every presumed
        infection is returned with ase_flag == 0 (and an extra total_QAD
        column). When there is no presumed infection the frame is empty.

    Raises:
        ValueError: If ``window_days`` is not a non-negative integer

    Example:
        >>> sepsis_df = compute_sepsis(
        ...     blood_cultures=bc_df,
        ...     antibiotics=abx_df,
        ...     hospitalization=hosp_df,
        ...     labs=labs_df,
        ...     continuous_meds=meds_df,
        ...     respiratory_support=resp_df
        ... )
    """
    window_days = _validate_window_days(window_days)
    logger.info("Computing Adult Sepsis Event (ASE) criteria")

    # Step 1: Identify presumed infection
    logger.info("Identifying presumed infections...")
    presumed_infection = _identify_presumed_infection(
        blood_cultures=blood_cultures,
        antibiotics=antibiotics,
        hospitalization=hospitalization,
        patient=patient,
        window_days=window_days
    )

    if presumed_infection.empty:
        logger.warning("No presumed infections identified")
        return pd.DataFrame(columns=[
            'hospitalization_id', 'ase_flag', 'presumed_infection_time',
            'first_organ_dysfunction_time', 'organ_dysfunction_type'
        ])

    logger.info("Found %d presumed infections", len(presumed_infection))

    # Step 2: Collect every organ dysfunction signal in long format
    # [hospitalization_id, dysfunction_time, dysfunction_type]
    organ_dysfunction_dfs = []

    # Vasopressors
    if continuous_meds is not None and len(continuous_meds) > 0:
        logger.info("Checking vasopressor criteria...")
        vaso_df = _identify_organ_dysfunction_vasopressors(
            continuous_meds, presumed_infection, window_days
        )
        if len(vaso_df) > 0:
            organ_dysfunction_dfs.append(
                vaso_df.rename(columns={'vasopressor_time': 'dysfunction_time'})
                .assign(dysfunction_type='vasopressor')
            )
            logger.info("Found %d with vasopressor dysfunction", len(vaso_df))

    # Mechanical Ventilation
    if respiratory_support is not None and len(respiratory_support) > 0:
        logger.info("Checking mechanical ventilation criteria...")
        imv_df = _identify_organ_dysfunction_ventilation(
            respiratory_support, presumed_infection, window_days
        )
        if len(imv_df) > 0:
            organ_dysfunction_dfs.append(
                imv_df.rename(columns={'imv_time': 'dysfunction_time'})
                .assign(dysfunction_type='invasive_mechanical_ventilation')
            )
            logger.info("Found %d with IMV dysfunction", len(imv_df))

    # Lab-based organ dysfunction
    logger.info("Checking lab-based organ dysfunction criteria...")
    lab_dysfunction = _identify_organ_dysfunction_labs(
        labs, presumed_infection, window_days, include_lactate
    )

    # Convert lab dysfunction from one-column-per-criterion to long format
    if len(lab_dysfunction) > 0:
        lab_types = {
            'aki_time': 'aki',
            'hyperbilirubinemia_time': 'hyperbilirubinemia',
            'thrombocytopenia_time': 'thrombocytopenia'
        }
        if include_lactate:
            lab_types['elevated_lactate_time'] = 'elevated_lactate'

        for time_col, dysfunction_name in lab_types.items():
            if time_col not in lab_dysfunction.columns:
                continue
            hits = lab_dysfunction[['hospitalization_id', time_col]].dropna()
            if hits.empty:
                continue
            # rename/assign return new frames, so the slice is never mutated
            organ_dysfunction_dfs.append(
                hits.rename(columns={time_col: 'dysfunction_time'})
                .assign(dysfunction_type=dysfunction_name)
            )
            logger.info("Found %d with %s dysfunction", len(hits), dysfunction_name)

    # Step 3: Combine all organ dysfunction
    if not organ_dysfunction_dfs:
        logger.warning("No organ dysfunction criteria met")
        # Return presumed infections without ASE flag
        result = presumed_infection.copy()
        result['ase_flag'] = 0
        result['first_organ_dysfunction_time'] = None
        result['organ_dysfunction_type'] = None
        return result

    all_dysfunction = pd.concat(organ_dysfunction_dfs, ignore_index=True)

    # Find earliest dysfunction per hospitalization; on ties idxmin keeps the
    # first row in concat order (vasopressor, IMV, then labs).
    earliest_dysfunction = all_dysfunction.loc[
        all_dysfunction.groupby('hospitalization_id')['dysfunction_time'].idxmin()
    ][['hospitalization_id', 'dysfunction_time', 'dysfunction_type']]

    # Step 4: Create final ASE dataset (presumed infection AND dysfunction)
    ase_query = """
        SELECT
            p.hospitalization_id,
            1 as ase_flag,
            p.presumed_infection_time,
            d.dysfunction_time as first_organ_dysfunction_time,
            d.dysfunction_type as organ_dysfunction_type
        FROM presumed_infection p
        INNER JOIN earliest_dysfunction d ON p.hospitalization_id = d.hospitalization_id
    """
    ase_df = duckdb.sql(ase_query).df()

    logger.info("Identified %d Adult Sepsis Events", len(ase_df))

    return ase_df
b/docs/sepsis_module.md @@ -0,0 +1,166 @@ +# Sepsis Module Documentation + +## Overview + +The sepsis module implements the CDC Adult Sepsis Event (ASE) criteria for identifying sepsis cases in hospitalized patients. This implementation is based on the CDC surveillance toolkit and follows the methodology used in the linked reference repositories. + +## ASE Criteria + +Adult Sepsis Event requires **BOTH** of the following: + +### A. Presumed Infection +Both conditions must be met: +1. **Blood culture obtained** (irrespective of result) +2. **At least 4 Qualifying Antibiotic Days (QAD)** within -2 to +6 days of blood culture + - OR 1+ QAD if patient died/transferred to hospice/acute care within 6 days + +### B. Organ Dysfunction +At least **ONE** of the following within ±2 days of blood culture: +- **Vasopressor initiation**: Norepinephrine, dopamine, epinephrine, phenylephrine, vasopressin, or angiotensin +- **Invasive mechanical ventilation** (IMV) +- **Lab criteria**: + - **AKI**: Creatinine doubling from baseline + - **Hyperbilirubinemia**: Total bilirubin ≥2.0 mg/dL AND doubled from baseline + - **Thrombocytopenia**: Platelet <100 AND ≥50% decline from baseline (baseline must be ≥100) + - **Elevated lactate**: Lactate ≥2.0 mmol/L (optional criterion) + +## Usage + +### Basic Example + +```python +from clifpy.utils.sepsis import compute_sepsis + +# Compute sepsis flags +sepsis_results = compute_sepsis( + blood_cultures=blood_culture_df, # Blood culture data + antibiotics=antibiotic_df, # Antibiotic administration data + hospitalization=hospitalization_df, # Hospitalization data + labs=labs_df, # Lab results + continuous_meds=continuous_meds_df, # Vasopressor data (optional) + respiratory_support=resp_support_df, # Respiratory support data (optional) + window_days=2, # Window size (default: 2) + include_lactate=True # Include lactate criterion (default: True) +) +``` + +### Input Data Requirements + +#### Required Inputs + +**blood_cultures** (pandas DataFrame): 
+- `hospitalization_id`: Unique hospitalization identifier +- `collect_dttm`: Blood culture collection timestamp +- `fluid_category`: Should be 'blood/buffy coat' + +**antibiotics** (pandas DataFrame): +- `hospitalization_id`: Unique hospitalization identifier +- `admin_dttm`: Antibiotic administration timestamp +- `med_group`: Should be 'CMS_sepsis_qualifying_antibiotics' + +**hospitalization** (pandas DataFrame): +- `hospitalization_id`: Unique hospitalization identifier +- `patient_id`: Patient identifier +- `discharge_dttm`: Discharge timestamp +- `discharge_category`: Discharge disposition + +**labs** (pandas DataFrame): +- `hospitalization_id`: Unique hospitalization identifier +- `lab_category`: Lab type ('creatinine', 'bilirubin_total', 'platelet_count', 'lactate') +- `lab_value_numeric`: Numeric lab value +- `lab_result_dttm`: Lab result timestamp + +#### Optional Inputs + +**continuous_meds** (pandas DataFrame): +- `hospitalization_id`: Unique hospitalization identifier +- `admin_dttm`: Medication administration timestamp +- `med_category`: Medication type (vasoactive medications) +- `med_dose`: Medication dose + +**respiratory_support** (pandas DataFrame): +- `hospitalization_id`: Unique hospitalization identifier +- `recorded_dttm`: Recording timestamp +- `device_category`: Should be 'IMV' + +**patient** (pandas DataFrame): +- `patient_id`: Patient identifier +- `death_dttm`: Death timestamp (for censoring logic) + +### Output + +Returns a pandas DataFrame with the following columns: +- `hospitalization_id`: Unique hospitalization identifier +- `ase_flag`: 1 if sepsis criteria met, 0 otherwise +- `presumed_infection_time`: Timestamp of blood culture +- `first_organ_dysfunction_time`: Timestamp of earliest organ dysfunction +- `organ_dysfunction_type`: Type of first organ dysfunction detected + +## Functions + +### compute_sepsis() + +Main function to compute Adult Sepsis Event flags. 
+ +**Parameters:** +- `blood_cultures` (pd.DataFrame): Blood culture data +- `antibiotics` (pd.DataFrame): Antibiotic administration data +- `hospitalization` (pd.DataFrame): Hospitalization data +- `labs` (pd.DataFrame): Lab results +- `continuous_meds` (Optional[pd.DataFrame]): Vasopressor data +- `respiratory_support` (Optional[pd.DataFrame]): Respiratory support data +- `patient` (Optional[pd.DataFrame]): Patient data for censoring +- `window_days` (int): Days before/after blood culture (default: 2) +- `include_lactate` (bool): Include lactate criterion (default: True) + +**Returns:** +- pd.DataFrame: Sepsis results with ASE flags and metadata + +### Helper Functions + +#### _identify_presumed_infection() +Identifies presumed infections based on blood culture and qualifying antibiotic days. + +#### _identify_organ_dysfunction_vasopressors() +Identifies vasopressor-based organ dysfunction. + +#### _identify_organ_dysfunction_ventilation() +Identifies invasive mechanical ventilation-based organ dysfunction. + +#### _identify_organ_dysfunction_labs() +Identifies lab-based organ dysfunction (AKI, hyperbilirubinemia, thrombocytopenia, lactate). + +## Clinical Interpretation + +### Qualifying Antibiotic Days (QAD) +- Days are calculated relative to blood culture time (day 0) +- Window: -2 to +6 days from blood culture +- Consecutive days are counted to find the longest run +- Must have ≥4 consecutive days, OR +- ≥1 day if patient died/transferred within 6 days + +### Baselines for Lab Criteria +- **Creatinine**: Minimum value across hospitalization +- **Bilirubin**: Minimum value across hospitalization +- **Platelet**: First recorded value (must be ≥100 for criterion) + +### Time Windows +- Blood culture to organ dysfunction: ±2 days (default, configurable) +- Blood culture to antibiotics: -2 to +6 days + +## References + +1. [CDC Sepsis Surveillance Toolkit](https://www.cdc.gov/sepsis/pdfs/sepsis-surveillance-toolkit-mar-2018_508.pdf) +2. 
[ASE Revised Thresholds Repository](https://github.com/dmh0817/ASE_revised_thresholds_wbc_temp) +3. [CLIF Sepsis Repository](https://github.com/Common-Longitudinal-ICU-data-Format/CLIF_sepsis) + +## Example + +See `examples/sepsis_demo.py` for a complete working example with sample data. + +## Notes + +- The lactate criterion is optional because lactate ordering practices may vary across institutions +- The function uses DuckDB for efficient SQL-based computation +- Missing data is handled gracefully - missing lab baselines will exclude those criteria +- All timestamps should be timezone-aware or consistently in the same timezone diff --git a/examples/sepsis_demo.py b/examples/sepsis_demo.py new file mode 100644 index 00000000..c088ed55 --- /dev/null +++ b/examples/sepsis_demo.py @@ -0,0 +1,202 @@ +""" +Example: Computing Adult Sepsis Event (ASE) flags using CDC criteria + +This example demonstrates how to use the compute_sepsis function to identify +sepsis cases in hospitalized patients based on CDC surveillance criteria. + +ASE requires BOTH: +- A. Presumed Infection (blood culture + qualifying antibiotic days) +- B. 
import pandas as pd
from datetime import datetime, timedelta
from clifpy.utils.sepsis import compute_sepsis


def create_sample_data():
    """
    Build a tiny, hand-crafted CLIF-style dataset for three hospitalizations.

    Returns:
        dict mapping the compute_sepsis keyword names ('blood_cultures',
        'antibiotics', 'hospitalization', 'labs', 'continuous_meds',
        'respiratory_support') to pandas DataFrames.
    """
    t0 = datetime(2024, 1, 1, 12, 0, 0)

    # Category labels shared by every row of the respective table
    blood_fluid = 'blood/buffy coat'
    qualifying_group = 'CMS_sepsis_qualifying_antibiotics'

    hosp_ids = ['H001', 'H002', 'H003']
    # Blood culture time of each hospitalization, in hosp_ids order
    culture_times = [t0, t0 + timedelta(hours=6), t0 + timedelta(days=1)]

    blood_cultures = pd.DataFrame({
        'hospitalization_id': hosp_ids,
        'collect_dttm': culture_times,
        'fluid_category': [blood_fluid] * 3
    })

    # Number of antibiotic days per hospitalization:
    #   H001 -> 5 consecutive days (meets QAD >= 4)
    #   H002 -> 2 days only (does not meet QAD)
    #   H003 -> 4 days (meets QAD)
    # Each dose is given 8 hours into the day, counted from the culture time.
    course_lengths = [5, 2, 4]
    antibiotics = pd.DataFrame([
        {
            'hospitalization_id': hosp_id,
            'admin_dttm': culture_time + timedelta(days=day, hours=8),
            'med_group': qualifying_group
        }
        for hosp_id, culture_time, n_days in zip(hosp_ids, culture_times, course_lengths)
        for day in range(n_days)
    ])

    hospitalization = pd.DataFrame({
        'hospitalization_id': hosp_ids,
        'patient_id': ['P001', 'P002', 'P003'],
        'discharge_dttm': [t0 + timedelta(days=length) for length in (10, 5, 12)],
        'discharge_category': ['Home', 'Home', 'Home']
    })

    # Lab results:
    #   H001 -> baseline creatinine 1.0, then 2.5 (doubling = AKI)
    #   H003 -> lactate 3.5 (elevated)
    lab_rows = [
        ('H001', 'creatinine', 1.0, t0 - timedelta(hours=6)),
        ('H001', 'creatinine', 2.5, t0 + timedelta(hours=18)),
        ('H003', 'lactate', 3.5, t0 + timedelta(days=1, hours=6)),
    ]
    labs = pd.DataFrame(
        lab_rows,
        columns=['hospitalization_id', 'lab_category', 'lab_value_numeric', 'lab_result_dttm']
    )

    # H001 receives norepinephrine inside the window
    continuous_meds = pd.DataFrame({
        'hospitalization_id': ['H001'],
        'admin_dttm': [t0 + timedelta(hours=12)],
        'med_category': ['norepinephrine'],
        'med_dose': [0.1]
    })

    # H003 is on invasive mechanical ventilation
    respiratory_support = pd.DataFrame({
        'hospitalization_id': ['H003'],
        'recorded_dttm': [t0 + timedelta(days=1, hours=12)],
        'device_category': ['IMV']
    })

    return {
        'blood_cultures': blood_cultures,
        'antibiotics': antibiotics,
        'hospitalization': hospitalization,
        'labs': labs,
        'continuous_meds': continuous_meds,
        'respiratory_support': respiratory_support
    }


def main():
    """Build the sample data, run compute_sepsis on it and print a report."""
    heavy_rule = "=" * 80
    light_rule = "-" * 80

    print(heavy_rule)
    print("Adult Sepsis Event (ASE) Computation Example")
    print(heavy_rule)
    print()

    print("Creating sample data...")
    data = create_sample_data()

    # (table key, label printed after the row count)
    table_labels = [
        ('blood_cultures', 'hospitalizations with blood cultures'),
        ('antibiotics', 'antibiotic administrations'),
        ('labs', 'lab results'),
        ('continuous_meds', 'vasopressor administrations'),
        ('respiratory_support', 'IMV observations'),
    ]
    for key, label in table_labels:
        print(f" - {len(data[key])} {label}")
    print()

    print("Computing Adult Sepsis Events...")
    sepsis_results = compute_sepsis(
        window_days=2,
        include_lactate=True,
        **data
    )

    print()
    print("Results:")
    print(light_rule)

    if len(sepsis_results) > 0:
        print(f"Identified {len(sepsis_results)} sepsis case(s):")
        print()
        print(sepsis_results.to_string(index=False))
        print()

        # Summary statistics over the returned rows
        n_sepsis = sepsis_results['ase_flag'].sum()
        print(f"\nTotal sepsis cases: {n_sepsis}")

        print("\nOrgan dysfunction types:")
        type_counts = sepsis_results['organ_dysfunction_type'].value_counts()
        for dtype, count in type_counts.items():
            print(f" - {dtype}: {count}")
    else:
        print("No sepsis cases identified")

    print()
    print(heavy_rule)
    print("Interpretation:")
    print(light_rule)

    # One entry per hospitalization; None marks the blank line between entries
    interpretation = [
        "H001 - Expected to have sepsis:",
        " ✓ Presumed infection: Blood culture + 5 days of antibiotics",
        " ✓ Organ dysfunction: AKI (creatinine doubled) + vasopressor",
        None,
        "H002 - Expected to NOT have sepsis:",
        " ✗ Insufficient antibiotics: Only 2 days (needs ≥4)",
        None,
        "H003 - Expected to have sepsis:",
        " ✓ Presumed infection: Blood culture + 4 days of antibiotics",
        " ✓ Organ dysfunction: Elevated lactate + IMV",
    ]
    for line in interpretation:
        if line is None:
            print()
        else:
            print(line)
    print(heavy_rule)


if __name__ == "__main__":
    main()
class TestPresumedInfection:
    """Test presumed infection detection logic.

    The fixtures describe three hospitalizations:

    * HOSP_001 - blood culture plus four consecutive antibiotic days.
    * HOSP_002 - blood culture plus two antibiotic days, discharged as
      'Expired' two days after the culture.
    * HOSP_003 - blood culture only, no antibiotics.
    """

    @pytest.fixture
    def sample_blood_cultures(self):
        """Create sample blood culture data (one culture per hospitalization)."""
        base_time = datetime(2024, 1, 1, 12, 0, 0)
        return pd.DataFrame({
            'hospitalization_id': ['HOSP_001', 'HOSP_002', 'HOSP_003'],
            'collect_dttm': [
                base_time,
                base_time + timedelta(hours=12),
                base_time + timedelta(days=1)
            ],
            'fluid_category': ['blood/buffy coat'] * 3
        })

    @pytest.fixture
    def sample_antibiotics(self):
        """Create sample antibiotic administration data."""
        base_time = datetime(2024, 1, 1, 12, 0, 0)

        # HOSP_001: 4 consecutive days of antibiotics (meets QAD)
        data = [
            {
                'hospitalization_id': 'HOSP_001',
                'admin_dttm': base_time + timedelta(days=day, hours=8),
                'med_group': 'CMS_sepsis_qualifying_antibiotics'
            }
            for day in range(4)
        ]

        # HOSP_002: Only 2 days (does not meet QAD without censoring)
        data.extend(
            {
                'hospitalization_id': 'HOSP_002',
                'admin_dttm': base_time + timedelta(hours=12) + timedelta(days=day, hours=8),
                'med_group': 'CMS_sepsis_qualifying_antibiotics'
            }
            for day in range(2)
        )

        # HOSP_003: No antibiotics

        return pd.DataFrame(data)

    @pytest.fixture
    def sample_hospitalization(self):
        """Create sample hospitalization data."""
        base_time = datetime(2024, 1, 1, 0, 0, 0)
        return pd.DataFrame({
            'hospitalization_id': ['HOSP_001', 'HOSP_002', 'HOSP_003'],
            'patient_id': ['PAT_001', 'PAT_002', 'PAT_003'],
            'discharge_dttm': [
                base_time + timedelta(days=10),
                base_time + timedelta(days=3),
                base_time + timedelta(days=7)
            ],
            'discharge_category': ['Home', 'Expired', 'Home']
        })

    def test_presumed_infection_with_adequate_qad(
        self, sample_blood_cultures, sample_antibiotics, sample_hospitalization
    ):
        """Test presumed infection detection with adequate QAD."""
        result = _identify_presumed_infection(
            blood_cultures=sample_blood_cultures,
            antibiotics=sample_antibiotics,
            hospitalization=sample_hospitalization
        )
        flagged_ids = set(result['hospitalization_id'])

        # HOSP_001 should meet criteria with 4 QAD
        assert 'HOSP_001' in flagged_ids
        # HOSP_003 received no antibiotics at all, so it can never qualify
        assert 'HOSP_003' not in flagged_ids

    def test_presumed_infection_with_censoring(
        self, sample_blood_cultures, sample_antibiotics, sample_hospitalization
    ):
        """Test presumed infection with early death/transfer censoring."""
        result = _identify_presumed_infection(
            blood_cultures=sample_blood_cultures,
            antibiotics=sample_antibiotics,
            hospitalization=sample_hospitalization
        )

        # HOSP_002 has only 2 QAD but dies before day 6, so should qualify.
        # The assertion targets HOSP_002 alone: the previous
        # "... or HOSP_001 qualifies" alternative let this test pass without
        # the censoring rule ever being exercised.
        assert 'HOSP_002' in result['hospitalization_id'].values

    def test_presumed_infection_no_antibiotics(
        self, sample_blood_cultures, sample_hospitalization
    ):
        """Test that hospitalizations without antibiotics don't meet criteria."""
        # Only blood cultures, no antibiotics
        empty_antibiotics = pd.DataFrame(columns=['hospitalization_id', 'admin_dttm', 'med_group'])

        result = _identify_presumed_infection(
            blood_cultures=sample_blood_cultures,
            antibiotics=empty_antibiotics,
            hospitalization=sample_hospitalization
        )

        # With no antibiotic rows, nobody can accumulate a qualifying
        # antibiotic day, so the result must be empty for every hospitalization.
        assert len(result) == 0
class TestOrganDysfunctionVasopressors:
    """Checks for organ dysfunction detected through vasopressor use."""

    @pytest.fixture
    def sample_presumed_infection(self):
        """Two presumed infections, one day apart."""
        first_infection = datetime(2024, 1, 1, 12, 0, 0)
        second_infection = first_infection + timedelta(days=1)
        return pd.DataFrame({
            'hospitalization_id': ['HOSP_001', 'HOSP_002'],
            'presumed_infection_time': [first_infection, second_infection]
        })

    @pytest.fixture
    def sample_vasopressors(self):
        """Three vasopressor administrations across the two hospitalizations."""
        anchor = datetime(2024, 1, 1, 12, 0, 0)
        hosp_001_inside = anchor + timedelta(hours=6)            # within window
        hosp_001_outside = anchor + timedelta(days=3)            # outside window
        hosp_002_inside = anchor + timedelta(days=1, hours=12)   # within window
        return pd.DataFrame({
            'hospitalization_id': ['HOSP_001', 'HOSP_001', 'HOSP_002'],
            'admin_dttm': [hosp_001_inside, hosp_001_outside, hosp_002_inside],
            'med_category': ['norepinephrine', 'epinephrine', 'dopamine'],
            'med_dose': [0.1, 0.05, 5.0]
        })

    def test_vasopressor_within_window(self, sample_vasopressors, sample_presumed_infection):
        """A vasopressor given inside the window is reported for each hospitalization."""
        detected = _identify_organ_dysfunction_vasopressors(
            continuous_meds=sample_vasopressors,
            presumed_infection=sample_presumed_infection,
            window_days=2
        )
        detected_ids = detected['hospitalization_id'].values

        # Each hospitalization has at least one in-window administration.
        for expected_id in ('HOSP_001', 'HOSP_002'):
            assert expected_id in detected_ids

    def test_vasopressor_outside_window(self, sample_presumed_infection):
        """A vasopressor given five days after the infection time is ignored."""
        anchor = datetime(2024, 1, 1, 12, 0, 0)
        late_dose = pd.DataFrame({
            'hospitalization_id': ['HOSP_001'],
            'admin_dttm': [anchor + timedelta(days=5)],  # outside window
            'med_category': ['norepinephrine'],
            'med_dose': [0.1]
        })

        detected = _identify_organ_dysfunction_vasopressors(
            continuous_meds=late_dose,
            presumed_infection=sample_presumed_infection,
            window_days=2
        )

        # The length check runs first so an empty frame is never indexed.
        nothing_detected = len(detected) == 0
        assert nothing_detected or 'HOSP_001' not in detected['hospitalization_id'].values
class TestOrganDysfunctionVentilation:
    """Checks for organ dysfunction detected through mechanical ventilation."""

    @pytest.fixture
    def sample_presumed_infection(self):
        """Two presumed infections, one day apart."""
        first_infection = datetime(2024, 1, 1, 12, 0, 0)
        second_infection = first_infection + timedelta(days=1)
        return pd.DataFrame({
            'hospitalization_id': ['HOSP_001', 'HOSP_002'],
            'presumed_infection_time': [first_infection, second_infection]
        })

    @pytest.fixture
    def sample_respiratory_support(self):
        """One IMV observation per hospitalization, both inside the window."""
        anchor = datetime(2024, 1, 1, 12, 0, 0)
        hosp_001_imv = anchor + timedelta(hours=12)           # within window
        hosp_002_imv = anchor + timedelta(days=1, hours=6)    # within window
        return pd.DataFrame({
            'hospitalization_id': ['HOSP_001', 'HOSP_002'],
            'recorded_dttm': [hosp_001_imv, hosp_002_imv],
            'device_category': ['IMV', 'IMV']
        })

    def test_imv_within_window(self, sample_respiratory_support, sample_presumed_infection):
        """IMV recorded inside the window is reported for each hospitalization."""
        detected = _identify_organ_dysfunction_ventilation(
            respiratory_support=sample_respiratory_support,
            presumed_infection=sample_presumed_infection,
            window_days=2
        )
        detected_ids = detected['hospitalization_id'].values

        # Both hospitalizations have an in-window IMV observation.
        for expected_id in ('HOSP_001', 'HOSP_002'):
            assert expected_id in detected_ids
class TestOrganDysfunctionLabs:
    """Test lab-based organ dysfunction detection.

    Each hospitalization in the fixtures carries exactly one lab pattern
    (baseline value followed 12 hours later by an abnormal value), so each
    test checks one criterion on one hospitalization.
    """

    @pytest.fixture
    def sample_presumed_infection(self):
        """Create sample presumed infection data."""
        base_time = datetime(2024, 1, 1, 12, 0, 0)
        return pd.DataFrame({
            'hospitalization_id': ['HOSP_001', 'HOSP_002', 'HOSP_003'],
            'presumed_infection_time': [
                base_time,
                base_time + timedelta(days=1),
                base_time + timedelta(days=2)
            ]
        })

    @pytest.fixture
    def sample_labs(self):
        """Create sample lab data covering various organ dysfunction criteria."""
        base_time = datetime(2024, 1, 1, 0, 0, 0)

        # (hospitalization_id, lab_category, baseline, follow-up, day offset)
        scenarios = [
            # HOSP_001: Creatinine doubling (AKI)
            ('HOSP_001', 'creatinine', 1.0, 2.5, 0),
            # HOSP_002: Bilirubin >=2.0 and doubled
            ('HOSP_002', 'bilirubin_total', 1.0, 2.5, 1),
            # HOSP_003: Platelet <100 with >=50% decline
            ('HOSP_003', 'platelet_count', 200.0, 80.0, 2),
        ]

        data = []
        for hospitalization_id, lab_category, baseline, followup, day in scenarios:
            first_result = base_time + timedelta(days=day)
            second_result = first_result + timedelta(hours=12)
            for value, result_time in ((baseline, first_result), (followup, second_result)):
                data.append({
                    'hospitalization_id': hospitalization_id,
                    'lab_category': lab_category,
                    'lab_value_numeric': value,
                    'lab_result_dttm': result_time
                })

        return pd.DataFrame(data)

    @staticmethod
    def _detect(labs, presumed_infection):
        """Run lab-based detection with the settings shared by every test here."""
        return _identify_organ_dysfunction_labs(
            labs=labs,
            presumed_infection=presumed_infection,
            window_days=2,
            include_lactate=False
        )

    @staticmethod
    def _first_row_for(result, hospitalization_id):
        """Return the first result row for a hospitalization, failing if there is none.

        The explicit failure matters: guarding the assertion with
        ``if len(rows) > 0`` would let a test pass when nothing was detected.
        """
        rows = result[result['hospitalization_id'] == hospitalization_id]
        assert len(rows) > 0, f"no lab organ dysfunction row returned for {hospitalization_id}"
        return rows.iloc[0]

    def test_lab_aki_detection(self, sample_labs, sample_presumed_infection):
        """Test AKI detection based on creatinine doubling."""
        result = self._detect(sample_labs, sample_presumed_infection)

        # HOSP_001 should have AKI
        hosp_001 = self._first_row_for(result, 'HOSP_001')
        assert pd.notna(hosp_001.get('aki_time'))

    def test_lab_hyperbilirubinemia_detection(self, sample_labs, sample_presumed_infection):
        """Test hyperbilirubinemia detection."""
        result = self._detect(sample_labs, sample_presumed_infection)

        # HOSP_002 should have hyperbilirubinemia
        hosp_002 = self._first_row_for(result, 'HOSP_002')
        assert pd.notna(hosp_002.get('hyperbilirubinemia_time'))

    def test_lab_thrombocytopenia_detection(self, sample_labs, sample_presumed_infection):
        """Test thrombocytopenia detection."""
        result = self._detect(sample_labs, sample_presumed_infection)

        # HOSP_003 should have thrombocytopenia
        hosp_003 = self._first_row_for(result, 'HOSP_003')
        assert pd.notna(hosp_003.get('thrombocytopenia_time'))
class TestComputeSepsis:
    """Test complete sepsis computation.

    Assertions here check the ``ase_flag`` value reported per hospitalization.
    Negative cases accept either "no row returned" or "row with ase_flag 0",
    because both mean that no Adult Sepsis Event was identified.
    """

    @staticmethod
    def _antibiotic_records(hospitalization_id, first_dose, n_days):
        """Return one qualifying-antibiotic record per day, starting at ``first_dose``."""
        return [
            {
                'hospitalization_id': hospitalization_id,
                'admin_dttm': first_dose + timedelta(days=day),
                'med_group': 'CMS_sepsis_qualifying_antibiotics'
            }
            for day in range(n_days)
        ]

    @staticmethod
    def _single_blood_culture(base_time):
        """Return a one-row blood culture table for HOSP_001 collected at ``base_time``."""
        return pd.DataFrame({
            'hospitalization_id': ['HOSP_001'],
            'collect_dttm': [base_time],
            'fluid_category': ['blood/buffy coat']
        })

    @staticmethod
    def _single_hospitalization(base_time):
        """Return a one-row hospitalization table for HOSP_001, discharged home on day 10."""
        return pd.DataFrame({
            'hospitalization_id': ['HOSP_001'],
            'patient_id': ['PAT_001'],
            'discharge_dttm': [base_time + timedelta(days=10)],
            'discharge_category': ['Home']
        })

    @staticmethod
    def _ase_flags(result, hospitalization_id):
        """Return the ``ase_flag`` values reported for one hospitalization.

        Returns an empty list when the result has no rows at all or no row
        for that hospitalization.
        """
        if len(result) == 0:
            return []
        rows = result[result['hospitalization_id'] == hospitalization_id]
        if len(rows) == 0:
            return []
        return rows['ase_flag'].tolist()

    @pytest.fixture
    def complete_sepsis_data(self):
        """Create complete dataset for sepsis testing."""
        base_time = datetime(2024, 1, 1, 12, 0, 0)

        # Blood cultures
        blood_cultures = pd.DataFrame({
            'hospitalization_id': ['HOSP_SEPSIS', 'HOSP_NO_SEPSIS'],
            'collect_dttm': [base_time, base_time + timedelta(days=1)],
            'fluid_category': ['blood/buffy coat', 'blood/buffy coat']
        })

        # Antibiotics - HOSP_SEPSIS gets 4 days, HOSP_NO_SEPSIS gets only 2 days
        antibiotics = pd.DataFrame(
            self._antibiotic_records('HOSP_SEPSIS', base_time + timedelta(hours=8), 4)
            + self._antibiotic_records(
                'HOSP_NO_SEPSIS', base_time + timedelta(days=1) + timedelta(hours=8), 2
            )
        )

        # Hospitalization
        hospitalization = pd.DataFrame({
            'hospitalization_id': ['HOSP_SEPSIS', 'HOSP_NO_SEPSIS'],
            'patient_id': ['PAT_001', 'PAT_002'],
            'discharge_dttm': [
                base_time + timedelta(days=10),
                base_time + timedelta(days=10)
            ],
            'discharge_category': ['Home', 'Home']
        })

        # Labs - HOSP_SEPSIS gets AKI
        labs = pd.DataFrame({
            'hospitalization_id': ['HOSP_SEPSIS', 'HOSP_SEPSIS'],
            'lab_category': ['creatinine', 'creatinine'],
            'lab_value_numeric': [1.0, 2.5],
            'lab_result_dttm': [base_time, base_time + timedelta(hours=12)]
        })

        # Vasopressors - HOSP_SEPSIS gets norepinephrine
        continuous_meds = pd.DataFrame({
            'hospitalization_id': ['HOSP_SEPSIS'],
            'admin_dttm': [base_time + timedelta(hours=6)],
            'med_category': ['norepinephrine'],
            'med_dose': [0.1]
        })

        return {
            'blood_cultures': blood_cultures,
            'antibiotics': antibiotics,
            'hospitalization': hospitalization,
            'labs': labs,
            'continuous_meds': continuous_meds
        }

    def test_compute_sepsis_complete_case(self, complete_sepsis_data):
        """Test complete sepsis computation with positive case."""
        result = compute_sepsis(
            blood_cultures=complete_sepsis_data['blood_cultures'],
            antibiotics=complete_sepsis_data['antibiotics'],
            hospitalization=complete_sepsis_data['hospitalization'],
            labs=complete_sepsis_data['labs'],
            continuous_meds=complete_sepsis_data['continuous_meds']
        )

        assert 'hospitalization_id' in result.columns

        # HOSP_SEPSIS has 4 antibiotic days plus an in-window vasopressor and
        # a creatinine rise, so it must be reported and flagged.
        sepsis_flags = self._ase_flags(result, 'HOSP_SEPSIS')
        assert sepsis_flags, "HOSP_SEPSIS missing from result"
        assert all(flag == 1 for flag in sepsis_flags)

        # HOSP_NO_SEPSIS has only 2 antibiotic days and no organ dysfunction
        # data, so it must never be flagged.
        assert 1 not in self._ase_flags(result, 'HOSP_NO_SEPSIS')

    def test_compute_sepsis_no_presumed_infection(self):
        """Test sepsis computation with no presumed infections."""
        base_time = datetime(2024, 1, 1, 12, 0, 0)

        # Blood culture with a single antibiotic dose (insufficient)
        antibiotics = pd.DataFrame(self._antibiotic_records('HOSP_001', base_time, 1))

        labs = pd.DataFrame({
            'hospitalization_id': ['HOSP_001'],
            'lab_category': ['creatinine'],
            'lab_value_numeric': [1.0],
            'lab_result_dttm': [base_time]
        })

        result = compute_sepsis(
            blood_cultures=self._single_blood_culture(base_time),
            antibiotics=antibiotics,
            hospitalization=self._single_hospitalization(base_time),
            labs=labs
        )

        # Should return empty or no sepsis cases
        assert 1 not in self._ase_flags(result, 'HOSP_001')

    def test_compute_sepsis_presumed_infection_no_organ_dysfunction(self):
        """Test case with presumed infection but no organ dysfunction."""
        base_time = datetime(2024, 1, 1, 12, 0, 0)

        # 4 days of antibiotics
        antibiotics = pd.DataFrame(
            self._antibiotic_records('HOSP_001', base_time + timedelta(hours=8), 4)
        )

        # Normal labs - no organ dysfunction
        labs = pd.DataFrame({
            'hospitalization_id': ['HOSP_001'],
            'lab_category': ['creatinine'],
            'lab_value_numeric': [1.0],
            'lab_result_dttm': [base_time]
        })

        result = compute_sepsis(
            blood_cultures=self._single_blood_culture(base_time),
            antibiotics=antibiotics,
            hospitalization=self._single_hospitalization(base_time),
            labs=labs
        )

        # Should have presumed infection but no ASE flag
        assert 1 not in self._ase_flags(result, 'HOSP_001')

    def test_compute_sepsis_with_lactate(self):
        """Test sepsis computation including lactate criterion."""
        base_time = datetime(2024, 1, 1, 12, 0, 0)

        # 4 days of antibiotics
        antibiotics = pd.DataFrame(
            self._antibiotic_records('HOSP_001', base_time + timedelta(hours=8), 4)
        )

        # Elevated lactate
        labs = pd.DataFrame({
            'hospitalization_id': ['HOSP_001'],
            'lab_category': ['lactate'],
            'lab_value_numeric': [3.0],  # >=2.0
            'lab_result_dttm': [base_time + timedelta(hours=6)]
        })

        result = compute_sepsis(
            blood_cultures=self._single_blood_culture(base_time),
            antibiotics=antibiotics,
            hospitalization=self._single_hospitalization(base_time),
            labs=labs,
            include_lactate=True
        )

        # Lactate is the only organ dysfunction signal in this data, so a
        # flagged row shows the lactate criterion was applied.
        lactate_flags = self._ase_flags(result, 'HOSP_001')
        assert lactate_flags, "HOSP_001 missing from result"
        assert all(flag == 1 for flag in lactate_flags)

    def test_compute_sepsis_result_structure(self, complete_sepsis_data):
        """Test that result has expected structure."""
        result = compute_sepsis(
            blood_cultures=complete_sepsis_data['blood_cultures'],
            antibiotics=complete_sepsis_data['antibiotics'],
            hospitalization=complete_sepsis_data['hospitalization'],
            labs=complete_sepsis_data['labs'],
            continuous_meds=complete_sepsis_data['continuous_meds']
        )

        # The fixture contains a positive case, so the structure is checked
        # on a non-empty result instead of being skipped when it is empty.
        assert isinstance(result, pd.DataFrame)
        assert len(result) > 0
        assert 'hospitalization_id' in result.columns
        assert 'ase_flag' in result.columns
        # May also have presumed_infection_time, etc.