Skip to content

Trunk accord changes #4204

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 13 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,7 @@ public enum CassandraRelevantProperties
TCM_SORT_REPLICA_GROUPS("cassandra.sorted_replica_groups_enabled", "true"),
TCM_UNSAFE_BOOT_WITH_CLUSTERMETADATA("cassandra.unsafe_boot_with_clustermetadata", null),
TCM_USE_ATOMIC_LONG_PROCESSOR("cassandra.test.use_atomic_long_processor", "false"),
TCM_USE_NO_OP_REPLICATOR("cassandra.test.use_no_op_replicator", "false"),
TCM_USE_TEST_NO_OP_REPLICATOR("cassandra.test.use_no_op_replicator", "false"),
TEST_ACCORD_STORE_THREAD_CHECKS_ENABLED("cassandra.test.accord.store.thread_checks_enabled", "true"),
TEST_BBFAILHELPER_ENABLED("test.bbfailhelper.enabled"),
TEST_BLOB_SHARED_SEED("cassandra.test.blob.shared.seed", "42"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ public DataSet data()
TableId tableId = (TableId) view.shard().range.start().prefix();
TableMetadata tableMetadata = tableMetadata(tableId);
ds.row(keyspace(tableMetadata), table(tableId, tableMetadata), sortToken(view.shard().range.start()))
.column("start_token", printToken(view.shard().range.start()))
.column("end_token", printToken(view.shard().range.end()))
.column("token_start", printToken(view.shard().range.start()))
.column("token_end", printToken(view.shard().range.end()))
.column("last_started_at", approxTime.translate().toMillisSinceEpoch(view.lastStartedAtMicros() * 1000))
.column("cycle_started_at", approxTime.translate().toMillisSinceEpoch(view.cycleStartedAtMicros() * 1000))
.column("active", Objects.toString(view.active()))
Expand All @@ -170,7 +170,6 @@ public DataSet data()
.column("endIndex", view.cycleLength())
.column("current_splits", view.currentSplits())
.column("stopping", view.stopping())
.column("stopping", view.stopping())
;
}
return ds;
Expand Down Expand Up @@ -204,8 +203,8 @@ public DataSet data()
TableId tableId = (TableId) start.prefix();
TableMetadata tableMetadata = tableMetadata(tableId);
ds.row(keyspace(tableMetadata), table(tableId, tableMetadata), sortToken(start))
.column("start_token", printToken(start))
.column("end_token", printToken(end))
.column("token_start", printToken(start))
.column("token_end", printToken(end))
.column("majority_before", entry.majorityBefore.toString())
.column("universal_before", entry.universalBefore.toString());
return ds;
Expand Down Expand Up @@ -292,8 +291,8 @@ public DataSet data()
maxConflicts.foldlWithBounds(
(timestamp, ds, start, end) -> {
return ds.row(keyspace(tableMetadata), table(tableId, tableMetadata), sortToken(start), commandStoreId)
.column("start_token", printToken(start))
.column("end_token", printToken(end))
.column("token_start", printToken(start))
.column("token_end", printToken(end))
.column("timestamp", timestamp.toString())
;
},
Expand Down Expand Up @@ -508,8 +507,8 @@ public DataSet data()
commandStore.unsafeGetRedundantBefore().foldl(
(entry, ds) -> {
ds.row(keyspace, table, sortToken(entry.range.start()), commandStoreId)
.column("start_token", printToken(entry.range.start()))
.column("end_token", printToken(entry.range.end()))
.column("token_start", printToken(entry.range.start()))
.column("token_end", printToken(entry.range.end()))
.column("start_epoch", entry.startEpoch)
.column("end_epoch", entry.endEpoch)
.column("gc_before", entry.maxBound(GC_BEFORE).toString())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@

import static org.apache.cassandra.utils.Clock.Global.nowInSeconds;

// A route index consists of a few files: cintia_sorted_list, cintia_checkpoints, and metadata
// A route index consists of a few files: cintia_sorted_list, cintia_checkpoints, and
// metadata stores the segment mappings and stats needed for search selection
public class RouteIndexFormat
{
Expand Down
23 changes: 21 additions & 2 deletions src/java/org/apache/cassandra/journal/ActiveSegment.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;

import accord.utils.Invariants;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.utils.*;
import org.apache.cassandra.utils.concurrent.OpOrder;
Expand Down Expand Up @@ -93,7 +96,7 @@ private ActiveSegment(
/**
 * Creates a fresh active segment for the given descriptor/params.
 * New segments start with an empty in-memory index and empty metadata
 * (metadata is populated as entries are appended and fsynced).
 *
 * Note: the scrape/diff residue had kept both the removed
 * {@code Metadata.create()} and the added {@code Metadata.empty()} lines,
 * which is a duplicate local declaration; only the post-change line belongs here.
 */
static <K, V> ActiveSegment<K, V> create(Descriptor descriptor, Params params, KeySupport<K> keySupport)
{
    InMemoryIndex<K> index = InMemoryIndex.create(keySupport);
    Metadata metadata = Metadata.empty();
    return new ActiveSegment<>(descriptor, params, index, metadata, keySupport);
}

Expand Down Expand Up @@ -341,7 +344,7 @@ boolean discardUnusedTail()
if ((int)prev >= next)
{
// already stopped allocating, might also be closed
assert buffer == null || prev == buffer.capacity() + 1;
Invariants.require(buffer == null || prev == buffer.capacity() + 1);
return false;
}

Expand All @@ -350,6 +353,7 @@ boolean discardUnusedTail()
// stopped allocating now; can only succeed once, no further allocation or discardUnusedTail can succeed
endOfBuffer = (int)prev;
assert buffer != null && next == buffer.capacity() + 1;
metadata.fsyncLimit((int) prev);
return prev == 0;
}
LockSupport.parkNanos(1);
Expand Down Expand Up @@ -456,6 +460,21 @@ void write(K id, ByteBuffer record)
}
}

// TODO (required): Find a better way to test unwritten allocations and/or corruption
// Test-only escape hatch: hands the segment's backing buffer directly to {@code fn}
// so tests can inspect (or deliberately corrupt) unwritten allocations.
// "Unsafe" because the consumer gets the live buffer, not a copy.
// NOTE(review): assumes {@code appendOp} was opened before this is invoked — confirm
// against the enclosing ActiveSegment's append lifecycle.
@VisibleForTesting
void consumeBufferUnsafe(Consumer<ByteBuffer> fn)
{
    try
    {
        fn.accept(buffer);
    }
    finally
    {
        // Always release the append op, even if the consumer throws,
        // so the segment is not left pinned by a failed test callback.
        appendOp.close();
    }
}


// Variant of write that does not allocate/return a record pointer
void writeInternal(K id, ByteBuffer record)
{
Expand Down
108 changes: 77 additions & 31 deletions src/java/org/apache/cassandra/journal/EntrySerializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,33 @@
*/
package org.apache.cassandra.journal;

import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

import accord.utils.Invariants;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Crc;

import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

import static org.apache.cassandra.journal.Journal.validateCRC;

/**
* Entry format:
*
* [Total Size (4 bytes)]
* [Header (variable size)]
* [Header CRC (4 bytes)]
* [Record Data (variable size)]
* [Record CRC (4 bytes)]
*/
public final class EntrySerializer
{
/**
* NOTE: out buffer already contains 4 bytes specifying the position of the next allocation, which
* can be used for determining current allocation size and reading / skipping unwritten allocations.
*/
static <K> void write(K key,
ByteBuffer record,
KeySupport<K> keySupport,
Expand Down Expand Up @@ -96,48 +109,81 @@ static <K> int tryRead(EntryHolder<K> into,
KeySupport<K> keySupport,
ByteBuffer from,
int syncedOffset,
int userVersion)
throws IOException
int userVersion) throws IOException
{
CRC32 crc = Crc.crc32();
into.clear();

int start = from.position();
if (from.remaining() < TypeSizes.INT_SIZE)
return -1;
int start = from.position();

int endOffset = from.getInt(start);
// If the node was shut down abruptly, it may happen that we end up with a log that does not have an index or metadata, in
// which case we infer the last contiuous fsynced offset from
if (endOffset == 0)
{
Invariants.require(syncedOffset == Integer.MAX_VALUE, "Synced offset %d, but end offset is not set", syncedOffset, endOffset);
return -1;
}

int totalSize = from.getInt(start) - start;
int totalSize = endOffset - start;
// TODO (required): figure out when this condition can be hit.
if (totalSize == 0)
return -1;
Invariants.require(totalSize > 0);

if (from.remaining() < totalSize)
return handleReadException(new EOFException(), from.limit(), syncedOffset);

try
{
int headerSize = EntrySerializer.headerSize(keySupport, userVersion);
int headerCrc = readAndUpdateHeaderCrc(crc, from, headerSize);
try
if (from.remaining() < totalSize)
return handleReadException(new EOFException(), from.limit(), syncedOffset);
{
validateCRC(crc, headerCrc);
}
catch (IOException e)
{
return handleReadException(e, from.position() + headerSize, syncedOffset);
int headerSize = EntrySerializer.headerSize(keySupport, userVersion);
int headerCrc = readAndUpdateHeaderCrc(crc, from, headerSize);
try
{
validateCRC(crc, headerCrc);
}
catch (IOException e)
{
return handleReadException(e, from.position() + headerSize, syncedOffset);
}

int recordCrc = readAndUpdateRecordCrc(crc, from, start + totalSize);
try
{
validateCRC(crc, recordCrc);
}
catch (IOException e)
{
return handleReadException(e, from.position(), syncedOffset);
}
}

int recordCrc = readAndUpdateRecordCrc(crc, from, start + totalSize);
try
{
validateCRC(crc, recordCrc);
}
catch (IOException e)
{
return handleReadException(e, from.position(), syncedOffset);
}
readValidated(into, from, start, keySupport, userVersion);
return totalSize;
}
catch (Crc.InvalidCrc e)
{
throw new MaybeRecoverableJournalError(totalSize, e);
}
}

readValidated(into, from, start, keySupport, userVersion);
return totalSize;
/**
 * An {@link IOException} signalling a journal read failure that the caller may be able
 * to recover from, because the claimed length of the failed entry is known and can be
 * used to skip past it.
 */
public static class MaybeRecoverableJournalError extends IOException
{
    /** Claimed total size (in bytes) of the entry whose read failed. */
    public final int knownLength;

    public MaybeRecoverableJournalError(int knownLength, Throwable cause)
    {
        super(cause);
        this.knownLength = knownLength;
    }

    @Override
    public String getMessage()
    {
        // Equivalent to String.format("%s knownLength %d", super.getMessage(), knownLength).
        return super.getMessage() + " knownLength " + knownLength;
    }
}

private static <K> void readValidated(EntryHolder<K> into, ByteBuffer from, int start, KeySupport<K> keySupport, int userVersion)
Expand Down
Loading