Skip to content

Commit e3b8ea2

Browse files
authored
fix: reconcile state after msg resend (#3613)
1 parent 4af6ae5 commit e3b8ea2

File tree

2 files changed

+22
-10
lines changed

2 files changed

+22
-10
lines changed

openmeter/notification/eventhandler/deliverystatus.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,10 @@ func sortDeliveryStatusStateByPriority(states []notification.EventDeliveryStatus
5959

6060
priority := map[notification.EventDeliveryStatusState]int8{
6161
notification.EventDeliveryStatusStatePending: 0,
62-
notification.EventDeliveryStatusStateResending: 0,
63-
notification.EventDeliveryStatusStateSending: 1,
64-
notification.EventDeliveryStatusStateFailed: 2,
65-
notification.EventDeliveryStatusStateSuccess: 2,
62+
notification.EventDeliveryStatusStateResending: 1,
63+
notification.EventDeliveryStatusStateSending: 2,
64+
notification.EventDeliveryStatusStateFailed: 3,
65+
notification.EventDeliveryStatusStateSuccess: 3,
6666
}
6767

6868
slices.SortFunc(states, func(a, b notification.EventDeliveryStatus) int {

openmeter/notification/eventhandler/webhook.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,12 @@ func (h *Handler) reconcileWebhookEvent(ctx context.Context, event *notification
5454
if len(sortedActiveStatuses) == 0 {
5555
return nil
5656
}
57+
var err error
5758

5859
// Fetch the list of webhook endpoints for the active delivery statuses.
59-
webhooksOut, err := h.webhook.ListWebhooks(ctx, webhook.ListWebhooksInput{
60+
var webhooksOut []webhook.Webhook
61+
62+
webhooksOut, err = h.webhook.ListWebhooks(ctx, webhook.ListWebhooksInput{
6063
Namespace: event.Namespace,
6164
IDs: lo.Map(sortedActiveStatuses, func(item notification.EventDeliveryStatus, _ int) string {
6265
return item.ChannelID
@@ -73,7 +76,9 @@ func (h *Handler) reconcileWebhookEvent(ctx context.Context, event *notification
7376

7477
// Check if the message already exists in the webhook provider.
7578
// Return the error if it is other than not found as it is going to be handled by the reconciler logic.
76-
msg, err := h.getWebhookMessage(ctx, event)
79+
var msg *webhook.Message
80+
81+
msg, err = h.getWebhookMessage(ctx, event)
7782
if err != nil && !webhook.IsNotFoundError(err) {
7883
return fmt.Errorf("failed to get webhook message: %w", err)
7984
}
@@ -144,7 +149,7 @@ func (h *Handler) reconcileWebhookEvent(ctx context.Context, event *notification
144149
NextAttempt: nil,
145150
Attempts: status.Attempts,
146151
}
147-
case webhook.IsUnrecoverableError(err):
152+
case webhook.IsUnrecoverableError(err), webhook.IsValidationError(err):
148153
// Unrecoverable error happened, no retry is possible.
149154

150155
span.AddEvent("fetching webhook message from provider returned unrecoverable error",
@@ -155,7 +160,10 @@ func (h *Handler) reconcileWebhookEvent(ctx context.Context, event *notification
155160
span.RecordError(err, trace.WithAttributes(spanAttrs...), trace.WithAttributes(deliveryStatusAttrs...))
156161

157162
h.logger.ErrorContext(ctx, "fetching webhook message from provider returned unrecoverable error",
158-
"error", err.Error(), "delivery_status.state", status.State, "namespace", event.Namespace, "event_id", event.ID)
163+
"error", err.Error(),
164+
"notification.delivery_status.state", status.State,
165+
"namespace", event.Namespace,
166+
"notification.event.id", event.ID)
159167

160168
input = &notification.UpdateEventDeliveryStatusInput{
161169
NamespacedID: status.NamespacedID,
@@ -232,7 +240,9 @@ func (h *Handler) reconcileWebhookEvent(ctx context.Context, event *notification
232240
)
233241
}
234242
case notification.EventDeliveryStatusStateResending:
235-
err = h.resendWebhookMessage(ctx, event, &status)
243+
// Note: keep error local, so we do not break the reconcile logic
244+
// for delivery status in 'PENDING' or 'SENDING' states.
245+
err := h.resendWebhookMessage(ctx, event, &status)
236246
if err != nil {
237247
errs = append(errs, fmt.Errorf("failed to resend webhook message: %w", err))
238248
}
@@ -525,7 +535,9 @@ func (h *Handler) sendWebhookMessage(ctx context.Context, event *notification.Ev
525535

526536
payload, err := eventAsPayload(event)
527537
if err != nil {
528-
return nil, fmt.Errorf("failed to serialize webhook message payload: %w", err)
538+
return nil, webhook.NewValidationError(
539+
fmt.Errorf("failed to serialize webhook message payload: %w", err),
540+
)
529541
}
530542

531543
msg, err := h.webhook.SendMessage(ctx, webhook.SendMessageInput{

0 commit comments

Comments
 (0)