77- test_commit: Make a transaction and commit.
88- test_rollback: Make a transaction and rollback.
99- test_invalid_connection_string: Check if initializing with an invalid connection string raises an exception.
10- - test_connection_pooling_speed: Test connection pooling speed.
11- - test_connection_pooling_basic: Test basic connection pooling functionality.
1210- test_autocommit_default: Check if autocommit is False by default.
1311- test_autocommit_setter: Test setting autocommit mode and its effect on transactions.
1412- test_set_autocommit: Test the setautocommit method.
@@ -288,131 +286,6 @@ def test_connection_close(conn_str):
288286 # Check if the database connection can be closed
289287 temp_conn .close ()
290288
291- def test_connection_pooling_speed (conn_str ):
292- """Test that connection pooling provides performance benefits over multiple iterations."""
293- import statistics
294-
295- # Warm up to eliminate cold start effects
296- for _ in range (3 ):
297- conn = connect (conn_str )
298- conn .close ()
299-
300- # Disable pooling first
301- pooling (enabled = False )
302-
303- # Test without pooling (multiple times)
304- no_pool_times = []
305- for _ in range (10 ):
306- start = time .perf_counter ()
307- conn = connect (conn_str )
308- conn .close ()
309- end = time .perf_counter ()
310- no_pool_times .append (end - start )
311-
312- # Enable pooling
313- pooling (max_size = 5 , idle_timeout = 30 )
314-
315- # Test with pooling (multiple times)
316- pool_times = []
317- for _ in range (10 ):
318- start = time .perf_counter ()
319- conn = connect (conn_str )
320- conn .close ()
321- end = time .perf_counter ()
322- pool_times .append (end - start )
323-
324- # Use median times to reduce impact of outliers
325- median_no_pool = statistics .median (no_pool_times )
326- median_pool = statistics .median (pool_times )
327-
328- # Allow for some variance - pooling should be at least 30% faster on average
329- improvement_threshold = 0.7 # Pool should be <= 70% of no-pool time
330-
331- print (f"No pool median: { median_no_pool :.6f} s" )
332- print (f"Pool median: { median_pool :.6f} s" )
333- print (f"Improvement ratio: { median_pool / median_no_pool :.2f} " )
334-
335- # Clean up - disable pooling for other tests
336- pooling (enabled = False )
337-
338- assert median_pool <= median_no_pool * improvement_threshold , \
339- f"Expected pooling to be at least 30% faster. No-pool: { median_no_pool :.6f} s, Pool: { median_pool :.6f} s"
340-
341- def test_connection_pooling_reuse_spid (conn_str ):
342- """Test that connections are actually reused from the pool"""
343- # Enable pooling
344- pooling (max_size = 1 , idle_timeout = 30 )
345-
346- # Create and close a connection
347- conn1 = connect (conn_str )
348- cursor1 = conn1 .cursor ()
349- cursor1 .execute ("SELECT @@SPID" ) # Get SQL Server process ID
350- spid1 = cursor1 .fetchone ()[0 ]
351- conn1 .close ()
352-
353- # Get another connection - should be the same one from pool
354- conn2 = connect (conn_str )
355- cursor2 = conn2 .cursor ()
356- cursor2 .execute ("SELECT @@SPID" )
357- spid2 = cursor2 .fetchone ()[0 ]
358- conn2 .close ()
359-
360- # The SPID should be the same, indicating connection reuse
361- assert spid1 == spid2 , "Connections not reused - different SPIDs"
362-
363- # Clean up
364-
365- def test_pool_exhaustion_max_size_1 (conn_str ):
366- """Test pool exhaustion when max_size=1 and multiple concurrent connections are requested."""
367- pooling (max_size = 1 , idle_timeout = 30 )
368- conn1 = connect (conn_str )
369- results = []
370-
371- def try_connect ():
372- try :
373- conn2 = connect (conn_str )
374- results .append ("success" )
375- conn2 .close ()
376- except Exception as e :
377- results .append (str (e ))
378-
379- # Start a thread that will attempt to get a second connection while the first is open
380- t = threading .Thread (target = try_connect )
381- t .start ()
382- t .join (timeout = 2 )
383- conn1 .close ()
384-
385- # Depending on implementation, either blocks, raises, or times out
386- assert results , "Second connection attempt did not complete"
387- # If pool blocks, the thread may not finish until conn1 is closed, so allow both outcomes
388- assert results [0 ] == "success" or "pool" in results [0 ].lower () or "timeout" in results [0 ].lower (), \
389- f"Unexpected pool exhaustion result: { results [0 ]} "
390- pooling (enabled = False )
391-
392- def test_pool_idle_timeout_removes_connections (conn_str ):
393- """Test that idle_timeout removes connections from the pool after the timeout."""
394- pooling (max_size = 2 , idle_timeout = 2 )
395- conn1 = connect (conn_str )
396- spid_list = []
397- cursor1 = conn1 .cursor ()
398- cursor1 .execute ("SELECT @@SPID" )
399- spid1 = cursor1 .fetchone ()[0 ]
400- spid_list .append (spid1 )
401- conn1 .close ()
402-
403- # Wait for longer than idle_timeout
404- time .sleep (3 )
405-
406- # Get a new connection, which should not reuse the previous SPID
407- conn2 = connect (conn_str )
408- cursor2 = conn2 .cursor ()
409- cursor2 .execute ("SELECT @@SPID" )
410- spid2 = cursor2 .fetchone ()[0 ]
411- spid_list .append (spid2 )
412- conn2 .close ()
413-
414- assert spid1 != spid2 , "Idle timeout did not remove connection from pool"
415-
416289def test_connection_timeout_invalid_password (conn_str ):
417290 """Test that connecting with an invalid password raises an exception quickly (timeout)."""
418291 # Modify the connection string to use an invalid password
@@ -429,6 +302,7 @@ def test_connection_timeout_invalid_password(conn_str):
429302 # Should fail quickly (within 10 seconds)
430303 assert elapsed < 10 , f"Connection with invalid password took too long: { elapsed :.2f} s"
431304
305+
432306def test_connection_timeout_invalid_host (conn_str ):
433307 """Test that connecting to an invalid host fails with a timeout."""
434308 # Replace server/host with an invalid one
@@ -449,106 +323,6 @@ def test_connection_timeout_invalid_host(conn_str):
449323 # If it takes too long, it may indicate a misconfiguration or network issue.
450324 assert elapsed < 30 , f"Connection to invalid host took too long: { elapsed :.2f} s"
451325
452- def test_pool_removes_invalid_connections (conn_str ):
453- """Test that the pool removes connections that become invalid (simulate by closing underlying connection)."""
454- pooling (max_size = 1 , idle_timeout = 30 )
455- conn = connect (conn_str )
456- cursor = conn .cursor ()
457- cursor .execute ("SELECT 1" )
458- # Simulate invalidation by forcibly closing the connection at the driver level
459- try :
460- # Try to access a private attribute or method to forcibly close the underlying connection
461- # This is implementation-specific; if not possible, skip
462- if hasattr (conn , "_conn" ) and hasattr (conn ._conn , "close" ):
463- conn ._conn .close ()
464- else :
465- pytest .skip ("Cannot forcibly close underlying connection for this driver" )
466- except Exception :
467- pass
468- # Safely close the connection, ignoring errors due to forced invalidation
469- try :
470- conn .close ()
471- except RuntimeError as e :
472- if "not initialized" not in str (e ):
473- raise
474- # Now, get a new connection from the pool and ensure it works
475- new_conn = connect (conn_str )
476- new_cursor = new_conn .cursor ()
477- try :
478- new_cursor .execute ("SELECT 1" )
479- result = new_cursor .fetchone ()
480- assert result is not None and result [0 ] == 1 , "Pool did not remove invalid connection"
481- finally :
482- new_conn .close ()
483- pooling (enabled = False )
484-
485- def test_pool_recovery_after_failed_connection (conn_str ):
486- """Test that the pool recovers after a failed connection attempt."""
487- pooling (max_size = 1 , idle_timeout = 30 )
488- # First, try to connect with a bad password (should fail)
489- if "Pwd=" in conn_str :
490- bad_conn_str = conn_str .replace ("Pwd=" , "Pwd=wrongpassword" )
491- elif "Password=" in conn_str :
492- bad_conn_str = conn_str .replace ("Password=" , "Password=wrongpassword" )
493- else :
494- pytest .skip ("No password found in connection string to modify" )
495- with pytest .raises (Exception ):
496- connect (bad_conn_str )
497- # Now, connect with the correct string and ensure it works
498- conn = connect (conn_str )
499- cursor = conn .cursor ()
500- cursor .execute ("SELECT 1" )
501- result = cursor .fetchone ()
502- assert result is not None and result [0 ] == 1 , "Pool did not recover after failed connection"
503- conn .close ()
504- pooling (enabled = False )
505-
506- def test_pool_capacity_limit_and_overflow (conn_str ):
507- """Test that pool does not grow beyond max_size and handles overflow gracefully."""
508- pooling (max_size = 2 , idle_timeout = 30 )
509- conns = []
510- try :
511- # Open up to max_size connections
512- conns .append (connect (conn_str ))
513- conns .append (connect (conn_str ))
514- # Try to open a third connection, which should fail or block
515- overflow_result = []
516- def try_overflow ():
517- try :
518- c = connect (conn_str )
519- overflow_result .append ("success" )
520- c .close ()
521- except Exception as e :
522- overflow_result .append (str (e ))
523- t = threading .Thread (target = try_overflow )
524- t .start ()
525- t .join (timeout = 2 )
526- assert overflow_result , "Overflow connection attempt did not complete"
527- # Accept either block, error, or success if pool implementation allows overflow
528- assert overflow_result [0 ] == "success" or "pool" in overflow_result [0 ].lower () or "timeout" in overflow_result [0 ].lower (), \
529- f"Unexpected pool overflow result: { overflow_result [0 ]} "
530- finally :
531- for c in conns :
532- c .close ()
533- pooling (enabled = False )
534-
535- def test_connection_pooling_basic (conn_str ):
536- # Enable pooling with small pool size
537- pooling (max_size = 2 , idle_timeout = 5 )
538- conn1 = connect (conn_str )
539- conn2 = connect (conn_str )
540- assert conn1 is not None
541- assert conn2 is not None
542- try :
543- conn3 = connect (conn_str )
544- assert conn3 is not None , "Third connection failed — pooling is not working or limit is too strict"
545- conn3 .close ()
546- except Exception as e :
547- print (f"Expected: Could not open third connection due to max_size=2: { e } " )
548-
549- conn1 .close ()
550- conn2 .close ()
551-
552326def test_context_manager_commit (conn_str ):
553327 """Test that context manager closes connection on normal exit"""
554328 # Create a permanent table for testing across connections
0 commit comments