-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathmkdb.go
115 lines (89 loc) · 2.3 KB
/
mkdb.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package main
import (
"bufio"
"crypto/sha1"
"encoding/hex"
"fmt"
"os"
"strings"
"github.com/blevesearch/bleve"
log "github.com/sirupsen/logrus"
)
func isValidType(pType string) bool {
if (pType == TYPE_IPv4) || (pType == TYPE_IPv6) {
return true
}
return false
}
func getRecordIndexID(r *AsnRecord) string {
h := sha1.New()
h.Write([]byte(fmt.Sprintf("%s_%s", r.ID, r.Address)))
return hex.EncodeToString(h.Sum(nil))
}
func removeQuotes(s string) string {
return strings.ReplaceAll(s, "\"", "")
}
func createIndexedAsnDB(pDB, pType, pFile *string) {
index, err := bleve.Open(*pDB)
if err != nil {
mapping := bleve.NewIndexMapping()
index, err = bleve.New(*pDB, mapping)
if err != nil {
log.Fatalf("Failed to create index at %s error: %+v", *pDB, err)
}
}
file, err := os.Open(*pFile)
if err != nil {
log.Fatalf("Failed to open CSV file: %s", *pFile)
}
defer file.Close()
defer index.Close()
if !isValidType(*pType) {
log.Fatalf("Invalid type: %s", *pType)
}
scanner := bufio.NewScanner(file)
count := 0
log.Infof("Indexing records to DB: %s", *pDB)
// var wg sync.WaitGroup
// channel := make(chan *AsnRecord)
// // Start the indexers
// for i := 0; i < 10; i++ {
// wg.Add(1)
// go func(c chan *AsnRecord, workerId int) {
// defer wg.Done()
// log.Infof("Indexer worker running: %d", workerId)
// for r := range c {
// riID := getRecordIndexID(r)
// if err := index.Index(riID, r); err != nil {
// log.Fatalf("Failed to index record. Error: %+v", err)
// }
// }
// }(channel, i)
// }
for scanner.Scan() {
parts := strings.Split(scanner.Text(), ",")
record := AsnRecord{ID: parts[1],
Address: parts[0],
Organization: removeQuotes(parts[2]),
Type: *pType}
riID := getRecordIndexID(&record)
index.Index(riID, record)
// wg.Add(1)
// go func(r *AsnRecord) {
// defer wg.Done()
// riID := getRecordIndexID(r)
// log.Infof("Indexing record with ASNID: %s ID: %s", r.ID, riID)
// if err := index.Index(riID, r); err != nil {
// log.Fatalf("Failed to index record. Error: %+v", err)
// }
// }(&record)
// channel <- &record
count = count + 1
if (count % 10000) == 0 {
log.Infof("Indexed %d records", count)
}
}
// close(channel)
// wg.Wait()
log.Infof("Indexed %d entries", count)
}