added a catalog API which can be extended by any external catalog
kunal642 committed Mar 28, 2022
1 parent 41831ce · commit 0e568ad
Showing 40 changed files with 503 additions and 358 deletions.
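
The changes below route all catalog access through a new org.apache.spark.sql.catalog.CatalogFactory instead of calling sparkSession.sessionState.catalog directly. The catalog interface and the factory itself are among the changed files not shown on this page, so the following Scala sketch only infers the method shapes from the call sites visible in this commit; the trait name ExternalCatalogApi is a placeholder, not the real one.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable}

    // Placeholder trait; every method shape below is copied from a call site
    // in the hunks that follow.
    trait ExternalCatalogApi {
      // e.g. tableExists(table.identifier)(sparkSession)
      def tableExists(identifier: TableIdentifier)(sparkSession: SparkSession): Boolean
      // e.g. createTable(newTable, ignoreIfExists = false, validateLocation = false)(sparkSession)
      def createTable(table: CatalogTable, ignoreIfExists: Boolean,
          validateLocation: Boolean)(sparkSession: SparkSession): Unit
      // e.g. listDatabases()(session).asJava
      def listDatabases()(sparkSession: SparkSession): Seq[String]
      // e.g. listTables(dbName)(sparkSession).foreach { tableIdent => ... }
      def listTables(dbName: String)(sparkSession: SparkSession): Seq[TableIdentifier]
      // e.g. getDatabaseMetadata(dbName)(sparkSession).locationUri.toString
      def getDatabaseMetadata(dbName: String)(sparkSession: SparkSession): CatalogDatabase
    }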

@@ -2876,4 +2876,7 @@ private CarbonCommonConstants() {
 
   public static final String CARBON_ENABLE_SCHEMA_ENFORCEMENT_DEFAULT = "true";
 
+  @CarbonProperty
+  public static final String CARBON_CATALOG_IMPL = "carbon.catalog.impl";
+
 }
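
A hedged example of pointing CarbonData at an external catalog through the new property; the implementation class name is hypothetical, and how CatalogFactory reads the value is not visible in the hunks shown here.

    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.util.CarbonProperties

    // Register a custom catalog implementation (class name is illustrative).
    CarbonProperties.getInstance().addProperty(
      CarbonCommonConstants.CARBON_CATALOG_IMPL,
      "com.example.catalog.MyExternalCatalog")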

@@ -61,8 +61,7 @@ org.apache.carbondata.format.TableSchema fromWrapperToExternalTableSchema(
    * @param tableName
    * @return
    */
-  org.apache.carbondata.format.TableInfo fromWrapperToExternalTableInfo(TableInfo wrapperTableInfo,
-      String dbName, String tableName);
+  org.apache.carbondata.format.TableInfo fromWrapperToExternalTableInfo(TableInfo wrapperTableInfo);
 
   /**
    * @param externalSchemaEvolutionEntry

@@ -306,7 +306,7 @@ private org.apache.carbondata.format.BucketingInfo fromWrapperToExternalBucketin
    */
   @Override
   public org.apache.carbondata.format.TableInfo fromWrapperToExternalTableInfo(
-      TableInfo wrapperTableInfo, String dbName, String tableName) {
+      TableInfo wrapperTableInfo) {
     org.apache.carbondata.format.TableSchema thriftFactTable =
         fromWrapperToExternalTableSchema(wrapperTableInfo.getFactTable());
     return new org.apache.carbondata.format.TableInfo(thriftFactTable, new ArrayList<>());
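
Dropping dbName and tableName is consistent with the implementation above: the Thrift TableInfo is built solely from the fact-table schema and an empty schema-evolution list, so the two name parameters were never used in the conversion. Every call site below is updated to the one-argument form.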

@@ -1425,7 +1425,7 @@ public List<org.apache.carbondata.format.SchemaEvolutionEntry> getSchema_evoluti
     org.apache.carbondata.format.TableSchema thriftFactTable =
         new org.apache.carbondata.format.TableSchema("tableId", thriftColumnSchemas, schemaEvol);
     org.apache.carbondata.format.TableInfo actualResult = thriftWrapperSchemaConverter
-        .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName);
+        .fromWrapperToExternalTableInfo(wrapperTableInfo);
     org.apache.carbondata.format.TableInfo expectedResult =
         new org.apache.carbondata.format.TableInfo(thriftFactTable, new ArrayList<org.apache
             .carbondata.format.TableSchema>());

@@ -93,8 +93,7 @@ private TableInfo getTableInfo(String databaseName, String tableName) {
     CarbonTable carbonTable = CarbonTable.buildFromTableInfo(tableInfo);
     ThriftWrapperSchemaConverterImpl schemaConverter = new ThriftWrapperSchemaConverterImpl();
     org.apache.carbondata.format.TableInfo thriftTable = schemaConverter
-        .fromWrapperToExternalTableInfo(carbonTable.getTableInfo(), carbonTable.getDatabaseName(),
-            carbonTable.getTableName());
+        .fromWrapperToExternalTableInfo(carbonTable.getTableInfo());
     assertTrue(null != thriftTable);
   }
@@ -135,8 +134,7 @@ private TableInfo getTableInfo(String databaseName, String tableName) {
     CarbonTable carbonTable = CarbonTable.buildFromTableInfo(tableInfo);
     ThriftWrapperSchemaConverterImpl schemaConverter = new ThriftWrapperSchemaConverterImpl();
     org.apache.carbondata.format.TableInfo thriftTable = schemaConverter
-        .fromWrapperToExternalTableInfo(carbonTable.getTableInfo(), carbonTable.getDatabaseName(),
-            carbonTable.getTableName());
+        .fromWrapperToExternalTableInfo(carbonTable.getTableInfo());
     assertTrue(null != thriftTable);
   }

6 changes: 3 additions & 3 deletions examples/spark/pom.xml
@@ -200,9 +200,6 @@
     </profile>
     <profile>
       <id>spark-2.3</id>
-      <activation>
-        <activeByDefault>true</activeByDefault>
-      </activation>
       <properties>
         <spark.binary.version>2.3</spark.binary.version>
       </properties>
@@ -215,6 +212,9 @@
     </profile>
     <profile>
       <id>spark-3.1</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
+      </activation>
       <properties>
         <spark.binary.version>3.1</spark.binary.version>
         <dep.jackson.version>2.10.0</dep.jackson.version>
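
This profile change, repeated in the secondary-index, flink, and spark module poms below, flips the default build from Spark 2.3 to Spark 3.1. Assuming standard Maven profile mechanics, version selection now looks like this (commands are illustrative):

    mvn clean package -DskipTests                 # Spark 3.1, the new default profile
    mvn clean package -DskipTests -Pspark-2.3     # select Spark 2.3 explicitly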

@@ -228,10 +228,7 @@ public CarbonTable createTable(
 
     SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
     org.apache.carbondata.format.TableInfo thriftTableInfo =
-        schemaConverter.fromWrapperToExternalTableInfo(
-            tableInfo,
-            tableInfo.getDatabaseName(),
-            tableInfo.getFactTable().getTableName());
+        schemaConverter.fromWrapperToExternalTableInfo(tableInfo);
     org.apache.carbondata.format.SchemaEvolutionEntry schemaEvolutionEntry =
         new org.apache.carbondata.format.SchemaEvolutionEntry(tableInfo.getLastUpdatedTime());
     thriftTableInfo.getFact_table().getSchema_evolution().getSchema_evolution_history()

6 changes: 3 additions & 3 deletions index/secondary-index/pom.xml
@@ -158,9 +158,6 @@
     </profile>
     <profile>
       <id>spark-2.3</id>
-      <activation>
-        <activeByDefault>true</activeByDefault>
-      </activation>
       <properties>
         <spark.binary.version>2.3</spark.binary.version>
       </properties>
@@ -173,6 +170,9 @@
     </profile>
     <profile>
       <id>spark-3.1</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
+      </activation>
       <properties>
         <spark.binary.version>3.1</spark.binary.version>
       </properties>

6 changes: 3 additions & 3 deletions integration/flink/pom.xml
@@ -220,9 +220,6 @@
   <profiles>
     <profile>
       <id>spark-2.3</id>
-      <activation>
-        <activeByDefault>true</activeByDefault>
-      </activation>
       <properties>
         <spark.binary.version>2.3</spark.binary.version>
       </properties>
@@ -263,6 +260,9 @@
     </profile>
     <profile>
       <id>spark-3.1</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
+      </activation>
       <properties>
         <spark.binary.version>3.1</spark.binary.version>
       </properties>

@@ -255,8 +255,7 @@ private static void writeSchemaFile(TableInfo tableInfo) throws IOException {
     ThriftWriter thriftWriter = new ThriftWriter(schemaFilePath, false);
     thriftWriter.open(FileWriteOperation.OVERWRITE);
     thriftWriter.write(schemaConverter
-        .fromWrapperToExternalTableInfo(tableInfo, tableInfo.getDatabaseName(),
-            tableInfo.getFactTable().getTableName()));
+        .fromWrapperToExternalTableInfo(tableInfo));
     thriftWriter.close();
   }

@@ -153,10 +153,7 @@ object CarbonDataStoreCreator {
     val schemaConverter: SchemaConverter =
       new ThriftWrapperSchemaConverterImpl()
     val thriftTableInfo: TableInfo =
-      schemaConverter.fromWrapperToExternalTableInfo(
-        tableInfo,
-        tableInfo.getDatabaseName,
-        tableInfo.getFactTable.getTableName)
+      schemaConverter.fromWrapperToExternalTableInfo(tableInfo)
     val schemaEvolutionEntry: SchemaEvolutionEntry =
       new org.apache.carbondata.format.SchemaEvolutionEntry(
         tableInfo.getLastUpdatedTime)

6 changes: 3 additions & 3 deletions integration/spark/pom.xml
@@ -608,9 +608,6 @@
     </profile>
     <profile>
       <id>spark-2.3</id>
-      <activation>
-        <activeByDefault>true</activeByDefault>
-      </activation>
       <properties>
         <spark.binary.version>2.3</spark.binary.version>
       </properties>
@@ -693,6 +690,9 @@
     </profile>
     <profile>
       <id>spark-3.1</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
+      </activation>
       <properties>
         <spark.binary.version>3.1</spark.binary.version>
       </properties>

@@ -19,7 +19,8 @@ package org.apache.spark.sql.execution
 
 import org.apache.log4j.Logger
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils}
+import org.apache.spark.sql.catalog.CatalogFactory
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
 import org.apache.spark.sql.execution.command.RunnableCommand
 import org.apache.spark.sql.util.CreateTableCommonUtil.getCatalogTable
 
@@ -34,18 +35,18 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
     assert(table.tableType != CatalogTableType.VIEW)
     assert(table.provider.isDefined)
     val sessionState = sparkSession.sessionState
-    if (sessionState.catalog.tableExists(table.identifier)) {
+    if (CatalogFactory.getCatalog.tableExists(table.identifier)(sparkSession)) {
       if (ignoreIfExists) {
         return Seq.empty[Row]
       } else {
         throw new AnalysisException(s"Table ${table.identifier.unquotedString} already exists.")
       }
     }
     val newTable: CatalogTable = getCatalogTable(sparkSession, sessionState, table, LOGGER)
 
     // We will return Nil or throw exception at the beginning if the table already exists, so when
     // we reach here, the table should not exist and we should set `ignoreIfExists` to false.
-    sessionState.catalog.createTable(newTable, ignoreIfExists = false, validateLocation = false)
+    CatalogFactory.getCatalog.createTable(newTable, ignoreIfExists = false,
+      validateLocation = false)(sparkSession)
     Seq.empty[Row]
   }
 }
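
The body of CatalogFactory is not among the hunks shown on this page. Below is a minimal sketch of how such a factory could resolve the configured implementation, assuming it reads carbon.catalog.impl reflectively and falls back to a session-catalog-backed default; all names here are placeholders reusing the ExternalCatalogApi trait sketched near the top.

    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.util.CarbonProperties

    // Hypothetical factory; the real CatalogFactory lives in a file not shown here.
    object CatalogFactorySketch {
      // Assumed fallback class name; the actual default is not visible in this diff.
      private val defaultImpl = "org.apache.spark.sql.catalog.SessionCatalogImpl"

      lazy val getCatalog: ExternalCatalogApi = {
        val implClass = Option(CarbonProperties.getInstance()
            .getProperty(CarbonCommonConstants.CARBON_CATALOG_IMPL))
          .getOrElse(defaultImpl)
        // Instantiate reflectively so any implementation on the classpath can plug in.
        Class.forName(implClass)
          .getDeclaredConstructor()
          .newInstance()
          .asInstanceOf[ExternalCatalogApi]
      }
    }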

@@ -22,6 +22,7 @@ import java.util
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.{CarbonEnv, CarbonThreadUtil, SparkSession}
+import org.apache.spark.sql.catalog.CatalogFactory
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -31,7 +32,7 @@ class MVManagerInSpark(session: SparkSession) extends MVManager {
   override def getDatabases: util.List[String] = {
     CarbonThreadUtil.threadSet(CarbonCommonConstants.CARBON_ENABLE_MV, "true")
     try {
-      session.sessionState.catalog.listDatabases().asJava
+      CatalogFactory.getCatalog.listDatabases()(session).asJava
     } finally {
       CarbonThreadUtil.threadUnset(CarbonCommonConstants.CARBON_ENABLE_MV)
     }

@@ -20,10 +20,9 @@ package org.apache.spark.sql
 import java.util.concurrent.ConcurrentHashMap
 
 import org.apache.hadoop.fs.Path
-import org.apache.spark.internal.config.ConfigEntry
+import org.apache.spark.sql.catalog.CatalogFactory
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException}
-import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 import org.apache.spark.sql.events.{MergeBloomIndexEventListener, MergeIndexEventListener}
 import org.apache.spark.sql.execution.command.CreateFunctionCommand
 import org.apache.spark.sql.execution.command.mutation.merge.udf.BlockPathsUDF
@@ -42,7 +41,8 @@ import org.apache.carbondata.core.util._
 import org.apache.carbondata.events._
 import org.apache.carbondata.geo.GeoUdfRegister
 import org.apache.carbondata.index.{TextMatchMaxDocUDF, TextMatchUDF}
-import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
+import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent,
+  LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
 import org.apache.carbondata.spark.rdd.SparkReadSupport
 import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
 import org.apache.carbondata.view.{MVFunctions, TimeSeriesFunction}
@@ -342,8 +342,7 @@
    */
   def getDatabaseLocation(dbName: String, sparkSession: SparkSession): String = {
     var databaseLocation =
-      sparkSession.sessionState.catalog.asInstanceOf[SessionCatalog].getDatabaseMetadata(dbName)
-        .locationUri.toString
+      CatalogFactory.getCatalog.getDatabaseMetadata(dbName)(sparkSession).locationUri.toString
     // for default database and db ends with .db
     // check whether the carbon store and hive store is same or different.
     if ((!EnvHelper.isLegacy(sparkSession)) &&

@@ -20,8 +20,8 @@ package org.apache.spark.sql.execution.command.cache
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
-import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
-import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalog.CatalogFactory
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
@@ -158,7 +158,7 @@ case class CarbonShowCacheCommand(showExecutorCache: Boolean,
       }
     }
     var carbonTables = mutable.ArrayBuffer[CarbonTable]()
-    sparkSession.sessionState.catalog.listTables(currentDatabase).foreach {
+    CatalogFactory.getCatalog.listTables(currentDatabase)(sparkSession).foreach {
       tableIdent =>
         try {
           val carbonTable = CarbonEnv.getCarbonTable(tableIdent)(sparkSession)

@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.command.index
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalog.CatalogFactory
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command.DataCommand
 import org.apache.spark.sql.hive.CarbonRelation
@@ -51,7 +52,7 @@ extends DataCommand {
       triggerRepair(tableIdentifier.table, databaseName, indexnameOp, segments, sparkSession)
     } else {
       // repairing si for all index tables in the mentioned database in the repair command
-      sparkSession.sessionState.catalog.listTables(dbName).foreach {
+      CatalogFactory.getCatalog.listTables(dbName)(sparkSession).foreach {
        tableIdent =>
          triggerRepair(tableIdent.table, dbName, indexnameOp, segments, sparkSession)
       }

@@ -21,6 +21,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalog.CatalogFactory
 import org.apache.spark.sql.catalyst.CarbonParserUtil.initializeSpatialIndexInstance
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
@@ -96,7 +97,7 @@ case class RefreshCarbonTableCommand(
         initializeSpatialIndexInstance(tableProperties.get(SPATIAL_INDEX_CLASS),
           indexName, tableProperties.asScala)
         val tableIdentifier = new TableIdentifier(tableName, Some(tableInfo.getDatabaseName))
-        if (sparkSession.sessionState.catalog.tableExists(tableIdentifier)) {
+        if (CatalogFactory.getCatalog.tableExists(tableIdentifier)(sparkSession)) {
           // In direct upgrade scenario, if spatial table already exists then on refresh command,
           // update the property in metadata and fail table creation.
           LOGGER.info(s"Updating $SPATIAL_INDEX_INSTANCE table property on $tableName")

@@ -89,7 +89,7 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
       newCols = newCols.filter(x => !x.isComplexColumn)
       schemaEvolutionEntry.setAdded(newCols.toList.asJava)
       val thriftTable = schemaConverter
-        .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
+        .fromWrapperToExternalTableInfo(wrapperTableInfo)
       // carbon columns based on schema order
       val carbonColumns = carbonTable.getCreateOrderColumn().asScala
         .collect { case carbonColumn if !carbonColumn.isInvisible => carbonColumn.getColumnSchema }

@@ -18,10 +18,11 @@
 package org.apache.spark.sql.execution.command.schema
 
 import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.catalog.CatalogFactory
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
 import org.apache.spark.sql.execution.command.{AlterTableRenameModel, MetadataCommand}
-import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalogUtil, MockClassForAlterRevertTests}
+import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalogUtil,
+  MockClassForAlterRevertTests}
 import org.apache.spark.sql.index.CarbonIndexUtil
 import org.apache.spark.util.AlterTableUtil
@@ -35,7 +36,8 @@ import org.apache.carbondata.core.metadata.CarbonTableIdentifier
 import org.apache.carbondata.core.metadata.index.IndexType
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.events.{AlterTableRenamePostEvent, AlterTableRenamePreEvent, OperationContext, OperationListenerBus}
+import org.apache.carbondata.events.{AlterTableRenamePostEvent, AlterTableRenamePreEvent,
+  OperationContext, OperationListenerBus}
 import org.apache.carbondata.format.SchemaEvolutionEntry
 import org.apache.carbondata.view.MVManagerInSpark
@@ -58,7 +60,7 @@ private[sql] case class CarbonAlterTableRenameCommand(
     if (!oldDatabaseName.equalsIgnoreCase(newDatabaseName)) {
       throw new MalformedCarbonCommandException("Database name should be same for both tables")
     }
-    val tableExists = sparkSession.catalog.tableExists(oldDatabaseName, newTableIdentifier.table)
+    val tableExists = CatalogFactory.getCatalog.tableExists(newTableIdentifier)(sparkSession)
     if (tableExists) {
       throw new MalformedCarbonCommandException(s"Table with name $newTableIdentifier " +
         s"already exists")

@@ -19,9 +19,10 @@ package org.apache.spark.sql.execution.command.table
 
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException
 import org.apache.spark.sql.{AnalysisException, CarbonEnv, CarbonSource, Row, SparkSession}
+import org.apache.spark.sql.catalog.CatalogFactory
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
-import org.apache.spark.sql.execution.command.{CreateDataSourceTableCommand, DropTableCommand, MetadataCommand}
+import org.apache.spark.sql.execution.command.{DropTableCommand, MetadataCommand}
 import org.apache.spark.sql.execution.datasources.PartitioningUtils
 
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -41,7 +42,7 @@ case class CarbonCreateDataSourceTableCommand(
     assert(table.tableType != CatalogTableType.VIEW)
     assert(table.provider.isDefined)
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-    if (sparkSession.sessionState.catalog.tableExists(table.identifier)) {
+    if (CatalogFactory.getCatalog.tableExists(table.identifier)(sparkSession)) {
       if (ignoreIfExists) {
         return Seq.empty[Row]
       } else {

(diff truncated; the remaining changed files are not shown)