Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,8 @@ public void addAutoDeleteS3Objects(Construct scope, IBucket bucket) {
autoDeleteS3Stack.addAutoDeleteS3Objects(scope, bucket);
}

public void addAutoStopEcsClusterTasks(Construct scope, ICluster cluster) {
autoStopEcsStack.addAutoStopEcsClusterTasks(scope, cluster);
public void addAutoStopEcsClusterTasksAfterTaskCreatorIsDeleted(Construct scope, ICluster cluster, IFunction taskCreator) {
autoStopEcsStack.addAutoStopEcsClusterTasksAfterTaskCreatorIsDeleted(scope, cluster, taskCreator);
}

public AutoStopEcsClusterTasksStack getAutoStopEcsStack() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,12 @@ public BulkExportTaskResources(
this.props = props;
this.instanceProperties = props.getInstanceProperties();
this.jobsQueue = jobsQueue;
ecsClusterForBulkExportTasks(coreStacks, jarsBucket, resultsBucket);
lambdaToCreateTasks(coreStacks, lambdaCode);
Cluster cluster = ecsClusterForBulkExportTasks(coreStacks, jarsBucket, resultsBucket);
IFunction taskCreator = lambdaToCreateTasks(coreStacks, lambdaCode);
coreStacks.addAutoStopEcsClusterTasksAfterTaskCreatorIsDeleted(stack, cluster, taskCreator);
}

private void lambdaToCreateTasks(SleeperCoreStacks coreStacks, SleeperLambdaCode lambdaCode) {
private IFunction lambdaToCreateTasks(SleeperCoreStacks coreStacks, SleeperLambdaCode lambdaCode) {
String instanceId = Utils.cleanInstanceId(instanceProperties);
String functionName = String.join("-", "sleeper",
instanceId, "bulk-export-tasks-creator");
Expand Down Expand Up @@ -128,9 +129,11 @@ private void lambdaToCreateTasks(SleeperCoreStacks coreStacks, SleeperLambdaCode
.build();
instanceProperties.set(BULK_EXPORT_TASK_CREATION_LAMBDA_FUNCTION, handler.getFunctionName());
instanceProperties.set(BULK_EXPORT_TASK_CREATION_CLOUDWATCH_RULE, rule.getRuleName());

return handler;
}

private void ecsClusterForBulkExportTasks(
private Cluster ecsClusterForBulkExportTasks(
SleeperCoreStacks coreStacks, IBucket jarsBucket, IBucket resultsBucket) {
VpcLookupOptions vpcLookupOptions = VpcLookupOptions.builder()
.vpcId(instanceProperties.get(VPC_ID))
Expand Down Expand Up @@ -167,8 +170,7 @@ private void ecsClusterForBulkExportTasks(
.build();
new CfnOutput(stack, BULK_EXPORT_CLUSTER_NAME, bulkExportClusterProps);

coreStacks.addAutoStopEcsClusterTasks(stack, cluster);

return cluster;
}

private static PolicyStatement runTasksPolicyStatement() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,15 +96,15 @@ public CompactionTaskResources(Stack stack,
this.props = props;
this.instanceProperties = props.getInstanceProperties();

ecsClusterForCompactionTasks(coreStacks, jarsBucket, lambdaCode, jobResources);
lambdaToCreateCompactionTasks(coreStacks, lambdaCode, jobResources.getCompactionJobsQueue());
Cluster cluster = ecsClusterForCompactionTasks(coreStacks, jarsBucket, lambdaCode, jobResources);
IFunction taskCreator = lambdaToCreateCompactionTasks(coreStacks, lambdaCode, jobResources.getCompactionJobsQueue());
coreStacks.addAutoStopEcsClusterTasksAfterTaskCreatorIsDeleted(stack, cluster, taskCreator);

// Allow running compaction tasks
coreStacks.getInvokeCompactionPolicyForGrants().addStatements(runTasksPolicyStatement());

}

private void ecsClusterForCompactionTasks(SleeperCoreStacks coreStacks, IBucket jarsBucket, SleeperLambdaCode taskCreatorJar, CompactionJobResources jobResources) {
private Cluster ecsClusterForCompactionTasks(SleeperCoreStacks coreStacks, IBucket jarsBucket, SleeperLambdaCode taskCreatorJar, CompactionJobResources jobResources) {
VpcLookupOptions vpcLookupOptions = VpcLookupOptions.builder()
.vpcId(instanceProperties.get(VPC_ID))
.build();
Expand Down Expand Up @@ -152,11 +152,11 @@ private void ecsClusterForCompactionTasks(SleeperCoreStacks coreStacks, IBucket
.build();
new CfnOutput(stack, COMPACTION_CLUSTER_NAME, compactionClusterProps);

coreStacks.addAutoStopEcsClusterTasks(stack, cluster);
return cluster;
}

@SuppressFBWarnings("NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE")
private void lambdaToCreateCompactionTasks(
private IFunction lambdaToCreateCompactionTasks(
SleeperCoreStacks coreStacks, SleeperLambdaCode lambdaCode, Queue compactionJobsQueue) {
String functionName = String.join("-", "sleeper",
Utils.cleanInstanceId(instanceProperties), "compaction-tasks-creator");
Expand Down Expand Up @@ -196,6 +196,8 @@ private void lambdaToCreateCompactionTasks(
.build();
instanceProperties.set(COMPACTION_TASK_CREATION_LAMBDA_FUNCTION, handler.getFunctionName());
instanceProperties.set(COMPACTION_TASK_CREATION_CLOUDWATCH_RULE, rule.getRuleName());

return handler;
}

private static PolicyStatement runTasksPolicyStatement() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,26 @@ private void createLambda(InstanceProperties instanceProperties, SleeperJarsInBu
/**
* Adds a custom resource to stop tasks in an ECS cluster.
*
* @param scope the stack to add the custom resource to
* @param cluster the ECS cluster
* @param scope the stack to add the custom resource to
* @param cluster the ECS cluster
* @param taskCreator the lambda function that starts tasks in the cluster
*/
public void addAutoStopEcsClusterTasks(Construct scope, ICluster cluster) {
public void addAutoStopEcsClusterTasksAfterTaskCreatorIsDeleted(Construct scope, ICluster cluster, IFunction taskCreator) {
CustomResource customResource = addAutoStopEcsClusterTasks(scope, cluster);

// This dependency means that during teardown the task creator lambda will be deleted before ECS tasks are stopped.
// This is important otherwise more tasks may be created as they are being stopped.
taskCreator.getNode().addDependency(customResource);
}

/**
* Adds a custom resource to stop tasks in an ECS cluster.
*
* @param scope the stack to add the custom resource to
* @param cluster the ECS cluster
* @return the custom resource that will stop tasks when it is deleted
*/
public CustomResource addAutoStopEcsClusterTasks(Construct scope, ICluster cluster) {

String id = cluster.getNode().getId() + "-AutoStop";

Expand All @@ -110,8 +126,10 @@ public void addAutoStopEcsClusterTasks(Construct scope, ICluster cluster) {
.serviceToken(provider.getServiceToken())
.build();

// This dependency means that ECS tasks will be stopped before the cluster is deleted.
customResource.getNode().addDependency(cluster);

return customResource;
}

}
20 changes: 8 additions & 12 deletions java/cdk/src/main/java/sleeper/cdk/stack/ingest/IngestStack.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,10 @@ public IngestStack(
IBucket jarsBucket = Bucket.fromBucketName(this, "JarsBucket", props.getJars().bucketName());
SleeperLambdaCode lambdaCode = props.getJars().lambdaCode(jarsBucket);

// SQS queue for ingest jobs
sqsQueueForIngestJobs(coreStacks);

// ECS cluster for ingest tasks
ecsClusterForIngestTasks(jarsBucket, coreStacks, ingestJobQueue, lambdaCode);

// Lambda to create ingest tasks
lambdaToCreateIngestTasks(coreStacks, ingestJobQueue, lambdaCode);
ingestJobQueue = sqsQueueForIngestJobs(coreStacks);
Cluster cluster = ecsClusterForIngestTasks(jarsBucket, coreStacks, ingestJobQueue, lambdaCode);
IFunction taskCreator = lambdaToCreateIngestTasks(coreStacks, ingestJobQueue, lambdaCode);
coreStacks.addAutoStopEcsClusterTasksAfterTaskCreatorIsDeleted(this, cluster, taskCreator);

Utils.addStackTagIfSet(this, instanceProperties);
}
Expand All @@ -135,7 +131,7 @@ private Queue sqsQueueForIngestJobs(SleeperCoreStacks coreStacks) {
.queue(ingestDLQ)
.build();
String queueName = String.join("-", "sleeper", instanceId, "IngestJobQ");
ingestJobQueue = Queue.Builder
Queue ingestJobQueue = Queue.Builder
.create(this, "IngestJobQueue")
.queueName(queueName)
.deadLetterQueue(ingestJobDeadLetterQueue)
Expand Down Expand Up @@ -229,12 +225,10 @@ private Cluster ecsClusterForIngestTasks(
.build();
new CfnOutput(this, INGEST_CONTAINER_ROLE_ARN, ingestRoleARNProps);

coreStacks.addAutoStopEcsClusterTasks(this, cluster);

return cluster;
}

private void lambdaToCreateIngestTasks(SleeperCoreStacks coreStacks, Queue ingestJobQueue, SleeperLambdaCode lambdaCode) {
private IFunction lambdaToCreateIngestTasks(SleeperCoreStacks coreStacks, Queue ingestJobQueue, SleeperLambdaCode lambdaCode) {

// Run tasks function
String functionName = String.join("-", "sleeper",
Expand Down Expand Up @@ -278,6 +272,8 @@ private void lambdaToCreateIngestTasks(SleeperCoreStacks coreStacks, Queue inges
.build();
instanceProperties.set(INGEST_LAMBDA_FUNCTION, handler.getFunctionName());
instanceProperties.set(INGEST_CLOUDWATCH_RULE, rule.getRuleName());

return handler;
}

public Queue getIngestJobQueue() {
Expand Down