Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
---
# REST test: two documents are indexed into data stream `foo`, whose default pipeline reroutes them to
# `foo.bar`, whose default pipeline in turn reroutes them to `foo.bar.baz`. The test then asserts that each
# of the three data streams reports both documents in its sample.
"Test get sample with multiple reroutes":
- requires:
cluster_features: [ "random_sampling" ]
reason: requires feature 'random_sampling' to get random samples

# pipeline1: overwrite "message", then reroute the document to foo.bar.
- do:
ingest.put_pipeline:
id: pipeline1
body: >
{
"processors" : [
{
"set" : {
"field": "message",
"value": "set by pipeline1"
}
},
{
"reroute" : {
"destination": "foo.bar"
}
}
]
}
- match: { acknowledged: true }

# pipeline2: overwrite "message" again, then reroute the document to foo.bar.baz.
- do:
ingest.put_pipeline:
id: pipeline2
body: >
{
"processors" : [
{
"set" : {
"field": "message",
"value": "set by pipeline2"
}
},
{
"reroute" : {
"destination": "foo.bar.baz"
}
}
]
}
- match: { acknowledged: true }

# Template for data stream foo: pipeline1 is its default pipeline; strict mapping with a single text field.
- do:
indices.put_index_template:
name: my-template1
body:
index_patterns: [foo]
template:
settings:
default_pipeline: pipeline1
index.number_of_shards: 1
index.number_of_replicas: 0
mappings:
dynamic: strict
properties:
message:
type: text
data_stream: {}
- match: { acknowledged: true }

# Template for data stream foo.bar: pipeline2 is its default pipeline.
- do:
indices.put_index_template:
name: my-template2
body:
index_patterns: [foo.bar]
template:
settings:
default_pipeline: pipeline2
index.number_of_shards: 1
index.number_of_replicas: 0
mappings:
dynamic: strict
properties:
message:
type: text
data_stream: {}
- match: { acknowledged: true }

# Template for data stream foo.bar.baz: no default pipeline is configured, so the reroute chain ends here.
- do:
indices.put_index_template:
name: my-template3
body:
index_patterns: [foo.bar.baz]
template:
settings:
index.number_of_shards: 1
index.number_of_replicas: 0
mappings:
dynamic: strict
properties:
message:
type: text
data_stream: {}
- match: { acknowledged: true }

# Create the three data streams matched by the templates above.
- do:
indices.create_data_stream:
name: foo
- is_true: acknowledged

- do:
indices.create_data_stream:
name: foo.bar
- is_true: acknowledged

- do:
indices.create_data_stream:
name: foo.bar.baz
- is_true: acknowledged

# Roll over the final destination before any document is indexed.
# NOTE(review): presumably this checks that sampling is tied to the data stream name rather than to one
# particular backing index — confirm.
- do:
indices.rollover:
alias: foo.bar.baz
wait_for_active_shards: 1
- match: { rolled_over: true }

# Put a sampling configuration (rate 1.0, max_samples 100) on every data stream in the reroute chain.
- do:
indices.put_sample_configuration:
index: foo
body:
rate: 1.0
max_samples: 100

- do:
indices.put_sample_configuration:
index: foo.bar
body:
rate: 1.0
max_samples: 100

- do:
indices.put_sample_configuration:
index: foo.bar.baz
body:
rate: 1.0
max_samples: 100

# Index two documents, both addressed to foo (the head of the reroute chain).
- do:
bulk:
refresh: true
body:
- '{ "create":{"_index": "foo" } }'
- '{"@timestamp": 123456, "message": "This is the original message"}'
- '{ "create":{"_index": "foo" } }'
- '{"@timestamp": 123456, "message": "This is the original message"}'
- match: { errors: false }

# Each data stream in the chain is expected to hold both documents, reported under its own name and with
# the message as it was sent in the bulk request (not the values written by pipeline1 / pipeline2).
- do:
indices.get_sample:
index: foo
- length: { sample: 2 }
- match: { sample.0.index: "foo" }
- match: { sample.0.source.message: "This is the original message" }
- match: { sample.1.source.message: "This is the original message" }

- do:
indices.get_sample:
index: foo.bar
- length: { sample: 2 }
- match: { sample.0.index: "foo.bar" }
- match: { sample.0.source.message: "This is the original message" }
- match: { sample.1.source.message: "This is the original message" }

- do:
indices.get_sample:
index: foo.bar.baz
- length: { sample: 2 }
- match: { sample.0.index: "foo.bar.baz" }
- match: { sample.0.source.message: "This is the original message" }
- match: { sample.1.source.message: "This is the original message" }

---
# Runs after the test: deletes every data stream whose name matches foo* (foo, foo.bar and foo.bar.baz
# are the ones created above). Gated on the same cluster feature as the test itself.
teardown:
- requires:
cluster_features: [ "random_sampling" ]
reason: requires feature 'random_sampling' to get random samples

- do:
indices.delete_data_stream:
name: foo*
27 changes: 7 additions & 20 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -1040,14 +1040,7 @@ public void onFailure(Exception e) {
}
);

executePipelines(
pipelines,
indexRequest,
ingestDocument,
adaptedResolveFailureStore,
documentListener,
originalDocumentMetadata
);
executePipelines(pipelines, indexRequest, ingestDocument, adaptedResolveFailureStore, documentListener);
assert actionRequest.index() != null;

i++;
Expand Down Expand Up @@ -1166,8 +1159,7 @@ private void executePipelines(
final IndexRequest indexRequest,
final IngestDocument ingestDocument,
final Function<String, Boolean> resolveFailureStore,
final ActionListener<IngestPipelinesExecutionResult> listener,
final Metadata originalDocumentMetadata
final ActionListener<IngestPipelinesExecutionResult> listener
) {
assert pipelines.hasNext();
PipelineSlot slot = pipelines.next();
Expand Down Expand Up @@ -1353,14 +1345,14 @@ private void executePipelines(
}

if (newPipelines.hasNext()) {
executePipelines(newPipelines, indexRequest, ingestDocument, resolveFailureStore, listener, originalDocumentMetadata);
executePipelines(newPipelines, indexRequest, ingestDocument, resolveFailureStore, listener);
} else {
/*
* At this point, all pipelines have been executed, and we are about to overwrite the index request's source with the results
* held in ingestDocument. This is our chance to sample with both the original document and all changes.
*/
haveAttemptedSampling.set(true);
attemptToSampleData(project, indexRequest, ingestDocument, originalDocumentMetadata);
attemptToSampleData(project, indexRequest, ingestDocument);
updateIndexRequestSource(indexRequest, ingestDocument);
cacheRawTimestamp(indexRequest, ingestDocument);
listener.onResponse(IngestPipelinesExecutionResult.SUCCESSFUL_RESULT); // document succeeded!
Expand All @@ -1369,7 +1361,7 @@ private void executePipelines(
} catch (Exception e) {
if (haveAttemptedSampling.get() == false) {
// It is possible that an exception happened after we sampled. We do not want to sample the same document twice.
attemptToSampleData(project, indexRequest, ingestDocument, originalDocumentMetadata);
attemptToSampleData(project, indexRequest, ingestDocument);
}
logger.debug(
() -> format("failed to execute pipeline [%s] for document [%s/%s]", pipelineId, indexRequest.index(), indexRequest.id()),
Expand All @@ -1379,18 +1371,13 @@ private void executePipelines(
}
}

private void attemptToSampleData(
ProjectMetadata projectMetadata,
IndexRequest indexRequest,
IngestDocument ingestDocument,
Metadata originalDocumentMetadata
) {
private void attemptToSampleData(ProjectMetadata projectMetadata, IndexRequest indexRequest, IngestDocument ingestDocument) {
if (samplingService != null && samplingService.atLeastOneSampleConfigured(projectMetadata)) {
/*
* We need both the original document (indexRequest) and the fully updated document (ingestDocument) for sampling, so this
* must be called before the index request's source is overwritten with the pipeline results. No copy is made or kept here.
*/
samplingService.maybeSample(projectMetadata, originalDocumentMetadata.getIndex(), indexRequest, ingestDocument);
samplingService.maybeSample(projectMetadata, indexRequest, ingestDocument);

}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,13 +195,17 @@ public void maybeSample(ProjectMetadata projectMetadata, IndexRequest indexReque
}

/**
* Potentially samples the given indexRequest, depending on the existing sampling configuration.
* Potentially samples the given indexRequest, depending on the existing sampling configuration. The request will be sampled against
* the sampling configurations of all indices it has been rerouted to (if it has been rerouted).
* @param projectMetadata Used to get the sampling configuration
* @param indexRequest The raw request to potentially sample
* @param ingestDocument The IngestDocument used for evaluating any conditionals that are part of the sample configuration
*/
public void maybeSample(ProjectMetadata projectMetadata, String indexName, IndexRequest indexRequest, IngestDocument ingestDocument) {
maybeSample(projectMetadata, indexName, indexRequest, () -> ingestDocument);
public void maybeSample(ProjectMetadata projectMetadata, IndexRequest indexRequest, IngestDocument ingestDocument) {
// The index history gives us the initially-requested index, as well as any indices it has been rerouted through
for (String index : ingestDocument.getIndexHistory()) {
maybeSample(projectMetadata, index, indexRequest, () -> ingestDocument);
}
}

private void maybeSample(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3403,7 +3403,7 @@ public void testSampling() {
);
verify(listener, times(1)).onResponse(null);
// In the case where there is a pipeline, or there is a pipeline failure, there will be an IngestDocument, so this version is called:
verify(samplingService, times(2)).maybeSample(any(), any(), any(), any());
verify(samplingService, times(2)).maybeSample(any(), any(), any());
// When there is no pipeline, we have no IngestDocument, and the maybeSample that does not require an IngestDocument is called:
verify(samplingService, times(1)).maybeSample(any(), any());
}
Expand Down