Skip to content

Timestamp changes #24

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ protected DBConnectorPath getDBConnectorPath(String path) {

@Override
protected SchemaReader getSchemaReader(String sessionID) {
return new OracleSourceSchemaReader(sessionID);
return new OracleSourceSchemaReader(sessionID, false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ protected String createConnectionString() {

@Override
protected SchemaReader getSchemaReader() {
return new OracleSourceSchemaReader();
return new OracleSourceSchemaReader(null, oracleSourceConfig.shouldTreatAsOldTimestamp());
}

@Override
Expand Down Expand Up @@ -101,6 +101,7 @@ public static class OracleSourceConfig extends AbstractDBSpecificSourceConfig {
public static final String NAME_CONNECTION = "connection";
public static final String DEFAULT_ROW_PREFETCH_VALUE = "40";
public static final String DEFAULT_BATCH_SIZE = "10";
public static final String TREAT_AS_OLD_TIMESTAMP = "treatAsOldTimestamp";

@Name(NAME_USE_CONNECTION)
@Nullable
Expand All @@ -123,11 +124,19 @@ public static class OracleSourceConfig extends AbstractDBSpecificSourceConfig {
@Nullable
private Integer defaultRowPrefetch;

@Name(TREAT_AS_OLD_TIMESTAMP)
@Description("For internal use only. If set to true, DATETIME types will be treated as TIMESTAMP_MICROS to maintain"
+ "backward compatibility.")
@Macro
@Nullable
private Boolean treatAsOldTimestamp = false;


public OracleSourceConfig(String host, int port, String user, String password, String jdbcPluginName,
String connectionArguments, String connectionType, String database, String role,
int defaultBatchValue, int defaultRowPrefetch,
String importQuery, Integer numSplits, int fetchSize,
String boundingQuery, String splitBy, Boolean useSSL) {
String boundingQuery, String splitBy, Boolean useSSL, Boolean treatAsOldTimestamp) {
this.connection = new OracleConnectorConfig(host, port, user, password, jdbcPluginName, connectionArguments,
connectionType, database, role, useSSL);
this.defaultBatchValue = defaultBatchValue;
Expand All @@ -137,12 +146,14 @@ public OracleSourceConfig(String host, int port, String user, String password, S
this.numSplits = numSplits;
this.boundingQuery = boundingQuery;
this.splitBy = splitBy;
this.treatAsOldTimestamp = treatAsOldTimestamp;
}

@Override
public String getConnectionString() {
return OracleConstants.getConnectionString(connection.getConnectionType(), connection.getHost(),
connection.getPort(), connection.getDatabase(), connection.getSSlMode());
connection.getPort(), connection.getDatabase(),
connection.getSSlMode());
}

@Override
Expand All @@ -163,6 +174,10 @@ public OracleConnectorConfig getConnection() {
return connection;
}

public boolean shouldTreatAsOldTimestamp() {
return Boolean.TRUE.equals(treatAsOldTimestamp);
}

@Override
public void validate(FailureCollector collector) {
ConfigUtil.validateConnection(this, useConnection, connection, collector);
Expand All @@ -183,21 +198,21 @@ protected void validateField(FailureCollector collector,
// contain Decimal(38, 0) (or something similar), and the code internally would try to identify
// the schema of the field(without precision and scale) as String.
if (Schema.LogicalType.DECIMAL.equals(expectedFieldSchema.getLogicalType())
&& actualFieldSchema.getType().equals(Schema.Type.STRING)) {
&& actualFieldSchema.getType().equals(Schema.Type.STRING)) {
return;
}

// For handling TimestampTZ types allow if the expected schema is STRING and
// actual schema is set to TIMESTAMP type to ensure backward compatibility.
if (Schema.LogicalType.TIMESTAMP_MICROS.equals(actualFieldSchema.getLogicalType())
&& Schema.Type.STRING.equals(expectedFieldSchema.getType())) {
&& Schema.Type.STRING.equals(expectedFieldSchema.getType())) {
return;
}

// For handling TimestampLTZ and Timestamp types allow if the expected schema is TIMESTAMP and
// actual schema is set to DATETIME type to ensure backward compatibility.
if (Schema.LogicalType.DATETIME.equals(actualFieldSchema.getLogicalType())
&& Schema.LogicalType.TIMESTAMP_MICROS.equals(expectedFieldSchema.getLogicalType())) {
&& Schema.LogicalType.TIMESTAMP_MICROS.equals(expectedFieldSchema.getLogicalType())) {
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,16 @@ public class OracleSourceSchemaReader extends CommonSchemaReader {

private final String sessionID;

private final boolean treatAsOldTimestamp;

public OracleSourceSchemaReader() {
this(null);
this(null, false);
}

public OracleSourceSchemaReader(String sessionID) {
public OracleSourceSchemaReader(String sessionID, boolean treatAsOldTimestamp) {
super();
this.sessionID = sessionID;
this.treatAsOldTimestamp = treatAsOldTimestamp;
}

@Override
Expand All @@ -83,8 +86,17 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti
case TIMESTAMP_TZ:
return Schema.of(Schema.LogicalType.TIMESTAMP_MICROS);
case Types.TIMESTAMP:
case TIMESTAMP_LTZ:
return Schema.of(Schema.LogicalType.DATETIME);
case TIMESTAMP_LTZ:
// TIMESTAMP_LTZ (Local timezone timestamp)
// - Legacy behavior used TIMESTAMP_MICROS
// - New behavior uses DATETIME for accurate semantic representation
// Use treatAsOldTimestamp flag to ensure backward compatibility
if (treatAsOldTimestamp) {
return Schema.of(Schema.LogicalType.TIMESTAMP_MICROS);
} else {
return Schema.of(Schema.LogicalType.DATETIME);
}
case BINARY_FLOAT:
return Schema.of(Schema.Type.FLOAT);
case BINARY_DOUBLE:
Expand All @@ -108,10 +120,10 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti
if (precision == 0) {
// reference : https://docs.oracle.com/cd/B28359_01/server.111/b28318/datatype.htm#CNCPT1832
LOG.warn(String.format("Field '%s' is a %s type without precision and scale, "
+ "converting into STRING type to avoid any precision loss.",
metadata.getColumnName(index),
metadata.getColumnTypeName(index),
metadata.getColumnName(index)));
+ "converting into STRING type to avoid any precision loss.",
metadata.getColumnName(index),
metadata.getColumnTypeName(index),
metadata.getColumnName(index)));
return Schema.of(Schema.Type.STRING);
}
return Schema.decimalOf(precision, scale);
Expand Down
9 changes: 9 additions & 0 deletions oracle-plugin/widgets/Oracle-batchsource.json
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,15 @@
"default": "40",
"min": "1"
}
},
{
"widget-type": "hidden",
"label": "Treat As Old Timestamp",
"name": "treatAsOldTimestamp",
"description": "For internal use only. If set to true, DATETIME types will be treated as TIMESTAMP_MICROS to maintain backward compatibility.",
"widget-attributes": {
"default": "false"
}
}
]
}
Expand Down