diff --git a/core/src/main/java/org/jsmart/zerocode/parallel/ExecutorServiceRunner.java b/core/src/main/java/org/jsmart/zerocode/parallel/ExecutorServiceRunner.java index 119a8d360..b6c6d5f10 100644 --- a/core/src/main/java/org/jsmart/zerocode/parallel/ExecutorServiceRunner.java +++ b/core/src/main/java/org/jsmart/zerocode/parallel/ExecutorServiceRunner.java @@ -11,6 +11,9 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; @@ -18,6 +21,7 @@ import static java.lang.Integer.parseInt; import static java.lang.Thread.sleep; import static java.time.LocalDateTime.now; +import static java.util.concurrent.Executors.newSingleThreadExecutor; import static java.util.concurrent.Executors.newFixedThreadPool; public class ExecutorServiceRunner { @@ -29,6 +33,7 @@ public class ExecutorServiceRunner { private int numberOfThreads; private int rampUpPeriod; private int loopCount; + private int abortAfterTimeLapsedInSeconds; private Double delayBetweenTwoThreadsInMilliSecs; @@ -37,6 +42,7 @@ public ExecutorServiceRunner(String loadPropertiesFile) { numberOfThreads = parseInt(properties.getProperty("number.of.threads")); rampUpPeriod = parseInt(properties.getProperty("ramp.up.period.in.seconds")); loopCount = parseInt(properties.getProperty("loop.count")); + abortAfterTimeLapsedInSeconds = parseInt(properties.getProperty("abort.after.time.lapsed.in.seconds")); calculateAndSetDelayBetweenTwoThreadsInSecs(rampUpPeriod); @@ -47,6 +53,17 @@ public ExecutorServiceRunner(int numberOfThreads, int loopCount, int rampUpPerio this.numberOfThreads = numberOfThreads; this.loopCount = loopCount; this.rampUpPeriod = rampUpPeriod; + this.abortAfterTimeLapsedInSeconds = Integer.MAX_VALUE; + + calculateAndSetDelayBetweenTwoThreadsInSecs(this.rampUpPeriod); + logLoadingProperties(); + } + + public ExecutorServiceRunner(int numberOfThreads, int loopCount, int rampUpPeriod, int abortAfterTimeLapsedInSeconds) { + this.numberOfThreads = numberOfThreads; + this.loopCount = loopCount; + this.rampUpPeriod = rampUpPeriod; + this.abortAfterTimeLapsedInSeconds = abortAfterTimeLapsedInSeconds; calculateAndSetDelayBetweenTwoThreadsInSecs(this.rampUpPeriod); logLoadingProperties(); @@ -64,16 +81,60 @@ public ExecutorServiceRunner addCallable(Callable callable) { public void runRunnables() { + executeWithAbortTimeout(() -> { + if (runnables == null || runnables.size() == 0) { + throw new RuntimeException("No runnable(s) was found to run. You can add one or more runnables using 'addRunnable(Runnable runnable)'"); + } - if (runnables == null || runnables.size() == 0) { - throw new RuntimeException("No runnable(s) was found to run. You can add one or more runnables using 'addRunnable(Runnable runnable)'"); - } + ExecutorService executorService = newFixedThreadPool(numberOfThreads); - ExecutorService executorService = newFixedThreadPool(numberOfThreads); + try { + for (int i = 0; i < loopCount; i++) { + runnables.stream().forEach(thisFunction -> { + for (int j = 0; j < numberOfThreads; j++) { + try { + LOGGER.debug("Waiting for the next test flight to adjust the overall ramp up time, " + + "waiting time in the transit now = " + delayBetweenTwoThreadsInMilliSecs); + sleep(delayBetweenTwoThreadsInMilliSecs.longValue()); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } - try { - for (int i = 0; i < loopCount; i++) { - runnables.stream().forEach(thisFunction -> { + LOGGER.debug(Thread.currentThread().getName() + " Executor - *Start... Time = " + now()); + + executorService.execute(thisFunction); + + LOGGER.debug(Thread.currentThread().getName() + " Executor - *Finished Time = " + now()); + } + }); + } + } catch (Exception interruptEx) { + throw new RuntimeException(interruptEx); + } finally { + executorService.shutdown(); + while (!executorService.isTerminated()) { + // -------------------------------------- + // wait for all tasks to finish execution + // -------------------------------------- + //LOGGER.info("Still waiting for all threads to complete execution..."); + } + LOGGER.debug("**Finished executing all threads**"); + } + }); + } + + public void runRunnablesMulti() { + executeWithAbortTimeout(() -> { + if (runnables == null || runnables.size() == 0) { + throw new RuntimeException("No runnable(s) was found to run. You can add one or more runnables using 'addRunnable(Runnable runnable)'"); + } + + ExecutorService executorService = newFixedThreadPool(numberOfThreads); + + try { + final AtomicInteger functionIndex = new AtomicInteger(); + + for (int i = 0; i < loopCount; i++) { for (int j = 0; j < numberOfThreads; j++) { try { LOGGER.debug("Waiting for the next test flight to adjust the overall ramp up time, " + @@ -85,70 +146,29 @@ public void runRunnables() { LOGGER.debug(Thread.currentThread().getName() + " Executor - *Start... Time = " + now()); - executorService.execute(thisFunction); + executorService.execute(runnables.get(functionIndex.getAndIncrement())); LOGGER.debug(Thread.currentThread().getName() + " Executor - *Finished Time = " + now()); - } - }); - } - } catch (Exception interruptEx) { - throw new RuntimeException(interruptEx); - } finally { - executorService.shutdown(); - while (!executorService.isTerminated()) { - // -------------------------------------- - // wait for all tasks to finish execution - // -------------------------------------- - //LOGGER.info("Still waiting for all threads to complete execution..."); - } - LOGGER.debug("**Finished executing all threads**"); - } - } - - public void runRunnablesMulti() { - if (runnables == null || runnables.size() == 0) { - throw new RuntimeException("No runnable(s) was found to run. You can add one or more runnables using 'addRunnable(Runnable runnable)'"); - } - - ExecutorService executorService = newFixedThreadPool(numberOfThreads); - try { - final AtomicInteger functionIndex = new AtomicInteger(); - - for (int i = 0; i < loopCount; i++) { - for (int j = 0; j < numberOfThreads; j++) { - try { - LOGGER.debug("Waiting for the next test flight to adjust the overall ramp up time, " + - "waiting time in the transit now = " + delayBetweenTwoThreadsInMilliSecs); - sleep(delayBetweenTwoThreadsInMilliSecs.longValue()); - } catch (InterruptedException e) { - throw new RuntimeException(e); + if (functionIndex.get() == runnables.size()) { + functionIndex.set(0); + } } - LOGGER.debug(Thread.currentThread().getName() + " Executor - *Start... Time = " + now()); - - executorService.execute(runnables.get(functionIndex.getAndIncrement())); - - LOGGER.debug(Thread.currentThread().getName() + " Executor - *Finished Time = " + now()); - - if(functionIndex.get() == runnables.size()){ - functionIndex.set(0); - } } - - } - } catch (Exception interruptEx) { - throw new RuntimeException(interruptEx); - } finally { - executorService.shutdown(); - while (!executorService.isTerminated()) { - // -------------------------------------- - // wait for all tasks to finish execution - // -------------------------------------- - //LOGGER.info("Still waiting for all threads to complete execution..."); + } catch (Exception interruptEx) { + throw new RuntimeException(interruptEx); + } finally { + executorService.shutdown(); + while (!executorService.isTerminated()) { + // -------------------------------------- + // wait for all tasks to finish execution + // -------------------------------------- + //LOGGER.info("Still waiting for all threads to complete execution..."); + } + LOGGER.warn("** Completed executing all virtual-user scenarios! **"); } - LOGGER.warn("** Completed executing all virtual-user scenarios! **"); - } + }); } public void runCallables() { @@ -156,42 +176,42 @@ public void runCallables() { } public void runCallableFutures() { + executeWithAbortTimeout(() -> { - if (callables == null || callables.size() == 0) { - throw new RuntimeException("No callable(s) was found to run. You can add one or more callables using 'addCallable(Callable callable)'"); - } + if (callables == null || callables.size() == 0) { + throw new RuntimeException("No callable(s) was found to run. You can add one or more callables using 'addCallable(Callable callable)'"); + } - ExecutorService executorService = newFixedThreadPool(numberOfThreads); + ExecutorService executorService = newFixedThreadPool(numberOfThreads); - try { - executorService.invokeAll(callables).stream().forEach(future -> { - for (int j = 0; j < numberOfThreads; j++) { - try { - LOGGER.debug("Waiting in the transit for next test flight to adjust overall ramp up time, wait time now = " + delayBetweenTwoThreadsInMilliSecs); - sleep(delayBetweenTwoThreadsInMilliSecs.longValue()); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + try { + executorService.invokeAll(callables).stream().forEach(future -> { + for (int j = 0; j < numberOfThreads; j++) { + try { + LOGGER.debug("Waiting in the transit for next test flight to adjust overall ramp up time, wait time now = " + delayBetweenTwoThreadsInMilliSecs); + sleep(delayBetweenTwoThreadsInMilliSecs.longValue()); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } - LOGGER.debug(Thread.currentThread().getName() + " Future execution- Start.... Time = " + now()); + LOGGER.debug(Thread.currentThread().getName() + " Future execution- Start.... Time = " + now()); - execute(future); + execute(future); - LOGGER.debug(Thread.currentThread().getName() + " Future execution- *Finished Time = " + now()); + LOGGER.debug(Thread.currentThread().getName() + " Future execution- *Finished Time = " + now()); + } + }); + } catch (InterruptedException interruptEx) { + throw new RuntimeException(interruptEx); + } finally { + executorService.shutdown(); + while (!executorService.isTerminated()) { + // wait for all tasks to finish executing + // LOGGER.info("Still waiting for all threads to complete execution..."); } - }); - } catch (InterruptedException interruptEx) { - throw new RuntimeException(interruptEx); - } finally { - executorService.shutdown(); - while (!executorService.isTerminated()) { - // wait for all tasks to finish executing - // LOGGER.info("Still waiting for all threads to complete execution..."); + LOGGER.warn("* Completed executing all virtual-user scenarios! *"); } - LOGGER.warn("* Completed executing all virtual-user scenarios! *"); - } - - + }); } public Callable createCallableFuture(T objectToConsumer, Consumer consumer) { @@ -210,6 +230,24 @@ private Object execute(Future future) { } } + private void executeWithAbortTimeout(Runnable runnable) { + ExecutorService executorService = newSingleThreadExecutor(); + Future future = executorService.submit(runnable); + try { + future.get(abortAfterTimeLapsedInSeconds, TimeUnit.SECONDS); + } catch (TimeoutException timeoutEx) { + future.cancel(true); + throw new RuntimeException(timeoutEx); + } catch (InterruptedException interruptEx) { + Thread.currentThread().interrupt(); + future.cancel(true); + throw new RuntimeException(interruptEx); + } catch (ExecutionException executionEx) { + throw new RuntimeException(executionEx); + } finally { + executorService.shutdownNow(); + } + } private void calculateAndSetDelayBetweenTwoThreadsInSecs(int rampUpPeriod) { if (rampUpPeriod == 0) { @@ -242,6 +280,7 @@ private void logLoadingProperties() { "\n ### numberOfThreads : " + numberOfThreads + "\n ### rampUpPeriodInSeconds : " + rampUpPeriod + "\n ### loopCount : " + loopCount + + "\n ### abortAfterTimeLapsedInSeconds : " + abortAfterTimeLapsedInSeconds + "\n-----------------------------------\n"); } diff --git a/core/src/test/java/org/jsmart/zerocode/parallel/simple/LoadTest.java b/core/src/test/java/org/jsmart/zerocode/parallel/simple/LoadTest.java index 0958c24a2..9f36cc6fd 100644 --- a/core/src/test/java/org/jsmart/zerocode/parallel/simple/LoadTest.java +++ b/core/src/test/java/org/jsmart/zerocode/parallel/simple/LoadTest.java @@ -10,7 +10,9 @@ import java.util.concurrent.atomic.AtomicInteger; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertThrows; import static org.hamcrest.core.Is.is; +import static org.hamcrest.CoreMatchers.equalTo; public class LoadTest { @@ -76,5 +78,37 @@ public void testLoad_Fail() { assertThat(passedCounter.get(), is(0)); } + @Test + public void testLoad_Timeout() { + ExecutorServiceRunner executorServiceRunner = new ExecutorServiceRunner(3, 3, 6, 3); + + final AtomicInteger passedCounter = new AtomicInteger(); + final AtomicInteger failedCounter = new AtomicInteger(); + + Runnable taskSampleTest = () -> { + System.out.println(Thread.currentThread().getName() + " JunitTestSample test- Start. Time = " + LocalDateTime.now()); + + Result result = (new JUnitCore()).run(Request.method(JunitTestSample.class, "testFirstName")); + + System.out.println(Thread.currentThread().getName() + " JunitTestSample test- *Finished Time, result = " + LocalDateTime.now() + " -" + result.wasSuccessful()); + + if(result.wasSuccessful()){ + passedCounter.incrementAndGet(); + } else { + failedCounter.incrementAndGet(); + } + }; + + executorServiceRunner.addRunnable(taskSampleTest); + + RuntimeException e = assertThrows(RuntimeException.class, executorServiceRunner::runRunnables); + assertThat(e.getMessage(), equalTo("java.util.concurrent.TimeoutException")); + + System.out.println(">>> passed count:" + passedCounter.get()); + System.out.println(">>> failed count:" + failedCounter.get()); + System.out.println(">>> Total test count:" + (failedCounter.get() + passedCounter.get())); + + assertThat(failedCounter.get(), is(0)); + } }