
Commit f0b5959

FEAT: streaming support in executemany (#251)
### Work Item / Issue Reference

> [AB#38645](https://sqlclientdrivers.visualstudio.com/c6d89619-62de-46a0-8b46-70b92a84d85e/_workitems/edit/38645)

---

### Summary

This pull request introduces robust support for streaming large values (such as NVARCHAR(MAX), VARCHAR(MAX), and VARBINARY(MAX)) with the `executemany` method, ensuring correct handling of parameters that require Data-At-Execution (DAE) or streaming. It also adds comprehensive tests for these scenarios and improves logging and code clarity throughout the codebase.

**Streaming and DAE parameter handling:**

* The `executemany` method in `mssql_python/cursor.py` now automatically detects DAE/streaming parameters and falls back to row-by-row execution when necessary, ensuring correct behavior for large values and streaming inserts.
* In the C++ binding (`mssql_python/pybind/ddbc_bindings.cpp`), the `SQLExecuteMany_wrap` function checks for DAE parameters and either performs fast array execution or falls back to row-by-row execution with streaming, supporting both text and binary data.

**Testing improvements:**

* Added new tests in `tests/test_004_cursor.py` to verify streaming inserts and fetches for NVARCHAR(MAX), VARCHAR(MAX), and VARBINARY(MAX) columns using `executemany` and all fetch modes.

**Sample usage and demonstration:**

* Updated `main.py` to include a sample script that demonstrates streaming inserts and fetches of large NVARCHAR(MAX) values using `executemany` and different fetch modes.

**Logging and code clarity:**

* Improved debug logging for parameter handling and DAE detection, and clarified log messages for LOB (Large Object) column handling in fetch operations.

---------

Co-authored-by: Gaurav Sharma <[email protected]>
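The decision this PR automates can be pictured with a small sketch. Note this is illustrative only: `batch_needs_streaming`, the 4000-character threshold, and the sample values are assumptions for demonstration, not the driver's actual API — the real decision happens inside `executemany`'s parameter-type detection.

```python
# Illustrative sketch only: this helper and its 4000-character threshold
# are assumptions for demonstration, not part of mssql-python's API.
# It mimics the kind of check that decides whether a batch contains
# values large enough to need Data-At-Execution (DAE) streaming.

DAE_THRESHOLD = 4000  # rough inline limit before (MAX) values need streaming

def batch_needs_streaming(seq_of_parameters, threshold=DAE_THRESHOLD):
    """Return True if any str/bytes parameter exceeds the inline threshold."""
    for row in seq_of_parameters:
        for value in row:
            if isinstance(value, (str, bytes, bytearray)) and len(value) > threshold:
                return True
    return False

large_batch = [("Ω" * 4100,), ("漢" * 5000,)]   # sizes used by the new tests
small_batch = [("abc",), (b"xy",)]
```

With `large_batch`, `executemany` would take the streaming fallback; with `small_batch`, the fast array-bound path.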
1 parent 0d745b1 commit f0b5959

File tree

3 files changed: +163 −16 lines


mssql_python/cursor.py

Lines changed: 11 additions & 6 deletions
@@ -1529,22 +1529,19 @@ def _compute_column_type(self, column):
             sample_value = v

         return sample_value, None, None
-
+
     def executemany(self, operation: str, seq_of_parameters: list) -> None:
         """
         Prepare a database operation and execute it against all parameter sequences.
         This version uses column-wise parameter binding and a single batched SQLExecute().
         Args:
             operation: SQL query or command.
             seq_of_parameters: Sequence of sequences or mappings of parameters.
-
         Raises:
             Error: If the operation fails.
         """
         self._check_closed()
         self._reset_cursor()
-
-        # Clear any previous messages
         self.messages = []

         if not seq_of_parameters:
@@ -1570,6 +1567,7 @@ def executemany(self, operation: str, seq_of_parameters: list) -> None:
         param_count = len(sample_row)
         param_info = ddbc_bindings.ParamInfo
         parameters_type = []
+        any_dae = False

         # Check if we have explicit input sizes set
         if self._inputsizes:
@@ -1673,6 +1671,14 @@ def executemany(self, operation: str, seq_of_parameters: list) -> None:
                 paraminfo.columnSize = max(max_binary_size, 1)

             parameters_type.append(paraminfo)
+            if paraminfo.isDAE:
+                any_dae = True
+
+        if any_dae:
+            log('debug', "DAE parameters detected. Falling back to row-by-row execution with streaming.")
+            for row in seq_of_parameters:
+                self.execute(operation, row)
+            return

         # Process parameters into column-wise format with possible type conversions
         # First, convert any Decimal types as needed for NUMERIC/DECIMAL columns
@@ -1705,8 +1711,7 @@ def executemany(self, operation: str, seq_of_parameters: list) -> None:
         log('debug', "Executing batch query with %d parameter sets:\n%s",
             len(seq_of_parameters), "\n".join(f"  {i+1}: {tuple(p) if isinstance(p, (list, tuple)) else p}" for i, p in enumerate(seq_of_parameters[:5]))  # Limit to first 5 rows for large batches
         )
-
-        # Execute batched statement
+
         ret = ddbc_bindings.SQLExecuteMany(
             self.hstmt,
             operation,
mssql_python/pybind/ddbc_bindings.cpp

Lines changed: 59 additions & 10 deletions
@@ -2000,6 +2000,7 @@ SQLRETURN SQLExecuteMany_wrap(const SqlHandlePtr statementHandle,
                               size_t paramSetSize) {
     SQLHANDLE hStmt = statementHandle->get();
     SQLWCHAR* queryPtr;
+
 #if defined(__APPLE__) || defined(__linux__)
     std::vector<SQLWCHAR> queryBuffer = WStringToSQLWCHAR(query);
     queryPtr = queryBuffer.data();
@@ -2008,15 +2009,63 @@ SQLRETURN SQLExecuteMany_wrap(const SqlHandlePtr statementHandle,
 #endif
     RETCODE rc = SQLPrepare_ptr(hStmt, queryPtr, SQL_NTS);
     if (!SQL_SUCCEEDED(rc)) return rc;
-    std::vector<std::shared_ptr<void>> paramBuffers;
-    rc = BindParameterArray(hStmt, columnwise_params, paramInfos, paramSetSize, paramBuffers);
-    if (!SQL_SUCCEEDED(rc)) return rc;
-    rc = SQLSetStmtAttr_ptr(hStmt, SQL_ATTR_PARAMSET_SIZE, (SQLPOINTER)paramSetSize, 0);
-    if (!SQL_SUCCEEDED(rc)) return rc;
-    rc = SQLExecute_ptr(hStmt);
-    return rc;
+
+    bool hasDAE = false;
+    for (const auto& p : paramInfos) {
+        if (p.isDAE) {
+            hasDAE = true;
+            break;
+        }
+    }
+    if (!hasDAE) {
+        std::vector<std::shared_ptr<void>> paramBuffers;
+        rc = BindParameterArray(hStmt, columnwise_params, paramInfos, paramSetSize, paramBuffers);
+        if (!SQL_SUCCEEDED(rc)) return rc;
+
+        rc = SQLSetStmtAttr_ptr(hStmt, SQL_ATTR_PARAMSET_SIZE, (SQLPOINTER)paramSetSize, 0);
+        if (!SQL_SUCCEEDED(rc)) return rc;
+
+        rc = SQLExecute_ptr(hStmt);
+        return rc;
+    } else {
+        size_t rowCount = columnwise_params.size();
+        for (size_t rowIndex = 0; rowIndex < rowCount; ++rowIndex) {
+            py::list rowParams = columnwise_params[rowIndex];
+
+            std::vector<std::shared_ptr<void>> paramBuffers;
+            rc = BindParameters(hStmt, rowParams, const_cast<std::vector<ParamInfo>&>(paramInfos), paramBuffers);
+            if (!SQL_SUCCEEDED(rc)) return rc;
+
+            rc = SQLExecute_ptr(hStmt);
+            while (rc == SQL_NEED_DATA) {
+                SQLPOINTER token;
+                rc = SQLParamData_ptr(hStmt, &token);
+                if (!SQL_SUCCEEDED(rc) && rc != SQL_NEED_DATA) return rc;
+
+                py::object* py_obj_ptr = reinterpret_cast<py::object*>(token);
+                if (!py_obj_ptr) return SQL_ERROR;
+
+                if (py::isinstance<py::str>(*py_obj_ptr)) {
+                    std::string data = py_obj_ptr->cast<std::string>();
+                    SQLLEN data_len = static_cast<SQLLEN>(data.size());
+                    rc = SQLPutData_ptr(hStmt, (SQLPOINTER)data.c_str(), data_len);
+                } else if (py::isinstance<py::bytes>(*py_obj_ptr) || py::isinstance<py::bytearray>(*py_obj_ptr)) {
+                    std::string data = py_obj_ptr->cast<std::string>();
+                    SQLLEN data_len = static_cast<SQLLEN>(data.size());
+                    rc = SQLPutData_ptr(hStmt, (SQLPOINTER)data.c_str(), data_len);
+                } else {
+                    LOG("Unsupported DAE parameter type in row {}", rowIndex);
+                    return SQL_ERROR;
+                }
+            }
+
+            if (!SQL_SUCCEEDED(rc)) return rc;
+        }
+        return SQL_SUCCESS;
+    }
 }

+
 // Wrap SQLNumResultCols
 SQLSMALLINT SQLNumResultCols_wrap(SqlHandlePtr statementHandle) {
     LOG("Get number of columns in result set");
@@ -2213,7 +2262,7 @@ static py::object FetchLobColumnData(SQLHSTMT hStmt,
             LOG("Loop {}: Appended {} bytes", loopCount, bytesRead);
         }
         if (ret == SQL_SUCCESS) {
-            LOG("Loop {}: SQL_SUCCESS no more data", loopCount);
+            LOG("Loop {}: SQL_SUCCESS, no more data", loopCount);
             break;
         }
     }
@@ -3270,7 +3319,7 @@ SQLRETURN FetchMany_wrap(SqlHandlePtr StatementHandle, py::list& rows, int fetch

     // If we have LOBs → fall back to row-by-row fetch + SQLGetData_wrap
     if (!lobColumns.empty()) {
-        LOG("LOB columns detected using per-row SQLGetData path");
+        LOG("LOB columns detected, using per-row SQLGetData path");
         while (true) {
             ret = SQLFetch_ptr(hStmt);
             if (ret == SQL_NO_DATA) break;
@@ -3392,7 +3441,7 @@ SQLRETURN FetchAll_wrap(SqlHandlePtr StatementHandle, py::list& rows) {

     // If we have LOBs → fall back to row-by-row fetch + SQLGetData_wrap
     if (!lobColumns.empty()) {
-        LOG("LOB columns detected using per-row SQLGetData path");
+        LOG("LOB columns detected, using per-row SQLGetData path");
         while (true) {
             ret = SQLFetch_ptr(hStmt);
             if (ret == SQL_NO_DATA) break;
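The SQL_NEED_DATA loop in `SQLExecuteMany_wrap` follows the standard ODBC data-at-execution handshake (SQLExecute → SQLParamData → SQLPutData). A simulated Python sketch of that handshake is below; `FakeStmt` is a stand-in for the driver and the constants are illustrative, not the real ODBC values.

```python
# Simulated ODBC data-at-execution handshake, mirroring the C++ loop:
# execute() returns SQL_NEED_DATA, param_data() yields a token for each
# pending DAE parameter, and put_data() supplies its value. FakeStmt and
# the constants are illustrative stand-ins, not real ODBC.

SQL_NEED_DATA, SQL_SUCCESS, SQL_ERROR = -99, 0, -1

class FakeStmt:
    def __init__(self, tokens):
        self.pending = list(tokens)   # DAE parameters still awaiting data
        self.received = {}

    def execute(self):
        return SQL_NEED_DATA if self.pending else SQL_SUCCESS

    def param_data(self):
        # In real ODBC, the final SQLParamData call also runs the statement.
        if self.pending:
            return SQL_NEED_DATA, self.pending.pop(0)
        return SQL_SUCCESS, None

    def put_data(self, token, data):
        self.received[token] = data

def execute_with_streaming(stmt, values):
    rc = stmt.execute()
    while rc == SQL_NEED_DATA:
        rc, token = stmt.param_data()
        if token is None:
            break                     # all parameters satisfied
        if not isinstance(values[token], (str, bytes, bytearray)):
            return SQL_ERROR          # unsupported DAE type, as in the C++ path
        stmt.put_data(token, values[token])
    return rc

stmt = FakeStmt(["p1", "p2"])
rc = execute_with_streaming(stmt, {"p1": "Ω" * 4100, "p2": b"\x01" * 5000})
```

The C++ code sends each value with a single SQLPutData call; a production implementation could equally split very large values across multiple SQLPutData calls per token.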

tests/test_004_cursor.py

Lines changed: 93 additions & 0 deletions
@@ -10661,6 +10661,99 @@ def test_decimal_separator_calculations(cursor, db_connection):
         # Cleanup
         cursor.execute("DROP TABLE IF EXISTS #pytest_decimal_calc_test")

+def test_nvarcharmax_executemany_streaming(cursor, db_connection):
+    """Streaming insert + fetch > 4k NVARCHAR(MAX) using executemany with all fetch modes."""
+    try:
+        values = ["Ω" * 4100, "漢" * 5000]
+        cursor.execute("CREATE TABLE #pytest_nvarcharmax (col NVARCHAR(MAX))")
+        db_connection.commit()
+
+        # --- executemany insert ---
+        cursor.executemany("INSERT INTO #pytest_nvarcharmax VALUES (?)", [(v,) for v in values])
+        db_connection.commit()
+
+        # --- fetchall ---
+        cursor.execute("SELECT col FROM #pytest_nvarcharmax ORDER BY LEN(col)")
+        rows = [r[0] for r in cursor.fetchall()]
+        assert rows == sorted(values, key=len)
+
+        # --- fetchone ---
+        cursor.execute("SELECT col FROM #pytest_nvarcharmax ORDER BY LEN(col)")
+        r1 = cursor.fetchone()[0]
+        r2 = cursor.fetchone()[0]
+        assert {r1, r2} == set(values)
+        assert cursor.fetchone() is None
+
+        # --- fetchmany ---
+        cursor.execute("SELECT col FROM #pytest_nvarcharmax ORDER BY LEN(col)")
+        batch = [r[0] for r in cursor.fetchmany(1)]
+        assert batch[0] in values
+    finally:
+        cursor.execute("DROP TABLE #pytest_nvarcharmax")
+        db_connection.commit()
+
+def test_varcharmax_executemany_streaming(cursor, db_connection):
+    """Streaming insert + fetch > 4k VARCHAR(MAX) using executemany with all fetch modes."""
+    try:
+        values = ["A" * 4100, "B" * 5000]
+        cursor.execute("CREATE TABLE #pytest_varcharmax (col VARCHAR(MAX))")
+        db_connection.commit()
+
+        # --- executemany insert ---
+        cursor.executemany("INSERT INTO #pytest_varcharmax VALUES (?)", [(v,) for v in values])
+        db_connection.commit()
+
+        # --- fetchall ---
+        cursor.execute("SELECT col FROM #pytest_varcharmax ORDER BY LEN(col)")
+        rows = [r[0] for r in cursor.fetchall()]
+        assert rows == sorted(values, key=len)
+
+        # --- fetchone ---
+        cursor.execute("SELECT col FROM #pytest_varcharmax ORDER BY LEN(col)")
+        r1 = cursor.fetchone()[0]
+        r2 = cursor.fetchone()[0]
+        assert {r1, r2} == set(values)
+        assert cursor.fetchone() is None
+
+        # --- fetchmany ---
+        cursor.execute("SELECT col FROM #pytest_varcharmax ORDER BY LEN(col)")
+        batch = [r[0] for r in cursor.fetchmany(1)]
+        assert batch[0] in values
+    finally:
+        cursor.execute("DROP TABLE #pytest_varcharmax")
+        db_connection.commit()
+
+def test_varbinarymax_executemany_streaming(cursor, db_connection):
+    """Streaming insert + fetch > 4k VARBINARY(MAX) using executemany with all fetch modes."""
+    try:
+        values = [b"\x01" * 4100, b"\x02" * 5000]
+        cursor.execute("CREATE TABLE #pytest_varbinarymax (col VARBINARY(MAX))")
+        db_connection.commit()
+
+        # --- executemany insert ---
+        cursor.executemany("INSERT INTO #pytest_varbinarymax VALUES (?)", [(v,) for v in values])
+        db_connection.commit()
+
+        # --- fetchall ---
+        cursor.execute("SELECT col FROM #pytest_varbinarymax ORDER BY DATALENGTH(col)")
+        rows = [r[0] for r in cursor.fetchall()]
+        assert rows == sorted(values, key=len)
+
+        # --- fetchone ---
+        cursor.execute("SELECT col FROM #pytest_varbinarymax ORDER BY DATALENGTH(col)")
+        r1 = cursor.fetchone()[0]
+        r2 = cursor.fetchone()[0]
+        assert {r1, r2} == set(values)
+        assert cursor.fetchone() is None
+
+        # --- fetchmany ---
+        cursor.execute("SELECT col FROM #pytest_varbinarymax ORDER BY DATALENGTH(col)")
+        batch = [r[0] for r in cursor.fetchmany(1)]
+        assert batch[0] in values
+    finally:
+        cursor.execute("DROP TABLE #pytest_varbinarymax")
+        db_connection.commit()
+
 def test_date_string_parameter_binding(cursor, db_connection):
     """Verify that date-like strings are treated as strings in parameter binding"""
     table_name = "#pytest_date_string"

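On the fetch side, these tests exercise the per-row SQLGetData path. The chunked accumulation pattern behind `FetchLobColumnData` can be sketched like this; the fake chunk source and the constants are illustrative stand-ins, not the real ODBC API.

```python
# Sketch of the chunked LOB read loop: keep calling get_data() until the
# driver returns SQL_SUCCESS (no more data). The chunk source simulates
# a driver delivering partial reads (SQL_SUCCESS_WITH_INFO) followed by
# a final SQL_SUCCESS; the constants are illustrative, not real ODBC.

SQL_SUCCESS, SQL_SUCCESS_WITH_INFO = 0, 1

def read_lob(get_data):
    """Accumulate chunks from a get_data() callable into one bytes value."""
    buf = bytearray()
    while True:
        ret, chunk = get_data()
        buf += chunk
        if ret == SQL_SUCCESS:        # SQL_SUCCESS signals no more data
            return bytes(buf)

chunks = iter([(SQL_SUCCESS_WITH_INFO, b"\x01" * 4096),
               (SQL_SUCCESS_WITH_INFO, b"\x01" * 4),
               (SQL_SUCCESS, b"")])
payload = read_lob(lambda: next(chunks))
```

This is why the 4100-byte test values matter: they exceed a typical single-read buffer, forcing at least one SQL_SUCCESS_WITH_INFO partial read before the final SQL_SUCCESS.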