@@ -41,7 +41,8 @@ import (
41
41
)
42
42
43
43
const (
44
- EdgeFunctionIngestQueue = "EDGE_FUNC_INGEST_TASK_QUEUE"
44
+ EdgeFunctionIngestQueue = "EDGE_FUNC_INGEST_TASK_QUEUE"
45
+ EdgeFunctionCleanupQueue = "EDGE_FUNC_CLEANUP_TASK_QUEUE"
45
46
)
46
47
47
48
const (
@@ -61,6 +62,7 @@ type IngestResult struct {
61
62
// RegisterWorkflows registers Edge Function workflows with the Temporal worker.
62
63
func RegisterWorkflows (w tworker.Worker ) {
63
64
w .RegisterWorkflow (EdgeFunctionIngestWorkflow )
65
+ w .RegisterWorkflow (EdgeFunctionCleanupWorkflow )
64
66
}
65
67
66
68
// EdgeFunctionIngestWorkflow is a Temporal workflow that ingests Edge Functions.
@@ -191,6 +193,35 @@ Finalize:
191
193
return nil
192
194
}
193
195
196
+ func EdgeFunctionCleanupWorkflow (ctx workflow.Context , in * EdgeFunctionIngestParams ) error {
197
+ opts := workflow.ActivityOptions {
198
+ StartToCloseTimeout : 10 * time .Minute ,
199
+ RetryPolicy : & temporal.RetryPolicy {
200
+ InitialInterval : time .Second ,
201
+ BackoffCoefficient : 2.0 ,
202
+ MaximumInterval : 10 * time .Second ,
203
+ MaximumAttempts : 3 ,
204
+ },
205
+ }
206
+ ctx = workflow .WithActivityOptions (ctx , opts )
207
+ log := tlog .With (workflow .GetLogger (ctx ), "Name" , in .Obj .Name , "ResourceVersion" , in .Obj .ResourceVersion )
208
+
209
+ var w * worker
210
+
211
+ if err := workflow .ExecuteActivity (ctx , w .CleanupStagedData , in ).Get (ctx , nil ); err != nil {
212
+ log .Error ("Failed to cleanup staged data" , "Error" , err )
213
+ return err
214
+ }
215
+
216
+ if err := workflow .ExecuteActivity (ctx , w .CleanupStoredData , in ).Get (ctx , nil ); err != nil {
217
+ log .Error ("Failed to cleanup stored data" , "Error" , err )
218
+ return err
219
+ }
220
+
221
+ log .Info ("Edge Function cleanup completed successfully" )
222
+ return nil
223
+ }
224
+
194
225
// Worker implements Temporal Activities for Edge Functions Ingest queue.
195
226
type worker struct {
196
227
k8s kubernetes.Interface
@@ -223,6 +254,7 @@ func (w *worker) RegisterActivities(tw tworker.Worker) {
223
254
tw .RegisterActivity (w .StoreGoPluginActivity )
224
255
tw .RegisterActivity (w .FinalizeActivity )
225
256
tw .RegisterActivity (w .CleanupStagedData )
257
+ tw .RegisterActivity (w .CleanupStoredData )
226
258
}
227
259
228
260
func (w * worker ) AddIngestConditionActivity (
@@ -872,6 +904,25 @@ func (w *worker) CleanupStagedData(
872
904
return nil
873
905
}
874
906
907
+ func (w * worker ) CleanupStoredData (
908
+ ctx context.Context ,
909
+ in * EdgeFunctionIngestParams ,
910
+ ) error {
911
+ log := tlog .With (activity .GetLogger (ctx ), "Name" , in .Obj .Name , "ResourceVersion" , in .Obj .ResourceVersion )
912
+ log .Info ("Cleaning up stored data" )
913
+
914
+ wid := activity .GetInfo (ctx ).WorkflowExecution .ID
915
+ storeDir := w .storeDir (wid )
916
+
917
+ if err := os .RemoveAll (storeDir ); err != nil {
918
+ log .Error ("Failed to remove store directory" , "Error" , err )
919
+ return err
920
+ }
921
+
922
+ log .Info ("Store directory cleaned up successfully" )
923
+ return nil
924
+ }
925
+
875
926
func hasCondition (conditions []metav1.Condition , condType string , status metav1.ConditionStatus ) bool {
876
927
for _ , c := range conditions {
877
928
if c .Type == condType && c .Status == status {
0 commit comments