Skip to content

Commit 5828606

Browse files
psainicsAbhishekKumar9984
authored and committed
adding import query type property to db plugin, which will only reflect in the Redshift and Postgres plugins.
1 parent e5c5794 commit 5828606

File tree

24 files changed

+738
-64
lines changed

24 files changed

+738
-64
lines changed

amazon-redshift-plugin/docs/Redshift-batchsource.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ contain the '$CONDITIONS' string. For example, 'SELECT * FROM table WHERE $CONDI
3131
The '$CONDITIONS' string will be replaced by 'splitBy' field limits specified by the bounding query.
3232
The '$CONDITIONS' string is not required if numSplits is set to one.
3333

34+
**Import Query Type** - Determines how data is extracted—either by using a Table Name or a custom Import Query.
35+
36+
**Table Name**: Extracts data directly from a specified database table.
37+
3438
**Bounding Query:** Bounding Query should return the min and max of the values of the 'splitBy' field.
3539
For example, 'SELECT MIN(id),MAX(id) FROM table'. Not required if numSplits is set to one.
3640

amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftConnector.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,8 @@ protected void setConnectorSpec(ConnectorSpecRequest request, DBConnectorPath pa
111111
}
112112
sourceProperties.put(RedshiftSource.RedshiftSourceConfig.IMPORT_QUERY,
113113
getTableQuery(path.getDatabase(), schema, table));
114+
sourceProperties.put(RedshiftSource.RedshiftSourceConfig.PROPERTY_IMPORT_QUERY_TYPE,
115+
RedshiftSource.RedshiftSourceConfig.IMPORT_QUERY);
114116
sourceProperties.put(Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(table));
115117
}
116118

amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSchemaReader.java

Lines changed: 46 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import io.cdap.plugin.db.CommonSchemaReader;
2323
import org.slf4j.Logger;
2424
import org.slf4j.LoggerFactory;
25-
2625
import java.sql.ResultSet;
2726
import java.sql.ResultSetMetaData;
2827
import java.sql.SQLException;
@@ -56,34 +55,12 @@ public RedshiftSchemaReader(String sessionID) {
5655
public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLException {
5756
String typeName = metadata.getColumnTypeName(index);
5857
int columnType = metadata.getColumnType(index);
58+
int precision = metadata.getPrecision(index);
59+
String columnName = metadata.getColumnName(index);
60+
int scale = metadata.getScale(index);
61+
boolean isSigned = metadata.isSigned(index);
5962

60-
if (STRING_MAPPED_REDSHIFT_TYPES_NAMES.contains(typeName)) {
61-
return Schema.of(Schema.Type.STRING);
62-
}
63-
if (typeName.equalsIgnoreCase("INT")) {
64-
return Schema.of(Schema.Type.INT);
65-
}
66-
if (typeName.equalsIgnoreCase("BIGINT")) {
67-
return Schema.of(Schema.Type.LONG);
68-
}
69-
70-
// If it is a numeric type without precision then use the Schema of String to avoid any precision loss
71-
if (Types.NUMERIC == columnType) {
72-
int precision = metadata.getPrecision(index);
73-
if (precision == 0) {
74-
LOG.warn(String.format("Field '%s' is a %s type without precision and scale, "
75-
+ "converting into STRING type to avoid any precision loss.",
76-
metadata.getColumnName(index),
77-
metadata.getColumnTypeName(index)));
78-
return Schema.of(Schema.Type.STRING);
79-
}
80-
}
81-
82-
if (typeName.equalsIgnoreCase("timestamp")) {
83-
return Schema.of(Schema.LogicalType.DATETIME);
84-
}
85-
86-
return super.getSchema(metadata, index);
63+
return getSchema(typeName, columnType, precision, scale, columnName, isSigned, true);
8764
}
8865

8966
@Override
@@ -114,4 +91,45 @@ public List<Schema.Field> getSchemaFields(ResultSet resultSet) throws SQLExcepti
11491
return schemaFields;
11592
}
11693

94+
/**
95+
* Returns the CDAP {@link Schema} for a database column based on JDBC metadata.
96+
* Handles Redshift-specific and common JDBC types:
97+
* Maps Redshift string types to {@link Schema.Type#STRING}
98+
* Maps "INT" to {@link Schema.Type#INT}
99+
* Maps "BIGINT" to {@link Schema.Type#LONG}.
100+
* Maps NUMERIC with zero precision to {@link Schema.Type#STRING} and logs a warning.
101+
* Maps "timestamp" to {@link Schema.LogicalType#DATETIME}.
102+
* Delegates to the parent plugin for all other types.
103+
* @param typeName SQL type name (e.g. "INT", "BIGINT", "timestamp")
104+
* @param columnType JDBC type code (see {@link java.sql.Types})
105+
* @param precision column precision (for numeric types)
106+
* @param scale column scale (for numeric types)
107+
* @param columnName column name
108+
* @param isSigned whether the column is signed
109+
* @param handleAsDecimal whether to handle as decimal
110+
* @return the mapped {@link Schema} type
111+
*/
112+
@Override
113+
public Schema getSchema(String typeName, int columnType, int precision, int scale, String columnName,
114+
boolean isSigned, boolean handleAsDecimal) {
115+
if (STRING_MAPPED_REDSHIFT_TYPES_NAMES.contains(typeName)) {
116+
return Schema.of(Schema.Type.STRING);
117+
}
118+
if ("INT".equalsIgnoreCase(typeName)) {
119+
return Schema.of(Schema.Type.INT);
120+
}
121+
if ("BIGINT".equalsIgnoreCase(typeName)) {
122+
return Schema.of(Schema.Type.LONG);
123+
}
124+
if (Types.NUMERIC == columnType && precision == 0) {
125+
LOG.warn(String.format("Field '%s' is a %s type without precision and scale," +
126+
" converting into STRING type to avoid any precision loss.",
127+
columnName, typeName));
128+
return Schema.of(Schema.Type.STRING);
129+
}
130+
if ("timestamp".equalsIgnoreCase(typeName)) {
131+
return Schema.of(Schema.LogicalType.DATETIME);
132+
}
133+
return super.getSchema(typeName, columnType, precision, scale, columnName, isSigned, handleAsDecimal);
134+
}
117135
}

amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSource.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,16 @@
1717
package io.cdap.plugin.amazon.redshift;
1818

1919
import com.google.common.annotations.VisibleForTesting;
20+
import com.google.common.base.Strings;
2021
import io.cdap.cdap.api.annotation.Description;
2122
import io.cdap.cdap.api.annotation.Macro;
2223
import io.cdap.cdap.api.annotation.Metadata;
2324
import io.cdap.cdap.api.annotation.MetadataProperty;
2425
import io.cdap.cdap.api.annotation.Name;
2526
import io.cdap.cdap.api.annotation.Plugin;
2627
import io.cdap.cdap.etl.api.FailureCollector;
28+
import io.cdap.cdap.etl.api.PipelineConfigurer;
29+
import io.cdap.cdap.etl.api.StageConfigurer;
2730
import io.cdap.cdap.etl.api.batch.BatchSource;
2831
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
2932
import io.cdap.cdap.etl.api.connector.Connector;
@@ -34,12 +37,17 @@
3437
import io.cdap.plugin.db.config.AbstractDBSpecificSourceConfig;
3538
import io.cdap.plugin.db.source.AbstractDBSource;
3639
import io.cdap.plugin.util.DBUtils;
40+
import io.cdap.plugin.util.ImportQueryType;
3741
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
3842

3943
import java.util.Collections;
4044
import java.util.Map;
4145
import javax.annotation.Nullable;
4246

47+
import static io.cdap.plugin.db.config.AbstractDBSpecificSourceConfig.IMPORT_QUERY;
48+
import static io.cdap.plugin.db.config.AbstractDBSpecificSourceConfig.PROPERTY_IMPORT_QUERY_TYPE;
49+
import static io.cdap.plugin.db.config.AbstractDBSpecificSourceConfig.TABLE_NAME;
50+
4351
/**
4452
* Batch source to read from an Amazon Redshift database.
4553
*/
@@ -59,6 +67,30 @@ public RedshiftSource(RedshiftSourceConfig redshiftSourceConfig) {
5967
this.redshiftSourceConfig = redshiftSourceConfig;
6068
}
6169

70+
@Override
71+
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
72+
FailureCollector collector = pipelineConfigurer.getStageConfigurer().getFailureCollector();
73+
StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
74+
if (sourceConfig.containsMacro(TABLE_NAME) || sourceConfig.containsMacro(IMPORT_QUERY)) {
75+
if (sourceConfig.getSchema() != null) {
76+
stageConfigurer.setOutputSchema(sourceConfig.getSchema());
77+
}
78+
return;
79+
}
80+
validateTableNameAndImportQuery(collector);
81+
super.configurePipeline(pipelineConfigurer);
82+
}
83+
84+
@Override
85+
public void prepareRun(BatchSourceContext context) throws Exception {
86+
FailureCollector collector = context.getFailureCollector();
87+
if (sourceConfig.containsMacro(TABLE_NAME) || sourceConfig.containsMacro(IMPORT_QUERY)) {
88+
return;
89+
}
90+
validateTableNameAndImportQuery(collector);
91+
super.prepareRun(context);
92+
}
93+
6294
@Override
6395
protected SchemaReader getSchemaReader() {
6496
return new RedshiftSchemaReader();

amazon-redshift-plugin/widgets/Redshift-batchsource.json

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,30 @@
108108
{
109109
"label": "SQL Query",
110110
"properties": [
111+
{
112+
"widget-type": "radio-group",
113+
"label": "Import Query Type",
114+
"name": "importQueryType",
115+
"widget-attributes": {
116+
"layout": "inline",
117+
"default": "nativeQuery",
118+
"options": [
119+
{
120+
"id": "nativeQuery",
121+
"label": "Native Query"
122+
},
123+
{
124+
"id": "namedTable",
125+
"label": "Named Table"
126+
}
127+
]
128+
}
129+
},
130+
{
131+
"widget-type": "textbox",
132+
"label": "Table Name",
133+
"name": "tableName"
134+
},
111135
{
112136
"widget-type": "textarea",
113137
"label": "Import Query",
@@ -229,6 +253,30 @@
229253
}
230254
]
231255
},
256+
{
257+
"name": "ImportQuery",
258+
"condition": {
259+
"expression": "importQueryType != 'tableName'"
260+
},
261+
"show": [
262+
{
263+
"type": "property",
264+
"name": "importQuery"
265+
}
266+
]
267+
},
268+
{
269+
"name": "NativeTableName",
270+
"condition": {
271+
"expression": "importQueryType == 'tableName'"
272+
},
273+
"show": [
274+
{
275+
"type": "property",
276+
"name": "tableName"
277+
}
278+
]
279+
}
232280
],
233281
"jump-config": {
234282
"datasets": [

cloudsql-mysql-plugin/widgets/CloudSQLMySQL-batchsource.json

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,30 @@
127127
{
128128
"label": "CloudSQL Properties",
129129
"properties": [
130+
{
131+
"widget-type": "hidden",
132+
"label": "Import Query Type",
133+
"name": "importQueryType",
134+
"widget-attributes": {
135+
"layout": "inline",
136+
"default": "nativeQuery",
137+
"options": [
138+
{
139+
"id": "nativeQuery",
140+
"label": "Native Query"
141+
},
142+
{
143+
"id": "namedTable",
144+
"label": "Named Table"
145+
}
146+
]
147+
}
148+
},
149+
{
150+
"widget-type": "hidden",
151+
"label": "Table Name",
152+
"name": "tableName"
153+
},
130154
{
131155
"widget-type": "textarea",
132156
"label": "Import Query",

cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsource.json

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,30 @@
127127
{
128128
"label": "CloudSQL Properties",
129129
"properties": [
130+
{
131+
"widget-type": "hidden",
132+
"label": "Import Query Type",
133+
"name": "importQueryType",
134+
"widget-attributes": {
135+
"layout": "inline",
136+
"default": "nativeQuery",
137+
"options": [
138+
{
139+
"id": "nativeQuery",
140+
"label": "Native Query"
141+
},
142+
{
143+
"id": "namedTable",
144+
"label": "Named Table"
145+
}
146+
]
147+
}
148+
},
149+
{
150+
"widget-type": "hidden",
151+
"label": "Table Name",
152+
"name": "tableName"
153+
},
130154
{
131155
"widget-type": "textarea",
132156
"label": "Import Query",

database-commons/src/main/java/io/cdap/plugin/db/CommonSchemaReader.java

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import io.cdap.cdap.api.data.schema.Schema;
2121
import io.cdap.plugin.common.db.DBUtils;
2222

23+
import java.sql.Connection;
24+
import java.sql.DatabaseMetaData;
2325
import java.sql.ResultSet;
2426
import java.sql.ResultSetMetaData;
2527
import java.sql.SQLException;
@@ -29,7 +31,6 @@
2931
* Common schema reader for mapping non specific DB types.
3032
*/
3133
public class CommonSchemaReader implements SchemaReader {
32-
3334
@Override
3435
public List<Schema.Field> getSchemaFields(ResultSet resultSet) throws SQLException {
3536
List<Schema.Field> schemaFields = Lists.newArrayList();
@@ -61,4 +62,67 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti
6162
public boolean shouldIgnoreColumn(ResultSetMetaData metadata, int index) throws SQLException {
6263
return false;
6364
}
65+
66+
/**
67+
* Returns the schema fields for the specified table using JDBC metadata.
68+
* Supports schema-qualified table names (e.g. "schema.table").
69+
* Throws SQLException if the table has no columns.
70+
*
71+
* @param connection JDBC connection
72+
* @param tableName table name, optionally schema-qualified
73+
* @return list of schema fields
74+
* @throws SQLException if no columns found or on database error
75+
*/
76+
@Override
77+
public List<Schema.Field> getSchemaFields(Connection connection, String tableName) throws SQLException {
78+
DatabaseMetaData dbMetaData = connection.getMetaData();
79+
String schema = null;
80+
String table = tableName;
81+
// Support schema-qualified table names like "schema.table"
82+
if (tableName != null && tableName.contains(".")) {
83+
String[] parts = tableName.split("\\.", 2);
84+
schema = parts[0];
85+
table = parts[1];
86+
}
87+
try (ResultSet columns = dbMetaData.getColumns(null, schema, table, null)) {
88+
List<Schema.Field> schemaFields = Lists.newArrayList();
89+
while (columns.next()) {
90+
String columnName = columns.getString("COLUMN_NAME");
91+
String typeName = columns.getString("TYPE_NAME");
92+
int columnType = columns.getInt("DATA_TYPE");
93+
int precision = columns.getInt("COLUMN_SIZE");
94+
int scale = columns.getInt("DECIMAL_DIGITS");
95+
int nullable = columns.getInt("NULLABLE");
96+
97+
Schema columnSchema = this.getSchema(typeName, columnType, precision, scale, columnName, true, true);
98+
if (nullable == DatabaseMetaData.columnNullable) {
99+
columnSchema = Schema.nullableOf(columnSchema);
100+
}
101+
Schema.Field field = Schema.Field.of(columnName, columnSchema);
102+
schemaFields.add(field);
103+
}
104+
if (schemaFields.isEmpty()) {
105+
throw new SQLException("No columns found for table: " +
106+
(schema != null ? schema + "." : "") + table);
107+
}
108+
return schemaFields;
109+
}
110+
}
111+
112+
/**
113+
* Returns the CDAP schema for the given SQL column type.
114+
*
115+
* @param typeName SQL type name
116+
* @param columnType JDBC type code
117+
* @param precision Numeric precision
118+
* @param scale Numeric scale
119+
* @param columnName Column name
120+
* @param isSigned Whether the column is signed
121+
* @param handleAsDecimal Whether to treat as decimal
122+
* @return Corresponding {@link Schema}, or null if not implemented
123+
*/
124+
public Schema getSchema(String typeName, int columnType, int precision, int scale, String columnName ,
125+
boolean isSigned, boolean handleAsDecimal) {
126+
return DBUtils.getSchema(typeName, columnType, precision, scale, columnName, isSigned, handleAsDecimal);
127+
}
64128
}

0 commit comments

Comments
 (0)