Skip to content

Commit 2de21f9

Browse files
Pluggable BBR framework (WIP) -- iteration 2
1 parent 4d7f6d5 commit 2de21f9

File tree

6 files changed

+215
-58
lines changed

6 files changed

+215
-58
lines changed

cmd/bbr/runner/runner.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"sigs.k8s.io/gateway-api-inference-extension/internal/runnable"
3535
"sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/metrics"
3636
runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/server"
37+
bbrutils "sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/utils"
3738
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
3839
)
3940

@@ -102,8 +103,20 @@ func (r *Runner) Run(ctx context.Context) error {
102103
return err
103104
}
104105

106+
//Initialize PluginRegistry and request/response PluginsChain instances
107+
registry, requestChain, responseChain, metaDataKeys, err := bbrutils.InitPlugins()
108+
if err != nil {
109+
setupLog.Error(err, "Failed to initialize plugins")
110+
return err
111+
}
112+
105113
// Setup runner.
106-
serverRunner := runserver.NewDefaultExtProcServerRunner(*grpcPort, *streaming)
114+
serverRunner := runserver.NewDefaultExtProcServerRunner(*grpcPort,
115+
*streaming,
116+
registry,
117+
requestChain,
118+
responseChain,
119+
metaDataKeys)
107120

108121
// Register health server.
109122
if err := registerHealthServer(mgr, *grpcHealthPort); err != nil {

pkg/bbr/handlers/request.go

Lines changed: 66 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -18,54 +18,49 @@ package handlers
1818

1919
import (
2020
"context"
21-
"encoding/json"
21+
"strings"
2222

2323
basepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
2424
eppb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
2525
"sigs.k8s.io/controller-runtime/pkg/log"
2626

2727
"sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/metrics"
28+
bbrplugins "sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/plugins"
29+
helpers "sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/utils"
2830
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
2931
)
3032

31-
const modelHeader = "X-Gateway-Model-Name"
32-
33-
type RequestBody struct {
34-
Model string `json:"model"`
35-
}
36-
3733
// HandleRequestBody handles request bodies.
3834
func (s *Server) HandleRequestBody(ctx context.Context, requestBodyBytes []byte) ([]*eppb.ProcessingResponse, error) {
3935
logger := log.FromContext(ctx)
4036
var ret []*eppb.ProcessingResponse
4137

42-
var requestBody RequestBody
43-
if err := json.Unmarshal(requestBodyBytes, &requestBody); err != nil {
44-
metrics.RecordModelNotParsedCounter()
45-
return nil, err
38+
allHeaders, mutatedBodyBytes, err := s.requestChain.Run(requestBodyBytes, s.metaDataKeys, s.registry)
39+
40+
if err != nil {
41+
//TODO: add metric in metrics.go to count "other errors"
42+
logger.V(logutil.DEFAULT).Info("error processing body", "error", err)
43+
ret, _ := buildEmptyResponsesForMissingModel(s.streaming, requestBodyBytes)
44+
return ret, nil
45+
}
46+
47+
model, ok := allHeaders[bbrplugins.ModelHeader]
48+
49+
if !ok {
50+
//TODO: add metric in metrics.go to count "other errors"
51+
logger.V(logutil.DEFAULT).Info("manadatory header X-Gateway-Model-Name value is undetermined")
52+
ret, _ := buildEmptyResponsesForMissingModel(s.streaming, requestBodyBytes)
53+
return ret, nil
4654
}
4755

48-
if requestBody.Model == "" {
56+
if strings.TrimSpace(model) == "" {
4957
metrics.RecordModelNotInBodyCounter()
50-
logger.V(logutil.DEFAULT).Info("Request body does not contain model parameter")
51-
if s.streaming {
52-
ret = append(ret, &eppb.ProcessingResponse{
53-
Response: &eppb.ProcessingResponse_RequestHeaders{
54-
RequestHeaders: &eppb.HeadersResponse{},
55-
},
56-
})
57-
ret = addStreamedBodyResponse(ret, requestBodyBytes)
58-
return ret, nil
59-
} else {
60-
ret = append(ret, &eppb.ProcessingResponse{
61-
Response: &eppb.ProcessingResponse_RequestBody{
62-
RequestBody: &eppb.BodyResponse{},
63-
},
64-
})
65-
}
58+
ret, _ := buildEmptyResponsesForMissingModel(s.streaming, requestBodyBytes)
6659
return ret, nil
6760
}
6861

62+
logger.V(logutil.DEFAULT).Info("model extracted from request body", "model", model)
63+
6964
metrics.RecordSuccessCounter()
7065

7166
if s.streaming {
@@ -78,8 +73,8 @@ func (s *Server) HandleRequestBody(ctx context.Context, requestBodyBytes []byte)
7873
SetHeaders: []*basepb.HeaderValueOption{
7974
{
8075
Header: &basepb.HeaderValue{
81-
Key: modelHeader,
82-
RawValue: []byte(requestBody.Model),
76+
Key: bbrplugins.ModelHeader,
77+
RawValue: []byte(model),
8378
},
8479
},
8580
},
@@ -88,7 +83,10 @@ func (s *Server) HandleRequestBody(ctx context.Context, requestBodyBytes []byte)
8883
},
8984
},
9085
})
91-
ret = addStreamedBodyResponse(ret, requestBodyBytes)
86+
ret = addStreamedBodyResponse(ret, mutatedBodyBytes)
87+
88+
logger.V(logutil.DEFAULT).Info("RESPONSE", "response", helpers.PrettyPrintResponses(ret))
89+
9290
return ret, nil
9391
}
9492

@@ -103,28 +101,33 @@ func (s *Server) HandleRequestBody(ctx context.Context, requestBodyBytes []byte)
103101
SetHeaders: []*basepb.HeaderValueOption{
104102
{
105103
Header: &basepb.HeaderValue{
106-
Key: modelHeader,
107-
RawValue: []byte(requestBody.Model),
104+
Key: bbrplugins.ModelHeader,
105+
RawValue: []byte(model),
108106
},
109107
},
110108
},
111109
},
110+
BodyMutation: &eppb.BodyMutation{
111+
Mutation: &eppb.BodyMutation_Body{
112+
Body: mutatedBodyBytes,
113+
},
114+
},
112115
},
113116
},
114117
},
115118
},
116119
}, nil
117120
}
118121

119-
func addStreamedBodyResponse(responses []*eppb.ProcessingResponse, requestBodyBytes []byte) []*eppb.ProcessingResponse {
122+
func addStreamedBodyResponse(responses []*eppb.ProcessingResponse, mutatedBodyBytes []byte) []*eppb.ProcessingResponse {
120123
return append(responses, &eppb.ProcessingResponse{
121124
Response: &eppb.ProcessingResponse_RequestBody{
122125
RequestBody: &eppb.BodyResponse{
123126
Response: &eppb.CommonResponse{
124127
BodyMutation: &eppb.BodyMutation{
125128
Mutation: &eppb.BodyMutation_StreamedResponse{
126129
StreamedResponse: &eppb.StreamedBodyResponse{
127-
Body: requestBodyBytes,
130+
Body: mutatedBodyBytes,
128131
EndOfStream: true,
129132
},
130133
},
@@ -156,3 +159,31 @@ func (s *Server) HandleRequestTrailers(trailers *eppb.HttpTrailers) ([]*eppb.Pro
156159
},
157160
}, nil
158161
}
162+
163+
// buildEmptyResponsesForMissingModel is a local helper that returns the appropriate empty responses
164+
// for the "model not found" branch depending on streaming mode.
165+
// It is also used to create empty responses in case of other errors related to running plugins on the body
166+
// This is not very clean and MUST be segregated in the future.
167+
// Corresponding metrics should be defined to make different errors observable
168+
func buildEmptyResponsesForMissingModel(streaming bool, requestBodyBytes []byte) ([]*eppb.ProcessingResponse, error) {
169+
var ret []*eppb.ProcessingResponse
170+
171+
if streaming {
172+
// Emit empty headers response, then stream body unchanged.
173+
ret = append(ret, &eppb.ProcessingResponse{
174+
Response: &eppb.ProcessingResponse_RequestHeaders{
175+
RequestHeaders: &eppb.HeadersResponse{},
176+
},
177+
})
178+
ret = addStreamedBodyResponse(ret, requestBodyBytes)
179+
return ret, nil
180+
}
181+
182+
// Non-streaming: emit empty body response.
183+
ret = append(ret, &eppb.ProcessingResponse{
184+
Response: &eppb.ProcessingResponse_RequestBody{
185+
RequestBody: &eppb.BodyResponse{},
186+
},
187+
})
188+
return ret, nil
189+
}

pkg/bbr/handlers/server.go

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,18 +27,34 @@ import (
2727
"google.golang.org/grpc/status"
2828
"sigs.k8s.io/controller-runtime/pkg/log"
2929

30+
"sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/framework"
3031
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
3132
requtil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/request"
3233
)
3334

34-
func NewServer(streaming bool) *Server {
35-
return &Server{streaming: streaming}
35+
func NewServer(streaming bool,
36+
reg framework.PluginRegistry,
37+
reqChain framework.PluginsChain,
38+
respChain framework.PluginsChain,
39+
metaDataKeys []string) *Server {
40+
return &Server{streaming: streaming,
41+
registry: reg,
42+
requestChain: reqChain,
43+
responseChain: respChain,
44+
endpoint: "", //will be known upon receiving a request
45+
metaDataKeys: metaDataKeys,
46+
}
3647
}
3748

3849
// Server implements the Envoy external processing server.
3950
// https://www.envoyproxy.io/docs/envoy/latest/api-v3/service/ext_proc/v3/external_processor.proto
4051
type Server struct {
41-
streaming bool
52+
streaming bool
53+
registry framework.PluginRegistry
54+
requestChain framework.PluginsChain
55+
responseChain framework.PluginsChain
56+
endpoint string //holds endpoint path of an incoming openai request (extracted from an envoy :path pseudo-header)
57+
metaDataKeys []string
4258
}
4359

4460
func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {

pkg/bbr/handlers/server_test.go

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,8 @@ import (
2626
"google.golang.org/protobuf/testing/protocmp"
2727
"sigs.k8s.io/controller-runtime/pkg/log"
2828

29-
<<<<<<< HEAD
30-
=======
3129
bbrplugins "sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/plugins"
3230
utils "sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/utils"
33-
>>>>>>> 92cdb00 (Pluggable BBR framework (WiP))
3431
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
3532
)
3633

@@ -68,7 +65,7 @@ func TestProcessRequestBody(t *testing.T) {
6865
SetHeaders: []*basepb.HeaderValueOption{
6966
{
7067
Header: &basepb.HeaderValue{
71-
Key: modelHeader,
68+
Key: bbrplugins.ModelHeader,
7269
RawValue: []byte("foo"),
7370
},
7471
},
@@ -103,7 +100,7 @@ func TestProcessRequestBody(t *testing.T) {
103100
SetHeaders: []*basepb.HeaderValueOption{
104101
{
105102
Header: &basepb.HeaderValue{
106-
Key: modelHeader,
103+
Key: bbrplugins.ModelHeader,
107104
RawValue: []byte("foo"),
108105
},
109106
},
@@ -135,11 +132,6 @@ func TestProcessRequestBody(t *testing.T) {
135132
},
136133
}
137134

138-
<<<<<<< HEAD
139-
for _, tc := range cases {
140-
t.Run(tc.desc, func(t *testing.T) {
141-
srv := NewServer(tc.streaming)
142-
=======
143135
//Initialize PluginRegistry and request/response PluginsChain instances based on the minimal configuration setting vi env vars
144136
registry, requestChain, responseChain, metaDataKeys, err := utils.InitPlugins()
145137
if err != nil {
@@ -149,7 +141,6 @@ func TestProcessRequestBody(t *testing.T) {
149141
for _, tc := range cases {
150142
t.Run(tc.desc, func(t *testing.T) {
151143
srv := NewServer(tc.streaming, registry, requestChain, responseChain, metaDataKeys)
152-
>>>>>>> 92cdb00 (Pluggable BBR framework (WiP))
153144
streamedBody := &streamedBody{}
154145
for i, body := range tc.bodys {
155146
got, err := srv.processRequestBody(context.Background(), body, streamedBody, log.FromContext(ctx))

pkg/bbr/server/runserver.go

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,21 +29,36 @@ import (
2929

3030
"sigs.k8s.io/gateway-api-inference-extension/internal/runnable"
3131
tlsutil "sigs.k8s.io/gateway-api-inference-extension/internal/tls"
32+
"sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/framework"
3233
"sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/handlers"
3334
)
3435

3536
// ExtProcServerRunner provides methods to manage an external process server.
3637
type ExtProcServerRunner struct {
37-
GrpcPort int
38-
SecureServing bool
39-
Streaming bool
38+
GrpcPort int
39+
SecureServing bool
40+
Streaming bool
41+
registry framework.PluginRegistry
42+
requestPluginsChain framework.PluginsChain
43+
responsePluginsChain framework.PluginsChain
44+
metaDataKeys []string
4045
}
4146

42-
func NewDefaultExtProcServerRunner(port int, streaming bool) *ExtProcServerRunner {
47+
func NewDefaultExtProcServerRunner(
48+
port int,
49+
streaming bool,
50+
r framework.PluginRegistry,
51+
reqChain framework.PluginsChain,
52+
respChain framework.PluginsChain,
53+
metaDataKeys []string) *ExtProcServerRunner {
4354
return &ExtProcServerRunner{
44-
GrpcPort: port,
45-
SecureServing: true,
46-
Streaming: streaming,
55+
GrpcPort: port,
56+
SecureServing: true,
57+
Streaming: streaming,
58+
registry: r,
59+
requestPluginsChain: reqChain,
60+
responsePluginsChain: respChain,
61+
metaDataKeys: metaDataKeys,
4762
}
4863
}
4964

@@ -65,7 +80,7 @@ func (r *ExtProcServerRunner) AsRunnable(logger logr.Logger) manager.Runnable {
6580

6681
extProcPb.RegisterExternalProcessorServer(
6782
srv,
68-
handlers.NewServer(r.Streaming),
83+
handlers.NewServer(r.Streaming, r.registry, r.requestPluginsChain, r.responsePluginsChain, r.metaDataKeys),
6984
)
7085

7186
// Forward to the gRPC runnable.

0 commit comments

Comments
 (0)