diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index ff66b399b..4295cf1e7 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -60,6 +60,8 @@ jobs: run: cargo clippy --all --all-targets --all-features -- -D warnings - name: Check Rust-owned OpenAPI artifacts + env: + DATABASE_URL: sqlite:db/sqlite/dev.db run: | cargo test --lib --no-default-features --features openapi-codegen bash api/check_openapi_codegen_parity.sh diff --git a/.gitignore b/.gitignore index 3d8977cbb..c2669a028 100644 --- a/.gitignore +++ b/.gitignore @@ -38,3 +38,4 @@ output torc_output .mcp.json .dprint-cache/ +/tmp diff --git a/docs/src/SUMMARY.md b/docs/src/SUMMARY.md index 1d70033b5..beda51e72 100644 --- a/docs/src/SUMMARY.md +++ b/docs/src/SUMMARY.md @@ -126,6 +126,7 @@ - [AI-Assisted Recovery Design](./specialized/design/ai-assisted-recovery.md) - [Workflow Graph](./specialized/design/workflow-graph.md) - [Interface Architecture](./specialized/design/interfaces.md) + - [RO-Crate Generation Design](./specialized/design/ro-crate.md) - [API Generation Architecture](./specialized/design/api-generation.md) - [Slurm Job Step Monitoring](./specialized/design/srun-monitoring.md) diff --git a/docs/src/core/concepts/ro-crate.md b/docs/src/core/concepts/ro-crate.md index f53b1cf93..b62c8dcd2 100644 --- a/docs/src/core/concepts/ro-crate.md +++ b/docs/src/core/concepts/ro-crate.md @@ -31,8 +31,9 @@ other research object with JSON-LD properties. Entities can be: ### Always recorded (all workflows) During workflow initialization, Torc creates **SoftwareApplication** entities for the torc binaries -(server, job runner, etc.) that processed the workflow. These record the software name and version, -providing a baseline provenance record even when full RO-Crate tracking is not enabled. +(server, CLI, job runner, etc.) that processed the workflow. 
In the current model, these are written +as both `SoftwareApplication` and `prov:SoftwareAgent` so the exported RO-Crate uses a PROV-shaped +provenance model. ### When `enable_ro_crate: true` @@ -42,12 +43,15 @@ When you enable RO-Crate on a workflow, Torc additionally creates file and job p - File entities are created for all **input files** (files that exist on disk) - Entities include MIME type inference, file size, and modification date +- Torc creates workflow-level provenance entities: `#torc-workflow` and `#torc-run-{run_id}` **When jobs complete successfully:** - File entities are created for all **output files** - CreateAction entities are created for each job (provenance) -- Output files are linked to their producing job via `wasGeneratedBy` +- Output files are linked to their producing job via `prov:wasGeneratedBy` +- Output files are linked to the workflow run via `prov:wasAttributedTo` +- Output files are linked to file inputs via `prov:wasDerivedFrom` This creates a complete provenance graph linking inputs → jobs → outputs. 
@@ -58,13 +62,15 @@ Automatically generated File entities include: ```json { "@id": "data/output.csv", - "@type": "File", + "@type": ["File", "prov:Entity"], "name": "output.csv", "encodingFormat": "text/csv", "contentSize": 1024, "sha256": "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", "dateModified": "2024-01-01T00:00:00Z", - "wasGeneratedBy": { "@id": "#job-42-attempt-1" } + "prov:wasGeneratedBy": { "@id": "#job-42-attempt-1" }, + "prov:wasAttributedTo": { "@id": "#torc-run-1" }, + "prov:wasDerivedFrom": { "@id": "data/input.csv" } } ``` @@ -75,13 +81,18 @@ Job provenance is captured as CreateAction entities: ```json { "@id": "#job-42-attempt-1", - "@type": "CreateAction", + "@type": ["CreateAction", "prov:Activity"], "name": "process_data", - "instrument": { "@id": "#workflow-123" }, + "prov:hadPlan": { "@id": "#torc-workflow" }, + "isPartOf": { "@id": "#torc-run-1" }, + "instrument": { "@id": "#software-torc-run-1" }, + "prov:used": { "@id": "data/input.csv" }, "result": [{ "@id": "data/output.csv" }] } ``` +The exported `@context` includes the `prov` namespace. + ## Enabling Automatic RO-Crate Add `enable_ro_crate: true` to your workflow specification: @@ -111,8 +122,9 @@ workflow is created. 
Files that exist are marked as inputs; files that don't exi After running this workflow: - `input_data` will have an RO-Crate File entity (created during initialization) -- `output_data` will have an RO-Crate File entity with `wasGeneratedBy` linking to the job +- `output_data` will have an RO-Crate File entity with `prov:wasGeneratedBy` linking to the job - A CreateAction entity will describe the `process` job execution +- `#torc-workflow` and `#torc-run-{run_id}` will describe the workflow plan and run activity ## Dataset Entities for Directories @@ -149,11 +161,11 @@ Dataset entities include file count, total size, and an optional hash: Torc supports three hash modes for datasets: -| Mode | Description | Speed | Detects | -| ---------- | ----------------------------------------- | ------- | ---------------------------------- | -| `manifest` | Hash of sorted (path, size, mtime) list | Fast | File additions, deletions, renames | -| `content` | SHA256 of all file contents (Merkle tree) | Slow | Any content change | -| `none` | No hash, only file count and size | Fastest | Nothing (stats only) | +| Mode | Description | Speed | Detects | +| ---------- | --------------------------------- | ------- | -------------------------- | +| `manifest` | Hash of sorted path/size/mtime | Fast | Additions, deletions, move | +| `content` | SHA256 of all file contents | Slow | Any content change | +| `none` | No hash, only file count and size | Fastest | Nothing | For large datasets, `manifest` mode provides a good balance—it detects structural changes without the I/O cost of reading terabytes of data. 
diff --git a/docs/src/core/how-to/ro-crate-metadata.md b/docs/src/core/how-to/ro-crate-metadata.md index 45c1d4054..8a7b6477a 100644 --- a/docs/src/core/how-to/ro-crate-metadata.md +++ b/docs/src/core/how-to/ro-crate-metadata.md @@ -36,8 +36,10 @@ When automatic generation is enabled: - **Input files** (files that exist on disk) get File entities created during workflow initialization -- **Output files** get File entities with provenance (`wasGeneratedBy`) created when jobs complete -- **Jobs** get CreateAction entities linking to their output files +- **Output files** get File entities with provenance (`prov:wasGeneratedBy`) created when jobs + complete +- **Jobs** get CreateAction entities linking inputs, outputs, plan, and run metadata +- **Workflow runs** get `#torc-workflow` and `#torc-run-{run_id}` provenance entities After running the workflow, export the metadata: @@ -50,10 +52,11 @@ The exported document includes complete provenance: ```json { "@id": "data/output.csv", - "@type": "File", + "@type": ["File", "prov:Entity"], "name": "output.csv", "encodingFormat": "text/csv", - "wasGeneratedBy": { "@id": "#job-1-attempt-1" } + "prov:wasGeneratedBy": { "@id": "#job-1-attempt-1" }, + "prov:wasAttributedTo": { "@id": "#torc-run-1" } } ``` @@ -86,7 +89,7 @@ Each RO-Crate entity has: | `file_id` | Optional link to a Torc file record | Entities are stored per-workflow. The `export` command assembles them into a complete RO-Crate -document with the required metadata descriptor and root dataset. +document with the required metadata descriptor, root dataset, and PROV-aware context. 
## Creating Entities @@ -300,7 +303,10 @@ The exported document has this structure: ```json { - "@context": "https://w3id.org/ro/crate/1.1/context", + "@context": [ + "https://w3id.org/ro/crate/1.1/context", + { "prov": "http://www.w3.org/ns/prov#" } + ], "@graph": [ { "@id": "ro-crate-metadata.json", @@ -319,13 +325,14 @@ The exported document has this structure: }, { "@id": "data/output.parquet", - "@type": "File", + "@type": ["File", "prov:Entity"], "name": "Simulation Output", - "encodingFormat": "application/x-parquet" + "encodingFormat": "application/x-parquet", + "prov:wasGeneratedBy": {"@id": "#job-1-attempt-1"} }, { "@id": "https://example.com/simulation/v2.1", - "@type": "SoftwareApplication", + "@type": ["SoftwareApplication", "prov:SoftwareAgent"], "name": "My Simulation", "version": "2.1.0" } @@ -333,8 +340,8 @@ The exported document has this structure: } ``` -The `@id` and `@type` fields are always set from the entity record, overriding any values in the -metadata JSON. +Torc preserves any explicit `@id` and `@type` already present in the stored metadata. If either +field is missing, the exporter fills it in from the entity record. ## Workflow Export/Import diff --git a/docs/src/specialized/design/index.md b/docs/src/specialized/design/index.md index 295db76dc..f9ac6830a 100644 --- a/docs/src/specialized/design/index.md +++ b/docs/src/specialized/design/index.md @@ -13,5 +13,6 @@ Internal design documentation for developers. 
- [AI-Assisted Recovery Design](./ai-assisted-recovery.md) - AI-assisted error classification - [Workflow Graph](./workflow-graph.md) - Dependency graph implementation - [Interface Architecture](./interfaces.md) - Interface design patterns +- [RO-Crate Generation Design](./ro-crate.md) - RO-Crate entity lifecycle and provenance flow - [Slurm Job Step Monitoring](./srun-monitoring.md) - srun wrapping, sstat monitoring, sacct collection diff --git a/docs/src/specialized/design/ro-crate.md b/docs/src/specialized/design/ro-crate.md new file mode 100644 index 000000000..cebbd0c02 --- /dev/null +++ b/docs/src/specialized/design/ro-crate.md @@ -0,0 +1,138 @@ +# RO-Crate Generation Design + +This page describes how Torc creates and updates automatic RO-Crate provenance entities in the +current branch. + +## Current Model + +The important identity rules are: + +- Workflow plan entity: one per workflow, `#torc-workflow` +- Workflow run entity: one per run, `#torc-run-{run_id}` +- Torc software entities: one per run, `#software-{binary_name}-run-{run_id}` +- Job execution entities: one per job attempt, `#job-{job_id}-attempt-{attempt_id}` +- File entities: one per file record/path, updated in place across runs + +That last point is why `build_file_entity()` does not take `run_id`. Plain file entities are not +modeled as run-scoped records. Run-scoped provenance is attached through relationships: + +- Output files link to the workflow run with `prov:wasAttributedTo` +- Output files link to the producing job with `prov:wasGeneratedBy` +- Job `CreateAction` entities link to the run with `isPartOf` +- Job `CreateAction` entities link to software agents with `instrument` and `prov:wasAssociatedWith` + +If `run_id` were written directly into the base file entity metadata again, it would mix a stable +file identity with run-specific state. 
The current code instead keeps file identity stable and +updates the same file entity as a file moves from "input known at initialization" to "output with +provenance after job completion". + +This design is also consistent with the multi-run behavior covered by +`test_auto_ro_crate_second_run_replaces_entities`: file entities are replaced in place, while +software and job execution entities accumulate across runs and attempts. + +## Entity Creation Flow + +```mermaid +flowchart TD + A[Workflow initialize_jobs] --> B{enable_ro_crate?} + A --> C[Server creates
<br/>#software-torc-server-run-N] + A --> D[Client attempts to create
<br/>#software-torc-run-N
<br/>and optional
<br/>#software-torc-slurm-job-runner-run-N] + + B -->|yes| E[Server upserts input File entities
<br/>from DB rows with st_mtime] + B -->|yes| F[Client creates or updates
<br/>#torc-workflow and #torc-run-N] + B -->|yes| G[Client creates or updates
<br/>input File entities] + B -->|no| H[No automatic file provenance] + + G --> I[Workflow execution] + E --> I + F --> I + C --> I + D --> I + + I --> J[Job completes successfully] + J --> J2{Job has output files?} + J2 -->|yes| K[Client refreshes
<br/>#torc-workflow and #torc-run-N] + J2 -->|yes| L[Client creates
<br/>#job-job_id-attempt-attempt_id] + J2 -->|yes| M[Client creates or updates
<br/>output File entity] + J2 -->|no| P[No additional automatic
<br/>RO-Crate entities for this job] + + L --> N[Job CreateAction metadata] + N --> N1[prov:hadPlan -> #torc-workflow] + N --> N2[isPartOf -> #torc-run-N] + N --> N3[instrument -> #software-torc-run-N] + N --> N4[prov:used -> input file paths] + N --> N5[result -> output file paths] + + M --> O[Output File metadata] + O --> O1[prov:wasGeneratedBy -> job CreateAction] + O --> O2[prov:wasAttributedTo -> #torc-run-N] + O --> O3[prov:wasDerivedFrom -> input file paths] + + classDef init fill:#dbeafe,stroke:#1d4ed8,color:#0f172a,stroke-width:2px; + classDef software fill:#dcfce7,stroke:#15803d,color:#0f172a,stroke-width:2px; + classDef input fill:#fef3c7,stroke:#b45309,color:#0f172a,stroke-width:2px; + classDef run fill:#ede9fe,stroke:#6d28d9,color:#0f172a,stroke-width:2px; + classDef job fill:#fee2e2,stroke:#b91c1c,color:#0f172a,stroke-width:2px; + classDef output fill:#cffafe,stroke:#0f766e,color:#0f172a,stroke-width:2px; + classDef disabled fill:#e5e7eb,stroke:#4b5563,color:#111827,stroke-dasharray: 5 3; + + class A,I,J init; + class C,D software; + class E,G input; + class F,K run; + class L,N,N1,N2,N3,N4,N5 job; + class M,O,O1,O2,O3 output; + class H,P disabled; +``` + +## What Gets Created + +### Torc binaries + +- The server always creates `#software-torc-server-run-{run_id}` during `initialize_jobs()` +- The client attempts to create run-scoped software entities for `torc` and, on Linux, + `torc-slurm-job-runner` +- Client-side software entities are skipped when the corresponding binary cannot be found next to + the current executable or on `PATH` +- These are `SoftwareApplication` plus `prov:SoftwareAgent` + +### Jobs + +- The client creates one `CreateAction` per successful job completion **that has at least one output + file** +- The entity id is `#job-{job_id}-attempt-{attempt_id}` +- Jobs with no output files currently do not emit an automatic `CreateAction` +- When present, the job entity is the main join point between inputs, outputs, workflow run, and + 
software agents + +### Input files + +- Input files are detected by `st_mtime IS NOT NULL` +- During initialization, both the server and the client currently upsert the same input file entity +- The entity is keyed by workflow and `file_id`, with `entity_id = file.path` +- Input file entities are expected to exist before jobs run, but the code does not rely on them + being create-only; it is intentionally upsert-based + +### Output files + +- Output file entities are created or replaced after a job succeeds and the file record has been + refreshed +- If a file already had an entity from initialization or a prior run, the same DB row is updated + rather than creating a new file entity for each run +- Run-specific provenance is recorded in the metadata relationships, not by giving the file entity a + run-specific identity +- The same successful-job path also refreshes `#torc-workflow`, refreshes `#torc-run-{run_id}`, and + creates the job `CreateAction`, but only when there is at least one output file to process + +## Important Asymmetries + +- Software entities are run-scoped and accumulate across runs +- Job `CreateAction` entities are attempt-scoped and accumulate across attempts +- File entities are file-scoped and are replaced in place across runs + +These asymmetries are intentional and match `tests/test_auto_ro_crate.rs`, especially +`test_auto_ro_crate_second_run_replaces_entities`, which expects: + +- file entity count to stay stable across runs +- software entity count to grow across runs +- output file provenance to point at the newer `#torc-run-{run_id}` diff --git a/src/client/commands/ro_crate.rs b/src/client/commands/ro_crate.rs index 597d2be4a..e0843e566 100644 --- a/src/client/commands/ro_crate.rs +++ b/src/client/commands/ro_crate.rs @@ -12,6 +12,7 @@ use crate::client::commands::{ use crate::models; use rayon::prelude::*; use sha2::{Digest, Sha256}; +use std::collections::HashSet; use tabled::Tabled; #[derive(Tabled)] @@ -332,7 +333,7 @@ pub fn 
handle_ro_crate_commands(config: &Configuration, command: &RoCrateCommand Some(id) => *id, None => select_workflow_interactively(config, &user_name).unwrap(), }; - handle_export(config, selected_workflow_id, output.as_deref(), format); + handle_export(config, selected_workflow_id, output.as_deref()); } RoCrateCommands::AddDataset { workflow_id, @@ -375,15 +376,10 @@ fn read_metadata_input(metadata: &str) -> String { } } -fn handle_export( - config: &Configuration, - workflow_id: i64, - output_path: Option<&str>, - format: &str, -) { - // Fetch the workflow name for the root dataset - let workflow_name = match apis::workflows_api::get_workflow(config, workflow_id) { - Ok(w) => w.name, +fn handle_export(config: &Configuration, workflow_id: i64, output_path: Option<&str>) { + // Fetch the workflow name and current run for the root dataset. + let (workflow_name, run_id) = match apis::workflows_api::get_workflow(config, workflow_id) { + Ok(workflow) => (workflow.name, workflow.run_id.unwrap_or(0)), Err(e) => { print_error("getting workflow", &e); std::process::exit(1); @@ -400,61 +396,81 @@ fn handle_export( } }; - if format == "json" { - // In JSON format mode, just output the raw entities list - if let Ok(json) = serde_json::to_string_pretty(&entities) { - println!("{}", json); + let mut existing_ids: HashSet = entities.iter().map(|e| e.entity_id.clone()).collect(); + let run_entity_id = format!("#torc-run-{}", run_id); + let mut synthetic_entities: Vec = Vec::new(); + + if !existing_ids.contains("#torc-workflow") { + synthetic_entities.push(serde_json::json!({ + "@id": "#torc-workflow", + "@type": ["SoftwareApplication", "prov:Plan"], + "name": workflow_name.clone() + })); + existing_ids.insert("#torc-workflow".to_string()); + } + + if !existing_ids.contains(&run_entity_id) { + let run_entity = serde_json::json!({ + "@id": run_entity_id.clone(), + "@type": ["CreateAction", "prov:Activity"], + "name": format!("{} Run {}", workflow_name, run_id), + "prov:hadPlan": { 
"@id": "#torc-workflow" }, + "instrument": { "@id": format!("#software-torc-run-{}", run_id) }, + "prov:wasAssociatedWith": [ + { "@id": format!("#software-torc-run-{}", run_id) }, + { "@id": format!("#software-torc-server-run-{}", run_id) } + ] + }); + synthetic_entities.push(run_entity); + } + + // Build user and synthetic entities first so hasPart can include the final set. + let mut graph_entities: Vec = synthetic_entities; + for entity in &entities { + if let Ok(mut parsed) = serde_json::from_str::(&entity.metadata) { + if let Some(obj) = parsed.as_object_mut() { + obj.entry("@id".to_string()) + .or_insert_with(|| serde_json::json!(entity.entity_id)); + obj.entry("@type".to_string()) + .or_insert_with(|| serde_json::json!(entity.entity_type)); + } + graph_entities.push(parsed); + } else { + graph_entities.push(serde_json::json!({ + "@id": entity.entity_id, + "@type": entity.entity_type + })); } - return; } - // Build the RO-Crate metadata document - let mut graph: Vec = Vec::new(); + let has_part: Vec = graph_entities + .iter() + .filter_map(|entity| entity.get("@id").cloned()) + .map(|id| serde_json::json!({ "@id": id })) + .collect(); - // 1. The metadata descriptor entity + let mut graph: Vec = Vec::new(); graph.push(serde_json::json!({ "@id": "ro-crate-metadata.json", "@type": "CreativeWork", "about": {"@id": "./"}, "conformsTo": {"@id": "https://w3id.org/ro/crate/1.1"} })); - - // 2. Collect hasPart references from user entities - let has_part: Vec = entities - .iter() - .map(|e| serde_json::json!({"@id": e.entity_id})) - .collect(); - - // 3. The root dataset entity graph.push(serde_json::json!({ "@id": "./", "@type": "Dataset", "name": workflow_name, "hasPart": has_part })); - - // 4. 
User entities - for entity in &entities { - if let Ok(mut parsed) = serde_json::from_str::(&entity.metadata) { - // Ensure @id and @type are set from the entity record - if let Some(obj) = parsed.as_object_mut() { - obj.insert("@id".to_string(), serde_json::json!(entity.entity_id)); - obj.insert("@type".to_string(), serde_json::json!(entity.entity_type)); - } - graph.push(parsed); - } else { - // Fallback: create a minimal entity - graph.push(serde_json::json!({ - "@id": entity.entity_id, - "@type": entity.entity_type - })); - } - } + graph.extend(graph_entities); let ro_crate = serde_json::json!({ "@context": [ "https://w3id.org/ro/crate/1.1/context", - {"torc": "https://github.com/NatLabRockies/torc/terms/"} + { + "prov": "http://www.w3.org/ns/prov#", + "torc": "https://github.com/NatLabRockies/torc/terms/" + } ], "@graph": graph }); diff --git a/src/client/commands/workflow_export.rs b/src/client/commands/workflow_export.rs index 390ba9c85..ea185b167 100644 --- a/src/client/commands/workflow_export.rs +++ b/src/client/commands/workflow_export.rs @@ -229,7 +229,7 @@ impl IdMappings { /// /// This handles: /// - entity_id patterns like `#job-{old_id}-attempt-{n}` - /// - metadata JSON containing `wasGeneratedBy: {"@id": "#job-{old_id}-attempt-{n}"}` + /// - metadata JSON containing `prov:wasGeneratedBy: {"@id": "#job-{old_id}-attempt-{n}"}` /// /// Returns the updated (entity_id, metadata) tuple. 
pub fn remap_ro_crate_job_ids(&self, entity_id: &str, metadata: &str) -> (String, String) { @@ -291,8 +291,9 @@ mod tests { let mut mappings = IdMappings::new(); mappings.jobs.insert(42, 99); - // Test wasGeneratedBy in metadata - let metadata = r##"{"@id": "output.csv", "wasGeneratedBy": {"@id": "#job-42-attempt-1"}}"##; + // Test prov:wasGeneratedBy in metadata + let metadata = + r##"{"@id": "output.csv", "prov:wasGeneratedBy": {"@id": "#job-42-attempt-1"}}"##; let (_, new_metadata) = mappings.remap_ro_crate_job_ids("output.csv", metadata); assert!(new_metadata.contains("#job-99-attempt-1")); assert!(!new_metadata.contains("#job-42-attempt-1")); diff --git a/src/client/job_runner.rs b/src/client/job_runner.rs index 2fd8f0ceb..e72a5e80c 100644 --- a/src/client/job_runner.rs +++ b/src/client/job_runner.rs @@ -47,7 +47,7 @@ use crate::client::workflow_spec::{ExecutionConfig, ExecutionMode}; use crate::config::TorcConfig; use crate::memory_utils::memory_string_to_gb; use crate::models::{ - BatchCompleteJobsRequest, ComputeNodesResources, JobCompletionEntry, JobStatus, + BatchCompleteJobsRequest, ComputeNodesResources, FileModel, JobCompletionEntry, JobStatus, ResourceRequirementsModel, ResultModel, SlurmStatsModel, WorkflowModel, }; @@ -337,6 +337,12 @@ pub struct JobRunner { is_subtask: bool, running_jobs: HashMap, job_resources: HashMap, + /// Best-effort cache of file models keyed by file_id. + /// + /// This avoids repeated `get_file` API calls when many jobs share the same + /// input files (fan-in) and also reuses file metadata across output-file + /// validation and RO-Crate provenance generation. + file_model_cache: HashMap, /// Pool of GPU device identifiers available to this runner (e.g. `"0"`, `"1"` or UUIDs). 
/// /// When running in direct mode, Torc sets `CUDA_VISIBLE_DEVICES` (and friends) itself @@ -540,6 +546,7 @@ impl JobRunner { ); } let job_resources: HashMap = HashMap::new(); + let file_model_cache: HashMap = HashMap::new(); let mut resources = resources; let available_gpu_devices = if execution_config.effective_mode() == ExecutionMode::Slurm { @@ -619,6 +626,7 @@ impl JobRunner { is_subtask, running_jobs, job_resources, + file_model_cache, all_gpu_devices: Vec::from(available_gpu_devices.clone()), gpu_fallback_counter: 0, available_gpu_devices, @@ -666,6 +674,25 @@ impl JobRunner { }) } + fn get_file_model_cached( + &mut self, + file_id: i64, + ) -> Result> { + if let Some(file_model) = self.file_model_cache.get(&file_id) { + return Ok(file_model.clone()); + } + + let file_model = self.send_with_retries(|| { + Self::box_retry_error(apis::files_api::get_file(&self.config, file_id)) + })?; + self.file_model_cache.insert(file_id, file_model.clone()); + Ok(file_model) + } + + fn cache_file_model(&mut self, file_id: i64, file_model: FileModel) { + self.file_model_cache.insert(file_id, file_model); + } + /// Atomically claim a workflow action for execution. 
/// /// This is a convenience method that wraps [`utils::claim_action`] with @@ -747,6 +774,20 @@ impl JobRunner { info!("Created output directory: {}", self.output_dir.display()); } + if self.workflow.enable_ro_crate == Some(true) { + crate::client::ro_crate_utils::create_workflow_provenance_entities( + &self.config, + self.workflow_id, + self.run_id, + &self.workflow.name, + ); + } + crate::client::ro_crate_utils::create_software_entities( + &self.config, + self.workflow_id, + self.run_id, + ); + // Check and log server version let version_result = version_check::check_version(&self.config); let server_version = version_result @@ -1462,7 +1503,7 @@ impl JobRunner { /// Validate that all expected output files exist and update their st_mtime fn validate_and_update_output_files( - &self, + &mut self, job_id: i64, output_file_ids: &Option>, ) -> Result<(), String> { @@ -1483,9 +1524,7 @@ impl JobRunner { // Fetch file models and check existence for file_id in output_file_ids { - let file_model = match self.send_with_retries(|| { - Self::box_retry_error(apis::files_api::get_file(&self.config, *file_id)) - }) { + let file_model = match self.get_file_model_cached(*file_id) { Ok(file) => file, Err(e) => { return Err(format!( @@ -1542,12 +1581,10 @@ impl JobRunner { } // Update st_mtime for all files and collect file models for RO-Crate - let mut updated_file_models: Vec = Vec::new(); + let mut updated_file_models: Vec = Vec::new(); for (file_id, st_mtime) in files_to_update { - let mut file_model = match self.send_with_retries(|| { - Self::box_retry_error(apis::files_api::get_file(&self.config, file_id)) - }) { + let mut file_model = match self.get_file_model_cached(file_id) { Ok(file) => file, Err(e) => { error!( @@ -1568,6 +1605,7 @@ impl JobRunner { }) { Ok(_) => { debug!("Updated st_mtime for file_id {} to {}", file_id, st_mtime); + self.cache_file_model(file_id, file_model.clone()); updated_file_models.push(file_model); } Err(e) => { @@ -1594,9 +1632,9 @@ impl 
JobRunner { /// Creates both File entities with provenance and a CreateAction entity for the job. /// This is a non-blocking operation - warnings are logged but errors don't fail the job. fn create_ro_crate_entities_for_output_files( - &self, + &mut self, job_id: i64, - output_files: &[crate::models::FileModel], + output_files: &[FileModel], ) { // Check if RO-Crate is enabled if self.workflow.enable_ro_crate != Some(true) { @@ -1627,8 +1665,23 @@ impl JobRunner { } }; - // Use run_id as the attempt_id for the CreateAction - let attempt_id = self.run_id; + let attempt_id = job.attempt_id.unwrap_or(1); + + let mut input_file_paths = Vec::new(); + if let Some(input_file_ids) = job.input_file_ids.clone() { + input_file_paths.reserve(input_file_ids.len()); + for file_id in input_file_ids { + match self.get_file_model_cached(file_id) { + Ok(file) => input_file_paths.push(file.path), + Err(e) => { + warn!( + "Could not fetch input file {} for RO-Crate creation on job {}: {}", + file_id, job_id, e + ); + } + } + } + } // Collect output file paths for the CreateAction let output_file_paths: Vec = output_files.iter().map(|f| f.path.clone()).collect(); @@ -1640,6 +1693,7 @@ impl JobRunner { self.run_id, &job, attempt_id, + &input_file_paths, &output_file_paths, ); @@ -1656,6 +1710,7 @@ impl JobRunner { content_size, job_id, attempt_id, + &input_file_paths, ); } } diff --git a/src/client/ro_crate_utils.rs b/src/client/ro_crate_utils.rs index f978831e9..c8f41cbc9 100644 --- a/src/client/ro_crate_utils.rs +++ b/src/client/ro_crate_utils.rs @@ -7,6 +7,7 @@ use crate::client::apis; use crate::client::apis::configuration::Configuration; use crate::client::version_check; use crate::models::{FileModel, JobModel, RoCrateEntityModel}; +use crate::ro_crate_json_ld::typed_entity; use chrono::{DateTime, Utc}; use log::{debug, warn}; use sha2::{Digest, Sha256}; @@ -14,6 +15,20 @@ use std::fs::File; use std::io::{BufReader, Read as IoRead}; use std::path::Path; +fn id_ref(id: impl AsRef) 
-> serde_json::Value { + serde_json::json!({ "@id": id.as_ref() }) +} + +fn refs_value(ids: &[String]) -> Option { + match ids { + [] => None, + [id] => Some(id_ref(id)), + ids => Some(serde_json::Value::Array( + ids.iter().map(id_ref).collect::>(), + )), + } +} + /// Compute the SHA256 hash of a file. /// /// Returns the hash as a lowercase hexadecimal string, or None if the file @@ -55,10 +70,9 @@ pub fn compute_file_sha256(path: &str) -> Option { /// - `contentSize`: file size (when available) /// - `sha256`: SHA256 hash (when available) /// - `dateModified`: ISO8601 from st_mtime -/// - `torc:run_id`: workflow run that recorded this entity +/// - `@type` is emitted as `["File", "prov:Entity"]` pub fn build_file_entity( workflow_id: i64, - run_id: i64, file: &FileModel, content_size: Option, sha256: Option, @@ -78,10 +92,9 @@ pub fn build_file_entity( // Build metadata JSON object let mut metadata = serde_json::json!({ "@id": file_path, - "@type": "File", + "@type": typed_entity("File", "prov:Entity"), "name": basename, - "encodingFormat": mime_type, - "torc:run_id": run_id + "encodingFormat": mime_type }); // Add content size if available @@ -112,7 +125,8 @@ pub fn build_file_entity( /// Build an RO-Crate File entity with provenance linking to a CreateAction. /// -/// For output files, includes `wasGeneratedBy` linking to the job's CreateAction entity. +/// For output files, includes `prov:wasGeneratedBy` linking to the job's CreateAction entity. 
+#[allow(clippy::too_many_arguments)] pub fn build_file_entity_with_provenance( workflow_id: i64, run_id: i64, @@ -121,6 +135,7 @@ pub fn build_file_entity_with_provenance( sha256: Option, job_id: i64, attempt_id: i64, + derived_from_paths: &[String], ) -> RoCrateEntityModel { let file_path = &file.path; let basename = Path::new(file_path) @@ -140,11 +155,11 @@ pub fn build_file_entity_with_provenance( // Build metadata JSON object with provenance let mut metadata = serde_json::json!({ "@id": file_path, - "@type": "File", + "@type": typed_entity("File", "prov:Entity"), "name": basename, "encodingFormat": mime_type, - "wasGeneratedBy": { "@id": create_action_id }, - "torc:run_id": run_id + "prov:wasGeneratedBy": { "@id": create_action_id }, + "prov:wasAttributedTo": id_ref(format!("#torc-run-{}", run_id)) }); // Add content size if available @@ -163,6 +178,10 @@ pub fn build_file_entity_with_provenance( metadata["dateModified"] = serde_json::json!(datetime.to_rfc3339()); } + if let Some(derived_from) = refs_value(derived_from_paths) { + metadata["prov:wasDerivedFrom"] = derived_from; + } + RoCrateEntityModel { id: None, workflow_id, @@ -177,15 +196,17 @@ pub fn build_file_entity_with_provenance( /// /// Creates a JSON-LD entity representing the job execution: /// - `@id`: `#job-{job_id}-attempt-{attempt_id}` -/// - `@type`: "CreateAction" +/// - `@type`: `["CreateAction", "prov:Activity"]` /// - `name`: job name -/// - `instrument`: reference to workflow +/// - `prov:hadPlan`: reference to the workflow plan entity +/// - `instrument`: reference to the run-specific software agent /// - `result`: references to output file entities pub fn build_create_action_entity( workflow_id: i64, run_id: i64, job: &JobModel, attempt_id: i64, + input_file_paths: &[String], output_file_paths: &[String], ) -> RoCrateEntityModel { let action_id = format!("#job-{}-attempt-{}", job.id.unwrap_or(0), attempt_id); @@ -196,15 +217,25 @@ pub fn build_create_action_entity( .map(|path| 
serde_json::json!({ "@id": path })) .collect(); - let metadata = serde_json::json!({ + let mut metadata = serde_json::json!({ "@id": action_id, - "@type": "CreateAction", + "@type": typed_entity("CreateAction", "prov:Activity"), "name": job.name, - "instrument": { "@id": format!("#workflow-{}", workflow_id) }, + "prov:hadPlan": id_ref("#torc-workflow"), + "isPartOf": id_ref(format!("#torc-run-{}", run_id)), + "instrument": id_ref(format!("#software-torc-run-{}", run_id)), "result": results, - "torc:run_id": run_id + "prov:wasAssociatedWith": [ + id_ref(format!("#software-torc-run-{}", run_id)), + id_ref(format!("#software-torc-server-run-{}", run_id)) + ] }); + if let Some(inputs) = refs_value(input_file_paths) { + metadata["object"] = inputs.clone(); + metadata["prov:used"] = inputs; + } + RoCrateEntityModel { id: None, workflow_id, @@ -215,6 +246,99 @@ pub fn build_create_action_entity( } } +fn build_workflow_plan_entity(workflow_id: i64, workflow_name: &str) -> RoCrateEntityModel { + let metadata = serde_json::json!({ + "@id": "#torc-workflow", + "@type": typed_entity("SoftwareApplication", "prov:Plan"), + "name": workflow_name + }); + + RoCrateEntityModel { + id: None, + workflow_id, + file_id: None, + entity_id: "#torc-workflow".to_string(), + entity_type: "SoftwareApplication".to_string(), + metadata: metadata.to_string(), + } +} + +fn build_workflow_run_entity_base( + workflow_id: i64, + run_id: i64, + workflow_name: &str, +) -> RoCrateEntityModel { + let metadata = serde_json::json!({ + "@id": format!("#torc-run-{}", run_id), + "@type": typed_entity("CreateAction", "prov:Activity"), + "name": format!("{} Run {}", workflow_name, run_id), + "prov:hadPlan": id_ref("#torc-workflow"), + "instrument": id_ref(format!("#software-torc-run-{}", run_id)), + "prov:wasAssociatedWith": [ + id_ref(format!("#software-torc-run-{}", run_id)), + id_ref(format!("#software-torc-server-run-{}", run_id)) + ] + }); + + RoCrateEntityModel { + id: None, + workflow_id, + file_id: 
None, + entity_id: format!("#torc-run-{}", run_id), + entity_type: "CreateAction".to_string(), + metadata: metadata.to_string(), + } +} + +fn apply_workflow_run_entity_times( + entity: &mut RoCrateEntityModel, + start_time: DateTime, + end_time: Option>, +) -> bool { + let mut metadata = match serde_json::from_str::(&entity.metadata) { + Ok(value) => value, + Err(e) => { + warn!( + "Failed to parse RO-Crate run entity metadata for '{}': {}", + entity.entity_id, e + ); + return false; + } + }; + + let Some(obj) = metadata.as_object_mut() else { + warn!( + "RO-Crate run entity metadata for '{}' is not a JSON object", + entity.entity_id + ); + return false; + }; + + obj.insert( + "startTime".to_string(), + serde_json::json!(start_time.to_rfc3339()), + ); + if let Some(end_time) = end_time { + obj.insert( + "endTime".to_string(), + serde_json::json!(end_time.to_rfc3339()), + ); + } else { + obj.remove("endTime"); + } + + entity.metadata = metadata.to_string(); + true +} + +fn parse_entity_datetime(entity: &RoCrateEntityModel, field: &str) -> Option> { + let metadata = serde_json::from_str::(&entity.metadata).ok()?; + let value = metadata.get(field)?.as_str()?; + chrono::DateTime::parse_from_rfc3339(value) + .ok() + .map(|dt| dt.with_timezone(&Utc)) +} + /// Find an existing RO-Crate entity for a file. /// /// Returns the entity if one with the given file_id already exists, None otherwise. @@ -232,6 +356,20 @@ pub fn find_entity_for_file( } } +pub fn find_entity_by_entity_id( + config: &Configuration, + workflow_id: i64, + entity_id: &str, +) -> Option { + match apis::ro_crate_api::find_ro_crate_entity_by_entity_id(config, workflow_id, entity_id) { + Ok(entity) => entity, + Err(e) => { + warn!("Failed to check for existing RO-Crate entities: {}", e); + None + } + } +} + /// Check if an RO-Crate entity already exists for a file. /// /// Returns true if an entity with the given file_id already exists. 
@@ -239,17 +377,118 @@ pub fn entity_exists_for_file(config: &Configuration, workflow_id: i64, file_id: find_entity_for_file(config, workflow_id, file_id).is_some() } +fn create_or_update_entity_by_entity_id( + config: &Configuration, + workflow_id: i64, + entity: RoCrateEntityModel, +) { + if let Some(existing) = find_entity_by_entity_id(config, workflow_id, &entity.entity_id) { + let entity_db_id = match existing.id { + Some(id) => id, + None => { + warn!("Existing entity has no ID, cannot update"); + return; + } + }; + + let updated_entity = RoCrateEntityModel { + id: Some(entity_db_id), + ..entity + }; + + if let Err(e) = apis::ro_crate_entities_api::update_ro_crate_entity( + config, + entity_db_id, + updated_entity, + ) { + warn!( + "Failed to update RO-Crate entity '{}' (entity_id={}): {}", + existing.entity_type, existing.entity_id, e + ); + } + return; + } + + if let Err(e) = apis::ro_crate_entities_api::create_ro_crate_entity(config, entity) { + warn!("Failed to create RO-Crate entity: {}", e); + } +} + +fn create_or_update_run_entity( + config: &Configuration, + workflow_id: i64, + run_id: i64, + workflow_name: &str, +) { + let run_entity_id = format!("#torc-run-{}", run_id); + if let Some(existing_run_entity) = find_entity_by_entity_id(config, workflow_id, &run_entity_id) + { + let entity_db_id = match existing_run_entity.id { + Some(id) => id, + None => { + warn!("Existing run entity has no ID, cannot update"); + return; + } + }; + + let start_time = + parse_entity_datetime(&existing_run_entity, "startTime").unwrap_or_else(Utc::now); + let end_time = parse_entity_datetime(&existing_run_entity, "endTime"); + let mut updated_run_entity = RoCrateEntityModel { + id: Some(entity_db_id), + ..build_workflow_run_entity_base(workflow_id, run_id, workflow_name) + }; + if !apply_workflow_run_entity_times(&mut updated_run_entity, start_time, end_time) { + return; + } + + if let Err(e) = apis::ro_crate_entities_api::update_ro_crate_entity( + config, + entity_db_id, 
+ updated_run_entity, + ) { + warn!( + "Failed to update RO-Crate run entity '{}' (entity_id={}): {}", + existing_run_entity.entity_type, existing_run_entity.entity_id, e + ); + } + return; + } + + let mut new_run_entity = build_workflow_run_entity_base(workflow_id, run_id, workflow_name); + if !apply_workflow_run_entity_times(&mut new_run_entity, Utc::now(), None) { + return; + } + if let Err(e) = apis::ro_crate_entities_api::create_ro_crate_entity(config, new_run_entity) { + warn!( + "Failed to create RO-Crate run entity '{}': {}", + run_entity_id, e + ); + } +} + +pub fn create_workflow_provenance_entities( + config: &Configuration, + workflow_id: i64, + run_id: i64, + workflow_name: &str, +) { + let plan_entity = build_workflow_plan_entity(workflow_id, workflow_name); + create_or_update_entity_by_entity_id(config, workflow_id, plan_entity); + + create_or_update_run_entity(config, workflow_id, run_id, workflow_name); +} + /// Create or replace an RO-Crate entity for a file. /// /// If an entity already exists for this file, it is updated with fresh metadata -/// (hash, size, timestamps, run_id). Otherwise a new entity is created. +/// (hash, size, timestamps). Otherwise a new entity is created. /// /// This is a non-blocking operation - warnings are logged but errors don't fail /// the calling operation. 
pub fn create_ro_crate_entity_for_file( config: &Configuration, workflow_id: i64, - run_id: i64, file: &FileModel, content_size: Option, ) { @@ -265,7 +504,7 @@ pub fn create_ro_crate_entity_for_file( let sha256 = compute_file_sha256(&file.path); // Build the entity - let entity = build_file_entity(workflow_id, run_id, file, content_size, sha256); + let entity = build_file_entity(workflow_id, file, content_size, sha256); // Check if entity already exists - if so, update it if let Some(existing) = find_entity_for_file(config, workflow_id, file_id) { @@ -321,10 +560,11 @@ pub fn create_ro_crate_entity_for_file( /// /// Creates the File entity and links it to the job's CreateAction. If an entity /// already exists for this file (e.g., created during initialization), updates it -/// to add the `wasGeneratedBy` provenance field. +/// to add the `prov:wasGeneratedBy` provenance field. /// /// This is a non-blocking operation - warnings are logged but errors don't fail /// the calling operation. 
+#[allow(clippy::too_many_arguments)] pub fn create_ro_crate_entity_for_output_file( config: &Configuration, workflow_id: i64, @@ -333,6 +573,7 @@ pub fn create_ro_crate_entity_for_output_file( content_size: Option, job_id: i64, attempt_id: i64, + derived_from_paths: &[String], ) { let file_id = match file.id { Some(id) => id, @@ -354,6 +595,7 @@ pub fn create_ro_crate_entity_for_output_file( sha256, job_id, attempt_id, + derived_from_paths, ); // Check if entity already exists - if so, replace it @@ -418,10 +660,17 @@ pub fn create_create_action_entity( run_id: i64, job: &JobModel, attempt_id: i64, + input_file_paths: &[String], output_file_paths: &[String], ) { - let entity = - build_create_action_entity(workflow_id, run_id, job, attempt_id, output_file_paths); + let entity = build_create_action_entity( + workflow_id, + run_id, + job, + attempt_id, + input_file_paths, + output_file_paths, + ); match apis::ro_crate_entities_api::create_ro_crate_entity(config, entity) { Ok(created) => { @@ -447,7 +696,6 @@ pub fn create_create_action_entity( pub fn create_entities_for_input_files( config: &Configuration, workflow_id: i64, - run_id: i64, files: &[FileModel], ) { for file in files { @@ -456,7 +704,7 @@ pub fn create_entities_for_input_files( // Get file size if the file exists let content_size = std::fs::metadata(&file.path).ok().map(|m| m.len()); - create_ro_crate_entity_for_file(config, workflow_id, run_id, file, content_size); + create_ro_crate_entity_for_file(config, workflow_id, file, content_size); } } } @@ -502,11 +750,10 @@ fn build_software_entity( let metadata = serde_json::json!({ "@id": entity_id, - "@type": "SoftwareApplication", + "@type": typed_entity("SoftwareApplication", "prov:SoftwareAgent"), "name": name, "version": version, "url": binary_path, - "torc:run_id": run_id, "torc:git_hash": git_hash, }); @@ -611,7 +858,7 @@ mod tests { st_mtime: Some(1704067200.0), // 2024-01-01T00:00:00Z }; - let entity = build_file_entity(100, 1, &file, Some(1024), 
None); + let entity = build_file_entity(100, &file, Some(1024), None); assert_eq!(entity.workflow_id, 100); assert_eq!(entity.file_id, Some(1)); @@ -620,11 +867,12 @@ mod tests { let metadata: serde_json::Value = serde_json::from_str(&entity.metadata).unwrap(); assert_eq!(metadata["@id"], "data/output.csv"); - assert_eq!(metadata["@type"], "File"); + assert_eq!(metadata["@type"][0], "File"); + assert_eq!(metadata["@type"][1], "prov:Entity"); assert_eq!(metadata["name"], "output.csv"); assert_eq!(metadata["encodingFormat"], "text/csv"); assert_eq!(metadata["contentSize"], 1024); - assert_eq!(metadata["torc:run_id"], 1); + assert!(metadata.get("prov:wasAttributedTo").is_none()); } #[test] @@ -637,11 +885,11 @@ mod tests { st_mtime: Some(1704067200.0), }; - let entity = build_file_entity_with_provenance(100, 1, &file, None, None, 42, 1); + let entity = build_file_entity_with_provenance(100, 1, &file, None, None, 42, 1, &[]); let metadata: serde_json::Value = serde_json::from_str(&entity.metadata).unwrap(); - assert_eq!(metadata["wasGeneratedBy"]["@id"], "#job-42-attempt-1"); - assert_eq!(metadata["torc:run_id"], 1); + assert_eq!(metadata["prov:wasGeneratedBy"]["@id"], "#job-42-attempt-1"); + assert_eq!(metadata["prov:wasAttributedTo"]["@id"], "#torc-run-1"); } #[test] @@ -654,23 +902,28 @@ mod tests { let mut job_with_id = job; job_with_id.id = Some(42); + let input_files = vec!["input/source.csv".to_string()]; let output_files = vec![ "output/result1.json".to_string(), "output/result2.json".to_string(), ]; - let entity = build_create_action_entity(100, 1, &job_with_id, 1, &output_files); + let entity = + build_create_action_entity(100, 1, &job_with_id, 1, &input_files, &output_files); assert_eq!(entity.entity_id, "#job-42-attempt-1"); assert_eq!(entity.entity_type, "CreateAction"); let metadata: serde_json::Value = serde_json::from_str(&entity.metadata).unwrap(); - assert_eq!(metadata["@type"], "CreateAction"); + assert_eq!(metadata["@type"][0], "CreateAction"); + 
assert_eq!(metadata["@type"][1], "prov:Activity"); assert_eq!(metadata["name"], "process_data"); - assert_eq!(metadata["instrument"]["@id"], "#workflow-100"); + assert_eq!(metadata["instrument"]["@id"], "#software-torc-run-1"); + assert_eq!(metadata["prov:hadPlan"]["@id"], "#torc-workflow"); + assert_eq!(metadata["prov:used"]["@id"], "input/source.csv"); assert!(metadata["result"].is_array()); assert_eq!(metadata["result"][0]["@id"], "output/result1.json"); - assert_eq!(metadata["torc:run_id"], 1); + assert_eq!(metadata["isPartOf"]["@id"], "#torc-run-1"); } #[test] @@ -687,7 +940,7 @@ mod tests { st_mtime: None, }; - let entity = build_file_entity(1, 1, &file, None, None); + let entity = build_file_entity(1, &file, None, None); let metadata: serde_json::Value = serde_json::from_str(&entity.metadata).unwrap(); let mime = metadata["encodingFormat"].as_str().unwrap(); @@ -711,7 +964,7 @@ mod tests { st_mtime: None, }; - let entity = build_file_entity(1, 1, &file, None, None); + let entity = build_file_entity(1, &file, None, None); let metadata: serde_json::Value = serde_json::from_str(&entity.metadata).unwrap(); let mime = metadata["encodingFormat"].as_str().unwrap(); @@ -798,7 +1051,7 @@ mod tests { }; let sha256 = Some("abc123def456".to_string()); - let entity = build_file_entity(100, 1, &file, Some(1024), sha256); + let entity = build_file_entity(100, &file, Some(1024), sha256); let metadata: serde_json::Value = serde_json::from_str(&entity.metadata).unwrap(); assert_eq!(metadata["sha256"], "abc123def456"); @@ -815,11 +1068,11 @@ mod tests { }; let sha256 = Some("deadbeef".to_string()); - let entity = build_file_entity_with_provenance(100, 1, &file, None, sha256, 42, 1); + let entity = build_file_entity_with_provenance(100, 1, &file, None, sha256, 42, 1, &[]); let metadata: serde_json::Value = serde_json::from_str(&entity.metadata).unwrap(); assert_eq!(metadata["sha256"], "deadbeef"); - assert_eq!(metadata["wasGeneratedBy"]["@id"], "#job-42-attempt-1"); + 
assert_eq!(metadata["prov:wasGeneratedBy"]["@id"], "#job-42-attempt-1"); } #[test] @@ -833,10 +1086,10 @@ mod tests { let metadata: serde_json::Value = serde_json::from_str(&entity.metadata).unwrap(); assert_eq!(metadata["@id"], "#software-torc-run-3"); - assert_eq!(metadata["@type"], "SoftwareApplication"); + assert_eq!(metadata["@type"][0], "SoftwareApplication"); + assert_eq!(metadata["@type"][1], "prov:SoftwareAgent"); assert_eq!(metadata["name"], "torc"); assert_eq!(metadata["url"], "/usr/local/bin/torc"); - assert_eq!(metadata["torc:run_id"], 3); // Version and git_hash are compile-time constants assert!(metadata.get("version").is_some()); assert!(metadata.get("torc:git_hash").is_some()); @@ -852,6 +1105,28 @@ mod tests { let metadata: serde_json::Value = serde_json::from_str(&entity.metadata).unwrap(); assert_eq!(metadata["name"], "torc-server"); assert_eq!(metadata["url"], "/opt/torc/torc-server"); - assert_eq!(metadata["torc:run_id"], 1); + assert_eq!(metadata["@type"][0], "SoftwareApplication"); + assert_eq!(metadata["@type"][1], "prov:SoftwareAgent"); + assert!(metadata.get("version").is_some()); + assert!(metadata.get("torc:git_hash").is_some()); + } + + #[test] + fn test_parse_entity_datetime() { + let entity = crate::models::RoCrateEntityModel { + id: Some(1), + workflow_id: 100, + file_id: None, + entity_id: "#torc-run-7".to_string(), + entity_type: "CreateAction".to_string(), + metadata: serde_json::json!({ + "startTime": "2024-01-01T00:00:00Z" + }) + .to_string(), + }; + + let parsed = parse_entity_datetime(&entity, "startTime").unwrap(); + assert_eq!(parsed.to_rfc3339(), "2024-01-01T00:00:00+00:00"); + assert!(parse_entity_datetime(&entity, "endTime").is_none()); } } diff --git a/src/client/workflow_manager.rs b/src/client/workflow_manager.rs index a0ad4b5c7..d54bc9b9d 100644 --- a/src/client/workflow_manager.rs +++ b/src/client/workflow_manager.rs @@ -687,12 +687,21 @@ impl WorkflowManager { self.create_ro_crate_entities_for_input_files(); // 
Always create SoftwareApplication entities for torc binaries - let run_id = self.get_run_id().unwrap_or(0); - crate::client::ro_crate_utils::create_software_entities( - &self.config, - self.workflow_id, - run_id, - ); + match self.get_run_id() { + Ok(run_id) => { + crate::client::ro_crate_utils::create_software_entities( + &self.config, + self.workflow_id, + run_id, + ); + } + Err(e) => { + warn!( + "Skipping RO-Crate software entity creation for workflow {}: failed to get run_id: {}", + self.workflow_id, e + ); + } + } Ok(()) } @@ -727,6 +736,23 @@ impl WorkflowManager { self.workflow_id ); + let run_id = match self.get_run_id() { + Ok(run_id) => run_id, + Err(e) => { + warn!( + "Skipping RO-Crate workflow provenance creation for workflow {}: failed to get run_id: {}", + self.workflow_id, e + ); + return; + } + }; + crate::client::ro_crate_utils::create_workflow_provenance_entities( + &self.config, + self.workflow_id, + run_id, + &workflow.name, + ); + // Collect all files with st_mtime set (input files) let params = FileListParams::new(); let files_iterator = iter_files(&self.config, self.workflow_id, params); @@ -752,11 +778,9 @@ impl WorkflowManager { "Creating RO-Crate entities for {} input files", input_files.len() ); - let run_id = self.get_run_id().unwrap_or(0); crate::client::ro_crate_utils::create_entities_for_input_files( &self.config, self.workflow_id, - run_id, &input_files, ); } diff --git a/src/lib.rs b/src/lib.rs index 6adedcd27..68fd3f506 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -23,6 +23,7 @@ pub mod api_version; pub mod memory_utils; pub mod models; pub mod network_utils; +pub mod ro_crate_json_ld; pub mod time_utils; // Configuration module (requires config feature, enabled by client) diff --git a/src/ro_crate_json_ld.rs b/src/ro_crate_json_ld.rs new file mode 100644 index 000000000..593b11c7c --- /dev/null +++ b/src/ro_crate_json_ld.rs @@ -0,0 +1,3 @@ +pub fn typed_entity(primary_type: &str, prov_type: &str) -> serde_json::Value { + 
serde_json::json!([primary_type, prov_type]) +} diff --git a/src/server/api/ro_crate.rs b/src/server/api/ro_crate.rs index a6869024b..ea3d2825b 100644 --- a/src/server/api/ro_crate.rs +++ b/src/server/api/ro_crate.rs @@ -13,6 +13,7 @@ use crate::server::api_responses::{ }; use crate::models; +use crate::ro_crate_json_ld::typed_entity; use super::{ApiContext, MAX_RECORD_TRANSFER_COUNT, SqlQueryBuilder, database_error_with_msg}; @@ -149,14 +150,6 @@ impl RoCrateApiImpl { /// /// This is called during `initialize_jobs` when `enable_ro_crate` is true. pub async fn create_entities_for_input_files(&self, workflow_id: i64) -> Result { - // Get the current run_id from workflow - let run_id: i64 = - sqlx::query_scalar!("SELECT run_id FROM workflow WHERE id = $1", workflow_id,) - .fetch_optional(self.context.pool.as_ref()) - .await - .map_err(|e| database_error_with_msg(e, "Failed to get workflow run_id"))? - .unwrap_or(0); - // Get all files with st_mtime set (input files) let input_files = match sqlx::query!( r#" @@ -179,8 +172,12 @@ impl RoCrateApiImpl { }; // Get existing RO-Crate entities by file_id for upsert - let existing_entities: std::collections::HashMap = match sqlx::query!( - r#"SELECT id, file_id FROM ro_crate_entity WHERE workflow_id = $1 AND file_id IS NOT NULL"#, + let existing_entities: std::collections::HashMap = match sqlx::query!( + r#" + SELECT id, file_id, metadata + FROM ro_crate_entity + WHERE workflow_id = $1 AND file_id IS NOT NULL + "#, workflow_id ) .fetch_all(self.context.pool.as_ref()) @@ -188,7 +185,7 @@ impl RoCrateApiImpl { { Ok(rows) => rows .into_iter() - .filter_map(|r| r.file_id.map(|fid| (fid, r.id))) + .filter_map(|r| r.file_id.map(|fid| (fid, (r.id, r.metadata)))) .collect(), Err(e) => { return Err(super::database_error_with_msg( @@ -212,14 +209,24 @@ impl RoCrateApiImpl { .map(|s| s.to_string_lossy().to_string()) .unwrap_or_else(|| file.path.clone()); - // Build metadata JSON - let mut metadata = serde_json::json!({ - "@id": 
file.path, - "@type": "File", - "name": basename, - "encodingFormat": mime_type, - "torc:run_id": run_id - }); + // The server cannot inspect workflow files on disk. Preserve any richer + // client-provided metadata already stored for this entity and only refresh + // fields derivable from database state. + let mut metadata = existing_entities + .get(&file.id) + .and_then(|(_, existing_metadata)| { + serde_json::from_str::(existing_metadata).ok() + }) + .unwrap_or_else(|| serde_json::json!({})); + + if !metadata.is_object() { + metadata = serde_json::json!({}); + } + + metadata["@id"] = serde_json::json!(file.path); + metadata["@type"] = typed_entity("File", "prov:Entity"); + metadata["name"] = serde_json::json!(basename); + metadata["encodingFormat"] = serde_json::json!(mime_type); // Add dateModified if st_mtime is available if let Some(st_mtime) = file.st_mtime @@ -232,7 +239,7 @@ impl RoCrateApiImpl { let metadata_str = metadata.to_string(); // Update existing entity or create new one - let result = if let Some(&entity_db_id) = existing_entities.get(&file.id) { + let result = if let Some(&(entity_db_id, _)) = existing_entities.get(&file.id) { sqlx::query!( r#" UPDATE ro_crate_entity SET metadata = $1 WHERE id = $2 @@ -245,7 +252,8 @@ impl RoCrateApiImpl { } else { sqlx::query!( r#" - INSERT INTO ro_crate_entity (workflow_id, file_id, entity_id, entity_type, metadata) + INSERT INTO ro_crate_entity + (workflow_id, file_id, entity_id, entity_type, metadata) VALUES ($1, $2, $3, $4, $5) "#, workflow_id, @@ -327,11 +335,10 @@ impl RoCrateApiImpl { let metadata = serde_json::json!({ "@id": entity_id, - "@type": "SoftwareApplication", + "@type": typed_entity("SoftwareApplication", "prov:SoftwareAgent"), "name": "torc-server", "version": version, "url": exe_path, - "torc:run_id": run_id, "torc:git_hash": GIT_HASH, }); @@ -353,7 +360,8 @@ impl RoCrateApiImpl { { Ok(_) => { debug!( - "Created SoftwareApplication entity for torc-server version={} (workflow_id={})", + 
"Created SoftwareApplication entity for torc-server version={} \ + (workflow_id={})", version, workflow_id ); } diff --git a/tests/test_auto_ro_crate.rs b/tests/test_auto_ro_crate.rs index 8abf76994..f6afeaf91 100644 --- a/tests/test_auto_ro_crate.rs +++ b/tests/test_auto_ro_crate.rs @@ -14,6 +14,10 @@ use std::path::Path; use torc::client::apis; use torc::models; +fn is_job_create_action(entity: &models::RoCrateEntityModel) -> bool { + entity.entity_type == "CreateAction" && entity.entity_id.starts_with("#job-") +} + /// Create a simple workflow with enable_ro_crate enabled. /// Returns (workflow_id, input_file_id, output_file_id, job_id) fn create_ro_crate_enabled_workflow( @@ -251,7 +255,8 @@ fn test_auto_ro_crate_input_files_on_initialize(start_server: &ServerProcess) { // Parse and verify metadata let metadata: serde_json::Value = serde_json::from_str(&entity.metadata).expect("Failed to parse entity metadata"); - assert_eq!(metadata["@type"], "File"); + assert!(metadata["@type"].is_array(), "Should have array @type"); + assert_eq!(metadata["@type"][0], "File"); assert!( metadata["encodingFormat"].as_str().is_some(), "Should have encodingFormat" @@ -342,14 +347,44 @@ fn test_auto_ro_crate_output_files_on_job_completion(start_server: &ServerProces // Parse and verify metadata includes provenance let metadata: serde_json::Value = serde_json::from_str(&entity.metadata).expect("Failed to parse entity metadata"); - assert_eq!(metadata["@type"], "File"); + assert!(metadata["@type"].is_array(), "Should have array @type"); + assert_eq!(metadata["@type"][0], "File"); + assert!( + metadata["prov:wasGeneratedBy"].is_object(), + "Output file entity should have prov:wasGeneratedBy for provenance" + ); + assert_eq!( + metadata["prov:wasGeneratedBy"]["@id"], + format!("#job-{}-attempt-1", job_id), + ); + assert_eq!(metadata["prov:wasAttributedTo"]["@id"], "#torc-run-0"); + + let workflow_plan = items.iter().find(|e| e.entity_id == "#torc-workflow"); + assert!( + 
workflow_plan.is_some(), + "Should have a synthetic workflow plan entity" + ); + + let workflow_run = items.iter().find(|e| e.entity_id == "#torc-run-0"); + assert!( + workflow_run.is_some(), + "Should have a synthetic workflow run entity" + ); + let workflow_run_metadata: serde_json::Value = + serde_json::from_str(&workflow_run.unwrap().metadata) + .expect("Failed to parse workflow run metadata"); + assert_eq!(workflow_run_metadata["@type"][0], "CreateAction"); + assert_eq!( + workflow_run_metadata["name"], + "test_auto_ro_crate_workflow Run 0" + ); assert!( - metadata["wasGeneratedBy"].is_object(), - "Output file entity should have wasGeneratedBy for provenance" + workflow_run_metadata["prov:hadPlan"].is_object(), + "Workflow run should have prov:hadPlan" ); - // Find the CreateAction entity - let create_action = items.iter().find(|e| e.entity_type == "CreateAction"); + // Find the job CreateAction entity + let create_action = items.iter().find(|e| is_job_create_action(e)); assert!( create_action.is_some(), "Should have a CreateAction entity for job provenance" @@ -358,12 +393,20 @@ fn test_auto_ro_crate_output_files_on_job_completion(start_server: &ServerProces let action = create_action.unwrap(); let action_metadata: serde_json::Value = serde_json::from_str(&action.metadata).expect("Failed to parse CreateAction metadata"); - assert_eq!(action_metadata["@type"], "CreateAction"); + assert!( + action_metadata["@type"].is_array(), + "CreateAction should have array @type" + ); + assert_eq!(action_metadata["@type"][0], "CreateAction"); assert_eq!(action_metadata["name"], "process"); assert!( action_metadata["result"].is_array(), "CreateAction should have result array" ); + assert!( + action_metadata["prov:hadPlan"].is_object(), + "CreateAction should have prov:hadPlan" + ); } #[rstest] @@ -561,19 +604,41 @@ fn test_auto_ro_crate_diamond_workflow(start_server: &ServerProcess) { // Verify CreateAction entities exist let create_actions: Vec<_> = final_items .iter() - 
.filter(|e| e.entity_type == "CreateAction") + .filter(|e| is_job_create_action(e)) .collect(); assert!( - create_actions.len() >= 2, - "Should have CreateAction entities for each job. Found: {}", + create_actions.len() == 2, + "Should have one job CreateAction per job. Found: {}", create_actions.len() ); + let workflow_run = final_items.iter().find(|e| e.entity_id == "#torc-run-0"); + assert!( + workflow_run.is_some(), + "Should have a synthetic workflow run entity" + ); + let workflow_run_metadata: serde_json::Value = + serde_json::from_str(&workflow_run.unwrap().metadata) + .expect("Failed to parse workflow run metadata"); + assert_eq!(workflow_run_metadata["@type"][0], "CreateAction"); + assert_eq!( + workflow_run_metadata["name"], + "test_diamond_ro_crate_workflow Run 0" + ); + assert!( + workflow_run_metadata["prov:hadPlan"].is_object(), + "Workflow run should have prov:hadPlan" + ); + // Verify CreateAction metadata for action in create_actions { let metadata: serde_json::Value = serde_json::from_str(&action.metadata).expect("Failed to parse CreateAction metadata"); - assert_eq!(metadata["@type"], "CreateAction"); + assert!( + metadata["@type"].is_array(), + "CreateAction should have array @type" + ); + assert_eq!(metadata["@type"][0], "CreateAction"); assert!( metadata["name"].as_str().is_some(), "CreateAction should have name" @@ -657,16 +722,12 @@ fn test_auto_ro_crate_second_run_replaces_entities(start_server: &ServerProcess) "Should have File entities after first run" ); - // Verify run_id=0 in file entity metadata + // Verify the first run captured a hash for the input entity let input_entity_run1 = items_run1 .iter() .find(|e| e.file_id == Some(input_file_id)) .expect("Should have input file entity"); let meta_run1: serde_json::Value = serde_json::from_str(&input_entity_run1.metadata).unwrap(); - assert_eq!( - meta_run1["torc:run_id"], 0, - "First run should have run_id=0" - ); // Get the SHA256 of the input file from the first run let input_sha_run1 
= meta_run1["sha256"].as_str().map(|s| s.to_string()); @@ -743,16 +804,12 @@ fn test_auto_ro_crate_second_run_replaces_entities(start_server: &ServerProcess) software_entities_run2.len() ); - // Verify the input file entity now has run_id=1 + // Verify the input file entity still exists after the second run let input_entity_run2 = items_run2 .iter() .find(|e| e.file_id == Some(input_file_id)) .expect("Should still have input file entity"); let meta_run2: serde_json::Value = serde_json::from_str(&input_entity_run2.metadata).unwrap(); - assert_eq!( - meta_run2["torc:run_id"], 1, - "Second run should have run_id=1 in input file entity" - ); // Verify the SHA256 changed (input file was modified) let input_sha_run2 = meta_run2["sha256"].as_str().map(|s| s.to_string()); @@ -763,19 +820,19 @@ fn test_auto_ro_crate_second_run_replaces_entities(start_server: &ServerProcess) ); } - // Verify the output file entity also has run_id=1 + // Verify the output file entity was refreshed for the second run let output_entity_run2 = items_run2 .iter() .find(|e| e.file_id == Some(output_file_id)) .expect("Should still have output file entity"); let output_meta_run2: serde_json::Value = serde_json::from_str(&output_entity_run2.metadata).unwrap(); - assert_eq!( - output_meta_run2["torc:run_id"], 1, - "Second run should have run_id=1 in output file entity" - ); assert!( - output_meta_run2["wasGeneratedBy"].is_object(), - "Output file entity should still have wasGeneratedBy provenance" + output_meta_run2["prov:wasGeneratedBy"].is_object(), + "Output file entity should still have prov:wasGeneratedBy provenance" + ); + assert_eq!( + output_meta_run2["prov:wasAttributedTo"]["@id"], "#torc-run-1", + "Output file should be attributed to the second workflow run" ); } diff --git a/tests/test_workflow_export.rs b/tests/test_workflow_export.rs index b0a81c6d0..d8b268045 100644 --- a/tests/test_workflow_export.rs +++ b/tests/test_workflow_export.rs @@ -664,7 +664,7 @@ fn 
test_import_id_remapping(start_server: &ServerProcess) { /// /// This tests: /// - CreateAction entity_id (e.g., #job-42-attempt-1) is remapped to new job ID -/// - File entity metadata with wasGeneratedBy is remapped to new job ID +/// - File entity metadata with prov:wasGeneratedBy is remapped to new job ID /// - file_id references in entities are also remapped #[rstest] fn test_export_import_ro_crate_job_id_remapping(start_server: &ServerProcess) { @@ -701,7 +701,7 @@ fn test_export_import_ro_crate_job_id_remapping(start_server: &ServerProcess) { let create_action_entity_id = format!("#job-{}-attempt-1", job_id); let create_action_metadata = serde_json::json!({ "@id": create_action_entity_id, - "@type": "CreateAction", + "@type": ["CreateAction", "prov:Activity"], "name": "process_data", "instrument": { "@id": format!("#workflow-{}", workflow_id) }, "result": [{ "@id": "data/output.csv" }] @@ -717,13 +717,13 @@ fn test_export_import_ro_crate_job_id_remapping(start_server: &ServerProcess) { apis::ro_crate_api::create_ro_crate_entity(config, create_action) .expect("Failed to create CreateAction entity"); - // 2. File entity with wasGeneratedBy reference to job + // 2. 
File entity with prov:wasGeneratedBy reference to job let file_metadata = serde_json::json!({ "@id": "data/output.csv", - "@type": "File", + "@type": ["File", "prov:Entity"], "name": "output.csv", "encodingFormat": "text/csv", - "wasGeneratedBy": { "@id": format!("#job-{}-attempt-1", job_id) } + "prov:wasGeneratedBy": { "@id": format!("#job-{}-attempt-1", job_id) } }); let file_entity = torc::models::RoCrateEntityModel { id: None, @@ -827,7 +827,7 @@ fn test_export_import_ro_crate_job_id_remapping(start_server: &ServerProcess) { "CreateAction @id in metadata should be remapped" ); - // Find the File entity and verify its wasGeneratedBy was remapped + // Find the File entity and verify its prov:wasGeneratedBy was remapped let file_entity = imported_entities .iter() .find(|e| e.entity_type == "File") @@ -836,8 +836,8 @@ fn test_export_import_ro_crate_job_id_remapping(start_server: &ServerProcess) { let file_metadata: Value = serde_json::from_str(&file_entity.metadata).expect("Failed to parse File metadata"); assert_eq!( - file_metadata["wasGeneratedBy"]["@id"], expected_new_entity_id, - "File wasGeneratedBy should reference the new job ID" + file_metadata["prov:wasGeneratedBy"]["@id"], expected_new_entity_id, + "File prov:wasGeneratedBy should reference the new job ID" ); // Verify file_id was also remapped