Skip to content

Patch/abhi tablename : Adding import query type property and modify tableName property for redshift and postgres plugin #607

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from

Conversation

AbhishekKumar9984
Copy link

Adding import query type property and modify tableName property for redshift and postgres plugin

/**
* Override: Fetches schema fields for a specific table using database metadata.
*/
@Override
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is there in parent class, you can use that one, instead you can call getSchema method of this class inside

Copy link
Contributor

@vikasrathee-cs vikasrathee-cs Jun 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no update on this, also javadoc not updated

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update javadoc

public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
FailureCollector collector = pipelineConfigurer.getStageConfigurer().getFailureCollector();
if (!sourceConfig.containsMacro("tableName") && !sourceConfig.containsMacro("importQuery")) {
if ((sourceConfig.getTableName() == null || sourceConfig.getTableName().isEmpty())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use Strings method for null empty check

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -88,6 +107,11 @@ protected LineageRecorder getLineageRecorder(BatchSourceContext context) {
return new LineageRecorder(context, assetBuilder.build());
}

public DatabaseMetaData getDatabaseMetadata(Connection connection) throws SQLException {
return (DatabaseMetaData) connection.getMetaData().getColumns(null,
null, redshiftSourceConfig.getTableName(), null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

schema is not handled here, if customer is passing "schema.tablename", it will not work

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for now this method is not used so no need to handle we can remove this method .

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if not getting used then remove it

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

@@ -251,6 +275,30 @@
"name": "port"
}
]
},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need to add filters as it is hidden

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

keeping it so that if further we have to make changes for this we can just changed the hidden property.
but if not required , will removed..

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove for now, will add later if required

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

if (!Strings.isNullOrEmpty(importQuery)) {
return loadSchemaFromDB(connection, importQuery);
} else {
String query = String.format("SELECT * FROM %s LIMIT 1", tableName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we have fetch schema from database metadata in case of tableName, this should not behave like this

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no update on this

@@ -73,7 +72,7 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti
return Schema.of(Schema.Type.STRING);
}
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not removed properly, this class has no changes it should not be there in commit files

Copy link
Author

@AbhishekKumar9984 AbhishekKumar9984 Jun 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed.

public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
FailureCollector collector = pipelineConfigurer.getStageConfigurer().getFailureCollector();
if (!sourceConfig.containsMacro("tableName") && !sourceConfig.containsMacro("importQuery")) {
if ((Strings.isNullOrEmpty(sourceConfig.getTableName()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

merge these two conditions

if (!containsMacro(IMPORT_QUERY) && Strings.isNullOrEmpty(importQuery)) {
collector.addFailure("Import Query is empty.", "Specify the Import Query.")
.withConfigProperty(IMPORT_QUERY);
if (!containsMacro(TABLE_NAME) && !containsMacro(IMPORT_QUERY)) {
Copy link
Contributor

@vikasrathee-cs vikasrathee-cs Jun 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

merge if conditions

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

merged

"Import Query %s must contain the string '$CONDITIONS'. if Number of Splits is not set to 1.", importQuery),
"Include '$CONDITIONS' in the Import Query")
.withConfigProperty(IMPORT_QUERY);
if (!Strings.isNullOrEmpty(importQuery)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

merge if conditions

{
"name": "ImportQuery",
"condition": {
"expression": "importQueryType == 'importQuery'"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not done as per redshift

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed

@AbhishekKumar9984 AbhishekKumar9984 force-pushed the patch/abhi-tablename branch 2 times, most recently from c91b373 to bf6a780 Compare June 23, 2025 07:11
if ("timestamp".equalsIgnoreCase(typeName)) {
return Schema.of(Schema.LogicalType.DATETIME);
}
return DBUtils.getSchema(typeName, columnType, precision, scale, columnName, true, true);
Copy link
Contributor

@vikasrathee-cs vikasrathee-cs Jun 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

last two values shouldn't be true by default, get it from metadata

// Accept either ConnectException or SunCertPathBuilderException
String message = e.getMessage();
assertTrue(
"Expected either ConnectException or SunCertPathBuilderException, but got: " + message,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when this exception will be thrown

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@AbhishekKumar9984 Any update on this

*/
public Schema getSchema(String typeName, int columnType, int precision, int scale, String columnName ,
boolean isSigned, boolean handleAsDecimal) {
return null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return DBUtils method from here

@@ -406,6 +429,25 @@ private String getJDBCPluginId() {
return String.format("%s.%s.%s", "source", ConnectionConfig.JDBC_PLUGIN_TYPE, sourceConfig.getJdbcPluginName());
}

private Schema.Type mapSqlTypeToSchemaType(int sqlType) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why this method is added?

@Override
public Schema getSchema(String typeName, int columnType, int precision, int scale, String columnName,
boolean isSigned, boolean handleAsDecimal) {
if (STRING_MAPPED_REDSHIFT_TYPES_NAMES.contains(typeName)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this code is getting repeated from public Schema getSchema(ResultSetMetaData metadata, int index) method, extract that common code, do the same in postgres

@AbhishekKumar9984 AbhishekKumar9984 force-pushed the patch/abhi-tablename branch 2 times, most recently from e627cb7 to 467a8cd Compare June 25, 2025 09:24
@@ -31,6 +31,10 @@ contain the '$CONDITIONS' string. For example, 'SELECT * FROM table WHERE $CONDI
The '$CONDITIONS' string will be replaced by 'splitBy' field limits specified by the bounding query.
The '$CONDITIONS' string is not required if numSplits is set to one.

**ImportQueryType** - determines how data is extracted—either by using a Table Name or a custom Import Query.
Copy link
Contributor

@vikasrathee-cs vikasrathee-cs Jun 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add spaces in names, It should be the label name, refer snowflake PR for this

return;
}
if (!sourceConfig.containsMacro(PROPERTY_IMPORT_QUERY_TYPE)) {
boolean isImportQuerySelected = ImportQueryType.IMPORT_QUERY.getValue().equals(sourceConfig.importQueryType);
Copy link
Contributor

@vikasrathee-cs vikasrathee-cs Jun 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should not be a direct comparison as in case of null it should return import query. Use its getter method. No need for two variables, for one you can use negation.

@AbhishekKumar9984 AbhishekKumar9984 force-pushed the patch/abhi-tablename branch 3 times, most recently from 7675916 to 27bbd79 Compare June 27, 2025 09:45

@Override
public void prepareRun(BatchSourceContext context) throws Exception {
FailureCollector collector = context.getFailureCollector();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move this common logic of configurePipeline and prepareRun to its parent class as a separate method and call the same from postgres also

if (!containsMacro(PROPERTY_IMPORT_QUERY_TYPE)) {
ImportQueryType importQueryType = ImportQueryType.fromString(getImportQueryType());

boolean isImportQuerySelected = importQueryType == ImportQueryType.IMPORT_QUERY;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These conditions are already tested in redshift and postgres, here add just a single condition if both values are null or empty or macro. keep just the one on line 166

@@ -176,7 +217,7 @@ public void validateSchema(Schema actualSchema, FailureCollector collector) {
Schema actualFieldSchema = actualField.getSchema().isNullable() ?
actualField.getSchema().getNonNullable() : actualField.getSchema();
Schema expectedFieldSchema = field.getSchema().isNullable() ?
field.getSchema().getNonNullable() : field.getSchema();
field.getSchema().getNonNullable() : field.getSchema();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

revert unintended changes from whole PR

if (STRING_MAPPED_POSTGRES_TYPES_NAMES.contains(typeName) || STRING_MAPPED_POSTGRES_TYPES.contains(columnType)) {
return Schema.of(Schema.Type.STRING);
}
if (typeName.equalsIgnoreCase("INT")) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was not there in postgres from where you added this and numeric changes

if (Types.NUMERIC == columnType) {
int precision = metadata.getPrecision(index);
if (Types.NUMERIC == columnType ||
"numeric".equalsIgnoreCase(typeName) ||
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from where this is added?

"jdbc:postgresql://localhost:5432/null and arguments: " +
"{user=username}. Error: ConnectException: Connection refused " +
"(Connection refused).");
"jdbc:postgresql://localhost:5432/null and arguments: " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove unintended changes

"default": "importQuery",
"options": [
{
"id": "importQuery",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be the reason e2e test cases are failing. Change name of id to nativeQuery and namedTable in all widget jsons and change name in enum also.
Text field with name tableName can be there as it is.

@AbhishekKumar9984 AbhishekKumar9984 force-pushed the patch/abhi-tablename branch 3 times, most recently from b83def1 to 5719a06 Compare July 2, 2025 10:46
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants