|
9 | 9 | """ |
10 | 10 |
|
11 | 11 | import pytest |
12 | | -from datetime import datetime, date, time |
| 12 | +from datetime import datetime, date, time, timedelta, timezone |
13 | 13 | import time as time_module |
14 | 14 | import decimal |
15 | 15 | from contextlib import closing |
@@ -6472,7 +6472,7 @@ def test_only_null_and_empty_binary(cursor, db_connection): |
6472 | 6472 | finally: |
6473 | 6473 | drop_table_if_exists(cursor, "#pytest_null_empty_binary") |
6474 | 6474 | db_connection.commit() |
6475 | | - |
| 6475 | + |
6476 | 6476 | # ---------------------- VARCHAR(MAX) ---------------------- |
6477 | 6477 |
|
6478 | 6478 | def test_varcharmax_short_fetch(cursor, db_connection): |
@@ -7560,6 +7560,169 @@ def test_decimal_separator_calculations(cursor, db_connection): |
7560 | 7560 | cursor.execute("DROP TABLE IF EXISTS #pytest_decimal_calc_test") |
7561 | 7561 | db_connection.commit() |
7562 | 7562 |
|
def test_datetimeoffset_read_write(cursor, db_connection):
    """Round-trip timezone-aware datetimes through a DATETIMEOFFSET column."""
    samples = [
        # Positive fractional offset, negative offset with microseconds, and UTC.
        datetime(2023, 10, 26, 10, 30, 0, tzinfo=timezone(timedelta(hours=5, minutes=30))),
        datetime(2023, 10, 27, 15, 45, 10, 123456, tzinfo=timezone(timedelta(hours=-8))),
        datetime(2023, 10, 28, 20, 0, 5, 987654, tzinfo=timezone.utc),
    ]
    try:
        cursor.execute("CREATE TABLE #pytest_datetimeoffset_read_write (id INT PRIMARY KEY, dto_column DATETIMEOFFSET);")
        db_connection.commit()

        for idx, value in enumerate(samples):
            cursor.execute(
                "INSERT INTO #pytest_datetimeoffset_read_write (id, dto_column) VALUES (?, ?);",
                idx,
                value,
            )
        db_connection.commit()

        cursor.execute("SELECT id, dto_column FROM #pytest_datetimeoffset_read_write ORDER BY id;")
        for idx, value in enumerate(samples):
            record = cursor.fetchone()
            assert record is not None
            _, stored = record
            # Fetched value must come back timezone-aware.
            assert stored.tzinfo is not None
            # Compare in UTC, truncated to whole milliseconds, so that
            # sub-millisecond precision differences are ignored.
            want = value.astimezone(timezone.utc)
            got = stored.astimezone(timezone.utc)
            want = want.replace(microsecond=want.microsecond // 1000 * 1000)
            got = got.replace(microsecond=got.microsecond // 1000 * 1000)
            assert got == want
    finally:
        cursor.execute("DROP TABLE IF EXISTS #pytest_datetimeoffset_read_write;")
        db_connection.commit()
| 7596 | + |
def test_datetimeoffset_max_min_offsets(cursor, db_connection):
    """
    Test inserting and retrieving DATETIMEOFFSET with maximum and minimum allowed offsets (+14:00 and -14:00).
    Uses fetchone() for retrieval.
    """
    try:
        # Use a table name unique to this test: the original reused
        # #pytest_datetimeoffset_read_write, which collides with the table
        # created by test_datetimeoffset_read_write and breaks the
        # one-temp-table-per-test convention used elsewhere in this file.
        cursor.execute("CREATE TABLE #pytest_datetimeoffset_max_min_offsets (id INT PRIMARY KEY, dto_column DATETIMEOFFSET);")
        db_connection.commit()

        test_cases = [
            (1, datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone(timedelta(hours=14)))),   # max offset
            (2, datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone(timedelta(hours=-14)))),  # min offset
        ]

        insert_stmt = "INSERT INTO #pytest_datetimeoffset_max_min_offsets (id, dto_column) VALUES (?, ?);"
        for row_id, dt in test_cases:
            cursor.execute(insert_stmt, row_id, dt)
        db_connection.commit()

        cursor.execute("SELECT id, dto_column FROM #pytest_datetimeoffset_max_min_offsets ORDER BY id;")

        for expected_id, expected_dt in test_cases:
            row = cursor.fetchone()
            assert row is not None, f"No row fetched for id {expected_id}."
            fetched_id, fetched_dt = row

            assert fetched_id == expected_id, f"ID mismatch: expected {expected_id}, got {fetched_id}"
            assert fetched_dt.tzinfo is not None, f"Fetched datetime object is naive for id {fetched_id}"

            # Compare in UTC to avoid offset differences
            expected_utc = expected_dt.astimezone(timezone.utc).replace(tzinfo=None)
            fetched_utc = fetched_dt.astimezone(timezone.utc).replace(tzinfo=None)
            assert fetched_utc == expected_utc, (
                f"Value mismatch for id {expected_id}: expected UTC {expected_utc}, got {fetched_utc}"
            )

    finally:
        cursor.execute("DROP TABLE IF EXISTS #pytest_datetimeoffset_max_min_offsets;")
        db_connection.commit()
| 7636 | + |
def test_datetimeoffset_invalid_offsets(cursor, db_connection):
    """Verify driver rejects offsets outside the +/-14:00 range of DATETIMEOFFSET."""
    try:
        cursor.execute("CREATE TABLE #pytest_datetimeoffset_invalid_offsets (id INT PRIMARY KEY, dto_column DATETIMEOFFSET);")
        db_connection.commit()

        # One offset just past each end of the legal range.
        for row_id, offset_hours in ((1, 15), (2, -15)):
            with pytest.raises(Exception):
                cursor.execute(
                    "INSERT INTO #pytest_datetimeoffset_invalid_offsets (id, dto_column) VALUES (?, ?);",
                    row_id,
                    datetime(2025, 1, 1, 12, 0, tzinfo=timezone(timedelta(hours=offset_hours))),
                )
    finally:
        cursor.execute("DROP TABLE IF EXISTS #pytest_datetimeoffset_invalid_offsets;")
        db_connection.commit()
| 7653 | + |
def test_datetimeoffset_dst_transitions(cursor, db_connection):
    """
    Test inserting and retrieving DATETIMEOFFSET values around DST transitions.
    Ensures that driver handles DST correctly and does not crash.
    """
    # Fixed offsets bracketing the 2025 US DST boundaries
    # (swap in another region's offsets if needed).
    std_tz = timezone(timedelta(hours=-5))
    dst_tz = timezone(timedelta(hours=-4))
    rows = [
        (1, datetime(2025, 3, 9, 1, 59, 59, tzinfo=std_tz)),   # just before spring forward
        (2, datetime(2025, 3, 9, 3, 0, 0, tzinfo=dst_tz)),     # just after spring forward
        (3, datetime(2025, 11, 2, 1, 59, 59, tzinfo=dst_tz)),  # just before fall back
        (4, datetime(2025, 11, 2, 1, 0, 0, tzinfo=std_tz)),    # just after fall back
    ]
    try:
        cursor.execute("CREATE TABLE #pytest_datetimeoffset_dst_transitions (id INT PRIMARY KEY, dto_column DATETIMEOFFSET);")
        db_connection.commit()

        for row_id, value in rows:
            cursor.execute(
                "INSERT INTO #pytest_datetimeoffset_dst_transitions (id, dto_column) VALUES (?, ?);",
                row_id,
                value,
            )
        db_connection.commit()

        cursor.execute("SELECT id, dto_column FROM #pytest_datetimeoffset_dst_transitions ORDER BY id;")

        for row_id, value in rows:
            record = cursor.fetchone()
            assert record is not None, f"No row fetched for id {row_id}."
            got_id, got_value = record

            assert got_id == row_id, f"ID mismatch: expected {row_id}, got {got_id}"
            assert got_value.tzinfo is not None, f"Fetched datetime object is naive for id {got_id}"

            # Compare UTC time to avoid issues due to offsets changing in DST
            expected_utc = value.astimezone(timezone.utc).replace(tzinfo=None)
            fetched_utc = got_value.astimezone(timezone.utc).replace(tzinfo=None)
            assert fetched_utc == expected_utc, (
                f"Value mismatch for id {row_id}: expected UTC {expected_utc}, got {fetched_utc}"
            )
    finally:
        cursor.execute("DROP TABLE IF EXISTS #pytest_datetimeoffset_dst_transitions;")
        db_connection.commit()
| 7696 | + |
def test_datetimeoffset_leap_second(cursor, db_connection):
    """Ensure driver handles leap-second-like microsecond edge cases without crashing.

    Inserts the last representable microsecond of a year (23:59:59.999999 UTC)
    and verifies it round-trips intact; 999999 us is exactly representable in
    DATETIMEOFFSET's 100 ns precision, so no rounding is expected.
    """
    try:
        cursor.execute("CREATE TABLE #pytest_datetimeoffset_leap_second (id INT PRIMARY KEY, dto_column DATETIMEOFFSET);")
        db_connection.commit()

        leap_second_sim = datetime(2023, 12, 31, 23, 59, 59, 999999, tzinfo=timezone.utc)
        cursor.execute("INSERT INTO #pytest_datetimeoffset_leap_second (id, dto_column) VALUES (?, ?);", 1, leap_second_sim)
        db_connection.commit()

        row = cursor.execute("SELECT dto_column FROM #pytest_datetimeoffset_leap_second;").fetchone()
        assert row is not None, "No row fetched"
        fetched = row[0]
        assert fetched.tzinfo is not None
        # Also verify the value itself survived the round trip (the original
        # test only checked tz-awareness, so a corrupted value would pass).
        assert fetched.astimezone(timezone.utc) == leap_second_sim
    finally:
        cursor.execute("DROP TABLE IF EXISTS #pytest_datetimeoffset_leap_second;")
        db_connection.commit()
| 7712 | + |
def test_datetimeoffset_malformed_input(cursor, db_connection):
    """Verify driver raises error for invalid datetimeoffset strings."""
    try:
        cursor.execute("CREATE TABLE #pytest_datetimeoffset_malformed_input (id INT PRIMARY KEY, dto_column DATETIMEOFFSET);")
        db_connection.commit()

        # Month, day, hour, minute, and offset are all out of range.
        malformed = "2023-13-45 25:61:00 +99:99"  # invalid string
        with pytest.raises(Exception):
            cursor.execute(
                "INSERT INTO #pytest_datetimeoffset_malformed_input (id, dto_column) VALUES (?, ?);",
                1,
                malformed,
            )
    finally:
        cursor.execute("DROP TABLE IF EXISTS #pytest_datetimeoffset_malformed_input;")
        db_connection.commit()
| 7725 | + |
7563 | 7726 | def test_lowercase_attribute(cursor, db_connection): |
7564 | 7727 | """Test that the lowercase attribute properly converts column names to lowercase""" |
7565 | 7728 |
|
|
0 commit comments