diff --git a/amazon-redshift-plugin/docs/Redshift-batchsource.md b/amazon-redshift-plugin/docs/Redshift-batchsource.md
index 38873b15a..1034750ac 100644
--- a/amazon-redshift-plugin/docs/Redshift-batchsource.md
+++ b/amazon-redshift-plugin/docs/Redshift-batchsource.md
@@ -31,6 +31,10 @@ contain the '$CONDITIONS' string. For example, 'SELECT * FROM table WHERE $CONDI
 The '$CONDITIONS' string will be replaced by 'splitBy' field limits specified by the bounding query.
 The '$CONDITIONS' string is not required if numSplits is set to one.
 
+**Import Query Type:** Determines how data is extracted, either by using a Table Name or a custom Import Query.
+
+**Table Name:** Extracts data directly from the specified database table.
+
 **Bounding Query:** Bounding Query should return the min and max of the values of the
 'splitBy' field. For example, 'SELECT MIN(id),MAX(id) FROM table'. Not required if numSplits
 is set to one.
diff --git a/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftConnector.java b/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftConnector.java
index fb8cac4a7..103452d17 100644
--- a/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftConnector.java
+++ b/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftConnector.java
@@ -111,6 +111,8 @@ protected void setConnectorSpec(ConnectorSpecRequest request, DBConnectorPath pa
     }
     sourceProperties.put(RedshiftSource.RedshiftSourceConfig.IMPORT_QUERY,
                          getTableQuery(path.getDatabase(), schema, table));
+    sourceProperties.put(RedshiftSource.RedshiftSourceConfig.PROPERTY_IMPORT_QUERY_TYPE,
+                         RedshiftSource.RedshiftSourceConfig.IMPORT_QUERY);
     sourceProperties.put(Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(table));
   }
diff --git a/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSchemaReader.java b/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSchemaReader.java
index df9938a45..ff24aa47f 100644
--- a/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSchemaReader.java
+++ b/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSchemaReader.java
@@ -22,7 +22,6 @@ import io.cdap.plugin.db.CommonSchemaReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
@@ -56,34 +55,12 @@ public RedshiftSchemaReader(String sessionID) {
   public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLException {
     String typeName = metadata.getColumnTypeName(index);
     int columnType = metadata.getColumnType(index);
+    int precision = metadata.getPrecision(index);
+    String columnName = metadata.getColumnName(index);
+    int scale = metadata.getScale(index);
+    boolean isSigned = metadata.isSigned(index);
 
-    if (STRING_MAPPED_REDSHIFT_TYPES_NAMES.contains(typeName)) {
-      return Schema.of(Schema.Type.STRING);
-    }
-    if (typeName.equalsIgnoreCase("INT")) {
-      return Schema.of(Schema.Type.INT);
-    }
-    if (typeName.equalsIgnoreCase("BIGINT")) {
-      return Schema.of(Schema.Type.LONG);
-    }
-
-    // If it is a numeric type without precision then use the Schema of String to avoid any precision loss
-    if (Types.NUMERIC == columnType) {
-      int precision = metadata.getPrecision(index);
-      if (precision == 0) {
-        LOG.warn(String.format("Field '%s' is a %s type without precision and scale, "
-                                 + "converting into STRING type to avoid any precision loss.",
metadata.getColumnName(index), - metadata.getColumnTypeName(index))); - return Schema.of(Schema.Type.STRING); - } - } - - if (typeName.equalsIgnoreCase("timestamp")) { - return Schema.of(Schema.LogicalType.DATETIME); - } - - return super.getSchema(metadata, index); + return getSchema(typeName, columnType, precision, scale, columnName, isSigned, true); } @Override @@ -114,4 +91,45 @@ public List getSchemaFields(ResultSet resultSet) throws SQLExcepti return schemaFields; } + /** + * Returns the CDAP {@link Schema} for a database column based on JDBC metadata. + * Handles Redshift-specific and common JDBC types: + * Maps Redshift string types to {@link Schema.Type#STRING} + * Maps "INT" to {@link Schema.Type#INT} + * Maps "BIGINT" to {@link Schema.Type#LONG}. + * Maps NUMERIC with zero precision to {@link Schema.Type#STRING} and logs a warning. + * Maps "timestamp" to {@link Schema.LogicalType#DATETIME}. + * Delegates to the parent plugin for all other types. + * @param typeName SQL type name (e.g. "INT", "BIGINT", "timestamp") + * @param columnType JDBC type code (see {@link java.sql.Types}) + * @param precision column precision (for numeric types) + * @param scale column scale (for numeric types) + * @param columnName column name + * @param isSigned whether the column is signed + * @param handleAsDecimal whether to handle as decimal + * @return the mapped {@link Schema} type + */ + @Override + public Schema getSchema(String typeName, int columnType, int precision, int scale, String columnName, + boolean isSigned, boolean handleAsDecimal) { + if (STRING_MAPPED_REDSHIFT_TYPES_NAMES.contains(typeName)) { + return Schema.of(Schema.Type.STRING); + } + if ("INT".equalsIgnoreCase(typeName)) { + return Schema.of(Schema.Type.INT); + } + if ("BIGINT".equalsIgnoreCase(typeName)) { + return Schema.of(Schema.Type.LONG); + } + if (Types.NUMERIC == columnType && precision == 0) { + LOG.warn(String.format("Field '%s' is a %s type without precision and scale," + + " converting into STRING type to avoid any precision loss.", + columnName, typeName)); + return Schema.of(Schema.Type.STRING); + } + if ("timestamp".equalsIgnoreCase(typeName)) { + return Schema.of(Schema.LogicalType.DATETIME); + } + return super.getSchema(typeName, columnType, precision, scale, columnName, isSigned, handleAsDecimal); + } } diff --git a/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSource.java b/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSource.java index 6a0df3a2d..521f1a150 100644 --- a/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSource.java +++ b/amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSource.java @@ -17,6 +17,7 @@ package io.cdap.plugin.amazon.redshift; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Macro; import io.cdap.cdap.api.annotation.Metadata; @@ -24,6 +25,8 @@ import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; import io.cdap.cdap.etl.api.FailureCollector; +import io.cdap.cdap.etl.api.PipelineConfigurer; +import io.cdap.cdap.etl.api.StageConfigurer; import io.cdap.cdap.etl.api.batch.BatchSource; import io.cdap.cdap.etl.api.batch.BatchSourceContext; import io.cdap.cdap.etl.api.connector.Connector; @@ -34,12 +37,17 @@ import io.cdap.plugin.db.config.AbstractDBSpecificSourceConfig; import io.cdap.plugin.db.source.AbstractDBSource; 
import io.cdap.plugin.util.DBUtils;
+import io.cdap.plugin.util.ImportQueryType;
 import org.apache.hadoop.mapreduce.lib.db.DBWritable;
 
 import java.util.Collections;
 import java.util.Map;
 import javax.annotation.Nullable;
 
+import static io.cdap.plugin.db.config.AbstractDBSpecificSourceConfig.IMPORT_QUERY;
+import static io.cdap.plugin.db.config.AbstractDBSpecificSourceConfig.PROPERTY_IMPORT_QUERY_TYPE;
+import static io.cdap.plugin.db.config.AbstractDBSpecificSourceConfig.TABLE_NAME;
+
 /**
  * Batch source to read from an Amazon Redshift database.
  */
@@ -59,6 +67,30 @@ public RedshiftSource(RedshiftSourceConfig redshiftSourceConfig) {
     this.redshiftSourceConfig = redshiftSourceConfig;
   }
 
+  @Override
+  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
+    FailureCollector collector = pipelineConfigurer.getStageConfigurer().getFailureCollector();
+    StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
+    if (sourceConfig.containsMacro(TABLE_NAME) || sourceConfig.containsMacro(IMPORT_QUERY)) {
+      if (sourceConfig.getSchema() != null) {
+        stageConfigurer.setOutputSchema(sourceConfig.getSchema());
+      }
+      return;
+    }
+    validateTableNameAndImportQuery(collector);
+    super.configurePipeline(pipelineConfigurer);
+  }
+
+  @Override
+  public void prepareRun(BatchSourceContext context) throws Exception {
+    FailureCollector collector = context.getFailureCollector();
+    if (sourceConfig.containsMacro(TABLE_NAME) || sourceConfig.containsMacro(IMPORT_QUERY)) {
+      return;
+    }
+    validateTableNameAndImportQuery(collector);
+    super.prepareRun(context);
+  }
+
   @Override
   protected SchemaReader getSchemaReader() {
     return new RedshiftSchemaReader();
diff --git a/amazon-redshift-plugin/widgets/Redshift-batchsource.json b/amazon-redshift-plugin/widgets/Redshift-batchsource.json
index 943e2d24e..e39771d81 100644
--- a/amazon-redshift-plugin/widgets/Redshift-batchsource.json
+++ b/amazon-redshift-plugin/widgets/Redshift-batchsource.json
@@ -108,6 +108,30 @@
     {
       "label": "SQL Query",
       "properties": [
+        {
+          "widget-type": "radio-group",
+          "label": "Import Query Type",
+          "name": "importQueryType",
+          "widget-attributes": {
+            "layout": "inline",
+            "default": "nativeQuery",
+            "options": [
+              {
+                "id": "nativeQuery",
+                "label": "Native Query"
+              },
+              {
+                "id": "namedTable",
+                "label": "Named Table"
+              }
+            ]
+          }
+        },
+        {
+          "widget-type": "textbox",
+          "label": "Table Name",
+          "name": "tableName"
+        },
         {
           "widget-type": "textarea",
           "label": "Import Query",
@@ -229,6 +253,30 @@
         }
       ]
     },
+    {
+      "name": "ImportQuery",
+      "condition": {
+        "expression": "importQueryType != 'namedTable'"
+      },
+      "show": [
+        {
+          "type": "property",
+          "name": "importQuery"
+        }
+      ]
+    },
+    {
+      "name": "NativeTableName",
+      "condition": {
+        "expression": "importQueryType == 'namedTable'"
+      },
+      "show": [
+        {
+          "type": "property",
+          "name": "tableName"
+        }
+      ]
+    }
   ],
   "jump-config": {
     "datasets": [
diff --git a/cloudsql-mysql-plugin/src/e2e-test/features/source/CloudMySqlDesignTimeValidation.feature b/cloudsql-mysql-plugin/src/e2e-test/features/source/CloudMySqlDesignTimeValidation.feature
index c5fa3d226..8edc922d4 100644
--- a/cloudsql-mysql-plugin/src/e2e-test/features/source/CloudMySqlDesignTimeValidation.feature
+++ b/cloudsql-mysql-plugin/src/e2e-test/features/source/CloudMySqlDesignTimeValidation.feature
@@ -192,7 +192,23 @@ Feature: CloudMySql source- Verify CloudMySql source plugin design time validati
       | connectionName |
       | database       |
       | referenceName  |
-      | importQuery    |
+
+  @CloudMySql_Required
+  Scenario: To verify CloudSQLMySQL source plugin validation error message with blank import query
+    Given Open Datafusion Project to configure pipeline
+    When Expand Plugin group in the LHS plugins list: "Source"
+    When Select plugin: "CloudSQL MySQL" from the plugins list as: "Source"
+    Then Navigate to the properties page of plugin: "CloudSQL MySQL"
+    Then Select dropdown plugin property: "select-jdbcPluginName" with option value: "driverName"
+    Then Select radio button plugin property: "instanceType" with value: "public"
+    Then Replace input plugin property: "connectionName" with value: "connectionName" for Credentials and Authorization related fields
+    Then Replace input plugin property: "user" with value: "username" for Credentials and Authorization related fields
+    Then Replace input plugin property: "password" with value: "password" for Credentials and Authorization related fields
+    Then Enter input plugin property: "referenceName" with value: "sourceRef"
+    Then Replace input plugin property: "database" with value: "DatabaseName"
+    Then Click on the Validate button
+    Then Verify that the Plugin Property: "importQuery" is displaying an in-line error message: "errorMessageImportQuery"
 
   @CloudMySql_Required
   Scenario: To verify CloudSQLMySQL source plugin validation error message with invalid connection name with public instance
diff --git a/cloudsql-mysql-plugin/src/e2e-test/resources/errorMessage.properties b/cloudsql-mysql-plugin/src/e2e-test/resources/errorMessage.properties
index 89a1b23df..10fc16da5 100644
--- a/cloudsql-mysql-plugin/src/e2e-test/resources/errorMessage.properties
+++ b/cloudsql-mysql-plugin/src/e2e-test/resources/errorMessage.properties
@@ -24,3 +24,4 @@ errorLogsMessageInvalidBoundingQuery=Spark program 'phase-1' failed with error:
 errorMessageInvalidPassword=SQL error while getting query schema: Error: Access denied for user
 errorMessagePrivateConnectionName=Enter the internal IP address of the Compute Engine VM cloudsql proxy is running on, to connect to a private
 errorMessageWithBlankPassword=Exception while trying to validate schema of database table
+errorMessageImportQuery=Import Query cannot be null. Please specify the Import Query.
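A note for reviewers on why the blank-import-query scenario above fires even though the CloudSQL widgets ship `importQueryType` as a hidden widget: `ImportQueryType.fromString` (introduced in database-commons later in this patch) falls back to `IMPORT_QUERY` for null and unrecognized values, so an empty `importQuery` is what gets validated. A minimal sketch of that fallback, using only the enum added in this patch:

```java
// Sketch only: exercises ImportQueryType.fromString as defined in this patch.
import io.cdap.plugin.util.ImportQueryType;

public class ImportQueryTypeDefaultCheck {
  public static void main(String[] args) {
    // Unset or unknown values fall back to IMPORT_QUERY rather than failing,
    // so validate() ends up checking the importQuery property.
    System.out.println(ImportQueryType.fromString(null));           // IMPORT_QUERY
    System.out.println(ImportQueryType.fromString("bogus"));        // IMPORT_QUERY
    // Widget option ids map onto the enum via its stored value.
    System.out.println(ImportQueryType.fromString("nativeQuery"));  // IMPORT_QUERY
    System.out.println(ImportQueryType.fromString("namedTable"));   // TABLE_NAME
  }
}
```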
diff --git a/cloudsql-mysql-plugin/widgets/CloudSQLMySQL-batchsource.json b/cloudsql-mysql-plugin/widgets/CloudSQLMySQL-batchsource.json index 4ac7747f4..2e0f44bfe 100644 --- a/cloudsql-mysql-plugin/widgets/CloudSQLMySQL-batchsource.json +++ b/cloudsql-mysql-plugin/widgets/CloudSQLMySQL-batchsource.json @@ -127,6 +127,30 @@ { "label": "CloudSQL Properties", "properties": [ + { + "widget-type": "hidden", + "label": "Import Query Type", + "name": "importQueryType", + "widget-attributes": { + "layout": "inline", + "default": "nativeQuery", + "options": [ + { + "id": "nativeQuery", + "label": "Native Query" + }, + { + "id": "namedTable", + "label": "Named Table" + } + ] + } + }, + { + "widget-type": "hidden", + "label": "Table Name", + "name": "tableName" + }, { "widget-type": "textarea", "label": "Import Query", diff --git a/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsource.json b/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsource.json index 96ea97ac2..e0ae611ae 100644 --- a/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsource.json +++ b/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsource.json @@ -127,6 +127,30 @@ { "label": "CloudSQL Properties", "properties": [ + { + "widget-type": "hidden", + "label": "Import Query Type", + "name": "importQueryType", + "widget-attributes": { + "layout": "inline", + "default": "nativeQuery", + "options": [ + { + "id": "nativeQuery", + "label": "Native Query" + }, + { + "id": "namedTable", + "label": "Named Table" + } + ] + } + }, + { + "widget-type": "hidden", + "label": "Table Name", + "name": "tableName" + }, { "widget-type": "textarea", "label": "Import Query", diff --git a/database-commons/src/main/java/io/cdap/plugin/db/CommonSchemaReader.java b/database-commons/src/main/java/io/cdap/plugin/db/CommonSchemaReader.java index 28c56db8c..c12de469c 100644 --- a/database-commons/src/main/java/io/cdap/plugin/db/CommonSchemaReader.java +++ b/database-commons/src/main/java/io/cdap/plugin/db/CommonSchemaReader.java @@ -20,6 +20,8 @@ import io.cdap.cdap.api.data.schema.Schema; import io.cdap.plugin.common.db.DBUtils; +import java.sql.Connection; +import java.sql.DatabaseMetaData; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; @@ -29,7 +31,6 @@ * Common schema reader for mapping non specific DB types. */ public class CommonSchemaReader implements SchemaReader { - @Override public List getSchemaFields(ResultSet resultSet) throws SQLException { List schemaFields = Lists.newArrayList(); @@ -61,4 +62,67 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti public boolean shouldIgnoreColumn(ResultSetMetaData metadata, int index) throws SQLException { return false; } + + /** + * Returns the schema fields for the specified table using JDBC metadata. + * Supports schema-qualified table names (e.g. "schema.table"). + * Throws SQLException if the table has no columns. 
+   *
+   * @param connection JDBC connection
+   * @param tableName table name, optionally schema-qualified
+   * @return list of schema fields
+   * @throws SQLException if no columns found or on database error
+   */
+  @Override
+  public List<Schema.Field> getSchemaFields(Connection connection, String tableName) throws SQLException {
+    DatabaseMetaData dbMetaData = connection.getMetaData();
+    String schema = null;
+    String table = tableName;
+    // Support schema-qualified table names like "schema.table"
+    if (tableName != null && tableName.contains(".")) {
+      String[] parts = tableName.split("\\.", 2);
+      schema = parts[0];
+      table = parts[1];
+    }
+    try (ResultSet columns = dbMetaData.getColumns(null, schema, table, null)) {
+      List<Schema.Field> schemaFields = Lists.newArrayList();
+      while (columns.next()) {
+        String columnName = columns.getString("COLUMN_NAME");
+        String typeName = columns.getString("TYPE_NAME");
+        int columnType = columns.getInt("DATA_TYPE");
+        int precision = columns.getInt("COLUMN_SIZE");
+        int scale = columns.getInt("DECIMAL_DIGITS");
+        int nullable = columns.getInt("NULLABLE");
+
+        Schema columnSchema = this.getSchema(typeName, columnType, precision, scale, columnName, true, true);
+        if (nullable == DatabaseMetaData.columnNullable) {
+          columnSchema = Schema.nullableOf(columnSchema);
+        }
+        Schema.Field field = Schema.Field.of(columnName, columnSchema);
+        schemaFields.add(field);
+      }
+      if (schemaFields.isEmpty()) {
+        throw new SQLException("No columns found for table: "
+                                 + (schema != null ? schema + "." : "") + table);
+      }
+      return schemaFields;
+    }
+  }
+
+  /**
+   * Returns the CDAP schema for the given SQL column type.
+   *
+   * @param typeName        SQL type name
+   * @param columnType      JDBC type code
+   * @param precision       Numeric precision
+   * @param scale           Numeric scale
+   * @param columnName      Column name
+   * @param isSigned        Whether the column is signed
+   * @param handleAsDecimal Whether to treat as decimal
+   * @return Corresponding {@link Schema}, or null if not implemented
+   */
+  public Schema getSchema(String typeName, int columnType, int precision, int scale, String columnName,
+                          boolean isSigned, boolean handleAsDecimal) {
+    return DBUtils.getSchema(typeName, columnType, precision, scale, columnName, isSigned, handleAsDecimal);
+  }
 }
diff --git a/database-commons/src/main/java/io/cdap/plugin/db/SchemaReader.java b/database-commons/src/main/java/io/cdap/plugin/db/SchemaReader.java
index 442549917..52ab40a2b 100644
--- a/database-commons/src/main/java/io/cdap/plugin/db/SchemaReader.java
+++ b/database-commons/src/main/java/io/cdap/plugin/db/SchemaReader.java
@@ -18,6 +18,7 @@
 
 import io.cdap.cdap.api.data.schema.Schema;
 
+import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
@@ -64,4 +65,14 @@ public interface SchemaReader {
    * @throws SQLException
    */
  boolean shouldIgnoreColumn(ResultSetMetaData metadata, int index) throws SQLException;
+
+  /**
+   * Returns the schema fields for the given table using JDBC connection metadata.
+   *
+   * @param connection the JDBC connection to use
+   * @param tableName the table name, optionally schema-qualified
+   * @return list of schema fields
+   * @throws SQLException if the table has no columns or a database error occurs
+   */
+  List<Schema.Field> getSchemaFields(Connection connection, String tableName) throws SQLException;
 }
diff --git a/database-commons/src/main/java/io/cdap/plugin/db/config/AbstractDBSpecificSourceConfig.java b/database-commons/src/main/java/io/cdap/plugin/db/config/AbstractDBSpecificSourceConfig.java
index 41c577397..e439534af 100644
--- a/database-commons/src/main/java/io/cdap/plugin/db/config/AbstractDBSpecificSourceConfig.java
+++ b/database-commons/src/main/java/io/cdap/plugin/db/config/AbstractDBSpecificSourceConfig.java
@@ -28,6 +28,7 @@
 import io.cdap.plugin.db.TransactionIsolationLevel;
 import io.cdap.plugin.db.connector.AbstractDBConnectorConfig;
 import io.cdap.plugin.db.source.AbstractDBSource;
+import io.cdap.plugin.util.ImportQueryType;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -40,8 +41,9 @@
  * Abstract Config for DB Specific Source plugin
  */
 public abstract class AbstractDBSpecificSourceConfig extends PluginConfig implements DatabaseSourceConfig {
-
+  public static final String TABLE_NAME = "tableName";
   public static final String IMPORT_QUERY = "importQuery";
+  public static final String PROPERTY_IMPORT_QUERY_TYPE = "importQueryType";
   public static final String BOUNDING_QUERY = "boundingQuery";
   public static final String SPLIT_BY = "splitBy";
   public static final String NUM_SPLITS = "numSplits";
@@ -54,6 +56,18 @@ public abstract class AbstractDBSpecificSourceConfig extends PluginConfig implem
   @Description(Constants.Reference.REFERENCE_NAME_DESCRIPTION)
   public String referenceName;
 
+  @Nullable
+  @Name(PROPERTY_IMPORT_QUERY_TYPE)
+  @Description("Whether to use a Table Name or an Import Query to extract the data.")
+  public String importQueryType;
+
+  @Nullable
+  @Name(TABLE_NAME)
+  @Description("The name of the table to import data from. This can be used instead of specifying an import query.")
+  @Macro
+  protected String tableName;
+
+  @Nullable
   @Name(IMPORT_QUERY)
   @Description("The SELECT query to use to import data from the specified table. " +
     "You can specify an arbitrary number of columns to import, or import all columns using *. " +
@@ -103,11 +117,19 @@ public String getImportQuery() {
     return cleanQuery(importQuery);
   }
 
+  public String getTableName() {
+    return tableName;
+  }
+
+  public String getImportQueryType() {
+    return importQueryType == null ? ImportQueryType.IMPORT_QUERY.name() : importQueryType;
+  }
+
   public String getBoundingQuery() {
     return cleanQuery(boundingQuery);
   }
 
   public void validate(FailureCollector collector) {
     boolean hasOneSplit = false;
     if (!containsMacro(NUM_SPLITS) && numSplits != null) {
       if (numSplits < 1) {
@@ -125,10 +148,18 @@ public void validate(FailureCollector collector) {
       TransactionIsolationLevel.validate(getTransactionIsolationLevel(), collector);
     }
 
-    if (!containsMacro(IMPORT_QUERY) && Strings.isNullOrEmpty(importQuery)) {
-      collector.addFailure("Import Query is empty.", "Specify the Import Query.")
-        .withConfigProperty(IMPORT_QUERY);
-    }
+    if (!containsMacro(PROPERTY_IMPORT_QUERY_TYPE)) {
+      ImportQueryType importQueryType = ImportQueryType.fromString(getImportQueryType());
+      boolean isImportQuery = importQueryType == ImportQueryType.IMPORT_QUERY;
+
+      if (isImportQuery && !containsMacro(IMPORT_QUERY) && Strings.isNullOrEmpty(importQuery)) {
+        collector.addFailure("Import Query cannot be null.", "Please specify the Import Query.")
+          .withConfigProperty(IMPORT_QUERY);
+      } else if (!isImportQuery && !containsMacro(TABLE_NAME) && Strings.isNullOrEmpty(tableName)) {
+        collector.addFailure("Table Name cannot be null.", "Please specify the Table Name.")
+          .withConfigProperty(TABLE_NAME);
+      }
+    }
 
     if (!hasOneSplit && !containsMacro(IMPORT_QUERY) && !getImportQuery().contains("$CONDITIONS")) {
       collector.addFailure(String.format(
diff --git a/database-commons/src/main/java/io/cdap/plugin/db/config/DatabaseSourceConfig.java b/database-commons/src/main/java/io/cdap/plugin/db/config/DatabaseSourceConfig.java
index 8987377b9..c46feadac 100644
--- a/database-commons/src/main/java/io/cdap/plugin/db/config/DatabaseSourceConfig.java
+++ b/database-commons/src/main/java/io/cdap/plugin/db/config/DatabaseSourceConfig.java
@@ -90,4 +90,15 @@ public interface DatabaseSourceConfig extends DatabaseConnectionConfig {
    * @return the number of rows to fetch at a time per split
    */
   Integer getFetchSize();
+
+  /**
+   * Returns the name of the table from which data will be imported.
+   */
+  String getTableName();
+
+  /**
+   * @return a {@link String} indicating the import query type,
+   * typically "namedTable" (for a table name) or "nativeQuery" (for a custom import query)
+   */
+  String getImportQueryType();
 }
diff --git a/database-commons/src/main/java/io/cdap/plugin/db/source/AbstractDBSource.java b/database-commons/src/main/java/io/cdap/plugin/db/source/AbstractDBSource.java
index 54d1e2ab6..d3e308462 100644
--- a/database-commons/src/main/java/io/cdap/plugin/db/source/AbstractDBSource.java
+++ b/database-commons/src/main/java/io/cdap/plugin/db/source/AbstractDBSource.java
@@ -52,6 +52,7 @@
 import io.cdap.plugin.db.config.DatabaseSourceConfig;
 import io.cdap.plugin.util.DBUtils;
 import io.cdap.plugin.util.DriverCleanup;
+import io.cdap.plugin.util.ImportQueryType;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
@@ -72,6 +73,11 @@
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 
+import static io.cdap.plugin.db.config.AbstractDBSpecificSourceConfig.IMPORT_QUERY;
+import static io.cdap.plugin.db.config.AbstractDBSpecificSourceConfig.PROPERTY_IMPORT_QUERY_TYPE;
+import static io.cdap.plugin.db.config.AbstractDBSpecificSourceConfig.TABLE_NAME;
+
+
 /**
  * Batch source to read from a DB table
  * @param <T> the DB Source config
@@ -163,11 +169,19 @@ public Schema getSchema() throws SQLException {
     try (Connection connection = getConnection()) {
       executeInitQueries(connection, sourceConfig.getInitQueries());
       String query = sourceConfig.getImportQuery();
-      return loadSchemaFromDB(connection, query);
+      ImportQueryType type = ImportQueryType.fromString(sourceConfig.getImportQueryType());
+      if (type == ImportQueryType.TABLE_NAME) {
+        return loadSchemaFromDBwithTableName(connection, sourceConfig.getTableName());
+      }
+      return loadSchemaFromDBwithQuery(connection, query);
     }
   }
 
-  private Schema loadSchemaFromDB(Connection connection, String query) throws SQLException {
+  private Schema loadSchemaFromDBwithTableName(Connection connection, String tableName) throws SQLException {
+    return Schema.recordOf("schema", getSchemaReader().getSchemaFields(connection, tableName));
+  }
+
+  private Schema loadSchemaFromDBwithQuery(Connection connection, String query) throws SQLException {
     Statement statement = connection.createStatement();
     statement.setMaxRows(1);
     if (query.contains("$CONDITIONS")) {
@@ -191,13 +207,18 @@ private Schema loadSchemaFromDB(Class<? extends DBWritable> driverClass)
     String connectionString = sourceConfig.getConnectionString();
 
     DriverCleanup driverCleanup = DBUtils.ensureJDBCDriverIsAvailable(driverClass, connectionString,
                                                                       sourceConfig.getJdbcPluginName());
-
     Properties connectionProperties = new Properties();
     connectionProperties.putAll(sourceConfig.getConnectionArguments());
     try (Connection connection = DriverManager.getConnection(connectionString, connectionProperties)) {
       executeInitQueries(connection, sourceConfig.getInitQueries());
-      return loadSchemaFromDB(connection, sourceConfig.getImportQuery());
-
+      String importQuery = sourceConfig.getImportQuery();
+      String tableName = sourceConfig.getTableName();
+      ImportQueryType type = ImportQueryType.fromString(sourceConfig.getImportQueryType());
+      if (type == ImportQueryType.TABLE_NAME) {
+        return loadSchemaFromDBwithTableName(connection, tableName);
+      } else {
+        return loadSchemaFromDBwithQuery(connection, importQuery);
+      }
     } catch (SQLException e) {
       // wrap exception to ensure SQLException-child instances not exposed to contexts without jdbc driver in classpath
       String errorMessage =
@@ -334,9 +355,15 @@ public ConnectionConfigAccessor getConnectionConfigAccessor (String driverClassN
     if (sourceConfig.getFetchSize() != null) {
       connectionConfigAccessor.setFetchSize(sourceConfig.getFetchSize());
     }
-
+    ImportQueryType type = ImportQueryType.fromString(sourceConfig.getImportQueryType());
+    String query;
+    if (type == ImportQueryType.IMPORT_QUERY) {
+      query = sourceConfig.getImportQuery();
+    } else {
+      query = String.format("SELECT * FROM %s", sourceConfig.getTableName());
+    }
     DataDrivenETLDBInputFormat.setInput(connectionConfigAccessor.getConfiguration(), getDBRecordType(),
-                                        sourceConfig.getImportQuery(), sourceConfig.getBoundingQuery(),
+                                        query, sourceConfig.getBoundingQuery(),
                                         false);
 
     if (sourceConfig.getTransactionIsolationLevel() != null) {
@@ -408,6 +435,42 @@ private String getJDBCPluginId() {
 
   protected abstract String createConnectionString();
 
+  /**
+   * Validates that either an import query or a table name is provided, according to the selected import query type.
+   * If the {@code importQueryType} property does not contain a macro, this method checks:
+   * If {@code importQueryType} is {@code IMPORT_QUERY}, ensures that the {@code importQuery} property is not empty.
+   * If {@code importQueryType} is {@code TABLE_NAME}, ensures that the {@code tableName} property is not empty.
+   * If the {@code importQueryType} property contains a macro, this method checks that at least one of
+   * {@code importQuery} or {@code tableName} is provided.
+   * Any validation failures are added to the provided {@link FailureCollector}. If any failures are present,
+   * this method will throw an exception at the end of validation.
+   * @param collector the {@link FailureCollector} used to collect validation failures
+   */
+  public void validateTableNameAndImportQuery(FailureCollector collector) {
+    if (!sourceConfig.containsMacro(PROPERTY_IMPORT_QUERY_TYPE)) {
+      ImportQueryType type = ImportQueryType.fromString(sourceConfig.getImportQueryType());
+      boolean isImportQuery = type == ImportQueryType.IMPORT_QUERY;
+
+      if (isImportQuery && Strings.isNullOrEmpty(sourceConfig.getImportQuery())) {
+        collector.addFailure("Import Query cannot be empty.", "Please specify the Import Query.")
+          .withConfigProperty(IMPORT_QUERY);
+      } else if (!isImportQuery && Strings.isNullOrEmpty(sourceConfig.getTableName())) {
+        collector.addFailure("Table Name cannot be empty.", "Please specify the Table Name.")
+          .withConfigProperty(TABLE_NAME);
+      }
+    } else {
+      boolean importQueryEmpty = Strings.isNullOrEmpty(sourceConfig.getImportQuery());
+      boolean tableNameEmpty = Strings.isNullOrEmpty(sourceConfig.getTableName());
+      if (importQueryEmpty && tableNameEmpty) {
+        collector.addFailure("Either 'Import Query' or 'Table Name' must be provided.",
+                             "Please specify either 'Import Query' or 'Table Name'.")
+          .withConfigProperty(IMPORT_QUERY)
+          .withConfigProperty(TABLE_NAME);
+      }
+    }
+    collector.getOrThrowException();
+  }
+
   /**
    * {@link PluginConfig} for {@link AbstractDBSource}
    */
@@ -420,6 +483,7 @@ public abstract static class DBSourceConfig extends DBConfig implements Database
     public static final String TRANSACTION_ISOLATION_LEVEL = "transactionIsolationLevel";
     public static final String FETCH_SIZE = "fetchSize";
 
+    @Nullable
     @Name(IMPORT_QUERY)
     @Description("The SELECT query to use to import data from the specified table. " +
       "You can specify an arbitrary number of columns to import, or import all columns using *. " +
@@ -469,6 +533,14 @@ public String getImportQuery() {
       return cleanQuery(importQuery);
     }
 
+    public String getTableName() {
+      return null;
+    }
+
+    public String getImportQueryType() {
+      return null;
+    }
+
     public String getBoundingQuery() {
       return cleanQuery(boundingQuery);
     }
diff --git a/database-commons/src/main/java/io/cdap/plugin/util/ImportQueryType.java b/database-commons/src/main/java/io/cdap/plugin/util/ImportQueryType.java
new file mode 100644
index 000000000..8ae4b33cd
--- /dev/null
+++ b/database-commons/src/main/java/io/cdap/plugin/util/ImportQueryType.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright © 2025 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.util;
+
+/**
+ * Enum to specify the import query type used.
+ */ +public enum ImportQueryType { + IMPORT_QUERY("nativeQuery"), + TABLE_NAME("namedTable"); + + private String value; + + ImportQueryType(String value) { + this.value = value; + } + + public String getValue() { + return value; + } + + public static ImportQueryType fromString(String value) { + if (value == null) { + return ImportQueryType.IMPORT_QUERY; + } + + for (ImportQueryType type : ImportQueryType.values()) { + if (type.value.equalsIgnoreCase(value)) { + return type; + } + } + return ImportQueryType.IMPORT_QUERY; + } +} diff --git a/database-commons/src/test/java/io/cdap/plugin/db/CommonSchemaReaderTest.java b/database-commons/src/test/java/io/cdap/plugin/db/CommonSchemaReaderTest.java index cbe1361d0..61fa4f678 100644 --- a/database-commons/src/test/java/io/cdap/plugin/db/CommonSchemaReaderTest.java +++ b/database-commons/src/test/java/io/cdap/plugin/db/CommonSchemaReaderTest.java @@ -25,10 +25,14 @@ import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Types; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.when; @@ -40,11 +44,122 @@ public class CommonSchemaReaderTest { @Mock ResultSetMetaData metadata; + @Mock + Connection mockConn; + @Mock + DatabaseMetaData mockDbMeta; + @Mock + ResultSet mockColumns; + @Mock + ResultSet mockTables; + + @Before public void before() { - reader = new CommonSchemaReader(); + reader = new CommonSchemaReader() { + @Override + public Schema getSchema(String typeName, int columnType, int precision, int scale, String columnName, + boolean isSigned, boolean handleAsDecimal) { + if ("INTEGER".equalsIgnoreCase(typeName) || columnType == Types.INTEGER) { + return Schema.of(Schema.Type.INT); + } + if ("VARCHAR".equalsIgnoreCase(typeName) || columnType == Types.VARCHAR) { + return Schema.of(Schema.Type.STRING); + } + if ("BIGINT".equalsIgnoreCase(typeName) || columnType == Types.BIGINT) { + return Schema.of(Schema.Type.LONG); + } + return Schema.of(Schema.Type.STRING); + } + }; + } + + /** + * Test: getSchemaFields(Connection, String) with a simple table name. + * This covers the case where the table exists, and two columns are present: + * one NOT NULL integer, one nullable string. 
+   */
+  @Test
+  public void testGetSchemaFieldsWithConnection() throws Exception {
+    when(mockConn.getMetaData()).thenReturn(mockDbMeta);
+
+    when(mockDbMeta.getColumns(any(), any(), eq("MYTABLE"), any())).thenReturn(mockColumns);
+    when(mockColumns.next()).thenReturn(true, true, false);
+    when(mockColumns.getString("COLUMN_NAME")).thenReturn("id", "name");
+    when(mockColumns.getString("TYPE_NAME")).thenReturn("INTEGER", "VARCHAR");
+    when(mockColumns.getInt("DATA_TYPE")).thenReturn(Types.INTEGER, Types.VARCHAR);
+    when(mockColumns.getInt("COLUMN_SIZE")).thenReturn(10, 255);
+    when(mockColumns.getInt("DECIMAL_DIGITS")).thenReturn(0, 0);
+    when(mockColumns.getInt("NULLABLE")).thenReturn(DatabaseMetaData.columnNoNulls, DatabaseMetaData.columnNullable);
+
+    java.util.List<Schema.Field> fields = reader.getSchemaFields(mockConn, "MYTABLE");
+
+    Assert.assertEquals(2, fields.size());
+    Assert.assertEquals("id", fields.get(0).getName());
+    Assert.assertEquals(Schema.of(Schema.Type.INT), fields.get(0).getSchema());
+    Assert.assertEquals("name", fields.get(1).getName());
+    Assert.assertTrue(fields.get(1).getSchema().isNullable());
+    Assert.assertEquals(Schema.of(Schema.Type.STRING), fields.get(1).getSchema().getNonNullable());
+  }
+
+  /**
+   * Test: getSchemaFields(Connection, String) with a schema-qualified table name.
+   * This checks that "myschema.MYTABLE" is parsed and resolved correctly.
+   */
+  @Test
+  public void testGetSchemaFieldsWithSchemaQualifiedName() throws Exception {
+    // Setup for schema-qualified table name "myschema.MYTABLE"
+    when(mockConn.getMetaData()).thenReturn(mockDbMeta);
+
+    when(mockDbMeta.getColumns(any(), eq("myschema"), eq("MYTABLE"), any())).thenReturn(mockColumns);
+    when(mockColumns.next()).thenReturn(true, false);
+    when(mockColumns.getString("COLUMN_NAME")).thenReturn("id");
+    when(mockColumns.getString("TYPE_NAME")).thenReturn("INTEGER");
+    when(mockColumns.getInt("DATA_TYPE")).thenReturn(Types.INTEGER);
+    when(mockColumns.getInt("COLUMN_SIZE")).thenReturn(10);
+    when(mockColumns.getInt("DECIMAL_DIGITS")).thenReturn(0);
+    when(mockColumns.getInt("NULLABLE")).thenReturn(DatabaseMetaData.columnNoNulls);
+
+    java.util.List<Schema.Field> fields = reader.getSchemaFields(mockConn, "myschema.MYTABLE");
+    Assert.assertEquals(1, fields.size());
+    Assert.assertEquals("id", fields.get(0).getName());
+    Assert.assertEquals(Schema.of(Schema.Type.INT), fields.get(0).getSchema());
+  }
+
+  /**
+   * Test: Nullability logic is correct for columns.
+   */
+  @Test
+  public void testGetSchemaFieldsHandlesNullability() throws Exception {
+    when(mockConn.getMetaData()).thenReturn(mockDbMeta);
+    when(mockDbMeta.getColumns(any(), any(), eq("MYTABLE"), any())).thenReturn(mockColumns);
+    when(mockColumns.next()).thenReturn(true, true, false);
+    when(mockColumns.getString("COLUMN_NAME")).thenReturn("col1", "col2");
+    when(mockColumns.getString("TYPE_NAME")).thenReturn("INTEGER", "VARCHAR");
+    when(mockColumns.getInt("DATA_TYPE")).thenReturn(Types.INTEGER, Types.VARCHAR);
+    when(mockColumns.getInt("COLUMN_SIZE")).thenReturn(10, 255);
+    when(mockColumns.getInt("DECIMAL_DIGITS")).thenReturn(0, 0);
+    when(mockColumns.getInt("NULLABLE")).thenReturn(DatabaseMetaData.columnNullable, DatabaseMetaData.columnNoNulls);
+
+    java.util.List<Schema.Field> fields = reader.getSchemaFields(mockConn, "MYTABLE");
+    Assert.assertTrue(fields.get(0).getSchema().isNullable());
+    Assert.assertFalse(fields.get(1).getSchema().isNullable());
   }
 
+  /**
+   * Test: Exception is thrown when table is not found.
+ */ + @Test(expected = SQLException.class) + public void testGetSchemaFieldsThrowsWhenTableNotFound() throws Exception { + when(mockConn.getMetaData()).thenReturn(mockDbMeta); + when(mockDbMeta.getColumns(any(), any(), eq("NOTABLE"), any())).thenReturn(mockColumns); + when(mockColumns.next()).thenReturn(false); // No columns found + + reader.getSchemaFields(mockConn, "NOTABLE"); + } + + + @Test public void testGetSchemaHandlesNull() throws SQLException { when(metadata.getColumnType(eq(1))).thenReturn(Types.NULL); diff --git a/database-commons/src/test/java/io/cdap/plugin/db/source/AbstractDBSourceTest.java b/database-commons/src/test/java/io/cdap/plugin/db/source/AbstractDBSourceTest.java index a8be38b46..a868185aa 100644 --- a/database-commons/src/test/java/io/cdap/plugin/db/source/AbstractDBSourceTest.java +++ b/database-commons/src/test/java/io/cdap/plugin/db/source/AbstractDBSourceTest.java @@ -48,6 +48,9 @@ public class AbstractDBSourceTest { public String getConnectionString() { return ""; } + public String getTableName() { + return ""; + } }; @Test diff --git a/mssql-plugin/src/e2e-test/features/mssql/mssql source/DesignTimeValidation.feature b/mssql-plugin/src/e2e-test/features/mssql/mssql source/DesignTimeValidation.feature index cc3bf8562..a3eb82ae3 100644 --- a/mssql-plugin/src/e2e-test/features/mssql/mssql source/DesignTimeValidation.feature +++ b/mssql-plugin/src/e2e-test/features/mssql/mssql source/DesignTimeValidation.feature @@ -212,7 +212,22 @@ Feature: Mssql source- Verify Mssql source plugin design time validation scenari | jdbcPluginName | | referenceName | | database | - | importQuery | + + @Mssql_Required + Scenario: To verify MSSQL source plugin validation error message with blank import query + Given Open Datafusion Project to configure pipeline + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "SQL Server" from the plugins list as: "Source" + Then Navigate to the properties page of plugin: "SQL Server" + Then Select dropdown plugin property: "select-jdbcPluginName" with option value: "driverName" + Then Replace input plugin property: "host" with value: "host" for Credentials and Authorization related fields + Then Replace input plugin property: "port" with value: "port" for Credentials and Authorization related fields + Then Replace input plugin property: "user" with value: "username" for Credentials and Authorization related fields + Then Replace input plugin property: "password" with value: "password" for Credentials and Authorization related fields + Then Enter input plugin property: "referenceName" with value: "sourceRef" + Then Replace input plugin property: "database" with value: "databaseName" + Then Click on the Validate button + Then Verify that the Plugin Property: "importQuery" is displaying an in-line error message: "errorMessageImportQuery" @Mssql_Required Scenario: Verify the validation error message with missing jdbc plugin name diff --git a/mssql-plugin/src/e2e-test/resources/errorMessage.properties b/mssql-plugin/src/e2e-test/resources/errorMessage.properties index 4eb83c386..c0c8a399e 100644 --- a/mssql-plugin/src/e2e-test/resources/errorMessage.properties +++ b/mssql-plugin/src/e2e-test/resources/errorMessage.properties @@ -24,3 +24,4 @@ errorMessageInvalidCredentialSource=Spark program 'phase-1' failed with error: S errorLogsMessageInvalidBoundingQuery=Spark program 'phase-1' failed with error: Stage 'SQL Server' encountered : java.io.IOException: Could not find stored procedure 
blank.jdbcPluginName.message=Required property 'jdbcPluginName' has no value. blank.connection.message=Exception while trying to validate schema of database table +errorMessageImportQuery=Import Query cannot be null. Please specify the Import Query. diff --git a/mssql-plugin/widgets/SqlServer-batchsource.json b/mssql-plugin/widgets/SqlServer-batchsource.json index b3494e485..989c4478c 100644 --- a/mssql-plugin/widgets/SqlServer-batchsource.json +++ b/mssql-plugin/widgets/SqlServer-batchsource.json @@ -140,6 +140,30 @@ "widget-type": "get-schema", "widget-category": "plugin" }, + { + "widget-type": "hidden", + "label": "Import Query Type", + "name": "importQueryType", + "widget-attributes": { + "layout": "inline", + "default": "nativeQuery", + "options": [ + { + "id": "nativeQuery", + "label": "Native Query" + }, + { + "id": "namedTable", + "label": "Named Table" + } + ] + } + }, + { + "widget-type": "hidden", + "label": "Table Name", + "name": "tableName" + }, { "widget-type": "textarea", "label": "Import Query", diff --git a/mysql-plugin/widgets/Mysql-batchsource.json b/mysql-plugin/widgets/Mysql-batchsource.json index 506e837f7..e978be0f3 100644 --- a/mysql-plugin/widgets/Mysql-batchsource.json +++ b/mysql-plugin/widgets/Mysql-batchsource.json @@ -121,6 +121,30 @@ "widget-type": "get-schema", "widget-category": "plugin" }, + { + "widget-type": "hidden", + "label": "Import Query Type", + "name": "importQueryType", + "widget-attributes": { + "layout": "inline", + "default": "nativeQuery", + "options": [ + { + "id": "nativeQuery", + "label": "Native Query" + }, + { + "id": "namedTable", + "label": "Named Table" + } + ] + } + }, + { + "widget-type": "hidden", + "label": "Table Name", + "name": "tableName" + }, { "widget-type": "textarea", "label": "Import Query", diff --git a/oracle-plugin/src/e2e-test/features/source/OracleDesignTimeValidation.feature b/oracle-plugin/src/e2e-test/features/source/OracleDesignTimeValidation.feature index f6ea23407..f62a3f304 100644 --- a/oracle-plugin/src/e2e-test/features/source/OracleDesignTimeValidation.feature +++ b/oracle-plugin/src/e2e-test/features/source/OracleDesignTimeValidation.feature @@ -27,7 +27,23 @@ Feature: Oracle source- Verify Oracle source plugin design time validation scena | jdbcPluginName | | database | | referenceName | - | importQuery | + + Scenario: To verify Oracle source plugin validation error message with blank import query + Given Open Datafusion Project to configure pipeline + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "Oracle" from the plugins list as: "Source" + Then Navigate to the properties page of plugin: "Oracle" + Then Select dropdown plugin property: "select-jdbcPluginName" with option value: "driverName" + Then Replace input plugin property: "host" with value: "host" for Credentials and Authorization related fields + Then Replace input plugin property: "port" with value: "port" for Credentials and Authorization related fields + Then Replace input plugin property: "user" with value: "username" for Credentials and Authorization related fields + Then Replace input plugin property: "password" with value: "password" for Credentials and Authorization related fields + Then Select radio button plugin property: "connectionType" with value: "service" + Then Select radio button plugin property: "role" with value: "normal" + Then Enter input plugin property: "referenceName" with value: "sourceRef" + Then Replace input plugin property: "database" with value: "databaseName" + Then Click on the 
Validate button
+    Then Verify that the Plugin Property: "importQuery" is displaying an in-line error message: "errorMessageImportQuery"
 
   Scenario: To verify Oracle source plugin validation error message with invalid reference test data
     Given Open Datafusion Project to configure pipeline
diff --git a/oracle-plugin/src/e2e-test/resources/errorMessage.properties b/oracle-plugin/src/e2e-test/resources/errorMessage.properties
index 620b91af8..7c3206609 100644
--- a/oracle-plugin/src/e2e-test/resources/errorMessage.properties
+++ b/oracle-plugin/src/e2e-test/resources/errorMessage.properties
@@ -19,3 +19,4 @@ errorLogsMessageInvalidBoundingQuery=Spark program 'phase-1' failed with error:
 java.io.IOException: ORA-00936: missing expression . Please check the system logs for more details.
 blank.database.message=Required property 'database' has no value.
 blank.connection.message=Exception while trying to validate schema of database table
+errorMessageImportQuery=Import Query cannot be null. Please specify the Import Query.
diff --git a/oracle-plugin/widgets/Oracle-batchsource.json b/oracle-plugin/widgets/Oracle-batchsource.json
index 404262fb2..27073e80c 100644
--- a/oracle-plugin/widgets/Oracle-batchsource.json
+++ b/oracle-plugin/widgets/Oracle-batchsource.json
@@ -224,6 +224,30 @@
         "widget-type": "get-schema",
         "widget-category": "plugin"
       },
+      {
+        "widget-type": "hidden",
+        "label": "Import Query Type",
+        "name": "importQueryType",
+        "widget-attributes": {
+          "layout": "inline",
+          "default": "nativeQuery",
+          "options": [
+            {
+              "id": "nativeQuery",
+              "label": "Native Query"
+            },
+            {
+              "id": "namedTable",
+              "label": "Named Table"
+            }
+          ]
+        }
+      },
+      {
+        "widget-type": "hidden",
+        "label": "Table Name",
+        "name": "tableName"
+      },
       {
         "widget-type": "textarea",
         "label": "Import Query",
diff --git a/postgresql-plugin/docs/Postgres-batchsource.md b/postgresql-plugin/docs/Postgres-batchsource.md
index 559723526..3a9abc911 100644
--- a/postgresql-plugin/docs/Postgres-batchsource.md
+++ b/postgresql-plugin/docs/Postgres-batchsource.md
@@ -38,6 +38,10 @@ contain the '$CONDITIONS' string. For example, 'SELECT * FROM table WHERE $CONDI
 The '$CONDITIONS' string will be replaced by 'splitBy' field limits specified by the bounding query.
 The '$CONDITIONS' string is not required if numSplits is set to one.
 
+**Import Query Type:** Determines how data is extracted, either by using a Table Name or a custom Import Query.
+
+**Table Name:** Extracts data directly from the specified database table.
+
 **Bounding Query:** Bounding Query should return the min and max of the values of the
 'splitBy' field. For example, 'SELECT MIN(id),MAX(id) FROM table'. Not required if numSplits
 is set to one.
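To make the two modes in the docs above concrete, here is an illustrative sketch (not shipping plugin code) of how the source resolves the effective query, mirroring the `getConnectionConfigAccessor` logic added in `AbstractDBSource`; the table and query strings are placeholders:

```java
// Illustrative sketch of the mode-to-query resolution introduced in this patch.
import io.cdap.plugin.util.ImportQueryType;

public class QueryModeExample {
  static String resolveQuery(String importQueryType, String importQuery, String tableName) {
    ImportQueryType type = ImportQueryType.fromString(importQueryType);
    if (type == ImportQueryType.TABLE_NAME) {
      // Named Table mode: the plugin generates the SELECT itself.
      return String.format("SELECT * FROM %s", tableName);
    }
    // Native Query mode: the user-supplied SELECT (with $CONDITIONS) is used as-is.
    return importQuery;
  }

  public static void main(String[] args) {
    System.out.println(resolveQuery("namedTable", null, "employees"));
    // -> SELECT * FROM employees
    System.out.println(resolveQuery("nativeQuery",
        "SELECT id, name FROM employees WHERE $CONDITIONS", null));
    // -> SELECT id, name FROM employees WHERE $CONDITIONS
  }
}
```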
diff --git a/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresConnector.java b/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresConnector.java
index deb56ed79..e94fc8f5f 100644
--- a/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresConnector.java
+++ b/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresConnector.java
@@ -99,6 +99,8 @@ protected void setConnectorSpec(ConnectorSpecRequest request, DBConnectorPath pa
     }
     sourceProperties.put(PostgresSource.PostgresSourceConfig.IMPORT_QUERY,
                          getTableQuery(path.getDatabase(), schema, table));
+    sourceProperties.put(PostgresSource.PostgresSourceConfig.PROPERTY_IMPORT_QUERY_TYPE,
+                         PostgresSource.PostgresSourceConfig.IMPORT_QUERY);
     sinkProperties.put(PostgresSink.PostgresSinkConfig.TABLE_NAME, table);
     sourceProperties.put(Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(table));
     sinkProperties.put(Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(table));
diff --git a/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSchemaReader.java b/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSchemaReader.java
index 1f3435b10..bcb8444ac 100644
--- a/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSchemaReader.java
+++ b/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSchemaReader.java
@@ -21,7 +21,6 @@ import io.cdap.plugin.db.CommonSchemaReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Types;
@@ -57,36 +56,58 @@ public PostgresSchemaReader(String sessionID) {
   public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLException {
     String typeName = metadata.getColumnTypeName(index);
     int columnType = metadata.getColumnType(index);
+    int precision = metadata.getPrecision(index);
+    String columnName = metadata.getColumnName(index);
+    int scale = metadata.getScale(index);
+    boolean isSigned = metadata.isSigned(index);
+
+    return getSchema(typeName, columnType, precision, scale, columnName, isSigned, true);
+  }
 
+  @Override
+  public boolean shouldIgnoreColumn(ResultSetMetaData metadata, int index) throws SQLException {
+    if (sessionID == null) {
+      return false;
+    }
+    return metadata.getColumnName(index).equals("c_" + sessionID) ||
+      metadata.getColumnName(index).equals("sqn_" + sessionID);
+  }
+
+  /**
+   * Returns the CDAP schema for a PostgreSQL column, handling special cases for certain types.
+   * Maps PostgreSQL-specific types (like bit, timetz, money, arrays) to STRING, maps numeric
+   * and decimal types with zero precision to STRING to avoid precision loss and those with a
+   * declared precision to decimals, and maps timestamps to DATETIME. Falls back to DBUtils for the rest.
+ * + * @param typeName SQL type name (e.g., "INT", "NUMERIC") + * @param columnType JDBC type constant + * @param precision Numeric precision + * @param scale Numeric scale + * @param columnName Column name (for logging) + * @param isSigned Whether the column is signed + * @param handleAsDecimal Whether to treat as decimal + * @return Corresponding CDAP {@link Schema} + */ + @Override + public Schema getSchema(String typeName, int columnType, int precision, int scale, String columnName, + boolean isSigned, boolean handleAsDecimal) { if (STRING_MAPPED_POSTGRES_TYPES_NAMES.contains(typeName) || STRING_MAPPED_POSTGRES_TYPES.contains(columnType)) { return Schema.of(Schema.Type.STRING); } - // If it is a numeric type without precision then use the Schema of String to avoid any precision loss if (Types.NUMERIC == columnType) { - int precision = metadata.getPrecision(index); if (precision == 0) { LOG.warn(String.format("Field '%s' is a %s type without precision and scale, " + "converting into STRING type to avoid any precision loss.", - metadata.getColumnName(index), - metadata.getColumnTypeName(index))); + columnName, typeName)); return Schema.of(Schema.Type.STRING); } - } - - if (typeName.equalsIgnoreCase("timestamp")) { - return Schema.of(Schema.LogicalType.DATETIME); + return Schema.decimalOf(precision, scale); } - return super.getSchema(metadata, index); - } - - @Override - public boolean shouldIgnoreColumn(ResultSetMetaData metadata, int index) throws SQLException { - if (sessionID == null) { - return false; + if ("timestamp".equalsIgnoreCase(typeName)) { + return Schema.of(Schema.LogicalType.DATETIME); } - return metadata.getColumnName(index).equals("c_" + sessionID) || - metadata.getColumnName(index).equals("sqn_" + sessionID); + return super.getSchema(typeName, columnType, precision, scale, columnName, isSigned, handleAsDecimal); } } diff --git a/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSource.java b/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSource.java index b230f3d1e..481169175 100644 --- a/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSource.java +++ b/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSource.java @@ -25,6 +25,8 @@ import io.cdap.cdap.api.annotation.Plugin; import io.cdap.cdap.api.data.schema.Schema; import io.cdap.cdap.etl.api.FailureCollector; +import io.cdap.cdap.etl.api.PipelineConfigurer; +import io.cdap.cdap.etl.api.StageConfigurer; import io.cdap.cdap.etl.api.batch.BatchSource; import io.cdap.cdap.etl.api.batch.BatchSourceContext; import io.cdap.cdap.etl.api.connector.Connector; @@ -40,6 +42,9 @@ import java.util.Map; import javax.annotation.Nullable; +import static io.cdap.plugin.db.config.AbstractDBSpecificSourceConfig.IMPORT_QUERY; +import static io.cdap.plugin.db.config.AbstractDBSpecificSourceConfig.TABLE_NAME; + /** * Batch source to read from PostgreSQL. 
*/ @@ -57,6 +62,31 @@ public PostgresSource(PostgresSourceConfig postgresSourceConfig) { this.postgresSourceConfig = postgresSourceConfig; } + + @Override + public void configurePipeline(PipelineConfigurer pipelineConfigurer) { + FailureCollector collector = pipelineConfigurer.getStageConfigurer().getFailureCollector(); + StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer(); + if (sourceConfig.containsMacro(TABLE_NAME) || sourceConfig.containsMacro(IMPORT_QUERY)) { + if (sourceConfig.getSchema() != null) { + stageConfigurer.setOutputSchema(sourceConfig.getSchema()); + } + return; + } + validateTableNameAndImportQuery(collector); + super.configurePipeline(pipelineConfigurer); + } + + @Override + public void prepareRun(BatchSourceContext context) throws Exception { + FailureCollector collector = context.getFailureCollector(); + if (sourceConfig.containsMacro(TABLE_NAME) || sourceConfig.containsMacro(IMPORT_QUERY)) { + return; + } + validateTableNameAndImportQuery(collector); + super.prepareRun(context); + } + @Override protected String createConnectionString() { return postgresSourceConfig.getConnectionString(); diff --git a/postgresql-plugin/widgets/Postgres-batchsource.json b/postgresql-plugin/widgets/Postgres-batchsource.json index 60de4725f..555a561fa 100644 --- a/postgresql-plugin/widgets/Postgres-batchsource.json +++ b/postgresql-plugin/widgets/Postgres-batchsource.json @@ -120,6 +120,30 @@ "widget-type": "get-schema", "widget-category": "plugin" }, + { + "widget-type": "radio-group", + "label": "Import Query Type", + "name": "importQueryType", + "widget-attributes": { + "layout": "inline", + "default": "nativeQuery", + "options": [ + { + "id": "nativeQuery", + "label": "Native Query" + }, + { + "id": "namedTable", + "label": "Named Table" + } + ] + } + }, + { + "widget-type": "textbox", + "label": "Table Name", + "name": "tableName" + }, { "widget-type": "textarea", "label": "Import Query", @@ -244,6 +268,30 @@ "name": "connection" } ] + }, + { + "name": "ImportQuery", + "condition": { + "expression": "importQueryType != 'namedTable'" + }, + "show": [ + { + "type": "property", + "name": "importQuery" + } + ] + }, + { + "name": "NativeTableName", + "condition": { + "expression": "importQueryType == 'namedTable'" + }, + "show": [ + { + "type": "property", + "name": "tableName" + } + ] } ], "jump-config": {
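For reviewers who want to exercise the new table-name schema path in isolation, here is a self-contained, hedged usage sketch of the `CommonSchemaReader.getSchemaFields(Connection, String)` overload introduced in this patch; the JDBC URL, credentials, and table name are placeholders:

```java
// Usage sketch only: probes the new getSchemaFields(Connection, String) overload.
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.plugin.db.CommonSchemaReader;

import java.sql.Connection;
import java.sql.DriverManager;
import java.util.List;

public class TableSchemaProbe {
  public static void main(String[] args) throws Exception {
    try (Connection conn = DriverManager.getConnection(
        "jdbc:postgresql://localhost:5432/mydb", "user", "password")) {
      CommonSchemaReader reader = new CommonSchemaReader();
      // Schema-qualified names are split on the first '.' (e.g. "public.employees");
      // a SQLException is thrown if the table has no columns.
      List<Schema.Field> fields = reader.getSchemaFields(conn, "public.employees");
      // Same record wrapping as AbstractDBSource.loadSchemaFromDBwithTableName.
      Schema schema = Schema.recordOf("schema", fields);
      System.out.println(schema);
    }
  }
}
```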