Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions src/Trax.Scheduler.Lambda/Configuration/LambdaRetryOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
namespace Trax.Scheduler.Lambda.Configuration;

/// <summary>
/// Retry options for transient AWS Lambda invocation failures (throttling, service errors).
/// </summary>
/// <remarks>
/// Applied to <see cref="LambdaWorkerOptions"/> and <see cref="LambdaRunOptions"/>.
/// Retries on AWS status codes 429 (Throttling), 502 (Bad Gateway), 503 (Service Unavailable),
/// and 504 (Gateway Timeout) with exponential backoff and jitter.
/// Set <see cref="MaxRetries"/> to 0 to disable retries.
/// </remarks>
public class LambdaRetryOptions
{
/// <summary>
/// Maximum number of retry attempts before giving up.
/// </summary>
public int MaxRetries { get; set; } = 5;

/// <summary>
/// Base delay between retries. Actual delay is <c>BaseDelay * 2^attempt</c> with jitter,
/// capped at <see cref="MaxDelay"/>.
/// </summary>
public TimeSpan BaseDelay { get; set; } = TimeSpan.FromSeconds(1);

/// <summary>
/// Maximum delay between retries, preventing unbounded exponential growth.
/// </summary>
public TimeSpan MaxDelay { get; set; } = TimeSpan.FromSeconds(30);
}
5 changes: 5 additions & 0 deletions src/Trax.Scheduler.Lambda/Configuration/LambdaRunOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,9 @@ public class LambdaRunOptions
/// Use this to set a custom region, endpoint override (e.g., LocalStack), or service URL.
/// </summary>
public Action<AmazonLambdaConfig>? ConfigureLambdaClient { get; set; }

/// <summary>
/// Retry options for transient Lambda invocation failures (throttling, service errors).
/// </summary>
public LambdaRetryOptions Retry { get; set; } = new();
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,9 @@ public class LambdaWorkerOptions
/// Use this to set a custom region, endpoint override (e.g., LocalStack), or service URL.
/// </summary>
public Action<AmazonLambdaConfig>? ConfigureLambdaClient { get; set; }

/// <summary>
/// Retry options for transient Lambda invocation failures (throttling, service errors).
/// </summary>
public LambdaRetryOptions Retry { get; set; } = new();
}
8 changes: 7 additions & 1 deletion src/Trax.Scheduler.Lambda/Services/LambdaJobSubmitter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,13 @@ private async Task InvokeAsync(RemoteJobRequest request, CancellationToken cance
request.MetadataId
);

var response = await lambdaClient.InvokeAsync(invokeRequest, cancellationToken);
var response = await LambdaRetryHelper.InvokeWithRetryAsync(
lambdaClient,
invokeRequest,
options.Retry,
logger,
cancellationToken
);

if (!string.IsNullOrEmpty(response.FunctionError))
{
Expand Down
113 changes: 113 additions & 0 deletions src/Trax.Scheduler.Lambda/Services/LambdaRetryHelper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
using System.Net;
using Amazon.Lambda;
using Amazon.Lambda.Model;
using Amazon.Runtime;
using Microsoft.Extensions.Logging;
using Trax.Scheduler.Lambda.Configuration;

namespace Trax.Scheduler.Lambda.Services;

/// <summary>
/// Retries AWS Lambda invocations on transient failures (throttling, 502, 503, 504) with exponential backoff and jitter.
/// </summary>
internal static class LambdaRetryHelper
{
private static readonly HashSet<HttpStatusCode> TransientStatusCodes =
[
HttpStatusCode.TooManyRequests,
HttpStatusCode.BadGateway,
HttpStatusCode.ServiceUnavailable,
HttpStatusCode.GatewayTimeout,
];

/// <summary>
/// Invokes a Lambda function with retry logic for transient AWS failures.
/// </summary>
/// <param name="client">The Lambda client to use.</param>
/// <param name="request">The invocation request.</param>
/// <param name="options">Retry configuration.</param>
/// <param name="logger">Logger for retry diagnostics.</param>
/// <param name="ct">Cancellation token.</param>
/// <returns>The invoke response (either successful or from the last attempt before a non-transient failure).</returns>
internal static async Task<InvokeResponse> InvokeWithRetryAsync(
IAmazonLambda client,
InvokeRequest request,
LambdaRetryOptions options,
ILogger? logger,
CancellationToken ct
)
{
var maxRetries = Math.Max(0, options.MaxRetries);

Exception? lastException = null;

for (var attempt = 0; attempt <= maxRetries; attempt++)
{
try
{
return await client.InvokeAsync(request, ct);
}
catch (Exception ex) when (IsTransient(ex))
{
lastException = ex;

if (attempt == maxRetries)
break;

var delay = ComputeDelay(attempt, options);

logger?.LogWarning(
"Lambda invocation failed with transient error ({ErrorType}), retrying in {DelayMs}ms (attempt {Attempt}/{MaxRetries})",
ex.GetType().Name,
delay.TotalMilliseconds,
attempt + 1,
maxRetries
);

await Task.Delay(delay, ct);
}
}

throw lastException!;
}

/// <summary>
/// Determines whether an exception represents a transient AWS failure that should be retried.
/// </summary>
internal static bool IsTransient(Exception ex)
{
if (ex is AmazonServiceException serviceException)
return TransientStatusCodes.Contains(serviceException.StatusCode);

if (ex is HttpRequestException)
return true;

return false;
}

/// <summary>
/// Computes the delay for a given retry attempt using exponential backoff with jitter.
/// </summary>
internal static TimeSpan ComputeDelay(int attempt, LambdaRetryOptions options)
{
// Exponential backoff: baseDelay * 2^attempt
var exponentialMs = options.BaseDelay.TotalMilliseconds * Math.Pow(2, attempt);

// Add jitter: +/-25%
var jitterFactor = 0.75 + Random.Shared.NextDouble() * 0.5;
var delayMs = exponentialMs * jitterFactor;

return Clamp(TimeSpan.FromMilliseconds(delayMs), options);
}

private static TimeSpan Clamp(TimeSpan delay, LambdaRetryOptions options)
{
if (delay < TimeSpan.Zero)
return options.BaseDelay;

if (delay > options.MaxDelay)
return options.MaxDelay;

return delay;
}
}
8 changes: 7 additions & 1 deletion src/Trax.Scheduler.Lambda/Services/LambdaRunExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,13 @@ public async Task<RunTrainResult> ExecuteAsync(
trainName
);

var invokeResponse = await lambdaClient.InvokeAsync(invokeRequest, ct);
var invokeResponse = await LambdaRetryHelper.InvokeWithRetryAsync(
lambdaClient,
invokeRequest,
options.Retry,
logger,
ct
);

if (!string.IsNullOrEmpty(invokeResponse.FunctionError))
{
Expand Down
5 changes: 5 additions & 0 deletions src/Trax.Scheduler.Lambda/Trax.Scheduler.Lambda.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@
<AssemblyName>Trax.Scheduler.Lambda</AssemblyName>
</PropertyGroup>

<ItemGroup>
<InternalsVisibleTo Include="Trax.Scheduler.Tests" />
<InternalsVisibleTo Include="Trax.Scheduler.Tests.Integration" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="../Trax.Scheduler/Trax.Scheduler.csproj" />
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using System.Net;
using System.Text.Json;
using Amazon.Lambda;
using Amazon.Lambda.Model;
using Amazon.Runtime;
using FluentAssertions;
using Trax.Core.Exceptions;
using Trax.Effect.Utils;
Expand Down Expand Up @@ -282,6 +284,43 @@ public async Task EnqueueAsync_MultipleCalls_EachProducesUniqueJobId()

#endregion

#region Retry Tests

[Test]
public async Task EnqueueAsync_ThrottleThenSuccess_RetriesTransparentlyAndReturnsJobId()
{
// Arrange
var options = new LambdaWorkerOptions
{
FunctionName = "my-lambda-function",
Retry = new LambdaRetryOptions
{
MaxRetries = 3,
BaseDelay = TimeSpan.FromMilliseconds(1),
},
};
var client = new MockLambdaClient();
client.ExceptionsBeforeSuccess.Enqueue(
new AmazonServiceException("Throttled") { StatusCode = HttpStatusCode.TooManyRequests }
);
var logger = Microsoft
.Extensions
.Logging
.Abstractions
.NullLogger<LambdaJobSubmitter>
.Instance;
var submitter = new LambdaJobSubmitter(client, options, logger);

// Act
var jobId = await submitter.EnqueueAsync(42);

// Assert
jobId.Should().StartWith("lambda-");
client.AllRequests.Should().HaveCount(2); // 1 throttled + 1 success
}

#endregion

#region MockLambdaClient

internal class MockLambdaClient : IAmazonLambda
Expand All @@ -291,6 +330,12 @@ internal class MockLambdaClient : IAmazonLambda
public bool ThrowOnInvoke { get; set; }
public string? FunctionError { get; set; }

/// <summary>
/// Optional queue of exceptions to throw before returning a successful response.
/// Each invocation pops the next exception; once empty, returns normally.
/// </summary>
public Queue<Exception> ExceptionsBeforeSuccess { get; } = new();

public Amazon.Runtime.IClientConfig Config => throw new NotImplementedException();

public ILambdaPaginatorFactory Paginators => throw new NotImplementedException();
Expand All @@ -302,11 +347,15 @@ public Task<InvokeResponse> InvokeAsync(
{
cancellationToken.ThrowIfCancellationRequested();

AllRequests.Add(request);

if (ExceptionsBeforeSuccess.Count > 0)
throw ExceptionsBeforeSuccess.Dequeue();

if (ThrowOnInvoke)
throw new AmazonLambdaException("Mock Lambda error");

LastRequest = request;
AllRequests.Add(request);

return Task.FromResult(
new InvokeResponse { StatusCode = 202, FunctionError = FunctionError }
Expand Down
Loading
Loading