Extend the upload script for other local use cases #36

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open · wants to merge 3 commits into main
4 changes: 3 additions & 1 deletion .github/workflows/vllm-benchmark.yml
@@ -199,7 +199,9 @@ jobs:
           ls -lah "${BENCHMARK_RESULTS}"
 
           python upload_benchmark_results.py \
-            --vllm vllm \
+            --repo vllm \
+            --benchmark-name "vLLM benchmark" \
             --benchmark-results "${BENCHMARK_RESULTS}" \
             --device "${GPU_DEVICE}" \
+            --s3-bucket ossci-benchmarks \
             --model "${MODELS//\//_}"
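Since `--s3-bucket` moves into a required mutually exclusive group in the script (see below), the workflow now has to pass it explicitly. For the local use cases this PR targets, the checkout and S3 can both be skipped. A sketch of such a run, driven through `subprocess` only so the required environment variables are explicit; the endpoint URL, SHA, credentials, and paths are all illustrative placeholders, not values from this PR:

```python
# Hypothetical local invocation of the extended uploader (not part of this PR).
# --repo-name/--head-branch/--head-sha replace a local checkout; --upload-url
# replaces S3. All values below are placeholders.
import os
import subprocess

env = {
    **os.environ,
    "UPLOADER_USERNAME": "uploader",  # required by ValidateURL
    "UPLOADER_PASSWORD": "changeme",  # placeholder credential
}
subprocess.run(
    [
        "python", "upload_benchmark_results.py",
        "--repo-name", "vllm-project/vllm",
        "--head-branch", "main",
        "--head-sha", "0123456789abcdef",      # placeholder commit SHA
        "--benchmark-name", "vLLM benchmark",
        "--benchmark-results", "./benchmark-results",
        "--device", "NVIDIA H100 80GB HBM3",   # as reported by nvidia-smi
        "--upload-url", "https://benchmarks.example.com/upload",  # hypothetical endpoint
    ],
    env=env,
    check=True,
)
```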
221 changes: 166 additions & 55 deletions vllm-benchmarks/upload_benchmark_results.py
@@ -1,5 +1,6 @@
 #!/usr/bin/env python3
 
+import requests
 import glob
 import gzip
 import json
@@ -11,6 +12,7 @@
 from argparse import Action, ArgumentParser, Namespace
 from logging import info, warning
 from typing import Any, Dict, List, Optional, Tuple
+from json.decoder import JSONDecodeError
 
 import boto3
 import psutil
@@ -19,8 +21,8 @@

 logging.basicConfig(level=logging.INFO)
 
-
-REPO = "vllm-project/vllm"
+username = os.environ.get("UPLOADER_USERNAME")
+password = os.environ.get("UPLOADER_PASSWORD")
 
 
 class ValidateDir(Action):
@@ -38,47 +40,87 @@ def __call__(
parser.error(f"{values} is not a valid directory")


class ValidateURL(Action):
def __call__(
self,
parser: ArgumentParser,
namespace: Namespace,
values: Any,
option_string: Optional[str] = None,
) -> None:
if username or password:
setattr(namespace, self.dest, values)
return

parser.error(f"No username or password set for URL {values}")


def parse_args() -> Any:
parser = ArgumentParser("Upload vLLM benchmarks results to S3")
vllm_metadata = parser.add_mutually_exclusive_group(required=True)
vllm_metadata.add_argument(
"--vllm",
parser = ArgumentParser("Upload benchmarks results")

# Git metadata
repo_metadata = parser.add_mutually_exclusive_group(required=True)
repo_metadata.add_argument(
"--repo",
type=str,
action=ValidateDir,
help="the directory that vllm repo is checked out",
help="the directory that the repo is checked out",
)
branch_commit = repo_metadata.add_argument_group("the branch and commit metadata")
branch_commit.add_argument(
"--repo-name",
type=str,
help="the name of the repo",
)
branch_commit = vllm_metadata.add_argument_group("vLLM branch and commit metadata")
branch_commit.add_argument(
"--head-branch",
type=str,
default="main",
help="the name of the vLLM branch the benchmark runs on",
help="the name of the branch the benchmark runs on",
)
branch_commit.add_argument(
"--head-sha",
type=str,
help="the commit SHA the benchmark runs on",
)

# Benchmark info
parser.add_argument(
"--benchmark-results",
"--benchmark-name",
type=str,
required=True,
action=ValidateDir,
help="the directory with the benchmark results",
help="the name of the benchmark",
)
parser.add_argument(
"--s3-bucket",
"--benchmark-results",
type=str,
required=False,
default="ossci-benchmarks",
help="the S3 bucket to upload the benchmark results",
required=True,
action=ValidateDir,
help="the directory with the benchmark results",
)

# Device info
parser.add_argument(
"--device",
type=str,
required=True,
help="the name of the GPU device coming from nvidia-smi or amd-smi",
)

# Where to upload
uploader = parser.add_mutually_exclusive_group(required=True)
uploader.add_argument(
"--s3-bucket",
type=str,
help="the S3 bucket to upload the benchmark results to",
)
uploader.add_argument(
"--upload-url",
type=str,
action=ValidateURL,
help="the URL to upload the benchmark results to",
)

parser.add_argument(
"--model",
type=str,
@@ -92,27 +134,33 @@ def parse_args() -> Any:
     return parser.parse_args()
 
 
-def get_git_metadata(vllm_dir: str) -> Tuple[str, str]:
-    repo = Repo(vllm_dir)
+def get_git_metadata(repo_dir: str) -> Tuple[str, str, str, int]:
+    repo = Repo(repo_dir)
+    # Git metadata
+    repo_name = repo.remotes.origin.url.split(".git")[0].split(":")[-1]
+    hexsha = repo.head.object.hexsha
+    committed_date = repo.head.object.committed_date
+
     try:
         return (
+            repo_name,
             repo.active_branch.name,
-            repo.head.object.hexsha,
-            repo.head.object.committed_date,
+            hexsha,
+            committed_date,
         )
     except TypeError:
         # This is a detached HEAD, default the branch to main
-        return "main", repo.head.object.hexsha, repo.head.object.committed_date
+        return repo_name, "main", hexsha, committed_date
 
 
 def get_benchmark_metadata(
-    head_branch: str, head_sha: str, timestamp: int
+    repo_name: str, head_branch: str, head_sha: str, timestamp: int, benchmark_name: str
 ) -> Dict[str, Any]:
     return {
         "timestamp": timestamp,
         "schema_version": "v3",
-        "name": "vLLM benchmark",
-        "repo": REPO,
+        "name": benchmark_name,
+        "repo": repo_name,
         "head_branch": head_branch,
         "head_sha": head_sha,
         "workflow_id": os.getenv("WORKFLOW_ID", timestamp),
@@ -146,27 +194,58 @@ def get_runner_info() -> Dict[str, Any]:
     }
 
 
-def load(benchmark_results: str) -> Dict[str, List]:
+def read_benchmark_results(filepath: str) -> List[Dict[str, Any]]:
+    results = []
+    with open(filepath) as f:
+        try:
+            r = json.load(f)
+            # Handle the JSONEachRow case where there is only one record in the
+            # JSON file: it can still be loaded normally, but needs to be
+            # added into the list of benchmark results with a length of 1
+            if isinstance(r, dict):
+                results.append(r)
+            elif isinstance(r, list):
+                results = r
+
+        except JSONDecodeError:
+            f.seek(0)
+
+            # Try again in ClickHouse JSONEachRow format
+            for line in f:
+                try:
+                    r = json.loads(line)
+                    # Each row needs to be a JSON dictionary or a list
+                    if isinstance(r, dict):
+                        results.append(r)
+                    elif isinstance(r, list):
+                        results.extend(r)
+                    else:
+                        warning(f"Not a JSON dict or list {line}, skipping")
+                        continue
+
+                except JSONDecodeError:
+                    warning(f"Invalid JSON {line}, skipping")
+
+    return results
+
+
+def load(benchmark_results_dir: str) -> Dict[str, List]:
     results = {}
 
-    for file in glob.glob(f"{benchmark_results}/*.json"):
+    for file in glob.glob(f"{benchmark_results_dir}/*.json"):
         filename = os.path.basename(file)
-        with open(file) as f:
-            try:
-                r = json.load(f)
-            except json.JSONDecodeError as e:
-                warning(f"Fail to load {file}: {e}")
-                continue
+        r = read_benchmark_results(file)
 
         if not r:
-            warning(f"Find no benchmark results in {file}")
+            warning(f"{file} is empty")
             continue
 
         if type(r) is not list or "benchmark" not in r[0]:
             warning(f"Find no PyTorch benchmark results in {file}")
             continue
 
+        info(f"Loading benchmark results from {file}")
         results[filename] = r
 
     return results
 
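Reviewer note on `read_benchmark_results`: it accepts either a regular JSON document or ClickHouse JSONEachRow, falling back line by line when the whole-file parse fails. A self-contained sanity check of that behavior; it assumes the function is importable from this module, and the records and file handling are made up for illustration:

```python
# Sanity check for read_benchmark_results (illustrative, not part of this PR):
# the same records in plain-JSON-array form and in JSONEachRow form should
# parse identically.
import json
import tempfile

from upload_benchmark_results import read_benchmark_results  # assumed importable

records = [
    {"benchmark": {"name": "latency"}, "metric": 1.0},
    {"benchmark": {"name": "throughput"}, "metric": 2.0},
]

with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as f:
    json.dump(records, f)  # standard JSON array
    plain_json = f.name

with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as f:
    f.write("\n".join(json.dumps(r) for r in records))  # one record per line
    json_each_row = f.name

assert read_benchmark_results(plain_json) == records
assert read_benchmark_results(json_each_row) == records
```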
@@ -183,8 +262,39 @@ def aggregate(
     return aggregated_results
 
 
-def upload_to_s3(
+def upload_s3(s3_bucket: str, s3_path: str, data: str) -> None:
+    boto3.resource("s3").Object(
+        f"{s3_bucket}",
+        f"{s3_path}",
+    ).put(
+        Body=gzip.compress(data.encode()),
+        ContentEncoding="gzip",
+        ContentType="application/json",
+    )
+
+
+def upload_via_api(
+    upload_url: str,
+    s3_path: str,
+    data: str,
+) -> None:
+    json_data = {
+        "username": os.environ.get("UPLOADER_USERNAME"),
+        "password": os.environ.get("UPLOADER_PASSWORD"),
+        "content": data,
+        "s3_path": s3_path,
+    }
+
+    headers = {"content-type": "application/json"}
+
+    r = requests.post(upload_url, json=json_data, headers=headers)
+    info(r.content)
+
+
+def upload(
     s3_bucket: str,
+    upload_url: str,
+    repo_name: str,
     head_branch: str,
     head_sha: str,
     aggregated_results: List[Dict[str, Any]],
@@ -193,41 +303,42 @@ def upload_to_s3(
     dry_run: bool = True,
 ) -> None:
     model_suffix = f"_{model}" if model else ""
-    s3_path = f"v3/{REPO}/{head_branch}/{head_sha}/{device}/benchmark_results{model_suffix}.json"
-    info(f"Upload benchmark results to s3://{s3_bucket}/{s3_path}")
+    s3_path = f"v3/{repo_name}/{head_branch}/{head_sha}/{device}/benchmark_results{model_suffix}.json"
+    info(f"Upload benchmark results to {s3_path}")
     if not dry_run:
         # Write in JSONEachRow format
         data = "\n".join([json.dumps(r) for r in aggregated_results])
-        boto3.resource("s3").Object(
-            f"{s3_bucket}",
-            f"{s3_path}",
-        ).put(
-            Body=gzip.compress(data.encode()),
-            ContentEncoding="gzip",
-            ContentType="application/json",
-        )
+        if s3_bucket:
+            upload_s3(s3_bucket, s3_path, data)
+        elif upload_url:
+            upload_via_api(upload_url, s3_path, data)
 
 
 def main() -> None:
     args = parse_args()
 
-    if args.vllm:
-        head_branch, head_sha, timestamp = get_git_metadata(args.vllm)
+    if args.repo:
+        repo_name, head_branch, head_sha, timestamp = get_git_metadata(args.repo)
     else:
-        head_branch, head_sha, timestamp = (
+        repo_name, head_branch, head_sha, timestamp = (
+            args.repo_name,
             args.head_branch,
             args.head_sha,
             int(time.time()),
         )
 
     # Gather some information about the benchmark
-    metadata = get_benchmark_metadata(head_branch, head_sha, timestamp)
+    metadata = get_benchmark_metadata(
+        repo_name, head_branch, head_sha, timestamp, args.benchmark_name
+    )
     runner = get_runner_info()
 
     # Extract and aggregate the benchmark results
     aggregated_results = aggregate(metadata, runner, load(args.benchmark_results))
-    upload_to_s3(
+    upload(
         args.s3_bucket,
+        args.upload_url,
+        repo_name,
         head_branch,
         head_sha,
         aggregated_results,
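For anyone standing up the `--upload-url` side: `upload_via_api` POSTs an uncompressed JSON body with `username`, `password`, `content` (the JSONEachRow payload), and `s3_path`. A minimal stand-in receiver for local testing; this is entirely hypothetical and not part of this PR, and a real deployment would verify the credentials and persist `content` to S3:

```python
# Hypothetical receiver for --upload-url, useful only for local smoke tests.
# It accepts the JSON body sent by upload_via_api and echoes the destination.
import json
from http.server import BaseHTTPRequestHandler, HTTPServer


class UploadHandler(BaseHTTPRequestHandler):
    def do_POST(self) -> None:
        length = int(self.headers.get("Content-Length", 0))
        payload = json.loads(self.rfile.read(length))
        # Fields produced by upload_via_api: username, password, content, s3_path
        print(f"received upload for {payload['s3_path']}")
        self.send_response(200)
        self.send_header("content-type", "application/json")
        self.end_headers()
        self.wfile.write(json.dumps({"ok": True, "s3_path": payload["s3_path"]}).encode())


if __name__ == "__main__":
    # Then run the script with: --upload-url http://127.0.0.1:8080
    HTTPServer(("127.0.0.1", 8080), UploadHandler).serve_forever()
```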