From 8a9aef7e2eb52d492a20a587e5a12714b1fc5102 Mon Sep 17 00:00:00 2001 From: jixie Date: Wed, 8 Jan 2025 15:18:00 +0800 Subject: [PATCH] add cursor metrics --- pkg/mongoproxy/plugins/authz/plugin.go | 4 +-- pkg/mongoproxy/plugins/authz/plugin_test.go | 2 +- pkg/mongoproxy/plugins/interface.go | 31 ++++++++++++++-- pkg/mongoproxy/plugins/mongo/mongo.go | 8 ++--- pkg/mongoproxy/proxy.go | 40 ++++++++++++++++----- pkg/mongoproxy/proxy_handlers.go | 12 +++---- 6 files changed, 73 insertions(+), 24 deletions(-) diff --git a/pkg/mongoproxy/plugins/authz/plugin.go b/pkg/mongoproxy/plugins/authz/plugin.go index c77ee47..10526e5 100644 --- a/pkg/mongoproxy/plugins/authz/plugin.go +++ b/pkg/mongoproxy/plugins/authz/plugin.go @@ -365,7 +365,7 @@ func (p *AuthzPlugin) resourcesForCommand(r *plugins.Request, c command.Command) } case *command.GetMore: - cursorResources := r.CursorCache.GetCursor(cmd.CursorID).Map[contextKeyResources] + cursorResources := r.CursorCache.GetCursor(cmd.CursorID, r.GetClientInfo()).Map[contextKeyResources] if cr, ok := cursorResources.(map[authzlib.AuthorizationMethod][]authzlib.Resource); ok { return cr } @@ -592,7 +592,7 @@ func (p *AuthzPlugin) Process(ctx context.Context, r *plugins.Request, next plug result, err := next(ctx, r) if cursorIDRaw, ok := bsonutil.Lookup(result, "cursor", "id"); ok { if cursorID, ok := cursorIDRaw.(int64); ok && cursorID > 0 { - r.CursorCache.GetCursor(cursorID).Map[contextKeyResources] = resourceMap + r.CursorCache.GetCursor(cursorID, r.GetClientInfo()).Map[contextKeyResources] = resourceMap } } diff --git a/pkg/mongoproxy/plugins/authz/plugin_test.go b/pkg/mongoproxy/plugins/authz/plugin_test.go index 6400793..a0be3a8 100644 --- a/pkg/mongoproxy/plugins/authz/plugin_test.go +++ b/pkg/mongoproxy/plugins/authz/plugin_test.go @@ -58,7 +58,7 @@ func TestPluginGetMore(t *testing.T) { p := plugins.BuildPipeline([]plugins.Plugin{d}, func(_ context.Context, r *plugins.Request) (bson.D, error) { switch r.Command.(type) { case *command.Find: - r.CursorCache.GetCursor(cursorID) + r.CursorCache.GetCursor(cursorID, r.GetClientInfo()) return bson.D{ {"ok", 1}, {"cursor", bson.D{{"id", cursorID}}}, diff --git a/pkg/mongoproxy/plugins/interface.go b/pkg/mongoproxy/plugins/interface.go index 2a2165f..205a153 100644 --- a/pkg/mongoproxy/plugins/interface.go +++ b/pkg/mongoproxy/plugins/interface.go @@ -3,6 +3,7 @@ package plugins import ( "context" "net" + "strings" "go.mongodb.org/mongo-driver/bson" @@ -25,20 +26,22 @@ type Plugin interface { Process(context.Context, *Request, PipelineFunc) (bson.D, error) } -func NewCursorCacheEntry(id int64) *CursorCacheEntry { +func NewCursorCacheEntry(id int64, clientInfo string) *CursorCacheEntry { return &CursorCacheEntry{ ID: id, + ClientInfo: clientInfo, Map: map[interface{}]interface{}{}, } } type CursorCache interface { - GetCursor(cursorID int64) *CursorCacheEntry - CloseCursor(cursorID int64) + GetCursor(cursorID int64, clientInfo string) *CursorCacheEntry + CloseCursor(cursorID int64, clientInfo string) } type CursorCacheEntry struct { ID int64 + ClientInfo string CursorConsumed int // Map is storage that resets on cursor change @@ -59,6 +62,10 @@ type Request struct { Map map[string]interface{} } +func (r *Request) GetClientInfo() string { + return r.CC.GetClientInfo() +} + func (r *Request) Close() {} func NewClientConnection() *ClientConnection { @@ -77,6 +84,19 @@ type ClientConnection struct { // Map is storage that resets on cursor change Map map[interface{}]interface{} } +func (c *ClientConnection) GetUsername() string { + var usernames []string + for _, identity := range c.Identities { + usernames = append(usernames, identity.User()) + } + var username string + if len(usernames) > 0 { + username = strings.Join(usernames, ",") + } else { + username = "unknown" + } + return username +} func (c *ClientConnection) GetAddr() string { if c.Addr == nil { @@ -85,6 +105,11 @@ func (c *ClientConnection) GetAddr() string { return c.Addr.String() } +func (c *ClientConnection) GetClientInfo() string { + //todo @jiapeng use username + return c.GetAddr() +} + func (c *ClientConnection) Close() {} type ClientIdentity interface { diff --git a/pkg/mongoproxy/plugins/mongo/mongo.go b/pkg/mongoproxy/plugins/mongo/mongo.go index e73a901..63556a4 100644 --- a/pkg/mongoproxy/plugins/mongo/mongo.go +++ b/pkg/mongoproxy/plugins/mongo/mongo.go @@ -302,7 +302,7 @@ func (p *MongoPlugin) Process(ctx context.Context, r *plugins.Request, next plug if cursorID, ok := cursorIDRaw.(int64); ok && cursorID > 0 { logrus.Tracef("Store cursor: %v %v", cursorID, cmdServer) // TODO: TTL from cmd - r.CursorCache.GetCursor(cursorID).Map[contextKeyServer] = cmdServer + r.CursorCache.GetCursor(cursorID, r.GetClientInfo()).Map[contextKeyServer] = cmdServer } } } @@ -473,7 +473,7 @@ func (p *MongoPlugin) Process(ctx context.Context, r *plugins.Request, next plug cmd.Database = "" // TODO: move into runCommand? - v, ok := r.CursorCache.GetCursor(cmd.CursorID).Map[contextKeyServer] + v, ok := r.CursorCache.GetCursor(cmd.CursorID, r.GetClientInfo()).Map[contextKeyServer] if !ok { return mongoerror.CursorNotFound.ErrMessage("Cursor not found."), nil } @@ -482,7 +482,7 @@ func (p *MongoPlugin) Process(ctx context.Context, r *plugins.Request, next plug if cursorIDRaw, ok := bsonutil.Lookup(result, "cursor", "id"); ok { if cursorID, ok := cursorIDRaw.(int64); ok && cursorID == 0 { - r.CursorCache.CloseCursor(cmd.CursorID) + r.CursorCache.CloseCursor(cmd.CursorID, r.GetClientInfo()) } } @@ -522,7 +522,7 @@ func (p *MongoPlugin) Process(ctx context.Context, r *plugins.Request, next plug if !ok { return nil, fmt.Errorf("invalid cursorID") } - v, ok := r.CursorCache.GetCursor(cursorID).Map[contextKeyServer] + v, ok := r.CursorCache.GetCursor(cursorID, r.GetClientInfo()).Map[contextKeyServer] if !ok { return mongoerror.CursorNotFound.ErrMessage("Cursor not found."), nil } diff --git a/pkg/mongoproxy/proxy.go b/pkg/mongoproxy/proxy.go index fe93ba4..4923e00 100644 --- a/pkg/mongoproxy/proxy.go +++ b/pkg/mongoproxy/proxy.go @@ -10,6 +10,7 @@ import ( "os" "reflect" "strconv" + "strings" "sync" "time" @@ -47,6 +48,14 @@ var ( Name: "mongoproxy_client_message_total", Help: "The total number of messages from clients", }, []string{"opcode"}) + clientCursorGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "mongoproxy_client_cursors_open", + Help: "The current number of open cursors", + }, []string{"clientInfo"}) + clientCursorCounter = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "mongoproxy_client_cursors_total", + Help: "The total number of client cursors", + }, []string{"clientInfo"}) ErrServerClosed = errors.New("server closed") SKIP_RECOVER = false @@ -91,17 +100,31 @@ func NewProxy(l net.Listener, cfg *config.Config) (*Proxy, error) { // Set up cursorCache p.cursorCache.SetTTL(p.cfg.IdleCursorTimeout) // default TTL -- config - p.cursorCache.SetLoaderFunction(func(key string) (interface{}, time.Duration, error) { + p.cursorCache.SetLoaderFunction(func(clientKey string) (interface{}, time.Duration, error) { + keys := strings.SplitN(clientKey, "_", 2) + if keys == nil || len(keys) != 2 { + return nil, time.Duration(0), errors.New("Illegal cursor key " + clientKey) + } + key := keys[0] + clientInfo := keys[1] cursorID, err := strconv.ParseInt(key, 10, 64) if err != nil { return nil, time.Duration(0), err } + clientCursorCounter.WithLabelValues(clientInfo).Inc() + clientCursorGauge.WithLabelValues(clientInfo).Inc() - return plugins.NewCursorCacheEntry(cursorID), time.Duration(0), nil + return plugins.NewCursorCacheEntry(cursorID, clientInfo), time.Duration(0), nil }) // expiration handler to send killCursor commands - p.cursorCache.SetExpirationReasonCallback(func(key string, reason ttlcache.EvictionReason, value interface{}) { - logrus.Tracef("expire cursor %s", key) + p.cursorCache.SetExpirationReasonCallback(func(clientKey string, reason ttlcache.EvictionReason, value interface{}) { + logrus.Tracef("expire cursor %s", clientKey) + keys := strings.SplitN(clientKey, "_", 2) + if keys == nil || len(keys) != 2 { + return + } + key := keys[0] + clientInfo := keys[1] i, err := strconv.ParseInt(key, 10, 64) if err != nil { return @@ -114,6 +137,7 @@ func NewProxy(l net.Listener, cfg *config.Config) (*Proxy, error) { {"cursors", primitive.A{i}}, }) } + clientCursorGauge.WithLabelValues(clientInfo).Dec() }) return p, nil @@ -138,8 +162,8 @@ type Proxy struct { internalCC *plugins.ClientConnection } -func (p *Proxy) GetCursor(cursorID int64) *plugins.CursorCacheEntry { - v, err := p.cursorCache.Get(strconv.FormatInt(cursorID, 10)) +func (p *Proxy) GetCursor(cursorID int64, clientInfo string) *plugins.CursorCacheEntry { + v, err := p.cursorCache.Get(strconv.FormatInt(cursorID, 10) + "_" + clientInfo) if err == ttlcache.ErrNotFound { panic("can't get cursor") } @@ -147,8 +171,8 @@ func (p *Proxy) GetCursor(cursorID int64) *plugins.CursorCacheEntry { return v.(*plugins.CursorCacheEntry) } -func (p *Proxy) CloseCursor(cursorID int64) { - p.cursorCache.Remove(strconv.FormatInt(cursorID, 10)) +func (p *Proxy) CloseCursor(cursorID int64, clientInfo string) { + p.cursorCache.Remove(strconv.FormatInt(cursorID, 10) + "_" + clientInfo) } func (p *Proxy) Addr() string { diff --git a/pkg/mongoproxy/proxy_handlers.go b/pkg/mongoproxy/proxy_handlers.go index 2375dcd..2057db0 100644 --- a/pkg/mongoproxy/proxy_handlers.go +++ b/pkg/mongoproxy/proxy_handlers.go @@ -146,7 +146,7 @@ func (p *Proxy) handleOpQuery(ctx context.Context, cc *plugins.ClientConnection, } if cursorID, ok := bsonutil.Lookup(result, "cursor", "id"); ok { - p.GetCursor(cursorID.(int64)).CursorConsumed += len(v.(primitive.A)) + p.GetCursor(cursorID.(int64), cc.GetClientInfo()).CursorConsumed += len(v.(primitive.A)) } reply.Documents = []bson.D{{{"ok", 1}, {"result", v}}} @@ -164,7 +164,7 @@ func (p *Proxy) handleOpQuery(ctx context.Context, cc *plugins.ClientConnection, } var cursorEntry *plugins.CursorCacheEntry if cursorID, ok := bsonutil.Lookup(cursorData, "id"); ok { - cursorEntry = p.GetCursor(cursorID.(int64)) + cursorEntry = p.GetCursor(cursorID.(int64), cc.GetClientInfo()) } firstBatchRaw, ok := bsonutil.Lookup(cursorData, "firstBatch") if ok { @@ -283,7 +283,7 @@ func (p *Proxy) handleOpQuery(ctx context.Context, cc *plugins.ClientConnection, } var cursorEntry *plugins.CursorCacheEntry if cursorID, ok := bsonutil.Lookup(cursorData, "id"); ok { - cursorEntry = p.GetCursor(cursorID.(int64)) + cursorEntry = p.GetCursor(cursorID.(int64), cc.GetClientInfo()) reply.CursorID = cursorID.(int64) } firstBatchRaw, ok := bsonutil.Lookup(cursorData, "firstBatch") @@ -318,7 +318,7 @@ func (p *Proxy) handleOpKillCursors(ctx context.Context, cc *plugins.ClientConne if err == nil { if bsonutil.Ok(result) { for _, cursorID := range q.CursorIDs { - p.CloseCursor(cursorID) + p.CloseCursor(cursorID, request.GetClientInfo()) } } } @@ -339,7 +339,7 @@ func (p *Proxy) handleOpGetMore(ctx context.Context, cc *plugins.ClientConnectio Header: q.Header, } - cursorEntry := p.GetCursor(q.CursorID) + cursorEntry := p.GetCursor(q.CursorID, cc.GetClientInfo()) result, err := p.HandleMongo(ctx, request, []primitive.E{ {Key: "getMore", Value: q.CursorID}, @@ -360,7 +360,7 @@ func (p *Proxy) handleOpGetMore(ctx context.Context, cc *plugins.ClientConnectio if cursorID, ok := bsonutil.Lookup(cursorData, "id"); ok { reply.CursorID = cursorID.(int64) if cursorID.(int64) == 0 { - p.CloseCursor(cursorEntry.ID) + p.CloseCursor(cursorEntry.ID, request.GetClientInfo()) } } nextBatchRaw, ok := bsonutil.Lookup(cursorData, "nextBatch")