@@ -181,4 +181,100 @@ class ExtendedQueryStateMachineTests: XCTestCase {
181181 XCTAssertEqual ( state. commandCompletedReceived ( " SELECT 4 " ) , . wait)
182182 XCTAssertEqual ( state. readyForQueryReceived ( . idle) , . fireEventReadyForQuery)
183183 }
184+
185+ func testCancelQueryAfterServerError( ) {
186+ var state = ConnectionStateMachine . readyForQuery ( )
187+
188+ let logger = Logger . psqlTest
189+ let promise = EmbeddedEventLoop ( ) . makePromise ( of: PSQLRowStream . self)
190+ promise. fail ( PSQLError . uncleanShutdown) // we don't care about the error at all.
191+ let query : PostgresQuery = " SELECT version() "
192+ let queryContext = ExtendedQueryContext ( query: query, logger: logger, promise: promise)
193+
194+ XCTAssertEqual ( state. enqueue ( task: . extendedQuery( queryContext) ) , . sendParseDescribeBindExecuteSync( query) )
195+ XCTAssertEqual ( state. parseCompleteReceived ( ) , . wait)
196+ XCTAssertEqual ( state. parameterDescriptionReceived ( . init( dataTypes: [ . int8] ) ) , . wait)
197+
198+ // We need to ensure that even though the row description from the wire says that we
199+ // will receive data in `.text` format, we will actually receive it in binary format,
200+ // since we requested it in binary with our bind message.
201+ let input : [ RowDescription . Column ] = [
202+ . init( name: " version " , tableOID: 0 , columnAttributeNumber: 0 , dataType: . text, dataTypeSize: - 1 , dataTypeModifier: - 1 , format: . text)
203+ ]
204+ let expected : [ RowDescription . Column ] = input. map {
205+ . init( name: $0. name, tableOID: $0. tableOID, columnAttributeNumber: $0. columnAttributeNumber, dataType: $0. dataType,
206+ dataTypeSize: $0. dataTypeSize, dataTypeModifier: $0. dataTypeModifier, format: . binary)
207+ }
208+
209+ XCTAssertEqual ( state. rowDescriptionReceived ( . init( columns: input) ) , . wait)
210+ XCTAssertEqual ( state. bindCompleteReceived ( ) , . succeedQuery( queryContext, columns: expected) )
211+ let dataRows1 : [ DataRow ] = [
212+ [ ByteBuffer ( string: " test1 " ) ] ,
213+ [ ByteBuffer ( string: " test2 " ) ] ,
214+ [ ByteBuffer ( string: " test3 " ) ]
215+ ]
216+ for row in dataRows1 {
217+ XCTAssertEqual ( state. dataRowReceived ( row) , . wait)
218+ }
219+ XCTAssertEqual ( state. channelReadComplete ( ) , . forwardRows( dataRows1) )
220+ XCTAssertEqual ( state. readEventCaught ( ) , . wait)
221+ XCTAssertEqual ( state. requestQueryRows ( ) , . read)
222+ let dataRows2 : [ DataRow ] = [
223+ [ ByteBuffer ( string: " test4 " ) ] ,
224+ [ ByteBuffer ( string: " test5 " ) ] ,
225+ [ ByteBuffer ( string: " test6 " ) ]
226+ ]
227+ for row in dataRows2 {
228+ XCTAssertEqual ( state. dataRowReceived ( row) , . wait)
229+ }
230+ let serverError = PostgresBackendMessage . ErrorResponse ( fields: [ . severity: " Error " , . sqlState: " 123 " ] )
231+ XCTAssertEqual ( state. errorReceived ( serverError) , . forwardStreamError( . server( serverError) , read: false , cleanupContext: . none) )
232+
233+ XCTAssertEqual ( state. channelReadComplete ( ) , . wait)
234+ XCTAssertEqual ( state. readEventCaught ( ) , . read)
235+
236+ XCTAssertEqual ( state. readyForQueryReceived ( . idle) , . fireEventReadyForQuery)
237+ }
238+
239+ func testQueryErrorDoesNotKillConnection( ) {
240+ var state = ConnectionStateMachine . readyForQuery ( )
241+
242+ let logger = Logger . psqlTest
243+ let promise = EmbeddedEventLoop ( ) . makePromise ( of: PSQLRowStream . self)
244+ promise. fail ( PSQLError . uncleanShutdown) // we don't care about the error at all.
245+ let query : PostgresQuery = " SELECT version() "
246+ let queryContext = ExtendedQueryContext ( query: query, logger: logger, promise: promise)
247+
248+ XCTAssertEqual ( state. enqueue ( task: . extendedQuery( queryContext) ) , . sendParseDescribeBindExecuteSync( query) )
249+ XCTAssertEqual ( state. parseCompleteReceived ( ) , . wait)
250+ XCTAssertEqual ( state. parameterDescriptionReceived ( . init( dataTypes: [ . int8] ) ) , . wait)
251+
252+ let serverError = PostgresBackendMessage . ErrorResponse ( fields: [ . severity: " Error " , . sqlState: " 123 " ] )
253+ XCTAssertEqual (
254+ state. errorReceived ( serverError) , . failQuery( queryContext, with: . server( serverError) , cleanupContext: . none)
255+ )
256+
257+ XCTAssertEqual ( state. readyForQueryReceived ( . idle) , . fireEventReadyForQuery)
258+ }
259+
260+ func testQueryErrorAfterCancelDoesNotKillConnection( ) {
261+ var state = ConnectionStateMachine . readyForQuery ( )
262+
263+ let logger = Logger . psqlTest
264+ let promise = EmbeddedEventLoop ( ) . makePromise ( of: PSQLRowStream . self)
265+ promise. fail ( PSQLError . uncleanShutdown) // we don't care about the error at all.
266+ let query : PostgresQuery = " SELECT version() "
267+ let queryContext = ExtendedQueryContext ( query: query, logger: logger, promise: promise)
268+
269+ XCTAssertEqual ( state. enqueue ( task: . extendedQuery( queryContext) ) , . sendParseDescribeBindExecuteSync( query) )
270+ XCTAssertEqual ( state. parseCompleteReceived ( ) , . wait)
271+ XCTAssertEqual ( state. parameterDescriptionReceived ( . init( dataTypes: [ . int8] ) ) , . wait)
272+ XCTAssertEqual ( state. cancelQueryStream ( ) , . failQuery( queryContext, with: . queryCancelled, cleanupContext: . none) )
273+
274+ let serverError = PostgresBackendMessage . ErrorResponse ( fields: [ . severity: " Error " , . sqlState: " 123 " ] )
275+ XCTAssertEqual ( state. errorReceived ( serverError) , . wait)
276+
277+ XCTAssertEqual ( state. readyForQueryReceived ( . idle) , . fireEventReadyForQuery)
278+ }
279+
184280}
0 commit comments