-
Notifications
You must be signed in to change notification settings - Fork 2
[DT-2541] Support Elasticsearch 5 and OpenSearch 3 simultaneously #2765
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,50 @@ | ||
| package org.broadinstitute.consent.http.configurations; | ||
|
|
||
| import com.fasterxml.jackson.annotation.JsonIgnoreProperties; | ||
| import jakarta.validation.constraints.NotNull; | ||
| import java.util.List; | ||
|
|
||
| @JsonIgnoreProperties(ignoreUnknown = true) | ||
| public class OpenSearchConfiguration { | ||
|
|
||
| @NotNull private String indexName; | ||
|
|
||
| @NotNull private List<String> servers; | ||
|
|
||
| @NotNull private String datasetIndexName; | ||
|
|
||
| /** This is configurable for testing purposes */ | ||
| private int port = 9201; | ||
|
|
||
| public List<String> getServers() { | ||
| return servers; | ||
| } | ||
|
|
||
| public void setServers(List<String> servers) { | ||
| this.servers = servers; | ||
| } | ||
|
|
||
| public String getIndexName() { | ||
| return indexName; | ||
| } | ||
|
|
||
| public void setIndexName(String indexName) { | ||
| this.indexName = indexName; | ||
| } | ||
|
|
||
| public int getPort() { | ||
| return port; | ||
| } | ||
|
|
||
| public void setPort(int port) { | ||
| this.port = port; | ||
| } | ||
|
|
||
| public String getDatasetIndexName() { | ||
| return datasetIndexName; | ||
| } | ||
|
|
||
| public void setDatasetIndexName(String datasetIndexName) { | ||
| this.datasetIndexName = datasetIndexName; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,66 @@ | ||
| package org.broadinstitute.consent.http.health; | ||
|
|
||
| import static org.broadinstitute.consent.http.service.ontology.OpenSearchSupport.jsonHeader; | ||
|
|
||
| import com.codahale.metrics.health.HealthCheck; | ||
| import com.google.gson.JsonObject; | ||
| import com.google.gson.JsonParser; | ||
| import com.google.inject.Inject; | ||
| import io.dropwizard.lifecycle.Managed; | ||
| import java.io.IOException; | ||
| import java.nio.charset.Charset; | ||
| import org.apache.commons.io.IOUtils; | ||
| import org.broadinstitute.consent.http.configurations.OpenSearchConfiguration; | ||
| import org.broadinstitute.consent.http.service.ontology.OpenSearchSupport; | ||
| import org.opensearch.client.Request; | ||
| import org.opensearch.client.RequestOptions; | ||
| import org.opensearch.client.Response; | ||
| import org.opensearch.client.RestClient; | ||
|
|
||
| public class OpenSearchHealthCheck extends HealthCheck implements Managed { | ||
|
|
||
| private final RestClient client; | ||
|
|
||
| @Override | ||
| public void start() throws Exception {} | ||
|
Check failure on line 25 in src/main/java/org/broadinstitute/consent/http/health/OpenSearchHealthCheck.java
|
||
|
|
||
| @Override | ||
| public void stop() throws Exception { | ||
| if (client != null) { | ||
| client.close(); | ||
| } | ||
| } | ||
|
|
||
| @Inject | ||
| public OpenSearchHealthCheck(OpenSearchConfiguration config) { | ||
| this.client = OpenSearchSupport.createRestClient(config); | ||
| } | ||
|
|
||
| @Override | ||
| protected Result check() throws Exception { | ||
| try { | ||
| RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder(); | ||
| builder.addHeader(jsonHeader.getName(), jsonHeader.getValue()); | ||
| Request request = new Request("GET", OpenSearchSupport.getClusterHealthPath()); | ||
| request.setOptions(builder.build()); | ||
| Response esResponse = client.performRequest(request); | ||
| if (esResponse.getStatusLine().getStatusCode() != 200) { | ||
| return Result.unhealthy( | ||
| "Invalid health check request: " + esResponse.getStatusLine().getReasonPhrase()); | ||
| } | ||
| String stringResponse = | ||
| IOUtils.toString(esResponse.getEntity().getContent(), Charset.defaultCharset()); | ||
| JsonObject jsonResponse = JsonParser.parseString(stringResponse).getAsJsonObject(); | ||
| String status = jsonResponse.get("status").getAsString(); | ||
| if (status.equalsIgnoreCase("red")) { | ||
| return Result.unhealthy("ClusterHealth is RED\n" + jsonResponse); | ||
| } | ||
| if (status.equalsIgnoreCase("yellow")) { | ||
| return Result.healthy("ClusterHealth is YELLOW\n" + jsonResponse); | ||
| } | ||
| } catch (IOException e) { | ||
| return Result.unhealthy("Unable to connect to OpenSearch"); | ||
| } | ||
| return Result.healthy("ClusterHealth is GREEN"); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -56,6 +56,7 @@ | |
| import org.broadinstitute.consent.http.service.DatasetRegistrationService; | ||
| import org.broadinstitute.consent.http.service.DatasetService; | ||
| import org.broadinstitute.consent.http.service.ElasticSearchService; | ||
| import org.broadinstitute.consent.http.service.OpenSearchService; | ||
| import org.broadinstitute.consent.http.service.TDRService; | ||
| import org.broadinstitute.consent.http.service.UserService; | ||
| import org.broadinstitute.consent.http.util.JsonSchemaUtil; | ||
|
|
@@ -72,6 +73,7 @@ | |
| private final TDRService tdrService; | ||
| private final UserService userService; | ||
| private final ElasticSearchService elasticSearchService; | ||
| private final OpenSearchService openSearchService; | ||
|
|
||
| private final JsonSchemaUtil jsonSchemaUtil; | ||
| private final GCSService gcsService; | ||
|
|
@@ -82,13 +84,15 @@ | |
| UserService userService, | ||
| DatasetRegistrationService datasetRegistrationService, | ||
| ElasticSearchService elasticSearchService, | ||
| OpenSearchService openSearchService, | ||
| TDRService tdrService, | ||
| GCSService gcsService) { | ||
| this.datasetService = datasetService; | ||
| this.userService = userService; | ||
| this.datasetRegistrationService = datasetRegistrationService; | ||
| this.gcsService = gcsService; | ||
| this.elasticSearchService = elasticSearchService; | ||
| this.openSearchService = openSearchService; | ||
| this.tdrService = tdrService; | ||
| this.jsonSchemaUtil = new JsonSchemaUtil(); | ||
| } | ||
|
|
@@ -222,6 +226,7 @@ | |
| } | ||
| Dataset patched = datasetRegistrationService.patchDataset(datasetId, user, patch); | ||
| elasticSearchService.synchronizeDatasetInESIndex(patched, user, false); | ||
| openSearchService.synchronizeDatasetInESIndex(patched, user, false); | ||
| return Response.ok(patched).build(); | ||
| } catch (Exception e) { | ||
| return createExceptionResponse(e); | ||
|
|
@@ -376,7 +381,12 @@ | |
| } | ||
| try (var deleteResponse = elasticSearchService.deleteIndex(datasetId, user.getUserId())) { | ||
| if (!HttpStatusCodes.isSuccess(deleteResponse.getStatus())) { | ||
| logWarn("Unable to delete index for dataset: " + datasetId); | ||
| logWarn("Unable to delete Elasticsearch index for dataset: " + datasetId); | ||
| } | ||
| } | ||
| try (var deleteResponse = openSearchService.deleteIndex(datasetId, user.getUserId())) { | ||
| if (!HttpStatusCodes.isSuccess(deleteResponse.getStatus())) { | ||
| logWarn("Unable to delete OpenSearch index for dataset: " + datasetId); | ||
| } | ||
| } | ||
| return Response.ok().build(); | ||
|
|
@@ -392,8 +402,10 @@ | |
| try { | ||
| User user = userService.findUserByEmail(authUser.getEmail()); | ||
| var datasetIds = datasetService.findAllDatasetIds(); | ||
| StreamingOutput indexResponse = elasticSearchService.indexDatasetIds(datasetIds, user); | ||
| return Response.ok(indexResponse, MediaType.APPLICATION_JSON).build(); | ||
| StreamingOutput elasticSearchResponse = | ||
| elasticSearchService.indexDatasetIds(datasetIds, user); | ||
| StreamingOutput openSearchResponse = openSearchService.indexDatasetIds(datasetIds, user); | ||
|
Check warning on line 407 in src/main/java/org/broadinstitute/consent/http/resources/DatasetResource.java
|
||
| return Response.ok(elasticSearchResponse, MediaType.APPLICATION_JSON).build(); | ||
| } catch (Exception e) { | ||
| return createExceptionResponse(e); | ||
| } | ||
|
|
@@ -405,7 +417,9 @@ | |
| public Response indexDataset(@Auth AuthUser authUser, @PathParam("datasetId") Integer datasetId) { | ||
| try { | ||
| User user = userService.findUserByEmail(authUser.getEmail()); | ||
| return elasticSearchService.indexDataset(datasetId, user); | ||
| Response elasticSearchResponse = elasticSearchService.indexDataset(datasetId, user); | ||
| Response openSearchResponse = openSearchService.indexDataset(datasetId, user); | ||
|
Check warning on line 421 in src/main/java/org/broadinstitute/consent/http/resources/DatasetResource.java
|
||
| return elasticSearchResponse; | ||
| } catch (Exception e) { | ||
| return createExceptionResponse(e); | ||
| } | ||
|
|
@@ -446,7 +460,7 @@ | |
| @Timed | ||
| public Response searchDatasetIndexStream(@Auth DuosUser duosUser, String query) { | ||
| try { | ||
| InputStream inputStream = elasticSearchService.searchDatasetsStream(query); | ||
| InputStream inputStream = openSearchService.searchDatasetsStream(query); | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We use |
||
| StreamingOutput stream = createStreamingOutput(inputStream); | ||
| return Response.ok(stream).build(); | ||
| } catch (Exception e) { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we are staging the OpenSearch rollout over time and we don't have values for production, this will likely cause the app to fail on startup.