diff --git a/internal/database/database.go b/internal/database/database.go index c2d71ee..fa79172 100644 --- a/internal/database/database.go +++ b/internal/database/database.go @@ -315,6 +315,7 @@ func AutoMigrate(db *gorm.DB) error { &PullRequestChangeFile{}, &PullRequestChangeHunk{}, &RepoChangeSyncState{}, + &RepoOpenPullInventory{}, &WebhookDelivery{}, &RepositoryRefreshJob{}, ) diff --git a/internal/database/git_changes.go b/internal/database/git_changes.go index 71c986c..e65a961 100644 --- a/internal/database/git_changes.go +++ b/internal/database/git_changes.go @@ -183,3 +183,19 @@ type RepoChangeSyncState struct { CreatedAt time.Time UpdatedAt time.Time } + +type RepoOpenPullInventory struct { + ID uint `gorm:"primaryKey"` + RepositoryID uint `gorm:"uniqueIndex:idx_repo_open_pull_inventory_repo_pr,priority:1;index:idx_repo_open_pull_inventory_repo_freshness_updated,priority:1"` + PullRequestNumber int `gorm:"uniqueIndex:idx_repo_open_pull_inventory_repo_pr,priority:2;index:idx_repo_open_pull_inventory_repo_freshness_updated,priority:3"` + GitHubUpdatedAt time.Time `gorm:"column:github_updated_at"` + HeadSHA string + BaseSHA string + BaseRef string + State string + Draft bool + FreshnessState string `gorm:"index:idx_repo_open_pull_inventory_repo_freshness_updated,priority:2"` + LastSeenAt time.Time + CreatedAt time.Time + UpdatedAt time.Time +} diff --git a/internal/githubsync/change_sync.go b/internal/githubsync/change_sync.go index e310980..7dcea9d 100644 --- a/internal/githubsync/change_sync.go +++ b/internal/githubsync/change_sync.go @@ -39,13 +39,7 @@ type RepoBackfillResult struct { } type backfillCandidate struct { - pull gh.PullRequestResponse - snapshot *database.PullRequestChangeSnapshot -} - -type repoOpenPullScan struct { - Result RepoBackfillResult - Candidates []backfillCandidate + inventory database.RepoOpenPullInventory } type ChangeSyncWorker struct { @@ -307,14 +301,22 @@ func (s *Service) BackfillOpenPullRequests(ctx context.Context, owner, repo stri return RepoBackfillResult{}, err } - scan, err := s.scanOpenPullRequests(ctx, owner, repo, repository.ID, state) + candidates, err := s.listBackfillCandidatesFromInventory(ctx, repository.ID, state.OpenPRCursorUpdatedAt, state.OpenPRCursorNumber, options.MaxPRs) if err != nil { return RepoBackfillResult{}, err } - result := scan.Result - candidates := scan.Candidates + stateCounts, err := s.repoChangeStateByRepositoryID(ctx, repository.ID) + if err != nil { + return RepoBackfillResult{}, err + } + result := RepoBackfillResult{ + OpenPRTotal: stateCounts.OpenPRTotal, + OpenPRCurrent: stateCounts.OpenPRCurrent, + OpenPRStale: stateCounts.OpenPRStale, + OpenPRMissing: maxInt(0, stateCounts.OpenPRTotal-stateCounts.OpenPRCurrent-stateCounts.OpenPRStale), + } deadline := time.Now().UTC().Add(options.MaxRuntime) - var lastProcessed *backfillCandidate + var lastProcessed *database.RepoOpenPullInventory for _, candidate := range candidates { if result.ProcessedPRs >= options.MaxPRs || time.Now().UTC().After(deadline) { break @@ -323,57 +325,85 @@ func (s *Service) BackfillOpenPullRequests(ctx context.Context, owner, repo stri return RepoBackfillResult{}, err } result.ProcessedPRs++ - if err := s.syncPullRequestChangeOnly(ctx, owner, repo, repository, candidate.pull.Number); err != nil { + newFreshness := candidate.inventory.FreshnessState + updatedInventory := candidate.inventory + pull, err := s.syncPullRequestChangeOnly(ctx, owner, repo, repository, candidate.inventory.PullRequestNumber) + if err != nil { result.FailedPRs++ - continue + if strings.TrimSpace(newFreshness) == "" { + newFreshness = "failed" + } + } else { + result.IndexedPRs++ + updatedInventory = inventoryFromPull(repository.ID, pull) + var freshnessErr error + newFreshness, freshnessErr = s.reconcileInventoryFreshness(ctx, repository.ID, updatedInventory) + if freshnessErr != nil { + return RepoBackfillResult{}, freshnessErr + } + } + if err := s.advanceBackfillProgress(ctx, state.ID, updatedInventory, newFreshness); err != nil { + return RepoBackfillResult{}, err } - result.IndexedPRs++ - copyCandidate := candidate - lastProcessed = ©Candidate + candidateCopy := updatedInventory + lastProcessed = &candidateCopy } if lastProcessed != nil { - idx := findCandidateIndex(candidates, *lastProcessed) - if idx >= 0 && idx < len(candidates)-1 { - next := lastProcessed.pull + more, err := s.hasBackfillCandidatesAfterCursor(ctx, repository.ID, lastProcessed.GitHubUpdatedAt, lastProcessed.PullRequestNumber) + if err != nil { + return RepoBackfillResult{}, err + } + if more { result.Completed = false - result.NextCursorNum = intPtr(next.Number) - nextTime := next.UpdatedAt.UTC() + result.NextCursorNum = intPtr(lastProcessed.PullRequestNumber) + nextTime := lastProcessed.GitHubUpdatedAt.UTC() result.NextCursorTime = &nextTime } else { result.Completed = true } } else { - result.Completed = len(candidates) == 0 + any, err := s.hasAnyBackfillCandidates(ctx, repository.ID) + if err != nil { + return RepoBackfillResult{}, err + } + result.Completed = !any } - postScan, err := s.scanOpenPullRequests(ctx, owner, repo, repository.ID, state) + finalState, err := s.repoChangeStateByRepositoryID(ctx, repository.ID) if err != nil { return RepoBackfillResult{}, err } - result.OpenPRTotal = postScan.Result.OpenPRTotal - result.OpenPRCurrent = postScan.Result.OpenPRCurrent - result.OpenPRStale = postScan.Result.OpenPRStale - result.OpenPRMissing = postScan.Result.OpenPRMissing + result.OpenPRTotal = finalState.OpenPRTotal + result.OpenPRCurrent = finalState.OpenPRCurrent + result.OpenPRStale = finalState.OpenPRStale + result.OpenPRMissing = maxInt(0, finalState.OpenPRTotal-finalState.OpenPRCurrent-finalState.OpenPRStale) + if result.Completed { + result.NextCursorNum = nil + result.NextCursorTime = nil + } return result, nil } -func (s *Service) syncPullRequestChangeOnly(ctx context.Context, owner, repo string, canonicalRepo database.Repository, number int) error { +func (s *Service) syncPullRequestChangeOnly(ctx context.Context, owner, repo string, canonicalRepo database.Repository, number int) (gh.PullRequestResponse, error) { issue, err := s.github.GetIssue(ctx, owner, repo, number) if err != nil { - return err + return gh.PullRequestResponse{}, err } if _, err := s.upsertIssue(ctx, canonicalRepo.ID, issue); err != nil { - return err + return gh.PullRequestResponse{}, err } pull, err := s.github.GetPullRequest(ctx, owner, repo, number) if err != nil { - return err + return gh.PullRequestResponse{}, err } if err := s.upsertPullRequest(ctx, canonicalRepo.ID, pull); err != nil { - return err + return gh.PullRequestResponse{}, err } - return s.SyncPullRequestIndex(ctx, owner, repo, canonicalRepo.ID, pull) + if err := s.SyncPullRequestIndex(ctx, owner, repo, canonicalRepo.ID, pull); err != nil { + return gh.PullRequestResponse{}, err + } + return pull, nil } func (w *ChangeSyncWorker) processDirtyRepo(ctx context.Context) (bool, error) { @@ -454,11 +484,11 @@ func (w *ChangeSyncWorker) runFetchPass(ctx context.Context, state database.Repo return w.finishFetchStateWithError(ctx, state, err) } - scan, err := w.service.scanOpenPullRequests(ctx, owner, name, repository.ID, state) + result, err := w.service.syncOpenPullInventory(ctx, owner, name, repository.ID) if err != nil { return w.finishFetchStateWithError(ctx, state, err) } - return w.completeFetchPass(ctx, state, scan.Result) + return w.completeFetchPass(ctx, state, result) } func (w *ChangeSyncWorker) runBackfillPass(ctx context.Context, state database.RepoChangeSyncState, _ bool) error { @@ -482,12 +512,8 @@ func (w *ChangeSyncWorker) runBackfillPass(ctx context.Context, state database.R updates := map[string]any{ "last_error": "", - "open_pr_total": result.OpenPRTotal, - "open_pr_current": result.OpenPRCurrent, - "open_pr_stale": result.OpenPRStale, "open_pr_cursor_number": result.NextCursorNum, "open_pr_cursor_updated_at": result.NextCursorTime, - "last_open_pr_scan_at": time.Now().UTC(), } return w.completeBackfillPass(ctx, state, updates) } @@ -505,6 +531,10 @@ func (w *ChangeSyncWorker) completeFetchPass(ctx context.Context, state database "fetch_lease_until": nil, "updated_at": now, } + if result.OpenPRCurrent == result.OpenPRTotal { + updates["open_pr_cursor_number"] = nil + updates["open_pr_cursor_updated_at"] = nil + } if err := w.db.WithContext(ctx).Model(&database.RepoChangeSyncState{}).Where("id = ?", state.ID).Updates(updates).Error; err != nil { return err } @@ -636,10 +666,10 @@ func normalizeBackfillMode(mode string) string { } } -func (s *Service) scanOpenPullRequests(ctx context.Context, owner, repo string, repositoryID uint, state database.RepoChangeSyncState) (repoOpenPullScan, error) { +func (s *Service) syncOpenPullInventory(ctx context.Context, owner, repo string, repositoryID uint) (RepoBackfillResult, error) { openPulls, err := s.github.ListPullRequests(ctx, owner, repo, "open") if err != nil { - return repoOpenPullScan{}, err + return RepoBackfillResult{}, err } sort.Slice(openPulls, func(i, j int) bool { if openPulls[i].UpdatedAt.Equal(openPulls[j].UpdatedAt) { @@ -650,42 +680,71 @@ func (s *Service) scanOpenPullRequests(ctx context.Context, owner, repo string, snapshotMap, err := s.pullRequestSnapshotMap(ctx, repositoryID) if err != nil { - return repoOpenPullScan{}, err + return RepoBackfillResult{}, err } result := RepoBackfillResult{OpenPRTotal: len(openPulls)} - candidates := make([]backfillCandidate, 0, len(openPulls)) now := time.Now().UTC() + seen := make([]int, 0, len(openPulls)) - for _, pull := range openPulls { - snapshot := snapshotMap[pull.Number] - freshness := desiredFreshness(snapshot, pull) - switch freshness { - case "current": - result.OpenPRCurrent++ - case "": - result.OpenPRMissing++ - candidates = append(candidates, backfillCandidate{pull: pull}) - default: - result.OpenPRStale++ - candidates = append(candidates, backfillCandidate{pull: pull, snapshot: snapshot}) + if err := s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { + for _, pull := range openPulls { + snapshot := snapshotMap[pull.Number] + freshness := desiredFreshness(snapshot, pull) if snapshot != nil && snapshot.IndexFreshness != freshness { - if err := s.db.WithContext(ctx). - Model(&database.PullRequestChangeSnapshot{}). + if err := tx.Model(&database.PullRequestChangeSnapshot{}). Where("id = ?", snapshot.ID). Updates(map[string]any{ "index_freshness": freshness, "updated_at": now, }).Error; err != nil { - return repoOpenPullScan{}, err + return err } } + + inventory := database.RepoOpenPullInventory{ + RepositoryID: repositoryID, + PullRequestNumber: pull.Number, + GitHubUpdatedAt: pull.UpdatedAt.UTC(), + HeadSHA: strings.TrimSpace(pull.Head.SHA), + BaseSHA: strings.TrimSpace(pull.Base.SHA), + BaseRef: strings.TrimSpace(pull.Base.Ref), + State: strings.TrimSpace(pull.State), + Draft: pull.Draft, + FreshnessState: freshness, + LastSeenAt: now, + } + if err := tx.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "repository_id"}, {Name: "pull_request_number"}}, + DoUpdates: clause.Assignments(map[string]any{ + "github_updated_at": inventory.GitHubUpdatedAt, + "head_sha": inventory.HeadSHA, + "base_sha": inventory.BaseSHA, + "base_ref": inventory.BaseRef, + "state": inventory.State, + "draft": inventory.Draft, + "freshness_state": inventory.FreshnessState, + "last_seen_at": inventory.LastSeenAt, + "updated_at": now, + }), + }).Create(&inventory).Error; err != nil { + return err + } + + seen = append(seen, pull.Number) + result.OpenPRCurrent, result.OpenPRStale = adjustBackfillCounts(result.OpenPRCurrent, result.OpenPRStale, "", freshness) + } + + prune := tx.Where("repository_id = ?", repositoryID) + if len(seen) > 0 { + prune = prune.Where("pull_request_number NOT IN ?", seen) } + return prune.Delete(&database.RepoOpenPullInventory{}).Error + }); err != nil { + return RepoBackfillResult{}, err } - return repoOpenPullScan{ - Result: result, - Candidates: applyCandidateCursor(candidates, state.OpenPRCursorUpdatedAt, state.OpenPRCursorNumber), - }, nil + result.OpenPRMissing = maxInt(0, result.OpenPRTotal-result.OpenPRCurrent-result.OpenPRStale) + return result, nil } func normalizeBackfillBaseRef(ref string) string { @@ -693,6 +752,188 @@ func normalizeBackfillBaseRef(ref string) string { return strings.TrimPrefix(ref, "refs/heads/") } +func (s *Service) listBackfillCandidatesFromInventory(ctx context.Context, repositoryID uint, cursorTime *time.Time, cursorNumber *int, limit int) ([]backfillCandidate, error) { + if limit <= 0 { + limit = 10 + } + var inventoryRows []database.RepoOpenPullInventory + query := s.backfillInventoryQuery(ctx, repositoryID, cursorTime, cursorNumber). + Order("github_updated_at DESC"). + Order("pull_request_number DESC"). + Limit(limit) + if err := query.Find(&inventoryRows).Error; err != nil { + return nil, err + } + candidates := make([]backfillCandidate, 0, len(inventoryRows)) + for _, row := range inventoryRows { + candidates = append(candidates, backfillCandidate{inventory: row}) + } + return candidates, nil +} + +func (s *Service) hasBackfillCandidatesAfterCursor(ctx context.Context, repositoryID uint, cursorTime time.Time, cursorNumber int) (bool, error) { + var count int64 + err := s.backfillInventoryQuery(ctx, repositoryID, &cursorTime, &cursorNumber). + Limit(1). + Count(&count).Error + return count > 0, err +} + +func (s *Service) hasAnyBackfillCandidates(ctx context.Context, repositoryID uint) (bool, error) { + var count int64 + err := s.backfillInventoryQuery(ctx, repositoryID, nil, nil). + Limit(1). + Count(&count).Error + return count > 0, err +} + +func (s *Service) backfillInventoryQuery(ctx context.Context, repositoryID uint, cursorTime *time.Time, cursorNumber *int) *gorm.DB { + query := s.db.WithContext(ctx). + Model(&database.RepoOpenPullInventory{}). + Where("repository_id = ?", repositoryID). + Where("freshness_state <> ?", "current") + if cursorTime != nil && cursorNumber != nil { + updatedAt := cursorTime.UTC() + query = query.Where( + "(github_updated_at < ?) OR (github_updated_at = ? AND pull_request_number < ?)", + updatedAt, + updatedAt, + *cursorNumber, + ) + } + return query +} + +func (s *Service) reconcileInventoryFreshness(ctx context.Context, repositoryID uint, inventory database.RepoOpenPullInventory) (string, error) { + snapshot, err := s.pullRequestSnapshotOptional(ctx, repositoryID, inventory.PullRequestNumber) + if err != nil { + return "", err + } + if snapshot == nil { + return "failed", nil + } + freshness := desiredInventoryFreshness(snapshot, inventory) + if snapshot.IndexFreshness != freshness { + if err := s.db.WithContext(ctx). + Model(&database.PullRequestChangeSnapshot{}). + Where("id = ?", snapshot.ID). + Updates(map[string]any{ + "index_freshness": freshness, + "updated_at": time.Now().UTC(), + }).Error; err != nil { + return "", err + } + } + return freshness, nil +} + +func inventoryFromPull(repositoryID uint, pull gh.PullRequestResponse) database.RepoOpenPullInventory { + return database.RepoOpenPullInventory{ + RepositoryID: repositoryID, + PullRequestNumber: pull.Number, + GitHubUpdatedAt: pull.UpdatedAt.UTC(), + HeadSHA: strings.TrimSpace(pull.Head.SHA), + BaseSHA: strings.TrimSpace(pull.Base.SHA), + BaseRef: strings.TrimSpace(pull.Base.Ref), + State: strings.TrimSpace(pull.State), + Draft: pull.Draft, + } +} + +func (s *Service) advanceBackfillProgress(ctx context.Context, stateID uint, inventory database.RepoOpenPullInventory, newFreshness string) error { + now := time.Now().UTC() + return s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { + var stored database.RepoOpenPullInventory + if err := tx.Where("repository_id = ? AND pull_request_number = ?", inventory.RepositoryID, inventory.PullRequestNumber). + First(&stored).Error; err != nil { + return err + } + var state database.RepoChangeSyncState + if err := tx.Where("id = ?", stateID).First(&state).Error; err != nil { + return err + } + + nextCurrent, nextStale := adjustBackfillCounts(state.OpenPRCurrent, state.OpenPRStale, stored.FreshnessState, newFreshness) + if err := tx.Model(&database.RepoOpenPullInventory{}). + Where("id = ?", stored.ID). + Updates(map[string]any{ + "github_updated_at": inventory.GitHubUpdatedAt, + "head_sha": inventory.HeadSHA, + "base_sha": inventory.BaseSHA, + "base_ref": inventory.BaseRef, + "state": inventory.State, + "draft": inventory.Draft, + "freshness_state": newFreshness, + "updated_at": now, + }).Error; err != nil { + return err + } + return tx.Model(&database.RepoChangeSyncState{}). + Where("id = ?", state.ID). + Updates(map[string]any{ + "open_pr_current": nextCurrent, + "open_pr_stale": nextStale, + "open_pr_cursor_number": inventory.PullRequestNumber, + "open_pr_cursor_updated_at": inventory.GitHubUpdatedAt.UTC(), + "updated_at": now, + }).Error + }) +} + +func (s *Service) pullRequestSnapshotOptional(ctx context.Context, repositoryID uint, number int) (*database.PullRequestChangeSnapshot, error) { + var snapshot database.PullRequestChangeSnapshot + err := s.db.WithContext(ctx). + Where("repository_id = ? AND pull_request_number = ?", repositoryID, number). + First(&snapshot).Error + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + return nil, nil + } + return nil, err + } + return &snapshot, nil +} + +func desiredInventoryFreshness(snapshot *database.PullRequestChangeSnapshot, inventory database.RepoOpenPullInventory) string { + return desiredFreshness(snapshot, gh.PullRequestResponse{ + Head: gh.PullBranch{SHA: inventory.HeadSHA}, + Base: gh.PullBranch{SHA: inventory.BaseSHA, Ref: inventory.BaseRef}, + }) +} + +func adjustBackfillCounts(current, stale int, oldFreshness, newFreshness string) (int, int) { + switch backfillFreshnessCategory(oldFreshness) { + case "current": + current-- + case "stale": + stale-- + } + switch backfillFreshnessCategory(newFreshness) { + case "current": + current++ + case "stale": + stale++ + } + if current < 0 { + current = 0 + } + if stale < 0 { + stale = 0 + } + return current, stale +} + +func backfillFreshnessCategory(freshness string) string { + switch strings.TrimSpace(freshness) { + case "": + return "missing" + case "current": + return "current" + default: + return "stale" + } +} + func (s *Service) heartbeatBackfillLease(ctx context.Context, stateID uint, leaseTTL time.Duration) error { if stateID == 0 || leaseTTL <= 0 { return nil @@ -706,28 +947,6 @@ func (s *Service) heartbeatBackfillLease(ctx context.Context, stateID uint, leas }).Error } -func applyCandidateCursor(candidates []backfillCandidate, cursorTime *time.Time, cursorNumber *int) []backfillCandidate { - if cursorTime == nil || cursorNumber == nil { - return candidates - } - for i, candidate := range candidates { - updatedAt := candidate.pull.UpdatedAt.UTC() - if updatedAt.Before(cursorTime.UTC()) || (updatedAt.Equal(cursorTime.UTC()) && candidate.pull.Number < *cursorNumber) { - return candidates[i:] - } - } - return nil -} - -func findCandidateIndex(candidates []backfillCandidate, target backfillCandidate) int { - for i, candidate := range candidates { - if candidate.pull.Number == target.pull.Number && candidate.pull.UpdatedAt.Equal(target.pull.UpdatedAt) { - return i - } - } - return -1 -} - func intPtr(v int) *int { return &v } diff --git a/internal/githubsync/change_sync_test.go b/internal/githubsync/change_sync_test.go index 66d6d80..71d9578 100644 --- a/internal/githubsync/change_sync_test.go +++ b/internal/githubsync/change_sync_test.go @@ -8,6 +8,7 @@ import ( "path/filepath" "strconv" "strings" + "sync" "testing" "time" @@ -61,6 +62,11 @@ func TestChangeSyncWorkerBackfillsOpenPullRequestsGradually(t *testing.T) { require.Equal(t, 3, status.OpenPRMissing) require.False(t, status.Dirty) require.Nil(t, status.OpenPRCursorNumber) + require.Equal(t, 1, server.ListPullCount()) + + var inventoryRows int64 + require.NoError(t, db.WithContext(ctx).Model(&database.RepoOpenPullInventory{}).Count(&inventoryRows).Error) + require.EqualValues(t, 3, inventoryRows) processed, err = worker.RunOnce(ctx) require.NoError(t, err) @@ -72,6 +78,7 @@ func TestChangeSyncWorkerBackfillsOpenPullRequestsGradually(t *testing.T) { require.Equal(t, 2, status.OpenPRMissing) require.False(t, status.Dirty) require.NotNil(t, status.OpenPRCursorNumber) + require.Equal(t, 1, server.ListPullCount()) processed, err = worker.RunOnce(ctx) require.NoError(t, err) @@ -82,6 +89,7 @@ func TestChangeSyncWorkerBackfillsOpenPullRequestsGradually(t *testing.T) { require.Equal(t, 2, status.OpenPRCurrent) require.Equal(t, 1, status.OpenPRMissing) require.False(t, status.Dirty) + require.Equal(t, 1, server.ListPullCount()) processed, err = worker.RunOnce(ctx) require.NoError(t, err) @@ -93,6 +101,7 @@ func TestChangeSyncWorkerBackfillsOpenPullRequestsGradually(t *testing.T) { require.Equal(t, 0, status.OpenPRMissing) require.False(t, status.Dirty) require.Nil(t, status.OpenPRCursorNumber) + require.Equal(t, 1, server.ListPullCount()) prStatus, err := service.GetPullRequestChangeStatus(ctx, "acme", "widgets", 101) require.NoError(t, err) @@ -142,6 +151,7 @@ func TestChangeSyncWorkerBackfillsWhileRepoRemainsDirty(t *testing.T) { require.NoError(t, err) firstFetchStarted := status.LastFetchStartedAt require.NotNil(t, firstFetchStarted) + require.Equal(t, 1, server.ListPullCount()) processed, err = worker.RunOnce(ctx) require.NoError(t, err) @@ -150,6 +160,7 @@ func TestChangeSyncWorkerBackfillsWhileRepoRemainsDirty(t *testing.T) { status, err = service.GetRepoChangeStatus(ctx, "acme", "widgets") require.NoError(t, err) require.Equal(t, 1, status.OpenPRCurrent) + require.Equal(t, 1, server.ListPullCount()) dirtyAt := time.Now().UTC() require.NoError(t, service.MarkRepositoryChangeDirty(ctx, state.RepositoryID, dirtyAt)) @@ -164,9 +175,28 @@ func TestChangeSyncWorkerBackfillsWhileRepoRemainsDirty(t *testing.T) { require.Equal(t, 2, status.OpenPRCurrent) require.Equal(t, 1, status.OpenPRMissing) require.Equal(t, firstFetchStarted, status.LastFetchStartedAt) + require.Equal(t, 1, server.ListPullCount()) +} + +type backfillGitHubServer struct { + *httptest.Server + mu sync.Mutex + listPullCount int +} + +func (s *backfillGitHubServer) recordListPull() { + s.mu.Lock() + defer s.mu.Unlock() + s.listPullCount++ +} + +func (s *backfillGitHubServer) ListPullCount() int { + s.mu.Lock() + defer s.mu.Unlock() + return s.listPullCount } -func newBackfillGitHubServer(t *testing.T, fixture testfixtures.LocalPullRepo) *httptest.Server { +func newBackfillGitHubServer(t *testing.T, fixture testfixtures.LocalPullRepo) *backfillGitHubServer { t.Helper() repo := github.RepositoryResponse{ @@ -250,11 +280,13 @@ func newBackfillGitHubServer(t *testing.T, fixture testfixtures.LocalPullRepo) * } } + server := &backfillGitHubServer{} mux := http.NewServeMux() mux.HandleFunc("/repos/acme/widgets", func(w http.ResponseWriter, r *http.Request) { writeBackfillJSON(t, w, repo) }) mux.HandleFunc("/repos/acme/widgets/pulls", func(w http.ResponseWriter, r *http.Request) { + server.recordListPull() writeBackfillJSON(t, w, []github.PullRequestResponse{pulls[103], pulls[102], pulls[101]}) }) mux.HandleFunc("/repos/acme/widgets/issues/", func(w http.ResponseWriter, r *http.Request) { @@ -268,7 +300,8 @@ func newBackfillGitHubServer(t *testing.T, fixture testfixtures.LocalPullRepo) * writeBackfillJSON(t, w, pulls[number]) }) - return httptest.NewServer(mux) + server.Server = httptest.NewServer(mux) + return server } func tailNumber(path, prefix string) (int, bool) { diff --git a/migrations/000007_open_pr_inventory.down.sql b/migrations/000007_open_pr_inventory.down.sql new file mode 100644 index 0000000..3aaf19e --- /dev/null +++ b/migrations/000007_open_pr_inventory.down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS repo_open_pull_inventories; diff --git a/migrations/000007_open_pr_inventory.up.sql b/migrations/000007_open_pr_inventory.up.sql new file mode 100644 index 0000000..b6a4d02 --- /dev/null +++ b/migrations/000007_open_pr_inventory.up.sql @@ -0,0 +1,19 @@ +CREATE TABLE IF NOT EXISTS repo_open_pull_inventories ( + id BIGSERIAL PRIMARY KEY, + repository_id BIGINT NOT NULL REFERENCES repositories(id) ON DELETE CASCADE, + pull_request_number INTEGER NOT NULL, + github_updated_at TIMESTAMPTZ NOT NULL, + head_sha TEXT NOT NULL DEFAULT '', + base_sha TEXT NOT NULL DEFAULT '', + base_ref TEXT NOT NULL DEFAULT '', + state TEXT NOT NULL DEFAULT '', + draft BOOLEAN NOT NULL DEFAULT FALSE, + freshness_state TEXT NOT NULL DEFAULT '', + last_seen_at TIMESTAMPTZ NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + CONSTRAINT idx_repo_open_pull_inventories_repo_pr UNIQUE (repository_id, pull_request_number) +); + +CREATE INDEX IF NOT EXISTS idx_repo_open_pull_inventories_repo_freshness_updated + ON repo_open_pull_inventories(repository_id, freshness_state, github_updated_at DESC, pull_request_number DESC);