Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Kernel] [CC Refactor #2] TableDescriptor API + CC Table Properties #4050

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.coordinatedcommits;

import static java.util.Objects.requireNonNull;

import io.delta.kernel.TableIdentifier;
import io.delta.kernel.annotation.Evolving;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

/**
 * The complete descriptor of a Coordinated Commits (CC) Delta table, including its logPath, table
 * identifier, and table CC configuration.
 *
 * <p>The descriptor keeps its own unmodifiable copy of the table configuration map, so changes made
 * to the map passed to the constructor after construction are not reflected in the descriptor and
 * cannot alter the result of {@link #equals(Object)} or {@link #hashCode()}.
 *
 * @since 3.4.0
 */
@Evolving
public class TableDescriptor {

  private final String logPath;
  private final Optional<TableIdentifier> tableId;
  private final Map<String, String> tableConf;

  /**
   * Creates a new table descriptor.
   *
   * @param logPath the Delta log path of the table; must not be null
   * @param tableId the optional table identifier; must not be null (use {@link Optional#empty()}
   *     when there is no identifier)
   * @param tableConf the Coordinated Commits table configuration; must not be null. The entries
   *     are copied, preserving the iteration order of the given map.
   * @throws NullPointerException if any argument is null
   */
  public TableDescriptor(
      String logPath, Optional<TableIdentifier> tableId, Map<String, String> tableConf) {
    this.logPath = requireNonNull(logPath, "logPath is null");
    this.tableId = requireNonNull(tableId, "tableId is null");
    // Defensive copy: equals/hashCode depend on this map, so it must not change when the caller
    // later mutates the map it passed in. LinkedHashMap keeps the caller's iteration order so
    // toString() output is unchanged.
    this.tableConf =
        Collections.unmodifiableMap(
            new LinkedHashMap<>(requireNonNull(tableConf, "tableConf is null")));
  }

  /** Returns the Delta log path of the table. */
  public String getLogPath() {
    return logPath;
  }

  /** Returns the optional table identifier of the table, e.g. $catalog / $schema / $tableName */
  public Optional<TableIdentifier> getTableId() {
    return tableId;
  }

  /**
   * Returns the Coordinated Commits table configuration.
   *
   * <p>This is the parsed value of the Delta table property {@link
   * io.delta.kernel.internal.TableConfig#COORDINATED_COMMITS_TABLE_CONF} and represents the
   * properties for describing the Delta table to the commit-coordinator.
   *
   * @return an unmodifiable map; attempts to modify it throw {@link
   *     UnsupportedOperationException}
   */
  public Map<String, String> getTableConf() {
    return tableConf;
  }

  /**
   * Two descriptors are equal when they are of the same class and their log path, table identifier
   * and table configuration are all equal.
   */
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    final TableDescriptor that = (TableDescriptor) o;
    // All three fields are guaranteed non-null by the constructor.
    return logPath.equals(that.logPath)
        && tableId.equals(that.tableId)
        && tableConf.equals(that.tableConf);
  }

  @Override
  public int hashCode() {
    return Objects.hash(logPath, tableId, tableConf);
  }

  @Override
  public String toString() {
    return String.format(
        "TableDescriptor{logPath='%s', tableId=%s, tableConf=%s}", logPath, tableId, tableConf);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,46 @@ public class TableConfig<T> {
// TableConfigs //
//////////////////

/**
* The commit coordinator name for this table. If this property is not set, the table will be
* considered a file system table and commits will be done via atomically publishing the commit
* file.
*
* <p>Stored under the table property key {@code
* delta.coordinatedCommits.commitCoordinator-preview}. The value is converted with {@link
* Optional#ofNullable}, which yields an empty {@code Optional} for a null input.
*/
public static final TableConfig<Optional<String>> COMMIT_COORDINATOR_NAME =
new TableConfig<>(
"delta.coordinatedCommits.commitCoordinator-preview",
null, /* default value */
Optional::ofNullable,
// Always returns true, i.e. no value is rejected by this check.
value -> true,
"Needs to be a string.",
// NOTE(review): the meaning of this trailing flag is defined by the TableConfig constructor,
// which is not visible in this hunk - presumably "editable"; confirm.
true);

/**
* The configuration properties for the commit coordinator which is needed to build the commit
* coordinator client.
*
* <p>Stored under the table property key {@code
* delta.coordinatedCommits.commitCoordinatorConf-preview} and converted to a string-to-string map
* with {@code JsonUtils.parseJSONKeyValueMap}.
*/
public static final TableConfig<Map<String, String>> COMMIT_COORDINATOR_CONF =
new TableConfig<>(
"delta.coordinatedCommits.commitCoordinatorConf-preview",
// NOTE(review): how a null default is turned into a Map depends on
// JsonUtils.parseJSONKeyValueMap and the TableConfig constructor (not visible here) - confirm
// whether callers can observe null.
null, /* default values */
JsonUtils::parseJSONKeyValueMap,
// Always returns true, i.e. no value is rejected by this check.
value -> true,
"Needs to be a string-to-string map of configuration properties.",
// NOTE(review): meaning of this trailing flag is defined by the TableConfig constructor -
// presumably "editable"; confirm.
true);

/**
* The properties used to uniquely identify and describe this Delta table to the commit
* coordinator.
*
* <p>Stored under the table property key {@code delta.coordinatedCommits.tableConf-preview} and
* converted to a string-to-string map with {@code JsonUtils.parseJSONKeyValueMap}.
*/
public static final TableConfig<Map<String, String>> COORDINATED_COMMITS_TABLE_CONF =
new TableConfig<>(
"delta.coordinatedCommits.tableConf-preview",
// NOTE(review): how a null default is turned into a Map depends on
// JsonUtils.parseJSONKeyValueMap and the TableConfig constructor (not visible here) - confirm
// whether callers can observe null.
null, /* default values */
JsonUtils::parseJSONKeyValueMap,
// Always returns true, i.e. no value is rejected by this check.
value -> true,
"Needs to be a string-to-string map of properties.",
// NOTE(review): meaning of this trailing flag is defined by the TableConfig constructor -
// presumably "editable"; confirm.
true);

/**
* The shortest duration we have to keep logically deleted data files around before deleting them
* physically.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.coordinatedcommits

import org.scalatest.funsuite.AnyFunSuite
import io.delta.kernel.TableIdentifier
import java.util.Optional

import scala.collection.JavaConverters._

/**
 * Unit tests for [[TableDescriptor]]: constructor null checks, getters, the equals/hashCode
 * contract and the toString format.
 */
class TableDescriptorSuite extends AnyFunSuite {

  private val logPath = "/delta/logPath"

  // A single shared identifier instance is used on purpose: the equality tests below must not
  // depend on how TableIdentifier itself implements equals.
  private val tableId: Optional[TableIdentifier] =
    Optional.of(new TableIdentifier(Array("catalog", "schema"), "table"))

  /** Builds a fresh Java map instance on every call so value equality is actually exercised. */
  private def twoKeyConf(): java.util.Map[String, String] =
    Map("key1" -> "value1", "key2" -> "value2").asJava

  test("TableDescriptor should throw NullPointerException for null constructor arguments") {
    assertThrows[NullPointerException] {
      new TableDescriptor(null, Optional.empty(), Map.empty[String, String].asJava)
    }
    assertThrows[NullPointerException] {
      new TableDescriptor("/delta/logPath", null, Map.empty[String, String].asJava)
    }
    assertThrows[NullPointerException] {
      new TableDescriptor("/delta/logPath", Optional.empty(), null)
    }
  }

  test("TableDescriptor should return the correct logPath, tableId, and tableConf") {
    val tableConf = twoKeyConf()

    val tableDescriptor = new TableDescriptor(logPath, tableId, tableConf)

    assert(tableDescriptor.getLogPath == logPath)
    assert(tableDescriptor.getTableId == tableId)
    assert(tableDescriptor.getTableConf == tableConf)
  }

  test("TableDescriptor should accept an empty tableId") {
    val tableDescriptor =
      new TableDescriptor(logPath, Optional.empty(), Map.empty[String, String].asJava)

    assert(!tableDescriptor.getTableId.isPresent)
    assert(tableDescriptor.getTableConf.isEmpty)
  }

  test("TableDescriptors with the same values should be equal") {
    // Distinct map instances with equal contents: equality must be by value, not by reference.
    val tableDescriptor1 = new TableDescriptor(logPath, tableId, twoKeyConf())
    val tableDescriptor2 = new TableDescriptor(logPath, tableId, twoKeyConf())

    assert(tableDescriptor1.equals(tableDescriptor1))
    assert(tableDescriptor1 == tableDescriptor2)
    assert(tableDescriptor2 == tableDescriptor1)
    assert(tableDescriptor1.hashCode == tableDescriptor2.hashCode)
  }

  test("TableDescriptors with different values should not be equal") {
    val tableConf1 = Map("key1" -> "value1").asJava
    val tableConf2 = Map("key1" -> "value2").asJava

    val base = new TableDescriptor(logPath, tableId, tableConf1)

    // Each descriptor below differs from `base` in exactly one component.
    val differentConf = new TableDescriptor(logPath, tableId, tableConf2)
    val differentLogPath = new TableDescriptor("/delta/otherLogPath", tableId, tableConf1)
    val differentTableId =
      new TableDescriptor(logPath, Optional.empty[TableIdentifier](), tableConf1)

    assert(base != differentConf)
    assert(base != differentLogPath)
    assert(base != differentTableId)
  }

  test("TableDescriptor should not be equal to null or to an object of another type") {
    val tableDescriptor = new TableDescriptor(logPath, tableId, twoKeyConf())

    assert(!tableDescriptor.equals(null))
    assert(!tableDescriptor.equals(logPath))
  }

  test("TableDescriptor toString format") {
    val tableConf = Map("key1" -> "value1").asJava

    val tableDescriptor = new TableDescriptor(logPath, tableId, tableConf)
    val expectedString = "TableDescriptor{logPath='/delta/logPath', " +
      "tableId=Optional[TableIdentifier{catalog.schema.table}], " +
      "tableConf={key1=value1}}"
    assert(tableDescriptor.toString == expectedString)
  }
}
Loading