Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
FROM golang:1.25-bookworm AS builder

ARG SERVICE=server

WORKDIR /app

# Copy dependency files first for better caching
Expand All @@ -10,7 +12,7 @@ RUN go mod download
COPY . .

# Build static binary
RUN CGO_ENABLED=0 GOOS=linux go build -o main ./cmd/server
RUN CGO_ENABLED=0 GOOS=linux go build -o main ./cmd/${SERVICE}

# Runtime image
FROM debian:bookworm-slim
Expand Down
119 changes: 119 additions & 0 deletions cmd/scheduler/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package main

import (
"context"
"os"
"os/signal"
"syscall"
"time"

"github.com/sirupsen/logrus"

"github.com/vultisig/agent-backend/internal/ai"
"github.com/vultisig/agent-backend/internal/cache/redis"
"github.com/vultisig/agent-backend/internal/config"
"github.com/vultisig/agent-backend/internal/mcp"
"github.com/vultisig/agent-backend/internal/service/agent"
"github.com/vultisig/agent-backend/internal/service/plugin"
"github.com/vultisig/agent-backend/internal/service/scheduler"
"github.com/vultisig/agent-backend/internal/service/verifier"
"github.com/vultisig/agent-backend/internal/storage/postgres"
)

func main() {
logger := logrus.New()
logger.SetFormatter(&logrus.JSONFormatter{})
logger.SetOutput(os.Stdout)

cfg, err := config.Load()
if err != nil {
logger.WithError(err).Fatal("failed to load configuration")
}

if cfg.LogFormat == "text" {
logger.SetFormatter(&logrus.TextFormatter{})
}

logger.Info("starting agent-scheduler")

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Connect to database
db, err := postgres.New(ctx, cfg.Database.DSN)
if err != nil {
logger.WithError(err).Fatal("failed to connect to database")
}
defer db.Close()

// Initialize Redis client
redisClient, err := redis.New(cfg.Redis.URI)
if err != nil {
logger.WithError(err).Fatal("failed to connect to redis")
}
defer redisClient.Close()

// Initialize AI client
aiClient := ai.NewClient(cfg.AI.APIKey, cfg.AI.Model, cfg.AI.BaseURL, cfg.AI.AppName, cfg.AI.AppURL)

// Initialize services
pluginService := plugin.NewService(cfg.Verifier.URL, redisClient, logger)
verifierClient := verifier.NewClient(cfg.Verifier.URL)

// Initialize MCP client (optional, provides observation tools like find_token, get_balance)
var mcpProvider agent.MCPToolProvider
if cfg.MCP.ServerURL != "" {
cacheTTL := time.Duration(cfg.MCP.ToolCacheTTLSec) * time.Second
mcpClient := mcp.NewClient(cfg.MCP.ServerURL, cacheTTL, logger)

mcpCtx, mcpCancel := context.WithTimeout(ctx, 10*time.Second)
defer mcpCancel()

if err := mcpClient.Initialize(mcpCtx); err != nil {
logger.WithError(err).Warn("failed to initialize mcp client, continuing without mcp tools")
} else {
tools, err := mcpClient.ListTools(mcpCtx)
if err != nil {
logger.WithError(err).Warn("failed to list mcp tools, continuing without mcp tools")
} else {
logger.WithField("tool_count", len(tools)).Info("mcp tools loaded")
mcpProvider = mcpClient
}
}
}

// Initialize repositories
convRepo := postgres.NewConversationRepository(db.Pool())
msgRepo := postgres.NewMessageRepository(db.Pool())
memRepo := postgres.NewMemoryRepository(db.Pool())
taskRepo := postgres.NewScheduledTaskRepository(db.Pool())

// Initialize agent service (used by scheduler for headless execution)
agentService := agent.NewAgentService(
aiClient, msgRepo, convRepo, memRepo, taskRepo,
redisClient, verifierClient, pluginService, mcpProvider, nil,
logger, cfg.AI.SummaryModel, cfg.Context, cfg.Scheduler,
)

// Initialize and run scheduler
sched := scheduler.New(
agentService, taskRepo, convRepo, msgRepo, mcpProvider,
cfg.Scheduler, cfg.AI.Model, logger,
)

// Handle graceful shutdown
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)

go func() {
<-quit
logger.Info("shutting down scheduler")
cancel()
}()

if err := sched.Run(ctx); err != nil && err != context.Canceled {
logger.WithError(err).Fatal("scheduler error")
}

logger.Info("scheduler stopped")
}
5 changes: 4 additions & 1 deletion cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,11 @@ func main() {
logger.WithField("url", cfg.MCP.URL).Info("MCP swap tx builder enabled")
}

// Initialize scheduled task repository
taskRepo := postgres.NewScheduledTaskRepository(db.Pool())

// Initialize agent service
agentService := agent.NewAgentService(aiClient, msgRepo, convRepo, memRepo, redisClient, verifierClient, pluginService, mcpProvider, swapTxBuilder, logger, cfg.AI.SummaryModel, cfg.Context)
agentService := agent.NewAgentService(aiClient, msgRepo, convRepo, memRepo, taskRepo, redisClient, verifierClient, pluginService, mcpProvider, swapTxBuilder, logger, cfg.AI.SummaryModel, cfg.Context, cfg.Scheduler)

// Initialize API server
server := api.NewServer(
Expand Down
6 changes: 6 additions & 0 deletions deploy/01_server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ spec:
configMapKeyRef:
name: verifier
key: url
# --- MCP config (from ConfigMap) ---
- name: MCP_SERVER_URL
valueFrom:
configMapKeyRef:
name: agent
key: mcp-server-url
# --- Secrets ---
- name: DATABASE_DSN
valueFrom:
Expand Down
110 changes: 110 additions & 0 deletions deploy/02_scheduler.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: scheduler
labels:
app: scheduler
spec:
replicas: 1
selector:
matchLabels:
app: scheduler
template:
metadata:
labels:
app: scheduler
annotations:
prometheus.io/scrape: "false"
spec:
containers:
- name: scheduler
image: ghcr.io/vultisig/agent-backend/scheduler:v1.0.0
command: ["/app/main"]
env:
# --- Logging ---
- name: LOG_FORMAT
value: "json"
# --- AI config (from ConfigMap) ---
- name: AI_MODEL
valueFrom:
configMapKeyRef:
name: agent
key: ai-model
- name: AI_SUMMARY_MODEL
valueFrom:
configMapKeyRef:
name: agent
key: ai-summary-model
- name: AI_BASE_URL
valueFrom:
configMapKeyRef:
name: agent
key: ai-base-url
- name: AI_APP_NAME
valueFrom:
configMapKeyRef:
name: agent
key: ai-app-name
# --- Context window config (from ConfigMap) ---
- name: CONTEXT_WINDOW_SIZE
valueFrom:
configMapKeyRef:
name: agent
key: context-window-size
- name: CONTEXT_SUMMARIZE_TRIGGER
valueFrom:
configMapKeyRef:
name: agent
key: context-summarize-trigger
- name: CONTEXT_SUMMARY_MAX_TOKENS
valueFrom:
configMapKeyRef:
name: agent
key: context-summary-max-tokens
# --- Verifier URL (from ConfigMap, differs dev/prod) ---
- name: VERIFIER_URL
valueFrom:
configMapKeyRef:
name: verifier
key: url
# --- MCP config (from ConfigMap) ---
- name: MCP_SERVER_URL
valueFrom:
configMapKeyRef:
name: agent
key: mcp-server-url
# --- Scheduler config ---
- name: SCHEDULER_POLL_INTERVAL_SECONDS
value: "30"
- name: SCHEDULER_MAX_ACTIVE_PER_USER
value: "10"
- name: SCHEDULER_MIN_INTERVAL_MINUTES
value: "60"
# --- Secrets ---
- name: DATABASE_DSN
valueFrom:
secretKeyRef:
name: postgres
key: dsn
- name: REDIS_URI
valueFrom:
secretKeyRef:
name: redis
key: uri
- name: AI_API_KEY
valueFrom:
secretKeyRef:
name: ai-provider
key: api-key
- name: AUTH_CACHE_KEY_SECRET
valueFrom:
secretKeyRef:
name: auth-cache
key: key-secret
resources:
requests:
memory: "64Mi"
cpu: "50m"
limits:
memory: "256Mi"
cpu: "250m"
1 change: 1 addition & 0 deletions deploy/dev/01_agent.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ data:
context-window-size: "20"
context-summarize-trigger: "30"
context-summary-max-tokens: "512"
mcp-server-url: "" # TODO: set MCP server URL for dev environment
1 change: 1 addition & 0 deletions deploy/prod/01_agent.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ data:
context-window-size: "20"
context-summarize-trigger: "30"
context-summary-max-tokens: "512"
mcp-server-url: "" # TODO: set MCP server URL for prod environment
29 changes: 28 additions & 1 deletion docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ services:
retries: 10

server:
build: .
build:
context: .
args:
SERVICE: server
ports:
- "8084:8084"
env_file:
Expand All @@ -45,5 +48,29 @@ services:
- docker-compose.yaml
- .env

scheduler:
build:
context: .
args:
SERVICE: scheduler
env_file:
- .env
environment:
DATABASE_DSN: postgres://vultisig:vultisig@postgres:5432/vultisig-agent?sslmode=disable
REDIS_URI: redis://redis:6379
VERIFIER_URL: http://host.docker.internal:8080
depends_on:
postgres:
condition: service_healthy
redis:
condition: service_healthy
develop:
watch:
- action: rebuild
path: .
ignore:
- docker-compose.yaml
- .env

volumes:
pgdata:
8 changes: 8 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type Config struct {
Context ContextConfig
Verifier VerifierConfig
MCP MCPConfig
Scheduler SchedulerConfig
}

// ServerConfig holds HTTP server configuration.
Expand Down Expand Up @@ -70,6 +71,13 @@ type MCPConfig struct {
URL string `envconfig:"MCP_URL" default:""`
}

// SchedulerConfig holds settings for the scheduler service.
type SchedulerConfig struct {
PollIntervalSeconds int `envconfig:"SCHEDULER_POLL_INTERVAL_SECONDS" default:"30"`
MaxActivePerUser int `envconfig:"SCHEDULER_MAX_ACTIVE_PER_USER" default:"10"`
MinIntervalMinutes int `envconfig:"SCHEDULER_MIN_INTERVAL_MINUTES" default:"60"`
}

// TODO: Add MetricsConfig for Prometheus metrics when metrics are implemented.

// Load reads configuration from environment variables.
Expand Down
6 changes: 6 additions & 0 deletions internal/service/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,14 @@ type AgentService struct {
msgRepo *postgres.MessageRepository
convRepo *postgres.ConversationRepository
memRepo *postgres.MemoryRepository
taskRepo *postgres.ScheduledTaskRepository
redis *redis.Client
verifier *verifier.Client
pluginProvider PluginSkillsProvider
mcpProvider MCPToolProvider
swapTxBuilder SwapTxBuilder
logger *logrus.Logger
schedulerCfg config.SchedulerConfig
summaryModel string
windowSize int
summarizeTrigger int
Expand All @@ -95,6 +97,7 @@ func NewAgentService(
msgRepo *postgres.MessageRepository,
convRepo *postgres.ConversationRepository,
memRepo *postgres.MemoryRepository,
taskRepo *postgres.ScheduledTaskRepository,
redisClient *redis.Client,
verifierClient *verifier.Client,
pluginProvider PluginSkillsProvider,
Expand All @@ -103,18 +106,21 @@ func NewAgentService(
logger *logrus.Logger,
summaryModel string,
ctxCfg config.ContextConfig,
schedulerCfg config.SchedulerConfig,
) *AgentService {
return &AgentService{
ai: aiClient,
msgRepo: msgRepo,
convRepo: convRepo,
memRepo: memRepo,
taskRepo: taskRepo,
redis: redisClient,
verifier: verifierClient,
pluginProvider: pluginProvider,
mcpProvider: mcpProvider,
swapTxBuilder: swapTxBuilder,
logger: logger,
schedulerCfg: schedulerCfg,
summaryModel: summaryModel,
windowSize: ctxCfg.WindowSize,
summarizeTrigger: ctxCfg.SummarizeTrigger,
Expand Down
Loading