Skip to content

Commit d8a0b5a

Browse files
authored
foreach: add foreach_strategy property to ensure sequence between foreach elements (#211)
Each elements of a foreach loop are run in parallel. `foreach_strategy` control the strategy in which the elements will be run sequentially or in parallel. Closes #197 Signed-off-by: Romain Beuque <[email protected]>
1 parent acc23c0 commit d8a0b5a

File tree

5 files changed

+163
-24
lines changed

5 files changed

+163
-24
lines changed

README.md

+5
Original file line numberDiff line numberDiff line change
@@ -591,6 +591,11 @@ This output can be then passed to another step in json format:
591591
foreach: '{{.step.prefixStrings.children | toJson}}'
592592
```
593593

594+
It's possible to configure the strategy used to run each elements: default strategy is `parallel`: each elements will be run in parallel to maximize throughput ; `sequence` will run each element when the previous one is done, to ensure the sequence between elements. It can be declared in the template as is:
595+
```yaml
596+
foreach_strategy: "sequence"
597+
```
598+
594599
#### Resources <a name="resources"></a>
595600

596601
Resources are a way to restrict the concurrency factor of certain operations, to control the throughput and avoid dangerous behavior e.g. flooding the targets.

engine/engine.go

+11
Original file line numberDiff line numberDiff line change
@@ -718,6 +718,8 @@ func expandStep(s *step.Step, res *resolution.Resolution) {
718718
s.Error = err.Error()
719719
return
720720
}
721+
722+
var previousChildStepName string
721723
// generate all children steps
722724
for i, item := range items {
723725
childStepName := fmt.Sprintf("%s-%d", s.Name, i)
@@ -736,6 +738,15 @@ func expandStep(s *step.Step, res *resolution.Resolution) {
736738
Resources: s.Resources,
737739
Item: item,
738740
}
741+
742+
if s.ForEachStrategy == step.ForEachStrategySequence {
743+
if previousChildStepName != "" {
744+
res.Steps[childStepName].Dependencies = append(res.Steps[childStepName].Dependencies, previousChildStepName)
745+
}
746+
747+
previousChildStepName = childStepName
748+
}
749+
739750
delete(res.ForeachChildrenAlreadyContracted, childStepName)
740751
}
741752
// update parent dependencies to wait on children

engine/engine_test.go

+52
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/ghodss/yaml"
1616
"github.com/juju/errors"
1717
"github.com/loopfz/gadgeto/zesty"
18+
"github.com/maxatome/go-testdeep/td"
1819
"github.com/ovh/configstore"
1920
"github.com/stretchr/testify/assert"
2021
"github.com/stretchr/testify/require"
@@ -26,6 +27,7 @@ import (
2627
"github.com/ovh/utask/engine/functions"
2728
functionrunner "github.com/ovh/utask/engine/functions/runner"
2829
"github.com/ovh/utask/engine/step"
30+
"github.com/ovh/utask/engine/step/condition"
2931
"github.com/ovh/utask/engine/values"
3032
"github.com/ovh/utask/models/resolution"
3133
"github.com/ovh/utask/models/task"
@@ -644,6 +646,56 @@ func TestForeach(t *testing.T) {
644646
assert.Equal(t, "foo-b-bar-b", firstItemOutput["concat"])
645647
}
646648

649+
func TestForeachWithChainedIterations(t *testing.T) {
650+
_, require := td.AssertRequire(t)
651+
res, err := createResolution("foreach.yaml", map[string]interface{}{
652+
"list": []interface{}{"a", "b", "c", "d", "e"},
653+
}, nil)
654+
require.Nil(err)
655+
require.NotNil(res)
656+
657+
res.Steps["generateItems"].Conditions[0].Then["this"] = "DONE"
658+
res.Steps["generateItems"].Conditions = append(
659+
res.Steps["generateItems"].Conditions,
660+
&condition.Condition{
661+
Type: condition.CHECK,
662+
If: []*condition.Assert{
663+
{
664+
Value: "{{.iterator}}",
665+
Operator: condition.EQ,
666+
Expected: "d",
667+
},
668+
},
669+
Then: map[string]string{
670+
"this": "SERVER_ERROR",
671+
},
672+
},
673+
)
674+
res.Steps["generateItems"].ForEachStrategy = "sequence"
675+
err = updateResolution(res)
676+
require.Nil(err)
677+
678+
res, err = runResolution(res)
679+
require.NotNil(res)
680+
require.Nil(err)
681+
require.Cmp(res.State, resolution.StateError)
682+
683+
td.Cmp(t, res.Steps["emptyLoop"].State, step.StateDone) // running on empty collection is ok
684+
td.Cmp(t, res.Steps["concatItems"].State, step.StateTODO)
685+
td.Cmp(t, res.Steps["finalStep"].State, step.StateTODO)
686+
td.Cmp(t, res.Steps["bStep"].State, "B")
687+
td.Cmp(t, res.Steps["generateItems-0"].State, step.StateDone)
688+
td.Cmp(t, res.Steps["generateItems-1"].State, step.StateDone)
689+
td.Cmp(t, res.Steps["generateItems-2"].State, step.StateDone)
690+
td.Cmp(t, res.Steps["generateItems-3"].State, step.StateServerError)
691+
td.Cmp(t, res.Steps["generateItems-4"].State, step.StateTODO)
692+
td.CmpLen(t, res.Steps["generateItems-0"].Dependencies, 0)
693+
td.Cmp(t, res.Steps["generateItems-1"].Dependencies, []string{"generateItems-0"})
694+
td.Cmp(t, res.Steps["generateItems-2"].Dependencies, []string{"generateItems-1"})
695+
td.Cmp(t, res.Steps["generateItems-3"].Dependencies, []string{"generateItems-2"})
696+
td.Cmp(t, res.Steps["generateItems-4"].Dependencies, []string{"generateItems-3"})
697+
}
698+
647699
func TestForeachWithPreRun(t *testing.T) {
648700
input := map[string]interface{}{}
649701
res, err := createResolution("foreachAndPreRun.yaml", input, nil)

engine/step/step.go

+70-1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ const (
2727
RetrySeconds = "seconds"
2828
RetryMinutes = "minutes"
2929
RetryHours = "hours"
30+
31+
ForEachStrategyParallel = "parallel"
32+
ForEachStrategySequence = "sequence"
3033
)
3134

3235
// possible states of a step
@@ -106,7 +109,8 @@ type Step struct {
106109
Conditions []*condition.Condition `json:"conditions,omitempty"`
107110
skipped bool
108111
// loop
109-
ForEach string `json:"foreach,omitempty"` // "parent" step: expression for list of items
112+
ForEach string `json:"foreach,omitempty"` // "parent" step: expression for list of items
113+
ForEachStrategy string `json:"foreach_strategy"`
110114
ChildrenSteps []string `json:"children_steps,omitempty"` // list of children names
111115
ChildrenStepMap map[string]bool `json:"children_steps_map,omitempty"`
112116
Item interface{} `json:"item,omitempty"` // "child" step: item value, issued from foreach
@@ -558,6 +562,71 @@ func (st *Step) ValidAndNormalize(name string, baseConfigs map[string]json.RawMe
558562
}
559563
}
560564

565+
// check that we don't set restricted field from the template
566+
if st.State != "" {
567+
return errors.NewNotValid(nil, "step state must not be set")
568+
}
569+
570+
if st.ChildrenSteps != nil {
571+
return errors.NewNotValid(nil, "step children_steps must not be set")
572+
}
573+
574+
if st.ChildrenStepMap != nil {
575+
return errors.NewNotValid(nil, "step children_steps_map must not be set")
576+
}
577+
578+
if st.Output != nil {
579+
return errors.NewNotValid(nil, "step output must not be set")
580+
}
581+
582+
if st.Metadata != nil {
583+
return errors.NewNotValid(nil, "step metadatas must not be set")
584+
}
585+
586+
if st.Tags != nil {
587+
return errors.NewNotValid(nil, "step tags must not be set")
588+
}
589+
590+
if st.Children != nil {
591+
return errors.NewNotValid(nil, "step children must not be set")
592+
}
593+
594+
if st.Error != "" {
595+
return errors.NewNotValid(nil, "step error must not be set")
596+
}
597+
598+
if st.Metadata != nil {
599+
return errors.NewNotValid(nil, "step metadatas must not be set")
600+
}
601+
602+
if st.TryCount != 0 {
603+
return errors.NewNotValid(nil, "step try_count must not be set")
604+
}
605+
606+
t := time.Time{}
607+
if st.LastRun != t {
608+
return errors.NewNotValid(nil, "step last_time must not be set")
609+
}
610+
611+
if st.Item != nil {
612+
return errors.NewNotValid(nil, "step item must not be set")
613+
}
614+
615+
if st.ForEachStrategy != "" && st.ForEach == "" {
616+
return errors.NewNotValid(nil, "step foreach_strategy can't be set without foreach")
617+
}
618+
619+
if st.ForEach != "" {
620+
switch st.ForEachStrategy {
621+
case ForEachStrategyParallel, ForEachStrategySequence:
622+
case "":
623+
// expliciting default value
624+
st.ForEachStrategy = ForEachStrategyParallel
625+
default:
626+
return errors.NewNotValid(nil, fmt.Sprintf("step foreach_strategy %q is not a valid value", st.ForEachStrategy))
627+
}
628+
}
629+
561630
// valid execution delay
562631
if st.ExecutionDelay < 0 || st.ExecutionDelay > maxExecutionDelay {
563632
return errors.NewNotValid(nil,

engine/templates_tests/foreach.yaml

+25-23
Original file line numberDiff line numberDiff line change
@@ -7,64 +7,66 @@ inputs:
77
steps:
88
emptyLoop:
99
description: a foreach step with empty input
10-
foreach: '[]'
10+
foreach: "[]"
1111
action:
1212
type: echo
1313
configuration:
14-
output: {foo: bar}
14+
output: { foo: bar }
1515
generateItems:
1616
description: generate list for next step
17-
foreach: '{{.input.list | toJson}}'
17+
foreach: "{{.input.list | toJson}}"
1818
conditions:
1919
- type: skip
2020
if:
21-
- value: '{{.iterator}}'
22-
operator: EQ
23-
expected: a
21+
- value: "{{.iterator}}"
22+
operator: EQ
23+
expected: a
2424
then:
25-
this: PRUNE
25+
this: PRUNE
2626
- type: check
2727
if:
28-
- value: '{{.iterator}}'
29-
operator: EQ
30-
expected: b
28+
- value: "{{.iterator}}"
29+
operator: EQ
30+
expected: b
3131
then:
32-
bStep: B
32+
bStep: B
3333
action:
3434
type: echo
3535
configuration:
3636
output:
37-
foo: 'foo-{{.iterator}}'
38-
bar: 'bar-{{.iterator}}'
37+
foo: "foo-{{.iterator}}"
38+
bar: "bar-{{.iterator}}"
3939
concatItems:
40-
description: transform a list of items
40+
description: transform a list of items
4141
dependencies: [generateItems]
42-
foreach: '{{.step.generateItems.children | toJson}}'
42+
foreach: "{{.step.generateItems.children | toJson}}"
4343
conditions:
4444
- type: check
4545
if:
46-
- value: '{{ index .step "this" "output" "concat"}}'
47-
operator: EQ
48-
expected: foo-c-bar-c
46+
- value: '{{ index .step "this" "output" "concat"}}'
47+
operator: EQ
48+
expected: foo-c-bar-c
4949
then:
50-
this: PRUNE
50+
this: PRUNE
5151
action:
5252
type: echo
5353
configuration:
54-
output: {concat: '{{.iterator.output.foo}}-{{.iterator.output.bar}}'}
54+
output:
55+
{
56+
concat: "{{.iterator.output.foo}}-{{.iterator.output.bar}}",
57+
}
5558
bStep:
5659
description: impacted by concatItems b step
5760
dependencies: [generateItems]
5861
custom_states: [B]
5962
action:
6063
type: echo
6164
configuration:
62-
output: {foo: 42}
65+
output: { foo: 42 }
6366
finalStep:
6467
description: pruned by concatItems("c")
6568
dependencies: [concatItems]
6669
action:
6770
type: echo
6871
configuration:
69-
output: {foo: bar}
70-
72+
output: { foo: bar }

0 commit comments

Comments
 (0)