Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handling AvailableData entries that end up as RemoteData #132

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/sirocco/core/graph_items.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def from_config(cls, config: ConfigBaseData, coordinates: dict) -> AvailableData
data_class = AvailableData if isinstance(config, ConfigAvailableData) else GeneratedData
return data_class(
name=config.name,
computer=config.computer,
type=config.type,
src=config.src,
coordinates=coordinates,
Expand Down
78 changes: 76 additions & 2 deletions src/sirocco/workgraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ def _prepare_for_shell_task(task: dict, inputs: dict) -> dict:

# Finally, we update the original `inputs` with the modified ones from the call to `prepare_shell_job_inputs`
inputs = {**inputs, **aiida_shell_inputs}

inputs.setdefault("metadata", {})
inputs["metadata"].update({"call_link_label": task["name"]})

Expand All @@ -61,11 +60,25 @@ def _prepare_for_shell_task(task: dict, inputs: dict) -> dict:
task_outputs = task_outputs.union(set(inputs.pop("outputs", [])))
missing_outputs = task_outputs.difference(default_outputs)
inputs["outputs"] = list(missing_outputs)

# NOTE: Hardcoded for now, possibly make user-facing option
inputs["metadata"]["options"]["use_symlinks"] = True

# Workaround ends here
# FIXME: The `GeneratedData.name` ends up as the actual filename
# try:
# computer = inputs['code'].computer
# with computer.get_transport() as transport:
# remote_is_file = transport.isfile(path=data.src)
# except:
# raise

# import ipdb; ipdb.set_trace()

return inputs



aiida_workgraph.engine.utils.prepare_for_shell_task = _prepare_for_shell_task


Expand Down Expand Up @@ -187,13 +200,29 @@ def _add_aiida_input_data_node(self, data: core.Data):
data_path = Path(data.src)
data_full_path = data.src if data_path.is_absolute() else self._core_workflow.config_rootdir / data_path

# ? Explicitly check for data type "remote" ?
if data.computer is not None:
try:
computer = aiida.orm.load_computer(data.computer)
except NotExistent as err:
msg = f"Could not find computer {data.computer!r} for input {data}."
raise ValueError(msg) from err
self._aiida_data_nodes[label] = aiida.orm.RemoteData(remote_path=data.src, label=label, computer=computer)

with computer.get_transport() as transport:
remote_is_file = transport.isfile(path=data.src)

# ? Is the `label` being used to construct the symlink
# TODO: If file provided as input, set the filename as label
# TODO: What to do for Folder
if not remote_is_file:
remote_data = aiida.orm.RemoteData(remote_path=data.src, label=label, computer=computer)
else:
# FIXME: Currently behavior of both cases is the same. Should this be handled in `aiida-shell` ???
# remote_data = aiida.orm.RemoteData(remote_path=str(Path(data.src).parent), label=label, computer=computer)
remote_data = aiida.orm.RemoteData(remote_path=data.src, label=label, computer=computer)

self._aiida_data_nodes[label] = remote_data

elif data.type == "file":
self._aiida_data_nodes[label] = aiida.orm.SinglefileData(label=label, file=data_full_path)
elif data.type == "dir":
Expand Down Expand Up @@ -230,6 +259,8 @@ def _create_shell_task_node(self, task: core.ShellTask):
# metadata
metadata: dict[str, Any] = {}
## Source file
# FIXME: Paths are resolved to paths on the local machine if not given as absolute paths. config_rootdir points
# to local path, so this breaks on the remote
env_source_paths = [
env_source_path
if (env_source_path := Path(env_source_file)).is_absolute()
Expand Down Expand Up @@ -317,6 +348,49 @@ def _set_shelljob_arguments(self, task: core.ShellTask):
_, arguments = self.split_cmd_arg(task.resolve_ports(input_labels))
workgraph_task_arguments.value = arguments

# filenames = []
# FIXME: This currently breaks...
# ipdb> input_socket
# SocketAny(name='icon_restart_date_2026_01_01_00_00_00', value=None)
# ipdb> workgraph_task
# DecoratedNode(name='icon_date_2026_03_01_00_00_00', properties=[], inputs=['metadata', 'code', 'monitors', 'remote_folder', 'nodes', 'filenames', 'arguments', 'outputs', 'parser', '_wait', 'command', 'resolve_command'], outputs=['remote_folder', 'remote_stash', 'retrieved', '_wait', '_outputs', 'stdout', 'stderr', 'icon_output', 'restart'])
# ipdb> workgraph_task.inputs
# NodeSocketNamespace(name='inputs', sockets=['metadata', 'code', 'monitors', 'remote_folder', 'nodes', 'filenames', 'arguments', 'outputs', 'parser', '_wait', 'command', 'resolve_command'])
# ipdb> workgraph_task.inputs.nodes
# TaskSocketNamespace(name='nodes', sockets=['icon_namelist', 'icon_restart_date_2026_01_01_00_00_00'])
# for input_socket in workgraph_task.inputs.nodes:
# try:
# aiida_data_node = self._aiida_data_nodes[input_socket._name]
# except:
# # Fails for non-existing data nodes
# # Somehow restrict to only GeneratedData
# continue
# # import ipdb; ipdb.set_trace()

# if isinstance(aiida_data_node, aiida.orm.FolderData):
# pass
# elif isinstance(aiida_data_node, aiida.orm.SinglefileData):
# filenames.append(aiida_data_node.filename)
# pass
# elif isinstance(aiida_data_node, aiida.orm.RemoteData):
# try:
# # This fails if it is a file
# # Could also use `.is_empty`, but seems more fragile
# aiida_data_node.listdir()
# remote_is_file = False
# except OSError:
# remote_is_file = True

# if remote_is_file:
# filenames.append(Path(aiida_data_node.get_remote_path()).name)
# else:
# raise Exception

# setattr(workgraph_task.inputs, "filenames", filenames)

# import ipdb; ipdb.set_trace()


def run(
self,
inputs: None | dict[str, Any] = None,
Expand Down
30 changes: 30 additions & 0 deletions tests/cases/small/config/calcinfo.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{
"codes_info": [
{
"code_uuid": "5f4a3f0d-160e-4f1d-b778-e8d584ce1b0f",
"cmdline_params": ["--restart", "--init", "initial_cond"],
"stdin_name": null,
"stdout_name": "stdout",
"stderr_name": "stderr"
}
],
"append_text": "echo $? > status",
"remote_copy_list": [],
"remote_symlink_list": [
[
"da8379b9-0b77-44ad-9dc2-6b9a03d7ca2b",
"/mnt/home/geiger_j/test/sirocco/data/initial_conditions/*",
"."
]
],
"retrieve_temporary_list": [
"restart",
"icon_output",
"status",
"stderr",
"stdout"
],
"provenance_exclude_list": ["input"],
"file_copy_operation_order": [1, 0, 2],
"uuid": "7683f368-c4e5-4be0-964e-28d58b88dc60"
}
65 changes: 65 additions & 0 deletions tests/cases/small/config/config-remote.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
---
start_date: &root_start_date '2026-01-01T00:00'
stop_date: &root_stop_date '2026-06-01T00:00'
cycles:
- bimonthly_tasks:
cycling:
start_date: *root_start_date
stop_date: *root_stop_date
period: P2M
tasks:
- icon:
inputs:
- icon_namelist:
port: UNUSED
- initial_cond:
when:
at: *root_start_date
port:
init
- icon_restart:
when:
after: *root_start_date
target_cycle:
lag: -P2M
port: restart
outputs: [icon_output, icon_restart]
- lastly:
tasks:
- cleanup:
wait_on:
- icon:
target_cycle:
date: 2026-05-01T00:00
tasks:
- icon:
computer: thor
plugin: shell
# src: scripts/icon.py
src: /mnt/home/geiger_j/test/sirocco/icon.py
command: "icon.py --restart {PORT::restart} --init {PORT::init}"
# env_source_files: data/dummy_source_file.sh
- cleanup:
computer: thor
plugin: shell
# src: scripts/cleanup.py
src: /mnt/home/geiger_j/test/sirocco/cleanup.py
command: "cleanup.py"
data:
available:
- icon_namelist:
type: file
src: data/input
- initial_cond:
type: file
computer: thor
# src: data/initial_conditions
# src: /mnt/home/geiger_j/test/sirocco/data
src: /mnt/home/geiger_j/test/sirocco/data/initial_conditions
generated:
- icon_output:
type: file
src: icon_output
- icon_restart:
type: file
src: restart
3 changes: 2 additions & 1 deletion tests/cases/small/config/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ data:
available:
- icon_namelist:
type: file
src: data/input
# src: data/input
src: ICON/NAMELIST_exclaim_ape_R02B04
- initial_conditions:
type: file
computer: localhost
Expand Down
3 changes: 3 additions & 0 deletions tests/cases/small/config/data/dummy_source_file.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#! /bin/bash

:
Loading