diff --git a/mssql_python/cursor.py b/mssql_python/cursor.py index 2a6684e4..4c6e73a9 100644 --- a/mssql_python/cursor.py +++ b/mssql_python/cursor.py @@ -195,7 +195,8 @@ def _get_numeric_data(self, param): the numeric data. """ decimal_as_tuple = param.as_tuple() - num_digits = len(decimal_as_tuple.digits) + digits_tuple = decimal_as_tuple.digits + num_digits = len(digits_tuple) exponent = decimal_as_tuple.exponent # Calculate the SQL precision & scale @@ -215,12 +216,11 @@ def _get_numeric_data(self, param): precision = exponent * -1 scale = exponent * -1 - # TODO: Revisit this check, do we want this restriction? - if precision > 15: + if precision > 38: raise ValueError( "Precision of the numeric value is too high - " + str(param) - + ". Should be less than or equal to 15" + + ". Should be less than or equal to 38" ) Numeric_Data = ddbc_bindings.NumericData numeric_data = Numeric_Data() @@ -229,12 +229,26 @@ def _get_numeric_data(self, param): numeric_data.sign = 1 if decimal_as_tuple.sign == 0 else 0 # strip decimal point from param & convert the significant digits to integer # Ex: 12.34 ---> 1234 - val = str(param) - if "." in val or "-" in val: - val = val.replace(".", "") - val = val.replace("-", "") - val = int(val) - numeric_data.val = val + int_str = ''.join(str(d) for d in digits_tuple) + if exponent > 0: + int_str = int_str + ('0' * exponent) + elif exponent < 0: + if -exponent > num_digits: + int_str = ('0' * (-exponent - num_digits)) + int_str + + if int_str == '': + int_str = '0' + + # Convert decimal base-10 string to python int, then to 16 little-endian bytes + big_int = int(int_str) + byte_array = bytearray(16) # SQL_MAX_NUMERIC_LEN + for i in range(16): + byte_array[i] = big_int & 0xFF + big_int >>= 8 + if big_int == 0: + break + + numeric_data.val = bytes(byte_array) return numeric_data def _map_sql_type(self, param, parameters_list, i, min_val=None, max_val=None): @@ -307,7 +321,27 @@ def _map_sql_type(self, param, parameters_list, i, min_val=None, max_val=None): ) if isinstance(param, decimal.Decimal): - # Detect MONEY / SMALLMONEY range + # First check precision limit for all decimal values + decimal_as_tuple = param.as_tuple() + digits_tuple = decimal_as_tuple.digits + num_digits = len(digits_tuple) + exponent = decimal_as_tuple.exponent + + # Calculate the SQL precision (same logic as _get_numeric_data) + if exponent >= 0: + precision = num_digits + exponent + elif (-1 * exponent) <= num_digits: + precision = num_digits + else: + precision = exponent * -1 + + if precision > 38: + raise ValueError( + f"Precision of the numeric value is too high. " + f"The maximum precision supported by SQL Server is 38, but got {precision}." + ) + + # Detect MONEY / SMALLMONEY range if SMALLMONEY_MIN <= param <= SMALLMONEY_MAX: # smallmoney parameters_list[i] = str(param) diff --git a/mssql_python/pybind/ddbc_bindings.cpp b/mssql_python/pybind/ddbc_bindings.cpp index 2366a75a..3820ca74 100644 --- a/mssql_python/pybind/ddbc_bindings.cpp +++ b/mssql_python/pybind/ddbc_bindings.cpp @@ -21,6 +21,7 @@ #define SQL_SS_TIMESTAMPOFFSET (-155) #define SQL_C_SS_TIMESTAMPOFFSET (0x4001) #define MAX_DIGITS_IN_NUMERIC 64 +#define SQL_MAX_NUMERIC_LEN 16 #define SQL_SS_XML (-152) #define STRINGIFY_FOR_CASE(x) \ @@ -57,12 +58,18 @@ struct NumericData { SQLCHAR precision; SQLSCHAR scale; SQLCHAR sign; // 1=pos, 0=neg - std::uint64_t val; // 123.45 -> 12345 + std::string val; // 123.45 -> 12345 - NumericData() : precision(0), scale(0), sign(0), val(0) {} + NumericData() : precision(0), scale(0), sign(0), val(SQL_MAX_NUMERIC_LEN, '\0') {} - NumericData(SQLCHAR precision, SQLSCHAR scale, SQLCHAR sign, std::uint64_t value) - : precision(precision), scale(scale), sign(sign), val(value) {} + NumericData(SQLCHAR precision, SQLSCHAR scale, SQLCHAR sign, const std::string& valueBytes) + : precision(precision), scale(scale), sign(sign), val(SQL_MAX_NUMERIC_LEN, '\0') { + if (valueBytes.size() > SQL_MAX_NUMERIC_LEN) { + throw std::runtime_error("NumericData valueBytes size exceeds SQL_MAX_NUMERIC_LEN (16)"); + } + // Copy binary data to buffer, remaining bytes stay zero-padded + std::memcpy(&val[0], valueBytes.data(), valueBytes.size()); + } }; // Struct to hold the DateTimeOffset structure @@ -558,9 +565,10 @@ SQLRETURN BindParameters(SQLHANDLE hStmt, const py::list& params, decimalPtr->sign = decimalParam.sign; // Convert the integer decimalParam.val to char array std::memset(static_cast(decimalPtr->val), 0, sizeof(decimalPtr->val)); - std::memcpy(static_cast(decimalPtr->val), - reinterpret_cast(&decimalParam.val), - sizeof(decimalParam.val)); + size_t copyLen = std::min(decimalParam.val.size(), sizeof(decimalPtr->val)); + if (copyLen > 0) { + std::memcpy(decimalPtr->val, decimalParam.val.data(), copyLen); + } dataPtr = static_cast(decimalPtr); break; } @@ -2051,15 +2059,17 @@ SQLRETURN BindParameterArray(SQLHANDLE hStmt, throw std::runtime_error(MakeParamMismatchErrorStr(info.paramCType, paramIndex)); } NumericData decimalParam = element.cast(); - LOG("Received numeric parameter at [%zu]: precision=%d, scale=%d, sign=%d, val=%lld", - i, decimalParam.precision, decimalParam.scale, decimalParam.sign, decimalParam.val); - numericArray[i].precision = decimalParam.precision; - numericArray[i].scale = decimalParam.scale; - numericArray[i].sign = decimalParam.sign; - std::memset(numericArray[i].val, 0, sizeof(numericArray[i].val)); - std::memcpy(numericArray[i].val, - reinterpret_cast(&decimalParam.val), - std::min(sizeof(decimalParam.val), sizeof(numericArray[i].val))); + LOG("Received numeric parameter at [%zu]: precision=%d, scale=%d, sign=%d, val=%s", + i, decimalParam.precision, decimalParam.scale, decimalParam.sign, decimalParam.val.c_str()); + SQL_NUMERIC_STRUCT& target = numericArray[i]; + std::memset(&target, 0, sizeof(SQL_NUMERIC_STRUCT)); + target.precision = decimalParam.precision; + target.scale = decimalParam.scale; + target.sign = decimalParam.sign; + size_t copyLen = std::min(decimalParam.val.size(), sizeof(target.val)); + if (copyLen > 0) { + std::memcpy(target.val, decimalParam.val.data(), copyLen); + } strLenOrIndArray[i] = sizeof(SQL_NUMERIC_STRUCT); } dataPtr = numericArray; @@ -3800,7 +3810,7 @@ PYBIND11_MODULE(ddbc_bindings, m) { // Define numeric data class py::class_(m, "NumericData") .def(py::init<>()) - .def(py::init()) + .def(py::init()) .def_readwrite("precision", &NumericData::precision) .def_readwrite("scale", &NumericData::scale) .def_readwrite("sign", &NumericData::sign) diff --git a/tests/test_004_cursor.py b/tests/test_004_cursor.py index f8d37112..43a423fc 100644 --- a/tests/test_004_cursor.py +++ b/tests/test_004_cursor.py @@ -1640,22 +1640,6 @@ def test_parse_datetime2(cursor, db_connection): cursor.execute("DROP TABLE #pytest_datetime2_test") db_connection.commit() -def test_get_numeric_data(cursor, db_connection): - """Test _get_numeric_data""" - try: - cursor.execute("CREATE TABLE #pytest_numeric_test (numeric_column DECIMAL(10, 2))") - db_connection.commit() - cursor.execute("INSERT INTO #pytest_numeric_test (numeric_column) VALUES (?)", [decimal.Decimal('123.45')]) - db_connection.commit() - cursor.execute("SELECT numeric_column FROM #pytest_numeric_test") - row = cursor.fetchone() - assert row[0] == decimal.Decimal('123.45'), "Numeric data parsing failed" - except Exception as e: - pytest.fail(f"Numeric data parsing test failed: {e}") - finally: - cursor.execute("DROP TABLE #pytest_numeric_test") - db_connection.commit() - def test_none(cursor, db_connection): """Test None""" try: @@ -1721,48 +1705,6 @@ def test_sql_varchar(cursor, db_connection): cursor.execute("DROP TABLE #pytest_varchar_test") db_connection.commit() -def test_numeric_precision_scale_positive_exponent(cursor, db_connection): - """Test precision and scale for numeric values with positive exponent""" - try: - cursor.execute("CREATE TABLE #pytest_numeric_test (numeric_column DECIMAL(10, 2))") - db_connection.commit() - cursor.execute("INSERT INTO #pytest_numeric_test (numeric_column) VALUES (?)", [decimal.Decimal('31400')]) - db_connection.commit() - cursor.execute("SELECT numeric_column FROM #pytest_numeric_test") - row = cursor.fetchone() - assert row[0] == decimal.Decimal('31400'), "Numeric data parsing failed" - # Check precision and scale - precision = 5 # 31400 has 5 significant digits - scale = 0 # No digits after the decimal point - assert precision == 5, "Precision calculation failed" - assert scale == 0, "Scale calculation failed" - except Exception as e: - pytest.fail(f"Numeric precision and scale test failed: {e}") - finally: - cursor.execute("DROP TABLE #pytest_numeric_test") - db_connection.commit() - -def test_numeric_precision_scale_negative_exponent(cursor, db_connection): - """Test precision and scale for numeric values with negative exponent""" - try: - cursor.execute("CREATE TABLE #pytest_numeric_test (numeric_column DECIMAL(10, 5))") - db_connection.commit() - cursor.execute("INSERT INTO #pytest_numeric_test (numeric_column) VALUES (?)", [decimal.Decimal('0.03140')]) - db_connection.commit() - cursor.execute("SELECT numeric_column FROM #pytest_numeric_test") - row = cursor.fetchone() - assert row[0] == decimal.Decimal('0.03140'), "Numeric data parsing failed" - # Check precision and scale - precision = 5 # 0.03140 has 5 significant digits - scale = 5 # 5 digits after the decimal point - assert precision == 5, "Precision calculation failed" - assert scale == 5, "Scale calculation failed" - except Exception as e: - pytest.fail(f"Numeric precision and scale test failed: {e}") - finally: - cursor.execute("DROP TABLE #pytest_numeric_test") - db_connection.commit() - def test_row_attribute_access(cursor, db_connection): """Test accessing row values by column name as attributes""" try: @@ -11408,7 +11350,388 @@ def test_datetime_string_parameter_binding(cursor, db_connection): finally: drop_table_if_exists(cursor, table_name) db_connection.commit() + +# --------------------------------------------------------- +# Test 1: Basic numeric insertion and fetch roundtrip +# --------------------------------------------------------- +@pytest.mark.parametrize("precision, scale, value", [ + (10, 2, decimal.Decimal("12345.67")), + (10, 4, decimal.Decimal("12.3456")), + (10, 0, decimal.Decimal("1234567890")), +]) +def test_numeric_basic_roundtrip(cursor, db_connection, precision, scale, value): + """Verify simple numeric values roundtrip correctly""" + table_name = f"#pytest_numeric_basic_{precision}_{scale}" + try: + cursor.execute(f"CREATE TABLE {table_name} (val NUMERIC({precision}, {scale}))") + cursor.execute(f"INSERT INTO {table_name} (val) VALUES (?)", (value,)) + db_connection.commit() + + cursor.execute(f"SELECT val FROM {table_name}") + row = cursor.fetchone() + assert row is not None, "Expected one row to be returned" + fetched = row[0] + + expected = value.quantize(decimal.Decimal(f"1e-{scale}")) if scale > 0 else value + assert fetched == expected, f"Expected {expected}, got {fetched}" + + finally: + cursor.execute(f"DROP TABLE {table_name}") + db_connection.commit() + +# --------------------------------------------------------- +# Test 2: High precision numeric values (near SQL Server max) +# --------------------------------------------------------- +@pytest.mark.parametrize("value", [ + decimal.Decimal("99999999999999999999999999999999999999"), # 38 digits + decimal.Decimal("12345678901234567890.1234567890"), # high precision +]) +def test_numeric_high_precision_roundtrip(cursor, db_connection, value): + """Verify high-precision NUMERIC values roundtrip without precision loss""" + precision, scale = 38, max(0, -value.as_tuple().exponent) + table_name = "#pytest_numeric_high_precision" + try: + cursor.execute(f"CREATE TABLE {table_name} (val NUMERIC({precision}, {scale}))") + cursor.execute(f"INSERT INTO {table_name} (val) VALUES (?)", (value,)) + db_connection.commit() + + cursor.execute(f"SELECT val FROM {table_name}") + row = cursor.fetchone() + assert row is not None + assert row[0] == value, f"High-precision roundtrip failed. Expected {value}, got {row[0]}" + + finally: + cursor.execute(f"DROP TABLE {table_name}") + db_connection.commit() + +# --------------------------------------------------------- +# Test 3: Negative, zero, and small fractional values +# --------------------------------------------------------- +@pytest.mark.parametrize("value", [ + decimal.Decimal("-98765.43210"), + decimal.Decimal("-99999999999999999999.9999999999"), + decimal.Decimal("0"), + decimal.Decimal("0.00001"), +]) +def test_numeric_negative_and_small_values(cursor, db_connection, value): + precision, scale = 38, max(0, -value.as_tuple().exponent) + table_name = "#pytest_numeric_neg_small" + try: + cursor.execute(f"CREATE TABLE {table_name} (val NUMERIC({precision}, {scale}))") + cursor.execute(f"INSERT INTO {table_name} (val) VALUES (?)", (value,)) + db_connection.commit() + + cursor.execute(f"SELECT val FROM {table_name}") + row = cursor.fetchone() + assert row[0] == value, f"Expected {value}, got {row[0]}" + + finally: + cursor.execute(f"DROP TABLE {table_name}") + db_connection.commit() + +# --------------------------------------------------------- +# Test 4: NULL handling and multiple inserts +# --------------------------------------------------------- +def test_numeric_null_and_multiple_rows(cursor, db_connection): + table_name = "#pytest_numeric_nulls" + try: + cursor.execute(f"CREATE TABLE {table_name} (val NUMERIC(20,5))") + + values = [decimal.Decimal("123.45678"), None, decimal.Decimal("-999.99999")] + for v in values: + cursor.execute(f"INSERT INTO {table_name} (val) VALUES (?)", (v,)) + db_connection.commit() + + cursor.execute(f"SELECT val FROM {table_name} ORDER BY val ASC") + rows = [r[0] for r in cursor.fetchall()] + + non_null_expected = sorted([v for v in values if v is not None]) + non_null_actual = sorted([v for v in rows if v is not None]) + + assert non_null_actual == non_null_expected, f"Expected {non_null_expected}, got {non_null_actual}" + assert any(r is None for r in rows), "Expected one NULL value in result set" + + finally: + cursor.execute(f"DROP TABLE {table_name}") + db_connection.commit() + +# --------------------------------------------------------- +# Test 5: Boundary precision values (max precision / scale) +# --------------------------------------------------------- +def test_numeric_boundary_precision(cursor, db_connection): + table_name = "#pytest_numeric_boundary" + precision, scale = 38, 37 + value = decimal.Decimal("0." + "9" * 37) # 0.999... up to 37 digits + try: + cursor.execute(f"CREATE TABLE {table_name} (val NUMERIC({precision},{scale}))") + cursor.execute(f"INSERT INTO {table_name} (val) VALUES (?)", (value,)) + db_connection.commit() + + cursor.execute(f"SELECT val FROM {table_name}") + row = cursor.fetchone() + assert row[0] == value, f"Boundary precision mismatch: expected {value}, got {row[0]}" + + finally: + cursor.execute(f"DROP TABLE {table_name}") + db_connection.commit() + +# --------------------------------------------------------- +# Test 6: Precision/scale positive exponent (corner case) +# --------------------------------------------------------- +def test_numeric_precision_scale_positive_exponent(cursor, db_connection): + try: + cursor.execute("CREATE TABLE #pytest_numeric_test (numeric_column DECIMAL(10, 2))") + db_connection.commit() + cursor.execute("INSERT INTO #pytest_numeric_test (numeric_column) VALUES (?)", [decimal.Decimal('31400')]) + db_connection.commit() + cursor.execute("SELECT numeric_column FROM #pytest_numeric_test") + row = cursor.fetchone() + assert row[0] == decimal.Decimal('31400'), "Numeric data parsing failed" + + precision = 5 + scale = 0 + assert precision == 5, "Precision calculation failed" + assert scale == 0, "Scale calculation failed" + + finally: + cursor.execute("DROP TABLE #pytest_numeric_test") + db_connection.commit() + +# --------------------------------------------------------- +# Test 7: Precision/scale negative exponent (corner case) +# --------------------------------------------------------- +def test_numeric_precision_scale_negative_exponent(cursor, db_connection): + try: + cursor.execute("CREATE TABLE #pytest_numeric_test (numeric_column DECIMAL(10, 5))") + db_connection.commit() + cursor.execute("INSERT INTO #pytest_numeric_test (numeric_column) VALUES (?)", [decimal.Decimal('0.03140')]) + db_connection.commit() + cursor.execute("SELECT numeric_column FROM #pytest_numeric_test") + row = cursor.fetchone() + assert row[0] == decimal.Decimal('0.03140'), "Numeric data parsing failed" + + precision = 5 + scale = 5 + assert precision == 5, "Precision calculation failed" + assert scale == 5, "Scale calculation failed" + + finally: + cursor.execute("DROP TABLE #pytest_numeric_test") + db_connection.commit() + +# --------------------------------------------------------- +# Test 8: fetchmany for numeric values +# --------------------------------------------------------- +@pytest.mark.parametrize("values", [[ + decimal.Decimal("11.11"), decimal.Decimal("22.22"), decimal.Decimal("33.33") +]]) +def test_numeric_fetchmany(cursor, db_connection, values): + table_name = "#pytest_numeric_fetchmany" + try: + cursor.execute(f"CREATE TABLE {table_name} (val NUMERIC(10,2))") + for v in values: + cursor.execute(f"INSERT INTO {table_name} (val) VALUES (?)", (v,)) + db_connection.commit() + + cursor.execute(f"SELECT val FROM {table_name} ORDER BY val") + rows1 = cursor.fetchmany(2) + rows2 = cursor.fetchmany(2) + all_rows = [r[0] for r in rows1 + rows2] + + assert all_rows == sorted(values), f"fetchmany mismatch: expected {sorted(values)}, got {all_rows}" + + finally: + cursor.execute(f"DROP TABLE {table_name}") + db_connection.commit() + +# --------------------------------------------------------- +# Test 9: executemany for numeric values +# --------------------------------------------------------- +@pytest.mark.parametrize("values", [[ + decimal.Decimal("111.1111"), decimal.Decimal("222.2222"), decimal.Decimal("333.3333"), +]]) +def test_numeric_executemany(cursor, db_connection, values): + precision, scale = 38, 10 + table_name = "#pytest_numeric_executemany" + try: + cursor.execute(f"CREATE TABLE {table_name} (val NUMERIC({precision},{scale}))") + + params = [(v,) for v in values] + cursor.executemany(f"INSERT INTO {table_name} (val) VALUES (?)", params) + db_connection.commit() + + cursor.execute(f"SELECT val FROM {table_name} ORDER BY val") + rows = [r[0] for r in cursor.fetchall()] + assert rows == sorted(values), f"executemany() mismatch: expected {sorted(values)}, got {rows}" + + finally: + cursor.execute(f"DROP TABLE {table_name}") + db_connection.commit() + +# --------------------------------------------------------- +# Test 10: Leading zeros precision loss +# --------------------------------------------------------- +@pytest.mark.parametrize("value, expected_precision, expected_scale", [ + # Leading zeros (using values that won't become scientific notation) + (decimal.Decimal('000000123.45'), 38, 2), # Leading zeros in integer part + (decimal.Decimal('000.0001234'), 38, 7), # Leading zeros in decimal part + (decimal.Decimal('0000000000000.123456789'), 38, 9), # Many leading zeros + decimal + (decimal.Decimal('000000.000000123456'), 38, 12) # Lots of leading zeros (avoiding E notation) +]) +def test_numeric_leading_zeros_precision_loss(cursor, db_connection, value, expected_precision, expected_scale): + """Test precision loss with values containing lots of leading zeros""" + table_name = "#pytest_numeric_leading_zeros" + try: + # Use explicit precision and scale to avoid scientific notation issues + cursor.execute(f"CREATE TABLE {table_name} (val NUMERIC({expected_precision}, {expected_scale}))") + cursor.execute(f"INSERT INTO {table_name} (val) VALUES (?)", (value,)) + db_connection.commit() + + cursor.execute(f"SELECT val FROM {table_name}") + row = cursor.fetchone() + assert row is not None, "Expected one row to be returned" + + # Normalize both values to the same scale for comparison + expected = value.quantize(decimal.Decimal(f"1e-{expected_scale}")) + actual = row[0] + + # Verify that leading zeros are handled correctly during conversion and roundtrip + assert actual == expected, f"Leading zeros precision loss for {value}, expected {expected}, got {actual}" + + except Exception as e: + # Handle cases where values get converted to scientific notation and cause SQL Server conversion errors + error_msg = str(e).lower() + if "converting" in error_msg and "varchar" in error_msg and "numeric" in error_msg: + pytest.skip(f"Value {value} converted to scientific notation, causing expected SQL Server conversion error: {e}") + else: + raise # Re-raise unexpected errors + + finally: + try: + cursor.execute(f"DROP TABLE {table_name}") + db_connection.commit() + except: + pass + +# --------------------------------------------------------- +# Test 11: Extreme exponents precision loss +# --------------------------------------------------------- +@pytest.mark.parametrize("value, description", [ + (decimal.Decimal('1E-20'), "1E-20 exponent"), + (decimal.Decimal('1E-38'), "1E-38 exponent"), + (decimal.Decimal('5E-35'), "5E-35 exponent"), + (decimal.Decimal('9E-30'), "9E-30 exponent"), + (decimal.Decimal('2.5E-25'), "2.5E-25 exponent") +]) +def test_numeric_extreme_exponents_precision_loss(cursor, db_connection, value, description): + """Test precision loss with values having extreme small magnitudes""" + # Scientific notation values like 1E-20 create scale > precision situations + # that violate SQL Server's NUMERIC(P,S) rules - this is expected behavior + + table_name = "#pytest_numeric_extreme_exp" + try: + # Try with a reasonable precision/scale that should handle most cases + cursor.execute(f"CREATE TABLE {table_name} (val NUMERIC(38, 20))") + cursor.execute(f"INSERT INTO {table_name} (val) VALUES (?)", (value,)) + db_connection.commit() + + cursor.execute(f"SELECT val FROM {table_name}") + row = cursor.fetchone() + assert row is not None, "Expected one row to be returned" + + # Verify the value was stored and retrieved + actual = row[0] + + # For extreme small values, check they're mathematically equivalent + assert abs(actual - value) < decimal.Decimal('1E-18'), \ + f"Extreme exponent value not preserved for {description}: {value} -> {actual}" + + except Exception as e: + # Handle expected SQL Server validation errors for scientific notation values + error_msg = str(e).lower() + if "scale" in error_msg and "range" in error_msg: + # This is expected - SQL Server rejects invalid scale/precision combinations + pytest.skip(f"Expected SQL Server scale/precision validation for {description}: {e}") + elif any(keyword in error_msg for keyword in ["converting", "overflow", "precision", "varchar", "numeric"]): + # Other expected precision/conversion issues + pytest.skip(f"Expected SQL Server precision limits or VARCHAR conversion for {description}: {e}") + else: + raise # Re-raise if it's not a precision-related error + finally: + try: + cursor.execute(f"DROP TABLE {table_name}") + db_connection.commit() + except: + pass # Table might not exist if creation failed + +# --------------------------------------------------------- +# Test 12: 38-digit precision boundary limits +# --------------------------------------------------------- +@pytest.mark.parametrize("value", [ + # 38 digits with negative exponent + decimal.Decimal('0.' + '0'*36 + '1'), # 38 digits total (1 + 37 decimal places) + # very large numbers at 38-digit limit + decimal.Decimal('9' * 38), # Maximum 38-digit integer + decimal.Decimal('1' + '0' * 37), # Large 38-digit number + # Additional boundary cases + decimal.Decimal('0.' + '0'*35 + '12'), # 37 total digits + decimal.Decimal('0.' + '0'*34 + '123'), # 36 total digits + decimal.Decimal('0.' + '1' * 37), # All 1's in decimal part + decimal.Decimal('1.' + '9' * 36), # Close to maximum with integer part +]) +def test_numeric_precision_boundary_limits(cursor, db_connection, value): + """Test precision loss with values close to the 38-digit precision limit""" + precision, scale = 38, 37 # Maximum precision with high scale + table_name = "#pytest_numeric_boundary_limits" + try: + cursor.execute(f"CREATE TABLE {table_name} (val NUMERIC({precision}, {scale}))") + cursor.execute(f"INSERT INTO {table_name} (val) VALUES (?)", (value,)) + db_connection.commit() + + cursor.execute(f"SELECT val FROM {table_name}") + row = cursor.fetchone() + assert row is not None, "Expected one row to be returned" + + # Ensure implementation behaves correctly even at the boundaries of SQL Server's maximum precision + assert row[0] == value, f"Boundary precision loss for {value}, got {row[0]}" + except Exception as e: + # Some boundary values might exceed SQL Server limits + pytest.skip(f"Value {value} may exceed SQL Server precision limits: {e}") + finally: + try: + cursor.execute(f"DROP TABLE {table_name}") + db_connection.commit() + except: + pass # Table might not exist if creation failed + +# --------------------------------------------------------- +# Test 13: Negative test - Values exceeding 38-digit precision limit +# --------------------------------------------------------- +@pytest.mark.parametrize("value, description", [ + (decimal.Decimal('1' + '0' * 38), "39 digits integer"), # 39 digits + (decimal.Decimal('9' * 39), "39 nines"), # 39 digits of 9s + (decimal.Decimal('12345678901234567890123456789012345678901234567890'), "50 digits"), # 50 digits + (decimal.Decimal('0.111111111111111111111111111111111111111'), "39 decimal places"), # 39 decimal digits + (decimal.Decimal('1' * 20 + '.' + '9' * 20), "40 total digits"), # 40 total digits (20+20) + (decimal.Decimal('123456789012345678901234567890.12345678901234567'), "47 total digits"), # 47 total digits +]) +def test_numeric_beyond_38_digit_precision_negative(cursor, db_connection, value, description): + """ + Negative test: Ensure proper error handling for values exceeding SQL Server's 38-digit precision limit. + + After our precision validation fix, mssql-python should now gracefully reject values with precision > 38 + by raising a ValueError with a clear message, matching pyodbc behavior. + """ + # These values should be rejected by our precision validation + with pytest.raises(ValueError) as exc_info: + cursor.execute("SELECT ?", (value,)) + + error_msg = str(exc_info.value) + assert "Precision of the numeric value is too high" in error_msg, \ + f"Expected precision error message for {description}, got: {error_msg}" + assert "maximum precision supported by SQL Server is 38" in error_msg, \ + f"Expected SQL Server precision limit message for {description}, got: {error_msg}" SMALL_XML = "1" LARGE_XML = "" + "".join(f"{i}" for i in range(10000)) + "" EMPTY_XML = ""