Skip to content

Commit

Permalink
BCDA-393 Feature: Staging directory for FHIR payloads (#60)
Browse files Browse the repository at this point in the history
* initial cut at staging dir for FHIR payloads

* cleaned up code and unit test

* cleaned up test

* test check

* final cleanup
  • Loading branch information
tbellj authored Oct 31, 2018
1 parent a3ab614 commit 87b6000
Show file tree
Hide file tree
Showing 10 changed files with 116 additions and 46 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ bcdaworker/debug
test_results/*
.idea
bcda/swaggerui/swagger.json
bcdaworker/data/*.ndjson
bcdaworker/data/*
1 change: 1 addition & 0 deletions Dockerfiles/Dockerfile.bcda
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ ENV BB_CLIENT_CERT_FILE ../shared_files/bb-dev-test-cert.pem
ENV BB_CLIENT_KEY_FILE ../shared_files/bb-dev-test-key.pem
ENV BB_SERVER_LOCATION https://fhir.backend.bluebutton.hhsdevcloud.us
ENV FHIR_PAYLOAD_DIR ../bcdaworker/data
ENV FHIR_STAGING_DIR ../bcdaworker/tmpdata

WORKDIR /go/src/github.com/CMSgov/bcda-app/bcda
CMD ["fresh", "-r", "start-api"]
1 change: 1 addition & 0 deletions Dockerfiles/Dockerfile.bcdaworker
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ ENV BB_CLIENT_CERT_FILE ../shared_files/bb-dev-test-cert.pem
ENV BB_CLIENT_KEY_FILE ../shared_files/bb-dev-test-key.pem
ENV BB_SERVER_LOCATION https://fhir.backend.bluebutton.hhsdevcloud.us
ENV FHIR_PAYLOAD_DIR data
ENV FHIR_STAGING_DIR ../bcdaworker/tmpdata
ENV BB_TIMEOUT_MS 500

WORKDIR /go/src/github.com/CMSgov/bcda-app/bcdaworker
Expand Down
1 change: 1 addition & 0 deletions Dockerfiles/Dockerfile.unit_test
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ ENV BB_CLIENT_CERT_FILE ../shared_files/bb-dev-test-key.pem
ENV BB_CLIENT_KEY_FILE ../shared_files/bb-dev-test-key.pem
ENV BB_SERVER_LOCATION https://fhir.backend.bluebutton.hhsdevcloud.us
ENV FHIR_PAYLOAD_DIR ../bcdaworker/data
ENV FHIR_STAGING_DIR ../bcdaworker/tmpdata

WORKDIR /go/src/github.com/CMSgov/bcda-app
CMD ["bash", "unit_test.sh"]
9 changes: 5 additions & 4 deletions bcda/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func jobStatus(w http.ResponseWriter, r *http.Request) {

fi := fileItem{
Type: "ExplanationOfBenefit",
URL: fmt.Sprintf("%s://%s/data/%s.ndjson", scheme, r.Host, job.AcoID),
URL: fmt.Sprintf("%s://%s/data/%s/%s.ndjson", scheme, r.Host, jobID, job.AcoID),
}

rb := bulkResponseBody{
Expand All @@ -249,11 +249,11 @@ func jobStatus(w http.ResponseWriter, r *http.Request) {
Errors: []fileItem{},
}

errFilePath := fmt.Sprintf("%s/%s-error.ndjson", os.Getenv("FHIR_PAYLOAD_DIR"), job.AcoID)
errFilePath := fmt.Sprintf("%s/%s/%s-error.ndjson", os.Getenv("FHIR_PAYLOAD_DIR"), jobID, job.AcoID)
if _, err := os.Stat(errFilePath); !os.IsNotExist(err) {
errFI := fileItem{
Type: "OperationOutcome",
URL: fmt.Sprintf("%s://%s/data/%s-error.ndjson", scheme, r.Host, job.AcoID),
URL: fmt.Sprintf("%s://%s/data/%s/%s-error.ndjson", scheme, r.Host, jobID, job.AcoID),
}
rb.Errors = append(rb.Errors, errFI)
}
Expand Down Expand Up @@ -283,7 +283,8 @@ func serveData(w http.ResponseWriter, r *http.Request) {

dataDir := os.Getenv("FHIR_PAYLOAD_DIR")
acoID := chi.URLParam(r, "acoID")
http.ServeFile(w, r, fmt.Sprintf("%s/%s.ndjson", dataDir, acoID))
jobID := chi.URLParam(r, "jobID")
http.ServeFile(w, r, fmt.Sprintf("%s/%s/%s.ndjson", dataDir, jobID, acoID))
}

func getToken(w http.ResponseWriter, r *http.Request) {
Expand Down
21 changes: 17 additions & 4 deletions bcda/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,10 +364,12 @@ func (s *APITestSuite) TestJobStatusCompleted() {
s.T().Error(err)
}

expectedurl := fmt.Sprintf("%s/%s/%s", "https://example.com/data", fmt.Sprint(j.ID), "dbbd1ce1-ae24-435c-807d-ed45953077d3.ndjson")

assert.Equal(s.T(), j.RequestURL, rb.RequestURL)
assert.Equal(s.T(), true, rb.RequiresAccessToken)
assert.Equal(s.T(), "ExplanationOfBenefit", rb.Files[0].Type)
assert.Equal(s.T(), "https://example.com/data/dbbd1ce1-ae24-435c-807d-ed45953077d3.ndjson", rb.Files[0].URL)
assert.Equal(s.T(), expectedurl, rb.Files[0].URL)
assert.Empty(s.T(), rb.Errors)

s.db.Delete(&j)
Expand All @@ -391,7 +393,15 @@ func (s *APITestSuite) TestJobStatusCompletedErrorFileExists() {
rctx.URLParams.Add("jobId", fmt.Sprint(j.ID))
req = req.WithContext(context.WithValue(req.Context(), chi.RouteCtxKey, rctx))

errFilePath := fmt.Sprintf("%s/%s-error.ndjson", os.Getenv("FHIR_PAYLOAD_DIR"), j.AcoID)
f := fmt.Sprintf("%s/%s", os.Getenv("FHIR_PAYLOAD_DIR"), fmt.Sprint(j.ID))
if _, err := os.Stat(f); os.IsNotExist(err) {
err = os.MkdirAll(f, os.ModePerm)
if err != nil {
s.T().Error(err)
}
}

errFilePath := fmt.Sprintf("%s/%s/%s-error.ndjson", os.Getenv("FHIR_PAYLOAD_DIR"), fmt.Sprint(j.ID), j.AcoID)
_, err := os.Create(errFilePath)
if err != nil {
s.T().Error(err)
Expand All @@ -408,12 +418,15 @@ func (s *APITestSuite) TestJobStatusCompletedErrorFileExists() {
s.T().Error(err)
}

dataurl := fmt.Sprintf("%s/%s/%s", "https://example.com/data", fmt.Sprint(j.ID), "dbbd1ce1-ae24-435c-807d-ed45953077d3.ndjson")
errorurl := fmt.Sprintf("%s/%s/%s", "https://example.com/data", fmt.Sprint(j.ID), "dbbd1ce1-ae24-435c-807d-ed45953077d3-error.ndjson")

assert.Equal(s.T(), j.RequestURL, rb.RequestURL)
assert.Equal(s.T(), true, rb.RequiresAccessToken)
assert.Equal(s.T(), "ExplanationOfBenefit", rb.Files[0].Type)
assert.Equal(s.T(), "https://example.com/data/dbbd1ce1-ae24-435c-807d-ed45953077d3.ndjson", rb.Files[0].URL)
assert.Equal(s.T(), dataurl, rb.Files[0].URL)
assert.Equal(s.T(), "OperationOutcome", rb.Errors[0].Type)
assert.Equal(s.T(), "https://example.com/data/dbbd1ce1-ae24-435c-807d-ed45953077d3-error.ndjson", rb.Errors[0].URL)
assert.Equal(s.T(), errorurl, rb.Errors[0].URL)

s.db.Delete(&j)
os.Remove(errFilePath)
Expand Down
2 changes: 1 addition & 1 deletion bcda/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func NewRouter() http.Handler {
r.Get("/bb_metadata", blueButtonMetadata)
}
})
r.With(auth.RequireTokenAuth, auth.RequireTokenACOMatch).Get("/data/{acoID}.ndjson", serveData)
r.With(auth.RequireTokenAuth, auth.RequireTokenACOMatch).Get("/data/{jobID}/{acoID}.ndjson", serveData)
r.Get("/_version", getVersion)
return r
}
Expand Down
17 changes: 15 additions & 2 deletions bcda/testUtils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ import (
"crypto/x509"
"encoding/pem"
"fmt"
"github.com/CMSgov/bcda-app/bcda/auth"
"github.com/stretchr/testify/suite"
"io/ioutil"
"log"
"os"

"github.com/CMSgov/bcda-app/bcda/auth"
"github.com/stretchr/testify/suite"
)

type AuthTestSuite struct {
Expand Down Expand Up @@ -92,3 +93,15 @@ func (s *AuthTestSuite) SetupAuthBackend() {

s.AuthBackend = auth.InitAuthBackend()
}

func CreateStaging(jobID string) {
os.Setenv("FHIR_STAGING_DIR", "data/test")
testdir := fmt.Sprintf("%s/%s", os.Getenv("FHIR_STAGING_DIR"), jobID)

if _, err := os.Stat(testdir); os.IsNotExist(err) {
err = os.MkdirAll(testdir, os.ModePerm)
if err != nil {
log.Fatal(err)
}
}
}
66 changes: 50 additions & 16 deletions bcdaworker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,20 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/jackc/pgx"
"io/ioutil"
"os"
"os/signal"
"regexp"
"strconv"
"syscall"

"github.com/CMSgov/bcda-app/bcda/responseutils"

"github.com/jackc/pgx"
log "github.com/sirupsen/logrus"

"github.com/CMSgov/bcda-app/bcda/client"
"github.com/CMSgov/bcda-app/bcda/database"
"github.com/CMSgov/bcda-app/bcda/models"
"github.com/CMSgov/bcda-app/bcda/responseutils"
"github.com/bgentry/que-go"
)

Expand Down Expand Up @@ -74,11 +74,45 @@ func processJob(j *que.Job) error {
return err
}

err = writeEOBDataToFile(bb, jobArgs.AcoID, jobArgs.BeneficiaryIDs)
jobID := strconv.Itoa(jobArgs.ID)
staging := fmt.Sprintf("%s/%s", os.Getenv("FHIR_STAGING_DIR"), jobID)
data := fmt.Sprintf("%s/%s", os.Getenv("FHIR_PAYLOAD_DIR"), jobID)

if _, err := os.Stat(staging); os.IsNotExist(err) {
err = os.MkdirAll(staging, os.ModePerm)
if err != nil {
log.Error(err)
return err
}
}
err = writeEOBDataToFile(bb, jobArgs.AcoID, jobArgs.BeneficiaryIDs, jobID)

if err != nil {
exportJob.Status = "Failed"
} else {
files, err := ioutil.ReadDir(staging)
if err != nil {
log.Error(err)
return err
}

for _, f := range files {
oldpath := staging + "/" + f.Name()
newpath := data + "/" + f.Name()
if _, err := os.Stat(data); os.IsNotExist(err) {
err = os.Mkdir(data, os.ModePerm)
if err != nil {
log.Error(err)
return err
}
}
err := os.Rename(oldpath, newpath)
if err != nil {
log.Error(err)
return err
}
}
os.Remove(staging)
exportJob.Status = "Completed"
}

Expand All @@ -92,7 +126,7 @@ func processJob(j *que.Job) error {
return nil
}

func writeEOBDataToFile(bb client.APIClient, acoID string, beneficiaryIDs []string) error {
func writeEOBDataToFile(bb client.APIClient, acoID string, beneficiaryIDs []string, jobID string) error {
re := regexp.MustCompile("[a-fA-F0-9]{8}(?:-[a-fA-F0-9]{4}){3}-[a-fA-F0-9]{12}")
if !re.Match([]byte(acoID)) {
err := errors.New("Invalid ACO ID")
Expand All @@ -106,8 +140,8 @@ func writeEOBDataToFile(bb client.APIClient, acoID string, beneficiaryIDs []stri
return err
}

dataDir := os.Getenv("FHIR_PAYLOAD_DIR")
f, err := os.Create(fmt.Sprintf("%s/%s.ndjson", dataDir, acoID))
dataDir := os.Getenv("FHIR_STAGING_DIR")
f, err := os.Create(fmt.Sprintf("%s/%s/%s.ndjson", dataDir, jobID, acoID))
if err != nil {
log.Error(err)
return err
Expand All @@ -121,9 +155,9 @@ func writeEOBDataToFile(bb client.APIClient, acoID string, beneficiaryIDs []stri
pData, err := bb.GetExplanationOfBenefitData(beneficiaryID)
if err != nil {
log.Error(err)
appendErrorToFile(acoID, responseutils.Exception, responseutils.BbErr, fmt.Sprintf("Error retrieving ExplanationOfBenefit for beneficiary %s in ACO %s", beneficiaryID, acoID))
appendErrorToFile(acoID, responseutils.Exception, responseutils.BbErr, fmt.Sprintf("Error retrieving ExplanationOfBenefit for beneficiary %s in ACO %s", beneficiaryID, acoID), jobID)
} else {
fhirBundleToResourceNDJSON(w, pData, "ExplanationOfBenefits", beneficiaryID, acoID)
fhirBundleToResourceNDJSON(w, pData, "ExplanationOfBenefits", beneficiaryID, acoID, jobID)
}
}

Expand All @@ -132,11 +166,11 @@ func writeEOBDataToFile(bb client.APIClient, acoID string, beneficiaryIDs []stri
return nil
}

func appendErrorToFile(acoID, code, detailsCode, detailsDisplay string) {
func appendErrorToFile(acoID, code, detailsCode, detailsDisplay string, jobID string) {
oo := responseutils.CreateOpOutcome(responseutils.Error, code, detailsCode, detailsDisplay)

dataDir := os.Getenv("FHIR_PAYLOAD_DIR")
fileName := fmt.Sprintf("%s/%s-error.ndjson", dataDir, acoID)
dataDir := os.Getenv("FHIR_STAGING_DIR")
fileName := fmt.Sprintf("%s/%s/%s-error.ndjson", dataDir, jobID, acoID)
f, err := os.OpenFile(fileName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
if err != nil {
log.Error(err)
Expand All @@ -154,12 +188,12 @@ func appendErrorToFile(acoID, code, detailsCode, detailsDisplay string) {
}
}

func fhirBundleToResourceNDJSON(w *bufio.Writer, jsonData, jsonType, beneficiaryID, acoID string) {
func fhirBundleToResourceNDJSON(w *bufio.Writer, jsonData, jsonType, beneficiaryID, acoID string, jobID string) {
var jsonOBJ map[string]interface{}
err := json.Unmarshal([]byte(jsonData), &jsonOBJ)
if err != nil {
log.Error(err)
appendErrorToFile(acoID, responseutils.Exception, responseutils.InternalErr, fmt.Sprintf("Error UnMarshaling %s from data for beneficiary %s in ACO %s", jsonType, beneficiaryID, acoID))
appendErrorToFile(acoID, responseutils.Exception, responseutils.InternalErr, fmt.Sprintf("Error UnMarshaling %s from data for beneficiary %s in ACO %s", jsonType, beneficiaryID, acoID), jobID)
return
}

Expand All @@ -173,13 +207,13 @@ func fhirBundleToResourceNDJSON(w *bufio.Writer, jsonData, jsonType, beneficiary
// This is unlikely to happen because we just unmarshalled this data a few lines above.
if err != nil {
log.Error(err)
appendErrorToFile(acoID, responseutils.Exception, responseutils.InternalErr, fmt.Sprintf("Error Marshaling %s to Json for beneficiary %s in ACO %s", jsonType, beneficiaryID, acoID))
appendErrorToFile(acoID, responseutils.Exception, responseutils.InternalErr, fmt.Sprintf("Error Marshaling %s to Json for beneficiary %s in ACO %s", jsonType, beneficiaryID, acoID), jobID)
continue
}
_, err = w.WriteString(string(entryJSON) + "\n")
if err != nil {
log.Error(err)
appendErrorToFile(acoID, responseutils.Exception, responseutils.InternalErr, fmt.Sprintf("Error writing %s to file for beneficiary %s in ACO %s", jsonType, beneficiaryID, acoID))
appendErrorToFile(acoID, responseutils.Exception, responseutils.InternalErr, fmt.Sprintf("Error writing %s to file for beneficiary %s in ACO %s", jsonType, beneficiaryID, acoID), jobID)
}
}
}
Expand Down
Loading

0 comments on commit 87b6000

Please sign in to comment.