Skip to content

Handle HAPI FHIR's MDM service in HAPI JDBC mode #1216

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.hl7.fhir.exceptions.FHIRException;
import org.hl7.fhir.r4.model.CanonicalType;
import org.hl7.fhir.r4.model.Coding;
import org.hl7.fhir.r4.model.Identifier;
import org.hl7.fhir.r4.model.Meta;
import org.hl7.fhir.r4.model.Resource;
import org.hl7.fhir.r4.model.codesystems.ActionType;
Expand All @@ -55,6 +56,8 @@ public class ConvertResourceFn extends FetchSearchPageFn<HapiRowDescriptor> {

private final HashMap<String, Counter> totalPushTimeMillisMap;

private final List<String> mdmResourceTypes;

private final Boolean processDeletedRecords;

Counter counter =
Expand All @@ -70,7 +73,8 @@ public class ConvertResourceFn extends FetchSearchPageFn<HapiRowDescriptor> {
// Only in the incremental mode we process deleted resources.
this.processDeletedRecords = !Strings.isNullOrEmpty(options.getSince());
List<String> resourceTypes = Arrays.asList(options.getResourceList().split(","));
for (String resourceType : resourceTypes) {
this.mdmResourceTypes = Arrays.asList(options.getMdmResourceList().split(","));
for (String resourceType : options.getResourceList().split(",")) {
this.numFetchedResourcesMap.put(
resourceType,
Metrics.counter(
Expand All @@ -97,15 +101,15 @@ public class ConvertResourceFn extends FetchSearchPageFn<HapiRowDescriptor> {

public void writeResource(HapiRowDescriptor element)
throws IOException, ParseException, SQLException, ViewApplicationException, ProfileException {
String resourceId = element.resourceId();
String fhirId = element.fhirId();
String forcedId = element.forcedId();
String resourceType = element.resourceType();
Meta meta =
new Meta()
.setVersionId(element.resourceVersion())
.setLastUpdated(simpleDateFormat.parse(element.lastUpdated()));
setMetaTags(element, meta);
String jsonResource = element.jsonResource();
String jsonResource = updateSourceIds(element);
long startTime = System.currentTimeMillis();
Resource resource = null;
if (jsonResource == null || jsonResource.isBlank()) {
Expand Down Expand Up @@ -134,12 +138,16 @@ public void writeResource(HapiRowDescriptor element)
}
totalParseTimeMillisMap.get(resourceType).inc(System.currentTimeMillis() - startTime);
if (forcedId == null || forcedId.equals("")) {
resource.setId(resourceId);
resource.setId(fhirId);
} else {
resource.setId(forcedId);
}
resource.setMeta(meta);

if (mdmResourceTypes.contains(resourceType)) {
addSourceIdentifiers(resource, element);
}

numFetchedResourcesMap.get(resourceType).inc(1);

if (!parquetFile.isEmpty()) {
Expand Down Expand Up @@ -192,6 +200,39 @@ private void setMetaTags(HapiRowDescriptor element, Meta meta) {
}
}

private String updateSourceIds(HapiRowDescriptor element) {
String jsonResource = element.jsonResource();
if (element.getMdmLinks() != null) {
for (MdmLink mdmLink : element.getMdmLinks()) {
jsonResource =
jsonResource.replaceAll(mdmLink.getSourceFhirId(), mdmLink.getGoldenFhirId());
}
}
return jsonResource;
}

private void addSourceIdentifiers(Resource resource, HapiRowDescriptor element) {
if (element.getSourceIdentifiers() != null) {
List<Identifier> identifiers = new ArrayList<>();
for (SourceIdentifier sourceIdentifier : element.getSourceIdentifiers()) {
identifiers.add(
new Identifier()
.setSystem(sourceIdentifier.getSystem())
.setValue(sourceIdentifier.getValue()));
}

try {
resource.getClass().getMethod("setIdentifier", List.class).invoke(resource, identifiers);
} catch (InvocationTargetException | IllegalAccessException | NoSuchMethodException e) {
log.warn(
"Failed to set identifiers for ${}, check that mdmResourceList is properly configured:"
+ " ",
resource.fhirType(),
e);
}
}
}

@Override
public void finishBundle(FinishBundleContext context) {
super.finishBundle(context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,10 +321,12 @@ private static List<Pipeline> buildHapiJdbcPipeline(FhirEtlOptions options)

JdbcFetchHapi jdbcFetchHapi = new JdbcFetchHapi(jdbcSource);
Map<String, Integer> resourceCount =
jdbcFetchHapi.searchResourceCounts(options.getResourceList(), options.getSince());
jdbcFetchHapi.searchResourceCounts(
options.getResourceList(), options.getSince(), options.getMdmResourceList());

List<Pipeline> pipelines = new ArrayList<>();
long totalNumOfResources = 0l;
List<String> mdmResourceTypes = Arrays.asList(options.getMdmResourceList().split(","));
for (String resourceType : options.getResourceList().split(",")) {
int numResources = resourceCount.get(resourceType);
if (numResources == 0) {
Expand All @@ -346,7 +348,9 @@ private static List<Pipeline> buildHapiJdbcPipeline(FhirEtlOptions options)
new JdbcFetchHapi.FetchRowsJdbcIo(
options.getResourceList(),
JdbcIO.DataSourceConfiguration.create(jdbcSource),
options.getSince()));
options.getSince(),
mdmResourceTypes.contains(resourceType),
options.getMapToGoldenResources()));

payload.apply(
"Convert to parquet for " + resourceType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,4 +265,18 @@ public interface FhirEtlOptions extends BasePipelineOptions {
String getSourceNdjsonFilePatternList();

void setSourceNdjsonFilePatternList(String value);

@Description(
"Whether to use HAPI-FHIR's MDM service to replace Resource references with their Golden"
+ " Resource counterparts")
@Default.Boolean(false)
Boolean getMapToGoldenResources();

void setMapToGoldenResources(Boolean mapToGoldenResources);

@Description("Comma separated list of resources to treat as MDM Resources.")
@Default.String("Patient")
String getMdmResourceList();

void setMdmResourceList(String mdmResourceList);
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2023 Google LLC
* Copyright 2020-2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,6 +32,7 @@ abstract class HapiRowDescriptor implements Serializable {

static HapiRowDescriptor create(
String resourceId,
String fhirId,
String forcedId,
String resourceType,
String lastUpdated,
Expand All @@ -40,6 +41,7 @@ static HapiRowDescriptor create(
String jsonResource) {
return new AutoValue_HapiRowDescriptor(
resourceId,
fhirId,
forcedId,
resourceType,
lastUpdated,
Expand All @@ -50,6 +52,8 @@ static HapiRowDescriptor create(

abstract String resourceId();

abstract String fhirId();

@Nullable
abstract String forcedId();

Expand All @@ -65,4 +69,8 @@ static HapiRowDescriptor create(

// FHIR tags.
List<ResourceTag> tags;

List<MdmLink> mdmLinks;

List<SourceIdentifier> sourceIdentifiers;
}
Loading