- BuggyQueue
- org.eclipse.jdt.core.javabuilder
- org.eclipse.jdt.core.javanature
+group 'livelessons'
+version '1.0-SNAPSHOT'
+apply plugin: 'java'
+targetCompatibility = 1.8
+sourceCompatibility = 1.8
+repositories {
+ mavenCentral()
+dependencies {
+ testCompile group: 'junit', name: 'junit', version: '4.11'
-import java.util.concurrent.*;
-import java.util.ArrayList;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.List;
- * @class BuggyQueueTest
- *
- * @brief Test program for the SimpleQueue that induces race
- * conditions due to lack of synchronization.
- */
-public class BuggyQueueTest
- /**
- * Maximum number of iterations.
- */
- private final static int mMaxIterations = 1000000;
- /**
- * Maximum size of the queue.
- */
- private final static int mQueueSize = 10;
- /**
- * Count the number of iterations.
- */
- private final static AtomicInteger mCount =
- new AtomicInteger(0);
- /**
- * @class ProducerThread
- *
- * @brief This producer runs in a separate Java Thread and passes
- * Strings to a consumer Thread via a shared BlockingQueue.
- */
- static class ProducerThread extends Thread {
- /**
- * This queue is shared with the consumer.
- */
- private final BQ mQueue;
- /**
- * Constructor initializes the BlockingQueue data
- * member.
- */
- ProducerThread(BQ blockingQueue) {
- mQueue = blockingQueue;
- }
- /**
- * This method runs in a separate Java Thread and passes
- * Strings to a consumer Thread via a shared BlockingQueue.
- */
- public void run(){
- try {
- for(int i = 0; i < mMaxIterations; i++) {
- mCount.incrementAndGet();
- // Calls the put() method.
- mQueue.put(Integer.toString(i));
- }
- } catch (InterruptedException e) {
- System.out.println("InterruptedException caught");
- }
- }
- }
- /**
- * @class ConsumerThread
- *
- * @brief This consumer runs in a separate Java Thread and
- * receives Strings from a producer Thread via a shared
- * BlockingQueue.
- */
- static class ConsumerThread extends Thread {
- /**
- * This queue is shared with the producer.
- */
- private final BQ mQueue;
- /**
- * Constructor initializes the BlockingQueue data member.
- */
- ConsumerThread(BQ blockingQueue) {
- mQueue = blockingQueue;
- }
- /**
- * This method runs in a separate Java Thread and receives
- * Strings from a producer Thread via a shared BlockingQueue.
- */
- public void run(){
- Object s = null;
- try {
- for(int i = 0; i < mMaxIterations; i++) {
- // Calls the take() method.
- s = mQueue.take();
- mCount.decrementAndGet();
- if((i % (mMaxIterations / 10)) == 0)
- System.out.println(s == null ? "" : s);
- }
- } catch (InterruptedException e) {
- System.out.println("InterruptedException caught");
- }
- System.out.println("Final size of the queue is "
- + mQueue.size()
- + "\nmCount is "
- + mCount.get()
- + "\nFinal value is "
- + s);
- }
- }
- /**
- * Main entry point that tests the SimpleQueue class.
- */
- public static void main(String argv[]) {
- final SimpleQueue simpleQueue =
- new SimpleQueue(); // (mQueueSize);
- try {
- // Create a ProducerThread.
- Thread producer =
- new ProducerThread(simpleQueue);
- // Create a ConsumerThread.
- Thread consumer =
- new ConsumerThread(simpleQueue);
- // Run both Threads concurrently.
- producer.start();
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {}
- consumer.start();
- // Wait for both Threads to stop.
- producer.join();
- consumer.join();
- } catch (Exception e) {
- System.out.println("caught exception");
- }
- }
-import java.util.List;
-import java.util.concurrent.CyclicBarrier;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
- * @class SimpleQueue
- *
- * @brief Defines an implementation of the BlockingQueue interface
- * that (intentially) doesn't work properly when accessed via
- * multiple threads since it's not synchronized properly.
- */
-class SimpleQueue implements BlockingQueue {
- /**
- * The queue consists of a List of E's.
- */
- private List mList = new ArrayList();
- /**
- * The maximum capacity of the queue or Integer.MAX_VALUE if none.
- */
- private final int mCapacity;
- /**
- * Create a SimpleBlocking queue with a capacity of
- * Integer.MAX_VALUE.
- */
- public SimpleQueue() {
- this(Integer.MAX_VALUE);
- }
- /**
- * Create a SimpleBlocking queue with the given capacity.
- */
- public SimpleQueue(int capacity) {
- if (capacity <= 0)
- throw new IllegalArgumentException();
- mCapacity = capacity;
- mList = new ArrayList();
- }
- /**
- * True if the queue is empty.
- */
- public boolean isEmpty() {
- return mList.size() == 0;
- }
- /**
- * Returns true if the queue is full, else false.
- */
- private boolean isFull() {
- return mList.size() == mCapacity;
- }
- /**
- * Add a new E to the end of the queue.
- */
- public void put(E msg) throws InterruptedException {
- if (isFull() == false)
- mList.add(msg);
- }
- /**
- * Remove the E at the front of the queue.
- */
- public E take() throws InterruptedException {
- if (isEmpty() == false)
- return mList.remove(0);
- else
- return null;
- }
- /**
- * Returns the number of elements in this queue.
- */
- public int size() {
- return mList.size();
- }
- /**
- * All these methods are inherited from the BlockingQueue
- * interface. They are defined as no-ops to ensure the "Buggyness"
- * of this class ;-)
- */
- public int drainTo(Collection super E> c) {
- return 0;
- }
- public int drainTo(Collection super E> c, int maxElements) {
- return 0;
- }
- public boolean contains(Object o) {
- return false;
- }
- public boolean remove(Object o) {
- return false;
- }
- public int remainingCapacity() {
- return 0;
- }
- public E poll() {
- return null;
- }
- public E poll(long timeout, TimeUnit unit) throws InterruptedException {
- return take();
- }
- public E peek() {
- return null;
- }
- public boolean offer(E e) {
- return false;
- }
- public boolean offer(E e, long timeout, TimeUnit unit) {
- try {
- put(e);
- }
- catch (InterruptedException ex) {
- // Just swallow this exception for this simple (buggy) test.
- }
- return true;
- }
- public boolean add(E e) {
- return false;
- }
- public E element() {
- return null;
- }
- public E remove() {
- return null;
- }
- public void clear() {
- }
- public boolean retainAll(Collection> collection) {
- return false;
- }
- public boolean removeAll(Collection> collection) {
- return false;
- }
- public boolean addAll(Collection extends E> collection) {
- return false;
- }
- public boolean containsAll(Collection> collection) {
- return false;
- }
- public Object[] toArray() {
- return null;
- }
- public T[] toArray(T[] array) {
- return null;
- }
- public Iterator iterator() {
- return null;
- }
+package edu.vandy;
+ * Defines an interface for a bounded queue.
+ */
+public interface BoundedQueue {
+ /**
+ * Inserts the specified element into this queue, waiting if
+ * necessary for space to become available.
+ *
+ * @param e the element to add
+ * @throws InterruptedException if interrupted while waiting
+ */
+ default void put(E e)
+ throws InterruptedException {
+ }
+ /**
+ * Retrieves and removes the head of this queue, waiting if necessary
+ * until an element becomes available.
+ *
+ * @return the head of this queue
+ * @throws InterruptedException if interrupted while waiting
+ */
+ default E take()
+ throws InterruptedException {
+ return null;
+ }
+ /**
+ * Retrieves and removes the head of this queue, or returns {@code
+ * null} if this queue is empty.
+ *
+ * @return the head of this queue, or {@code null} if this queue is empty
+ */
+ default E poll() {
+ return null;
+ }
+ /**
+ * Inserts the specified element into this queue if it is possible to do
+ * so immediately without violating capacity restrictions, returning
+ * {@code true} upon success and {@code false} if no space is currently
+ * available.
+ *
+ * @return {@code true} if the element was added to this queue, else
+ * {@code false}
+ */
+ default boolean offer(E e) {
+ return false;
+ }
+ /**
+ * Returns true if this queue contains no elements, else false.
+ *
+ * @return true if this queue contains no elements, else false.
+ */
+ boolean isEmpty();
+ /**
+ * Returns true if this queue is full, else false.
+ *
+ * @return true if this queue is full, else false.
+ */
+ boolean isFull();
+ /**
+ * Returns the number of elements in this queue.
+ *
+ * @return the number of elements in this collection
+ */
+ int size();
+package edu.vandy;
+import java.util.List;
+import java.util.concurrent.CyclicBarrier;
+import java.util.LinkedList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.TimeUnit;
+ * Defines an implementation of the BoundedQueue interface that
+ * (intentially) doesn't work properly when accessed via multiple
+ * threads since it's not synchronized properly.
+ */
+class BuggyQueue
+ implements BoundedQueue {
+ /**
+ * The queue consists of a LinkedList of E's.
+ */
+ private List mList = new LinkedList<>();
+ /**
+ * The maximum capacity of the queue or Integer.MAX_VALUE if none.
+ */
+ private final int mCapacity;
+ /**
+ * Create a BuggyQueue with a capacity of Integer.MAX_VALUE.
+ */
+ public BuggyQueue() {
+ this(Integer.MAX_VALUE);
+ }
+ /**
+ * Create a BuggyQueue with the given capacity.
+ */
+ public BuggyQueue(int capacity) {
+ if (capacity <= 0)
+ throw new IllegalArgumentException();
+ mCapacity = capacity;
+ mList = new LinkedList<>();
+ }
+ /**
+ * Retrieves and removes the head of this queue, or returns {@code
+ * null} if this queue is empty.
+ *
+ * @return the head of this queue, or {@code null} if this queue is empty
+ */
+ public E poll() {
+ if (!isEmpty())
+ return mList.remove(0);
+ else
+ return null;
+ }
+ /**
+ * Inserts the specified element into this queue if it is possible to do
+ * so immediately without violating capacity restrictions, returning
+ * {@code true} upon success and {@code false} if no space is currently
+ * available.
+ *
+ * @return {@code true} if the element was added to this queue, else
+ * {@code false}
+ */
+ public boolean offer(E e) {
+ if (!isFull()) {
+ mList.add(e);
+ return true;
+ } else
+ return false;
+ }
+ /**
+ * Returns true if this queue contains no elements, else false.
+ *
+ * @return true if this queue contains no elements, else false.
+ */
+ public boolean isEmpty() {
+ return mList.size() == 0;
+ }
+ /**
+ * Returns true if this queue is full, else false.
+ *
+ * @return true if this queue is full, else false.
+ */
+ public boolean isFull() {
+ return mList.size() == mCapacity;
+ }
+ /**
+ * Returns the number of elements in this queue.
+ *
+ * @return the number of elements in this collection
+ */
+ public int size() {
+ return mList.size();
+ }
+package edu.vandy;
+import static org.junit.Assert.assertEquals;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.Test;
+ * Test program for the BuggyQueue that induces race conditions due to
+ * lack of synchronization.
+ */
+public class BuggyQueueTest {
+ /**
+ * Maximum number of iterations.
+ */
+ private final static int mMaxIterations = 100000;
+ /**
+ * Maximum size of the queue.
+ */
+ private final static int sQUEUE_SIZE = 10;
+ /**
+ * Count the number of iterations.
+ */
+ private final static AtomicInteger mCount =
+ new AtomicInteger(0);
+ /**
+ * This producer runs in a separate Java thread and passes strings
+ * to a consumer thread via a shared BoundedQueue.
+ */
+ private static class Producer>
+ implements Runnable {
+ /**
+ * This queue is shared with the consumer.
+ */
+ private final BQ mQueue;
+ /**
+ * Constructor initializes the BoundedQueue data member.
+ */
+ Producer(BQ blockingQueue) {
+ mQueue = blockingQueue;
+ }
+ /**
+ * This method runs in a separate Java thread and passes
+ * strings to a consumer thread via a shared BoundedQueue.
+ */
+ public void run() {
+ for (int i = 0; i < mMaxIterations; ) {
+ // Calls the offer() method.
+ if (mQueue.offer(i)) {
+ i++;
+ mCount.incrementAndGet();
+ } else
+ Thread.yield();
+ }
+ }
+ }
+ /**
+ * This consumer runs in a separate Java thread and receives
+ * strings from a producer thread via a shared BoundedQueue.
+ */
+ private static class Consumer>
+ implements Runnable {
+ /**
+ * This queue is shared with the producer.
+ */
+ private final BQ mQueue;
+ /**
+ * Constructor initializes the BoundedQueue data member.
+ */
+ Consumer(BQ blockingQueue) {
+ mQueue = blockingQueue;
+ }
+ /**
+ * This method runs in a separate Java thread and receives
+ * strings from a producer thread via a shared BoundedQueue.
+ */
+ public void run() {
+ Integer integer = null;
+ int nullCount = 0;
+ // Get the first item from the queue.
+ Integer previous = null;
+ // Get the first non-null value.
+ while ((previous = mQueue.poll()) == null)
+ continue;
+ mCount.decrementAndGet();
+ for (int i = 1; i < mMaxIterations; ) {
+ // Try to get the next integer.
+ integer = mQueue.poll();
+ // Only update the state if we get a non-null
+ // value from take().
+ if (integer != null) {
+ // Make sure the entries are ordered.
+ assertEquals(previous + 1, integer.intValue());
+ previous = integer;
+ if ((i % (mMaxIterations / 10)) == 0)
+ System.out.println(integer);
+ mCount.decrementAndGet();
+ i++;
+ } else {
+ nullCount++;
+ Thread.yield();
+ }
+ }
+ assertEquals(0, mCount.get());
+ System.out.println("Final size of the queue is "
+ + mQueue.size()
+ + "\nmCount is "
+ + mCount.get()
+ + "\nFinal value is "
+ + integer
+ + "\nnumber of null returns from take() is "
+ + nullCount
+ + "\nmCount + nullCount is "
+ + (mCount.get() + nullCount));
+ }
+ }
+ /**
+ * Main entry point that tests the SimpleQueue class.
+ */
+ @Test(timeout=10000)
+ public void testBuggyQueue() {
+ final BuggyQueue buggyQueue =
+ new BuggyQueue<>(sQUEUE_SIZE);
+ try {
+ // Create producer and consumer threads.
+ Thread[] threads = new Thread[] {
+ new Thread(new Producer<>(buggyQueue)),
+ new Thread(new Consumer<>(buggyQueue))
+ };
+ // Record the start time.
+ long startTime = System.nanoTime();
+ // Start all the threads.
+ for (Thread thread : threads)
+ thread.start();
+ // Wait for all threads to stop.
+ for (Thread thread : threads)
+ thread.join();
+ System.out.println("test ran in "
+ + (System.nanoTime() - startTime) / 1_000_000
+ + " msecs");
+ } catch (Exception e) {
+ System.out.println("caught exception");
+ }
+ }
- BusySynchronizedQueue
- org.eclipse.jdt.core.javabuilder
- org.eclipse.jdt.core.javanature
+group 'livelessons'
+version '1.0-SNAPSHOT'
+apply plugin: 'java'
+targetCompatibility = 1.8
+sourceCompatibility = 1.8
+repositories {
+ mavenCentral()
+dependencies {
+ testCompile group: 'junit', name: 'junit', version: '4.11'
-import java.util.List;
-import java.util.concurrent.CyclicBarrier;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
- * @class BusySynchronizedQueue
- *
- * @brief Defines an implementation of the BlockingQueue interface
- * that works properly when accessed via multiple threads since
- * it's synchronized properly, but is inefficient since it
- * "busy waits".
- */
-class BusySynchronizedQueue implements BlockingQueue {
- /**
- * The queue consists of a List of E's.
- */
- private List mList = new ArrayList();
- /**
- * The maximum capacity of the queue or Integer.MAX_VALUE if none.
- */
- private final int mCapacity;
- /**
- * Create a SimpleBlocking queue with a capacity of
- * Integer.MAX_VALUE.
- */
- public BusySynchronizedQueue() {
- this(Integer.MAX_VALUE);
- }
- /**
- * Create a SimpleBlocking queue with the given capacity.
- */
- public BusySynchronizedQueue(int capacity) {
- if (capacity <= 0)
- throw new IllegalArgumentException();
- mCapacity = capacity;
- mList = new ArrayList();
- }
- /**
- * True if the queue is empty.
- */
- public synchronized boolean isEmpty() {
- return mList.size() == 0;
- }
- /**
- * Returns true if the queue is full, else false.
- */
- private synchronized boolean isFull() {
- return mList.size() == mCapacity;
- }
- /**
- * Add a new E to the end of the queue.
- */
- public synchronized void put(E msg) throws InterruptedException {
- if (isFull() == false)
- mList.add(msg);
- }
- /**
- * Remove the E at the front of the queue.
- */
- public synchronized E take() throws InterruptedException {
- if (isEmpty() == false)
- return mList.remove(0);
- else
- return null;
- }
- /**
- * Returns the number of elements in this queue.
- */
- public synchronized int size() {
- return mList.size();
- }
- /**
- * All these methods are inherited from the BlockingQueue
- * interface. All are defined as no-ops for simplicity.
- */
- public int drainTo(Collection super E> c) {
- return 0;
- }
- public int drainTo(Collection super E> c, int maxElements) {
- return 0;
- }
- public boolean contains(Object o) {
- return false;
- }
- public boolean remove(Object o) {
- return false;
- }
- public int remainingCapacity() {
- return 0;
- }
- public E poll() {
- return null;
- }
- public E poll(long timeout, TimeUnit unit) throws InterruptedException {
- return take();
- }
- public E peek() {
- return null;
- }
- public boolean offer(E e) {
- return false;
- }
- public boolean offer(E e, long timeout, TimeUnit unit) {
- try {
- put(e);
- }
- catch (InterruptedException ex) {
- // Just swallow this exception for this simple (buggy) test.
- }
- return true;
- }
- public boolean add(E e) {
- return false;
- }
- public E element() {
- return null;
- }
- public E remove() {
- return null;
- }
- public void clear() {
- }
- public boolean retainAll(Collection> collection) {
- return false;
- }
- public boolean removeAll(Collection> collection) {
- return false;
- }
- public boolean addAll(Collection extends E> collection) {
- return false;
- }
- public boolean containsAll(Collection> collection) {
- return false;
- }
- public Object[] toArray() {
- return null;
- }
- public T[] toArray(T[] array) {
- return null;
- }
- public Iterator iterator() {
- return null;
- }
-import java.util.concurrent.*;
-import java.util.ArrayList;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.List;
- * @class BusySynchronizedQueueTest
- *
- * @brief Test program for the BusySynchronizedQueue that is
- * inefficient due to "busy waiting".
- */
-public class BusySynchronizedQueueTest
- /**
- * Maximum number of iterations.
- */
- private final static int mMaxIterations = 1000000;
- /**
- * Maximum size of the queue.
- */
- private final static int mQueueSize = 10;
- /**
- * Count the number of iterations.
- */
- private final static AtomicInteger mCount =
- new AtomicInteger(0);
- /**
- * @class ProducerThread
- *
- * @brief This producer runs in a separate Java Thread and passes
- * Strings to a consumer Thread via a shared BlockingQueue.
- */
- static class ProducerThread extends Thread {
- /**
- * This queue is shared with the consumer.
- */
- private final BQ mQueue;
- /**
- * Constructor initializes the BlockingQueue data
- * member.
- */
- ProducerThread(BQ blockingQueue) {
- mQueue = blockingQueue;
- }
- /**
- * This method runs in a separate Java Thread and passes
- * Strings to a consumer Thread via a shared BlockingQueue.
- */
- public void run(){
- try {
- for(int i = 0; i < mMaxIterations; i++) {
- mCount.incrementAndGet();
- // Calls the put() method.
- mQueue.put(Integer.toString(i));
- }
- } catch (InterruptedException e) {
- System.out.println("InterruptedException caught");
- }
- }
- }
- /**
- * @class ConsumerThread
- *
- * @brief This consumer runs in a separate Java Thread and
- * receives Strings from a producer Thread via a shared
- * BlockingQueue.
- */
- static class ConsumerThread extends Thread {
- /**
- * This queue is shared with the producer.
- */
- private final BQ mQueue;
- /**
- * Constructor initializes the BlockingQueue data member.
- */
- ConsumerThread(BQ blockingQueue) {
- mQueue = blockingQueue;
- }
- /**
- * This method runs in a separate Java Thread and receives
- * Strings from a producer Thread via a shared BlockingQueue.
- */
- public void run(){
- Object s = null;
- try {
- for(int i = 0; i < mMaxIterations; i++) {
- // Calls the take() method.
- s = mQueue.take();
- mCount.decrementAndGet();
- if ((i % (mMaxIterations / 10)) == 0)
- System.out.println(s == null ? "" : s);
- }
- } catch (InterruptedException e) {
- System.out.println("InterruptedException caught");
- }
- System.out.println("Final size of the queue is "
- + mQueue.size()
- + "\nmCount is "
- + mCount.get()
- + "\nFinal value is "
- + s);
- }
- }
- /**
- * Main entry point that tests the BusySynchronizedQueue class.
- */
- public static void main(String argv[]) {
- final BusySynchronizedQueue busySynchronizedQueue =
- new BusySynchronizedQueue(); // (mQueueSize);
- try {
- // Create a ProducerThread.
- Thread producer =
- new ProducerThread(busySynchronizedQueue);
- // Create a ConsumerThread.
- Thread consumer =
- new ConsumerThread(busySynchronizedQueue);
- // Run both Threads concurrently.
- producer.start();
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {}
- consumer.start();
- // Wait for both Threads to stop.
- producer.join();
- consumer.join();
- } catch (Exception e) {
- System.out.println("caught exception");
- }
- }
+package edu.vandy;
+ * Defines an interface for a bounded queue.
+ */
+public interface BoundedQueue {
+ /**
+ * Inserts the specified element into this queue, waiting if
+ * necessary for space to become available.
+ *
+ * @param e the element to add
+ * @throws InterruptedException if interrupted while waiting
+ */
+ default void put(E e)
+ throws InterruptedException {
+ }
+ /**
+ * Retrieves and removes the head of this queue, waiting if necessary
+ * until an element becomes available.
+ *
+ * @return the head of this queue
+ * @throws InterruptedException if interrupted while waiting
+ */
+ default E take()
+ throws InterruptedException {
+ return null;
+ }
+ /**
+ * Retrieves and removes the head of this queue, or returns {@code
+ * null} if this queue is empty.
+ *
+ * @return the head of this queue, or {@code null} if this queue is empty
+ */
+ default E poll() {
+ return null;
+ }
+ /**
+ * Inserts the specified element into this queue if it is possible to do
+ * so immediately without violating capacity restrictions, returning
+ * {@code true} upon success and {@code false} if no space is currently
+ * available.
+ *
+ * @return {@code true} if the element was added to this queue, else
+ * {@code false}
+ */
+ default boolean offer(E e) {
+ return false;
+ }
+ /**
+ * Returns true if this queue contains no elements, else false.
+ *
+ * @return true if this queue contains no elements, else false.
+ */
+ boolean isEmpty();
+ /**
+ * Returns true if this queue is full, else false.
+ *
+ * @return true if this queue is full, else false.
+ */
+ boolean isFull();
+ /**
+ * Returns the number of elements in this queue.
+ *
+ * @return the number of elements in this collection
+ */
+ int size();
+package edu.vandy;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.TimeUnit;
+ * Implements the BoundedQueue interface that works properly when
+ * accessed via multiple threads since it's synchronized properly, but
+ * is inefficient since due to its "busy waiting".
+ */
+class BusySynchronizedQueue
+ implements BoundedQueue {
+ /**
+ * The queue consists of a LinkedList of E's.
+ */
+ private List mList = new LinkedList<>();
+ /**
+ * The maximum capacity of the queue or Integer.MAX_VALUE if none.
+ */
+ private final int mCapacity;
+ /**
+ * Create a SimpleBlocking queue with a capacity of
+ * Integer.MAX_VALUE.
+ */
+ public BusySynchronizedQueue() {
+ this(Integer.MAX_VALUE);
+ }
+ /**
+ * Create a SimpleBlocking queue with the given capacity.
+ */
+ public BusySynchronizedQueue(int capacity) {
+ if (capacity <= 0)
+ throw new IllegalArgumentException();
+ mCapacity = capacity;
+ mList = new LinkedList<>();
+ }
+ /**
+ * Returns true if this queue contains no elements, else false.
+ *
+ * @return true if this queue contains no elements, else false.
+ */
+ @Override
+ public boolean isEmpty() {
+ synchronized(this) {
+ return mList.size() == 0;
+ }
+ }
+ /**
+ * Returns true if this queue is full, else false.
+ *
+ * @return true if this queue is full, else false.
+ */
+ @Override
+ public boolean isFull() {
+ synchronized(this) {
+ return mList.size() == mCapacity;
+ }
+ }
+ /**
+ * Returns the number of elements in this queue.
+ *
+ * @return the number of elements in this collection
+ */
+ @Override
+ public int size() {
+ synchronized(this) {
+ return mList.size();
+ }
+ }
+ /**
+ * Retrieves and removes the head of this queue, or returns {@code
+ * null} if this queue is empty.
+ *
+ * @return the head of this queue, or {@code null} if this queue is empty
+ */
+ @Override
+ public E poll() {
+ synchronized(this) {
+ if (!isEmpty())
+ return mList.remove(0);
+ else
+ return null;
+ }
+ }
+ /**
+ * Inserts the specified element into this queue if it is possible to do
+ * so immediately without violating capacity restrictions, returning
+ * {@code true} upon success and {@code false} if no space is currently
+ * available.
+ *
+ * @return {@code true} if the element was added to this queue, else
+ * {@code false}
+ */
+ @Override
+ public boolean offer(E e) {
+ synchronized (this) {
+ if (!isFull()) {
+ mList.add(e);
+ return true;
+ } else
+ return false;
+ }
+ }
+package edu.vandy;
+import static org.junit.Assert.assertEquals;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.Test;
+ * Test program for the BusySynchronizedQueue that is inefficient due
+ * to its use of "busy waiting".
+ */
+public class BusySynchronizedQueueTest {
+ /**
+ * Maximum number of iterations.
+ */
+ private final static int mMaxIterations = 100000;
+ /**
+ * Maximum size of the queue.
+ */
+ private final static int sQUEUE_SIZE = 10;
+ /**
+ * Count the number of iterations.
+ */
+ private final static AtomicInteger mCount =
+ new AtomicInteger(0);
+ /**
+ * This producer runs in a separate Java thread and passes strings
+ * to a consumer thread via a shared BoundedBlockingQueue.
+ */
+ private static class Producer>
+ implements Runnable {
+ /**
+ * This queue is shared with the consumer.
+ */
+ private final BQ mQueue;
+ /**
+ * Constructor initializes the @a blockingQueue field.
+ */
+ Producer(BQ blockingQueue) {
+ mQueue = blockingQueue;
+ }
+ /**
+ * This method runs in a Java thread and passes strings to a
+ * consumer thread via a shared BoundedBlockingQueue.
+ */
+ public void run() {
+ for(int i = 0; i < mMaxIterations; ) {
+ // Calls the offer() method.
+ if (mQueue.offer(i)) {
+ i++;
+ mCount.incrementAndGet();
+ }
+ }
+ }
+ }
+ /**
+ * This consumer runs in a Java thread and receives strings from a
+ * producer thread via a shared BoundedBlockingQueue.
+ */
+ private static class Consumer>
+ implements Runnable {
+ /**
+ * This queue is shared with the producer.
+ */
+ private final BQ mQueue;
+ /**
+ * Constructor initializes the @a blockingQueue data member.
+ */
+ Consumer(BQ blockingQueue) {
+ mQueue = blockingQueue;
+ }
+ /**
+ * This method runs in a Java thread and receives integers
+ * from a producer thread via a shared BoundedBlockingQueue.
+ */
+ public void run() {
+ Integer integer = null;
+ int nullCount = 0;
+ // Get the first item from the queue.
+ Integer previous;
+ // Get the first non-null value.
+ while ((previous = mQueue.poll()) == null)
+ continue;
+ mCount.decrementAndGet();
+ for (int i = 1; i < mMaxIterations; ) {
+ // Try to get the next integer.
+ integer = mQueue.poll();
+ // Only update the state if we get a non-null
+ // value from take().
+ if (integer != null) {
+ if ((i % (mMaxIterations / 10)) == 0)
+ System.out.println(integer);
+ mCount.decrementAndGet();
+ i++;
+ // Make sure the entries are ordered.
+ assertEquals(previous + 1, integer.intValue());
+ previous = integer;
+ } else {
+ nullCount++;
+ // Thread.yield();
+ }
+ }
+ assertEquals(0, mCount.get());
+ System.out.println("Final size of the queue is "
+ + mQueue.size()
+ + "\nmCount is "
+ + mCount.get()
+ + "\nFinal value is "
+ + integer
+ + "\nnumber of null returns from take() is "
+ + nullCount
+ + "\nmCount + nullCount is "
+ + (mCount.get() + nullCount));
+ }
+ }
+ /**
+ * Main entry point that tests the BusySynchronizedQueue class.
+ */
+ @Test
+ public void testBusySynchronizedQueue() {
+ final BusySynchronizedQueue busySynchronizedQueue =
+ new BusySynchronizedQueue<>(sQUEUE_SIZE);
+ try {
+ // Create producer and consumer threads.
+ Thread[] threads = new Thread[] {
+ new Thread(new Producer<>(busySynchronizedQueue)),
+ new Thread(new Consumer<>(busySynchronizedQueue))
+ };
+ // Record the start time.
+ long startTime = System.nanoTime();
+ // Start all the threads.
+ for (Thread thread : threads)
+ thread.start();
+ // Wait for all threads to stop.
+ for (Thread thread : threads)
+ thread.join();
+ System.out.println("test ran in "
+ + (System.nanoTime() - startTime) / 1_000_000
+ + " msecs");
+ } catch (Exception e) {
+ System.out.println("caught exception");
+ }
+ }
- SimpleBlockingQueue
- org.eclipse.jdt.core.javabuilder
- org.eclipse.jdt.core.javanature
+group 'livelessons'
+version '1.0-SNAPSHOT'
+apply plugin: 'java'
+targetCompatibility = 1.8
+sourceCompatibility = 1.8
+repositories {
+ mavenCentral()
+dependencies {
+ testCompile group: 'junit', name: 'junit', version: '4.11'
-import java.util.List;
-import java.util.concurrent.CyclicBarrier;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
- * @class SimpleBlockingQueue
- *
- * @brief Defines an implementation of the BlockingQueue interface
- * that works properly when accessed via multiple threads since
- * it's synchronized properly.
- */
-class SimpleBlockingQueue implements BlockingQueue {
- /**
- * The queue consists of a List of E's.
- */
- final private List mList;
- /**
- * The maximum capacity of the queue or Integer.MAX_VALUE if none.
- */
- private final int mCapacity;
- /**
- * Create a SimpleBlocking queue with a capacity of
- * Integer.MAX_VALUE.
- */
- public SimpleBlockingQueue() {
- this(Integer.MAX_VALUE);
- }
- /**
- * Create a SimpleBlocking queue with the given capacity.
- */
- public SimpleBlockingQueue(int capacity) {
- if (capacity <= 0)
- throw new IllegalArgumentException();
- mCapacity = capacity;
- mList = new ArrayList();
- }
- /**
- * Add a new E to the end of the queue, blocking if necessary for
- * space to become available.
- */
- public void put(E e) throws InterruptedException {
- synchronized(this) {
- if (e == null)
- throw new NullPointerException();
- // Wait until the queue is not full.
- while (isFull()) {
- // System.out.println("BLOCKING ON PUT()");
- wait();
- }
- // Add e to the ArrayList.
- mList.add(e);
- // Notify that the queue may have changed state, e.g., "no
- // longer empty".
- notifyAll();
- }
- }
- /**
- * Remove the E at the front of the queue, blocking until there's
- * something in the queue.
- */
- public E take() throws InterruptedException {
- synchronized(this) {
- // Wait until the queue is not empty.
- while (mList.isEmpty()) {
- // System.out.println("BLOCKING ON TAKE()");
- wait();
- }
- final E e = mList.remove(0);
- // Notify that the queue may have changed state, e.g., "no
- // longer full".
- notifyAll();
- return e;
- }
- }
- /**
- * Returns the number of elements in this queue.
- */
- public int size() {
- synchronized(this) {
- return mList.size();
- }
- }
- /**
- * Returns true if the queue is empty, else false.
- */
- public boolean isEmpty() {
- synchronized(this) {
- return mList.size() == 0;
- }
- }
- /**
- * Returns true if the queue is full, else false. Since this
- * isn't a public method it assumes the monitor lock is held.
- */
- private boolean isFull() {
- return mList.size() == mCapacity;
- }
- /**
- * All these methods are inherited from the BlockingQueue
- * interface. They are defined as no-ops and their implementations
- * are left as an exercise to the reader.
- */
- public int drainTo(Collection super E> c) {
- return 0;
- }
- public int drainTo(Collection super E> c, int maxElements) {
- return 0;
- }
- public boolean contains(Object o) {
- return false;
- }
- public boolean remove(Object o) {
- return false;
- }
- public int remainingCapacity() {
- return 0;
- }
- public E poll() {
- return null;
- }
- public E poll(long timeout, TimeUnit unit) throws InterruptedException {
- return take();
- }
- public E peek() {
- return null;
- }
- public boolean offer(E e) {
- return false;
- }
- public boolean offer(E e, long timeout, TimeUnit unit) {
- try {
- put(e);
- }
- catch (InterruptedException ex) {
- // Just swallow this exception for this simple (buggy) test.
- }
- return true;
- }
- public boolean add(E e) {
- return false;
- }
- public E element() {
- return null;
- }
- public E remove() {
- return null;
- }
- public void clear() {
- }
- public boolean retainAll(Collection> collection) {
- return false;
- }
- public boolean removeAll(Collection> collection) {
- return false;
- }
- public boolean addAll(Collection extends E> collection) {
- return false;
- }
- public boolean containsAll(Collection> collection) {
- return false;
- }
- public Object[] toArray() {
- return null;
- }
- public T[] toArray(T[] array) {
- return null;
- }
- public Iterator iterator() {
- return null;
- }
-import java.util.concurrent.*;
-import java.util.ArrayList;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.List;
- * @class SimpleBlockingQueueTest
- *
- * @brief Test program for the SimpleBlockingQueue that induces race
- * conditions due to lack of synchronization.
- */
-public class SimpleBlockingQueueTest
- /**
- * Maximum number of iterations.
- */
- private final static int mMaxIterations = 100000;
- /**
- * Maximum size of the queue.
- */
- private final static int mQueueSize = 10;
- /**
- * Count the number of iterations.
- */
- private final static AtomicInteger mCount =
- new AtomicInteger(0);
- /**
- * @class ProducerThread
- *
- * @brief This producer runs in a separate Java Thread and passes
- * Strings to a consumer Thread via a shared BlockingQueue.
- */
- static class ProducerThread extends Thread {
- /**
- * This queue is shared with the consumer.
- */
- private final BQ mQueue;
- /**
- * Constructor initializes the BlockingQueue data
- * member.
- */
- ProducerThread(BQ blockingQueue) {
- mQueue = blockingQueue;
- }
- /**
- * This method runs in a separate Java Thread and passes
- * Strings to a consumer Thread via a shared BlockingQueue.
- */
- public void run(){
- try {
- for(int i = 0; i < mMaxIterations; i++) {
- mCount.incrementAndGet();
- // Calls the put() method.
- mQueue.put(Integer.toString(i));
- }
- } catch (InterruptedException e) {
- System.out.println("InterruptedException caught");
- }
- }
- }
- /**
- * @class ConsumerThread
- *
- * @brief This consumer runs in a separate Java Thread and
- * receives Strings from a producer Thread via a shared
- * BlockingQueue.
- */
- static class ConsumerThread extends Thread {
- /**
- * This queue is shared with the producer.
- */
- private final BQ mQueue;
- /**
- * Constructor initializes the BlockingQueue data member.
- */
- ConsumerThread(BQ blockingQueue) {
- mQueue = blockingQueue;
- }
- /**
- * This method runs in a separate Java Thread and receives
- * Strings from a producer Thread via a shared BlockingQueue.
- */
- public void run(){
- Object s = null;
- try {
- for(int i = 0; i < mMaxIterations; i++) {
- // Calls the take() method.
- s = mQueue.take();
- mCount.decrementAndGet();
- if((i % (mMaxIterations / 10)) == 0)
- System.out.println(s);
- }
- } catch (InterruptedException e) {
- System.out.println("InterruptedException caught");
- }
- System.out.println("Final size of the queue is "
- + mQueue.size()
- + "\nmCount is "
- + mCount.get()
- + "\nFinal value is "
- + s);
- }
- }
- /**
- * Main entry point that tests the SimpleBlockingQueue class.
- */
- public static void main(String argv[]) {
- final SimpleBlockingQueue simpleQueue =
- new SimpleBlockingQueue(mQueueSize);
- try {
- // Create a ProducerThread.
- Thread producer =
- new ProducerThread(simpleQueue);
- // Create a ConsumerThread.
- Thread consumer =
- new ConsumerThread(simpleQueue);
- // Run both Threads concurrently.
- producer.start();
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {}
- consumer.start();
- // Wait for both Threads to stop.
- producer.join();
- consumer.join();
- } catch (Exception e) {
- System.out.println("caught exception");
- }
- }
+package edu.vandy;
+ * Defines an interface for a bounded queue.
+ */
+public interface BoundedQueue {
+ /**
+ * Inserts the specified element into this queue, waiting if
+ * necessary for space to become available.
+ *
+ * @param e the element to add
+ * @throws InterruptedException if interrupted while waiting
+ */
+ default void put(E e)
+ throws InterruptedException {
+ }
+ /**
+ * Retrieves and removes the head of this queue, waiting if necessary
+ * until an element becomes available.
+ *
+ * @return the head of this queue
+ * @throws InterruptedException if interrupted while waiting
+ */
+ default E take()
+ throws InterruptedException {
+ return null;
+ }
+ /**
+ * Retrieves and removes the head of this queue, or returns {@code
+ * null} if this queue is empty.
+ *
+ * @return the head of this queue, or {@code null} if this queue is empty
+ */
+ default E poll() {
+ return null;
+ }
+ /**
+ * Inserts the specified element into this queue if it is possible to do
+ * so immediately without violating capacity restrictions, returning
+ * {@code true} upon success and {@code false} if no space is currently
+ * available.
+ *
+ * @return {@code true} if the element was added to this queue, else
+ * {@code false}
+ */
+ default boolean offer(E e) {
+ return false;
+ }
+ /**
+ * Returns true if this queue contains no elements, else false.
+ *
+ * @return true if this queue contains no elements, else false.
+ */
+ boolean isEmpty();
+ /**
+ * Returns true if this queue is full, else false.
+ *
+ * @return true if this queue is full, else false.
+ */
+ boolean isFull();
+ /**
+ * Returns the number of elements in this queue.
+ *
+ * @return the number of elements in this collection
+ */
+ int size();
+package edu.vandy;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.TimeUnit;
+ * Defines an implementation of the BoundedQueue interface that works
+ * properly when accessed via multiple threads since it's synchronized
+ * properly.
+ */
+class SimpleBlockingBoundedQueue
+ implements BoundedQueue {
+ /**
+ * The queue consists of a LinkedList of E's.
+ */
+ final private List mList;
+ /**
+ * The maximum capacity of the queue or Integer.MAX_VALUE if none.
+ */
+ private final int mCapacity;
+ /**
+ * Create a SimpleBlocking queue with a capacity of
+ * Integer.MAX_VALUE.
+ */
+ public SimpleBlockingBoundedQueue() {
+ this(Integer.MAX_VALUE);
+ }
+ /**
+ * Create a SimpleBlocking queue with the given capacity.
+ */
+ public SimpleBlockingBoundedQueue(int capacity) {
+ if (capacity <= 0)
+ throw new IllegalArgumentException();
+ mCapacity = capacity;
+ mList = new LinkedList<>();
+ }
+ /**
+ * Inserts the specified element into this queue, waiting if
+ * necessary for space to become available.
+ *
+ * @param e the element to add
+ * @throws InterruptedException if interrupted while waiting
+ */
+ @Override
+ public void put(E e)
+ throws InterruptedException {
+ synchronized(this) {
+ if (e == null)
+ throw new NullPointerException();
+ // Wait until the queue is not full.
+ while (isFull()) {
+ // System.out.println("BLOCKING ON PUT()");
+ wait();
+ }
+ // Add e to the LinkedList.
+ mList.add(e);
+ // Notify that the queue may have changed state, e.g., "no
+ // longer empty".
+ notifyAll();
+ }
+ }
+ /**
+ * Retrieves and removes the head of this queue, waiting if necessary
+ * until an element becomes available.
+ *
+ * @return the head of this queue
+ * @throws InterruptedException if interrupted while waiting
+ */
+ @Override
+ public E take() throws InterruptedException {
+ synchronized(this) {
+ // Wait until the queue is not empty.
+ while (mList.isEmpty()) {
+ // System.out.println("BLOCKING ON TAKE()");
+ wait();
+ }
+ final E e = mList.remove(0);
+ // Notify that the queue may have changed state, e.g., "no
+ // longer full".
+ notifyAll();
+ return e;
+ }
+ }
+ /**
+ * Returns true if the queue is empty, else false.
+ */
+ @Override
+ public boolean isEmpty() {
+ synchronized(this) {
+ return mList.size() == 0;
+ }
+ }
+ /**
+ * Returns true if the queue is full, else false. Since this
+ * isn't a public method it assumes the monitor lock is held.
+ */
+ @Override
+ public boolean isFull() {
+ return mList.size() == mCapacity;
+ }
+ /**
+ * Returns the number of elements in this queue.
+ */
+ @Override
+ public int size() {
+ synchronized(this) {
+ return mList.size();
+ }
+ }
+package edu.vandy;
+import static org.junit.Assert.assertEquals;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.Test;
+ * Test program for the SimpleBlockingBoundedQueue that fixes race conditions
+ * by having proper synchronization (i.e., mutual exclusion and
+ * coordination).
+ */
+public class SimpleBlockingBoundedQueueTest {
+ /**
+ * Maximum number of iterations.
+ */
+ private final static int mMaxIterations = 100000;
+ /**
+ * Maximum size of the queue.
+ */
+ private final static int sQUEUE_SIZE = 10;
+ /**
+ * Count the number of iterations.
+ */
+ private final static AtomicInteger mCount =
+ new AtomicInteger(0);
+ /**
+ * This producer runs in a separate Java thread and passes integers
+ * to a consumer thread via a shared BoundedQueue.
+ */
+ private static class Producer>
+ implements Runnable {
+ /**
+ * This queue is shared with the consumer.
+ */
+ private final BQ mQueue;
+ /**
+ * Constructor initializes the BoundedQueue data
+ * member.
+ */
+ Producer(BQ blockingQueue) {
+ mQueue = blockingQueue;
+ }
+ /**
+ * This method runs in a separate Java thread and passes
+ * integers to a consumer thread via a shared BoundedQueue.
+ */
+ public void run(){
+ try {
+ for (int i = 0; i < mMaxIterations; i++) {
+ mCount.incrementAndGet();
+ // Call the put() method.
+ mQueue.put(i);
+ }
+ } catch (InterruptedException e) {
+ System.out.println("InterruptedException caught");
+ }
+ }
+ }
+ /**
+ * This consumer runs in a separate Java thread and receives
+ * integers from a producer thread via a shared BoundedQueue.
+ */
+ private static class Consumer>
+ implements Runnable {
+ /**
+ * This queue is shared with the producer.
+ */
+ private final BQ mQueue;
+ /**
+ * Constructor initializes the BoundedQueue data member.
+ */
+ Consumer(BQ blockingQueue) {
+ mQueue = blockingQueue;
+ }
+ /**
+ * This method runs in a separate Java thread and receives
+ * integers from a producer thread[q via a shared BoundedQueue.
+ */
+ public void run() {
+ Integer integer = null;
+ try {
+ // Get the first item from the queue.
+ Integer previous = mQueue.take();
+ mCount.decrementAndGet();
+ for (int i = 1; i < mMaxIterations; ++i) {
+ // Calls the take() method.
+ integer = mQueue.take();
+ // Make sure the entries are ordered.
+ assertEquals(previous + 1, integer.intValue());
+ previous = integer;
+ if((i % (mMaxIterations / 10)) == 0)
+ System.out.println(integer);
+ mCount.decrementAndGet();
+ }
+ } catch (InterruptedException e) {
+ System.out.println("InterruptedException caught");
+ }
+ assertEquals(0, mCount.get());
+ System.out.println("Final size of the queue is "
+ + mQueue.size()
+ + "\nmCount is "
+ + mCount.get()
+ + "\nFinal value is "
+ + integer);
+ }
+ }
+ /**
+ * Main entry point that tests the SimpleBoundedQueue class.
+ */
+ @Test
+ public void testSimpleBlockingBoundedQueue() {
+ final SimpleBlockingBoundedQueue simpleQueue =
+ new SimpleBlockingBoundedQueue<>(sQUEUE_SIZE);
+ try {
+ // Create producer and consumer threads.
+ Thread[] threads = new Thread[] {
+ new Thread(new Producer<>(simpleQueue)),
+ new Thread(new Consumer<>(simpleQueue))
+ };
+ // Record the start time.
+ long startTime = System.nanoTime();
+ // Start both threads.
+ for (Thread thread : threads)
+ thread.start();
+ // Wait for both threads to stop.
+ for (Thread thread : threads)
+ thread.join();
+ System.out.println("test ran in "
+ + (System.nanoTime() - startTime) / 1_000_000
+ + " msecs");
+ } catch (Exception e) {
+ System.out.println("caught exception");
+ }
+ }