Skip to content

Commit 8116ba3

Browse files
sebstoSebastien Stormacq
andauthored
Allow multiple invocations of streaming Lambda functions with the local test server (#590)
The Local HTTP Server (used when testing) used to block after one invocation of a streaming lambda function. Now you can invoke multiple times your streaming function without having to restart the local HTTP server. ### Motivation: Bug #588 ### Modifications: The flow to respond to streaming and non-streaming requests are different. In the streaming request flow, we forgot to send an 202 accept response to the lambda runtime client after it posted the end chunck of the response (in other words, `POST /response` never received an HTTP 202 response.) This caused the Lambda Runtime to hang and never issue the next `GET /next `request. ### Result: You can now send multiple invocations to your streaming lambda. --------- Co-authored-by: Sebastien Stormacq <[email protected]>
1 parent e3b74f3 commit 8116ba3

File tree

3 files changed

+357
-2
lines changed

3 files changed

+357
-2
lines changed

Sources/AWSLambdaRuntime/Lambda+LocalServer.swift

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,14 @@ internal struct LambdaHTTPServer {
301301
await self.responsePool.push(
302302
LocalServerResponse(id: requestId, final: true)
303303
)
304+
305+
// Send acknowledgment back to Lambda runtime client for streaming END
306+
// This is the single HTTP response to the chunked HTTP request
307+
try await self.sendResponse(
308+
.init(id: requestId, status: .accepted, final: true),
309+
outbound: outbound,
310+
logger: logger
311+
)
304312
} else {
305313
// process the buffered response for non streaming requests
306314
try await self.processRequestAndSendResponse(
Lines changed: 346 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,346 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the SwiftAWSLambdaRuntime open source project
4+
//
5+
// Copyright (c) 2025 Apple Inc. and the SwiftAWSLambdaRuntime project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import Logging
16+
import NIOCore
17+
import NIOHTTP1
18+
import NIOPosix
19+
import Testing
20+
21+
@testable import AWSLambdaRuntime
22+
23+
#if canImport(FoundationEssentials)
24+
import FoundationEssentials
25+
#else
26+
import Foundation
27+
#endif
28+
29+
#if canImport(FoundationNetworking)
30+
import FoundationNetworking
31+
#else
32+
import Foundation
33+
#endif
34+
35+
extension LambdaLocalServerTest {
36+
@Test("Streaming handler sends multiple chunks and completes successfully")
37+
@available(LambdaSwift 2.0, *)
38+
func testStreamingHandlerMultipleChunks() async throws {
39+
let customPort = 8090
40+
41+
// Set environment variable
42+
setenv("LOCAL_LAMBDA_PORT", "\(customPort)", 1)
43+
defer { unsetenv("LOCAL_LAMBDA_PORT") }
44+
45+
let results = try await withThrowingTaskGroup(of: StreamingTestResult.self) { group in
46+
47+
// Start the Lambda runtime with streaming handler
48+
group.addTask {
49+
struct StreamingTestHandler: StreamingLambdaHandler {
50+
func handle(
51+
_ event: ByteBuffer,
52+
responseWriter: some LambdaResponseStreamWriter,
53+
context: LambdaContext
54+
) async throws {
55+
// Send multiple chunks with delays to test streaming
56+
for i in 1...3 {
57+
try await responseWriter.write(ByteBuffer(string: "Chunk \(i)\n"))
58+
try await Task.sleep(for: .milliseconds(50))
59+
}
60+
try await responseWriter.finish()
61+
}
62+
}
63+
64+
let runtime = LambdaRuntime(
65+
handler: StreamingTestHandler()
66+
)
67+
68+
try await runtime._run()
69+
return StreamingTestResult(chunks: [], statusCode: 0, completed: false)
70+
}
71+
72+
// Start HTTP client to make streaming request
73+
group.addTask {
74+
// Give server time to start
75+
try await Task.sleep(for: .milliseconds(200))
76+
77+
return try await self.makeStreamingInvokeRequest(
78+
host: "127.0.0.1",
79+
port: customPort,
80+
payload: "\"test-event\""
81+
)
82+
}
83+
84+
// Get the first result (streaming response) and cancel the runtime
85+
let first = try await group.next()
86+
group.cancelAll()
87+
return first ?? StreamingTestResult(chunks: [], statusCode: 0, completed: false)
88+
}
89+
90+
// Verify streaming response
91+
#expect(results.statusCode == 200, "Expected 200 OK, got \(results.statusCode)")
92+
#expect(results.completed, "Streaming response should be completed")
93+
#expect(results.chunks.count >= 1, "Expected at least 1 chunk, got \(results.chunks.count)")
94+
95+
// The streaming chunks are concatenated in the HTTP response
96+
let fullResponse = results.chunks.joined()
97+
let expectedContent = "Chunk 1\nChunk 2\nChunk 3\n"
98+
#expect(fullResponse == expectedContent, "Response was '\(fullResponse)', expected '\(expectedContent)'")
99+
}
100+
101+
@Test("Multiple streaming invocations work correctly")
102+
@available(LambdaSwift 2.0, *)
103+
func testMultipleStreamingInvocations() async throws {
104+
let customPort = 8091
105+
106+
setenv("LOCAL_LAMBDA_PORT", "\(customPort)", 1)
107+
defer { unsetenv("LOCAL_LAMBDA_PORT") }
108+
109+
let results = try await withThrowingTaskGroup(of: [StreamingTestResult].self) { group in
110+
111+
// Start the Lambda runtime
112+
group.addTask {
113+
struct MultiStreamingHandler: StreamingLambdaHandler {
114+
func handle(
115+
_ event: ByteBuffer,
116+
responseWriter: some LambdaResponseStreamWriter,
117+
context: LambdaContext
118+
) async throws {
119+
let eventString = String(buffer: event)
120+
try await responseWriter.write(ByteBuffer(string: "Echo: \(eventString)\n"))
121+
try await responseWriter.finish()
122+
}
123+
}
124+
125+
let runtime = LambdaRuntime(
126+
handler: MultiStreamingHandler()
127+
)
128+
129+
try await runtime._run()
130+
return []
131+
}
132+
133+
// Make multiple streaming requests
134+
group.addTask {
135+
try await Task.sleep(for: .milliseconds(200))
136+
137+
var results: [StreamingTestResult] = []
138+
139+
// Make 3 sequential streaming requests
140+
for i in 1...3 {
141+
let result = try await self.makeStreamingInvokeRequest(
142+
host: "127.0.0.1",
143+
port: customPort,
144+
payload: "\"request-\(i)\""
145+
)
146+
results.append(result)
147+
148+
// Small delay between requests
149+
try await Task.sleep(for: .milliseconds(100))
150+
}
151+
152+
return results
153+
}
154+
155+
let first = try await group.next()
156+
group.cancelAll()
157+
return first ?? []
158+
}
159+
160+
// Verify all requests completed successfully
161+
#expect(results.count == 3, "Expected 3 responses, got \(results.count)")
162+
163+
for (index, result) in results.enumerated() {
164+
#expect(result.statusCode == 200, "Request \(index + 1) returned \(result.statusCode), expected 200")
165+
#expect(result.completed, "Request \(index + 1) should be completed")
166+
#expect(result.chunks.count == 1, "Request \(index + 1) should have 1 chunk, got \(result.chunks.count)")
167+
168+
let expectedContent = "Echo: \"request-\(index + 1)\"\n"
169+
#expect(result.chunks.first == expectedContent, "Request \(index + 1) content mismatch")
170+
}
171+
}
172+
173+
@Test("Streaming handler with custom headers works correctly")
174+
@available(LambdaSwift 2.0, *)
175+
func testStreamingHandlerWithCustomHeaders() async throws {
176+
let customPort = 8092
177+
178+
setenv("LOCAL_LAMBDA_PORT", "\(customPort)", 1)
179+
defer { unsetenv("LOCAL_LAMBDA_PORT") }
180+
181+
let results = try await withThrowingTaskGroup(of: StreamingTestResult.self) { group in
182+
183+
group.addTask {
184+
struct HeaderStreamingHandler: StreamingLambdaHandler {
185+
func handle(
186+
_ event: ByteBuffer,
187+
responseWriter: some LambdaResponseStreamWriter,
188+
context: LambdaContext
189+
) async throws {
190+
// Send custom headers
191+
try await responseWriter.writeStatusAndHeaders(
192+
StreamingLambdaStatusAndHeadersResponse(
193+
statusCode: 201,
194+
headers: [
195+
"Content-Type": "text/plain",
196+
"X-Custom-Header": "streaming-test",
197+
]
198+
)
199+
)
200+
201+
try await responseWriter.write(ByteBuffer(string: "Custom response"))
202+
try await responseWriter.finish()
203+
}
204+
}
205+
206+
let runtime = LambdaRuntime(
207+
handler: HeaderStreamingHandler()
208+
)
209+
210+
try await runtime._run()
211+
return StreamingTestResult(chunks: [], statusCode: 0, completed: false)
212+
}
213+
214+
group.addTask {
215+
try await Task.sleep(for: .milliseconds(200))
216+
217+
return try await self.makeStreamingInvokeRequest(
218+
host: "127.0.0.1",
219+
port: customPort,
220+
payload: "\"header-test\""
221+
)
222+
}
223+
224+
let first = try await group.next()
225+
group.cancelAll()
226+
return first ?? StreamingTestResult(chunks: [], statusCode: 0, completed: false)
227+
}
228+
229+
// Verify response (custom headers are returned as JSON in the response body)
230+
#expect(results.statusCode == 200, "Expected 200 OK, got \(results.statusCode)")
231+
#expect(results.completed, "Streaming response should be completed")
232+
#expect(results.chunks.count >= 1, "Expected at least 1 chunk, got \(results.chunks.count)")
233+
234+
// The response contains both the headers JSON and the content
235+
let fullResponse = results.chunks.joined()
236+
#expect(fullResponse.contains("\"statusCode\":201"), "Response should contain custom status code")
237+
#expect(
238+
fullResponse.contains("\"X-Custom-Header\":\"streaming-test\""),
239+
"Response should contain custom header"
240+
)
241+
#expect(fullResponse.contains("Custom response"), "Response should contain custom content")
242+
}
243+
244+
@Test("Streaming handler error handling works correctly")
245+
@available(LambdaSwift 2.0, *)
246+
func testStreamingHandlerErrorHandling() async throws {
247+
let customPort = 8093
248+
249+
setenv("LOCAL_LAMBDA_PORT", "\(customPort)", 1)
250+
defer { unsetenv("LOCAL_LAMBDA_PORT") }
251+
252+
let results = try await withThrowingTaskGroup(of: StreamingTestResult.self) { group in
253+
254+
group.addTask {
255+
struct ErrorStreamingHandler: StreamingLambdaHandler {
256+
func handle(
257+
_ event: ByteBuffer,
258+
responseWriter: some LambdaResponseStreamWriter,
259+
context: LambdaContext
260+
) async throws {
261+
let eventString = String(buffer: event)
262+
263+
if eventString.contains("error") {
264+
throw TestStreamingError.intentionalError
265+
}
266+
267+
try await responseWriter.write(ByteBuffer(string: "Success"))
268+
try await responseWriter.finish()
269+
}
270+
}
271+
272+
let runtime = LambdaRuntime(
273+
handler: ErrorStreamingHandler()
274+
)
275+
276+
try await runtime._run()
277+
return StreamingTestResult(chunks: [], statusCode: 0, completed: false)
278+
}
279+
280+
group.addTask {
281+
try await Task.sleep(for: .milliseconds(200))
282+
283+
return try await self.makeStreamingInvokeRequest(
284+
host: "127.0.0.1",
285+
port: customPort,
286+
payload: "\"trigger-error\""
287+
)
288+
}
289+
290+
let first = try await group.next()
291+
group.cancelAll()
292+
return first ?? StreamingTestResult(chunks: [], statusCode: 0, completed: false)
293+
}
294+
295+
// Verify error response
296+
#expect(results.statusCode == 500, "Expected 500 Internal Server Error, got \(results.statusCode)")
297+
#expect(results.completed, "Error response should be completed")
298+
}
299+
300+
// MARK: - Helper Methods
301+
302+
private func makeStreamingInvokeRequest(
303+
host: String,
304+
port: Int,
305+
payload: String
306+
) async throws -> StreamingTestResult {
307+
let url = URL(string: "http://\(host):\(port)/invoke")!
308+
var request = URLRequest(url: url)
309+
request.httpMethod = "POST"
310+
request.setValue("application/json", forHTTPHeaderField: "Content-Type")
311+
request.httpBody = payload.data(using: .utf8)
312+
request.timeoutInterval = 10.0
313+
314+
let (data, response) = try await URLSession.shared.data(for: request)
315+
316+
guard let httpResponse = response as? HTTPURLResponse else {
317+
// On Linux, create a custom error since URLError might not be available
318+
struct HTTPError: Error {
319+
let message: String
320+
}
321+
throw HTTPError(message: "Bad server response")
322+
}
323+
324+
// Parse the streaming response
325+
let responseString = String(data: data, encoding: .utf8) ?? ""
326+
let chunks = responseString.isEmpty ? [] : [responseString]
327+
328+
return StreamingTestResult(
329+
chunks: chunks,
330+
statusCode: httpResponse.statusCode,
331+
completed: true
332+
)
333+
}
334+
}
335+
336+
// MARK: - Test Support Types
337+
338+
struct StreamingTestResult {
339+
let chunks: [String]
340+
let statusCode: Int
341+
let completed: Bool
342+
}
343+
344+
enum TestStreamingError: Error {
345+
case intentionalError
346+
}

Tests/AWSLambdaRuntimeTests/LambdaLocalServerTests.swift

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,9 @@ import Testing
1919

2020
@testable import AWSLambdaRuntime
2121

22-
extension LambdaRuntimeTests {
23-
22+
// serialized to start only one runtime at a time
23+
@Suite(.serialized)
24+
struct LambdaLocalServerTest {
2425
@Test("Local server respects LOCAL_LAMBDA_PORT environment variable")
2526
@available(LambdaSwift 2.0, *)
2627
func testLocalServerCustomPort() async throws {

0 commit comments

Comments
 (0)