Skip to content

Commit

Permalink
remove the flag to enable/disable processors
Browse files Browse the repository at this point in the history
Signed-off-by: Ignasi Barrera <[email protected]>
  • Loading branch information
nacx committed Feb 12, 2025
1 parent fe391f0 commit 125b6bc
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 39 deletions.
27 changes: 2 additions & 25 deletions cmd/extproc/mainlib/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,6 @@ type extProcFlags struct {
configPath string // path to the configuration file
extProcAddr string // gRPC address for the external processor
logLevel slog.Level // log level for the external processor
processors []string // list of processors to enable
}

// builtinProcessors is the map of built-in processors that can be enabled.
var builtinProcessors = map[string]struct {
path string
factory extproc.ProcessorFactory
}{
"chatcompletions": {"/v1/chat/completions", extproc.NewChatCompletionProcessor},
"models": {"/v1/models", extproc.NewModelsProcessor},
}

// parseAndValidateFlags parses and validates the flas passed to the external processor.
Expand All @@ -63,11 +53,6 @@ func parseAndValidateFlags(args []string) (extProcFlags, error) {
"info",
"log level for the external processor. One of 'debug', 'info', 'warn', or 'error'.",
)
processorsPtr := fs.String(
"processors",
"chatcompletions,models",
"comma-separated list of processors to enable. Available processors: chatcompletions, models",
)

if err := fs.Parse(args); err != nil {
return extProcFlags{}, fmt.Errorf("failed to parse extProcFlags: %w", err)
Expand All @@ -80,13 +65,6 @@ func parseAndValidateFlags(args []string) (extProcFlags, error) {
errs = append(errs, fmt.Errorf("failed to unmarshal log level: %w", err))
}

flags.processors = strings.Split(*processorsPtr, ",")
for _, p := range flags.processors {
if _, ok := builtinProcessors[p]; !ok {
errs = append(errs, fmt.Errorf("invalid processor: %s", p))
}
}

return flags, errors.Join(errs...)
}

Expand Down Expand Up @@ -123,9 +101,8 @@ func Main() {
if err != nil {
log.Fatalf("failed to create external processor server: %v", err)
}
for _, p := range flags.processors {
server.Register(builtinProcessors[p].path, builtinProcessors[p].factory)
}
server.Register("/v1/chat/completions", extproc.NewChatCompletionProcessor)
server.Register("/v1/models", extproc.NewModelsProcessor)

if err := extproc.StartConfigWatcher(ctx, flags.configPath, server, l, time.Second*5); err != nil {
log.Fatalf("failed to start config watcher: %v", err)
Expand Down
15 changes: 2 additions & 13 deletions cmd/extproc/mainlib/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,68 +16,59 @@ func Test_parseAndValidateFlags(t *testing.T) {
configPath string
addr string
logLevel slog.Level
processors []string
}{
{
name: "minimal extProcFlags",
args: []string{"-configPath", "/path/to/config.yaml"},
configPath: "/path/to/config.yaml",
addr: ":1063",
logLevel: slog.LevelInfo,
processors: []string{"chatcompletions", "models"},
},
{
name: "custom addr",
args: []string{"-configPath", "/path/to/config.yaml", "-extProcAddr", "unix:///tmp/ext_proc.sock"},
configPath: "/path/to/config.yaml",
addr: "unix:///tmp/ext_proc.sock",
logLevel: slog.LevelInfo,
processors: []string{"chatcompletions", "models"},
},
{
name: "log level debug",
args: []string{"-configPath", "/path/to/config.yaml", "-logLevel", "debug"},
configPath: "/path/to/config.yaml",
addr: ":1063",
logLevel: slog.LevelDebug,
processors: []string{"chatcompletions", "models"},
},
{
name: "log level warn",
args: []string{"-configPath", "/path/to/config.yaml", "-logLevel", "warn"},
configPath: "/path/to/config.yaml",
addr: ":1063",
logLevel: slog.LevelWarn,
processors: []string{"chatcompletions", "models"},
},
{
name: "log level error",
args: []string{"-configPath", "/path/to/config.yaml", "-logLevel", "error"},
configPath: "/path/to/config.yaml",
addr: ":1063",
logLevel: slog.LevelError,
processors: []string{"chatcompletions", "models"},
},
{
name: "custom processors",
args: []string{"-configPath", "/path/to/config.yaml", "-processors", "chatcompletions"},
configPath: "/path/to/config.yaml",
addr: ":1063",
logLevel: slog.LevelInfo,
processors: []string{"chatcompletions"},
},
{
name: "all extProcFlags",
args: []string{
"-configPath", "/path/to/config.yaml",
"-extProcAddr", "unix:///tmp/ext_proc.sock",
"-logLevel", "debug",
"-processors", "models",
},
configPath: "/path/to/config.yaml",
addr: "unix:///tmp/ext_proc.sock",
logLevel: slog.LevelDebug,
processors: []string{"models"},
},
} {
t.Run(tc.name, func(t *testing.T) {
Expand All @@ -86,16 +77,14 @@ func Test_parseAndValidateFlags(t *testing.T) {
assert.Equal(t, tc.configPath, flags.configPath)
assert.Equal(t, tc.addr, flags.extProcAddr)
assert.Equal(t, tc.logLevel, flags.logLevel)
assert.Equal(t, tc.processors, flags.processors)
})
}
})

t.Run("invalid extProcFlags", func(t *testing.T) {
_, err := parseAndValidateFlags([]string{"-logLevel", "invalid", "-processors", "undefined"})
_, err := parseAndValidateFlags([]string{"-logLevel", "invalid"})
assert.EqualError(t, err, `configPath must be provided
failed to unmarshal log level: slog: level string "invalid": unknown name
invalid processor: undefined`)
failed to unmarshal log level: slog: level string "invalid": unknown name`)
})
}

Expand Down
17 changes: 16 additions & 1 deletion internal/extproc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"log/slog"
"slices"
"strings"
"unicode/utf8"

corev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
extprocv3 "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
Expand Down Expand Up @@ -163,7 +164,7 @@ func (s *Server) Process(stream extprocv3.ExternalProcessor_ProcessServer) error
// If we're processing the request headers, read the :path header to instantiate the
// right processor.
if headers := req.GetRequestHeaders().GetHeaders(); headers != nil {
path := headersToMap(headers)[":path"] // TODO(nacx): Just read the :path header, not all of them
path := getHeader(headers, ":path")
p, err = s.processorForPath(path)
if err != nil {
s.logger.Error("cannot get processor for path", slog.String("path", path), slog.String("error", err.Error()))
Expand Down Expand Up @@ -297,3 +298,17 @@ func filterSensitiveBody(resp *extprocv3.ProcessingResponse, logger *slog.Logger
}
return filteredResp
}

func getHeader(headers *corev3.HeaderMap, name string) string {
for _, h := range headers.GetHeaders() {
if h.GetKey() == name {
if len(h.Value) > 0 {
return h.Value
} else if utf8.Valid(h.RawValue) {
return string(h.RawValue)
}
return ""
}
}
return ""
}

0 comments on commit 125b6bc

Please sign in to comment.