@@ -3,7 +3,7 @@ import Logging
33
44struct QueryResult {
55 enum Value : Equatable {
6- case noRows( String )
6+ case noRows( PSQLRowStream . StatementSummary )
77 case rowDescription( [ RowDescription . Column ] )
88 }
99
@@ -16,25 +16,30 @@ struct QueryResult {
1616final class PSQLRowStream : @unchecked Sendable {
1717 private typealias AsyncSequenceSource = NIOThrowingAsyncSequenceProducer < DataRow , Error , AdaptiveRowBuffer , PSQLRowStream > . Source
1818
19+ enum StatementSummary : Equatable {
20+ case tag( String )
21+ case emptyResponse
22+ }
23+
1924 enum Source {
2025 case stream( [ RowDescription . Column ] , PSQLRowsDataSource )
21- case noRows( Result < String , Error > )
26+ case noRows( Result < StatementSummary , Error > )
2227 }
2328
2429 let eventLoop : EventLoop
2530 let logger : Logger
26-
31+
2732 private enum BufferState {
2833 case streaming( buffer: CircularBuffer < DataRow > , dataSource: PSQLRowsDataSource )
29- case finished( buffer: CircularBuffer < DataRow > , commandTag : String )
34+ case finished( buffer: CircularBuffer < DataRow > , summary : StatementSummary )
3035 case failure( Error )
3136 }
32-
37+
3338 private enum DownstreamState {
3439 case waitingForConsumer( BufferState )
3540 case iteratingRows( onRow: ( PostgresRow ) throws -> ( ) , EventLoopPromise < Void > , PSQLRowsDataSource )
3641 case waitingForAll( [ PostgresRow ] , EventLoopPromise < [ PostgresRow ] > , PSQLRowsDataSource )
37- case consumed( Result < String , Error > )
42+ case consumed( Result < StatementSummary , Error > )
3843 case asyncSequence( AsyncSequenceSource , PSQLRowsDataSource , onFinish: @Sendable ( ) -> ( ) )
3944 }
4045
@@ -52,9 +57,9 @@ final class PSQLRowStream: @unchecked Sendable {
5257 case . stream( let rowDescription, let dataSource) :
5358 self . rowDescription = rowDescription
5459 bufferState = . streaming( buffer: . init( ) , dataSource: dataSource)
55- case . noRows( . success( let commandTag ) ) :
60+ case . noRows( . success( let summary ) ) :
5661 self . rowDescription = [ ]
57- bufferState = . finished( buffer: . init( ) , commandTag : commandTag )
62+ bufferState = . finished( buffer: . init( ) , summary : summary )
5863 case . noRows( . failure( let error) ) :
5964 self . rowDescription = [ ]
6065 bufferState = . failure( error)
@@ -98,12 +103,12 @@ final class PSQLRowStream: @unchecked Sendable {
98103 self . downstreamState = . asyncSequence( source, dataSource, onFinish: onFinish)
99104 self . executeActionBasedOnYieldResult ( yieldResult, source: dataSource)
100105
101- case . finished( let buffer, let commandTag ) :
106+ case . finished( let buffer, let summary ) :
102107 _ = source. yield ( contentsOf: buffer)
103108 source. finish ( )
104109 onFinish ( )
105- self . downstreamState = . consumed( . success( commandTag ) )
106-
110+ self . downstreamState = . consumed( . success( summary ) )
111+
107112 case . failure( let error) :
108113 source. finish ( error)
109114 self . downstreamState = . consumed( . failure( error) )
@@ -190,12 +195,12 @@ final class PSQLRowStream: @unchecked Sendable {
190195 dataSource. request ( for: self )
191196 return promise. futureResult
192197
193- case . finished( let buffer, let commandTag ) :
198+ case . finished( let buffer, let summary ) :
194199 let rows = buffer. map {
195200 PostgresRow ( data: $0, lookupTable: self . lookupTable, columns: self . rowDescription)
196201 }
197202
198- self . downstreamState = . consumed( . success( commandTag ) )
203+ self . downstreamState = . consumed( . success( summary ) )
199204 return self . eventLoop. makeSucceededFuture ( rows)
200205
201206 case . failure( let error) :
@@ -247,8 +252,8 @@ final class PSQLRowStream: @unchecked Sendable {
247252 }
248253
249254 return promise. futureResult
250-
251- case . finished( let buffer, let commandTag ) :
255+
256+ case . finished( let buffer, let summary ) :
252257 do {
253258 for data in buffer {
254259 let row = PostgresRow (
@@ -259,7 +264,7 @@ final class PSQLRowStream: @unchecked Sendable {
259264 try onRow ( row)
260265 }
261266
262- self . downstreamState = . consumed( . success( commandTag ) )
267+ self . downstreamState = . consumed( . success( summary ) )
263268 return self . eventLoop. makeSucceededVoidFuture ( )
264269 } catch {
265270 self . downstreamState = . consumed( . failure( error) )
@@ -292,7 +297,7 @@ final class PSQLRowStream: @unchecked Sendable {
292297
293298 case . waitingForConsumer( . finished) , . waitingForConsumer( . failure) :
294299 preconditionFailure ( " How can new rows be received, if an end was already signalled? " )
295-
300+
296301 case . iteratingRows( let onRow, let promise, let dataSource) :
297302 do {
298303 for data in newRows {
@@ -347,25 +352,25 @@ final class PSQLRowStream: @unchecked Sendable {
347352 private func receiveEnd( _ commandTag: String ) {
348353 switch self . downstreamState {
349354 case . waitingForConsumer( . streaming( buffer: let buffer, _) ) :
350- self . downstreamState = . waitingForConsumer( . finished( buffer: buffer, commandTag : commandTag) )
351-
352- case . waitingForConsumer( . finished) , . waitingForConsumer( . failure) :
355+ self . downstreamState = . waitingForConsumer( . finished( buffer: buffer, summary : . tag ( commandTag) ) )
356+
357+ case . waitingForConsumer( . finished) , . waitingForConsumer( . failure) , . consumed ( . success ( . emptyResponse ) ) :
353358 preconditionFailure ( " How can we get another end, if an end was already signalled? " )
354359
355360 case . iteratingRows( _, let promise, _) :
356- self . downstreamState = . consumed( . success( commandTag) )
361+ self . downstreamState = . consumed( . success( . tag ( commandTag) ) )
357362 promise. succeed ( ( ) )
358363
359364 case . waitingForAll( let rows, let promise, _) :
360- self . downstreamState = . consumed( . success( commandTag) )
365+ self . downstreamState = . consumed( . success( . tag ( commandTag) ) )
361366 promise. succeed ( rows)
362367
363368 case . asyncSequence( let source, _, let onFinish) :
364- self . downstreamState = . consumed( . success( commandTag) )
369+ self . downstreamState = . consumed( . success( . tag ( commandTag) ) )
365370 source. finish ( )
366371 onFinish ( )
367372
368- case . consumed:
373+ case . consumed( . success ( . tag ) ) , . consumed ( . failure ) :
369374 break
370375 }
371376 }
@@ -375,7 +380,7 @@ final class PSQLRowStream: @unchecked Sendable {
375380 case . waitingForConsumer( . streaming) :
376381 self . downstreamState = . waitingForConsumer( . failure( error) )
377382
378- case . waitingForConsumer( . finished) , . waitingForConsumer( . failure) :
383+ case . waitingForConsumer( . finished) , . waitingForConsumer( . failure) , . consumed ( . success ( . emptyResponse ) ) :
379384 preconditionFailure ( " How can we get another end, if an end was already signalled? " )
380385
381386 case . iteratingRows( _, let promise, _) :
@@ -391,7 +396,7 @@ final class PSQLRowStream: @unchecked Sendable {
391396 consumer. finish ( error)
392397 onFinish ( )
393398
394- case . consumed:
399+ case . consumed( . success ( . tag ) ) , . consumed ( . failure ) :
395400 break
396401 }
397402 }
@@ -413,10 +418,15 @@ final class PSQLRowStream: @unchecked Sendable {
413418 }
414419
415420 var commandTag : String {
416- guard case . consumed( . success( let commandTag ) ) = self . downstreamState else {
421+ guard case . consumed( . success( let consumed ) ) = self . downstreamState else {
417422 preconditionFailure ( " commandTag may only be called if all rows have been consumed " )
418423 }
419- return commandTag
424+ switch consumed {
425+ case . tag( let tag) :
426+ return tag
427+ case . emptyResponse:
428+ return " "
429+ }
420430 }
421431}
422432
0 commit comments