Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.
// Copyright 2025 Datadog, Inc.

package main

import (
"net"
"strconv"

gocontrolplane "github.com/DataDog/dd-trace-go/contrib/envoyproxy/go-control-plane/v2"
"github.com/DataDog/dd-trace-go/v2/instrumentation"
"github.com/DataDog/dd-trace-go/v2/instrumentation/env"
)

Expand All @@ -17,13 +19,16 @@ import (
func intEnv(key string, def int) int {
vv, ok := env.Lookup(key)
if !ok {
gocontrolplane.Instrumentation().TelemetryRegisterAppConfig(key, def, instrumentation.TelemetryOriginDefault)
return def
}
v, err := strconv.Atoi(vv)
if err != nil {
log.Warn("Non-integer value for env var %s, defaulting to %d. Parse failed with error: %v", key, def, err)
gocontrolplane.Instrumentation().TelemetryRegisterAppConfig(key, def, instrumentation.TelemetryOriginDefault)
return def
}
gocontrolplane.Instrumentation().TelemetryRegisterAppConfig(key, def, instrumentation.TelemetryOriginEnvVar)
return v
}

Expand All @@ -39,22 +44,25 @@ func intEnvNil(key string) *int {
log.Warn("Non-integer value for env var %s. Parse failed with error: %v", key, err)
return nil
}
gocontrolplane.Instrumentation().TelemetryRegisterAppConfig(key, &v, instrumentation.TelemetryOriginEnvVar)
return &v
}

// IpEnv returns the valid IP value of an environment variable, or def otherwise.
func ipEnv(key string, def net.IP) net.IP {
vv, ok := env.Lookup(key)
if !ok {
gocontrolplane.Instrumentation().TelemetryRegisterAppConfig(key, def.String(), instrumentation.TelemetryOriginDefault)
return def
}

ip := net.ParseIP(vv)
if ip == nil {
log.Warn("Non-IP value for env var %s, defaulting to %s", key, def.String())
gocontrolplane.Instrumentation().TelemetryRegisterAppConfig(key, def.String(), instrumentation.TelemetryOriginDefault)
return def
}

gocontrolplane.Instrumentation().TelemetryRegisterAppConfig(key, vv, instrumentation.TelemetryOriginEnvVar)
return ip
}

Expand All @@ -63,13 +71,16 @@ func ipEnv(key string, def net.IP) net.IP {
func boolEnv(key string, def bool) bool {
vv, ok := env.Lookup(key)
if !ok {
gocontrolplane.Instrumentation().TelemetryRegisterAppConfig(key, def, instrumentation.TelemetryOriginDefault)
return def
}
v, err := strconv.ParseBool(vv)
if err != nil {
log.Warn("Non-boolean value for env var %s, defaulting to %t. Parse failed with error: %v", key, def, err)
gocontrolplane.Instrumentation().TelemetryRegisterAppConfig(key, def, instrumentation.TelemetryOriginDefault)
return def
}
gocontrolplane.Instrumentation().TelemetryRegisterAppConfig(key, v, instrumentation.TelemetryOriginEnvVar)
return v
}

Expand All @@ -78,7 +89,9 @@ func boolEnv(key string, def bool) bool {
func stringEnv(key, def string) string {
v, ok := env.Lookup(key)
if !ok {
gocontrolplane.Instrumentation().TelemetryRegisterAppConfig(key, v, instrumentation.TelemetryOriginDefault)
return def
}
gocontrolplane.Instrumentation().TelemetryRegisterAppConfig(key, v, instrumentation.TelemetryOriginEnvVar)
return v
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,16 @@ func getDefaultEnvVars() map[string]string {
// initializeEnvironment sets up required environment variables with their defaults
func initializeEnvironment() {
for k, v := range getDefaultEnvVars() {
if env.Get(k) == "" {
setValue := env.Get(k)
if setValue == "" {
if err := os.Setenv(k, v); err != nil {
log.Error("service_extension: failed to set %s environment variable: %s\n", k, err.Error())
continue
}
gocontrolplane.Instrumentation().TelemetryRegisterAppConfig(k, v, instrumentation.TelemetryOriginDefault)
continue
}
gocontrolplane.Instrumentation().TelemetryRegisterAppConfig(k, setValue, instrumentation.TelemetryOriginEnvVar)
}
}

Expand Down
5 changes: 5 additions & 0 deletions contrib/envoyproxy/go-control-plane/envoy.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ func init() {
instr = instrumentation.Load(instrumentation.PackageEnvoyProxyGoControlPlane)
}

// Instrumentation returns the instrumentation.Instrumentation package instrumentation
func Instrumentation() *instrumentation.Instrumentation {
return instr
}

// Integration represents the proxy integration type that is used for the External Processing.
type Integration int

Expand Down
16 changes: 12 additions & 4 deletions contrib/haproxy/stream-processing-offload/cmd/spoa/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,37 +7,45 @@ package main

import (
"net"
"os"
"strconv"

streamprocessingoffload "github.com/DataDog/dd-trace-go/contrib/haproxy/stream-processing-offload/v2"
"github.com/DataDog/dd-trace-go/v2/instrumentation"
"github.com/DataDog/dd-trace-go/v2/instrumentation/env"
)

// IntEnv returns the parsed int value of an environment variable, or
// def otherwise.
func intEnv(key string, def int) int {
vv, ok := os.LookupEnv(key)
vv, ok := env.Lookup(key)
if !ok {
streamprocessingoffload.Instrumentation().TelemetryRegisterAppConfig(key, def, instrumentation.TelemetryOriginDefault)
return def
}
v, err := strconv.Atoi(vv)
if err != nil {
log.Warn("Non-integer value for env var %s, defaulting to %d. Parse failed with error: %v", key, def, err)
streamprocessingoffload.Instrumentation().TelemetryRegisterAppConfig(key, def, instrumentation.TelemetryOriginDefault)
return def
}
streamprocessingoffload.Instrumentation().TelemetryRegisterAppConfig(key, def, instrumentation.TelemetryOriginEnvVar)
return v
}

// IpEnv returns the valid IP value of an environment variable, or def otherwise.
func ipEnv(key string, def net.IP) net.IP {
vv, ok := os.LookupEnv(key)
vv, ok := env.Lookup(key)
if !ok {
streamprocessingoffload.Instrumentation().TelemetryRegisterAppConfig(key, def.String(), instrumentation.TelemetryOriginDefault)
return def
}

ip := net.ParseIP(vv)
if ip == nil {
log.Warn("Non-IP value for env var %s, defaulting to %s", key, def.String())
streamprocessingoffload.Instrumentation().TelemetryRegisterAppConfig(key, def.String(), instrumentation.TelemetryOriginDefault)
return def
}

streamprocessingoffload.Instrumentation().TelemetryRegisterAppConfig(key, vv, instrumentation.TelemetryOriginEnvVar)
return ip
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type Logger struct {

// NewLogger creates a new Logger instance
func NewLogger() *Logger {
return &Logger{streamprocessingoffload.Logger()}
return &Logger{streamprocessingoffload.Instrumentation().Logger()}
}

func (l Logger) Errorf(format string, args ...interface{}) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"syscall"
"time"

"github.com/DataDog/dd-trace-go/v2/instrumentation/env"
"github.com/negasus/haproxy-spoe-go/agent"

"github.com/DataDog/dd-trace-go/contrib/haproxy/stream-processing-offload/v2"
Expand Down Expand Up @@ -47,11 +48,16 @@ func getDefaultEnvVars() map[string]string {
// initializeEnvironment sets up required environment variables with their defaults
func initializeEnvironment() {
for k, v := range getDefaultEnvVars() {
if os.Getenv(k) == "" {
setValue := env.Get(k)
if setValue == "" {
if err := os.Setenv(k, v); err != nil {
log.Error("haproxy_spoa: failed to set %s environment variable: %s\n", k, err.Error())
continue
}
streamprocessingoffload.Instrumentation().TelemetryRegisterAppConfig(k, v, instrumentation.TelemetryOriginDefault)
continue
}
streamprocessingoffload.Instrumentation().TelemetryRegisterAppConfig(k, setValue, instrumentation.TelemetryOriginEnvVar)
}
}

Expand Down
6 changes: 3 additions & 3 deletions contrib/haproxy/stream-processing-offload/haproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ func init() {
instr = instrumentation.Load(instrumentation.PackageHAProxyStreamProcessingOffload)
}

// Logger returns the integration logger for the HAProxy Stream Processing Offload package
func Logger() instrumentation.Logger {
return instr.Logger()
// Instrumentation returns the instrumentation.Instrumentation package instrumentation
func Instrumentation() *instrumentation.Instrumentation {
return instr
}

// HAProxySPOA defines the AppSec HAProxy Stream Processing Offload Agent
Expand Down
10 changes: 5 additions & 5 deletions instrumentation/appsec/proxy/message_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func (mp *Processor) OnRequestHeaders(ctx context.Context, req RequestHeaders) (
if bodyLimit <= 0 {
mp.instr.Logger().Info("external_processing: body parsing size limit set to 0 or negative. The request and response bodies will NOT be analyzed.")
}
RegisterConfig(mp)
mp.instr.Logger().Info("external_processing: first request received. Configuration: BlockingUnavailable=%v, BodyParsingSizeLimit=%dB, Framework=%s", mp.BlockingUnavailable, mp.computedBodyParsingSizeLimit.Load(), mp.Framework)
})

Expand Down Expand Up @@ -109,7 +110,6 @@ func (mp *Processor) OnRequestHeaders(ctx context.Context, req RequestHeaders) (

if !req.GetEndOfStream() && mp.isBodySupported(httpRequest.Header.Get("Content-Type")) {
reqState.State = MessageTypeRequestBody
// Todo: Set telemetry body size (using content-length)
}

if err := mp.ContinueMessageFunc(reqState.Context, ContinueActionOptions{
Expand Down Expand Up @@ -143,7 +143,7 @@ func (mp *Processor) OnRequestBody(req HTTPBody, reqState *RequestState) error {
return mp.ContinueMessageFunc(reqState.Context, ContinueActionOptions{MessageType: MessageTypeRequestBody})
}

blocked := processBody(reqState.Context, reqState.requestBuffer, req.GetBody(), req.GetEndOfStream(), appsec.MonitorParsedHTTPBody)
blocked := processBody(reqState.Context, reqState.requestBuffer, req.GetBody(), req.GetEndOfStream(), appsec.MonitorParsedHTTPBody, "request")
if blocked != nil && !mp.BlockingUnavailable {
mp.instr.Logger().Debug("external_processing: request blocked, end the stream")
actionOpts := reqState.BlockAction()
Expand Down Expand Up @@ -187,7 +187,6 @@ func (mp *Processor) OnResponseHeaders(res ResponseHeaders, reqState *RequestSta
}
}

// TODO: Set telemetry body size (using content-length)
reqState.State = MessageTypeResponseBody

// Run the waf on the response headers only when we are sure to not receive a response body
Expand Down Expand Up @@ -226,7 +225,7 @@ func (mp *Processor) OnResponseBody(resp HTTPBody, reqState *RequestState) error
return io.EOF
}

blocked := processBody(reqState.Context, reqState.responseBuffer, resp.GetBody(), resp.GetEndOfStream(), appsec.MonitorHTTPResponseBody)
blocked := processBody(reqState.Context, reqState.responseBuffer, resp.GetBody(), resp.GetEndOfStream(), appsec.MonitorHTTPResponseBody, "response")
if reqState.responseBuffer.analyzed {
reqState.Close() // Call Close to ensure the response headers are analyzed

Expand Down Expand Up @@ -260,14 +259,15 @@ func (mp *Processor) OnResponseTrailers(reqState *RequestState) error {
return mp.ContinueMessageFunc(reqState.Context, ContinueActionOptions{MessageType: MessageTypeResponseTrailers})
}

func processBody(ctx context.Context, bodyBuffer *bodyBuffer, body []byte, eos bool, analyzeBody func(ctx context.Context, encodable any) error) error {
func processBody(ctx context.Context, bodyBuffer *bodyBuffer, body []byte, eos bool, analyzeBody func(ctx context.Context, encodable any) error, direction string) error {
if bodyBuffer.analyzed {
return nil
}

bodyBuffer.append(body)

if eos || bodyBuffer.truncated {
EmitBodySize(len(bodyBuffer.buffer), direction, bodyBuffer.truncated)
bodyBuffer.analyzed = true
return analyzeBody(ctx, json.NewEncodableFromData(bodyBuffer.buffer, bodyBuffer.truncated))
}
Expand Down
17 changes: 17 additions & 0 deletions instrumentation/appsec/proxy/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ package proxy

import (
"context"
"strconv"
"sync/atomic"
"time"

"github.com/DataDog/dd-trace-go/v2/instrumentation"
"github.com/DataDog/dd-trace-go/v2/internal/telemetry"
)

type metrics struct {
Expand Down Expand Up @@ -44,3 +46,18 @@ func newMetricsReporter(ctx context.Context, logger instrumentation.Logger) *met
func (m *metrics) incrementRequestCount() {
m.requestCounter.Add(1)
}

func EmitBodySize(bodySize int, direction string, truncated bool) {
telemetry.Distribution(telemetry.NamespaceAppSec, "instrum.body_size", []string{
"direction:" + direction,
"truncated:" + strconv.FormatBool(truncated),
}).Submit(float64(bodySize))
}

func RegisterConfig(mp *Processor) {
telemetry.RegisterAppConfigs(
telemetry.Configuration{Name: "appsec.proxy.blockingUnavailable", Value: mp.BlockingUnavailable},
telemetry.Configuration{Name: "appsec.proxy.bodyParsingSizeLimit", Value: mp.computedBodyParsingSizeLimit.Load()},
telemetry.Configuration{Name: "appsec.proxy.framework", Value: mp.Framework},
)
}
Loading
Loading