Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# See https://pre-commit.com/hooks.html for more hooks
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v5.0.0
rev: v6.0.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
Expand All @@ -14,14 +14,14 @@ repos:
- --maxkb=1024
- repo: https://github.com/astral-sh/uv-pre-commit
# uv version.
rev: 0.6.14
rev: 0.9.5
hooks:
# Keep uv.lock up to date.
- id: uv-lock

- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
rev: v0.11.5
rev: v0.14.1
hooks:
# Run the linter.
- id: ruff
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ dependencies = [
"multiprocess>=0.70.14",
"lazy_object_proxy",
"lockfile>=0.9.1",
"setproctitle; sys_platform != 'darwin'",
"tabulate>=0.8.2,<1.0.0",
"setproctitle",
"typing-extensions",
"click",
"psutil",
"pytz",
Expand Down Expand Up @@ -106,7 +107,6 @@ dev = [
"sphinx",
"sphinx_rtd_theme",
"sure",
"typing-extensions",
"vcrpy",
"twine",
]
Expand Down
13 changes: 13 additions & 0 deletions simpleflow/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,19 @@ def workflow_history(
"external_workflows_signaling": history.external_workflows_signaling,
"signaled_workflows": history.signaled_workflows,
}
elif output_format == "cooked2":
history.parse()
events = {
"workflow": [t for t in history.tasks if t.type == "child_workflow"],
"activities": [t for t in history.tasks if t.type == "activity"],
"child_workflows": history.child_workflows,
"markers": history.markers,
"timers": history.timers,
"signals": [t for t in history.tasks if t.type == "signal"],
"signal_lists": history.signal_lists,
"external_workflows_signaling": history.external_workflows_signaling,
"signaled_workflows": history.signaled_workflows,
}
else:
raise NotImplementedError
print(json.dumps(events, separators=(",", ":"), default=serialize_complex_object))
Expand Down
5 changes: 4 additions & 1 deletion simpleflow/history.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
if TYPE_CHECKING:
from typing import Any

from typing_extensions import Self

from simpleflow.swf.mapper.models.event.base import Event
from simpleflow.swf.mapper.models.event.marker import MarkerEvent
from simpleflow.swf.mapper.models.event.task import ActivityTaskEvent
Expand Down Expand Up @@ -760,6 +762,7 @@ def parse_timer_event(self, events: list[Event], event: TimerEvent):
timer.update(
{
"state": event.state,
"cause": event.cause,
"cancel_failed_event_id": event.id,
"cancel_failed_event_timestamp": event.timestamp,
}
Expand All @@ -771,7 +774,7 @@ def parse_decision_event(self, events: list[Event], event: Event):
if event.state == "completed":
self.completed_decision_id = event.id

TYPE_TO_PARSER: ClassVar[dict[str, Callable[[History, list[Event], Event], None]]] = {
TYPE_TO_PARSER: ClassVar[dict[str, Callable[[Self, History, list[Event], Event], None]]] = {
"ActivityTask": parse_activity_event,
"DecisionTask": parse_decision_event,
"ChildWorkflowExecution": parse_child_workflow_event,
Expand Down
21 changes: 12 additions & 9 deletions simpleflow/process/_named_mixin.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
from __future__ import annotations

import functools

from setproctitle import setproctitle
import sys

from simpleflow import logger


def with_state(state):
def with_state(state: str):
"""
Decorator used to change the process name when changing state.
:param state: new state
:type state: str
"""

def wrapper(method):
Expand All @@ -36,7 +33,7 @@ class NamedMixin:
method explicitly if not the first parent)
2- decorate your methods with "@with_state("my_state")"

You can optionnally expose some other attributes of your worker by defining
You can optionally expose some other attributes of your worker by defining
the "_named_mixin_properties" attribute to a list or tuple of fields you want
to include in your process title. For instance:

Expand All @@ -52,18 +49,24 @@ def __init__(self, *args, **kwargs):
self.state = kwargs.get("state", "initializing")

@property
def state(self):
def state(self) -> str:
return self._state

@state.setter
def state(self, value):
def state(self, value: str) -> None:
self._state = value
self.set_process_name()

def set_process_name(self):
if sys.platform != "darwin":
from setproctitle import setproctitle
else:
setproctitle = None

klass = self.__class__.__name__
properties = []
for prop in getattr(self, "_named_mixin_properties", []):
properties.append(f"{prop}={getattr(self, prop)}")
name = f"{klass}({', '.join(properties)})"
setproctitle(f"simpleflow {name}[{self.state}]")
if setproctitle:
setproctitle(f"simpleflow {name}[{self.state}]")
4 changes: 2 additions & 2 deletions simpleflow/process/_supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def __init__(
self._args = arguments if arguments is not None else ()
self._background = background

self._processes = {}
self._processes: dict[int, psutil.Process] = {}
self._terminating = False

super().__init__()
Expand All @@ -106,7 +106,7 @@ def start(self):

def _cleanup_worker_processes(self):
# cleanup children
to_remove = []
to_remove: list[int] = []
for pid, child in self._processes.items():
try:
name, status = child.name(), child.status()
Expand Down
30 changes: 15 additions & 15 deletions simpleflow/swf/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
from simpleflow.workflow import Workflow

if TYPE_CHECKING:
from typing_extensions import Self

from simpleflow.activity import NotSet
from simpleflow.swf.mapper.models.domain import Domain

__all__ = ["Executor"]
Expand Down Expand Up @@ -199,7 +202,7 @@ def history(self) -> History | None:

def _make_task_id(
self,
a_task: ActivityTask | WorkflowTask,
a_task: ActivityTask | WorkflowTask | SignalTask | MarkerTask | TimerTask | CancelTimerTask,
workflow_id: str,
run_id: str,
*args,
Expand Down Expand Up @@ -428,7 +431,7 @@ def _get_future_from_timer_event(self, a_task: TimerTask, event: dict[str, Any])
future = futures.Future()
if not event:
return future
state = event["state"]
state: str = event["state"]
if state == "started":
future.set_running()
elif state == "fired":
Expand All @@ -438,7 +441,7 @@ def _get_future_from_timer_event(self, a_task: TimerTask, event: dict[str, Any])
elif state in ("start_failed", "cancel_failed"):
future.set_exception(
exceptions.TaskFailed(
name=event["timer_id"],
name=event["id"],
reason=event["cause"],
)
)
Expand Down Expand Up @@ -502,7 +505,7 @@ def find_timer_event(self, a_task: TimerTask | CancelTimerTask, history: History
return None
return event

TASK_TYPE_TO_EVENT_FINDER: ClassVar[dict[type, callable]] = {
TASK_TYPE_TO_EVENT_FINDER: ClassVar[dict[type, Callable]] = {
ActivityTask: find_activity_event,
WorkflowTask: find_child_workflow_event,
SignalTask: find_signal_event,
Expand Down Expand Up @@ -756,7 +759,7 @@ def _add_start_timer_decision(self, id, timeout=0):
dict[
str,
Callable[
[ActivityTask | WorkflowTask | SignalTask | MarkerTask, dict[str, Any]],
[Self, ActivityTask | WorkflowTask | SignalTask | MarkerTask, dict[str, Any]],
futures.Future | None,
],
]
Expand All @@ -771,7 +774,7 @@ def _add_start_timer_decision(self, id, timeout=0):

def resume(
self,
a_task: ActivityTask | WorkflowTask | SignalTask | MarkerTask,
a_task: ActivityTask | WorkflowTask | SignalTask | MarkerTask | TimerTask | CancelTimerTask,
*args,
**kwargs,
) -> futures.Future:
Expand Down Expand Up @@ -843,7 +846,7 @@ def resume(

def make_task_id(
self,
a_task: ActivityTask | WorkflowTask | SignalTask | MarkerTask,
a_task: ActivityTask | WorkflowTask | SignalTask | MarkerTask | TimerTask | CancelTimerTask,
*args,
**kwargs,
) -> None:
Expand All @@ -857,7 +860,11 @@ def make_task_id(
workflow_id, run_id = self._workflow_id, self._run_id
a_task.id = self._make_task_id(a_task, workflow_id, run_id, *args, **kwargs)

def _compute_priority(self, priority_set_on_submit, a_task):
def _compute_priority(
self,
priority_set_on_submit: str | int | NotSet,
a_task: ActivityTask | WorkflowTask | SignalTask | MarkerTask | TimerTask | CancelTimerTask,
) -> str | int | None:
"""
Computes the correct task priority, with the following precedence (first
is better/preferred):
Expand All @@ -866,14 +873,7 @@ def _compute_priority(self, priority_set_on_submit, a_task):
- priority set on the workflow execution
- None otherwise

:param priority_set_on_submit:
:type priority_set_on_submit: str|int|PRIORITY_NOT_SET

:param a_task:
:type a_task: ActivityTask|WorkflowTask

:returns: the priority for this task
:rtype: str|int|None
"""
if priority_set_on_submit is not PRIORITY_NOT_SET:
return priority_set_on_submit
Expand Down
1 change: 1 addition & 0 deletions simpleflow/swf/mapper/models/decision/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from simpleflow.swf.mapper.models.decision.base import Decision # NOQA
from simpleflow.swf.mapper.models.decision.marker import MarkerDecision # NOQA
from simpleflow.swf.mapper.models.decision.task import ActivityTaskDecision # NOQA
from simpleflow.swf.mapper.models.decision.timer import TimerDecision # NOQA
Expand Down
4 changes: 2 additions & 2 deletions simpleflow/swf/mapper/models/event/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,13 @@ def _extract_event_state(cls, event_type: str, event_name: str) -> str:
With event_name = 'StartChildWorkflowExecutionInitiated'
and event_type = 'ChildWorkflowExecution'
left == 'Start'
sep == 'ChildWorkflowExecution'
(separator) == 'ChildWorkflowExecution'
right == 'Initiated'

Returns: 'start_initiated'

"""
left, sep, right = event_name.partition(event_type)
left, _, right = event_name.partition(event_type)
return camel_to_underscore(left + right)


Expand Down
14 changes: 8 additions & 6 deletions simpleflow/swf/mapper/querysets/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,9 @@ def get(self, name, version, *args, **kwargs):
error_code = extract_error_code(e)
message = extract_message(e)
if error_code == "UnknownResourceFault":
raise DoesNotExistError(message) from e
raise DoesNotExistError(message or "") from e

raise ResponseError(message, error_code=error_code) from e
raise ResponseError(message or "", error_code=error_code or "") from e

wt_info = response[self._infos]
wt_config = response["configuration"]
Expand Down Expand Up @@ -465,12 +465,14 @@ def to_WorkflowExecution(self, domain: Domain, execution_info: dict[str, Any], *
execution_info["workflowType"]["version"],
)

workflow_id: str = get_subkey(execution_info, ["execution", "workflowId"]) # type: ignore
status: str = execution_info.get("executionStatus") # type: ignore
return WorkflowExecution(
domain,
get_subkey(execution_info, ["execution", "workflowId"]), # workflow_id
workflow_id,
run_id=get_subkey(execution_info, ["execution", "runId"]),
workflow_type=workflow_type,
status=execution_info.get("executionStatus"),
status=status,
close_status=execution_info.get("closeStatus"),
tag_list=execution_info.get("tagList"),
start_timestamp=execution_info.get("startTimestamp"),
Expand All @@ -488,9 +490,9 @@ def get(self, workflow_id, run_id, *args, **kwargs):
error_code = extract_error_code(e)
message = extract_message(e)
if error_code == "UnknownResourceFault":
raise DoesNotExistError(message) from e
raise DoesNotExistError(message or "") from e

raise ResponseError(message, error_code=error_code) from e
raise ResponseError(message or "", error_code=error_code or "") from e

execution_info = response[self._infos]
execution_config = response["executionConfiguration"]
Expand Down
Loading
Loading