[Streams] Propagate the ingest pipeline access pattern flag to the ingest document #130488

Open · wants to merge 2 commits into base: main
34 changes: 30 additions & 4 deletions server/src/main/java/org/elasticsearch/ingest/IngestDocument.java
@@ -27,12 +27,14 @@

import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
@@ -73,6 +75,13 @@ public final class IngestDocument {
// Contains all pipelines that have been executed for this document
private final Set<String> executedPipelines = new LinkedHashSet<>();

/**
* Maintains the stack of access patterns for each pipeline that this document is currently being processed by.
* When a pipeline with one access pattern calls another pipeline with a different one, we must ensure the access patterns
* are correctly restored when returning from a nested pipeline to an enclosing pipeline.
*/
private final Deque<IngestPipelineFieldAccessPattern> accessPatternStack = new ArrayDeque<>();

/**
* An ordered set of the values of the _index that have been used for this document.
* <p>
@@ -114,12 +123,13 @@ public IngestDocument(IngestDocument other) {
deepCopyMap(other.ingestMetadata)
);
/*
* The executedPipelines field is clearly execution-centric rather than data centric. Despite what the comment above says, we're
* copying it here anyway. THe reason is that this constructor is only called from two non-test locations, and both of those
* involve the simulate pipeline logic. The simulate pipeline logic needs this information. Rather than making the code more
* complicated, we're just copying this over here since it does no harm.
* The executedPipelines and accessPatternStack fields are clearly execution-centric rather than data centric.
* Despite what the comment above says, we're copying it here anyway. THe reason is that this constructor is only called from
Copilot AI (Jul 3, 2025):

There is a typo in the Javadoc: THe should be The.

Suggested change:
* Despite what the comment above says, we're copying it here anyway. THe reason is that this constructor is only called from
* Despite what the comment above says, we're copying it here anyway. The reason is that this constructor is only called from

* two non-test locations, and both of those involve the simulate pipeline logic. The simulate pipeline logic needs this
* information. Rather than making the code more complicated, we're just copying them over here since it does no harm.
*/
this.executedPipelines.addAll(other.executedPipelines);
this.accessPatternStack.addAll(other.accessPatternStack);
}
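The push/restore discipline described in the new Javadoc can be sketched outside Elasticsearch. A minimal sketch, assuming nothing about the real `IngestPipelineFieldAccessPattern` API: the enum values, class, and method names below are illustrative stand-ins.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class AccessPatternStackSketch {
    // Illustrative stand-in for IngestPipelineFieldAccessPattern
    enum AccessPattern { CLASSIC, FLEXIBLE }

    private final Deque<AccessPattern> accessPatternStack = new ArrayDeque<>();
    final List<AccessPattern> observed = new ArrayList<>();

    // Mirrors the executePipeline idea: push the pipeline's pattern on entry,
    // pop on completion so the enclosing pipeline's pattern is restored.
    void executePipeline(AccessPattern pattern, Runnable body) {
        accessPatternStack.push(pattern);
        try {
            observed.add(accessPatternStack.peek());
            body.run();
        } finally {
            accessPatternStack.pop();
            observed.add(accessPatternStack.peek()); // null once the stack is empty
        }
    }

    public static void main(String[] args) {
        AccessPatternStackSketch doc = new AccessPatternStackSketch();
        doc.executePipeline(AccessPattern.CLASSIC,
            () -> doc.executePipeline(AccessPattern.FLEXIBLE, () -> {}));
        // inner pipeline sees FLEXIBLE, returning restores CLASSIC, then empty
        System.out.println(doc.observed); // [CLASSIC, FLEXIBLE, CLASSIC, null]
    }
}
```

The `try/finally` here is a simplification; the real change pops inside an asynchronous completion handler, but the invariant is the same: leaving a nested pipeline must expose the enclosing pipeline's pattern again.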

/**
@@ -856,8 +866,17 @@ public void executePipeline(Pipeline pipeline, BiConsumer<IngestDocument, Except
);
} else if (executedPipelines.add(pipeline.getId())) {
Object previousPipeline = ingestMetadata.put("pipeline", pipeline.getId());
IngestPipelineFieldAccessPattern previousAccessPattern = accessPatternStack.peek();
accessPatternStack.push(pipeline.getFieldAccessPattern());
pipeline.execute(this, (result, e) -> {
executedPipelines.remove(pipeline.getId());
accessPatternStack.poll();
Copilot AI (Jul 3, 2025):

Using poll() ignores the returned element and silently handles an empty stack; consider using pop() or removeFirst() for clearer intent and to fail-fast on unexpected underflow.

Suggested change:
accessPatternStack.poll();
accessPatternStack.pop();
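For reference, the distinction the comment is drawing: on an empty `ArrayDeque`, `poll()` quietly returns null, while `pop()` throws `NoSuchElementException` and so surfaces an underflow immediately. A standalone illustration:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.NoSuchElementException;

public class PollVsPop {
    public static void main(String[] args) {
        Deque<String> stack = new ArrayDeque<>();
        // poll() handles the empty case silently by returning null
        System.out.println("poll on empty: " + stack.poll()); // poll on empty: null
        // pop() fails fast, which turns an unexpected underflow into a visible error
        try {
            stack.pop();
            System.out.println("pop on empty: no exception");
        } catch (NoSuchElementException e) {
            System.out.println("pop on empty: NoSuchElementException");
        }
    }
}
```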

assert previousAccessPattern == accessPatternStack.peek()
: "Cleared access pattern from nested pipeline and found inconsistent stack state. Expected ["
+ previousAccessPattern
+ "] but found ["
+ accessPatternStack.peek()
+ "]";
Copilot AI (Jul 3, 2025), commenting on lines +874 to +879:

Relying on a Java assert means this check is skipped unless assertions are enabled; consider throwing an explicit exception or using a proper validation mechanism to ensure consistency in all environments.

Suggested change:
assert previousAccessPattern == accessPatternStack.peek()
    : "Cleared access pattern from nested pipeline and found inconsistent stack state. Expected ["
        + previousAccessPattern
        + "] but found ["
        + accessPatternStack.peek()
        + "]";
if (previousAccessPattern != accessPatternStack.peek()) {
    throw new IllegalStateException(
        "Cleared access pattern from nested pipeline and found inconsistent stack state. Expected ["
            + previousAccessPattern
            + "] but found ["
            + accessPatternStack.peek()
            + "]"
    );
}
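Context for this suggestion: a Java `assert` statement is only executed when the JVM runs with assertions enabled (`-ea`); in a default JVM the whole statement is skipped. The classic side-effect idiom makes that observable, so this small check reports the current JVM's setting:

```java
public class AssertionsEnabledCheck {
    public static void main(String[] args) {
        boolean enabled = false;
        // The assignment inside the assert only runs when assertions are on,
        // because otherwise the entire assert statement is skipped.
        assert enabled = true;
        System.out.println("assertions enabled: " + enabled);
    }
}
```

Elasticsearch's test infrastructure runs with assertions enabled, which is why asserts like the one in this diff do fire in CI while costing nothing on a production node started without `-ea`.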

if (previousPipeline != null) {
ingestMetadata.put("pipeline", previousPipeline);
} else {
@@ -879,6 +898,13 @@ List<String> getPipelineStack() {
return pipelineStack;
}

/**
* @return The access pattern for any currently executing pipelines, or null if no pipelines are in progress for this doc
*/
public IngestPipelineFieldAccessPattern getCurrentAccessPattern() {
Member:

Is it intentional that this method isn't used above (and seems to only be used in the test)?

Member Author:

not used above yet. I'm currently using it in a couple places in the next set of changes I'm working on.

return accessPatternStack.peek();
}

/**
* Adds an index to the index history for this document, returning true if the index
* was added to the index history (i.e. if it wasn't already in the index history).
@@ -25,7 +25,7 @@
/**
* A pipeline is a list of {@link Processor} instances grouped under a unique id.
*/
public final class Pipeline {
public class Pipeline {

public static final String DESCRIPTION_KEY = "description";
public static final String PROCESSORS_KEY = "processors";
100 changes: 100 additions & 0 deletions server/src/test/java/org/elasticsearch/ingest/IngestDocumentTests.java
@@ -9,12 +9,15 @@

package org.elasticsearch.ingest;

import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xcontent.XContentType;
import org.hamcrest.Matchers;
import org.junit.Assume;
import org.junit.Before;
import org.mockito.ArgumentCaptor;

import java.time.Instant;
import java.time.ZoneId;
@@ -25,12 +28,14 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.stream.DoubleStream;

import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument;
import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;
@@ -40,6 +45,10 @@
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class IngestDocumentTests extends ESTestCase {

@@ -1245,4 +1254,95 @@ public void testSourceHashMapIsNotCopied() {
assertThat(document2.getCtxMap().getMetadata(), not(sameInstance(document1.getCtxMap().getMetadata())));
}
}

/**
* When executing nested pipelines on an ingest document, the document should keep track of each pipeline's access pattern for the
* lifetime of each pipeline execution. When a pipeline execution concludes, it should clear access pattern from the document and
* restore the previous pipeline's access pattern.
*/
public void testNestedAccessPatternPropagation() {
Assume.assumeTrue(DataStream.LOGS_STREAM_FEATURE_FLAG);

Map<String, Object> source = new HashMap<>(Map.of("foo", 1));
IngestDocument document = new IngestDocument("index", "id", 1, null, null, source);

// 1-5 levels of nested calls
doTestNestedAccessPatternPropagation(0, randomIntBetween(1, 5), document);

// At the end of the test, there should be neither pipeline ids nor access patterns left in the stack.
assertThat(document.getPipelineStack(), is(empty()));
assertThat(document.getCurrentAccessPattern(), is(nullValue()));
}

/**
* Recursively execute some number of pipelines at various call depths to simulate a robust chain of pipelines being called on a
* document.
* @param level The current call depth. This is how many pipelines deep into the nesting we are.
* @param maxCallDepth How much further in the call depth we should go in the test. If this is greater than the current level, we will
* recurse in at least one of the pipelines executed at this level. If the current level is equal to the max call
* depth we will run some pipelines but recurse no further before returning.
* @param document The document to repeatedly use and verify against.
*/
void doTestNestedAccessPatternPropagation(int level, int maxCallDepth, IngestDocument document) {
// 1-7 pipelines to be run at any given level
logger.debug("LEVEL {}/{}: BEGIN", level, maxCallDepth);
int pipelinesAtThisLevel = randomIntBetween(1, 7);
logger.debug("Run pipelines: {}", pipelinesAtThisLevel);

boolean recursed = false;
if (level >= maxCallDepth) {
// If we're at max call depth, do no recursions
recursed = true;
logger.debug("No more recursions");
}

for (int pipelineIdx = 0; pipelineIdx < pipelinesAtThisLevel; pipelineIdx++) {
String expectedPipelineId = randomAlphaOfLength(20);
IngestPipelineFieldAccessPattern expectedAccessPattern = randomFrom(IngestPipelineFieldAccessPattern.values());

// We mock the pipeline because it's easier to verify calls and doesn't
// need us to force a stall in the execution logic to half apply it.
Pipeline mockPipeline = mock(Pipeline.class);
when(mockPipeline.getId()).thenReturn(expectedPipelineId);
when(mockPipeline.getProcessors()).thenReturn(List.of(new TestProcessor((doc) -> {})));
when(mockPipeline.getFieldAccessPattern()).thenReturn(expectedAccessPattern);
@SuppressWarnings("unchecked")
BiConsumer<IngestDocument, Exception> mockHandler = mock(BiConsumer.class);

// Execute pipeline
logger.debug("LEVEL {}/{}: Executing {}/{}", level, maxCallDepth, pipelineIdx, pipelinesAtThisLevel);
document.executePipeline(mockPipeline, mockHandler);

// Verify pipeline was called, capture completion handler
ArgumentCaptor<BiConsumer<IngestDocument, Exception>> argumentCaptor = ArgumentCaptor.captor();
verify(mockPipeline).execute(eq(document), argumentCaptor.capture());

// Assert expected state
assertThat(document.getPipelineStack().getFirst(), is(expectedPipelineId));
assertThat(document.getCurrentAccessPattern(), is(expectedAccessPattern));

// Randomly recurse: We recurse only one time per level to avoid hogging test time, but we randomize which
// pipeline to recurse on, eventually requiring a recursion on the last pipeline run if one hasn't happened yet.
if (recursed == false && (randomBoolean() || pipelineIdx == pipelinesAtThisLevel - 1)) {
logger.debug("Recursed on pipeline {}", pipelineIdx);
doTestNestedAccessPatternPropagation(level + 1, maxCallDepth, document);
recursed = true;
}

// Pull up the captured completion handler to conclude the pipeline run
argumentCaptor.getValue().accept(document, null);

// Assert expected state
assertThat(document.getPipelineStack().size(), is(equalTo(level)));
if (level == 0) {
// Top level means access pattern should be empty
assertThat(document.getCurrentAccessPattern(), is(nullValue()));
} else {
// If we're nested below the top level we should still have an access
// pattern on the document for the pipeline above us
assertThat(document.getCurrentAccessPattern(), is(not(nullValue())));
}
}
logger.debug("LEVEL {}/{}: COMPLETE", level, maxCallDepth);
}
}
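The capture-and-complete pattern this test builds on Mockito's `ArgumentCaptor` can be shown without any mocking library: stash the completion handler that an asynchronous API receives, assert on intermediate state, then invoke the handler yourself to conclude the call. A minimal sketch with illustrative names:

```java
import java.util.function.BiConsumer;

public class CallbackCaptureSketch {
    // Stashes the completion handler instead of invoking it, so the caller
    // controls exactly when the "pipeline" completes. Capturing the BiConsumer
    // with an ArgumentCaptor against a mock Pipeline achieves the same thing.
    static BiConsumer<String, Exception> captured;

    static void executeAsync(String doc, BiConsumer<String, Exception> handler) {
        captured = handler;
    }

    public static void main(String[] args) {
        StringBuilder log = new StringBuilder();
        executeAsync("doc-1", (result, e) -> log.append("completed:").append(result));
        // Nothing has completed yet: the mid-flight state can be asserted here
        System.out.println("before completion: [" + log + "]"); // before completion: []
        // Conclude the run, like argumentCaptor.getValue().accept(document, null)
        captured.accept("doc-1", null);
        System.out.println("after completion: [" + log + "]"); // after completion: [completed:doc-1]
    }
}
```

This is why the test can recurse between `executePipeline` and the captured handler: the document's pipeline and access-pattern stacks stay populated until the handler is explicitly invoked.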