Skip to content

Commit a3225ba

Browse files
committed
add instrum.body_size metric
1 parent 3d45ade commit a3225ba

File tree

2 files changed

+241
-221
lines changed

2 files changed

+241
-221
lines changed

instrumentation/appsec/proxy/message_processor.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"errors"
1111
"fmt"
1212
"io"
13+
"strconv"
1314
"sync"
1415

1516
"github.com/DataDog/dd-trace-go/v2/appsec"
@@ -19,6 +20,7 @@ import (
1920
"github.com/DataDog/dd-trace-go/v2/instrumentation/httptrace"
2021
"github.com/DataDog/dd-trace-go/v2/internal/appsec/body"
2122
"github.com/DataDog/dd-trace-go/v2/internal/appsec/body/json"
23+
"github.com/DataDog/dd-trace-go/v2/internal/telemetry"
2224
)
2325

2426
// Processor is a state machine that handles incoming HTTP request and response is a streaming manner
@@ -100,7 +102,6 @@ func (mp *Processor) OnRequestHeaders(ctx context.Context, req RequestHeaders) (
100102

101103
if !req.GetEndOfStream() && mp.isBodySupported(httpRequest.Header.Get("Content-Type")) {
102104
reqState.State = MessageTypeRequestBody
103-
// Todo: Set telemetry body size (using content-length)
104105
}
105106

106107
if err := mp.ContinueMessageFunc(reqState.Context, ContinueActionOptions{
@@ -134,7 +135,7 @@ func (mp *Processor) OnRequestBody(req HTTPBody, reqState *RequestState) error {
134135
return mp.ContinueMessageFunc(reqState.Context, ContinueActionOptions{MessageType: MessageTypeRequestBody})
135136
}
136137

137-
blocked := processBody(reqState.Context, reqState.requestBuffer, req.GetBody(), req.GetEndOfStream(), appsec.MonitorParsedHTTPBody)
138+
blocked := processBody(reqState.Context, reqState.requestBuffer, req.GetBody(), req.GetEndOfStream(), appsec.MonitorParsedHTTPBody, "request")
138139
if blocked != nil && !mp.BlockingUnavailable {
139140
mp.instr.Logger().Debug("external_processing: request blocked, end the stream")
140141
actionOpts := reqState.BlockAction()
@@ -178,7 +179,6 @@ func (mp *Processor) OnResponseHeaders(res ResponseHeaders, reqState *RequestSta
178179
}
179180
}
180181

181-
// TODO: Set telemetry body size (using content-length)
182182
reqState.State = MessageTypeResponseBody
183183

184184
// Run the waf on the response headers only when we are sure to not receive a response body
@@ -217,7 +217,7 @@ func (mp *Processor) OnResponseBody(resp HTTPBody, reqState *RequestState) error
217217
return io.EOF
218218
}
219219

220-
blocked := processBody(reqState.Context, reqState.responseBuffer, resp.GetBody(), resp.GetEndOfStream(), appsec.MonitorHTTPResponseBody)
220+
blocked := processBody(reqState.Context, reqState.responseBuffer, resp.GetBody(), resp.GetEndOfStream(), appsec.MonitorHTTPResponseBody, "response")
221221
if reqState.responseBuffer.analyzed {
222222
reqState.Close() // Call Close to ensure the response headers are analyzed
223223

@@ -251,14 +251,19 @@ func (mp *Processor) OnResponseTrailers(reqState *RequestState) error {
251251
return mp.ContinueMessageFunc(reqState.Context, ContinueActionOptions{MessageType: MessageTypeResponseTrailers})
252252
}
253253

254-
func processBody(ctx context.Context, bodyBuffer *bodyBuffer, body []byte, eos bool, analyzeBody func(ctx context.Context, encodable any) error) error {
254+
func processBody(ctx context.Context, bodyBuffer *bodyBuffer, body []byte, eos bool, analyzeBody func(ctx context.Context, encodable any) error, direction string) error {
255255
if bodyBuffer.analyzed {
256256
return nil
257257
}
258258

259259
bodyBuffer.append(body)
260260

261261
if eos || bodyBuffer.truncated {
262+
telemetry.Distribution(telemetry.NamespaceAppSec, "instrum.body_size", []string{
263+
"direction:" + direction,
264+
"truncated:" + strconv.FormatBool(bodyBuffer.truncated),
265+
}).Submit(float64(len(bodyBuffer.buffer)))
266+
262267
bodyBuffer.analyzed = true
263268
return analyzeBody(ctx, json.NewEncodableFromData(bodyBuffer.buffer, bodyBuffer.truncated))
264269
}

0 commit comments

Comments
 (0)