|
| 1 | +package io.cdap.plugin.CloudMySql; |
| 2 | + |
| 3 | +import com.google.cloud.bigquery.TableResult; |
| 4 | +import com.google.gson.Gson; |
| 5 | +import com.google.gson.JsonObject; |
| 6 | +import io.cdap.e2e.utils.BigQueryClient; |
| 7 | +import io.cdap.e2e.utils.PluginPropertyUtils; |
| 8 | +import io.cdap.plugin.CloudMySqlClient; |
| 9 | +import org.junit.Assert; |
| 10 | + |
| 11 | +import java.io.IOException; |
| 12 | +import java.sql.*; |
| 13 | +import java.text.ParseException; |
| 14 | +import java.text.SimpleDateFormat; |
| 15 | +import java.util.ArrayList; |
| 16 | +import java.util.Base64; |
| 17 | +import java.util.Date; |
| 18 | +import java.util.List; |
| 19 | + |
| 20 | +/** |
| 21 | + * BQValidation |
| 22 | + */ |
| 23 | + |
| 24 | +public class BQValidation { |
| 25 | + public static void main(String[] args) { |
| 26 | +// validateBQAndDBRecordValues(String schema, String sourceTable, String targetTable) |
| 27 | + } |
| 28 | + |
| 29 | + /** |
| 30 | + * Extracts entire data from source and target tables. |
| 31 | + * @param sourceTable table at the source side |
| 32 | + * @param targetTable table at the sink side |
| 33 | + * @return true if the values in source and target side are equal |
| 34 | + */ |
| 35 | + |
| 36 | + public static boolean validateBQAndDBRecordValues(String schema, String sourceTable, String targetTable) |
| 37 | + throws SQLException, ClassNotFoundException, ParseException, IOException, InterruptedException { |
| 38 | + List<JsonObject> jsonResponse = new ArrayList<>(); |
| 39 | + List<Object> bigQueryRows = new ArrayList<>(); |
| 40 | + getBigQueryTableData(targetTable, bigQueryRows); |
| 41 | + for (Object rows : bigQueryRows) { |
| 42 | + JsonObject json = new Gson().fromJson(String.valueOf(rows), JsonObject.class); |
| 43 | + jsonResponse.add(json); |
| 44 | + } |
| 45 | + String getSourceQuery = "SELECT * FROM " + schema + "." + sourceTable; |
| 46 | + try (Connection connect = CloudMySqlClient.getCloudMysqlConnection()) { |
| 47 | + connect.setHoldability(ResultSet.HOLD_CURSORS_OVER_COMMIT); |
| 48 | + Statement statement1 = connect.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_UPDATABLE, |
| 49 | + ResultSet.HOLD_CURSORS_OVER_COMMIT); |
| 50 | + |
| 51 | + ResultSet rsSource = statement1.executeQuery(getSourceQuery); |
| 52 | + return compareResultSetData(rsSource, jsonResponse); |
| 53 | + } |
| 54 | + } |
| 55 | + |
| 56 | + /** |
| 57 | + * Retrieves the data from a specified BigQuery table and populates it into the provided list of objects. |
| 58 | + * |
| 59 | + * @param table The name of the BigQuery table to fetch data from. |
| 60 | + * @param bigQueryRows The list to store the fetched BigQuery data. |
| 61 | + * |
| 62 | + */ |
| 63 | + |
| 64 | + private static void getBigQueryTableData(String table, List<Object> bigQueryRows) |
| 65 | + throws IOException, InterruptedException { |
| 66 | + |
| 67 | + String projectId = PluginPropertyUtils.pluginProp("projectId"); |
| 68 | + String dataset = PluginPropertyUtils.pluginProp("dataset"); |
| 69 | + String selectQuery = "SELECT TO_JSON(t) FROM `" + projectId + "." + dataset + "." + table + "` AS t"; |
| 70 | + TableResult result = BigQueryClient.getQueryResult(selectQuery); |
| 71 | + result.iterateAll().forEach(value -> bigQueryRows.add(value.get(0).getValue())); |
| 72 | + } |
| 73 | + |
| 74 | + /** |
| 75 | + * Compares the data in the result set obtained from the Oracle database with the provided BigQuery JSON objects. |
| 76 | + * |
| 77 | + * @param rsSource The result set obtained from the Oracle database. |
| 78 | + * @param bigQueryData The list of BigQuery JSON objects to compare with the result set data. |
| 79 | + * |
| 80 | + * @return True if the result set data matches the BigQuery data, false otherwise. |
| 81 | + * @throws SQLException If an SQL error occurs during the result set operations. |
| 82 | + * @throws ParseException If an error occurs while parsing the data. |
| 83 | + */ |
| 84 | + |
| 85 | + public static boolean compareResultSetData(ResultSet rsSource, List<JsonObject> bigQueryData) throws SQLException, |
| 86 | + ParseException { |
| 87 | + ResultSetMetaData mdSource = rsSource.getMetaData(); |
| 88 | + boolean result = false; |
| 89 | + int columnCountSource = mdSource.getColumnCount(); |
| 90 | + |
| 91 | + if (bigQueryData == null) { |
| 92 | + Assert.fail("bigQueryData is null"); |
| 93 | + return result; |
| 94 | + } |
| 95 | + |
| 96 | + // Get the column count of the first JsonObject in bigQueryData |
| 97 | + int columnCountTarget = 0; |
| 98 | + if (bigQueryData.size() > 0) { |
| 99 | + columnCountTarget = bigQueryData.get(0).entrySet().size(); |
| 100 | + } |
| 101 | + // Compare the number of columns in the source and target |
| 102 | + Assert.assertEquals("Number of columns in source and target are not equal", |
| 103 | + columnCountSource, columnCountTarget); |
| 104 | + |
| 105 | + //Variable 'jsonObjectIdx' to track the index of the current JsonObject in the bigQueryData list, |
| 106 | + int jsonObjectIdx = 0; |
| 107 | + while (rsSource.next()) { |
| 108 | + int currentColumnCount = 1; |
| 109 | + while (currentColumnCount <= columnCountSource) { |
| 110 | + String columnTypeName = mdSource.getColumnTypeName(currentColumnCount); |
| 111 | + int columnType = mdSource.getColumnType(currentColumnCount); |
| 112 | + String columnName = mdSource.getColumnName(currentColumnCount); |
| 113 | + // Perform different comparisons based on column type |
| 114 | + switch (columnType) { |
| 115 | + // Since we skip BFILE in Oracle Sink, we are not comparing the BFILE source and sink values |
| 116 | + case Types.BLOB: |
| 117 | + case Types.VARBINARY: |
| 118 | + case Types.LONGVARBINARY: |
| 119 | + String sourceB64String = new String(Base64.getEncoder().encode(rsSource.getBytes(currentColumnCount))); |
| 120 | + String targetB64String = bigQueryData.get(jsonObjectIdx).get(columnName).getAsString(); |
| 121 | + Assert.assertEquals("Different values found for column : %s", |
| 122 | + sourceB64String, targetB64String); |
| 123 | + break; |
| 124 | + |
| 125 | + case Types.NUMERIC: |
| 126 | + long sourceVal = rsSource.getLong(currentColumnCount); |
| 127 | + long targetVal = Long.parseLong(bigQueryData.get(jsonObjectIdx).get(columnName).getAsString()); |
| 128 | + Assert.assertTrue("Different values found for column : %s", |
| 129 | + String.valueOf(sourceVal).equals(String.valueOf(targetVal))); |
| 130 | + break; |
| 131 | + |
| 132 | + case Types.TIMESTAMP: |
| 133 | + Timestamp sourceTS = rsSource.getTimestamp(columnName); |
| 134 | + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ss'Z'"); |
| 135 | + Date parsedDate = dateFormat.parse(bigQueryData.get(jsonObjectIdx).get(columnName).getAsString()); |
| 136 | + Timestamp targetTs = new Timestamp(parsedDate.getTime()); |
| 137 | + Assert.assertEquals("Different values found for column : %s", String.valueOf(sourceTS). |
| 138 | + equals(String.valueOf(targetTs))); |
| 139 | + break; |
| 140 | + default: |
| 141 | + String sourceString = rsSource.getString(currentColumnCount); |
| 142 | + String targetString = bigQueryData.get(jsonObjectIdx).get(columnName).getAsString(); |
| 143 | + Assert.assertEquals(String.format("Different %s values found for column : %s", columnTypeName, columnName), |
| 144 | + String.valueOf(sourceString), String.valueOf(targetString)); |
| 145 | + } |
| 146 | + currentColumnCount++; |
| 147 | + } |
| 148 | + jsonObjectIdx++; |
| 149 | + } |
| 150 | + Assert.assertFalse("Number of rows in Source table is greater than the number of rows in Target table", |
| 151 | + rsSource.next()); |
| 152 | + return true; |
| 153 | + } |
| 154 | +} |
0 commit comments