Skip to content

Commit 03aed02

Browse files
authored
fix(plugin): Fixed batch plugin stuck when underlying batch is collected (#543)
1 parent 68525b4 commit 03aed02

File tree

3 files changed

+47
-14
lines changed

3 files changed

+47
-14
lines changed

engine/templates_tests/batch.yaml

+2-2
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ steps:
1111
template_name: batchedtasktemplate
1212
json_inputs: '[{"specific_string": "specific-1"}, {"specific_string": "specific-2"}]'
1313
common_json_inputs: '{"common_string": "common"}'
14-
sub_batch_size: 2
14+
sub_batch_size: "2"
1515
batchYamlInputs:
1616
description: Batching tasks YAML
1717
action:
@@ -23,4 +23,4 @@ steps:
2323
- specific_string: specific-2
2424
common_inputs:
2525
common_string: common
26-
sub_batch_size: 2
26+
sub_batch_size: "2"

pkg/plugins/builtin/batch/README.md

+6-5
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
# `batch` Plugin
22

3-
This plugin creates a batch of tasks based on the same template and waits for it to complete. It acts like the `subtask` combined with a `foreach`, but doesn't modify the resolution by adding new steps dynamically. As it makes less calls to the underlying database, this plugin is suited for large batches of tasks, where the `subtask` / `foreach` combination would usually struggle, escpecially by bloating the database.
4-
Tasks belonging to the same batch share a common `BatchID` as well as tag holding their parent's ID.
3+
This plugin creates a batch of tasks based on the same template and waits for it to complete. It acts like the `subtask` combined with a `foreach`, but doesn't modify the resolution by adding new steps dynamically. As it makes less calls to the underlying database, this plugin is suited for large batches of tasks, where the `subtask` / `foreach` combination would usually struggle, especially by bloating the database.
4+
Tasks belonging to the same batch share a common `BatchID` as well as a tag holding their parent's ID.
55

66
##### Remarks:
7+
Like the subtask plugin, it's unadvised to have a step based on the batch plugin running alongside other steps in a template. If these other steps take time to return a result, the batch plugin may miss the wake up call from its children tasks.
78
The output of child tasks is not made available in this plugin's output. This feature will come later.
89

910
## Configuration
@@ -16,7 +17,7 @@ The output of child tasks is not made available in this plugin's output. This fe
1617
| `common_inputs` | a map of named values, as accepted on µTask's API, given to all task in the batch by combining it with each input |
1718
| `common_json_inputs` | same as `common_inputs` but as a JSON string. If specified, it overrides `common_inputs` |
1819
| `tags` | a map of named strings added as tags when creating child tasks |
19-
| `sub_batch_size` | the number tasks to create and run at once. `0` for infinity (i.e.: all tasks are created at once and waited for) (default). Higher values reduce the amount of calls made to the database, but increase sensitivity to database unavailability (if a task creation fails, the whole sub batch must be created again) |
20+
| `sub_batch_size` | the number tasks to create and run at once, as a string. `0` for infinity (i.e.: all tasks are created at once and waited for) (default). Higher values reduce the amount of calls made to the database, but increase sensitivity to database unavailability (if a task creation fails, the whole sub batch must be created again) |
2021
| `comment` | a string set as `comment` when creating child tasks |
2122
| `resolver_usernames` | a string containing a JSON array of additional resolver users for child tasks |
2223
| `resolver_groups` | a string containing a JSON array of additional resolver groups for child tasks |
@@ -33,7 +34,7 @@ action:
3334
configuration:
3435
# [Required]
3536
# A template that must already be registered on this instance of µTask
36-
template: some-task-template
37+
template_name: some-task-template
3738
# Valid inputs, as defined by the referred template, here requiring 3 inputs: foo, otherFoo and fooCommon
3839
inputs:
3940
- foo: bar-1
@@ -50,7 +51,7 @@ action:
5051
fooTag: value-of-foo-tag
5152
barTag: value-of-bar-tag
5253
# The amount of tasks to run at once
53-
sub_batch_size: 2
54+
sub_batch_size: "2"
5455
# A list of users which are authorized to resolve this specific task
5556
resolver_usernames: '["authorizedUser"]'
5657
resolver_groups: '["authorizedGroup"]'

pkg/plugins/builtin/batch/batch.go

+39-7
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7+
"strconv"
78
"strings"
89

910
jujuErrors "github.com/juju/errors"
1011
"github.com/loopfz/gadgeto/zesty"
1112
"github.com/sirupsen/logrus"
1213

1314
"github.com/ovh/utask"
15+
"github.com/ovh/utask/db/pgjuju"
1416
"github.com/ovh/utask/models/resolution"
1517
"github.com/ovh/utask/models/task"
1618
"github.com/ovh/utask/models/tasktemplate"
@@ -46,8 +48,9 @@ type BatchConfig struct {
4648
Tags map[string]string `json:"tags"`
4749
ResolverUsernames string `json:"resolver_usernames"`
4850
ResolverGroups string `json:"resolver_groups"`
49-
// How many tasks will run concurrently. 0 for infinity (default)
50-
SubBatchSize int `json:"sub_batch_size"`
51+
// How many tasks will run concurrently. 0 for infinity (default). It's supplied as a string to support templating
52+
SubBatchSizeStr string `json:"sub_batch_size"`
53+
SubBatchSize int64 `json:"-"`
5154
}
5255

5356
// quotedString is a string with doubly escaped quotes, so the string stays simply escaped after being processed
@@ -132,6 +135,11 @@ func exec(stepName string, config any, ictx any) (any, any, error) {
132135
return nil, batchCtx.RawMetadata.Format(), err
133136
}
134137

138+
if len(conf.Inputs) == 0 {
139+
// Empty input, there's nothing to do
140+
return nil, BatchMetadata{}, nil
141+
}
142+
135143
if conf.Tags == nil {
136144
conf.Tags = make(map[string]string)
137145
}
@@ -235,7 +243,7 @@ func populateBatch(
235243

236244
// Computing how many tasks to start
237245
remaining := int64(len(conf.Inputs)) - tasksStarted
238-
toStart := int64(conf.SubBatchSize) - running // How many tasks can be started
246+
toStart := conf.SubBatchSize - running // How many tasks can be started
239247
if remaining < toStart || conf.SubBatchSize == 0 {
240248
// There's less tasks remaining to start than the amount of available running slots or slots are unlimited
241249
toStart = remaining
@@ -270,12 +278,25 @@ func runBatch(
270278

271279
b, err := task.LoadBatchFromPublicID(dbp, metadata.BatchID)
272280
if err != nil {
273-
if jujuErrors.IsNotFound(err) {
274-
// The batch has been collected (deleted in DB) because no remaining task referenced it. There's
275-
// nothing more to do.
281+
if !jujuErrors.Is(err, jujuErrors.NotFound) {
282+
return metadata, err
283+
}
284+
// else, the batch has been collected (deleted in DB) because no task referenced it anymore.
285+
286+
if metadata.TasksStarted == int64(len(conf.Inputs)) {
287+
// There is no more tasks to create, the work is done
288+
metadata.RemainingTasks = 0
276289
return metadata, nil
277290
}
278-
return metadata, err
291+
// else, the batch was collected but we still have tasks to create. We need to recreate the batch with
292+
// the same public ID and populate it.
293+
// It can happen when the garbage collector runs after a sub-batch is done, but before the batch plugin
294+
// could populate the batch with more tasks.
295+
296+
b = &task.Batch{BatchDBModel: task.BatchDBModel{PublicID: metadata.BatchID}}
297+
if err := dbp.DB().Insert(&b.BatchDBModel); err != nil {
298+
return metadata, pgjuju.Interpret(err)
299+
}
279300
}
280301

281302
if metadata.TasksStarted < int64(len(conf.Inputs)) {
@@ -349,6 +370,17 @@ func parseInputs(conf *BatchConfig, batchCtx *BatchContext) error {
349370
return jujuErrors.NewBadRequest(err, "JSON inputs unmarshalling failure")
350371
}
351372
}
373+
374+
if conf.SubBatchSizeStr == "" {
375+
conf.SubBatchSize = 0
376+
} else {
377+
subBatchSize, err := strconv.ParseInt(conf.SubBatchSizeStr, 10, 64)
378+
if err != nil {
379+
return jujuErrors.NewBadRequest(err, "parsing failure of field 'SubBatchSize'")
380+
}
381+
conf.SubBatchSize = subBatchSize
382+
}
383+
352384
return nil
353385
}
354386

0 commit comments

Comments
 (0)