Skip to content
19 changes: 19 additions & 0 deletions GLOSSARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,25 @@ A time-based trigger that fires an action — sending a message or dispatching (
_Avoid_: cron job (recurring only), scheduled message (too narrow), reminder, timer
_See also_: Dispatch

## Observability

Scion produces two distinct families of metrics. They serve different audiences, use different prefixes, and flow through different pipelines — but both export to the same Cloud Monitoring backend.

**Infrastructure metrics**:
Operational health metrics for Scion as a system — the Hub process, its database connections, dispatch pipeline, broker authentication, and GCP token minting. These answer "is Scion itself healthy?" and are consumed by platform operators. Prefixes: `scion.hub.*`, `scion.db.*`, `scion.dispatch.*`. Produced by the Hub process; exported directly to Cloud Monitoring via an OTel MeterProvider with a GCP exporter.
_Avoid_: system metrics, platform metrics, server metrics
_See also_: Agent metrics (the other family)

**Agent metrics**:
Telemetry about what agents and their harnesses are doing — token usage, tool calls, model API latency, session counts, and cost signals. These answer "what are the agents doing and what do they cost?" and are consumed by users and project owners. Prefixes: `gen_ai.*`, `agent.*` (following OpenTelemetry Generative AI semantic conventions). Produced inside agent containers by the harness and sciontool; exported to Cloud Monitoring via the telemetry pipeline (`pkg/sciontool/telemetry`).
_Avoid_: harness metrics, user metrics, LLM metrics
_See also_: Infrastructure metrics (the other family), Telemetry pipeline

**Telemetry pipeline**:
The in-container OTLP receiver and forwarding pipeline (`pkg/sciontool/telemetry`) that collects traces, metrics, and logs from the harness and exports them to a cloud backend (GCP Cloud Monitoring, Cloud Trace, Cloud Logging). Requires the `scion-telemetry-gcp-credentials` secret for cloud export; runs in local-only mode without it.
_Avoid_: metrics pipeline, collector, OTel collector
_See also_: Agent metrics

## Potential Future Additions

Terms that recur in the codebase and may warrant canonical entries, but are **not yet defined** here. Listed so they aren't lost; promote to full entries (verified against the code) as the glossary matures.
Expand Down
69 changes: 61 additions & 8 deletions cmd/server_foreground.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ import (
"github.com/GoogleCloudPlatform/scion/pkg/harness"
"github.com/GoogleCloudPlatform/scion/pkg/hub"
"github.com/GoogleCloudPlatform/scion/pkg/observability/dbmetrics"
"github.com/GoogleCloudPlatform/scion/pkg/observability/dispatchmetrics"
"github.com/GoogleCloudPlatform/scion/pkg/observability/hubmetrics"
scionplugin "github.com/GoogleCloudPlatform/scion/pkg/plugin"
"github.com/GoogleCloudPlatform/scion/pkg/runtime"
"github.com/GoogleCloudPlatform/scion/pkg/runtimebroker"
Expand Down Expand Up @@ -211,6 +213,7 @@ func runServerStart(cmd *cobra.Command, args []string) error {
// 11. Start Hub
var hubSrv *hub.Server
var secretBackend secret.SecretBackend
var hubDBRec dbmetrics.Recorder
if enableHub {
// Initialize secret backend early so signing keys can be loaded from it
// during hub server creation. This prevents the previous bug where
Expand All @@ -232,13 +235,62 @@ func runServerStart(cmd *cobra.Command, args []string) error {
log.Fatalf("Hub server failed to start: %v", hubInitErr)
}

// Wire hub OTel metrics export to Cloud Monitoring.
if cfg.Hub.GCPProjectID != "" {
mp, mpErr := hubmetrics.NewMeterProvider(ctx, cfg.Hub.GCPProjectID,
hubmetrics.WithHubID(hubSrv.HubID()),
)
if mpErr != nil {
log.Printf("WARNING: hub metrics export disabled: %v", mpErr)
} else {
defer func() {
shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
_ = mp.Shutdown(shutdownCtx)
}()

dbRec, dbErr := dbmetrics.New(mp)
if dbErr != nil {
log.Printf("WARNING: hub db metrics disabled: %v", dbErr)
} else {
hubDBRec = dbRec
hubSrv.SetDBMetrics(dbRec)
}

dispRec, dispErr := dispatchmetrics.New(mp)
if dispErr != nil {
log.Printf("WARNING: hub dispatch metrics disabled: %v", dispErr)
} else {
hubSrv.SetDispatchMetrics(dispRec)
}

if hubSrv.GetBrokerAuthService() != nil {
otelMetrics, otelAuthErr := hub.NewOTelMetricsRecorder(mp)
if otelAuthErr != nil {
log.Printf("WARNING: hub auth metrics OTel export disabled: %v", otelAuthErr)
} else {
hubSrv.SetMetrics(otelMetrics)
}
}

otelGCP, otelGCPErr := hub.NewOTelGCPTokenMetrics(mp)
if otelGCPErr != nil {
log.Printf("WARNING: hub GCP token metrics OTel export disabled: %v", otelGCPErr)
} else {
hubSrv.SetGCPTokenMetrics(otelGCP)
}

log.Printf("Hub OTel metrics export enabled (project: %s)", cfg.Hub.GCPProjectID)
}
}

// Wire command bus for cross-node dispatch (B2-4).
cmdBus := newCommandBus(ctx, cfg, hubSrv)
hubSrv.SetCommandBus(cmdBus)

if !enableWeb {
// Hub runs its own HTTP server (standalone mode).
eventPub := newEventPublisher(ctx, cfg)
eventPub := newEventPublisher(ctx, cfg, hubDBRec)
hubSrv.SetEventPublisher(eventPub)

log.Printf("Starting Hub API server on %s:%d", cfg.Hub.Host, cfg.Hub.Port)
Expand Down Expand Up @@ -266,7 +318,7 @@ func runServerStart(cmd *cobra.Command, args []string) error {
// 12. Start Web
var webSrv *hub.WebServer
if enableWeb {
webSrv = initWebServer(ctx, cfg, hubSrv, devAuthToken, adminEmailList, adminMode, maintenanceMessage, requestLogger)
webSrv = initWebServer(ctx, cfg, hubSrv, devAuthToken, adminEmailList, adminMode, maintenanceMessage, requestLogger, hubDBRec)

// In combined mode, start Hub background services now that the
// ChannelEventPublisher has been wired by initWebServer.
Expand Down Expand Up @@ -1167,11 +1219,12 @@ func initHubStorage(ctx context.Context, hubSrv *hub.Server, cfg *config.GlobalC
// ChannelEventPublisher. If the Postgres publisher cannot be started it falls
// back to the in-process publisher so a single instance still functions, logging
// a prominent warning since cross-replica SSE delivery will be unavailable.
func newEventPublisher(ctx context.Context, cfg *config.GlobalConfig) hub.EventPublisher {
func newEventPublisher(ctx context.Context, cfg *config.GlobalConfig, dbRec dbmetrics.Recorder) hub.EventPublisher {
if strings.EqualFold(cfg.Database.Driver, "postgres") {
// Metrics export is wired separately (see pkg/observability/dbmetrics);
// use a disabled recorder until a MeterProvider is configured.
pub, err := hub.NewPostgresEventPublisher(ctx, cfg.Database.URL, dbmetrics.NewDisabled(), logging.Subsystem("hub.events"))
if dbRec == nil {
dbRec = dbmetrics.NewDisabled()
}
pub, err := hub.NewPostgresEventPublisher(ctx, cfg.Database.URL, dbRec, logging.Subsystem("hub.events"))
if err != nil {
log.Printf("WARNING: failed to start Postgres event publisher (%v); falling back to in-process events. Cross-replica SSE will not work.", err)
return hub.NewChannelEventPublisher()
Expand Down Expand Up @@ -1208,7 +1261,7 @@ func newCommandBus(ctx context.Context, cfg *config.GlobalConfig, hubSrv *hub.Se
// initWebServer creates and configures the Web server. The provided context is
// threaded to the event publisher so that the Postgres LISTEN/NOTIFY goroutine
// is cancelled cleanly on shutdown, preventing connection leaks.
func initWebServer(ctx context.Context, cfg *config.GlobalConfig, hubSrv *hub.Server, devAuthToken string, adminEmailList []string, adminMode bool, maintenanceMessage string, requestLogger *slog.Logger) *hub.WebServer {
func initWebServer(ctx context.Context, cfg *config.GlobalConfig, hubSrv *hub.Server, devAuthToken string, adminEmailList []string, adminMode bool, maintenanceMessage string, requestLogger *slog.Logger, dbRec dbmetrics.Recorder) *hub.WebServer {
webHost := cfg.Hub.Host
if webHost == "" {
webHost = "0.0.0.0"
Expand Down Expand Up @@ -1264,7 +1317,7 @@ func initWebServer(ctx context.Context, cfg *config.GlobalConfig, hubSrv *hub.Se
webSrv.SetRequestLogger(requestLogger)

// Create shared event publisher for real-time SSE
eventPub := newEventPublisher(ctx, cfg)
eventPub := newEventPublisher(ctx, cfg, dbRec)
webSrv.SetEventPublisher(eventPub)

// Wire Hub services into WebServer if Hub is enabled
Expand Down
8 changes: 8 additions & 0 deletions pkg/hub/gcp_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ import (
"time"
)

// GCPTokenMetricsRecorder is the interface for recording GCP token metrics.
type GCPTokenMetricsRecorder interface {
RecordAccessTokenRequest(success bool, latency time.Duration)
RecordIDTokenRequest(success bool, latency time.Duration)
RecordRateLimitRejection()
GetSnapshot() *GCPTokenMetricsSnapshot
}

// GCPTokenMetrics tracks metrics for GCP token operations.
type GCPTokenMetrics struct {
// Access token counters
Expand Down
125 changes: 125 additions & 0 deletions pkg/hub/otel_gcp_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Copyright 2026 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package hub

import (
"context"
"fmt"
"time"

"go.opentelemetry.io/otel/metric"
)

// OTelGCPTokenMetrics implements GCPTokenMetricsRecorder using OTel
// instruments for Cloud Monitoring export. It embeds a GCPTokenMetrics for
// the /api/metrics JSON snapshot endpoint (dual-write).
type OTelGCPTokenMetrics struct {
accessRequests metric.Int64Counter
accessSuccesses metric.Int64Counter
accessFailures metric.Int64Counter
idRequests metric.Int64Counter
idSuccesses metric.Int64Counter
idFailures metric.Int64Counter
rateLimitRejects metric.Int64Counter
iamDuration metric.Float64Histogram

snap *GCPTokenMetrics
}

var _ GCPTokenMetricsRecorder = (*OTelGCPTokenMetrics)(nil)

// NewOTelGCPTokenMetrics creates an OTel-backed GCP token metrics recorder.
func NewOTelGCPTokenMetrics(mp metric.MeterProvider) (*OTelGCPTokenMetrics, error) {
m := mp.Meter(instrumentationScope)
r := &OTelGCPTokenMetrics{snap: NewGCPTokenMetrics()}

var err error

if r.accessRequests, err = m.Int64Counter("scion.hub.gcp.token.access.requests",
metric.WithUnit("{request}"),
); err != nil {
return nil, fmt.Errorf("creating gcp.token.access.requests counter: %w", err)
}
if r.accessSuccesses, err = m.Int64Counter("scion.hub.gcp.token.access.successes",
metric.WithUnit("{request}"),
); err != nil {
return nil, fmt.Errorf("creating gcp.token.access.successes counter: %w", err)
}
if r.accessFailures, err = m.Int64Counter("scion.hub.gcp.token.access.failures",
metric.WithUnit("{request}"),
); err != nil {
return nil, fmt.Errorf("creating gcp.token.access.failures counter: %w", err)
}
if r.idRequests, err = m.Int64Counter("scion.hub.gcp.token.identity.requests",
metric.WithUnit("{request}"),
); err != nil {
return nil, fmt.Errorf("creating gcp.token.identity.requests counter: %w", err)
}
if r.idSuccesses, err = m.Int64Counter("scion.hub.gcp.token.identity.successes",
metric.WithUnit("{request}"),
); err != nil {
return nil, fmt.Errorf("creating gcp.token.identity.successes counter: %w", err)
}
if r.idFailures, err = m.Int64Counter("scion.hub.gcp.token.identity.failures",
metric.WithUnit("{request}"),
); err != nil {
return nil, fmt.Errorf("creating gcp.token.identity.failures counter: %w", err)
}
if r.rateLimitRejects, err = m.Int64Counter("scion.hub.gcp.token.ratelimit.rejections",
metric.WithUnit("{rejection}"),
); err != nil {
return nil, fmt.Errorf("creating gcp.token.ratelimit.rejections counter: %w", err)
}
if r.iamDuration, err = m.Float64Histogram("scion.hub.gcp.iam.duration",
metric.WithUnit("ms"),
); err != nil {
return nil, fmt.Errorf("creating gcp.iam.duration histogram: %w", err)
}

return r, nil
}

func (r *OTelGCPTokenMetrics) RecordAccessTokenRequest(success bool, latency time.Duration) {
ctx := context.Background()
r.accessRequests.Add(ctx, 1)
if success {
r.accessSuccesses.Add(ctx, 1)
} else {
r.accessFailures.Add(ctx, 1)
}
r.iamDuration.Record(ctx, float64(latency.Milliseconds()))
r.snap.RecordAccessTokenRequest(success, latency)
}

func (r *OTelGCPTokenMetrics) RecordIDTokenRequest(success bool, latency time.Duration) {
ctx := context.Background()
r.idRequests.Add(ctx, 1)
if success {
r.idSuccesses.Add(ctx, 1)
} else {
r.idFailures.Add(ctx, 1)
}
r.iamDuration.Record(ctx, float64(latency.Milliseconds()))
r.snap.RecordIDTokenRequest(success, latency)
}

func (r *OTelGCPTokenMetrics) RecordRateLimitRejection() {
r.rateLimitRejects.Add(context.Background(), 1)
r.snap.RecordRateLimitRejection()
}

func (r *OTelGCPTokenMetrics) GetSnapshot() *GCPTokenMetricsSnapshot {
return r.snap.GetSnapshot()
}
Loading
Loading