diff --git a/simvue/api/objects/alert/fetch.py b/simvue/api/objects/alert/fetch.py index 05cc132d..ddda635f 100644 --- a/simvue/api/objects/alert/fetch.py +++ b/simvue/api/objects/alert/fetch.py @@ -150,4 +150,6 @@ def get( ), ) else: - raise RuntimeError(f"Unrecognised alert source '{_entry['source']}'") + raise RuntimeError( + f"Unrecognised alert source '{_entry['source']}' with data '{_entry}'" + ) diff --git a/simvue/api/objects/artifact/base.py b/simvue/api/objects/artifact/base.py index 9b656297..729fff07 100644 --- a/simvue/api/objects/artifact/base.py +++ b/simvue/api/objects/artifact/base.py @@ -31,8 +31,8 @@ Category = typing.Literal["code", "input", "output"] -UPLOAD_TIMEOUT: int = 30 -DOWNLOAD_TIMEOUT: int = 30 +UPLOAD_TIMEOUT_PER_MB: int = 1 +DOWNLOAD_TIMEOUT_PER_MB: int = 1 DOWNLOAD_CHUNK_SIZE: int = 8192 @@ -108,7 +108,7 @@ def on_reconnect(self, id_mapping: dict[str, str]) -> None: for id, category in _offline_staging.items(): self.attach_to_run(run_id=id_mapping[id], category=category) - def _upload(self, file: io.BytesIO) -> None: + def _upload(self, file: io.BytesIO, timeout: int, file_size: int) -> None: if self._offline: super().commit() return @@ -116,6 +116,13 @@ def _upload(self, file: io.BytesIO) -> None: if not (_url := self._staging.get("url")): return + if not timeout: + timeout = UPLOAD_TIMEOUT_PER_MB * file_size / 1024 / 1024 + + self._logger.debug( + f"Will wait for a period of {timeout}s for upload of file to complete." + ) + _name = self._staging["name"] _response = sv_post( @@ -123,6 +130,7 @@ def _upload(self, file: io.BytesIO) -> None: headers={}, params={}, is_json=False, + timeout=timeout, files={"file": file}, data=self._init_data.get("fields"), ) @@ -325,7 +333,9 @@ def download_content(self) -> typing.Generator[bytes, None, None]: f"Could not retrieve URL for artifact '{self._identifier}'" ) _response = sv_get( - f"{self.download_url}", timeout=DOWNLOAD_TIMEOUT, headers=None + f"{self.download_url}", + timeout=DOWNLOAD_TIMEOUT_PER_MB * self.size / 1024 / 1024, + headers=None, ) get_json_from_response( diff --git a/simvue/api/objects/artifact/file.py b/simvue/api/objects/artifact/file.py index 0bad9447..b2dfd078 100644 --- a/simvue/api/objects/artifact/file.py +++ b/simvue/api/objects/artifact/file.py @@ -37,6 +37,7 @@ def new( file_path: pydantic.FilePath, mime_type: str | None, metadata: dict[str, typing.Any] | None, + upload_timeout: int | None = None, offline: bool = False, **kwargs, ) -> Self: @@ -56,6 +57,8 @@ def new( the mime type for this file, else this is determined metadata : dict[str, Any] | None supply metadata information for this artifact + upload_timeout : int | None, optional + specify the timeout in seconds for upload offline : bool, optional whether to define this artifact locally, default is False @@ -100,6 +103,6 @@ def new( return _artifact with open(_file_orig_path, "rb") as out_f: - _artifact._upload(file=out_f) + _artifact._upload(file=out_f, timeout=upload_timeout, file_size=_file_size) return _artifact diff --git a/simvue/api/objects/artifact/object.py b/simvue/api/objects/artifact/object.py index de34ff60..efe1d9fa 100644 --- a/simvue/api/objects/artifact/object.py +++ b/simvue/api/objects/artifact/object.py @@ -39,6 +39,7 @@ def new( storage: str | None, obj: typing.Any, metadata: dict[str, typing.Any] | None, + upload_timeout: int | None = None, allow_pickling: bool = True, offline: bool = False, **kwargs, @@ -57,6 +58,8 @@ def new( object to serialize and upload metadata : dict[str, Any] | None supply metadata information for this artifact + upload_timeout : int | None, optional + specify the timeout in seconds for upload allow_pickling : bool, optional whether to allow the object to be pickled if no other serialization found. Default is True @@ -119,5 +122,11 @@ def new( if offline: return _artifact - _artifact._upload(file=io.BytesIO(_serialized)) + _file_data = io.BytesIO(_serialized) + + _artifact._upload( + file=_file_data, + timeout=upload_timeout, + file_size=len(_file_data.getbuffer()), + ) return _artifact diff --git a/simvue/api/request.py b/simvue/api/request.py index c8a76a31..fff056ed 100644 --- a/simvue/api/request.py +++ b/simvue/api/request.py @@ -68,6 +68,7 @@ def post( params: dict[str, str], data: typing.Any, is_json: bool = True, + timeout: int | None = None, files: dict[str, typing.Any] | None = None, ) -> requests.Response: """HTTP POST with retries @@ -102,7 +103,7 @@ def post( headers=headers, params=params, data=data_sent, - timeout=DEFAULT_API_TIMEOUT, + timeout=timeout, files=files, ) diff --git a/simvue/run.py b/simvue/run.py index c2419bb7..bc39c620 100644 --- a/simvue/run.py +++ b/simvue/run.py @@ -70,7 +70,6 @@ if typing.TYPE_CHECKING: from .factory.dispatch import DispatcherBaseClass -UPLOAD_TIMEOUT: int = 30 HEARTBEAT_INTERVAL: int = 60 RESOURCES_METRIC_PREFIX: str = "resources" diff --git a/tests/functional/test_scenarios.py b/tests/functional/test_scenarios.py index 9b9983c5..1a4042c4 100644 --- a/tests/functional/test_scenarios.py +++ b/tests/functional/test_scenarios.py @@ -1,11 +1,66 @@ +import pathlib import pytest import simvue import time import contextlib import random +import tempfile import threading from multiprocessing import Process, Manager +from simvue.api.objects.artifact.fetch import Artifact + + +@pytest.mark.scenario +@pytest.mark.parametrize( + "file_size", (1, 10, 100) +) +def test_large_file_upload(file_size: int, create_plain_run: tuple[simvue.Run, dict]) -> None: + FILE_SIZE_MB: int = file_size + run, _ = create_plain_run + run.update_metadata({"file_size_mb": file_size}) + _file = None + _temp_file_name = None + + try: + with tempfile.NamedTemporaryFile(mode="w+b", delete=False) as temp_f: + temp_f.seek(FILE_SIZE_MB * 1024 * 1024 - 1) + temp_f.write(b'\0') + temp_f.flush() + temp_f.seek(0) + temp_f.close() + _temp_file_name = temp_f.name + _input_file_size = pathlib.Path(f"{_temp_file_name}").stat().st_size + run.save_file(file_path=f"{temp_f.name}", category="output", name="test_large_file_artifact") + + run.close() + + client = simvue.Client() + + with tempfile.TemporaryDirectory() as tempd: + client.get_artifact_as_file( + run_id=run.id, + name="test_large_file_artifact", + output_dir=tempd + ) + + _file = next(pathlib.Path(tempd).glob("*")) + + # Assert the returned file size + assert _file.stat().st_size == _input_file_size + except Exception as e: + _run = simvue.Run() + _run.reconnect(run.id) + _run.set_status("failed") + raise e + finally: + if _file and _file.exists(): + _file.unlink() + if _temp_file_name and (_src := pathlib.Path(_temp_file_name)).exists(): + _src.unlink() + with contextlib.suppress(Exception): + Artifact.from_name("test_large_file_artifact", run_id=run.id).delete() + @pytest.mark.scenario def test_time_multi_run_create_threshold() -> None: