Skip to content

Kzscisoft/781 uploads silently fail resulting in 0b files #814

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion simvue/api/objects/alert/fetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,4 +150,6 @@ def get(
),
)
else:
raise RuntimeError(f"Unrecognised alert source '{_entry['source']}'")
raise RuntimeError(
f"Unrecognised alert source '{_entry['source']}' with data '{_entry}'"
)
18 changes: 14 additions & 4 deletions simvue/api/objects/artifact/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@

Category = typing.Literal["code", "input", "output"]

UPLOAD_TIMEOUT: int = 30
DOWNLOAD_TIMEOUT: int = 30
UPLOAD_TIMEOUT_PER_MB: int = 1
DOWNLOAD_TIMEOUT_PER_MB: int = 1
DOWNLOAD_CHUNK_SIZE: int = 8192


Expand Down Expand Up @@ -108,21 +108,29 @@ def on_reconnect(self, id_mapping: dict[str, str]) -> None:
for id, category in _offline_staging.items():
self.attach_to_run(run_id=id_mapping[id], category=category)

def _upload(self, file: io.BytesIO) -> None:
def _upload(self, file: io.BytesIO, timeout: int, file_size: int) -> None:
if self._offline:
super().commit()
return

if not (_url := self._staging.get("url")):
return

if not timeout:
timeout = UPLOAD_TIMEOUT_PER_MB * file_size / 1024 / 1024

self._logger.debug(
f"Will wait for a period of {timeout}s for upload of file to complete."
)

_name = self._staging["name"]

_response = sv_post(
url=_url,
headers={},
params={},
is_json=False,
timeout=timeout,
files={"file": file},
data=self._init_data.get("fields"),
)
Expand Down Expand Up @@ -325,7 +333,9 @@ def download_content(self) -> typing.Generator[bytes, None, None]:
f"Could not retrieve URL for artifact '{self._identifier}'"
)
_response = sv_get(
f"{self.download_url}", timeout=DOWNLOAD_TIMEOUT, headers=None
f"{self.download_url}",
timeout=DOWNLOAD_TIMEOUT_PER_MB * self.size / 1024 / 1024,
headers=None,
)

get_json_from_response(
Expand Down
5 changes: 4 additions & 1 deletion simvue/api/objects/artifact/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ def new(
file_path: pydantic.FilePath,
mime_type: str | None,
metadata: dict[str, typing.Any] | None,
upload_timeout: int | None = None,
offline: bool = False,
**kwargs,
) -> Self:
Expand All @@ -56,6 +57,8 @@ def new(
the mime type for this file, else this is determined
metadata : dict[str, Any] | None
supply metadata information for this artifact
upload_timeout : int | None, optional
specify the timeout in seconds for upload
offline : bool, optional
whether to define this artifact locally, default is False

Expand Down Expand Up @@ -100,6 +103,6 @@ def new(
return _artifact

with open(_file_orig_path, "rb") as out_f:
_artifact._upload(file=out_f)
_artifact._upload(file=out_f, timeout=upload_timeout, file_size=_file_size)

return _artifact
11 changes: 10 additions & 1 deletion simvue/api/objects/artifact/object.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def new(
storage: str | None,
obj: typing.Any,
metadata: dict[str, typing.Any] | None,
upload_timeout: int | None = None,
allow_pickling: bool = True,
offline: bool = False,
**kwargs,
Expand All @@ -57,6 +58,8 @@ def new(
object to serialize and upload
metadata : dict[str, Any] | None
supply metadata information for this artifact
upload_timeout : int | None, optional
specify the timeout in seconds for upload
allow_pickling : bool, optional
whether to allow the object to be pickled if no other
serialization found. Default is True
Expand Down Expand Up @@ -119,5 +122,11 @@ def new(
if offline:
return _artifact

_artifact._upload(file=io.BytesIO(_serialized))
_file_data = io.BytesIO(_serialized)

_artifact._upload(
file=_file_data,
timeout=upload_timeout,
file_size=len(_file_data.getbuffer()),
)
return _artifact
3 changes: 2 additions & 1 deletion simvue/api/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ def post(
params: dict[str, str],
data: typing.Any,
is_json: bool = True,
timeout: int | None = None,
files: dict[str, typing.Any] | None = None,
) -> requests.Response:
"""HTTP POST with retries
Expand Down Expand Up @@ -102,7 +103,7 @@ def post(
headers=headers,
params=params,
data=data_sent,
timeout=DEFAULT_API_TIMEOUT,
timeout=timeout,
files=files,
)

Expand Down
1 change: 0 additions & 1 deletion simvue/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@
if typing.TYPE_CHECKING:
from .factory.dispatch import DispatcherBaseClass

UPLOAD_TIMEOUT: int = 30
HEARTBEAT_INTERVAL: int = 60
RESOURCES_METRIC_PREFIX: str = "resources"

Expand Down
55 changes: 55 additions & 0 deletions tests/functional/test_scenarios.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,66 @@
import pathlib
import pytest
import simvue
import time
import contextlib
import random
import tempfile
import threading
from multiprocessing import Process, Manager

from simvue.api.objects.artifact.fetch import Artifact


@pytest.mark.scenario
@pytest.mark.parametrize("file_size", (1, 10, 100))
def test_large_file_upload(
    file_size: int, create_plain_run: tuple[simvue.Run, dict]
) -> None:
    """Check that a file of ``file_size`` MB survives a save/retrieve round trip.

    A temporary file of the requested size is created, saved to the run as
    an output artifact named ``test_large_file_artifact``, retrieved again
    through ``simvue.Client.get_artifact_as_file`` and its size on disk is
    compared with the size of the file that was saved.

    Parameters
    ----------
    file_size : int
        size of the generated file in megabytes
    create_plain_run : tuple[simvue.Run, dict]
        run and accompanying data; only the run is used here
        (presumably supplied by a pytest fixture defined elsewhere)

    Raises
    ------
    Exception
        any failure is re-raised after the run status is set to 'failed'
    """
    run, _ = create_plain_run
    run.update_metadata({"file_size_mb": file_size})
    _file = None
    _temp_file_name = None

    try:
        with tempfile.NamedTemporaryFile(mode="w+b", delete=False) as temp_f:
            # Record the name before writing so the 'finally' clause can
            # remove the file even if the write below fails.
            _temp_file_name = temp_f.name
            # Writing a single byte at the final offset gives the file the
            # requested length.
            temp_f.seek(file_size * 1024 * 1024 - 1)
            temp_f.write(b"\0")

        # The file is closed on leaving the 'with' block; delete=False keeps
        # it on disk so it can be measured and saved.
        _input_file_size = pathlib.Path(_temp_file_name).stat().st_size
        run.save_file(
            file_path=_temp_file_name,
            category="output",
            name="test_large_file_artifact",
        )

        run.close()

        client = simvue.Client()

        with tempfile.TemporaryDirectory() as tempd:
            client.get_artifact_as_file(
                run_id=run.id,
                name="test_large_file_artifact",
                output_dir=tempd,
            )

            _file = next(pathlib.Path(tempd).glob("*"))

            # Assert the returned file size
            assert _file.stat().st_size == _input_file_size
    except Exception:
        # Mark the run as failed, then re-raise the original exception
        # unchanged.
        _run = simvue.Run()
        _run.reconnect(run.id)
        _run.set_status("failed")
        raise
    finally:
        if _file and _file.exists():
            _file.unlink()
        if _temp_file_name and (_src := pathlib.Path(_temp_file_name)).exists():
            _src.unlink()
        # Best-effort removal of the artifact; failures here are suppressed.
        with contextlib.suppress(Exception):
            Artifact.from_name("test_large_file_artifact", run_id=run.id).delete()


@pytest.mark.scenario
def test_time_multi_run_create_threshold() -> None:
Expand Down
Loading