From e2b433afda77836666662b4d90e093ac0a9297df Mon Sep 17 00:00:00 2001 From: Kun Zhao Date: Mon, 1 Jun 2026 20:22:41 -0700 Subject: [PATCH] feat: Dual-write Expected* update and delete to Flow Add Update*OnFlow and Delete*OnFlow activities for ExpectedRack, ExpectedMachine (single + batch), ExpectedPowerShelf, and ExpectedSwitch, wire them into the existing workflows after the OnSite call as best-effort follow-ups, and register them with the Temporal worker. Reuse Flow's existing PatchRack / DeleteRack / PatchComponent / DeleteComponent RPCs. Extend the activity and workflow tests to cover the new Flow paths. Signed-off-by: Kun Zhao --- .../managers/expectedmachine/subscriber.go | 12 ++ .../managers/expectedpowershelf/subscriber.go | 8 + .../managers/expectedrack/subscriber.go | 8 + .../managers/expectedswitch/subscriber.go | 8 + site-workflow/pkg/activity/expectedmachine.go | 153 ++++++++++++++++++ .../pkg/activity/expectedmachine_test.go | 80 +++++++++ .../pkg/activity/expectedpowershelf.go | 75 +++++++++ .../pkg/activity/expectedpowershelf_test.go | 62 +++++++ site-workflow/pkg/activity/expectedrack.go | 73 +++++++++ .../pkg/activity/expectedrack_test.go | 52 ++++++ site-workflow/pkg/activity/expectedswitch.go | 75 +++++++++ .../pkg/activity/expectedswitch_test.go | 62 +++++++ site-workflow/pkg/workflow/expectedmachine.go | 26 ++- .../pkg/workflow/expectedmachine_test.go | 104 ++++++++++-- .../pkg/workflow/expectedpowershelf.go | 18 ++- .../pkg/workflow/expectedpowershelf_test.go | 57 ++++++- site-workflow/pkg/workflow/expectedrack.go | 22 ++- .../pkg/workflow/expectedrack_test.go | 55 ++++++- site-workflow/pkg/workflow/expectedswitch.go | 18 ++- .../pkg/workflow/expectedswitch_test.go | 57 ++++++- 20 files changed, 968 insertions(+), 57 deletions(-) diff --git a/site-agent/pkg/components/managers/expectedmachine/subscriber.go b/site-agent/pkg/components/managers/expectedmachine/subscriber.go index 52d234c05..b930a8a2f 100644 --- a/site-agent/pkg/components/managers/expectedmachine/subscriber.go +++ b/site-agent/pkg/components/managers/expectedmachine/subscriber.go @@ -49,10 +49,18 @@ func (api *API) RegisterSubscriber() error { ManagerAccess.Data.EB.Managers.Workflow.Temporal.Worker.RegisterActivity(expectedMachineManager.UpdateExpectedMachineOnSite) ManagerAccess.Data.EB.Log.Info().Msg("ExpectedMachine: Successfully registered UpdateExpectedMachineOnSite activity") + // Register UpdateExpectedMachineOnFlow activity + ManagerAccess.Data.EB.Managers.Workflow.Temporal.Worker.RegisterActivity(expectedMachineManager.UpdateExpectedMachineOnFlow) + ManagerAccess.Data.EB.Log.Info().Msg("ExpectedMachine: Successfully registered UpdateExpectedMachineOnFlow activity") + // Register DeleteExpectedMachineOnSite activity ManagerAccess.Data.EB.Managers.Workflow.Temporal.Worker.RegisterActivity(expectedMachineManager.DeleteExpectedMachineOnSite) ManagerAccess.Data.EB.Log.Info().Msg("ExpectedMachine: Successfully registered DeleteExpectedMachineOnSite activity") + // Register DeleteExpectedMachineOnFlow activity + ManagerAccess.Data.EB.Managers.Workflow.Temporal.Worker.RegisterActivity(expectedMachineManager.DeleteExpectedMachineOnFlow) + ManagerAccess.Data.EB.Log.Info().Msg("ExpectedMachine: Successfully registered DeleteExpectedMachineOnFlow activity") + // Register CreateExpectedMachinesOnSite activity (Batch) ManagerAccess.Data.EB.Managers.Workflow.Temporal.Worker.RegisterActivity(expectedMachineManager.CreateExpectedMachinesOnSite) ManagerAccess.Data.EB.Log.Info().Msg("ExpectedMachine: Successfully registered CreateExpectedMachinesOnSite activity") @@ -65,5 +73,9 @@ func (api *API) RegisterSubscriber() error { ManagerAccess.Data.EB.Managers.Workflow.Temporal.Worker.RegisterActivity(expectedMachineManager.UpdateExpectedMachinesOnSite) ManagerAccess.Data.EB.Log.Info().Msg("ExpectedMachine: Successfully registered UpdateExpectedMachinesOnSite activity") + // Register UpdateExpectedMachinesOnFlow activity (Batch) + ManagerAccess.Data.EB.Managers.Workflow.Temporal.Worker.RegisterActivity(expectedMachineManager.UpdateExpectedMachinesOnFlow) + ManagerAccess.Data.EB.Log.Info().Msg("ExpectedMachine: Successfully registered UpdateExpectedMachinesOnFlow activity") + return nil } diff --git a/site-agent/pkg/components/managers/expectedpowershelf/subscriber.go b/site-agent/pkg/components/managers/expectedpowershelf/subscriber.go index d7cd611de..d3d4bdaf2 100644 --- a/site-agent/pkg/components/managers/expectedpowershelf/subscriber.go +++ b/site-agent/pkg/components/managers/expectedpowershelf/subscriber.go @@ -41,9 +41,17 @@ func (api *API) RegisterSubscriber() error { ManagerAccess.Data.EB.Managers.Workflow.Temporal.Worker.RegisterActivity(expectedPowerShelfManager.UpdateExpectedPowerShelfOnSite) ManagerAccess.Data.EB.Log.Info().Msg("ExpectedPowerShelf: Successfully registered UpdateExpectedPowerShelfOnSite activity") + // Register UpdateExpectedPowerShelfOnFlow activity + ManagerAccess.Data.EB.Managers.Workflow.Temporal.Worker.RegisterActivity(expectedPowerShelfManager.UpdateExpectedPowerShelfOnFlow) + ManagerAccess.Data.EB.Log.Info().Msg("ExpectedPowerShelf: Successfully registered UpdateExpectedPowerShelfOnFlow activity") + // Register DeleteExpectedPowerShelfOnSite activity ManagerAccess.Data.EB.Managers.Workflow.Temporal.Worker.RegisterActivity(expectedPowerShelfManager.DeleteExpectedPowerShelfOnSite) ManagerAccess.Data.EB.Log.Info().Msg("ExpectedPowerShelf: Successfully registered DeleteExpectedPowerShelfOnSite activity") + // Register DeleteExpectedPowerShelfOnFlow activity + ManagerAccess.Data.EB.Managers.Workflow.Temporal.Worker.RegisterActivity(expectedPowerShelfManager.DeleteExpectedPowerShelfOnFlow) + ManagerAccess.Data.EB.Log.Info().Msg("ExpectedPowerShelf: Successfully registered DeleteExpectedPowerShelfOnFlow activity") + return nil } diff --git a/site-agent/pkg/components/managers/expectedrack/subscriber.go b/site-agent/pkg/components/managers/expectedrack/subscriber.go index 852b1f456..cd14e13cd 100644 --- a/site-agent/pkg/components/managers/expectedrack/subscriber.go +++ b/site-agent/pkg/components/managers/expectedrack/subscriber.go @@ -49,10 +49,18 @@ func (api *API) RegisterSubscriber() error { ManagerAccess.Data.EB.Managers.Workflow.Temporal.Worker.RegisterActivity(expectedRackManager.UpdateExpectedRackOnSite) ManagerAccess.Data.EB.Log.Info().Msg("ExpectedRack: Successfully registered UpdateExpectedRackOnSite activity") + // Register UpdateExpectedRackOnFlow activity + ManagerAccess.Data.EB.Managers.Workflow.Temporal.Worker.RegisterActivity(expectedRackManager.UpdateExpectedRackOnFlow) + ManagerAccess.Data.EB.Log.Info().Msg("ExpectedRack: Successfully registered UpdateExpectedRackOnFlow activity") + // Register DeleteExpectedRackOnSite activity ManagerAccess.Data.EB.Managers.Workflow.Temporal.Worker.RegisterActivity(expectedRackManager.DeleteExpectedRackOnSite) ManagerAccess.Data.EB.Log.Info().Msg("ExpectedRack: Successfully registered DeleteExpectedRackOnSite activity") + // Register DeleteExpectedRackOnFlow activity + ManagerAccess.Data.EB.Managers.Workflow.Temporal.Worker.RegisterActivity(expectedRackManager.DeleteExpectedRackOnFlow) + ManagerAccess.Data.EB.Log.Info().Msg("ExpectedRack: Successfully registered DeleteExpectedRackOnFlow activity") + // Register ReplaceAllExpectedRacksOnSite activity ManagerAccess.Data.EB.Managers.Workflow.Temporal.Worker.RegisterActivity(expectedRackManager.ReplaceAllExpectedRacksOnSite) ManagerAccess.Data.EB.Log.Info().Msg("ExpectedRack: Successfully registered ReplaceAllExpectedRacksOnSite activity") diff --git a/site-agent/pkg/components/managers/expectedswitch/subscriber.go b/site-agent/pkg/components/managers/expectedswitch/subscriber.go index e0dd26425..55fe1dbcc 100644 --- a/site-agent/pkg/components/managers/expectedswitch/subscriber.go +++ b/site-agent/pkg/components/managers/expectedswitch/subscriber.go @@ -41,9 +41,17 @@ func (api *API) RegisterSubscriber() error { ManagerAccess.Data.EB.Managers.Workflow.Temporal.Worker.RegisterActivity(expectedSwitchManager.UpdateExpectedSwitchOnSite) ManagerAccess.Data.EB.Log.Info().Msg("ExpectedSwitch: Successfully registered UpdateExpectedSwitchOnSite activity") + // Register UpdateExpectedSwitchOnFlow activity + ManagerAccess.Data.EB.Managers.Workflow.Temporal.Worker.RegisterActivity(expectedSwitchManager.UpdateExpectedSwitchOnFlow) + ManagerAccess.Data.EB.Log.Info().Msg("ExpectedSwitch: Successfully registered UpdateExpectedSwitchOnFlow activity") + // Register DeleteExpectedSwitchOnSite activity ManagerAccess.Data.EB.Managers.Workflow.Temporal.Worker.RegisterActivity(expectedSwitchManager.DeleteExpectedSwitchOnSite) ManagerAccess.Data.EB.Log.Info().Msg("ExpectedSwitch: Successfully registered DeleteExpectedSwitchOnSite activity") + // Register DeleteExpectedSwitchOnFlow activity + ManagerAccess.Data.EB.Managers.Workflow.Temporal.Worker.RegisterActivity(expectedSwitchManager.DeleteExpectedSwitchOnFlow) + ManagerAccess.Data.EB.Log.Info().Msg("ExpectedSwitch: Successfully registered DeleteExpectedSwitchOnFlow activity") + return nil } diff --git a/site-workflow/pkg/activity/expectedmachine.go b/site-workflow/pkg/activity/expectedmachine.go index 3e515e791..ed3f59a35 100644 --- a/site-workflow/pkg/activity/expectedmachine.go +++ b/site-workflow/pkg/activity/expectedmachine.go @@ -501,6 +501,129 @@ func (mem *ManageExpectedMachine) CreateExpectedMachinesOnFlow(ctx context.Conte return nil } +// UpdateExpectedMachineOnFlow patches an Expected Machine component in Flow via PatchComponent. +// Only patchable fields (firmware_version, position, description, rack_id, bmcs) are sent; +// identity fields like name/serial_number are immutable in Flow once created. +// Best-effort: missing/unconnected Flow client skips silently; RPC failures are returned. +func (mem *ManageExpectedMachine) UpdateExpectedMachineOnFlow(ctx context.Context, request *cwssaws.ExpectedMachine) error { + logger := log.With().Str("Activity", "UpdateExpectedMachineOnFlow").Logger() + + logger.Info().Msg("Starting activity") + + if request == nil { + return temporal.NewNonRetryableApplicationError("received empty update Expected Machine request for Flow", swe.ErrTypeInvalidRequest, errors.New("nil request")) + } + if request.GetId().GetValue() == "" { + return temporal.NewNonRetryableApplicationError("received update Expected Machine request for Flow without required id field", swe.ErrTypeInvalidRequest, errors.New("missing id")) + } + + if mem.flowGrpcAtomicClient == nil { + logger.Warn().Msg("Flow client not configured, skipping Flow component update") + return nil + } + + flowClient := mem.flowGrpcAtomicClient.GetClient() + if flowClient == nil { + logger.Warn().Msg("Flow client not connected, skipping Flow component update") + return nil + } + + patchReq := componentToPatchRequest(expectedMachineToFlowComponent(request)) + _, err := flowClient.GrpcServiceClient().PatchComponent(ctx, patchReq) + if err != nil { + logger.Warn().Err(err).Msg("Failed to update Expected Machine component on Flow") + return swe.WrapErr(err) + } + + logger.Info().Msg("Completed activity") + return nil +} + +// UpdateExpectedMachinesOnFlow patches multiple Expected Machine components in Flow via PatchComponent. +// Mirrors CreateExpectedMachinesOnFlow's per-item loop: per-item failures are counted +// and logged but never fail the activity, since Flow dual-write is best-effort. +func (mem *ManageExpectedMachine) UpdateExpectedMachinesOnFlow(ctx context.Context, request *cwssaws.BatchExpectedMachineOperationRequest) error { + logger := log.With().Str("Activity", "UpdateExpectedMachinesOnFlow").Logger() + + logger.Info().Msg("Starting activity") + + if mem.flowGrpcAtomicClient == nil { + logger.Warn().Msg("Flow client not configured, skipping Flow component update") + return nil + } + + flowClient := mem.flowGrpcAtomicClient.GetClient() + if flowClient == nil { + logger.Warn().Msg("Flow client not connected, skipping Flow component update") + return nil + } + + grpcServiceClient := flowClient.GrpcServiceClient() + machines := request.GetExpectedMachines().GetExpectedMachines() + successes := 0 + failures := 0 + + // TODO(chet): Work with Flow team to add batch support so we don't have to loop here. + for _, machine := range machines { + if machine.GetId().GetValue() == "" { + logger.Warn().Msg("Skipping Expected Machine without id in batch Flow update") + failures++ + continue + } + patchReq := componentToPatchRequest(expectedMachineToFlowComponent(machine)) + _, err := grpcServiceClient.PatchComponent(ctx, patchReq) + if err != nil { + logger.Warn().Err(err).Str("ID", machine.GetId().GetValue()).Msg("Failed to update Expected Machine component on Flow") + failures++ + } else { + successes++ + } + } + + logger.Info(). + Int("Total", len(machines)). + Int("Succeeded", successes). + Int("Failed", failures). + Msg("Completed activity") + + return nil +} + +// DeleteExpectedMachineOnFlow soft-deletes an Expected Machine component in Flow via DeleteComponent. +// Best-effort: missing/unconnected Flow client skips silently; RPC failures are returned. +func (mem *ManageExpectedMachine) DeleteExpectedMachineOnFlow(ctx context.Context, request *cwssaws.ExpectedMachineRequest) error { + logger := log.With().Str("Activity", "DeleteExpectedMachineOnFlow").Logger() + + logger.Info().Msg("Starting activity") + + if request == nil { + return temporal.NewNonRetryableApplicationError("received empty delete Expected Machine request for Flow", swe.ErrTypeInvalidRequest, errors.New("nil request")) + } + if request.GetId().GetValue() == "" { + return temporal.NewNonRetryableApplicationError("received delete Expected Machine request for Flow without required id field", swe.ErrTypeInvalidRequest, errors.New("missing id")) + } + + if mem.flowGrpcAtomicClient == nil { + logger.Warn().Msg("Flow client not configured, skipping Flow component delete") + return nil + } + + flowClient := mem.flowGrpcAtomicClient.GetClient() + if flowClient == nil { + logger.Warn().Msg("Flow client not connected, skipping Flow component delete") + return nil + } + + _, err := flowClient.GrpcServiceClient().DeleteComponent(ctx, &flowv1.DeleteComponentRequest{Id: &flowv1.UUID{Id: request.GetId().GetValue()}}) + if err != nil { + logger.Warn().Err(err).Msg("Failed to delete Expected Machine component on Flow") + return swe.WrapErr(err) + } + + logger.Info().Msg("Completed activity") + return nil +} + // expectedMachineToFlowComponent converts a NICo ExpectedMachine proto to an Flow Component proto func expectedMachineToFlowComponent(em *cwssaws.ExpectedMachine) *flowv1.Component { component := &flowv1.Component{ @@ -559,6 +682,36 @@ func expectedMachineToFlowComponent(em *cwssaws.ExpectedMachine) *flowv1.Compone return component } +// componentToPatchRequest extracts the patchable subset of a Flow Component into a +// PatchComponentRequest. Used by the Expected{Machine,Switch,PowerShelf} Update-on-Flow +// activities so the conversion stays in sync with expected*ToFlowComponent. +// +// Empty / nil values are dropped rather than sent as explicit zero patches: the entity +// types don't model a "clear this field" intent, so a missing value means "leave the +// Flow side untouched", matching how the Create path declines to set empties. +func componentToPatchRequest(c *flowv1.Component) *flowv1.PatchComponentRequest { + if c == nil || c.GetInfo() == nil { + return nil + } + req := &flowv1.PatchComponentRequest{ + Id: c.GetInfo().GetId(), + Bmcs: c.GetBmcs(), + } + if fv := c.GetFirmwareVersion(); fv != "" { + req.FirmwareVersion = &fv + } + if c.GetPosition() != nil { + req.Position = c.GetPosition() + } + if desc := c.GetInfo().GetDescription(); desc != "" { + req.Description = &desc + } + if rid := c.GetRackId(); rid.GetId() != "" { + req.RackId = rid + } + return req +} + // UpdateExpectedMachinesOnSite updates multiple Expected Machines on NICo using the batch endpoint func (mem *ManageExpectedMachine) UpdateExpectedMachinesOnSite(ctx context.Context, request *cwssaws.BatchExpectedMachineOperationRequest) (*cwssaws.BatchExpectedMachineOperationResponse, error) { logger := log.With().Str("Activity", "UpdateExpectedMachinesOnSite").Logger() diff --git a/site-workflow/pkg/activity/expectedmachine_test.go b/site-workflow/pkg/activity/expectedmachine_test.go index 9072e540a..2218f1f75 100644 --- a/site-workflow/pkg/activity/expectedmachine_test.go +++ b/site-workflow/pkg/activity/expectedmachine_test.go @@ -690,6 +690,86 @@ func TestManageExpectedMachine_CreateExpectedMachineOnFlow(t *testing.T) { }) } +func TestManageExpectedMachine_UpdateExpectedMachineOnFlow(t *testing.T) { + t.Run("nil request returns non-retryable error", func(t *testing.T) { + mm := ManageExpectedMachine{flowGrpcAtomicClient: nil} + err := mm.UpdateExpectedMachineOnFlow(context.Background(), nil) + assert.Error(t, err) + }) + + t.Run("empty id returns non-retryable error", func(t *testing.T) { + mm := ManageExpectedMachine{flowGrpcAtomicClient: nil} + err := mm.UpdateExpectedMachineOnFlow(context.Background(), &cwssaws.ExpectedMachine{ + BmcMacAddress: "00:11:22:33:44:55", ChassisSerialNumber: "SN001", + }) + assert.Error(t, err) + }) + + t.Run("nil Flow client skips gracefully", func(t *testing.T) { + mm := ManageExpectedMachine{flowGrpcAtomicClient: nil} + err := mm.UpdateExpectedMachineOnFlow(context.Background(), &cwssaws.ExpectedMachine{ + Id: &cwssaws.UUID{Value: uuid.NewString()}, BmcMacAddress: "00:11:22:33:44:55", ChassisSerialNumber: "SN001", + }) + assert.NoError(t, err) + }) + + t.Run("nil Flow client connection skips gracefully", func(t *testing.T) { + mm := ManageExpectedMachine{flowGrpcAtomicClient: cClient.NewFlowGrpcAtomicClient(&cClient.FlowGrpcClientConfig{})} + err := mm.UpdateExpectedMachineOnFlow(context.Background(), &cwssaws.ExpectedMachine{ + Id: &cwssaws.UUID{Value: uuid.NewString()}, BmcMacAddress: "00:11:22:33:44:55", ChassisSerialNumber: "SN001", + }) + assert.NoError(t, err) + }) +} + +func TestManageExpectedMachine_UpdateExpectedMachinesOnFlow(t *testing.T) { + req := &cwssaws.BatchExpectedMachineOperationRequest{ + ExpectedMachines: &cwssaws.ExpectedMachineList{ + ExpectedMachines: []*cwssaws.ExpectedMachine{ + {Id: &cwssaws.UUID{Value: uuid.NewString()}, BmcMacAddress: "00:11:22:33:44:55", ChassisSerialNumber: "SN001"}, + }, + }, + } + + t.Run("nil Flow client skips gracefully", func(t *testing.T) { + mm := ManageExpectedMachine{flowGrpcAtomicClient: nil} + err := mm.UpdateExpectedMachinesOnFlow(context.Background(), req) + assert.NoError(t, err) + }) + + t.Run("nil Flow client connection skips gracefully", func(t *testing.T) { + mm := ManageExpectedMachine{flowGrpcAtomicClient: cClient.NewFlowGrpcAtomicClient(&cClient.FlowGrpcClientConfig{})} + err := mm.UpdateExpectedMachinesOnFlow(context.Background(), req) + assert.NoError(t, err) + }) +} + +func TestManageExpectedMachine_DeleteExpectedMachineOnFlow(t *testing.T) { + t.Run("nil request returns non-retryable error", func(t *testing.T) { + mm := ManageExpectedMachine{flowGrpcAtomicClient: nil} + err := mm.DeleteExpectedMachineOnFlow(context.Background(), nil) + assert.Error(t, err) + }) + + t.Run("empty id returns non-retryable error", func(t *testing.T) { + mm := ManageExpectedMachine{flowGrpcAtomicClient: nil} + err := mm.DeleteExpectedMachineOnFlow(context.Background(), &cwssaws.ExpectedMachineRequest{}) + assert.Error(t, err) + }) + + t.Run("nil Flow client skips gracefully", func(t *testing.T) { + mm := ManageExpectedMachine{flowGrpcAtomicClient: nil} + err := mm.DeleteExpectedMachineOnFlow(context.Background(), &cwssaws.ExpectedMachineRequest{Id: &cwssaws.UUID{Value: uuid.NewString()}}) + assert.NoError(t, err) + }) + + t.Run("nil Flow client connection skips gracefully", func(t *testing.T) { + mm := ManageExpectedMachine{flowGrpcAtomicClient: cClient.NewFlowGrpcAtomicClient(&cClient.FlowGrpcClientConfig{})} + err := mm.DeleteExpectedMachineOnFlow(context.Background(), &cwssaws.ExpectedMachineRequest{Id: &cwssaws.UUID{Value: uuid.NewString()}}) + assert.NoError(t, err) + }) +} + func TestManageExpectedMachine_CreateExpectedMachinesOnFlow(t *testing.T) { t.Run("nil Flow client skips gracefully", func(t *testing.T) { mm := ManageExpectedMachine{flowGrpcAtomicClient: nil} diff --git a/site-workflow/pkg/activity/expectedpowershelf.go b/site-workflow/pkg/activity/expectedpowershelf.go index 4c0ff9a0d..61331add1 100644 --- a/site-workflow/pkg/activity/expectedpowershelf.go +++ b/site-workflow/pkg/activity/expectedpowershelf.go @@ -431,6 +431,81 @@ func expectedPowerShelfToFlowComponent(eps *cwssaws.ExpectedPowerShelf) *flowv1. return component } +// UpdateExpectedPowerShelfOnFlow patches an Expected Power Shelf component in Flow via PatchComponent. +// Only patchable fields (firmware_version, position, description, rack_id, bmcs) are sent; +// identity fields are immutable in Flow once created. +// Best-effort: missing/unconnected Flow client skips silently; RPC failures are returned. +func (meps *ManageExpectedPowerShelf) UpdateExpectedPowerShelfOnFlow(ctx context.Context, request *cwssaws.ExpectedPowerShelf) error { + logger := log.With().Str("Activity", "UpdateExpectedPowerShelfOnFlow").Logger() + + logger.Info().Msg("Starting activity") + + if request == nil { + return temporal.NewNonRetryableApplicationError("received empty update Expected Power Shelf request for Flow", swe.ErrTypeInvalidRequest, errors.New("nil request")) + } + if request.GetExpectedPowerShelfId().GetValue() == "" { + return temporal.NewNonRetryableApplicationError("received update Expected Power Shelf request for Flow without required id field", swe.ErrTypeInvalidRequest, errors.New("missing id")) + } + + if meps.flowGrpcAtomicClient == nil { + logger.Warn().Msg("Flow client not configured, skipping Flow component update") + return nil + } + + grpcClient := meps.flowGrpcAtomicClient.GetClient() + if grpcClient == nil { + logger.Warn().Msg("Flow client not connected, skipping Flow component update") + return nil + } + grpcServiceClient := grpcClient.GrpcServiceClient() + + patchReq := componentToPatchRequest(expectedPowerShelfToFlowComponent(request)) + _, err := grpcServiceClient.PatchComponent(ctx, patchReq) + if err != nil { + logger.Warn().Err(err).Msg("Failed to update Expected Power Shelf component on Flow") + return swe.WrapErr(err) + } + + logger.Info().Msg("Completed activity") + return nil +} + +// DeleteExpectedPowerShelfOnFlow soft-deletes an Expected Power Shelf component in Flow via DeleteComponent. +// Best-effort: missing/unconnected Flow client skips silently; RPC failures are returned. +func (meps *ManageExpectedPowerShelf) DeleteExpectedPowerShelfOnFlow(ctx context.Context, request *cwssaws.ExpectedPowerShelfRequest) error { + logger := log.With().Str("Activity", "DeleteExpectedPowerShelfOnFlow").Logger() + + logger.Info().Msg("Starting activity") + + if request == nil { + return temporal.NewNonRetryableApplicationError("received empty delete Expected Power Shelf request for Flow", swe.ErrTypeInvalidRequest, errors.New("nil request")) + } + if request.GetExpectedPowerShelfId().GetValue() == "" { + return temporal.NewNonRetryableApplicationError("received delete Expected Power Shelf request for Flow without required id field", swe.ErrTypeInvalidRequest, errors.New("missing id")) + } + + if meps.flowGrpcAtomicClient == nil { + logger.Warn().Msg("Flow client not configured, skipping Flow component delete") + return nil + } + + grpcClient := meps.flowGrpcAtomicClient.GetClient() + if grpcClient == nil { + logger.Warn().Msg("Flow client not connected, skipping Flow component delete") + return nil + } + grpcServiceClient := grpcClient.GrpcServiceClient() + + _, err := grpcServiceClient.DeleteComponent(ctx, &flowv1.DeleteComponentRequest{Id: &flowv1.UUID{Id: request.GetExpectedPowerShelfId().GetValue()}}) + if err != nil { + logger.Warn().Err(err).Msg("Failed to delete Expected Power Shelf component on Flow") + return swe.WrapErr(err) + } + + logger.Info().Msg("Completed activity") + return nil +} + // DeleteExpectedPowerShelfOnSite deletes Expected Power Shelf on NICo func (meps *ManageExpectedPowerShelf) DeleteExpectedPowerShelfOnSite(ctx context.Context, request *cwssaws.ExpectedPowerShelfRequest) error { logger := log.With().Str("Activity", "DeleteExpectedPowerShelfOnSite").Logger() diff --git a/site-workflow/pkg/activity/expectedpowershelf_test.go b/site-workflow/pkg/activity/expectedpowershelf_test.go index 5786b8f97..166715cff 100644 --- a/site-workflow/pkg/activity/expectedpowershelf_test.go +++ b/site-workflow/pkg/activity/expectedpowershelf_test.go @@ -462,6 +462,68 @@ func TestManageExpectedPowerShelf_CreateExpectedPowerShelfOnFlow(t *testing.T) { }) } +func TestManageExpectedPowerShelf_UpdateExpectedPowerShelfOnFlow(t *testing.T) { + t.Run("nil request returns non-retryable error", func(t *testing.T) { + mm := ManageExpectedPowerShelf{flowGrpcAtomicClient: nil} + err := mm.UpdateExpectedPowerShelfOnFlow(context.Background(), nil) + assert.Error(t, err) + }) + + t.Run("empty id returns non-retryable error", func(t *testing.T) { + mm := ManageExpectedPowerShelf{flowGrpcAtomicClient: nil} + err := mm.UpdateExpectedPowerShelfOnFlow(context.Background(), &cwssaws.ExpectedPowerShelf{ + BmcMacAddress: "00:11:22:33:44:55", ShelfSerialNumber: "SHELF001", + }) + assert.Error(t, err) + }) + + t.Run("nil Flow client skips gracefully", func(t *testing.T) { + mm := ManageExpectedPowerShelf{flowGrpcAtomicClient: nil} + err := mm.UpdateExpectedPowerShelfOnFlow(context.Background(), &cwssaws.ExpectedPowerShelf{ + ExpectedPowerShelfId: &cwssaws.UUID{Value: uuid.NewString()}, BmcMacAddress: "00:11:22:33:44:55", ShelfSerialNumber: "SHELF001", + }) + assert.NoError(t, err) + }) + + t.Run("nil Flow client connection skips gracefully", func(t *testing.T) { + mm := ManageExpectedPowerShelf{flowGrpcAtomicClient: cClient.NewFlowGrpcAtomicClient(&cClient.FlowGrpcClientConfig{})} + err := mm.UpdateExpectedPowerShelfOnFlow(context.Background(), &cwssaws.ExpectedPowerShelf{ + ExpectedPowerShelfId: &cwssaws.UUID{Value: uuid.NewString()}, BmcMacAddress: "00:11:22:33:44:55", ShelfSerialNumber: "SHELF001", + }) + assert.NoError(t, err) + }) +} + +func TestManageExpectedPowerShelf_DeleteExpectedPowerShelfOnFlow(t *testing.T) { + t.Run("nil request returns non-retryable error", func(t *testing.T) { + mm := ManageExpectedPowerShelf{flowGrpcAtomicClient: nil} + err := mm.DeleteExpectedPowerShelfOnFlow(context.Background(), nil) + assert.Error(t, err) + }) + + t.Run("empty id returns non-retryable error", func(t *testing.T) { + mm := ManageExpectedPowerShelf{flowGrpcAtomicClient: nil} + err := mm.DeleteExpectedPowerShelfOnFlow(context.Background(), &cwssaws.ExpectedPowerShelfRequest{}) + assert.Error(t, err) + }) + + t.Run("nil Flow client skips gracefully", func(t *testing.T) { + mm := ManageExpectedPowerShelf{flowGrpcAtomicClient: nil} + err := mm.DeleteExpectedPowerShelfOnFlow(context.Background(), &cwssaws.ExpectedPowerShelfRequest{ + ExpectedPowerShelfId: &cwssaws.UUID{Value: uuid.NewString()}, + }) + assert.NoError(t, err) + }) + + t.Run("nil Flow client connection skips gracefully", func(t *testing.T) { + mm := ManageExpectedPowerShelf{flowGrpcAtomicClient: cClient.NewFlowGrpcAtomicClient(&cClient.FlowGrpcClientConfig{})} + err := mm.DeleteExpectedPowerShelfOnFlow(context.Background(), &cwssaws.ExpectedPowerShelfRequest{ + ExpectedPowerShelfId: &cwssaws.UUID{Value: uuid.NewString()}, + }) + assert.NoError(t, err) + }) +} + func Test_expectedPowerShelfToFlowComponent(t *testing.T) { strPtr := func(s string) *string { return &s } int32Ptr := func(i int32) *int32 { return &i } diff --git a/site-workflow/pkg/activity/expectedrack.go b/site-workflow/pkg/activity/expectedrack.go index 8a129140f..e23a076ba 100644 --- a/site-workflow/pkg/activity/expectedrack.go +++ b/site-workflow/pkg/activity/expectedrack.go @@ -424,6 +424,79 @@ func (mer *ManageExpectedRack) CreateExpectedRackOnFlow(ctx context.Context, req return nil } +// UpdateExpectedRackOnFlow patches an existing Expected Rack in Flow via PatchRack. +// Best-effort: missing/unconnected Flow client skips silently; RPC failures are +// returned so the workflow can log and continue (mirroring CreateExpectedRackOnFlow). +func (mer *ManageExpectedRack) UpdateExpectedRackOnFlow(ctx context.Context, request *cwssaws.ExpectedRack) error { + logger := log.With().Str("Activity", "UpdateExpectedRackOnFlow").Logger() + + logger.Info().Msg("Starting activity") + + if request == nil { + return temporal.NewNonRetryableApplicationError("received empty update Expected Rack request for Flow", swe.ErrTypeInvalidRequest, errors.New("nil request")) + } + + if mer.flowGrpcAtomicClient == nil { + logger.Warn().Msg("Flow client not configured, skipping Flow expected rack update") + return nil + } + + grpcClient := mer.flowGrpcAtomicClient.GetClient() + if grpcClient == nil { + logger.Warn().Msg("Flow client not connected, skipping Flow expected rack update") + return nil + } + grpcServiceClient := grpcClient.GrpcServiceClient() + + rack := expectedRackToFlowRack(request) + _, err := grpcServiceClient.PatchRack(ctx, &flowv1.PatchRackRequest{Rack: rack}) + if err != nil { + logger.Warn().Err(err).Msg("Failed to update Expected Rack on Flow") + return swe.WrapErr(err) + } + + logger.Info().Msg("Completed activity") + return nil +} + +// DeleteExpectedRackOnFlow soft-deletes an Expected Rack in Flow via DeleteRack, +// which also cascades to the rack's components. +// Best-effort: missing/unconnected Flow client skips silently; RPC failures are +// returned so the workflow can log and continue. +func (mer *ManageExpectedRack) DeleteExpectedRackOnFlow(ctx context.Context, request *cwssaws.ExpectedRackRequest) error { + logger := log.With().Str("Activity", "DeleteExpectedRackOnFlow").Logger() + + logger.Info().Msg("Starting activity") + + if request == nil { + return temporal.NewNonRetryableApplicationError("received empty delete Expected Rack request for Flow", swe.ErrTypeInvalidRequest, errors.New("nil request")) + } + if request.GetRackId() == "" { + return temporal.NewNonRetryableApplicationError("received delete Expected Rack request for Flow without required rack_id field", swe.ErrTypeInvalidRequest, errors.New("missing rack_id")) + } + + if mer.flowGrpcAtomicClient == nil { + logger.Warn().Msg("Flow client not configured, skipping Flow expected rack delete") + return nil + } + + grpcClient := mer.flowGrpcAtomicClient.GetClient() + if grpcClient == nil { + logger.Warn().Msg("Flow client not connected, skipping Flow expected rack delete") + return nil + } + grpcServiceClient := grpcClient.GrpcServiceClient() + + _, err := grpcServiceClient.DeleteRack(ctx, &flowv1.DeleteRackRequest{Id: &flowv1.UUID{Id: request.GetRackId()}}) + if err != nil { + logger.Warn().Err(err).Msg("Failed to delete Expected Rack on Flow") + return swe.WrapErr(err) + } + + logger.Info().Msg("Completed activity") + return nil +} + // labelValue extracts the value for a label key from a metadata label slice. Returns // empty string if the key is not present or the value is nil. func labelValue(labels []*cwssaws.Label, key string) string { diff --git a/site-workflow/pkg/activity/expectedrack_test.go b/site-workflow/pkg/activity/expectedrack_test.go index 28847342e..9a121c1b9 100644 --- a/site-workflow/pkg/activity/expectedrack_test.go +++ b/site-workflow/pkg/activity/expectedrack_test.go @@ -365,6 +365,58 @@ func TestManageExpectedRack_CreateExpectedRackOnFlow(t *testing.T) { }) } +func TestManageExpectedRack_UpdateExpectedRackOnFlow(t *testing.T) { + t.Run("nil request returns non-retryable error", func(t *testing.T) { + mer := ManageExpectedRack{flowGrpcAtomicClient: nil} + err := mer.UpdateExpectedRackOnFlow(context.Background(), nil) + assert.Error(t, err) + }) + + t.Run("nil Flow client skips gracefully", func(t *testing.T) { + mer := ManageExpectedRack{flowGrpcAtomicClient: nil} + err := mer.UpdateExpectedRackOnFlow(context.Background(), &cwssaws.ExpectedRack{ + RackId: &cwssaws.RackId{Id: uuid.NewString()}, + RackType: uuid.NewString(), + }) + assert.NoError(t, err) + }) + + t.Run("nil Flow client connection skips gracefully", func(t *testing.T) { + mer := ManageExpectedRack{flowGrpcAtomicClient: cClient.NewFlowGrpcAtomicClient(&cClient.FlowGrpcClientConfig{})} + err := mer.UpdateExpectedRackOnFlow(context.Background(), &cwssaws.ExpectedRack{ + RackId: &cwssaws.RackId{Id: uuid.NewString()}, + RackType: uuid.NewString(), + }) + assert.NoError(t, err) + }) +} + +func TestManageExpectedRack_DeleteExpectedRackOnFlow(t *testing.T) { + t.Run("nil request returns non-retryable error", func(t *testing.T) { + mer := ManageExpectedRack{flowGrpcAtomicClient: nil} + err := mer.DeleteExpectedRackOnFlow(context.Background(), nil) + assert.Error(t, err) + }) + + t.Run("empty rack_id returns non-retryable error", func(t *testing.T) { + mer := ManageExpectedRack{flowGrpcAtomicClient: nil} + err := mer.DeleteExpectedRackOnFlow(context.Background(), &cwssaws.ExpectedRackRequest{RackId: ""}) + assert.Error(t, err) + }) + + t.Run("nil Flow client skips gracefully", func(t *testing.T) { + mer := ManageExpectedRack{flowGrpcAtomicClient: nil} + err := mer.DeleteExpectedRackOnFlow(context.Background(), &cwssaws.ExpectedRackRequest{RackId: uuid.NewString()}) + assert.NoError(t, err) + }) + + t.Run("nil Flow client connection skips gracefully", func(t *testing.T) { + mer := ManageExpectedRack{flowGrpcAtomicClient: cClient.NewFlowGrpcAtomicClient(&cClient.FlowGrpcClientConfig{})} + err := mer.DeleteExpectedRackOnFlow(context.Background(), &cwssaws.ExpectedRackRequest{RackId: uuid.NewString()}) + assert.NoError(t, err) + }) +} + func Test_expectedRackToFlowRack(t *testing.T) { strPtr := func(s string) *string { return &s } diff --git a/site-workflow/pkg/activity/expectedswitch.go b/site-workflow/pkg/activity/expectedswitch.go index e913f3b51..c908bf190 100644 --- a/site-workflow/pkg/activity/expectedswitch.go +++ b/site-workflow/pkg/activity/expectedswitch.go @@ -426,6 +426,81 @@ func expectedSwitchToFlowComponent(es *cwssaws.ExpectedSwitch) *flowv1.Component return component } +// UpdateExpectedSwitchOnFlow patches an Expected Switch component in Flow via PatchComponent. +// Only patchable fields (firmware_version, position, description, rack_id, bmcs) are sent; +// identity fields are immutable in Flow once created. +// Best-effort: missing/unconnected Flow client skips silently; RPC failures are returned. +func (mes *ManageExpectedSwitch) UpdateExpectedSwitchOnFlow(ctx context.Context, request *cwssaws.ExpectedSwitch) error { + logger := log.With().Str("Activity", "UpdateExpectedSwitchOnFlow").Logger() + + logger.Info().Msg("Starting activity") + + if request == nil { + return temporal.NewNonRetryableApplicationError("received empty update Expected Switch request for Flow", swe.ErrTypeInvalidRequest, errors.New("nil request")) + } + if request.GetExpectedSwitchId().GetValue() == "" { + return temporal.NewNonRetryableApplicationError("received update Expected Switch request for Flow without required id field", swe.ErrTypeInvalidRequest, errors.New("missing id")) + } + + if mes.flowGrpcAtomicClient == nil { + logger.Warn().Msg("Flow client not configured, skipping Flow component update") + return nil + } + + grpcClient := mes.flowGrpcAtomicClient.GetClient() + if grpcClient == nil { + logger.Warn().Msg("Flow client not connected, skipping Flow component update") + return nil + } + grpcServiceClient := grpcClient.GrpcServiceClient() + + patchReq := componentToPatchRequest(expectedSwitchToFlowComponent(request)) + _, err := grpcServiceClient.PatchComponent(ctx, patchReq) + if err != nil { + logger.Warn().Err(err).Msg("Failed to update Expected Switch component on Flow") + return swe.WrapErr(err) + } + + logger.Info().Msg("Completed activity") + return nil +} + +// DeleteExpectedSwitchOnFlow soft-deletes an Expected Switch component in Flow via DeleteComponent. +// Best-effort: missing/unconnected Flow client skips silently; RPC failures are returned. +func (mes *ManageExpectedSwitch) DeleteExpectedSwitchOnFlow(ctx context.Context, request *cwssaws.ExpectedSwitchRequest) error { + logger := log.With().Str("Activity", "DeleteExpectedSwitchOnFlow").Logger() + + logger.Info().Msg("Starting activity") + + if request == nil { + return temporal.NewNonRetryableApplicationError("received empty delete Expected Switch request for Flow", swe.ErrTypeInvalidRequest, errors.New("nil request")) + } + if request.GetExpectedSwitchId().GetValue() == "" { + return temporal.NewNonRetryableApplicationError("received delete Expected Switch request for Flow without required id field", swe.ErrTypeInvalidRequest, errors.New("missing id")) + } + + if mes.flowGrpcAtomicClient == nil { + logger.Warn().Msg("Flow client not configured, skipping Flow component delete") + return nil + } + + grpcClient := mes.flowGrpcAtomicClient.GetClient() + if grpcClient == nil { + logger.Warn().Msg("Flow client not connected, skipping Flow component delete") + return nil + } + grpcServiceClient := grpcClient.GrpcServiceClient() + + _, err := grpcServiceClient.DeleteComponent(ctx, &flowv1.DeleteComponentRequest{Id: &flowv1.UUID{Id: request.GetExpectedSwitchId().GetValue()}}) + if err != nil { + logger.Warn().Err(err).Msg("Failed to delete Expected Switch component on Flow") + return swe.WrapErr(err) + } + + logger.Info().Msg("Completed activity") + return nil +} + // DeleteExpectedSwitchOnSite deletes Expected Switch on NICo func (mes *ManageExpectedSwitch) DeleteExpectedSwitchOnSite(ctx context.Context, request *cwssaws.ExpectedSwitchRequest) error { logger := log.With().Str("Activity", "DeleteExpectedSwitchOnSite").Logger() diff --git a/site-workflow/pkg/activity/expectedswitch_test.go b/site-workflow/pkg/activity/expectedswitch_test.go index fa9a6f989..6e010b08c 100644 --- a/site-workflow/pkg/activity/expectedswitch_test.go +++ b/site-workflow/pkg/activity/expectedswitch_test.go @@ -462,6 +462,68 @@ func TestManageExpectedSwitch_CreateExpectedSwitchOnFlow(t *testing.T) { }) } +func TestManageExpectedSwitch_UpdateExpectedSwitchOnFlow(t *testing.T) { + t.Run("nil request returns non-retryable error", func(t *testing.T) { + mm := ManageExpectedSwitch{flowGrpcAtomicClient: nil} + err := mm.UpdateExpectedSwitchOnFlow(context.Background(), nil) + assert.Error(t, err) + }) + + t.Run("empty id returns non-retryable error", func(t *testing.T) { + mm := ManageExpectedSwitch{flowGrpcAtomicClient: nil} + err := mm.UpdateExpectedSwitchOnFlow(context.Background(), &cwssaws.ExpectedSwitch{ + BmcMacAddress: "00:11:22:33:44:55", SwitchSerialNumber: "SW001", + }) + assert.Error(t, err) + }) + + t.Run("nil Flow client skips gracefully", func(t *testing.T) { + mm := ManageExpectedSwitch{flowGrpcAtomicClient: nil} + err := mm.UpdateExpectedSwitchOnFlow(context.Background(), &cwssaws.ExpectedSwitch{ + ExpectedSwitchId: &cwssaws.UUID{Value: uuid.NewString()}, BmcMacAddress: "00:11:22:33:44:55", SwitchSerialNumber: "SW001", + }) + assert.NoError(t, err) + }) + + t.Run("nil Flow client connection skips gracefully", func(t *testing.T) { + mm := ManageExpectedSwitch{flowGrpcAtomicClient: cClient.NewFlowGrpcAtomicClient(&cClient.FlowGrpcClientConfig{})} + err := mm.UpdateExpectedSwitchOnFlow(context.Background(), &cwssaws.ExpectedSwitch{ + ExpectedSwitchId: &cwssaws.UUID{Value: uuid.NewString()}, BmcMacAddress: "00:11:22:33:44:55", SwitchSerialNumber: "SW001", + }) + assert.NoError(t, err) + }) +} + +func TestManageExpectedSwitch_DeleteExpectedSwitchOnFlow(t *testing.T) { + t.Run("nil request returns non-retryable error", func(t *testing.T) { + mm := ManageExpectedSwitch{flowGrpcAtomicClient: nil} + err := mm.DeleteExpectedSwitchOnFlow(context.Background(), nil) + assert.Error(t, err) + }) + + t.Run("empty id returns non-retryable error", func(t *testing.T) { + mm := ManageExpectedSwitch{flowGrpcAtomicClient: nil} + err := mm.DeleteExpectedSwitchOnFlow(context.Background(), &cwssaws.ExpectedSwitchRequest{}) + assert.Error(t, err) + }) + + t.Run("nil Flow client skips gracefully", func(t *testing.T) { + mm := ManageExpectedSwitch{flowGrpcAtomicClient: nil} + err := mm.DeleteExpectedSwitchOnFlow(context.Background(), &cwssaws.ExpectedSwitchRequest{ + ExpectedSwitchId: &cwssaws.UUID{Value: uuid.NewString()}, + }) + assert.NoError(t, err) + }) + + t.Run("nil Flow client connection skips gracefully", func(t *testing.T) { + mm := ManageExpectedSwitch{flowGrpcAtomicClient: cClient.NewFlowGrpcAtomicClient(&cClient.FlowGrpcClientConfig{})} + err := mm.DeleteExpectedSwitchOnFlow(context.Background(), &cwssaws.ExpectedSwitchRequest{ + ExpectedSwitchId: &cwssaws.UUID{Value: uuid.NewString()}, + }) + assert.NoError(t, err) + }) +} + func Test_expectedSwitchToFlowComponent(t *testing.T) { strPtr := func(s string) *string { return &s } int32Ptr := func(i int32) *int32 { return &i } diff --git a/site-workflow/pkg/workflow/expectedmachine.go b/site-workflow/pkg/workflow/expectedmachine.go index c32044fb5..d0168bd7d 100644 --- a/site-workflow/pkg/workflow/expectedmachine.go +++ b/site-workflow/pkg/workflow/expectedmachine.go @@ -93,8 +93,8 @@ func CreateExpectedMachine(ctx workflow.Context, request *cwssaws.ExpectedMachin return nil } -// UpdateExpectedMachine is a workflow to update Expected Machines using the UpdateExpectedMachineOnSite activity -// TODO: Add Flow PatchComponent dual-write when update/delete Flow support is implemented +// UpdateExpectedMachine is a workflow to update Expected Machines using the UpdateExpectedMachineOnSite activity, +// then also patches the component in Flow via UpdateExpectedMachineOnFlow (best-effort). func UpdateExpectedMachine(ctx workflow.Context, request *cwssaws.ExpectedMachine) error { logger := log.With().Str("Workflow", "ExpectedMachine").Str("Action", "Update").Str("ID", request.GetId().GetValue()).Str("Expected MAC address", request.BmcMacAddress).Str("Serial", request.ChassisSerialNumber).Logger() @@ -124,6 +124,11 @@ func UpdateExpectedMachine(ctx workflow.Context, request *cwssaws.ExpectedMachin return err } + err = workflow.ExecuteActivity(ctx, expectedMachineManager.UpdateExpectedMachineOnFlow, request).Get(ctx, nil) + if err != nil { + logger.Warn().Err(err).Str("Activity", "UpdateExpectedMachineOnFlow").Msg("Failed to update component on Flow, Core write succeeded") + } + logger.Info().Msg("completing workflow") return nil @@ -174,7 +179,8 @@ func CreateExpectedMachines(ctx workflow.Context, request *cwssaws.BatchExpected return &response, nil } -// UpdateExpectedMachines is a workflow to update multiple Expected Machines using the UpdateExpectedMachinesOnSite activity +// UpdateExpectedMachines is a workflow to update multiple Expected Machines using the UpdateExpectedMachinesOnSite activity, +// then also patches the components in Flow via UpdateExpectedMachinesOnFlow (best-effort). func UpdateExpectedMachines(ctx workflow.Context, request *cwssaws.BatchExpectedMachineOperationRequest) (*cwssaws.BatchExpectedMachineOperationResponse, error) { logger := log.With().Str("Workflow", "ExpectedMachines").Str("Action", "Update").Int("Count", len(request.GetExpectedMachines().GetExpectedMachines())).Logger() @@ -206,13 +212,18 @@ func UpdateExpectedMachines(ctx workflow.Context, request *cwssaws.BatchExpected return nil, err } + err = workflow.ExecuteActivity(ctx, expectedMachineManager.UpdateExpectedMachinesOnFlow, request).Get(ctx, nil) + if err != nil { + logger.Warn().Err(err).Str("Activity", "UpdateExpectedMachinesOnFlow").Msg("Failed to update components on Flow, Core write succeeded") + } + logger.Info().Msg("completing workflow") return &response, nil } -// DeleteExpectedMachine is a workflow to Delete Expected Machines using the DeleteExpectedMachineOnSite activity -// TODO: Add Flow DeleteComponent dual-write when update/delete Flow support is implemented +// DeleteExpectedMachine is a workflow to Delete Expected Machines using the DeleteExpectedMachineOnSite activity, +// then also deletes the component from Flow via DeleteExpectedMachineOnFlow (best-effort). func DeleteExpectedMachine(ctx workflow.Context, request *cwssaws.ExpectedMachineRequest) error { logger := log.With().Str("Workflow", "ExpectedMachine").Str("Action", "Delete").Str("ID", request.GetId().GetValue()).Str("optional MAC address", request.BmcMacAddress).Logger() @@ -242,6 +253,11 @@ func DeleteExpectedMachine(ctx workflow.Context, request *cwssaws.ExpectedMachin return err } + err = workflow.ExecuteActivity(ctx, expectedMachineManager.DeleteExpectedMachineOnFlow, request).Get(ctx, nil) + if err != nil { + logger.Warn().Err(err).Str("Activity", "DeleteExpectedMachineOnFlow").Msg("Failed to delete component on Flow, Core write succeeded") + } + logger.Info().Msg("completing workflow") return nil diff --git a/site-workflow/pkg/workflow/expectedmachine_test.go b/site-workflow/pkg/workflow/expectedmachine_test.go index 5091801f6..bb5da5849 100644 --- a/site-workflow/pkg/workflow/expectedmachine_test.go +++ b/site-workflow/pkg/workflow/expectedmachine_test.go @@ -174,11 +174,12 @@ func (uemts *UpdateExpectedMachineTestSuite) Test_UpdateExpectedMachine_Success( BmcMacAddress: "00:11:22:33:44:55", } - // Mock UpdateExpectedMachineOnSite activity uemts.env.RegisterActivity(expectedMachineManager.UpdateExpectedMachineOnSite) uemts.env.OnActivity(expectedMachineManager.UpdateExpectedMachineOnSite, mock.Anything, mock.Anything).Return(nil) - // Execute workflow + uemts.env.RegisterActivity(expectedMachineManager.UpdateExpectedMachineOnFlow) + uemts.env.OnActivity(expectedMachineManager.UpdateExpectedMachineOnFlow, mock.Anything, mock.Anything).Return(nil) + uemts.env.ExecuteWorkflow(UpdateExpectedMachine, request) uemts.True(uemts.env.IsWorkflowCompleted()) uemts.NoError(uemts.env.GetWorkflowError()) @@ -194,16 +195,35 @@ func (uemts *UpdateExpectedMachineTestSuite) Test_UpdateExpectedMachine_Failure( errMsg := "Site Controller communication error" - // Mock UpdateExpectedMachineOnSite activity uemts.env.RegisterActivity(expectedMachineManager.UpdateExpectedMachineOnSite) uemts.env.OnActivity(expectedMachineManager.UpdateExpectedMachineOnSite, mock.Anything, mock.Anything).Return(errors.New(errMsg)) - // execute UpdateExpectedMachine workflow + uemts.env.RegisterActivity(expectedMachineManager.UpdateExpectedMachineOnFlow) + uemts.env.ExecuteWorkflow(UpdateExpectedMachine, request) uemts.True(uemts.env.IsWorkflowCompleted()) uemts.Error(uemts.env.GetWorkflowError()) } +func (uemts *UpdateExpectedMachineTestSuite) Test_UpdateExpectedMachine_CoreSuccess_FlowFailure() { + var expectedMachineManager iActivity.ManageExpectedMachine + + request := &cwssaws.ExpectedMachine{ + Id: &cwssaws.UUID{Value: "test-update-workflow-002"}, + BmcMacAddress: "00:11:22:33:44:55", + } + + uemts.env.RegisterActivity(expectedMachineManager.UpdateExpectedMachineOnSite) + uemts.env.OnActivity(expectedMachineManager.UpdateExpectedMachineOnSite, mock.Anything, mock.Anything).Return(nil) + + uemts.env.RegisterActivity(expectedMachineManager.UpdateExpectedMachineOnFlow) + uemts.env.OnActivity(expectedMachineManager.UpdateExpectedMachineOnFlow, mock.Anything, mock.Anything).Return(errors.New("Flow unavailable")) + + uemts.env.ExecuteWorkflow(UpdateExpectedMachine, request) + uemts.True(uemts.env.IsWorkflowCompleted()) + uemts.NoError(uemts.env.GetWorkflowError()) +} + func TestUpdateExpectedMachineTestSuite(t *testing.T) { suite.Run(t, new(UpdateExpectedMachineTestSuite)) } @@ -231,11 +251,12 @@ func (demts *DeleteExpectedMachineTestSuite) Test_DeleteExpectedMachine_Success( BmcMacAddress: "00:11:22:33:44:55", } - // Mock DeleteExpectedMachineOnSite activity demts.env.RegisterActivity(expectedMachineManager.DeleteExpectedMachineOnSite) demts.env.OnActivity(expectedMachineManager.DeleteExpectedMachineOnSite, mock.Anything, mock.Anything).Return(nil) - // execute workflow + demts.env.RegisterActivity(expectedMachineManager.DeleteExpectedMachineOnFlow) + demts.env.OnActivity(expectedMachineManager.DeleteExpectedMachineOnFlow, mock.Anything, mock.Anything).Return(nil) + demts.env.ExecuteWorkflow(DeleteExpectedMachine, request) demts.True(demts.env.IsWorkflowCompleted()) demts.NoError(demts.env.GetWorkflowError()) @@ -251,16 +272,35 @@ func (demts *DeleteExpectedMachineTestSuite) Test_DeleteExpectedMachine_Failure( errMsg := "Site Controller communication error" - // Mock DeleteExpectedMachineOnSite activity demts.env.RegisterActivity(expectedMachineManager.DeleteExpectedMachineOnSite) demts.env.OnActivity(expectedMachineManager.DeleteExpectedMachineOnSite, mock.Anything, mock.Anything).Return(errors.New(errMsg)) - // execute DeleteExpectedMachine workflow + demts.env.RegisterActivity(expectedMachineManager.DeleteExpectedMachineOnFlow) + demts.env.ExecuteWorkflow(DeleteExpectedMachine, request) demts.True(demts.env.IsWorkflowCompleted()) demts.Error(demts.env.GetWorkflowError()) } +func (demts *DeleteExpectedMachineTestSuite) Test_DeleteExpectedMachine_CoreSuccess_FlowFailure() { + var expectedMachineManager iActivity.ManageExpectedMachine + + request := &cwssaws.ExpectedMachineRequest{ + Id: &cwssaws.UUID{Value: "test-delete-workflow-002"}, + BmcMacAddress: "00:11:22:33:44:55", + } + + demts.env.RegisterActivity(expectedMachineManager.DeleteExpectedMachineOnSite) + demts.env.OnActivity(expectedMachineManager.DeleteExpectedMachineOnSite, mock.Anything, mock.Anything).Return(nil) + + demts.env.RegisterActivity(expectedMachineManager.DeleteExpectedMachineOnFlow) + demts.env.OnActivity(expectedMachineManager.DeleteExpectedMachineOnFlow, mock.Anything, mock.Anything).Return(errors.New("Flow unavailable")) + + demts.env.ExecuteWorkflow(DeleteExpectedMachine, request) + demts.True(demts.env.IsWorkflowCompleted()) + demts.NoError(demts.env.GetWorkflowError()) +} + func TestDeleteExpectedMachineTestSuite(t *testing.T) { suite.Run(t, new(DeleteExpectedMachineTestSuite)) } @@ -505,11 +545,12 @@ func (uemts *UpdateExpectedMachinesTestSuite) Test_UpdateExpectedMachines_Succes }, } - // Mock UpdateExpectedMachinesOnSite activity uemts.env.RegisterActivity(expectedMachineManager.UpdateExpectedMachinesOnSite) uemts.env.OnActivity(expectedMachineManager.UpdateExpectedMachinesOnSite, mock.Anything, mock.Anything).Return(expectedResponse, nil) - // Execute UpdateExpectedMachines workflow + uemts.env.RegisterActivity(expectedMachineManager.UpdateExpectedMachinesOnFlow) + uemts.env.OnActivity(expectedMachineManager.UpdateExpectedMachinesOnFlow, mock.Anything, mock.Anything).Return(nil) + uemts.env.ExecuteWorkflow(UpdateExpectedMachines, request) uemts.True(uemts.env.IsWorkflowCompleted()) uemts.NoError(uemts.env.GetWorkflowError()) @@ -571,11 +612,12 @@ func (uemts *UpdateExpectedMachinesTestSuite) Test_UpdateExpectedMachines_Partia }, } - // Mock UpdateExpectedMachinesOnSite activity uemts.env.RegisterActivity(expectedMachineManager.UpdateExpectedMachinesOnSite) uemts.env.OnActivity(expectedMachineManager.UpdateExpectedMachinesOnSite, mock.Anything, mock.Anything).Return(expectedResponse, nil) - // Execute UpdateExpectedMachines workflow + uemts.env.RegisterActivity(expectedMachineManager.UpdateExpectedMachinesOnFlow) + uemts.env.OnActivity(expectedMachineManager.UpdateExpectedMachinesOnFlow, mock.Anything, mock.Anything).Return(nil) + uemts.env.ExecuteWorkflow(UpdateExpectedMachines, request) uemts.True(uemts.env.IsWorkflowCompleted()) uemts.NoError(uemts.env.GetWorkflowError()) @@ -612,16 +654,50 @@ func (uemts *UpdateExpectedMachinesTestSuite) Test_UpdateExpectedMachines_Failur errMsg := "Site Controller communication error" - // Mock UpdateExpectedMachinesOnSite activity uemts.env.RegisterActivity(expectedMachineManager.UpdateExpectedMachinesOnSite) uemts.env.OnActivity(expectedMachineManager.UpdateExpectedMachinesOnSite, mock.Anything, mock.Anything).Return(nil, errors.New(errMsg)) - // execute UpdateExpectedMachines workflow + uemts.env.RegisterActivity(expectedMachineManager.UpdateExpectedMachinesOnFlow) + uemts.env.ExecuteWorkflow(UpdateExpectedMachines, request) uemts.True(uemts.env.IsWorkflowCompleted()) uemts.Error(uemts.env.GetWorkflowError()) } +func (uemts *UpdateExpectedMachinesTestSuite) Test_UpdateExpectedMachines_CoreSuccess_FlowFailure() { + var expectedMachineManager iActivity.ManageExpectedMachine + + request := &cwssaws.BatchExpectedMachineOperationRequest{ + ExpectedMachines: &cwssaws.ExpectedMachineList{ + ExpectedMachines: []*cwssaws.ExpectedMachine{ + { + Id: &cwssaws.UUID{Value: "test-batch-update-003"}, + BmcMacAddress: "00:11:22:33:44:77", + ChassisSerialNumber: "SN003", + }, + }, + }, + AcceptPartialResults: true, + } + + first := request.GetExpectedMachines().GetExpectedMachines()[0] + expectedResponse := &cwssaws.BatchExpectedMachineOperationResponse{ + Results: []*cwssaws.ExpectedMachineOperationResult{ + {Id: first.GetId(), Success: true, ExpectedMachine: first}, + }, + } + + uemts.env.RegisterActivity(expectedMachineManager.UpdateExpectedMachinesOnSite) + uemts.env.OnActivity(expectedMachineManager.UpdateExpectedMachinesOnSite, mock.Anything, mock.Anything).Return(expectedResponse, nil) + + uemts.env.RegisterActivity(expectedMachineManager.UpdateExpectedMachinesOnFlow) + uemts.env.OnActivity(expectedMachineManager.UpdateExpectedMachinesOnFlow, mock.Anything, mock.Anything).Return(errors.New("Flow unavailable")) + + uemts.env.ExecuteWorkflow(UpdateExpectedMachines, request) + uemts.True(uemts.env.IsWorkflowCompleted()) + uemts.NoError(uemts.env.GetWorkflowError()) +} + func TestUpdateExpectedMachinesTestSuite(t *testing.T) { suite.Run(t, new(UpdateExpectedMachinesTestSuite)) } diff --git a/site-workflow/pkg/workflow/expectedpowershelf.go b/site-workflow/pkg/workflow/expectedpowershelf.go index c74ffcac5..0a5543932 100644 --- a/site-workflow/pkg/workflow/expectedpowershelf.go +++ b/site-workflow/pkg/workflow/expectedpowershelf.go @@ -93,8 +93,8 @@ func CreateExpectedPowerShelf(ctx workflow.Context, request *cwssaws.ExpectedPow return nil } -// UpdateExpectedPowerShelf is a workflow to update an Expected Power Shelf using the UpdateExpectedPowerShelfOnSite activity -// TODO: Add Flow PatchComponent dual-write when update/delete Flow support is implemented +// UpdateExpectedPowerShelf is a workflow to update an Expected Power Shelf using the UpdateExpectedPowerShelfOnSite activity, +// then also patches the component in Flow via UpdateExpectedPowerShelfOnFlow (best-effort). func UpdateExpectedPowerShelf(ctx workflow.Context, request *cwssaws.ExpectedPowerShelf) error { logger := log.With().Str("Workflow", "ExpectedPowerShelf").Str("Action", "Update").Str("ID", request.GetExpectedPowerShelfId().GetValue()).Str("Expected MAC address", request.BmcMacAddress).Str("Serial", request.ShelfSerialNumber).Logger() @@ -124,13 +124,18 @@ func UpdateExpectedPowerShelf(ctx workflow.Context, request *cwssaws.ExpectedPow return err } + err = workflow.ExecuteActivity(ctx, expectedPowerShelfManager.UpdateExpectedPowerShelfOnFlow, request).Get(ctx, nil) + if err != nil { + logger.Warn().Err(err).Str("Activity", "UpdateExpectedPowerShelfOnFlow").Msg("Failed to update component on Flow, Core write succeeded") + } + logger.Info().Msg("completing workflow") return nil } -// DeleteExpectedPowerShelf is a workflow to Delete an Expected Power Shelf using the DeleteExpectedPowerShelfOnSite activity -// TODO: Add Flow DeleteComponent dual-write when update/delete Flow support is implemented +// DeleteExpectedPowerShelf is a workflow to Delete an Expected Power Shelf using the DeleteExpectedPowerShelfOnSite activity, +// then also deletes the component from Flow via DeleteExpectedPowerShelfOnFlow (best-effort). func DeleteExpectedPowerShelf(ctx workflow.Context, request *cwssaws.ExpectedPowerShelfRequest) error { logger := log.With().Str("Workflow", "ExpectedPowerShelf").Str("Action", "Delete").Str("ID", request.GetExpectedPowerShelfId().GetValue()).Str("optional MAC address", request.BmcMacAddress).Logger() @@ -160,6 +165,11 @@ func DeleteExpectedPowerShelf(ctx workflow.Context, request *cwssaws.ExpectedPow return err } + err = workflow.ExecuteActivity(ctx, expectedPowerShelfManager.DeleteExpectedPowerShelfOnFlow, request).Get(ctx, nil) + if err != nil { + logger.Warn().Err(err).Str("Activity", "DeleteExpectedPowerShelfOnFlow").Msg("Failed to delete component on Flow, Core write succeeded") + } + logger.Info().Msg("completing workflow") return nil diff --git a/site-workflow/pkg/workflow/expectedpowershelf_test.go b/site-workflow/pkg/workflow/expectedpowershelf_test.go index 3d0d9d4c0..e9e1bee61 100644 --- a/site-workflow/pkg/workflow/expectedpowershelf_test.go +++ b/site-workflow/pkg/workflow/expectedpowershelf_test.go @@ -155,11 +155,12 @@ func (uepsts *UpdateExpectedPowerShelfTestSuite) Test_UpdateExpectedPowerShelf_S ShelfSerialNumber: "SHELF-001", } - // Mock UpdateExpectedPowerShelfOnSite activity uepsts.env.RegisterActivity(expectedPowerShelfManager.UpdateExpectedPowerShelfOnSite) uepsts.env.OnActivity(expectedPowerShelfManager.UpdateExpectedPowerShelfOnSite, mock.Anything, mock.Anything).Return(nil) - // Execute workflow + uepsts.env.RegisterActivity(expectedPowerShelfManager.UpdateExpectedPowerShelfOnFlow) + uepsts.env.OnActivity(expectedPowerShelfManager.UpdateExpectedPowerShelfOnFlow, mock.Anything, mock.Anything).Return(nil) + uepsts.env.ExecuteWorkflow(UpdateExpectedPowerShelf, request) uepsts.True(uepsts.env.IsWorkflowCompleted()) uepsts.NoError(uepsts.env.GetWorkflowError()) @@ -176,16 +177,36 @@ func (uepsts *UpdateExpectedPowerShelfTestSuite) Test_UpdateExpectedPowerShelf_F errMsg := "Site Controller communication error" - // Mock UpdateExpectedPowerShelfOnSite activity uepsts.env.RegisterActivity(expectedPowerShelfManager.UpdateExpectedPowerShelfOnSite) uepsts.env.OnActivity(expectedPowerShelfManager.UpdateExpectedPowerShelfOnSite, mock.Anything, mock.Anything).Return(errors.New(errMsg)) - // execute UpdateExpectedPowerShelf workflow + uepsts.env.RegisterActivity(expectedPowerShelfManager.UpdateExpectedPowerShelfOnFlow) + uepsts.env.ExecuteWorkflow(UpdateExpectedPowerShelf, request) uepsts.True(uepsts.env.IsWorkflowCompleted()) uepsts.Error(uepsts.env.GetWorkflowError()) } +func (uepsts *UpdateExpectedPowerShelfTestSuite) Test_UpdateExpectedPowerShelf_CoreSuccess_FlowFailure() { + var expectedPowerShelfManager iActivity.ManageExpectedPowerShelf + + request := &cwssaws.ExpectedPowerShelf{ + ExpectedPowerShelfId: &cwssaws.UUID{Value: "test-update-workflow-002"}, + BmcMacAddress: "00:11:22:33:44:55", + ShelfSerialNumber: "SHELF-002", + } + + uepsts.env.RegisterActivity(expectedPowerShelfManager.UpdateExpectedPowerShelfOnSite) + uepsts.env.OnActivity(expectedPowerShelfManager.UpdateExpectedPowerShelfOnSite, mock.Anything, mock.Anything).Return(nil) + + uepsts.env.RegisterActivity(expectedPowerShelfManager.UpdateExpectedPowerShelfOnFlow) + uepsts.env.OnActivity(expectedPowerShelfManager.UpdateExpectedPowerShelfOnFlow, mock.Anything, mock.Anything).Return(errors.New("Flow unavailable")) + + uepsts.env.ExecuteWorkflow(UpdateExpectedPowerShelf, request) + uepsts.True(uepsts.env.IsWorkflowCompleted()) + uepsts.NoError(uepsts.env.GetWorkflowError()) +} + func TestUpdateExpectedPowerShelfTestSuite(t *testing.T) { suite.Run(t, new(UpdateExpectedPowerShelfTestSuite)) } @@ -213,11 +234,12 @@ func (depsts *DeleteExpectedPowerShelfTestSuite) Test_DeleteExpectedPowerShelf_S BmcMacAddress: "00:11:22:33:44:55", } - // Mock DeleteExpectedPowerShelfOnSite activity depsts.env.RegisterActivity(expectedPowerShelfManager.DeleteExpectedPowerShelfOnSite) depsts.env.OnActivity(expectedPowerShelfManager.DeleteExpectedPowerShelfOnSite, mock.Anything, mock.Anything).Return(nil) - // execute workflow + depsts.env.RegisterActivity(expectedPowerShelfManager.DeleteExpectedPowerShelfOnFlow) + depsts.env.OnActivity(expectedPowerShelfManager.DeleteExpectedPowerShelfOnFlow, mock.Anything, mock.Anything).Return(nil) + depsts.env.ExecuteWorkflow(DeleteExpectedPowerShelf, request) depsts.True(depsts.env.IsWorkflowCompleted()) depsts.NoError(depsts.env.GetWorkflowError()) @@ -233,16 +255,35 @@ func (depsts *DeleteExpectedPowerShelfTestSuite) Test_DeleteExpectedPowerShelf_F errMsg := "Site Controller communication error" - // Mock DeleteExpectedPowerShelfOnSite activity depsts.env.RegisterActivity(expectedPowerShelfManager.DeleteExpectedPowerShelfOnSite) depsts.env.OnActivity(expectedPowerShelfManager.DeleteExpectedPowerShelfOnSite, mock.Anything, mock.Anything).Return(errors.New(errMsg)) - // execute DeleteExpectedPowerShelf workflow + depsts.env.RegisterActivity(expectedPowerShelfManager.DeleteExpectedPowerShelfOnFlow) + depsts.env.ExecuteWorkflow(DeleteExpectedPowerShelf, request) depsts.True(depsts.env.IsWorkflowCompleted()) depsts.Error(depsts.env.GetWorkflowError()) } +func (depsts *DeleteExpectedPowerShelfTestSuite) Test_DeleteExpectedPowerShelf_CoreSuccess_FlowFailure() { + var expectedPowerShelfManager iActivity.ManageExpectedPowerShelf + + request := &cwssaws.ExpectedPowerShelfRequest{ + ExpectedPowerShelfId: &cwssaws.UUID{Value: "test-delete-workflow-002"}, + BmcMacAddress: "00:11:22:33:44:55", + } + + depsts.env.RegisterActivity(expectedPowerShelfManager.DeleteExpectedPowerShelfOnSite) + depsts.env.OnActivity(expectedPowerShelfManager.DeleteExpectedPowerShelfOnSite, mock.Anything, mock.Anything).Return(nil) + + depsts.env.RegisterActivity(expectedPowerShelfManager.DeleteExpectedPowerShelfOnFlow) + depsts.env.OnActivity(expectedPowerShelfManager.DeleteExpectedPowerShelfOnFlow, mock.Anything, mock.Anything).Return(errors.New("Flow unavailable")) + + depsts.env.ExecuteWorkflow(DeleteExpectedPowerShelf, request) + depsts.True(depsts.env.IsWorkflowCompleted()) + depsts.NoError(depsts.env.GetWorkflowError()) +} + func TestDeleteExpectedPowerShelfTestSuite(t *testing.T) { suite.Run(t, new(DeleteExpectedPowerShelfTestSuite)) } diff --git a/site-workflow/pkg/workflow/expectedrack.go b/site-workflow/pkg/workflow/expectedrack.go index e1f377328..2297b32bc 100644 --- a/site-workflow/pkg/workflow/expectedrack.go +++ b/site-workflow/pkg/workflow/expectedrack.go @@ -96,9 +96,9 @@ func CreateExpectedRack(ctx workflow.Context, request *cwssaws.ExpectedRack) err return nil } -// UpdateExpectedRack is a workflow to update an Expected Rack using the -// UpdateExpectedRackOnSite activity. -// TODO: Add Flow PatchComponent dual-write when update/delete Flow support is implemented +// UpdateExpectedRack is a workflow to update an Expected Rack using +// UpdateExpectedRackOnSite, then also patches the rack in Flow via +// UpdateExpectedRackOnFlow (best-effort). func UpdateExpectedRack(ctx workflow.Context, request *cwssaws.ExpectedRack) error { logger := log.With().Str("Workflow", "ExpectedRack").Str("Action", "Update").Str("ID", request.GetRackId().GetId()).Str("RackProfileID", request.GetRackType()).Logger() @@ -114,14 +114,19 @@ func UpdateExpectedRack(ctx workflow.Context, request *cwssaws.ExpectedRack) err return err } + err = workflow.ExecuteActivity(ctx, expectedRackManager.UpdateExpectedRackOnFlow, request).Get(ctx, nil) + if err != nil { + logger.Warn().Err(err).Str("Activity", "UpdateExpectedRackOnFlow").Msg("Failed to update rack on Flow, Core write succeeded") + } + logger.Info().Msg("completing workflow") return nil } -// DeleteExpectedRack is a workflow to delete an Expected Rack using the -// DeleteExpectedRackOnSite activity. -// TODO: Add Flow PatchComponent dual-write when update/delete Flow support is implemented +// DeleteExpectedRack is a workflow to delete an Expected Rack using +// DeleteExpectedRackOnSite, then also deletes the rack from Flow via +// DeleteExpectedRackOnFlow (best-effort). func DeleteExpectedRack(ctx workflow.Context, request *cwssaws.ExpectedRackRequest) error { logger := log.With().Str("Workflow", "ExpectedRack").Str("Action", "Delete").Str("ID", request.GetRackId()).Logger() @@ -137,6 +142,11 @@ func DeleteExpectedRack(ctx workflow.Context, request *cwssaws.ExpectedRackReque return err } + err = workflow.ExecuteActivity(ctx, expectedRackManager.DeleteExpectedRackOnFlow, request).Get(ctx, nil) + if err != nil { + logger.Warn().Err(err).Str("Activity", "DeleteExpectedRackOnFlow").Msg("Failed to delete rack on Flow, Core write succeeded") + } + logger.Info().Msg("completing workflow") return nil diff --git a/site-workflow/pkg/workflow/expectedrack_test.go b/site-workflow/pkg/workflow/expectedrack_test.go index ac4ab9280..60fee2af6 100644 --- a/site-workflow/pkg/workflow/expectedrack_test.go +++ b/site-workflow/pkg/workflow/expectedrack_test.go @@ -123,11 +123,12 @@ func (uerts *UpdateExpectedRackTestSuite) Test_UpdateExpectedRack_Success() { RackType: "test-update-rack-profile-001", } - // Mock UpdateExpectedRackOnSite activity uerts.env.RegisterActivity(expectedRackManager.UpdateExpectedRackOnSite) uerts.env.OnActivity(expectedRackManager.UpdateExpectedRackOnSite, mock.Anything, mock.Anything).Return(nil) - // Execute workflow + uerts.env.RegisterActivity(expectedRackManager.UpdateExpectedRackOnFlow) + uerts.env.OnActivity(expectedRackManager.UpdateExpectedRackOnFlow, mock.Anything, mock.Anything).Return(nil) + uerts.env.ExecuteWorkflow(UpdateExpectedRack, request) uerts.True(uerts.env.IsWorkflowCompleted()) uerts.NoError(uerts.env.GetWorkflowError()) @@ -143,16 +144,35 @@ func (uerts *UpdateExpectedRackTestSuite) Test_UpdateExpectedRack_Failure() { errMsg := "Site Controller communication error" - // Mock UpdateExpectedRackOnSite activity uerts.env.RegisterActivity(expectedRackManager.UpdateExpectedRackOnSite) uerts.env.OnActivity(expectedRackManager.UpdateExpectedRackOnSite, mock.Anything, mock.Anything).Return(errors.New(errMsg)) - // Execute UpdateExpectedRack workflow + uerts.env.RegisterActivity(expectedRackManager.UpdateExpectedRackOnFlow) + uerts.env.ExecuteWorkflow(UpdateExpectedRack, request) uerts.True(uerts.env.IsWorkflowCompleted()) uerts.Error(uerts.env.GetWorkflowError()) } +func (uerts *UpdateExpectedRackTestSuite) Test_UpdateExpectedRack_CoreSuccess_FlowFailure() { + var expectedRackManager iActivity.ManageExpectedRack + + request := &cwssaws.ExpectedRack{ + RackId: &cwssaws.RackId{Id: "test-update-rack-workflow-002"}, + RackType: "test-update-rack-profile-002", + } + + uerts.env.RegisterActivity(expectedRackManager.UpdateExpectedRackOnSite) + uerts.env.OnActivity(expectedRackManager.UpdateExpectedRackOnSite, mock.Anything, mock.Anything).Return(nil) + + uerts.env.RegisterActivity(expectedRackManager.UpdateExpectedRackOnFlow) + uerts.env.OnActivity(expectedRackManager.UpdateExpectedRackOnFlow, mock.Anything, mock.Anything).Return(errors.New("Flow unavailable")) + + uerts.env.ExecuteWorkflow(UpdateExpectedRack, request) + uerts.True(uerts.env.IsWorkflowCompleted()) + uerts.NoError(uerts.env.GetWorkflowError()) +} + func TestUpdateExpectedRackTestSuite(t *testing.T) { suite.Run(t, new(UpdateExpectedRackTestSuite)) } @@ -179,11 +199,12 @@ func (derts *DeleteExpectedRackTestSuite) Test_DeleteExpectedRack_Success() { RackId: "test-delete-rack-workflow-001", } - // Mock DeleteExpectedRackOnSite activity derts.env.RegisterActivity(expectedRackManager.DeleteExpectedRackOnSite) derts.env.OnActivity(expectedRackManager.DeleteExpectedRackOnSite, mock.Anything, mock.Anything).Return(nil) - // Execute workflow + derts.env.RegisterActivity(expectedRackManager.DeleteExpectedRackOnFlow) + derts.env.OnActivity(expectedRackManager.DeleteExpectedRackOnFlow, mock.Anything, mock.Anything).Return(nil) + derts.env.ExecuteWorkflow(DeleteExpectedRack, request) derts.True(derts.env.IsWorkflowCompleted()) derts.NoError(derts.env.GetWorkflowError()) @@ -198,16 +219,34 @@ func (derts *DeleteExpectedRackTestSuite) Test_DeleteExpectedRack_Failure() { errMsg := "Site Controller communication error" - // Mock DeleteExpectedRackOnSite activity derts.env.RegisterActivity(expectedRackManager.DeleteExpectedRackOnSite) derts.env.OnActivity(expectedRackManager.DeleteExpectedRackOnSite, mock.Anything, mock.Anything).Return(errors.New(errMsg)) - // Execute DeleteExpectedRack workflow + derts.env.RegisterActivity(expectedRackManager.DeleteExpectedRackOnFlow) + derts.env.ExecuteWorkflow(DeleteExpectedRack, request) derts.True(derts.env.IsWorkflowCompleted()) derts.Error(derts.env.GetWorkflowError()) } +func (derts *DeleteExpectedRackTestSuite) Test_DeleteExpectedRack_CoreSuccess_FlowFailure() { + var expectedRackManager iActivity.ManageExpectedRack + + request := &cwssaws.ExpectedRackRequest{ + RackId: "test-delete-rack-workflow-002", + } + + derts.env.RegisterActivity(expectedRackManager.DeleteExpectedRackOnSite) + derts.env.OnActivity(expectedRackManager.DeleteExpectedRackOnSite, mock.Anything, mock.Anything).Return(nil) + + derts.env.RegisterActivity(expectedRackManager.DeleteExpectedRackOnFlow) + derts.env.OnActivity(expectedRackManager.DeleteExpectedRackOnFlow, mock.Anything, mock.Anything).Return(errors.New("Flow unavailable")) + + derts.env.ExecuteWorkflow(DeleteExpectedRack, request) + derts.True(derts.env.IsWorkflowCompleted()) + derts.NoError(derts.env.GetWorkflowError()) +} + func TestDeleteExpectedRackTestSuite(t *testing.T) { suite.Run(t, new(DeleteExpectedRackTestSuite)) } diff --git a/site-workflow/pkg/workflow/expectedswitch.go b/site-workflow/pkg/workflow/expectedswitch.go index 0af459f31..92825adf9 100644 --- a/site-workflow/pkg/workflow/expectedswitch.go +++ b/site-workflow/pkg/workflow/expectedswitch.go @@ -93,8 +93,8 @@ func CreateExpectedSwitch(ctx workflow.Context, request *cwssaws.ExpectedSwitch) return nil } -// UpdateExpectedSwitch is a workflow to update an Expected Switch using the UpdateExpectedSwitchOnSite activity -// TODO: Add Flow PatchComponent dual-write when update/delete Flow support is implemented +// UpdateExpectedSwitch is a workflow to update an Expected Switch using the UpdateExpectedSwitchOnSite activity, +// then also patches the component in Flow via UpdateExpectedSwitchOnFlow (best-effort). func UpdateExpectedSwitch(ctx workflow.Context, request *cwssaws.ExpectedSwitch) error { logger := log.With().Str("Workflow", "ExpectedSwitch").Str("Action", "Update").Str("ID", request.GetExpectedSwitchId().GetValue()).Str("Expected MAC address", request.BmcMacAddress).Str("Serial", request.SwitchSerialNumber).Logger() @@ -124,13 +124,18 @@ func UpdateExpectedSwitch(ctx workflow.Context, request *cwssaws.ExpectedSwitch) return err } + err = workflow.ExecuteActivity(ctx, expectedSwitchManager.UpdateExpectedSwitchOnFlow, request).Get(ctx, nil) + if err != nil { + logger.Warn().Err(err).Str("Activity", "UpdateExpectedSwitchOnFlow").Msg("Failed to update component on Flow, Core write succeeded") + } + logger.Info().Msg("completing workflow") return nil } -// DeleteExpectedSwitch is a workflow to Delete an Expected Switch using the DeleteExpectedSwitchOnSite activity -// TODO: Add Flow DeleteComponent dual-write when update/delete Flow support is implemented +// DeleteExpectedSwitch is a workflow to Delete an Expected Switch using the DeleteExpectedSwitchOnSite activity, +// then also deletes the component from Flow via DeleteExpectedSwitchOnFlow (best-effort). func DeleteExpectedSwitch(ctx workflow.Context, request *cwssaws.ExpectedSwitchRequest) error { logger := log.With().Str("Workflow", "ExpectedSwitch").Str("Action", "Delete").Str("ID", request.GetExpectedSwitchId().GetValue()).Str("optional MAC address", request.BmcMacAddress).Logger() @@ -160,6 +165,11 @@ func DeleteExpectedSwitch(ctx workflow.Context, request *cwssaws.ExpectedSwitchR return err } + err = workflow.ExecuteActivity(ctx, expectedSwitchManager.DeleteExpectedSwitchOnFlow, request).Get(ctx, nil) + if err != nil { + logger.Warn().Err(err).Str("Activity", "DeleteExpectedSwitchOnFlow").Msg("Failed to delete component on Flow, Core write succeeded") + } + logger.Info().Msg("completing workflow") return nil diff --git a/site-workflow/pkg/workflow/expectedswitch_test.go b/site-workflow/pkg/workflow/expectedswitch_test.go index 1c21aa962..cb522c3f7 100644 --- a/site-workflow/pkg/workflow/expectedswitch_test.go +++ b/site-workflow/pkg/workflow/expectedswitch_test.go @@ -155,11 +155,12 @@ func (uests *UpdateExpectedSwitchTestSuite) Test_UpdateExpectedSwitch_Success() SwitchSerialNumber: "SWITCH-001", } - // Mock UpdateExpectedSwitchOnSite activity uests.env.RegisterActivity(expectedSwitchManager.UpdateExpectedSwitchOnSite) uests.env.OnActivity(expectedSwitchManager.UpdateExpectedSwitchOnSite, mock.Anything, mock.Anything).Return(nil) - // Execute workflow + uests.env.RegisterActivity(expectedSwitchManager.UpdateExpectedSwitchOnFlow) + uests.env.OnActivity(expectedSwitchManager.UpdateExpectedSwitchOnFlow, mock.Anything, mock.Anything).Return(nil) + uests.env.ExecuteWorkflow(UpdateExpectedSwitch, request) uests.True(uests.env.IsWorkflowCompleted()) uests.NoError(uests.env.GetWorkflowError()) @@ -176,16 +177,36 @@ func (uests *UpdateExpectedSwitchTestSuite) Test_UpdateExpectedSwitch_Failure() errMsg := "Site Controller communication error" - // Mock UpdateExpectedSwitchOnSite activity uests.env.RegisterActivity(expectedSwitchManager.UpdateExpectedSwitchOnSite) uests.env.OnActivity(expectedSwitchManager.UpdateExpectedSwitchOnSite, mock.Anything, mock.Anything).Return(errors.New(errMsg)) - // execute UpdateExpectedSwitch workflow + uests.env.RegisterActivity(expectedSwitchManager.UpdateExpectedSwitchOnFlow) + uests.env.ExecuteWorkflow(UpdateExpectedSwitch, request) uests.True(uests.env.IsWorkflowCompleted()) uests.Error(uests.env.GetWorkflowError()) } +func (uests *UpdateExpectedSwitchTestSuite) Test_UpdateExpectedSwitch_CoreSuccess_FlowFailure() { + var expectedSwitchManager iActivity.ManageExpectedSwitch + + request := &cwssaws.ExpectedSwitch{ + ExpectedSwitchId: &cwssaws.UUID{Value: "test-update-workflow-002"}, + BmcMacAddress: "00:11:22:33:44:55", + SwitchSerialNumber: "SWITCH-002", + } + + uests.env.RegisterActivity(expectedSwitchManager.UpdateExpectedSwitchOnSite) + uests.env.OnActivity(expectedSwitchManager.UpdateExpectedSwitchOnSite, mock.Anything, mock.Anything).Return(nil) + + uests.env.RegisterActivity(expectedSwitchManager.UpdateExpectedSwitchOnFlow) + uests.env.OnActivity(expectedSwitchManager.UpdateExpectedSwitchOnFlow, mock.Anything, mock.Anything).Return(errors.New("Flow unavailable")) + + uests.env.ExecuteWorkflow(UpdateExpectedSwitch, request) + uests.True(uests.env.IsWorkflowCompleted()) + uests.NoError(uests.env.GetWorkflowError()) +} + func TestUpdateExpectedSwitchTestSuite(t *testing.T) { suite.Run(t, new(UpdateExpectedSwitchTestSuite)) } @@ -213,11 +234,12 @@ func (dests *DeleteExpectedSwitchTestSuite) Test_DeleteExpectedSwitch_Success() BmcMacAddress: "00:11:22:33:44:55", } - // Mock DeleteExpectedSwitchOnSite activity dests.env.RegisterActivity(expectedSwitchManager.DeleteExpectedSwitchOnSite) dests.env.OnActivity(expectedSwitchManager.DeleteExpectedSwitchOnSite, mock.Anything, mock.Anything).Return(nil) - // execute workflow + dests.env.RegisterActivity(expectedSwitchManager.DeleteExpectedSwitchOnFlow) + dests.env.OnActivity(expectedSwitchManager.DeleteExpectedSwitchOnFlow, mock.Anything, mock.Anything).Return(nil) + dests.env.ExecuteWorkflow(DeleteExpectedSwitch, request) dests.True(dests.env.IsWorkflowCompleted()) dests.NoError(dests.env.GetWorkflowError()) @@ -233,16 +255,35 @@ func (dests *DeleteExpectedSwitchTestSuite) Test_DeleteExpectedSwitch_Failure() errMsg := "Site Controller communication error" - // Mock DeleteExpectedSwitchOnSite activity dests.env.RegisterActivity(expectedSwitchManager.DeleteExpectedSwitchOnSite) dests.env.OnActivity(expectedSwitchManager.DeleteExpectedSwitchOnSite, mock.Anything, mock.Anything).Return(errors.New(errMsg)) - // execute DeleteExpectedSwitch workflow + dests.env.RegisterActivity(expectedSwitchManager.DeleteExpectedSwitchOnFlow) + dests.env.ExecuteWorkflow(DeleteExpectedSwitch, request) dests.True(dests.env.IsWorkflowCompleted()) dests.Error(dests.env.GetWorkflowError()) } +func (dests *DeleteExpectedSwitchTestSuite) Test_DeleteExpectedSwitch_CoreSuccess_FlowFailure() { + var expectedSwitchManager iActivity.ManageExpectedSwitch + + request := &cwssaws.ExpectedSwitchRequest{ + ExpectedSwitchId: &cwssaws.UUID{Value: "test-delete-workflow-002"}, + BmcMacAddress: "00:11:22:33:44:55", + } + + dests.env.RegisterActivity(expectedSwitchManager.DeleteExpectedSwitchOnSite) + dests.env.OnActivity(expectedSwitchManager.DeleteExpectedSwitchOnSite, mock.Anything, mock.Anything).Return(nil) + + dests.env.RegisterActivity(expectedSwitchManager.DeleteExpectedSwitchOnFlow) + dests.env.OnActivity(expectedSwitchManager.DeleteExpectedSwitchOnFlow, mock.Anything, mock.Anything).Return(errors.New("Flow unavailable")) + + dests.env.ExecuteWorkflow(DeleteExpectedSwitch, request) + dests.True(dests.env.IsWorkflowCompleted()) + dests.NoError(dests.env.GetWorkflowError()) +} + func TestDeleteExpectedSwitchTestSuite(t *testing.T) { suite.Run(t, new(DeleteExpectedSwitchTestSuite)) }