diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java index 681b167c828e1..11c1d3e2ea764 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java @@ -23,6 +23,8 @@ import org.elasticsearch.ingest.ConfigurationUtils; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Processor; +import org.elasticsearch.ingest.WrappingProcessor; +import org.elasticsearch.script.ScriptService; import java.util.ArrayList; import java.util.List; @@ -30,9 +32,7 @@ import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.BiConsumer; - -import org.elasticsearch.ingest.WrappingProcessor; -import org.elasticsearch.script.ScriptService; +import java.util.function.Consumer; import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException; import static org.elasticsearch.ingest.ConfigurationUtils.readBooleanProperty; @@ -50,16 +50,19 @@ public final class ForEachProcessor extends AbstractProcessor implements WrappingProcessor { public static final String TYPE = "foreach"; + static final int MAX_RECURSE_PER_THREAD = 10; private final String field; private final Processor processor; private final boolean ignoreMissing; + private final Consumer genericExecutor; - ForEachProcessor(String tag, String field, Processor processor, boolean ignoreMissing) { + ForEachProcessor(String tag, String field, Processor processor, boolean ignoreMissing, Consumer genericExecutor) { super(tag); this.field = field; this.processor = processor; this.ignoreMissing = ignoreMissing; + this.genericExecutor = genericExecutor; } boolean isIgnoreMissing() { @@ -91,6 +94,7 @@ void innerExecute(int index, List values, List newValues, IngestDocum Object value = values.get(index); Object previousValue = document.getIngestMetadata().put("_value", value); + final Thread thread = Thread.currentThread(); processor.execute(document, (result, e) -> { if (e != null) { newValues.add(document.getIngestMetadata().put("_value", previousValue)); @@ -99,7 +103,15 @@ void innerExecute(int index, List values, List newValues, IngestDocum handler.accept(null, null); } else { newValues.add(document.getIngestMetadata().put("_value", previousValue)); - innerExecute(index + 1, values, newValues, document, handler); + if (thread == Thread.currentThread() && (index + 1) % MAX_RECURSE_PER_THREAD == 0) { + // we are on the same thread and we need to fork to another thread to avoid recursive stack overflow on a single thread + // only fork after 10 recursive calls, then fork every 10 to keep the number of threads down + genericExecutor.accept(() -> innerExecute(index + 1, values, newValues, document, handler)); + } else { + // we are on a different thread (we went asynchronous), it's safe to recurse + // or we have recursed less then 10 times with the same thread, it's safe to recurse + innerExecute(index + 1, values, newValues, document, handler); + } } }); } @@ -125,9 +137,11 @@ public Processor getInnerProcessor() { public static final class Factory implements Processor.Factory { private final ScriptService scriptService; + private final Consumer genericExecutor; - Factory(ScriptService scriptService) { + Factory(ScriptService scriptService, Consumer genericExecutor) { this.scriptService = scriptService; + this.genericExecutor = genericExecutor; } @Override @@ -143,7 +157,7 @@ public ForEachProcessor create(Map factories, String Map.Entry> entry = entries.iterator().next(); Processor processor = ConfigurationUtils.readProcessor(factories, scriptService, entry.getKey(), entry.getValue()); - return new ForEachProcessor(tag, field, processor, ignoreMissing); + return new ForEachProcessor(tag, field, processor, ignoreMissing, genericExecutor); } } } diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java index b0fe00bdb3496..fbc2a6f97ae45 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java @@ -74,7 +74,7 @@ public Map getProcessors(Processor.Parameters paramet processors.put(ConvertProcessor.TYPE, new ConvertProcessor.Factory()); processors.put(GsubProcessor.TYPE, new GsubProcessor.Factory()); processors.put(FailProcessor.TYPE, new FailProcessor.Factory(parameters.scriptService)); - processors.put(ForEachProcessor.TYPE, new ForEachProcessor.Factory(parameters.scriptService)); + processors.put(ForEachProcessor.TYPE, new ForEachProcessor.Factory(parameters.scriptService, parameters.genericExecutor)); processors.put(DateIndexNameProcessor.TYPE, new DateIndexNameProcessor.Factory(parameters.scriptService)); processors.put(SortProcessor.TYPE, new SortProcessor.Factory()); processors.put(GrokProcessor.TYPE, new GrokProcessor.Factory(GROK_PATTERNS, createGrokThreadWatchdog(parameters))); diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java index 65fee8ce19a7d..19b3966573f7f 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java @@ -29,6 +29,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.function.Consumer; import static org.hamcrest.Matchers.equalTo; import static org.mockito.Mockito.mock; @@ -36,12 +37,13 @@ public class ForEachProcessorFactoryTests extends ESTestCase { private final ScriptService scriptService = mock(ScriptService.class); + private final Consumer genericExecutor = Runnable::run; public void testCreate() throws Exception { Processor processor = new TestProcessor(ingestDocument -> { }); Map registry = new HashMap<>(); registry.put("_name", (r, t, c) -> processor); - ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor); Map config = new HashMap<>(); config.put("field", "_field"); @@ -57,7 +59,7 @@ public void testSetIgnoreMissing() throws Exception { Processor processor = new TestProcessor(ingestDocument -> { }); Map registry = new HashMap<>(); registry.put("_name", (r, t, c) -> processor); - ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor); Map config = new HashMap<>(); config.put("field", "_field"); @@ -75,7 +77,7 @@ public void testCreateWithTooManyProcessorTypes() throws Exception { Map registry = new HashMap<>(); registry.put("_first", (r, t, c) -> processor); registry.put("_second", (r, t, c) -> processor); - ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor); Map config = new HashMap<>(); config.put("field", "_field"); @@ -88,7 +90,7 @@ public void testCreateWithTooManyProcessorTypes() throws Exception { } public void testCreateWithNonExistingProcessorType() throws Exception { - ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor); Map config = new HashMap<>(); config.put("field", "_field"); config.put("processor", Collections.singletonMap("_name", Collections.emptyMap())); @@ -101,7 +103,7 @@ public void testCreateWithMissingField() throws Exception { Processor processor = new TestProcessor(ingestDocument -> { }); Map registry = new HashMap<>(); registry.put("_name", (r, t, c) -> processor); - ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor); Map config = new HashMap<>(); config.put("processor", Collections.singletonList(Collections.singletonMap("_name", Collections.emptyMap()))); Exception exception = expectThrows(Exception.class, () -> forEachFactory.create(registry, null, config)); @@ -109,7 +111,7 @@ public void testCreateWithMissingField() throws Exception { } public void testCreateWithMissingProcessor() { - ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor); Map config = new HashMap<>(); config.put("field", "_field"); Exception exception = expectThrows(Exception.class, () -> forEachFactory.create(Collections.emptyMap(), null, config)); diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java index a4ee786315c03..66b6cb5c71fc2 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java @@ -19,6 +19,9 @@ package org.elasticsearch.ingest.common; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.ingest.CompoundProcessor; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Processor; @@ -26,6 +29,7 @@ import org.elasticsearch.ingest.TestTemplateService; import org.elasticsearch.script.TemplateScript; import org.elasticsearch.test.ESTestCase; +import org.junit.Before; import java.util.ArrayList; import java.util.Arrays; @@ -34,32 +38,98 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.stream.IntStream; import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument; import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; public class ForEachProcessorTests extends ESTestCase { + @SuppressWarnings("unchecked") + private Consumer genericExecutor = (Consumer) mock(Consumer.class); + private final ExecutorService direct = EsExecutors.newDirectExecutorService(); + + @Before + public void setup() { + //execute runnable on same thread for simplicity. some tests will override this and actually run async + doAnswer(invocationOnMock -> { + direct.execute((Runnable) invocationOnMock.getArguments()[0]); + return null; + }).when(genericExecutor).accept(any(Runnable.class)); + } + public void testExecute() throws Exception { + ThreadPoolExecutor asyncExecutor = + EsExecutors.newScaling(getClass().getName() + "/" + getTestName(), between(1, 2), between(3, 4), 10, TimeUnit.SECONDS, + EsExecutors.daemonThreadFactory("test"), new ThreadContext(Settings.EMPTY)); + doAnswer(invocationOnMock -> { + asyncExecutor.execute((Runnable) invocationOnMock.getArguments()[0]); + return null; + }).when(genericExecutor).accept(any(Runnable.class)); + List values = new ArrayList<>(); values.add("foo"); values.add("bar"); values.add("baz"); + IntStream.range(0, ForEachProcessor.MAX_RECURSE_PER_THREAD).forEach(value -> values.add("a")); IngestDocument ingestDocument = new IngestDocument( "_index", "_type", "_id", null, null, null, Collections.singletonMap("values", values) ); ForEachProcessor processor = new ForEachProcessor( "_tag", "values", new UppercaseProcessor("_tag", "_ingest._value", false, "_ingest._value"), - false + false, genericExecutor ); processor.execute(ingestDocument, (result, e) -> {}); + assertBusy(() -> assertEquals(values.size() / ForEachProcessor.MAX_RECURSE_PER_THREAD, asyncExecutor.getCompletedTaskCount())); + asyncExecutor.shutdown(); + asyncExecutor.awaitTermination(5, TimeUnit.SECONDS); + @SuppressWarnings("unchecked") List result = ingestDocument.getFieldValue("values", List.class); assertThat(result.get(0), equalTo("FOO")); assertThat(result.get(1), equalTo("BAR")); assertThat(result.get(2), equalTo("BAZ")); + IntStream.range(3, ForEachProcessor.MAX_RECURSE_PER_THREAD + 3).forEach(i -> assertThat(result.get(i), equalTo("A"))); + verify(genericExecutor, times(values.size() / ForEachProcessor.MAX_RECURSE_PER_THREAD)).accept(any(Runnable.class)); + } + + public void testExecuteWithAsyncProcessor() throws Exception { + List values = new ArrayList<>(); + values.add("foo"); + values.add("bar"); + values.add("baz"); + IngestDocument ingestDocument = new IngestDocument( + "_index", "_type", "_id", null, null, null, Collections.singletonMap("values", values) + ); + + ForEachProcessor processor = new ForEachProcessor("_tag", "values", new AsyncUpperCaseProcessor("_ingest._value"), + false, genericExecutor); + processor.execute(ingestDocument, (result, e) -> { + }); + + assertBusy(() -> { + @SuppressWarnings("unchecked") + List result = ingestDocument.getFieldValue("values", List.class); + assertEquals(values.size(), result.size()); + assertThat(result.get(0), equalTo("FOO")); + assertThat(result.get(1), equalTo("BAR")); + assertThat(result.get(2), equalTo("BAZ")); + }); + + verifyZeroInteractions(genericExecutor); } public void testExecuteWithFailure() throws Exception { @@ -72,7 +142,7 @@ public void testExecuteWithFailure() throws Exception { throw new RuntimeException("failure"); } }); - ForEachProcessor processor = new ForEachProcessor("_tag", "values", testProcessor, false); + ForEachProcessor processor = new ForEachProcessor("_tag", "values", testProcessor, false, genericExecutor); Exception[] exceptions = new Exception[1]; processor.execute(ingestDocument, (result, e) -> {exceptions[0] = e;}); assertThat(exceptions[0].getMessage(), equalTo("failure")); @@ -90,7 +160,7 @@ public void testExecuteWithFailure() throws Exception { Processor onFailureProcessor = new TestProcessor(ingestDocument1 -> {}); processor = new ForEachProcessor( "_tag", "values", new CompoundProcessor(false, Arrays.asList(testProcessor), Arrays.asList(onFailureProcessor)), - false + false, genericExecutor ); processor.execute(ingestDocument, (result, e) -> {}); assertThat(testProcessor.getInvokedCounter(), equalTo(3)); @@ -110,7 +180,7 @@ public void testMetaDataAvailable() throws Exception { id.setFieldValue("_ingest._value.type", id.getSourceAndMetadata().get("_type")); id.setFieldValue("_ingest._value.id", id.getSourceAndMetadata().get("_id")); }); - ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false); + ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false, genericExecutor); processor.execute(ingestDocument, (result, e) -> {}); assertThat(innerProcessor.getInvokedCounter(), equalTo(2)); @@ -138,7 +208,7 @@ public void testRestOfTheDocumentIsAvailable() throws Exception { ForEachProcessor processor = new ForEachProcessor( "_tag", "values", new SetProcessor("_tag", new TestTemplateService.MockTemplateScript.Factory("_ingest._value.new_field"), - (model) -> model.get("other")), false); + (model) -> model.get("other")), false, genericExecutor); processor.execute(ingestDocument, (result, e) -> {}); assertThat(ingestDocument.getFieldValue("values.0.new_field", String.class), equalTo("value")); @@ -149,6 +219,13 @@ public void testRestOfTheDocumentIsAvailable() throws Exception { } public void testRandom() throws Exception { + ThreadPoolExecutor asyncExecutor = + EsExecutors.newScaling(getClass().getName() + "/" + getTestName(), between(1, 2), between(3, 4), 10, TimeUnit.SECONDS, + EsExecutors.daemonThreadFactory("test"), new ThreadContext(Settings.EMPTY)); + doAnswer(invocationOnMock -> { + asyncExecutor.execute((Runnable) invocationOnMock.getArguments()[0]); + return null; + }).when(genericExecutor).accept(any(Runnable.class)); Processor innerProcessor = new Processor() { @Override public IngestDocument execute(IngestDocument ingestDocument) throws Exception { @@ -167,7 +244,7 @@ public String getTag() { return null; } }; - int numValues = randomIntBetween(1, 32); + int numValues = randomIntBetween(1, 10000); List values = new ArrayList<>(numValues); for (int i = 0; i < numValues; i++) { values.add(""); @@ -176,14 +253,20 @@ public String getTag() { "_index", "_type", "_id", null, null, null, Collections.singletonMap("values", values) ); - ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false); + ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false, genericExecutor); processor.execute(ingestDocument, (result, e) -> {}); + + assertBusy(() -> assertEquals(values.size() / ForEachProcessor.MAX_RECURSE_PER_THREAD, asyncExecutor.getCompletedTaskCount())); + asyncExecutor.shutdown(); + asyncExecutor.awaitTermination(5, TimeUnit.SECONDS); + @SuppressWarnings("unchecked") List result = ingestDocument.getFieldValue("values", List.class); assertThat(result.size(), equalTo(numValues)); for (String r : result) { assertThat(r, equalTo(".")); } + verify(genericExecutor, times(values.size() / ForEachProcessor.MAX_RECURSE_PER_THREAD)).accept(any(Runnable.class)); } public void testModifyFieldsOutsideArray() throws Exception { @@ -201,7 +284,7 @@ public void testModifyFieldsOutsideArray() throws Exception { "_tag", "values", new CompoundProcessor(false, Collections.singletonList(new UppercaseProcessor("_tag_upper", "_ingest._value", false, "_ingest._value")), Collections.singletonList(new AppendProcessor("_tag", template, (model) -> (Collections.singletonList("added")))) - ), false); + ), false, genericExecutor); processor.execute(ingestDocument, (result, e) -> {}); List result = ingestDocument.getFieldValue("values", List.class); @@ -227,7 +310,7 @@ public void testScalarValueAllowsUnderscoreValueFieldToRemainAccessible() throws TestProcessor processor = new TestProcessor(doc -> doc.setFieldValue("_ingest._value", doc.getFieldValue("_source._value", String.class))); - ForEachProcessor forEachProcessor = new ForEachProcessor("_tag", "values", processor, false); + ForEachProcessor forEachProcessor = new ForEachProcessor("_tag", "values", processor, false, genericExecutor); forEachProcessor.execute(ingestDocument, (result, e) -> {}); List result = ingestDocument.getFieldValue("values", List.class); @@ -260,7 +343,8 @@ public void testNestedForEach() throws Exception { doc -> doc.setFieldValue("_ingest._value", doc.getFieldValue("_ingest._value", String.class).toUpperCase(Locale.ENGLISH)) ); ForEachProcessor processor = new ForEachProcessor( - "_tag", "values1", new ForEachProcessor("_tag", "_ingest._value.values2", testProcessor, false), false); + "_tag", "values1", new ForEachProcessor("_tag", "_ingest._value.values2", testProcessor, false, genericExecutor), + false, genericExecutor); processor.execute(ingestDocument, (result, e) -> {}); List result = ingestDocument.getFieldValue("values1.0.values2", List.class); @@ -278,10 +362,47 @@ public void testIgnoreMissing() throws Exception { ); IngestDocument ingestDocument = new IngestDocument(originalIngestDocument); TestProcessor testProcessor = new TestProcessor(doc -> {}); - ForEachProcessor processor = new ForEachProcessor("_tag", "_ingest._value", testProcessor, true); + ForEachProcessor processor = new ForEachProcessor("_tag", "_ingest._value", testProcessor, true, genericExecutor); processor.execute(ingestDocument, (result, e) -> {}); assertIngestDocument(originalIngestDocument, ingestDocument); assertThat(testProcessor.getInvokedCounter(), equalTo(0)); } + private class AsyncUpperCaseProcessor implements Processor { + + private final String field; + + private AsyncUpperCaseProcessor(String field) { + this.field = field; + } + + @Override + public void execute(IngestDocument document, BiConsumer handler) { + new Thread(() -> { + try { + String value = document.getFieldValue(field, String.class, false); + document.setFieldValue(field, value.toUpperCase(Locale.ROOT)); + handler.accept(document, null); + } catch (Exception e) { + handler.accept(null, e); + } + }).start(); + } + + @Override + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + throw new UnsupportedOperationException("this is an async processor, don't call this"); + } + + @Override + public String getType() { + return "uppercase-async"; + } + + @Override + public String getTag() { + return getType(); + } + } + } diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/80_foreach.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/80_foreach.yml index ab2be3bf81c2d..7fbf182eac05f 100644 --- a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/80_foreach.yml +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/80_foreach.yml @@ -45,3 +45,22 @@ teardown: type: test id: 1 - match: { _source.values: ["FOO", "BAR", "BAZ"] } + + #exceeds the recurse max per thread and will runs some of these on a different thread + - do: + index: + index: test + id: 1 + pipeline: "my_pipeline" + body: > + { + "values": ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", "u", + "v", "w", "x", "y", "z"] + } + + - do: + get: + index: test + id: 1 + - match: { _source.values: ["A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "L", "M", "N", "O", "P", "Q", "R", "S", "T", "U", + "V", "W", "X", "Y", "Z"] } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 926cf15c72a56..18fbb906a2d10 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -102,9 +102,10 @@ public IngestService(ClusterService clusterService, ThreadPool threadPool, threadPool.getThreadContext(), threadPool::relativeTimeInMillis, (delay, command) -> threadPool.schedule( command, TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC - ), this, client + ), this, client, threadPool.generic()::execute ) ); + this.threadPool = threadPool; } diff --git a/server/src/main/java/org/elasticsearch/ingest/Processor.java b/server/src/main/java/org/elasticsearch/ingest/Processor.java index 029e80234e9e8..671670212ab0a 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Processor.java +++ b/server/src/main/java/org/elasticsearch/ingest/Processor.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.function.BiConsumer; import java.util.function.BiFunction; +import java.util.function.Consumer; import java.util.function.LongSupplier; /** @@ -122,6 +123,8 @@ class Parameters { public final IngestService ingestService; + public final Consumer genericExecutor; + /** * Provides scheduler support */ @@ -134,7 +137,7 @@ class Parameters { public Parameters(Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, ThreadContext threadContext, LongSupplier relativeTimeSupplier, BiFunction scheduler, - IngestService ingestService, Client client) { + IngestService ingestService, Client client, Consumer genericExecutor ) { this.env = env; this.scriptService = scriptService; this.threadContext = threadContext; @@ -143,6 +146,7 @@ public Parameters(Environment env, ScriptService scriptService, AnalysisRegistry this.scheduler = scheduler; this.ingestService = ingestService; this.client = client; + this.genericExecutor = genericExecutor; } } diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index 604428af9b185..fbfb5fb11899f 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -58,6 +58,7 @@ import org.elasticsearch.test.MockLogAppender; import org.elasticsearch.threadpool.ThreadPool; import org.hamcrest.CustomTypeSafeMatcher; +import org.junit.Before; import org.mockito.ArgumentMatcher; import org.mockito.invocation.InvocationOnMock; @@ -107,10 +108,18 @@ public Map getProcessors(Processor.Parameters paramet } }; + private ThreadPool threadPool; + + @Before + public void setup(){ + threadPool = mock(ThreadPool.class); + ExecutorService executorService = EsExecutors.newDirectExecutorService(); + when(threadPool.generic()).thenReturn(executorService); + when(threadPool.executor(anyString())).thenReturn(executorService); + } public void testIngestPlugin() { - ThreadPool tp = mock(ThreadPool.class); Client client = mock(Client.class); - IngestService ingestService = new IngestService(mock(ClusterService.class), tp, null, null, + IngestService ingestService = new IngestService(mock(ClusterService.class), threadPool, null, null, null, Collections.singletonList(DUMMY_PLUGIN), client); Map factories = ingestService.getProcessorFactories(); assertTrue(factories.containsKey("foo")); @@ -118,19 +127,15 @@ public void testIngestPlugin() { } public void testIngestPluginDuplicate() { - ThreadPool tp = mock(ThreadPool.class); Client client = mock(Client.class); IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> - new IngestService(mock(ClusterService.class), tp, null, null, + new IngestService(mock(ClusterService.class), threadPool, null, null, null, Arrays.asList(DUMMY_PLUGIN, DUMMY_PLUGIN), client)); assertTrue(e.getMessage(), e.getMessage().contains("already registered")); } public void testExecuteIndexPipelineDoesNotExist() { - ThreadPool threadPool = mock(ThreadPool.class); Client client = mock(Client.class); - final ExecutorService executorService = EsExecutors.newDirectExecutorService(); - when(threadPool.executor(anyString())).thenReturn(executorService); IngestService ingestService = new IngestService(mock(ClusterService.class), threadPool, null, null, null, Collections.singletonList(DUMMY_PLUGIN), client); final IndexRequest indexRequest = @@ -1209,10 +1214,9 @@ public Map getProcessors(Processor.Parameters paramet }; // Create ingest service: - ThreadPool tp = mock(ThreadPool.class); Client client = mock(Client.class); IngestService ingestService = - new IngestService(mock(ClusterService.class), tp, null, null, null, Arrays.asList(testPlugin), client); + new IngestService(mock(ClusterService.class), threadPool, null, null, null, Arrays.asList(testPlugin), client); ingestService.addIngestClusterStateListener(ingestClusterStateListener); // Create pipeline and apply the resulting cluster state, which should update the counter in the right order: @@ -1251,9 +1255,11 @@ private static IngestService createWithProcessors() { } private static IngestService createWithProcessors(Map processors) { - ThreadPool threadPool = mock(ThreadPool.class); + Client client = mock(Client.class); - final ExecutorService executorService = EsExecutors.newDirectExecutorService(); + ThreadPool threadPool = mock(ThreadPool.class); + ExecutorService executorService = EsExecutors.newDirectExecutorService(); + when(threadPool.generic()).thenReturn(executorService); when(threadPool.executor(anyString())).thenReturn(executorService); return new IngestService(mock(ClusterService.class), threadPool, null, null, null, Collections.singletonList(new IngestPlugin() {