Skip to content

Commit c3704a9

Browse files
committed
fix: resolve concurrency issues in Metrics library
- Replace LINQ FirstOrDefault with thread-safe iteration in Metrics.cs and MetricDirective.cs - Change CustomMetadata from Dictionary to ConcurrentDictionary in Metadata.cs - Add comprehensive concurrency tests with proper cleanup - Fix IndexOutOfRangeException and InvalidOperationException in multi-threaded scenarios - Maintain backward compatibility and performance
1 parent 4c3c3ce commit c3704a9

File tree

4 files changed

+310
-5
lines changed

4 files changed

+310
-5
lines changed

libraries/src/AWS.Lambda.Powertools.Metrics/Metrics.cs

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,8 +199,7 @@ void IMetrics.AddMetric(string key, double value, MetricUnit unit, MetricResolut
199199

200200
if (metrics.Count > 0 &&
201201
(metrics.Count == PowertoolsConfigurations.MaxMetrics ||
202-
metrics.FirstOrDefault(x => x.Name == key)
203-
?.Values.Count == PowertoolsConfigurations.MaxMetrics))
202+
GetExistingMetric(metrics, key)?.Values.Count == PowertoolsConfigurations.MaxMetrics))
204203
{
205204
Instance.Flush(true);
206205
}
@@ -624,6 +623,35 @@ public static void Flush(bool metricsOverflow = false)
624623
Instance.Flush(metricsOverflow);
625624
}
626625

626+
/// <summary>
627+
/// Safely searches for an existing metric by name without using LINQ enumeration
628+
/// </summary>
629+
/// <param name="metrics">The metrics collection to search</param>
630+
/// <param name="key">The metric name to search for</param>
631+
/// <returns>The found metric or null if not found</returns>
632+
private static MetricDefinition GetExistingMetric(List<MetricDefinition> metrics, string key)
633+
{
634+
// Use a traditional for loop instead of LINQ to avoid enumeration issues
635+
// when the collection is modified concurrently
636+
for (int i = 0; i < metrics.Count; i++)
637+
{
638+
try
639+
{
640+
var metric = metrics[i];
641+
if (metric?.Name == key)
642+
{
643+
return metric;
644+
}
645+
}
646+
catch (ArgumentOutOfRangeException)
647+
{
648+
// Collection was modified during iteration, return null to be safe
649+
return null;
650+
}
651+
}
652+
return null;
653+
}
654+
627655
/// <summary>
628656
/// Helper method for testing purposes. Clears static instance between test execution
629657
/// </summary>

libraries/src/AWS.Lambda.Powertools.Metrics/Model/Metadata.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Collections.Concurrent;
23
using System.Collections.Generic;
34
using System.Text.Json.Serialization;
45

@@ -15,7 +16,7 @@ public class Metadata
1516
public Metadata()
1617
{
1718
CloudWatchMetrics = new List<MetricDirective> { new() };
18-
CustomMetadata = new Dictionary<string, object>();
19+
CustomMetadata = new ConcurrentDictionary<string, object>();
1920
}
2021

2122
/// <summary>
@@ -43,7 +44,7 @@ public Metadata()
4344
/// </summary>
4445
/// <value>The custom metadata.</value>
4546
[JsonIgnore]
46-
public Dictionary<string, object> CustomMetadata { get; }
47+
public ConcurrentDictionary<string, object> CustomMetadata { get; }
4748

4849
/// <summary>
4950
/// Deletes all metrics from memory

libraries/src/AWS.Lambda.Powertools.Metrics/Model/MetricDirective.cs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ public void AddMetric(string name, double value, MetricUnit unit, MetricResoluti
160160
{
161161
lock (_lockObj)
162162
{
163-
var metric = Metrics.FirstOrDefault(m => m.Name == name);
163+
var metric = GetExistingMetric(Metrics, name);
164164
if (metric != null)
165165
{
166166
if (metric.Values.Count < PowertoolsConfigurations.MaxMetrics)
@@ -298,6 +298,35 @@ internal void AddDimensionSet(List<DimensionSet> dimensionSets)
298298
}
299299
}
300300

301+
/// <summary>
302+
/// Safely searches for an existing metric by name without using LINQ enumeration
303+
/// </summary>
304+
/// <param name="metrics">The metrics collection to search</param>
305+
/// <param name="name">The metric name to search for</param>
306+
/// <returns>The found metric or null if not found</returns>
307+
private static MetricDefinition GetExistingMetric(List<MetricDefinition> metrics, string name)
308+
{
309+
// Use a traditional for loop instead of LINQ to avoid enumeration issues
310+
// when the collection is modified concurrently
311+
for (int i = 0; i < metrics.Count; i++)
312+
{
313+
try
314+
{
315+
var metric = metrics[i];
316+
if (metric?.Name == name)
317+
{
318+
return metric;
319+
}
320+
}
321+
catch (ArgumentOutOfRangeException)
322+
{
323+
// Collection was modified during iteration, return null to be safe
324+
return null;
325+
}
326+
}
327+
return null;
328+
}
329+
301330
/// <summary>
302331
/// Clears both default dimensions and dimensions lists
303332
/// </summary>
Lines changed: 247 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Threading.Tasks;
4+
using AWS.Lambda.Powertools.Metrics;
5+
using Xunit;
6+
7+
namespace AWS.Lambda.Powertools.Metrics.Tests
8+
{
9+
public class ConcurrencyIssueTest : IDisposable
10+
{
11+
[Fact]
12+
public async Task AddMetric_ConcurrentAccess_ShouldNotThrowException()
13+
{
14+
// Arrange
15+
Metrics.ResetForTest();
16+
Metrics.SetNamespace("TestNamespace");
17+
var exceptions = new List<Exception>();
18+
var tasks = new List<Task>();
19+
20+
// Act - Simulate concurrent access from multiple threads
21+
for (int i = 0; i < 10; i++)
22+
{
23+
var taskId = i;
24+
tasks.Add(Task.Run(() =>
25+
{
26+
try
27+
{
28+
// Simulate multiple metrics being added concurrently
29+
for (int j = 0; j < 100; j++)
30+
{
31+
Metrics.AddMetric($"TestMetric_{taskId}_{j}", 1.0, MetricUnit.Count);
32+
Metrics.AddMetric($"Client.{taskId}", 1.0, MetricUnit.Count);
33+
Metrics.AddMetadata($"TestMetadata_{taskId}_{j}", $"value_{j}");
34+
}
35+
}
36+
catch (Exception ex)
37+
{
38+
lock (exceptions)
39+
{
40+
exceptions.Add(ex);
41+
}
42+
}
43+
}));
44+
}
45+
46+
await Task.WhenAll(tasks);
47+
48+
// Assert
49+
foreach (var ex in exceptions)
50+
{
51+
Console.WriteLine($"Exception: {ex.GetType().Name}: {ex.Message}");
52+
if (ex.StackTrace != null)
53+
Console.WriteLine($"Stack trace: {ex.StackTrace}");
54+
}
55+
Assert.Empty(exceptions);
56+
57+
// Cleanup after test
58+
CleanupMetrics();
59+
}
60+
61+
[Fact]
62+
public async Task AddMetric_ConcurrentAccessWithSameKey_ShouldNotThrowException()
63+
{
64+
// Arrange
65+
Metrics.ResetForTest();
66+
Metrics.SetNamespace("TestNamespace");
67+
var exceptions = new List<Exception>();
68+
var tasks = new List<Task>();
69+
70+
// Act - Simulate the specific scenario where the same metric key is used concurrently
71+
// Increase concurrency to try to reproduce the issue
72+
for (int i = 0; i < 50; i++)
73+
{
74+
tasks.Add(Task.Run(() =>
75+
{
76+
try
77+
{
78+
// This simulates the scenario where the same metric key
79+
// (like "Client.6b70*28198e") is being added from multiple threads
80+
for (int j = 0; j < 200; j++)
81+
{
82+
Metrics.AddMetric("Client.SharedKey", 1.0, MetricUnit.Count);
83+
}
84+
}
85+
catch (Exception ex)
86+
{
87+
lock (exceptions)
88+
{
89+
exceptions.Add(ex);
90+
}
91+
}
92+
}));
93+
}
94+
95+
await Task.WhenAll(tasks);
96+
97+
// Assert - Should not have any exceptions
98+
foreach (var ex in exceptions)
99+
{
100+
Console.WriteLine($"Exception: {ex.GetType().Name}: {ex.Message}");
101+
if (ex.StackTrace != null)
102+
Console.WriteLine($"Stack trace: {ex.StackTrace}");
103+
}
104+
Assert.Empty(exceptions);
105+
106+
// Cleanup after test
107+
CleanupMetrics();
108+
}
109+
110+
[Fact]
111+
public async Task AddMetric_Batch_ShouldNotThrowException()
112+
{
113+
// Arrange
114+
Metrics.ResetForTest();
115+
Metrics.SetNamespace("TestNamespace");
116+
var exceptions = new List<Exception>();
117+
var tasks = new List<Task>();
118+
119+
for (int i = 0; i < 5; i++)
120+
{
121+
var batchId = i;
122+
tasks.Add(Task.Run(async () =>
123+
{
124+
try
125+
{
126+
// Simulate DataLoader batch processing
127+
var innerTasks = new List<Task>();
128+
for (int j = 0; j < 10; j++)
129+
{
130+
var itemId = j;
131+
innerTasks.Add(Task.Run(() =>
132+
{
133+
// Simulate metrics being added from parallel DataLoader operations
134+
Metrics.AddMetric($"DataLoader.InsidersStatusDataLoader", 1.0, MetricUnit.Count);
135+
Metrics.AddMetric($"Query.insidersStatus", 1.0, MetricUnit.Count);
136+
Metrics.AddMetric($"Client.6b70*28198e", 1.0, MetricUnit.Count);
137+
Metrics.AddMetadata($"Query.insidersStatus.OperationName", "GetInsidersStatus");
138+
Metrics.AddMetadata($"Query.insidersStatus.UserId", $"user_{batchId}_{itemId}");
139+
}));
140+
}
141+
await Task.WhenAll(innerTasks);
142+
}
143+
catch (Exception ex)
144+
{
145+
lock (exceptions)
146+
{
147+
exceptions.Add(ex);
148+
}
149+
}
150+
}));
151+
}
152+
153+
await Task.WhenAll(tasks);
154+
155+
// Assert
156+
foreach (var ex in exceptions)
157+
{
158+
Console.WriteLine($"Exception: {ex.GetType().Name}: {ex.Message}");
159+
if (ex.StackTrace != null)
160+
Console.WriteLine($"Stack trace: {ex.StackTrace}");
161+
}
162+
Assert.Empty(exceptions);
163+
164+
// Cleanup after test
165+
CleanupMetrics();
166+
}
167+
168+
[Fact]
169+
public async Task AddMetric_ReproduceFirstOrDefaultIssue_ShouldNotThrowException()
170+
{
171+
// Arrange
172+
Metrics.ResetForTest();
173+
Metrics.SetNamespace("TestNamespace");
174+
var exceptions = new List<Exception>();
175+
var tasks = new List<Task>();
176+
177+
// Act - This test specifically targets the FirstOrDefault issue in line 202-203 of Metrics.cs
178+
// metrics are added and flushed rapidly to trigger collection modification
179+
for (int i = 0; i < 100; i++)
180+
{
181+
var taskId = i;
182+
tasks.Add(Task.Run(() =>
183+
{
184+
try
185+
{
186+
// Add metrics rapidly to trigger the overflow condition that calls FirstOrDefault
187+
for (int j = 0; j < 150; j++) // This should trigger multiple flushes
188+
{
189+
Metrics.AddMetric($"TestMetric_{taskId}_{j}", 1.0, MetricUnit.Count);
190+
191+
// Also add the same metric key to trigger the FirstOrDefault path
192+
if (j % 10 == 0)
193+
{
194+
Metrics.AddMetric("SharedMetric", 1.0, MetricUnit.Count);
195+
}
196+
}
197+
}
198+
catch (Exception ex)
199+
{
200+
lock (exceptions)
201+
{
202+
exceptions.Add(ex);
203+
}
204+
}
205+
}));
206+
}
207+
208+
await Task.WhenAll(tasks);
209+
210+
// Assert
211+
foreach (var ex in exceptions)
212+
{
213+
Console.WriteLine($"Exception: {ex.GetType().Name}: {ex.Message}");
214+
if (ex.StackTrace != null)
215+
Console.WriteLine($"Stack trace: {ex.StackTrace}");
216+
}
217+
Assert.Empty(exceptions);
218+
219+
// Cleanup after test
220+
CleanupMetrics();
221+
}
222+
223+
/// <summary>
224+
/// Cleanup method to ensure no state leaks between tests
225+
/// </summary>
226+
private void CleanupMetrics()
227+
{
228+
try
229+
{
230+
// Reset the static instance to clean state
231+
Metrics.ResetForTest();
232+
}
233+
catch
234+
{
235+
// Ignore cleanup errors
236+
}
237+
}
238+
239+
/// <summary>
240+
/// IDisposable implementation for proper test cleanup
241+
/// </summary>
242+
public void Dispose()
243+
{
244+
CleanupMetrics();
245+
}
246+
}
247+
}

0 commit comments

Comments
 (0)