diff --git a/simpleflow/history.py b/simpleflow/history.py index ccc3dd95d..7e79dfe04 100644 --- a/simpleflow/history.py +++ b/simpleflow/history.py @@ -4,10 +4,9 @@ class History(object): def __init__(self, history): self._history = history - self._activities = collections.defaultdict( - lambda: {'type': 'activity'}) - self._child_workflows = collections.defaultdict( - lambda: {'type': 'child_workflow'}) + self._activities = collections.OrderedDict() + self._child_workflows = collections.OrderedDict() + self._tasks = collections.OrderedDict() @property def events(self): @@ -31,7 +30,9 @@ def get_activity(event): 'scheduled_id': event.id, } if event.activity_id not in self._activities: - self._activities[event.activity_id] = activity + id_ = event.activity_id + self._activities[id_] = activity + self._tasks[id_] = self._activities[id_] else: # When the executor retries a task, it schedules it again. # We have to take care of not overriding some values set by the @@ -40,10 +41,12 @@ def get_activity(event): # corresponds to the last execution. self._activities[event.activity_id].update(activity) elif event.state == 'schedule_failed': - activity = self._activities[event.activity_id] - activity['state'] = event.state - activity['cause'] = event.cause - activity['activity_type'] = event.activity_type.copy() + self._activities[event.activity_id] = { + 'type': 'activity', + 'state': event.state, + 'cause': event.cause, + 'activity_type': event.activity_type.copy(), + } elif event.state == 'started': activity = get_activity(event) activity['state'] = event.state @@ -106,7 +109,9 @@ def get_workflow(event): 'state': event.state, 'initiated_event_id': event.id, } - self._child_workflows[event.workflow_id] = workflow + id_ = event.workflow_id + self._child_workflows[id_] = workflow + self._tasks[id_] = self._child_workflows[id_] elif event.state == 'started': workflow = get_workflow(event) workflow['state'] = event.state diff --git a/simpleflow/swf/executor.py b/simpleflow/swf/executor.py index cf7d20e06..2c2d020cc 100644 --- a/simpleflow/swf/executor.py +++ b/simpleflow/swf/executor.py @@ -273,6 +273,26 @@ def starmap(self, callable, iterable): iterable = executor.get_actual_value(iterable) return super(Executor, self).starmap(callable, iterable) + def merge_previous_execution(self, execution): + previous_history = History(execution.history()) + # Override input with the previous execution value. + self._history.events[0].input = json.dumps( + previous_history.events[0].input.copy(), + ) + + # Override already completed tasks to not execute them again. + previous_history.parse() + self._history._activities.update({ + id_: activity for id_, activity in + previous_history._activities.iteritems() if + activity['state'] == 'completed' + }) + self._history._child_workflows.update({ + id_: child_workflow for id_, child_workflow in + previous_history._child_workflows.iteritems() if + child_workflow['state'] == 'completed' + }) + def replay(self, history): """Executes the workflow from the start until it blocks. @@ -291,6 +311,17 @@ def replay(self, history): args = input.get('args', ()) kwargs = input.get('kwargs', {}) + previous_workflow_execution = input.get('_previous_workflow_execution') + if previous_workflow_execution: + # Resume previous execution by injecting input and completed task + # in the current history. + ex = swf.models.WorkflowExecution( + domain=self.domain, + workflow_id=previous_workflow_execution['workflow_id'], + run_id=previous_workflow_execution['run_id'], + ) + self.merge_previous_execution(ex) + try: result = self.run_workflow(*args, **kwargs) except exceptions.ExecutionBlocked: diff --git a/tests/test_dataflow.py b/tests/test_dataflow.py index 7ebc23f0a..059607af4 100644 --- a/tests/test_dataflow.py +++ b/tests/test_dataflow.py @@ -1,5 +1,6 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- +from __future__ import absolute_import import functools import mock @@ -1006,3 +1007,46 @@ def test_activity_not_found_schedule_failed_already_exists(): decisions, _ = executor.replay(history) check_task_scheduled_decision(decisions[0], increment) + + +def test_resume_stopped_workflow_execution(): + workflow = TestDefinition + executor = Executor(DOMAIN, workflow) + + previous_history = builder.History(workflow) + decision_id = previous_history.last_id + (previous_history + .add_activity_task( + increment, + decision_id=decision_id, + last_state='completed', + activity_id='activity-tests.test_dataflow.increment-1', + input={'args': 1}, + result=2) + .add_decision_task_scheduled() + .add_decision_task_started()) + + history = builder.History( + workflow, + input={ + '_previous_workflow_execution': { + 'workflow_id': 'WORKFLOW_ID', + 'run_id': 'RUN_ID', + } + }) + decision_id = previous_history.last_id + (history + .add_decision_task_scheduled() + .add_decision_task_started()) + + class FakeWorkflowExecution(object): + def __init__(self, *args, **kwargs): + pass + + def history(self): + return previous_history + + with mock.patch('swf.models.WorkflowExecution') as Mock: + Mock.return_value = FakeWorkflowExecution() + decisions, _ = executor.replay(history) + check_task_scheduled_decision(decisions[0], double)