Skip to content

Computing full effective mappings for data stream mapping APIs #130498

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 10 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.RequestBuilder;
Expand Down Expand Up @@ -41,6 +42,7 @@
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.admin.indices.template.delete.TransportDeleteComposableIndexTemplateAction;
import org.elasticsearch.action.admin.indices.template.get.GetComposableIndexTemplateAction;
import org.elasticsearch.action.admin.indices.template.put.PutComponentTemplateAction;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryRequestBuilder;
Expand All @@ -52,6 +54,7 @@
import org.elasticsearch.action.datastreams.GetDataStreamAction;
import org.elasticsearch.action.datastreams.GetDataStreamAction.Response.DataStreamInfo;
import org.elasticsearch.action.datastreams.ModifyDataStreamsAction;
import org.elasticsearch.action.datastreams.UpdateDataStreamMappingsAction;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.MultiSearchRequestBuilder;
Expand All @@ -62,6 +65,7 @@
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.ComponentTemplate;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamAction;
Expand All @@ -72,20 +76,24 @@
import org.elasticsearch.cluster.metadata.IndexMetadataStats;
import org.elasticsearch.cluster.metadata.IndexWriteLoad;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.mapper.DataStreamTimestampFieldMapper;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.shard.IndexingStats;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.InvalidAliasNameException;
import org.elasticsearch.indices.InvalidIndexNameException;
import org.elasticsearch.plugins.Plugin;
Expand Down Expand Up @@ -2411,6 +2419,159 @@ public void testShardSizeIsForecastedDuringRollover() throws Exception {
assertThat(forecastedShardSizeInBytes.getAsLong(), is(equalTo(expectedTotalSizeInBytes / shardCount)));
}

public void testGetEffectiveMappings() throws Exception {
/*
* This test creates a composable template with a mapping and with two component templates with mappings. It then makes sure that
* DataStream.getEffectiveMappings returns a mapping that merges the template's mapping, the component templates' mappings, and the
* mapping override given. It then makes sure we get the same result calling the non-static version of getEffectiveMappings.
*/
ComposableIndexTemplate composableIndexTemplate;
{
ComponentTemplate ct1 = new ComponentTemplate(new Template(null, new CompressedXContent("""
{
"_doc":{
"dynamic":"strict",
"properties":{
"field1":{
"type":"text"
}
}
}
}
"""), null), 3L, null);
ComponentTemplate ct2 = new ComponentTemplate(new Template(null, new CompressedXContent("""
{
"_doc":{
"dynamic":"strict",
"properties":{
"field2":{
"type":"text"
}
}
}
}
"""), null), 3L, null);
client().execute(PutComponentTemplateAction.INSTANCE, new PutComponentTemplateAction.Request("ct1").componentTemplate(ct1))
.get();
client().execute(PutComponentTemplateAction.INSTANCE, new PutComponentTemplateAction.Request("ct2").componentTemplate(ct2))
.get();

List<String> componentTemplates = List.of("ct1", "ct2");
String templateName = "effective-mapping-template";
TransportPutComposableIndexTemplateAction.Request request = new TransportPutComposableIndexTemplateAction.Request(templateName);
request.indexTemplate(
ComposableIndexTemplate.builder()
.indexPatterns(List.of("effective-*"))
.template(Template.builder().mappings(CompressedXContent.fromJSON("""
{
"_doc":{
"dynamic":"strict",
"properties":{
"field3":{
"type":"text"
}
}
}
}
""")))
.componentTemplates(componentTemplates)
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.build()
);
client().execute(TransportPutComposableIndexTemplateAction.TYPE, request).actionGet();
GetComposableIndexTemplateAction.Response getTemplateResponse = client().execute(
GetComposableIndexTemplateAction.INSTANCE,
new GetComposableIndexTemplateAction.Request(TEST_REQUEST_TIMEOUT, templateName)
).actionGet();
composableIndexTemplate = getTemplateResponse.indexTemplates().values().iterator().next();
}
// The mappingOverrides changes the type of one field, and adds another field:
CompressedXContent mappingOverrides = new CompressedXContent("""
{
"properties":{
"field1":{
"type":"keyword"
},
"field4":{
"type":"keyword"
}
}
}
""");

String dataStreamName = "effective-mappings-test";
Index writeIndex;
{
CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(
TEST_REQUEST_TIMEOUT,
TEST_REQUEST_TIMEOUT,
dataStreamName
);
client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get();
writeIndex = getDataStream(dataStreamName).getWriteIndex();
}

ProjectMetadata projectMetadata = client().admin()
.cluster()
.state(new ClusterStateRequest(TEST_REQUEST_TIMEOUT))
.get()
.getState()
.metadata()
.getProject(ProjectId.DEFAULT);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class);
CompressedXContent effectiveMappings = DataStream.getEffectiveMappings(
projectMetadata,
composableIndexTemplate,
mappingOverrides,
writeIndex,
indicesService
);
assertNotNull(effectiveMappings);
Map<String, Object> effectiveMappingMap = XContentHelper.convertToMap(effectiveMappings.uncompressed(), true, XContentType.JSON)
.v2();
Map<String, Object> expectedEffectiveMappingMap = Map.of(
"dynamic",
"strict",
"_data_stream_timestamp",
Map.of("enabled", true),
"properties",
Map.of(
"@timestamp",
Map.of("type", "date"),
"field1",
Map.of("type", "keyword"),
"field2",
Map.of("type", "text"),
"field3",
Map.of("type", "text"),
"field4",
Map.of("type", "keyword")
)
);
assertThat(effectiveMappingMap, equalTo(expectedEffectiveMappingMap));

// Add the same mappingOverrides to the data stream:
client().execute(
new ActionType<UpdateDataStreamMappingsAction.Response>(UpdateDataStreamMappingsAction.NAME),
new UpdateDataStreamMappingsAction.Request(mappingOverrides, false, TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT).indices(
dataStreamName
)
).actionGet();
assertThat(getDataStream(dataStreamName).getEffectiveMappings(projectMetadata, indicesService), equalTo(effectiveMappings));
}

private DataStream getDataStream(String dataStreamName) throws ExecutionException, InterruptedException {
return client().admin()
.cluster()
.state(new ClusterStateRequest(TEST_REQUEST_TIMEOUT))
.get()
.getState()
.getMetadata()
.getProject(ProjectId.DEFAULT)
.dataStreams()
.get(dataStreamName);
}

private void indexDocsAndEnsureThereIsCapturedWriteLoad(String dataStreamName) throws Exception {
assertBusy(() -> {
for (int i = 0; i < 10; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,10 @@ public void testGetAndUpdateMappings() throws IOException {
Map<String, Object> originalMappings = Map.of(
"dynamic",
"strict",
"_data_stream_timestamp",
Map.of("enabled", true),
"properties",
Map.of("foo1", Map.of("type", "text"), "foo2", Map.of("type", "text"))
Map.of("@timestamp", Map.of("type", "date"), "foo1", Map.of("type", "text"), "foo2", Map.of("type", "text"))
);
Map<String, Object> mappingOverrides = Map.of(
"properties",
Expand All @@ -67,8 +69,19 @@ public void testGetAndUpdateMappings() throws IOException {
Map<String, Object> expectedEffectiveMappings = Map.of(
"dynamic",
"strict",
"_data_stream_timestamp",
Map.of("enabled", true),
"properties",
Map.of("foo1", Map.of("type", "text"), "foo2", Map.of("type", "keyword"), "foo3", Map.of("type", "text"))
Map.of(
"@timestamp",
Map.of("type", "date"),
"foo1",
Map.of("type", "text"),
"foo2",
Map.of("type", "keyword"),
"foo3",
Map.of("type", "text")
)
);
assertExpectedMappings(dataStreamName, Map.of(), originalMappings);
updateMappings(dataStreamName, mappingOverrides, expectedEffectiveMappings, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
Expand All @@ -35,6 +36,7 @@ public class TransportGetDataStreamMappingsAction extends TransportLocalProjectM
GetDataStreamMappingsAction.Request,
GetDataStreamMappingsAction.Response> {
private final IndexNameExpressionResolver indexNameExpressionResolver;
private final IndicesService indicesService;

@Inject
public TransportGetDataStreamMappingsAction(
Expand All @@ -43,7 +45,8 @@ public TransportGetDataStreamMappingsAction(
ThreadPool threadPool,
ActionFilters actionFilters,
ProjectResolver projectResolver,
IndexNameExpressionResolver indexNameExpressionResolver
IndexNameExpressionResolver indexNameExpressionResolver,
IndicesService indicesService
) {
super(
GetSettingsAction.NAME,
Expand All @@ -54,6 +57,7 @@ public TransportGetDataStreamMappingsAction(
projectResolver
);
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.indicesService = indicesService;
}

@Override
Expand Down Expand Up @@ -81,7 +85,7 @@ protected void localClusterStateOperation(
new GetDataStreamMappingsAction.DataStreamMappingsResponse(
dataStreamName,
dataStream.getMappings(),
dataStream.getEffectiveMappings(project.metadata())
dataStream.getEffectiveMappings(project.metadata(), indicesService)
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.Task;
Expand All @@ -47,6 +48,7 @@ public class TransportUpdateDataStreamMappingsAction extends TransportMasterNode
private final IndexNameExpressionResolver indexNameExpressionResolver;
private final SystemIndices systemIndices;
private final ProjectResolver projectResolver;
private final IndicesService indicesService;

@Inject
public TransportUpdateDataStreamMappingsAction(
Expand All @@ -57,7 +59,8 @@ public TransportUpdateDataStreamMappingsAction(
ProjectResolver projectResolver,
MetadataDataStreamsService metadataDataStreamsService,
IndexNameExpressionResolver indexNameExpressionResolver,
SystemIndices systemIndices
SystemIndices systemIndices,
IndicesService indicesService
) {
super(
UpdateDataStreamMappingsAction.NAME,
Expand All @@ -73,6 +76,7 @@ public TransportUpdateDataStreamMappingsAction(
this.metadataDataStreamsService = metadataDataStreamsService;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.systemIndices = systemIndices;
this.indicesService = indicesService;
}

@Override
Expand Down Expand Up @@ -163,7 +167,7 @@ private void updateSingleDataStream(
true,
null,
mappingsOverrides,
dataStream.getEffectiveMappings(clusterService.state().metadata().getProject(projectId))
dataStream.getEffectiveMappings(clusterService.state().metadata().getProject(projectId), indicesService)
)
);
} catch (IOException e) {
Expand Down
Loading