Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parwinder/bcda-8627-archive cleanup job migration #1042

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
223d677
use River to schedule cleanup jobs: initial commit
bhagatparwinder Feb 4, 2025
e0d5716
Cleanup, added RunOnStart option
bhagatparwinder Feb 4, 2025
fd31f48
add uniqueness, empty struct, generate txID, fix dirs
bhagatparwinder Feb 5, 2025
41de2d2
add CleanupJobWorker to river
bhagatparwinder Feb 5, 2025
440d75b
Add comments to cleanup jobs
bhagatparwinder Feb 5, 2025
76d330e
update go.mod
bhagatparwinder Feb 5, 2025
15dce4d
unit tests initial commit
bhagatparwinder Feb 5, 2025
504f053
Remove actions and cleanup/archive functions from CLI
bhagatparwinder Feb 5, 2025
afb10a8
Use CLI functions directly, avoid circular dependency
bhagatparwinder Feb 5, 2025
7fc8cb0
update go.mod
bhagatparwinder Feb 5, 2025
5753ddd
update tests
bhagatparwinder Feb 6, 2025
0f910b3
updated Work test, checking method called with correct params
bhagatparwinder Feb 6, 2025
105d28f
extract CleanupJob into cleanup package
bhagatparwinder Feb 6, 2025
c350b60
Updating code and tests to inject dependecies and mocks
bhagatparwinder Feb 6, 2025
b1aa69b
fix: update cleanup_test.go
bhagatparwinder Feb 6, 2025
a7b8b46
set directories in env
bhagatparwinder Feb 6, 2025
cb52fa2
fix env variables
bhagatparwinder Feb 6, 2025
e9d59ad
Moving Work to River, cleanup only does business logic
bhagatparwinder Feb 7, 2025
6a38975
Get slack token, add slack notifications
bhagatparwinder Feb 7, 2025
f51fd37
Add functions to start/teardown test suite
bhagatparwinder Feb 10, 2025
e41ee90
remove testApp
bhagatparwinder Feb 10, 2025
cd5165f
create DB connection for tests
bhagatparwinder Feb 10, 2025
53f6f6c
Merge branch 'main' into parwinder/BCDA-8627-archive-cleanup-job-migr…
bhagatparwinder Feb 10, 2025
721d06d
fix 4 of 6 failing tests
bhagatparwinder Feb 11, 2025
defcec7
update unit tests
bhagatparwinder Feb 11, 2025
ece6313
fix remaining unit tests
bhagatparwinder Feb 11, 2025
751523a
add unit tests for getCutOffTime
bhagatparwinder Feb 11, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 0 additions & 156 deletions bcda/bcdacli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"path"
"path/filepath"
"regexp"
"strconv"
"strings"
"syscall"
"time"
Expand All @@ -39,7 +38,6 @@ import (
"github.com/CMSgov/bcda-app/conf"
"github.com/CMSgov/bcda-app/log"
"github.com/CMSgov/bcda-app/optout"
"github.com/sirupsen/logrus"

"github.com/pborman/uuid"
"github.com/pkg/errors"
Expand Down Expand Up @@ -74,7 +72,6 @@ func setUpApp() *cli.App {
fmt.Println("Error converting FILE_ARCHIVE_THRESHOLD_HR to uint", err)
}
var acoName, acoCMSID, acoID, accessToken, acoSize, filePath, fileSource, s3Endpoint, assumeRoleArn, environment, groupID, groupName, ips, fileType, alrFile string
var thresholdHr int
var httpPort, httpsPort int
app.Commands = []cli.Command{
{
Expand Down Expand Up @@ -286,75 +283,6 @@ func setUpApp() *cli.App {
return nil
},
},
{
Name: "archive-job-files",
bhagatparwinder marked this conversation as resolved.
Show resolved Hide resolved
Category: "Cleanup",
Usage: "Update job statuses and move files to an inaccessible location",
Flags: []cli.Flag{
cli.IntFlag{
Name: "threshold",
Value: 24,
Usage: constants.CliArchDesc,
EnvVar: "ARCHIVE_THRESHOLD_HR",
Destination: &thresholdHr,
},
},
Action: func(c *cli.Context) error {
cutoff := time.Now().Add(-time.Hour * time.Duration(thresholdHr))
return archiveExpiring(cutoff)
},
},
{
Name: constants.CleanupArchArg,
Category: "Cleanup",
Usage: constants.CliRemoveArchDesc,
Flags: []cli.Flag{
cli.IntFlag{
Name: "threshold",
Usage: constants.CliArchDesc,
Destination: &thresholdHr,
},
},
Action: func(c *cli.Context) error {
cutoff := time.Now().Add(-time.Hour * time.Duration(thresholdHr))
return cleanupJob(cutoff, models.JobStatusArchived, models.JobStatusExpired,
conf.GetEnv("FHIR_ARCHIVE_DIR"), conf.GetEnv("FHIR_STAGING_DIR"))
},
},
{
Name: "cleanup-failed",
Category: "Cleanup",
Usage: constants.CliRemoveArchDesc,
Flags: []cli.Flag{
cli.IntFlag{
Name: "threshold",
Usage: constants.CliArchDesc,
Destination: &thresholdHr,
},
},
Action: func(c *cli.Context) error {
cutoff := time.Now().Add(-(time.Hour * time.Duration(thresholdHr)))
return cleanupJob(cutoff, models.JobStatusFailed, models.JobStatusFailedExpired,
conf.GetEnv("FHIR_STAGING_DIR"), conf.GetEnv("FHIR_PAYLOAD_DIR"))
},
},
{
Name: "cleanup-cancelled",
Category: "Cleanup",
Usage: constants.CliRemoveArchDesc,
Flags: []cli.Flag{
cli.IntFlag{
Name: "threshold",
Usage: constants.CliRemoveArchDesc,
Destination: &thresholdHr,
},
},
Action: func(c *cli.Context) error {
cutoff := time.Now().Add(-(time.Hour * time.Duration(thresholdHr)))
return cleanupJob(cutoff, models.JobStatusCancelled, models.JobStatusCancelledExpired,
conf.GetEnv("FHIR_STAGING_DIR"), conf.GetEnv("FHIR_PAYLOAD_DIR"))
},
},
{
Name: "import-cclf-directory",
Category: constants.CliDataImpCategory,
Expand Down Expand Up @@ -748,90 +676,6 @@ func revokeAccessToken(accessToken string) error {
return auth.GetProvider().RevokeAccessToken(accessToken)
}

func archiveExpiring(maxDate time.Time) error {
log.API.Info("Archiving expiring job files...")

jobs, err := r.GetJobsByUpdateTimeAndStatus(context.Background(),
time.Time{}, maxDate, models.JobStatusCompleted)
if err != nil {
log.API.Error(err)
return err
}

var lastJobError error
for _, j := range jobs {
id := j.ID
jobPayloadDir := fmt.Sprintf("%s/%d", conf.GetEnv("FHIR_PAYLOAD_DIR"), id)
_, err = os.Stat(jobPayloadDir)
jobPayloadDirExist := err == nil
jobArchiveDir := fmt.Sprintf("%s/%d", conf.GetEnv("FHIR_ARCHIVE_DIR"), id)

if jobPayloadDirExist {
err = os.Rename(jobPayloadDir, jobArchiveDir)
if err != nil {
log.API.Error(err)
lastJobError = err
continue
}
}

j.Status = models.JobStatusArchived
err = r.UpdateJob(context.Background(), *j)
if err != nil {
log.API.Error(err)
lastJobError = err
}
}

return lastJobError
}

func cleanupJob(maxDate time.Time, currentStatus, newStatus models.JobStatus, rootDirsToClean ...string) error {
jobs, err := r.GetJobsByUpdateTimeAndStatus(context.Background(),
time.Time{}, maxDate, currentStatus)
if err != nil {
return err
}

if len(jobs) == 0 {
log.API.Infof("No %s job files to clean", currentStatus)
return nil
}

for _, job := range jobs {
if err := cleanupJobData(job.ID, rootDirsToClean...); err != nil {
log.API.Errorf("Unable to cleanup directories %s", err)
continue
}

job.Status = newStatus
err = r.UpdateJob(context.Background(), *job)
if err != nil {
log.API.Errorf("Failed to update job status to %s %s", newStatus, err)
continue
}

log.API.WithFields(logrus.Fields{
"job_began": job.CreatedAt,
"files_removed": time.Now(),
"job_id": job.ID,
}).Infof("Files cleaned from %s and job status set to %s", rootDirsToClean, newStatus)
}

return nil
}

func cleanupJobData(jobID uint, rootDirs ...string) error {
for _, rootDirToClean := range rootDirs {
dir := filepath.Join(rootDirToClean, strconv.FormatUint(uint64(jobID), 10))
if err := os.RemoveAll(dir); err != nil {
return fmt.Errorf("unable to remove %s because %s", dir, err)
}
}

return nil
}

func setDenylistState(cmsID string, td *models.Termination) error {
aco, err := r.GetACOByCMSID(context.Background(), cmsID)
if err != nil {
Expand Down
Loading
Loading