Skip to content

Commit

Permalink
Merge pull request #218 from franz1981/sliced_on_heap_bb_test
Browse files Browse the repository at this point in the history
Added sliced on heap ByteBuffer test
  • Loading branch information
mikeb01 authored Mar 6, 2018
2 parents 1878c49 + e71b23e commit 383010e
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 30 deletions.
4 changes: 4 additions & 0 deletions runoffheap.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,8 @@ echo "Done"
echo "Running OnHeap..."
$BIN $ARGS $GCARGS -Xloggc:custom-gc.log $CPATH com.lmax.disruptor.offheap.OneToOneOnHeapThroughputTest
echo "Done"

echo "Running Sliced OnHeap..."
$BIN $ARGS $GCARGS -Xloggc:custom-gc.log $CPATH -Dsliced=true com.lmax.disruptor.offheap.OneToOneOnHeapThroughputTest
echo "Done"

Original file line number Diff line number Diff line change
@@ -1,24 +1,16 @@
package com.lmax.disruptor.offheap;

import com.lmax.disruptor.*;
import com.lmax.disruptor.util.DaemonThreadFactory;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.LockSupport;

import com.lmax.disruptor.AbstractPerfTestDisruptor;
import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.DataProvider;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.Sequencer;
import com.lmax.disruptor.SingleProducerSequencer;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.util.DaemonThreadFactory;

public class OneToOneOffHeapThroughputTest extends AbstractPerfTestDisruptor
{
private static final int BLOCK_SIZE = 256;
Expand Down Expand Up @@ -99,9 +91,10 @@ public static class ByteBufferHandler implements EventHandler<ByteBuffer>
@Override
public void onEvent(ByteBuffer event, long sequence, boolean endOfBatch) throws Exception
{
for (int i = 0; i < BLOCK_SIZE; i += 8)
final int start = event.position();
for (int i = start, size = start + BLOCK_SIZE; i < size; i += 8)
{
total += event.getLong();
total += event.getLong(i);
}

if (--expectedCount == 0)
Expand Down Expand Up @@ -134,7 +127,7 @@ public static class OffHeapRingBuffer implements DataProvider<ByteBuffer>
@Override
protected ByteBuffer initialValue()
{
return buffer.duplicate();
return buffer.duplicate().order(ByteOrder.nativeOrder());
}
};

Expand All @@ -143,7 +136,7 @@ public OffHeapRingBuffer(Sequencer sequencer, int entrySize)
this.sequencer = sequencer;
this.entrySize = entrySize;
this.mask = sequencer.getBufferSize() - 1;
buffer = ByteBuffer.allocateDirect(sequencer.getBufferSize() * entrySize);
buffer = ByteBuffer.allocateDirect(sequencer.getBufferSize() * entrySize).order(ByteOrder.nativeOrder());
}

public void addGatingSequences(Sequence sequence)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,32 +1,30 @@
package com.lmax.disruptor.offheap;

import com.lmax.disruptor.*;
import com.lmax.disruptor.util.DaemonThreadFactory;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.LockSupport;

import com.lmax.disruptor.AbstractPerfTestDisruptor;
import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.util.DaemonThreadFactory;

public class OneToOneOnHeapThroughputTest extends AbstractPerfTestDisruptor
{
private static final int BLOCK_SIZE = 256;
private static final int BUFFER_SIZE = 1024 * 1024;
private static final long ITERATIONS = 1000 * 1000 * 10L;

private static final boolean SLICED_BUFFER = Boolean.getBoolean("sliced");
private final Executor executor = Executors.newFixedThreadPool(1, DaemonThreadFactory.INSTANCE);
private final WaitStrategy waitStrategy = new YieldingWaitStrategy();
private final RingBuffer<ByteBuffer> buffer =
RingBuffer.createSingleProducer(BufferFactory.direct(BLOCK_SIZE), BUFFER_SIZE, waitStrategy);
private final ByteBufferHandler handler = new ByteBufferHandler();
RingBuffer.createSingleProducer(
SLICED_BUFFER ? SlicedBufferFactory.direct(BLOCK_SIZE, BUFFER_SIZE) : BufferFactory.direct(BLOCK_SIZE),
BUFFER_SIZE, waitStrategy);
private final ByteBufferHandler handler = new ByteBufferHandler();
private final BatchEventProcessor<ByteBuffer> processor =
new BatchEventProcessor<ByteBuffer>(buffer, buffer.newBarrier(), handler);

Expand Down Expand Up @@ -103,7 +101,7 @@ public void onEvent(ByteBuffer event, long sequence, boolean endOfBatch) throws
{
for (int i = 0; i < BLOCK_SIZE; i += 8)
{
total += event.getLong();
total += event.getLong(i);
}

if (--expectedCount == 0)
Expand Down Expand Up @@ -140,11 +138,11 @@ public ByteBuffer newInstance()
{
if (isDirect)
{
return ByteBuffer.allocateDirect(size);
return ByteBuffer.allocateDirect(size).order(ByteOrder.nativeOrder());
}
else
{
return ByteBuffer.allocate(size);
return ByteBuffer.allocate(size).order(ByteOrder.nativeOrder());
}
}

Expand All @@ -159,4 +157,51 @@ public static BufferFactory heap(int size)
return new BufferFactory(false, size);
}
}

private static final class SlicedBufferFactory implements EventFactory<ByteBuffer>
{
private final boolean isDirect;
private final int size;
private final int total;
private ByteBuffer buffer;

private SlicedBufferFactory(boolean isDirect, int size, int total)
{
this.isDirect = isDirect;
this.size = size;
this.total = total;
this.buffer =
(isDirect ? ByteBuffer.allocateDirect(size * total) : ByteBuffer.allocate(size * total))
.order(ByteOrder.nativeOrder());
this.buffer.limit(0);
}

@Override
public ByteBuffer newInstance()
{
if (this.buffer.limit() == this.buffer.capacity())
{
this.buffer =
(isDirect ? ByteBuffer.allocateDirect(size * total) : ByteBuffer.allocate(size * total))
.order(ByteOrder.nativeOrder());
this.buffer.limit(0);
}
final int limit = this.buffer.limit();
this.buffer.limit(limit + size);
this.buffer.position(limit);
final ByteBuffer slice = this.buffer.slice().order(ByteOrder.nativeOrder());
return slice;
}

public static SlicedBufferFactory direct(int size, int total)
{
return new SlicedBufferFactory(true, size, total);
}

@SuppressWarnings("unused")
public static SlicedBufferFactory heap(int size, int total)
{
return new SlicedBufferFactory(false, size, total);
}
}
}

0 comments on commit 383010e

Please sign in to comment.