Skip to content

Commit 5bbfe82

Browse files
authored
Fix ordering of GSI updates when switching from PAY_PER_REQUEST to PROVISIONED billing mode (#145)
Issue #, if available: [2610](aws-controllers-k8s/community#2610) Description of changes: - Separate functions for adding, updating, and deleting GSIs - Update customUpdate function for Table to modify GSIs based on delta of billing mode - add e2e tests to validate GSI updates when changing billing mode By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
1 parent cc6a5bb commit 5bbfe82

File tree

6 files changed

+713
-74
lines changed

6 files changed

+713
-74
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
*.swp
33
*~
44
.idea
5+
.vscode
56
/docs/site
67
bin
78
build

pkg/resource/table/hooks.go

Lines changed: 82 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -208,10 +208,26 @@ func (rm *resourceManager) customUpdateTable(
208208
}
209209
}
210210

211+
addedGSIs, updatedGSIs, removedGSIs := computeGlobalSecondaryIndexDelta(
212+
latest.ko.Spec.GlobalSecondaryIndexes,
213+
desired.ko.Spec.GlobalSecondaryIndexes,
214+
)
215+
216+
// Delete GSIs that have been removed first to avoid errors when updating table properties
217+
// where required values have not been set for removed GSIs.
218+
if delta.DifferentAt("Spec.GlobalSecondaryIndexes") && len(removedGSIs) > 0 {
219+
err = rm.deleteGSIs(ctx, desired, latest, removedGSIs)
220+
if err != nil {
221+
return nil, err
222+
}
223+
}
224+
225+
// If billing mode changing from PAY_PER_REQUEST to PROVISIONED, need to include all GSI updates. Otherwise,
226+
// need to perform GSI updates one at a time afterwards.
211227
if delta.DifferentAt("Spec.BillingMode") ||
212228
delta.DifferentAt("Spec.TableClass") || delta.DifferentAt("Spec.DeletionProtectionEnabled") {
213-
if err := rm.syncTable(ctx, desired, delta); err != nil {
214-
return nil, fmt.Errorf("cannot update table %v", err)
229+
if err := rm.syncTable(ctx, desired, latest, delta); err != nil {
230+
return nil, err
215231
}
216232
}
217233

@@ -229,25 +245,29 @@ func (rm *resourceManager) customUpdateTable(
229245
}
230246
}
231247

248+
// Update any GSIs that have been modified.
249+
if delta.DifferentAt("Spec.GlobalSecondaryIndexes") && len(updatedGSIs) > 0 {
250+
if err := rm.updateGSIs(ctx, desired, latest, updatedGSIs); err != nil {
251+
return nil, err
252+
}
253+
}
254+
232255
// We want to update fast fields first
233256
// Then attributes
234257
// then GSI
235258
if delta.DifferentExcept("Spec.Tags", "Spec.TimeToLive") {
236259
switch {
237260
case delta.DifferentAt("Spec.StreamSpecification"):
238-
if err := rm.syncTable(ctx, desired, delta); err != nil {
261+
if err := rm.syncTable(ctx, desired, latest, delta); err != nil {
239262
return nil, err
240263
}
241264
case delta.DifferentAt("Spec.ProvisionedThroughput"):
242265
if err := rm.syncTableProvisionedThroughput(ctx, desired); err != nil {
243266
return nil, err
244267
}
245-
case delta.DifferentAt("Spec.GlobalSecondaryIndexes"):
246-
if err := rm.syncTableGlobalSecondaryIndexes(ctx, latest, desired); err != nil {
247-
if awsErr, ok := ackerr.AWSError(err); ok &&
248-
awsErr.ErrorCode() == "LimitExceededException" {
249-
return nil, requeueWaitGSIReady
250-
}
268+
// Create any new GSIs once all existing GSI have been updated.
269+
case delta.DifferentAt("Spec.GlobalSecondaryIndexes") && len(addedGSIs) > 0:
270+
if err := rm.addGSIs(ctx, desired, latest, addedGSIs); err != nil {
251271
return nil, err
252272
}
253273
case delta.DifferentAt("Spec.TableReplicas"):
@@ -271,74 +291,105 @@ func (rm *resourceManager) customUpdateTable(
271291
// or SSE specification.
272292
func (rm *resourceManager) syncTable(
273293
ctx context.Context,
274-
r *resource,
294+
desired *resource,
295+
latest *resource,
275296
delta *ackcompare.Delta,
276297
) (err error) {
277298
rlog := ackrtlog.FromContext(ctx)
278299
exit := rlog.Trace("rm.syncTable")
279300
defer exit(err)
280301

281-
input, err := rm.newUpdateTablePayload(ctx, r, delta)
302+
input, err := rm.newUpdateTablePayload(ctx, desired, latest, delta)
282303
if err != nil {
283304
return err
284305
}
285306
_, err = rm.sdkapi.UpdateTable(ctx, input)
286307
rm.metrics.RecordAPICall("UPDATE", "UpdateTable", err)
287308
if err != nil {
288-
return err
309+
return fmt.Errorf("cannot update table %v", err)
289310
}
311+
// If GSI update were included in the table update we need to requeue.
312+
if len(input.GlobalSecondaryIndexUpdates) > 0 {
313+
return requeueWaitGSIReady
314+
}
315+
290316
return nil
291317
}
292318

293319
// newUpdateTablePayload constructs the updateTableInput object.
294320
func (rm *resourceManager) newUpdateTablePayload(
295321
ctx context.Context,
296-
r *resource,
322+
desired *resource,
323+
latest *resource,
297324
delta *ackcompare.Delta,
298325
) (*svcsdk.UpdateTableInput, error) {
299326
input := &svcsdk.UpdateTableInput{
300-
TableName: aws.String(*r.ko.Spec.TableName),
327+
TableName: aws.String(*desired.ko.Spec.TableName),
301328
}
302-
329+
latestBillingMode := svcsdktypes.BillingMode(*latest.ko.Spec.BillingMode)
303330
if delta.DifferentAt("Spec.BillingMode") {
304-
if r.ko.Spec.BillingMode != nil {
305-
input.BillingMode = svcsdktypes.BillingMode(*r.ko.Spec.BillingMode)
331+
if desired.ko.Spec.BillingMode != nil {
332+
input.BillingMode = svcsdktypes.BillingMode(*desired.ko.Spec.BillingMode)
306333
} else {
307334
// set biling mode to the default value `PROVISIONED`
308335
input.BillingMode = svcsdktypes.BillingModeProvisioned
309336
}
337+
310338
if input.BillingMode == svcsdktypes.BillingModeProvisioned {
311339
input.ProvisionedThroughput = &svcsdktypes.ProvisionedThroughput{}
312-
if r.ko.Spec.ProvisionedThroughput != nil {
313-
if r.ko.Spec.ProvisionedThroughput.ReadCapacityUnits != nil {
340+
if desired.ko.Spec.ProvisionedThroughput != nil {
341+
if desired.ko.Spec.ProvisionedThroughput.ReadCapacityUnits != nil {
314342
input.ProvisionedThroughput.ReadCapacityUnits = aws.Int64(
315-
*r.ko.Spec.ProvisionedThroughput.ReadCapacityUnits,
343+
*desired.ko.Spec.ProvisionedThroughput.ReadCapacityUnits,
316344
)
317345
} else {
318346
input.ProvisionedThroughput.ReadCapacityUnits = aws.Int64(0)
319347
}
320348

321-
if r.ko.Spec.ProvisionedThroughput.WriteCapacityUnits != nil {
349+
if desired.ko.Spec.ProvisionedThroughput.WriteCapacityUnits != nil {
322350
input.ProvisionedThroughput.WriteCapacityUnits = aws.Int64(
323-
*r.ko.Spec.ProvisionedThroughput.WriteCapacityUnits,
351+
*desired.ko.Spec.ProvisionedThroughput.WriteCapacityUnits,
324352
)
325353
} else {
326354
input.ProvisionedThroughput.WriteCapacityUnits = aws.Int64(0)
327355
}
328356
}
329357
}
358+
359+
// If billing mode is changing from PAY_PER_REQUEST to PROVISIONED we need to include all GSI updates
360+
if latestBillingMode == svcsdktypes.BillingModePayPerRequest && input.BillingMode == svcsdktypes.BillingModeProvisioned {
361+
_, updatedGSIs, _ := computeGlobalSecondaryIndexDelta(
362+
latest.ko.Spec.GlobalSecondaryIndexes,
363+
desired.ko.Spec.GlobalSecondaryIndexes,
364+
)
365+
366+
// DynamoDB API fails if GSI updates are empty. Only set GlobalSecondaryIndexUpdates
367+
// if there are GSIs to update.
368+
if len(updatedGSIs) > 0 {
369+
input.GlobalSecondaryIndexUpdates = []svcsdktypes.GlobalSecondaryIndexUpdate{}
370+
for _, updatedGSI := range updatedGSIs {
371+
update := svcsdktypes.GlobalSecondaryIndexUpdate{
372+
Update: &svcsdktypes.UpdateGlobalSecondaryIndexAction{
373+
IndexName: aws.String(*updatedGSI.IndexName),
374+
ProvisionedThroughput: newSDKProvisionedThroughput(updatedGSI.ProvisionedThroughput),
375+
},
376+
}
377+
input.GlobalSecondaryIndexUpdates = append(input.GlobalSecondaryIndexUpdates, update)
378+
}
379+
}
380+
}
330381
}
331382
if delta.DifferentAt("Spec.StreamSpecification") {
332-
if r.ko.Spec.StreamSpecification != nil {
333-
if r.ko.Spec.StreamSpecification.StreamEnabled != nil {
383+
if desired.ko.Spec.StreamSpecification != nil {
384+
if desired.ko.Spec.StreamSpecification.StreamEnabled != nil {
334385
input.StreamSpecification = &svcsdktypes.StreamSpecification{
335-
StreamEnabled: aws.Bool(*r.ko.Spec.StreamSpecification.StreamEnabled),
386+
StreamEnabled: aws.Bool(*desired.ko.Spec.StreamSpecification.StreamEnabled),
336387
}
337388
// Only set streamViewType when streamSpefication is enabled and streamViewType is non-nil.
338-
if *r.ko.Spec.StreamSpecification.StreamEnabled &&
339-
r.ko.Spec.StreamSpecification.StreamViewType != nil {
389+
if *desired.ko.Spec.StreamSpecification.StreamEnabled &&
390+
desired.ko.Spec.StreamSpecification.StreamViewType != nil {
340391
input.StreamSpecification.StreamViewType = svcsdktypes.StreamViewType(
341-
*r.ko.Spec.StreamSpecification.StreamViewType,
392+
*desired.ko.Spec.StreamSpecification.StreamViewType,
342393
)
343394
}
344395
} else {
@@ -349,13 +400,13 @@ func (rm *resourceManager) newUpdateTablePayload(
349400
}
350401
}
351402
if delta.DifferentAt("Spec.TableClass") {
352-
if r.ko.Spec.TableClass != nil {
353-
input.TableClass = svcsdktypes.TableClass(*r.ko.Spec.TableClass)
403+
if desired.ko.Spec.TableClass != nil {
404+
input.TableClass = svcsdktypes.TableClass(*desired.ko.Spec.TableClass)
354405
}
355406
}
356407

357408
if delta.DifferentAt("Spec.DeletionProtectionEnabled") {
358-
input.DeletionProtectionEnabled = aws.Bool(*r.ko.Spec.DeletionProtectionEnabled)
409+
input.DeletionProtectionEnabled = aws.Bool(*desired.ko.Spec.DeletionProtectionEnabled)
359410
}
360411

361412
return input, nil

0 commit comments

Comments
 (0)