7
7
- test_commit: Make a transaction and commit.
8
8
- test_rollback: Make a transaction and rollback.
9
9
- test_invalid_connection_string: Check if initializing with an invalid connection string raises an exception.
10
- - test_connection_pooling_speed: Test connection pooling speed.
11
- - test_connection_pooling_basic: Test basic connection pooling functionality.
12
10
- test_autocommit_default: Check if autocommit is False by default.
13
11
- test_autocommit_setter: Test setting autocommit mode and its effect on transactions.
14
12
- test_set_autocommit: Test the setautocommit method.
@@ -288,131 +286,6 @@ def test_connection_close(conn_str):
288
286
# Check if the database connection can be closed
289
287
temp_conn .close ()
290
288
291
- def test_connection_pooling_speed (conn_str ):
292
- """Test that connection pooling provides performance benefits over multiple iterations."""
293
- import statistics
294
-
295
- # Warm up to eliminate cold start effects
296
- for _ in range (3 ):
297
- conn = connect (conn_str )
298
- conn .close ()
299
-
300
- # Disable pooling first
301
- pooling (enabled = False )
302
-
303
- # Test without pooling (multiple times)
304
- no_pool_times = []
305
- for _ in range (10 ):
306
- start = time .perf_counter ()
307
- conn = connect (conn_str )
308
- conn .close ()
309
- end = time .perf_counter ()
310
- no_pool_times .append (end - start )
311
-
312
- # Enable pooling
313
- pooling (max_size = 5 , idle_timeout = 30 )
314
-
315
- # Test with pooling (multiple times)
316
- pool_times = []
317
- for _ in range (10 ):
318
- start = time .perf_counter ()
319
- conn = connect (conn_str )
320
- conn .close ()
321
- end = time .perf_counter ()
322
- pool_times .append (end - start )
323
-
324
- # Use median times to reduce impact of outliers
325
- median_no_pool = statistics .median (no_pool_times )
326
- median_pool = statistics .median (pool_times )
327
-
328
- # Allow for some variance - pooling should be at least 30% faster on average
329
- improvement_threshold = 0.7 # Pool should be <= 70% of no-pool time
330
-
331
- print (f"No pool median: { median_no_pool :.6f} s" )
332
- print (f"Pool median: { median_pool :.6f} s" )
333
- print (f"Improvement ratio: { median_pool / median_no_pool :.2f} " )
334
-
335
- # Clean up - disable pooling for other tests
336
- pooling (enabled = False )
337
-
338
- assert median_pool <= median_no_pool * improvement_threshold , \
339
- f"Expected pooling to be at least 30% faster. No-pool: { median_no_pool :.6f} s, Pool: { median_pool :.6f} s"
340
-
341
- def test_connection_pooling_reuse_spid (conn_str ):
342
- """Test that connections are actually reused from the pool"""
343
- # Enable pooling
344
- pooling (max_size = 1 , idle_timeout = 30 )
345
-
346
- # Create and close a connection
347
- conn1 = connect (conn_str )
348
- cursor1 = conn1 .cursor ()
349
- cursor1 .execute ("SELECT @@SPID" ) # Get SQL Server process ID
350
- spid1 = cursor1 .fetchone ()[0 ]
351
- conn1 .close ()
352
-
353
- # Get another connection - should be the same one from pool
354
- conn2 = connect (conn_str )
355
- cursor2 = conn2 .cursor ()
356
- cursor2 .execute ("SELECT @@SPID" )
357
- spid2 = cursor2 .fetchone ()[0 ]
358
- conn2 .close ()
359
-
360
- # The SPID should be the same, indicating connection reuse
361
- assert spid1 == spid2 , "Connections not reused - different SPIDs"
362
-
363
- # Clean up
364
-
365
- def test_pool_exhaustion_max_size_1 (conn_str ):
366
- """Test pool exhaustion when max_size=1 and multiple concurrent connections are requested."""
367
- pooling (max_size = 1 , idle_timeout = 30 )
368
- conn1 = connect (conn_str )
369
- results = []
370
-
371
- def try_connect ():
372
- try :
373
- conn2 = connect (conn_str )
374
- results .append ("success" )
375
- conn2 .close ()
376
- except Exception as e :
377
- results .append (str (e ))
378
-
379
- # Start a thread that will attempt to get a second connection while the first is open
380
- t = threading .Thread (target = try_connect )
381
- t .start ()
382
- t .join (timeout = 2 )
383
- conn1 .close ()
384
-
385
- # Depending on implementation, either blocks, raises, or times out
386
- assert results , "Second connection attempt did not complete"
387
- # If pool blocks, the thread may not finish until conn1 is closed, so allow both outcomes
388
- assert results [0 ] == "success" or "pool" in results [0 ].lower () or "timeout" in results [0 ].lower (), \
389
- f"Unexpected pool exhaustion result: { results [0 ]} "
390
- pooling (enabled = False )
391
-
392
- def test_pool_idle_timeout_removes_connections (conn_str ):
393
- """Test that idle_timeout removes connections from the pool after the timeout."""
394
- pooling (max_size = 2 , idle_timeout = 2 )
395
- conn1 = connect (conn_str )
396
- spid_list = []
397
- cursor1 = conn1 .cursor ()
398
- cursor1 .execute ("SELECT @@SPID" )
399
- spid1 = cursor1 .fetchone ()[0 ]
400
- spid_list .append (spid1 )
401
- conn1 .close ()
402
-
403
- # Wait for longer than idle_timeout
404
- time .sleep (3 )
405
-
406
- # Get a new connection, which should not reuse the previous SPID
407
- conn2 = connect (conn_str )
408
- cursor2 = conn2 .cursor ()
409
- cursor2 .execute ("SELECT @@SPID" )
410
- spid2 = cursor2 .fetchone ()[0 ]
411
- spid_list .append (spid2 )
412
- conn2 .close ()
413
-
414
- assert spid1 != spid2 , "Idle timeout did not remove connection from pool"
415
-
416
289
def test_connection_timeout_invalid_password (conn_str ):
417
290
"""Test that connecting with an invalid password raises an exception quickly (timeout)."""
418
291
# Modify the connection string to use an invalid password
@@ -429,6 +302,7 @@ def test_connection_timeout_invalid_password(conn_str):
429
302
# Should fail quickly (within 10 seconds)
430
303
assert elapsed < 10 , f"Connection with invalid password took too long: { elapsed :.2f} s"
431
304
305
+
432
306
def test_connection_timeout_invalid_host (conn_str ):
433
307
"""Test that connecting to an invalid host fails with a timeout."""
434
308
# Replace server/host with an invalid one
@@ -449,106 +323,6 @@ def test_connection_timeout_invalid_host(conn_str):
449
323
# If it takes too long, it may indicate a misconfiguration or network issue.
450
324
assert elapsed < 30 , f"Connection to invalid host took too long: { elapsed :.2f} s"
451
325
452
- def test_pool_removes_invalid_connections (conn_str ):
453
- """Test that the pool removes connections that become invalid (simulate by closing underlying connection)."""
454
- pooling (max_size = 1 , idle_timeout = 30 )
455
- conn = connect (conn_str )
456
- cursor = conn .cursor ()
457
- cursor .execute ("SELECT 1" )
458
- # Simulate invalidation by forcibly closing the connection at the driver level
459
- try :
460
- # Try to access a private attribute or method to forcibly close the underlying connection
461
- # This is implementation-specific; if not possible, skip
462
- if hasattr (conn , "_conn" ) and hasattr (conn ._conn , "close" ):
463
- conn ._conn .close ()
464
- else :
465
- pytest .skip ("Cannot forcibly close underlying connection for this driver" )
466
- except Exception :
467
- pass
468
- # Safely close the connection, ignoring errors due to forced invalidation
469
- try :
470
- conn .close ()
471
- except RuntimeError as e :
472
- if "not initialized" not in str (e ):
473
- raise
474
- # Now, get a new connection from the pool and ensure it works
475
- new_conn = connect (conn_str )
476
- new_cursor = new_conn .cursor ()
477
- try :
478
- new_cursor .execute ("SELECT 1" )
479
- result = new_cursor .fetchone ()
480
- assert result is not None and result [0 ] == 1 , "Pool did not remove invalid connection"
481
- finally :
482
- new_conn .close ()
483
- pooling (enabled = False )
484
-
485
- def test_pool_recovery_after_failed_connection (conn_str ):
486
- """Test that the pool recovers after a failed connection attempt."""
487
- pooling (max_size = 1 , idle_timeout = 30 )
488
- # First, try to connect with a bad password (should fail)
489
- if "Pwd=" in conn_str :
490
- bad_conn_str = conn_str .replace ("Pwd=" , "Pwd=wrongpassword" )
491
- elif "Password=" in conn_str :
492
- bad_conn_str = conn_str .replace ("Password=" , "Password=wrongpassword" )
493
- else :
494
- pytest .skip ("No password found in connection string to modify" )
495
- with pytest .raises (Exception ):
496
- connect (bad_conn_str )
497
- # Now, connect with the correct string and ensure it works
498
- conn = connect (conn_str )
499
- cursor = conn .cursor ()
500
- cursor .execute ("SELECT 1" )
501
- result = cursor .fetchone ()
502
- assert result is not None and result [0 ] == 1 , "Pool did not recover after failed connection"
503
- conn .close ()
504
- pooling (enabled = False )
505
-
506
- def test_pool_capacity_limit_and_overflow (conn_str ):
507
- """Test that pool does not grow beyond max_size and handles overflow gracefully."""
508
- pooling (max_size = 2 , idle_timeout = 30 )
509
- conns = []
510
- try :
511
- # Open up to max_size connections
512
- conns .append (connect (conn_str ))
513
- conns .append (connect (conn_str ))
514
- # Try to open a third connection, which should fail or block
515
- overflow_result = []
516
- def try_overflow ():
517
- try :
518
- c = connect (conn_str )
519
- overflow_result .append ("success" )
520
- c .close ()
521
- except Exception as e :
522
- overflow_result .append (str (e ))
523
- t = threading .Thread (target = try_overflow )
524
- t .start ()
525
- t .join (timeout = 2 )
526
- assert overflow_result , "Overflow connection attempt did not complete"
527
- # Accept either block, error, or success if pool implementation allows overflow
528
- assert overflow_result [0 ] == "success" or "pool" in overflow_result [0 ].lower () or "timeout" in overflow_result [0 ].lower (), \
529
- f"Unexpected pool overflow result: { overflow_result [0 ]} "
530
- finally :
531
- for c in conns :
532
- c .close ()
533
- pooling (enabled = False )
534
-
535
- def test_connection_pooling_basic (conn_str ):
536
- # Enable pooling with small pool size
537
- pooling (max_size = 2 , idle_timeout = 5 )
538
- conn1 = connect (conn_str )
539
- conn2 = connect (conn_str )
540
- assert conn1 is not None
541
- assert conn2 is not None
542
- try :
543
- conn3 = connect (conn_str )
544
- assert conn3 is not None , "Third connection failed — pooling is not working or limit is too strict"
545
- conn3 .close ()
546
- except Exception as e :
547
- print (f"Expected: Could not open third connection due to max_size=2: { e } " )
548
-
549
- conn1 .close ()
550
- conn2 .close ()
551
-
552
326
def test_context_manager_commit (conn_str ):
553
327
"""Test that context manager closes connection on normal exit"""
554
328
# Create a permanent table for testing across connections
0 commit comments