Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: fixed broken swe-bench docker image generation. #1262

Merged
merged 4 commits into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions python/swe/agent/benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List

from langchain_aws import BedrockChat
from langchain_aws import ChatBedrock
from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI
from langgraph.errors import GraphRecursionError
Expand Down Expand Up @@ -41,7 +41,7 @@ def retry_with_exponential_backoff(func, *args, **kwargs):
def get_llm_response(system_prompt: str, human_prompt: str) -> str:
try:
if MODEL == "claude":
client = BedrockChat(
client = ChatBedrock(
credentials_profile_name="default",
model_id="anthropic.claude-3-5-sonnet-20240620-v1:0",
region_name="us-west-2",
Expand Down
157 changes: 102 additions & 55 deletions python/swe/dockerfiles/build.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import signal
import subprocess
import threading
import typing as t
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
Expand All @@ -9,7 +11,89 @@
logs = Path.cwd() / "logs"
logs.mkdir(exist_ok=True)

errors = []
# Lock for synchronizing access to the success log
success_lock = threading.Lock()
# Global set to keep track of successfully pushed images
successful_builds = set()


# Load successful builds from logs/success.log into the global set
def load_successful_builds() -> None:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding error handling for file operations in load_successful_builds() to handle potential IO errors gracefully:

def load_successful_builds() -> None:
    try:
        success_file = logs / "success.log"
        if success_file.exists():
            with open(success_file, "r", encoding="utf-8") as f:
                for line in f:
                    tag = line.strip()
                    if tag:
                        successful_builds.add(tag)
    except Exception as e:
        print(f"Error loading successful builds: {e}")

success_file = logs / "success.log"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using constants for frequently used strings and paths:

SUCCESS_LOG_FILE = "success.log"
DOCKER_IMAGE_PREFIX = "composio/swe"

# Then use them like:
success_file = logs / SUCCESS_LOG_FILE
full_tag = f"{DOCKER_IMAGE_PREFIX}:{tag_part}"

This makes the code more maintainable and reduces the risk of typos.

if success_file.exists():
with open(success_file, "r", encoding="utf-8") as f:
for line in f:
tag = line.strip()
if tag:
successful_builds.add(tag)


# Record a successful build by writing to the log file and updating the global set
def record_success(tag: str) -> None:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding docstrings to the new functions for better code documentation:

def record_success(tag: str) -> None:
    """Record a successful build by writing to the success log file and updating the global set.
    
    Args:
        tag: The docker image tag that was successfully built
        
    Thread-safe: Uses success_lock for synchronization
    """
    with success_lock:
        successful_builds.add(tag)
        with open(logs / "success.log", "a", encoding="utf-8") as f:
            f.write(tag + "\n")

with success_lock:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider validating the docker tag format before recording success to prevent invalid entries:

def is_valid_docker_tag(tag: str) -> bool:
    """Validate docker tag format according to Docker's rules."""
    # Basic validation - can be expanded based on needs
    return bool(re.match(r'^[a-z0-9][a-z0-9._-]*$', tag))

def record_success(tag: str) -> None:
    if not is_valid_docker_tag(tag):
        logger.warning(f"Invalid docker tag format: {tag}")
        return
    with success_lock:
        successful_builds.add(tag)
        with open(logs / "success.log", "a", encoding="utf-8") as f:
            f.write(tag + "\n")

successful_builds.add(tag)
with open(logs / "success.log", "a", encoding="utf-8") as f:
f.write(tag + "\n")


# Insert new global variables and functions for graceful stop and resume support
stop_requested = False


def handle_stop(signum, frame):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding proper logging instead of print statements for better observability and debugging:

def handle_stop(signum, frame):
    global stop_requested
    logger.info("Received stop signal. Gracefully stopping new builds...")
    stop_requested = True

global stop_requested
print("Received stop signal. Gracefully stopping new builds...")
stop_requested = True


def _base(generated: Path, multi: bool = False) -> None:
base = generated / "base"
with ThreadPoolExecutor() as executor:
futures = []
for file in base.iterdir():
if stop_requested:
print("Graceful stop activated. Halting base builds.")
break
try:
_, tag_part = file.name.split(".", maxsplit=1)
except ValueError:
print(f"Skipping invalid file name format: {file.name}")
continue
full_tag = f"composio/swe:{tag_part}"
if full_tag in successful_builds:
print(f"Skipping build for {full_tag} as it has been already pushed.")
continue
futures.append(executor.submit(_build, file, tag_part, multi))
[fut.result() for fut in futures]
Comment on lines +66 to +67
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The futures.append() and fut.result() operations are not protected against stop_requested, allowing in-progress builds to continue after stop signal. Should check stop_requested before fut.result().

📝 Committable Code Suggestion

‼️ Ensure you review the code suggestion before committing it to the branch. Make sure it replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
futures.append(executor.submit(_build, file, tag_part, multi))
[fut.result() for fut in futures]
futures.append(executor.submit(_build, file, tag_part, multi))
[fut.result() for fut in futures if not stop_requested]



def _swes(generated: Path, multi: bool = False) -> None:
with ThreadPoolExecutor() as executor:
futures = []
for child in generated.iterdir():
if child.name == "base":
continue
if child.is_file():
continue
repo = child.name.replace("__", "-")
for version in child.iterdir():
if stop_requested:
print("Graceful stop activated. Halting SWES builds.")
break
tag_part = f"{repo}-{version.name.replace('.', '-') }"
full_tag = f"composio/swe:{tag_part}"
if full_tag in successful_builds:
print(
f"Skipping build for {full_tag} as it has been already pushed."
)
continue
futures.append(
executor.submit(_build, version / "Dockerfile", tag_part, multi)
)
if stop_requested:
break
[fut.result() for fut in futures]


ARCHS = ("linux/arm64", "linux/amd64")

Expand Down Expand Up @@ -50,81 +134,44 @@ def _build(file: Path, tag: str, multi: bool, *flags: str) -> None:

if process.returncode == 0:
print(f"Finished build for {tag}")
record_success(tag)
else:
print(f"Error building {tag} - {logs / log}")
Comment on lines 134 to 139
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing error handling for failed builds - successful builds are tracked but failures are silently ignored. Should maintain error state and handle failed builds appropriately.

📝 Committable Code Suggestion

‼️ Ensure you review the code suggestion before committing it to the branch. Make sure it replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
if process.returncode == 0:
print(f"Finished build for {tag}")
record_success(tag)
else:
print(f"Error building {tag} - {logs / log}")
if process.returncode == 0:
print(f"Finished build for {tag}")
record_success(tag)
else:
error_msg = f"Error building {tag} - {logs / log}"
print(error_msg)
record_failure(tag, error_msg)
raise BuildError(error_msg)

errors.append(f"Error building {tag} - {logs / log}")


def _base(generated: Path, multi: bool = False) -> None:
base = generated / "base"
with ThreadPoolExecutor() as executor:
futures = []
for file in base.iterdir():
_, tag = file.name.split(".", maxsplit=1)
futures.append(executor.submit(_build, file, tag, multi))
_ = [fut.result() for fut in futures]


def _swes(generated: Path, multi: bool = False) -> None:
with ThreadPoolExecutor() as executor:
futures = []
for child in generated.iterdir():
if child.name == "base":
continue

if child.is_file():
continue

repo = child.name.replace("__", "-")
for version in child.iterdir():
tag = f"{repo}-{version.name.replace('.', '-')}"
futures.append(
executor.submit(
_build,
version / "Dockerfile",
tag,
multi,
)
)

_ = [fut.result() for fut in futures]


def _pyenv(file: t.Optional[Path] = None, multi: bool = False) -> None:
print("Print building pyenv base")
file = file or Path(__file__).parent / "templates" / "Dockerfile.pyenv"
full_tag = "composio/swe:pyenv"
if full_tag in successful_builds:
print(f"Skipping build for {full_tag} as it has already been pushed.")
return
_build(file=file, tag="pyenv", multi=multi)


@click.command(name="build")
@click.argument(
"generated",
type=str,
default="./generated",
)
@click.argument("generated", type=str, default="./generated")
@click.option(
"--multi",
is_flag=True,
help="Use this flag to build multi-platform images",
"--multi", is_flag=True, help="Use this flag to build multi-platform images"
)
def build(generated: Path, multi: bool = False) -> None:
"""Build docker images for SWEKIT."""
load_successful_builds()
signal.signal(signal.SIGINT, handle_stop)
signal.signal(signal.SIGTERM, handle_stop)

_pyenv(multi=multi)
if len(errors) > 0:
print("==== Errors ====")
print("\n".join(errors))
return
print("==== Successful Builds (after pyenv) ====")
print("\n".join(successful_builds))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider sorting the 'successful_builds' set before printing for reproducible output.


generated = Path(generated or Path.cwd() / "generated").resolve()
_base(generated=generated, multi=multi)
if len(errors) > 0:
print("==== Errors ====")
print("\n".join(errors))
return
print("==== Successful Builds (after base) ====")
print("\n".join(successful_builds))

_swes(generated=generated, multi=multi)
print("==== Errors ====")
print("\n".join(errors))
print("==== Final Successful Builds ====")
print("\n".join(successful_builds))


if __name__ == "__main__":
Expand Down
40 changes: 29 additions & 11 deletions python/swe/dockerfiles/create_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
from pathlib import Path

import click
from swebench import get_eval_refs
from swebench.harness.constants import SWEbenchInstance
from swebench.harness.utils import load_swebench_dataset

from composio import Action, ComposioToolSet
from composio.utils.logging import WithLogger


def group_task_instances(task_instances):
def group_task_instances(task_instances: list[SWEbenchInstance]):
groups = {}
for instance in task_instances:
repo = instance["repo"]
Expand Down Expand Up @@ -43,11 +44,25 @@ def __init__(
self.outdir.mkdir()

def generate(self):
task_instances = get_eval_refs(data_path_or_name=self.dataset)
task_instance_groups = group_task_instances(task_instances.values())
task_instances = load_swebench_dataset(name=self.dataset)
task_instance_groups = group_task_instances(task_instances)
for repo, versions in task_instance_groups.items():
self.logger.info(f"Repo {repo} with {set(versions.keys())} versions")
for version, instances in versions.items():
outname = _repo_name(repo)
docker_outdir = Path("generated") / outname / version

# Check if files in generated directory are complete
if (
docker_outdir.exists()
and (docker_outdir / "deeplake").exists()
and (docker_outdir / "fqdn_cache.json").exists()
):
self.logger.info(
f"Skipping {repo} {version} - files already exist in generated directory"
)
continue

self.logger.info(f"\tGenerating for version - {version}")
self.create_index(
repository=repo, version=version, setup_ref_instance=instances[0]
Expand All @@ -58,8 +73,8 @@ def create_index(
):
outname = _repo_name(repository)
outdir = self.outdir / outname / version
if outdir.exists():
return
docker_outdir = Path("generated") / outname / version

repo_url = f"https://github.com/{repository}.git"
base_commit = setup_ref_instance["base_commit"]
if not (outdir / outname).exists():
Expand All @@ -76,10 +91,16 @@ def create_index(
["git", "checkout", base_commit], cwd=outdir / outname, check=True
)

composio_toolset = ComposioToolSet()
composio_toolset = ComposioToolSet(
metadata={
Action.CODE_ANALYSIS_TOOL_CREATE_CODE_MAP: {
"dir_to_index_path": str(outdir / outname),
},
},
)
composio_toolset.execute_action(
action=Action.CODE_ANALYSIS_TOOL_CREATE_CODE_MAP,
params={"dir_to_index_path": str(outdir / outname)},
params={},
)
with open(f"{Path.home()}/.composio/tmp/{outname}/fqdn_cache.json") as f:
fqdn_index = json.load(f)
Expand All @@ -91,16 +112,13 @@ def create_index(
)
fqdn_index[k] = v

docker_outdir = Path("generated") / outname / version
# docker_outdir.mkdir(exist_ok=True, parents=True)
with open(
docker_outdir / "fqdn_cache.json",
"w",
) as f:
json.dump(fqdn_index, f, indent=4)

DEEPLAKE_PATH = docker_outdir / "deeplake"
# DEEPLAKE_PATH.mkdir(exist_ok=True, parents=True)
if not DEEPLAKE_PATH.exists():
shutil.copytree(
f"{Path.home()}/.composio/tmp/{outname}/deeplake",
Comment on lines 122 to 124
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Race condition when checking and creating directories. Use mkdir(parents=True, exist_ok=True) instead of separate check and create.

📝 Committable Code Suggestion

‼️ Ensure you review the code suggestion before committing it to the branch. Make sure it replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
if not DEEPLAKE_PATH.exists():
shutil.copytree(
f"{Path.home()}/.composio/tmp/{outname}/deeplake",
DEEPLAKE_PATH.mkdir(parents=True, exist_ok=True)
shutil.copytree(
f"{Path.home()}/.composio/tmp/{outname}/deeplake",

Expand Down
14 changes: 7 additions & 7 deletions python/tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -7,39 +7,39 @@ basepython = python
deps =
isort==5.12.0
commands =
isort composio/ scripts/ tests/ swe/ --profile black
isort composio/ scripts/ tests/ swe/ --profile black --skip swe/dockerfiles/generated --skip swe/dockerfiles/indexed
isort plugins/ --profile black

[testenv:isort-check]
basepython = python3
deps =
isort==5.12.0
commands =
isort composio/ scripts/ tests/ swe/ --check --profile black
isort composio/ scripts/ tests/ swe/ --check --profile black --skip swe/dockerfiles/generated --skip swe/dockerfiles/indexed
isort plugins/ --check --profile black

[testenv:black]
basepython = python
deps =
black==24.10.0
commands =
black composio/ scripts/ tests/ swe/
black composio/ scripts/ tests/ swe/ --exclude "swe/dockerfiles/(generated|indexed)"
black plugins/

[testenv:black-check]
basepython = python3
deps =
black==24.10.0
commands =
black composio/ scripts/ tests/ swe/ --check
black composio/ scripts/ tests/ swe/ --check --exclude "swe/dockerfiles/(generated|indexed)"
black plugins/ --check

[testenv:black-diff]
basepython = python3
deps =
black==24.10.0
commands =
black composio/ scripts/ tests/ swe/ --check --diff
black composio/ scripts/ tests/ swe/ --check --diff --exclude "swe/dockerfiles/(generated|indexed)"
black plugins/ --check --diff

[testenv:mypy]
Expand Down Expand Up @@ -171,15 +171,15 @@ sections=FUTURE,STDLIB,THIRDPARTY,FIRSTPARTY,PLUGINS,PACKAGES,LOCALFOLDER

[flake8]
max_line_length = 200
exclude= **/build, **/dist
exclude = **/build, **/dist, swe/dockerfiles/generated/*, swe/dockerfiles/indexed/*
per-file-ignores =
__init__.py:F401,W503
tests/**:E501
ignore = E231, W291, W503, E704

[mypy]
strict_optional = True
exclude=plugins/.*/setup\.py|plugins/.*/build/lib/|swe/build/lib/
exclude = plugins/.*/setup\.py|plugins/.*/build/lib/|swe/build/lib/|swe/dockerfiles/generated/.*|swe/dockerfiles/indexed/.*
ignore_missing_imports = True

[mypy-requests.*]
Expand Down