Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,19 @@ public List<ReindexConfig> buildReindexConfigs(

@Override
public void clear() {
esBulkProcessor.deleteByQuery(
QueryBuilders.matchAllQuery(), true, indexConvention.getIndexName(INDEX_NAME));
// Instead of deleting all documents (inefficient), delete and recreate the index
String indexName = indexConvention.getIndexName(INDEX_NAME);
try {
// Build the reindex config directly without needing OperationContext
ReindexConfig config =
indexBuilder.buildReindexState(
indexName, GraphRelationshipMappingsBuilder.getMappings(), Collections.emptyMap());
indexBuilder.clearIndex(indexName, config);
log.info("Cleared index {} by deleting and recreating it", indexName);
} catch (IOException e) {
log.error("Failed to clear index {}", indexName, e);
throw new RuntimeException("Failed to clear index: " + indexName, e);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,28 @@ public List<ReindexConfig> buildReindexConfigsWithNewStructProp(

@Override
public void clear(@Nonnull OperationContext opContext) {
esWriteDAO.clear(opContext);
Set<String> deletedIndexNames = esWriteDAO.clear(opContext);

// Recreate the indices that were deleted
if (!deletedIndexNames.isEmpty()) {
try {
List<ReindexConfig> allConfigs =
indexBuilder.buildReindexConfigs(
opContext, settingsBuilder, mappingsBuilder, Collections.emptySet());

// Filter to only recreate indices that were deleted
for (ReindexConfig config : allConfigs) {
if (deletedIndexNames.contains(config.name())) {
indexBuilder.buildIndex(config);
log.info("Recreated index {} after clearing", config.name());
}
}
log.info("Recreated {} indices after clearing", deletedIndexNames.size());
} catch (IOException e) {
log.error("Failed to recreate indices after clearing", e);
throw new RuntimeException("Failed to recreate indices after clearing", e);
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.*;
import org.opensearch.client.GetAliasesResponse;
import org.opensearch.client.core.CountRequest;
import org.opensearch.client.core.CountResponse;
import org.opensearch.client.indices.CreateIndexRequest;
Expand Down Expand Up @@ -1240,6 +1241,72 @@ private void createIndex(String indexName, ReindexConfig state) throws IOExcepti
log.info("Created index {}", indexName);
}

/**
* Efficiently clear an index by deleting it and recreating it with the same configuration. This
* is much more efficient than deleting all documents using deleteByQuery.
*
* @param indexName The name of the index to clear (can be an alias or concrete index)
* @param config The ReindexConfig containing mappings and settings for the index
* @throws IOException If the deletion or creation fails
*/
public void clearIndex(String indexName, ReindexConfig config) throws IOException {
// The indexName must be either an alias or an actual index
GetAliasesRequest getAliasesRequest = new GetAliasesRequest(indexName);
GetAliasesResponse aliasesResponse;
try {
aliasesResponse = searchClient.getIndexAliases(getAliasesRequest, RequestOptions.DEFAULT);
} catch (IOException | RuntimeException e) {
// If getIndexAliases throws, check if it's because index/alias doesn't exist
if (e.getMessage() != null && e.getMessage().contains("index_not_found_exception")) {
// Check if it's an actual index
boolean indexExists =
searchClient.indexExists(new GetIndexRequest(indexName), RequestOptions.DEFAULT);
if (!indexExists) {
throw new IOException(
"Index or alias " + indexName + " does not exist and cannot be cleared", e);
}
// Index exists but getIndexAliases failed, treat as concrete index (not an alias)
aliasesResponse = null;
} else {
throw new IOException("Failed to get index aliases for " + indexName, e);
}
}

Collection<String> indicesToDelete;
if (aliasesResponse == null || aliasesResponse.getAliases().isEmpty()) {
// Not an alias, must be a concrete index - verify it exists
boolean indexExists =
searchClient.indexExists(new GetIndexRequest(indexName), RequestOptions.DEFAULT);
if (!indexExists) {
throw new IOException("Index " + indexName + " does not exist and cannot be cleared");
}
indicesToDelete = List.of(indexName);
log.info("Deleting concrete index {} for efficient clearing", indexName);
} else {
// It's an alias, delete the concrete indices behind it
indicesToDelete = aliasesResponse.getAliases().keySet();
log.info(
"Deleting concrete indices {} behind alias {} for efficient clearing",
indicesToDelete,
indexName);
}

// Delete the concrete indices
for (String concreteIndex : indicesToDelete) {
try {
deleteActionWithRetry(searchClient, concreteIndex);
} catch (Exception e) {
log.error("Failed to delete index {} during clear operation", concreteIndex, e);
throw new IOException("Failed to delete index: " + concreteIndex, e);
}
}

// Recreate the index with the same mappings and settings
// This will recreate the alias if the original was an alias
log.info("Recreating index {} after clearing", indexName);
createIndex(indexName, config);
}

public static void cleanOrphanedIndices(
SearchClientShim<?> searchClient,
ElasticSearchConfiguration esConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,26 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.Builder;
import lombok.Data;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.opensearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.client.GetAliasesResponse;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.core.CountRequest;
import org.opensearch.client.core.CountResponse;
Expand All @@ -38,19 +44,26 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.reindex.DeleteByQueryRequest;
import org.opensearch.script.Script;
import org.opensearch.script.ScriptType;

@Slf4j
@RequiredArgsConstructor
public class ESWriteDAO {
private final ElasticSearchConfiguration config;
private final SearchClientShim<?> searchClient;
@Getter private final ESBulkProcessor bulkProcessor;
private boolean canWrite = true;

public ESWriteDAO(
ElasticSearchConfiguration config,
SearchClientShim<?> searchClient,
ESBulkProcessor bulkProcessor) {
this.config = config;
this.searchClient = searchClient;
this.bulkProcessor = bulkProcessor;
}

public void setWritable(boolean writable) {
canWrite = writable;
}
Expand Down Expand Up @@ -224,11 +237,19 @@ public void applyScriptUpdate(
bulkProcessor.add(updateRequest);
}

/** Clear all documents in all the indices */
public void clear(@Nonnull OperationContext opContext) {
/**
* Clear all documents in all the indices by deleting them. Returns the set of index names
* (aliases or concrete indices) that were deleted. The caller is responsible for recreating these
* indices if needed.
*
* @param opContext the operation context
* @return set of index names that were deleted
*/
@Nonnull
public Set<String> clear(@Nonnull OperationContext opContext) {
if (!canWrite) {
log.warn(READ_ONLY_LOG);
return;
return Collections.emptySet();
}
List<String> patterns =
opContext
Expand All @@ -239,7 +260,67 @@ public void clear(@Nonnull OperationContext opContext) {
for (String pattern : patterns) {
allIndices.addAll(Arrays.asList(getIndices(pattern)));
}
bulkProcessor.deleteByQuery(QueryBuilders.matchAllQuery(), allIndices.toArray(new String[0]));

// Track which indices (aliases or concrete) were deleted so the caller can recreate them
Set<String> deletedIndexNames = new HashSet<>();

// Instead of deleting all documents (inefficient), delete the indices themselves
for (String indexName : allIndices) {
try {
// Check if the index name is an alias or a concrete index
GetAliasesRequest getAliasesRequest = new GetAliasesRequest(indexName);
GetAliasesResponse aliasesResponse =
searchClient.getIndexAliases(getAliasesRequest, RequestOptions.DEFAULT);
Collection<String> indicesToDelete;
if (aliasesResponse.getAliases().isEmpty()) {
// Not an alias, check if it's a concrete index
boolean indexExists =
searchClient.indexExists(new GetIndexRequest(indexName), RequestOptions.DEFAULT);
if (!indexExists) {
log.debug("Index {} does not exist, skipping", indexName);
continue;
}
// If it wasn't an alias, track the concrete index name
deletedIndexNames.add(indexName);
indicesToDelete = List.of(indexName);
log.info("Deleting concrete index {} for efficient clearing", indexName);
} else {
// It's an alias, delete the concrete indices behind it
indicesToDelete = aliasesResponse.getAliases().keySet();
log.info(
"Deleting concrete indices {} behind alias {} for efficient clearing",
indicesToDelete,
indexName);
// Track the alias name, not the concrete indices
deletedIndexNames.add(indexName);
}

// Delete the concrete indices
for (String concreteIndex : indicesToDelete) {
try {
DeleteIndexRequest deleteRequest = new DeleteIndexRequest(concreteIndex);
deleteRequest.timeout(org.opensearch.common.unit.TimeValue.timeValueSeconds(30));
searchClient.deleteIndex(deleteRequest, RequestOptions.DEFAULT);
log.info("Successfully deleted index {}", concreteIndex);
} catch (IOException e) {
log.error("Failed to delete index {} during clear operation", concreteIndex, e);
throw new RuntimeException("Failed to delete index: " + concreteIndex, e);
}
}
} catch (IOException e) {
log.error("Failed to check aliases for index {} during clear operation", indexName, e);
throw new RuntimeException("Failed to clear index: " + indexName, e);
}
}

if (deletedIndexNames.isEmpty()) {
log.info("No indices were deleted");
} else {
log.info(
"Deleted {} indices. Caller should recreate them if needed.", deletedIndexNames.size());
}

return deletedIndexNames;
}

private String[] getIndices(String pattern) {
Expand All @@ -248,7 +329,20 @@ private String[] getIndices(String pattern) {
searchClient.getIndex(new GetIndexRequest(pattern), RequestOptions.DEFAULT);
return response.getIndices();
} catch (IOException e) {
log.error("Failed to get indices using pattern {}", pattern);
// Check if it's an index_not_found_exception, which is expected when no indices match
if (e.getMessage() != null && e.getMessage().contains("index_not_found_exception")) {
log.debug("No indices found matching pattern {}", pattern);
return new String[] {};
}
log.error("Failed to get indices using pattern {}", pattern, e);
return new String[] {};
} catch (Exception e) {
// Catch any other exceptions (like OpenSearchStatusException which is a RuntimeException)
if (e.getMessage() != null && e.getMessage().contains("index_not_found_exception")) {
log.debug("No indices found matching pattern {}", pattern);
return new String[] {};
}
log.error("Failed to get indices using pattern {}", pattern, e);
return new String[] {};
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,19 @@ public List<ReindexConfig> buildReindexConfigs(

@Override
public void clear() {
_esBulkProcessor.deleteByQuery(
QueryBuilders.matchAllQuery(), true, _indexConvention.getIndexName(INDEX_NAME));
// Instead of deleting all documents (inefficient), delete and recreate the index
String indexName = _indexConvention.getIndexName(INDEX_NAME);
try {
// Build the reindex config directly without needing OperationContext
ReindexConfig config =
_indexBuilder.buildReindexState(
indexName, SystemMetadataMappingsBuilder.getMappings(), Collections.emptyMap());
_indexBuilder.clearIndex(indexName, config);
log.info("Cleared index {} by deleting and recreating it", indexName);
} catch (IOException e) {
log.error("Failed to clear index {}", indexName, e);
throw new RuntimeException("Failed to clear index: " + indexName, e);
}
}

@Override
Expand Down
Loading
Loading