diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConnector.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConnector.java index 3d2f7399a..4794be082 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConnector.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConnector.java @@ -112,7 +112,7 @@ protected DBConnectorPath getDBConnectorPath(String path) { @Override protected SchemaReader getSchemaReader(String sessionID) { - return new OracleSourceSchemaReader(sessionID); + return new OracleSourceSchemaReader(sessionID, false); } @Override diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java index 53f75613b..c2ec73ff1 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java @@ -63,7 +63,7 @@ protected String createConnectionString() { @Override protected SchemaReader getSchemaReader() { - return new OracleSourceSchemaReader(); + return new OracleSourceSchemaReader(null, oracleSourceConfig.shouldTreatAsOldTimestamp()); } @Override @@ -101,6 +101,7 @@ public static class OracleSourceConfig extends AbstractDBSpecificSourceConfig { public static final String NAME_CONNECTION = "connection"; public static final String DEFAULT_ROW_PREFETCH_VALUE = "40"; public static final String DEFAULT_BATCH_SIZE = "10"; + public static final String TREAT_AS_OLD_TIMESTAMP = "treatAsOldTimestamp"; @Name(NAME_USE_CONNECTION) @Nullable @@ -123,11 +124,19 @@ public static class OracleSourceConfig extends AbstractDBSpecificSourceConfig { @Nullable private Integer defaultRowPrefetch; + @Name(TREAT_AS_OLD_TIMESTAMP) + @Description("For internal use only. If set to true, DATETIME types will be treated as TIMESTAMP_MICROS to maintain" + + "backward compatibility.") + @Macro + @Nullable + private Boolean treatAsOldTimestamp = false; + + public OracleSourceConfig(String host, int port, String user, String password, String jdbcPluginName, String connectionArguments, String connectionType, String database, String role, int defaultBatchValue, int defaultRowPrefetch, String importQuery, Integer numSplits, int fetchSize, - String boundingQuery, String splitBy, Boolean useSSL) { + String boundingQuery, String splitBy, Boolean useSSL, Boolean treatAsOldTimestamp) { this.connection = new OracleConnectorConfig(host, port, user, password, jdbcPluginName, connectionArguments, connectionType, database, role, useSSL); this.defaultBatchValue = defaultBatchValue; @@ -137,12 +146,14 @@ public OracleSourceConfig(String host, int port, String user, String password, S this.numSplits = numSplits; this.boundingQuery = boundingQuery; this.splitBy = splitBy; + this.treatAsOldTimestamp = treatAsOldTimestamp; } @Override public String getConnectionString() { return OracleConstants.getConnectionString(connection.getConnectionType(), connection.getHost(), - connection.getPort(), connection.getDatabase(), connection.getSSlMode()); + connection.getPort(), connection.getDatabase(), + connection.getSSlMode()); } @Override @@ -163,6 +174,10 @@ public OracleConnectorConfig getConnection() { return connection; } + public boolean shouldTreatAsOldTimestamp() { + return Boolean.TRUE.equals(treatAsOldTimestamp); + } + @Override public void validate(FailureCollector collector) { ConfigUtil.validateConnection(this, useConnection, connection, collector); @@ -183,21 +198,21 @@ protected void validateField(FailureCollector collector, // contain Decimal(38, 0) (or something similar), and the code internally would try to identify // the schema of the field(without precision and scale) as String. if (Schema.LogicalType.DECIMAL.equals(expectedFieldSchema.getLogicalType()) - && actualFieldSchema.getType().equals(Schema.Type.STRING)) { + && actualFieldSchema.getType().equals(Schema.Type.STRING)) { return; } // For handling TimestampTZ types allow if the expected schema is STRING and // actual schema is set to TIMESTAMP type to ensure backward compatibility. if (Schema.LogicalType.TIMESTAMP_MICROS.equals(actualFieldSchema.getLogicalType()) - && Schema.Type.STRING.equals(expectedFieldSchema.getType())) { + && Schema.Type.STRING.equals(expectedFieldSchema.getType())) { return; } // For handling TimestampLTZ and Timestamp types allow if the expected schema is TIMESTAMP and // actual schema is set to DATETIME type to ensure backward compatibility. if (Schema.LogicalType.DATETIME.equals(actualFieldSchema.getLogicalType()) - && Schema.LogicalType.TIMESTAMP_MICROS.equals(expectedFieldSchema.getLogicalType())) { + && Schema.LogicalType.TIMESTAMP_MICROS.equals(expectedFieldSchema.getLogicalType())) { return; } diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java index 7d35f9bc7..163814bee 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java @@ -66,13 +66,16 @@ public class OracleSourceSchemaReader extends CommonSchemaReader { private final String sessionID; + private final boolean treatAsOldTimestamp; + public OracleSourceSchemaReader() { - this(null); + this(null, false); } - public OracleSourceSchemaReader(String sessionID) { + public OracleSourceSchemaReader(String sessionID, boolean treatAsOldTimestamp) { super(); this.sessionID = sessionID; + this.treatAsOldTimestamp = treatAsOldTimestamp; } @Override @@ -83,8 +86,17 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti case TIMESTAMP_TZ: return Schema.of(Schema.LogicalType.TIMESTAMP_MICROS); case Types.TIMESTAMP: - case TIMESTAMP_LTZ: return Schema.of(Schema.LogicalType.DATETIME); + case TIMESTAMP_LTZ: + // TIMESTAMP_LTZ (Local timezone timestamp) + // - Legacy behavior used TIMESTAMP_MICROS + // - New behavior uses DATETIME for accurate semantic representation + // Use treatAsOldTimestamp flag to ensure backward compatibility + if (treatAsOldTimestamp) { + return Schema.of(Schema.LogicalType.TIMESTAMP_MICROS); + } else { + return Schema.of(Schema.LogicalType.DATETIME); + } case BINARY_FLOAT: return Schema.of(Schema.Type.FLOAT); case BINARY_DOUBLE: @@ -108,10 +120,10 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti if (precision == 0) { // reference : https://docs.oracle.com/cd/B28359_01/server.111/b28318/datatype.htm#CNCPT1832 LOG.warn(String.format("Field '%s' is a %s type without precision and scale, " - + "converting into STRING type to avoid any precision loss.", - metadata.getColumnName(index), - metadata.getColumnTypeName(index), - metadata.getColumnName(index))); + + "converting into STRING type to avoid any precision loss.", + metadata.getColumnName(index), + metadata.getColumnTypeName(index), + metadata.getColumnName(index))); return Schema.of(Schema.Type.STRING); } return Schema.decimalOf(precision, scale); diff --git a/oracle-plugin/widgets/Oracle-batchsource.json b/oracle-plugin/widgets/Oracle-batchsource.json index 5eca20cc4..16722f1b2 100644 --- a/oracle-plugin/widgets/Oracle-batchsource.json +++ b/oracle-plugin/widgets/Oracle-batchsource.json @@ -246,6 +246,15 @@ "default": "40", "min": "1" } + }, + { + "widget-type": "hidden", + "label": "Treat As Old Timestamp", + "name": "treatAsOldTimestamp", + "description": "For internal use only. If set to true, DATETIME types will be treated as TIMESTAMP_MICROS to maintain backward compatibility.", + "widget-attributes": { + "default": "false" + } } ] }