diff --git a/cmd/server/main.go b/cmd/server/main.go index e12e5261b6..18dd580456 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -265,6 +265,7 @@ func main() { if err == nil { cfg.AuthDir = pgStoreInst.AuthDir() log.Infof("postgres-backed token store enabled, workspace path: %s", pgStoreInst.WorkDir()) + pgStoreInst.StartAuthSync(context.Background(), 30*time.Second, nil) } } else if useObjectStore { if objectStoreLocalPath == "" { diff --git a/internal/store/postgresstore.go b/internal/store/postgresstore.go index 527b25cc12..253a1cd9ed 100644 --- a/internal/store/postgresstore.go +++ b/internal/store/postgresstore.go @@ -143,6 +143,88 @@ func (s *PostgresStore) EnsureSchema(ctx context.Context) error { return nil } +// StartAuthSync polls the auth_store table and incrementally updates local auth files +// when changes are detected. This enables multi-replica setups: when one pod +// adds/refreshes credentials, other pods pick them up within the poll interval. +// Only files whose content has actually changed are written, avoiding unnecessary +// fsnotify events and file system churn. +func (s *PostgresStore) StartAuthSync(ctx context.Context, interval time.Duration, onChange func()) { + go func() { + for { + select { + case <-ctx.Done(): + return + case <-time.After(interval): + changed, err := s.incrementalAuthSync(ctx) + if err != nil { + log.Printf("postgres store: auth sync poll: %v", err) + continue + } + if changed && onChange != nil { + onChange() + } + } + } + }() +} + +// incrementalAuthSync compares PG auth records with local files and writes only changes. +func (s *PostgresStore) incrementalAuthSync(ctx context.Context) (bool, error) { + s.mu.Lock() + defer s.mu.Unlock() + + query := fmt.Sprintf("SELECT id, content FROM %s", s.fullTableName(s.cfg.AuthTable)) + rows, err := s.db.QueryContext(ctx, query) + if err != nil { + return false, fmt.Errorf("load auth from database: %w", err) + } + defer rows.Close() + + changed := false + seen := make(map[string]bool) + for rows.Next() { + var id, payload string + if err := rows.Scan(&id, &payload); err != nil { + return false, fmt.Errorf("scan auth row: %w", err) + } + seen[id] = true + path, errPath := s.absoluteAuthPath(id) + if errPath != nil { + continue + } + existing, errRead := os.ReadFile(path) + if errRead == nil && string(existing) == payload { + continue + } + if err := os.MkdirAll(filepath.Dir(path), 0o700); err != nil { + return false, fmt.Errorf("create auth subdir: %w", err) + } + if err := os.WriteFile(path, []byte(payload), 0o600); err != nil { + return false, fmt.Errorf("write auth file: %w", err) + } + changed = true + } + if err := rows.Err(); err != nil { + return false, fmt.Errorf("iterate auth rows: %w", err) + } + + _ = filepath.WalkDir(s.authDir, func(path string, d fs.DirEntry, err error) error { + if err != nil || d.IsDir() { + return nil + } + rel, relErr := filepath.Rel(s.authDir, path) + if relErr != nil || seen[rel] { + return nil + } + if removeErr := os.Remove(path); removeErr == nil { + changed = true + } + return nil + }) + + return changed, nil +} + // Bootstrap synchronizes configuration and auth records between PostgreSQL and the local workspace. func (s *PostgresStore) Bootstrap(ctx context.Context, exampleConfigPath string) error { if err := s.EnsureSchema(ctx); err != nil { @@ -393,7 +475,31 @@ func (s *PostgresStore) PersistConfig(ctx context.Context) error { } // syncConfigFromDatabase writes the database-stored config to disk or seeds the database from template. +// If PGSTORE_CONFIG_OVERRIDE_PATH is set, that file always wins over the database — the file +// is read, written into PG, and synced to the local spool. This lets k8s ConfigMaps override +// PG-stored config on every pod restart. func (s *PostgresStore) syncConfigFromDatabase(ctx context.Context, exampleConfigPath string) error { + if overridePath := os.Getenv("PGSTORE_CONFIG_OVERRIDE_PATH"); overridePath != "" { + data, readErr := os.ReadFile(overridePath) + if readErr != nil { + log.Printf("postgres store: cannot read config override %s: %v (falling back to DB)", overridePath, readErr) + } + if readErr == nil { + if errPersist := s.persistConfig(ctx, data); errPersist != nil { + log.Printf("postgres store: persist config override: %v (continuing with DB config)", errPersist) + } else { + if errDir := os.MkdirAll(filepath.Dir(s.configPath), 0o700); errDir != nil { + return fmt.Errorf("postgres store: prepare config directory: %w", errDir) + } + if errWrite := os.WriteFile(s.configPath, data, 0o600); errWrite != nil { + return fmt.Errorf("postgres store: write config override to spool: %w", errWrite) + } + log.Printf("postgres store: config overridden from %s", overridePath) + return nil + } + } + } + query := fmt.Sprintf("SELECT content FROM %s WHERE id = $1", s.fullTableName(s.cfg.ConfigTable)) var content string err := s.db.QueryRowContext(ctx, query, defaultConfigKey).Scan(&content)