@@ -23,110 +23,112 @@ import (
2323)
2424
2525// NewTaskRunner creates a TaskRunner instance based on the task type.
26- func NewTaskRunner (taskName string , task model.Task , taskSupport TaskSupport ) (TaskRunner , error ) {
26+ func NewTaskRunner (taskName string , task model.Task , workflowDef * model. Workflow ) (TaskRunner , error ) {
2727 switch t := task .(type ) {
2828 case * model.SetTask :
29- return NewSetTaskRunner (taskName , t , taskSupport )
29+ return NewSetTaskRunner (taskName , t )
3030 case * model.RaiseTask :
31- return NewRaiseTaskRunner (taskName , t , taskSupport )
31+ return NewRaiseTaskRunner (taskName , t , workflowDef )
3232 case * model.DoTask :
33- return NewDoTaskRunner (t .Do , taskSupport )
33+ return NewDoTaskRunner (t .Do , workflowDef )
3434 case * model.ForTask :
35- return NewForTaskRunner (taskName , t , taskSupport )
35+ return NewForTaskRunner (taskName , t )
36+ case * model.CallHTTP :
37+ return NewCallHttpRunner (taskName , t )
3638 default :
3739 return nil , fmt .Errorf ("unsupported task type '%T' for task '%s'" , t , taskName )
3840 }
3941}
4042
41- func NewDoTaskRunner (taskList * model.TaskList , taskSupport TaskSupport ) (* DoTaskRunner , error ) {
43+ func NewDoTaskRunner (taskList * model.TaskList , workflowDef * model. Workflow ) (* DoTaskRunner , error ) {
4244 return & DoTaskRunner {
4345 TaskList : taskList ,
44- TaskSupport : taskSupport ,
46+ WorkflowDef : workflowDef ,
4547 }, nil
4648}
4749
4850type DoTaskRunner struct {
4951 TaskList * model.TaskList
50- TaskSupport TaskSupport
52+ WorkflowDef * model. Workflow
5153}
5254
53- func (d * DoTaskRunner ) Run (input interface {}) (output interface {}, err error ) {
55+ func (d * DoTaskRunner ) Run (input interface {}, taskSupport TaskSupport ) (output interface {}, err error ) {
5456 if d .TaskList == nil {
5557 return input , nil
5658 }
57- return d .runTasks (input , d . TaskList )
59+ return d .runTasks (input , taskSupport )
5860}
5961
6062func (d * DoTaskRunner ) GetTaskName () string {
6163 return ""
6264}
6365
6466// runTasks runs all defined tasks sequentially.
65- func (d * DoTaskRunner ) runTasks (input interface {}, tasks * model. TaskList ) (output interface {}, err error ) {
67+ func (d * DoTaskRunner ) runTasks (input interface {}, taskSupport TaskSupport ) (output interface {}, err error ) {
6668 output = input
67- if tasks == nil {
69+ if d . TaskList == nil {
6870 return output , nil
6971 }
7072
7173 idx := 0
72- currentTask := (* tasks )[idx ]
74+ currentTask := (* d . TaskList )[idx ]
7375
7476 for currentTask != nil {
75- if err = d . TaskSupport .SetTaskDef (currentTask ); err != nil {
77+ if err = taskSupport .SetTaskDef (currentTask ); err != nil {
7678 return nil , err
7779 }
78- if err = d . TaskSupport .SetTaskReferenceFromName (currentTask .Key ); err != nil {
80+ if err = taskSupport .SetTaskReferenceFromName (currentTask .Key ); err != nil {
7981 return nil , err
8082 }
8183
82- if shouldRun , err := d .shouldRunTask (input , currentTask ); err != nil {
84+ if shouldRun , err := d .shouldRunTask (input , taskSupport , currentTask ); err != nil {
8385 return output , err
8486 } else if ! shouldRun {
85- idx , currentTask = tasks .Next (idx )
87+ idx , currentTask = d . TaskList .Next (idx )
8688 continue
8789 }
8890
89- d . TaskSupport .SetTaskStatus (currentTask .Key , ctx .PendingStatus )
91+ taskSupport .SetTaskStatus (currentTask .Key , ctx .PendingStatus )
9092
9193 // Check if this task is a SwitchTask and handle it
9294 if switchTask , ok := currentTask .Task .(* model.SwitchTask ); ok {
93- flowDirective , err := d .evaluateSwitchTask (input , currentTask .Key , switchTask )
95+ flowDirective , err := d .evaluateSwitchTask (input , taskSupport , currentTask .Key , switchTask )
9496 if err != nil {
95- d . TaskSupport .SetTaskStatus (currentTask .Key , ctx .FaultedStatus )
97+ taskSupport .SetTaskStatus (currentTask .Key , ctx .FaultedStatus )
9698 return output , err
9799 }
98- d . TaskSupport .SetTaskStatus (currentTask .Key , ctx .CompletedStatus )
100+ taskSupport .SetTaskStatus (currentTask .Key , ctx .CompletedStatus )
99101
100102 // Process FlowDirective: update idx/currentTask accordingly
101- idx , currentTask = tasks .KeyAndIndex (flowDirective .Value )
103+ idx , currentTask = d . TaskList .KeyAndIndex (flowDirective .Value )
102104 if currentTask == nil {
103105 return nil , fmt .Errorf ("flow directive target '%s' not found" , flowDirective .Value )
104106 }
105107 continue
106108 }
107109
108- runner , err := NewTaskRunner (currentTask .Key , currentTask .Task , d . TaskSupport )
110+ runner , err := NewTaskRunner (currentTask .Key , currentTask .Task , taskSupport . GetWorkflowDef () )
109111 if err != nil {
110112 return output , err
111113 }
112114
113- d . TaskSupport .SetTaskStatus (currentTask .Key , ctx .RunningStatus )
114- if output , err = d .runTask (input , runner , currentTask .Task .GetBase ()); err != nil {
115- d . TaskSupport .SetTaskStatus (currentTask .Key , ctx .FaultedStatus )
115+ taskSupport .SetTaskStatus (currentTask .Key , ctx .RunningStatus )
116+ if output , err = d .runTask (input , taskSupport , runner , currentTask .Task .GetBase ()); err != nil {
117+ taskSupport .SetTaskStatus (currentTask .Key , ctx .FaultedStatus )
116118 return output , err
117119 }
118120
119- d . TaskSupport .SetTaskStatus (currentTask .Key , ctx .CompletedStatus )
121+ taskSupport .SetTaskStatus (currentTask .Key , ctx .CompletedStatus )
120122 input = deepCloneValue (output )
121- idx , currentTask = tasks .Next (idx )
123+ idx , currentTask = d . TaskList .Next (idx )
122124 }
123125
124126 return output , nil
125127}
126128
127- func (d * DoTaskRunner ) shouldRunTask (input interface {}, task * model.TaskItem ) (bool , error ) {
129+ func (d * DoTaskRunner ) shouldRunTask (input interface {}, taskSupport TaskSupport , task * model.TaskItem ) (bool , error ) {
128130 if task .GetBase ().If != nil {
129- output , err := traverseAndEvaluateBool (task .GetBase ().If .String (), input , d . TaskSupport .GetContext ())
131+ output , err := traverseAndEvaluateBool (task .GetBase ().If .String (), input , taskSupport .GetContext ())
130132 if err != nil {
131133 return false , model .NewErrExpression (err , task .Key )
132134 }
@@ -135,15 +137,15 @@ func (d *DoTaskRunner) shouldRunTask(input interface{}, task *model.TaskItem) (b
135137 return true , nil
136138}
137139
138- func (d * DoTaskRunner ) evaluateSwitchTask (input interface {}, taskKey string , switchTask * model.SwitchTask ) (* model.FlowDirective , error ) {
140+ func (d * DoTaskRunner ) evaluateSwitchTask (input interface {}, taskSupport TaskSupport , taskKey string , switchTask * model.SwitchTask ) (* model.FlowDirective , error ) {
139141 var defaultThen * model.FlowDirective
140142 for _ , switchItem := range switchTask .Switch {
141143 for _ , switchCase := range switchItem {
142144 if switchCase .When == nil {
143145 defaultThen = switchCase .Then
144146 continue
145147 }
146- result , err := traverseAndEvaluateBool (model .NormalizeExpr (switchCase .When .String ()), input , d . TaskSupport .GetContext ())
148+ result , err := traverseAndEvaluateBool (model .NormalizeExpr (switchCase .When .String ()), input , taskSupport .GetContext ())
147149 if err != nil {
148150 return nil , model .NewErrExpression (err , taskKey )
149151 }
@@ -162,86 +164,86 @@ func (d *DoTaskRunner) evaluateSwitchTask(input interface{}, taskKey string, swi
162164}
163165
164166// runTask executes an individual task.
165- func (d * DoTaskRunner ) runTask (input interface {}, runner TaskRunner , task * model.TaskBase ) (output interface {}, err error ) {
167+ func (d * DoTaskRunner ) runTask (input interface {}, taskSupport TaskSupport , runner TaskRunner , task * model.TaskBase ) (output interface {}, err error ) {
166168 taskName := runner .GetTaskName ()
167169
168- d . TaskSupport .SetTaskStartedAt (time .Now ())
169- d . TaskSupport .SetTaskRawInput (input )
170- d . TaskSupport .SetTaskName (taskName )
170+ taskSupport .SetTaskStartedAt (time .Now ())
171+ taskSupport .SetTaskRawInput (input )
172+ taskSupport .SetTaskName (taskName )
171173
172174 if task .Input != nil {
173- if input , err = d .processTaskInput (task , input , taskName ); err != nil {
175+ if input , err = d .processTaskInput (task , input , taskSupport ); err != nil {
174176 return nil , err
175177 }
176178 }
177179
178- output , err = runner .Run (input )
180+ output , err = runner .Run (input , taskSupport )
179181 if err != nil {
180182 return nil , err
181183 }
182184
183- d . TaskSupport .SetTaskRawOutput (output )
185+ taskSupport .SetTaskRawOutput (output )
184186
185- if output , err = d .processTaskOutput (task , output , taskName ); err != nil {
187+ if output , err = d .processTaskOutput (task , output , taskSupport ); err != nil {
186188 return nil , err
187189 }
188190
189- if err = d .processTaskExport (task , output , taskName ); err != nil {
191+ if err = d .processTaskExport (task , output , taskSupport ); err != nil {
190192 return nil , err
191193 }
192194
193195 return output , nil
194196}
195197
196198// processTaskInput processes task input validation and transformation.
197- func (d * DoTaskRunner ) processTaskInput (task * model.TaskBase , taskInput interface {}, taskName string ) (output interface {}, err error ) {
199+ func (d * DoTaskRunner ) processTaskInput (task * model.TaskBase , taskInput interface {}, taskSupport TaskSupport ) (output interface {}, err error ) {
198200 if task .Input == nil {
199201 return taskInput , nil
200202 }
201203
202- if err = validateSchema (taskInput , task .Input .Schema , taskName ); err != nil {
204+ if err = validateSchema (taskInput , task .Input .Schema , d . GetTaskName () ); err != nil {
203205 return nil , err
204206 }
205207
206- if output , err = traverseAndEvaluate (task .Input .From , taskInput , taskName , d . TaskSupport .GetContext ()); err != nil {
208+ if output , err = traverseAndEvaluate (task .Input .From , taskInput , d . GetTaskName (), taskSupport .GetContext ()); err != nil {
207209 return nil , err
208210 }
209211
210212 return output , nil
211213}
212214
213215// processTaskOutput processes task output validation and transformation.
214- func (d * DoTaskRunner ) processTaskOutput (task * model.TaskBase , taskOutput interface {}, taskName string ) (output interface {}, err error ) {
216+ func (d * DoTaskRunner ) processTaskOutput (task * model.TaskBase , taskOutput interface {}, taskSupport TaskSupport ) (output interface {}, err error ) {
215217 if task .Output == nil {
216218 return taskOutput , nil
217219 }
218220
219- if output , err = traverseAndEvaluate (task .Output .As , taskOutput , taskName , d . TaskSupport .GetContext ()); err != nil {
221+ if output , err = traverseAndEvaluate (task .Output .As , taskOutput , d . GetTaskName (), taskSupport .GetContext ()); err != nil {
220222 return nil , err
221223 }
222224
223- if err = validateSchema (output , task .Output .Schema , taskName ); err != nil {
225+ if err = validateSchema (output , task .Output .Schema , d . GetTaskName () ); err != nil {
224226 return nil , err
225227 }
226228
227229 return output , nil
228230}
229231
230- func (d * DoTaskRunner ) processTaskExport (task * model.TaskBase , taskOutput interface {}, taskName string ) (err error ) {
232+ func (d * DoTaskRunner ) processTaskExport (task * model.TaskBase , taskOutput interface {}, taskSupport TaskSupport ) (err error ) {
231233 if task .Export == nil {
232234 return nil
233235 }
234236
235- output , err := traverseAndEvaluate (task .Export .As , taskOutput , taskName , d . TaskSupport .GetContext ())
237+ output , err := traverseAndEvaluate (task .Export .As , taskOutput , d . GetTaskName (), taskSupport .GetContext ())
236238 if err != nil {
237239 return err
238240 }
239241
240- if err = validateSchema (output , task .Export .Schema , taskName ); err != nil {
242+ if err = validateSchema (output , task .Export .Schema , d . GetTaskName () ); err != nil {
241243 return nil
242244 }
243245
244- d . TaskSupport .SetWorkflowInstanceCtx (output )
246+ taskSupport .SetWorkflowInstanceCtx (output )
245247
246248 return nil
247249}
0 commit comments