Skip to content

Commit da05351

Browse files
psainicsAbhishekKumar9984
authored and committed
Adding import query type property to DB plugin, which will only reflect in the Redshift and PostgreSQL plugins.
1 parent e5c5794 commit da05351

File tree

25 files changed

+848
-77
lines changed

25 files changed

+848
-77
lines changed

amazon-redshift-plugin/docs/Redshift-batchsource.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ contain the '$CONDITIONS' string. For example, 'SELECT * FROM table WHERE $CONDI
3131
The '$CONDITIONS' string will be replaced by 'splitBy' field limits specified by the bounding query.
3232
The '$CONDITIONS' string is not required if numSplits is set to one.
3333

34+
**Import Query Type** - Determines how data is extracted—either by using a Table Name or a custom Import Query.
35+
36+
**Table Name**: Extracts data directly from a specified database table.
37+
3438
**Bounding Query:** Bounding Query should return the min and max of the values of the 'splitBy' field.
3539
For example, 'SELECT MIN(id),MAX(id) FROM table'. Not required if numSplits is set to one.
3640

amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftConnector.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,8 @@ protected void setConnectorSpec(ConnectorSpecRequest request, DBConnectorPath pa
111111
}
112112
sourceProperties.put(RedshiftSource.RedshiftSourceConfig.IMPORT_QUERY,
113113
getTableQuery(path.getDatabase(), schema, table));
114+
sourceProperties.put(RedshiftSource.RedshiftSourceConfig.PROPERTY_IMPORT_QUERY_TYPE,
115+
RedshiftSource.RedshiftSourceConfig.IMPORT_QUERY);
114116
sourceProperties.put(Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(table));
115117
}
116118

amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSchemaReader.java

Lines changed: 46 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import io.cdap.plugin.db.CommonSchemaReader;
2323
import org.slf4j.Logger;
2424
import org.slf4j.LoggerFactory;
25-
2625
import java.sql.ResultSet;
2726
import java.sql.ResultSetMetaData;
2827
import java.sql.SQLException;
@@ -56,34 +55,12 @@ public RedshiftSchemaReader(String sessionID) {
5655
public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLException {
5756
String typeName = metadata.getColumnTypeName(index);
5857
int columnType = metadata.getColumnType(index);
58+
int precision = metadata.getPrecision(index);
59+
String columnName = metadata.getColumnName(index);
60+
int scale = metadata.getScale(index);
61+
boolean isSigned = metadata.isSigned(index);
5962

60-
if (STRING_MAPPED_REDSHIFT_TYPES_NAMES.contains(typeName)) {
61-
return Schema.of(Schema.Type.STRING);
62-
}
63-
if (typeName.equalsIgnoreCase("INT")) {
64-
return Schema.of(Schema.Type.INT);
65-
}
66-
if (typeName.equalsIgnoreCase("BIGINT")) {
67-
return Schema.of(Schema.Type.LONG);
68-
}
69-
70-
// If it is a numeric type without precision then use the Schema of String to avoid any precision loss
71-
if (Types.NUMERIC == columnType) {
72-
int precision = metadata.getPrecision(index);
73-
if (precision == 0) {
74-
LOG.warn(String.format("Field '%s' is a %s type without precision and scale, "
75-
+ "converting into STRING type to avoid any precision loss.",
76-
metadata.getColumnName(index),
77-
metadata.getColumnTypeName(index)));
78-
return Schema.of(Schema.Type.STRING);
79-
}
80-
}
81-
82-
if (typeName.equalsIgnoreCase("timestamp")) {
83-
return Schema.of(Schema.LogicalType.DATETIME);
84-
}
85-
86-
return super.getSchema(metadata, index);
63+
return getSchema(typeName, columnType, precision, scale, columnName, isSigned, true);
8764
}
8865

8966
@Override
@@ -114,4 +91,45 @@ public List<Schema.Field> getSchemaFields(ResultSet resultSet) throws SQLExcepti
11491
return schemaFields;
11592
}
11693

94+
/**
95+
* Returns the CDAP {@link Schema} for a database column based on JDBC metadata.
96+
* Handles Redshift-specific and common JDBC types:
97+
* Maps Redshift string types to {@link Schema.Type#STRING}
98+
* Maps "INT" to {@link Schema.Type#INT}
99+
* Maps "BIGINT" to {@link Schema.Type#LONG}.
100+
* Maps NUMERIC with zero precision to {@link Schema.Type#STRING} and logs a warning.
101+
* Maps "timestamp" to {@link Schema.LogicalType#DATETIME}.
102+
* Delegates to the parent plugin for all other types.
103+
* @param typeName SQL type name (e.g. "INT", "BIGINT", "timestamp")
104+
* @param columnType JDBC type code (see {@link java.sql.Types})
105+
* @param precision column precision (for numeric types)
106+
* @param scale column scale (for numeric types)
107+
* @param columnName column name
108+
* @param isSigned whether the column is signed
109+
* @param handleAsDecimal whether to handle as decimal
110+
* @return the mapped {@link Schema} type
111+
*/
112+
@Override
113+
public Schema getSchema(String typeName, int columnType, int precision, int scale, String columnName,
114+
boolean isSigned, boolean handleAsDecimal) {
115+
if (STRING_MAPPED_REDSHIFT_TYPES_NAMES.contains(typeName)) {
116+
return Schema.of(Schema.Type.STRING);
117+
}
118+
if ("INT".equalsIgnoreCase(typeName)) {
119+
return Schema.of(Schema.Type.INT);
120+
}
121+
if ("BIGINT".equalsIgnoreCase(typeName)) {
122+
return Schema.of(Schema.Type.LONG);
123+
}
124+
if (Types.NUMERIC == columnType && precision == 0) {
125+
LOG.warn(String.format("Field '%s' is a %s type without precision and scale," +
126+
" converting into STRING type to avoid any precision loss.",
127+
columnName, typeName));
128+
return Schema.of(Schema.Type.STRING);
129+
}
130+
if ("timestamp".equalsIgnoreCase(typeName)) {
131+
return Schema.of(Schema.LogicalType.DATETIME);
132+
}
133+
return super.getSchema(typeName, columnType, precision, scale, columnName, isSigned, handleAsDecimal);
134+
}
117135
}

amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSource.java

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,16 @@
1717
package io.cdap.plugin.amazon.redshift;
1818

1919
import com.google.common.annotations.VisibleForTesting;
20+
import com.google.common.base.Strings;
2021
import io.cdap.cdap.api.annotation.Description;
2122
import io.cdap.cdap.api.annotation.Macro;
2223
import io.cdap.cdap.api.annotation.Metadata;
2324
import io.cdap.cdap.api.annotation.MetadataProperty;
2425
import io.cdap.cdap.api.annotation.Name;
2526
import io.cdap.cdap.api.annotation.Plugin;
2627
import io.cdap.cdap.etl.api.FailureCollector;
28+
import io.cdap.cdap.etl.api.PipelineConfigurer;
29+
import io.cdap.cdap.etl.api.StageConfigurer;
2730
import io.cdap.cdap.etl.api.batch.BatchSource;
2831
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
2932
import io.cdap.cdap.etl.api.connector.Connector;
@@ -34,12 +37,17 @@
3437
import io.cdap.plugin.db.config.AbstractDBSpecificSourceConfig;
3538
import io.cdap.plugin.db.source.AbstractDBSource;
3639
import io.cdap.plugin.util.DBUtils;
40+
import io.cdap.plugin.util.ImportQueryType;
3741
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
3842

3943
import java.util.Collections;
4044
import java.util.Map;
4145
import javax.annotation.Nullable;
4246

47+
import static io.cdap.plugin.db.config.AbstractDBSpecificSourceConfig.IMPORT_QUERY;
48+
import static io.cdap.plugin.db.config.AbstractDBSpecificSourceConfig.PROPERTY_IMPORT_QUERY_TYPE;
49+
import static io.cdap.plugin.db.config.AbstractDBSpecificSourceConfig.TABLE_NAME;
50+
4351
/**
4452
* Batch source to read from an Amazon Redshift database.
4553
*/
@@ -59,6 +67,79 @@ public RedshiftSource(RedshiftSourceConfig redshiftSourceConfig) {
5967
this.redshiftSourceConfig = redshiftSourceConfig;
6068
}
6169

70+
@Override
71+
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
72+
FailureCollector collector = pipelineConfigurer.getStageConfigurer().getFailureCollector();
73+
StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
74+
if (sourceConfig.containsMacro(TABLE_NAME) || sourceConfig.containsMacro(IMPORT_QUERY)) {
75+
if (sourceConfig.getSchema() != null) {
76+
stageConfigurer.setOutputSchema(sourceConfig.getSchema());
77+
}
78+
return;
79+
}
80+
if (!sourceConfig.containsMacro(PROPERTY_IMPORT_QUERY_TYPE)) {
81+
82+
String importQueryTypeValue = sourceConfig.getImportQueryType();
83+
84+
boolean isImportQuerySelected = ImportQueryType.IMPORT_QUERY.name().equalsIgnoreCase(importQueryTypeValue);
85+
86+
if (isImportQuerySelected && !sourceConfig.containsMacro(IMPORT_QUERY) &&
87+
Strings.isNullOrEmpty(sourceConfig.getImportQuery())) {
88+
collector.addFailure("Import Query cannot be empty", null)
89+
.withConfigProperty(IMPORT_QUERY);
90+
91+
} else if (!isImportQuerySelected && !sourceConfig.containsMacro(TABLE_NAME) &&
92+
Strings.isNullOrEmpty(sourceConfig.getTableName())) {
93+
collector.addFailure("Table Name cannot be empty", null)
94+
.withConfigProperty(TABLE_NAME);
95+
}
96+
} else {
97+
if (!sourceConfig.containsMacro(IMPORT_QUERY) &&
98+
Strings.isNullOrEmpty(sourceConfig.getImportQuery()) &&
99+
!sourceConfig.containsMacro(TABLE_NAME) &&
100+
Strings.isNullOrEmpty(sourceConfig.getTableName())) {
101+
collector.addFailure("Either 'Import Query' or 'Table Name' must be provided.", null)
102+
.withConfigProperty(IMPORT_QUERY)
103+
.withConfigProperty(TABLE_NAME);
104+
}
105+
}
106+
collector.getOrThrowException();
107+
super.configurePipeline(pipelineConfigurer);
108+
}
109+
110+
@Override
111+
public void prepareRun(BatchSourceContext context) throws Exception {
112+
FailureCollector collector = context.getFailureCollector();
113+
114+
if (!sourceConfig.containsMacro(PROPERTY_IMPORT_QUERY_TYPE)) {
115+
String importQueryTypeValue = sourceConfig.getImportQueryType();
116+
117+
boolean isImportQuerySelected = ImportQueryType.IMPORT_QUERY.name().equalsIgnoreCase(importQueryTypeValue);
118+
119+
if (isImportQuerySelected && !sourceConfig.containsMacro(IMPORT_QUERY) &&
120+
Strings.isNullOrEmpty(sourceConfig.getImportQuery())) {
121+
collector.addFailure("Import Query cannot be empty", null)
122+
.withConfigProperty(IMPORT_QUERY);
123+
124+
} else if (!isImportQuerySelected && !sourceConfig.containsMacro(TABLE_NAME) &&
125+
Strings.isNullOrEmpty(sourceConfig.getTableName())) {
126+
collector.addFailure("Table Name cannot be empty", null)
127+
.withConfigProperty(TABLE_NAME);
128+
}
129+
} else {
130+
if (!sourceConfig.containsMacro(IMPORT_QUERY) &&
131+
Strings.isNullOrEmpty(sourceConfig.getImportQuery()) &&
132+
!sourceConfig.containsMacro(TABLE_NAME) &&
133+
Strings.isNullOrEmpty(sourceConfig.getTableName())) {
134+
collector.addFailure("Either 'Import Query' or 'Table Name' must be provided.", null)
135+
.withConfigProperty(IMPORT_QUERY)
136+
.withConfigProperty(TABLE_NAME);
137+
}
138+
}
139+
collector.getOrThrowException();
140+
super.prepareRun(context);
141+
}
142+
62143
@Override
63144
protected SchemaReader getSchemaReader() {
64145
return new RedshiftSchemaReader();

amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftFailedConnectionTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,11 @@ public class RedshiftFailedConnectionTest extends DBSpecificFailedConnectionTest
2828
public void test() throws ClassNotFoundException, IOException {
2929

3030
RedshiftConnector connector = new RedshiftConnector(
31-
new RedshiftConnectorConfig("username", "password", "jdbc", "", "localhost", "db", 5432));
31+
new RedshiftConnectorConfig("username", "password", "jdbc", "", "localhost", "db", 5432));
3232

3333
super.test(JDBC_DRIVER_CLASS_NAME, connector, "Failed to create connection to database via connection string: " +
34-
"jdbc:redshift://localhost:5432/db and arguments: " +
35-
"{user=username}. Error: ConnectException: Connection refused " +
36-
"(Connection refused).");
34+
"jdbc:redshift://localhost:5432/db and arguments: " +
35+
"{user=username}. Error: ConnectException: Connection refused " +
36+
"(Connection refused).");
37+
}
3738
}
38-
}

amazon-redshift-plugin/widgets/Redshift-batchsource.json

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,30 @@
108108
{
109109
"label": "SQL Query",
110110
"properties": [
111+
{
112+
"widget-type": "radio-group",
113+
"label": "Import Query Type",
114+
"name": "importQueryType",
115+
"widget-attributes": {
116+
"layout": "inline",
117+
"default": "importQuery",
118+
"options": [
119+
{
120+
"id": "importQuery",
121+
"label": "Native Query"
122+
},
123+
{
124+
"id": "tableName",
125+
"label": "Named Table"
126+
}
127+
]
128+
}
129+
},
130+
{
131+
"widget-type": "textbox",
132+
"label": "Table Name",
133+
"name": "tableName"
134+
},
111135
{
112136
"widget-type": "textarea",
113137
"label": "Import Query",
@@ -229,6 +253,30 @@
229253
}
230254
]
231255
},
256+
{
257+
"name": "ImportQuery",
258+
"condition": {
259+
"expression": "importQueryType != 'tableName'"
260+
},
261+
"show": [
262+
{
263+
"type": "property",
264+
"name": "importQuery"
265+
}
266+
]
267+
},
268+
{
269+
"name": "NativeTableName",
270+
"condition": {
271+
"expression": "importQueryType == 'tableName'"
272+
},
273+
"show": [
274+
{
275+
"type": "property",
276+
"name": "tableName"
277+
}
278+
]
279+
}
232280
],
233281
"jump-config": {
234282
"datasets": [

cloudsql-mysql-plugin/widgets/CloudSQLMySQL-batchsource.json

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,30 @@
127127
{
128128
"label": "CloudSQL Properties",
129129
"properties": [
130+
{
131+
"widget-type": "hidden",
132+
"label": "Import Query Type",
133+
"name": "importQueryType",
134+
"widget-attributes": {
135+
"layout": "inline",
136+
"default": "importQuery",
137+
"options": [
138+
{
139+
"id": "importQuery",
140+
"label": "Native Query"
141+
},
142+
{
143+
"id": "tableName",
144+
"label": "Named Table"
145+
}
146+
]
147+
}
148+
},
149+
{
150+
"widget-type": "hidden",
151+
"label": "Table Name",
152+
"name": "tableName"
153+
},
130154
{
131155
"widget-type": "textarea",
132156
"label": "Import Query",

cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsource.json

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,30 @@
127127
{
128128
"label": "CloudSQL Properties",
129129
"properties": [
130+
{
131+
"widget-type": "hidden",
132+
"label": "Import Query Type",
133+
"name": "importQueryType",
134+
"widget-attributes": {
135+
"layout": "inline",
136+
"default": "importQuery",
137+
"options": [
138+
{
139+
"id": "importQuery",
140+
"label": "Native Query"
141+
},
142+
{
143+
"id": "tableName",
144+
"label": "Named Table"
145+
}
146+
]
147+
}
148+
},
149+
{
150+
"widget-type": "hidden",
151+
"label": "Table Name",
152+
"name": "tableName"
153+
},
130154
{
131155
"widget-type": "textarea",
132156
"label": "Import Query",

0 commit comments

Comments
 (0)