Commit 31215cd

resource limiting: complete rework to handle context cancelation and native rate-limiting on a template (#196)
This introduces a complete rework of resource usage by implementing contexts, so that a lock acquisition can be cancelled if the lock takes too long to obtain or if the instance is shutting down. It also introduces a native resource on every resolution, based on the template name: this makes it possible to reduce the concurrent execution of a given template, or to completely stop the execution of a template if it behaves incorrectly.

Signed-off-by: Romain Beuque <[email protected]>
1 parent 6c47e0d commit 31215cd

6 files changed

+230
-53
lines changed

README.md

+42-6
@@ -356,6 +356,7 @@ Note that the operators `IN` and `NOTIN` expect a list of acceptable values in t
 - `dependencies`: a list of step names on which this step waits before running
 - `custom_states`: a list of personalised allowed states for this step (can be assigned to the state's step using `conditions`)
 - `retry_pattern`: (`seconds`, `minutes`, `hours`) define on what temporal order of magnitude the re-runs of this step should be spread (default = `seconds`)
+- `resources`: a list of resources that will be used during the step execution, to control and limit the concurrent execution of the step (more information in [the resources section](#resources)).
 
 <p align="center">
     <img src="./assets/img/utask_backoff.png" width="70%">
@@ -480,14 +481,16 @@ The `pre_hook` field of a step can be set to define an action that is executed b
 doSomeAuthPost:
   pre_hook:
     type: http
-    method: "GET"
-    url: "https://myAwesomeApi/otp"
+    configuration:
+      method: "GET"
+      url: "https://example.org/otp"
   action:
     type: http
-    method: "POST"
-    url: "https://myAwesomeApi/doSomePost"
-    headers:
-      X-Otp: "{{ .pre_hook.output }}"
+    configuration:
+      method: "POST"
+      url: "https://example.org/doSomePost"
+      headers:
+        X-Otp: "{{ .pre_hook.output }}"
 ```
 
 #### Functions <a name="functions"></a>
@@ -587,6 +590,39 @@ This output can be then passed to another step in json format:
 foreach: '{{.step.prefixStrings.children | toJson}}'
 ```
 
+#### Resources <a name="resources"></a>
+
+Resources are a way to restrict the concurrency of certain operations, to control throughput and avoid dangerous behavior, e.g. flooding the targets.
+
+High level view:
+
+- For each action to execute, a list of target `resources` is determined (see later).
+- In the µTask configuration, numerical limits can be set for each _resource_ label. This acts as a semaphore, allowing a certain number of concurrent slots for the given _resource_ label. If no limit is set for a resource label, the previously mentioned target resources have no effect. Limits are declared in the `resource_limits` property.
+
+The target _resources_ for a step can be defined in its YAML definition, using the `resources` property.
+
+```yaml
+steps:
+  foobar:
+    description: A dummy step, that should not execute in parallel
+    resources: ["myLimitedResource"]
+    action:
+      type: echo
+      configuration:
+        output:
+          foobar: fuzz
+```
+
+Alternatively, some target resources are determined automatically by µTask Engine:
+
+- When a task is run, the resource `template:my-template-name` is used automatically.
+- When a step is run, the plugin in charge of the execution automatically generates a list of resources. This includes generic resources such as `socket`, `url:www.example.org`, `fork`...
+  allowing the µTask administrator to set up generic limits such as `"socket": 48` or `"url:www.example.org": 1`.
+
+Each builtin plugin declares resources which can be discovered using the _README_ of the plugin (example for [_http_ plugin](./pkg/plugins/builtin/script/README.md#Resources)).
+
+Declared `resource_limits` must be non-negative integers. When a step is executed and the maximum number of concurrent executions has been reached, the µTask Engine waits for a slot to be released. If a resource is limited to `0`, the step is not executed and is set to the `TO_RETRY` state; it will run once the instance allows the execution of its resources. By default, the µTask Engine waits `1 minute` for a resource to become available; this can be configured using the `resource_acquire_timeout` property.
+
 ### Task templates validation
 
 A JSON-schema file to validate the syntax of task templates is available in `hack/template-schema.json`.

api/handler/resolution.go

+18-1
@@ -281,7 +281,24 @@ func RunResolution(c *gin.Context, in *runResolutionIn) error {
 
 	logrus.WithFields(logrus.Fields{"resolution_id": r.PublicID}).Debugf("Handler RunResolution: manual resolve %s", r.PublicID)
 
-	return engine.GetEngine().Resolve(in.PublicID, nil)
+	ch := make(chan struct{})
+	go func() {
+		err = engine.GetEngine().Resolve(in.PublicID, nil)
+		close(ch)
+	}()
+
+	timeout := time.NewTicker(5 * time.Second)
+	defer timeout.Stop()
+
+	// manual resolution can be blocked by a lock acquisition on the Execution pool
+	// waiting for 5 seconds to get a response, otherwise let's consider the task will
+	// start correctly when the Execution pool gets available, and prevent API thread to be blocked
+	select {
+	case <-ch:
+		return err
+	case <-timeout.C:
+		return nil
+	}
 }
 
 type extendResolutionIn struct {

api/server.go

+1-1
@@ -106,7 +106,7 @@ func (s *Server) ListenAndServe() error {
 		logrus.Info("Shutting down...")
 		cancel()
 
-		if err := srv.Shutdown(ctx); err != nil {
+		if err := srv.Shutdown(context.Background()); err != nil {
 			logrus.Fatal(err)
 		}
 	}()

engine/engine.go

+42-14
@@ -36,8 +36,8 @@ var (
 	eng Engine
 
 	// Used for stopping the current Engine
-	stopRunningSteps chan struct{}
-	gracePeriodEnd   chan struct{}
+	shutdownCtx    context.Context
+	gracePeriodEnd chan struct{}
 )
 
 // Engine is the heart of utask: it is the active process
@@ -97,13 +97,11 @@ func Init(ctx context.Context, wg *sync.WaitGroup, store *configstore.Store) err
 	}
 
 	// channels for handling graceful shutdown
-	stopRunningSteps = make(chan struct{})
+	shutdownCtx = ctx
 	gracePeriodEnd = make(chan struct{})
 	eng.wg = wg
 	go func() {
-		<-ctx.Done()
-		// Stop running new steps
-		close(stopRunningSteps)
+		<-shutdownCtx.Done()
 
 		// Wait for the grace period to end
 		time.Sleep(3 * time.Second)
@@ -215,14 +213,43 @@ func (e Engine) launchResolution(publicID string, async bool, sm *semaphore.Weig
 
 	res.Values.SetConfig(e.config)
 
-	// all ready, run remaining steps
-
-	utask.AcquireExecutionSlot()
-
+	// check if all resources are available before starting the resolution
+	// first, check if we have a custom semaphore, for example, a semaphore that limits the concurrent execution of tasks recovery from a crashed instance.
+	// This semaphore needs to go first, because it will always be smaller than the global execution pool.
 	if sm != nil {
-		sm.Acquire(context.Background(), 1)
+		if err := sm.Acquire(shutdownCtx, 1); err != nil {
+			debugLogger.Debugf("Engine: launchResolution() %s acquire resource: instance is shutting down", res.PublicID)
+			return nil, errors.New("instance is shutting down")
+		}
+	}
+	// second, check if we have a resource limit on the current template
+	// template could be completely deactivated as a "dead resource". If it's the case, we need to exit because
+	// the limit won't change until next instance's reboot.
+	if acquiredErr := utask.AcquireResource(shutdownCtx, "template:"+t.TemplateName); acquiredErr != nil {
+		if shutdownCtx.Err() != nil {
+			debugLogger.Debugf("Engine: launchResolution() %s acquire resource: instance is shutting down", res.PublicID)
+			return nil, errors.New("instance is shutting down")
+		}
+		debugLogger.Debugf("Engine: launchResolution() %s acquire resource %q: failed to acquire resource: %s", res.PublicID, "template:"+t.TemplateName, acquiredErr)
+		// otherwise, we either reached timeout on the lock for template, or the template is a "dead resource"
+		t.SetState(task.StateBlocked)
+		res.SetNextRetry(time.Now().Add(10 * time.Minute))
+		res.SetState(resolution.StateToAutorunDelayed)
+		if err := commit(dbp, res, t); err != nil {
+			debugLogger.Debugf("Engine: launchResolution() %s acquire resource, FAILED TO COMMIT RESOLUTION: %s", res.PublicID, err)
+		}
+		if sm != nil {
+			sm.Release(1)
+		}
+		return nil, fmt.Errorf("can't acquire lock for template %q: %s", t.TemplateName, acquiredErr)
+	}
+	// finally, acquire the execution slot
+	if acquiredErr := utask.AcquireExecutionSlot(shutdownCtx); acquiredErr != nil {
+		debugLogger.Debugf("Engine: launchResolution() %s acquire resource: instance is shutting down", res.PublicID)
+		return nil, errors.New("instance is shutting down")
 	}
 
+	// all ready, run remaining steps
 	recap := make([]string, 0)
 	for name, s := range res.Steps {
 		recap = append(recap, fmt.Sprintf("step %s = %s", name, s.State))
@@ -427,7 +454,7 @@ func resolve(dbp zesty.DBProvider, res *resolution.Resolution, t *task.Task, sm
 
 	inShutdown := false
 	select {
-	case <-stopRunningSteps:
+	case <-shutdownCtx.Done():
 		inShutdown = true
 	default:
 	}
@@ -537,6 +564,7 @@ func resolve(dbp zesty.DBProvider, res *resolution.Resolution, t *task.Task, sm
 		sm.Release(1)
 	}
 
+	utask.ReleaseResource("template:" + t.TemplateName)
 	utask.ReleaseExecutionSlot()
 	if err := resumeParentTask(dbp, t, sm, debugLogger); err != nil {
 		debugLogger.WithError(err).Debugf("Engine: resolver(): failed to resume parent task: %s", err)
@@ -602,7 +630,7 @@ func runAvailableSteps(dbp zesty.DBProvider, modifiedSteps map[string]bool, res
 	expanded := 0
 
 	select {
-	case <-stopRunningSteps:
+	case <-shutdownCtx.Done():
 		return 0
 	default:
 		for name, s := range av {
@@ -660,7 +688,7 @@ func runAvailableSteps(dbp zesty.DBProvider, modifiedSteps map[string]bool, res
 
 				// run
 				stepCopy := *s
-				step.Run(&stepCopy, res.BaseConfigurations, res.Values, stepChan, wg, stopRunningSteps)
+				step.Run(&stepCopy, res.BaseConfigurations, res.Values, stepChan, wg, shutdownCtx)
 			}
 		}
 	}

engine/step/step.go

+31-15
@@ -2,6 +2,7 @@ package step
 
 import (
 	"bytes"
+	"context"
 	"encoding/json"
 	"fmt"
 	"sort"
@@ -144,12 +145,12 @@ func uniqueSortedList(s []string) []string {
 }
 
 type execution struct {
-	baseCfgRaw       json.RawMessage
-	outputs          []*executor.Output
-	config           json.RawMessage
-	runner           Runner
-	ctx              interface{}
-	stopRunningSteps <-chan struct{}
+	baseCfgRaw  json.RawMessage
+	outputs     []*executor.Output
+	config      json.RawMessage
+	runner      Runner
+	ctx         interface{}
+	shutdownCtx context.Context
 }
 
 func (e *execution) generateOutput(st *Step, v *values.Values) error {
@@ -190,10 +191,10 @@ func (e *execution) generateOutput(st *Step, v *values.Values) error {
 	return nil
 }
 
-func (st *Step) generateExecution(action executor.Executor, baseConfig map[string]json.RawMessage, values *values.Values, stopRunningSteps <-chan struct{}) (*execution, error) {
+func (st *Step) generateExecution(action executor.Executor, baseConfig map[string]json.RawMessage, values *values.Values, shutdownCtx context.Context) (*execution, error) {
 	var ret = execution{
-		config:           action.Configuration,
-		stopRunningSteps: stopRunningSteps,
+		config:      action.Configuration,
+		shutdownCtx: shutdownCtx,
 	}
 	var err error
 
@@ -294,7 +295,7 @@ func (st *Step) generateExecution(action executor.Executor, baseConfig map[strin
 func (st *Step) execute(execution *execution, callback func(interface{}, interface{}, map[string]string, error)) {
 
 	select {
-	case <-execution.stopRunningSteps:
+	case <-execution.shutdownCtx.Done():
 		st.State = StateToRetry
 		return
 	default:
@@ -303,7 +304,12 @@ func (st *Step) execute(execution *execution, callback func(interface{}, interfa
 
 	resources := append(execution.runner.Resources(execution.baseCfgRaw, execution.config), st.Resources...)
 	limits := uniqueSortedList(resources)
-	utask.AcquireResources(limits)
+	if acquiredErr := utask.AcquireResources(execution.shutdownCtx, limits); acquiredErr != nil {
+		// if resource acquisition takes too long (timeout or shutdown), let's put the step in ToRetry state
+		// to release the Execution pool, or let the instance shut down correctly, as the step execution hasn't started yet
+		callback(`{}`, "", map[string]string{}, errors.NotProvisionedf("failed to acquire resources"))
+		return
+	}
 	defer utask.ReleaseResources(limits)
 
 	output, metadata, tags, err := execution.runner.Exec(st.Name, execution.baseCfgRaw, execution.config, execution.ctx)
@@ -312,9 +318,9 @@ func (st *Step) execute(execution *execution, callback func(interface{}, interfa
 
 // Run carries out the action defined by a Step, by providing values to its configuration
 // - a stepChan channel is provided for committing the result back
-// - a stopRunningSteps channel is provided to interrupt execution in flight
+// - a shutdownCtx context is provided to interrupt execution in flight
 // values IS NOT CONCURRENT SAFE, DO NOT SHARE WITH OTHER GOROUTINES
-func Run(st *Step, baseConfig map[string]json.RawMessage, stepValues *values.Values, stepChan chan<- *Step, wg *sync.WaitGroup, stopRunningSteps <-chan struct{}) {
+func Run(st *Step, baseConfig map[string]json.RawMessage, stepValues *values.Values, stepChan chan<- *Step, wg *sync.WaitGroup, shutdownCtx context.Context) {
 
 	// Step already ran, directly going to afterrun process
 	if st.State == StateAfterrunError {
@@ -351,7 +357,7 @@ func Run(st *Step, baseConfig map[string]json.RawMessage, stepValues *values.Val
 	preHookValues := stepValues.Clone()
 	var preHookWg sync.WaitGroup
 	if prehook != nil {
-		preHookExecution, err := st.generateExecution(*prehook, baseConfig, stepValues, stopRunningSteps)
+		preHookExecution, err := st.generateExecution(*prehook, baseConfig, stepValues, shutdownCtx)
 		if err != nil {
 			st.State = StateFatalError
 			st.Error = fmt.Sprintf("prehook: %s", err)
@@ -384,8 +390,18 @@ func Run(st *Step, baseConfig map[string]json.RawMessage, stepValues *values.Val
 	// Wait the prehook execution is done
 	preHookWg.Wait()
 
+	// after pre-hook execution, let's verify if we are not shutting down the instance
+	// in that case, we can just put the step in ToRetry instead of starting the main execution of the step
+	select {
+	case <-shutdownCtx.Done():
+		st.State = StateToRetry
+		go noopStep(st, stepChan)
+		return
+	default:
+	}
+
 	// Generate the execution
-	execution, err := st.generateExecution(st.Action, baseConfig, preHookValues, stopRunningSteps)
+	execution, err := st.generateExecution(st.Action, baseConfig, preHookValues, shutdownCtx)
 	if err != nil {
 		st.State = StateFatalError
 		st.Error = err.Error()
