Commit bf6a780

Resolve PR comments; modify CommonSchemaReader and the Redshift & Postgres schema readers.
1 parent a6866e6 · commit bf6a780

File tree: 11 files changed, +225 −52 lines


amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftConnector.java

Lines changed: 2 additions & 0 deletions

@@ -111,6 +111,8 @@ protected void setConnectorSpec(ConnectorSpecRequest request, DBConnectorPath pa
     }
     sourceProperties.put(RedshiftSource.RedshiftSourceConfig.IMPORT_QUERY,
                          getTableQuery(path.getDatabase(), schema, table));
+    sourceProperties.put(RedshiftSource.RedshiftSourceConfig.PROPERTY_IMPORT_QUERY_TYPE,
+                         RedshiftSource.RedshiftSourceConfig.IMPORT_QUERY);
     sourceProperties.put(Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(table));
   }
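
For orientation: with this change, browsing a Redshift table through the connector seeds both the generated query and its mode, so the created source stage starts in import-query mode. A rough sketch of the seeded properties (illustrative only; the actual query text comes from getTableQuery and is not reproduced here):

    // sourceProperties after setConnectorSpec, sketched:
    //   importQuery     -> SQL generated by getTableQuery(database, schema, table)
    //   importQueryType -> the value of RedshiftSourceConfig.IMPORT_QUERY
    //   referenceName   -> ReferenceNames.cleanseReferenceName(table)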

amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSchemaReader.java

Lines changed: 34 additions & 2 deletions

@@ -19,11 +19,10 @@
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.plugin.common.db.DBUtils;
 import io.cdap.plugin.db.CommonSchemaReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
@@ -113,4 +112,37 @@ public List<Schema.Field> getSchemaFields(ResultSet resultSet) throws SQLExcepti
     }
     return schemaFields;
   }
+  /**
+   * Maps database column type information to a corresponding {@link Schema}.
+   *
+   * @param typeName   the SQL type name
+   * @param columnType the JDBC type code
+   * @param precision  the column precision
+   * @param scale      the column scale
+   * @param columnName the column name
+   * @return the mapped {@link Schema} type
+   */
+  @Override
+  public Schema getSchema(String typeName, int columnType, int precision, int scale, String columnName,
+                          boolean isSigned, boolean handleAsDecimal) {
+    if (STRING_MAPPED_REDSHIFT_TYPES_NAMES.contains(typeName)) {
+      return Schema.of(Schema.Type.STRING);
+    }
+    if ("INT".equalsIgnoreCase(typeName)) {
+      return Schema.of(Schema.Type.INT);
+    }
+    if ("BIGINT".equalsIgnoreCase(typeName)) {
+      return Schema.of(Schema.Type.LONG);
+    }
+    if (Types.NUMERIC == columnType && precision == 0) {
+      LOG.warn(String.format("Field '%s' is a %s type without precision and scale," +
+                               " converting into STRING type to avoid any precision loss.",
+                             columnName, typeName));
+      return Schema.of(Schema.Type.STRING);
+    }
+    if ("timestamp".equalsIgnoreCase(typeName)) {
+      return Schema.of(Schema.LogicalType.DATETIME);
+    }
+    return DBUtils.getSchema(typeName, columnType, precision, scale, columnName, true, true);
+  }
 }
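
To make the mapping precedence concrete, here is a minimal sketch of representative calls (hypothetical column values; it assumes RedshiftSchemaReader exposes a no-argument constructor and that java.sql.Types is imported):

    RedshiftSchemaReader reader = new RedshiftSchemaReader();

    // Explicit name matches win first.
    reader.getSchema("INT", Types.INTEGER, 10, 0, "id", true, true);        // Schema.of(Schema.Type.INT)
    reader.getSchema("BIGINT", Types.BIGINT, 19, 0, "hits", true, true);    // Schema.of(Schema.Type.LONG)

    // NUMERIC reported without precision degrades to STRING (a warning is logged).
    reader.getSchema("numeric", Types.NUMERIC, 0, 0, "price", true, true);  // Schema.of(Schema.Type.STRING)

    // Timestamps become DATETIME; anything else falls through to DBUtils.getSchema(...).
    reader.getSchema("timestamp", Types.TIMESTAMP, 0, 0, "ts", true, true); // Schema.of(Schema.LogicalType.DATETIME)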

amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftSource.java

Lines changed: 30 additions & 4 deletions

@@ -26,6 +26,7 @@
 import io.cdap.cdap.api.annotation.Plugin;
 import io.cdap.cdap.etl.api.FailureCollector;
 import io.cdap.cdap.etl.api.PipelineConfigurer;
+import io.cdap.cdap.etl.api.StageConfigurer;
 import io.cdap.cdap.etl.api.batch.BatchSource;
 import io.cdap.cdap.etl.api.batch.BatchSourceContext;
 import io.cdap.cdap.etl.api.connector.Connector;
@@ -38,13 +39,13 @@
 import io.cdap.plugin.util.DBUtils;
 import org.apache.hadoop.mapreduce.lib.db.DBWritable;

-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.SQLException;
 import java.util.Collections;
 import java.util.Map;
 import javax.annotation.Nullable;

+import static io.cdap.plugin.db.config.AbstractDBSpecificSourceConfig.IMPORT_QUERY;
+import static io.cdap.plugin.db.config.AbstractDBSpecificSourceConfig.TABLE_NAME;
+
 /**
  * Batch source to read from an Amazon Redshift database.
  */
@@ -67,7 +68,14 @@ public RedshiftSource(RedshiftSourceConfig redshiftSourceConfig) {
   @Override
   public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
     FailureCollector collector = pipelineConfigurer.getStageConfigurer().getFailureCollector();
-    if ((!sourceConfig.containsMacro("tableName") && !sourceConfig.containsMacro("importQuery"))
+    StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
+    if (sourceConfig.containsMacro(TABLE_NAME) || sourceConfig.containsMacro(IMPORT_QUERY)) {
+      if (sourceConfig.getSchema() != null) {
+        stageConfigurer.setOutputSchema(sourceConfig.getSchema());
+      }
+      return;
+    }
+    if ((!sourceConfig.containsMacro(IMPORT_QUERY) && !sourceConfig.containsMacro(TABLE_NAME))
       && (Strings.isNullOrEmpty(sourceConfig.getTableName()) &&
       (Strings.isNullOrEmpty(sourceConfig.getImportQuery())))) {
       collector.addFailure(
@@ -78,6 +86,24 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
     super.configurePipeline(pipelineConfigurer);
   }

+  @Override
+  public void prepareRun(BatchSourceContext context) throws Exception {
+    FailureCollector collector = context.getFailureCollector();
+
+    if (!sourceConfig.containsMacro(IMPORT_QUERY) &&
+      !sourceConfig.containsMacro(TABLE_NAME) &&
+      Strings.isNullOrEmpty(sourceConfig.getTableName()) &&
+      Strings.isNullOrEmpty(sourceConfig.getImportQuery())) {
+      collector.addFailure(
+        "Either 'tableName' or 'importQuery' must be specified.",
+        "Provide a value for either 'tableName' or 'importQuery' in the configuration."
+      ).withConfigProperty("tableName")
+        .withConfigProperty("importQuery");
+    }
+    super.prepareRun(context);
+    collector.getOrThrowException();
+  }
+
   @Override
   protected SchemaReader getSchemaReader() {
     return new RedshiftSchemaReader();
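
The split between configurePipeline and prepareRun follows the CDAP macro lifecycle: deployment-time validation is skipped while either property is still a macro, then re-run once macros are evaluated. A comment sketch of the two phases (illustrative, not from the source):

    // Deployment (configurePipeline): tableName = "${table}"
    //   containsMacro(TABLE_NAME) == true -> skip validation and publish
    //   sourceConfig.getSchema() as the output schema if the user supplied one.
    //
    // Runtime (prepareRun): macros evaluated; if both tableName and importQuery
    //   resolve to empty strings, the failure is recorded against both properties
    //   and collector.getOrThrowException() aborts the run.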

amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftFailedConnectionTest.java

Lines changed: 18 additions & 5 deletions

@@ -20,6 +20,7 @@
 import org.junit.Test;

 import java.io.IOException;
+import static org.junit.Assert.assertTrue;

 public class RedshiftFailedConnectionTest extends DBSpecificFailedConnectionTest {
   private static final String JDBC_DRIVER_CLASS_NAME = "com.amazon.redshift.Driver";
@@ -28,11 +29,23 @@ public class RedshiftFailedConnectionTest extends DBSpecificFailedConnectionTest
   public void test() throws ClassNotFoundException, IOException {

     RedshiftConnector connector = new RedshiftConnector(
-      new RedshiftConnectorConfig("username", "password", "jdbc", "", "localhost", "db", 5432));
+      new RedshiftConnectorConfig("username", "password", "jdbc", "",
+                                  "localhost", "db", 5432));

-    super.test(JDBC_DRIVER_CLASS_NAME, connector, "Failed to create connection to database via connection string: " +
-      "jdbc:redshift://localhost:5432/db and arguments: " +
-      "{user=username}. Error: ConnectException: Connection refused " +
-      "(Connection refused).");
+    String expectedPrefix = "Failed to create connection to database via connection string: " +
+      "jdbc:redshift://localhost:5432/db and arguments: {user=username}. Error:";
+    try {
+      super.test(JDBC_DRIVER_CLASS_NAME, connector, expectedPrefix + " ConnectException: Connection " +
+        "refused (Connection refused).");
+    } catch (AssertionError e) {
+      // Accept either ConnectException or SunCertPathBuilderException
+      String message = e.getMessage();
+      assertTrue(
+        "Expected either ConnectException or SunCertPathBuilderException, but got: " + message,
+        message.contains("ConnectException: Connection refused") ||
+          message.contains("SunCertPathBuilderException: unable to find valid certification " +
+            "path to requested target")
+      );
+    }
   }
 }

database-commons/src/main/java/io/cdap/plugin/db/CommonSchemaReader.java

Lines changed: 18 additions & 1 deletion

@@ -94,7 +94,7 @@ public List<Schema.Field> getSchemaFields(Connection connection, String tableNam
       int scale = columns.getInt("DECIMAL_DIGITS");
       int nullable = columns.getInt("NULLABLE");

-      Schema columnSchema = DBUtils.getSchema(typeName, columnType, precision, scale, columnName, true, true);
+      Schema columnSchema = this.getSchema(typeName, columnType, precision, scale, columnName, true, true);
       if (nullable == DatabaseMetaData.columnNullable) {
         columnSchema = Schema.nullableOf(columnSchema);
       }
@@ -108,4 +108,21 @@ public List<Schema.Field> getSchemaFields(Connection connection, String tableNam
       return schemaFields;
     }
   }
+
+  /**
+   * Returns the CDAP schema for the given SQL column type; driver-specific readers may override this mapping.
+   *
+   * @param typeName        SQL type name
+   * @param columnType      JDBC type code
+   * @param precision       numeric precision
+   * @param scale           numeric scale
+   * @param columnName      column name
+   * @param isSigned        whether the column is signed
+   * @param handleAsDecimal whether to treat as decimal
+   * @return corresponding {@link Schema}
+   */
+  public Schema getSchema(String typeName, int columnType, int precision, int scale, String columnName,
+                          boolean isSigned, boolean handleAsDecimal) {
+    return DBUtils.getSchema(typeName, columnType, precision, scale, columnName, isSigned, handleAsDecimal);
+  }
 }
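
This turns getSchemaFields into a template method: the metadata iteration stays in CommonSchemaReader while subclasses override only the type mapping, as the Redshift and Postgres readers in this commit do. A minimal hypothetical subclass for some other driver (assumes the Schema and DBUtils imports already used in this file):

    // Hypothetical driver-specific reader: override only the type mapping and
    // inherit the column iteration from CommonSchemaReader.
    public class MyDriverSchemaReader extends CommonSchemaReader {
      @Override
      public Schema getSchema(String typeName, int columnType, int precision, int scale,
                              String columnName, boolean isSigned, boolean handleAsDecimal) {
        if ("JSONB".equalsIgnoreCase(typeName)) {
          return Schema.of(Schema.Type.STRING);  // driver-specific special case
        }
        // Everything else keeps the default DBUtils-backed mapping.
        return super.getSchema(typeName, columnType, precision, scale, columnName, isSigned, handleAsDecimal);
      }
    }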

database-commons/src/main/java/io/cdap/plugin/db/config/AbstractDBSpecificSourceConfig.java

Lines changed: 0 additions & 2 deletions

@@ -57,8 +57,6 @@ public abstract class AbstractDBSpecificSourceConfig extends PluginConfig implem

   @Name(PROPERTY_IMPORT_QUERY_TYPE)
   @Description("Whether to select Table Name or Import Query to extract the data.")
-  @Macro
-  @Nullable
   public String importQueryType;

   @Nullable

database-commons/src/main/java/io/cdap/plugin/db/source/AbstractDBSource.java

Lines changed: 1 addition & 28 deletions

@@ -61,14 +61,12 @@

 import java.io.IOException;
 import java.sql.Connection;
-import java.sql.DatabaseMetaData;
 import java.sql.Driver;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Types;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
 import java.util.regex.Pattern;
@@ -178,32 +176,7 @@ public Schema getSchema() throws SQLException {
   }

   private Schema loadSchemaFromDBwithTableName(Connection connection, String tableName) throws SQLException {
-    DatabaseMetaData metaData = connection.getMetaData();
-
-    String schema = null;
-    String table = tableName;
-    if (tableName.contains(".")) {
-      String[] parts = tableName.split("\\.", 2);
-      schema = parts[0];
-      table = parts[1];
-    }
-
-    ResultSet columns = metaData.getColumns(null, schema, table, null);
-
-    List<Schema.Field> fields = new ArrayList<>();
-    while (columns.next()) {
-      String columnName = columns.getString("COLUMN_NAME");
-      int dataType = columns.getInt("DATA_TYPE");
-      Schema.Type schemaType = mapSqlTypeToSchemaType(dataType);
-      fields.add(Schema.Field.of(columnName, Schema.of(schemaType)));
-    }
-    columns.close();
-
-    if (fields.isEmpty()) {
-      throw new SQLException("No columns found for table: " +
-        (schema != null ? schema + "." : "") + table);
-    }
-    return Schema.recordOf("schema", fields);
+    return Schema.recordOf("schema", getSchemaReader().getSchemaFields(connection, sourceConfig.getTableName()));
   }
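
With the hand-rolled metadata walk gone, table-based schema inference now flows through the plugin's SchemaReader, so the driver-specific overrides above apply here as well. The call path, sketched in comments (illustrative, not from the source):

    // AbstractDBSource.getSchema()
    //   -> loadSchemaFromDBwithTableName(connection, tableName)
    //   -> getSchemaReader().getSchemaFields(connection, ...)   // CommonSchemaReader template
    //   -> reader.getSchema(typeName, columnType, ...)          // e.g. RedshiftSchemaReader override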

database-commons/src/test/java/io/cdap/plugin/db/CommonSchemaReaderTest.java

Lines changed: 16 additions & 1 deletion

@@ -56,7 +56,22 @@ public class CommonSchemaReaderTest {

   @Before
   public void before() {
-    reader = new CommonSchemaReader();
+    reader = new CommonSchemaReader() {
+      @Override
+      public Schema getSchema(String typeName, int columnType, int precision, int scale, String columnName,
+                              boolean isSigned, boolean handleAsDecimal) {
+        if ("INTEGER".equalsIgnoreCase(typeName) || columnType == Types.INTEGER) {
+          return Schema.of(Schema.Type.INT);
+        }
+        if ("VARCHAR".equalsIgnoreCase(typeName) || columnType == Types.VARCHAR) {
+          return Schema.of(Schema.Type.STRING);
+        }
+        if ("BIGINT".equalsIgnoreCase(typeName) || columnType == Types.BIGINT) {
+          return Schema.of(Schema.Type.LONG);
+        }
+        return Schema.of(Schema.Type.STRING);
+      }
+    };
   }

   /**

postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresSchemaReader.java

Lines changed: 50 additions & 0 deletions

@@ -18,6 +18,7 @@

 import com.google.common.collect.ImmutableSet;
 import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.plugin.common.db.DBUtils;
 import io.cdap.plugin.db.CommonSchemaReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -87,4 +88,53 @@ public boolean shouldIgnoreColumn(ResultSetMetaData metadata, int index) throws
     return metadata.getColumnName(index).equals("c_" + sessionID) ||
       metadata.getColumnName(index).equals("sqn_" + sessionID);
   }
+
+  /**
+   * Returns the CDAP schema for a PostgreSQL column, handling special cases for certain types.
+   * Maps PostgreSQL-specific types (like bit, timetz, money, arrays) to STRING, handles
+   * INT and BIGINT directly, and maps numeric/decimal types with zero precision to STRING
+   * to avoid precision loss. Timestamps are mapped to DATETIME. Falls back to DBUtils for others.
+   *
+   * @param typeName        SQL type name (e.g., "INT", "NUMERIC")
+   * @param columnType      JDBC type constant
+   * @param precision       numeric precision
+   * @param scale           numeric scale
+   * @param columnName      column name (for logging)
+   * @param isSigned        whether the column is signed
+   * @param handleAsDecimal whether to treat as decimal
+   * @return corresponding CDAP {@link Schema}
+   */
+  @Override
+  public Schema getSchema(String typeName, int columnType, int precision, int scale, String columnName,
+                          boolean isSigned, boolean handleAsDecimal) {
+    if (STRING_MAPPED_POSTGRES_TYPES_NAMES.contains(typeName) || STRING_MAPPED_POSTGRES_TYPES.contains(columnType)) {
+      return Schema.of(Schema.Type.STRING);
+    }
+    if (typeName.equalsIgnoreCase("INT")) {
+      return Schema.of(Schema.Type.INT);
+    }
+    if (typeName.equalsIgnoreCase("BIGINT")) {
+      return Schema.of(Schema.Type.LONG);
+    }

+    // If it is a numeric type without precision, use a STRING schema to avoid any precision loss.
+    if (Types.NUMERIC == columnType ||
+      "numeric".equalsIgnoreCase(typeName) ||
+      "decimal".equalsIgnoreCase(typeName)) {
+
+      if (precision == 0) {
+        LOG.warn(String.format("Field '%s' is a %s type without precision and scale, "
+                                 + "converting into STRING type to avoid any precision loss.",
+                               columnName, typeName));
+        return Schema.of(Schema.Type.STRING);
+      }
+      return Schema.decimalOf(precision, scale);
+    }
+
+    if ("timestamp".equalsIgnoreCase(typeName)) {
+      return Schema.of(Schema.LogicalType.DATETIME);
+    }
+
+    return DBUtils.getSchema(typeName, columnType, precision, scale, columnName, isSigned, handleAsDecimal);
+  }
 }
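
Concretely, under this mapping a column declared numeric(10,2) keeps a decimal schema, while a bare numeric column, which JDBC reports with precision 0, degrades to STRING. Illustrative calls (hypothetical column values; assumes a no-argument PostgresSchemaReader constructor and java.sql.Types on the classpath):

    PostgresSchemaReader reader = new PostgresSchemaReader();

    // numeric(10,2): precision is known -> decimal logical type is kept.
    reader.getSchema("numeric", Types.NUMERIC, 10, 2, "price", true, true);  // Schema.decimalOf(10, 2)

    // bare numeric: precision 0 -> STRING, avoiding silent precision loss.
    reader.getSchema("numeric", Types.NUMERIC, 0, 0, "total", true, true);   // Schema.of(Schema.Type.STRING)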
