-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathmain.go
More file actions
477 lines (414 loc) · 14 KB
/
main.go
File metadata and controls
477 lines (414 loc) · 14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"net/url"
"os"
"strings"
"time"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api"
"github.com/relaytools/go-wsstat"
"github.com/spf13/viper"
"github.com/nbd-wtf/go-nostr"
"github.com/nbd-wtf/go-nostr/nip11"
"github.com/mmcloughlin/geohash"
)
// normalizeURL normalizes a URL by converting it to lowercase and ensuring consistent format
func normalizeURL(rawURL string) (string, error) {
parsedURL, err := url.Parse(rawURL)
if err != nil {
return "", err
}
// Convert scheme and host to lowercase
parsedURL.Scheme = strings.ToLower(parsedURL.Scheme)
parsedURL.Host = strings.ToLower(parsedURL.Host)
// Remove default ports
if (parsedURL.Scheme == "ws" && strings.HasSuffix(parsedURL.Host, ":80")) ||
(parsedURL.Scheme == "wss" && strings.HasSuffix(parsedURL.Host, ":443")) {
parsedURL.Host = parsedURL.Host[:strings.LastIndex(parsedURL.Host, ":")]
}
// Ensure path ends with / if it's empty
if parsedURL.Path == "" {
parsedURL.Path = "/"
}
return parsedURL.String(), nil
}
// ValidRelayTypes is the set of allowed T-tag values for kind 30166 relay type advertisement.
var ValidRelayTypes = map[string]bool{
"PublicOutbox": true,
"PublicInbox": true,
"PrivateInbox": true,
"PrivateStorage": true,
"Search": true,
"Directory": true,
"Community": true,
"Algo": true,
"Archival": true,
"LocalCache": true,
"Blob": true,
"Broadcast": true,
"Proxy": true,
"Trusted": true,
"Push": true,
}
type MonitorConfig struct {
InfluxUrl string `mapstructure:"INFLUXDB_URL"`
InfluxToken string `mapstructure:"INFLUXDB_TOKEN"`
InfluxOrg string `mapstructure:"INFLUXDB_ORG"`
InfluxBucket string `mapstructure:"INFLUXDB_BUCKET"`
InfluxMeasurement string `mapstructure:"INFLUXDB_MEASUREMENT"`
MonitorName string `mapstructure:"MONITOR_NAME"`
MonitorFrequency int `mapstructure:"MONITOR_FREQUENCY"`
Publish bool `mapstructure:"NOSTR_PUBLISH"`
PrivateKey string `mapstructure:"NOSTR_PRIVATE_KEY"`
PublishRelayMetrics string `mapstructure:"NOSTR_PUBLISH_RELAY_METRICS"`
PublishMonitorProfile bool `mapstructure:"NOSTR_PUBLISH_MONITOR_PROFILE"`
MonitorCountryCode string `mapstructure:"MONITOR_COUNTRY_CODE"`
MonitorLatitude float64 `mapstructure:"MONITOR_LATITUDE"`
MonitorLongitude float64 `mapstructure:"MONITOR_LONGITUDE"`
MonitorAbout string `mapstructure:"MONITOR_ABOUT"`
MonitorPicture string `mapstructure:"MONITOR_PICTURE"`
RelayUrls string `mapstructure:"RELAY_URLS"`
RelayLatitude float64 `mapstructure:"RELAY_LATITUDE"`
RelayLongitude float64 `mapstructure:"RELAY_LONGITUDE"`
}
// RelayEntry holds a relay URL and its optional type, parsed from "url;type" notation.
type RelayEntry struct {
URL string
Type string
}
// parseRelayEntry splits a "url;type" string into a RelayEntry.
// The type portion is optional; if absent, Type is empty.
func parseRelayEntry(raw string) RelayEntry {
raw = strings.TrimSpace(raw)
parts := strings.SplitN(raw, ";", 2)
if len(parts) == 2 {
return RelayEntry{URL: strings.TrimSpace(parts[0]), Type: strings.TrimSpace(parts[1])}
}
return RelayEntry{URL: parts[0]}
}
type NostrProfile struct {
Name string `json:"name"`
About string `json:"about"`
Picture string `json:"picture"`
}
func publishEv(ev nostr.Event, urls []string) (err error) {
isError := false
var lastError error
lastError = nil
ctx := context.Background()
for i, url := range urls {
if i > 0 {
time.Sleep(500 * time.Millisecond)
}
relay, err := nostr.RelayConnect(ctx, url)
if err != nil {
isError = true
lastError = err
}
if err := relay.Publish(ctx, ev); err != nil {
isError = true
lastError = err
}
relay.Close()
}
if isError {
return lastError
}
return nil
}
func main() {
// Config loading
viper.AddConfigPath("/usr/local/etc")
viper.AddConfigPath("./")
viper.SetConfigName(".monitorlizard.env")
viper.SetConfigType("env")
var iConfig *MonitorConfig
if err := viper.ReadInConfig(); err != nil {
fmt.Println("Warn: error reading monitorlizard config file from current directory -or- /usr/local/etc/.monitorlizard.env\n", err)
os.Exit(1)
}
// Viper unmarshals the loaded env variables into the struct
if err := viper.Unmarshal(&iConfig); err != nil {
fmt.Print("Warn: unable to decode monitorlizard config into struct\n", err)
os.Exit(1)
}
relayEntries := []RelayEntry{}
if iConfig.RelayUrls != "" && strings.Contains(iConfig.RelayUrls, ",") {
for _, raw := range strings.Split(iConfig.RelayUrls, ",") {
relayEntries = append(relayEntries, parseRelayEntry(raw))
}
} else if iConfig.RelayUrls != "" {
// single url
relayEntries = []RelayEntry{parseRelayEntry(iConfig.RelayUrls)}
} else {
// command line arg
args := os.Args
if len(args) < 2 {
log.Printf("Usage: go run main.go URL, or specify RELAY_URLS in config")
}
rawUrl := args[1]
_, err := url.Parse(strings.SplitN(rawUrl, ";", 2)[0])
if err != nil {
log.Fatalf("Failed to parse URL: %v", err)
}
relayEntries = []RelayEntry{parseRelayEntry(rawUrl)}
}
// if a comma is detected in the iConfig.PublishRelayMetrics, split it into a slice
publishRelays := []string{iConfig.PublishRelayMetrics}
if iConfig.PublishRelayMetrics != "" && strings.Contains(iConfig.PublishRelayMetrics, ",") {
publishRelays = strings.Split(iConfig.PublishRelayMetrics, ",")
}
fmt.Printf("Publishing to %d relays: %v\n", len(publishRelays), publishRelays)
influxEnabled := true
if iConfig.InfluxUrl == "" || iConfig.InfluxToken == "" || iConfig.InfluxOrg == "" || iConfig.InfluxBucket == "" || iConfig.InfluxMeasurement == "" {
fmt.Println("Warn: InfluxDB configuration missing, disabling InfluxDB")
influxEnabled = false
}
// Default to frequency 10 seconds
useFrequency := time.Second * 10
useFrequencySecondsString := "10"
if iConfig.MonitorFrequency != 0 {
useFrequency = time.Second * time.Duration(iConfig.MonitorFrequency)
useFrequencySecondsString = fmt.Sprintf("%d", iConfig.MonitorFrequency)
}
pub, _ := nostr.GetPublicKey(iConfig.PrivateKey)
fmt.Printf("Info: influxdb: %t\n", influxEnabled)
var client influxdb2.Client
var writeAPI api.WriteAPI
if influxEnabled {
// INFLUX INIT
client = influxdb2.NewClientWithOptions(iConfig.InfluxUrl, iConfig.InfluxToken,
influxdb2.DefaultOptions().SetBatchSize(20))
// Get non-blocking write client
writeAPI = client.WriteAPI(iConfig.InfluxOrg, iConfig.InfluxBucket)
}
if iConfig.PublishMonitorProfile {
// 0 - Monitor Profile
newProfile := NostrProfile{
Name: iConfig.MonitorName,
About: iConfig.MonitorAbout,
Picture: iConfig.MonitorPicture,
}
var err error
newProfileJson, err := json.Marshal(newProfile)
if err != nil {
fmt.Println(err)
}
profileEv := nostr.Event{
PubKey: pub,
CreatedAt: nostr.Timestamp(time.Now().Unix()),
Kind: 0,
Tags: nostr.Tags{},
Content: string(newProfileJson),
}
profileEv.Sign(iConfig.PrivateKey)
err = publishEv(profileEv, publishRelays)
if err != nil {
fmt.Printf("Error publishing kind 0: %s\n", err)
} else {
fmt.Printf("published monitor profile kind:0 to %v\n", publishRelays)
}
time.Sleep(1 * time.Second)
// 10002 - Monitor Relay List
relayTags := nostr.Tags{}
for _, t := range publishRelays {
fmt.Println(t)
relayTags = relayTags.AppendUnique(nostr.Tag{"r", t, "write"})
}
relayListEv := nostr.Event{
PubKey: pub,
CreatedAt: nostr.Timestamp(time.Now().Unix()),
Kind: 10002,
Tags: relayTags,
Content: "",
}
fmt.Println(publishRelays)
fmt.Println(relayTags)
relayListEv.Sign(iConfig.PrivateKey)
err = publishEv(relayListEv, publishRelays)
if err != nil {
fmt.Printf("Error publishing kind 10002: %s\n", err)
} else {
fmt.Printf("published monitor relayList kind:10002 to %v\n", publishRelays)
}
time.Sleep(1 * time.Second)
// Publish to Nostr
// 10166 - Monitor Profile
profileTags := nostr.Tags{
nostr.Tag{"url"},
nostr.Tag{"frequency", useFrequencySecondsString},
nostr.Tag{"o", pub},
nostr.Tag{"k", "30066"},
nostr.Tag{"c", "open"},
nostr.Tag{"c", "read"},
nostr.Tag{"timeout", "5000", "open"},
nostr.Tag{"timeout", "15000", "read"},
nostr.Tag{"timeout", "15000", "write"},
nostr.Tag{"G", iConfig.MonitorCountryCode, "countryCode"},
}
// for every geo tag, encode all lesser precisions also
monitorGeo := geohash.EncodeWithPrecision(iConfig.MonitorLatitude, iConfig.MonitorLongitude, 9)
fmt.Println("monitor geohash was: ", monitorGeo)
for i := 1; i < 9; i++ {
profileTags = profileTags.AppendUnique(nostr.Tag{"g", monitorGeo[:i]})
}
ev := nostr.Event{
PubKey: pub,
CreatedAt: nostr.Timestamp(time.Now().Unix()),
Kind: 10166,
Tags: profileTags,
Content: "",
}
ev.Sign(iConfig.PrivateKey)
err = publishEv(ev, publishRelays)
if err != nil {
fmt.Printf("Error publishing kind 10166: %s\n", err)
} else {
fmt.Printf("published monitor registration profile kind:10166 to %v\n", publishRelays)
}
}
//FOR EACH RELAY
for relayIdx, entry := range relayEntries {
u := entry.URL
// Normalize the URL for consistent d tag usage
normalizedURL, err := normalizeURL(u)
if err != nil {
fmt.Printf("Error normalizing URL %s: %s\n", u, err)
continue
}
// fetch NIP11 document
theseTags := nostr.Tags{}
nip11Info, err := nip11.Fetch(context.Background(), u)
gotNip11 := true
if err != nil {
fmt.Printf("Error fetching NIP11 document: %s\n", err)
gotNip11 = false
}
if gotNip11 {
for _, t := range nip11Info.SupportedNIPs {
// Convert interface{} to int (SupportedNIPs contains float64 values)
if nipNum, ok := t.(float64); ok {
theseTags = theseTags.AppendUnique(nostr.Tag{"N", fmt.Sprintf("%d", int(nipNum))})
}
}
/*
if nip11Info.Limitation.PaymentRequired {
theseTags = theseTags.AppendUnique(nostr.Tag{"R", "payment"})
} else {
theseTags = theseTags.AppendUnique(nostr.Tag{"R", "!payment"})
}
if nip11Info.Limitation.AuthRequired {
theseTags = theseTags.AppendUnique(nostr.Tag{"R", "auth"})
} else {
theseTags = theseTags.AppendUnique(nostr.Tag{"R", "!auth"})
}
// relay_countries (it's in nip11, could be used for geotags)
if len(nip11Info.RelayCountries) > 0 {
for _, c := range nip11Info.RelayCountries {
theseTags = theseTags.AppendUnique(nostr.Tag{"G", c})
}
}
// general tags
if len(nip11Info.Tags) > 0 {
for _, t := range nip11Info.Tags {
theseTags = theseTags.AppendUnique(nostr.Tag{"t", t})
}
}
theseTags = theseTags.AppendUnique(nostr.Tag{"d", u})
// Todo:
//// don't need these but maybe
// accepted kinds?
// fees? probably don't need this
// restricted writes? that's new..
// language tags?
*/
}
// stagger the requests for multiple relays
if relayIdx > 0 {
time.Sleep(time.Duration(relayIdx) * 2 * time.Second)
}
go func(relayURL string, normalizedRelayURL string, relayTags nostr.Tags, relayType string) {
ticker := time.NewTicker(useFrequency)
defer ticker.Stop()
parsedUrl, err := url.Parse(relayURL)
if err != nil {
fmt.Printf("fatal error: unable to parse url: %s, %s\n", relayURL, err)
os.Exit(1)
}
for {
t := time.Now()
msg := "[\"REQ\", \"1234abcdping\", {\"kinds\": [1], \"limit\": 1}]"
whatTime := t
result, _, err := wsstat.MeasureLatency(parsedUrl, msg, http.Header{})
if err != nil {
fmt.Println("ERROR OCCURRED: ", err)
<-ticker.C
continue
}
fmt.Printf("Collecting data for %s at %s. total latency %dms\n", relayURL, t, result.TotalTime.Milliseconds())
if influxEnabled {
point := influxdb2.NewPoint(
iConfig.InfluxMeasurement,
map[string]string{
"relay": relayURL,
"monitor": iConfig.MonitorName,
},
map[string]interface{}{
"dnslookup": result.DNSLookup.Milliseconds(),
"tcpconnection": result.TCPConnection.Milliseconds(),
"tlshandshake": result.TLSHandshake.Milliseconds(),
"wshandshake": result.WSHandshake.Milliseconds(),
"wsrtt": result.MessageRoundTrip.Milliseconds(),
"totaltime": result.TotalTime.Milliseconds(),
},
whatTime,
)
// write asynchronously
writeAPI.WritePoint(point)
}
openConnMs := result.DNSLookup.Milliseconds() + result.TCPConnection.Milliseconds() + result.TLSHandshake.Milliseconds() + result.WSHandshake.Milliseconds()
openConnString := fmt.Sprintf("%d", openConnMs)
openConnReadString := fmt.Sprintf("%d", result.MessageRoundTrip.Milliseconds())
newTags := nostr.Tags{}
for _, tag := range relayTags {
newTags = newTags.AppendUnique(tag)
}
newTags = newTags.AppendUnique(nostr.Tag{"d", normalizedRelayURL})
// for every geo tag, encode all lesser precisions also
fullGeo := geohash.EncodeWithPrecision(iConfig.RelayLatitude, iConfig.RelayLongitude, 9)
for i := 1; i < 9; i++ {
newTags = newTags.AppendUnique(nostr.Tag{"g", fullGeo[:i]})
}
newTags = newTags.AppendUnique(nostr.Tag{"rtt-open", openConnString})
newTags = newTags.AppendUnique(nostr.Tag{"rtt-read", openConnReadString})
newTags = newTags.AppendUnique(nostr.Tag{"other", "network", "clearnet"})
if relayType != "" {
if ValidRelayTypes[relayType] {
newTags = newTags.AppendUnique(nostr.Tag{"T", relayType})
} else {
fmt.Printf("Warn: unknown relay type %q, skipping T tag\n", relayType)
}
}
if iConfig.Publish {
// Publish to Nostr stats/kind 30166
ev := nostr.Event{
PubKey: pub,
CreatedAt: nostr.Timestamp(whatTime.Unix()),
Kind: 30166,
Tags: newTags,
Content: "",
}
ev.Sign(iConfig.PrivateKey)
publishEv(ev, publishRelays)
}
<-ticker.C
}
}(u, normalizedURL, theseTags, entry.Type)
}
select {}
}