diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java
index 2d03460fe43ed..af3bf3eb21f98 100644
--- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java
+++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java
@@ -27,12 +27,14 @@
 
 import java.time.ZoneOffset;
 import java.time.ZonedDateTime;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Base64;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
+import java.util.Deque;
 import java.util.HashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -73,6 +75,13 @@ public final class IngestDocument {
     // Contains all pipelines that have been executed for this document
     private final Set<String> executedPipelines = new LinkedHashSet<>();
 
+    /**
+     * Maintains the stack of access patterns for each pipeline that this document is currently being processed by.
+     * When a pipeline with one access pattern calls another pipeline with a different one, we must ensure the access patterns
+     * are correctly restored when returning from a nested pipeline to an enclosing pipeline.
+     */
+    private final Deque<IngestPipelineFieldAccessPattern> accessPatternStack = new ArrayDeque<>();
+
     /**
      * An ordered set of the values of the _index that have been used for this document.
      *
@@ -114,12 +123,13 @@ public IngestDocument(IngestDocument other) {
             deepCopyMap(other.ingestMetadata)
         );
         /*
-         * The executedPipelines field is clearly execution-centric rather than data centric. Despite what the comment above says, we're
-         * copying it here anyway. THe reason is that this constructor is only called from two non-test locations, and both of those
-         * involve the simulate pipeline logic. The simulate pipeline logic needs this information. Rather than making the code more
-         * complicated, we're just copying this over here since it does no harm.
+         * The executedPipelines and accessPatternStack fields are clearly execution-centric rather than data centric.
+         * Despite what the comment above says, we're copying them here anyway. The reason is that this constructor is only called from
+         * two non-test locations, and both of those involve the simulate pipeline logic. The simulate pipeline logic needs this
+         * information. Rather than making the code more complicated, we're just copying them over here since it does no harm.
         */
         this.executedPipelines.addAll(other.executedPipelines);
+        this.accessPatternStack.addAll(other.accessPatternStack);
     }
 
     /**
@@ -856,8 +866,17 @@ public void executePipeline(Pipeline pipeline, BiConsumer<IngestDocument, Exception> handler) {
 
         if (executedPipelines.add(pipeline.getId())) {
             Object previousPipeline = ingestMetadata.put("pipeline", pipeline.getId());
+            IngestPipelineFieldAccessPattern previousAccessPattern = accessPatternStack.peek();
+            accessPatternStack.push(pipeline.getFieldAccessPattern());
             pipeline.execute(this, (result, e) -> {
                 executedPipelines.remove(pipeline.getId());
+                accessPatternStack.poll();
+                assert previousAccessPattern == accessPatternStack.peek()
+                    : "Cleared access pattern from nested pipeline and found inconsistent stack state. Expected ["
+                        + previousAccessPattern
+                        + "] but found ["
+                        + accessPatternStack.peek()
+                        + "]";
                 if (previousPipeline != null) {
                     ingestMetadata.put("pipeline", previousPipeline);
                 } else {
@@ -879,6 +898,13 @@ List<String> getPipelineStack() {
         return pipelineStack;
     }
 
+    /**
+     * @return The access pattern for any currently executing pipelines, or null if no pipelines are in progress for this doc
+     */
+    public IngestPipelineFieldAccessPattern getCurrentAccessPattern() {
+        return accessPatternStack.peek();
+    }
+
     /**
      * Adds an index to the index history for this document, returning true if the index
      * was added to the index history (i.e. if it wasn't already in the index history).
diff --git a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java
index e1ade43933289..ec9163f5e847d 100644
--- a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java
+++ b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java
@@ -25,7 +25,7 @@
 /**
  * A pipeline is a list of {@link Processor} instances grouped under a unique id.
  */
-public final class Pipeline {
+public class Pipeline {
 
     public static final String DESCRIPTION_KEY = "description";
     public static final String PROCESSORS_KEY = "processors";
diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestDocumentTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestDocumentTests.java
index a0152ea63ef53..f3825ca7f9cc1 100644
--- a/server/src/test/java/org/elasticsearch/ingest/IngestDocumentTests.java
+++ b/server/src/test/java/org/elasticsearch/ingest/IngestDocumentTests.java
@@ -9,12 +9,15 @@
 
 package org.elasticsearch.ingest;
 
+import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xcontent.XContentType;
 import org.hamcrest.Matchers;
+import org.junit.Assume;
 import org.junit.Before;
+import org.mockito.ArgumentCaptor;
 
 import java.time.Instant;
 import java.time.ZoneId;
@@ -25,12 +28,14 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.BiConsumer;
 import java.util.stream.DoubleStream;
 
 import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument;
 import static org.hamcrest.Matchers.both;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.instanceOf;
@@ -40,6 +45,10 @@
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.Matchers.sameInstance;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class IngestDocumentTests extends ESTestCase {
 
@@ -1245,4 +1254,95 @@ public void testSourceHashMapIsNotCopied() {
             assertThat(document2.getCtxMap().getMetadata(), not(sameInstance(document1.getCtxMap().getMetadata())));
         }
     }
+
+    /**
+     * When executing nested pipelines on an ingest document, the document should keep track of each pipeline's access pattern for the
+     * lifetime of each pipeline execution. When a pipeline execution concludes, it should clear the access pattern from the document and
+     * restore the previous pipeline's access pattern.
+     */
+    public void testNestedAccessPatternPropagation() {
+        Assume.assumeTrue(DataStream.LOGS_STREAM_FEATURE_FLAG);
+
+        Map<String, Object> source = new HashMap<>(Map.of("foo", 1));
+        IngestDocument document = new IngestDocument("index", "id", 1, null, null, source);
+
+        // 1-5 nested calls
+        doTestNestedAccessPatternPropagation(0, randomIntBetween(1, 5), document);
+
+        // At the end of the test, there should be neither pipeline ids nor access patterns left in the stack.
+        assertThat(document.getPipelineStack(), is(empty()));
+        assertThat(document.getCurrentAccessPattern(), is(nullValue()));
+    }
+
+    /**
+     * Recursively execute some number of pipelines at various call depths to simulate a robust chain of pipelines being called on a
+     * document.
+     * @param level The current call depth. This is how many pipelines deep into the nesting we are.
+     * @param maxCallDepth How much further in the call depth we should go in the test. If this is greater than the current level, we will
+     *                     recurse in at least one of the pipelines executed at this level.
+     *                     If the current level is equal to the max call depth we will run some pipelines but recurse no further before returning.
+     * @param document The document to repeatedly use and verify against.
+     */
+    void doTestNestedAccessPatternPropagation(int level, int maxCallDepth, IngestDocument document) {
+        // 1-7 pipelines to be run at any given level
+        logger.debug("LEVEL {}/{}: BEGIN", level, maxCallDepth);
+        int pipelinesAtThisLevel = randomIntBetween(1, 7);
+        logger.debug("Run pipelines: {}", pipelinesAtThisLevel);
+
+        boolean recursed = false;
+        if (level >= maxCallDepth) {
+            // If we're at max call depth, do no recursions
+            recursed = true;
+            logger.debug("No more recursions");
+        }
+
+        for (int pipelineIdx = 0; pipelineIdx < pipelinesAtThisLevel; pipelineIdx++) {
+            String expectedPipelineId = randomAlphaOfLength(20);
+            IngestPipelineFieldAccessPattern expectedAccessPattern = randomFrom(IngestPipelineFieldAccessPattern.values());
+
+            // We mock the pipeline because it's easier to verify calls and doesn't
+            // need us to force a stall in the execution logic to half apply it.
+            Pipeline mockPipeline = mock(Pipeline.class);
+            when(mockPipeline.getId()).thenReturn(expectedPipelineId);
+            when(mockPipeline.getProcessors()).thenReturn(List.of(new TestProcessor((doc) -> {})));
+            when(mockPipeline.getFieldAccessPattern()).thenReturn(expectedAccessPattern);
+            @SuppressWarnings("unchecked")
+            BiConsumer<IngestDocument, Exception> mockHandler = mock(BiConsumer.class);
+
+            // Execute pipeline
+            logger.debug("LEVEL {}/{}: Executing {}/{}", level, maxCallDepth, pipelineIdx, pipelinesAtThisLevel);
+            document.executePipeline(mockPipeline, mockHandler);
+
+            // Verify pipeline was called, capture completion handler
+            ArgumentCaptor<BiConsumer<IngestDocument, Exception>> argumentCaptor = ArgumentCaptor.captor();
+            verify(mockPipeline).execute(eq(document), argumentCaptor.capture());
+
+            // Assert expected state
+            assertThat(document.getPipelineStack().getFirst(), is(expectedPipelineId));
+            assertThat(document.getCurrentAccessPattern(), is(expectedAccessPattern));
+
+            // Randomly recurse: We recurse only one time per level to avoid hogging test time, but we randomize which
+            // pipeline to recurse on, eventually requiring a recursion on the last pipeline run if one hasn't happened yet.
+            if (recursed == false && (randomBoolean() || pipelineIdx == pipelinesAtThisLevel - 1)) {
+                logger.debug("Recursed on pipeline {}", pipelineIdx);
+                doTestNestedAccessPatternPropagation(level + 1, maxCallDepth, document);
+                recursed = true;
+            }
+
+            // Pull up the captured completion handler to conclude the pipeline run
+            argumentCaptor.getValue().accept(document, null);
+
+            // Assert expected state
+            assertThat(document.getPipelineStack().size(), is(equalTo(level)));
+            if (level == 0) {
+                // Top level means access pattern should be empty
+                assertThat(document.getCurrentAccessPattern(), is(nullValue()));
+            } else {
+                // If we're nested below the top level we should still have an access
+                // pattern on the document for the pipeline above us
+                assertThat(document.getCurrentAccessPattern(), is(not(nullValue())));
+            }
+        }
+        logger.debug("LEVEL {}/{}: COMPLETE", level, maxCallDepth);
+    }
 }
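
The bookkeeping introduced above boils down to a push/peek/poll discipline around each (possibly nested) pipeline execution: push the entering pipeline's access pattern, peek at it while that pipeline runs, and poll it when the pipeline completes so the enclosing pipeline's pattern is restored. The following is a minimal standalone sketch of that discipline only; the class name, the AccessPattern enum, and the synchronous run() helper are illustrative stand-ins rather than the actual Elasticsearch types (IngestDocument performs the poll from an asynchronous completion handler instead of a try/finally).

    import java.util.ArrayDeque;
    import java.util.Deque;

    class AccessPatternStackSketch {
        // Illustrative stand-in for the real access pattern enum.
        enum AccessPattern { CLASSIC, FLEXIBLE }

        private final Deque<AccessPattern> stack = new ArrayDeque<>();

        // Analogous to getCurrentAccessPattern(): peek() returns null when nothing is executing.
        AccessPattern current() {
            return stack.peek();
        }

        // Push on entry, poll on exit; whatever remains must be the enclosing pipeline's pattern.
        void run(AccessPattern pattern, Runnable body) {
            AccessPattern previous = stack.peek();
            stack.push(pattern);
            try {
                body.run();
            } finally {
                stack.poll();
                assert previous == stack.peek() : "expected [" + previous + "] but found [" + stack.peek() + "]";
            }
        }

        public static void main(String[] args) {
            AccessPatternStackSketch sketch = new AccessPatternStackSketch();
            sketch.run(AccessPattern.CLASSIC, () -> {
                // A nested pipeline with a different access pattern temporarily takes over...
                sketch.run(AccessPattern.FLEXIBLE, () -> System.out.println("inner: " + sketch.current()));
                // ...and the enclosing pattern is restored once it completes.
                System.out.println("outer restored: " + sketch.current()); // CLASSIC
            });
            System.out.println("after all pipelines: " + sketch.current()); // null
        }
    }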