Skip to content

Commit 70ee2bc

Browse files
bq validation
1 parent 9fcc646 commit 70ee2bc

File tree

7 files changed

+169
-7
lines changed

7 files changed

+169
-7
lines changed

cloudsql-mysql-plugin/src/e2e-test/features/source/CloudMySqlRunTime.feature

+5-3
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ Feature: CloudMySql Source - Run time scenarios
1717

1818
Feature: CloudMySql - Verify data transfer from CloudMySql source to BigQuery sink
1919

20-
@CLOUDMYSQL_SOURCE_TEST @BQ_SINK_TEST
20+
@CLOUDMYSQL_SOURCE_TEST
2121
Scenario: To verify data is getting transferred from CloudMySql source to BigQuery sink successfully
2222
Given Open Datafusion Project to configure pipeline
2323
When Expand Plugin group in the LHS plugins list: "Source"
@@ -29,8 +29,10 @@ Feature: CloudMySql - Verify data transfer from CloudMySql source to BigQuery si
2929
Then Select dropdown plugin property: "select-jdbcPluginName" with option value: "cloudsql-mysql"
3030
Then Select radio button plugin property: "instanceType" with value: "public"
3131
Then Enter input plugin property: "connectionName" with value: "ConnectionName"
32-
Then Replace input plugin property: "user" with value: "username" for Credentials and Authorization related fields
33-
Then Replace input plugin property: "password" with value: "password" for Credentials and Authorization related fields
32+
Then Enter input plugin property: "user" with value: "username"
33+
Then Enter input plugin property: "password" with value: "password"
34+
# Then Replace input plugin property: "user" with value: "username" for Credentials and Authorization related fields
35+
# Then Replace input plugin property: "password" with value: "password" for Credentials and Authorization related fields
3436
Then Enter input plugin property: "referenceName" with value: "RefName"
3537
Then Enter input plugin property: "database" with value: "DatabaseName"
3638
Then Enter textarea plugin property: "importQuery" with value: "selectQuery"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
package io.cdap.plugin.CloudMySql;
2+
3+
import com.google.cloud.bigquery.TableResult;
4+
import com.google.gson.Gson;
5+
import com.google.gson.JsonObject;
6+
import io.cdap.e2e.utils.BigQueryClient;
7+
import io.cdap.e2e.utils.PluginPropertyUtils;
8+
import io.cdap.plugin.CloudMySqlClient;
9+
import org.junit.Assert;
10+
11+
import java.io.IOException;
12+
import java.sql.*;
13+
import java.text.ParseException;
14+
import java.text.SimpleDateFormat;
15+
import java.util.ArrayList;
16+
import java.util.Base64;
17+
import java.util.Date;
18+
import java.util.List;
19+
20+
/**
21+
* BQValidation
22+
*/
23+
24+
public class BQValidation {
25+
public static void main(String[] args) {
26+
// validateBQAndDBRecordValues(String schema, String sourceTable, String targetTable)
27+
}
28+
29+
/**
30+
* Extracts entire data from source and target tables.
31+
* @param sourceTable table at the source side
32+
* @param targetTable table at the sink side
33+
* @return true if the values in source and target side are equal
34+
*/
35+
36+
public static boolean validateBQAndDBRecordValues(String schema, String sourceTable, String targetTable)
37+
throws SQLException, ClassNotFoundException, ParseException, IOException, InterruptedException {
38+
List<JsonObject> jsonResponse = new ArrayList<>();
39+
List<Object> bigQueryRows = new ArrayList<>();
40+
getBigQueryTableData(targetTable, bigQueryRows);
41+
for (Object rows : bigQueryRows) {
42+
JsonObject json = new Gson().fromJson(String.valueOf(rows), JsonObject.class);
43+
jsonResponse.add(json);
44+
}
45+
String getSourceQuery = "SELECT * FROM " + schema + "." + sourceTable;
46+
try (Connection connect = CloudMySqlClient.getCloudMysqlConnection()) {
47+
connect.setHoldability(ResultSet.HOLD_CURSORS_OVER_COMMIT);
48+
Statement statement1 = connect.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_UPDATABLE,
49+
ResultSet.HOLD_CURSORS_OVER_COMMIT);
50+
51+
ResultSet rsSource = statement1.executeQuery(getSourceQuery);
52+
return compareResultSetData(rsSource, jsonResponse);
53+
}
54+
}
55+
56+
/**
57+
* Retrieves the data from a specified BigQuery table and populates it into the provided list of objects.
58+
*
59+
* @param table The name of the BigQuery table to fetch data from.
60+
* @param bigQueryRows The list to store the fetched BigQuery data.
61+
*
62+
*/
63+
64+
private static void getBigQueryTableData(String table, List<Object> bigQueryRows)
65+
throws IOException, InterruptedException {
66+
67+
String projectId = PluginPropertyUtils.pluginProp("projectId");
68+
String dataset = PluginPropertyUtils.pluginProp("dataset");
69+
String selectQuery = "SELECT TO_JSON(t) FROM `" + projectId + "." + dataset + "." + table + "` AS t";
70+
TableResult result = BigQueryClient.getQueryResult(selectQuery);
71+
result.iterateAll().forEach(value -> bigQueryRows.add(value.get(0).getValue()));
72+
}
73+
74+
/**
75+
* Compares the data in the result set obtained from the Oracle database with the provided BigQuery JSON objects.
76+
*
77+
* @param rsSource The result set obtained from the Oracle database.
78+
* @param bigQueryData The list of BigQuery JSON objects to compare with the result set data.
79+
*
80+
* @return True if the result set data matches the BigQuery data, false otherwise.
81+
* @throws SQLException If an SQL error occurs during the result set operations.
82+
* @throws ParseException If an error occurs while parsing the data.
83+
*/
84+
85+
public static boolean compareResultSetData(ResultSet rsSource, List<JsonObject> bigQueryData) throws SQLException,
86+
ParseException {
87+
ResultSetMetaData mdSource = rsSource.getMetaData();
88+
boolean result = false;
89+
int columnCountSource = mdSource.getColumnCount();
90+
91+
if (bigQueryData == null) {
92+
Assert.fail("bigQueryData is null");
93+
return result;
94+
}
95+
96+
// Get the column count of the first JsonObject in bigQueryData
97+
int columnCountTarget = 0;
98+
if (bigQueryData.size() > 0) {
99+
columnCountTarget = bigQueryData.get(0).entrySet().size();
100+
}
101+
// Compare the number of columns in the source and target
102+
Assert.assertEquals("Number of columns in source and target are not equal",
103+
columnCountSource, columnCountTarget);
104+
105+
//Variable 'jsonObjectIdx' to track the index of the current JsonObject in the bigQueryData list,
106+
int jsonObjectIdx = 0;
107+
while (rsSource.next()) {
108+
int currentColumnCount = 1;
109+
while (currentColumnCount <= columnCountSource) {
110+
String columnTypeName = mdSource.getColumnTypeName(currentColumnCount);
111+
int columnType = mdSource.getColumnType(currentColumnCount);
112+
String columnName = mdSource.getColumnName(currentColumnCount);
113+
// Perform different comparisons based on column type
114+
switch (columnType) {
115+
// Since we skip BFILE in Oracle Sink, we are not comparing the BFILE source and sink values
116+
case Types.BLOB:
117+
case Types.VARBINARY:
118+
case Types.LONGVARBINARY:
119+
String sourceB64String = new String(Base64.getEncoder().encode(rsSource.getBytes(currentColumnCount)));
120+
String targetB64String = bigQueryData.get(jsonObjectIdx).get(columnName).getAsString();
121+
Assert.assertEquals("Different values found for column : %s",
122+
sourceB64String, targetB64String);
123+
break;
124+
125+
case Types.NUMERIC:
126+
long sourceVal = rsSource.getLong(currentColumnCount);
127+
long targetVal = Long.parseLong(bigQueryData.get(jsonObjectIdx).get(columnName).getAsString());
128+
Assert.assertTrue("Different values found for column : %s",
129+
String.valueOf(sourceVal).equals(String.valueOf(targetVal)));
130+
break;
131+
132+
case Types.TIMESTAMP:
133+
Timestamp sourceTS = rsSource.getTimestamp(columnName);
134+
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ss'Z'");
135+
Date parsedDate = dateFormat.parse(bigQueryData.get(jsonObjectIdx).get(columnName).getAsString());
136+
Timestamp targetTs = new Timestamp(parsedDate.getTime());
137+
Assert.assertEquals("Different values found for column : %s", String.valueOf(sourceTS).
138+
equals(String.valueOf(targetTs)));
139+
break;
140+
default:
141+
String sourceString = rsSource.getString(currentColumnCount);
142+
String targetString = bigQueryData.get(jsonObjectIdx).get(columnName).getAsString();
143+
Assert.assertEquals(String.format("Different %s values found for column : %s", columnTypeName, columnName),
144+
String.valueOf(sourceString), String.valueOf(targetString));
145+
}
146+
currentColumnCount++;
147+
}
148+
jsonObjectIdx++;
149+
}
150+
Assert.assertFalse("Number of rows in Source table is greater than the number of rows in Target table",
151+
rsSource.next());
152+
return true;
153+
}
154+
}
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
package io.cdap.plugin.CloudMySql.stepsdesign;
22

33
public class CloudMySql {
4+
45
}

cloudsql-mysql-plugin/src/e2e-test/java/io/cdap/plugin/CloudMySqlClient.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public static Connection getCloudMysqlConnection() throws SQLException, ClassNot
3131
String Password = "v@123";
3232
String jdbcUrl = String.format("jdbc:mysql:///%s?cloudSqlInstance=%s&socketFactory=com.google.cloud.sql.mysql.SocketFactory&user=%s&password=%s", databaseName, instanceConnectionName, Username, Password);
3333
Connection conn = DriverManager.getConnection(jdbcUrl);
34-
//System.out.println("connected to database");
34+
System.out.println("connected to database");
3535
return conn;
3636
}
3737

@@ -148,6 +148,7 @@ public static void createSourceDatatypesTable(String sourceTable) throws SQLExce
148148
String datatypesColumns = PluginPropertyUtils.pluginProp("datatypesColumns");
149149
String createSourceTableQuery = "CREATE TABLE " + sourceTable + " " + datatypesColumns;
150150
statement.executeUpdate(createSourceTableQuery);
151+
System.out.println(createSourceTableQuery);
151152

152153
// Insert dummy data.
153154
String datatypesValues = PluginPropertyUtils.pluginProp("datatypesValue1");

cloudsql-mysql-plugin/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java

+6
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,19 @@
2424
*/
2525

2626
public class TestSetupHooks {
27+
public static void main(String[] args) throws SQLException, ClassNotFoundException {
28+
setTableName();
29+
createDatatypesTable();
30+
}
2731
private static void setTableName() {
2832
String randomString = RandomStringUtils.randomAlphabetic(10);
2933
String sourceTableName = String.format("SourceTable_%s", randomString);
3034
String targetTableName = String.format("TargetTable_%s", randomString);
3135
PluginPropertyUtils.addPluginProp("sourceTable", sourceTableName);
3236
PluginPropertyUtils.addPluginProp("targetTable", targetTableName);
3337
PluginPropertyUtils.addPluginProp("selectQuery", String.format("select * from %s", sourceTableName));
38+
System.out.println(sourceTableName);
39+
3440
}
3541

3642
@Before(order = 1)

cloudsql-mysql-plugin/src/e2e-test/resources/pluginParameters.properties

-2
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@ dataset=sql
2323
bqSourceTable=mysql
2424
driver=cloudsql-mysql
2525
table=myTable
26-
name=NAME
27-
pass=PASS
2826
invalidUserName=testUser
2927
invalidPassword=testPassword
3028
invalidTable=data

pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -715,7 +715,7 @@
715715
<dependency>
716716
<groupId>io.cdap.tests.e2e</groupId>
717717
<artifactId>cdap-e2e-framework</artifactId>
718-
<version>0.2.0-SNAPSHOT</version>
718+
<version>0.3.0-SNAPSHOT</version>
719719
<scope>test</scope>
720720
</dependency>
721721
<dependency>

0 commit comments

Comments
 (0)