Skip to content

Commit 7489eb0

Browse files
authored
Merge pull request observatorium#831 from jimdaga/SREP-337
Extend Observatorium API with Probes Endpoint
2 parents 8d34ac9 + bc497f2 commit 7489eb0

File tree

9 files changed

+708
-4
lines changed

9 files changed

+708
-4
lines changed

README.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,26 @@ Usage of ./observatorium-api:
151151
The gRPC Server Address against which to run rate limit checks when the rate limits are specified for a given tenant. If not specified, local, non-shared rate limiting will be used. Has precedence over other rate limiter options.
152152
-middleware.rate-limiter.type string
153153
The type of rate limiter to use when not using a gRPC rate limiter. Options: 'local' (default), 'redis' (leaky bucket algorithm). (default "local")
154+
-probes.dial-timeout duration
155+
The timeout for establishing connections to the probes upstream. (default 30s)
156+
-probes.endpoint string
157+
The endpoint against which to make HTTP requests for probes.
158+
-probes.keep-alive-timeout duration
159+
The keep-alive timeout for connections to the probes upstream. (default 30s)
160+
-probes.tenant-header string
161+
The name of the HTTP header containing the tenant ID to forward to the probes upstream. (default "X-Tenant")
162+
-probes.tls-handshake-timeout duration
163+
The TLS handshake timeout for connections to the probes upstream. (default 10s)
164+
-probes.tls.ca-file string
165+
File containing the TLS CA against which to upstream probes servers. Leave blank to disable TLS.
166+
-probes.tls.cert-file string
167+
File containing the TLS client certificates to authenticate against upstream probes servers. Leave blank to disable mTLS.
168+
-probes.tls.key-file string
169+
File containing the TLS client key to authenticate against upstream probes servers. Leave blank to disable mTLS.
170+
-probes.tls.watch-certs
171+
Watch for certificate changes and reload
172+
-probes.write-timeout duration
173+
The HTTP write timeout for proxied requests to the probes endpoint. Defaults to the server's write timeout. (default 12m0s)
154174
-rbac.config string
155175
Path to the RBAC configuration file. (default "rbac.yaml")
156176
-server.read-header-timeout duration

api/probes/v1/http.go

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
package v1
2+
3+
import (
4+
"bytes"
5+
"encoding/json"
6+
"fmt"
7+
"io"
8+
"net"
9+
"net/http"
10+
"net/http/httputil"
11+
"net/url"
12+
"time"
13+
14+
"github.com/go-chi/chi"
15+
"github.com/go-kit/log"
16+
"github.com/go-kit/log/level"
17+
"github.com/observatorium/api/authentication"
18+
"github.com/observatorium/api/tls"
19+
)
20+
21+
type handlerOptions struct {
22+
logger log.Logger
23+
tenantHeader string
24+
tlsOptions *tls.UpstreamOptions
25+
dialTimeout time.Duration
26+
keepAliveTimeout time.Duration
27+
tlsHandshakeTimeout time.Duration
28+
readMiddlewares []func(http.Handler) http.Handler
29+
writeMiddlewares []func(http.Handler) http.Handler
30+
}
31+
32+
// HandlerOption is a function that configures the handler.
33+
type HandlerOption func(*handlerOptions)
34+
35+
// WithLogger sets the logger for the handler.
36+
func WithLogger(l log.Logger) HandlerOption {
37+
return func(o *handlerOptions) {
38+
o.logger = l
39+
}
40+
}
41+
42+
// WithTenantHeader sets the tenant header for the handler.
43+
func WithTenantHeader(h string) HandlerOption {
44+
return func(o *handlerOptions) {
45+
o.tenantHeader = h
46+
}
47+
}
48+
49+
// WithUpstreamTLSOptions sets the upstream TLS options for the handler.
50+
func WithUpstreamTLSOptions(opts *tls.UpstreamOptions) HandlerOption {
51+
return func(o *handlerOptions) {
52+
o.tlsOptions = opts
53+
}
54+
}
55+
56+
// WithDialTimeout sets the dial timeout for upstream connections.
57+
func WithDialTimeout(timeout time.Duration) HandlerOption {
58+
return func(o *handlerOptions) {
59+
o.dialTimeout = timeout
60+
}
61+
}
62+
63+
// WithKeepAliveTimeout sets the keep-alive timeout for upstream connections.
64+
func WithKeepAliveTimeout(timeout time.Duration) HandlerOption {
65+
return func(o *handlerOptions) {
66+
o.keepAliveTimeout = timeout
67+
}
68+
}
69+
70+
// WithTLSHandshakeTimeout sets the TLS handshake timeout for upstream connections.
71+
func WithTLSHandshakeTimeout(timeout time.Duration) HandlerOption {
72+
return func(o *handlerOptions) {
73+
o.tlsHandshakeTimeout = timeout
74+
}
75+
}
76+
77+
// WithReadMiddleware adds a middleware for read operations.
78+
func WithReadMiddleware(m func(http.Handler) http.Handler) HandlerOption {
79+
return func(o *handlerOptions) {
80+
o.readMiddlewares = append(o.readMiddlewares, m)
81+
}
82+
}
83+
84+
// WithWriteMiddleware adds a middleware for write operations.
85+
func WithWriteMiddleware(m func(http.Handler) http.Handler) HandlerOption {
86+
return func(o *handlerOptions) {
87+
o.writeMiddlewares = append(o.writeMiddlewares, m)
88+
}
89+
}
90+
91+
// NewHandler creates a new handler for the probes API.
92+
func NewHandler(downstream *url.URL, opts ...HandlerOption) (http.Handler, error) {
93+
options := &handlerOptions{
94+
logger: log.NewNopLogger(),
95+
dialTimeout: 30 * time.Second,
96+
keepAliveTimeout: 30 * time.Second,
97+
tlsHandshakeTimeout: 10 * time.Second,
98+
}
99+
for _, o := range opts {
100+
o(options)
101+
}
102+
103+
r := chi.NewRouter()
104+
proxy := httputil.NewSingleHostReverseProxy(downstream)
105+
106+
proxy.Transport = &http.Transport{
107+
Proxy: http.ProxyFromEnvironment,
108+
DialContext: (&net.Dialer{
109+
Timeout: options.dialTimeout,
110+
KeepAlive: options.keepAliveTimeout,
111+
}).DialContext,
112+
TLSHandshakeTimeout: options.tlsHandshakeTimeout,
113+
TLSClientConfig: options.tlsOptions.NewClientConfig(),
114+
}
115+
116+
if options.tenantHeader != "" {
117+
originalDirector := proxy.Director
118+
proxy.Director = func(req *http.Request) {
119+
originalDirector(req)
120+
tenant, ok := authentication.GetTenant(req.Context())
121+
if !ok {
122+
level.Warn(options.logger).Log("msg", "could not find tenant in request context for proxy")
123+
return
124+
}
125+
126+
// Set tenant header
127+
req.Header.Set(options.tenantHeader, tenant)
128+
129+
// Inject tenant label into POST request body.
130+
if req.Method == http.MethodPost {
131+
if req.Body == nil {
132+
return // Nothing to do
133+
}
134+
135+
bodyBytes, err := io.ReadAll(req.Body)
136+
if err != nil {
137+
level.Error(options.logger).Log("msg", "failed to read request body", "err", err)
138+
req.Body = io.NopCloser(bytes.NewBuffer(bodyBytes)) // Restore body
139+
return
140+
}
141+
// Restore body since it's been consumed
142+
req.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
143+
144+
var payload map[string]interface{}
145+
if err := json.Unmarshal(bodyBytes, &payload); err != nil {
146+
level.Warn(options.logger).Log("msg", "failed to unmarshal JSON body, forwarding unmodified", "err", err)
147+
return
148+
}
149+
150+
labels, ok := payload["labels"].(map[string]interface{})
151+
if !ok {
152+
labels = make(map[string]interface{})
153+
}
154+
155+
labels["tenant"] = tenant
156+
payload["labels"] = labels
157+
158+
newBodyBytes, err := json.Marshal(payload)
159+
if err != nil {
160+
level.Error(options.logger).Log("msg", "failed to marshal modified JSON body, forwarding unmodified", "err", err)
161+
return
162+
}
163+
164+
req.Body = io.NopCloser(bytes.NewBuffer(newBodyBytes))
165+
req.ContentLength = int64(len(newBodyBytes))
166+
req.Header.Set("Content-Length", fmt.Sprint(len(newBodyBytes)))
167+
}
168+
}
169+
}
170+
proxyHandler := http.HandlerFunc(proxy.ServeHTTP)
171+
172+
r.Group(func(r chi.Router) {
173+
r.Use(options.readMiddlewares...)
174+
r.Get("/*", proxyHandler)
175+
})
176+
177+
r.Group(func(r chi.Router) {
178+
r.Use(options.writeMiddlewares...)
179+
r.Post("/*", proxyHandler)
180+
r.Patch("/*", proxyHandler)
181+
r.Delete("/*", proxyHandler)
182+
})
183+
184+
return r, nil
185+
}

main.go

Lines changed: 95 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ import (
5050
logsv1 "github.com/observatorium/api/api/logs/v1"
5151
metricslegacy "github.com/observatorium/api/api/metrics/legacy"
5252
metricsv1 "github.com/observatorium/api/api/metrics/v1"
53+
probesv1 "github.com/observatorium/api/api/probes/v1"
5354
tracesv1 "github.com/observatorium/api/api/traces/v1"
5455
"github.com/observatorium/api/authentication"
5556
"github.com/observatorium/api/authorization"
@@ -102,6 +103,7 @@ type config struct {
102103
metrics metricsConfig
103104
logs logsConfig
104105
traces tracesConfig
106+
probes probesConfig
105107
middleware middlewareConfig
106108
internalTracing internalTracingConfig
107109
}
@@ -194,6 +196,21 @@ type tracesConfig struct {
194196
enableCertWatcher bool
195197
}
196198

199+
type probesConfig struct {
200+
endpoint *url.URL
201+
upstreamWriteTimeout time.Duration
202+
dialTimeout time.Duration
203+
keepAliveTimeout time.Duration
204+
tlsHandshakeTimeout time.Duration
205+
upstreamCAFile string
206+
upstreamCertFile string
207+
upstreamKeyFile string
208+
tenantHeader string
209+
// enable probes if endpoint is provided.
210+
enabled bool
211+
enableCertWatcher bool
212+
}
213+
197214
type middlewareConfig struct {
198215
grpcRateLimiterAddress string
199216
rateLimiterType string
@@ -286,9 +303,9 @@ func main() {
286303

287304
stdlog.Println(version.Info())
288305

289-
if !cfg.metrics.enabled && !cfg.logs.enabled && !cfg.traces.enabled {
290-
stdlog.Fatal("Neither logging, metrics not traces endpoints are enabled. " +
291-
"Specifying at least a logging or a metrics endpoint is mandatory")
306+
if !cfg.metrics.enabled && !cfg.logs.enabled && !cfg.traces.enabled && !cfg.probes.enabled {
307+
stdlog.Fatal("Neither logging, metrics, traces, nor probes endpoints are enabled. " +
308+
"Specifying at least one endpoint is mandatory")
292309
}
293310

294311
logger := logger.NewLogger(cfg.logLevel, cfg.logFormat, cfg.debug.name)
@@ -669,6 +686,51 @@ func main() {
669686
metricslegacy.WithUIMiddleware(authorization.WithAuthorizers(authorizers, rbac.Read, "metrics")),
670687
))
671688

689+
// enable probes if endpoint is provided.
690+
// Since probes are part of the metrics API, we mount them within the metrics route group.
691+
if cfg.probes.enabled {
692+
var loadInterval *time.Duration
693+
if cfg.probes.enableCertWatcher {
694+
loadInterval = &cfg.tls.reloadInterval
695+
}
696+
697+
probesUpstreamClientOptions, err := tls.NewUpstreamOptions(
698+
context.Background(),
699+
cfg.probes.upstreamCertFile,
700+
cfg.probes.upstreamKeyFile,
701+
cfg.probes.upstreamCAFile,
702+
loadInterval,
703+
logger,
704+
g,
705+
)
706+
if err != nil {
707+
stdlog.Fatalf("failed to read upstream probes TLS: %v", err)
708+
}
709+
710+
probesHandler, err := probesv1.NewHandler(
711+
cfg.probes.endpoint,
712+
probesv1.WithLogger(logger),
713+
probesv1.WithUpstreamTLSOptions(probesUpstreamClientOptions),
714+
probesv1.WithTenantHeader(cfg.probes.tenantHeader),
715+
probesv1.WithDialTimeout(cfg.probes.dialTimeout),
716+
probesv1.WithKeepAliveTimeout(cfg.probes.keepAliveTimeout),
717+
probesv1.WithTLSHandshakeTimeout(cfg.probes.tlsHandshakeTimeout),
718+
probesv1.WithReadMiddleware(authentication.WithTenantMiddlewares(pm.Middlewares)),
719+
probesv1.WithReadMiddleware(rateLimitMiddleware),
720+
probesv1.WithReadMiddleware(authorization.WithAuthorizers(authorizers, rbac.Read, "probes")),
721+
probesv1.WithWriteMiddleware(authentication.WithTenantMiddlewares(pm.Middlewares)),
722+
probesv1.WithWriteMiddleware(rateLimitMiddleware),
723+
probesv1.WithWriteMiddleware(authorization.WithAuthorizers(authorizers, rbac.Write, "probes")),
724+
)
725+
if err != nil {
726+
level.Error(logger).Log("msg", "failed to create probes handler", "err", err)
727+
} else {
728+
r.Mount("/api/metrics/v1/{tenant}/probes",
729+
stripTenantPrefix("/api/metrics/v1", probesHandler),
730+
)
731+
}
732+
}
733+
672734
const matchParamName = "match[]"
673735
r.Mount("/api/metrics/v1/{tenant}", metricsv1.NewHandler(
674736
eps,
@@ -1056,6 +1118,7 @@ func parseFlags() (config, error) {
10561118
rawTracesTempoEndpoint string
10571119
rawTracesWriteOTLPGRPCEndpoint string
10581120
rawTracesWriteOTLPHTTPEndpoint string
1121+
rawProbesEndpoint string
10591122
)
10601123

10611124
cfg := config{}
@@ -1167,6 +1230,26 @@ func parseFlags() (config, error) {
11671230
"The name of the HTTP header containing the tenant ID to forward to upstream OpenTelemetry collector.")
11681231
flag.BoolVar(&cfg.traces.queryRBAC, "traces.query-rbac", false,
11691232
"Enables query RBAC. A user will be able to see attributes only from namespaces it has access to. Only the spans with allowed k8s.namespace.name attribute are fully visible.")
1233+
flag.StringVar(&rawProbesEndpoint, "probes.endpoint", "",
1234+
"The endpoint against which to make HTTP requests for probes.")
1235+
flag.DurationVar(&cfg.probes.upstreamWriteTimeout, "probes.write-timeout", writeTimeout,
1236+
"The HTTP write timeout for proxied requests to the probes endpoint. Defaults to the server's write timeout.")
1237+
flag.StringVar(&cfg.probes.upstreamCAFile, "probes.tls.ca-file", "",
1238+
"File containing the TLS CA against which to upstream probes servers. Leave blank to disable TLS.")
1239+
flag.StringVar(&cfg.probes.upstreamCertFile, "probes.tls.cert-file", "",
1240+
"File containing the TLS client certificates to authenticate against upstream probes servers. Leave blank to disable mTLS.")
1241+
flag.StringVar(&cfg.probes.upstreamKeyFile, "probes.tls.key-file", "",
1242+
"File containing the TLS client key to authenticate against upstream probes servers. Leave blank to disable mTLS.")
1243+
flag.BoolVar(&cfg.probes.enableCertWatcher, "probes.tls.watch-certs", false,
1244+
"Watch for certificate changes and reload")
1245+
flag.StringVar(&cfg.probes.tenantHeader, "probes.tenant-header", "X-Tenant",
1246+
"The name of the HTTP header containing the tenant ID to forward to the probes upstream.")
1247+
flag.DurationVar(&cfg.probes.dialTimeout, "probes.dial-timeout", 30*time.Second,
1248+
"The timeout for establishing connections to the probes upstream.")
1249+
flag.DurationVar(&cfg.probes.keepAliveTimeout, "probes.keep-alive-timeout", 30*time.Second,
1250+
"The keep-alive timeout for connections to the probes upstream.")
1251+
flag.DurationVar(&cfg.probes.tlsHandshakeTimeout, "probes.tls-handshake-timeout", 10*time.Second,
1252+
"The TLS handshake timeout for connections to the probes upstream.")
11701253
flag.StringVar(&cfg.tls.serverCertFile, "tls.server.cert-file", "",
11711254
"File containing the default x509 Certificate for HTTPS. Leave blank to disable TLS.")
11721255
flag.StringVar(&cfg.tls.serverKeyFile, "tls.server.key-file", "",
@@ -1378,6 +1461,15 @@ func parseFlags() (config, error) {
13781461
cfg.traces.writeOTLPHTTPEndpoint = tracesOTLPHTTPEndpoint
13791462
}
13801463

1464+
if rawProbesEndpoint != "" {
1465+
cfg.probes.enabled = true
1466+
var err error
1467+
cfg.probes.endpoint, err = url.ParseRequestURI(rawProbesEndpoint)
1468+
if err != nil {
1469+
return cfg, fmt.Errorf("failed to parse probes endpoint: %w", err)
1470+
}
1471+
}
1472+
13811473
if cfg.traces.enabled && cfg.server.grpcListen == "" {
13821474
return cfg, fmt.Errorf("-traces.write.endpoint is set to %q but -grpc.listen is not set", cfg.traces.writeOTLPGRPCEndpoint)
13831475
}

test/config/rbac.yaml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ roles:
44
- metrics
55
- logs
66
- traces
7+
- probes
78
tenants:
89
- test-oidc
910
permissions:
@@ -36,6 +37,14 @@ roles:
3637
permissions:
3738
- read
3839
- write
40+
- name: read-write-probes
41+
resources:
42+
- probes
43+
tenants:
44+
- test-probes
45+
permissions:
46+
- read
47+
- write
3948
roleBindings:
4049
- name: test-oidc
4150
roles:
@@ -56,3 +65,9 @@ roleBindings:
5665
subjects:
5766
- name: test
5867
kind: group
68+
- name: test-probes
69+
roles:
70+
- read-write-probes
71+
subjects:
72+
73+
kind: user

0 commit comments

Comments
 (0)