Skip to content

Commit c800fc8

Browse files
committed
feat: add configurable stack concurrency limit
Add support for limiting concurrent stack reconciliations to prevent cluster overload. This feature uses the native controller-runtime MaxConcurrentReconciles mechanism. Key changes: - Add MaxConcurrentReconciles support in core reconciler - Create GetStackConcurrency() to read STACK_MAX_CONCURRENT env var - Configure Helm chart with stackMaxConcurrent parameter (default: 5) - Add comprehensive Helm documentation in STACK_CONCURRENCY.md - Minor formatting fixes in unrelated files The default limit of 5 concurrent reconciliations provides a good balance for most clusters. Can be configured via Helm values or set to 0 for unlimited concurrency.
1 parent e6d59a9 commit c800fc8

File tree

10 files changed

+295
-8
lines changed

10 files changed

+295
-8
lines changed

api/formance.com/v1beta1/benthos_types.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ type BenthosSpec struct {
3737
//+optional
3838
Batching *Batching `json:"batching,omitempty"`
3939
//+optional
40-
InitContainers []corev1.Container `json:"initContainers"`
40+
InitContainers []corev1.Container `json:"initContainers"`
4141
ImagePullSecrets []corev1.LocalObjectReference `json:"imagePullSecrets,omitempty"`
4242
}
4343

helm/operator/STACK_CONCURRENCY.md

Lines changed: 242 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
# Stack Concurrency Configuration
2+
3+
## Overview
4+
5+
Control the number of Stack reconciliations that run in parallel to prevent cluster overload.
6+
7+
## Configuration
8+
9+
### Via Helm Values
10+
11+
Edit your `values.yaml` or use `--set`:
12+
13+
```yaml
14+
operator:
15+
stackMaxConcurrent: 5 # Max 5 concurrent stack reconciliations
16+
```
17+
18+
Or with Helm command:
19+
20+
```bash
21+
helm install operator ./helm/operator \
22+
--set operator.stackMaxConcurrent=5
23+
```
24+
25+
### Default Behavior
26+
27+
- **Default value: `5`** (good balance for most clusters)
28+
- Set to `0` to disable the limit (unlimited concurrency)
29+
30+
## Recommended Values
31+
32+
| Cluster Size | Stacks | CPU | Recommended Value |
33+
|-------------|--------|-----|-------------------|
34+
| Small | 1-10 | 2-4 CPU | `2-3` |
35+
| Medium | 10-30 | 4-8 CPU | `5` |
36+
| Large | 30-100 | 8-16 CPU | `10` |
37+
| XL | 100+ | 16+ CPU | `20` |
38+
39+
## Examples
40+
41+
### Example 1: Small Cluster
42+
43+
```yaml
44+
# values.yaml
45+
operator:
46+
stackMaxConcurrent: 3
47+
```
48+
49+
```bash
50+
helm upgrade operator ./helm/operator -f values.yaml
51+
```
52+
53+
### Example 2: Production Cluster
54+
55+
```yaml
56+
# values-prod.yaml
57+
operator:
58+
stackMaxConcurrent: 10
59+
enableLeaderElection: true
60+
region: "eu-west-1"
61+
env: "production"
62+
```
63+
64+
```bash
65+
helm upgrade operator ./helm/operator -f values-prod.yaml
66+
```
67+
68+
### Example 3: Override with --set
69+
70+
```bash
71+
helm upgrade operator ./helm/operator \
72+
--set operator.stackMaxConcurrent=5 \
73+
--set operator.region=us-east-1
74+
```
75+
76+
## How It Works
77+
78+
1. The Helm chart sets the `STACK_MAX_CONCURRENT` environment variable
79+
2. The operator reads this value on startup
80+
3. Stack reconciliations are limited to N concurrent executions
81+
4. Additional stacks are queued automatically by Kubernetes
82+
83+
### Behavior
84+
85+
**Without limit (default):**
86+
```
87+
Stack A ──┐
88+
Stack B ──┤
89+
Stack C ──┼─> All processed in parallel
90+
Stack D ──┤
91+
Stack E ──┘
92+
```
93+
94+
**With limit of 5:**
95+
```
96+
Stack A ──┐
97+
Stack B ──┤
98+
Stack C ──┼─> Max 5 in parallel
99+
Stack D ──┤
100+
Stack E ──┘
101+
Stack F ──> Queued (waiting)
102+
Stack G ──> Queued (waiting)
103+
```
104+
105+
## Verification
106+
107+
Check if the environment variable is set:
108+
109+
```bash
110+
# Get the pod name
111+
POD=$(kubectl get pods -n formance-system -l control-plane=formance-controller-manager -o jsonpath='{.items[0].metadata.name}')
112+
113+
# Check environment variables
114+
kubectl exec -n formance-system $POD -- env | grep STACK_MAX_CONCURRENT
115+
```
116+
117+
Expected output:
118+
```
119+
STACK_MAX_CONCURRENT=5
120+
```
121+
122+
## Troubleshooting
123+
124+
### Value not applied
125+
126+
1. **Check Helm values:**
127+
```bash
128+
helm get values operator -n formance-system
129+
```
130+
131+
2. **Verify deployment:**
132+
```bash
133+
kubectl get deployment operator-manager -n formance-system -o yaml | grep -A 2 "STACK_MAX_CONCURRENT"
134+
```
135+
136+
3. **Restart pods to apply changes:**
137+
```bash
138+
kubectl rollout restart deployment operator-manager -n formance-system
139+
```
140+
141+
### Performance Issues
142+
143+
**Too many concurrent reconciliations:**
144+
- Symptoms: High CPU/memory, slow reconciliations
145+
- Solution: Lower the value (e.g., from 10 to 5)
146+
147+
**Too few concurrent reconciliations:**
148+
- Symptoms: Long queue times, slow stack deployments
149+
- Solution: Increase the value (e.g., from 5 to 10)
150+
151+
## Monitoring
152+
153+
### Check current stack status
154+
155+
```bash
156+
# Total stacks
157+
kubectl get stacks --all-namespaces --no-headers | wc -l
158+
159+
# Ready stacks
160+
kubectl get stacks --all-namespaces -o json | jq '[.items[] | select(.status.ready==true)] | length'
161+
162+
# Not ready stacks (in queue or processing)
163+
kubectl get stacks --all-namespaces -o json | jq '[.items[] | select(.status.ready==false)] | length'
164+
```
165+
166+
### Operator logs
167+
168+
```bash
169+
kubectl logs -n formance-system -l control-plane=formance-controller-manager -f
170+
```
171+
172+
## Advanced Usage
173+
174+
### Environment-specific values
175+
176+
```yaml
177+
# values-dev.yaml
178+
operator:
179+
stackMaxConcurrent: 2
180+
env: "dev"
181+
182+
# values-staging.yaml
183+
operator:
184+
stackMaxConcurrent: 5
185+
env: "staging"
186+
187+
# values-prod.yaml
188+
operator:
189+
stackMaxConcurrent: 10
190+
env: "production"
191+
```
192+
193+
Deploy:
194+
```bash
195+
helm upgrade operator ./helm/operator -f values-prod.yaml
196+
```
197+
198+
### ArgoCD / GitOps
199+
200+
```yaml
201+
# argocd-application.yaml
202+
apiVersion: argoproj.io/v1alpha1
203+
kind: Application
204+
metadata:
205+
name: formance-operator
206+
spec:
207+
source:
208+
helm:
209+
values: |
210+
operator:
211+
stackMaxConcurrent: 5
212+
region: "eu-west-1"
213+
env: "production"
214+
```
215+
216+
## Technical Details
217+
218+
### Implementation
219+
220+
- **Environment Variable:** `STACK_MAX_CONCURRENT`
221+
- **Read by:** `internal/resources/stacks/config.go::GetStackConcurrency()`
222+
- **Applied in:** `internal/resources/stacks/init.go`
223+
- **Uses:** Native controller-runtime `MaxConcurrentReconciles`
224+
225+
### Source Code
226+
227+
```go
228+
// internal/resources/stacks/config.go
229+
func GetStackConcurrency() int {
230+
if v := os.Getenv("STACK_MAX_CONCURRENT"); v != "" {
231+
if n, err := strconv.Atoi(v); err == nil && n > 0 {
232+
return n
233+
}
234+
}
235+
return 0 // Default: unlimited
236+
}
237+
```
238+
239+
## Related Documentation
240+
241+
- [How to Limit Concurrent Stacks](../../docs/HOW_TO_LIMIT_CONCURRENT_STACKS.md)
242+
- [Concurrent Limit Implementation](../../CONCURRENT_LIMIT_IMPLEMENTATION.md)

helm/operator/templates/deployment.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,11 @@ spec:
8181
port: {{ regexReplaceAll ":" .Values.operator.probeAddr "" | default "8081" }}
8282
initialDelaySeconds: 5
8383
periodSeconds: 10
84+
{{- if .Values.operator.stackMaxConcurrent }}
85+
env:
86+
- name: STACK_MAX_CONCURRENT
87+
value: {{ .Values.operator.stackMaxConcurrent | quote }}
88+
{{- end }}
8489
resources:
8590
{{- toYaml .Values.resources | nindent 12 }}
8691
{{- if .Values.webhooks.enabled }}

helm/operator/values.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,12 @@ operator:
4949
# Enable leader election for controller manager. Enabling this will ensure there is only one active controller manager.
5050
enableLeaderElection: true
5151

52+
# Maximum number of concurrent stack reconciliations
53+
# Set to 0 for unlimited
54+
# Recommended values: 5 for small/medium clusters, 10 for large, 20 for XL
55+
# @section -- Stack Concurrency
56+
stackMaxConcurrent: 5
57+
5258
utils:
5359
tag: ""
5460

internal/core/reconciler.go

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111

1212
"github.com/pkg/errors"
1313
"k8s.io/apimachinery/pkg/runtime"
14+
controllerconfig "sigs.k8s.io/controller-runtime/pkg/controller"
1415
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
1516

1617
"k8s.io/client-go/util/workqueue"
@@ -61,10 +62,11 @@ type finalizerConfig[T client.Object] struct {
6162
}
6263

6364
type ReconcilerOptions[T client.Object] struct {
64-
Owns map[client.Object][]builder.OwnsOption
65-
Watchers map[client.Object]ReconcilerOptionsWatch
66-
Finalizers []finalizerConfig[T]
67-
Raws []func(Context, *builder.Builder) error
65+
Owns map[client.Object][]builder.OwnsOption
66+
Watchers map[client.Object]ReconcilerOptionsWatch
67+
Finalizers []finalizerConfig[T]
68+
Raws []func(Context, *builder.Builder) error
69+
MaxConcurrentReconciles int
6870
}
6971

7072
type ReconcilerOption[T client.Object] func(*ReconcilerOptions[T])
@@ -81,6 +83,12 @@ func WithRaw[T client.Object](fn func(Context, *builder.Builder) error) Reconcil
8183
}
8284
}
8385

86+
func WithMaxConcurrentReconciles[T client.Object](max int) ReconcilerOption[T] {
87+
return func(options *ReconcilerOptions[T]) {
88+
options.MaxConcurrentReconciles = max
89+
}
90+
}
91+
8492
func BuildReconcileRequests(ctx context.Context, client client.Client, scheme *runtime.Scheme, target client.Object, opts ...client.ListOption) []reconcile.Request {
8593
kinds, _, err := scheme.ObjectKinds(target)
8694
if err != nil {
@@ -226,6 +234,13 @@ func withReconciler[T client.Object](controller ObjectController[T], opts ...Rec
226234
}
227235
}
228236

237+
// Appliquer MaxConcurrentReconciles si spécifié
238+
if options.MaxConcurrentReconciles > 0 {
239+
b = b.WithOptions(controllerconfig.Options{
240+
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
241+
})
242+
}
243+
229244
return b.Complete(reconcile.Func(reconcileObject(mgr, controller, options)))
230245
}
231246
}

internal/resources/orchestrations/deployments.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ func createDeployment(
184184
Spec: appsv1.DeploymentSpec{
185185
Template: corev1.PodTemplateSpec{
186186
Spec: corev1.PodSpec{
187-
ImagePullSecrets: imageConfiguration.PullSecrets,
187+
ImagePullSecrets: imageConfiguration.PullSecrets,
188188
ServiceAccountName: serviceAccountName,
189189
Containers: []corev1.Container{{
190190
Name: "api",

internal/resources/payments/deployments.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ func createFullDeployment(
290290
return nil
291291
}
292292

293-
func createWorkerDeployment(ctx core.Context, stack *v1beta1.Stack, payments *v1beta1.Payments, database *v1beta1.Database, imageConfiguration *registries.ImageConfiguration, env []v1.EnvVar, appOpts applications.ProbeOpts, ) error {
293+
func createWorkerDeployment(ctx core.Context, stack *v1beta1.Stack, payments *v1beta1.Payments, database *v1beta1.Database, imageConfiguration *registries.ImageConfiguration, env []v1.EnvVar, appOpts applications.ProbeOpts) error {
294294
serviceAccountName, err := settings.GetAWSServiceAccount(ctx, stack.Name)
295295
if err != nil {
296296
return err
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package stacks
2+
3+
import (
4+
"os"
5+
"strconv"
6+
)
7+
8+
// GetStackConcurrency returns the maximum number of concurrent stack reconciliations
9+
// from the STACK_MAX_CONCURRENT environment variable, or a default value of 5
10+
func GetStackConcurrency() int {
11+
if v := os.Getenv("STACK_MAX_CONCURRENT"); v != "" {
12+
if n, err := strconv.Atoi(v); err == nil && n >= 0 {
13+
return n
14+
}
15+
}
16+
// Default: 5 concurrent reconciliations (good balance for most clusters)
17+
return 5
18+
}

internal/resources/stacks/init.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -366,6 +366,7 @@ func init() {
366366
}),
367367
WithStdReconciler(Reconcile,
368368
WithOwn[*v1beta1.Stack](&corev1.Namespace{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})),
369+
WithMaxConcurrentReconciles[*v1beta1.Stack](GetStackConcurrency()),
369370
WithRaw[*v1beta1.Stack](func(ctx Context, b *builder.Builder) error {
370371
for _, rtype := range ctx.GetScheme().AllKnownTypes() {
371372
v := reflect.New(rtype).Interface()

pkg/client/formance.com/v1beta1/databases.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,4 +51,4 @@ func (c *databasesClient) Watch(ctx context.Context, opts metav1.ListOptions) (w
5151
Resource("Databases").
5252
VersionedParams(&opts, scheme.ParameterCodec).
5353
Watch(ctx)
54-
}
54+
}

0 commit comments

Comments
 (0)