1 change: 1 addition & 0 deletions .github/workflows/pr_style_check.yaml
@@ -87,6 +87,7 @@ jobs:
refactor
mcp
aigw
dynamic_module
subjectPattern: ^(?![A-Z]).+$
subjectPatternError: |
The subject "{subject}" found in the pull request title "{title}"
5 changes: 5 additions & 0 deletions Makefile
@@ -270,6 +270,11 @@ build.%: ## Build a binary for the given command under the internal/cmd director
build: ## Build all binaries under cmd/ directory.
@$(foreach COMMAND_NAME,$(COMMANDS),$(MAKE) build.$(COMMAND_NAME);)

# This builds the dynamic module for Envoy: a shared library that Envoy can load to run the AI Gateway filter.
.PHONY: build-dm
build-dm: ## Build the dynamic module for Envoy.
CGO_ENABLED=1 go build -tags "envoy_1.36" -buildmode=c-shared -o $(OUTPUT_DIR)/libaigateway.so ./cmd/dynamic_module

# This builds the docker images for the controller, extproc and testupstream for the e2e tests.
.PHONY: build-e2e
build-e2e: ## Build the docker images for the controller, extproc and testupstream for the e2e tests.
114 changes: 114 additions & 0 deletions cmd/dynamic_module/main.go
@@ -0,0 +1,114 @@
// Copyright Envoy AI Gateway Authors
// SPDX-License-Identifier: Apache-2.0
// The full text of the Apache license is available in the LICENSE file at
// the root of the repo.

package main

import (
"context"
"fmt"
"log/slog"
"os"
"time"

"github.com/prometheus/client_golang/prometheus"
otelprom "go.opentelemetry.io/otel/exporters/prometheus"

"github.com/envoyproxy/ai-gateway/internal/backendauth"
"github.com/envoyproxy/ai-gateway/internal/dynamicmodule"
"github.com/envoyproxy/ai-gateway/internal/dynamicmodule/sdk"
"github.com/envoyproxy/ai-gateway/internal/filterapi"
"github.com/envoyproxy/ai-gateway/internal/internalapi"
"github.com/envoyproxy/ai-gateway/internal/metrics"
)

func main() {} // This must be present to make a shared library.

// init sets sdk.NewHTTPFilterConfig so the dynamic module SDK can create new HTTP filter configurations.
func init() {
g := &globalState{}
if err := g.initializeEnv(); err != nil {
panic("failed to create env config: " + err.Error())
}

// TODO: use a writer implemented with the Logger ABI of Envoy.
logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelDebug, // Adjust log level from environment variable if needed.
}))
if err := filterapi.StartConfigWatcher(context.Background(),
os.Getenv("AI_GATEWAY_dynamicmodule_FILTER_CONFIG_PATH"), g, logger, time.Second*5); err != nil {
panic("failed to start filter config watcher: " + err.Error())
}
sdk.NewHTTPFilterConfig = g.newHTTPFilterConfig
}

// globalState implements [filterapi.ConfigReceiver] to load filter configuration.
type globalState struct {
fc *filterapi.RuntimeConfig
env *dynamicmodule.Env
}

// newHTTPFilterConfig creates a new HTTP filter configuration for the filter with the given name.
//
// The second argument carries the per-filter configuration bytes from the Envoy configuration; it is currently unused.
func (g *globalState) newHTTPFilterConfig(name string, _ []byte) sdk.HTTPFilterConfig {
switch name {
case "ai_gateway.router":
return dynamicmodule.NewRouterFilterConfig(g.env, &g.fc)
case "ai_gateway.upstream":
return dynamicmodule.NewUpstreamFilterConfig(g.env)
default:
panic("unknown filter: " + name)
}
}

// LoadConfig implements [filterapi.ConfigReceiver.LoadConfig].
func (g *globalState) LoadConfig(ctx context.Context, config *filterapi.Config) error {
newConfig, err := filterapi.NewRuntimeConfig(ctx, config, backendauth.NewHandler)
if err != nil {
return fmt.Errorf("cannot create runtime filter config: %w", err)
}
g.fc = newConfig // This is racy but we don't care.
return nil
}
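The unsynchronized swap above trades strict memory-ordering guarantees for simplicity, as the inline comment notes. If a race-free handoff were wanted later, the standard library's sync/atomic would suffice; below is a minimal sketch of that variant, not part of this change (atomicGlobalState is a hypothetical name, the "sync/atomic" import is assumed, and the router filter config would then read via Load() instead of holding a **RuntimeConfig):

// Hypothetical race-free variant of the config swap, using sync/atomic.
type atomicGlobalState struct {
	fc atomic.Pointer[filterapi.RuntimeConfig]
}

func (g *atomicGlobalState) LoadConfig(ctx context.Context, config *filterapi.Config) error {
	newConfig, err := filterapi.NewRuntimeConfig(ctx, config, backendauth.NewHandler)
	if err != nil {
		return fmt.Errorf("cannot create runtime filter config: %w", err)
	}
	g.fc.Store(newConfig) // Atomic publish: readers always observe a fully built config.
	return nil
}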

func (g *globalState) initializeEnv() error {
ctx := context.Background()
promRegistry := prometheus.NewRegistry()
promReader, err := otelprom.New(otelprom.WithRegisterer(promRegistry))
if err != nil {
return fmt.Errorf("failed to create prometheus reader: %w", err)
}

meter, _, err := metrics.NewMeterFromEnv(ctx, os.Stdout, promReader)
if err != nil {
return fmt.Errorf("failed to create metrics: %w", err)
}

endpointPrefixes, err := internalapi.ParseEndpointPrefixes(os.Getenv(
"AI_GATEWAY_dynamicmodule_FILTER_ENDPOINT_PREFIXES",
))
if err != nil {
return fmt.Errorf("failed to parse endpoint prefixes: %w", err)
}

metricsRequestHeaderAttributes, err := internalapi.ParseRequestHeaderAttributeMapping(os.Getenv(
"AI_GATEWAY_dynamicmodule_FILTER_METRICS_REQUEST_HEADER_ATTRIBUTES",
))
if err != nil {
return fmt.Errorf("failed to parse metrics header mapping: %w", err)
}

g.env = &dynamicmodule.Env{
RootPrefix: os.Getenv("AI_GATEWAY_dynamicmodule_ROOT_PREFIX"),
EndpointPrefixes: endpointPrefixes,
ChatCompletionMetricsFactory: metrics.NewMetricsFactory(meter, metricsRequestHeaderAttributes, metrics.GenAIOperationChat),
MessagesMetricsFactory: metrics.NewMetricsFactory(meter, metricsRequestHeaderAttributes, metrics.GenAIOperationMessages),
CompletionMetricsFactory: metrics.NewMetricsFactory(meter, metricsRequestHeaderAttributes, metrics.GenAIOperationCompletion),
EmbeddingsMetricsFactory: metrics.NewMetricsFactory(meter, metricsRequestHeaderAttributes, metrics.GenAIOperationEmbedding),
ImageGenerationMetricsFactory: metrics.NewMetricsFactory(meter, metricsRequestHeaderAttributes, metrics.GenAIOperationImageGeneration),
RerankMetricsFactory: metrics.NewMetricsFactory(meter, metricsRequestHeaderAttributes, metrics.GenAIOperationRerank),
}
return nil
}
43 changes: 43 additions & 0 deletions internal/dynamicmodule/dynamic_module.go
@@ -0,0 +1,43 @@
// Copyright Envoy AI Gateway Authors
// SPDX-License-Identifier: Apache-2.0
// The full text of the Apache license is available in the LICENSE file at
// the root of the repo.

package dynamicmodule

import (
"github.com/envoyproxy/ai-gateway/internal/internalapi"
"github.com/envoyproxy/ai-gateway/internal/metrics"
)

// endpoint represents the type of the endpoint that the request is targeting.
type endpoint int

const (
// chatCompletionsEndpoint represents the /v1/chat/completions endpoint.
chatCompletionsEndpoint endpoint = iota
// completionsEndpoint represents the /v1/completions endpoint.
completionsEndpoint
// embeddingsEndpoint represents the /v1/embeddings endpoint.
embeddingsEndpoint
// imagesGenerationsEndpoint represents the /v1/images/generations endpoint.
imagesGenerationsEndpoint
// rerankEndpoint represents the /v2/rerank endpoint of cohere.
rerankEndpoint
// messagesEndpoint represents the /v1/messages endpoint of anthropic.
messagesEndpoint
// modelsEndpoint represents the /v1/models endpoint.
modelsEndpoint
)

// Env holds the environment configuration for the dynamic module that is process-wide.
type Env struct {
RootPrefix string
EndpointPrefixes internalapi.EndpointPrefixes
ChatCompletionMetricsFactory,
MessagesMetricsFactory,
CompletionMetricsFactory,
EmbeddingsMetricsFactory,
ImageGenerationMetricsFactory,
RerankMetricsFactory metrics.Factory
}
194 changes: 194 additions & 0 deletions internal/dynamicmodule/router_filter.go
@@ -0,0 +1,194 @@
// Copyright Envoy AI Gateway Authors
// SPDX-License-Identifier: Apache-2.0
// The full text of the Apache license is available in the LICENSE file at
// the root of the repo.

package dynamicmodule

import (
"encoding/json"
"fmt"
"io"
"path"
"strings"
"unsafe"

openaisdk "github.com/openai/openai-go/v2"

"github.com/envoyproxy/ai-gateway/internal/apischema/anthropic"
cohereschema "github.com/envoyproxy/ai-gateway/internal/apischema/cohere"
"github.com/envoyproxy/ai-gateway/internal/apischema/openai"
"github.com/envoyproxy/ai-gateway/internal/dynamicmodule/sdk"
"github.com/envoyproxy/ai-gateway/internal/filterapi"
"github.com/envoyproxy/ai-gateway/internal/internalapi"
)

const routerFilterPointerDynamicMetadataKey = "router_filter_pointer"

type (
// routerFilterConfig implements [sdk.HTTPFilterConfig].
//
// It holds a pointer to the latest runtime filter configuration and the
// mapping from request paths to endpoints shared by all router filter instances.
routerFilterConfig struct {
fcr **filterapi.RuntimeConfig
prefixToEndpoint map[string]endpoint
}
// routerFilter implements [sdk.HTTPFilter].
routerFilter struct {
routerFilterConfig *routerFilterConfig
runtimeFilterConfig *filterapi.RuntimeConfig
endpoint endpoint
originalHeaders map[string]string
originalRequestBody any
originalRequestBodyRaw []byte
span any
attemptCount int
}
)

// NewRouterFilterConfig creates a new instance of an implementation of [sdk.HTTPFilterConfig] for the router filter.
func NewRouterFilterConfig(env *Env, fcr **filterapi.RuntimeConfig) sdk.HTTPFilterConfig {
prefixToEndpoint := map[string]endpoint{
path.Join(env.RootPrefix, env.EndpointPrefixes.OpenAI, "/v1/chat/completions"): chatCompletionsEndpoint,
path.Join(env.RootPrefix, env.EndpointPrefixes.OpenAI, "/v1/completions"): completionsEndpoint,
path.Join(env.RootPrefix, env.EndpointPrefixes.OpenAI, "/v1/embeddings"): embeddingsEndpoint,
path.Join(env.RootPrefix, env.EndpointPrefixes.OpenAI, "/v1/images/generations"): imagesGenerationsEndpoint,
path.Join(env.RootPrefix, env.EndpointPrefixes.Cohere, "/v2/rerank"): rerankEndpoint,
path.Join(env.RootPrefix, env.EndpointPrefixes.OpenAI, "/v1/models"): modelsEndpoint,
path.Join(env.RootPrefix, env.EndpointPrefixes.Anthropic, "/v1/messages"): messagesEndpoint,
}
return &routerFilterConfig{
fcr: fcr,
prefixToEndpoint: prefixToEndpoint,
}
}

// NewFilter implements [sdk.HTTPFilterConfig].
func (f *routerFilterConfig) NewFilter() sdk.HTTPFilter {
return &routerFilter{routerFilterConfig: f, runtimeFilterConfig: *f.fcr}
}
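The fcr **filterapi.RuntimeConfig indirection is what lets filters created after a config reload observe the newly installed configuration, while filters already in flight keep the snapshot they dereferenced here. A self-contained sketch of that pattern with hypothetical names (runtimeConfig, current); it is an illustration, not code from this change:

package main

import "fmt"

type runtimeConfig struct{ version int }

func main() {
	v1 := &runtimeConfig{version: 1}
	current := &v1 // **runtimeConfig shared with every filter config.

	snapshotA := *current // A filter created now captures version 1.

	*current = &runtimeConfig{version: 2} // The config watcher installs a new config.

	snapshotB := *current // A filter created after the reload captures version 2.

	fmt.Println(snapshotA.version, snapshotB.version) // Output: 1 2
}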

// RequestHeaders implements [sdk.HTTPFilter].
func (f *routerFilter) RequestHeaders(e sdk.EnvoyHTTPFilter, _ bool) sdk.RequestHeadersStatus {
p, _ := e.GetRequestHeader(":path") // The :path pseudo header is always present.
// Strip query parameters before the endpoint lookup.
if queryIndex := strings.Index(p, "?"); queryIndex != -1 {
p = p[:queryIndex]
}
ep, ok := f.routerFilterConfig.prefixToEndpoint[p]
if !ok {
e.SendLocalReply(404, nil, []byte(fmt.Sprintf("unsupported path: %s", p)))
return sdk.RequestHeadersStatusStopIteration
}
f.endpoint = ep
if f.endpoint == modelsEndpoint {
return f.handleModelsEndpoint(e)
}
return sdk.RequestHeadersStatusContinue
}

// RequestBody implements [sdk.HTTPFilter].
func (f *routerFilter) RequestBody(e sdk.EnvoyHTTPFilter, endOfStream bool) sdk.RequestBodyStatus {
if !endOfStream {
return sdk.RequestBodyStatusStopIterationAndBuffer
}
b, ok := e.GetRequestBody()
if !ok {
e.SendLocalReply(400, nil, []byte("failed to read request body"))
return sdk.RequestBodyStatusStopIterationAndBuffer
}
raw, err := io.ReadAll(b)
if err != nil {
e.SendLocalReply(400, nil, []byte("failed to read request body: "+err.Error()))
return sdk.RequestBodyStatusStopIterationAndBuffer
}
f.originalRequestBodyRaw = raw
var parsed any
var modelName string
switch f.endpoint {
case chatCompletionsEndpoint:
parsed, modelName, err = parseBodyWithModel(raw, func(req *openai.ChatCompletionRequest) string { return req.Model })
case completionsEndpoint:
parsed, modelName, err = parseBodyWithModel(raw, func(req *openai.CompletionRequest) string { return req.Model })
case embeddingsEndpoint:
parsed, modelName, err = parseBodyWithModel(raw, func(req *openai.EmbeddingRequest) string { return req.Model })
case imagesGenerationsEndpoint:
parsed, modelName, err = parseBodyWithModel(raw, func(req *openaisdk.ImageGenerateParams) string { return req.Model })
case rerankEndpoint:
parsed, modelName, err = parseBodyWithModel(raw, func(req *cohereschema.RerankV2Request) string { return req.Model })
case messagesEndpoint:
parsed, modelName, err = parseBodyWithModel(raw, func(req *anthropic.MessagesRequest) string { return req.GetModel() })
default:
e.SendLocalReply(500, nil, []byte("BUG: unsupported endpoint at body parsing: "+fmt.Sprintf("%d", f.endpoint)))
}
if err != nil {
e.SendLocalReply(400, nil, []byte("failed to parse request body: "+err.Error()))
return sdk.RequestBodyStatusStopIterationAndBuffer
}
f.originalRequestBody = parsed
if !e.SetRequestHeader(internalapi.ModelNameHeaderKeyDefault, []byte(modelName)) {
e.SendLocalReply(500, nil, []byte("failed to set model name header"))
return sdk.RequestBodyStatusStopIterationAndBuffer
}
// Store the pointer to the filter in dynamic metadata for later retrieval in the upstream filter.
e.SetDynamicMetadataString(internalapi.AIGatewayFilterMetadataNamespace, routerFilterPointerDynamicMetadataKey,
fmt.Sprintf("%d", uintptr(unsafe.Pointer(f))))

f.originalHeaders = multiValueHeadersToSingleValue(e.GetRequestHeaders())
return sdk.RequestBodyStatusContinue
}
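The upstream filter is then expected to parse that metadata string back into a pointer. A hypothetical sketch of the reverse conversion follows (recoverRouterFilter and metadataValue are illustrative names, the "strconv" import is assumed, and the upstream filter itself is outside this section); it is only sound while the originating router filter is still alive for the same stream, since the garbage collector cannot see through the uintptr round trip:

// recoverRouterFilter is a hypothetical helper showing how the upstream filter
// could turn the dynamic metadata string back into the *routerFilter stored above.
func recoverRouterFilter(metadataValue string) (*routerFilter, error) {
	addr, err := strconv.ParseUint(metadataValue, 10, 64)
	if err != nil {
		return nil, fmt.Errorf("invalid router filter pointer metadata: %w", err)
	}
	// Valid only while the router filter for this stream is still alive.
	return (*routerFilter)(unsafe.Pointer(uintptr(addr))), nil
}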

// ResponseHeaders implements [sdk.HTTPFilter].
func (f *routerFilter) ResponseHeaders(sdk.EnvoyHTTPFilter, bool) sdk.ResponseHeadersStatus {
return sdk.ResponseHeadersStatusContinue
}

// ResponseBody implements [sdk.HTTPFilter].
func (f *routerFilter) ResponseBody(sdk.EnvoyHTTPFilter, bool) sdk.ResponseBodyStatus {
return sdk.ResponseBodyStatusContinue
}

// handleModelsEndpoint handles the /v1/models endpoint by returning the list of declared models in the filter configuration.
//
// This is called during the request headers phase.
func (f *routerFilter) handleModelsEndpoint(e sdk.EnvoyHTTPFilter) sdk.RequestHeadersStatus {
config := f.runtimeFilterConfig
models := openai.ModelList{
Object: "list",
Data: make([]openai.Model, 0, len(config.DeclaredModels)),
}
for _, m := range config.DeclaredModels {
models.Data = append(models.Data, openai.Model{
ID: m.Name,
Object: "model",
OwnedBy: m.OwnedBy,
Created: openai.JSONUNIXTime(m.CreatedAt),
})
}

body, _ := json.Marshal(models)
e.SendLocalReply(200, [][2]string{
{"content-type", "application/json"},
}, body)
return sdk.RequestHeadersStatusStopIteration
}

func parseBodyWithModel[T any](body []byte, modelExtractFn func(req *T) string) (any, string, error) {
var req T
if err := json.Unmarshal(body, &req); err != nil {
return nil, "", fmt.Errorf("failed to unmarshal body: %w", err)
}
return req, modelExtractFn(&req), nil
}
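For illustration, the generic helper works against any request shape that exposes a model name; a hypothetical in-package usage with a minimal local struct (exampleParseBodyWithModel and minimalRequest are illustrative names, not types from this change):

func exampleParseBodyWithModel() {
	type minimalRequest struct {
		Model string `json:"model"`
	}
	parsed, model, err := parseBodyWithModel([]byte(`{"model":"gpt-4o-mini"}`),
		func(r *minimalRequest) string { return r.Model })
	_ = parsed // The decoded minimalRequest value.
	_ = model  // "gpt-4o-mini"
	_ = err    // nil for well-formed JSON.
}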

// multiValueHeadersToSingleValue converts multi-value headers into single-value headers by keeping only the first value of each header.
//
// TODO: this exists purely for feature parity with the old filter, which ignored any additional values of multi-value headers.
func multiValueHeadersToSingleValue(headers map[string][]string) map[string]string {
singleValueHeaders := make(map[string]string, len(headers))
for k, v := range headers {
singleValueHeaders[k] = v[0]
}
return singleValueHeaders
}