Skip to content

Commit

Permalink
[7.5] Foreach processor - fork recursive call (#50514) (#50773) (#50816)
Browse files Browse the repository at this point in the history
A very large number of recursive calls can cause a stack overflow
exception. This commit forks the recursive calls for non-async
processors. Once forked, each thread will handle at most 10
recursive calls to help keep the stack size and thread count
down to a reasonable size.
  • Loading branch information
jakelandis authored Jan 9, 2020
1 parent c3fa878 commit 72a2d84
Show file tree
Hide file tree
Showing 8 changed files with 205 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,16 @@
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.WrappingProcessor;
import org.elasticsearch.script.ScriptService;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;

import org.elasticsearch.ingest.WrappingProcessor;
import org.elasticsearch.script.ScriptService;
import java.util.function.Consumer;

import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
import static org.elasticsearch.ingest.ConfigurationUtils.readBooleanProperty;
Expand All @@ -50,16 +50,19 @@
public final class ForEachProcessor extends AbstractProcessor implements WrappingProcessor {

public static final String TYPE = "foreach";
static final int MAX_RECURSE_PER_THREAD = 10;

private final String field;
private final Processor processor;
private final boolean ignoreMissing;
private final Consumer<Runnable> genericExecutor;

ForEachProcessor(String tag, String field, Processor processor, boolean ignoreMissing) {
ForEachProcessor(String tag, String field, Processor processor, boolean ignoreMissing, Consumer<Runnable> genericExecutor) {
super(tag);
this.field = field;
this.processor = processor;
this.ignoreMissing = ignoreMissing;
this.genericExecutor = genericExecutor;
}

boolean isIgnoreMissing() {
Expand Down Expand Up @@ -91,6 +94,7 @@ void innerExecute(int index, List<?> values, List<Object> newValues, IngestDocum

Object value = values.get(index);
Object previousValue = document.getIngestMetadata().put("_value", value);
final Thread thread = Thread.currentThread();
processor.execute(document, (result, e) -> {
if (e != null) {
newValues.add(document.getIngestMetadata().put("_value", previousValue));
Expand All @@ -99,7 +103,15 @@ void innerExecute(int index, List<?> values, List<Object> newValues, IngestDocum
handler.accept(null, null);
} else {
newValues.add(document.getIngestMetadata().put("_value", previousValue));
innerExecute(index + 1, values, newValues, document, handler);
if (thread == Thread.currentThread() && (index + 1) % MAX_RECURSE_PER_THREAD == 0) {
// we are on the same thread and we need to fork to another thread to avoid recursive stack overflow on a single thread
// only fork after 10 recursive calls, then fork every 10 to keep the number of threads down
genericExecutor.accept(() -> innerExecute(index + 1, values, newValues, document, handler));
} else {
// we are on a different thread (we went asynchronous), it's safe to recurse
// or we have recursed less then 10 times with the same thread, it's safe to recurse
innerExecute(index + 1, values, newValues, document, handler);
}
}
});
}
Expand All @@ -125,9 +137,11 @@ public Processor getInnerProcessor() {
public static final class Factory implements Processor.Factory {

private final ScriptService scriptService;
private final Consumer<Runnable> genericExecutor;

Factory(ScriptService scriptService) {
Factory(ScriptService scriptService, Consumer<Runnable> genericExecutor) {
this.scriptService = scriptService;
this.genericExecutor = genericExecutor;
}

@Override
Expand All @@ -143,7 +157,7 @@ public ForEachProcessor create(Map<String, Processor.Factory> factories, String
Map.Entry<String, Map<String, Object>> entry = entries.iterator().next();
Processor processor =
ConfigurationUtils.readProcessor(factories, scriptService, entry.getKey(), entry.getValue());
return new ForEachProcessor(tag, field, processor, ignoreMissing);
return new ForEachProcessor(tag, field, processor, ignoreMissing, genericExecutor);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
processors.put(ConvertProcessor.TYPE, new ConvertProcessor.Factory());
processors.put(GsubProcessor.TYPE, new GsubProcessor.Factory());
processors.put(FailProcessor.TYPE, new FailProcessor.Factory(parameters.scriptService));
processors.put(ForEachProcessor.TYPE, new ForEachProcessor.Factory(parameters.scriptService));
processors.put(ForEachProcessor.TYPE, new ForEachProcessor.Factory(parameters.scriptService, parameters.genericExecutor));
processors.put(DateIndexNameProcessor.TYPE, new DateIndexNameProcessor.Factory(parameters.scriptService));
processors.put(SortProcessor.TYPE, new SortProcessor.Factory());
processors.put(GrokProcessor.TYPE, new GrokProcessor.Factory(GROK_PATTERNS, createGrokThreadWatchdog(parameters)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,21 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;

public class ForEachProcessorFactoryTests extends ESTestCase {

private final ScriptService scriptService = mock(ScriptService.class);
private final Consumer<Runnable> genericExecutor = Runnable::run;

public void testCreate() throws Exception {
Processor processor = new TestProcessor(ingestDocument -> { });
Map<String, Processor.Factory> registry = new HashMap<>();
registry.put("_name", (r, t, c) -> processor);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor);

Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
Expand All @@ -57,7 +59,7 @@ public void testSetIgnoreMissing() throws Exception {
Processor processor = new TestProcessor(ingestDocument -> { });
Map<String, Processor.Factory> registry = new HashMap<>();
registry.put("_name", (r, t, c) -> processor);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor);

Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
Expand All @@ -75,7 +77,7 @@ public void testCreateWithTooManyProcessorTypes() throws Exception {
Map<String, Processor.Factory> registry = new HashMap<>();
registry.put("_first", (r, t, c) -> processor);
registry.put("_second", (r, t, c) -> processor);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor);

Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
Expand All @@ -88,7 +90,7 @@ public void testCreateWithTooManyProcessorTypes() throws Exception {
}

public void testCreateWithNonExistingProcessorType() throws Exception {
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor);
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("processor", Collections.singletonMap("_name", Collections.emptyMap()));
Expand All @@ -101,15 +103,15 @@ public void testCreateWithMissingField() throws Exception {
Processor processor = new TestProcessor(ingestDocument -> { });
Map<String, Processor.Factory> registry = new HashMap<>();
registry.put("_name", (r, t, c) -> processor);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor);
Map<String, Object> config = new HashMap<>();
config.put("processor", Collections.singletonList(Collections.singletonMap("_name", Collections.emptyMap())));
Exception exception = expectThrows(Exception.class, () -> forEachFactory.create(registry, null, config));
assertThat(exception.getMessage(), equalTo("[field] required property is missing"));
}

public void testCreateWithMissingProcessor() {
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor);
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
Exception exception = expectThrows(Exception.class, () -> forEachFactory.create(Collections.emptyMap(), null, config));
Expand Down
Loading

0 comments on commit 72a2d84

Please sign in to comment.