diff --git a/src/dstack/_internal/cli/utils/run.py b/src/dstack/_internal/cli/utils/run.py index 58497c084..5e68143b4 100644 --- a/src/dstack/_internal/cli/utils/run.py +++ b/src/dstack/_internal/cli/utils/run.py @@ -6,16 +6,24 @@ from dstack._internal.cli.utils.common import NO_OFFERS_WARNING, add_row_from_dict, console from dstack._internal.core.models.configurations import DevEnvironmentConfiguration -from dstack._internal.core.models.instances import InstanceAvailability +from dstack._internal.core.models.instances import InstanceAvailability, Resources from dstack._internal.core.models.profiles import ( DEFAULT_RUN_TERMINATION_IDLE_TIME, TerminationPolicy, ) from dstack._internal.core.models.runs import ( + Job, + JobProvisioningData, + JobRuntimeData, JobStatus, + JobSubmission, Probe, ProbeSpec, RunPlan, + RunStatus, +) +from dstack._internal.core.models.runs import ( + Run as CoreRun, ) from dstack._internal.core.services.profiles import get_termination from dstack._internal.utils.common import ( @@ -182,15 +190,151 @@ def th(s: str) -> str: console.print(NO_OFFERS_WARNING) +def _format_run_status(run) -> str: + status_text = ( + run.latest_job_submission.status_message + if run.status.is_finished() and run.latest_job_submission + else run.status_message + ) + # Inline of _get_run_status_style + color_map = { + RunStatus.PENDING: "white", + RunStatus.SUBMITTED: "grey", + RunStatus.PROVISIONING: "deep_sky_blue1", + RunStatus.RUNNING: "sea_green3", + RunStatus.TERMINATING: "deep_sky_blue1", + RunStatus.TERMINATED: "grey", + RunStatus.FAILED: "indian_red1", + RunStatus.DONE: "grey", + } + if status_text == "no offers" or status_text == "interrupted": + color = "gold1" + elif status_text == "pulling": + color = "sea_green3" + else: + color = color_map.get(run.status, "white") + status_style = f"bold {color}" if not run.status.is_finished() else color + return f"[{status_style}]{status_text}[/]" + + +def _format_job_submission_status(job_submission: JobSubmission, verbose: bool) -> str: + status_message = job_submission.status_message + job_status = job_submission.status + if status_message in ("no offers", "interrupted"): + color = "gold1" + elif status_message == "stopped": + color = "grey" + else: + color_map = { + JobStatus.SUBMITTED: "grey", + JobStatus.PROVISIONING: "deep_sky_blue1", + JobStatus.PULLING: "sea_green3", + JobStatus.RUNNING: "sea_green3", + JobStatus.TERMINATING: "deep_sky_blue1", + JobStatus.TERMINATED: "grey", + JobStatus.ABORTED: "gold1", + JobStatus.FAILED: "indian_red1", + JobStatus.DONE: "grey", + } + color = color_map.get(job_status, "white") + status_style = f"bold {color}" if not job_status.is_finished() else color + formatted_status_message = f"[{status_style}]{status_message}[/]" + if verbose and job_submission.inactivity_secs: + inactive_for = format_duration_multiunit(job_submission.inactivity_secs) + formatted_status_message += f" (inactive for {inactive_for})" + return formatted_status_message + + +def _get_show_deployment_replica_job(run: CoreRun, verbose: bool) -> tuple[bool, bool, bool]: + show_deployment_num = ( + verbose and run.run_spec.configuration.type == "service" + ) or run.is_deployment_in_progress() + + replica_nums = {job.job_spec.replica_num for job in run.jobs} + show_replica = len(replica_nums) > 1 + + jobs_by_replica: Dict[int, List[Any]] = {} + for job in run.jobs: + replica_num = job.job_spec.replica_num + if replica_num not in jobs_by_replica: + jobs_by_replica[replica_num] = [] + jobs_by_replica[replica_num].append(job) + + 
show_job = any( + len({j.job_spec.job_num for j in jobs}) > 1 for jobs in jobs_by_replica.values() + ) + + return show_deployment_num, show_replica, show_job + + +def _format_job_name( + job: Job, + latest_job_submission: JobSubmission, + show_deployment_num: bool, + show_replica: bool, + show_job: bool, +) -> str: + name_parts = [] + if show_replica: + name_parts.append(f"replica={job.job_spec.replica_num}") + if show_job: + name_parts.append(f"job={job.job_spec.job_num}") + name_suffix = ( + f" deployment={latest_job_submission.deployment_num}" if show_deployment_num else "" + ) + name_value = " " + (" ".join(name_parts) if name_parts else "") + name_value += name_suffix + return name_value + + +def _format_price(price: float, is_spot: bool) -> str: + price_str = f"${price:.4f}".rstrip("0").rstrip(".") + if is_spot: + price_str += " (spot)" + return price_str + + +def _format_backend(backend: Any, region: str) -> str: + backend_str = getattr(backend, "value", backend) + backend_str = backend_str.replace("remote", "ssh") + return f"{backend_str} ({region})" + + +def _format_instance_type(jpd: JobProvisioningData, jrd: Optional[JobRuntimeData]) -> str: + instance_type = jpd.instance_type.name + if jrd is not None and getattr(jrd, "offer", None) is not None: + if jrd.offer.total_blocks > 1: + instance_type += f" ({jrd.offer.blocks}/{jrd.offer.total_blocks})" + if jpd.reservation: + instance_type += f" ({jpd.reservation})" + return instance_type + + +def _get_resources(jpd: JobProvisioningData, jrd: Optional[JobRuntimeData]) -> Resources: + resources: Resources = jpd.instance_type.resources + if jrd is not None and getattr(jrd, "offer", None) is not None: + resources = jrd.offer.instance.resources + return resources + + +def _format_run_name(run: CoreRun, show_deployment_num: bool) -> str: + parts: List[str] = [run.run_spec.run_name] + if show_deployment_num: + parts.append(f" [secondary]deployment={run.deployment_num}[/]") + return "".join(parts) + + def get_runs_table( runs: List[Run], verbose: bool = False, format_date: DateFormatter = pretty_date ) -> Table: table = Table(box=None, expand=shutil.get_terminal_size(fallback=(120, 40)).columns <= 110) table.add_column("NAME", style="bold", no_wrap=True, ratio=2) table.add_column("BACKEND", style="grey58", ratio=2) - table.add_column("RESOURCES", ratio=3 if not verbose else 2) if verbose: - table.add_column("INSTANCE TYPE", no_wrap=True, ratio=1) + table.add_column("RESOURCES", style="grey58", ratio=3) + table.add_column("INSTANCE TYPE", style="grey58", no_wrap=True, ratio=1) + else: + table.add_column("GPU", ratio=2) table.add_column("PRICE", style="grey58", ratio=1) table.add_column("STATUS", no_wrap=True, ratio=1) if verbose or any( @@ -205,22 +349,18 @@ def get_runs_table( for run in runs: run = run._run # TODO(egor-s): make public attribute - show_deployment_num = ( - verbose - and run.run_spec.configuration.type == "service" - or run.is_deployment_in_progress() + show_deployment_num, show_replica, show_job = _get_show_deployment_replica_job( + run, verbose ) merge_job_rows = len(run.jobs) == 1 and not show_deployment_num run_row: Dict[Union[str, int], Any] = { - "NAME": run.run_spec.run_name - + (f" [secondary]deployment={run.deployment_num}[/]" if show_deployment_num else ""), + "NAME": _format_run_name(run, show_deployment_num), "SUBMITTED": format_date(run.submitted_at), - "STATUS": ( - run.latest_job_submission.status_message - if run.status.is_finished() and run.latest_job_submission - else run.status_message - ), + "STATUS": 
_format_run_status(run), + "RESOURCES": "-", + "GPU": "-", + "PRICE": "-", } if run.error: run_row["ERROR"] = run.error @@ -229,46 +369,44 @@ def get_runs_table( for job in run.jobs: latest_job_submission = job.job_submissions[-1] - status = latest_job_submission.status.value - if verbose and latest_job_submission.inactivity_secs: - inactive_for = format_duration_multiunit(latest_job_submission.inactivity_secs) - status += f" (inactive for {inactive_for})" + status_formatted = _format_job_submission_status(latest_job_submission, verbose) + job_row: Dict[Union[str, int], Any] = { - "NAME": f" replica={job.job_spec.replica_num} job={job.job_spec.job_num}" - + ( - f" deployment={latest_job_submission.deployment_num}" - if show_deployment_num - else "" + "NAME": _format_job_name( + job, latest_job_submission, show_deployment_num, show_replica, show_job ), - "STATUS": latest_job_submission.status_message, + "STATUS": status_formatted, "PROBES": _format_job_probes( job.job_spec.probes, latest_job_submission.probes, latest_job_submission.status ), "SUBMITTED": format_date(latest_job_submission.submitted_at), "ERROR": latest_job_submission.error, + "RESOURCES": "-", + "GPU": "-", + "PRICE": "-", } jpd = latest_job_submission.job_provisioning_data if jpd is not None: - resources = jpd.instance_type.resources - instance_type = jpd.instance_type.name jrd = latest_job_submission.job_runtime_data - if jrd is not None and jrd.offer is not None: - resources = jrd.offer.instance.resources - if jrd.offer.total_blocks > 1: - instance_type += f" ({jrd.offer.blocks}/{jrd.offer.total_blocks})" - if jpd.reservation: - instance_type += f" ({jpd.reservation})" - job_row.update( - { - "BACKEND": f"{jpd.backend.value.replace('remote', 'ssh')} ({jpd.region})", - "RESOURCES": resources.pretty_format(include_spot=True), - "INSTANCE TYPE": instance_type, - "PRICE": f"${jpd.price:.4f}".rstrip("0").rstrip("."), - } - ) + resources = _get_resources(jpd, jrd) + update_dict: Dict[Union[str, int], Any] = { + "BACKEND": _format_backend(jpd.backend, jpd.region), + "RESOURCES": resources.pretty_format(include_spot=False), + "GPU": resources.pretty_format(gpu_only=True, include_spot=False), + "INSTANCE TYPE": _format_instance_type(jpd, jrd), + "PRICE": _format_price(jpd.price, resources.spot), + } + job_row.update(update_dict) if merge_job_rows: - # merge rows + _status = job_row["STATUS"] + _resources = job_row["RESOURCES"] + _gpu = job_row["GPU"] + _price = job_row["PRICE"] job_row.update(run_row) + job_row["RESOURCES"] = _resources + job_row["GPU"] = _gpu + job_row["PRICE"] = _price + job_row["STATUS"] = _status add_row_from_dict(table, job_row, style="secondary" if len(run.jobs) != 1 else None) return table diff --git a/src/dstack/_internal/core/models/instances.py b/src/dstack/_internal/core/models/instances.py index 204b423c1..f814cdaea 100644 --- a/src/dstack/_internal/core/models/instances.py +++ b/src/dstack/_internal/core/models/instances.py @@ -83,7 +83,23 @@ def _pretty_format( gpus: List[Gpu], spot: bool, include_spot: bool = False, + gpu_only: bool = False, ) -> str: + if gpu_only: + if not gpus: + return "-" + gpu = gpus[0] + gpu_resources = { + "gpu_name": gpu.name, + "gpu_count": len(gpus), + } + if gpu.memory_mib > 0: + gpu_resources["gpu_memory"] = f"{gpu.memory_mib / 1024:.0f}GB" + output = pretty_resources(**gpu_resources) + if include_spot and spot: + output += " (spot)" + return output + resources = {} if cpus > 0: resources["cpus"] = cpus @@ -103,7 +119,7 @@ def _pretty_format( output += " (spot)" return 
output - def pretty_format(self, include_spot: bool = False) -> str: + def pretty_format(self, include_spot: bool = False, gpu_only: bool = False) -> str: return Resources._pretty_format( self.cpus, self.cpu_arch, @@ -112,6 +128,7 @@ def pretty_format(self, include_spot: bool = False) -> str: self.gpus, self.spot, include_spot, + gpu_only, ) diff --git a/src/tests/_internal/cli/utils/conftest.py b/src/tests/_internal/cli/utils/conftest.py new file mode 100644 index 000000000..0f87c88bd --- /dev/null +++ b/src/tests/_internal/cli/utils/conftest.py @@ -0,0 +1,19 @@ +from unittest.mock import Mock + +import pytest + +from dstack._internal.server.services.docker import ImageConfig, ImageConfigObject + + +@pytest.fixture +def image_config_mock(monkeypatch: pytest.MonkeyPatch) -> ImageConfig: + image_config = ImageConfig.parse_obj({"User": None, "Entrypoint": None, "Cmd": ["/bin/bash"]}) + monkeypatch.setattr( + "dstack._internal.server.services.jobs.configurators.base._get_image_config", + Mock(return_value=image_config), + ) + monkeypatch.setattr( + "dstack._internal.server.services.docker.get_image_config", + Mock(return_value=ImageConfigObject(config=image_config)), + ) + return image_config diff --git a/src/tests/_internal/cli/utils/test_run.py b/src/tests/_internal/cli/utils/test_run.py new file mode 100644 index 000000000..b824c001a --- /dev/null +++ b/src/tests/_internal/cli/utils/test_run.py @@ -0,0 +1,494 @@ +import re +from datetime import datetime, timezone +from typing import Optional +from unittest.mock import Mock + +import pytest +from rich.table import Table +from rich.text import Text +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.orm import selectinload + +from dstack._internal.cli.utils.run import get_runs_table +from dstack._internal.core.models.backends.base import BackendType +from dstack._internal.core.models.configurations import ( + AnyRunConfiguration, + ServiceConfiguration, + TaskConfiguration, +) +from dstack._internal.core.models.instances import Disk, InstanceType, Resources +from dstack._internal.core.models.profiles import Profile +from dstack._internal.core.models.resources import Range +from dstack._internal.core.models.runs import ( + JobProvisioningData, + JobStatus, + JobTerminationReason, + RunStatus, +) +from dstack._internal.server.models import RunModel +from dstack._internal.server.services import encryption # noqa: F401 # import for side-effect +from dstack._internal.server.services.runs import run_model_to_run +from dstack._internal.server.testing.common import ( + create_job, + create_project, + create_repo, + create_run, + create_user, + get_job_provisioning_data, + get_run_spec, +) +from dstack.api import Run +from dstack.api.server import APIClient + + +def _strip_rich_markup(text: str) -> str: + return re.sub(r"\[[^\]]*\]([^\[]*)\[/[^\]]*\]", r"\1", text) + + +def get_table_cells(table: Table) -> list[dict[str, str]]: + rows = [] + + if not table.columns: + return rows + + num_rows = len(table.columns[0]._cells) + + for row_idx in range(num_rows): + row = {} + for col in table.columns: + col_name = str(col.header) + if row_idx < len(col._cells): + cell_value = col._cells[row_idx] + if isinstance(cell_value, Text): + row[col_name] = cell_value.plain + else: + text = str(cell_value) + row[col_name] = _strip_rich_markup(text) + else: + row[col_name] = "" + rows.append(row) + + return rows + + +def get_table_cell_style(table: Table, column_name: str, row_idx: int = 0) -> Optional[str]: + for col in 
table.columns: + if str(col.header) == column_name: + if row_idx < len(col._cells): + cell_value = col._cells[row_idx] + if isinstance(cell_value, Text): + return str(cell_value.style) if cell_value.style else None + text = str(cell_value) + match = re.search(r"\[([^\]]+)\][^\[]*\[/\]", text) + if match: + return match.group(1) + return None + return None + + +async def create_run_with_job( + session: AsyncSession, + run_name: str = "test-run", + run_status: Optional[RunStatus] = None, + job_status: JobStatus = JobStatus.RUNNING, + configuration: Optional[AnyRunConfiguration] = None, + job_provisioning_data: Optional[JobProvisioningData] = None, + termination_reason: Optional[JobTerminationReason] = None, + exit_status: Optional[int] = None, + submitted_at: Optional[datetime] = None, +) -> Run: + if submitted_at is None: + submitted_at = datetime(2023, 1, 2, 3, 4, 5, tzinfo=timezone.utc) + + project = await create_project(session=session) + user = await create_user(session=session) + repo = await create_repo(session=session, project_id=project.id) + + if configuration is None: + configuration = TaskConfiguration( + type="task", + image="ubuntu:latest", + commands=["echo hello"], + ) + + if run_status is None: + if job_status == JobStatus.DONE: + run_status = RunStatus.DONE + elif job_status == JobStatus.FAILED: + run_status = RunStatus.FAILED + elif job_status in [JobStatus.TERMINATED, JobStatus.ABORTED]: + run_status = RunStatus.TERMINATED + elif job_status == JobStatus.TERMINATING: + run_status = RunStatus.TERMINATING + elif job_status == JobStatus.PROVISIONING: + run_status = RunStatus.PROVISIONING + elif job_status == JobStatus.PULLING: + run_status = RunStatus.PROVISIONING + else: + run_status = RunStatus.RUNNING + + run_spec = get_run_spec( + run_name=run_name, + repo_id=repo.name, + profile=Profile(name="default"), + configuration=configuration, + ) + + run_model_db = await create_run( + session=session, + project=project, + repo=repo, + user=user, + run_name=run_name, + run_spec=run_spec, + status=run_status, + submitted_at=submitted_at, + ) + + if job_provisioning_data is None: + resources = Resources( + cpus=2, + memory_mib=4096, + gpus=[], + spot=False, + disk=Disk(size_mib=102400), + ) + instance_type = InstanceType(name="t2.medium", resources=resources) + job_provisioning_data = get_job_provisioning_data( + backend=BackendType.AWS, + region="us-east-1", + cpu_count=2, + memory_gib=4, + spot=False, + hostname="1.2.3.4", + price=0.0464, + instance_type=instance_type, + ) + + job_model = await create_job( + session=session, + run=run_model_db, + status=job_status, + submitted_at=submitted_at, + last_processed_at=submitted_at, + job_provisioning_data=job_provisioning_data, + termination_reason=termination_reason, + ) + + if exit_status is not None: + job_model.exit_status = exit_status + await session.commit() + + await session.refresh(run_model_db) + + res = await session.execute( + select(RunModel).where(RunModel.id == run_model_db.id).options(selectinload(RunModel.jobs)) + ) + run_model_db = res.scalar_one() + + run_model = run_model_to_run(run_model_db) + + return Run( + api_client=Mock(spec=APIClient), + project=project.name, + run=run_model, + ) + + +pytestmark = [ + pytest.mark.asyncio, + pytest.mark.usefixtures("test_db", "image_config_mock"), + pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True), +] + + +class TestGetRunsTable: + async def test_simple_run(self, session: AsyncSession): + api_run = await create_run_with_job(session=session) + table = 
get_runs_table([api_run], verbose=False) + + cells = get_table_cells(table) + assert len(cells) == 1 + row = cells[0] + + assert row["NAME"] == "test-run" + assert row["BACKEND"] == "aws (us-east-1)" + assert row["GPU"] == "-" + assert row["PRICE"] == "$0.0464" + assert row["STATUS"] == "running" + assert row["SUBMITTED"] == "3 years ago" + + name_column = next(col for col in table.columns if str(col.header) == "NAME") + assert name_column.style == "bold" + + status_style = get_table_cell_style(table, "STATUS", 0) + assert status_style == "bold sea_green3" + + @pytest.mark.parametrize( + "job_status,termination_reason,exit_status,expected_status,expected_style", + [ + (JobStatus.DONE, None, None, "exited (0)", "grey"), + ( + JobStatus.FAILED, + JobTerminationReason.CONTAINER_EXITED_WITH_ERROR, + 1, + "exited (1)", + "indian_red1", + ), + ( + JobStatus.FAILED, + JobTerminationReason.CONTAINER_EXITED_WITH_ERROR, + 42, + "exited (42)", + "indian_red1", + ), + ( + JobStatus.FAILED, + JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY, + None, + "no offers", + "gold1", + ), + ( + JobStatus.FAILED, + JobTerminationReason.INTERRUPTED_BY_NO_CAPACITY, + None, + "interrupted", + "gold1", + ), + ( + JobStatus.FAILED, + JobTerminationReason.INSTANCE_UNREACHABLE, + None, + "error", + "indian_red1", + ), + ( + JobStatus.TERMINATED, + JobTerminationReason.TERMINATED_BY_USER, + None, + "stopped", + "grey", + ), + (JobStatus.TERMINATED, JobTerminationReason.ABORTED_BY_USER, None, "aborted", "grey"), + (JobStatus.RUNNING, None, None, "running", "bold sea_green3"), + (JobStatus.PROVISIONING, None, None, "provisioning", "bold deep_sky_blue1"), + (JobStatus.PULLING, None, None, "pulling", "bold sea_green3"), + (JobStatus.TERMINATING, None, None, "terminating", "bold deep_sky_blue1"), + ], + ) + async def test_status_messages( + self, + session: AsyncSession, + job_status: JobStatus, + termination_reason: Optional[JobTerminationReason], + exit_status: Optional[int], + expected_status: str, + expected_style: str, + ): + api_run = await create_run_with_job( + session=session, + job_status=job_status, + termination_reason=termination_reason, + exit_status=exit_status, + ) + + table = get_runs_table([api_run], verbose=False) + cells = get_table_cells(table) + + assert len(cells) == 1 + assert cells[0]["STATUS"] == expected_status + + status_style = get_table_cell_style(table, "STATUS", 0) + assert status_style == expected_style + + async def test_multi_node_task_with_multiple_jobs(self, session: AsyncSession): + # Verifies that a multi-node task with 3 jobs (all same replica_num=0, different job_num=0,1,2) + # displays only job= in table rows, not replica=, since all jobs share the same replica. + # Expected: 4 rows total (1 run header + 3 job rows with job=0,1,2). 
+ submitted_at = datetime(2023, 1, 2, 3, 4, 5, tzinfo=timezone.utc) + + project = await create_project(session=session) + user = await create_user(session=session) + repo = await create_repo(session=session, project_id=project.id) + + configuration = TaskConfiguration( + type="task", + image="ubuntu:latest", + commands=["echo hello"], + nodes=3, + ) + + run_spec = get_run_spec( + run_name="multi-node-run", + repo_id=repo.name, + profile=Profile(name="default"), + configuration=configuration, + ) + + run_model_db = await create_run( + session=session, + project=project, + repo=repo, + user=user, + run_name="multi-node-run", + run_spec=run_spec, + status=RunStatus.RUNNING, + submitted_at=submitted_at, + ) + + resources = Resources( + cpus=2, + memory_mib=4096, + gpus=[], + spot=False, + disk=Disk(size_mib=102400), + ) + instance_type = InstanceType(name="t2.medium", resources=resources) + job_provisioning_data = get_job_provisioning_data( + backend=BackendType.AWS, + region="us-east-1", + cpu_count=2, + memory_gib=4, + spot=False, + hostname="1.2.3.4", + price=0.0464, + instance_type=instance_type, + ) + + for job_num in range(3): + await create_job( + session=session, + run=run_model_db, + status=JobStatus.RUNNING, + submitted_at=submitted_at, + last_processed_at=submitted_at, + job_provisioning_data=job_provisioning_data, + replica_num=0, + job_num=job_num, + ) + + await session.refresh(run_model_db) + + res = await session.execute( + select(RunModel) + .where(RunModel.id == run_model_db.id) + .options(selectinload(RunModel.jobs)) + ) + run_model_db = res.scalar_one() + + run_model = run_model_to_run(run_model_db) + + api_run = Run( + api_client=Mock(spec=APIClient), + project=project.name, + run=run_model, + ) + + table = get_runs_table([api_run], verbose=False) + cells = get_table_cells(table) + + assert len(cells) == 4 + assert cells[0]["NAME"] == "multi-node-run" + + for i in range(1, 4): + job_row = cells[i] + assert "replica=" not in job_row["NAME"] + assert f"job={i - 1}" in job_row["NAME"] + assert job_row["STATUS"] == "running" + + async def test_service_with_multiple_replicas_and_jobs(self, session: AsyncSession): + # Verifies that a service with 3 replicas and 1 job per replica displays replica= but not job= + # in table rows (since there's only one job per replica). Expected: 4 rows total (1 run header + 3 job rows). 
+ submitted_at = datetime(2023, 1, 2, 3, 4, 5, tzinfo=timezone.utc) + + project = await create_project(session=session) + user = await create_user(session=session) + repo = await create_repo(session=session, project_id=project.id) + + configuration = ServiceConfiguration( + type="service", + image="ubuntu:latest", + commands=["echo hello"], + port=8000, + replicas=Range[int](min=3, max=3), + ) + + run_spec = get_run_spec( + run_name="service-run", + repo_id=repo.name, + profile=Profile(name="default"), + configuration=configuration, + ) + + run_model_db = await create_run( + session=session, + project=project, + repo=repo, + user=user, + run_name="service-run", + run_spec=run_spec, + status=RunStatus.RUNNING, + submitted_at=submitted_at, + ) + + resources = Resources( + cpus=2, + memory_mib=4096, + gpus=[], + spot=False, + disk=Disk(size_mib=102400), + ) + instance_type = InstanceType(name="t2.medium", resources=resources) + job_provisioning_data = get_job_provisioning_data( + backend=BackendType.AWS, + region="us-east-1", + cpu_count=2, + memory_gib=4, + spot=False, + hostname="1.2.3.4", + price=0.0464, + instance_type=instance_type, + ) + + for replica_num in range(3): + await create_job( + session=session, + run=run_model_db, + status=JobStatus.RUNNING, + submitted_at=submitted_at, + last_processed_at=submitted_at, + job_provisioning_data=job_provisioning_data, + replica_num=replica_num, + job_num=0, + ) + + await session.refresh(run_model_db) + + res = await session.execute( + select(RunModel) + .where(RunModel.id == run_model_db.id) + .options(selectinload(RunModel.jobs)) + ) + run_model_db = res.scalar_one() + + run_model = run_model_to_run(run_model_db) + + api_run = Run( + api_client=Mock(spec=APIClient), + project=project.name, + run=run_model, + ) + + table = get_runs_table([api_run], verbose=False) + cells = get_table_cells(table) + + assert len(cells) == 4 + assert cells[0]["NAME"] == "service-run" + + for i in range(1, 4): + job_row = cells[i] + assert f"replica={i - 1}" in job_row["NAME"] + assert "job=" not in job_row["NAME"] + assert job_row["STATUS"] == "running"
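For reference, a minimal usage sketch of the extended Resources.pretty_format API introduced above. It is not part of the diff: the Gpu constructor arguments (name and memory_mib only) are an assumption, the Resources/Disk values simply mirror the fixtures used in the tests, and no exact output strings are asserted.

from dstack._internal.core.models.instances import Disk, Gpu, Resources

# Mirrors the Resources/Disk fixtures used in the tests above, plus two GPUs.
# Gpu is assumed to accept just name/memory_mib, with its remaining fields defaulted.
resources = Resources(
    cpus=8,
    memory_mib=65536,
    gpus=[Gpu(name="L4", memory_mib=24576), Gpu(name="L4", memory_mib=24576)],
    spot=True,
    disk=Disk(size_mib=102400),
)

# GPU-only rendering used for the new non-verbose "GPU" column: the first GPU's
# name/memory plus the GPU count (see the gpu_only branch in _pretty_format).
print(resources.pretty_format(gpu_only=True))

# Full resources string; " (spot)" is appended only when include_spot=True and spot=True.
print(resources.pretty_format(include_spot=True))

# With no GPUs, the GPU-only form falls back to "-", matching the table placeholder.
cpu_only = Resources(cpus=2, memory_mib=4096, gpus=[], spot=False, disk=Disk(size_mib=102400))
print(cpu_only.pretty_format(gpu_only=True))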