Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ func main() {
if err == nil {
cfg.AuthDir = pgStoreInst.AuthDir()
log.Infof("postgres-backed token store enabled, workspace path: %s", pgStoreInst.WorkDir())
pgStoreInst.StartAuthSync(context.Background(), 30*time.Second, nil)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Use a bounded context for auth sync polling

StartAuthSync is started with context.Background(), but incrementalAuthSync takes the store mutex before QueryContext; if the DB call stalls (for example during a network partition), the poll can block indefinitely while holding s.mu, which also blocks Save, Delete, and PersistAuthFiles on that replica. Please pass a cancellable context with a per-poll timeout instead of an unbounded background context.

Useful? React with 👍 / 👎.

}
} else if useObjectStore {
if objectStoreLocalPath == "" {
Expand Down
106 changes: 106 additions & 0 deletions internal/store/postgresstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,88 @@ func (s *PostgresStore) EnsureSchema(ctx context.Context) error {
return nil
}

// StartAuthSync polls the auth_store table and incrementally updates local auth files
// when changes are detected. This enables multi-replica setups: when one pod
// adds/refreshes credentials, other pods pick them up within the poll interval.
// Only files whose content has actually changed are written, avoiding unnecessary
// fsnotify events and file system churn.
func (s *PostgresStore) StartAuthSync(ctx context.Context, interval time.Duration, onChange func()) {
go func() {
for {
select {
case <-ctx.Done():
return
case <-time.After(interval):
changed, err := s.incrementalAuthSync(ctx)
if err != nil {
log.Printf("postgres store: auth sync poll: %v", err)
continue
}
if changed && onChange != nil {
onChange()
}
}
}
}()
}

// incrementalAuthSync compares PG auth records with local files and writes only changes.
func (s *PostgresStore) incrementalAuthSync(ctx context.Context) (bool, error) {
s.mu.Lock()
defer s.mu.Unlock()

query := fmt.Sprintf("SELECT id, content FROM %s", s.fullTableName(s.cfg.AuthTable))
rows, err := s.db.QueryContext(ctx, query)
if err != nil {
return false, fmt.Errorf("load auth from database: %w", err)
}
defer rows.Close()

changed := false
seen := make(map[string]bool)
for rows.Next() {
var id, payload string
if err := rows.Scan(&id, &payload); err != nil {
return false, fmt.Errorf("scan auth row: %w", err)
}
seen[id] = true
path, errPath := s.absoluteAuthPath(id)
if errPath != nil {
continue
}
existing, errRead := os.ReadFile(path)
if errRead == nil && string(existing) == payload {
continue
}
if err := os.MkdirAll(filepath.Dir(path), 0o700); err != nil {
return false, fmt.Errorf("create auth subdir: %w", err)
}
if err := os.WriteFile(path, []byte(payload), 0o600); err != nil {
return false, fmt.Errorf("write auth file: %w", err)
Comment on lines +202 to +203
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Prevent poller from clobbering pending local auth edits

This overwrite path applies whenever file bytes differ from the DB payload, so in PG mode a direct auth-file edit can be lost if the poll runs before watcher-driven PersistAuthFiles finishes (that persistence is asynchronous): the poll writes stale DB content back to disk, and the stale version is then re-persisted. That makes file-based auth updates non-deterministic under concurrent polling.

Useful? React with 👍 / 👎.

}
changed = true
}
if err := rows.Err(); err != nil {
return false, fmt.Errorf("iterate auth rows: %w", err)
}

_ = filepath.WalkDir(s.authDir, func(path string, d fs.DirEntry, err error) error {
if err != nil || d.IsDir() {
return nil
}
rel, relErr := filepath.Rel(s.authDir, path)
if relErr != nil || seen[rel] {
return nil
}
if removeErr := os.Remove(path); removeErr == nil {
changed = true
}
return nil
})

return changed, nil
}

// Bootstrap synchronizes configuration and auth records between PostgreSQL and the local workspace.
func (s *PostgresStore) Bootstrap(ctx context.Context, exampleConfigPath string) error {
if err := s.EnsureSchema(ctx); err != nil {
Expand Down Expand Up @@ -393,7 +475,31 @@ func (s *PostgresStore) PersistConfig(ctx context.Context) error {
}

// syncConfigFromDatabase writes the database-stored config to disk or seeds the database from template.
// If PGSTORE_CONFIG_OVERRIDE_PATH is set, that file always wins over the database — the file
// is read, written into PG, and synced to the local spool. This lets k8s ConfigMaps override
// PG-stored config on every pod restart.
func (s *PostgresStore) syncConfigFromDatabase(ctx context.Context, exampleConfigPath string) error {
if overridePath := os.Getenv("PGSTORE_CONFIG_OVERRIDE_PATH"); overridePath != "" {
data, readErr := os.ReadFile(overridePath)
if readErr != nil {
log.Printf("postgres store: cannot read config override %s: %v (falling back to DB)", overridePath, readErr)
}
if readErr == nil {
if errPersist := s.persistConfig(ctx, data); errPersist != nil {
log.Printf("postgres store: persist config override: %v (continuing with DB config)", errPersist)
} else {
if errDir := os.MkdirAll(filepath.Dir(s.configPath), 0o700); errDir != nil {
return fmt.Errorf("postgres store: prepare config directory: %w", errDir)
}
if errWrite := os.WriteFile(s.configPath, data, 0o600); errWrite != nil {
return fmt.Errorf("postgres store: write config override to spool: %w", errWrite)
}
log.Printf("postgres store: config overridden from %s", overridePath)
return nil
}
}
}

query := fmt.Sprintf("SELECT content FROM %s WHERE id = $1", s.fullTableName(s.cfg.ConfigTable))
var content string
err := s.db.QueryRowContext(ctx, query, defaultConfigKey).Scan(&content)
Expand Down
Loading