diff --git a/.sqlx/query-96fb7017f39b94fdf6835c98ebdeae0e77ab4f03b11871d6cce8ade31d6fb8c7.json b/.sqlx/query-17b0ff689068143898c9c217439cbffe59e443af89bb375e9f7f306dc0f0321b.json similarity index 63% rename from .sqlx/query-96fb7017f39b94fdf6835c98ebdeae0e77ab4f03b11871d6cce8ade31d6fb8c7.json rename to .sqlx/query-17b0ff689068143898c9c217439cbffe59e443af89bb375e9f7f306dc0f0321b.json index b53342edf..7fbf780a7 100644 --- a/.sqlx/query-96fb7017f39b94fdf6835c98ebdeae0e77ab4f03b11871d6cce8ade31d6fb8c7.json +++ b/.sqlx/query-17b0ff689068143898c9c217439cbffe59e443af89bb375e9f7f306dc0f0321b.json @@ -1,6 +1,6 @@ { "db_name": "SQLite", - "query": "SELECT run_id FROM workflow_status WHERE id = ?", + "query": "SELECT run_id FROM workflow WHERE id = ?", "describe": { "columns": [ { @@ -16,5 +16,5 @@ false ] }, - "hash": "96fb7017f39b94fdf6835c98ebdeae0e77ab4f03b11871d6cce8ade31d6fb8c7" + "hash": "17b0ff689068143898c9c217439cbffe59e443af89bb375e9f7f306dc0f0321b" } diff --git a/.sqlx/query-17b609df5736bdd3a5f512c882c0fdf1eebd528d6c32cc4ecd13ff506a5fdfbf.json b/.sqlx/query-17b609df5736bdd3a5f512c882c0fdf1eebd528d6c32cc4ecd13ff506a5fdfbf.json deleted file mode 100644 index 529df5ce9..000000000 --- a/.sqlx/query-17b609df5736bdd3a5f512c882c0fdf1eebd528d6c32cc4ecd13ff506a5fdfbf.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n UPDATE workflow_status\n SET has_detected_need_to_run_completion_script = 0,\n is_canceled = 0,\n is_archived = 0\n WHERE id = ?\n ", - "describe": { - "columns": [], - "parameters": { - "Right": 1 - }, - "nullable": [] - }, - "hash": "17b609df5736bdd3a5f512c882c0fdf1eebd528d6c32cc4ecd13ff506a5fdfbf" -} diff --git a/.sqlx/query-212dc8ede4808ff5a0f1f827e46247ac6a8796193c04e974b04280522bba9702.json b/.sqlx/query-212dc8ede4808ff5a0f1f827e46247ac6a8796193c04e974b04280522bba9702.json new file mode 100644 index 000000000..4f17d3240 --- /dev/null +++ b/.sqlx/query-212dc8ede4808ff5a0f1f827e46247ac6a8796193c04e974b04280522bba9702.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "\n UPDATE workflow\n SET is_canceled = 0,\n is_archived = 0,\n run_id = run_id + 1\n WHERE id = ?\n ", + "describe": { + "columns": [], + "parameters": { + "Right": 1 + }, + "nullable": [] + }, + "hash": "212dc8ede4808ff5a0f1f827e46247ac6a8796193c04e974b04280522bba9702" +} diff --git a/.sqlx/query-b86c184a2b669f92255cbdb7be68e2c67898e947f0560d3b944cd70ca2258318.json b/.sqlx/query-2a0bf40600d0a6a28a8bc33f482d495efad48a853df95ab87a4b4ef421ce19e1.json similarity index 63% rename from .sqlx/query-b86c184a2b669f92255cbdb7be68e2c67898e947f0560d3b944cd70ca2258318.json rename to .sqlx/query-2a0bf40600d0a6a28a8bc33f482d495efad48a853df95ab87a4b4ef421ce19e1.json index 38513b895..5afbfc6e1 100644 --- a/.sqlx/query-b86c184a2b669f92255cbdb7be68e2c67898e947f0560d3b944cd70ca2258318.json +++ b/.sqlx/query-2a0bf40600d0a6a28a8bc33f482d495efad48a853df95ab87a4b4ef421ce19e1.json @@ -1,6 +1,6 @@ { "db_name": "SQLite", - "query": "SELECT run_id FROM workflow_status WHERE id = $1", + "query": "SELECT run_id FROM workflow WHERE id = $1", "describe": { "columns": [ { @@ -16,5 +16,5 @@ false ] }, - "hash": "b86c184a2b669f92255cbdb7be68e2c67898e947f0560d3b944cd70ca2258318" + "hash": "2a0bf40600d0a6a28a8bc33f482d495efad48a853df95ab87a4b4ef421ce19e1" } diff --git a/.sqlx/query-3a988070b4c92d180d18de847a61bdee811afa2abdb5224d8b5b279c6dbd5593.json b/.sqlx/query-3a988070b4c92d180d18de847a61bdee811afa2abdb5224d8b5b279c6dbd5593.json deleted file mode 100644 index fb3facebd..000000000 --- a/.sqlx/query-3a988070b4c92d180d18de847a61bdee811afa2abdb5224d8b5b279c6dbd5593.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "SQLite", - "query": "DELETE FROM workflow_status WHERE id = $1", - "describe": { - "columns": [], - "parameters": { - "Right": 1 - }, - "nullable": [] - }, - "hash": "3a988070b4c92d180d18de847a61bdee811afa2abdb5224d8b5b279c6dbd5593" -} diff --git a/.sqlx/query-67e128d9ce384ee620e239c70ef555a6b8d0ea2f39443f0bdf739778a1af49dd.json b/.sqlx/query-67e128d9ce384ee620e239c70ef555a6b8d0ea2f39443f0bdf739778a1af49dd.json deleted file mode 100644 index 877964eff..000000000 --- a/.sqlx/query-67e128d9ce384ee620e239c70ef555a6b8d0ea2f39443f0bdf739778a1af49dd.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "db_name": "SQLite", - "query": "SELECT COUNT(*) as total FROM ro_crate_entity WHERE workflow_id = $1", - "describe": { - "columns": [ - { - "name": "total", - "ordinal": 0, - "type_info": "Integer" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - false - ] - }, - "hash": "67e128d9ce384ee620e239c70ef555a6b8d0ea2f39443f0bdf739778a1af49dd" -} diff --git a/.sqlx/query-6e47fe4e806c69a1630ad2b7fc0c49fd0f66d1471d906ee931bcd478b520600d.json b/.sqlx/query-6e47fe4e806c69a1630ad2b7fc0c49fd0f66d1471d906ee931bcd478b520600d.json deleted file mode 100644 index ff012e161..000000000 --- a/.sqlx/query-6e47fe4e806c69a1630ad2b7fc0c49fd0f66d1471d906ee931bcd478b520600d.json +++ /dev/null @@ -1,26 +0,0 @@ -{ - "db_name": "SQLite", - "query": "SELECT id, file_id FROM ro_crate_entity WHERE workflow_id = $1 AND file_id IS NOT NULL", - "describe": { - "columns": [ - { - "name": "id", - "ordinal": 0, - "type_info": "Integer" - }, - { - "name": "file_id", - "ordinal": 1, - "type_info": "Integer" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - false, - true - ] - }, - "hash": "6e47fe4e806c69a1630ad2b7fc0c49fd0f66d1471d906ee931bcd478b520600d" -} diff --git a/.sqlx/query-985c3a022488eab321c29d7ea1e1480b17c768a464fa67f8c68675a49ef18c77.json b/.sqlx/query-985c3a022488eab321c29d7ea1e1480b17c768a464fa67f8c68675a49ef18c77.json deleted file mode 100644 index f12bc2ec9..000000000 --- a/.sqlx/query-985c3a022488eab321c29d7ea1e1480b17c768a464fa67f8c68675a49ef18c77.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n UPDATE workflow_status\n SET run_id = ?,\n has_detected_need_to_run_completion_script = ?,\n is_canceled = ?,\n is_archived = ?\n WHERE id = ?\n ", - "describe": { - "columns": [], - "parameters": { - "Right": 5 - }, - "nullable": [] - }, - "hash": "985c3a022488eab321c29d7ea1e1480b17c768a464fa67f8c68675a49ef18c77" -} diff --git a/.sqlx/query-9bc5fffe7a49355df5e7d98fd23e0e452ac29673fa6f7d5025b96bf492394b1a.json b/.sqlx/query-9bc5fffe7a49355df5e7d98fd23e0e452ac29673fa6f7d5025b96bf492394b1a.json deleted file mode 100644 index d63066a18..000000000 --- a/.sqlx/query-9bc5fffe7a49355df5e7d98fd23e0e452ac29673fa6f7d5025b96bf492394b1a.json +++ /dev/null @@ -1,44 +0,0 @@ -{ - "db_name": "SQLite", - "query": "SELECT id, run_id, has_detected_need_to_run_completion_script, is_canceled, is_archived FROM workflow_status WHERE id = ?", - "describe": { - "columns": [ - { - "name": "id", - "ordinal": 0, - "type_info": "Integer" - }, - { - "name": "run_id", - "ordinal": 1, - "type_info": "Integer" - }, - { - "name": "has_detected_need_to_run_completion_script", - "ordinal": 2, - "type_info": "Integer" - }, - { - "name": "is_canceled", - "ordinal": 3, - "type_info": "Integer" - }, - { - "name": "is_archived", - "ordinal": 4, - "type_info": "Integer" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - false, - false, - false, - false, - false - ] - }, - "hash": "9bc5fffe7a49355df5e7d98fd23e0e452ac29673fa6f7d5025b96bf492394b1a" -} diff --git a/.sqlx/query-a71074c980b1b05131a55552965d30a78b2b95b9cd53a470f033503a6318da34.json b/.sqlx/query-a71074c980b1b05131a55552965d30a78b2b95b9cd53a470f033503a6318da34.json new file mode 100644 index 000000000..08258c5e2 --- /dev/null +++ b/.sqlx/query-a71074c980b1b05131a55552965d30a78b2b95b9cd53a470f033503a6318da34.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "UPDATE workflow SET is_archived = ? WHERE id = ?", + "describe": { + "columns": [], + "parameters": { + "Right": 2 + }, + "nullable": [] + }, + "hash": "a71074c980b1b05131a55552965d30a78b2b95b9cd53a470f033503a6318da34" +} diff --git a/.sqlx/query-b7a95f913fbf7b6f499da3a808a30ae29e0959e82efea4ed4e4b805a874005f0.json b/.sqlx/query-b7a95f913fbf7b6f499da3a808a30ae29e0959e82efea4ed4e4b805a874005f0.json new file mode 100644 index 000000000..f11ee354a --- /dev/null +++ b/.sqlx/query-b7a95f913fbf7b6f499da3a808a30ae29e0959e82efea4ed4e4b805a874005f0.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "\n UPDATE workflow\n SET is_canceled = 1\n WHERE id = ?\n ", + "describe": { + "columns": [], + "parameters": { + "Right": 1 + }, + "nullable": [] + }, + "hash": "b7a95f913fbf7b6f499da3a808a30ae29e0959e82efea4ed4e4b805a874005f0" +} diff --git a/.sqlx/query-5e3e16fe72e0f841ac4a1323472468d4f8726461526fd1d70f41149d1ddfea3c.json b/.sqlx/query-c3376e741935563a695871ef16db6ac3f7529b108df47290496a0c255507868d.json similarity index 64% rename from .sqlx/query-5e3e16fe72e0f841ac4a1323472468d4f8726461526fd1d70f41149d1ddfea3c.json rename to .sqlx/query-c3376e741935563a695871ef16db6ac3f7529b108df47290496a0c255507868d.json index bc12db23c..1f5381457 100644 --- a/.sqlx/query-5e3e16fe72e0f841ac4a1323472468d4f8726461526fd1d70f41149d1ddfea3c.json +++ b/.sqlx/query-c3376e741935563a695871ef16db6ac3f7529b108df47290496a0c255507868d.json @@ -1,6 +1,6 @@ { "db_name": "SQLite", - "query": "\n SELECT j.id, r.return_code\n FROM job j\n JOIN result r ON j.id = r.job_id\n JOIN workflow_status ws ON j.workflow_id = ws.id AND r.run_id = ws.run_id\n WHERE j.workflow_id = ?\n AND j.status IN (?, ?, ?, ?)\n AND j.unblocking_processed = 0\n ", + "query": "\n SELECT j.id, r.return_code\n FROM job j\n JOIN result r ON j.id = r.job_id\n JOIN workflow w ON j.workflow_id = w.id AND r.run_id = w.run_id\n WHERE j.workflow_id = ?\n AND j.status IN (?, ?, ?, ?)\n AND j.unblocking_processed = 0\n ", "describe": { "columns": [ { @@ -22,5 +22,5 @@ false ] }, - "hash": "5e3e16fe72e0f841ac4a1323472468d4f8726461526fd1d70f41149d1ddfea3c" + "hash": "c3376e741935563a695871ef16db6ac3f7529b108df47290496a0c255507868d" } diff --git a/.sqlx/query-d89de81f33fa3ad43f7f7c4f0393c3db2aabb55b6f57fde91eb8b109e6a6ac5c.json b/.sqlx/query-d89de81f33fa3ad43f7f7c4f0393c3db2aabb55b6f57fde91eb8b109e6a6ac5c.json new file mode 100644 index 000000000..950581f56 --- /dev/null +++ b/.sqlx/query-d89de81f33fa3ad43f7f7c4f0393c3db2aabb55b6f57fde91eb8b109e6a6ac5c.json @@ -0,0 +1,32 @@ +{ + "db_name": "SQLite", + "query": "\n SELECT id, file_id, metadata\n FROM ro_crate_entity\n WHERE workflow_id = $1 AND file_id IS NOT NULL\n ", + "describe": { + "columns": [ + { + "name": "id", + "ordinal": 0, + "type_info": "Integer" + }, + { + "name": "file_id", + "ordinal": 1, + "type_info": "Integer" + }, + { + "name": "metadata", + "ordinal": 2, + "type_info": "Text" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + true, + false + ] + }, + "hash": "d89de81f33fa3ad43f7f7c4f0393c3db2aabb55b6f57fde91eb8b109e6a6ac5c" +} diff --git a/.sqlx/query-d9899cd7b0730f91426077f3f5cafbf596e9ba045ab3ad1f66a752d631600254.json b/.sqlx/query-d9899cd7b0730f91426077f3f5cafbf596e9ba045ab3ad1f66a752d631600254.json deleted file mode 100644 index 854b025f2..000000000 --- a/.sqlx/query-d9899cd7b0730f91426077f3f5cafbf596e9ba045ab3ad1f66a752d631600254.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n INSERT INTO ro_crate_entity (workflow_id, file_id, entity_id, entity_type, metadata)\n VALUES ($1, $2, $3, $4, $5)\n ", - "describe": { - "columns": [], - "parameters": { - "Right": 5 - }, - "nullable": [] - }, - "hash": "d9899cd7b0730f91426077f3f5cafbf596e9ba045ab3ad1f66a752d631600254" -} diff --git a/.sqlx/query-ac67d0ea031b16210dee614899f30e1ed55a57a80f08b7cf0a27353611d2e0b8.json b/.sqlx/query-e13329ec5ca6682229d5c3ce42f02a8c62d633ce710194ecb7dab005f219bcf0.json similarity index 65% rename from .sqlx/query-ac67d0ea031b16210dee614899f30e1ed55a57a80f08b7cf0a27353611d2e0b8.json rename to .sqlx/query-e13329ec5ca6682229d5c3ce42f02a8c62d633ce710194ecb7dab005f219bcf0.json index 9233039bd..8092ab334 100644 --- a/.sqlx/query-ac67d0ea031b16210dee614899f30e1ed55a57a80f08b7cf0a27353611d2e0b8.json +++ b/.sqlx/query-e13329ec5ca6682229d5c3ce42f02a8c62d633ce710194ecb7dab005f219bcf0.json @@ -1,6 +1,6 @@ { "db_name": "SQLite", - "query": "\n INSERT INTO workflow\n (\n name,\n description,\n user,\n env,\n timestamp,\n compute_node_expiration_buffer_seconds,\n compute_node_wait_for_new_jobs_seconds,\n compute_node_ignore_workflow_completion,\n compute_node_wait_for_healthy_database_minutes,\n compute_node_min_time_for_new_jobs_seconds,\n resource_monitor_config,\n slurm_defaults,\n use_pending_failed,\n enable_ro_crate,\n project,\n metadata,\n status_id,\n slurm_config,\n execution_config\n )\n VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19)\n RETURNING rowid\n ", + "query": "\n INSERT INTO workflow\n (\n name,\n description,\n user,\n env,\n timestamp,\n compute_node_expiration_buffer_seconds,\n compute_node_wait_for_new_jobs_seconds,\n compute_node_ignore_workflow_completion,\n compute_node_wait_for_healthy_database_minutes,\n compute_node_min_time_for_new_jobs_seconds,\n resource_monitor_config,\n slurm_defaults,\n use_pending_failed,\n enable_ro_crate,\n project,\n metadata,\n slurm_config,\n execution_config,\n run_id,\n is_archived,\n is_canceled\n )\n VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, 0, 0, 0)\n RETURNING rowid\n ", "describe": { "columns": [ { @@ -10,11 +10,11 @@ } ], "parameters": { - "Right": 19 + "Right": 18 }, "nullable": [ false ] }, - "hash": "ac67d0ea031b16210dee614899f30e1ed55a57a80f08b7cf0a27353611d2e0b8" + "hash": "e13329ec5ca6682229d5c3ce42f02a8c62d633ce710194ecb7dab005f219bcf0" } diff --git a/.sqlx/query-eff9aa80713f24c3e8514c09e6648695dbe5f474bacd1303c7f423abe091b774.json b/.sqlx/query-eff9aa80713f24c3e8514c09e6648695dbe5f474bacd1303c7f423abe091b774.json deleted file mode 100644 index 6da429ced..000000000 --- a/.sqlx/query-eff9aa80713f24c3e8514c09e6648695dbe5f474bacd1303c7f423abe091b774.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n INSERT INTO workflow_status\n (run_id, is_archived, is_canceled, has_detected_need_to_run_completion_script)\n VALUES (0, 0, 0, 0)\n RETURNING rowid\n ", - "describe": { - "columns": [ - { - "name": "id", - "ordinal": 0, - "type_info": "Integer" - } - ], - "parameters": { - "Right": 0 - }, - "nullable": [ - false - ] - }, - "hash": "eff9aa80713f24c3e8514c09e6648695dbe5f474bacd1303c7f423abe091b774" -} diff --git a/.sqlx/query-f0c87c852c9acb25674a1cdde06ec105d8822b4cbc27823bc212e7fc11c8dc79.json b/.sqlx/query-f0c87c852c9acb25674a1cdde06ec105d8822b4cbc27823bc212e7fc11c8dc79.json new file mode 100644 index 000000000..abd58385b --- /dev/null +++ b/.sqlx/query-f0c87c852c9acb25674a1cdde06ec105d8822b4cbc27823bc212e7fc11c8dc79.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "\n INSERT INTO ro_crate_entity\n (workflow_id, file_id, entity_id, entity_type, metadata)\n VALUES ($1, $2, $3, $4, $5)\n ", + "describe": { + "columns": [], + "parameters": { + "Right": 5 + }, + "nullable": [] + }, + "hash": "f0c87c852c9acb25674a1cdde06ec105d8822b4cbc27823bc212e7fc11c8dc79" +} diff --git a/docs/src/core/reference/parameterization.md b/docs/src/core/reference/parameterization.md index a537ef44c..a31ce96fd 100644 --- a/docs/src/core/reference/parameterization.md +++ b/docs/src/core/reference/parameterization.md @@ -200,6 +200,120 @@ user_data: experiment: "['baseline','ablation','full']" ``` +## Workflow Variables + +Workflow-level `variables` are constants that get substituted into every string field of the spec +before parameter expansion. Use them to remove repetition of fixed strings -- paths, account codes, +image tags, project IDs -- that appear across many jobs, files, schedulers, or env entries. + +Variables are not the same as `parameters`: + +- `variables` are constants. Each `{name}` reference is replaced once with the variable's value. The + number of jobs/files does not change. +- `parameters` are sweep dimensions. They expand into multiple instances via Cartesian (or zip) + product. + +The two mechanisms compose freely: a single command string can mix `{variable}` references and +`{parameter}` references. Variables resolve first; parameters drive expansion afterwards. + +### Basic Usage + +```yaml +name: variables_demo +variables: + data_root: /scratch/proj42 + results_root: /shared/proj42/results + project: proj42 + account: my_hpc_account + +env: + PROJECT: "{project}" + +jobs: + - name: prepare_inputs + command: "python prepare.py --in {data_root}/raw --out {data_root}/clean" + + # Variables compose with parameters: {data_root} is a constant, + # {i} drives expansion into 4 jobs. + - name: "train_{i:02d}" + command: "python train.py --shard {i} --in {data_root}/clean" + parameters: + i: "1:4" + +slurm_schedulers: + - name: shared_sched + account: "{account}" + partition: short + walltime: "01:00:00" + nodes: 1 +``` + +### Validation Rules + +- **Variable names must be valid identifiers** (`[A-Za-z_][A-Za-z0-9_]*`). +- **No collisions.** A variable name must not match any parameter name (at the workflow level or in + any job/file/user_data `parameters` map). Spec loading fails with an error pointing at the + offending name. +- **No undefined references.** A `{name}` token whose name is neither a variable nor a parameter is + rejected as a typo. Tokens with non-identifier inner text (e.g. `find ... {} \;` or JSON-like + fragments) are ignored. Shell-style `${...}` expansion (used by `${TORC_JOB_ID}`, + `${files.input.X}`, etc.) is left alone -- the workflow variables system only consumes bare + `{name}` tokens. +- **Variable values must be plain literal strings.** Any `{name}` template reference inside a + variable's value is rejected, whether it points at another variable, a parameter, or a typo. This + keeps semantics simple and deterministic. Compose at the use site instead: `command: "{base}/sub"` + rather than `inputs: "{base}/sub"`. Shell-style `${...}` expansion (e.g. `${HOME}`, + `${TORC_JOB_ID}`) is allowed in variable values -- it is preserved verbatim and expanded at + runtime, not by the spec loader. Note that you can still reference a variable from a parameter + range: `i: "1:{n_max}"` works because that's a parameter value, not a variable value. +- Variables apply everywhere a string appears in the spec, including descriptions, env values, + scheduler fields, action arguments, file paths, commands, and parameter range values. They do not + apply to identifier fields (`parameters` keys, `use_parameters` entries). + +### KDL Syntax + +```kdl +variables { + data_root "/scratch/proj42" + project "proj42" + account "my_hpc_account" +} + +job "train_{i:02d}" { + command "python train.py --shard {i} --in {data_root}/clean" + parameters { + i "1:4" + } +} +``` + +### JSON5 Syntax + +```json5 +{ + variables: { + data_root: "/scratch/proj42", + project: "proj42", + }, + jobs: [ + { + name: "train_{i:02d}", + command: "python train.py --shard {i} --in {data_root}/clean", + parameters: { i: "1:4" }, + }, + ], +} +``` + +### When to Reach for Variables vs. Shared Parameters + +Use `variables` for plain constants (single value, no expansion) -- they DRY up the spec without +changing job counts and don't require any opt-in field on each job or file. + +Use shared `parameters` with `use_parameters` (next section) when the same _sweep dimension_ drives +expansion across multiple jobs and files. Shared parameters are still the right tool for +hyperparameter sweeps. + ## Shared (Workflow-Level) Parameters Define parameters once at the workflow level and reuse them across multiple jobs and files using @@ -425,3 +539,5 @@ job "train_{dataset}_{model}" { 7. **Use selective inheritance** - Only inherit the parameters each job actually needs 8. **Use zip mode for paired parameters** - When parameters have a 1:1 correspondence, use `parameter_mode: zip` +9. **Use `variables` for repeated constants** - Lift fixed strings (paths, account codes, image + tags) into the workflow-level `variables` map rather than copying them across jobs and files diff --git a/docs/src/core/reference/workflow-spec.md b/docs/src/core/reference/workflow-spec.md index 74004cb4a..77a2f9a58 100644 --- a/docs/src/core/reference/workflow-spec.md +++ b/docs/src/core/reference/workflow-spec.md @@ -15,6 +15,7 @@ The top-level container for a complete workflow definition. | `project` | string | none | Project name or identifier for grouping workflows | | `metadata` | string | none | Arbitrary metadata as JSON string | | `parameters` | map\ | none | Shared parameters that can be used by jobs and files via `use_parameters` | +| `variables` | map\ | none | Workflow-level constants substituted via `{name}` into every string field | | `env` | map\ | none | Environment variables exported for every job in the workflow | | `jobs` | [[JobSpec](#jobspec)] | _required_ | Jobs that make up this workflow | | `files` | [[FileSpec](#filespec)] | none | Files associated with this workflow | diff --git a/examples/json/variables_demo.json b/examples/json/variables_demo.json new file mode 100644 index 000000000..ee4fbafef --- /dev/null +++ b/examples/json/variables_demo.json @@ -0,0 +1,76 @@ +{ + "name": "variables_demo", + "variables": { + "data_root": "/scratch/proj42", + "results_root": "/shared/proj42/results", + "project": "proj42", + "image_tag": "pytorch:2.4", + "account": "my_hpc_account" + }, + "env": { + "PROJECT": "{project}", + "IMAGE": "{image_tag}" + }, + "jobs": [ + { + "name": "prepare_inputs", + "command": "python prepare.py --in {data_root}/raw --out {data_root}/clean", + "resource_requirements": "small_job", + "output_files": ["clean_data"], + "scheduler": "shared_sched" + }, + { + "name": "train_{i:02d}", + "command": "python train.py --shard {i} --img {image_tag} --in {data_root}/clean --out {results_root}/run_{i:02d}.pt", + "resource_requirements": "gpu_job", + "depends_on": ["prepare_inputs"], + "parameters": { "i": "1:4" }, + "scheduler": "gpu_sched" + }, + { + "name": "aggregate", + "command": "python aggregate.py --in {results_root} --tag {project}", + "resource_requirements": "small_job", + "depends_on_regexes": ["^train_.*$"], + "scheduler": "shared_sched" + } + ], + "files": [ + { + "name": "clean_data", + "path": "{data_root}/clean/all.parquet" + } + ], + "resource_requirements": [ + { + "name": "small_job", + "num_cpus": 1, + "memory": "2g", + "runtime": "PT15M" + }, + { + "name": "gpu_job", + "num_cpus": 4, + "num_gpus": 1, + "memory": "16g", + "runtime": "PT2H" + } + ], + "slurm_schedulers": [ + { + "name": "shared_sched", + "account": "{account}", + "partition": "short", + "walltime": "01:00:00", + "nodes": 1 + }, + { + "name": "gpu_sched", + "account": "{account}", + "partition": "gpu", + "gres": "gpu:1", + "walltime": "04:00:00", + "nodes": 1 + } + ] +} diff --git a/examples/json/variables_demo.json5 b/examples/json/variables_demo.json5 new file mode 100644 index 000000000..077b31fa9 --- /dev/null +++ b/examples/json/variables_demo.json5 @@ -0,0 +1,72 @@ +// Workflow Variables Demo (JSON5) +// +// `variables` is a workflow-level map of constants substituted via {name} into +// every string field of the spec before parameter expansion. Use it to DRY up +// repeated paths, account codes, image tags, etc. +// +// Variables do NOT trigger expansion (a single substitution per occurrence). +// Variable names must not collide with any parameter name. +{ + name: "variables_demo", + variables: { + data_root: "/scratch/proj42", + results_root: "/shared/proj42/results", + project: "proj42", + image_tag: "pytorch:2.4", + account: "my_hpc_account", + }, + env: { + PROJECT: "{project}", + IMAGE: "{image_tag}", + }, + jobs: [ + { + name: "prepare_inputs", + command: "python prepare.py --in {data_root}/raw --out {data_root}/clean", + resource_requirements: "small_job", + output_files: ["clean_data"], + scheduler: "shared_sched", + }, + // Variables compose with parameters: {data_root} comes from variables, + // {i} comes from local parameters and drives expansion. + { + name: "train_{i:02d}", + command: "python train.py --shard {i} --img {image_tag} --in {data_root}/clean --out {results_root}/run_{i:02d}.pt", + resource_requirements: "gpu_job", + depends_on: ["prepare_inputs"], + parameters: { i: "1:4" }, + scheduler: "gpu_sched", + }, + { + name: "aggregate", + command: "python aggregate.py --in {results_root} --tag {project}", + resource_requirements: "small_job", + depends_on_regexes: ["^train_.*$"], + scheduler: "shared_sched", + }, + ], + files: [ + { name: "clean_data", path: "{data_root}/clean/all.parquet" }, + ], + resource_requirements: [ + { name: "small_job", num_cpus: 1, memory: "2g", runtime: "PT15M" }, + { name: "gpu_job", num_cpus: 4, num_gpus: 1, memory: "16g", runtime: "PT2H" }, + ], + slurm_schedulers: [ + { + name: "shared_sched", + account: "{account}", + partition: "short", + walltime: "01:00:00", + nodes: 1, + }, + { + name: "gpu_sched", + account: "{account}", + partition: "gpu", + gres: "gpu:1", + walltime: "04:00:00", + nodes: 1, + }, + ], +} diff --git a/examples/kdl/variables_demo.kdl b/examples/kdl/variables_demo.kdl new file mode 100644 index 000000000..83e07230c --- /dev/null +++ b/examples/kdl/variables_demo.kdl @@ -0,0 +1,79 @@ +// Workflow Variables Demo (KDL) +// +// `variables` is a workflow-level map of constants substituted via {name} into +// every string field of the spec before parameter expansion. Use it to DRY up +// repeated paths, account codes, image tags, etc. +// +// Variables do NOT trigger expansion (a single substitution per occurrence). +// Variable names must not collide with any parameter name. + +name "variables_demo" + +variables { + data_root "/scratch/proj42" + results_root "/shared/proj42/results" + project "proj42" + image_tag "pytorch:2.4" + account "my_hpc_account" +} + +env { + PROJECT "{project}" + IMAGE "{image_tag}" +} + +file "clean_data" path="{data_root}/clean/all.parquet" + +resource_requirements "small_job" { + num_cpus 1 + memory "2g" + runtime "PT15M" +} + +resource_requirements "gpu_job" { + num_cpus 4 + num_gpus 1 + memory "16g" + runtime "PT2H" +} + +slurm_scheduler "shared_sched" { + account "{account}" + partition "short" + walltime "01:00:00" + nodes 1 +} + +slurm_scheduler "gpu_sched" { + account "{account}" + partition "gpu" + gres "gpu:1" + walltime "04:00:00" + nodes 1 +} + +job "prepare_inputs" { + command "python prepare.py --in {data_root}/raw --out {data_root}/clean" + resource_requirements "small_job" + output_file "clean_data" + scheduler "shared_sched" +} + +// Variables compose with parameters: {data_root} comes from variables, +// {i} comes from local parameters and drives expansion. +job "train_{i:02d}" { + command "python train.py --shard {i} --img {image_tag} --in {data_root}/clean --out {results_root}/run_{i:02d}.pt" + resource_requirements "gpu_job" + depends_on "prepare_inputs" + parameters { + i "1:4" + } + scheduler "gpu_sched" +} + +job "aggregate" { + command "python aggregate.py --in {results_root} --tag {project}" + resource_requirements "small_job" + depends_on_regexes "^train_.*$" + scheduler "shared_sched" +} diff --git a/examples/yaml/variables_demo.yaml b/examples/yaml/variables_demo.yaml new file mode 100644 index 000000000..fc93b0e76 --- /dev/null +++ b/examples/yaml/variables_demo.yaml @@ -0,0 +1,81 @@ +# Workflow Variables Demo +# +# Demonstrates the workflow-level `variables` map. Variables are constants +# substituted via {name} into every string field of the spec before parameter +# expansion. Unlike `parameters`, variables do NOT trigger Cartesian expansion +# -- each {var_name} reference is replaced once with the variable's value. +# +# Use variables to DRY up paths, account codes, image tags, project IDs, and +# any other constant string that would otherwise be repeated across jobs, +# files, schedulers, and env entries. +# +# Note: variable names must NOT collide with any parameter name. + +name: variables_demo + +variables: + data_root: /scratch/proj42 + results_root: /shared/proj42/results + project: proj42 + image_tag: pytorch:2.4 + account: my_hpc_account + +env: + PROJECT: "{project}" + IMAGE: "{image_tag}" + +jobs: + - name: prepare_inputs + command: "python prepare.py --in {data_root}/raw --out {data_root}/clean" + resource_requirements: small_job + output_files: + - clean_data + scheduler: shared_sched + + # Variables compose with parameters: {data_root} comes from variables, + # {i} comes from this job's local parameters and drives expansion. + - name: "train_{i:02d}" + command: "python train.py --shard {i} --img {image_tag} --in {data_root}/clean --out {results_root}/run_{i:02d}.pt" + resource_requirements: gpu_job + depends_on: + - prepare_inputs + parameters: + i: "1:4" + scheduler: gpu_sched + + - name: aggregate + command: "python aggregate.py --in {results_root} --tag {project}" + resource_requirements: small_job + depends_on_regexes: + - "^train_.*$" + scheduler: shared_sched + +files: + - name: clean_data + path: "{data_root}/clean/all.parquet" + +resource_requirements: + - name: small_job + num_cpus: 1 + memory: 2g + runtime: PT15M + + - name: gpu_job + num_cpus: 4 + num_gpus: 1 + memory: 16g + runtime: PT2H + +slurm_schedulers: + - name: shared_sched + account: "{account}" + partition: short + walltime: "01:00:00" + nodes: 1 + + - name: gpu_sched + account: "{account}" + partition: gpu + gres: "gpu:1" + walltime: "04:00:00" + nodes: 1 diff --git a/src/client/workflow_spec.rs b/src/client/workflow_spec.rs index 4e8e1e234..c30af3642 100644 --- a/src/client/workflow_spec.rs +++ b/src/client/workflow_spec.rs @@ -975,6 +975,382 @@ impl ExecutionConfig { } } +/// Apply workflow-level `variables` substitution to every string in the spec value. +/// +/// Runs before `serde_json::from_value` so that all string fields -- including ones +/// that are not currently parameter-substituted -- benefit. The `variables` map is +/// preserved in the output Value for round-trip serialization. +/// +/// Skip rules: keys of `parameters` maps and entries of `use_parameters` arrays are +/// identifiers, not user-facing strings, so they are not substituted. +fn apply_workflow_variables( + mut value: serde_json::Value, +) -> Result> { + let serde_json::Value::Object(ref map) = value else { + return Ok(value); + }; + let Some(vars_value) = map.get("variables") else { + return Ok(value); + }; + let serde_json::Value::Object(vars_map) = vars_value else { + return Err("workflow `variables` must be an object of string key/value pairs".into()); + }; + if vars_map.is_empty() { + return Ok(value); + } + + let mut variables: HashMap = HashMap::with_capacity(vars_map.len()); + for (name, value) in vars_map { + if !is_identifier(name) { + return Err(format!( + "workflow variable name '{}' must be a valid identifier \ + ([A-Za-z_][A-Za-z0-9_]*). Rename the variable.", + name + ) + .into()); + } + let serde_json::Value::String(s) = value else { + return Err(format!( + "workflow variable '{}' must be a string (got {})", + name, + json_value_kind(value) + ) + .into()); + }; + variables.insert(name.clone(), ParameterValue::String(s.clone())); + } + + let mut parameter_names: HashSet = HashSet::new(); + collect_parameter_names(&value, &mut parameter_names); + let mut collisions: Vec<&String> = variables + .keys() + .filter(|name| parameter_names.contains(*name)) + .collect(); + if !collisions.is_empty() { + collisions.sort(); + return Err(format!( + "workflow `variables` collide with parameter names: {}. \ + Rename the variable(s) or the parameter(s) so each name appears in only one map.", + collisions + .iter() + .map(|n| n.as_str()) + .collect::>() + .join(", ") + ) + .into()); + } + + // Variable values must be plain literal strings: no `{...}` template + // references at all (shell-style `${...}` is allowed, since it is reserved + // for shell expansion and the `${files.input.X}` family). Allowing template + // references inside variable values would either (a) make resolution + // order-dependent when one variable references another (HashMap iteration + // is randomized and cycles would not be detected), or (b) leak unresolved + // parameter tokens into wherever the variable is used. Composition belongs + // at the use site -- e.g. `command: "{base}/{sub}"`, not + // `combo: "{base}/{sub}"`. + for (name, vars_value) in vars_map { + let serde_json::Value::String(s) = vars_value else { + continue; // shape already validated above; non-strings already errored + }; + check_variable_value_tokens(name, s, &variables)?; + } + + let valid_token_names: HashSet = variables + .keys() + .cloned() + .chain(parameter_names.iter().cloned()) + .collect(); + + substitute_variables_in_value(&mut value, &variables, &valid_token_names, false)?; + + Ok(value) +} + +/// Validate the tokens inside a workflow variable's value. +/// +/// Variable values must be plain literal strings: any `{name}` template +/// reference is rejected. Shell-style `${...}` is allowed (it is reserved for +/// shell expansion and for the `${files.input.X}` / `${user_data.input.X}` +/// substitution that runs later in the workflow lifecycle). +/// +/// The "references another variable" branch produces a more pointed error; +/// every other template reference (parameter names, undefined names) is +/// rejected with the same uniform "must be a literal" message. +fn check_variable_value_tokens( + var_name: &str, + s: &str, + variables: &HashMap, +) -> Result<(), Box> { + let bytes = s.as_bytes(); + let mut i = 0; + while i < bytes.len() { + if bytes[i] != b'{' { + i += 1; + continue; + } + if i > 0 && bytes[i - 1] == b'$' { + i += 1; + continue; + } + let start = i + 1; + let mut j = start; + while j < bytes.len() && bytes[j] != b'}' && bytes[j] != b'{' { + j += 1; + } + if j >= bytes.len() || bytes[j] != b'}' { + i += 1; + continue; + } + let inner = &s[start..j]; + let token_name = inner.split(':').next().unwrap_or(""); + if !is_identifier(token_name) { + i = j + 1; + continue; + } + return if variables.contains_key(token_name) { + Err(format!( + "variable '{}' value '{}' references another variable '{{{}}}'. \ + Variable values may not reference other variables; resolution \ + order would be undefined. Inline the constant or compose at the \ + use site.", + var_name, s, inner + ) + .into()) + } else { + Err(format!( + "variable '{}' value '{}' contains template reference '{{{}}}'. \ + Variable values must be plain literal strings (shell-style \ + `${{...}}` is allowed). Compose at the use site instead.", + var_name, s, inner + ) + .into()) + }; + } + Ok(()) +} + +fn json_value_kind(value: &serde_json::Value) -> &'static str { + match value { + serde_json::Value::Null => "null", + serde_json::Value::Bool(_) => "boolean", + serde_json::Value::Number(_) => "number", + serde_json::Value::String(_) => "string", + serde_json::Value::Array(_) => "array", + serde_json::Value::Object(_) => "object", + } +} + +/// Collect every parameter name declared anywhere in the spec value. +/// Looks at top-level `parameters`, and at `parameters` inside any object found +/// in the `jobs`, `files`, or `user_data` arrays. +fn collect_parameter_names(value: &serde_json::Value, out: &mut HashSet) { + let serde_json::Value::Object(map) = value else { + return; + }; + if let Some(serde_json::Value::Object(params)) = map.get("parameters") { + for k in params.keys() { + out.insert(k.clone()); + } + } + for field in ["jobs", "files", "user_data"] { + let Some(serde_json::Value::Array(items)) = map.get(field) else { + continue; + }; + for item in items { + let serde_json::Value::Object(item_map) = item else { + continue; + }; + if let Some(serde_json::Value::Object(params)) = item_map.get("parameters") { + for k in params.keys() { + out.insert(k.clone()); + } + } + } + } +} + +/// Recursively walk a JSON value, substituting `{var}` and `{var:fmt}` in every +/// string node. Tokens whose name matches a parameter (rather than a variable) +/// are left intact for later parameter expansion. Tokens whose name matches +/// neither are reported as undefined-variable errors. +/// +/// `inside_variables` is true when the caller has descended into the top-level +/// `variables` map; in that scope the values must not be touched (they are the +/// substitution source itself). +fn substitute_variables_in_value( + value: &mut serde_json::Value, + variables: &HashMap, + valid_token_names: &HashSet, + inside_variables: bool, +) -> Result<(), Box> { + match value { + serde_json::Value::String(s) => { + if !inside_variables { + check_undefined_tokens(s, valid_token_names)?; + *s = substitute_workflow_variables_in_string(s, variables); + } + Ok(()) + } + serde_json::Value::Array(items) => { + for item in items { + substitute_variables_in_value( + item, + variables, + valid_token_names, + inside_variables, + )?; + } + Ok(()) + } + serde_json::Value::Object(map) => { + for (key, child) in map.iter_mut() { + if inside_variables { + // Don't substitute inside the variables map -- those strings define + // the substitution source, not consumers of it. + continue; + } + if key == "variables" { + substitute_variables_in_value(child, variables, valid_token_names, true)?; + continue; + } + if key == "parameters" { + // Substitute only in the values of the parameters map; keys are + // identifiers and must remain untouched. + if let serde_json::Value::Object(params) = child { + for v in params.values_mut() { + substitute_variables_in_value(v, variables, valid_token_names, false)?; + } + } + continue; + } + if key == "use_parameters" { + // Identifiers, not user-facing strings. + continue; + } + substitute_variables_in_value(child, variables, valid_token_names, false)?; + } + Ok(()) + } + _ => Ok(()), + } +} + +/// Scan a string for `{name}` and `{name:fmt}` tokens; error if any token's name +/// does not appear in `valid_token_names`. Tokens with a non-identifier name +/// (e.g. format-only `{:>5}`, JSON-like `{"x": 1}`) are ignored on the assumption +/// they are unrelated to template substitution. +/// +/// `${...}` blocks are skipped: that syntax is reserved for shell-style variable +/// expansion and the repo's existing `${files.input.X}` / `${user_data.input.X}` +/// substitution. They are never treated as workflow variable references. +fn check_undefined_tokens( + s: &str, + valid_token_names: &HashSet, +) -> Result<(), Box> { + let bytes = s.as_bytes(); + let mut i = 0; + while i < bytes.len() { + if bytes[i] != b'{' { + i += 1; + continue; + } + // Skip `${...}` -- that's shell-style variable expansion, not a workflow + // variable reference. + if i > 0 && bytes[i - 1] == b'$' { + i += 1; + continue; + } + let start = i + 1; + let mut j = start; + while j < bytes.len() && bytes[j] != b'}' && bytes[j] != b'{' { + j += 1; + } + if j >= bytes.len() || bytes[j] != b'}' { + i += 1; + continue; + } + let inner = &s[start..j]; + let name = inner.split(':').next().unwrap_or(""); + if is_identifier(name) && !valid_token_names.contains(name) { + return Err(format!( + "undefined template name '{{{}}}' in '{}': not declared in `variables` \ + or any `parameters` map. Add it to `variables` or fix the typo.", + inner, s + ) + .into()); + } + i = j + 1; + } + Ok(()) +} + +fn is_identifier(s: &str) -> bool { + let mut chars = s.chars(); + let Some(first) = chars.next() else { + return false; + }; + if !(first.is_ascii_alphabetic() || first == '_') { + return false; + } + chars.all(|c| c.is_ascii_alphanumeric() || c == '_') +} + +/// Substitute workflow variables into a string in a single pass. +/// +/// Replaces `{name}` and `{name:fmt}` with the value of `variables[name]` when +/// `name` matches a key. Unmatched tokens (parameter names that get expanded +/// later, or tokens whose name is non-identifier text) are left intact. +/// +/// Critically, `${...}` blocks are skipped entirely so that shell-style +/// expansions like `${HOME}` or `${TORC_JOB_ID}` are preserved verbatim -- +/// even when a variable happens to share a name with a shell variable. This +/// is what `substitute_parameters` (used by parameter expansion) does *not* +/// guarantee, since it relies on naive `string.replace`. +fn substitute_workflow_variables_in_string( + s: &str, + variables: &HashMap, +) -> String { + let mut result = String::with_capacity(s.len()); + let bytes = s.as_bytes(); + let mut last_copied = 0usize; + let mut i = 0usize; + while i < bytes.len() { + if bytes[i] != b'{' { + i += 1; + continue; + } + if i > 0 && bytes[i - 1] == b'$' { + // Shell-style ${...}; do not substitute. + i += 1; + continue; + } + let start = i + 1; + let Some(rel_end) = s[start..].find('}') else { + i += 1; + continue; + }; + let inner_end = start + rel_end; + let inner = &s[start..inner_end]; + let (name, fmt) = match inner.split_once(':') { + Some((n, f)) => (n, Some(f)), + None => (inner, None), + }; + let Some(value) = variables.get(name) else { + // Not a workflow variable -- leave intact (it might be a parameter + // name that gets expanded later, or just literal text). + i = inner_end + 1; + continue; + }; + result.push_str(&s[last_copied..i]); + result.push_str(&value.format(fmt)); + i = inner_end + 1; + last_copied = i; + } + result.push_str(&s[last_copied..]); + result +} + /// Specification for a complete workflow #[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)] #[serde(deny_unknown_fields)] @@ -991,6 +1367,12 @@ pub struct WorkflowSpec { /// Jobs/files can reference these by setting use_parameters to parameter names #[serde(skip_serializing_if = "Option::is_none")] pub parameters: Option>, + /// Workflow-level constants substituted into every string field of the spec. + /// Unlike `parameters`, variables do not trigger Cartesian expansion -- each + /// `{name}` reference is replaced once with the variable's value before the + /// spec is processed. Variable names must not collide with any parameter name. + #[serde(skip_serializing_if = "Option::is_none")] + pub variables: Option>, /// Environment variables exported for every job in the workflow #[serde(skip_serializing_if = "Option::is_none")] pub env: Option>, @@ -1067,6 +1449,7 @@ impl WorkflowSpec { user: Some(user), description, parameters: None, + variables: None, env: None, compute_node_expiration_buffer_seconds: None, compute_node_wait_for_new_jobs_seconds: None, @@ -1105,6 +1488,7 @@ impl WorkflowSpec { .into(), ); } + let value = apply_workflow_variables(value)?; Ok(serde_json::from_value(value)?) } @@ -4452,6 +4836,11 @@ impl WorkflowSpec { obj.insert("parameters".to_string(), params); } } + "variables" => { + if let Some(vars) = Self::kdl_string_map_to_json(node, "Variable")? { + obj.insert("variables".to_string(), vars); + } + } "env" => { if let Some(env) = Self::kdl_string_map_to_json(node, "Environment key")? { obj.insert("env".to_string(), env); @@ -4602,6 +4991,18 @@ impl WorkflowSpec { } lines.push("}".to_string()); } + // Variables (workflow-level constants) + if let Some(ref vars) = self.variables + && !vars.is_empty() + { + lines.push("variables {".to_string()); + let mut entries: Vec<_> = vars.iter().collect(); + entries.sort_by_key(|(left, _)| *left); + for (key, value) in entries { + lines.push(format!(" {} {}", key, kdl_escape(value))); + } + lines.push("}".to_string()); + } if let Some(ref env) = self.env && !env.is_empty() { @@ -5809,6 +6210,7 @@ job "train_lr{lr:.4f}_bs{batch_size}" { compute_node_ignore_workflow_completion: None, compute_node_wait_for_new_jobs_seconds: None, parameters: None, + variables: None, env: None, jobs: vec![JobSpec { name: "job_{i}".to_string(), @@ -7054,4 +7456,447 @@ resource_requirements: let spec: WorkflowSpec = serde_yaml::from_str(yaml).expect("Failed to parse YAML"); assert!(spec.validate_scheduler_resources().is_empty()); } + + #[test] + fn test_workflow_variables_substitute_into_strings() { + let yaml = r#" +name: vars_demo +variables: + base_path: /scratch/proj + image: pytorch:2.4 +jobs: + - name: train + command: "{base_path}/run.sh --img {image}" +files: + - name: out + path: "{base_path}/out.txt" +"#; + let spec = WorkflowSpec::from_spec_file_content(yaml, "yaml") + .expect("variables substitution should succeed"); + assert_eq!( + spec.jobs[0].command, + "/scratch/proj/run.sh --img pytorch:2.4" + ); + assert_eq!( + spec.files.as_ref().unwrap()[0].path, + "/scratch/proj/out.txt" + ); + // The variables map itself must be preserved for round-trip serialization. + let vars = spec.variables.expect("variables map preserved"); + assert_eq!( + vars.get("base_path").map(String::as_str), + Some("/scratch/proj") + ); + } + + #[test] + fn test_workflow_variables_combined_with_parameters() { + let yaml = r#" +name: vars_and_params +variables: + base_path: /scratch/proj +jobs: + - name: "job_{i:03d}" + command: "{base_path}/run.sh --idx {i}" + parameters: + i: "1:3" +"#; + let mut spec = WorkflowSpec::from_spec_file_content(yaml, "yaml") + .expect("variables + parameters should parse"); + spec.expand_parameters().expect("expansion should succeed"); + assert_eq!(spec.jobs.len(), 3); + assert_eq!(spec.jobs[0].name, "job_001"); + assert_eq!(spec.jobs[0].command, "/scratch/proj/run.sh --idx 1"); + assert_eq!(spec.jobs[2].name, "job_003"); + assert_eq!(spec.jobs[2].command, "/scratch/proj/run.sh --idx 3"); + } + + #[test] + fn test_workflow_variables_collide_with_parameter_name() { + let yaml = r#" +name: collision +variables: + i: not_an_index +jobs: + - name: "job_{i}" + command: "echo {i}" + parameters: + i: "1:3" +"#; + let err = WorkflowSpec::from_spec_file_content(yaml, "yaml") + .expect_err("collision must be rejected"); + let msg = err.to_string(); + assert!( + msg.contains("collide with parameter names"), + "expected collision error, got: {msg}" + ); + assert!(msg.contains('i'), "expected colliding name 'i', got: {msg}"); + } + + #[test] + fn test_workflow_variables_undefined_token_rejected() { + let yaml = r#" +name: typo +variables: + base_path: /scratch/proj +jobs: + - name: train + command: "{baes_path}/run.sh" +"#; + let err = WorkflowSpec::from_spec_file_content(yaml, "yaml") + .expect_err("undefined token must be rejected"); + let msg = err.to_string(); + assert!( + msg.contains("undefined template name"), + "expected undefined-token error, got: {msg}" + ); + assert!( + msg.contains("baes_path"), + "error should name the typo: {msg}" + ); + } + + #[test] + fn test_workflow_variables_substitute_into_parameter_value() { + let yaml = r#" +name: var_in_param_value +variables: + n_max: "5" +jobs: + - name: "job_{i}" + command: "echo {i}" + parameters: + i: "1:{n_max}" +"#; + let mut spec = WorkflowSpec::from_spec_file_content(yaml, "yaml") + .expect("variable inside parameter value should be substituted"); + spec.expand_parameters().expect("expansion should succeed"); + assert_eq!(spec.jobs.len(), 5); + } + + #[test] + fn test_workflow_variables_substitute_into_env_and_scheduler() { + let yaml = r#" +name: env_and_scheduler +variables: + proj: my_project +env: + PROJECT: "{proj}" +jobs: + - name: t + command: echo + scheduler: "{proj}_sched" +slurm_schedulers: + - name: "{proj}_sched" + account: "{proj}" + partition: short + walltime: "PT1H" + nodes: 1 +"#; + let spec = WorkflowSpec::from_spec_file_content(yaml, "yaml") + .expect("variables in env and scheduler should substitute"); + assert_eq!( + spec.env + .as_ref() + .and_then(|e| e.get("PROJECT")) + .map(String::as_str), + Some("my_project") + ); + assert_eq!(spec.jobs[0].scheduler.as_deref(), Some("my_project_sched")); + let sched = &spec.slurm_schedulers.as_ref().unwrap()[0]; + assert_eq!(sched.name.as_deref(), Some("my_project_sched")); + assert_eq!(sched.account, "my_project"); + } + + #[test] + fn test_workflow_variables_does_not_reject_shell_style_expansion() { + // `${...}` is shell-style variable expansion (and is also used by the + // existing `${files.input.X}` / `${TORC_*}` substitution). The + // workflow-level variables system must leave those alone, even when + // `variables` is set. + let yaml = r#" +name: shell_vars_ok +variables: + base_path: /scratch/proj +jobs: + - name: t + command: "echo ${TORC_JOB_ID} ${HOME} {base_path}/run.sh" + env: + OUT: "${TORC_JOB_ID}.log" +"#; + let spec = WorkflowSpec::from_spec_file_content(yaml, "yaml") + .expect("shell-style ${...} must not trigger undefined-token errors"); + assert_eq!( + spec.jobs[0].command, + "echo ${TORC_JOB_ID} ${HOME} /scratch/proj/run.sh" + ); + assert_eq!( + spec.jobs[0] + .env + .as_ref() + .and_then(|e| e.get("OUT")) + .map(String::as_str), + Some("${TORC_JOB_ID}.log") + ); + } + + #[test] + fn test_workflow_variables_undefined_token_inside_variable_value() { + // A typo in a variable's value should be rejected at load time; otherwise + // it would silently propagate to wherever the variable is used. + let yaml = r#" +name: typo_in_var_value +variables: + base_path: /scratch/proj + bad: "{baes_path}/sub" +jobs: + - name: t + command: "echo {bad}" +"#; + let err = WorkflowSpec::from_spec_file_content(yaml, "yaml") + .expect_err("typo inside a variable value must be rejected"); + let msg = err.to_string(); + assert!( + msg.contains("variable 'bad'"), + "error should name the offending variable, got: {msg}" + ); + assert!( + msg.contains("baes_path"), + "error should name the typo, got: {msg}" + ); + } + + #[test] + fn test_workflow_variables_substitution_preserves_shell_expansion_with_colliding_name() { + // Even when a workflow variable's name matches a shell variable used + // in the spec via `${...}` syntax, the substitution must leave the + // shell expansion alone. Naive `string.replace` would corrupt + // `${HOME}` into `$`; the workflow-variables substituter is + // `${...}`-aware to avoid that. + let yaml = r#" +name: shell_collision +variables: + HOME: /should/not/leak + base: /scratch/proj +jobs: + - name: t + command: "echo ${HOME} {base}/run.sh" + env: + OUT_DIR: "${HOME}-{base}" +"#; + let spec = WorkflowSpec::from_spec_file_content(yaml, "yaml") + .expect("variable named HOME must not corrupt ${HOME}"); + assert_eq!( + spec.jobs[0].command, "echo ${HOME} /scratch/proj/run.sh", + "${{HOME}} must be preserved verbatim even though `HOME` is a workflow variable" + ); + assert_eq!( + spec.jobs[0] + .env + .as_ref() + .and_then(|e| e.get("OUT_DIR")) + .map(String::as_str), + Some("${HOME}-/scratch/proj"), + "${{HOME}} in env must also be preserved while {{base}} is substituted" + ); + } + + #[test] + fn test_workflow_variables_value_with_parameter_reference_rejected() { + // Variable values must be plain literal strings; even a + // valid-looking `{i}` referencing a parameter is rejected so the rule + // stays uniform. Composition belongs at the use site. + let yaml = r#" +name: param_ref_in_var_value +variables: + output_pattern: "results-{i}.json" +jobs: + - name: "job_{i}" + command: "echo {output_pattern}" + parameters: + i: "1:3" +"#; + let err = WorkflowSpec::from_spec_file_content(yaml, "yaml") + .expect_err("parameter reference inside a variable value must be rejected"); + let msg = err.to_string(); + assert!( + msg.contains("must be plain literal strings"), + "expected literal-only error, got: {msg}" + ); + assert!( + msg.contains("variable 'output_pattern'"), + "error should name the offending variable, got: {msg}" + ); + } + + #[test] + fn test_workflow_variables_value_referencing_another_variable_rejected() { + // Variable values must not reference other variables: HashMap iteration + // order would otherwise determine whether the inner reference resolves + // or leaks through as a literal token. + let yaml = r#" +name: nested_vars_rejected +variables: + base: /scratch + inputs: "{base}/inputs" +jobs: + - name: t + command: "ls {inputs}" +"#; + let err = WorkflowSpec::from_spec_file_content(yaml, "yaml") + .expect_err("variable referencing another variable must be rejected"); + let msg = err.to_string(); + assert!( + msg.contains("references another variable"), + "expected explicit cross-variable error, got: {msg}" + ); + assert!( + msg.contains("variable 'inputs'") && msg.contains("{base}"), + "error should name both the variable and its bad reference, got: {msg}" + ); + } + + #[test] + fn test_workflow_variables_invalid_name_rejected() { + // Variable names must be valid identifiers so they participate in typo + // detection and serialize cleanly to KDL. + let yaml = r#" +name: bad_var_name +variables: + "foo.bar": x +jobs: + - name: t + command: echo +"#; + let err = WorkflowSpec::from_spec_file_content(yaml, "yaml") + .expect_err("non-identifier variable name must be rejected"); + let msg = err.to_string(); + assert!( + msg.contains("must be a valid identifier"), + "expected identifier-validation error, got: {msg}" + ); + assert!( + msg.contains("foo.bar"), + "error should name the offending name, got: {msg}" + ); + } + + #[test] + fn test_workflow_variables_round_trip_json() { + let yaml = r#" +name: roundtrip +variables: + base: /a/b +jobs: + - name: t + command: "{base}/run" +"#; + let spec = WorkflowSpec::from_spec_file_content(yaml, "yaml").unwrap(); + let json = serde_json::to_string(&spec).unwrap(); + // Reparsing the serialized form must succeed (substitutions are baked in, + // so the variables map is harmless on a second pass). + let spec2 = WorkflowSpec::from_spec_file_content(&json, "json").unwrap(); + assert_eq!(spec.jobs[0].command, spec2.jobs[0].command); + assert_eq!(spec.variables, spec2.variables); + } + + fn assert_variables_demo_substituted(spec: &WorkflowSpec) { + assert_eq!(spec.name, "variables_demo"); + let prepare = spec + .jobs + .iter() + .find(|j| j.name == "prepare_inputs") + .expect("prepare_inputs job present"); + assert_eq!( + prepare.command, + "python prepare.py --in /scratch/proj42/raw --out /scratch/proj42/clean" + ); + let train = spec + .jobs + .iter() + .find(|j| j.name.starts_with("train_")) + .expect("train job template present"); + assert!( + train.command.contains("--img pytorch:2.4"), + "image_tag should be substituted, got: {}", + train.command + ); + assert!( + train.command.contains("--in /scratch/proj42/clean"), + "data_root should be substituted, got: {}", + train.command + ); + let aggregate = spec + .jobs + .iter() + .find(|j| j.name == "aggregate") + .expect("aggregate job present"); + assert_eq!( + aggregate.command, + "python aggregate.py --in /shared/proj42/results --tag proj42" + ); + let env = spec.env.as_ref().expect("env block present"); + assert_eq!(env.get("PROJECT").map(String::as_str), Some("proj42")); + assert_eq!(env.get("IMAGE").map(String::as_str), Some("pytorch:2.4")); + let schedulers = spec + .slurm_schedulers + .as_ref() + .expect("slurm_schedulers present"); + for sched in schedulers { + assert_eq!( + sched.account, "my_hpc_account", + "scheduler account should be substituted, got: {}", + sched.account + ); + } + } + + #[test] + fn test_variables_demo_yaml_example() { + let path = + PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("examples/yaml/variables_demo.yaml"); + let mut spec = + WorkflowSpec::from_spec_file(&path).expect("YAML variables_demo should parse"); + spec.expand_parameters() + .expect("YAML variables_demo should expand"); + // 3 jobs declared; train_{i:02d} expands i=1..=4, plus prepare and aggregate -> 6 total. + assert_eq!(spec.jobs.len(), 6); + assert_variables_demo_substituted(&spec); + } + + #[test] + fn test_variables_demo_json_example() { + let path = + PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("examples/json/variables_demo.json"); + let mut spec = + WorkflowSpec::from_spec_file(&path).expect("JSON variables_demo should parse"); + spec.expand_parameters() + .expect("JSON variables_demo should expand"); + assert_eq!(spec.jobs.len(), 6); + assert_variables_demo_substituted(&spec); + } + + #[test] + fn test_variables_demo_json5_example() { + let path = + PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("examples/json/variables_demo.json5"); + let mut spec = + WorkflowSpec::from_spec_file(&path).expect("JSON5 variables_demo should parse"); + spec.expand_parameters() + .expect("JSON5 variables_demo should expand"); + assert_eq!(spec.jobs.len(), 6); + assert_variables_demo_substituted(&spec); + } + + #[test] + fn test_variables_demo_kdl_example() { + let path = + PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("examples/kdl/variables_demo.kdl"); + let mut spec = + WorkflowSpec::from_spec_file(&path).expect("KDL variables_demo should parse"); + spec.expand_parameters() + .expect("KDL variables_demo should expand"); + assert_eq!(spec.jobs.len(), 6); + assert_variables_demo_substituted(&spec); + } }