Skip to content

Commit dd6039e

Browse files
authored
feat(plugin): add new task-batching plugin (#495)
1 parent 4ad19c6 commit dd6039e

File tree

13 files changed

+984
-38
lines changed

13 files changed

+984
-38
lines changed

api/handler/batch.go

+15-37
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,12 @@ package handler
22

33
import (
44
"github.com/gin-gonic/gin"
5-
"github.com/juju/errors"
65
"github.com/loopfz/gadgeto/zesty"
76

87
"github.com/ovh/utask"
98
"github.com/ovh/utask/models/task"
10-
"github.com/ovh/utask/models/tasktemplate"
9+
"github.com/ovh/utask/pkg/batch"
1110
"github.com/ovh/utask/pkg/metadata"
12-
"github.com/ovh/utask/pkg/taskutils"
1311
"github.com/ovh/utask/pkg/utils"
1412
)
1513

@@ -34,11 +32,6 @@ func CreateBatch(c *gin.Context, in *createBatchIn) (*task.Batch, error) {
3432

3533
metadata.AddActionMetadata(c, metadata.TemplateName, in.TemplateName)
3634

37-
tt, err := tasktemplate.LoadFromName(dbp, in.TemplateName)
38-
if err != nil {
39-
return nil, err
40-
}
41-
4235
if err := utils.ValidateTags(in.Tags); err != nil {
4336
return nil, err
4437
}
@@ -49,45 +42,30 @@ func CreateBatch(c *gin.Context, in *createBatchIn) (*task.Batch, error) {
4942

5043
b, err := task.CreateBatch(dbp)
5144
if err != nil {
52-
dbp.Rollback()
45+
_ = dbp.Rollback()
5346
return nil, err
5447
}
5548

5649
metadata.AddActionMetadata(c, metadata.BatchID, b.PublicID)
5750

58-
for _, inp := range in.Inputs {
59-
input, err := conjMap(in.CommonInput, inp)
60-
if err != nil {
61-
dbp.Rollback()
62-
return nil, err
63-
}
64-
65-
_, err = taskutils.CreateTask(c, dbp, tt, in.WatcherUsernames, in.WatcherGroups, []string{}, []string{}, input, b, in.Comment, nil, in.Tags)
66-
if err != nil {
67-
dbp.Rollback()
68-
return nil, err
69-
}
51+
_, err = batch.Populate(c, b, dbp, batch.TaskArgs{
52+
TemplateName: in.TemplateName,
53+
Inputs: in.Inputs,
54+
CommonInput: in.CommonInput,
55+
Comment: in.Comment,
56+
WatcherUsernames: in.WatcherUsernames,
57+
WatcherGroups: in.WatcherGroups,
58+
Tags: in.Tags,
59+
})
60+
if err != nil {
61+
_ = dbp.Rollback()
62+
return nil, err
7063
}
7164

7265
if err := dbp.Commit(); err != nil {
73-
dbp.Rollback()
66+
_ = dbp.Rollback()
7467
return nil, err
7568
}
7669

7770
return b, nil
7871
}
79-
80-
func conjMap(common, particular map[string]interface{}) (map[string]interface{}, error) {
81-
conj := make(map[string]interface{})
82-
for key, value := range particular {
83-
conj[key] = value
84-
}
85-
86-
for key, value := range common {
87-
if _, ok := conj[key]; ok {
88-
return nil, errors.NewBadRequest(nil, "Conflicting keys in input maps")
89-
}
90-
conj[key] = value
91-
}
92-
return conj, nil
93-
}

engine/engine.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"github.com/ovh/utask/pkg/jsonschema"
3030
"github.com/ovh/utask/pkg/metadata"
3131
"github.com/ovh/utask/pkg/now"
32+
pluginbatch "github.com/ovh/utask/pkg/plugins/builtin/batch"
3233
"github.com/ovh/utask/pkg/taskutils"
3334
"github.com/ovh/utask/pkg/utils"
3435
)
@@ -524,7 +525,9 @@ forLoop:
524525
if mapStatus[status] {
525526
if status == resolution.StateWaiting && recheckWaiting {
526527
for name, s := range res.Steps {
527-
if s.State == step.StateWaiting {
528+
// Steps using the batch plugin shouldn't be run again when WAITING. Running them second time
529+
// may lead to a race condition when the last task of a sub-batch tries to resume its parent
530+
if s.State == step.StateWaiting && s.Action.Type != pluginbatch.Plugin.PluginName() {
528531
delete(executedSteps, name)
529532
}
530533
}

engine/engine_test.go

+123
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,12 @@ import (
77
"fmt"
88
"os"
99
"path/filepath"
10+
"strings"
1011
"sync"
1112
"testing"
1213
"time"
1314

15+
"github.com/Masterminds/squirrel"
1416
"github.com/juju/errors"
1517
"github.com/loopfz/gadgeto/zesty"
1618
"github.com/maxatome/go-testdeep/td"
@@ -23,6 +25,7 @@ import (
2325
"github.com/ovh/utask/api"
2426
"github.com/ovh/utask/db"
2527
"github.com/ovh/utask/db/pgjuju"
28+
"github.com/ovh/utask/db/sqlgenerator"
2629
"github.com/ovh/utask/engine"
2730
"github.com/ovh/utask/engine/functions"
2831
functionrunner "github.com/ovh/utask/engine/functions/runner"
@@ -36,6 +39,7 @@ import (
3639
compress "github.com/ovh/utask/pkg/compress/init"
3740
"github.com/ovh/utask/pkg/now"
3841
"github.com/ovh/utask/pkg/plugins"
42+
pluginbatch "github.com/ovh/utask/pkg/plugins/builtin/batch"
3943
plugincallback "github.com/ovh/utask/pkg/plugins/builtin/callback"
4044
"github.com/ovh/utask/pkg/plugins/builtin/echo"
4145
"github.com/ovh/utask/pkg/plugins/builtin/script"
@@ -91,6 +95,7 @@ func TestMain(m *testing.M) {
9195
step.RegisterRunner(echo.Plugin.PluginName(), echo.Plugin)
9296
step.RegisterRunner(script.Plugin.PluginName(), script.Plugin)
9397
step.RegisterRunner(pluginsubtask.Plugin.PluginName(), pluginsubtask.Plugin)
98+
step.RegisterRunner(pluginbatch.Plugin.PluginName(), pluginbatch.Plugin)
9499
step.RegisterRunner(plugincallback.Plugin.PluginName(), plugincallback.Plugin)
95100

96101
os.Exit(m.Run())
@@ -194,6 +199,21 @@ func templateFromYAML(dbp zesty.DBProvider, filename string) (*tasktemplate.Task
194199
return tasktemplate.LoadFromName(dbp, tmpl.Name)
195200
}
196201

202+
func listBatchTasks(dbp zesty.DBProvider, batchID int64) ([]string, error) {
203+
query, params, err := sqlgenerator.PGsql.
204+
Select("public_id").
205+
From("task").
206+
Where(squirrel.Eq{"id_batch": batchID}).
207+
ToSql()
208+
if err != nil {
209+
return nil, err
210+
}
211+
212+
var taskIDs []string
213+
_, err = dbp.DB().Select(&taskIDs, query, params...)
214+
return taskIDs, err
215+
}
216+
197217
func TestSimpleTemplate(t *testing.T) {
198218
input := map[string]interface{}{
199219
"foo": "bar",
@@ -1370,3 +1390,106 @@ func TestB64RawEncodeDecode(t *testing.T) {
13701390
assert.Equal(t, "cmF3IG1lc3NhZ2U", output["a"])
13711391
assert.Equal(t, "raw message", output["b"])
13721392
}
1393+
1394+
func TestBatch(t *testing.T) {
1395+
dbp, err := zesty.NewDBProvider(utask.DBName)
1396+
require.Nil(t, err)
1397+
1398+
_, err = templateFromYAML(dbp, "batchedTask.yaml")
1399+
require.Nil(t, err)
1400+
1401+
_, err = templateFromYAML(dbp, "batch.yaml")
1402+
require.Nil(t, err)
1403+
1404+
res, err := createResolution("batch.yaml", map[string]interface{}{}, nil)
1405+
require.Nil(t, err, "failed to create resolution: %s", err)
1406+
1407+
res, err = runResolution(res)
1408+
require.Nil(t, err)
1409+
require.NotNil(t, res)
1410+
assert.Equal(t, resolution.StateWaiting, res.State)
1411+
1412+
for _, batchStepName := range []string{"batchJsonInputs", "batchYamlInputs"} {
1413+
batchStepMetadataRaw, ok := res.Steps[batchStepName].Metadata.(string)
1414+
assert.True(t, ok, "wrong type of metadata for step '%s'", batchStepName)
1415+
1416+
assert.Nil(t, res.Steps[batchStepName].Output, "output nil for step '%s'", batchStepName)
1417+
1418+
// The plugin formats Metadata in a special way that we need to revert before unmarshalling them
1419+
batchStepMetadataRaw = strings.ReplaceAll(batchStepMetadataRaw, `\"`, `"`)
1420+
var batchStepMetadata map[string]any
1421+
err := json.Unmarshal([]byte(batchStepMetadataRaw), &batchStepMetadata)
1422+
require.Nil(t, err, "metadata unmarshalling of step '%s'", batchStepName)
1423+
1424+
batchPublicID := batchStepMetadata["batch_id"].(string)
1425+
assert.NotEqual(t, "", batchPublicID, "wrong batch ID '%s'", batchPublicID)
1426+
1427+
b, err := task.LoadBatchFromPublicID(dbp, batchPublicID)
1428+
require.Nil(t, err)
1429+
1430+
taskIDs, err := listBatchTasks(dbp, b.ID)
1431+
require.Nil(t, err)
1432+
assert.Len(t, taskIDs, 2)
1433+
1434+
for i, publicID := range taskIDs {
1435+
child, err := task.LoadFromPublicID(dbp, publicID)
1436+
require.Nil(t, err)
1437+
assert.Equal(t, task.StateTODO, child.State)
1438+
1439+
childResolution, err := resolution.Create(dbp, child, nil, "", false, nil)
1440+
require.Nil(t, err)
1441+
1442+
childResolution, err = runResolution(childResolution)
1443+
require.Nil(t, err)
1444+
assert.Equal(t, resolution.StateDone, childResolution.State)
1445+
1446+
for k, v := range childResolution.Steps {
1447+
assert.Equal(t, step.StateDone, v.State, "not valid state for step %s", k)
1448+
}
1449+
1450+
child, err = task.LoadFromPublicID(dbp, child.PublicID)
1451+
require.Nil(t, err)
1452+
assert.Equal(t, task.StateDone, child.State)
1453+
1454+
parentTaskToResume, err := taskutils.ShouldResumeParentTask(dbp, child)
1455+
require.Nil(t, err)
1456+
if i == len(taskIDs)-1 {
1457+
// Only the last child task should resume the parent
1458+
require.NotNil(t, parentTaskToResume)
1459+
assert.Equal(t, res.TaskID, parentTaskToResume.ID)
1460+
} else {
1461+
require.Nil(t, parentTaskToResume)
1462+
}
1463+
}
1464+
}
1465+
1466+
// checking if the parent task is picked up after that the subtask is resolved.
1467+
// need to sleep a bit because the parent task is resumed asynchronously
1468+
ti := time.Second
1469+
i := time.Duration(0)
1470+
for i < ti {
1471+
res, err = resolution.LoadFromPublicID(dbp, res.PublicID)
1472+
require.Nil(t, err)
1473+
if res.State != resolution.StateWaiting {
1474+
break
1475+
}
1476+
1477+
time.Sleep(time.Millisecond * 10)
1478+
i += time.Millisecond * 10
1479+
}
1480+
1481+
ti = time.Second
1482+
i = time.Duration(0)
1483+
for i < ti {
1484+
res, err = resolution.LoadFromPublicID(dbp, res.PublicID)
1485+
require.Nil(t, err)
1486+
if res.State != resolution.StateRunning {
1487+
break
1488+
}
1489+
1490+
time.Sleep(time.Millisecond * 10)
1491+
i += time.Millisecond * 10
1492+
1493+
}
1494+
assert.Equal(t, resolution.StateDone, res.State)
1495+
}

engine/templates_tests/batch.yaml

+26
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
name: batchTemplate
2+
description: Template to test the batch plugin
3+
title_format: "[test] batch template test"
4+
5+
steps:
6+
batchJsonInputs:
7+
description: Batching tasks JSON
8+
action:
9+
type: batch
10+
configuration:
11+
template_name: batchedtasktemplate
12+
json_inputs: '[{"specific_string": "specific-1"}, {"specific_string": "specific-2"}]'
13+
common_json_inputs: '{"common_string": "common"}'
14+
sub_batch_size: 2
15+
batchYamlInputs:
16+
description: Batching tasks YAML
17+
action:
18+
type: batch
19+
configuration:
20+
template_name: batchedtasktemplate
21+
inputs:
22+
- specific_string: specific-1
23+
- specific_string: specific-2
24+
common_inputs:
25+
common_string: common
26+
sub_batch_size: 2
+23
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
name: batchedTaskTemplate
2+
description: Template made to be spawned by the testing batch plugin
3+
title_format: "[test] batched task template"
4+
5+
inputs:
6+
- name: specific_string
7+
description: A string specific to this task
8+
type: string
9+
- name: common_string
10+
description: A string common to all tasks in the same batch
11+
type: string
12+
13+
steps:
14+
simpleStep:
15+
description: Simple step
16+
action:
17+
type: echo
18+
configuration:
19+
output: >-
20+
{
21+
"specific": "{{.input.specific_string}}",
22+
"common": "{{.input.common_string}}"
23+
}

0 commit comments

Comments
 (0)