Skip to content
This repository was archived by the owner on Jun 2, 2026. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,18 @@ func (api *API) RegisterSubscriber() error {
ManagerAccess.Data.EB.Managers.Workflow.Temporal.Worker.RegisterActivity(expectedMachineManager.UpdateExpectedMachineOnSite)
ManagerAccess.Data.EB.Log.Info().Msg("ExpectedMachine: Successfully registered UpdateExpectedMachineOnSite activity")

// Register UpdateExpectedMachineOnFlow activity
ManagerAccess.Data.EB.Managers.Workflow.Temporal.Worker.RegisterActivity(expectedMachineManager.UpdateExpectedMachineOnFlow)
ManagerAccess.Data.EB.Log.Info().Msg("ExpectedMachine: Successfully registered UpdateExpectedMachineOnFlow activity")

// Register DeleteExpectedMachineOnSite activity
ManagerAccess.Data.EB.Managers.Workflow.Temporal.Worker.RegisterActivity(expectedMachineManager.DeleteExpectedMachineOnSite)
ManagerAccess.Data.EB.Log.Info().Msg("ExpectedMachine: Successfully registered DeleteExpectedMachineOnSite activity")

// Register DeleteExpectedMachineOnFlow activity
ManagerAccess.Data.EB.Managers.Workflow.Temporal.Worker.RegisterActivity(expectedMachineManager.DeleteExpectedMachineOnFlow)
ManagerAccess.Data.EB.Log.Info().Msg("ExpectedMachine: Successfully registered DeleteExpectedMachineOnFlow activity")

// Register CreateExpectedMachinesOnSite activity (Batch)
ManagerAccess.Data.EB.Managers.Workflow.Temporal.Worker.RegisterActivity(expectedMachineManager.CreateExpectedMachinesOnSite)
ManagerAccess.Data.EB.Log.Info().Msg("ExpectedMachine: Successfully registered CreateExpectedMachinesOnSite activity")
Expand All @@ -65,5 +73,9 @@ func (api *API) RegisterSubscriber() error {
ManagerAccess.Data.EB.Managers.Workflow.Temporal.Worker.RegisterActivity(expectedMachineManager.UpdateExpectedMachinesOnSite)
ManagerAccess.Data.EB.Log.Info().Msg("ExpectedMachine: Successfully registered UpdateExpectedMachinesOnSite activity")

// Register UpdateExpectedMachinesOnFlow activity (Batch)
ManagerAccess.Data.EB.Managers.Workflow.Temporal.Worker.RegisterActivity(expectedMachineManager.UpdateExpectedMachinesOnFlow)
ManagerAccess.Data.EB.Log.Info().Msg("ExpectedMachine: Successfully registered UpdateExpectedMachinesOnFlow activity")

return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,17 @@ func (api *API) RegisterSubscriber() error {
ManagerAccess.Data.EB.Managers.Workflow.Temporal.Worker.RegisterActivity(expectedPowerShelfManager.UpdateExpectedPowerShelfOnSite)
ManagerAccess.Data.EB.Log.Info().Msg("ExpectedPowerShelf: Successfully registered UpdateExpectedPowerShelfOnSite activity")

// Register UpdateExpectedPowerShelfOnFlow activity
ManagerAccess.Data.EB.Managers.Workflow.Temporal.Worker.RegisterActivity(expectedPowerShelfManager.UpdateExpectedPowerShelfOnFlow)
ManagerAccess.Data.EB.Log.Info().Msg("ExpectedPowerShelf: Successfully registered UpdateExpectedPowerShelfOnFlow activity")

// Register DeleteExpectedPowerShelfOnSite activity
ManagerAccess.Data.EB.Managers.Workflow.Temporal.Worker.RegisterActivity(expectedPowerShelfManager.DeleteExpectedPowerShelfOnSite)
ManagerAccess.Data.EB.Log.Info().Msg("ExpectedPowerShelf: Successfully registered DeleteExpectedPowerShelfOnSite activity")

// Register DeleteExpectedPowerShelfOnFlow activity
ManagerAccess.Data.EB.Managers.Workflow.Temporal.Worker.RegisterActivity(expectedPowerShelfManager.DeleteExpectedPowerShelfOnFlow)
ManagerAccess.Data.EB.Log.Info().Msg("ExpectedPowerShelf: Successfully registered DeleteExpectedPowerShelfOnFlow activity")

return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,18 @@ func (api *API) RegisterSubscriber() error {
ManagerAccess.Data.EB.Managers.Workflow.Temporal.Worker.RegisterActivity(expectedRackManager.UpdateExpectedRackOnSite)
ManagerAccess.Data.EB.Log.Info().Msg("ExpectedRack: Successfully registered UpdateExpectedRackOnSite activity")

// Register UpdateExpectedRackOnFlow activity
ManagerAccess.Data.EB.Managers.Workflow.Temporal.Worker.RegisterActivity(expectedRackManager.UpdateExpectedRackOnFlow)
ManagerAccess.Data.EB.Log.Info().Msg("ExpectedRack: Successfully registered UpdateExpectedRackOnFlow activity")

// Register DeleteExpectedRackOnSite activity
ManagerAccess.Data.EB.Managers.Workflow.Temporal.Worker.RegisterActivity(expectedRackManager.DeleteExpectedRackOnSite)
ManagerAccess.Data.EB.Log.Info().Msg("ExpectedRack: Successfully registered DeleteExpectedRackOnSite activity")

// Register DeleteExpectedRackOnFlow activity
ManagerAccess.Data.EB.Managers.Workflow.Temporal.Worker.RegisterActivity(expectedRackManager.DeleteExpectedRackOnFlow)
ManagerAccess.Data.EB.Log.Info().Msg("ExpectedRack: Successfully registered DeleteExpectedRackOnFlow activity")

// Register ReplaceAllExpectedRacksOnSite activity
ManagerAccess.Data.EB.Managers.Workflow.Temporal.Worker.RegisterActivity(expectedRackManager.ReplaceAllExpectedRacksOnSite)
ManagerAccess.Data.EB.Log.Info().Msg("ExpectedRack: Successfully registered ReplaceAllExpectedRacksOnSite activity")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,17 @@ func (api *API) RegisterSubscriber() error {
ManagerAccess.Data.EB.Managers.Workflow.Temporal.Worker.RegisterActivity(expectedSwitchManager.UpdateExpectedSwitchOnSite)
ManagerAccess.Data.EB.Log.Info().Msg("ExpectedSwitch: Successfully registered UpdateExpectedSwitchOnSite activity")

// Register UpdateExpectedSwitchOnFlow activity
ManagerAccess.Data.EB.Managers.Workflow.Temporal.Worker.RegisterActivity(expectedSwitchManager.UpdateExpectedSwitchOnFlow)
ManagerAccess.Data.EB.Log.Info().Msg("ExpectedSwitch: Successfully registered UpdateExpectedSwitchOnFlow activity")

// Register DeleteExpectedSwitchOnSite activity
ManagerAccess.Data.EB.Managers.Workflow.Temporal.Worker.RegisterActivity(expectedSwitchManager.DeleteExpectedSwitchOnSite)
ManagerAccess.Data.EB.Log.Info().Msg("ExpectedSwitch: Successfully registered DeleteExpectedSwitchOnSite activity")

// Register DeleteExpectedSwitchOnFlow activity
ManagerAccess.Data.EB.Managers.Workflow.Temporal.Worker.RegisterActivity(expectedSwitchManager.DeleteExpectedSwitchOnFlow)
ManagerAccess.Data.EB.Log.Info().Msg("ExpectedSwitch: Successfully registered DeleteExpectedSwitchOnFlow activity")

return nil
}
153 changes: 153 additions & 0 deletions site-workflow/pkg/activity/expectedmachine.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,129 @@ func (mem *ManageExpectedMachine) CreateExpectedMachinesOnFlow(ctx context.Conte
return nil
}

// UpdateExpectedMachineOnFlow patches an Expected Machine component in Flow via PatchComponent.
// Only patchable fields (firmware_version, position, description, rack_id, bmcs) are sent;
// identity fields like name/serial_number are immutable in Flow once created.
// Best-effort: missing/unconnected Flow client skips silently; RPC failures are returned.
func (mem *ManageExpectedMachine) UpdateExpectedMachineOnFlow(ctx context.Context, request *cwssaws.ExpectedMachine) error {
logger := log.With().Str("Activity", "UpdateExpectedMachineOnFlow").Logger()

logger.Info().Msg("Starting activity")

if request == nil {
return temporal.NewNonRetryableApplicationError("received empty update Expected Machine request for Flow", swe.ErrTypeInvalidRequest, errors.New("nil request"))
}
if request.GetId().GetValue() == "" {
return temporal.NewNonRetryableApplicationError("received update Expected Machine request for Flow without required id field", swe.ErrTypeInvalidRequest, errors.New("missing id"))
}

if mem.flowGrpcAtomicClient == nil {
logger.Warn().Msg("Flow client not configured, skipping Flow component update")
return nil
}

flowClient := mem.flowGrpcAtomicClient.GetClient()
if flowClient == nil {
logger.Warn().Msg("Flow client not connected, skipping Flow component update")
return nil
}

patchReq := componentToPatchRequest(expectedMachineToFlowComponent(request))
_, err := flowClient.GrpcServiceClient().PatchComponent(ctx, patchReq)
if err != nil {
logger.Warn().Err(err).Msg("Failed to update Expected Machine component on Flow")
return swe.WrapErr(err)
}

logger.Info().Msg("Completed activity")
return nil
}

// UpdateExpectedMachinesOnFlow patches multiple Expected Machine components in Flow via PatchComponent.
// Mirrors CreateExpectedMachinesOnFlow's per-item loop: per-item failures are counted
// and logged but never fail the activity, since Flow dual-write is best-effort.
func (mem *ManageExpectedMachine) UpdateExpectedMachinesOnFlow(ctx context.Context, request *cwssaws.BatchExpectedMachineOperationRequest) error {
logger := log.With().Str("Activity", "UpdateExpectedMachinesOnFlow").Logger()

logger.Info().Msg("Starting activity")

if mem.flowGrpcAtomicClient == nil {
logger.Warn().Msg("Flow client not configured, skipping Flow component update")
return nil
}

flowClient := mem.flowGrpcAtomicClient.GetClient()
if flowClient == nil {
logger.Warn().Msg("Flow client not connected, skipping Flow component update")
return nil
}

grpcServiceClient := flowClient.GrpcServiceClient()
machines := request.GetExpectedMachines().GetExpectedMachines()
successes := 0
failures := 0

// TODO(chet): Work with Flow team to add batch support so we don't have to loop here.
for _, machine := range machines {
if machine.GetId().GetValue() == "" {
logger.Warn().Msg("Skipping Expected Machine without id in batch Flow update")
failures++
continue
}
patchReq := componentToPatchRequest(expectedMachineToFlowComponent(machine))
_, err := grpcServiceClient.PatchComponent(ctx, patchReq)
if err != nil {
logger.Warn().Err(err).Str("ID", machine.GetId().GetValue()).Msg("Failed to update Expected Machine component on Flow")
failures++
} else {
successes++
}
}

logger.Info().
Int("Total", len(machines)).
Int("Succeeded", successes).
Int("Failed", failures).
Msg("Completed activity")

return nil
}
Comment on lines +542 to +590

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Missing nil request validation could cause runtime panic.

UpdateExpectedMachinesOnFlow accesses request.GetExpectedMachines().GetExpectedMachines() at line 562 without validating that request itself is non-nil. If the activity is invoked with a nil request, this will panic.

Compare with:

  • UpdateExpectedMachineOnFlow validates request at lines 513-518
  • UpdateExpectedMachinesOnSite validates request at line 724
  • CreateExpectedMachinesOnSite validates request at line 382

Add defensive validation at the start of the method for consistency and resilience.

🛡️ Add nil request validation
 func (mem *ManageExpectedMachine) UpdateExpectedMachinesOnFlow(ctx context.Context, request *cwssaws.BatchExpectedMachineOperationRequest) error {
 	logger := log.With().Str("Activity", "UpdateExpectedMachinesOnFlow").Logger()
 
 	logger.Info().Msg("Starting activity")
+
+	if request == nil {
+		return temporal.NewNonRetryableApplicationError("received empty batch update Expected Machine request for Flow", swe.ErrTypeInvalidRequest, errors.New("nil request"))
+	}
 
 	if mem.flowGrpcAtomicClient == nil {
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@site-workflow/pkg/activity/expectedmachine.go` around lines 542 - 590, Add a
nil-request guard at the top of UpdateExpectedMachinesOnFlow: check if request
== nil (and also ensure request.GetExpectedMachines() != nil) before calling
request.GetExpectedMachines().GetExpectedMachines(); if the check fails, log a
warning (using the existing logger) and return early (nil) to match the
defensive pattern used in UpdateExpectedMachineOnFlow and other similar methods
so the function never panics on a nil input.


// DeleteExpectedMachineOnFlow soft-deletes an Expected Machine component in Flow via DeleteComponent.
// Best-effort: missing/unconnected Flow client skips silently; RPC failures are returned.
func (mem *ManageExpectedMachine) DeleteExpectedMachineOnFlow(ctx context.Context, request *cwssaws.ExpectedMachineRequest) error {
logger := log.With().Str("Activity", "DeleteExpectedMachineOnFlow").Logger()

logger.Info().Msg("Starting activity")

if request == nil {
return temporal.NewNonRetryableApplicationError("received empty delete Expected Machine request for Flow", swe.ErrTypeInvalidRequest, errors.New("nil request"))
}
if request.GetId().GetValue() == "" {
return temporal.NewNonRetryableApplicationError("received delete Expected Machine request for Flow without required id field", swe.ErrTypeInvalidRequest, errors.New("missing id"))
}

if mem.flowGrpcAtomicClient == nil {
logger.Warn().Msg("Flow client not configured, skipping Flow component delete")
return nil
}

flowClient := mem.flowGrpcAtomicClient.GetClient()
if flowClient == nil {
logger.Warn().Msg("Flow client not connected, skipping Flow component delete")
return nil
}

_, err := flowClient.GrpcServiceClient().DeleteComponent(ctx, &flowv1.DeleteComponentRequest{Id: &flowv1.UUID{Id: request.GetId().GetValue()}})
if err != nil {
logger.Warn().Err(err).Msg("Failed to delete Expected Machine component on Flow")
return swe.WrapErr(err)
}

logger.Info().Msg("Completed activity")
return nil
}

// expectedMachineToFlowComponent converts a NICo ExpectedMachine proto to an Flow Component proto
func expectedMachineToFlowComponent(em *cwssaws.ExpectedMachine) *flowv1.Component {
component := &flowv1.Component{
Expand Down Expand Up @@ -559,6 +682,36 @@ func expectedMachineToFlowComponent(em *cwssaws.ExpectedMachine) *flowv1.Compone
return component
}

// componentToPatchRequest extracts the patchable subset of a Flow Component into a
// PatchComponentRequest. Used by the Expected{Machine,Switch,PowerShelf} Update-on-Flow
// activities so the conversion stays in sync with expected*ToFlowComponent.
//
// Empty / nil values are dropped rather than sent as explicit zero patches: the entity
// types don't model a "clear this field" intent, so a missing value means "leave the
// Flow side untouched", matching how the Create path declines to set empties.
func componentToPatchRequest(c *flowv1.Component) *flowv1.PatchComponentRequest {
if c == nil || c.GetInfo() == nil {
return nil
}
req := &flowv1.PatchComponentRequest{
Id: c.GetInfo().GetId(),
Bmcs: c.GetBmcs(),
}
if fv := c.GetFirmwareVersion(); fv != "" {
req.FirmwareVersion = &fv
}
if c.GetPosition() != nil {
req.Position = c.GetPosition()
}
if desc := c.GetInfo().GetDescription(); desc != "" {
req.Description = &desc
}
if rid := c.GetRackId(); rid.GetId() != "" {
req.RackId = rid
}
return req
}

// UpdateExpectedMachinesOnSite updates multiple Expected Machines on NICo using the batch endpoint
func (mem *ManageExpectedMachine) UpdateExpectedMachinesOnSite(ctx context.Context, request *cwssaws.BatchExpectedMachineOperationRequest) (*cwssaws.BatchExpectedMachineOperationResponse, error) {
logger := log.With().Str("Activity", "UpdateExpectedMachinesOnSite").Logger()
Expand Down
80 changes: 80 additions & 0 deletions site-workflow/pkg/activity/expectedmachine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,86 @@ func TestManageExpectedMachine_CreateExpectedMachineOnFlow(t *testing.T) {
})
}

func TestManageExpectedMachine_UpdateExpectedMachineOnFlow(t *testing.T) {
t.Run("nil request returns non-retryable error", func(t *testing.T) {
mm := ManageExpectedMachine{flowGrpcAtomicClient: nil}
err := mm.UpdateExpectedMachineOnFlow(context.Background(), nil)
assert.Error(t, err)
})

t.Run("empty id returns non-retryable error", func(t *testing.T) {
mm := ManageExpectedMachine{flowGrpcAtomicClient: nil}
err := mm.UpdateExpectedMachineOnFlow(context.Background(), &cwssaws.ExpectedMachine{
BmcMacAddress: "00:11:22:33:44:55", ChassisSerialNumber: "SN001",
})
assert.Error(t, err)
})

t.Run("nil Flow client skips gracefully", func(t *testing.T) {
mm := ManageExpectedMachine{flowGrpcAtomicClient: nil}
err := mm.UpdateExpectedMachineOnFlow(context.Background(), &cwssaws.ExpectedMachine{
Id: &cwssaws.UUID{Value: uuid.NewString()}, BmcMacAddress: "00:11:22:33:44:55", ChassisSerialNumber: "SN001",
})
assert.NoError(t, err)
})

t.Run("nil Flow client connection skips gracefully", func(t *testing.T) {
mm := ManageExpectedMachine{flowGrpcAtomicClient: cClient.NewFlowGrpcAtomicClient(&cClient.FlowGrpcClientConfig{})}
err := mm.UpdateExpectedMachineOnFlow(context.Background(), &cwssaws.ExpectedMachine{
Id: &cwssaws.UUID{Value: uuid.NewString()}, BmcMacAddress: "00:11:22:33:44:55", ChassisSerialNumber: "SN001",
})
assert.NoError(t, err)
})
}

func TestManageExpectedMachine_UpdateExpectedMachinesOnFlow(t *testing.T) {
req := &cwssaws.BatchExpectedMachineOperationRequest{
ExpectedMachines: &cwssaws.ExpectedMachineList{
ExpectedMachines: []*cwssaws.ExpectedMachine{
{Id: &cwssaws.UUID{Value: uuid.NewString()}, BmcMacAddress: "00:11:22:33:44:55", ChassisSerialNumber: "SN001"},
},
},
}

t.Run("nil Flow client skips gracefully", func(t *testing.T) {
mm := ManageExpectedMachine{flowGrpcAtomicClient: nil}
err := mm.UpdateExpectedMachinesOnFlow(context.Background(), req)
assert.NoError(t, err)
})

t.Run("nil Flow client connection skips gracefully", func(t *testing.T) {
mm := ManageExpectedMachine{flowGrpcAtomicClient: cClient.NewFlowGrpcAtomicClient(&cClient.FlowGrpcClientConfig{})}
err := mm.UpdateExpectedMachinesOnFlow(context.Background(), req)
assert.NoError(t, err)
})
}

func TestManageExpectedMachine_DeleteExpectedMachineOnFlow(t *testing.T) {
t.Run("nil request returns non-retryable error", func(t *testing.T) {
mm := ManageExpectedMachine{flowGrpcAtomicClient: nil}
err := mm.DeleteExpectedMachineOnFlow(context.Background(), nil)
assert.Error(t, err)
})

t.Run("empty id returns non-retryable error", func(t *testing.T) {
mm := ManageExpectedMachine{flowGrpcAtomicClient: nil}
err := mm.DeleteExpectedMachineOnFlow(context.Background(), &cwssaws.ExpectedMachineRequest{})
assert.Error(t, err)
})

t.Run("nil Flow client skips gracefully", func(t *testing.T) {
mm := ManageExpectedMachine{flowGrpcAtomicClient: nil}
err := mm.DeleteExpectedMachineOnFlow(context.Background(), &cwssaws.ExpectedMachineRequest{Id: &cwssaws.UUID{Value: uuid.NewString()}})
assert.NoError(t, err)
})

t.Run("nil Flow client connection skips gracefully", func(t *testing.T) {
mm := ManageExpectedMachine{flowGrpcAtomicClient: cClient.NewFlowGrpcAtomicClient(&cClient.FlowGrpcClientConfig{})}
err := mm.DeleteExpectedMachineOnFlow(context.Background(), &cwssaws.ExpectedMachineRequest{Id: &cwssaws.UUID{Value: uuid.NewString()}})
assert.NoError(t, err)
})
}

func TestManageExpectedMachine_CreateExpectedMachinesOnFlow(t *testing.T) {
t.Run("nil Flow client skips gracefully", func(t *testing.T) {
mm := ManageExpectedMachine{flowGrpcAtomicClient: nil}
Expand Down
Loading
Loading