Skip to content

Commit

Permalink
MINOR: Rename log dir UUIDs (apache#14517)
Browse files Browse the repository at this point in the history
After a late discussion in the voting thread for KIP-858 we
decided to improve the names for the designated reserved
log directory UUID values.

Reviewers: Christo Lolov <[email protected]>, Ismael Juma <[email protected]>,  Ziming Deng <[email protected]>.
  • Loading branch information
soarez authored Oct 30, 2023
1 parent 2736a2e commit 9dbee59
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 40 deletions.
41 changes: 5 additions & 36 deletions clients/src/main/java/org/apache/kafka/common/Uuid.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,45 +46,14 @@ public class Uuid implements Comparable<Uuid> {
*/
public static final Uuid ZERO_UUID = new Uuid(0L, 0L);

/**
* A UUID that is used to identify new or unknown dir assignments.
*/
public static final Uuid UNKNOWN_DIR = ZERO_UUID;

/**
* A UUID that is used to represent unspecified offline dirs.
*/
public static final Uuid OFFLINE_DIR = ONE_UUID;

/**
* A UUID that is used to represent and unspecified log directory,
* that is expected to have been previously selected to host an
* associated replica. This contrasts with {@code UNKNOWN_DIR},
* which is associated with (typically new) replicas that may not
* yet have been placed in any log directory.
*/
public static final Uuid SELECTED_DIR = new Uuid(0L, 2L);

/**
* The set of reserved UUIDs that will never be returned by the randomUuid method.
*/
public static final Set<Uuid> RESERVED;

static {
HashSet<Uuid> reserved = new HashSet<>(Arrays.asList(
METADATA_TOPIC_ID,
ZERO_UUID,
ONE_UUID,
UNKNOWN_DIR,
OFFLINE_DIR,
SELECTED_DIR
));
// The first 100 UUIDs are reserved for future use.
for (long i = 0L; i < 100L; i++) {
reserved.add(new Uuid(0L, i));
}
RESERVED = Collections.unmodifiableSet(reserved);
}
public static final Set<Uuid> RESERVED = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
METADATA_TOPIC_ID,
ZERO_UUID,
ONE_UUID
)));

private final long mostSignificantBits;
private final long leastSignificantBits;
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/log/LogManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.server.metadata.ConfigRepository
import kafka.server._
import kafka.utils._
import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
import org.apache.kafka.common.{DirectoryId, KafkaException, TopicPartition, Uuid}
import org.apache.kafka.common.utils.{KafkaThread, Time, Utils}
import org.apache.kafka.common.errors.{InconsistentTopicIdException, KafkaStorageException, LogDirNotFoundException}

Expand Down Expand Up @@ -287,7 +287,7 @@ class LogManager(logDirs: Seq[File],
val uuid = rawMetaProperties.directoryId match {
case Some(uuidStr) => Uuid.fromString(uuidStr)
case None =>
val uuid = Uuid.randomUuid()
val uuid = DirectoryId.random()
rawMetaProperties.directoryId = uuid.toString
metadataCheckpoint.write(rawMetaProperties.props)
uuid
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/tools/StorageTool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import kafka.utils.{Exit, Logging}
import net.sourceforge.argparse4j.ArgumentParsers
import net.sourceforge.argparse4j.impl.Arguments.{append, store, storeTrue}
import net.sourceforge.argparse4j.inf.Namespace
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.{DirectoryId, Uuid}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.metadata.bootstrap.{BootstrapDirectory, BootstrapMetadata}
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
Expand Down Expand Up @@ -430,7 +430,7 @@ object StorageTool extends Logging {
}
val metaPropertiesPath = Paths.get(directory, KafkaServer.brokerMetaPropsFile)
val checkpoint = new BrokerMetadataCheckpoint(metaPropertiesPath.toFile)
checkpoint.write(metaProperties.toPropertiesWithDirectoryId(Uuid.randomUuid().toString))
checkpoint.write(metaProperties.toPropertiesWithDirectoryId(DirectoryId.random().toString))

val bootstrapDirectory = new BootstrapDirectory(directory, Optional.empty())
bootstrapDirectory.writeBinaryFile(bootstrapMetadata)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class DirectoryId {

/**
* A UUID that is used to identify new or unknown dir assignments.
*/
public static final Uuid UNASSIGNED = new Uuid(0L, 0L);

/**
* A UUID that is used to represent unspecified offline dirs.
*/
public static final Uuid LOST = new Uuid(0L, 1L);

/**
* A UUID that is used to represent and unspecified log directory,
* that is expected to have been previously selected to host an
* associated replica. This contrasts with {@code UNASSIGNED_DIR},
* which is associated with (typically new) replicas that may not
* yet have been placed in any log directory.
*/
public static final Uuid MIGRATING = new Uuid(0L, 2L);

/**
* The set of reserved UUIDs that will never be returned by the random method.
*/
public static final Set<Uuid> RESERVED;

static {
HashSet<Uuid> reserved = new HashSet<>(Uuid.RESERVED);
// The first 100 UUIDs are reserved for future use.
for (long i = 0L; i < 100L; i++) {
reserved.add(new Uuid(0L, i));
}
RESERVED = Collections.unmodifiableSet(reserved);
}

/**
* Static factory to generate a directory ID.
*
* This will not generate a reserved UUID (first 100), or one whose string representation starts with a dash ("-")
*/
public static Uuid random() {
Uuid uuid = Uuid.randomUuid();
while (RESERVED.contains(uuid) || uuid.toString().startsWith("-")) {
uuid = Uuid.randomUuid();
}
return uuid;
}
}

0 comments on commit 9dbee59

Please sign in to comment.