Skip to content

Commit a6866e6

Browse files
psainics AbhishekKumar9984
authored and committed
Adding import query type property to the DB plugin, which will only be reflected in the Redshift and PostgreSQL plugins.
1 parent e5c5794 commit a6866e6

File tree

19 files changed

+527
-27
lines changed

19 files changed

+527
-27
lines changed

amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSchemaReader.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@
2222
import io.cdap.plugin.db.CommonSchemaReader;
2323
import org.slf4j.Logger;
2424
import org.slf4j.LoggerFactory;
25-
25+
import java.sql.Connection;
26+
import java.sql.DatabaseMetaData;
2627
import java.sql.ResultSet;
2728
import java.sql.ResultSetMetaData;
2829
import java.sql.SQLException;
@@ -72,17 +73,16 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti
7273
int precision = metadata.getPrecision(index);
7374
if (precision == 0) {
7475
LOG.warn(String.format("Field '%s' is a %s type without precision and scale, "
75-
+ "converting into STRING type to avoid any precision loss.",
76-
metadata.getColumnName(index),
77-
metadata.getColumnTypeName(index)));
76+
+ "converting into STRING type to avoid any precision loss.",
77+
metadata.getColumnName(index),
78+
metadata.getColumnTypeName(index)));
7879
return Schema.of(Schema.Type.STRING);
7980
}
8081
}
8182

8283
if (typeName.equalsIgnoreCase("timestamp")) {
8384
return Schema.of(Schema.LogicalType.DATETIME);
8485
}
85-
8686
return super.getSchema(metadata, index);
8787
}
8888

@@ -113,5 +113,4 @@ public List<Schema.Field> getSchemaFields(ResultSet resultSet) throws SQLExcepti
113113
}
114114
return schemaFields;
115115
}
116-
117116
}

amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSource.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,15 @@
1717
package io.cdap.plugin.amazon.redshift;
1818

1919
import com.google.common.annotations.VisibleForTesting;
20+
import com.google.common.base.Strings;
2021
import io.cdap.cdap.api.annotation.Description;
2122
import io.cdap.cdap.api.annotation.Macro;
2223
import io.cdap.cdap.api.annotation.Metadata;
2324
import io.cdap.cdap.api.annotation.MetadataProperty;
2425
import io.cdap.cdap.api.annotation.Name;
2526
import io.cdap.cdap.api.annotation.Plugin;
2627
import io.cdap.cdap.etl.api.FailureCollector;
28+
import io.cdap.cdap.etl.api.PipelineConfigurer;
2729
import io.cdap.cdap.etl.api.batch.BatchSource;
2830
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
2931
import io.cdap.cdap.etl.api.connector.Connector;
@@ -36,6 +38,9 @@
3638
import io.cdap.plugin.util.DBUtils;
3739
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
3840

41+
import java.sql.Connection;
42+
import java.sql.DatabaseMetaData;
43+
import java.sql.SQLException;
3944
import java.util.Collections;
4045
import java.util.Map;
4146
import javax.annotation.Nullable;
@@ -59,6 +64,20 @@ public RedshiftSource(RedshiftSourceConfig redshiftSourceConfig) {
5964
this.redshiftSourceConfig = redshiftSourceConfig;
6065
}
6166

67+
@Override
68+
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
69+
FailureCollector collector = pipelineConfigurer.getStageConfigurer().getFailureCollector();
70+
if ((!sourceConfig.containsMacro("tableName") && !sourceConfig.containsMacro("importQuery"))
71+
&& (Strings.isNullOrEmpty(sourceConfig.getTableName()) &&
72+
(Strings.isNullOrEmpty(sourceConfig.getImportQuery())))) {
73+
collector.addFailure(
74+
"Either 'tableName' or 'importQuery' must be specified.",
75+
"Provide a value for either 'tableName' or 'importQuery' in the configuration."
76+
).withConfigProperty(sourceConfig.getTableName()).withConfigProperty(sourceConfig.getImportQuery());
77+
}
78+
super.configurePipeline(pipelineConfigurer);
79+
}
80+
6281
@Override
6382
protected SchemaReader getSchemaReader() {
6483
return new RedshiftSchemaReader();

amazon-redshift-plugin/widgets/Redshift-batchsource.json

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,30 @@
108108
{
109109
"label": "SQL Query",
110110
"properties": [
111+
{
112+
"widget-type": "radio-group",
113+
"label": "Import Query Type",
114+
"name": "importQueryType",
115+
"widget-attributes": {
116+
"layout": "inline",
117+
"default": "importQuery",
118+
"options": [
119+
{
120+
"id": "importQuery",
121+
"label": "Native Query"
122+
},
123+
{
124+
"id": "tableName",
125+
"label": "Named Table"
126+
}
127+
]
128+
}
129+
},
130+
{
131+
"widget-type": "textbox",
132+
"label": "Table Name",
133+
"name": "tableName"
134+
},
111135
{
112136
"widget-type": "textarea",
113137
"label": "Import Query",
@@ -229,6 +253,30 @@
229253
}
230254
]
231255
},
256+
{
257+
"name": "ImportQuery",
258+
"condition": {
259+
"expression": "importQueryType != 'tableName'"
260+
},
261+
"show": [
262+
{
263+
"type": "property",
264+
"name": "importQuery"
265+
}
266+
]
267+
},
268+
{
269+
"name": "NativeTableName",
270+
"condition": {
271+
"expression": "importQueryType == 'tableName'"
272+
},
273+
"show": [
274+
{
275+
"type": "property",
276+
"name": "tableName"
277+
}
278+
]
279+
}
232280
],
233281
"jump-config": {
234282
"datasets": [

cloudsql-mysql-plugin/widgets/CloudSQLMySQL-batchsource.json

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,30 @@
127127
{
128128
"label": "CloudSQL Properties",
129129
"properties": [
130+
{
131+
"widget-type": "hidden",
132+
"label": "Import Query Type",
133+
"name": "importQueryType",
134+
"widget-attributes": {
135+
"layout": "inline",
136+
"default": "importQuery",
137+
"options": [
138+
{
139+
"id": "importQuery",
140+
"label": "Native Query"
141+
},
142+
{
143+
"id": "tableName",
144+
"label": "Named Table"
145+
}
146+
]
147+
}
148+
},
149+
{
150+
"widget-type": "hidden",
151+
"label": "Table Name",
152+
"name": "tableName"
153+
},
130154
{
131155
"widget-type": "textarea",
132156
"label": "Import Query",

cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsource.json

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,30 @@
127127
{
128128
"label": "CloudSQL Properties",
129129
"properties": [
130+
{
131+
"widget-type": "hidden",
132+
"label": "Import Query Type",
133+
"name": "importQueryType",
134+
"widget-attributes": {
135+
"layout": "inline",
136+
"default": "importQuery",
137+
"options": [
138+
{
139+
"id": "importQuery",
140+
"label": "Native Query"
141+
},
142+
{
143+
"id": "tableName",
144+
"label": "Named Table"
145+
}
146+
]
147+
}
148+
},
149+
{
150+
"widget-type": "hidden",
151+
"label": "Table Name",
152+
"name": "tableName"
153+
},
130154
{
131155
"widget-type": "textarea",
132156
"label": "Import Query",

database-commons/src/main/java/io/cdap/plugin/db/CommonSchemaReader.java

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import io.cdap.cdap.api.data.schema.Schema;
2121
import io.cdap.plugin.common.db.DBUtils;
2222

23+
import java.sql.Connection;
24+
import java.sql.DatabaseMetaData;
2325
import java.sql.ResultSet;
2426
import java.sql.ResultSetMetaData;
2527
import java.sql.SQLException;
@@ -29,7 +31,6 @@
2931
* Common schema reader for mapping non specific DB types.
3032
*/
3133
public class CommonSchemaReader implements SchemaReader {
32-
3334
@Override
3435
public List<Schema.Field> getSchemaFields(ResultSet resultSet) throws SQLException {
3536
List<Schema.Field> schemaFields = Lists.newArrayList();
@@ -61,4 +62,50 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti
6162
public boolean shouldIgnoreColumn(ResultSetMetaData metadata, int index) throws SQLException {
6263
return false;
6364
}
65+
66+
/**
67+
* Returns the schema fields for the specified table using JDBC metadata.
68+
* Supports schema-qualified table names (e.g. "schema.table").
69+
* Throws SQLException if the table has no columns.
70+
*
71+
* @param connection JDBC connection
72+
* @param tableName table name, optionally schema-qualified
73+
* @return list of schema fields
74+
* @throws SQLException if no columns found or on database error
75+
*/
76+
@Override
77+
public List<Schema.Field> getSchemaFields(Connection connection, String tableName) throws SQLException {
78+
DatabaseMetaData dbMetaData = connection.getMetaData();
79+
String schema = null;
80+
String table = tableName;
81+
// Support schema-qualified table names like "schema.table"
82+
if (tableName != null && tableName.contains(".")) {
83+
String[] parts = tableName.split("\\.", 2);
84+
schema = parts[0];
85+
table = parts[1];
86+
}
87+
try (ResultSet columns = dbMetaData.getColumns(null, schema, table, null)) {
88+
List<Schema.Field> schemaFields = Lists.newArrayList();
89+
while (columns.next()) {
90+
String columnName = columns.getString("COLUMN_NAME");
91+
String typeName = columns.getString("TYPE_NAME");
92+
int columnType = columns.getInt("DATA_TYPE");
93+
int precision = columns.getInt("COLUMN_SIZE");
94+
int scale = columns.getInt("DECIMAL_DIGITS");
95+
int nullable = columns.getInt("NULLABLE");
96+
97+
Schema columnSchema = DBUtils.getSchema(typeName, columnType, precision, scale, columnName, true, true);
98+
if (nullable == DatabaseMetaData.columnNullable) {
99+
columnSchema = Schema.nullableOf(columnSchema);
100+
}
101+
Schema.Field field = Schema.Field.of(columnName, columnSchema);
102+
schemaFields.add(field);
103+
}
104+
if (schemaFields.isEmpty()) {
105+
throw new SQLException("No columns found for table: " +
106+
(schema != null ? schema + "." : "") + table);
107+
}
108+
return schemaFields;
109+
}
110+
}
64111
}

database-commons/src/main/java/io/cdap/plugin/db/SchemaReader.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import io.cdap.cdap.api.data.schema.Schema;
2020

21+
import java.sql.Connection;
2122
import java.sql.ResultSet;
2223
import java.sql.ResultSetMetaData;
2324
import java.sql.SQLException;
@@ -64,4 +65,6 @@ public interface SchemaReader {
6465
* @throws SQLException
6566
*/
6667
boolean shouldIgnoreColumn(ResultSetMetaData metadata, int index) throws SQLException;
68+
69+
List<Schema.Field> getSchemaFields(Connection connection, String tableName) throws SQLException;
6770
}

database-commons/src/main/java/io/cdap/plugin/db/config/AbstractDBSpecificSourceConfig.java

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,9 @@
4040
* Abstract Config for DB Specific Source plugin
4141
*/
4242
public abstract class AbstractDBSpecificSourceConfig extends PluginConfig implements DatabaseSourceConfig {
43-
43+
public static final String TABLE_NAME = "tableName";
4444
public static final String IMPORT_QUERY = "importQuery";
45+
public static final String PROPERTY_IMPORT_QUERY_TYPE = "importQueryType";
4546
public static final String BOUNDING_QUERY = "boundingQuery";
4647
public static final String SPLIT_BY = "splitBy";
4748
public static final String NUM_SPLITS = "numSplits";
@@ -54,6 +55,19 @@ public abstract class AbstractDBSpecificSourceConfig extends PluginConfig implem
5455
@Description(Constants.Reference.REFERENCE_NAME_DESCRIPTION)
5556
public String referenceName;
5657

58+
@Name(PROPERTY_IMPORT_QUERY_TYPE)
59+
@Description("Whether to select Table Name or Import Query to extract the data.")
60+
@Macro
61+
@Nullable
62+
public String importQueryType;
63+
64+
@Nullable
65+
@Name(TABLE_NAME)
66+
@Description("The name of the table to import data from. This can be used instead of specifying an import query.")
67+
@Macro
68+
protected String tableName;
69+
70+
@Nullable
5771
@Name(IMPORT_QUERY)
5872
@Description("The SELECT query to use to import data from the specified table. " +
5973
"You can specify an arbitrary number of columns to import, or import all columns using *. " +
@@ -103,10 +117,15 @@ public String getImportQuery() {
103117
return cleanQuery(importQuery);
104118
}
105119

120+
public String getTableName() {
121+
return tableName;
122+
}
123+
106124
public String getBoundingQuery() {
107125
return cleanQuery(boundingQuery);
108126
}
109127

128+
110129
public void validate(FailureCollector collector) {
111130
boolean hasOneSplit = false;
112131
if (!containsMacro(NUM_SPLITS) && numSplits != null) {
@@ -125,16 +144,19 @@ public void validate(FailureCollector collector) {
125144
TransactionIsolationLevel.validate(getTransactionIsolationLevel(), collector);
126145
}
127146

128-
if (!containsMacro(IMPORT_QUERY) && Strings.isNullOrEmpty(importQuery)) {
129-
collector.addFailure("Import Query is empty.", "Specify the Import Query.")
130-
.withConfigProperty(IMPORT_QUERY);
147+
if ((!containsMacro(TABLE_NAME) && !containsMacro(IMPORT_QUERY)) &&
148+
(Strings.isNullOrEmpty(tableName) && Strings.isNullOrEmpty(importQuery))) {
149+
collector.addFailure(" Import Query must be specified.",
150+
" Import Query, Can not be empty.")
151+
.withConfigProperty(IMPORT_QUERY);
131152
}
132-
133-
if (!hasOneSplit && !containsMacro(IMPORT_QUERY) && !getImportQuery().contains("$CONDITIONS")) {
134-
collector.addFailure(String.format(
135-
"Import Query %s must contain the string '$CONDITIONS'. if Number of Splits is not set to 1.", importQuery),
136-
"Include '$CONDITIONS' in the Import Query")
137-
.withConfigProperty(IMPORT_QUERY);
153+
if (!Strings.isNullOrEmpty(importQuery) &&
154+
(!hasOneSplit && !containsMacro(IMPORT_QUERY) && !getImportQuery().contains("$CONDITIONS"))) {
155+
collector.addFailure(String.format(
156+
"Import Query %s must contain the string '$CONDITIONS'. " +
157+
"if Number of Splits is not set to 1.", importQuery),
158+
"Include '$CONDITIONS' in the Import Query")
159+
.withConfigProperty(IMPORT_QUERY);
138160
}
139161

140162
if (!hasOneSplit && !containsMacro(SPLIT_BY) && (splitBy == null || splitBy.isEmpty())) {
@@ -177,8 +199,7 @@ public void validateSchema(Schema actualSchema, FailureCollector collector) {
177199
actualField.getSchema().getNonNullable() : actualField.getSchema();
178200
Schema expectedFieldSchema = field.getSchema().isNullable() ?
179201
field.getSchema().getNonNullable() : field.getSchema();
180-
181-
validateField(collector, field, actualFieldSchema, expectedFieldSchema);
202+
validateField(collector, field, actualFieldSchema, expectedFieldSchema);
182203
}
183204
}
184205

database-commons/src/main/java/io/cdap/plugin/db/config/DatabaseSourceConfig.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,4 +90,6 @@ public interface DatabaseSourceConfig extends DatabaseConnectionConfig {
9090
* @return the number of rows to fetch at a time per split
9191
*/
9292
Integer getFetchSize();
93+
94+
String getTableName();
9395
}

0 commit comments

Comments
 (0)