Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(security): fix path traversal vulnerability (CVE-2024-10834) #2267

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 62 additions & 11 deletions dbgpt/serve/agent/hub/plugin_hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
import json
import logging
import os
import re
import shutil
import tempfile
from pathlib import Path
from typing import Any

from fastapi import UploadFile
Expand Down Expand Up @@ -165,24 +167,73 @@ def refresh_hub_from_git(
except Exception as e:
raise ValueError(f"Update Agent Hub Db Info Faild!{str(e)}")

def _sanitize_filename(self, filename: str) -> str:
"""Sanitize the filename to prevent directory traversal attacks.

Args:
filename: The original filename

Returns:
str: Sanitized filename
"""
# Only keep the basic filename, remove any path information
filename = os.path.basename(filename)

# Remove any unsafe characters
filename = re.sub(r"[^a-zA-Z0-9._-]", "", filename)

# Ensure the filename is not empty and valid
if not filename or filename.startswith("."):
raise ValueError("Invalid filename")

return filename

async def upload_my_plugin(self, doc_file: UploadFile, user: Any = Default_User):
# We can not move temp file in windows system when we open file in context of `with`
file_path = os.path.join(self.plugin_dir, doc_file.filename)
# Verify and clean file names
try:
safe_filename = self._sanitize_filename(doc_file.filename)
except ValueError as e:
raise ValueError(f"Invalid plugin file: {str(e)}")

# Structure a safe file path
file_path = os.path.join(self.plugin_dir, safe_filename)

# Verify the final path is within the allowed directory
if (
not Path(file_path)
.resolve()
.is_relative_to(Path(self.plugin_dir).resolve())
):
raise ValueError("Invalid file path")

if os.path.exists(file_path):
os.remove(file_path)
tmp_fd, tmp_path = tempfile.mkstemp(dir=os.path.join(self.plugin_dir))
with os.fdopen(tmp_fd, "wb") as tmp:
tmp.write(await doc_file.read())
shutil.move(
tmp_path,
os.path.join(self.plugin_dir, doc_file.filename),
)

my_plugins = scan_plugins(self.plugin_dir, doc_file.filename)
# Use a temporary file for secure file writing
tmp_fd, tmp_path = tempfile.mkstemp(dir=self.plugin_dir)
try:
with os.fdopen(tmp_fd, "wb") as tmp:
tmp.write(await doc_file.read())
shutil.move(tmp_path, file_path)
except Exception as e:
# Ensure the temporary file is cleaned up
if os.path.exists(tmp_path):
os.remove(tmp_path)
raise e

# Scan and validate the plugin
try:
my_plugins = scan_plugins(self.plugin_dir, safe_filename)
except Exception as e:
# If the plugin validation fails, clean up the uploaded file
if os.path.exists(file_path):
os.remove(file_path)
raise ValueError(f"Invalid plugin file: {str(e)}")

if user is None or len(user) <= 0:
user = Default_User

# Update the database
for my_plugin in my_plugins:
my_plugin_entiy = self.my_plugin_dao.get_by_user_and_plugin(
user, my_plugin._name
Expand All @@ -195,7 +246,7 @@ async def upload_my_plugin(self, doc_file: UploadFile, user: Any = Default_User)
my_plugin_entiy.user_code = user
my_plugin_entiy.user_name = user
my_plugin_entiy.tenant = ""
my_plugin_entiy.file_name = doc_file.filename
my_plugin_entiy.file_name = safe_filename
self.my_plugin_dao.raw_update(my_plugin_entiy)

def reload_my_plugins(self):
Expand Down
Loading