Skip to content

Commit

Permalink
Issue #1057 wrap cwl_content to support multiple source types
Browse files Browse the repository at this point in the history
  • Loading branch information
soxofaan committed Feb 19, 2025
1 parent 5828b93 commit 684b423
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 12 deletions.
12 changes: 6 additions & 6 deletions openeogeotrellis/deploy/kube.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ def _cwl_demo(args: ProcessArgs, env: EvalEnv):
# TODO: move this imports to top-level?
import kubernetes.config

from openeogeotrellis.integrations.calrissian import CalrissianJobLauncher
from openeogeotrellis.integrations.calrissian import CalrissianJobLauncher, CwLSource

# TODO: better place to load this config?
if os.path.exists(SERVICE_TOKEN_FILENAME):
Expand All @@ -152,7 +152,7 @@ def _cwl_demo(args: ProcessArgs, env: EvalEnv):

launcher = CalrissianJobLauncher.from_context()

cwl_content = textwrap.dedent(
cwl_source = CwLSource.from_string(
"""
cwlVersion: v1.0
class: CommandLineTool
Expand Down Expand Up @@ -181,7 +181,7 @@ def _cwl_demo(args: ProcessArgs, env: EvalEnv):
]

results = launcher.run_cwl_workflow(
cwl_content=cwl_content,
cwl_source=cwl_source,
cwl_arguments=cwl_arguments,
output_paths=["output.txt"],
)
Expand Down Expand Up @@ -218,7 +218,7 @@ def _cwl_insar(args: ProcessArgs, env: EvalEnv):
# TODO: move this imports to top-level?
import kubernetes.config

from openeogeotrellis.integrations.calrissian import CalrissianJobLauncher
from openeogeotrellis.integrations.calrissian import CalrissianJobLauncher, CwLSource

# TODO: better place to load this config?
if os.path.exists(SERVICE_TOKEN_FILENAME):
Expand All @@ -228,7 +228,7 @@ def _cwl_insar(args: ProcessArgs, env: EvalEnv):

launcher = CalrissianJobLauncher.from_context()

cwl_content = textwrap.dedent(
cwl_source = CwLSource.from_string(
f"""
cwlVersion: v1.0
class: CommandLineTool
Expand Down Expand Up @@ -260,7 +260,7 @@ def _cwl_insar(args: ProcessArgs, env: EvalEnv):

# TODO: Load the results as datacube with load_stac.
results = launcher.run_cwl_workflow(
cwl_content=cwl_content,
cwl_source=cwl_source,
cwl_arguments=cwl_arguments,
output_paths=["output.txt"],
)
Expand Down
46 changes: 42 additions & 4 deletions openeogeotrellis/integrations/calrissian.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
from __future__ import annotations

import textwrap

import base64
import dataclasses
import logging
import requests
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union

Expand Down Expand Up @@ -42,6 +45,39 @@ def read(self, encoding: Union[None, str] = None) -> Union[bytes, str]:
return content


class CwLSource:
"""
Simple container of CWL source code.
For now, this is just a wrapper around a string (the CWL content),
with factories to support multiple sources (local path, URL, ...).
When necessary, this simple abstraction can be evolved easily in something more sophisticated.
"""

def __init__(self, content: str):
self._cwl = content

def get_content(self) -> str:
return self._cwl

@classmethod
def from_string(cls, content: str, auto_dedent: bool = True) -> CwLSource:
if auto_dedent:
content = textwrap.dedent(content)
return cls(content=content)

@classmethod
def from_path(cls, path: Union[str, Path]) -> CwLSource:
with Path(path).open(mode="r", encoding="utf-8") as f:
return cls(content=f.read())

@classmethod
def from_url(cls, url: str) -> CwLSource:
resp = requests.get(url)
resp.raise_for_status()
return cls(content=resp.text)


class CalrissianJobLauncher:
"""
Helper class to launch a Calrissian job on Kubernetes.
Expand Down Expand Up @@ -97,15 +133,17 @@ def _build_unique_name(self, infix: str) -> str:
suffix = generate_unique_id(date_prefix=False)[:8]
return f"{self._name_base}-{infix}-{suffix}"

def create_input_staging_job_manifest(self, cwl_content: str) -> Tuple[kubernetes.client.V1Job, str]:
def create_input_staging_job_manifest(self, cwl_source: CwLSource) -> Tuple[kubernetes.client.V1Job, str]:
"""
Create a k8s manifest for a Calrissian input staging job.
:param cwl_content: CWL content as a string to dump to CWL file in the input volume.
:param cwl_source: CWL source to dump to CWL file in the input volume.
:return: Tuple of
- k8s job manifest
- path to the CWL file in the input volume.
"""
cwl_content = cwl_source.get_content()

name = self._build_unique_name(infix="cal-inp")
_log.info(f"Creating input staging job manifest: {name=}")
yaml_parsed = list(yaml.safe_load_all(cwl_content))
Expand Down Expand Up @@ -315,7 +353,7 @@ def get_output_volume_name(self) -> str:
return volume_name

def run_cwl_workflow(
self, cwl_content: str, cwl_arguments: List[str], output_paths: List[str]
self, cwl_source: CwLSource, cwl_arguments: List[str], output_paths: List[str]
) -> Dict[str, CalrissianS3Result]:
"""
Run a CWL workflow on Calrissian and return the output as a string.
Expand All @@ -325,7 +363,7 @@ def run_cwl_workflow(
:return: output of the CWL workflow as a string.
"""
# Input staging
input_staging_manifest, cwl_path = self.create_input_staging_job_manifest(cwl_content=cwl_content)
input_staging_manifest, cwl_path = self.create_input_staging_job_manifest(cwl_source=cwl_source)
input_staging_job = self.launch_job_and_wait(manifest=input_staging_manifest)

# CWL job
Expand Down
40 changes: 38 additions & 2 deletions tests/integrations/test_calrissian.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from openeogeotrellis.integrations.calrissian import (
CalrissianJobLauncher,
CalrissianS3Result,
CwLSource,
)


Expand All @@ -29,7 +30,9 @@ class TestCalrissianJobLauncher:
def test_create_input_staging_job_manifest(self, generate_unique_id_mock):
launcher = CalrissianJobLauncher(namespace=self.NAMESPACE, name_base="r-1234")

manifest, cwl_path = launcher.create_input_staging_job_manifest(cwl_content="class: Dummy")
manifest, cwl_path = launcher.create_input_staging_job_manifest(
cwl_source=CwLSource.from_string("class: Dummy")
)

assert cwl_path == "/calrissian/input-data/r-1234-cal-inp-01234567.cwl"

Expand Down Expand Up @@ -233,7 +236,9 @@ def test_launch_job_and_wait_basic(self, k8s_batch_api, caplog):
def test_run_cwl_workflow_basic(self, k8_pvc_api, k8s_batch_api, generate_unique_id_mock, caplog):
launcher = CalrissianJobLauncher(namespace=self.NAMESPACE, name_base="r-456", s3_bucket="test-bucket")
res = launcher.run_cwl_workflow(
cwl_content="class: Dummy", cwl_arguments=["--message", "Howdy Earth!"], output_paths=["output.txt"]
cwl_source=CwLSource.from_string("class: Dummy"),
cwl_arguments=["--message", "Howdy Earth!"],
output_paths=["output.txt"],
)
assert res == {
"output.txt": CalrissianS3Result(
Expand Down Expand Up @@ -263,3 +268,34 @@ def test_read_encoding(self, s3_output):
bucket, key = s3_output
result = CalrissianS3Result(s3_bucket=bucket, s3_key=key)
assert result.read(encoding="utf-8") == "Howdy, Earth!"


class TestCwlSource:
def test_from_string(self):
content = "cwlVersion: v1.0\nclass: CommandLineTool\n"
cwl = CwLSource.from_string(content=content)
assert cwl.get_content() == "cwlVersion: v1.0\nclass: CommandLineTool\n"

def test_from_string_auto_dedent(self):
content = """
cwlVersion: v1.0
class: CommandLineTool
inputs:
message:
type: string
"""
cwl = CwLSource.from_string(content=content)
expected = "\ncwlVersion: v1.0\nclass: CommandLineTool\ninputs:\n message:\n type: string\n"
assert cwl.get_content() == expected

def test_from_path(self, tmp_path):
path = tmp_path / "dummy.cwl"
path.write_text("cwlVersion: v1.0\nclass: CommandLineTool\n")
cwl = CwLSource.from_path(path=path)
assert cwl.get_content() == "cwlVersion: v1.0\nclass: CommandLineTool\n"

def test_from_url(self, requests_mock):
url = "https://example.com/dummy.cwl"
requests_mock.get(url, text="cwlVersion: v1.0\nclass: CommandLineTool\n")
cwl = CwLSource.from_url(url=url)
assert cwl.get_content() == "cwlVersion: v1.0\nclass: CommandLineTool\n"

0 comments on commit 684b423

Please sign in to comment.