@@ -1454,12 +1454,35 @@ def _wait_until_data_rows_are_processed(
1454
1454
""" Wait until all the specified data rows are processed"""
1455
1455
start_time = datetime .now ()
1456
1456
1457
+ max_data_rows_per_poll = 100_000
1458
+ if data_row_ids is not None :
1459
+ for i in range (0 , len (data_row_ids ), max_data_rows_per_poll ):
1460
+ chunk = data_row_ids [i :i + max_data_rows_per_poll ]
1461
+ self ._poll_data_row_processing_status (
1462
+ chunk , [], start_time , wait_processing_max_seconds ,
1463
+ sleep_interval )
1464
+
1465
+ if global_keys is not None :
1466
+ for i in range (0 , len (global_keys ), max_data_rows_per_poll ):
1467
+ chunk = global_keys [i :i + max_data_rows_per_poll ]
1468
+ self ._poll_data_row_processing_status (
1469
+ [], chunk , start_time , wait_processing_max_seconds ,
1470
+ sleep_interval )
1471
+
1472
+ def _poll_data_row_processing_status (
1473
+ self ,
1474
+ data_row_ids : List [str ],
1475
+ global_keys : List [str ],
1476
+ start_time : datetime ,
1477
+ wait_processing_max_seconds : int = _wait_processing_max_seconds ,
1478
+ sleep_interval = 30 ):
1479
+
1457
1480
while True :
1458
1481
if (datetime .now () -
1459
1482
start_time ).total_seconds () >= wait_processing_max_seconds :
1460
1483
raise ProcessingWaitTimeout (
1461
- "Maximum wait time exceeded while waiting for data rows to be processed. Try creating a batch a bit later"
1462
- )
1484
+ """ Maximum wait time exceeded while waiting for data rows to be processed.
1485
+ Try creating a batch a bit later""" )
1463
1486
1464
1487
all_good = self .__check_data_rows_have_been_processed (
1465
1488
data_row_ids , global_keys )
0 commit comments