-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathinflux.go
More file actions
135 lines (113 loc) · 3.5 KB
/
influx.go
File metadata and controls
135 lines (113 loc) · 3.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package ml
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"strings"
"time"
coreio "dappco.re/go/core/io"
coreerr "dappco.re/go/core/log"
)
// InfluxClient is a minimal HTTP client for an InfluxDB v3 instance.
//
// Construct it with NewInfluxClient, which applies defaults and resolves the
// access token.
type InfluxClient struct {
	// url is the base address that API paths are appended to;
	// db is the database name sent with every write and query;
	// token is sent as the bearer token in the Authorization header.
	url, db, token string
}
// NewInfluxClient builds an InfluxClient for the given base URL and database.
//
// An empty url falls back to "http://10.69.69.165:8181" and an empty db falls
// back to "training". The token is taken from the INFLUX_TOKEN environment
// variable; when that is unset or empty, the contents of ~/.influx_token
// (with surrounding whitespace trimmed) are used instead. If neither source
// yields a token, the client is returned with an empty token.
func NewInfluxClient(url, db string) *InfluxClient {
	client := &InfluxClient{
		url:   url,
		db:    db,
		token: os.Getenv("INFLUX_TOKEN"),
	}
	if client.url == "" {
		client.url = "http://10.69.69.165:8181"
	}
	if client.db == "" {
		client.db = "training"
	}

	// The environment variable wins; only consult the token file without it.
	if client.token != "" {
		return client
	}

	homeDir, homeErr := os.UserHomeDir()
	if homeErr != nil {
		// No home directory means no token file to look for.
		return client
	}

	// A missing or unreadable token file is not an error: the token simply
	// stays empty.
	if raw, readErr := coreio.Local.Read(filepath.Join(homeDir, ".influx_token")); readErr == nil {
		client.token = strings.TrimSpace(string(raw))
	}
	return client
}
// WriteLp sends the given line protocol records to InfluxDB in a single POST
// to /api/v3/write_lp, with the records joined by newlines.
//
// The database name is attached as a URL-encoded "db" query parameter, so
// names containing characters such as '&', '#', '+', or spaces are
// transmitted intact rather than corrupting the query string.
//
// It returns an error if the request cannot be built or sent, or if the
// server responds with a status other than 200 OK or 204 No Content; in the
// latter case the error message carries the status code and response body.
func (c *InfluxClient) WriteLp(lines []string) error {
	const op = "ml.InfluxClient.WriteLp"

	body := strings.Join(lines, "\n")
	req, err := http.NewRequest(http.MethodPost, c.url+"/api/v3/write_lp", strings.NewReader(body))
	if err != nil {
		return coreerr.E(op, "create write request", err)
	}

	// Set the query through url.Values instead of string formatting so the
	// database name is percent-encoded.
	query := req.URL.Query()
	query.Set("db", c.db)
	req.URL.RawQuery = query.Encode()

	req.Header.Set("Authorization", "Bearer "+c.token)
	req.Header.Set("Content-Type", "text/plain")

	client := &http.Client{Timeout: 10 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		return coreerr.E(op, "write request", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent {
		// Best effort: the body only enriches the error message, so a failed
		// read just yields a shorter message.
		respBody, _ := io.ReadAll(resp.Body)
		return coreerr.E(op, fmt.Sprintf("write failed %d: %s", resp.StatusCode, string(respBody)), nil)
	}
	return nil
}
// QuerySQL posts a SQL statement to /api/v3/query_sql and decodes the JSON
// response into a slice of rows, each a map keyed by string.
//
// The request body is a JSON object carrying the client's database name
// ("db") and the statement ("q"). An error is returned if the request cannot
// be built or sent, if the response body cannot be read, if the status is not
// 200 OK (the message then includes the status code and body), or if the body
// is not a JSON array of objects.
func (c *InfluxClient) QuerySQL(sql string) ([]map[string]any, error) {
	const op = "ml.InfluxClient.QuerySQL"

	payload, err := json.Marshal(map[string]string{
		"db": c.db,
		"q":  sql,
	})
	if err != nil {
		return nil, coreerr.E(op, "marshal query request", err)
	}

	endpoint := c.url + "/api/v3/query_sql"
	req, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewReader(payload))
	if err != nil {
		return nil, coreerr.E(op, "create query request", err)
	}
	req.Header.Set("Authorization", "Bearer "+c.token)
	req.Header.Set("Content-Type", "application/json")

	httpClient := &http.Client{Timeout: 10 * time.Second}
	resp, err := httpClient.Do(req)
	if err != nil {
		return nil, coreerr.E(op, "query request", err)
	}
	defer resp.Body.Close()

	// Read the body before checking the status so a failure response can be
	// quoted in the error message.
	raw, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, coreerr.E(op, "read query response", err)
	}
	if resp.StatusCode != http.StatusOK {
		msg := fmt.Sprintf("query failed %d: %s", resp.StatusCode, string(raw))
		return nil, coreerr.E(op, msg, nil)
	}

	var result []map[string]any
	err = json.Unmarshal(raw, &result)
	if err != nil {
		return nil, coreerr.E(op, "unmarshal query response", err)
	}
	return result, nil
}
// EscapeLp returns s with every comma, equals sign, and space prefixed by a
// backslash, as needed for InfluxDB line protocol tag values. All other
// bytes, including existing backslashes, are copied through unchanged.
func EscapeLp(s string) string {
	var out strings.Builder
	out.Grow(len(s))
	// Walking bytes is safe for UTF-8 input: the three escaped characters
	// are ASCII and never occur inside a multi-byte sequence.
	for i := 0; i < len(s); i++ {
		switch ch := s[i]; ch {
		case ',', '=', ' ':
			out.WriteByte('\\')
			out.WriteByte(ch)
		default:
			out.WriteByte(ch)
		}
	}
	return out.String()
}