|
49 | 49 | import io.cdap.plugin.db.DBRecord;
|
50 | 50 | import io.cdap.plugin.db.SchemaReader;
|
51 | 51 | import io.cdap.plugin.db.TransactionIsolationLevel;
|
| 52 | +import io.cdap.plugin.db.config.AbstractDBSpecificSourceConfig; |
52 | 53 | import io.cdap.plugin.db.config.DatabaseSourceConfig;
|
53 | 54 | import io.cdap.plugin.util.DBUtils;
|
54 | 55 | import io.cdap.plugin.util.DriverCleanup;
|
@@ -164,9 +165,9 @@ public Schema getSchema() throws SQLException {
|
164 | 165 | try (Connection connection = getConnection()) {
|
165 | 166 | executeInitQueries(connection, sourceConfig.getInitQueries());
|
166 | 167 | String query = sourceConfig.getImportQuery();
|
167 |
| - if ("importQuery".equalsIgnoreCase(sourceConfig.getImportQueryType())) { |
| 168 | + if (AbstractDBSpecificSourceConfig.IMPORT_QUERY.equalsIgnoreCase(sourceConfig.getImportQueryType())) { |
168 | 169 | return loadSchemaFromDBwithQuery(connection, query);
|
169 |
| - } else if ("tableName".equalsIgnoreCase(sourceConfig.getImportQueryType())) { |
| 170 | + } else if (AbstractDBSpecificSourceConfig.TABLE_NAME.equalsIgnoreCase(sourceConfig.getImportQueryType())) { |
170 | 171 | List<Schema.Field> fields = getSchemaReader().getSchemaFields(connection, sourceConfig.getTableName());
|
171 | 172 | return Schema.recordOf("schema", fields);
|
172 | 173 | } else {
|
@@ -210,7 +211,7 @@ private Schema loadSchemaFromDB(Class<? extends Driver> driverClass)
|
210 | 211 | executeInitQueries(connection, sourceConfig.getInitQueries());
|
211 | 212 | String importQuery = sourceConfig.getImportQuery();
|
212 | 213 | String tableName = sourceConfig.getTableName();
|
213 |
| - if ("importQuery".equalsIgnoreCase(sourceConfig.getImportQueryType())) { |
| 214 | + if (AbstractDBSpecificSourceConfig.IMPORT_QUERY.equalsIgnoreCase(sourceConfig.getImportQueryType())) { |
214 | 215 | return loadSchemaFromDBwithQuery(connection, importQuery);
|
215 | 216 | } else {
|
216 | 217 | return loadSchemaFromDBwithTableName(connection, tableName);
|
@@ -429,25 +430,6 @@ private String getJDBCPluginId() {
|
429 | 430 | return String.format("%s.%s.%s", "source", ConnectionConfig.JDBC_PLUGIN_TYPE, sourceConfig.getJdbcPluginName());
|
430 | 431 | }
|
431 | 432 |
|
432 |
| - private Schema.Type mapSqlTypeToSchemaType(int sqlType) { |
433 |
| - switch (sqlType) { |
434 |
| - case Types.INTEGER: return Schema.Type.INT; |
435 |
| - case Types.BIGINT: return Schema.Type.LONG; |
436 |
| - case Types.FLOAT: |
437 |
| - case Types.REAL: |
438 |
| - case Types.DOUBLE: return Schema.Type.DOUBLE; |
439 |
| - case Types.VARCHAR: |
440 |
| - case Types.CHAR: |
441 |
| - case Types.LONGVARCHAR: return Schema.Type.STRING; |
442 |
| - case Types.BOOLEAN: |
443 |
| - case Types.BIT: return Schema.Type.BOOLEAN; |
444 |
| - case Types.DATE: |
445 |
| - case Types.TIMESTAMP: |
446 |
| - default: return Schema.Type.STRING; |
447 |
| - } |
448 |
| - } |
449 |
| - |
450 |
| - |
451 | 433 | protected abstract String createConnectionString();
|
452 | 434 |
|
453 | 435 | /**
|
|
0 commit comments