@@ -390,6 +390,7 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
390390 enum State {
391391 case disconnected
392392 case connected( ChannelHandlerContext , LambdaState )
393+ case closing
393394
394395 enum LambdaState {
395396 /// this is the "normal" state. Transitions to `waitingForNextInvocation`
@@ -402,7 +403,6 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
402403 case waitingForResponse
403404 case sendingResponse
404405 case sentResponse( CheckedContinuation < Void , any Error > )
405- case closing
406406 }
407407 }
408408
@@ -426,11 +426,11 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
426426 self . sendNextRequest ( context: context)
427427 }
428428
429- case . connected( _, . closing) ,
430- . connected( _, . sendingResponse) ,
429+ case . connected( _, . sendingResponse) ,
431430 . connected( _, . sentResponse) ,
432431 . connected( _, . waitingForNextInvocation) ,
433- . connected( _, . waitingForResponse) :
432+ . connected( _, . waitingForResponse) ,
433+ . closing:
434434 fatalError ( " Invalid state: \( self . state) " )
435435
436436 case . disconnected:
@@ -475,7 +475,7 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
475475 case . disconnected:
476476 throw NewLambdaRuntimeError ( code: . connectionToControlPlaneLost)
477477
478- case . connected ( _ , . closing) :
478+ case . closing:
479479 throw NewLambdaRuntimeError ( code: . connectionToControlPlaneGoingAway)
480480 }
481481 }
@@ -503,7 +503,7 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
503503 case . disconnected:
504504 throw NewLambdaRuntimeError ( code: . connectionToControlPlaneLost)
505505
506- case . connected ( _ , . closing) :
506+ case . closing:
507507 throw NewLambdaRuntimeError ( code: . connectionToControlPlaneGoingAway)
508508 }
509509 }
@@ -536,7 +536,7 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
536536 case . disconnected:
537537 throw NewLambdaRuntimeError ( code: . connectionToControlPlaneLost)
538538
539- case . connected ( _ , . closing) :
539+ case . closing:
540540 throw NewLambdaRuntimeError ( code: . connectionToControlPlaneGoingAway)
541541 }
542542 }
@@ -681,20 +681,61 @@ extension LambdaChannelHandler: ChannelInboundHandler {
681681 self . state = . connected( context, . idle)
682682 case . connected:
683683 break
684+ case . closing:
685+ fatalError ( " Invalid state: \( self . state) " )
684686 }
685687 }
686688
687689 func channelRead( context: ChannelHandlerContext , data: NIOAny ) {
688690 let response = unwrapInboundIn ( data)
689691
692+ // As defined in RFC 7230 Section 6.3:
693+ // HTTP/1.1 defaults to the use of "persistent connections", allowing
694+ // multiple requests and responses to be carried over a single
695+ // connection. The "close" connection option is used to signal that a
696+ // connection will not persist after the current request/response. HTTP
697+ // implementations SHOULD support persistent connections.
698+ //
699+ // That's why we only assume the connection shall be closed if we receive
700+ // a "connection = close" header.
701+ let serverCloseConnection =
702+ response. head. headers [ " connection " ] . contains ( where: { $0. lowercased ( ) == " close " } )
703+
704+ let closeConnection = serverCloseConnection || response. head. version != . http1_1
705+
706+ if closeConnection {
707+ // If we were succeeding the request promise here directly and closing the connection
708+ // after succeeding the promise we may run into a race condition:
709+ //
710+ // The lambda runtime will ask for the next work item directly after a succeeded post
711+ // response request. The desire for the next work item might be faster than the attempt
712+ // to close the connection. This will lead to a situation where we try to the connection
713+ // but the next request has already been scheduled on the connection that we want to
714+ // close. For this reason we postpone succeeding the promise until the connection has
715+ // been closed. This codepath will only be hit in the very, very unlikely event of the
716+ // Lambda control plane demanding to close connection. (It's more or less only
717+ // implemented to support http1.1 correctly.) This behavior is ensured with the test
718+ // `LambdaTest.testNoKeepAliveServer`.
719+ self . state = . closing
720+ self . delegate. connectionWillClose ( channel: context. channel)
721+ context. close ( promise: nil )
722+ } else {
723+ self . state = . connected( context, . idle)
724+ }
725+
726+ // handle response content
727+
690728 switch self . state {
691729 case . connected( let context, . waitingForNextInvocation( let continuation) ) :
692730 do {
693731 let metadata = try InvocationMetadata ( headers: response. head. headers)
694732 self . state = . connected( context, . waitingForResponse)
695733 continuation. resume ( returning: Invocation ( metadata: metadata, event: response. body ?? ByteBuffer ( ) ) )
696734 } catch {
697- self . state = . connected( context, . closing)
735+ self . state = . closing
736+
737+ self . delegate. connectionWillClose ( channel: context. channel)
738+ context. close ( promise: nil )
698739 continuation. resume (
699740 throwing: NewLambdaRuntimeError ( code: . invocationMissingMetadata, underlying: error)
700741 )
@@ -704,46 +745,14 @@ extension LambdaChannelHandler: ChannelInboundHandler {
704745 if response. head. status == . accepted {
705746 self . state = . connected( context, . idle)
706747 continuation. resume ( )
748+ } else {
749+ self . state = . connected( context, . idle)
750+ continuation. resume ( throwing: NewLambdaRuntimeError ( code: . unexpectedStatusCodeForRequest) )
707751 }
708752
709- case . disconnected, . connected( _, _) :
753+ case . disconnected, . closing , . connected( _, _) :
710754 break
711755 }
712-
713- // // As defined in RFC 7230 Section 6.3:
714- // // HTTP/1.1 defaults to the use of "persistent connections", allowing
715- // // multiple requests and responses to be carried over a single
716- // // connection. The "close" connection option is used to signal that a
717- // // connection will not persist after the current request/response. HTTP
718- // // implementations SHOULD support persistent connections.
719- // //
720- // // That's why we only assume the connection shall be closed if we receive
721- // // a "connection = close" header.
722- // let serverCloseConnection =
723- // response.head.headers["connection"].contains(where: { $0.lowercased() == "close" })
724- //
725- // let closeConnection = serverCloseConnection || response.head.version != .http1_1
726- //
727- // if closeConnection {
728- // // If we were succeeding the request promise here directly and closing the connection
729- // // after succeeding the promise we may run into a race condition:
730- // //
731- // // The lambda runtime will ask for the next work item directly after a succeeded post
732- // // response request. The desire for the next work item might be faster than the attempt
733- // // to close the connection. This will lead to a situation where we try to the connection
734- // // but the next request has already been scheduled on the connection that we want to
735- // // close. For this reason we postpone succeeding the promise until the connection has
736- // // been closed. This codepath will only be hit in the very, very unlikely event of the
737- // // Lambda control plane demanding to close connection. (It's more or less only
738- // // implemented to support http1.1 correctly.) This behavior is ensured with the test
739- // // `LambdaTest.testNoKeepAliveServer`.
740- // self.state = .waitForConnectionClose(httpResponse, promise)
741- // _ = context.channel.close()
742- // return
743- // } else {
744- // self.state = .idle
745- // promise.succeed(httpResponse)
746- // }
747756 }
748757
749758 func errorCaught( context: ChannelHandlerContext , error: Error ) {
0 commit comments