diff --git a/README.md b/README.md index 4314260..ae0cf7a 100644 --- a/README.md +++ b/README.md @@ -1 +1,7 @@ -# java_homeworks \ No newline at end of file +### Запуск jcstress + +``` +mvn clean install + +java -jar target/jcstress.jar +``` diff --git a/simpleThreadPool/pom.xml b/simpleThreadPool/pom.xml new file mode 100644 index 0000000..02392e1 --- /dev/null +++ b/simpleThreadPool/pom.xml @@ -0,0 +1,127 @@ + + + + 4.0.0 + + 1 + simpleThreadPool + 0.0.1-SNAPSHOT + jar + + JCStress test sample + + + + + 3.0 + + + + + org.openjdk.jcstress + jcstress-core + ${jcstress.version} + + + junit + junit + 4.12 + test + + + + + UTF-8 + + + 0.4 + + + 1.8 + + + jcstress + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.1 + + ${javac.target} + ${javac.target} + ${javac.target} + + + + + org.apache.maven.plugins + maven-shade-plugin + 2.2 + + + main + package + + shade + + + ${uberjar.name} + + + org.openjdk.jcstress.Main + + + META-INF/TestList + + + + + + + + + + diff --git a/simpleThreadPool/src/main/java/simpleThreadPool/LightExecutionException.java b/simpleThreadPool/src/main/java/simpleThreadPool/LightExecutionException.java new file mode 100644 index 0000000..d30e7f4 --- /dev/null +++ b/simpleThreadPool/src/main/java/simpleThreadPool/LightExecutionException.java @@ -0,0 +1,3 @@ +package simpleThreadPool; + +public class LightExecutionException extends Exception {} diff --git a/simpleThreadPool/src/main/java/simpleThreadPool/LightFuture.java b/simpleThreadPool/src/main/java/simpleThreadPool/LightFuture.java new file mode 100644 index 0000000..b434d20 --- /dev/null +++ b/simpleThreadPool/src/main/java/simpleThreadPool/LightFuture.java @@ -0,0 +1,12 @@ +package simpleThreadPool; + +import java.util.function.Function; + +public interface LightFuture { + + public boolean isReady(); + + public R get() throws LightExecutionException, InterruptedException; + + public LightFuture thenApply(Function nextFunc); +} diff --git a/simpleThreadPool/src/main/java/simpleThreadPool/SubmitGetTest.java b/simpleThreadPool/src/main/java/simpleThreadPool/SubmitGetTest.java new file mode 100644 index 0000000..a6bb41c --- /dev/null +++ b/simpleThreadPool/src/main/java/simpleThreadPool/SubmitGetTest.java @@ -0,0 +1,48 @@ +package simpleThreadPool; + +import java.util.Random; + +import org.openjdk.jcstress.annotations.*; +import org.openjdk.jcstress.infra.results.II_Result; +import org.openjdk.jcstress.infra.results.I_Result; + +@JCStressTest +@Outcome(id = "0, 0", expect = Expect.ACCEPTABLE, desc = "Good outcome.") +@State +public class SubmitGetTest { + + static ThreadPool pool = new ThreadPoolImpl(2); + Random rand = new Random(10); + + void filler(II_Result r) { + Integer rnd = rand.nextInt(); + LightFuture f1 = pool.submit(() -> rnd); + LightFuture f2 = f1.thenApply((i) -> i + 1); + try { + r.r1 = rnd - f1.get(); + r.r2 = rnd + 1 - f2.get(); + } catch (LightExecutionException | InterruptedException e) { + int i = 1/0; + } + } + + @Actor + public void actor1(II_Result r) { + filler(r); + } + + @Actor + public void actor2(II_Result r) { + filler(r); + } + + @Actor + public void actor3(II_Result r) { + filler(r); + } + + @Actor + public void actor4(II_Result r) { + filler(r); + } +} diff --git a/simpleThreadPool/src/main/java/simpleThreadPool/SupplierFuturePair.java b/simpleThreadPool/src/main/java/simpleThreadPool/SupplierFuturePair.java new file mode 100644 index 0000000..8183e78 --- /dev/null +++ b/simpleThreadPool/src/main/java/simpleThreadPool/SupplierFuturePair.java @@ -0,0 +1,15 @@ +package simpleThreadPool; + +import java.util.function.Supplier; + +import simpleThreadPool.Worker.LightFutureImpl; + +public class SupplierFuturePair { + Supplier supplier; + LightFutureImpl future; + + public SupplierFuturePair(Supplier suppl, LightFutureImpl fut) { + supplier = suppl; + future = fut; + } +} diff --git a/simpleThreadPool/src/main/java/simpleThreadPool/ThreadPool.java b/simpleThreadPool/src/main/java/simpleThreadPool/ThreadPool.java new file mode 100644 index 0000000..50ce89d --- /dev/null +++ b/simpleThreadPool/src/main/java/simpleThreadPool/ThreadPool.java @@ -0,0 +1,9 @@ +package simpleThreadPool; + +import java.util.function.Supplier; + +public interface ThreadPool { + public LightFuture submit(Supplier supp); + + public void shutdown(); +} diff --git a/simpleThreadPool/src/main/java/simpleThreadPool/ThreadPoolImpl.java b/simpleThreadPool/src/main/java/simpleThreadPool/ThreadPoolImpl.java new file mode 100644 index 0000000..28676b1 --- /dev/null +++ b/simpleThreadPool/src/main/java/simpleThreadPool/ThreadPoolImpl.java @@ -0,0 +1,45 @@ +package simpleThreadPool; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.function.Supplier; + +import simpleThreadPool.Worker.LightFutureImpl; + +public class ThreadPoolImpl implements ThreadPool{ + + private Queue tasks = new LinkedList<>(); + + private List pool = new ArrayList<>(); + + public ThreadPoolImpl(int nthreads) { + for (int i = 0; i < nthreads; i++) { + Thread t = new Thread(new Worker(tasks)); + t.setDaemon(true); + pool.add(t); + t.start(); + } + } + + @Override + public LightFuture submit(Supplier supp) { + SupplierFuturePair pair = new SupplierFuturePair(supp, + new LightFutureImpl(this)); + + synchronized (tasks) { + tasks.add(pair); + tasks.notify(); + } + + return pair.future; + } + + @Override + public void shutdown() { + for (Thread t : pool) { + t.interrupt(); + } + } +} \ No newline at end of file diff --git a/simpleThreadPool/src/main/java/simpleThreadPool/Worker.java b/simpleThreadPool/src/main/java/simpleThreadPool/Worker.java new file mode 100644 index 0000000..13bdfcb --- /dev/null +++ b/simpleThreadPool/src/main/java/simpleThreadPool/Worker.java @@ -0,0 +1,94 @@ +package simpleThreadPool; + +import java.util.Queue; +import java.util.function.Function; + +public class Worker implements Runnable { + + private Queue queue; + + public Worker(Queue q) { + queue = q; + } + + static class LightFutureImpl implements LightFuture { + private volatile boolean isready; + private ThreadPool masterPool; + + private R result; + private LightExecutionException execExcept; + + public LightFutureImpl(ThreadPool tp) { + masterPool = tp; + } + + @Override + public boolean isReady() { + return isready; + } + + @Override + public R get() throws LightExecutionException, InterruptedException { + if (!isready) { + synchronized (this) { + while (!isready) { + this.wait(); + } + } + } + + if (execExcept != null) { + throw execExcept; + } + + return result; + } + + @Override + public LightFuture thenApply(Function nextFunc) { + return masterPool.submit(() -> { + R1 res = null; + try { + res = nextFunc.apply(get()); + } catch (Exception e) { + throw new RuntimeException(); + } + return res; + }); + } + + } + + @Override + public void run() { +a: while (true) { + SupplierFuturePair workPair; + + synchronized (queue) { + while (queue.isEmpty()) { + try { + queue.wait(); + } catch (InterruptedException e) { + break a; + } + + } + workPair = queue.poll(); + } + + LightFutureImpl future = workPair.future; + synchronized (future) { + + try { + future.result = workPair.supplier.get(); + } catch (Exception e) { + future.execExcept = new LightExecutionException(); + } + + future.isready = true; + future.notifyAll(); + } + } + } + +} diff --git a/simpleThreadPool/src/test/java/simpleThreadPool/PoolTest.java b/simpleThreadPool/src/test/java/simpleThreadPool/PoolTest.java new file mode 100644 index 0000000..c7f7f46 --- /dev/null +++ b/simpleThreadPool/src/test/java/simpleThreadPool/PoolTest.java @@ -0,0 +1,121 @@ +package simpleThreadPool; + +import java.util.Collection; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.junit.Assert; +import org.junit.Test; + +public class PoolTest extends Assert{ + + @Test + public void testBasicWorkability() throws LightExecutionException, InterruptedException { + ThreadPool pool = new ThreadPoolImpl(1); + LightFuture f1 = pool.submit(() -> 1); + LightFuture f2 = pool.submit(() -> 2); + assertEquals(Integer.valueOf(1), f1.get()); + assertEquals(Integer.valueOf(2), f2.get()); + } + + @Test + public void testFutureReady() throws LightExecutionException, InterruptedException { + + ThreadPool pool = new ThreadPoolImpl(1); + + Lock l1 = new ReentrantLock(); + l1.lock(); + + LightFuture f1 = pool.submit(() -> { + l1.lock(); + return 1; + }); + assertFalse(f1.isReady()); + + l1.unlock(); + f1.get(); + assertTrue(f1.isReady()); + } + + @Test + public void testMultipleThenApply() throws LightExecutionException, InterruptedException { + ThreadPool pool = new ThreadPoolImpl(1); + + LightFuture f = pool + .submit(() -> 1) + .thenApply((i) -> i + 1) + .thenApply((i) -> i + 1); + + assertEquals(Integer.valueOf(3), f.get()); + + } + + @Test(expected = LightExecutionException.class) + public void testException() throws LightExecutionException, InterruptedException { + ThreadPool pool = new ThreadPoolImpl(1); + pool.submit(() -> 1/0).get(); + } + + @Test(expected = LightExecutionException.class) + public void testExceptionThenApply() throws LightExecutionException, InterruptedException { + ThreadPool pool = new ThreadPoolImpl(1); + pool + .submit(() -> 1/0) + .thenApply((i) -> i + 1) + .get(); + } + + @Test(expected = LightExecutionException.class) + public void testInterruption() throws LightExecutionException, InterruptedException { + ThreadPool pool = new ThreadPoolImpl(1); + Object l = new Object(); + LightFuture f = pool + .submit(() -> { + try { + synchronized(l) { + while(true) + l.wait(); + } + } catch (InterruptedException e) { + throw new RuntimeException(); + } + //return 1; + }); + pool.shutdown(); + f.get(); + } + + @Test + public void testContainsNThreads() throws LightExecutionException, InterruptedException { + for (int nth = 1; nth < 50; nth++) { + ThreadPool pool = new ThreadPoolImpl(nth); + Lock l = new ReentrantLock(); + Collection thNames = new LinkedBlockingQueue<>(); + l.lock(); + LightFuture f = pool + .submit(() -> { + l.lock(); + thNames.add(Thread.currentThread().getName()); + return 1;}); + for (int j = 0; j < nth + 2; j++) { + f = f.thenApply((i) -> { + thNames.add(Thread.currentThread().getName()); + return 0;}); + + } + Thread.sleep(10); + l.unlock(); + f.get(); + assertEquals(nth + 3, thNames.size()); + assertEquals(nth, thNames.stream().distinct().count()); + } + } + + @Test + public void testSameObjects() throws LightExecutionException, InterruptedException { + ThreadPool pool = new ThreadPoolImpl(1); + LightFuture f1 = pool.submit(() -> 1); + assertTrue(f1.get() == f1.get()); + } +}