diff --git a/modules/accord b/modules/accord index 1ce7122e2e30..2eb6677ebedf 160000 --- a/modules/accord +++ b/modules/accord @@ -1 +1 @@ -Subproject commit 1ce7122e2e305f7510ec4c10c7587822c0549364 +Subproject commit 2eb6677ebedf760a87da827fa50708035d353f75 diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java index e2957ce95f4f..0d411d7e81a3 100644 --- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java +++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java @@ -585,7 +585,7 @@ public enum CassandraRelevantProperties TCM_SORT_REPLICA_GROUPS("cassandra.sorted_replica_groups_enabled", "true"), TCM_UNSAFE_BOOT_WITH_CLUSTERMETADATA("cassandra.unsafe_boot_with_clustermetadata", null), TCM_USE_ATOMIC_LONG_PROCESSOR("cassandra.test.use_atomic_long_processor", "false"), - TCM_USE_NO_OP_REPLICATOR("cassandra.test.use_no_op_replicator", "false"), + TCM_USE_TEST_NO_OP_REPLICATOR("cassandra.test.use_no_op_replicator", "false"), TEST_ACCORD_STORE_THREAD_CHECKS_ENABLED("cassandra.test.accord.store.thread_checks_enabled", "true"), TEST_BBFAILHELPER_ENABLED("test.bbfailhelper.enabled"), TEST_BLOB_SHARED_SEED("cassandra.test.blob.shared.seed", "42"), diff --git a/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java index 4bab8d2c6403..abc68385ef40 100644 --- a/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java +++ b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java @@ -156,8 +156,8 @@ public DataSet data() TableId tableId = (TableId) view.shard().range.start().prefix(); TableMetadata tableMetadata = tableMetadata(tableId); ds.row(keyspace(tableMetadata), table(tableId, tableMetadata), sortToken(view.shard().range.start())) - .column("start_token", printToken(view.shard().range.start())) - .column("end_token", printToken(view.shard().range.end())) + .column("token_start", printToken(view.shard().range.start())) + .column("token_end", printToken(view.shard().range.end())) .column("last_started_at", approxTime.translate().toMillisSinceEpoch(view.lastStartedAtMicros() * 1000)) .column("cycle_started_at", approxTime.translate().toMillisSinceEpoch(view.cycleStartedAtMicros() * 1000)) .column("active", Objects.toString(view.active())) @@ -170,7 +170,6 @@ public DataSet data() .column("endIndex", view.cycleLength()) .column("current_splits", view.currentSplits()) .column("stopping", view.stopping()) - .column("stopping", view.stopping()) ; } return ds; @@ -204,8 +203,8 @@ public DataSet data() TableId tableId = (TableId) start.prefix(); TableMetadata tableMetadata = tableMetadata(tableId); ds.row(keyspace(tableMetadata), table(tableId, tableMetadata), sortToken(start)) - .column("start_token", printToken(start)) - .column("end_token", printToken(end)) + .column("token_start", printToken(start)) + .column("token_end", printToken(end)) .column("majority_before", entry.majorityBefore.toString()) .column("universal_before", entry.universalBefore.toString()); return ds; @@ -292,8 +291,8 @@ public DataSet data() maxConflicts.foldlWithBounds( (timestamp, ds, start, end) -> { return ds.row(keyspace(tableMetadata), table(tableId, tableMetadata), sortToken(start), commandStoreId) - .column("start_token", printToken(start)) - .column("end_token", printToken(end)) + .column("token_start", printToken(start)) + .column("token_end", printToken(end)) .column("timestamp", 
timestamp.toString())
                               ;
                        },
@@ -508,8 +507,8 @@ public DataSet data()
             commandStore.unsafeGetRedundantBefore().foldl(
                 (entry, ds) -> {
                     ds.row(keyspace, table, sortToken(entry.range.start()), commandStoreId)
-                      .column("start_token", printToken(entry.range.start()))
-                      .column("end_token", printToken(entry.range.end()))
+                      .column("token_start", printToken(entry.range.start()))
+                      .column("token_end", printToken(entry.range.end()))
                       .column("start_epoch", entry.startEpoch)
                       .column("end_epoch", entry.endEpoch)
                       .column("gc_before", entry.maxBound(GC_BEFORE).toString())
diff --git a/src/java/org/apache/cassandra/index/accord/RouteIndexFormat.java b/src/java/org/apache/cassandra/index/accord/RouteIndexFormat.java
index 56be6b9eed9d..05d7db36a131 100644
--- a/src/java/org/apache/cassandra/index/accord/RouteIndexFormat.java
+++ b/src/java/org/apache/cassandra/index/accord/RouteIndexFormat.java
@@ -63,7 +63,7 @@
 import static org.apache.cassandra.utils.Clock.Global.nowInSeconds;
 
-// A route index consists of a few files: cintia_sorted_list, cintia_checkpoints, and metadata
+// A route index consists of a few files: cintia_sorted_list, cintia_checkpoints, and
 // metadata stores the segment mappings and stats needed for search selection
 public class RouteIndexFormat
 {
diff --git a/src/java/org/apache/cassandra/journal/ActiveSegment.java b/src/java/org/apache/cassandra/journal/ActiveSegment.java
index ac05f78f2c06..85035f499199 100644
--- a/src/java/org/apache/cassandra/journal/ActiveSegment.java
+++ b/src/java/org/apache/cassandra/journal/ActiveSegment.java
@@ -25,8 +25,11 @@
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.locks.LockSupport;
+import java.util.function.Consumer;
 
+import accord.utils.Invariants;
 import com.codahale.metrics.Timer;
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.utils.*;
 import org.apache.cassandra.utils.concurrent.OpOrder;
@@ -93,7 +96,7 @@ private ActiveSegment(
     static ActiveSegment create(Descriptor descriptor, Params params, KeySupport keySupport)
     {
         InMemoryIndex index = InMemoryIndex.create(keySupport);
-        Metadata metadata = Metadata.create();
+        Metadata metadata = Metadata.empty();
         return new ActiveSegment<>(descriptor, params, index, metadata, keySupport);
     }
@@ -341,7 +344,7 @@ boolean discardUnusedTail()
             if ((int)prev >= next)
             {
                 // already stopped allocating, might also be closed
-                assert buffer == null || prev == buffer.capacity() + 1;
+                Invariants.require(buffer == null || prev == buffer.capacity() + 1);
                 return false;
             }
@@ -350,6 +353,7 @@
                 // stopped allocating now; can only succeed once, no further allocation or discardUnusedTail can succeed
                 endOfBuffer = (int)prev;
                 assert buffer != null && next == buffer.capacity() + 1;
+                metadata.fsyncLimit((int) prev);
                 return prev == 0;
             }
             LockSupport.parkNanos(1);
         }
@@ -456,6 +460,21 @@ void write(K id, ByteBuffer record)
         }
     }
 
+    // TODO (required): Find a better way to test unwritten allocations and/or corruption
+    @VisibleForTesting
+    void consumeBufferUnsafe(Consumer<ByteBuffer> fn)
+    {
+        try
+        {
+            fn.accept(buffer);
+        }
+        finally
+        {
+            appendOp.close();
+        }
+    }
+
     // Variant of write that does not allocate/return a record pointer
     void writeInternal(K id, ByteBuffer record)
     {
diff --git a/src/java/org/apache/cassandra/journal/EntrySerializer.java b/src/java/org/apache/cassandra/journal/EntrySerializer.java
index bf6464bb68dc..2ff680b150e7 100644
--- a/src/java/org/apache/cassandra/journal/EntrySerializer.java
+++ b/src/java/org/apache/cassandra/journal/EntrySerializer.java
@@ -17,20 +17,33 @@
  */
 package org.apache.cassandra.journal;
 
-import java.io.EOFException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.zip.CRC32;
-
 import accord.utils.Invariants;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Crc;
 
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.zip.CRC32;
+
 import static org.apache.cassandra.journal.Journal.validateCRC;
 
+/**
+ * Entry format:
+ *
+ * [Total Size (4 bytes)]
+ * [Header (variable size)]
+ * [Header CRC (4 bytes)]
+ * [Record Data (variable size)]
+ * [Record CRC (4 bytes)]
+ */
 public final class EntrySerializer
 {
+    /**
+     * NOTE: out buffer already contains 4 bytes specifying the position of the next allocation, which
+     * can be used for determining current allocation size and reading / skipping unwritten allocations.
+     */
     static void write(K key,
                       ByteBuffer record,
                       KeySupport keySupport,
@@ -96,48 +109,81 @@ static int tryRead(EntryHolder into,
                        KeySupport keySupport,
                        ByteBuffer from,
                        int syncedOffset,
-                       int userVersion)
-        throws IOException
+                       int userVersion) throws IOException
     {
         CRC32 crc = Crc.crc32();
         into.clear();
 
-        int start = from.position();
         if (from.remaining() < TypeSizes.INT_SIZE)
             return -1;
+        int start = from.position();
+
+        int endOffset = from.getInt(start);
+        // If the node was shut down abruptly, it may happen that we end up with a log that does not have an index or metadata, in
+        // which case we infer the last continuous fsynced offset from the segment contents themselves
+        if (endOffset == 0)
+        {
+            Invariants.require(syncedOffset == Integer.MAX_VALUE, "Synced offset %d, but end offset %d is not set", syncedOffset, endOffset);
+            return -1;
+        }
 
-        int totalSize = from.getInt(start) - start;
+        int totalSize = endOffset - start;
+        // TODO (required): figure out when this condition can be hit.
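The framing arithmetic above is easy to misread: the leading int of every entry stores the absolute position of the next allocation, not the entry's length, which is why totalSize is computed as endOffset - start and why a zero prefix identifies an allocation that was reserved but never written. A minimal illustrative sketch of just that arithmetic, derived only from the javadoc and hunk above (EntryFramingSketch and peekAllocation are hypothetical names, not part of the patch):

    import java.nio.ByteBuffer;

    final class EntryFramingSketch
    {
        /**
         * @return the total size of the entry starting at {@code start}; 0 if the
         *         allocation was reserved but never written (its leading int is still
         *         zero); -1 if fewer than 4 readable bytes remain.
         */
        static int peekAllocation(ByteBuffer from, int start)
        {
            if (from.limit() - start < Integer.BYTES)
                return -1;
            int endOffset = from.getInt(start); // absolute offset of the next allocation
            if (endOffset == 0)
                return 0;                       // size prefix never written: torn or unwritten entry
            // [Total Size][Header][Header CRC][Record Data][Record CRC] occupy [start, endOffset)
            return endOffset - start;
        }
    }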
if (totalSize == 0) return -1; + Invariants.require(totalSize > 0); - if (from.remaining() < totalSize) - return handleReadException(new EOFException(), from.limit(), syncedOffset); - + try { - int headerSize = EntrySerializer.headerSize(keySupport, userVersion); - int headerCrc = readAndUpdateHeaderCrc(crc, from, headerSize); - try + if (from.remaining() < totalSize) + return handleReadException(new EOFException(), from.limit(), syncedOffset); { - validateCRC(crc, headerCrc); - } - catch (IOException e) - { - return handleReadException(e, from.position() + headerSize, syncedOffset); + int headerSize = EntrySerializer.headerSize(keySupport, userVersion); + int headerCrc = readAndUpdateHeaderCrc(crc, from, headerSize); + try + { + validateCRC(crc, headerCrc); + } + catch (IOException e) + { + return handleReadException(e, from.position() + headerSize, syncedOffset); + } + + int recordCrc = readAndUpdateRecordCrc(crc, from, start + totalSize); + try + { + validateCRC(crc, recordCrc); + } + catch (IOException e) + { + return handleReadException(e, from.position(), syncedOffset); + } } - int recordCrc = readAndUpdateRecordCrc(crc, from, start + totalSize); - try - { - validateCRC(crc, recordCrc); - } - catch (IOException e) - { - return handleReadException(e, from.position(), syncedOffset); - } + readValidated(into, from, start, keySupport, userVersion); + return totalSize; + } + catch (Crc.InvalidCrc e) + { + throw new MaybeRecoverableJournalError(totalSize, e); } + } - readValidated(into, from, start, keySupport, userVersion); - return totalSize; + public static class MaybeRecoverableJournalError extends IOException + { + public final int knownLength; + + public MaybeRecoverableJournalError(int knownLength, Throwable cause) + { + super(cause); + this.knownLength = knownLength; + } + + @Override + public String getMessage() + { + return String.format("%s knownLength %d", super.getMessage(), knownLength); + } } private static void readValidated(EntryHolder into, ByteBuffer from, int start, KeySupport keySupport, int userVersion) diff --git a/src/java/org/apache/cassandra/journal/Journal.java b/src/java/org/apache/cassandra/journal/Journal.java index e70e93b9c6e7..727360a3f0ee 100644 --- a/src/java/org/apache/cassandra/journal/Journal.java +++ b/src/java/org/apache/cassandra/journal/Journal.java @@ -18,6 +18,7 @@ package org.apache.cassandra.journal; import java.io.IOException; +import java.nio.ByteBuffer; import java.nio.channels.ClosedByInterruptException; import java.nio.file.FileStore; import java.util.ArrayList; @@ -29,10 +30,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.LockSupport; -import java.util.function.BooleanSupplier; -import java.util.function.Function; -import java.util.function.LongConsumer; -import java.util.function.Predicate; +import java.util.function.*; import java.util.zip.CRC32; import com.google.common.annotations.VisibleForTesting; @@ -52,10 +50,10 @@ import org.apache.cassandra.io.util.PathUtils; import org.apache.cassandra.journal.Segments.ReferencedSegments; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.AbstractIterator; import org.apache.cassandra.utils.CloseableIterator; import org.apache.cassandra.utils.Crc; import org.apache.cassandra.utils.JVMStabilityInspector; -import org.apache.cassandra.utils.LazyToString; import org.apache.cassandra.utils.MergeIterator; import org.apache.cassandra.utils.Simulate; import 
org.apache.cassandra.utils.concurrent.OpOrder;
@@ -498,10 +496,17 @@ public RecordPointer asyncWrite(K id, Writer writer)
         }
     }
 
+    // TODO (required): Find a better way to test unwritten allocations and/or corruption
+    @VisibleForTesting
+    public void unsafeConsumeBytesForTesting(int entrySize, Consumer<ByteBuffer> corrupt)
+    {
+        allocate(entrySize).consumeBufferUnsafe(corrupt);
+    }
+
     private ActiveSegment.Allocation allocate(int entrySize)
     {
-        ActiveSegment segment = currentSegment;
+        ActiveSegment segment = currentSegment;
         ActiveSegment.Allocation alloc;
         while (null == (alloc = segment.allocate(entrySize)))
         {
@@ -861,7 +866,7 @@ static void validateCRC(CRC32 crc, int readCRC) throws Crc.InvalidCrc
     /**
     * @return true if the invoking thread should continue, or false if it should terminate itself
     */
-    boolean handleError(String message, Throwable t)
+    public boolean handleError(String message, Throwable t)
     {
         Params.FailurePolicy policy = params.failurePolicy();
         JVMStabilityInspector.inspectJournalThrowable(t, name, policy);
@@ -878,6 +883,7 @@ boolean handleError(String message, Throwable t)
                 message = format("%s. Journal %s failure policy is %s; terminating thread.", message, name, policy);
                 logger.error(maybeAddDiskSpaceContext(message), t);
                 return false;
+            case ALLOW_UNSAFE_STARTUP:
             case IGNORE:
                 message = format("%s. Journal %s failure policy is %s; ignoring exception.", message, name, policy);
                 logger.error(maybeAddDiskSpaceContext(message), t);
@@ -965,15 +971,16 @@ public K key()
             return key;
         }
 
+        public void ensureSorted()
+        {
+            Arrays.sort(segments);
+        }
+
         private void add(K key, long segment)
         {
+            Invariants.require(this.key == null || key.equals(this.key));
             this.key = key;
-            if (size == 0 || segments[size - 1] < segment)
-                segments[size++] = segment;
-            else
-                Invariants.require(segments[size - 1] == segment,
-                                   "Tried to add an out-of-order segment: %d, %s", segment,
-                                   LazyToString.lazy(() -> Arrays.toString(Arrays.copyOf(segments, size))));
+            segments[size++] = segment;
         }
 
         private void reset()
@@ -982,6 +989,16 @@ private void reset()
             size = 0;
             Arrays.fill(segments, 0);
         }
+
+        @Override
+        public String toString()
+        {
+            return "KeyRefs{" +
+                   "segments=" + Arrays.toString(segments) +
+                   ", key=" + key +
+                   ", size=" + size +
+                   '}';
+        }
     }
 
     public class StaticSegmentKeyIterator implements CloseableIterator>
@@ -997,31 +1014,37 @@ public StaticSegmentKeyIterator()
             for (Segment segment : segments.allSorted(true))
             {
                 StaticSegment staticSegment = (StaticSegment) segment;
-                Iterator iter = staticSegment.index().reader();
-                Head head = new Head(staticSegment.descriptor.timestamp);
-                iterators.add(new Iterator<>()
+                iterators.add(new AbstractIterator<>()
                 {
-                    public boolean hasNext()
-                    {
-                        return iter.hasNext();
-                    }
+                    final Iterator iter = staticSegment.index().reader();
+                    final Head head = new Head(staticSegment.descriptor.timestamp);
 
-                    public Head next()
+                    @Override
+                    protected Head computeNext()
                     {
-                        head.key = iter.next();
+                        if (!iter.hasNext())
+                            return endOfData();
+
+                        K next = iter.next();
+                        while (next.equals(head.key))
+                        {
+                            if (!iter.hasNext())
+                                return endOfData();
+
+                            next = iter.next();
+                        }
+
+                        Invariants.require(!next.equals(head.key),
+                                           "%s == %s", next, head.key);
+                        head.key = next;
                         return head;
                     }
                 });
             }
 
             this.iterator = MergeIterator.get(iterators,
-                                              (r1, r2) -> {
-                                                  int keyCmp = keySupport.compare(r1.key, r2.key);
-                                                  if (keyCmp != 0)
-                                                      return keyCmp;
-                                                  return Long.compare(r1.segment, r2.segment);
-                                              },
-                                              new MergeIterator.Reducer>()
+                                              (r1, r2) -> keySupport.compare(r1.key, r2.key),
+                                              new 
MergeIterator.Reducer<>() { final KeyRefs ret = new KeyRefs<>(segments.count()); @@ -1034,6 +1057,7 @@ public void reduce(int idx, Head head) @Override protected KeyRefs getReduced() { + ret.ensureSorted(); return ret; } diff --git a/src/java/org/apache/cassandra/journal/Metadata.java b/src/java/org/apache/cassandra/journal/Metadata.java index 2742816316d6..d921d0253705 100644 --- a/src/java/org/apache/cassandra/journal/Metadata.java +++ b/src/java/org/apache/cassandra/journal/Metadata.java @@ -41,14 +41,15 @@ final class Metadata private static final AtomicIntegerFieldUpdater recordsCountUpdater = AtomicIntegerFieldUpdater.newUpdater(Metadata.class, "recordsCount"); - static Metadata create() + static Metadata empty() { - return new Metadata(0); + return new Metadata(0, 0); } - private Metadata(int recordsCount) + private Metadata(int recordsCount, int fsyncLimit) { this.recordsCount = recordsCount; + this.fsyncLimit = fsyncLimit; } void update() @@ -56,6 +57,11 @@ void update() incrementRecordsCount(); } + void fsyncLimit(int fsyncLimit) + { + this.fsyncLimit = fsyncLimit; + } + int fsyncLimit() { return fsyncLimit; @@ -75,7 +81,9 @@ void write(DataOutputPlus out) throws IOException { CRC32 crc = Crc.crc32(); out.writeInt(recordsCount); + out.writeInt(fsyncLimit); updateChecksumInt(crc, recordsCount); + updateChecksumInt(crc, fsyncLimit); out.writeInt((int) crc.getValue()); } @@ -83,9 +91,11 @@ static Metadata read(DataInputPlus in) throws IOException { CRC32 crc = Crc.crc32(); int recordsCount = in.readInt(); + int fsyncLimit = in.readInt(); updateChecksumInt(crc, recordsCount); + updateChecksumInt(crc, fsyncLimit); validateCRC(crc, in.readInt()); - return new Metadata(recordsCount); + return new Metadata(recordsCount, fsyncLimit); } void persist(Descriptor descriptor) @@ -121,11 +131,12 @@ static Metadata load(Descriptor descriptor) static Metadata rebuild(Descriptor descriptor, KeySupport keySupport) { int recordsCount = 0; - + int fsyncLimit = 0; try (StaticSegment.SequentialReader reader = StaticSegment.sequentialReader(descriptor, keySupport, Integer.MAX_VALUE)) { while (reader.advance()) ++recordsCount; + fsyncLimit = reader.offset; } catch (JournalReadError e) { @@ -134,7 +145,7 @@ static Metadata rebuild(Descriptor descriptor, KeySupport keySupport) throw e; } - return new Metadata(recordsCount); + return new Metadata(recordsCount, fsyncLimit); } static Metadata rebuildAndPersist(Descriptor descriptor, KeySupport keySupport) diff --git a/src/java/org/apache/cassandra/journal/Params.java b/src/java/org/apache/cassandra/journal/Params.java index 452f024fbc64..29ba152e1767 100644 --- a/src/java/org/apache/cassandra/journal/Params.java +++ b/src/java/org/apache/cassandra/journal/Params.java @@ -23,7 +23,7 @@ public interface Params { enum FlushMode { BATCH, GROUP, PERIODIC } - enum FailurePolicy { STOP, STOP_JOURNAL, IGNORE, DIE } + enum FailurePolicy { STOP, STOP_JOURNAL, IGNORE, ALLOW_UNSAFE_STARTUP, DIE } /** * @return maximum segment size diff --git a/src/java/org/apache/cassandra/journal/StaticSegment.java b/src/java/org/apache/cassandra/journal/StaticSegment.java index 2dffd2bcb7c6..281ed41178e1 100644 --- a/src/java/org/apache/cassandra/journal/StaticSegment.java +++ b/src/java/org/apache/cassandra/journal/StaticSegment.java @@ -17,6 +17,15 @@ */ package org.apache.cassandra.journal; +import org.apache.cassandra.io.util.File; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.utils.Closeable; +import org.apache.cassandra.utils.Throwables; +import 
org.apache.cassandra.utils.concurrent.Ref; +import org.apache.cassandra.utils.memory.MemoryUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; @@ -27,16 +36,6 @@ import java.util.Collection; import java.util.List; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.io.util.File; -import org.apache.cassandra.io.util.FileUtils; -import org.apache.cassandra.utils.Closeable; -import org.apache.cassandra.utils.Throwables; -import org.apache.cassandra.utils.concurrent.Ref; -import org.apache.cassandra.utils.memory.MemoryUtil; - /** * An immutable data segment that is no longer written to. *

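The StaticSegment hunks below teach SequentialReader to recover from torn writes: when an entry fails its CRC check, the reader may skip it only if the region is provably an allocation that was reserved but never fully written, i.e. the size marker agrees with the failed read's known length and every byte past the marker is still zero. A condensed sketch of that predicate, assuming the same buffer layout the real doAdvance()/areAllBytesZero() code below operates on (isSkippableTornEntry is a hypothetical name; the real code throws JournalReadError instead of returning false):

    import java.nio.ByteBuffer;

    final class TornEntrySketch
    {
        static boolean isSkippableTornEntry(ByteBuffer buffer, int offset, int knownLength)
        {
            if (knownLength <= Integer.BYTES)
                return false;                        // too short to hold even the size prefix
            int sizeMarker = buffer.getInt(offset);  // absolute end offset of the allocation
            if (sizeMarker != offset + knownLength)
                return false;                        // inconsistent length: genuine corruption
            for (int i = offset + Integer.BYTES; i < offset + knownLength; i++)
                if (buffer.get(i) != 0)
                    return false;                    // non-zero tail: genuine corruption
            return true;                             // safe to advance to offset + knownLength
        }
    }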
@@ -80,7 +79,11 @@ static List> open(Collection descriptors, KeySu { List> segments = new ArrayList<>(descriptors.size()); for (Descriptor descriptor : descriptors) - segments.add(open(descriptor, keySupport)); + { + StaticSegment segment = open(descriptor, keySupport); + segments.add(segment); + } + return segments; } @@ -393,6 +396,8 @@ static final class SequentialReader extends Reader { super(descriptor, keySupport); this.fsyncedLimit = fsyncedLimit; + if (fsyncedLimit < buffer.limit()) + buffer.limit(fsyncedLimit); } @Override @@ -415,6 +420,20 @@ private boolean doAdvance() return eof(); buffer.position(offset + length); } + catch (EntrySerializer.MaybeRecoverableJournalError e) + { + logger.warn("Caught a recoverable journal error, skipping bytes", e); + int sizeMarker = buffer.getInt(offset); + if (e.knownLength <= Integer.BYTES || sizeMarker != offset + e.knownLength) + throw new JournalReadError(descriptor, file, e.getCause()); + + if (!areAllBytesZero(buffer, offset + Integer.BYTES, e.knownLength - Integer.BYTES)) + throw new JournalReadError(descriptor, file, e.getCause()); + + buffer.position(offset + e.knownLength); + // Recur here, as we anticipate a corrupt or incompletely written entry to be a very rare case. + return doAdvance(); + } catch (IOException e) { throw new JournalReadError(descriptor, file, e); @@ -432,6 +451,25 @@ private void reset() } } + public static boolean areAllBytesZero(ByteBuffer buffer, int start, int length) + { + int mod8 = (length/8) * 8; + // Make sure all bytes are zero + for (int i = 0; i < mod8; i += Long.BYTES) + { + long v = buffer.getLong(start + i); + if (v != 0L) + return false; + } + for (int i = mod8; i < length; i++) + { + byte v = buffer.get(start + i); + if (v != 0) + return false; + } + return true; + } + public StaticSegment.KeyOrderReader keyOrderReader() { return new StaticSegment.KeyOrderReader<>(descriptor, keySupport, index.reader()); diff --git a/src/java/org/apache/cassandra/net/Verb.java b/src/java/org/apache/cassandra/net/Verb.java index 5253124ffee2..91d7b8ad580e 100644 --- a/src/java/org/apache/cassandra/net/Verb.java +++ b/src/java/org/apache/cassandra/net/Verb.java @@ -134,7 +134,6 @@ import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.FetchCMSLog; import org.apache.cassandra.tcm.FetchPeerLog; -import org.apache.cassandra.tcm.ReconstructLogState; import org.apache.cassandra.tcm.migration.CMSInitializationResponse; import org.apache.cassandra.tcm.migration.Election; import org.apache.cassandra.tcm.migration.CMSInitializationRequest; @@ -306,8 +305,6 @@ public enum Verb TCM_DISCOVER_REQ (813, P0, rpcTimeout, INTERNAL_METADATA, () -> NoPayload.serializer, () -> Discovery.instance.requestHandler, TCM_DISCOVER_RSP ), TCM_FETCH_PEER_LOG_RSP (818, P0, shortTimeout, FETCH_METADATA, MessageSerializers::logStateSerializer, RESPONSE_HANDLER ), TCM_FETCH_PEER_LOG_REQ (819, P0, rpcTimeout, FETCH_METADATA, () -> FetchPeerLog.serializer, () -> FetchPeerLog.Handler.instance, TCM_FETCH_PEER_LOG_RSP ), - TCM_RECONSTRUCT_EPOCH_RSP (820, P0, rpcTimeout, FETCH_METADATA, MessageSerializers::logStateSerializer, () -> ResponseVerbHandler.instance ), - TCM_RECONSTRUCT_EPOCH_REQ (821, P0, rpcTimeout, FETCH_METADATA, () -> ReconstructLogState.serializer, () -> ReconstructLogState.Handler.instance, TCM_FETCH_PEER_LOG_RSP ), INITIATE_DATA_MOVEMENTS_RSP (814, P1, rpcTimeout, MISC, () -> NoPayload.serializer, RESPONSE_HANDLER ), INITIATE_DATA_MOVEMENTS_REQ (815, P1, rpcTimeout, MISC, () -> DataMovement.serializer, () -> 
DataMovementVerbHandler.instance, INITIATE_DATA_MOVEMENTS_RSP ), diff --git a/src/java/org/apache/cassandra/schema/DistributedMetadataLogKeyspace.java b/src/java/org/apache/cassandra/schema/DistributedMetadataLogKeyspace.java index edc9afe17f9c..82adbb6d80d7 100644 --- a/src/java/org/apache/cassandra/schema/DistributedMetadataLogKeyspace.java +++ b/src/java/org/apache/cassandra/schema/DistributedMetadataLogKeyspace.java @@ -163,20 +163,6 @@ public static LogState getLogState(Epoch since, boolean consistentFetch) return (consistentFetch ? serialLogReader : localLogReader).getLogState(since); } - /** - * Reconstructs the log state by returning a _consistent_ base snapshot of a start epoch, and - * a list of transformations between start and end. - * - * TODO: this is a rather expensive operation, and should be use sparingly. If we decide we need to - * rely on reconstructing arbitrary epochs during normal operation, we need to add a caching mechanism - * here. One more alternative is to keep a lazily-initialized AccordTopology table on CMS nodes for a - * number of recent epochs, and keep a node-local cache of this table on other nodes. - */ - public static LogState getLogState(Epoch start, Epoch end, boolean includeSnapshot) - { - return serialLogReader.getLogState(start, end, includeSnapshot); - } - public static class DistributedTableLogReader implements LogReader { private final ConsistencyLevel consistencyLevel; diff --git a/src/java/org/apache/cassandra/service/accord/AbstractAccordSegmentCompactor.java b/src/java/org/apache/cassandra/service/accord/AbstractAccordSegmentCompactor.java index 18f3de0633ba..954ae2a46ca6 100644 --- a/src/java/org/apache/cassandra/service/accord/AbstractAccordSegmentCompactor.java +++ b/src/java/org/apache/cassandra/service/accord/AbstractAccordSegmentCompactor.java @@ -89,6 +89,10 @@ boolean considerWritingKey() abstract void finishAndAddWriter(); abstract Throwable cleanupWriter(Throwable t); + // Only valid in the scope of a single `compact` call + private JournalKey prevKey; + private DecoratedKey prevDecoratedKey; + @Override public Collection> compact(Collection> segments) { @@ -191,20 +195,22 @@ public Collection> compact(Collection serializer, long descriptor, int offset) throws IOException { if (builder != null) { DecoratedKey decoratedKey = AccordKeyspace.JournalColumns.decorate(key); - Invariants.requireArgument(prevKey == null || ((decoratedKey.compareTo(prevDecoratedKey) >= 0 ? 1 : -1) == (JournalKey.SUPPORT.compare(key, prevKey) >= 0 ? 
1 : -1)), + Invariants.requireArgument(prevKey == null || normalize(decoratedKey.compareTo(prevDecoratedKey)) == normalize(JournalKey.SUPPORT.compare(key, prevKey)), "Partition key and JournalKey didn't have matching order, which may imply a serialization issue.\n%s (%s)\n%s (%s)", key, decoratedKey, prevKey, prevDecoratedKey); prevKey = key; @@ -222,5 +228,14 @@ private void maybeWritePartition(JournalKey key, FlyweightImage builder, Flyweig writer().append(update.unfilteredIterator()); } } + + private static int normalize(int cmp) + { + if (cmp == 0) + return 0; + if (cmp < 0) + return -1; + return 1; + } } diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java index 4cec60f50c24..6fa8fa622477 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java +++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java @@ -219,23 +219,29 @@ public List executors() public void waitForQuiescense() { - boolean hadPending; + boolean runAgain = true; try { - do + while (true) { - hadPending = false; + boolean hasTasks = false; List> futures = new ArrayList<>(); for (AccordExecutor executor : this.executors) { - hadPending |= executor.hasTasks(); + hasTasks |= executor.hasTasks(); + hasTasks |= Stage.MUTATION.executor().getPendingTaskCount() > 0; + hasTasks |= Stage.MUTATION.executor().getActiveTaskCount() > 0; futures.add(executor.submit(() -> {})); } for (Future future : futures) future.get(); futures.clear(); + + if (!runAgain) + return; + + runAgain = hasTasks; } - while (hadPending); } catch (ExecutionException e) { diff --git a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java index bd06cb9764c1..e1177a3d37d8 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java +++ b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java @@ -23,6 +23,7 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.stream.Collectors; @@ -62,6 +63,8 @@ import org.apache.cassandra.utils.Simulate; import org.apache.cassandra.utils.concurrent.AsyncPromise; import org.apache.cassandra.utils.concurrent.Future; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static org.apache.cassandra.service.accord.AccordTopology.tcmIdToAccord; import static org.apache.cassandra.utils.Simulate.With.MONITORS; @@ -70,6 +73,7 @@ @Simulate(with=MONITORS) public class AccordConfigurationService extends AbstractConfigurationService implements AccordEndpointMapper, AccordSyncPropagator.Listener, Shutdownable { + public static final Logger logger = LoggerFactory.getLogger(AccordConfigurationService.class); private final AccordSyncPropagator syncPropagator; public final WatermarkCollector watermarkCollector; @@ -427,6 +431,12 @@ private void fetchTopologyAsync(long epoch, BiConsumer consumer) + { + try (CloseableIterator> iter = journalTable.keyIterator()) + { + while (iter.hasNext()) + { + Journal.KeyRefs ref = iter.next(); + consumer.accept(ref.key()); + } + } + } + + @SuppressWarnings("unchecked") + @Override public void replay(CommandStores commandStores) { try (CloseableIterator> iter = journalTable.keyIterator()) { + JournalKey prev = null; while 
(iter.hasNext()) { Journal.KeyRefs ref = iter.next(); if (ref.key().type != JournalKey.Type.COMMAND_DIFF) continue; - CommandStore commandStore = commandStores.forId(ref.key().commandStoreId); Loader loader = commandStore.loader(); - AsyncChains.getUnchecked(loader.load(ref.key().id) - .map(command -> { - if (journalTable.shouldIndex(ref.key()) - && command.participants() != null - && command.participants().route() != null) - { - ref.segments(segment -> { - journalTable.safeNotify(index -> index.update(segment, ref.key().commandStoreId, ref.key().id, command.participants().route())); - }); - } - return command; - }) - .beginAsResult()); + TxnId txnId = ref.key().id; + try + { + Invariants.require(prev == null || + ref.key().commandStoreId != prev.commandStoreId || + ref.key().id.compareTo(prev.id) != 0, + "duplicate key detected %s == %s", ref.key(), prev); + prev = ref.key(); + AsyncChains.getUnchecked(loader.load(txnId) + .map(command -> { + if (journalTable.shouldIndex(ref.key()) + && command.participants() != null + && command.participants().route() != null) + { + ref.segments(segment -> { + journalTable.safeNotify(index -> index.update(segment, ref.key().commandStoreId, txnId, command.participants().route())); + }); + } + return command; + }) + .beginAsResult()); + } + catch (Throwable t) + { + journal.handleError("Could not replay command " + ref.key().id, t); + } } } - catch (Throwable t) - { - throw new RuntimeException("Can not replay journal.", t); - } } public static @Nullable ByteBuffer asSerializedChange(Command before, Command after, Version userVersion) throws IOException @@ -488,6 +511,12 @@ public void unsafeSetStarted() status = Status.STARTED; } + @VisibleForTesting + public Journal unsafeGetJournal() + { + return journal; + } + @Override public RangeSearcher rangeSearcher() { diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java b/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java index 5b718aaa88b0..b8f7afa725d2 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java +++ b/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java @@ -217,23 +217,18 @@ public void accept(long segment, int position, K key, ByteBuffer buffer, int use } } + // TODO (expected): this can be removed entirely when we "flush" segments directly to sstables (but we perhaps need to be careful about the active segment) private class TableRecordConsumer implements RecordConsumer { - protected LongHashSet visited = null; - protected RecordConsumer delegate; + final LongHashSet visited; + final RecordConsumer delegate; - TableRecordConsumer(RecordConsumer delegate) + TableRecordConsumer(LongHashSet visited, RecordConsumer delegate) { + this.visited = visited; this.delegate = delegate; } - void visit(long segment) - { - if (visited == null) - visited = new LongHashSet(); - visited.add(segment); - } - boolean visited(long segment) { return visited != null && visited.contains(segment); @@ -242,34 +237,40 @@ boolean visited(long segment) @Override public void accept(long segment, int position, K key, ByteBuffer buffer, int userVersion) { - visit(segment); - delegate.accept(segment, position, key, buffer, userVersion); + if (!visited(segment)) + delegate.accept(segment, position, key, buffer, userVersion); } } private class JournalAndTableRecordConsumer implements RecordConsumer { private final K key; - private final TableRecordConsumer tableRecordConsumer; private final RecordConsumer delegate; + private LongHashSet 
visited;
+
+        void visit(long segment)
+        {
+            if (visited == null)
+                visited = new LongHashSet();
+            visited.add(segment);
+        }
 
         JournalAndTableRecordConsumer(K key, RecordConsumer reader)
         {
             this.key = key;
-            this.tableRecordConsumer = new TableRecordConsumer(reader);
             this.delegate = reader;
         }
 
         void readTable()
         {
-            readAllFromTable(key, tableRecordConsumer);
+            readAllFromTable(key, new TableRecordConsumer(visited, delegate));
         }
 
         @Override
         public void accept(long segment, int position, K key, ByteBuffer buffer, int userVersion)
         {
-            if (!tableRecordConsumer.visited(segment)) //TODO (required): don't need this anymore
-                delegate.accept(segment, position, key, buffer, userVersion);
+            visit(segment);
+            delegate.accept(segment, position, key, buffer, userVersion);
         }
     }
 
@@ -484,13 +485,19 @@ private TableIterator()
         @CheckForNull
         protected K computeNext()
         {
+            K ret = null;
             if (mergeIterator.hasNext())
             {
                 try (UnfilteredRowIterator partition = mergeIterator.next())
                 {
-                    return (K) AccordKeyspace.JournalColumns.getJournalKey(partition.partitionKey());
+                    ret = (K) AccordKeyspace.JournalColumns.getJournalKey(partition.partitionKey());
+                    while (partition.hasNext())
+                        partition.next();
                 }
             }
+
+            if (ret != null)
+                return ret;
             else
                 return endOfData();
         }
@@ -514,10 +521,36 @@ private JournalAndTableKeyIterator()
             this.journalIterator = journal.staticSegmentKeyIterator();
         }
 
+        K prevFromTable = null;
+        K prevFromJournal = null;
+
+        @Override
         protected Journal.KeyRefs computeNext()
         {
             K tableKey = tableIterator.hasNext() ? tableIterator.peek() : null;
-            Journal.KeyRefs journalKey = journalIterator.hasNext() ? journalIterator.peek() : null;
+            K journalKey = journalIterator.hasNext() ? journalIterator.peek().key() : null;
+
+            if (journalKey != null)
+            {
+                Invariants.require(prevFromJournal == null || keySupport.compare(journalKey, prevFromJournal) >= 0, // == for the case where the previous key was not consumed on the prior iteration
+                                   "Incorrect sort order in journal segments: %s should strictly follow %s in %s", journalKey, prevFromJournal, this);
+                prevFromJournal = journalKey;
+            }
+            else
+            {
+                prevFromJournal = null;
+            }
+
+            if (tableKey != null)
+            {
+                Invariants.require(prevFromTable == null || keySupport.compare(tableKey, prevFromTable) >= 0, // == for the case where the previous key was not consumed on the prior iteration
+                                   "Incorrect sort order in journal table: %s should strictly follow %s in %s", tableKey, prevFromTable, this);
+                prevFromTable = tableKey;
+            }
+            else
+            {
+                prevFromTable = null;
+            }
 
             if (tableKey == null)
                 return journalKey == null ? endOfData() : journalIterator.next();
@@ -525,7 +558,7 @@ protected Journal.KeyRefs computeNext()
             if (journalKey == null)
                 return new Journal.KeyRefs<>(tableIterator.next());
 
-            int cmp = keySupport.compare(tableKey, journalKey.key());
+            int cmp = keySupport.compare(tableKey, journalKey);
             if (cmp == 0)
             {
                 tableIterator.next();
@@ -550,7 +583,7 @@ public static void readBuffer(ByteBuffer buffer, Reader reader, Version userVers
         }
         catch (IOException e)
         {
-            // can only throw if serializer is buggy
+            // can only throw if serializer is buggy or bytes got corrupted
             throw new RuntimeException(e);
         }
     }
diff --git a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
index bc7c7cf8418e..2639e184d381 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
@@ -120,7 +120,7 @@ protected AccordSafeCommand add(AccordSafeCommand safeCommand, ExclusiveCaches c
     @Override
     protected void persistFieldUpdates()
     {
-        super.persistFieldUpdates();
+        // Field persistence is handled by AccordTask
     }
 
     protected void persistFieldUpdatesInternal(Runnable onDone)
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java
index 6529db168383..e7b93be7a63e 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -37,6 +37,7 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.primitives.Ints;
 
+import org.apache.cassandra.utils.Clock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -397,9 +398,11 @@ else if (images.isEmpty()) // First boot, single-node cluster
             }
 
             WatermarkCollector.fetchAndReportWatermarksAsync(configService());
+            configService.unsafeMarkTruncated();
 
             int attempt = 0;
             int waitSeconds = 5;
+            long deadline = Clock.Global.nanoTime() + SECONDS.toNanos(60);
             while (true)
             {
                 Epoch await = Epoch.max(Epoch.create(configService.currentEpoch()), metadata.epoch);
@@ -412,6 +415,9 @@ else if (images.isEmpty())
                 {
                     logger.warn("Epoch {} is not ready after waiting for {} seconds", metadata.epoch, (++attempt) * waitSeconds);
                 }
+
+                if (Clock.Global.nanoTime() > deadline)
+                    throw new IllegalStateException(String.format("Could not initialize epoch %s. 
Config service state:\n%s", await, configService.getDebugStr())); } } catch (InterruptedException e) diff --git a/src/java/org/apache/cassandra/tcm/AtomicLongBackedProcessor.java b/src/java/org/apache/cassandra/tcm/AtomicLongBackedProcessor.java index d124707300ab..99c280bdd35a 100644 --- a/src/java/org/apache/cassandra/tcm/AtomicLongBackedProcessor.java +++ b/src/java/org/apache/cassandra/tcm/AtomicLongBackedProcessor.java @@ -18,10 +18,8 @@ package org.apache.cassandra.tcm; -import java.io.IOException; import java.util.ArrayList; import java.util.Collections; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NavigableMap; @@ -32,11 +30,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import accord.utils.Invariants; -import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.tcm.log.Entry; import org.apache.cassandra.tcm.log.LocalLog; -import org.apache.cassandra.tcm.log.LogReader; import org.apache.cassandra.tcm.log.LogState; import org.apache.cassandra.tcm.log.LogStorage; @@ -81,39 +76,6 @@ public ClusterMetadata fetchLogAndWait(Epoch waitFor, Retry retry) return log.waitForHighestConsecutive(); } - @Override - public LogState getLocalState(Epoch lowEpoch, Epoch highEpoch, boolean includeSnapshot) - { - try - { - LogReader.EntryHolder state = log.storage().getEntries(Epoch.EMPTY, highEpoch); - ClusterMetadata metadata = new ClusterMetadata(DatabaseDescriptor.getPartitioner()); - - Iterator iter = state.iterator(); - ImmutableList.Builder rest = new ImmutableList.Builder<>(); - while (iter.hasNext()) - { - Entry current = iter.next(); - if (current.epoch.isEqualOrBefore(lowEpoch)) - metadata = current.transform.execute(metadata).success().metadata; - else - rest.add(current); - } - - return new LogState(metadata, rest.build()); - } - catch (IOException t) - { - throw new RuntimeException(t); - } - } - - @Override - public LogState getLogState(Epoch lowEpoch, Epoch highEpoch, boolean includeSnapshot, Retry retryPolicy) - { - return getLocalState(lowEpoch, highEpoch, includeSnapshot); - } - public static class InMemoryStorage implements LogStorage { private final List entries; @@ -137,7 +99,11 @@ public synchronized void append(Entry entry) @Override public synchronized LogState getLogState(Epoch startEpoch) { - return getLogState(startEpoch, Epoch.MAX); + ImmutableList.Builder builder = ImmutableList.builder(); + ClusterMetadata latest = metadataSnapshots.getLatestSnapshot(); + Epoch actualSince = latest != null && latest.epoch.isAfter(startEpoch) ? 
latest.epoch : startEpoch; + entries.stream().filter(e -> e.epoch.isAfter(actualSince)).forEach(builder::add); + return new LogState(latest, builder.build()); } @Override @@ -174,29 +140,6 @@ public synchronized EntryHolder getEntries(Epoch since, Epoch until) entries.stream().filter(e -> e.epoch.isAfter(since) && e.epoch.isEqualOrBefore(until)).forEach(entryHolder::add); return entryHolder; } - - public LogState getLogState(Epoch start, Epoch end) - { - EntryHolder state = getEntries(Epoch.EMPTY); - ClusterMetadata metadata = new ClusterMetadata(DatabaseDescriptor.getPartitioner()); - Iterator iter = state.iterator(); - ImmutableList.Builder rest = new ImmutableList.Builder<>(); - while (iter.hasNext()) - { - Entry current = iter.next(); - if (current.epoch.isAfter(end)) - break; - if (current.epoch.isEqualOrBefore(start)) - { - Invariants.require(current.epoch.isDirectlyAfter(metadata.epoch)); - metadata = current.transform.execute(metadata).success().metadata; - } - else if (current.epoch.isAfter(start)) - rest.add(current); - } - - return new LogState(metadata, rest.build()); - } } public static class InMemoryMetadataSnapshots implements MetadataSnapshots diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java index 15b21e1aa7d7..3272e5772b0b 100644 --- a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java +++ b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java @@ -172,16 +172,16 @@ public static State state(ClusterMetadata metadata) { log = logSpec.sync().withStorage(new AtomicLongBackedProcessor.InMemoryStorage()).createLog(); localProcessor = wrapProcessor.apply(new AtomicLongBackedProcessor(log, logSpec.isReset())); + fetchLogHandler = new FetchCMSLog.Handler((e, ignored) -> logSpec.storage().getLogState(e)); } else { log = logSpec.async().createLog(); localProcessor = wrapProcessor.apply(new PaxosBackedProcessor(log)); + fetchLogHandler = new FetchCMSLog.Handler(); } - fetchLogHandler = new FetchCMSLog.Handler(); - - Commit.Replicator replicator = CassandraRelevantProperties.TCM_USE_NO_OP_REPLICATOR.getBoolean() + Commit.Replicator replicator = CassandraRelevantProperties.TCM_USE_TEST_NO_OP_REPLICATOR.getBoolean() ? 
Commit.Replicator.NO_OP
                              : new Commit.DefaultReplicator(() -> log.metadata().directory);
@@ -938,18 +938,6 @@ public ClusterMetadata fetchLogAndWait(Epoch waitFor, Retry retryPolicy)
         return delegate().fetchLogAndWait(waitFor, retryPolicy);
     }
 
-    @Override
-    public LogState getLocalState(Epoch start, Epoch end, boolean includeSnapshot)
-    {
-        return delegate().getLocalState(start, end, includeSnapshot);
-    }
-
-    @Override
-    public LogState getLogState(Epoch start, Epoch end, boolean includeSnapshot, Retry retryPolicy)
-    {
-        return delegate().getLogState(start, end, includeSnapshot, retryPolicy);
-    }
-
     public String toString()
     {
         return "SwitchableProcessor{" +
diff --git a/src/java/org/apache/cassandra/tcm/FetchCMSLog.java b/src/java/org/apache/cassandra/tcm/FetchCMSLog.java
index 71d79964692f..38ef550ba587 100644
--- a/src/java/org/apache/cassandra/tcm/FetchCMSLog.java
+++ b/src/java/org/apache/cassandra/tcm/FetchCMSLog.java
@@ -19,8 +19,7 @@
 package org.apache.cassandra.tcm;
 
 import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
+import java.util.function.BiFunction;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,11 +32,10 @@
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.DistributedMetadataLogKeyspace;
 import org.apache.cassandra.tcm.log.LogState;
 import org.apache.cassandra.utils.FBUtilities;
 
-import static org.apache.cassandra.config.DatabaseDescriptor.getCmsAwaitTimeout;
-
 public class FetchCMSLog
 {
     public static final Serializer serializer = new Serializer();
@@ -91,16 +89,16 @@ static class Handler implements IVerbHandler
      * to node-local (which is only relevant in cases of CMS expansions/shrinks, and can only be requested by the
      * CMS node that collects the highest epoch from the quorum of peers).
*/ - private final Supplier processor; + private final BiFunction logStateSupplier; public Handler() { - this(() -> ClusterMetadataService.instance().processor()); + this(DistributedMetadataLogKeyspace::getLogState); } - public Handler(Supplier processor) + public Handler(BiFunction logStateSupplier) { - this.processor = processor; + this.logStateSupplier = logStateSupplier; } public void doVerb(Message message) throws IOException @@ -116,13 +114,7 @@ public void doVerb(Message message) throws IOException // If both we and the other node believe it should be caught up with a linearizable read boolean consistentFetch = request.consistentFetch && !ClusterMetadataService.instance().isCurrentMember(message.from()); - Retry retry = Retry.untilElapsed(getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS), TCMMetrics.instance.fetchLogRetries); - LogState delta; - if (consistentFetch) - delta = processor.get().getLogState(message.payload.lowerBound, Epoch.MAX, false, retry); - else - delta = processor.get().getLocalState(message.payload.lowerBound, Epoch.MAX, false); - + LogState delta = logStateSupplier.apply(message.payload.lowerBound, consistentFetch); TCMMetrics.instance.cmsLogEntriesServed(message.payload.lowerBound, delta.latestEpoch()); logger.info("Responding to {}({}) with log delta: {}", message.from(), request, delta); MessagingService.instance().send(message.responseWith(delta), message.from()); diff --git a/src/java/org/apache/cassandra/tcm/FetchPeerLog.java b/src/java/org/apache/cassandra/tcm/FetchPeerLog.java index ab55dcf8f0dc..c2e16c70799c 100644 --- a/src/java/org/apache/cassandra/tcm/FetchPeerLog.java +++ b/src/java/org/apache/cassandra/tcm/FetchPeerLog.java @@ -81,9 +81,7 @@ public void doVerb(Message message) throws IOException ClusterMetadata metadata = ClusterMetadata.current(); logger.info("Received peer log fetch request {} from {}: start = {}, current = {}", request, message.from(), message.payload.start, metadata.epoch); - LogState delta = ClusterMetadataService.instance() - .processor() - .getLocalState(message.payload.start, Epoch.MAX, false); + LogState delta = ClusterMetadataService.instance().log().storage().getLogState(message.payload.start); TCMMetrics.instance.peerLogEntriesServed(message.payload.start, delta.latestEpoch()); logger.info("Responding with log delta: {}", delta); MessagingService.instance().send(message.responseWith(delta), message.from()); diff --git a/src/java/org/apache/cassandra/tcm/PaxosBackedProcessor.java b/src/java/org/apache/cassandra/tcm/PaxosBackedProcessor.java index b3a6e318f512..38fdd306de1a 100644 --- a/src/java/org/apache/cassandra/tcm/PaxosBackedProcessor.java +++ b/src/java/org/apache/cassandra/tcm/PaxosBackedProcessor.java @@ -168,31 +168,6 @@ public ClusterMetadata fetchLogAndWait(Epoch waitFor, Retry retryPolicy) throw new ReadTimeoutException(ConsistencyLevel.QUORUM, blockFor - collected.size(), blockFor, false); } - @Override - public LogState getLocalState(Epoch start, Epoch end, boolean includeSnapshot) - { - return log.storage().getLogState(start, end, includeSnapshot); - } - - @Override - public LogState getLogState(Epoch start, Epoch end, boolean includeSnapshot, Retry retryPolicy) - { - while (true) - { - if (Thread.currentThread().isInterrupted()) - throw new RuntimeException("Can not reconstruct during shutdown", new InterruptedException()); - try - { - return DistributedMetadataLogKeyspace.getLogState(start, end, includeSnapshot); - } - catch (RuntimeException e) // honestly best to only retry timeouts, but everything gets 
wrapped in a RuntimeException... - { - if (!retryPolicy.maybeSleep()) - throw new RuntimeException(String.format("Could not reconstruct range %d, %d", start.getEpoch(), end.getEpoch()), new TimeoutException()); - } - } - } - private static T unwrap(Promise promise) { if (!promise.isDone() || !promise.isSuccess()) diff --git a/src/java/org/apache/cassandra/tcm/Processor.java b/src/java/org/apache/cassandra/tcm/Processor.java index 9a27ac9e6683..e4e99aa08e24 100644 --- a/src/java/org/apache/cassandra/tcm/Processor.java +++ b/src/java/org/apache/cassandra/tcm/Processor.java @@ -18,18 +18,13 @@ package org.apache.cassandra.tcm; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; import java.util.concurrent.TimeUnit; import com.codahale.metrics.Meter; -import accord.utils.Invariants; import org.apache.cassandra.metrics.TCMMetrics; import org.apache.cassandra.service.WaitStrategy; import org.apache.cassandra.tcm.log.Entry; -import org.apache.cassandra.tcm.log.LogState; import static java.util.concurrent.TimeUnit.NANOSECONDS; import static org.apache.cassandra.config.DatabaseDescriptor.getCmsAwaitTimeout; @@ -103,37 +98,4 @@ default ClusterMetadata fetchLogAndWait(Epoch waitFor) } ClusterMetadata fetchLogAndWait(Epoch waitFor, Retry retryPolicy); - - /** - * Queries node's _local_ state. It is not guaranteed to be contiguous, but can be used for restoring CMS state/ - */ - LogState getLocalState(Epoch start, Epoch end, boolean includeSnapshot); - - /** - * Queries global log state. - */ - LogState getLogState(Epoch start, Epoch end, boolean includeSnapshot, Retry retryPolicy); - - /** - * Reconstructs - */ - default List reconstruct(Epoch lowEpoch, Epoch highEpoch, Retry retryPolicy) - { - LogState logState = getLogState(lowEpoch, highEpoch, true, retryPolicy); - if (logState.isEmpty()) return Collections.emptyList(); - List cms = new ArrayList<>(logState.entries.size()); - - ClusterMetadata acc = logState.baseState; - cms.add(acc); - for (Entry entry : logState.entries) - { - Invariants.require(entry.epoch.isDirectlyAfter(acc.epoch), "%s should have been directly after %s", entry.epoch, acc.epoch); - Transformation.Result res = entry.transform.execute(acc); - assert res.isSuccess() : res.toString(); - acc = res.success().metadata; - cms.add(acc); - } - return cms; - } - } diff --git a/src/java/org/apache/cassandra/tcm/ReconstructLogState.java b/src/java/org/apache/cassandra/tcm/ReconstructLogState.java deleted file mode 100644 index 4dea1efeb87d..000000000000 --- a/src/java/org/apache/cassandra/tcm/ReconstructLogState.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.cassandra.tcm; - -import java.io.IOException; -import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; - -import org.apache.cassandra.db.TypeSizes; -import org.apache.cassandra.io.IVersionedSerializer; -import org.apache.cassandra.io.util.DataInputPlus; -import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.metrics.TCMMetrics; -import org.apache.cassandra.net.IVerbHandler; -import org.apache.cassandra.net.Message; -import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.tcm.log.LogState; -import org.apache.cassandra.utils.FBUtilities; - -import static org.apache.cassandra.config.DatabaseDescriptor.getCmsAwaitTimeout; - -public class ReconstructLogState -{ - public static final Serializer serializer = new Serializer(); - - public final Epoch lowerBound; - public final Epoch higherBound; - public final boolean includeSnapshot; - - public ReconstructLogState(Epoch lowerBound, Epoch higherBound, boolean includeSnapshot) - { - this.lowerBound = lowerBound; - this.higherBound = higherBound; - this.includeSnapshot = includeSnapshot; - } - - static class Serializer implements IVersionedSerializer - { - - public void serialize(ReconstructLogState t, DataOutputPlus out, int version) throws IOException - { - Epoch.serializer.serialize(t.lowerBound, out); - Epoch.serializer.serialize(t.higherBound, out); - out.writeBoolean(t.includeSnapshot); - } - - public ReconstructLogState deserialize(DataInputPlus in, int version) throws IOException - { - Epoch lowerBound = Epoch.serializer.deserialize(in); - Epoch higherBound = Epoch.serializer.deserialize(in); - return new ReconstructLogState(lowerBound, higherBound, in.readBoolean()); - } - - public long serializedSize(ReconstructLogState t, int version) - { - return Epoch.serializer.serializedSize(t.lowerBound) + - Epoch.serializer.serializedSize(t.higherBound) + - TypeSizes.BOOL_SIZE; - } - } - - public static class Handler implements IVerbHandler - { - public static final Handler instance = new Handler(); - - private final Supplier processor; - - public Handler() - { - this(() -> ClusterMetadataService.instance().processor()); - } - public Handler(Supplier processor) - { - this.processor = processor; - } - public void doVerb(Message message) throws IOException - { - TCMMetrics.instance.reconstructLogStateCall.mark(); - ReconstructLogState request = message.payload; - - if (!ClusterMetadataService.instance().isCurrentMember(FBUtilities.getBroadcastAddressAndPort())) - throw new NotCMSException("This node is not in the CMS, can't generate a consistent log fetch response to " + message.from()); - - LogState result = processor.get().getLogState(request.lowerBound, request.higherBound, request.includeSnapshot, - Retry.untilElapsed(getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS), TCMMetrics.instance.fetchLogRetries)); - - MessagingService.instance().send(message.responseWith(result), message.from()); - } - } -} diff --git a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java index fe87719593ac..46de475f729b 100644 --- a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java +++ b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java @@ -151,36 +151,6 @@ public ClusterMetadata fetchLogAndWait(Epoch waitFor, Retry retryPolicy) } } - @Override - public LogState getLocalState(Epoch start, Epoch end, boolean includeSnapshot) - { - return log.getLocalEntries(start, end, includeSnapshot); - } - - @Override - public 
LogState getLogState(Epoch lowEpoch, Epoch highEpoch, boolean includeSnapshot, Retry retryPolicy) - { - try - { - Promise request = new AsyncPromise<>(); - List candidates = new ArrayList<>(log.metadata().fullCMSMembers()); - sendWithCallbackAsync(request, - Verb.TCM_RECONSTRUCT_EPOCH_REQ, - new ReconstructLogState(lowEpoch, highEpoch, includeSnapshot), - new CandidateIterator(candidates), - retryPolicy); - return request.get(retryPolicy.remainingNanos(), TimeUnit.NANOSECONDS); - } - catch (InterruptedException e) - { - throw new RuntimeException("Can not reconstruct during shutdown", e); - } - catch (ExecutionException | TimeoutException e) - { - throw new RuntimeException(String.format("Could not reconstruct range %d, %d", lowEpoch.getEpoch(), highEpoch.getEpoch()), e); - } - } - public static ClusterMetadata fetchLogAndWait(CandidateIterator candidateIterator, LocalLog log) { try diff --git a/src/java/org/apache/cassandra/tcm/StubClusterMetadataService.java b/src/java/org/apache/cassandra/tcm/StubClusterMetadataService.java index d062d202e64a..6036554e0d00 100644 --- a/src/java/org/apache/cassandra/tcm/StubClusterMetadataService.java +++ b/src/java/org/apache/cassandra/tcm/StubClusterMetadataService.java @@ -34,7 +34,6 @@ import org.apache.cassandra.tcm.Commit.Replicator; import org.apache.cassandra.tcm.log.Entry; import org.apache.cassandra.tcm.log.LocalLog; -import org.apache.cassandra.tcm.log.LogState; import org.apache.cassandra.tcm.membership.Directory; import org.apache.cassandra.tcm.ownership.DataPlacements; import org.apache.cassandra.tcm.ownership.PlacementProvider; @@ -152,21 +151,8 @@ public ClusterMetadata fetchLogAndWait(Epoch waitFor, Retry retryPolicy) { throw new UnsupportedOperationException(); } - - @Override - public LogState getLocalState(Epoch start, Epoch end, boolean includeSnapshot) - { - throw new UnsupportedOperationException(); - } - - @Override - public LogState getLogState(Epoch start, Epoch end, boolean includeSnapshot, Retry retryPolicy) - { - throw new UnsupportedOperationException(); - } } - public static Builder builder() { return new Builder(); diff --git a/src/java/org/apache/cassandra/tcm/log/LocalLog.java b/src/java/org/apache/cassandra/tcm/log/LocalLog.java index a0f50d16ad8d..425f4bdc2465 100644 --- a/src/java/org/apache/cassandra/tcm/log/LocalLog.java +++ b/src/java/org/apache/cassandra/tcm/log/LocalLog.java @@ -361,11 +361,6 @@ public LogState getLocalEntries(Epoch since) return storage.getLogState(since, false); } - public LogState getLocalEntries(Epoch since, Epoch until, boolean includeSnapshot) - { - return storage.getLogState(since, until, includeSnapshot); - } - public ClusterMetadata waitForHighestConsecutive() { runOnce(); diff --git a/src/java/org/apache/cassandra/tcm/log/LogReader.java b/src/java/org/apache/cassandra/tcm/log/LogReader.java index 4ec597a31122..fb352dcd2265 100644 --- a/src/java/org/apache/cassandra/tcm/log/LogReader.java +++ b/src/java/org/apache/cassandra/tcm/log/LogReader.java @@ -28,8 +28,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Ordering; -import accord.utils.Invariants; -import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.MetadataSnapshots; @@ -121,58 +119,6 @@ else if (!allowSnapshots) } } - default LogState getLogState(Epoch start, Epoch end, boolean includeSnapshot) - { - try - { - ClusterMetadata closestSnapshot = null; - if (includeSnapshot) - 
closestSnapshot = snapshots().getSnapshotBefore(start); - - // Snapshot could not be found, fetch enough epochs to reconstruct the start metadata - if (closestSnapshot == null) - { - if (includeSnapshot) - closestSnapshot = new ClusterMetadata(DatabaseDescriptor.getPartitioner()); - ImmutableList.Builder entries = new ImmutableList.Builder<>(); - EntryHolder entryHolder = getEntries(Epoch.EMPTY, end); - for (Entry entry : entryHolder.entries) - { - if (entry.epoch.isAfter(start)) - entries.add(entry); - else if (includeSnapshot) - closestSnapshot = entry.transform.execute(closestSnapshot).success().metadata; - } - return new LogState(closestSnapshot, entries.build()); - } - else if (closestSnapshot.epoch.isBefore(start)) - { - ImmutableList.Builder entries = new ImmutableList.Builder<>(); - // start is exclusive, so use the closest snapshot - EntryHolder entryHolder = getEntries(closestSnapshot.epoch, end); - for (Entry entry : entryHolder.entries) - { - if (entry.epoch.isAfter(start)) - entries.add(entry); - else if (includeSnapshot) - closestSnapshot = entry.transform.execute(closestSnapshot).success().metadata; - } - return new LogState(closestSnapshot, entries.build()); - } - else - { - Invariants.require(closestSnapshot.epoch.isEqualOrAfter(start), - "Got %s, but requested snapshot of %s", closestSnapshot.epoch, start); - EntryHolder entryHolder = getEntries(closestSnapshot.epoch, end); - return new LogState(closestSnapshot, ImmutableList.copyOf(entryHolder.entries)); - } - } - catch (IOException e) - { - throw new RuntimeException(e); - } - } - class EntryHolder { SortedSet entries; diff --git a/src/java/org/apache/cassandra/tcm/log/LogStorage.java b/src/java/org/apache/cassandra/tcm/log/LogStorage.java index e739a8aae799..667c68cbce5c 100644 --- a/src/java/org/apache/cassandra/tcm/log/LogStorage.java +++ b/src/java/org/apache/cassandra/tcm/log/LogStorage.java @@ -56,12 +56,6 @@ public LogState getLogState(Epoch startEpoch, boolean allowSnapshots) return LogState.EMPTY; } - @Override - public LogState getLogState(Epoch start, Epoch end, boolean includeSnapshot) - { - return LogState.EMPTY; - } - @Override public LogState getPersistedLogState() { diff --git a/src/java/org/apache/cassandra/tcm/migration/GossipProcessor.java b/src/java/org/apache/cassandra/tcm/migration/GossipProcessor.java index 602f52a3a7df..ef52305e0d8b 100644 --- a/src/java/org/apache/cassandra/tcm/migration/GossipProcessor.java +++ b/src/java/org/apache/cassandra/tcm/migration/GossipProcessor.java @@ -25,7 +25,6 @@ import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.Transformation; import org.apache.cassandra.tcm.ClusterMetadata; -import org.apache.cassandra.tcm.log.LogState; public class GossipProcessor implements Processor { @@ -40,16 +39,4 @@ public ClusterMetadata fetchLogAndWait(Epoch waitFor, Retry retryPolicy) { return ClusterMetadata.current(); } - - @Override - public LogState getLocalState(Epoch start, Epoch end, boolean includeSnapshot) - { - throw new IllegalStateException("Can't reconstruct log state when running in gossip mode. Enable the ClusterMetadataService with `nodetool addtocms`."); - } - - @Override - public LogState getLogState(Epoch start, Epoch end, boolean includeSnapshot, Retry retryPolicy) - { - throw new IllegalStateException("Can't reconstruct log state when running in gossip mode. 
Enable the ClusterMetadataService with `nodetool addtocms`."); - } } diff --git a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java index 220c7ff9fd71..2c840d4a7d82 100644 --- a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java +++ b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java @@ -204,7 +204,7 @@ private static void forceHeapSpaceOomMaybe(OutOfMemoryError oom) private static void inspectJournalError(Throwable t, String journalName, FailurePolicy failurePolicy) { - if (!StorageService.instance.isDaemonSetupCompleted()) + if (!StorageService.instance.isDaemonSetupCompleted() && failurePolicy != FailurePolicy.ALLOW_UNSAFE_STARTUP) { logger.error("Exiting due to error while processing journal {} during initialization.", journalName, t); killer.killCurrentJVM(t, true); diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java index 575482c5a06c..a74e9eb596ce 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java @@ -535,8 +535,9 @@ private SerializableConsumer receiveMessageRunnable(IMessage message) return runOnCaller -> { if (!internodeMessagingStarted) { - inInstancelogger.debug("Dropping inbound message {} to {} as internode messaging has not been started yet", - message, config().broadcastAddress()); + if (inInstancelogger != null) + inInstancelogger.debug("Dropping inbound message {} to {} as internode messaging has not been started yet", + message, config().broadcastAddress()); return; } if (message.version() > MessagingService.current_version) diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordHostReplacementTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordHostReplacementTest.java index 3d94fe226db3..ceebb3933dca 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordHostReplacementTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordHostReplacementTest.java @@ -69,7 +69,8 @@ public void hostReplace() throws IOException withRandom(rng -> { Generator schemaGen = SchemaGenerators.schemaSpecGen(KEYSPACE, "host_replace", 1000, - SchemaSpec.optionsBuilder().withTransactionalMode(transactionalModeGen.generate(rng))); + SchemaSpec.optionsBuilder().withTransactionalMode(transactionalModeGen.generate(rng)) + .withSpeculativeRetry("ALWAYS")); SchemaSpec schema = schemaGen.generate(rng); Generators.TrackingGenerator pkGen = Generators.tracking(Generators.int32(0, Math.min(schema.valueGenerators.pkPopulation(), 1000))); diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTestBase.java index 45a62d3a97f2..6613add2e81b 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTestBase.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTestBase.java @@ -756,18 +756,6 @@ public ClusterMetadata fetchLogAndWait(Epoch waitFor, Retry retryPolicy) log.append(logState); return log.waitForHighestConsecutive(); } - - @Override - public LogState getLocalState(Epoch start, Epoch end, boolean includeSnapshot) - { - return log.getLocalEntries(start, end, includeSnapshot); - } - - @Override - public LogState 
getLogState(Epoch start, Epoch end, boolean includeSnapshot, Retry retryPolicy) - { - return getLocalState(start, end, includeSnapshot); - } }, (a,b) -> {}, false); diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/ReconstructEpochTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/ReconstructEpochTest.java deleted file mode 100644 index b1b1411c5803..000000000000 --- a/test/distributed/org/apache/cassandra/distributed/test/log/ReconstructEpochTest.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.distributed.test.log; - -import java.util.Iterator; -import java.util.concurrent.TimeUnit; - -import org.junit.Assert; -import org.junit.Test; - -import org.apache.cassandra.distributed.Cluster; -import org.apache.cassandra.distributed.test.TestBaseImpl; -import org.apache.cassandra.metrics.TCMMetrics; -import org.apache.cassandra.schema.DistributedMetadataLogKeyspace; -import org.apache.cassandra.tcm.ClusterMetadataService; -import org.apache.cassandra.tcm.Epoch; -import org.apache.cassandra.tcm.Retry; -import org.apache.cassandra.tcm.log.Entry; -import org.apache.cassandra.tcm.log.LogState; - -import static org.apache.cassandra.config.DatabaseDescriptor.getCmsAwaitTimeout; - -public class ReconstructEpochTest extends TestBaseImpl -{ - @Test - public void logReaderTest() throws Exception - { - try (Cluster cluster = init(builder().withNodes(2).start())) - { - cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (id int primary key)")); - for (int i = 0; i < 30; i++) - { - if (i > 0 && i % 5 == 0) - cluster.get(1).runOnInstance(() -> ClusterMetadataService.instance().triggerSnapshot()); - cluster.schemaChange(withKeyspace("ALTER TABLE %s.tbl WITH comment = '" + i + "'")); - } - - cluster.get(1).runOnInstance(() -> { - for (int[] cfg : new int[][]{ new int[]{ 6, 9 }, - new int[]{ 2, 20 }, - new int[]{ 5, 5 }, - new int[]{ 15, 20 } }) - { - int start = cfg[0]; - int end = cfg[1]; - LogState logState = DistributedMetadataLogKeyspace.getLogState(Epoch.create(start), Epoch.create(end), true); - Assert.assertEquals(start, logState.baseState.epoch.getEpoch()); - Iterator iter = logState.entries.iterator(); - for (int i = start + 1; i <= end; i++) - Assert.assertEquals(i, iter.next().epoch.getEpoch()); - } - }); - - - cluster.get(2).runOnInstance(() -> { - for (int[] cfg : new int[][]{ new int[]{ 6, 9 }, - new int[]{ 2, 20 }, - new int[]{ 5, 5 }, - new int[]{ 15, 20 } }) - { - int start = cfg[0]; - int end = cfg[1]; - LogState logState = ClusterMetadataService.instance() - .processor() - .getLogState(Epoch.create(start), - Epoch.create(end), - true, - Retry.untilElapsed(getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS), 
TCMMetrics.instance.commitRetries)); - - Assert.assertEquals(start, logState.baseState.epoch.getEpoch()); - Iterator iter = logState.entries.iterator(); - for (int i = start + 1; i <= end; i++) - Assert.assertEquals(i, iter.next().epoch.getEpoch()); - } - }); - } - } -} diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/SystemKeyspaceStorageTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/SystemKeyspaceStorageTest.java index 28ef23bf6271..c4a4079225ce 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/SystemKeyspaceStorageTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/SystemKeyspaceStorageTest.java @@ -98,6 +98,7 @@ public void testLogStateQuery() throws Throwable cluster.get(1).runOnInstance(() -> deleteSnapshot(toRemoveSnapshot.getEpoch())); } } + Epoch latestSnapshot = remainingSnapshots.get(remainingSnapshots.size() - 1); Epoch lastEpoch = allEpochs.stream().max(Comparator.naturalOrder()).get(); repeat(10, () -> { repeat(100, () -> { @@ -118,14 +119,12 @@ public void testLogStateQuery() throws Throwable } else { - assertEquals(since, logState.baseState.epoch); + assertEquals(latestSnapshot, logState.baseState.epoch); start = logState.baseState.epoch; if (logState.entries.isEmpty()) // no entries, snapshot should have the same epoch as since assertEquals(since, start); else // first epoch in entries should be snapshot epoch + 1 { - if (!start.nextEpoch().equals(logState.entries.get(0).epoch)) - System.out.println(1); assertEquals(start.nextEpoch(), logState.entries.get(0).epoch); } } diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/TestProcessor.java b/test/distributed/org/apache/cassandra/distributed/test/log/TestProcessor.java index 27ddbd18be45..6853bfa03107 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/TestProcessor.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/TestProcessor.java @@ -32,7 +32,6 @@ import org.apache.cassandra.tcm.Retry; import org.apache.cassandra.tcm.Transformation; import org.apache.cassandra.tcm.log.Entry; -import org.apache.cassandra.tcm.log.LogState; import org.apache.cassandra.utils.concurrent.WaitQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,18 +69,6 @@ public ClusterMetadata fetchLogAndWait(Epoch waitFor, Retry retryPolicy) return delegate.fetchLogAndWait(waitFor, retryPolicy); } - @Override - public LogState getLocalState(Epoch start, Epoch end, boolean includeSnapshot) - { - return delegate.getLocalState(start, end, includeSnapshot); - } - - @Override - public LogState getLogState(Epoch start, Epoch end, boolean includeSnapshot, Retry retryPolicy) - { - return delegate.getLogState(start, end, includeSnapshot, retryPolicy); - } - protected void waitIfPaused() { if (isPaused()) diff --git a/test/distributed/org/apache/cassandra/fuzz/topology/AccordBootstrapTest.java b/test/distributed/org/apache/cassandra/fuzz/topology/AccordBootstrapTest.java index 77aaebfab7c4..4ddcd03d45a7 100644 --- a/test/distributed/org/apache/cassandra/fuzz/topology/AccordBootstrapTest.java +++ b/test/distributed/org/apache/cassandra/fuzz/topology/AccordBootstrapTest.java @@ -77,7 +77,7 @@ public void bootstrapFuzzTest() throws Throwable HashSet downInstances = new HashSet<>(); withRandom(rng -> { - Generator schemaGen = SchemaGenerators.trivialSchema(KEYSPACE, "bootstrap_fuzz", POPULATION, + Generator schemaGen = SchemaGenerators.trivialSchema(KEYSPACE, () -> "bootstrap_fuzz", POPULATION, 
SchemaSpec.optionsBuilder() .addWriteTimestamps(false) .withTransactionalMode(TransactionalMode.full) diff --git a/test/distributed/org/apache/cassandra/fuzz/topology/AccordBounceTest.java b/test/distributed/org/apache/cassandra/fuzz/topology/AccordBounceTest.java new file mode 100644 index 000000000000..1e311bf8a541 --- /dev/null +++ b/test/distributed/org/apache/cassandra/fuzz/topology/AccordBounceTest.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.fuzz.topology; + +import accord.primitives.Range; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.shared.ClusterUtils; +import org.apache.cassandra.distributed.test.log.FuzzTestBase; +import org.apache.cassandra.harry.SchemaSpec; +import org.apache.cassandra.harry.dsl.HistoryBuilder; +import org.apache.cassandra.harry.dsl.ReplayingHistoryBuilder; +import org.apache.cassandra.harry.execution.InJvmDTestVisitExecutor; +import org.apache.cassandra.harry.execution.QueryBuildingVisitExecutor; +import org.apache.cassandra.harry.gen.Generator; +import org.apache.cassandra.harry.gen.SchemaGenerators; +import org.apache.cassandra.service.accord.AccordService; +import org.apache.cassandra.service.consensus.TransactionalMode; +import org.junit.Test; + +import java.util.*; +import java.util.function.Supplier; + +import static org.apache.cassandra.harry.checker.TestHelper.withRandom; + +public class AccordBounceTest extends FuzzTestBase +{ + private static final int WRITES = 10; + private static final int POPULATION = 1000; + + // Test bounce in presence of unwritten allocation. 
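An "unwritten allocation" here is journal buffer space that was reserved but never filled before the node went down, so on restart it reads back as a run of zero bytes that the segment reader must recognize and tolerate. A minimal, self-contained sketch of that zero-tail check (the helper below is a local stand-in for illustration, not the production StaticSegment.areAllBytesZero):

    import java.nio.ByteBuffer;

    final class ZeroTailSketch
    {
        // True iff buffer[from..to) contains only zero bytes; an allocation that was
        // reserved but never written looks exactly like this after a bounce.
        static boolean allZero(ByteBuffer buffer, int from, int to)
        {
            for (int i = from; i < to; i++)
                if (buffer.get(i) != 0)
                    return false;
            return true;
        }

        public static void main(String[] args)
        {
            ByteBuffer tail = ByteBuffer.allocate(64);   // reserved, never written
            assert allZero(tail, 0, tail.limit());       // reads back as all zeros
            tail.put(17, (byte) 0xff);                   // any single written byte...
            assert !allZero(tail, 0, tail.limit());      // ...disqualifies the tail
        }
    }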
+ @Test + public void emptyJournalAllocationBounceTest() throws Throwable + { + try (Cluster cluster = init(builder().withNodes(1).start())) + { + withRandom(rng -> { + Generator schemaGen = SchemaGenerators.trivialSchema(KEYSPACE, new Supplier() + { + int i = 0; + + @Override + public String get() + { + return "bootstrap_fuzz" + (i++); + } + }, POPULATION, + SchemaSpec.optionsBuilder() + .addWriteTimestamps(false) + .withTransactionalMode(TransactionalMode.full) + ); + + List historyBuilders = new ArrayList<>(); + for (int i = 0; i < 10; i++) + { + SchemaSpec schema = schemaGen.generate(rng); + cluster.schemaChange(schema.compile()); + historyBuilders.add(new ReplayingHistoryBuilder(schema.valueGenerators, + hb -> InJvmDTestVisitExecutor.builder() + .consistencyLevel(ConsistencyLevel.QUORUM) + .wrapQueries(QueryBuildingVisitExecutor.WrapQueries.TRANSACTION) + .pageSizeSelector(p -> InJvmDTestVisitExecutor.PageSizeSelector.NO_PAGING) + .build(schema, hb, cluster))); + } + + for (HistoryBuilder hb : historyBuilders) + for (int pk = 0; pk < 5; pk++) + { + for (int i = 0; i < 5; i++) + hb.insert(pk); + cluster.get(1).runOnInstance(() -> { + AccordService accordService = (AccordService) AccordService.instance(); + accordService.journal().unsafeGetJournal().unsafeConsumeBytesForTesting(200, bb -> {}); + }); + } + + for (HistoryBuilder hb : historyBuilders) + for (int pk = 0; pk < 5; pk++) + hb.selectPartition(pk); + + ClusterUtils.stopUnchecked(cluster.get(1)); + cluster.get(1).startup(); + + for (HistoryBuilder hb : historyBuilders) + for (int pk = 0; pk < 5; pk++) + hb.selectPartition(pk); + }); + } + } + + + @Test + public void commandStoresBounceTest() throws Throwable + { + try (Cluster cluster = init(builder().withNodes(1).start())) + { + withRandom(rng -> { + Generator schemaGen = SchemaGenerators.trivialSchema(KEYSPACE, new Supplier() { + int i = 0; + @Override + public String get() + { + return "bootstrap_fuzz" + (i++); + } + }, POPULATION, + SchemaSpec.optionsBuilder() + .addWriteTimestamps(false) + .withTransactionalMode(TransactionalMode.full) + ); + + List historyBuilders = new ArrayList<>(); + for (int i = 0; i < 10; i++) + { + SchemaSpec schema = schemaGen.generate(rng); + cluster.schemaChange(schema.compile()); + historyBuilders.add(new ReplayingHistoryBuilder(schema.valueGenerators, + hb -> InJvmDTestVisitExecutor.builder() + .consistencyLevel(ConsistencyLevel.QUORUM) + .wrapQueries(QueryBuildingVisitExecutor.WrapQueries.TRANSACTION) + .pageSizeSelector(p -> InJvmDTestVisitExecutor.PageSizeSelector.NO_PAGING) + .build(schema, hb, cluster))); + } + + Runnable writeAndValidate = () -> { + for (HistoryBuilder hb : historyBuilders) + for (int pk = 0; pk < 10; pk++) + for (int i = 0; i < 10; i++) + hb.insert(pk); + for (HistoryBuilder hb : historyBuilders) + for (int pk = 0; pk < 10; pk++) + hb.selectPartition(pk); + }; + + // Command Stores should not be lost on bounce + Map> before = cluster.get(1).callOnInstance(() -> { + Map> m = new HashMap<>(); + AccordService.instance().node().commandStores().forEach((store, ranges) -> { + Set set = new HashSet<>(); + for (Range range : ranges.all()) + set.add(range.toString()); + m.put(store.id(), set); + }); + return m; + }); + for (int i = 0; i < 5; i++) + { + writeAndValidate.run(); + ClusterUtils.stopUnchecked(cluster.get(1)); + cluster.get(1).startup(); + + SchemaSpec schema = schemaGen.generate(rng); + cluster.schemaChange(schema.compile()); + + Map> after = cluster.get(1).callOnInstance(() -> { + Map> m = new HashMap<>(); + 
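+                // Snapshot each command store's owned ranges (as strings) keyed by store id;
+                // the same capture is repeated after every bounce and checked below to prove
+                // no store or range present before the restart has gone missing after it.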
+                AccordService.instance().node().commandStores().forEach((store, ranges) -> {
+                    Set<String> set = new HashSet<>();
+                    for (Range range : ranges.all())
+                        set.add(range.toString());
+                    m.put(store.id(), set);
+                });
+                return m;
+            });
+            if (!before.equals(after))
+            {
+                for (Integer k : before.keySet())
+                {
+                    if (!after.containsKey(k))
+                        throw new AssertionError(String.format("Command store %d (ranges %s) was present before the bounce but is missing after it", k, before.get(k)));
+
+                    for (String s : before.get(k))
+                    {
+                        if (!after.get(k).contains(s))
+                            throw new AssertionError(String.format("Command store %d lost range %s across the bounce (before: %s, after: %s)", k, s, before.get(k), after.get(k)));
+                    }
+                }
+            }
+            before = after;
+        }
+    });
+            }
+        }
+}
+
diff --git a/test/distributed/org/apache/cassandra/fuzz/topology/JournalGCTest.java b/test/distributed/org/apache/cassandra/fuzz/topology/JournalGCTest.java
new file mode 100644
index 000000000000..e685f3a4f9b0
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/fuzz/topology/JournalGCTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.cassandra.fuzz.topology; + +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.test.log.FuzzTestBase; +import org.apache.cassandra.harry.SchemaSpec; +import org.apache.cassandra.harry.dsl.HistoryBuilder; +import org.apache.cassandra.harry.dsl.ReplayingHistoryBuilder; +import org.apache.cassandra.harry.execution.InJvmDTestVisitExecutor; +import org.apache.cassandra.harry.execution.QueryBuildingVisitExecutor; +import org.apache.cassandra.harry.gen.Generator; +import org.apache.cassandra.harry.gen.SchemaGenerators; +import org.apache.cassandra.schema.SchemaConstants; +import org.apache.cassandra.service.accord.AccordKeyspace; +import org.apache.cassandra.service.accord.AccordService; +import org.apache.cassandra.service.accord.JournalKey; +import org.apache.cassandra.service.consensus.TransactionalMode; +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.cassandra.harry.checker.TestHelper.withRandom; + +public class JournalGCTest extends FuzzTestBase +{ + private static final int POPULATION = 1000; + + @Test + public void journalGCTest() throws Throwable + { + try (Cluster cluster = init(builder().withNodes(1) + .withConfig(cfg -> cfg.set("accord.gc_delay", "1s") + .set("accord.shard_durability_target_splits", "1") + .set("accord.shard_durability_cycle", "1s") + .set("accord.global_durability_cycle", "1s")) + .start())) + { + withRandom(rng -> { + cluster.get(1).runOnInstance(() -> { + Keyspace.open(SchemaConstants.ACCORD_KEYSPACE_NAME).getColumnFamilyStore(AccordKeyspace.JOURNAL).disableAutoCompaction(); + }); + + Generator schemaGen = SchemaGenerators.trivialSchema(KEYSPACE, () -> "bootstrap_fuzz", POPULATION, + SchemaSpec.optionsBuilder() + .addWriteTimestamps(false) + .withTransactionalMode(TransactionalMode.full)); + + SchemaSpec schema = schemaGen.generate(rng); + cluster.schemaChange(schema.compile()); + HistoryBuilder history = new ReplayingHistoryBuilder(schema.valueGenerators, + hb -> InJvmDTestVisitExecutor.builder() + .consistencyLevel(ConsistencyLevel.QUORUM) + .wrapQueries(QueryBuildingVisitExecutor.WrapQueries.TRANSACTION) + .pageSizeSelector(p -> InJvmDTestVisitExecutor.PageSizeSelector.NO_PAGING) + .build(schema, hb, cluster)); + + for (int pk = 0; pk < 500; pk++) { + for (int i = 0; i < 500; i++) + history.insert(pk); + } + + cluster.get(1).runOnInstance(() -> { + ((AccordService) AccordService.instance()).journal().closeCurrentSegmentForTestingIfNonEmpty(); + ((AccordService) AccordService.instance()).journal().compactor().run(); + }); + + int before = cluster.get(1).callOnInstance(() -> { + AtomicInteger a = new AtomicInteger(); + ((AccordService) AccordService.instance()).journal().forEach((v) -> { + if (v.type == JournalKey.Type.COMMAND_DIFF) + a.incrementAndGet(); + }); + return a.get(); + }); + + Thread.sleep(10_000); + cluster.get(1).runOnInstance(() -> { + Keyspace.open(SchemaConstants.ACCORD_KEYSPACE_NAME).getColumnFamilyStore(AccordKeyspace.JOURNAL).forceMajorCompaction(); + }); + + cluster.get(1).forceCompact("system_accord", "journal"); + + int after = cluster.get(1).callOnInstance(() -> { + AtomicInteger a = new AtomicInteger(); + ((AccordService) AccordService.instance()).journal().forEach((v) -> { + if (v.type == JournalKey.Type.COMMAND_DIFF) + a.incrementAndGet(); + }); + return a.get(); + }); + 
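+                // With the 1s shard/global durability cycles and 1s accord.gc_delay configured
+                // above, commands should become durable almost immediately after the writes
+                // finish, so after the forced major compaction no COMMAND_DIFF records are
+                // expected to remain; 'before' proves the journal had some to purge.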
Assert.assertTrue(String.format("%s should have been strictly smaller than %s", after, before), before > after); + Assert.assertEquals(0, after); + }); + } + } +} + diff --git a/test/harry/main/org/apache/cassandra/harry/SchemaSpec.java b/test/harry/main/org/apache/cassandra/harry/SchemaSpec.java index b103d82e37b4..f6f5b8bf224b 100644 --- a/test/harry/main/org/apache/cassandra/harry/SchemaSpec.java +++ b/test/harry/main/org/apache/cassandra/harry/SchemaSpec.java @@ -187,6 +187,15 @@ public String compile() shouldAppendAnd = true; } + if (options.speculativeRetry() != null) + { + appendWith.run(); + if (shouldAppendAnd) + sb.append(" AND"); + sb.append(" ").append("speculative_retry = '" + options.speculativeRetry() + "'"); + shouldAppendAnd = true; + } + if (options.disableReadRepair()) { appendWith.run(); @@ -350,6 +359,7 @@ public int hashCode() public interface Options { TransactionalMode transactionalMode(); + String speculativeRetry(); boolean addWriteTimestamps(); boolean disableReadRepair(); String compactionStrategy(); @@ -372,6 +382,7 @@ public static class OptionsBuilder implements Options private boolean ifNotExists = false; private boolean trackLts = false; private boolean compactStorage = false; + private String speculativeRetry = null; private OptionsBuilder() { @@ -382,6 +393,12 @@ public Options build() return this; } + public OptionsBuilder withSpeculativeRetry(String speculativeRetry) + { + this.speculativeRetry = speculativeRetry; + return this; + } + public OptionsBuilder withTransactionalMode(TransactionalMode mode) { this.transactionalMode = mode; @@ -394,6 +411,11 @@ public TransactionalMode transactionalMode() return transactionalMode; } + @Override + public String speculativeRetry() { + return speculativeRetry; + } + public OptionsBuilder addWriteTimestamps(boolean newValue) { this.addWriteTimestamps = newValue; diff --git a/test/harry/main/org/apache/cassandra/harry/gen/SchemaGenerators.java b/test/harry/main/org/apache/cassandra/harry/gen/SchemaGenerators.java index a3433c5b56d1..4feff7cf268c 100644 --- a/test/harry/main/org/apache/cassandra/harry/gen/SchemaGenerators.java +++ b/test/harry/main/org/apache/cassandra/harry/gen/SchemaGenerators.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.function.Supplier; import org.apache.cassandra.harry.ColumnSpec; import org.apache.cassandra.harry.SchemaSpec; @@ -98,15 +99,15 @@ public ColumnSpec generate(EntropySource rng) public static Generator trivialSchema(String ks, String table, int population) { - return trivialSchema(ks, table, population, SchemaSpec.optionsBuilder().build()); + return trivialSchema(ks, () -> table, population, SchemaSpec.optionsBuilder().build()); } - public static Generator trivialSchema(String ks, String table, int population, SchemaSpec.Options options) + public static Generator trivialSchema(String ks, Supplier table, int population, SchemaSpec.Options options) { return (rng) -> { return new SchemaSpec(rng.next(), population, - ks, table, + ks, table.get(), Arrays.asList(ColumnSpec.pk("pk1", ColumnSpec.int64Type, Generators.int64())), Arrays.asList(ColumnSpec.ck("ck1", ColumnSpec.int64Type, Generators.int64(), false)), Arrays.asList(ColumnSpec.regularColumn("v1", ColumnSpec.int64Type)), diff --git a/test/unit/org/apache/cassandra/journal/MetadataTest.java b/test/unit/org/apache/cassandra/journal/MetadataTest.java index 2ad9c14edad4..fd9ad2f2cc42 100644 --- a/test/unit/org/apache/cassandra/journal/MetadataTest.java +++ 
b/test/unit/org/apache/cassandra/journal/MetadataTest.java @@ -32,7 +32,7 @@ public class MetadataTest @Test public void testUpdate() { - Metadata metadata = Metadata.create(); + Metadata metadata = Metadata.empty(); metadata.update(); metadata.update(); @@ -45,7 +45,7 @@ public void testUpdate() @Test public void testWriteRead() throws IOException { - Metadata metadata = Metadata.create(); + Metadata metadata = Metadata.empty(); metadata.update(); metadata.update(); diff --git a/test/unit/org/apache/cassandra/journal/SegmentTest.java b/test/unit/org/apache/cassandra/journal/SegmentTest.java index 3c1e7fee8a06..6eb8d53ec25b 100644 --- a/test/unit/org/apache/cassandra/journal/SegmentTest.java +++ b/test/unit/org/apache/cassandra/journal/SegmentTest.java @@ -17,23 +17,22 @@ */ package org.apache.cassandra.journal; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.file.Files; - -import org.junit.Test; - import org.apache.cassandra.concurrent.ImmediateExecutor; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.util.File; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.TimeUUID; import org.apache.cassandra.utils.concurrent.OpOrder; +import org.junit.Assert; +import org.junit.Test; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; + +import static org.apache.cassandra.harry.checker.TestHelper.withRandom; import static org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; public class SegmentTest { @@ -165,7 +164,7 @@ public void testReadClosedSegmentSequentially() throws IOException tidier.await.issue(); activeSegment.close(null); - StaticSegment.SequentialReader reader = StaticSegment.sequentialReader(descriptor, TimeUUIDKeySupport.INSTANCE, 0); + StaticSegment.SequentialReader reader = StaticSegment.sequentialReader(descriptor, TimeUUIDKeySupport.INSTANCE, activeSegment.metadata.fsyncLimit()); // read all 4 entries sequentially and compare with originals assertTrue(reader.advance()); @@ -187,6 +186,19 @@ public void testReadClosedSegmentSequentially() throws IOException assertFalse(reader.advance()); } + @Test + public void testAllZeroBytes() + { + for (int size = 1; size < 200; size++) + { + ByteBuffer buffer = ByteBuffer.allocate(size); + withRandom(rng -> { + buffer.put(rng.nextInt(buffer.limit()), (byte) 0xff); + Assert.assertFalse(StaticSegment.areAllBytesZero(buffer, 0, buffer.limit())); + }); + } + } + private static Params params() { return TestParams.INSTANCE; diff --git a/test/unit/org/apache/cassandra/tcm/BootWithMetadataTest.java b/test/unit/org/apache/cassandra/tcm/BootWithMetadataTest.java index 2f77df4f603f..6b0d1194482f 100644 --- a/test/unit/org/apache/cassandra/tcm/BootWithMetadataTest.java +++ b/test/unit/org/apache/cassandra/tcm/BootWithMetadataTest.java @@ -85,7 +85,7 @@ public void bootFromExportedMetadataTest() throws Throwable // General test setup, no need to use Paxos for log commits or to replicate // to (non-existent) peers CassandraRelevantProperties.TCM_USE_ATOMIC_LONG_PROCESSOR.setBoolean(true); - CassandraRelevantProperties.TCM_USE_NO_OP_REPLICATOR.setBoolean(true); + CassandraRelevantProperties.TCM_USE_TEST_NO_OP_REPLICATOR.setBoolean(true); ServerTestUtils.daemonInitialization(); DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance); diff 
--git a/test/unit/org/apache/cassandra/tcm/ValidatingClusterMetadataService.java b/test/unit/org/apache/cassandra/tcm/ValidatingClusterMetadataService.java index 8c2dc5a2ecb3..75f3588d6080 100644 --- a/test/unit/org/apache/cassandra/tcm/ValidatingClusterMetadataService.java +++ b/test/unit/org/apache/cassandra/tcm/ValidatingClusterMetadataService.java @@ -22,14 +22,12 @@ import java.util.List; import java.util.TreeMap; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.tcm.log.Entry; -import org.apache.cassandra.tcm.log.LogState; import org.apache.cassandra.tcm.sequences.LockedRanges; import org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializer; import org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializers; @@ -130,25 +128,6 @@ public ClusterMetadata fetchLogAndWait(Epoch waitFor, Retry retryPolicy) { return delegate.fetchLogAndWait(waitFor, retryPolicy); } - - @Override - public LogState getLocalState(Epoch lowEpoch, Epoch highEpoch, boolean includeSnapshot) - { - if (!epochs.containsKey(lowEpoch)) - throw new AssertionError("Unknown epoch: " + lowEpoch); - ClusterMetadata base = epochs.get(lowEpoch); - ImmutableList.Builder entries = ImmutableList.builder(); - int id = 0; - for (ClusterMetadata cm : epochs.subMap(lowEpoch, false, highEpoch, true).values()) - entries.add(new Entry(new Entry.Id(id++), cm.epoch, new MockTransformer(cm))); - return new LogState(includeSnapshot ? base : null, entries.build()); - } - - @Override - public LogState getLogState(Epoch lowEpoch, Epoch highEpoch, boolean includeSnapshot, Retry retryPolicy) - { - return getLocalState(lowEpoch, highEpoch, includeSnapshot); - } }; } diff --git a/test/unit/org/apache/cassandra/tcm/log/DistributedLogStateTest.java b/test/unit/org/apache/cassandra/tcm/log/DistributedLogStateTest.java index 656ee5551097..37cee7237347 100644 --- a/test/unit/org/apache/cassandra/tcm/log/DistributedLogStateTest.java +++ b/test/unit/org/apache/cassandra/tcm/log/DistributedLogStateTest.java @@ -102,9 +102,9 @@ public void snapshotMetadata() } @Override - public LogReader reader() + public LogState getLogState(Epoch since) { - return reader; + return reader.getLogState(since); } @Override diff --git a/test/unit/org/apache/cassandra/tcm/log/LocalStorageLogStateTest.java b/test/unit/org/apache/cassandra/tcm/log/LocalStorageLogStateTest.java index 196c69ed7ec8..5bc6ec0fa831 100644 --- a/test/unit/org/apache/cassandra/tcm/log/LocalStorageLogStateTest.java +++ b/test/unit/org/apache/cassandra/tcm/log/LocalStorageLogStateTest.java @@ -90,9 +90,9 @@ public void snapshotMetadata() throws IOException } @Override - public LogReader reader() + public LogState getLogState(Epoch since) { - return storage; + return storage.getLogState(since); } @Override diff --git a/test/unit/org/apache/cassandra/tcm/log/LogStateTestBase.java b/test/unit/org/apache/cassandra/tcm/log/LogStateTestBase.java index 5342930da03f..ffeee364a9a2 100644 --- a/test/unit/org/apache/cassandra/tcm/log/LogStateTestBase.java +++ b/test/unit/org/apache/cassandra/tcm/log/LogStateTestBase.java @@ -22,23 +22,18 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.Objects; import java.util.stream.Stream; import org.junit.Before; import 
org.junit.Test; -import accord.utils.Gen; -import accord.utils.Gens; import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.MetadataSnapshots; import org.apache.cassandra.tcm.sequences.SequencesUtils; -import org.assertj.core.api.Assertions; -import static accord.utils.Property.qt; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -51,42 +46,13 @@ public abstract class LogStateTestBase static int EXTRA_ENTRIES = 2; static Epoch CURRENT_EPOCH = Epoch.create((NUM_SNAPSHOTS * SNAPSHOT_FREQUENCY) + EXTRA_ENTRIES); static Epoch LATEST_SNAPSHOT_EPOCH = Epoch.create(NUM_SNAPSHOTS * SNAPSHOT_FREQUENCY); - private static final Gen.LongGen EPOCH_GEN = rs -> rs.nextLong(0, CURRENT_EPOCH.getEpoch()) + 1; - private static final Gen BETWEEN_GEN = rs -> { - long a = EPOCH_GEN.nextLong(rs); - long b = EPOCH_GEN.nextLong(rs); - while (b == a) - b = EPOCH_GEN.nextLong(rs); - if (b < a) - { - long tmp = a; - a = b; - b = tmp; - } - return new Between(Epoch.create(a), Epoch.create(b)); - }; - private static final Gen SNAPSHOTS_GEN = Gens.oneOf() - .add(i -> MetadataSnapshots.NO_OP) - .add(i -> throwing()) - .add(rs -> rs.nextBoolean() ? withCorruptSnapshots(LATEST_SNAPSHOT_EPOCH) : withAvailableSnapshots(LATEST_SNAPSHOT_EPOCH)) - .add(rs -> { - Epoch[] queriedEpochs = new Epoch[NUM_SNAPSHOTS]; - for (int i = 0; i < NUM_SNAPSHOTS; i++) - queriedEpochs[i] = SequencesUtils.epoch((NUM_SNAPSHOTS - i) * SNAPSHOT_FREQUENCY); - return rs.nextBoolean() ? withCorruptSnapshots(queriedEpochs) : withAvailableSnapshots(queriedEpochs); - }) - .build(); interface LogStateSUT { void cleanup() throws IOException; void insertRegularEntry() throws IOException; void snapshotMetadata() throws IOException; - LogReader reader(); - default LogState getLogState(Epoch since) - { - return reader().getLogState(since); - } + LogState getLogState(Epoch since); // just for manually checking the test data void dumpTables() throws IOException; @@ -291,47 +257,6 @@ public void sinceArbitraryEpochWithMultipleCorruptSnapshots() assertEntries(state.entries, since.nextEpoch(), CURRENT_EPOCH); } - @Test - public void getLogStateBetween() - { - qt().forAll(SNAPSHOTS_GEN, BETWEEN_GEN).check((snapshots, between) -> { - LogStateSUT sut = getSystemUnderTest(snapshots); - LogState state = sut.reader().getLogState(between.start, between.end, true); - Assertions.assertThat(state.entries).describedAs("with and without snapshot should have the same entries").isEqualTo(sut.reader().getLogState(between.start, between.end, false).entries); - Assertions.assertThat(state.baseState.epoch).isEqualTo(between.start); - - List entries = state.entries; - Assertions.assertThat(entries.size()).isEqualTo(between.end.getEpoch() - between.start.getEpoch()); - - long expected = between.start.nextEpoch().getEpoch(); - for (Entry e : entries) - { - long actual = e.epoch.getEpoch(); - Assertions.assertThat(actual).describedAs("Unexpected epoch").isEqualTo(expected); - expected++; - } - }); - } - - @Test - public void getEntriesBetween() - { - qt().forAll(SNAPSHOTS_GEN, BETWEEN_GEN).check((snapshots, between) -> { - LogStateSUT sut = getSystemUnderTest(snapshots); - LogReader.EntryHolder entries = sut.reader().getEntries(between.start, between.end); - 
Assertions.assertThat(entries.since).isEqualTo(between.start); - Assertions.assertThat(entries.entries.size()).isEqualTo(between.end.getEpoch() - between.start.getEpoch()); - - long expected = between.start.nextEpoch().getEpoch(); - for (Entry e : entries.entries) - { - long actual = e.epoch.getEpoch(); - Assertions.assertThat(actual).describedAs("Unexpected epoch").isEqualTo(expected); - expected++; - } - }); - } - private void assertEntries(List entries, Epoch min, Epoch max) { int idx = 0; @@ -343,39 +268,4 @@ private void assertEntries(List entries, Epoch min, Epoch max) } assertEquals(idx, entries.size()); } - - private static class Between - { - private final Epoch start, end; - - private Between(Epoch start, Epoch end) - { - this.start = start; - this.end = end; - } - - @Override - public boolean equals(Object o) - { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - Between between = (Between) o; - return start.equals(between.start) && end.equals(between.end); - } - - @Override - public int hashCode() - { - return Objects.hash(start, end); - } - - @Override - public String toString() - { - return "Between{" + - "start=" + start.getEpoch() + - ", end=" + end.getEpoch() + - '}'; - } - } } diff --git a/test/unit/org/apache/cassandra/tools/ToolRunner.java b/test/unit/org/apache/cassandra/tools/ToolRunner.java index ad1f80e8127d..666cc6f91b9c 100644 --- a/test/unit/org/apache/cassandra/tools/ToolRunner.java +++ b/test/unit/org/apache/cassandra/tools/ToolRunner.java @@ -57,6 +57,7 @@ public class ToolRunner protected static final Logger logger = LoggerFactory.getLogger(ToolRunner.class); public static final ImmutableList DEFAULT_CLEANERS = ImmutableList.of("(?im)^picked up.*\\R", + "(?im)^.*Not generating a deterministic id for table.*\\R", "(?im)^.*`USE ` with prepared statements is.*\\R"); public static int runClassAsTool(String clazz, String... args)
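The cleaner added to DEFAULT_CLEANERS follows the pattern of the existing entries: (?i) makes the match case-insensitive, (?m) lets ^ match at every line start, and the trailing \R consumes the line terminator, so each matching log line is deleted wholesale from captured tool output before any assertions run. A small standalone illustration of the mechanism (class name and sample output are hypothetical):

    import java.util.regex.Pattern;

    public class CleanerSketch
    {
        public static void main(String[] args)
        {
            String stdout = "Picked up JAVA_TOOL_OPTIONS: -Xmx1g\n"
                          + "WARN  Not generating a deterministic id for table ks.tbl\n"
                          + "expected tool output\n";
            // Apply each cleaner in turn, exactly as a test harness would before asserting.
            for (String regex : new String[]{ "(?im)^picked up.*\\R",
                                              "(?im)^.*Not generating a deterministic id for table.*\\R" })
                stdout = Pattern.compile(regex).matcher(stdout).replaceAll("");
            System.out.print(stdout);   // prints only: expected tool output
        }
    }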