From 43bb00475e8fa88e4d336f5fc5d126d2eb5bcbce Mon Sep 17 00:00:00 2001 From: jmacryl Date: Fri, 28 Nov 2025 19:47:09 +0100 Subject: [PATCH 1/5] ES clear: delete indices themselves, not all the docs --- .../elastic/ElasticSearchGraphService.java | 15 ++- .../indexbuilder/ESIndexBuilder.java | 53 ++++++++ .../elasticsearch/update/ESWriteDAO.java | 123 +++++++++++++++++- .../ElasticSearchSystemMetadataService.java | 15 ++- .../search/ElasticSearchServiceFactory.java | 12 +- 5 files changed, 208 insertions(+), 10 deletions(-) diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java index e4e427bd42da2..bdb21aaa5d56a 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java @@ -333,8 +333,19 @@ public List buildReindexConfigs( @Override public void clear() { - esBulkProcessor.deleteByQuery( - QueryBuilders.matchAllQuery(), true, indexConvention.getIndexName(INDEX_NAME)); + // Instead of deleting all documents (inefficient), delete and recreate the index + String indexName = indexConvention.getIndexName(INDEX_NAME); + try { + // Build the reindex config directly without needing OperationContext + ReindexConfig config = + indexBuilder.buildReindexState( + indexName, GraphRelationshipMappingsBuilder.getMappings(), Collections.emptyMap()); + indexBuilder.clearIndex(indexName, config); + log.info("Cleared index {} by deleting and recreating it", indexName); + } catch (IOException e) { + log.error("Failed to clear index {}", indexName, e); + throw new RuntimeException("Failed to clear index: " + indexName, e); + } } @Override diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/indexbuilder/ESIndexBuilder.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/indexbuilder/ESIndexBuilder.java index ab27d9b3c7e18..093018ff528e9 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/indexbuilder/ESIndexBuilder.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/indexbuilder/ESIndexBuilder.java @@ -60,6 +60,7 @@ import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; import org.opensearch.client.*; +import org.opensearch.client.GetAliasesResponse; import org.opensearch.client.core.CountRequest; import org.opensearch.client.core.CountResponse; import org.opensearch.client.indices.CreateIndexRequest; @@ -1240,6 +1241,58 @@ private void createIndex(String indexName, ReindexConfig state) throws IOExcepti log.info("Created index {}", indexName); } + /** + * Efficiently clear an index by deleting it and recreating it with the same configuration. This + * is much more efficient than deleting all documents using deleteByQuery. + * + * @param indexName The name of the index to clear (can be an alias or concrete index) + * @param config The ReindexConfig containing mappings and settings for the index + * @throws IOException If the deletion or creation fails + */ + public void clearIndex(String indexName, ReindexConfig config) throws IOException { + // Check if the index name is an alias or a concrete index + GetAliasesRequest getAliasesRequest = new GetAliasesRequest(indexName); + GetAliasesResponse aliasesResponse = + searchClient.getIndexAliases(getAliasesRequest, RequestOptions.DEFAULT); + + Collection indicesToDelete; + if (aliasesResponse.getAliases().isEmpty()) { + // Not an alias, check if it's a concrete index + boolean indexExists = + searchClient.indexExists(new GetIndexRequest(indexName), RequestOptions.DEFAULT); + if (!indexExists) { + log.info("Index {} does not exist, nothing to clear", indexName); + // Still recreate the index + createIndex(indexName, config); + return; + } + indicesToDelete = List.of(indexName); + log.info("Deleting concrete index {} for efficient clearing", indexName); + } else { + // It's an alias, delete the concrete indices behind it + indicesToDelete = aliasesResponse.getAliases().keySet(); + log.info( + "Deleting concrete indices {} behind alias {} for efficient clearing", + indicesToDelete, + indexName); + } + + // Delete the concrete indices + for (String concreteIndex : indicesToDelete) { + try { + deleteActionWithRetry(searchClient, concreteIndex); + } catch (Exception e) { + log.error("Failed to delete index {} during clear operation", concreteIndex, e); + throw new IOException("Failed to delete index: " + concreteIndex, e); + } + } + + // Recreate the index with the same mappings and settings + // This will recreate the alias if the original was an alias + log.info("Recreating index {} after clearing", indexName); + createIndex(indexName, config); + } + public static void cleanOrphanedIndices( SearchClientShim searchClient, ElasticSearchConfiguration esConfig, diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESWriteDAO.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESWriteDAO.java index 4166013045809..ee2cfba967051 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESWriteDAO.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESWriteDAO.java @@ -7,6 +7,10 @@ import com.google.common.annotations.VisibleForTesting; import com.linkedin.metadata.config.search.BulkDeleteConfiguration; import com.linkedin.metadata.config.search.ElasticSearchConfiguration; +import com.linkedin.metadata.search.elasticsearch.index.MappingsBuilder; +import com.linkedin.metadata.search.elasticsearch.index.SettingsBuilder; +import com.linkedin.metadata.search.elasticsearch.indexbuilder.ESIndexBuilder; +import com.linkedin.metadata.search.elasticsearch.indexbuilder.ReindexConfig; import com.linkedin.metadata.utils.elasticsearch.SearchClientShim; import com.linkedin.metadata.utils.elasticsearch.responses.GetIndexResponse; import io.datahubproject.metadata.context.OperationContext; @@ -14,20 +18,26 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletableFuture; import javax.annotation.Nonnull; import javax.annotation.Nullable; import lombok.Builder; import lombok.Data; import lombok.Getter; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.opensearch.action.admin.indices.alias.get.GetAliasesRequest; +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import org.opensearch.action.delete.DeleteRequest; import org.opensearch.action.update.UpdateRequest; +import org.opensearch.client.GetAliasesResponse; import org.opensearch.client.RequestOptions; import org.opensearch.client.core.CountRequest; import org.opensearch.client.core.CountResponse; @@ -38,19 +48,44 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.XContentType; import org.opensearch.index.query.QueryBuilder; -import org.opensearch.index.query.QueryBuilders; import org.opensearch.index.reindex.DeleteByQueryRequest; import org.opensearch.script.Script; import org.opensearch.script.ScriptType; @Slf4j -@RequiredArgsConstructor public class ESWriteDAO { private final ElasticSearchConfiguration config; private final SearchClientShim searchClient; @Getter private final ESBulkProcessor bulkProcessor; + @Nullable private final ESIndexBuilder indexBuilder; + @Nullable private final MappingsBuilder mappingsBuilder; + @Nullable private final SettingsBuilder settingsBuilder; private boolean canWrite = true; + // Constructor with index recreation support + public ESWriteDAO( + ElasticSearchConfiguration config, + SearchClientShim searchClient, + ESBulkProcessor bulkProcessor, + @Nullable ESIndexBuilder indexBuilder, + @Nullable MappingsBuilder mappingsBuilder, + @Nullable SettingsBuilder settingsBuilder) { + this.config = config; + this.searchClient = searchClient; + this.bulkProcessor = bulkProcessor; + this.indexBuilder = indexBuilder; + this.mappingsBuilder = mappingsBuilder; + this.settingsBuilder = settingsBuilder; + } + + // Constructor for backward compatibility (without index recreation) + public ESWriteDAO( + ElasticSearchConfiguration config, + SearchClientShim searchClient, + ESBulkProcessor bulkProcessor) { + this(config, searchClient, bulkProcessor, null, null, null); + } + public void setWritable(boolean writable) { canWrite = writable; } @@ -239,7 +274,87 @@ public void clear(@Nonnull OperationContext opContext) { for (String pattern : patterns) { allIndices.addAll(Arrays.asList(getIndices(pattern))); } - bulkProcessor.deleteByQuery(QueryBuilders.matchAllQuery(), allIndices.toArray(new String[0])); + + // Track which indices (aliases or concrete) were deleted so we can recreate them + Set deletedIndexNames = new HashSet<>(); + + // Instead of deleting all documents (inefficient), delete the indices themselves + for (String indexName : allIndices) { + try { + // Check if the index name is an alias or a concrete index + GetAliasesRequest getAliasesRequest = new GetAliasesRequest(indexName); + GetAliasesResponse aliasesResponse = + searchClient.getIndexAliases(getAliasesRequest, RequestOptions.DEFAULT); + Collection indicesToDelete; + if (aliasesResponse.getAliases().isEmpty()) { + // Not an alias, check if it's a concrete index + boolean indexExists = + searchClient.indexExists(new GetIndexRequest(indexName), RequestOptions.DEFAULT); + if (!indexExists) { + log.debug("Index {} does not exist, skipping", indexName); + continue; + } + // If it wasn't an alias, track the concrete index name + deletedIndexNames.add(indexName); + indicesToDelete = List.of(indexName); + log.info("Deleting concrete index {} for efficient clearing", indexName); + } else { + // It's an alias, delete the concrete indices behind it + indicesToDelete = aliasesResponse.getAliases().keySet(); + log.info( + "Deleting concrete indices {} behind alias {} for efficient clearing", + indicesToDelete, + indexName); + // Track the alias name, not the concrete indices + deletedIndexNames.add(indexName); + } + + // Delete the concrete indices + for (String concreteIndex : indicesToDelete) { + try { + DeleteIndexRequest deleteRequest = new DeleteIndexRequest(concreteIndex); + deleteRequest.timeout(org.opensearch.common.unit.TimeValue.timeValueSeconds(30)); + searchClient.deleteIndex(deleteRequest, RequestOptions.DEFAULT); + log.info("Successfully deleted index {}", concreteIndex); + } catch (IOException e) { + log.error("Failed to delete index {} during clear operation", concreteIndex, e); + throw new RuntimeException("Failed to delete index: " + concreteIndex, e); + } + } + } catch (IOException e) { + log.error("Failed to check aliases for index {} during clear operation", indexName, e); + throw new RuntimeException("Failed to clear index: " + indexName, e); + } + } + + // Recreate only the indices that were deleted, with their proper mappings and settings + if (!deletedIndexNames.isEmpty() + && indexBuilder != null + && mappingsBuilder != null + && settingsBuilder != null) { + try { + List allConfigs = + indexBuilder.buildReindexConfigs( + opContext, settingsBuilder, mappingsBuilder, Collections.emptySet()); + + // Filter to only recreate indices that were deleted + for (ReindexConfig config : allConfigs) { + if (deletedIndexNames.contains(config.name())) { + indexBuilder.buildIndex(config); + log.info("Recreated index {} after clearing", config.name()); + } + } + log.info("Recreated {} indices after clearing", deletedIndexNames.size()); + } catch (IOException e) { + log.error("Failed to recreate indices after clearing", e); + throw new RuntimeException("Failed to recreate indices after clearing", e); + } + } else if (deletedIndexNames.isEmpty()) { + log.info("No indices were deleted, nothing to recreate"); + } else { + log.warn( + "Index builders not available - indices will be recreated automatically when documents are added"); + } } private String[] getIndices(String pattern) { diff --git a/metadata-io/src/main/java/com/linkedin/metadata/systemmetadata/ElasticSearchSystemMetadataService.java b/metadata-io/src/main/java/com/linkedin/metadata/systemmetadata/ElasticSearchSystemMetadataService.java index c38e08a6b89c7..a4f2354fca710 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/systemmetadata/ElasticSearchSystemMetadataService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/systemmetadata/ElasticSearchSystemMetadataService.java @@ -279,8 +279,19 @@ public List buildReindexConfigs( @Override public void clear() { - _esBulkProcessor.deleteByQuery( - QueryBuilders.matchAllQuery(), true, _indexConvention.getIndexName(INDEX_NAME)); + // Instead of deleting all documents (inefficient), delete and recreate the index + String indexName = _indexConvention.getIndexName(INDEX_NAME); + try { + // Build the reindex config directly without needing OperationContext + ReindexConfig config = + _indexBuilder.buildReindexState( + indexName, SystemMetadataMappingsBuilder.getMappings(), Collections.emptyMap()); + _indexBuilder.clearIndex(indexName, config); + log.info("Cleared index {} by deleting and recreating it", indexName); + } catch (IOException e) { + log.error("Failed to clear index {}", indexName, e); + throw new RuntimeException("Failed to clear index: " + indexName, e); + } } @Override diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/ElasticSearchServiceFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/ElasticSearchServiceFactory.java index 3344b538b262c..cdd6e9a2cf97d 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/ElasticSearchServiceFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/ElasticSearchServiceFactory.java @@ -72,10 +72,18 @@ protected ESSearchDAO esSearchDAO( } @Bean - protected ESWriteDAO esWriteDAO(final ConfigurationProvider configurationProvider) { + protected ESWriteDAO esWriteDAO( + final ConfigurationProvider configurationProvider, + @Qualifier("mappingsBuilder") final MappingsBuilder mappingsBuilder, + @Qualifier("settingsBuilder") final SettingsBuilder settingsBuilder) { ESWriteDAO esWriteDAO = new ESWriteDAO( - components.getConfig(), components.getSearchClient(), components.getBulkProcessor()); + components.getConfig(), + components.getSearchClient(), + components.getBulkProcessor(), + components.getIndexBuilder(), + mappingsBuilder, + settingsBuilder); if (configurationProvider.getDatahub().isReadOnly()) { esWriteDAO.setWritable(false); } From 58c7494a9ea9a09e7bf219bd6e11278e1354271a Mon Sep 17 00:00:00 2001 From: jmacryl Date: Mon, 1 Dec 2025 10:55:29 -0800 Subject: [PATCH 2/5] fix tests --- .../search/ElasticSearchServiceFactory.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/ElasticSearchServiceFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/ElasticSearchServiceFactory.java index cdd6e9a2cf97d..142db982af87b 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/ElasticSearchServiceFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/ElasticSearchServiceFactory.java @@ -30,10 +30,14 @@ public class ElasticSearchServiceFactory { @Qualifier("baseElasticSearchComponents") private BaseElasticSearchComponentsFactory.BaseElasticSearchComponents components; - @Autowired + @Autowired(required = false) @Qualifier("settingsBuilder") private SettingsBuilder settingsBuilder; + @Autowired(required = false) + @Qualifier("mappingsBuilder") + private MappingsBuilder mappingsBuilder; + @Autowired @Qualifier("entityRegistry") private EntityRegistry entityRegistry; @@ -72,10 +76,8 @@ protected ESSearchDAO esSearchDAO( } @Bean - protected ESWriteDAO esWriteDAO( - final ConfigurationProvider configurationProvider, - @Qualifier("mappingsBuilder") final MappingsBuilder mappingsBuilder, - @Qualifier("settingsBuilder") final SettingsBuilder settingsBuilder) { + protected ESWriteDAO esWriteDAO(final ConfigurationProvider configurationProvider) { + // Builders may be null in test contexts - that's fine, ESWriteDAO handles null builders ESWriteDAO esWriteDAO = new ESWriteDAO( components.getConfig(), @@ -98,11 +100,9 @@ protected ElasticSearchService getInstance( final ElasticSearchConfiguration elasticSearchConfiguration, @Nullable final CustomSearchConfiguration customSearchConfiguration, final ESSearchDAO esSearchDAO, - final ESWriteDAO esWriteDAO, - @Qualifier("mappingsBuilder") final MappingsBuilder mappingsBuilder, - @Qualifier("settingsBuilder") final SettingsBuilder settingsBuilder) + final ESWriteDAO esWriteDAO) throws IOException { - + // Use the field-injected builders (may be null in test contexts) return new ElasticSearchService( components.getIndexBuilder(), configurationProvider.getSearchService(), From 768a1f61fcc9d30ba384d429361c7929d930d3d4 Mon Sep 17 00:00:00 2001 From: jmacryl Date: Mon, 1 Dec 2025 11:51:28 -0800 Subject: [PATCH 3/5] fix tests --- .../search/update/ESWriteDAOTest.java | 83 +++++++++++++++++-- 1 file changed, 75 insertions(+), 8 deletions(-) diff --git a/metadata-io/src/test/java/com/linkedin/metadata/search/update/ESWriteDAOTest.java b/metadata-io/src/test/java/com/linkedin/metadata/search/update/ESWriteDAOTest.java index ee2f451c5aa14..5251a29e0f983 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/search/update/ESWriteDAOTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/search/update/ESWriteDAOTest.java @@ -3,6 +3,7 @@ import static io.datahubproject.test.search.SearchTestUtils.TEST_OS_SEARCH_CONFIG; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -33,8 +34,12 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.MockitoAnnotations; +import org.opensearch.action.admin.indices.alias.get.GetAliasesRequest; +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import org.opensearch.action.delete.DeleteRequest; +import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.action.update.UpdateRequest; +import org.opensearch.client.GetAliasesResponse; import org.opensearch.client.RequestOptions; import org.opensearch.client.core.CountRequest; import org.opensearch.client.core.CountResponse; @@ -160,6 +165,31 @@ public void testClear() throws IOException { when(mockSearchClient.getIndex(any(GetIndexRequest.class), eq(RequestOptions.DEFAULT))) .thenReturn(mockResponse); + // Mock getIndexAliases to return empty (meaning concrete index, not alias) + // This will be called once for each index, so we need to return the same response multiple + // times + GetAliasesResponse mockAliasesResponse = mock(GetAliasesResponse.class); + when(mockAliasesResponse.getAliases()).thenReturn(java.util.Collections.emptyMap()); + // Use lenient() to ensure the mock works even if not all calls are verified + lenient() + .when( + mockSearchClient.getIndexAliases( + any(GetAliasesRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(mockAliasesResponse); + + // Mock indexExists to return true (called once for each index) + lenient() + .when(mockSearchClient.indexExists(any(GetIndexRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(true); + + // Mock deleteIndex + AcknowledgedResponse mockDeleteResponse = mock(AcknowledgedResponse.class); + when(mockDeleteResponse.isAcknowledged()).thenReturn(true); + lenient() + .when( + mockSearchClient.deleteIndex(any(DeleteIndexRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(mockDeleteResponse); + esWriteDAO.clear(opContext); // Verify the GetIndexRequest @@ -168,10 +198,16 @@ public void testClear() throws IOException { verify(mockSearchClient).getIndex(indexRequestCaptor.capture(), eq(RequestOptions.DEFAULT)); assertEquals(indexRequestCaptor.getValue().indices()[0], TEST_PATTERN); - // Verify the deletion query - ArgumentCaptor queryCaptor = ArgumentCaptor.forClass(QueryBuilder.class); - verify(mockBulkProcessor).deleteByQuery(queryCaptor.capture(), eq(indices)); - assertTrue(queryCaptor.getValue() instanceof org.opensearch.index.query.MatchAllQueryBuilder); + // Verify getIndexAliases was called for each index + verify(mockSearchClient, times(indices.length)) + .getIndexAliases(any(GetAliasesRequest.class), eq(RequestOptions.DEFAULT)); + + // Verify deleteIndex was called for each index + verify(mockSearchClient, times(indices.length)) + .deleteIndex(any(DeleteIndexRequest.class), eq(RequestOptions.DEFAULT)); + + // Verify deleteByQuery was NOT called (new implementation doesn't use it) + verify(mockBulkProcessor, never()).deleteByQuery(any(QueryBuilder.class), any()); } @Test @@ -181,8 +217,9 @@ public void testClearWithIOException() throws IOException { esWriteDAO.clear(opContext); - // Verify empty array is used when exception occurs - verify(mockBulkProcessor).deleteByQuery(any(QueryBuilder.class), eq(new String[] {})); + // Verify no delete operations when exception occurs + verify(mockSearchClient, never()).deleteIndex(any(DeleteIndexRequest.class), any()); + verify(mockBulkProcessor, never()).deleteByQuery(any(QueryBuilder.class), any()); } @Test @@ -984,13 +1021,43 @@ public void testClearWithWritability(boolean canWrite, String description) throw when(mockSearchClient.getIndex(any(GetIndexRequest.class), eq(RequestOptions.DEFAULT))) .thenReturn(mockResponse); + // Mock getIndexAliases to return empty (meaning concrete index, not alias) + // This will be called once for each index, so we need to return the same response multiple + // times + GetAliasesResponse mockAliasesResponse = mock(GetAliasesResponse.class); + when(mockAliasesResponse.getAliases()).thenReturn(java.util.Collections.emptyMap()); + // Use lenient() to ensure the mock works even if not all calls are verified + lenient() + .when( + mockSearchClient.getIndexAliases( + any(GetAliasesRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(mockAliasesResponse); + + // Mock indexExists to return true (called once for each index) + lenient() + .when(mockSearchClient.indexExists(any(GetIndexRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(true); + + // Mock deleteIndex + AcknowledgedResponse mockDeleteResponse = mock(AcknowledgedResponse.class); + when(mockDeleteResponse.isAcknowledged()).thenReturn(true); + lenient() + .when( + mockSearchClient.deleteIndex(any(DeleteIndexRequest.class), eq(RequestOptions.DEFAULT))) + .thenReturn(mockDeleteResponse); + esWriteDAO.clear(opContext); if (canWrite) { - verify(mockBulkProcessor, times(1)).deleteByQuery(any(QueryBuilder.class), eq(indices)); + // Verify deleteIndex was called for each index + verify(mockSearchClient, times(indices.length)) + .deleteIndex(any(DeleteIndexRequest.class), eq(RequestOptions.DEFAULT)); } else { - verify(mockBulkProcessor, never()).deleteByQuery(any(QueryBuilder.class), any()); + // When not writable, no operations should be performed + verify(mockSearchClient, never()).deleteIndex(any(DeleteIndexRequest.class), any()); } + // Verify deleteByQuery was NOT called (new implementation doesn't use it) + verify(mockBulkProcessor, never()).deleteByQuery(any(QueryBuilder.class), any()); } @Test(dataProvider = "writabilityConfig") From 5d0e363a4bb94a69580ad66e42c4a018c1bb0ba4 Mon Sep 17 00:00:00 2001 From: jmacryl Date: Mon, 1 Dec 2025 12:15:33 -0800 Subject: [PATCH 4/5] fix tests --- .../search/elasticsearch/update/ESWriteDAO.java | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESWriteDAO.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESWriteDAO.java index ee2cfba967051..179064332710c 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESWriteDAO.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESWriteDAO.java @@ -363,7 +363,20 @@ private String[] getIndices(String pattern) { searchClient.getIndex(new GetIndexRequest(pattern), RequestOptions.DEFAULT); return response.getIndices(); } catch (IOException e) { - log.error("Failed to get indices using pattern {}", pattern); + // Check if it's an index_not_found_exception, which is expected when no indices match + if (e.getMessage() != null && e.getMessage().contains("index_not_found_exception")) { + log.debug("No indices found matching pattern {}", pattern); + return new String[] {}; + } + log.error("Failed to get indices using pattern {}", pattern, e); + return new String[] {}; + } catch (Exception e) { + // Catch any other exceptions (like OpenSearchStatusException which is a RuntimeException) + if (e.getMessage() != null && e.getMessage().contains("index_not_found_exception")) { + log.debug("No indices found matching pattern {}", pattern); + return new String[] {}; + } + log.error("Failed to get indices using pattern {}", pattern, e); return new String[] {}; } } From c99971ab8e91326de84f49173779c1e78d4e6f56 Mon Sep 17 00:00:00 2001 From: jmacryl Date: Mon, 1 Dec 2025 14:54:20 -0800 Subject: [PATCH 5/5] change back so less changes needed to Factories --- .../elasticsearch/ElasticSearchService.java | 23 ++++- .../indexbuilder/ESIndexBuilder.java | 32 +++++-- .../elasticsearch/update/ESWriteDAO.java | 72 ++++----------- .../search/update/ESWriteDAOTest.java | 88 ++++++++----------- .../search/ElasticSearchServiceFactory.java | 20 ++--- 5 files changed, 108 insertions(+), 127 deletions(-) diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java index 1f6b3d4381742..6a677656218ef 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java @@ -107,7 +107,28 @@ public List buildReindexConfigsWithNewStructProp( @Override public void clear(@Nonnull OperationContext opContext) { - esWriteDAO.clear(opContext); + Set deletedIndexNames = esWriteDAO.clear(opContext); + + // Recreate the indices that were deleted + if (!deletedIndexNames.isEmpty()) { + try { + List allConfigs = + indexBuilder.buildReindexConfigs( + opContext, settingsBuilder, mappingsBuilder, Collections.emptySet()); + + // Filter to only recreate indices that were deleted + for (ReindexConfig config : allConfigs) { + if (deletedIndexNames.contains(config.name())) { + indexBuilder.buildIndex(config); + log.info("Recreated index {} after clearing", config.name()); + } + } + log.info("Recreated {} indices after clearing", deletedIndexNames.size()); + } catch (IOException e) { + log.error("Failed to recreate indices after clearing", e); + throw new RuntimeException("Failed to recreate indices after clearing", e); + } + } } @Override diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/indexbuilder/ESIndexBuilder.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/indexbuilder/ESIndexBuilder.java index 093018ff528e9..ee6206e4dc078 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/indexbuilder/ESIndexBuilder.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/indexbuilder/ESIndexBuilder.java @@ -1250,21 +1250,35 @@ private void createIndex(String indexName, ReindexConfig state) throws IOExcepti * @throws IOException If the deletion or creation fails */ public void clearIndex(String indexName, ReindexConfig config) throws IOException { - // Check if the index name is an alias or a concrete index + // The indexName must be either an alias or an actual index GetAliasesRequest getAliasesRequest = new GetAliasesRequest(indexName); - GetAliasesResponse aliasesResponse = - searchClient.getIndexAliases(getAliasesRequest, RequestOptions.DEFAULT); + GetAliasesResponse aliasesResponse; + try { + aliasesResponse = searchClient.getIndexAliases(getAliasesRequest, RequestOptions.DEFAULT); + } catch (IOException | RuntimeException e) { + // If getIndexAliases throws, check if it's because index/alias doesn't exist + if (e.getMessage() != null && e.getMessage().contains("index_not_found_exception")) { + // Check if it's an actual index + boolean indexExists = + searchClient.indexExists(new GetIndexRequest(indexName), RequestOptions.DEFAULT); + if (!indexExists) { + throw new IOException( + "Index or alias " + indexName + " does not exist and cannot be cleared", e); + } + // Index exists but getIndexAliases failed, treat as concrete index (not an alias) + aliasesResponse = null; + } else { + throw new IOException("Failed to get index aliases for " + indexName, e); + } + } Collection indicesToDelete; - if (aliasesResponse.getAliases().isEmpty()) { - // Not an alias, check if it's a concrete index + if (aliasesResponse == null || aliasesResponse.getAliases().isEmpty()) { + // Not an alias, must be a concrete index - verify it exists boolean indexExists = searchClient.indexExists(new GetIndexRequest(indexName), RequestOptions.DEFAULT); if (!indexExists) { - log.info("Index {} does not exist, nothing to clear", indexName); - // Still recreate the index - createIndex(indexName, config); - return; + throw new IOException("Index " + indexName + " does not exist and cannot be cleared"); } indicesToDelete = List.of(indexName); log.info("Deleting concrete index {} for efficient clearing", indexName); diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESWriteDAO.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESWriteDAO.java index 179064332710c..f0257653e006c 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESWriteDAO.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESWriteDAO.java @@ -7,10 +7,6 @@ import com.google.common.annotations.VisibleForTesting; import com.linkedin.metadata.config.search.BulkDeleteConfiguration; import com.linkedin.metadata.config.search.ElasticSearchConfiguration; -import com.linkedin.metadata.search.elasticsearch.index.MappingsBuilder; -import com.linkedin.metadata.search.elasticsearch.index.SettingsBuilder; -import com.linkedin.metadata.search.elasticsearch.indexbuilder.ESIndexBuilder; -import com.linkedin.metadata.search.elasticsearch.indexbuilder.ReindexConfig; import com.linkedin.metadata.utils.elasticsearch.SearchClientShim; import com.linkedin.metadata.utils.elasticsearch.responses.GetIndexResponse; import io.datahubproject.metadata.context.OperationContext; @@ -57,33 +53,15 @@ public class ESWriteDAO { private final ElasticSearchConfiguration config; private final SearchClientShim searchClient; @Getter private final ESBulkProcessor bulkProcessor; - @Nullable private final ESIndexBuilder indexBuilder; - @Nullable private final MappingsBuilder mappingsBuilder; - @Nullable private final SettingsBuilder settingsBuilder; private boolean canWrite = true; - // Constructor with index recreation support public ESWriteDAO( ElasticSearchConfiguration config, SearchClientShim searchClient, - ESBulkProcessor bulkProcessor, - @Nullable ESIndexBuilder indexBuilder, - @Nullable MappingsBuilder mappingsBuilder, - @Nullable SettingsBuilder settingsBuilder) { + ESBulkProcessor bulkProcessor) { this.config = config; this.searchClient = searchClient; this.bulkProcessor = bulkProcessor; - this.indexBuilder = indexBuilder; - this.mappingsBuilder = mappingsBuilder; - this.settingsBuilder = settingsBuilder; - } - - // Constructor for backward compatibility (without index recreation) - public ESWriteDAO( - ElasticSearchConfiguration config, - SearchClientShim searchClient, - ESBulkProcessor bulkProcessor) { - this(config, searchClient, bulkProcessor, null, null, null); } public void setWritable(boolean writable) { @@ -259,11 +237,19 @@ public void applyScriptUpdate( bulkProcessor.add(updateRequest); } - /** Clear all documents in all the indices */ - public void clear(@Nonnull OperationContext opContext) { + /** + * Clear all documents in all the indices by deleting them. Returns the set of index names + * (aliases or concrete indices) that were deleted. The caller is responsible for recreating these + * indices if needed. + * + * @param opContext the operation context + * @return set of index names that were deleted + */ + @Nonnull + public Set clear(@Nonnull OperationContext opContext) { if (!canWrite) { log.warn(READ_ONLY_LOG); - return; + return Collections.emptySet(); } List patterns = opContext @@ -275,7 +261,7 @@ public void clear(@Nonnull OperationContext opContext) { allIndices.addAll(Arrays.asList(getIndices(pattern))); } - // Track which indices (aliases or concrete) were deleted so we can recreate them + // Track which indices (aliases or concrete) were deleted so the caller can recreate them Set deletedIndexNames = new HashSet<>(); // Instead of deleting all documents (inefficient), delete the indices themselves @@ -327,34 +313,14 @@ public void clear(@Nonnull OperationContext opContext) { } } - // Recreate only the indices that were deleted, with their proper mappings and settings - if (!deletedIndexNames.isEmpty() - && indexBuilder != null - && mappingsBuilder != null - && settingsBuilder != null) { - try { - List allConfigs = - indexBuilder.buildReindexConfigs( - opContext, settingsBuilder, mappingsBuilder, Collections.emptySet()); - - // Filter to only recreate indices that were deleted - for (ReindexConfig config : allConfigs) { - if (deletedIndexNames.contains(config.name())) { - indexBuilder.buildIndex(config); - log.info("Recreated index {} after clearing", config.name()); - } - } - log.info("Recreated {} indices after clearing", deletedIndexNames.size()); - } catch (IOException e) { - log.error("Failed to recreate indices after clearing", e); - throw new RuntimeException("Failed to recreate indices after clearing", e); - } - } else if (deletedIndexNames.isEmpty()) { - log.info("No indices were deleted, nothing to recreate"); + if (deletedIndexNames.isEmpty()) { + log.info("No indices were deleted"); } else { - log.warn( - "Index builders not available - indices will be recreated automatically when documents are added"); + log.info( + "Deleted {} indices. Caller should recreate them if needed.", deletedIndexNames.size()); } + + return deletedIndexNames; } private String[] getIndices(String pattern) { diff --git a/metadata-io/src/test/java/com/linkedin/metadata/search/update/ESWriteDAOTest.java b/metadata-io/src/test/java/com/linkedin/metadata/search/update/ESWriteDAOTest.java index 5251a29e0f983..4a6e3937676c7 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/search/update/ESWriteDAOTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/search/update/ESWriteDAOTest.java @@ -3,7 +3,6 @@ import static io.datahubproject.test.search.SearchTestUtils.TEST_OS_SEARCH_CONFIG; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -29,6 +28,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import org.mockito.ArgumentCaptor; @@ -165,32 +165,23 @@ public void testClear() throws IOException { when(mockSearchClient.getIndex(any(GetIndexRequest.class), eq(RequestOptions.DEFAULT))) .thenReturn(mockResponse); - // Mock getIndexAliases to return empty (meaning concrete index, not alias) - // This will be called once for each index, so we need to return the same response multiple - // times + // Mock getIndexAliases to return empty aliases (concrete indices, not aliases) GetAliasesResponse mockAliasesResponse = mock(GetAliasesResponse.class); when(mockAliasesResponse.getAliases()).thenReturn(java.util.Collections.emptyMap()); - // Use lenient() to ensure the mock works even if not all calls are verified - lenient() - .when( - mockSearchClient.getIndexAliases( - any(GetAliasesRequest.class), eq(RequestOptions.DEFAULT))) + when(mockSearchClient.getIndexAliases(any(GetAliasesRequest.class), eq(RequestOptions.DEFAULT))) .thenReturn(mockAliasesResponse); - // Mock indexExists to return true (called once for each index) - lenient() - .when(mockSearchClient.indexExists(any(GetIndexRequest.class), eq(RequestOptions.DEFAULT))) + // Mock indexExists to return true + when(mockSearchClient.indexExists(any(GetIndexRequest.class), eq(RequestOptions.DEFAULT))) .thenReturn(true); - // Mock deleteIndex + // Mock deleteIndex to return acknowledged response AcknowledgedResponse mockDeleteResponse = mock(AcknowledgedResponse.class); when(mockDeleteResponse.isAcknowledged()).thenReturn(true); - lenient() - .when( - mockSearchClient.deleteIndex(any(DeleteIndexRequest.class), eq(RequestOptions.DEFAULT))) + when(mockSearchClient.deleteIndex(any(DeleteIndexRequest.class), eq(RequestOptions.DEFAULT))) .thenReturn(mockDeleteResponse); - esWriteDAO.clear(opContext); + Set deletedIndices = esWriteDAO.clear(opContext); // Verify the GetIndexRequest ArgumentCaptor indexRequestCaptor = @@ -198,16 +189,16 @@ public void testClear() throws IOException { verify(mockSearchClient).getIndex(indexRequestCaptor.capture(), eq(RequestOptions.DEFAULT)); assertEquals(indexRequestCaptor.getValue().indices()[0], TEST_PATTERN); - // Verify getIndexAliases was called for each index - verify(mockSearchClient, times(indices.length)) - .getIndexAliases(any(GetAliasesRequest.class), eq(RequestOptions.DEFAULT)); + // Verify indices were deleted + assertEquals(deletedIndices.size(), 2); + assertTrue(deletedIndices.contains("index1")); + assertTrue(deletedIndices.contains("index2")); // Verify deleteIndex was called for each index - verify(mockSearchClient, times(indices.length)) - .deleteIndex(any(DeleteIndexRequest.class), eq(RequestOptions.DEFAULT)); - - // Verify deleteByQuery was NOT called (new implementation doesn't use it) - verify(mockBulkProcessor, never()).deleteByQuery(any(QueryBuilder.class), any()); + ArgumentCaptor deleteRequestCaptor = + ArgumentCaptor.forClass(DeleteIndexRequest.class); + verify(mockSearchClient, times(2)) + .deleteIndex(deleteRequestCaptor.capture(), eq(RequestOptions.DEFAULT)); } @Test @@ -215,11 +206,13 @@ public void testClearWithIOException() throws IOException { when(mockSearchClient.getIndex(any(GetIndexRequest.class), eq(RequestOptions.DEFAULT))) .thenThrow(new IOException("Test exception")); - esWriteDAO.clear(opContext); + Set deletedIndices = esWriteDAO.clear(opContext); - // Verify no delete operations when exception occurs - verify(mockSearchClient, never()).deleteIndex(any(DeleteIndexRequest.class), any()); - verify(mockBulkProcessor, never()).deleteByQuery(any(QueryBuilder.class), any()); + // Verify no indices were deleted when exception occurs + assertTrue(deletedIndices.isEmpty()); + // Verify deleteIndex was never called + verify(mockSearchClient, never()) + .deleteIndex(any(DeleteIndexRequest.class), eq(RequestOptions.DEFAULT)); } @Test @@ -1021,43 +1014,38 @@ public void testClearWithWritability(boolean canWrite, String description) throw when(mockSearchClient.getIndex(any(GetIndexRequest.class), eq(RequestOptions.DEFAULT))) .thenReturn(mockResponse); - // Mock getIndexAliases to return empty (meaning concrete index, not alias) - // This will be called once for each index, so we need to return the same response multiple - // times + // Mock getIndexAliases to return empty aliases (concrete indices, not aliases) GetAliasesResponse mockAliasesResponse = mock(GetAliasesResponse.class); when(mockAliasesResponse.getAliases()).thenReturn(java.util.Collections.emptyMap()); - // Use lenient() to ensure the mock works even if not all calls are verified - lenient() - .when( - mockSearchClient.getIndexAliases( - any(GetAliasesRequest.class), eq(RequestOptions.DEFAULT))) + when(mockSearchClient.getIndexAliases(any(GetAliasesRequest.class), eq(RequestOptions.DEFAULT))) .thenReturn(mockAliasesResponse); - // Mock indexExists to return true (called once for each index) - lenient() - .when(mockSearchClient.indexExists(any(GetIndexRequest.class), eq(RequestOptions.DEFAULT))) + // Mock indexExists to return true + when(mockSearchClient.indexExists(any(GetIndexRequest.class), eq(RequestOptions.DEFAULT))) .thenReturn(true); - // Mock deleteIndex + // Mock deleteIndex to return acknowledged response AcknowledgedResponse mockDeleteResponse = mock(AcknowledgedResponse.class); when(mockDeleteResponse.isAcknowledged()).thenReturn(true); - lenient() - .when( - mockSearchClient.deleteIndex(any(DeleteIndexRequest.class), eq(RequestOptions.DEFAULT))) + when(mockSearchClient.deleteIndex(any(DeleteIndexRequest.class), eq(RequestOptions.DEFAULT))) .thenReturn(mockDeleteResponse); - esWriteDAO.clear(opContext); + Set deletedIndices = esWriteDAO.clear(opContext); if (canWrite) { + // Verify indices were deleted + assertEquals(deletedIndices.size(), 2); + assertTrue(deletedIndices.contains("index1")); + assertTrue(deletedIndices.contains("index2")); // Verify deleteIndex was called for each index - verify(mockSearchClient, times(indices.length)) + verify(mockSearchClient, times(2)) .deleteIndex(any(DeleteIndexRequest.class), eq(RequestOptions.DEFAULT)); } else { - // When not writable, no operations should be performed - verify(mockSearchClient, never()).deleteIndex(any(DeleteIndexRequest.class), any()); + // Verify no indices were deleted when not writable + assertTrue(deletedIndices.isEmpty()); + verify(mockSearchClient, never()) + .deleteIndex(any(DeleteIndexRequest.class), eq(RequestOptions.DEFAULT)); } - // Verify deleteByQuery was NOT called (new implementation doesn't use it) - verify(mockBulkProcessor, never()).deleteByQuery(any(QueryBuilder.class), any()); } @Test(dataProvider = "writabilityConfig") diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/ElasticSearchServiceFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/ElasticSearchServiceFactory.java index 142db982af87b..3344b538b262c 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/ElasticSearchServiceFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/ElasticSearchServiceFactory.java @@ -30,14 +30,10 @@ public class ElasticSearchServiceFactory { @Qualifier("baseElasticSearchComponents") private BaseElasticSearchComponentsFactory.BaseElasticSearchComponents components; - @Autowired(required = false) + @Autowired @Qualifier("settingsBuilder") private SettingsBuilder settingsBuilder; - @Autowired(required = false) - @Qualifier("mappingsBuilder") - private MappingsBuilder mappingsBuilder; - @Autowired @Qualifier("entityRegistry") private EntityRegistry entityRegistry; @@ -77,15 +73,9 @@ protected ESSearchDAO esSearchDAO( @Bean protected ESWriteDAO esWriteDAO(final ConfigurationProvider configurationProvider) { - // Builders may be null in test contexts - that's fine, ESWriteDAO handles null builders ESWriteDAO esWriteDAO = new ESWriteDAO( - components.getConfig(), - components.getSearchClient(), - components.getBulkProcessor(), - components.getIndexBuilder(), - mappingsBuilder, - settingsBuilder); + components.getConfig(), components.getSearchClient(), components.getBulkProcessor()); if (configurationProvider.getDatahub().isReadOnly()) { esWriteDAO.setWritable(false); } @@ -100,9 +90,11 @@ protected ElasticSearchService getInstance( final ElasticSearchConfiguration elasticSearchConfiguration, @Nullable final CustomSearchConfiguration customSearchConfiguration, final ESSearchDAO esSearchDAO, - final ESWriteDAO esWriteDAO) + final ESWriteDAO esWriteDAO, + @Qualifier("mappingsBuilder") final MappingsBuilder mappingsBuilder, + @Qualifier("settingsBuilder") final SettingsBuilder settingsBuilder) throws IOException { - // Use the field-injected builders (may be null in test contexts) + return new ElasticSearchService( components.getIndexBuilder(), configurationProvider.getSearchService(),