Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
220 changes: 179 additions & 41 deletions src/dstack/_internal/cli/utils/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,24 @@

from dstack._internal.cli.utils.common import NO_OFFERS_WARNING, add_row_from_dict, console
from dstack._internal.core.models.configurations import DevEnvironmentConfiguration
from dstack._internal.core.models.instances import InstanceAvailability
from dstack._internal.core.models.instances import InstanceAvailability, Resources
from dstack._internal.core.models.profiles import (
DEFAULT_RUN_TERMINATION_IDLE_TIME,
TerminationPolicy,
)
from dstack._internal.core.models.runs import (
Job,
JobProvisioningData,
JobRuntimeData,
JobStatus,
JobSubmission,
Probe,
ProbeSpec,
RunPlan,
RunStatus,
)
from dstack._internal.core.models.runs import (
Run as CoreRun,
)
from dstack._internal.core.services.profiles import get_termination
from dstack._internal.utils.common import (
Expand Down Expand Up @@ -182,15 +190,151 @@ def th(s: str) -> str:
console.print(NO_OFFERS_WARNING)


def _format_run_status(run) -> str:
status_text = (
run.latest_job_submission.status_message
if run.status.is_finished() and run.latest_job_submission
else run.status_message
)
# Inline of _get_run_status_style
color_map = {
RunStatus.PENDING: "white",
RunStatus.SUBMITTED: "grey",
RunStatus.PROVISIONING: "deep_sky_blue1",
RunStatus.RUNNING: "sea_green3",
RunStatus.TERMINATING: "deep_sky_blue1",
RunStatus.TERMINATED: "grey",
RunStatus.FAILED: "indian_red1",
RunStatus.DONE: "grey",
}
if status_text == "no offers" or status_text == "interrupted":
color = "gold1"
elif status_text == "pulling":
color = "sea_green3"
else:
color = color_map.get(run.status, "white")
status_style = f"bold {color}" if not run.status.is_finished() else color
return f"[{status_style}]{status_text}[/]"


def _format_job_submission_status(job_submission: JobSubmission, verbose: bool) -> str:
status_message = job_submission.status_message
job_status = job_submission.status
if status_message in ("no offers", "interrupted"):
color = "gold1"
elif status_message == "stopped":
color = "grey"
else:
color_map = {
JobStatus.SUBMITTED: "grey",
JobStatus.PROVISIONING: "deep_sky_blue1",
JobStatus.PULLING: "sea_green3",
JobStatus.RUNNING: "sea_green3",
JobStatus.TERMINATING: "deep_sky_blue1",
JobStatus.TERMINATED: "grey",
JobStatus.ABORTED: "gold1",
JobStatus.FAILED: "indian_red1",
JobStatus.DONE: "grey",
}
color = color_map.get(job_status, "white")
status_style = f"bold {color}" if not job_status.is_finished() else color
formatted_status_message = f"[{status_style}]{status_message}[/]"
if verbose and job_submission.inactivity_secs:
inactive_for = format_duration_multiunit(job_submission.inactivity_secs)
formatted_status_message += f" (inactive for {inactive_for})"
return formatted_status_message


def _get_show_deployment_replica_job(run: CoreRun, verbose: bool) -> tuple[bool, bool, bool]:
show_deployment_num = (
verbose and run.run_spec.configuration.type == "service"
) or run.is_deployment_in_progress()

replica_nums = {job.job_spec.replica_num for job in run.jobs}
show_replica = len(replica_nums) > 1

jobs_by_replica: Dict[int, List[Any]] = {}
for job in run.jobs:
replica_num = job.job_spec.replica_num
if replica_num not in jobs_by_replica:
jobs_by_replica[replica_num] = []
jobs_by_replica[replica_num].append(job)

show_job = any(
len({j.job_spec.job_num for j in jobs}) > 1 for jobs in jobs_by_replica.values()
)

return show_deployment_num, show_replica, show_job


def _format_job_name(
job: Job,
latest_job_submission: JobSubmission,
show_deployment_num: bool,
show_replica: bool,
show_job: bool,
) -> str:
name_parts = []
if show_replica:
name_parts.append(f"replica={job.job_spec.replica_num}")
if show_job:
name_parts.append(f"job={job.job_spec.job_num}")
name_suffix = (
f" deployment={latest_job_submission.deployment_num}" if show_deployment_num else ""
)
name_value = " " + (" ".join(name_parts) if name_parts else "")
name_value += name_suffix
return name_value


def _format_price(price: float, is_spot: bool) -> str:
price_str = f"${price:.4f}".rstrip("0").rstrip(".")
if is_spot:
price_str += " (spot)"
return price_str


def _format_backend(backend: Any, region: str) -> str:
backend_str = getattr(backend, "value", backend)
backend_str = backend_str.replace("remote", "ssh")
return f"{backend_str} ({region})"


def _format_instance_type(jpd: JobProvisioningData, jrd: Optional[JobRuntimeData]) -> str:
instance_type = jpd.instance_type.name
if jrd is not None and getattr(jrd, "offer", None) is not None:
if jrd.offer.total_blocks > 1:
instance_type += f" ({jrd.offer.blocks}/{jrd.offer.total_blocks})"
if jpd.reservation:
instance_type += f" ({jpd.reservation})"
return instance_type


def _get_resources(jpd: JobProvisioningData, jrd: Optional[JobRuntimeData]) -> Resources:
resources: Resources = jpd.instance_type.resources
if jrd is not None and getattr(jrd, "offer", None) is not None:
resources = jrd.offer.instance.resources
return resources


def _format_run_name(run: CoreRun, show_deployment_num: bool) -> str:
parts: List[str] = [run.run_spec.run_name]
if show_deployment_num:
parts.append(f" [secondary]deployment={run.deployment_num}[/]")
return "".join(parts)


def get_runs_table(
runs: List[Run], verbose: bool = False, format_date: DateFormatter = pretty_date
) -> Table:
table = Table(box=None, expand=shutil.get_terminal_size(fallback=(120, 40)).columns <= 110)
table.add_column("NAME", style="bold", no_wrap=True, ratio=2)
table.add_column("BACKEND", style="grey58", ratio=2)
table.add_column("RESOURCES", ratio=3 if not verbose else 2)
if verbose:
table.add_column("INSTANCE TYPE", no_wrap=True, ratio=1)
table.add_column("RESOURCES", style="grey58", ratio=3)
table.add_column("INSTANCE TYPE", style="grey58", no_wrap=True, ratio=1)
else:
table.add_column("GPU", ratio=2)
table.add_column("PRICE", style="grey58", ratio=1)
table.add_column("STATUS", no_wrap=True, ratio=1)
if verbose or any(
Expand All @@ -205,22 +349,18 @@ def get_runs_table(

for run in runs:
run = run._run # TODO(egor-s): make public attribute
show_deployment_num = (
verbose
and run.run_spec.configuration.type == "service"
or run.is_deployment_in_progress()
show_deployment_num, show_replica, show_job = _get_show_deployment_replica_job(
run, verbose
)
merge_job_rows = len(run.jobs) == 1 and not show_deployment_num

run_row: Dict[Union[str, int], Any] = {
"NAME": run.run_spec.run_name
+ (f" [secondary]deployment={run.deployment_num}[/]" if show_deployment_num else ""),
"NAME": _format_run_name(run, show_deployment_num),
"SUBMITTED": format_date(run.submitted_at),
"STATUS": (
run.latest_job_submission.status_message
if run.status.is_finished() and run.latest_job_submission
else run.status_message
),
"STATUS": _format_run_status(run),
"RESOURCES": "-",
"GPU": "-",
"PRICE": "-",
}
if run.error:
run_row["ERROR"] = run.error
Expand All @@ -229,46 +369,44 @@ def get_runs_table(

for job in run.jobs:
latest_job_submission = job.job_submissions[-1]
status = latest_job_submission.status.value
if verbose and latest_job_submission.inactivity_secs:
inactive_for = format_duration_multiunit(latest_job_submission.inactivity_secs)
status += f" (inactive for {inactive_for})"
status_formatted = _format_job_submission_status(latest_job_submission, verbose)

job_row: Dict[Union[str, int], Any] = {
"NAME": f" replica={job.job_spec.replica_num} job={job.job_spec.job_num}"
+ (
f" deployment={latest_job_submission.deployment_num}"
if show_deployment_num
else ""
"NAME": _format_job_name(
job, latest_job_submission, show_deployment_num, show_replica, show_job
),
"STATUS": latest_job_submission.status_message,
"STATUS": status_formatted,
"PROBES": _format_job_probes(
job.job_spec.probes, latest_job_submission.probes, latest_job_submission.status
),
"SUBMITTED": format_date(latest_job_submission.submitted_at),
"ERROR": latest_job_submission.error,
"RESOURCES": "-",
"GPU": "-",
"PRICE": "-",
}
jpd = latest_job_submission.job_provisioning_data
if jpd is not None:
resources = jpd.instance_type.resources
instance_type = jpd.instance_type.name
jrd = latest_job_submission.job_runtime_data
if jrd is not None and jrd.offer is not None:
resources = jrd.offer.instance.resources
if jrd.offer.total_blocks > 1:
instance_type += f" ({jrd.offer.blocks}/{jrd.offer.total_blocks})"
if jpd.reservation:
instance_type += f" ({jpd.reservation})"
job_row.update(
{
"BACKEND": f"{jpd.backend.value.replace('remote', 'ssh')} ({jpd.region})",
"RESOURCES": resources.pretty_format(include_spot=True),
"INSTANCE TYPE": instance_type,
"PRICE": f"${jpd.price:.4f}".rstrip("0").rstrip("."),
}
)
resources = _get_resources(jpd, jrd)
update_dict: Dict[Union[str, int], Any] = {
"BACKEND": _format_backend(jpd.backend, jpd.region),
"RESOURCES": resources.pretty_format(include_spot=False),
"GPU": resources.pretty_format(gpu_only=True, include_spot=False),
"INSTANCE TYPE": _format_instance_type(jpd, jrd),
"PRICE": _format_price(jpd.price, resources.spot),
}
job_row.update(update_dict)
if merge_job_rows:
# merge rows
_status = job_row["STATUS"]
_resources = job_row["RESOURCES"]
_gpu = job_row["GPU"]
_price = job_row["PRICE"]
job_row.update(run_row)
job_row["RESOURCES"] = _resources
job_row["GPU"] = _gpu
job_row["PRICE"] = _price
job_row["STATUS"] = _status
add_row_from_dict(table, job_row, style="secondary" if len(run.jobs) != 1 else None)

return table
Expand Down
19 changes: 18 additions & 1 deletion src/dstack/_internal/core/models/instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,23 @@ def _pretty_format(
gpus: List[Gpu],
spot: bool,
include_spot: bool = False,
gpu_only: bool = False,
) -> str:
if gpu_only:
if not gpus:
return "-"
gpu = gpus[0]
gpu_resources = {
"gpu_name": gpu.name,
"gpu_count": len(gpus),
}
if gpu.memory_mib > 0:
gpu_resources["gpu_memory"] = f"{gpu.memory_mib / 1024:.0f}GB"
output = pretty_resources(**gpu_resources)
if include_spot and spot:
output += " (spot)"
return output

resources = {}
if cpus > 0:
resources["cpus"] = cpus
Expand All @@ -103,7 +119,7 @@ def _pretty_format(
output += " (spot)"
return output

def pretty_format(self, include_spot: bool = False) -> str:
def pretty_format(self, include_spot: bool = False, gpu_only: bool = False) -> str:
return Resources._pretty_format(
self.cpus,
self.cpu_arch,
Expand All @@ -112,6 +128,7 @@ def pretty_format(self, include_spot: bool = False) -> str:
self.gpus,
self.spot,
include_spot,
gpu_only,
)


Expand Down
19 changes: 19 additions & 0 deletions src/tests/_internal/cli/utils/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from unittest.mock import Mock

import pytest

from dstack._internal.server.services.docker import ImageConfig, ImageConfigObject


@pytest.fixture
def image_config_mock(monkeypatch: pytest.MonkeyPatch) -> ImageConfig:
    """Patch both image-config lookups to return a fixed ``ImageConfig``.

    The config has no user or entrypoint and ``/bin/bash`` as its command.
    The configurator-level ``_get_image_config`` is replaced with a mock
    returning the config itself, and the docker-service ``get_image_config``
    with a mock returning it wrapped in an ``ImageConfigObject``. The config
    is returned so tests can inspect it.
    """
    image_config = ImageConfig.parse_obj({"User": None, "Entrypoint": None, "Cmd": ["/bin/bash"]})
    stubbed_returns = {
        "dstack._internal.server.services.jobs.configurators.base._get_image_config": image_config,
        "dstack._internal.server.services.docker.get_image_config": ImageConfigObject(
            config=image_config
        ),
    }
    for target, return_value in stubbed_returns.items():
        monkeypatch.setattr(target, Mock(return_value=return_value))
    return image_config
Loading