diff --git a/runoffheap.sh b/runoffheap.sh index 6a3f1d760..cf175c050 100755 --- a/runoffheap.sh +++ b/runoffheap.sh @@ -14,4 +14,8 @@ echo "Done" echo "Running OnHeap..." $BIN $ARGS $GCARGS -Xloggc:custom-gc.log $CPATH com.lmax.disruptor.offheap.OneToOneOnHeapThroughputTest echo "Done" + +echo "Running Sliced OnHeap..." +$BIN $ARGS $GCARGS -Xloggc:custom-gc.log $CPATH -Dsliced=true com.lmax.disruptor.offheap.OneToOneOnHeapThroughputTest +echo "Done" diff --git a/src/perftest/java/com/lmax/disruptor/offheap/OneToOneOffHeapThroughputTest.java b/src/perftest/java/com/lmax/disruptor/offheap/OneToOneOffHeapThroughputTest.java index 0b90a246d..0368eaec3 100644 --- a/src/perftest/java/com/lmax/disruptor/offheap/OneToOneOffHeapThroughputTest.java +++ b/src/perftest/java/com/lmax/disruptor/offheap/OneToOneOffHeapThroughputTest.java @@ -1,24 +1,16 @@ package com.lmax.disruptor.offheap; +import com.lmax.disruptor.*; +import com.lmax.disruptor.util.DaemonThreadFactory; + import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.locks.LockSupport; -import com.lmax.disruptor.AbstractPerfTestDisruptor; -import com.lmax.disruptor.BatchEventProcessor; -import com.lmax.disruptor.DataProvider; -import com.lmax.disruptor.EventHandler; -import com.lmax.disruptor.Sequence; -import com.lmax.disruptor.SequenceBarrier; -import com.lmax.disruptor.Sequencer; -import com.lmax.disruptor.SingleProducerSequencer; -import com.lmax.disruptor.WaitStrategy; -import com.lmax.disruptor.YieldingWaitStrategy; -import com.lmax.disruptor.util.DaemonThreadFactory; - public class OneToOneOffHeapThroughputTest extends AbstractPerfTestDisruptor { private static final int BLOCK_SIZE = 256; @@ -99,9 +91,10 @@ public static class ByteBufferHandler implements EventHandler @Override public void onEvent(ByteBuffer event, long sequence, boolean endOfBatch) throws Exception { - for (int i = 0; i < BLOCK_SIZE; i += 8) + final int start = event.position(); + for (int i = start, size = start + BLOCK_SIZE; i < size; i += 8) { - total += event.getLong(); + total += event.getLong(i); } if (--expectedCount == 0) @@ -134,7 +127,7 @@ public static class OffHeapRingBuffer implements DataProvider @Override protected ByteBuffer initialValue() { - return buffer.duplicate(); + return buffer.duplicate().order(ByteOrder.nativeOrder()); } }; @@ -143,7 +136,7 @@ public OffHeapRingBuffer(Sequencer sequencer, int entrySize) this.sequencer = sequencer; this.entrySize = entrySize; this.mask = sequencer.getBufferSize() - 1; - buffer = ByteBuffer.allocateDirect(sequencer.getBufferSize() * entrySize); + buffer = ByteBuffer.allocateDirect(sequencer.getBufferSize() * entrySize).order(ByteOrder.nativeOrder()); } public void addGatingSequences(Sequence sequence) diff --git a/src/perftest/java/com/lmax/disruptor/offheap/OneToOneOnHeapThroughputTest.java b/src/perftest/java/com/lmax/disruptor/offheap/OneToOneOnHeapThroughputTest.java index b7e144c4a..9a6e527c2 100644 --- a/src/perftest/java/com/lmax/disruptor/offheap/OneToOneOnHeapThroughputTest.java +++ b/src/perftest/java/com/lmax/disruptor/offheap/OneToOneOnHeapThroughputTest.java @@ -1,32 +1,30 @@ package com.lmax.disruptor.offheap; +import com.lmax.disruptor.*; +import com.lmax.disruptor.util.DaemonThreadFactory; + import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.locks.LockSupport; -import com.lmax.disruptor.AbstractPerfTestDisruptor; -import com.lmax.disruptor.BatchEventProcessor; -import com.lmax.disruptor.EventFactory; -import com.lmax.disruptor.EventHandler; -import com.lmax.disruptor.RingBuffer; -import com.lmax.disruptor.WaitStrategy; -import com.lmax.disruptor.YieldingWaitStrategy; -import com.lmax.disruptor.util.DaemonThreadFactory; - public class OneToOneOnHeapThroughputTest extends AbstractPerfTestDisruptor { private static final int BLOCK_SIZE = 256; private static final int BUFFER_SIZE = 1024 * 1024; private static final long ITERATIONS = 1000 * 1000 * 10L; + private static final boolean SLICED_BUFFER = Boolean.getBoolean("sliced"); private final Executor executor = Executors.newFixedThreadPool(1, DaemonThreadFactory.INSTANCE); private final WaitStrategy waitStrategy = new YieldingWaitStrategy(); private final RingBuffer buffer = - RingBuffer.createSingleProducer(BufferFactory.direct(BLOCK_SIZE), BUFFER_SIZE, waitStrategy); - private final ByteBufferHandler handler = new ByteBufferHandler(); + RingBuffer.createSingleProducer( + SLICED_BUFFER ? SlicedBufferFactory.direct(BLOCK_SIZE, BUFFER_SIZE) : BufferFactory.direct(BLOCK_SIZE), + BUFFER_SIZE, waitStrategy); + private final ByteBufferHandler handler = new ByteBufferHandler(); private final BatchEventProcessor processor = new BatchEventProcessor(buffer, buffer.newBarrier(), handler); @@ -103,7 +101,7 @@ public void onEvent(ByteBuffer event, long sequence, boolean endOfBatch) throws { for (int i = 0; i < BLOCK_SIZE; i += 8) { - total += event.getLong(); + total += event.getLong(i); } if (--expectedCount == 0) @@ -140,11 +138,11 @@ public ByteBuffer newInstance() { if (isDirect) { - return ByteBuffer.allocateDirect(size); + return ByteBuffer.allocateDirect(size).order(ByteOrder.nativeOrder()); } else { - return ByteBuffer.allocate(size); + return ByteBuffer.allocate(size).order(ByteOrder.nativeOrder()); } } @@ -159,4 +157,51 @@ public static BufferFactory heap(int size) return new BufferFactory(false, size); } } + + private static final class SlicedBufferFactory implements EventFactory + { + private final boolean isDirect; + private final int size; + private final int total; + private ByteBuffer buffer; + + private SlicedBufferFactory(boolean isDirect, int size, int total) + { + this.isDirect = isDirect; + this.size = size; + this.total = total; + this.buffer = + (isDirect ? ByteBuffer.allocateDirect(size * total) : ByteBuffer.allocate(size * total)) + .order(ByteOrder.nativeOrder()); + this.buffer.limit(0); + } + + @Override + public ByteBuffer newInstance() + { + if (this.buffer.limit() == this.buffer.capacity()) + { + this.buffer = + (isDirect ? ByteBuffer.allocateDirect(size * total) : ByteBuffer.allocate(size * total)) + .order(ByteOrder.nativeOrder()); + this.buffer.limit(0); + } + final int limit = this.buffer.limit(); + this.buffer.limit(limit + size); + this.buffer.position(limit); + final ByteBuffer slice = this.buffer.slice().order(ByteOrder.nativeOrder()); + return slice; + } + + public static SlicedBufferFactory direct(int size, int total) + { + return new SlicedBufferFactory(true, size, total); + } + + @SuppressWarnings("unused") + public static SlicedBufferFactory heap(int size, int total) + { + return new SlicedBufferFactory(false, size, total); + } + } }