Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# See https://pre-commit.com/hooks.html for more hooks
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v5.0.0
rev: v6.0.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
Expand All @@ -14,14 +14,14 @@ repos:
- --maxkb=1024
- repo: https://github.com/astral-sh/uv-pre-commit
# uv version.
rev: 0.6.14
rev: 0.9.5
hooks:
# Keep uv.lock up to date.
- id: uv-lock

- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
rev: v0.11.5
rev: v0.14.1
hooks:
# Run the linter.
- id: ruff
Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ dependencies = [
"click",
"psutil",
"pytz",
"typer>=0.15.2",
]

[project.urls]
Expand Down Expand Up @@ -92,7 +93,7 @@ exclude = ["*~"]
[dependency-groups]
dev = [
"boto3-stubs[s3,swf]",
"cffi==v1.17.0rc1; python_full_version=='3.13.0b4'", # via cryptography via moto, secretstorage
"cffi==v1.17.1; python_full_version=='3.13.0b4'", # via cryptography via moto, secretstorage
"flaky",
"hatch",
"invoke",
Expand Down
2 changes: 1 addition & 1 deletion simpleflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from .activity import Activity # NOQA
from .runtime import logger # NOQA
from .signal import WaitForSignal # NOQA
from .simpleflow_signal import WaitForSignal # NOQA
from .workflow import Workflow # NOQA

__version__ = "0.34.1"
Expand Down
Empty file added simpleflow/cli/__init__.py
Empty file.
34 changes: 34 additions & 0 deletions simpleflow/cli/decider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from __future__ import annotations

import typer
from typing_extensions import Annotated

from simpleflow.swf.process.decider import command

app = typer.Typer(no_args_is_help=True)


@app.command()
def start(
ctx: typer.Context,
workflows: Annotated[list[str] | None, typer.Argument()] = None,
*,
domain: Annotated[str, typer.Option(envvar="SWF_DOMAIN")],
task_list: Annotated[str, typer.Option("--task-list", "-t")] | None = None,
nb_processes: Annotated[int, typer.Option("--nb-processes", "-n")] | None = None,
):
"""
Start a decider.
"""
if not workflows and not task_list:
raise typer.BadParameter("workflows or task_list is required")
command.start(
workflows=workflows or [],
domain=domain,
task_list=task_list,
nb_processes=nb_processes,
)


if __name__ == "__main__":
app()
235 changes: 235 additions & 0 deletions simpleflow/cli/workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
from __future__ import annotations

from datetime import datetime
from enum import Enum
from typing import Any

import typer
from typing_extensions import Annotated

from simpleflow import Workflow, format
from simpleflow.command import get_progression_callback, get_workflow_type, with_format
from simpleflow.history import History
from simpleflow.swf import helpers
from simpleflow.swf.mapper.models import WorkflowExecution
from simpleflow.swf.utils import set_workflow_class_name
from simpleflow.utils import import_from_module, json_dumps


class Status(str, Enum):
open = "open"
closed = "closed"


class CloseStatus(str, Enum):
completed = "completed"
failed = "failed"
canceled = "canceled"
terminated = "terminated"
continued_as_new = "continued_as_new"


class OutputFormat(str, Enum):
events = "events"
raw = "raw"
cooked = "cooked"
cooked_alt = "cooked_alt"


app = typer.Typer(no_args_is_help=True)

TIMESTAMP_FORMATS = [
"%Y-%m-%d",
"%Y-%m-%dT%H:%M:%S%z",
"%Y-%m-%d %H:%M:%S%z",
"%Y-%m-%dT%H:%M:%S",
"%Y-%m-%d %H:%M:%S",
]


@app.command()
def filter(
ctx: typer.Context,
domain: Annotated[str, typer.Option(envvar="SWF_DOMAIN")],
status: Annotated[Status, typer.Option("--status", "-s")] = Status.open,
tag: str | None = None,
workflow_id: str | None = None,
workflow_type: str | None = None,
workflow_type_version: str | None = None,
close_status: CloseStatus | None = None,
started_since: Annotated[int, typer.Option("--started-since", "-n")] = 1,
from_date: Annotated[datetime, typer.Option(formats=TIMESTAMP_FORMATS)] | None = None,
to_date: Annotated[datetime, typer.Option(formats=TIMESTAMP_FORMATS)] | None = None,
):
"""
Filter workflow executions.
"""
status = status.upper()
kwargs: dict[str, Any] = {}
if status == WorkflowExecution.STATUS_OPEN:
if from_date:
kwargs["oldest_date"] = from_date
kwargs["latest_date"] = to_date
else:
kwargs["oldest_date"] = started_since
else:
if from_date:
kwargs["start_oldest_date"] = from_date
kwargs["start_latest_date"] = to_date
else:
kwargs["start_oldest_date"] = started_since

if close_status and status != WorkflowExecution.STATUS_CLOSED:
raise Exception("Closed status not supported for non-closed workflows.")
elif close_status:
kwargs["close_status"] = close_status.upper()

print(
with_format(ctx.parent)(helpers.filter_workflow_executions)(
domain,
status=status,
tag=tag,
workflow_id=workflow_id,
workflow_type_name=workflow_type,
workflow_type_version=workflow_type_version,
callback=get_progression_callback("executionInfos"),
**kwargs,
)
)


@app.command()
def start(
ctx: typer.Context,
workflow: str,
domain: Annotated[str, typer.Option(envvar="SWF_DOMAIN")],
input: Annotated[str, typer.Option("--input", "-i", help="input JSON")] | None = None,
):
"""
Start a workflow.
"""
workflow_class: type[Workflow] = import_from_module(workflow)
wf_input: dict[str, Any] = {}
if input is not None:
json_input = format.decode(input)
if isinstance(json_input, list):
wf_input = {"args": json_input, "kwargs": {}}
elif isinstance(json_input, dict) and ("args" not in json_input or "kwargs" not in json_input):
wf_input = {"args": [], "kwargs": json_input}
else:
wf_input = json_input
workflow_type = get_workflow_type(domain, workflow_class)
set_workflow_class_name(wf_input, workflow_class)
get_task_list = getattr(workflow_class, "get_task_list", None)
if get_task_list:
if not callable(get_task_list):
raise Exception("get_task_list must be a callable")
if isinstance(wf_input, dict):
args = wf_input.get("args", [])
kwargs = wf_input.get("kwargs", {})
else:
args = []
kwargs = wf_input
task_list = get_task_list(workflow_class, *args, **kwargs)
else:
task_list = workflow_class.task_list
execution = workflow_type.start_execution(
# workflow_id=workflow_id,
task_list=task_list,
# execution_timeout=execution_timeout,
input=wf_input,
# tag_list=tags,
# decision_tasks_timeout=decision_tasks_timeout,
)

def get_infos():
return ["workflow_id", "run_id"], [[execution.workflow_id, execution.run_id]]

print(with_format(ctx.parent)(get_infos)())


_NOTSET = object()


@app.command()
def history(
ctx: typer.Context,
domain: Annotated[str, typer.Option(envvar="SWF_DOMAIN")],
workflow_id: str,
run_id: str | None = None,
output_format: Annotated[OutputFormat, typer.Option("--output-format", "--of")] = OutputFormat.events,
reverse_order: bool = False,
):
# print(ctx)
# format = ctx.parent.parent.params.get("format")
# print(format)
from simpleflow.swf.mapper.models.history.base import History as BaseHistory

ex = helpers.get_workflow_execution(domain, workflow_id, run_id)
if not ex:
print(f"Execution {workflow_id} {run_id} not found" if run_id else f"Workflow {workflow_id} not found")
ctx.exit(1)
events = ex.history_events(
callback=get_progression_callback("events"),
reverse_order=reverse_order,
)
if output_format == OutputFormat.events:
pass
else:
raw_history = BaseHistory.from_event_list(events)
history = History(raw_history)
if output_format == OutputFormat.raw:
events = []
for event in history.events:
e = {}
for k in ["id", "type", "state", "timestamp", "input", "control", *event.__dict__]:
if k.startswith("_") or k == "raw":
continue
v = getattr(event, k, _NOTSET)
if v is _NOTSET:
continue
e[k] = v
events.append(e)
elif output_format == OutputFormat.cooked:
history.parse()
events = {
"workflow": history.workflow,
"activities": history.activities,
"child_workflows": history.child_workflows,
"markers": history.markers,
"timers": history.timers,
"signals": history.signals,
"signal_lists": history.signal_lists,
"external_workflows_signaling": history.external_workflows_signaling,
"signaled_workflows": history.signaled_workflows,
}
elif output_format == OutputFormat.cooked_alt:
history.parse()
events = {
"workflow": [t for t in history.tasks if t.type == "child_workflow"],
"activities": [t for t in history.tasks if t.type == "activity"],
"child_workflows": history.child_workflows,
"markers": history.markers,
"timers": history.timers,
"signals": [t for t in history.tasks if t.type == "signal"],
"signal_lists": history.signal_lists,
"external_workflows_signaling": history.external_workflows_signaling,
"signaled_workflows": history.signaled_workflows,
}
else:
raise NotImplementedError
print(json_dumps(events))


if __name__ == "__main__":
# from click.core import Command
#
# parent = typer.Context(command=Command(name="main"))
# parent.params["format"] = "json"
# filter(
# ctx=typer.Context(
# command=Command(name="filter"), parent=typer.Context(command=Command(name="main"), parent=parent)
# ),
# domain="TestDomain",
# )
app()
24 changes: 21 additions & 3 deletions simpleflow/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,14 @@ def restart_workflow(domain: str, workflow_id: str, run_id: str | None):


def with_format(ctx):
with_header = ctx.parent.params.get("header")
fmt = ctx.parent.params.get("format") or pretty.DEFAULT_FORMAT
if fmt == "prettyjson":
with_header = True
fmt = "json"
return pretty.formatted(
with_header=ctx.parent.params["header"],
fmt=ctx.parent.params["format"] or pretty.DEFAULT_FORMAT,
with_header=with_header,
fmt=fmt,
)


Expand Down Expand Up @@ -430,7 +435,7 @@ def workflow_history(
history = History(raw_history)
if output_format == "raw":
events = []
for event in history.events[:10]:
for event in history.events:
e = {}
for k in ["id", "type", "state", "timestamp", "input", "control", *event.__dict__]:
if k.startswith("_") or k == "raw":
Expand All @@ -453,6 +458,19 @@ def workflow_history(
"external_workflows_signaling": history.external_workflows_signaling,
"signaled_workflows": history.signaled_workflows,
}
elif output_format == "cooked2":
history.parse()
events = {
"workflow": [t for t in history.tasks if t.type == "child_workflow"],
"activities": [t for t in history.tasks if t.type == "activity"],
"child_workflows": history.child_workflows,
"markers": history.markers,
"timers": history.timers,
"signals": [t for t in history.tasks if t.type == "signal"],
"signal_lists": history.signal_lists,
"external_workflows_signaling": history.external_workflows_signaling,
"signaled_workflows": history.signaled_workflows,
}
else:
raise NotImplementedError
print(json.dumps(events, separators=(",", ":"), default=serialize_complex_object))
Expand Down
Loading
Loading