Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions cmd/neona/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/fentz26/neona/internal/audit"
"github.com/fentz26/neona/internal/connectors/localexec"
"github.com/fentz26/neona/internal/controlplane"
"github.com/fentz26/neona/internal/mcp"
"github.com/fentz26/neona/internal/scheduler"
"github.com/fentz26/neona/internal/store"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -60,6 +61,21 @@ func runDaemon(cmd *cobra.Command, args []string) error {
schedulerCfg := scheduler.DefaultConfig()
sched := scheduler.New(s, pdr, connector, schedulerCfg)

// Initialize MCP router
mcpConfig, err := mcp.LoadConfigFromHome()
if err != nil {
log.Printf("Warning: failed to load MCP config: %v (using defaults)", err)
mcpConfig = mcp.DefaultConfig()
}
registry := mcp.NewRegistry()
registry.RegisterDefaults()
mcpRouter := mcp.NewRouter(mcpConfig, registry)
log.Printf("MCP router initialized with %d servers", registry.Count())

// Wire MCP router to scheduler and server
sched.SetMCPRouter(mcpRouter)
server.SetMCPRouter(mcpRouter)

// Wire scheduler to server for /workers endpoint
server.SetScheduler(sched)

Expand Down
94 changes: 94 additions & 0 deletions internal/controlplane/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"
"time"

"github.com/fentz26/neona/internal/mcp"
"github.com/fentz26/neona/internal/models"
"github.com/fentz26/neona/internal/store"
)
Expand All @@ -20,13 +21,19 @@ type SchedulerStatsProvider interface {
GetStats() map[string]interface{}
}

// MCPRouter provides MCP routing for the /mcp/route endpoint.
type MCPRouter interface {
Route(ctx context.Context, task mcp.Task) (*mcp.RoutingResult, error)
}

// Server provides the HTTP API for Neona.
type Server struct {
service *Service
store *store.Store
addr string
server *http.Server
scheduler SchedulerStatsProvider
mcpRouter MCPRouter
}

// NewServer creates a new HTTP server.
Expand All @@ -44,6 +51,12 @@ func (s *Server) SetScheduler(sched SchedulerStatsProvider) {
s.scheduler = sched
}

// SetMCPRouter sets the MCP router for the /mcp/route endpoint.
// Must be called before Start() - not safe for concurrent use.
func (s *Server) SetMCPRouter(router MCPRouter) {
s.mcpRouter = router
}

// Start starts the HTTP server.
func (s *Server) Start() error {
mux := http.NewServeMux()
Expand All @@ -58,6 +71,9 @@ func (s *Server) Start() error {
// Worker pool monitor endpoint
mux.HandleFunc("/workers", s.handleWorkers)

// MCP routing endpoint
mux.HandleFunc("/mcp/route", s.handleMCPRoute)

// Health check with DB ping
mux.HandleFunc("/health", s.handleHealth)

Expand Down Expand Up @@ -410,3 +426,81 @@ func (s *Server) handleWorkers(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(stats)
}

// --- MCP Route Handlers ---

// mcpRouteRequest represents the request body for /mcp/route
type mcpRouteRequest struct {
Title string `json:"title"`
Description string `json:"description"`
}

// mcpRouteResponse represents the response for /mcp/route
type mcpRouteResponse struct {
SelectedMCPs []mcpServerInfo `json:"selected_mcps"`
MatchedRules []string `json:"matched_rules"`
TotalTools int `json:"total_tools"`
ToolBudget int `json:"tool_budget"`
}

type mcpServerInfo struct {
Name string `json:"name"`
ToolCount int `json:"tool_count"`
}

// handleMCPRoute handles POST /mcp/route
func (s *Server) handleMCPRoute(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}

if s.mcpRouter == nil {
http.Error(w, "MCP router not configured", http.StatusServiceUnavailable)
return
}

var req mcpRouteRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "invalid json", http.StatusBadRequest)
return
}

if req.Title == "" {
http.Error(w, "title is required", http.StatusBadRequest)
return
}

task := mcp.Task{
Title: req.Title,
Description: req.Description,
}

result, err := s.mcpRouter.Route(r.Context(), task)
if err != nil {
log.Printf("MCP routing failed: %v", err)
http.Error(w, "internal server error", http.StatusInternalServerError)
return
}

// Build response
mcps := make([]mcpServerInfo, len(result.SelectedMCPs))
for i, m := range result.SelectedMCPs {
mcps[i] = mcpServerInfo{
Name: m.Name,
ToolCount: m.ToolCount,
}
}

resp := mcpRouteResponse{
SelectedMCPs: mcps,
MatchedRules: result.MatchedRules,
TotalTools: result.TotalTools,
ToolBudget: 80, // Default budget
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

API returns hardcoded tool budget ignoring configuration

Low Severity

The ToolBudget field in the /mcp/route response is hardcoded to 80, but the actual budget used by the router comes from config.MaxToolsPerTask which users can customize via their mcp.yaml configuration. If a user configures a different budget (e.g., 50 or 100), the API response still shows 80, providing misleading metadata about the actual tool budget being enforced.

Fix in Cursor Fix in Web

}
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(resp); err != nil {
log.Printf("Failed to encode MCP route response: %v", err)
}
json.NewEncoder(w).Encode(resp)
}
36 changes: 36 additions & 0 deletions internal/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/fentz26/neona/internal/audit"
"github.com/fentz26/neona/internal/connectors"
"github.com/fentz26/neona/internal/mcp"
"github.com/fentz26/neona/internal/models"
"github.com/fentz26/neona/internal/store"
"github.com/google/uuid"
Expand All @@ -33,6 +34,9 @@ type Scheduler struct {
connector connectors.Connector
config *Config

// MCP router for tool selection
mcpRouter *mcp.KeywordRouter

// Worker pool state
mu sync.Mutex
activeWorkers int
Expand Down Expand Up @@ -69,6 +73,12 @@ func New(s *store.Store, pdr *audit.PDRWriter, conn connectors.Connector, cfg *C
}
}

// SetMCPRouter sets the MCP router for tool selection.
// Must be called before Start() - not safe for concurrent use.
func (sch *Scheduler) SetMCPRouter(router *mcp.KeywordRouter) {
sch.mcpRouter = router
}

// Start begins the scheduler loop.
func (sch *Scheduler) Start() {
sch.mu.Lock()
Expand Down Expand Up @@ -149,6 +159,32 @@ func (sch *Scheduler) pollAndDispatch() {
"connector": connectorName,
}, "success", task.ID, fmt.Sprintf("Dispatched to worker %s", workerID))

// Route MCPs for this task if router is configured
if sch.mcpRouter != nil {
mcpTask := mcp.Task{
ID: task.ID,
Title: task.Title,
Description: task.Description,
}
result, err := sch.mcpRouter.Route(sch.ctx, mcpTask)
if err != nil {
log.Printf("MCP routing error for task %s: %v", task.ID, err)
} else {
// Log selected MCPs
mcpNames := make([]string, len(result.SelectedMCPs))
for i, m := range result.SelectedMCPs {
mcpNames[i] = m.Name
}
sch.pdr.Record("task.mcp_route", map[string]interface{}{
"task_id": task.ID,
"selected_mcps": mcpNames,
"total_tools": result.TotalTools,
"matched_rules": result.MatchedRules,
}, "success", task.ID, fmt.Sprintf("Routed to %d MCPs with %d tools", len(mcpNames), result.TotalTools))
log.Printf("Task %s routed to MCPs: %v (%d tools)", task.ID, mcpNames, result.TotalTools)
}
}

log.Printf("Dispatched task %s (%s) to worker %s", task.ID, task.Title, workerID)

// Increment worker counts and store worker info
Expand Down