diff --git a/src/extension/pythonTestingPipeline/scripts/pipeline/agents.py b/src/extension/pythonTestingPipeline/scripts/pipeline/agents.py index 77583a6..f94549d 100644 --- a/src/extension/pythonTestingPipeline/scripts/pipeline/agents.py +++ b/src/extension/pythonTestingPipeline/scripts/pipeline/agents.py @@ -244,6 +244,8 @@ def run( 4. IMPORT source modules directly for coverage (add project root to sys.path first) 5. Use mocking for side effects (network, file I/O) 6. Use proc.terminate() instead of signal.SIGINT for stopping processes +7. Avoid top-level imports of source modules when they trigger optional dependencies or side effects; import lazily inside tests after patching +8. Do not leave global state dirty; avoid raw os.chdir when possible and restore cwd/env if you must change them Generate a complete, executable PyTest file.""" @@ -309,12 +311,15 @@ def improve_tests( coverage_percentage: float, uncovered_areas: str, syntax_errors: str = "", + validation_errors: str = "", security_issues: List[SecurityIssue] = None, ) -> Tuple[str, Path]: """Generates additional tests to improve coverage and address security issues.""" reasons = [] if coverage_percentage < 90.0: reasons.append(f"coverage ({coverage_percentage:.1f}%) below 90%") + if validation_errors: + reasons.append("semantic validation failed") if security_issues: severe = [ si for si in security_issues if si.severity in ("critical", "high") @@ -353,6 +358,19 @@ def improve_tests( - Ensure all strings are properly closed - Ensure all parentheses, brackets, and braces are balanced - Make sure indentation is consistent (use 4 spaces) +""" + + validation_context = "" + if validation_errors: + validation_context = f"""\n\nCRITICAL: The previous test file failed semantic validation and must be fixed: +{validation_errors} + +Common issues to avoid: +- The suite must pass `python -m py_compile` +- The suite must be collectable with `pytest --collect-only` +- Avoid top-level imports of source modules with optional dependencies or side effects +- Use lazy imports after patching optional dependencies +- Do not leave cwd, env vars, or other process-global state dirty """ # Build security context if there are security issues @@ -397,7 +415,7 @@ def improve_tests( - NEVER use `signal.SIGINT` to stop processes (not supported on Windows) - Use `proc.terminate()` or `proc.kill()` to stop subprocesses - For keyboard interrupt tests, mock the behavior instead of sending real signals -{error_context}{security_context} +{error_context}{validation_context}{security_context} Existing tests (may have errors - fix them): {existing_tests[:1500]} @@ -414,6 +432,8 @@ def improve_tests( 4. Each test function must start with 'test_' 5. Use mocking for side effects (network, file I/O) 6. Use proc.terminate() instead of signal.SIGINT for stopping processes +7. Avoid top-level imports of source modules when they trigger optional dependencies or side effects; import lazily inside tests after patching +8. Do not leave global state dirty; avoid raw os.chdir when possible and restore cwd/env if you must change them Generate a complete, executable PyTest file that: 1. Fixes any existing syntax errors diff --git a/src/extension/pythonTestingPipeline/scripts/pipeline/governance.py b/src/extension/pythonTestingPipeline/scripts/pipeline/governance.py index 01ab107..3a8d55b 100644 --- a/src/extension/pythonTestingPipeline/scripts/pipeline/governance.py +++ b/src/extension/pythonTestingPipeline/scripts/pipeline/governance.py @@ -124,7 +124,7 @@ def get_audit_trail(self) -> dict: failure_breakdown[key] = failure_breakdown.get(key, 0) + 1 return { - "governance_version": "1.1", + "governance_version": "1.2", "pipeline_start": time.strftime( "%Y-%m-%d %H:%M:%S", time.localtime(self._start) ), @@ -141,7 +141,7 @@ def get_audit_trail(self) -> dict: "failed_validations": failed, "total_failures": len(self.failures), "failure_breakdown": failure_breakdown, - "status": "PASS" if failed == 0 else "REVIEW_NEEDED", + "status": "PASS" if failed == 0 and not self.failures else "REVIEW_NEEDED", }, } diff --git a/src/extension/pythonTestingPipeline/scripts/pipeline/test_runner.py b/src/extension/pythonTestingPipeline/scripts/pipeline/test_runner.py index c35e829..293fa40 100644 --- a/src/extension/pythonTestingPipeline/scripts/pipeline/test_runner.py +++ b/src/extension/pythonTestingPipeline/scripts/pipeline/test_runner.py @@ -1,10 +1,12 @@ -"""Test execution and dependency management for the Python Testing Pipeline.""" +"""Test execution and dependency management for the Python Testing Pipeline.""" -import importlib.metadata +import ast import json import re +import shutil import subprocess import sys +import time from pathlib import Path from typing import Dict, List, Optional, Tuple @@ -20,15 +22,474 @@ "run_tests", "parse_pytest_output", "parse_coverage_json", + "validate_generated_test_file", ] +_STDLIB_MODULES = set(getattr(sys, "stdlib_module_names", ())) | {"__future__"} +_PACKAGE_TO_IMPORT_CANDIDATES = { + "beautifulsoup4": ["bs4"], + "opencv-python": ["cv2"], + "pillow": ["PIL"], + "pyinstaller": ["PyInstaller"], + "pytest-asyncio": ["pytest_asyncio"], + "pytest-cov": ["pytest_cov"], + "pytest-timeout": ["pytest_timeout"], + "python-dotenv": ["dotenv"], + "pyyaml": ["yaml"], + "scikit-learn": ["sklearn"], +} +_IMPORT_TO_PACKAGE = { + "bs4": "beautifulsoup4", + "cv2": "opencv-python", + "dotenv": "python-dotenv", + "pil": "Pillow", + "pyinstaller": "PyInstaller", + "pytest_asyncio": "pytest-asyncio", + "pytest_cov": "pytest-cov", + "pytest_timeout": "pytest-timeout", + "sklearn": "scikit-learn", + "yaml": "PyYAML", +} + + +def _build_pytest_target(test_file: Path, run_cwd: Path) -> str: + """Return a pytest target path that is valid from the chosen working dir.""" + test_file = test_file.resolve() + run_cwd = run_cwd.resolve() + + try: + return str(test_file.relative_to(run_cwd)) + except ValueError: + return str(test_file) + + +def _build_coverage_json_path(codebase_path: Path) -> Path: + """Create a unique coverage output path for a single pytest run.""" + return codebase_path / f"coverage.{time.time_ns()}.json" + + +def _strip_version_spec(package: str) -> str: + """Normalize a requirement string down to its package name.""" + return ( + package.split("==")[0] + .split(">=")[0] + .split("<=")[0] + .split(">")[0] + .split("<")[0] + .strip() + ) + + +def _dedupe_preserve_order(values: List[str]) -> List[str]: + """Return values with duplicates removed while keeping the original order.""" + seen = set() + ordered = [] + for value in values: + if value and value not in seen: + seen.add(value) + ordered.append(value) + return ordered + + +def _build_codebase_modules(codebase_path: Path) -> set[str]: + """Collect importable root module names from the codebase.""" + modules = set() + if not codebase_path.exists(): + return modules + for py_file in codebase_path.rglob("*.py"): + parts = {part.lower() for part in py_file.parts} + if "__pycache__" in parts or "tests" in parts: + continue + modules.add(py_file.stem) + return modules + + +def _module_path_for_name(module_name: str, codebase_path: Path) -> Optional[Path]: + """Resolve a simple module name to a file inside the codebase.""" + direct = codebase_path / f"{module_name}.py" + if direct.exists(): + return direct + package_init = codebase_path / module_name / "__init__.py" + if package_init.exists(): + return package_init + return None + + +def _package_to_import_candidates(package: str) -> List[str]: + """Map a package name to the most likely import roots.""" + stripped = _strip_version_spec(package) + normalized = stripped.lower().replace("_", "-") + candidates = list(_PACKAGE_TO_IMPORT_CANDIDATES.get(normalized, ())) + candidates.extend([stripped, normalized.replace("-", "_")]) + return _dedupe_preserve_order(candidates) + + +def _package_for_import(module_name: str) -> str: + """Map an import root back to a likely installable package name.""" + normalized = module_name.lower().replace("_", "-") + return _IMPORT_TO_PACKAGE.get(normalized, module_name) + + +def _extract_import_roots_from_code(test_code: str) -> List[str]: + """Extract imported root modules from generated test code.""" + try: + tree = ast.parse(test_code) + except SyntaxError: + return [] + + modules: List[str] = [] + for node in ast.walk(tree): + if isinstance(node, ast.Import): + modules.extend(alias.name.split(".")[0] for alias in node.names) + elif isinstance(node, ast.ImportFrom) and node.module: + modules.append(node.module.split(".")[0]) + return _dedupe_preserve_order(modules) + + +def _probe_module_import( + module_name: str, + codebase_path: Path, + cwd: Path, + timeout: int = 30, +) -> Tuple[bool, str, Optional[str]]: + """Attempt to import a module in an isolated subprocess.""" + probe = f""" +import importlib +import sys +import traceback + +sys.path.insert(0, {json.dumps(str(codebase_path))}) + +try: + importlib.import_module({json.dumps(module_name)}) +except Exception: + traceback.print_exc() + raise +""" + result = subprocess.run( + [sys.executable, "-c", probe], + capture_output=True, + text=True, + timeout=timeout, + cwd=cwd, + ) + output = (result.stdout + "\n" + result.stderr).strip() + missing_module = None + match = re.search(r"No module named ['\"]([^'\"]+)['\"]", output) + if match: + missing_module = match.group(1).split(".")[0] + return result.returncode == 0, output, missing_module + + +def _is_main_guard(node: ast.If) -> bool: + """Check whether an if-statement is `if __name__ == "__main__":`.""" + test = node.test + return ( + isinstance(test, ast.Compare) + and isinstance(test.left, ast.Name) + and test.left.id == "__name__" + and len(test.ops) == 1 + and isinstance(test.ops[0], ast.Eq) + and len(test.comparators) == 1 + and isinstance(test.comparators[0], ast.Constant) + and test.comparators[0].value == "__main__" + ) + + +def _node_has_import_side_effect(node: ast.stmt) -> bool: + """Detect obvious module-level side effects in an imported source module.""" + if isinstance(node, ast.Expr): + return not ( + isinstance(node.value, ast.Constant) and isinstance(node.value.value, str) + ) + if isinstance(node, (ast.Assign, ast.AnnAssign, ast.AugAssign)): + value = getattr(node, "value", None) + return value is not None and any( + isinstance(child, ast.Call) for child in ast.walk(value) + ) + if isinstance(node, ast.If): + return not _is_main_guard(node) + if isinstance( + node, + ( + ast.Import, + ast.ImportFrom, + ast.FunctionDef, + ast.AsyncFunctionDef, + ast.ClassDef, + ast.Pass, + ), + ): + return False + return True + + +def _analyze_top_level_import_risk(module_name: str, codebase_path: Path) -> List[str]: + """Inspect a codebase module for import-time dependency or side-effect risks.""" + module_path = _module_path_for_name(module_name, codebase_path) + if module_path is None: + return [] + + try: + tree = ast.parse(module_path.read_text(encoding="utf-8", errors="ignore")) + except SyntaxError: + return [ + f"Top-level import of '{module_name}' is unsafe because {module_path.name} does not parse cleanly" + ] + + codebase_modules = _build_codebase_modules(codebase_path) + reasons = [] + external_imports = [] + for node in tree.body: + if _node_has_import_side_effect(node): + reasons.append( + f"Top-level import of '{module_name}' is unsafe because {module_path.name} executes module-level calls or state changes on import" + ) + break + + if isinstance(node, ast.Import): + external_imports.extend(alias.name.split(".")[0] for alias in node.names) + elif isinstance(node, ast.ImportFrom) and node.module: + external_imports.append(node.module.split(".")[0]) + + for dependency in _dedupe_preserve_order(external_imports): + if dependency in _STDLIB_MODULES or dependency in codebase_modules: + continue + success, _, missing_module = _probe_module_import( + dependency, codebase_path, codebase_path + ) + if not success: + missing_name = missing_module or dependency + reasons.append( + f"Top-level import of '{module_name}' is unsafe because dependency '{missing_name}' is not importable" + ) + + return reasons + + +def _find_unsafe_top_level_imports(test_file: Path, codebase_path: Path) -> List[str]: + """Reject generated suites that import risky source modules at file import time.""" + try: + tree = ast.parse(test_file.read_text(encoding="utf-8", errors="ignore")) + except SyntaxError: + return [] + + codebase_modules = _build_codebase_modules(codebase_path) + risky_modules: List[str] = [] + for node in tree.body: + if isinstance(node, ast.Import): + risky_modules.extend( + alias.name.split(".")[0] + for alias in node.names + if alias.name.split(".")[0] in codebase_modules + ) + elif isinstance(node, ast.ImportFrom) and node.module: + root = node.module.split(".")[0] + if root in codebase_modules: + risky_modules.append(root) + + issues = [] + for module_name in _dedupe_preserve_order(risky_modules): + issues.extend(_analyze_top_level_import_risk(module_name, codebase_path)) + return issues + + +def _discover_missing_dependencies( + packages: List[str], + cwd: Path, + test_code: str = "", + project_root: Optional[Path] = None, +) -> Tuple[List[str], List[str]]: + """Probe actual imports to determine which dependencies are still missing.""" + root = (project_root or cwd).resolve() + modules_to_probe: List[str] = [] + for package in packages: + modules_to_probe.extend(_package_to_import_candidates(package)) + if test_code: + modules_to_probe.extend(_extract_import_roots_from_code(test_code)) + modules_to_probe = _dedupe_preserve_order(modules_to_probe) + + codebase_modules = _build_codebase_modules(root) + missing_packages: List[str] = [] + diagnostics: List[str] = [] + + for module_name in modules_to_probe: + root_name = module_name.split(".")[0] + if root_name in _STDLIB_MODULES: + continue + + success, output, missing_module = _probe_module_import(module_name, root, cwd) + if success: + continue + + if root_name in codebase_modules: + if ( + missing_module + and missing_module not in _STDLIB_MODULES + and missing_module not in codebase_modules + ): + missing_packages.append(_package_for_import(missing_module)) + diagnostics.append( + f"Import probe for '{root_name}' failed because '{missing_module}' is not importable" + ) + else: + last_line = output.splitlines()[-1] if output else "unknown import error" + diagnostics.append( + f"Import probe for '{root_name}' failed: {last_line}" + ) + continue + + missing_packages.append(_package_for_import(root_name)) + if ( + missing_module + and missing_module not in _STDLIB_MODULES + and missing_module not in codebase_modules + ): + missing_packages.append(_package_for_import(missing_module)) + diagnostics.append(f"Module '{module_name}' is not importable") + + filtered_missing = [] + for package in _dedupe_preserve_order(missing_packages): + normalized = _strip_version_spec(package).lower().replace("_", "-") + if normalized in _STDLIB_MODULES: + continue + filtered_missing.append(package) + return filtered_missing, diagnostics + + +def _parse_collected_test_count(output: str) -> Optional[int]: + """Extract collected-test count from pytest --collect-only output.""" + patterns = ( + r"(\d+)\s+tests?\s+collected", + r"collected\s+(\d+)\s+items?", + ) + for pattern in patterns: + match = re.search(pattern, output, re.IGNORECASE) + if match: + return int(match.group(1)) + if "no tests collected" in output.lower(): + return 0 + return None + + +def validate_generated_test_file(test_file: Path, codebase_path: Path) -> Dict[str, object]: + """ + Run fast semantic checks before the full pytest+coverage execution. + + This keeps clearly invalid suites out of the main loop and gives the + implementation agent focused repair feedback. + """ + codebase_path = codebase_path.resolve() + test_file = test_file.resolve() + run_cwd = codebase_path + + if not test_file.exists(): + return { + "passed": False, + "stage": "missing_file", + "message": f"Generated test file does not exist: {test_file}", + "output": "", + "collected_tests": 0, + } + + try: + compile_result = subprocess.run( + [sys.executable, "-m", "py_compile", str(test_file)], + capture_output=True, + text=True, + timeout=30, + cwd=run_cwd, + ) + except subprocess.TimeoutExpired: + return { + "passed": False, + "stage": "py_compile", + "message": "py_compile timed out while validating generated tests", + "output": "", + "collected_tests": 0, + } + + compile_output = compile_result.stdout + "\n" + compile_result.stderr + if compile_result.returncode != 0: + return { + "passed": False, + "stage": "py_compile", + "message": "py_compile failed for generated tests", + "output": compile_output.strip(), + "collected_tests": 0, + } + + unsafe_top_level_imports = _find_unsafe_top_level_imports(test_file, codebase_path) + if unsafe_top_level_imports: + return { + "passed": False, + "stage": "top_level_imports", + "message": "Generated tests import risky source modules at file import time", + "output": "\n".join(unsafe_top_level_imports), + "collected_tests": 0, + } + + pytest_target = _build_pytest_target(test_file, run_cwd) + try: + collect_result = subprocess.run( + [ + sys.executable, + "-m", + "pytest", + pytest_target, + "--collect-only", + "-q", + ], + capture_output=True, + text=True, + timeout=60, + cwd=run_cwd, + ) + except subprocess.TimeoutExpired: + return { + "passed": False, + "stage": "collect_only", + "message": "pytest --collect-only timed out while validating generated tests", + "output": "", + "collected_tests": 0, + } + + collect_output = (collect_result.stdout + "\n" + collect_result.stderr).strip() + collected_tests = _parse_collected_test_count(collect_output) + + if collect_result.returncode != 0: + return { + "passed": False, + "stage": "collect_only", + "message": "pytest --collect-only failed for generated tests", + "output": collect_output, + "collected_tests": collected_tests or 0, + } + + if collected_tests == 0: + return { + "passed": False, + "stage": "collect_only", + "message": "pytest collected 0 tests from the generated suite", + "output": collect_output, + "collected_tests": 0, + } + + return { + "passed": True, + "stage": "collect_only", + "message": f"Semantic validation passed; collected {collected_tests or 'unknown'} test(s)", + "output": collect_output, + "collected_tests": collected_tests, + } + def analyze_dependencies_with_llm(test_code: str) -> Optional[List[str]]: """ Uses LLM to analyze test code and determine exact PyPI packages. Returns None if analysis fails, triggering fallback to regex. """ - print(" 🤖 Asking LLM to identify dependencies...") + print(" 🤖 Asking LLM to identify dependencies...") try: llm_client = create_llm_client(use_mock_on_failure=True) @@ -57,7 +518,7 @@ def analyze_dependencies_with_llm(test_code: str) -> Optional[List[str]]: return packages except Exception as e: - print(f" ⚠️ LLM dependency analysis failed: {e}") + print(f" ⚠️ LLM dependency analysis failed: {e}") return None @@ -68,11 +529,11 @@ def extract_dependencies(test_code: str) -> List[str]: # 1. Try LLM first llm_packages = analyze_dependencies_with_llm(test_code) if llm_packages is not None: - print(f" ✨ LLM identified packages: {', '.join(llm_packages)}") + print(f" ✨ LLM identified packages: {', '.join(llm_packages)}") return llm_packages # 2. Fallback to regex if LLM fails - print(" ⚠️ Falling back to regex dependency extraction...") + print(" ⚠️ Falling back to regex dependency extraction...") # Common import to package name mappings import_to_package = { @@ -115,12 +576,16 @@ def extract_dependencies(test_code: str) -> List[str]: return list(imports) -def install_dependencies_with_retry(packages: List[str], cwd: Path) -> Tuple[str, int]: - """Installs packages with LLM-guided retry logic on failure.""" - +def install_dependencies_with_retry( + packages: List[str], + cwd: Path, + test_code: str = "", + project_root: Optional[Path] = None, +) -> Tuple[str, int]: + """Install only dependencies that are still not importable.""" attempt = 0 max_retries = 3 - current_packages = packages.copy() + current_packages = _dedupe_preserve_order(packages) last_output = "" last_return_code = 0 @@ -128,108 +593,109 @@ def install_dependencies_with_retry(packages: List[str], cwd: Path) -> Tuple[str if not current_packages: return "No packages to install", 0 - # Check what's missing - missing_packages = [] - installed_dists = set() - for d in importlib.metadata.distributions(): - name = d.metadata.get("Name") - if name: - installed_dists.add(name.lower().replace("_", "-")) - - for package in current_packages: - # Normalize package name - pkg_name = ( - package.split("==")[0] - .split(">=")[0] - .split("<=")[0] - .split(">")[0] - .split("<")[0] - .strip() - .lower() - .replace("_", "-") - ) - - if pkg_name not in installed_dists: - missing_packages.append(package) - + missing_packages, diagnostics = _discover_missing_dependencies( + current_packages, + cwd, + test_code=test_code, + project_root=project_root, + ) if not missing_packages: + if diagnostics: + return "\n".join(diagnostics), 1 print( - f"\n✅ All dependencies already installed: {', '.join(current_packages)}" + f"\nAll dependencies already importable: {', '.join(current_packages)}" ) - return "All dependencies already installed", 0 + return "All dependencies already importable", 0 print( - f"\n📦 Installing dependencies (Attempt {attempt + 1}/{max_retries + 1}): {', '.join(missing_packages)}" + f"\nInstalling dependencies (Attempt {attempt + 1}/{max_retries + 1}): {', '.join(missing_packages)}" ) - cmd = [sys.executable, "-m", "pip", "install", "--quiet"] + missing_packages - try: result = subprocess.run( - cmd, capture_output=True, text=True, timeout=120, cwd=cwd + [sys.executable, "-m", "pip", "install", "--quiet", *missing_packages], + capture_output=True, + text=True, + timeout=120, + cwd=cwd, ) - last_output = result.stdout + "\n" + result.stderr - last_return_code = result.returncode - - if result.returncode == 0: - print(" ✅ Dependencies installed successfully") - return last_output, 0 - - # Installation failed - print(f" ❌ Installation failed: {result.stderr.strip()}") + except subprocess.TimeoutExpired: + return "Dependency installation timed out", 1 + except Exception as e: + return f"Error installing dependencies: {e}", 1 - if attempt < max_retries: - print(" 🤔 Asking LLM for a fix...") - llm_client = create_llm_client(use_mock_on_failure=True) + last_output = (result.stdout + "\n" + result.stderr).strip() + last_return_code = result.returncode - user_prompt = f"""Dependency installation failed. + if result.returncode == 0: + remaining_packages, remaining_diagnostics = _discover_missing_dependencies( + current_packages, + cwd, + test_code=test_code, + project_root=project_root, + ) + if not remaining_packages: + print(" Dependencies installed successfully") + return last_output, 0 + current_packages = remaining_packages + last_output = "\n".join( + part + for part in [last_output, "\n".join(remaining_diagnostics)] + if part + ) + last_return_code = 1 + print(" Installation completed, but some imports are still unavailable") + attempt += 1 + continue - Packages attempted: {missing_packages} + print(f" Installation failed: {result.stderr.strip()}") + if attempt >= max_retries: + break - Error message: - {result.stderr} + llm_client = create_llm_client(use_mock_on_failure=True) + user_prompt = f"""Dependency installation failed. - Suggest a fix.""" +Packages attempted: {missing_packages} - response, _ = llm_client.call(DEPENDENCY_FIX_SYSTEM_PROMPT, user_prompt) +Error message: +{result.stderr} - # Parse fix - if "```" in response: - json_match = re.search(r"```(?:json)?\s*([\s\S]*?)```", response) - if json_match: - response = json_match.group(1).strip() +Suggest a fix.""" - try: - data = json.loads(response) - new_packages = data.get("packages", []) - reason = data.get("reason", "No reason provided") + response, _ = llm_client.call(DEPENDENCY_FIX_SYSTEM_PROMPT, user_prompt) + if "```" in response: + json_match = re.search(r"```(?:json)?\s*([\s\S]*?)```", response) + if json_match: + response = json_match.group(1).strip() - if new_packages: - print(f" 💡 LLM Suggestion: {reason}") - print(f" 🔄 Retrying with: {', '.join(new_packages)}") - # Replace failed packages with suggested ones in our list - # For simplicity, we just use the new list for the next attempt - current_packages = new_packages - else: - print(" ⚠️ LLM could not suggest a fix.") - break - except json.JSONDecodeError: - print(" ⚠️ Failed to parse LLM suggestion.") - break + try: + data = json.loads(response) + except json.JSONDecodeError: + break - attempt += 1 + new_packages = _dedupe_preserve_order(data.get("packages", [])) + if not new_packages: + break - except subprocess.TimeoutExpired: - return "Dependency installation timed out", 1 - except Exception as e: - return f"Error installing dependencies: {e}", 1 + current_packages = new_packages + attempt += 1 return last_output, last_return_code -def install_dependencies(packages: List[str], cwd: Path) -> Tuple[str, int]: +def install_dependencies( + packages: List[str], + cwd: Path, + test_code: str = "", + project_root: Optional[Path] = None, +) -> Tuple[str, int]: """Wrapper for install_dependencies_with_retry.""" - return install_dependencies_with_retry(packages, cwd) + return install_dependencies_with_retry( + packages, + cwd, + test_code=test_code, + project_root=project_root, + ) def parse_pytest_output(output: str) -> Dict[str, int]: @@ -280,7 +746,7 @@ def parse_coverage_json(coverage_json_path: Path, source_root: Path) -> dict: }, } except Exception as e: - print(f" ⚠️ Could not parse coverage.json: {e}") + print(f" ⚠️ Could not parse coverage.json: {e}") return {"percentage": 0.0, "uncovered_areas_text": "", "detailed_reports": {}} @@ -303,8 +769,30 @@ def run_tests( """ print("\nRunning tests with coverage...") + codebase_path = codebase_path.resolve() + test_file = test_file.resolve() + run_cwd = codebase_path + + if not test_file.exists(): + return { + "output": f"Test file does not exist: {test_file}", + "exit_code": 1, + "total_tests": 0, + "passed": 0, + "failed": 0, + "coverage_percentage": 0.0, + "uncovered_areas_text": "", + "coverage_details": {}, + "mutation_score": 0.0, + "mutation_report": None, + "mutation_feedback": "", + } + # Get source directory to measure coverage source_dir = str(codebase_path) + pytest_target = _build_pytest_target(test_file, run_cwd) + coverage_json_path = _build_coverage_json_path(codebase_path) + canonical_coverage_path = codebase_path / "coverage.json" # Create a .coveragerc file to exclude test files from coverage measurement. # This prevents the AI from trying to generate tests for test files. @@ -329,38 +817,61 @@ def run_tests( with open(coveragerc_path, "w", encoding="utf-8") as f: f.write(coveragerc_content) except Exception as e: - print(f" ⚠️ Could not create .coveragerc: {e}") + print(f" ⚠️ Could not create .coveragerc: {e}") cmd = [ sys.executable, "-m", "pytest", - str(test_file), + pytest_target, "-v", "--tb=short", "--timeout=30", # Per-test timeout of 30 seconds f"--cov={source_dir}", "--cov-branch", "--cov-report=term-missing", - "--cov-report=json", + f"--cov-report=json:{coverage_json_path}", ] try: + run_started_at = time.time() result = subprocess.run( cmd, capture_output=True, text=True, timeout=120, # 2-minute overall timeout - cwd=test_file.parent.parent, + cwd=run_cwd, ) output = result.stdout + "\n" + result.stderr + if "file or directory not found" in output.lower(): + return { + "output": output, + "exit_code": result.returncode or 1, + "total_tests": 0, + "passed": 0, + "failed": 0, + "coverage_percentage": 0.0, + "uncovered_areas_text": "", + "coverage_details": {}, + "mutation_score": 0.0, + "mutation_report": None, + "mutation_feedback": "", + } + # Parse test results from output test_results = parse_pytest_output(output) - # Parse coverage from JSON report - coverage_json_path = test_file.parent.parent / "coverage.json" - coverage_data = parse_coverage_json(coverage_json_path, codebase_path) + # Parse coverage from this run only. Ignore stale coverage artifacts. + coverage_data = {"percentage": 0.0, "uncovered_areas_text": "", "detailed_reports": {}} + if coverage_json_path.exists() and coverage_json_path.stat().st_mtime >= run_started_at: + coverage_data = parse_coverage_json(coverage_json_path, codebase_path) + try: + shutil.copyfile(coverage_json_path, canonical_coverage_path) + except Exception as exc: + print(f" ⚠️ Could not refresh canonical coverage.json: {exc}") + else: + print(" ⚠️ Ignoring stale or missing coverage report from this pytest run") # Run mutation testing if enabled mutation_score = 0.0 @@ -426,3 +937,4 @@ def run_tests( "mutation_report": None, "mutation_feedback": "", } + diff --git a/src/extension/pythonTestingPipeline/scripts/pipeline/tests/test_governance.py b/src/extension/pythonTestingPipeline/scripts/pipeline/tests/test_governance.py new file mode 100644 index 0000000..8f83b20 --- /dev/null +++ b/src/extension/pythonTestingPipeline/scripts/pipeline/tests/test_governance.py @@ -0,0 +1,30 @@ +"""Unit tests for governance status reporting.""" + +import sys +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).parent.parent.parent)) + +from pipeline.governance import FailureReason, GovernanceLog + + +def test_governance_status_requires_no_failures_and_no_failed_validations(): + log = GovernanceLog() + log.log_decision("agent", "action", "because", confidence=0.9) + + summary = log.get_audit_trail()["summary"] + + assert summary["failed_validations"] == 0 + assert summary["total_failures"] == 0 + assert summary["status"] == "PASS" + + +def test_governance_status_requires_review_when_failures_exist(): + log = GovernanceLog() + log.log_decision("agent", "action", "because", confidence=0.9) + log.log_failure(FailureReason.COVERAGE_LOW, "coverage too low", iteration=1) + + summary = log.get_audit_trail()["summary"] + + assert summary["total_failures"] == 1 + assert summary["status"] == "REVIEW_NEEDED" diff --git a/src/extension/pythonTestingPipeline/scripts/pipeline/tests/test_test_runner.py b/src/extension/pythonTestingPipeline/scripts/pipeline/tests/test_test_runner.py new file mode 100644 index 0000000..f3d8cf5 --- /dev/null +++ b/src/extension/pythonTestingPipeline/scripts/pipeline/tests/test_test_runner.py @@ -0,0 +1,208 @@ +"""Unit tests for the pipeline test runner module.""" + +import os +import subprocess +import sys +from pathlib import Path +from unittest.mock import patch + +sys.path.insert(0, str(Path(__file__).parent.parent.parent)) + +from pipeline.test_runner import ( + _discover_missing_dependencies, + run_tests, + validate_generated_test_file, +) + + +def _make_test_layout(tmp_path: Path) -> tuple[Path, Path]: + codebase = tmp_path / "sample_app" + tests_dir = codebase / "tests" + tests_dir.mkdir(parents=True) + test_file = tests_dir / "test_generated_sample.py" + test_file.write_text("def test_placeholder():\n assert True\n", encoding="utf-8") + return codebase, test_file + + +def test_run_tests_uses_path_valid_for_codebase_cwd_and_fresh_coverage(tmp_path): + codebase, test_file = _make_test_layout(tmp_path) + stale_coverage = codebase / "coverage.json" + stale_coverage.write_text("stale", encoding="utf-8") + + captured = {} + fresh_coverage = codebase / "coverage.123456789.json" + + def fake_run(cmd, capture_output, text, timeout, cwd): + captured["cmd"] = cmd + captured["cwd"] = cwd + fresh_coverage.write_text("{}", encoding="utf-8") + return subprocess.CompletedProcess(cmd, 0, stdout="1 passed in 0.10s", stderr="") + + with patch("pipeline.test_runner.subprocess.run", side_effect=fake_run), patch( + "pipeline.test_runner.parse_coverage_json", + return_value={"percentage": 88.8, "uncovered_areas_text": "missing", "detailed_reports": {"sample.py": {}}}, + ) as mock_parse, patch("pipeline.test_runner.time.time", return_value=100.0), patch( + "pipeline.test_runner.time.time_ns", return_value=123456789 + ): + result = run_tests(test_file, codebase) + + assert captured["cwd"] == codebase.resolve() + assert captured["cmd"][3] == str(Path("tests") / test_file.name) + assert f"--cov-report=json:{fresh_coverage}" in captured["cmd"] + mock_parse.assert_called_once_with(fresh_coverage, codebase.resolve()) + assert result["total_tests"] == 1 + assert result["passed"] == 1 + assert result["coverage_percentage"] == 88.8 + assert stale_coverage.read_text(encoding="utf-8") == "{}" + + +def test_run_tests_fails_fast_when_pytest_reports_missing_target(tmp_path): + codebase, test_file = _make_test_layout(tmp_path) + stale_coverage = codebase / "coverage.json" + stale_coverage.write_text("stale", encoding="utf-8") + + missing_output = "ERROR: file or directory not found: tests/test_generated_sample.py" + + with patch( + "pipeline.test_runner.subprocess.run", + return_value=subprocess.CompletedProcess([], 4, stdout="", stderr=missing_output), + ), patch("pipeline.test_runner.parse_coverage_json") as mock_parse, patch( + "pipeline.test_runner.time.time", return_value=100.0 + ), patch("pipeline.test_runner.time.time_ns", return_value=123456789): + result = run_tests(test_file, codebase) + + mock_parse.assert_not_called() + assert result["exit_code"] == 4 + assert result["coverage_percentage"] == 0.0 + assert result["total_tests"] == 0 + assert "file or directory not found" in result["output"].lower() + assert stale_coverage.read_text(encoding="utf-8") == "stale" + + +def test_run_tests_ignores_stale_run_specific_coverage_file(tmp_path): + codebase, test_file = _make_test_layout(tmp_path) + stale_coverage = codebase / "coverage.json" + stale_coverage.write_text("stale", encoding="utf-8") + fresh_coverage = codebase / "coverage.123456789.json" + + def fake_run(cmd, capture_output, text, timeout, cwd): + fresh_coverage.write_text("{}", encoding="utf-8") + os.utime(fresh_coverage, (50.0, 50.0)) + return subprocess.CompletedProcess(cmd, 0, stdout="1 passed in 0.10s", stderr="") + + with patch("pipeline.test_runner.subprocess.run", side_effect=fake_run), patch( + "pipeline.test_runner.parse_coverage_json" + ) as mock_parse, patch("pipeline.test_runner.time.time", return_value=100.0), patch( + "pipeline.test_runner.time.time_ns", return_value=123456789 + ): + result = run_tests(test_file, codebase) + + mock_parse.assert_not_called() + assert result["coverage_percentage"] == 0.0 + assert result["coverage_details"] == {} + assert stale_coverage.read_text(encoding="utf-8") == "stale" + + +def test_validate_generated_test_file_runs_py_compile_then_collect_only(tmp_path): + codebase, test_file = _make_test_layout(tmp_path) + commands = [] + + def fake_run(cmd, capture_output, text, timeout, cwd): + commands.append((cmd, cwd)) + if cmd[2] == "py_compile": + return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="") + return subprocess.CompletedProcess( + cmd, 0, stdout="1 test collected in 0.01s", stderr="" + ) + + with patch("pipeline.test_runner.subprocess.run", side_effect=fake_run): + result = validate_generated_test_file(test_file, codebase) + + assert result["passed"] is True + assert result["collected_tests"] == 1 + assert commands[0][0][2] == "py_compile" + assert commands[1][0][3] == str(Path("tests") / test_file.name) + + +def test_validate_generated_test_file_fails_on_collect_only_error(tmp_path): + codebase, test_file = _make_test_layout(tmp_path) + + def fake_run(cmd, capture_output, text, timeout, cwd): + if cmd[2] == "py_compile": + return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="") + return subprocess.CompletedProcess( + cmd, + 2, + stdout="", + stderr="ImportError while importing test module", + ) + + with patch("pipeline.test_runner.subprocess.run", side_effect=fake_run): + result = validate_generated_test_file(test_file, codebase) + + assert result["passed"] is False + assert result["stage"] == "collect_only" + assert "collect-only failed" in result["message"] + + +def test_discover_missing_dependencies_uses_import_probes_for_generated_suite(tmp_path): + codebase = tmp_path / "sample_app" + codebase.mkdir() + (codebase / "build.py").write_text( + "import PyInstaller.__main__\n", encoding="utf-8" + ) + + def fake_probe(module_name, codebase_path, cwd, timeout=30): + if module_name == "build": + return ( + False, + "ModuleNotFoundError: No module named 'PyInstaller'", + "PyInstaller", + ) + return True, "", None + + with patch("pipeline.test_runner._probe_module_import", side_effect=fake_probe): + missing, diagnostics = _discover_missing_dependencies( + ["pytest"], + codebase, + test_code="import pytest\nimport build\n", + project_root=codebase, + ) + + assert "PyInstaller" in missing + assert any("build" in diagnostic for diagnostic in diagnostics) + + +def test_validate_generated_test_file_rejects_risky_top_level_imports(tmp_path): + codebase = tmp_path / "sample_app" + tests_dir = codebase / "tests" + tests_dir.mkdir(parents=True) + (codebase / "build.py").write_text( + "import PyInstaller.__main__\n", encoding="utf-8" + ) + test_file = tests_dir / "test_generated_sample.py" + test_file.write_text( + "import build\n\n" + "def test_placeholder():\n" + " assert True\n", + encoding="utf-8", + ) + + def fake_run(cmd, capture_output, text, timeout, cwd): + if cmd[2] == "py_compile": + return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="") + raise AssertionError("collect-only should not run for risky top-level imports") + + with patch("pipeline.test_runner.subprocess.run", side_effect=fake_run), patch( + "pipeline.test_runner._probe_module_import", + return_value=( + False, + "ModuleNotFoundError: No module named 'PyInstaller'", + "PyInstaller", + ), + ): + result = validate_generated_test_file(test_file, codebase) + + assert result["passed"] is False + assert result["stage"] == "top_level_imports" + assert "PyInstaller" in result["output"] diff --git a/src/extension/pythonTestingPipeline/scripts/pythonTestingPipeline.py b/src/extension/pythonTestingPipeline/scripts/pythonTestingPipeline.py index a35542b..149944b 100644 --- a/src/extension/pythonTestingPipeline/scripts/pythonTestingPipeline.py +++ b/src/extension/pythonTestingPipeline/scripts/pythonTestingPipeline.py @@ -42,6 +42,7 @@ extract_dependencies, install_dependencies, run_tests, + validate_generated_test_file, ) # ==================== Pipeline Implementation ==================== @@ -351,6 +352,7 @@ def generate_additional_tests( coverage_percentage: float, uncovered_areas: str, syntax_errors: str = "", + validation_errors: str = "", security_issues: list = None, ) -> tuple[str, Path]: """Generates additional tests to improve coverage and fix security issues.""" @@ -360,9 +362,109 @@ def generate_additional_tests( coverage_percentage, uncovered_areas, syntax_errors, + validation_errors, security_issues, ) + @staticmethod + def _empty_test_results(message: str) -> dict: + """Return a minimal failed test result payload.""" + return { + "output": message, + "exit_code": 1, + "total_tests": 0, + "passed": 0, + "failed": 0, + "coverage_percentage": 0.0, + "uncovered_areas_text": "", + "coverage_details": {}, + "mutation_results": None, + } + + @staticmethod + def _result_signature( + test_results: dict, evaluation: Optional[TestEvaluationOutput] + ) -> tuple: + """Build a stable signature for final replay comparison.""" + return ( + test_results.get("exit_code", 1), + test_results.get("total_tests", 0), + test_results.get("passed", 0), + test_results.get("failed", 0), + round(evaluation.code_coverage_percentage if evaluation else 0.0, 1), + ) + + def _final_acceptance_replay( + self, + test_file: Path, + codebase_path: Path, + scenarios: TestScenariosOutput, + expected_results: dict, + expected_evaluation: Optional[TestEvaluationOutput], + iteration: int, + ) -> tuple[dict, TestEvaluationOutput, bool]: + """Replay the saved suite and use that replay as the final source of truth.""" + validation = validate_generated_test_file(test_file, codebase_path) + if not validation["passed"]: + message = f"{validation['stage']}: {validation['message']}" + governance_log.log_validation( + "final_acceptance", str(test_file), False, message + ) + governance_log.log_failure( + FailureReason.TEST_FAILURE, + f"Final acceptance failed semantic validation: {message}", + iteration, + ) + failed_results = self._empty_test_results( + message + + ( + f"\n\n{validation['output']}" + if validation.get("output") + else "" + ) + ) + failed_evaluation = self.evaluate_results( + failed_results, scenarios, codebase_path + ) + return failed_results, failed_evaluation, False + + replay_results = run_tests(test_file, codebase_path) + replay_evaluation = self.evaluate_results( + replay_results, scenarios, codebase_path + ) + + if not expected_results or expected_evaluation is None: + governance_log.log_validation( + "final_acceptance", + str(test_file), + True, + "Final acceptance replay succeeded", + ) + return replay_results, replay_evaluation, True + + expected_signature = self._result_signature( + expected_results, expected_evaluation + ) + replay_signature = self._result_signature(replay_results, replay_evaluation) + if expected_signature == replay_signature: + governance_log.log_validation( + "final_acceptance", + str(test_file), + True, + "Final acceptance replay matched the saved suite metrics", + ) + return replay_results, replay_evaluation, True + + message = ( + "Final acceptance replay mismatch: " + f"expected {expected_signature}, got {replay_signature}" + ) + governance_log.log_validation( + "final_acceptance", str(test_file), False, message + ) + governance_log.log_failure(FailureReason.TEST_FAILURE, message, iteration) + return replay_results, replay_evaluation, False + def run_pipeline( self, codebase_path: Path, @@ -408,7 +510,12 @@ def run_pipeline( if should_run_tests: deps = extract_dependencies(test_code) if deps: - dep_output, dep_exit = install_dependencies(deps, codebase_path) + dep_output, dep_exit = install_dependencies( + deps, + codebase_path, + test_code=test_code, + project_root=codebase_path, + ) results["dependencies_installed"] = deps results["dependency_output"] = dep_output @@ -424,15 +531,88 @@ def run_pipeline( # Track progress to prevent getting stuck best_coverage = 0.0 best_test_code = None # Will store snapshot of best test code + best_test_results = None + best_evaluation = None best_severe_count = float("inf") consecutive_no_progress = 0 previous_coverage = 0.0 # For mutation testing delta trigger + current_coverage = 0.0 + test_results = {} + evaluation = None + has_severe_security = False while iteration < max_iterations: iteration += 1 iteration_start = time_module.time() print(f"\n--- Iteration {iteration} ---") + validation = validate_generated_test_file( + current_test_file, codebase_path + ) + if not validation["passed"]: + validation_message = ( + f"{validation['stage']}: {validation['message']}" + ) + print( + f" ⚠️ Semantic validation failed: {validation_message}" + ) + governance_log.log_validation( + "semantic_validator", + str(current_test_file), + False, + validation_message, + ) + governance_log.log_failure( + FailureReason.TEST_FAILURE, + validation_message, + iteration, + ) + + consecutive_no_progress += 1 + if consecutive_no_progress >= 5: + print( + f"\n⚠️ No progress limits for {consecutive_no_progress} iterations. Stopping." + ) + print(f" Best coverage: {best_coverage:.1f}%") + print(f" Lowest severe issues: {best_severe_count}") + break + + current_test_code, current_test_file = ( + self.generate_additional_tests( + codebase_path, + current_test_file, + current_coverage, + validation.get("output", "") + or "Semantic validation failed before coverage could be measured", + validation_errors=( + validation_message + + "\n\n" + + validation.get("output", "")[:2000] + ), + ) + ) + + new_deps = extract_dependencies(current_test_code) + if new_deps: + install_dependencies( + new_deps, + codebase_path, + test_code=current_test_code, + project_root=codebase_path, + ) + + iteration_time = time_module.time() - iteration_start + iteration_times.append(iteration_time) + print(f" ⏱️ Iteration time: {iteration_time:.1f}s") + continue + + governance_log.log_validation( + "semantic_validator", + str(current_test_file), + True, + validation["message"], + ) + # Determine if mutation testing should run this iteration from pipeline.mutation_testing import should_enable_mutation_testing @@ -511,6 +691,8 @@ def run_pipeline( best_coverage = current_coverage # Snapshot current test code (strings are immutable, so safe) best_test_code = current_test_code + best_test_results = test_results + best_evaluation = evaluation progress_made = True if current_severe_count < best_severe_count: @@ -587,6 +769,7 @@ def run_pipeline( current_coverage, uncovered_areas, syntax_errors=syntax_errors, + validation_errors="", security_issues=security_issues if has_severe_security else None, @@ -596,7 +779,12 @@ def run_pipeline( # Re-extract and install any new dependencies new_deps = extract_dependencies(current_test_code) if new_deps: - install_dependencies(new_deps, codebase_path) + install_dependencies( + new_deps, + codebase_path, + test_code=current_test_code, + project_root=codebase_path, + ) # Record iteration time iteration_time = time_module.time() - iteration_start @@ -609,7 +797,9 @@ def run_pipeline( print(f" Final coverage: {current_coverage:.1f}%") if has_severe_security: print(" ⚠️ Unresolved severe security issues remain") - recommendations = evaluation.actionable_recommendations + recommendations = ( + evaluation.actionable_recommendations if evaluation else [] + ) if recommendations: print(" Recommendations:") for rec in recommendations[:5]: @@ -623,6 +813,68 @@ def run_pipeline( with open(current_test_file, "w", encoding="utf-8") as f: f.write(best_test_code) current_test_code = best_test_code + final_validation = validate_generated_test_file( + current_test_file, codebase_path + ) + if final_validation["passed"]: + print( + " Re-running restored best suite so report and coverage match the saved file" + ) + governance_log.log_validation( + "semantic_validator", + str(current_test_file), + True, + final_validation["message"], + ) + test_results = run_tests(current_test_file, codebase_path) + results["test_output"] = test_results["output"] + results["exit_code"] = test_results["exit_code"] + evaluation = self.evaluate_results( + test_results, approved_scenarios, codebase_path + ) + results["evaluation"] = asdict(evaluation) + current_coverage = evaluation.code_coverage_percentage + else: + message = ( + f"{final_validation['stage']}: {final_validation['message']}" + ) + governance_log.log_validation( + "semantic_validator", + str(current_test_file), + False, + message, + ) + governance_log.log_failure( + FailureReason.TEST_FAILURE, + f"Restored best suite failed semantic validation: {message}", + iteration, + ) + if best_test_results is not None: + test_results = best_test_results + results["test_output"] = test_results["output"] + results["exit_code"] = test_results["exit_code"] + if best_evaluation is not None: + evaluation = best_evaluation + results["evaluation"] = asdict(evaluation) + current_coverage = evaluation.code_coverage_percentage + + results["test_file"] = str(current_test_file) + results["test_code"] = current_test_code + test_results, evaluation, acceptance_passed = ( + self._final_acceptance_replay( + current_test_file, + codebase_path, + approved_scenarios, + test_results, + evaluation, + iteration, + ) + ) + results["test_output"] = test_results["output"] + results["exit_code"] = test_results["exit_code"] + results["evaluation"] = asdict(evaluation) + if not acceptance_passed: + results["status"] = "failed" # Calculate total time total_time = time_module.time() - pipeline_start_time @@ -631,7 +883,8 @@ def run_pipeline( "iteration_times": [round(t, 2) for t in iteration_times], "iterations_count": len(iteration_times), } - results["status"] = "completed" + if results.get("status") != "failed": + results["status"] = "completed" # Save all prompts to JSON for later analysis run_id = str(int(time_module.time())) @@ -664,7 +917,7 @@ def run_pipeline( print("\n" + "=" * 60) print("✅ Pipeline Complete!") print("=" * 60) - print(f" Test file: {test_file}") + print(f" Test file: {results.get('test_file', test_file)}") print(f" Scenarios: {len(approved_scenarios.test_scenarios)}") if "evaluation" in results: