From 4f28333e3cb23af384fb9e58bb23d0fa9adbb997 Mon Sep 17 00:00:00 2001 From: Haoming Zhang Date: Thu, 22 Jun 2017 14:09:33 -0700 Subject: [PATCH 1/2] add map to store columns --- apigee_sync_test.go | 2 +- changes.go | 21 +++++++++++++++ snapshot.go | 40 +++++++++++++++++++++++++++++ snapshot_test.go | 61 ++++++++++++++++++++++++++++++++++++++++++++ sql/init_mock_db.sql | 49 ++++++++--------------------------- 5 files changed, 134 insertions(+), 39 deletions(-) create mode 100644 snapshot_test.go diff --git a/apigee_sync_test.go b/apigee_sync_test.go index d81ec32..b885593 100644 --- a/apigee_sync_test.go +++ b/apigee_sync_test.go @@ -84,7 +84,7 @@ var _ = Describe("Sync", func() { apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) { if s, ok := event.(*common.Snapshot); ok { - Expect(16).To(Equal(len(knownTables))) + Expect(12).To(Equal(len(knownTables))) Expect(changesRequireDDLSync(expectedSnapshotTables)).To(BeFalse()) lastSnapshot = s diff --git a/changes.go b/changes.go index 993613f..9f26466 100644 --- a/changes.go +++ b/changes.go @@ -316,6 +316,27 @@ func changesHaveNewTables(a map[string]bool, changes []common.Change) bool { return false } +/* + * Determine if any columns added/dropped in any table + */ +func changesHavecolumnsChanged(a map[string]bool, changes []common.Change) bool { + + //nil maps should not be passed in. Making the distinction between nil map and empty map + if a == nil { + log.Warn("Nil map passed to function changesHaveNewTables, may be bug") + return true + } + + for _, change := range changes { + if !a[normalizeTableName(change.Table)] { + log.Infof("Unable to find %s table in current known tables", change.Table) + return true + } + } + + return false +} + /* * seqCurr.Compare() will return 1, if its newer than seqPrev, * else will return 0, if same, or -1 if older. diff --git a/snapshot.go b/snapshot.go index 960b4fc..ae889e3 100644 --- a/snapshot.go +++ b/snapshot.go @@ -201,6 +201,46 @@ func extractTablesFromSnapshot(snapshot *common.Snapshot) (tables map[string]boo } +func extractTableColumnsFromSnapshot(snapshot *common.Snapshot) (map[string][]string) { + + columns := make(map[string][]string) + tables := make([]string, 0) + + log.Debug("Extracting table names from snapshot") + db, err := dataService.DBVersion(snapshot.SnapshotInfo) + if err != nil { + log.Panicf("Database inaccessible: %v", err) + } + rows, err := db.Query("SELECT DISTINCT tableName FROM _transicator_tables;") + if err != nil { + log.Panicf("Unable to read in known snapshot tables from sqlite file") + } + defer rows.Close() + for rows.Next() { + var tableName string + rows.Scan(&tableName) + if err != nil { + log.Panic("Error scaning tableNames from _transicator_tables") + } + tables = append(tables, tableName) + } + + for _, tableName := range tables { + + dummyRows, err := db.Query("SELECT * FROM " + tableName + " LIMIT 0;") + if err != nil { + log.Panicf("Get table info failed: %v", err) + } + defer dummyRows.Close() + cols, err := dummyRows.Columns() + if err != nil { + log.Panicf("Get table columns failed: %v", err) + } + columns[tableName] = cols + } + return columns +} + func extractTablesFromDB(db apid.DB) (tables map[string]bool) { tables = make(map[string]bool) diff --git a/snapshot_test.go b/snapshot_test.go new file mode 100644 index 0000000..ffad61a --- /dev/null +++ b/snapshot_test.go @@ -0,0 +1,61 @@ +// Copyright 2017 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package apidApigeeSync +import ( + "github.com/apigee-labs/transicator/common" + . "github.com/onsi/ginkgo" + "os" + "strings" +) + +var _ = Describe("Change Agent", func() { + + const testDbId = "test_snapshot" + + Context("Change Agent Unit Tests", func() { + testHandler := handler{} + + var createTestDb = func(sqlfile string, dbId string) common.Snapshot { + initDb(sqlfile, "./mockdb_snapshot.sqlite3") + file, err := os.Open("./mockdb_snapshot.sqlite3") + if err != nil { + Fail("Failed to open mock db for test") + } + + s := common.Snapshot{} + err = processSnapshotServerFileResponse(dbId, file, &s) + if err != nil { + Fail("Error processing test snapshots") + } + return s + } + + BeforeEach(func() { + event := createTestDb("./sql/init_mock_db.sql", testDbId) + testHandler.Handle(&event) + knownTables = extractTablesFromDB(getDB()) + }) + + It("test extract table columns", func() { + s := &common.Snapshot{ + SnapshotInfo: testDbId, + } + columns := extractTableColumnsFromSnapshot(s) + for table, cols := range columns { + log.Error("snapshot TABLE: " + table + " COLUMN: " + strings.Join(cols, "|")) + } + }) + + }) +}) \ No newline at end of file diff --git a/sql/init_mock_db.sql b/sql/init_mock_db.sql index f1b8471..4df6654 100644 --- a/sql/init_mock_db.sql +++ b/sql/init_mock_db.sql @@ -22,17 +22,17 @@ CREATE TABLE _transicator_tables columnName varchar not null, typid integer, primaryKey bool); -INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','id',1043,1); -INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','data_scope_id',1043,1); -INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','name',25,0); -INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','uri',25,0); -INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','checksumtype',25,0); -INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','checksum',25,0); -INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','created',1114,0); -INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','created_by',25,0); -INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','updated',1114,0); -INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','updated_by',25,0); -INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','crc',25,0); +-- INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','id',1043,1); +-- INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','data_scope_id',1043,1); +-- INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','name',25,0); +-- INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','uri',25,0); +-- INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','checksumtype',25,0); +-- INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','checksum',25,0); +-- INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','created',1114,0); +-- INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','created_by',25,0); +-- INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','updated',1114,0); +-- INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','updated_by',25,0); +-- INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','crc',25,0); INSERT INTO "_transicator_tables" VALUES('kms_deployment','id',1043,1); INSERT INTO "_transicator_tables" VALUES('kms_deployment','bundle_config_id',1043,1); INSERT INTO "_transicator_tables" VALUES('kms_deployment','apid_cluster_id',1043,1); @@ -107,21 +107,6 @@ INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','created_by',25,1) INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','updated',1114,0); INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','updated_by',25,0); INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','_change_selector',25,1); -INSERT INTO "_transicator_tables" VALUES('kms_deployment_history','id',2950,1); -INSERT INTO "_transicator_tables" VALUES('kms_deployment_history','name',1043,0); -INSERT INTO "_transicator_tables" VALUES('kms_deployment_history','ext_ref_id',1043,1); -INSERT INTO "_transicator_tables" VALUES('kms_deployment_history','display_name',1043,0); -INSERT INTO "_transicator_tables" VALUES('kms_deployment_history','description',1043,0); -INSERT INTO "_transicator_tables" VALUES('kms_deployment_history','created_at',1114,1); -INSERT INTO "_transicator_tables" VALUES('kms_deployment_history','created_by',1043,0); -INSERT INTO "_transicator_tables" VALUES('kms_deployment_history','updated_at',1114,1); -INSERT INTO "_transicator_tables" VALUES('kms_deployment_history','updated_by',1043,0); -INSERT INTO "_transicator_tables" VALUES('configuration','id',1043,1); -INSERT INTO "_transicator_tables" VALUES('configuration','body',25,0); -INSERT INTO "_transicator_tables" VALUES('configuration','created',1114,0); -INSERT INTO "_transicator_tables" VALUES('configuration','created_by',25,0); -INSERT INTO "_transicator_tables" VALUES('configuration','updated',1114,0); -INSERT INTO "_transicator_tables" VALUES('configuration','updated_by',25,0); INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','id',1043,1); INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','apid_cluster_id',1043,1); INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','scope',25,0); @@ -156,18 +141,6 @@ INSERT INTO "_transicator_tables" VALUES('kms_company_developer','created_by',10 INSERT INTO "_transicator_tables" VALUES('kms_company_developer','updated_at',1114,0); INSERT INTO "_transicator_tables" VALUES('kms_company_developer','updated_by',1043,0); INSERT INTO "_transicator_tables" VALUES('kms_company_developer','_change_selector',1043,0); -INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','id',1043,1); -INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','deployment_id',1043,0); -INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','action',25,0); -INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','bundle_config_id',1043,0); -INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','apid_cluster_id',1043,0); -INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','data_scope_id',1043,0); -INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','bundle_config_json',25,0); -INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','config_json',25,0); -INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','created',1114,0); -INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','created_by',25,0); -INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','updated',1114,0); -INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','updated_by',25,0); INSERT INTO "_transicator_tables" VALUES('kms_app','id',2950,1); INSERT INTO "_transicator_tables" VALUES('kms_app','tenant_id',1043,1); INSERT INTO "_transicator_tables" VALUES('kms_app','name',1043,1); From d20fb197b227c9b8b0264b1cbb5c5e679a43dfb5 Mon Sep 17 00:00:00 2001 From: Haoming Zhang Date: Fri, 7 Jul 2017 16:42:00 -0700 Subject: [PATCH 2/2] add column maps --- apigee_sync.go | 2 +- apigee_sync_test.go | 15 ++++----- change_test.go | 2 +- changes.go | 4 +-- snapshot.go | 77 ++++++++------------------------------------- snapshot_test.go | 14 ++++++--- 6 files changed, 34 insertions(+), 80 deletions(-) diff --git a/apigee_sync.go b/apigee_sync.go index 10745de..1f6e38a 100644 --- a/apigee_sync.go +++ b/apigee_sync.go @@ -26,7 +26,7 @@ const ( maxIdleConnsPerHost = 10 ) -var knownTables = make(map[string]bool) +var knownTables = make(map[string]map[string]bool) /* * Start from existing snapshot if possible diff --git a/apigee_sync_test.go b/apigee_sync_test.go index b885593..ad79a6b 100644 --- a/apigee_sync_test.go +++ b/apigee_sync_test.go @@ -235,38 +235,39 @@ var _ = Describe("Sync", func() { It("should correctly identify non-proper subsets with respect to maps", func() { + testMap := map[string]map[string]bool{"a": make(map[string]bool), "b": make(map[string]bool)} //test b proper subset of a - Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, + Expect(changesHaveNewTables(testMap, []common.Change{common.Change{Table: "b"}}, )).To(BeFalse()) //test a == b - Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, + Expect(changesHaveNewTables(testMap, []common.Change{common.Change{Table: "a"}, common.Change{Table: "b"}}, )).To(BeFalse()) //test b superset of a - Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, + Expect(changesHaveNewTables(testMap, []common.Change{common.Change{Table: "a"}, common.Change{Table: "b"}, common.Change{Table: "c"}}, )).To(BeTrue()) //test b not subset of a - Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, + Expect(changesHaveNewTables(testMap, []common.Change{common.Change{Table: "c"}}, )).To(BeTrue()) //test a empty - Expect(changesHaveNewTables(map[string]bool{}, + Expect(changesHaveNewTables(map[string]map[string]bool{}, []common.Change{common.Change{Table: "a"}}, )).To(BeTrue()) //test b empty - Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, + Expect(changesHaveNewTables(testMap, []common.Change{}, )).To(BeFalse()) //test b nil - Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, nil)).To(BeFalse()) + Expect(changesHaveNewTables(testMap, nil)).To(BeFalse()) //test a nil Expect(changesHaveNewTables(nil, diff --git a/change_test.go b/change_test.go index 383bff6..ec9b82f 100644 --- a/change_test.go +++ b/change_test.go @@ -46,7 +46,7 @@ var _ = Describe("Change Agent", func() { BeforeEach(func() { event := createTestDb("./sql/init_mock_db.sql", "test_change") processSnapshot(&event) - knownTables = extractTablesFromDB(getDB()) + knownTables = extractTableColsFromDB(getDB()) }) var initializeContext = func() { diff --git a/changes.go b/changes.go index 9f26466..9141927 100644 --- a/changes.go +++ b/changes.go @@ -298,7 +298,7 @@ func (c *pollChangeManager) handleChangeServerError(err error) { /* * Determine if any tables in changes are not present in known tables */ -func changesHaveNewTables(a map[string]bool, changes []common.Change) bool { +func changesHaveNewTables(a map[string]map[string]bool, changes []common.Change) bool { //nil maps should not be passed in. Making the distinction between nil map and empty map if a == nil { @@ -307,7 +307,7 @@ func changesHaveNewTables(a map[string]bool, changes []common.Change) bool { } for _, change := range changes { - if !a[normalizeTableName(change.Table)] { + if _, ok := a[normalizeTableName(change.Table)]; !ok { log.Infof("Unable to find %s table in current known tables", change.Table) return true } diff --git a/snapshot.go b/snapshot.go index ae889e3..66108eb 100644 --- a/snapshot.go +++ b/snapshot.go @@ -147,7 +147,7 @@ func (s *simpleSnapShotManager) downloadDataSnapshot() { } func (s *simpleSnapShotManager) storeDataSnapshot(snapshot *common.Snapshot) { - knownTables = extractTablesFromSnapshot(snapshot) + knownTables = extractTableColumnsFromSnapshot(snapshot) _, err := dataService.DBVersion(snapshot.SnapshotInfo) if err != nil { @@ -167,50 +167,20 @@ func (s *simpleSnapShotManager) storeDataSnapshot(snapshot *common.Snapshot) { } -func extractTablesFromSnapshot(snapshot *common.Snapshot) (tables map[string]bool) { - - tables = make(map[string]bool) - +func extractTableColumnsFromSnapshot(snapshot *common.Snapshot) map[string]map[string]bool { log.Debug("Extracting table names from snapshot") - if snapshot.Tables == nil { - //if this panic ever fires, it's a bug - db, err := dataService.DBVersion(snapshot.SnapshotInfo) - if err != nil { - log.Panicf("Database inaccessible: %v", err) - } - rows, err := db.Query("SELECT DISTINCT tableName FROM _transicator_tables;") - if err != nil { - log.Panicf("Unable to read in known snapshot tables from sqlite file") - } - for rows.Next() { - var tableName string - rows.Scan(&tableName) - if err != nil { - log.Panic("Error scaning tableNames from _transicator_tables") - } - tables[tableName] = true - } - - } else { - - for _, table := range snapshot.Tables { - tables[table.Name] = true - } + db, err := dataService.DBVersion(snapshot.SnapshotInfo) + if err != nil { + log.Panicf("Database inaccessible: %v", err) } - return tables + return extractTableColsFromDB(db) } -func extractTableColumnsFromSnapshot(snapshot *common.Snapshot) (map[string][]string) { +func extractTableColsFromDB(db apid.DB) map[string]map[string]bool { - columns := make(map[string][]string) + columns := make(map[string]map[string]bool) tables := make([]string, 0) - - log.Debug("Extracting table names from snapshot") - db, err := dataService.DBVersion(snapshot.SnapshotInfo) - if err != nil { - log.Panicf("Database inaccessible: %v", err) - } rows, err := db.Query("SELECT DISTINCT tableName FROM _transicator_tables;") if err != nil { log.Panicf("Unable to read in known snapshot tables from sqlite file") @@ -226,7 +196,7 @@ func extractTableColumnsFromSnapshot(snapshot *common.Snapshot) (map[string][]st } for _, tableName := range tables { - + columns[tableName] = make(map[string]bool) dummyRows, err := db.Query("SELECT * FROM " + tableName + " LIMIT 0;") if err != nil { log.Panicf("Get table info failed: %v", err) @@ -236,33 +206,12 @@ func extractTableColumnsFromSnapshot(snapshot *common.Snapshot) (map[string][]st if err != nil { log.Panicf("Get table columns failed: %v", err) } - columns[tableName] = cols - } - return columns -} - -func extractTablesFromDB(db apid.DB) (tables map[string]bool) { - - tables = make(map[string]bool) - - log.Debug("Extracting table names from existing DB") - rows, err := db.Query("SELECT DISTINCT tableName FROM _transicator_tables;") - defer rows.Close() - - if err != nil { - log.Panicf("Error reading current set of tables: %v", err) - } - - for rows.Next() { - var table string - if err := rows.Scan(&table); err != nil { - log.Panicf("Error reading current set of tables: %v", err) + for _, col := range cols { + columns[tableName][col] = true } - log.Debugf("Table %s found in existing db", table) - tables[table] = true } - return tables + return columns } // Skip Downloading snapshot if there is already a snapshot available from previous run @@ -275,7 +224,7 @@ func startOnLocalSnapshot(snapshot string) *common.Snapshot { log.Panicf("Database inaccessible: %v", err) } - knownTables = extractTablesFromDB(db) + knownTables = extractTableColsFromDB(db) // allow plugins (including this one) to start immediately on existing database // Note: this MUST have no tables as that is used as an indicator diff --git a/snapshot_test.go b/snapshot_test.go index ffad61a..bcbafd5 100644 --- a/snapshot_test.go +++ b/snapshot_test.go @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. package apidApigeeSync + import ( "github.com/apigee-labs/transicator/common" . "github.com/onsi/ginkgo" @@ -24,7 +25,6 @@ var _ = Describe("Change Agent", func() { const testDbId = "test_snapshot" Context("Change Agent Unit Tests", func() { - testHandler := handler{} var createTestDb = func(sqlfile string, dbId string) common.Snapshot { initDb(sqlfile, "./mockdb_snapshot.sqlite3") @@ -43,8 +43,8 @@ var _ = Describe("Change Agent", func() { BeforeEach(func() { event := createTestDb("./sql/init_mock_db.sql", testDbId) - testHandler.Handle(&event) - knownTables = extractTablesFromDB(getDB()) + processSnapshot(&event) + knownTables = extractTableColsFromDB(getDB()) }) It("test extract table columns", func() { @@ -52,10 +52,14 @@ var _ = Describe("Change Agent", func() { SnapshotInfo: testDbId, } columns := extractTableColumnsFromSnapshot(s) - for table, cols := range columns { + for table, colMap := range columns { + cols := []string{} + for col := range colMap { + cols = append(cols, col) + } log.Error("snapshot TABLE: " + table + " COLUMN: " + strings.Join(cols, "|")) } }) }) -}) \ No newline at end of file +})