Skip to content

Commit 413788d

Browse files
authored
Merge pull request #24 from AllenInstitute/feature/DT-7131/add-delete-support-to-context-manager
DT-7131 Add DELETE support to context manager
2 parents eee650d + edf1728 commit 413788d

File tree

5 files changed

+224
-12
lines changed

5 files changed

+224
-12
lines changed

src/aibs_informatics_aws_lambda/handlers/demand/context_manager.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
from aibs_informatics_core.utils.os_operations import write_env_file
3636
from aibs_informatics_core.utils.units import BYTES_PER_GIBIBYTE
3737

38+
from aibs_informatics_aws_lambda.handlers.data_sync.model import RemoveDataPathsRequest
3839
from aibs_informatics_aws_lambda.handlers.demand.model import (
3940
ContextManagerConfiguration,
4041
EnvFileWriteMode,
@@ -296,6 +297,24 @@ def post_execution_data_sync_requests(self) -> List[PrepareBatchDataSyncRequest]
296297
)
297298
return requests
298299

300+
@property
def post_execution_remove_data_paths_requests(self) -> List[RemoveDataPathsRequest]:
    """Generate the remove-data-paths requests to run after execution.

    Up to two requests are produced, driven by the context manager configuration:

    - ``cleanup_inputs``: one request covering the path resolved by ``get_efs_path``
      for every downloadable job param input. No request is produced when there
      are no such inputs.
    - ``cleanup_working_dir``: one request covering ``self.efs_working_path``.

    Returns:
        List[RemoveDataPathsRequest]: list of remove data paths requests
    """
    requests = []
    if self.configuration.cleanup_inputs:
        input_paths = [
            get_efs_path(Path(param.value), True, self.efs_mount_points)
            for param in self.demand_execution.execution_parameters.downloadable_job_param_inputs
        ]
        # Skip the request entirely when there is nothing to remove, rather than
        # emitting a no-op request with an empty path list.
        if input_paths:
            requests.append(RemoveDataPathsRequest(paths=input_paths))

    if self.configuration.cleanup_working_dir:
        requests.append(RemoveDataPathsRequest(paths=[self.efs_working_path]))
    return requests
317+
299318
@classmethod
300319
def from_demand_execution(
301320
cls,

src/aibs_informatics_aws_lambda/handlers/demand/model.py

Lines changed: 59 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from aibs_informatics_core.models.demand_execution import DemandExecution
1515

1616
from aibs_informatics_aws_lambda.handlers.batch.model import CreateDefinitionAndPrepareArgsRequest
17+
from aibs_informatics_aws_lambda.handlers.data_sync.model import RemoveDataPathsRequest
1718

1819

1920
@dataclass
@@ -54,6 +55,23 @@ class EnvFileWriteMode(str, Enum):
5455

5556
@dataclass
5657
class DataSyncConfiguration(SchemaModel):
58+
"""
59+
Configuration for how data sync should be run.
60+
61+
Attributes:
62+
temporary_request_payload_path (Optional[S3Path]):
63+
The path to the temporary request payload. This is useful if many requests will
64+
be generated and the payloads are too large to be passed in the state machine
65+
context object.
66+
force (bool):
67+
If True, the data will be synced even if it already exists and satisfies
68+
the checksum or size check.
69+
size_only (bool):
70+
If True, only the size of the data will be checked when determining if the data
71+
should be synced. If False, the checksum of the data will also be checked.
72+
73+
"""
74+
5775
temporary_request_payload_path: Optional[S3Path] = custom_field(
5876
default=None, mm_field=S3Path.as_mm_field()
5977
)
@@ -63,11 +81,47 @@ class DataSyncConfiguration(SchemaModel):
6381

6482
@dataclass
6583
class ContextManagerConfiguration(SchemaModel):
66-
isolate_inputs: bool = custom_field(default=False)
84+
"""
85+
Configuration for managing the context in which a demand execution runs.
86+
87+
Attributes:
88+
isolate_inputs (bool):
89+
If True, input data will be written to working directory instead of the shared
90+
scratch directory. This is useful if:
91+
- you want to ensure that input data is not modified by other processes,
92+
- you want the demand execution to be able to modify its input data,
93+
- and you want the input data to be cleaned up immediately after completion
94+
(RO shared scratch data is not cleaned up).
95+
cleanup_inputs (bool):
96+
If True, input data will be cleaned up after execution. Note that this may
97+
not work as expected if isolate_inputs is False, as inputs are typically
98+
mounted as read-only. This will be clear when defining infrastructure code.
99+
cleanup_working_dir (bool):
100+
If True, the working directory will be cleaned up after execution. This is useful if
101+
you want to ensure that no data is left behind in the working directory.
102+
env_file_write_mode (EnvFileWriteMode):
103+
Determines when environment files should be written instead of being added to the list
104+
of env variables in batch job definition. Options are NEVER, ALWAYS, and IF_REQUIRED.
105+
IF_REQUIRED is experimental and attempts to write env files only if the env variables
106+
exceed a certain length.
107+
input_data_sync_configuration (DataSyncConfiguration):
108+
Configuration for syncing input data. The force flag and size_only flag are used to
109+
determine how the data is synced. The temporary_request_payload_path should be used
110+
if many requests will be generated and the payloads are too large to be passed
111+
in the state machine context object.
112+
output_data_sync_configuration (DataSyncConfiguration):
113+
Configuration for syncing output data. The force flag and size_only flag are used to
114+
determine how the data is synced. The temporary_request_payload_path should be used
115+
if many requests will be generated and the payloads are too large to be passed
116+
in the state machine context object.
117+
"""
118+
119+
isolate_inputs: bool = custom_field(default=True)
120+
cleanup_inputs: bool = custom_field(default=True)
121+
cleanup_working_dir: bool = custom_field(default=True)
67122
env_file_write_mode: EnvFileWriteMode = custom_field(
68123
mm_field=EnumField(EnvFileWriteMode), default=EnvFileWriteMode.ALWAYS
69124
)
70-
# data sync configurations
71125
input_data_sync_configuration: DataSyncConfiguration = custom_field(
72126
default_factory=DataSyncConfiguration, mm_field=DataSyncConfiguration.as_mm_field()
73127
)
@@ -124,6 +178,9 @@ class DemandExecutionCleanupConfigs(SchemaModel):
124178
]
125179
)
126180
)
181+
remove_data_paths_requests: List[RemoveDataPathsRequest] = custom_field(
182+
mm_field=ListField(RemoveDataPathsRequest.as_mm_field()), default_factory=list
183+
)
127184

128185

129186
@dataclass

src/aibs_informatics_aws_lambda/handlers/demand/scaffolding.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
)
1414
from aibs_informatics_aws_utils.efs import MountPointConfiguration
1515
from aibs_informatics_core.env import EnvBase
16-
from aibs_informatics_core.models.data_sync import DataSyncRequest
1716

1817
from aibs_informatics_aws_lambda.common.handler import LambdaHandler
1918
from aibs_informatics_aws_lambda.handlers.demand.context_manager import (
@@ -109,7 +108,8 @@ def handle(self, request: PrepareDemandScaffoldingRequest) -> PrepareDemandScaff
109108
data_sync_requests=[
110109
sync_request.from_dict(sync_request.to_dict())
111110
for sync_request in context_manager.post_execution_data_sync_requests
112-
]
111+
],
112+
remove_data_paths_requests=context_manager.post_execution_remove_data_paths_requests,
113113
)
114114

115115
return PrepareDemandScaffoldingResponse(

test/aibs_informatics_aws_lambda/handlers/demand/test_context_manager.py

Lines changed: 124 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
from datetime import datetime
22
from pathlib import Path
3-
from test.base import AwsBaseTest, does_not_raise
3+
from test.base import AwsBaseTest
44
from typing import Dict, Optional, Union
55

66
import boto3
7-
from aibs_informatics_aws_utils.batch import BatchJobBuilder
87
from aibs_informatics_aws_utils.constants.efs import (
98
EFS_ROOT_ACCESS_POINT_NAME,
109
EFS_ROOT_PATH,
@@ -32,7 +31,7 @@
3231
from aibs_informatics_core.models.unique_ids import UniqueID
3332
from aibs_informatics_core.utils.hashing import uuid_str
3433
from moto import mock_efs, mock_sts
35-
from pytest import fixture, mark, param
34+
from pytest import fixture
3635

3736
from aibs_informatics_aws_lambda.handlers.demand.context_manager import (
3837
BatchEFSConfiguration,
@@ -650,7 +649,13 @@ def test__pre_execution_data_sync_requests__single_input_generates_list(self):
650649
command=["cmd"], inputs=["X"], params={"X": S3_URI}
651650
)
652651
)
653-
decm = DemandExecutionContextManager.from_demand_execution(demand_execution, self.env_base)
652+
decm = DemandExecutionContextManager.from_demand_execution(
653+
demand_execution,
654+
self.env_base,
655+
ContextManagerConfiguration(
656+
isolate_inputs=False, cleanup_inputs=False, cleanup_working_dir=False
657+
),
658+
)
654659
actual = decm.pre_execution_data_sync_requests
655660

656661
expected = {
@@ -678,9 +683,12 @@ def test__pre_execution_data_sync_requests__single_input_generates_list__with_da
678683
demand_execution,
679684
self.env_base,
680685
ContextManagerConfiguration(
686+
isolate_inputs=False,
687+
cleanup_inputs=False,
688+
cleanup_working_dir=False,
681689
input_data_sync_configuration=DataSyncConfiguration(
682690
temporary_request_payload_path=S3URI("s3://bucket/override_prefix")
683-
)
691+
),
684692
),
685693
)
686694
actual = decm.pre_execution_data_sync_requests
@@ -766,6 +774,117 @@ def test__post_execution_data_sync_requests__single_output_generates_list__with_
766774
actual_dict.get("temporary_request_payload_path"),
767775
)
768776

777+
def test__post_execution_remove_data_paths_requests__no_cleanup_data(self):
    """With both cleanup flags disabled, no remove-data-paths requests are generated."""
    # Two remote inputs, "X" and "Y", each mapped to a local name.
    input_params = {
        name: {"remote": S3_URI + suffix, "local": name}
        for name, suffix in (("X", "1"), ("Y", "2"))
    }
    execution_parameters = DemandExecutionParameters(
        command=["cmd"], inputs=["X", "Y"], params=input_params
    )
    demand_execution = get_any_demand_execution(execution_parameters=execution_parameters)
    configuration = ContextManagerConfiguration(
        isolate_inputs=True, cleanup_inputs=False, cleanup_working_dir=False
    )
    decm = DemandExecutionContextManager.from_demand_execution(
        demand_execution, self.env_base, configuration
    )

    remove_requests = decm.post_execution_remove_data_paths_requests

    self.assertEqual(len(remove_requests), 0)
804+
805+
def test__post_execution_remove_data_paths_requests__cleanup_inputs(self):
    """With only cleanup_inputs enabled, a single request lists every input path."""
    demand_execution = get_any_demand_execution(
        execution_parameters=DemandExecutionParameters(
            command=["cmd"],
            inputs=["X", "Y"],
            params={
                "X": {
                    "remote": S3_URI + "1",
                    "local": "X",
                },
                "Y": {
                    "remote": S3_URI + "2",
                    "local": "Y",
                },
            },
        )
    )
    decm = DemandExecutionContextManager.from_demand_execution(
        demand_execution,
        self.env_base,
        ContextManagerConfiguration(
            isolate_inputs=True, cleanup_inputs=True, cleanup_working_dir=False
        ),
    )
    actual = decm.post_execution_remove_data_paths_requests

    # The expected input paths sit under the per-execution scratch directory.
    scratch_root = f"{self.gwo_file_system_id}:/scratch/{demand_execution.execution_id}"
    expected_input_paths = [f"{scratch_root}/X", f"{scratch_root}/Y"]

    # assertEqual reports the actual length on failure; assertTrue(len(...) == n) does not.
    self.assertEqual(len(actual), 1)
    self.assertListEqual(expected_input_paths, actual[0].paths)
842+
843+
def test__post_execution_remove_data_paths_requests__cleanup_inputs_and_working_dir(self):
    """With both cleanup flags enabled, inputs and working dir get separate requests.

    The first request is expected to carry the input paths and the second the
    working directory.
    """
    demand_execution = get_any_demand_execution(
        execution_parameters=DemandExecutionParameters(
            command=["cmd"],
            inputs=["X", "Y"],
            params={
                "X": {
                    "remote": S3_URI + "1",
                    "local": "X",
                },
                "Y": {
                    "remote": S3_URI + "2",
                    "local": "Y",
                },
            },
        )
    )
    decm = DemandExecutionContextManager.from_demand_execution(
        demand_execution,
        self.env_base,
        ContextManagerConfiguration(
            isolate_inputs=True, cleanup_inputs=True, cleanup_working_dir=True
        ),
    )
    actual = decm.post_execution_remove_data_paths_requests

    # The expected input paths sit under the per-execution scratch directory,
    # and that directory itself is the expected working-dir path.
    scratch_root = f"{self.gwo_file_system_id}:/scratch/{demand_execution.execution_id}"
    expected_input_paths = [f"{scratch_root}/X", f"{scratch_root}/Y"]
    expected_working_dir_paths = [scratch_root]

    # assertEqual reports the actual length on failure; assertTrue(len(...) == n) does not.
    self.assertEqual(len(actual), 2)
    self.assertListEqual(expected_input_paths, actual[0].paths)
    self.assertListEqual(expected_working_dir_paths, actual[1].paths)
887+
769888
def test__batch_job_queue_name__works_for_valid_demand_execution(self):
770889
demand_execution = get_any_demand_execution(
771890
execution_parameters=DemandExecutionParameters(

test/aibs_informatics_aws_lambda/handlers/demand/test_model.py

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from aibs_informatics_core.models.email_address import EmailAddress
66
from pytest import mark, param, raises
77

8+
from aibs_informatics_aws_lambda.handlers.data_sync.model import RemoveDataPathsRequest
89
from aibs_informatics_aws_lambda.handlers.demand.model import (
910
DataSyncRequest,
1011
DemandExecutionCleanupConfigs,
@@ -25,7 +26,7 @@
2526
destination_path=S3Path("s3://bucket/dst"),
2627
temporary_request_payload_path=S3Path("s3://bucket/tmp"),
2728
)
28-
]
29+
],
2930
),
3031
{
3132
"data_sync_requests": [
@@ -42,7 +43,8 @@
4243
"source_path": "s3://bucket/src",
4344
"temporary_request_payload_path": "s3://bucket/tmp",
4445
}
45-
]
46+
],
47+
"remove_data_paths_requests": [],
4648
},
4749
does_not_raise(),
4850
id="Handles single PrepareBatchDataSyncRequest",
@@ -70,11 +72,26 @@
7072
"size_only": False,
7173
"source_path": "s3://bucket/src",
7274
}
73-
]
75+
],
76+
"remove_data_paths_requests": [],
7477
},
7578
does_not_raise(),
7679
id="Handles single ambiguous ds request",
7780
),
81+
param(
82+
DemandExecutionCleanupConfigs(
83+
data_sync_requests=[],
84+
remove_data_paths_requests=[
85+
RemoveDataPathsRequest(paths=["efs://path1", "efs://path2"])
86+
],
87+
),
88+
{
89+
"data_sync_requests": [],
90+
"remove_data_paths_requests": [{"paths": ["efs://path1", "efs://path2"]}],
91+
},
92+
does_not_raise(),
93+
id="Handles remove data path request, empty data sync requests",
94+
),
7895
],
7996
)
8097
def test__DemandExecutionCleanupConfigs__serialization(

0 commit comments

Comments
 (0)