Commit 856f6fc

Merge pull request #1043 from Yelp/u/kkasp/TRON-2414-set-extra-safe-transaction-limit
Drop under the limit a bit more for maximum safety
2 parents: 211bc9b + 55c4bff

4 files changed: +34 -6 lines
tron/config/config_parse.py

Lines changed: 22 additions & 0 deletions
@@ -858,12 +858,18 @@ class ValidateStatePersistence(Validator):
     config_class = schema.ConfigState
     defaults = {
         "buffer_size": 1,
+        "dynamodb_region": None,
+        "table_name": None,
+        "max_transact_write_items": 8,
     }
 
     validators = {
         "name": valid_string,
         "store_type": config_utils.build_real_enum_validator(schema.StatePersistenceTypes),
         "buffer_size": valid_int,
+        "dynamodb_region": valid_string,
+        "table_name": valid_string,
+        "max_transact_write_items": valid_int,
     }
 
     def post_validation(self, config, config_context):
@@ -873,6 +879,22 @@ def post_validation(self, config, config_context):
             path = config_context.path
             raise ConfigError("%s buffer_size must be >= 1." % path)
 
+        store_type = config.get("store_type")
+
+        if store_type == schema.StatePersistenceTypes.dynamodb.value:
+            if not config.get("table_name"):
+                raise ConfigError(f"{config_context.path} table_name is required when store_type is 'dynamodb'")
+            if not config.get("dynamodb_region"):
+                raise ConfigError(f"{config_context.path} dynamodb_region is required when store_type is 'dynamodb'")
+
+            max_transact = config.get("max_transact_write_items")
+
+            # Upper bound is based on boto3 transact_write_items limit
+            if not 1 <= max_transact <= 100:
+                raise ConfigError(
+                    f"{config_context.path} max_transact_write_items must be between 1 and 100, got {max_transact}"
+                )
+
 
 valid_state_persistence = ValidateStatePersistence()
 
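To see what the new check enforces, here is a standalone sketch of the range validation in isolation. ConfigError is stubbed locally and the helper name is hypothetical, but the 1-100 bounds and the error message mirror the hunk above.

# Standalone sketch; only the bounds and message are taken from the diff.
class ConfigError(Exception):
    pass


def validate_max_transact_write_items(path: str, value: int) -> int:
    # Upper bound is based on the boto3 transact_write_items limit of 100 items
    if not 1 <= value <= 100:
        raise ConfigError(f"{path} max_transact_write_items must be between 1 and 100, got {value}")
    return value


validate_max_transact_write_items("state_persistence", 8)  # passes: the new default
try:
    validate_max_transact_write_items("state_persistence", 101)
except ConfigError as e:
    print(e)  # state_persistence max_transact_write_items must be between 1 and 100, got 101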

tron/config/schema.py

Lines changed: 1 addition & 0 deletions
@@ -106,6 +106,7 @@ def from_dict(cls, data: Dict[str, Any]):
         "buffer_size",
         "dynamodb_region",
         "table_name",
+        "max_transact_write_items",
     ],
 )
 

tron/serialize/runstate/dynamodb_state_store.py

Lines changed: 9 additions & 5 deletions
@@ -42,14 +42,12 @@
 # infinite loops in the case where a key is truly unprocessable. We allow for more retries than it should
 # ever take to avoid failing restores due to transient issues.
 MAX_UNPROCESSED_KEYS_RETRIES = 30
-# While the AWS maximum is 100, we set this to 10 to avoid hitting the 4MB limit for the transaction. See DAR-2637
-MAX_TRANSACT_WRITE_ITEMS = 10
 log = logging.getLogger(__name__)
 T = TypeVar("T")
 
 
 class DynamoDBStateStore:
-    def __init__(self, name, dynamodb_region, stopping=False) -> None:
+    def __init__(self, name, dynamodb_region, stopping=False, max_transact_write_items=8) -> None:
         # Standard mode includes an exponential backoff by a base factor of 2 for a
         # maximum backoff time of 20 seconds (min(b*r^i, MAX_BACKOFF) where b is a
         # random number between 0 and 1 and r is the base factor of 2). This might
@@ -70,6 +68,7 @@ def __init__(self, name, dynamodb_region, stopping=False) -> None:
         self.dynamodb_region = dynamodb_region
         self.table = self.dynamodb.Table(name)
         self.stopping = stopping
+        self.max_transact_write_items = max_transact_write_items
         self.save_queue: OrderedDict = OrderedDict()
         self.save_lock = threading.Lock()
         self.save_errors = 0
@@ -336,7 +335,7 @@ def _save_loop(self):
 
     def __setitem__(self, key: str, value: Tuple[bytes, str]) -> None:
         """
-        Partition the item and write up to MAX_TRANSACT_WRITE_ITEMS
+        Partition the item and write up to self.max_transact_write_items
         partitions atomically using TransactWriteItems.
 
         The function examines the size of pickled_val and json_val,
@@ -392,7 +391,7 @@ def __setitem__(self, key: str, value: Tuple[bytes, str]) -> None:
 
             # We want to write the items when we've either reached the max number of items
             # for a transaction, or when we're done processing all partitions
-            if len(items) == MAX_TRANSACT_WRITE_ITEMS or index == max_partitions - 1:
+            if len(items) == self.max_transact_write_items or index == max_partitions - 1:
                 try:
                     self.client.transact_write_items(TransactItems=items)
                     items = []
@@ -401,6 +400,11 @@ def __setitem__(self, key: str, value: Tuple[bytes, str]) -> None:
                         name="tron.dynamodb.setitem",
                         delta=time.time() - start,
                     )
+                    # TODO: TRON-2419 - We should be smarter here. While each batch is atomic, a sufficiently
+                    # large JobRun could exceed the max size of a single transaction (e.g. a JobRun with 12
+                    # partitions). While one batch might succeed (saving partitions 1-8), the next one (for
+                    # partitions 9-12) might fail. We should handle this case or we will see more hanging
+                    # chads in DynamoDB.
                     log.exception(f"Failed to save partition for key: {key}")
                     raise
         timer(
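Why 8 rather than 10: a single DynamoDB item tops out at 400 KB and a TransactWriteItems call at 4 MB total, so ten maximum-size partitions sit exactly at the transaction ceiling; dropping the default to 8 leaves roughly 800 KB of headroom, which is the "extra safe" limit the commit title refers to. Below is a runnable sketch of the batching loop above with a stub in place of the boto3 client; the stub and the item payloads are illustrative, only the flush condition comes from the diff.

from typing import Any, Dict, List


class StubClient:
    """Stand-in for the boto3 DynamoDB client; records batch sizes only."""

    def transact_write_items(self, TransactItems: List[Dict[str, Any]]) -> None:
        print(f"transact_write_items: {len(TransactItems)} items")


def write_partitions(client: StubClient, partitions: List[Dict[str, Any]], max_transact_write_items: int) -> None:
    # Same flush condition as the diff: the batch is full, or this is the last partition.
    items: List[Dict[str, Any]] = []
    max_partitions = len(partitions)
    for index, partition in enumerate(partitions):
        items.append(partition)
        if len(items) == max_transact_write_items or index == max_partitions - 1:
            client.transact_write_items(TransactItems=items)
            items = []


# A 12-partition JobRun with the new default of 8 yields two transactions (8 items,
# then 4) -- the exact split the TODO above warns about: if the second transaction
# fails after the first succeeds, partitions 1-8 are left behind.
write_partitions(StubClient(), [{"Put": {"partition": i}} for i in range(12)], max_transact_write_items=8)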

tron/serialize/runstate/statemanager.py

Lines changed: 2 additions & 1 deletion
@@ -48,7 +48,8 @@ def from_config(cls, persistence_config):
         if store_type == schema.StatePersistenceTypes.dynamodb:
             table_name = persistence_config.table_name
             dynamodb_region = persistence_config.dynamodb_region
-            store = DynamoDBStateStore(table_name, dynamodb_region)
+            max_transact_write_items = persistence_config.max_transact_write_items
+            store = DynamoDBStateStore(table_name, dynamodb_region, max_transact_write_items=max_transact_write_items)
 
         buffer = StateSaveBuffer(buffer_size)
         return PersistentStateManager(store, buffer)
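For reference, the wiring above amounts to a call like the following. The table name and region here are made up, and constructing the store for real requires AWS credentials; only the keyword argument comes from this commit.

from tron.serialize.runstate.dynamodb_state_store import DynamoDBStateStore

store = DynamoDBStateStore(
    "tron-state",                # table_name from the state_persistence config
    "us-west-2",                 # dynamodb_region
    max_transact_write_items=8,  # new knob; defaults to 8 when omitted
)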
