140 changes: 94 additions & 46 deletions cycode/cli/apps/mcp/mcp_command.py
@@ -2,13 +2,14 @@
import json
import logging
import os
import shutil
import sys
import tempfile
import uuid
from pathlib import Path
from typing import Annotated, Any

import typer
from pathvalidate import sanitize_filepath
from pydantic import Field

from cycode.cli.cli_types import McpTransportOption, ScanTypeOption
@@ -26,7 +27,7 @@

_logger = get_logger('Cycode MCP')

_DEFAULT_RUN_COMMAND_TIMEOUT = 5 * 60
_DEFAULT_RUN_COMMAND_TIMEOUT = 10 * 60

_FILES_TOOL_FIELD = Field(description='Files to scan, mapping file paths to their content')

@@ -91,48 +92,76 @@ async def _run_cycode_command(*args: str, timeout: int = _DEFAULT_RUN_COMMAND_TIMEOUT
        return {'error': f'Failed to run command: {e!s}'}


def _create_temp_files(files_content: dict[str, str]) -> list[str]:
    """Create temporary files from content and return their paths."""
    temp_dir = tempfile.mkdtemp(prefix='cycode_mcp_')
    temp_files = []
def _sanitize_file_path(file_path: str) -> str:
    """Sanitize file path to prevent path traversal and other security issues.

    _logger.debug('Creating temporary files in directory: %s', temp_dir)
    Args:
        file_path: The file path to sanitize

    for file_path, content in files_content.items():
        safe_filename = f'{_gen_random_id()}_{Path(file_path).name}'
        temp_file_path = os.path.join(temp_dir, safe_filename)
    Returns:
        Sanitized file path safe for use in temporary directory

        os.makedirs(os.path.dirname(temp_file_path), exist_ok=True)
    Raises:
        ValueError: If the path is invalid or potentially dangerous
    """
    if not file_path or not isinstance(file_path, str):
        raise ValueError('File path must be a non-empty string')

        _logger.debug('Creating temp file: %s', temp_file_path)
        with open(temp_file_path, 'w', encoding='UTF-8') as f:
            f.write(content)
    return sanitize_filepath(file_path, platform='auto', validate_after_sanitize=True)

        temp_files.append(temp_file_path)

    return temp_files
class _TempFilesManager:
    """Context manager for creating and cleaning up temporary files.

    Creates a temporary directory (suffixed with the call_id) whose structure
    preserves the original file paths. Automatically cleans up all files and
    directories when exiting the context.
    """

def _cleanup_temp_files(temp_files: list[str]) -> None:
    """Clean up temporary files and directories."""
    def __init__(self, files_content: dict[str, str], call_id: str) -> None:
        self.files_content = files_content
        self.call_id = call_id
        self.temp_base_dir = None
        self.temp_files = []

    temp_dirs = set()
    for temp_file in temp_files:
        try:
            if os.path.exists(temp_file):
                _logger.debug('Removing temp file: %s', temp_file)
                os.remove(temp_file)
                temp_dirs.add(os.path.dirname(temp_file))
        except OSError as e:
            _logger.warning('Failed to remove temp file %s: %s', temp_file, e)

    for temp_dir in temp_dirs:
        try:
            if os.path.exists(temp_dir) and not os.listdir(temp_dir):
                _logger.debug('Removing temp directory: %s', temp_dir)
                os.rmdir(temp_dir)
        except OSError as e:
            _logger.warning('Failed to remove temp directory %s: %s', temp_dir, e)
    def __enter__(self) -> list[str]:
        self.temp_base_dir = tempfile.mkdtemp(prefix='cycode_mcp_', suffix=self.call_id)
        _logger.debug('Creating temporary files in directory: %s', self.temp_base_dir)

        for file_path, content in self.files_content.items():
            try:
                sanitized_path = _sanitize_file_path(file_path)
                temp_file_path = os.path.join(self.temp_base_dir, sanitized_path)

                # Ensure the normalized path is still within our temp directory
                normalized_temp_path = os.path.normpath(temp_file_path)
                normalized_base_path = os.path.normpath(self.temp_base_dir)
                if not normalized_temp_path.startswith(normalized_base_path + os.sep):
                    raise ValueError(f'Path escapes temporary directory: {file_path}')

                os.makedirs(os.path.dirname(temp_file_path), exist_ok=True)

                _logger.debug('Creating temp file: %s (from: %s)', temp_file_path, file_path)
                with open(temp_file_path, 'w', encoding='UTF-8') as f:
                    f.write(content)

                self.temp_files.append(temp_file_path)
            except ValueError as e:
                _logger.error('Invalid file path rejected: %s - %s', file_path, str(e))
                continue
            except Exception as e:
                _logger.error('Failed to create temp file for %s: %s', file_path, str(e))
                continue

        if not self.temp_files:
            raise ValueError('No valid files provided after sanitization')

        return self.temp_files

    def __exit__(self, *_) -> None:
        if self.temp_base_dir and os.path.exists(self.temp_base_dir):
            _logger.debug('Removing temp directory recursively: %s', self.temp_base_dir)
            shutil.rmtree(self.temp_base_dir, ignore_errors=True)


async def _run_cycode_scan(scan_type: ScanTypeOption, temp_files: list[str]) -> dict[str, Any]:
@@ -153,21 +182,36 @@ async def _cycode_scan_tool(scan_type: ScanTypeOption, files: dict[str, str] = _FILES_TOOL_FIELD
        _logger.error('No files provided for scan')
        return json.dumps({'error': 'No files provided'})

    temp_files = _create_temp_files(files)

    try:
        _logger.info(
            'Running Cycode scan, %s',
            {'scan_type': scan_type, 'files_count': len(temp_files), 'call_id': _tool_call_id},
        )
        result = await _run_cycode_scan(scan_type, temp_files)
        _logger.info('Scan completed, %s', {'scan_type': scan_type, 'call_id': _tool_call_id})
        return json.dumps(result, indent=2)
        with _TempFilesManager(files, _tool_call_id) as temp_files:
            original_count = len(files)
            processed_count = len(temp_files)

            if processed_count < original_count:
                _logger.warning(
                    'Some files were rejected during sanitization, %s',
                    {
                        'scan_type': scan_type,
                        'original_count': original_count,
                        'processed_count': processed_count,
                        'call_id': _tool_call_id,
                    },
                )

            _logger.info(
                'Running Cycode scan, %s',
                {'scan_type': scan_type, 'files_count': processed_count, 'call_id': _tool_call_id},
            )
            result = await _run_cycode_scan(scan_type, temp_files)

            _logger.info('Scan completed, %s', {'scan_type': scan_type, 'call_id': _tool_call_id})
            return json.dumps(result, indent=2)
    except ValueError as e:
        _logger.error('Invalid input files, %s', {'scan_type': scan_type, 'call_id': _tool_call_id, 'error': str(e)})
        return json.dumps({'error': f'Invalid input files: {e!s}'}, indent=2)
    except Exception as e:
        _logger.error('Scan failed, %s', {'scan_type': scan_type, 'call_id': _tool_call_id, 'error': str(e)})
        return json.dumps({'error': f'Scan failed: {e!s}'}, indent=2)
    finally:
        _cleanup_temp_files(temp_files)


async def cycode_secret_scan(files: dict[str, str] = _FILES_TOOL_FIELD) -> str:
@@ -197,6 +241,10 @@ async def cycode_sca_scan(files: dict[str, str] = _FILES_TOOL_FIELD) -> str:
    - verify software supply chain security
    - review package.json, requirements.txt, pom.xml and other dependency files

    Important:
        You must also include lock files (like package-lock.json, Pipfile.lock, etc.) to get accurate results.
        You must provide manifest and lock files together.

    Args:
        files: Dictionary mapping file paths to their content

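For reviewers who want to exercise the new temp-file handling locally, here is a minimal sketch (not part of the PR): the file names, contents, and call id below are made up, and it assumes the package is installed with its MCP dependencies so the module imports cleanly. It uses the private _TempFilesManager introduced above, which sanitizes each path, writes the content under a per-call temporary directory, and removes that directory on exit.

import uuid

# Private helper under review; the import path matches the file changed in this PR.
from cycode.cli.apps.mcp.mcp_command import _TempFilesManager

# Hypothetical inputs: a manifest plus its lock file (per the new SCA docstring note),
# and a traversal-style path that the sanitization/normpath checks should either
# neutralize or reject with a logged error instead of writing outside the temp dir.
files = {
    'package.json': '{"dependencies": {"left-pad": "^1.3.0"}}',
    'package-lock.json': '{"lockfileVersion": 3}',
    '../../etc/passwd': 'not really a passwd file',
}

with _TempFilesManager(files, call_id=str(uuid.uuid4())) as temp_files:
    for path in temp_files:
        print(path)  # every path lives under a cycode_mcp_* temporary directory
# On exit the whole temporary directory is removed recursively (shutil.rmtree).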
19 changes: 18 additions & 1 deletion poetry.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
@@ -44,6 +44,7 @@ typer = "^0.15.3"
tenacity = ">=9.0.0,<9.1.0"
mcp = { version = ">=1.9.3,<2.0.0", markers = "python_version >= '3.10'" }
pydantic = ">=2.11.5,<3.0.0"
pathvalidate = ">=3.3.1,<4.0.0"

[tool.poetry.group.test.dependencies]
mock = ">=4.0.3,<4.1.0"
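The pyproject.toml change only pins the new pathvalidate dependency that backs _sanitize_file_path. A rough illustration of the call the PR relies on follows; the behavior noted in the comments is an assumption based on pathvalidate's documented API, not something asserted by the PR itself.

from pathvalidate import ValidationError, sanitize_filepath

try:
    # Same arguments as _sanitize_file_path: platform='auto' applies the current OS rules,
    # and validate_after_sanitize=True re-checks the sanitized result instead of silently
    # returning a path that is still invalid.
    cleaned = sanitize_filepath('reports/../secret\x00.txt', platform='auto', validate_after_sanitize=True)
    print(cleaned)
except ValidationError as exc:
    # ValidationError subclasses ValueError, which is why the except ValueError branches
    # in the new _TempFilesManager code are enough to catch sanitization failures.
    print(f'rejected: {exc}')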