diff --git a/pitr/cli/internal/cmd/backup.go b/pitr/cli/internal/cmd/backup.go index 3d8eb9e1..96bc2ca1 100644 --- a/pitr/cli/internal/cmd/backup.go +++ b/pitr/cli/internal/cmd/backup.go @@ -345,20 +345,35 @@ func checkBackupStatus(lsBackup *model.LsBackup) model.BackupStatus { dataNodeMap[dn.IP] = dn } - pw := prettyoutput.NewPW(totalNum) + pw := prettyoutput.NewProgressPrinter(prettyoutput.ProgressPrintOption{ + NumTrackersExpected: totalNum, + }) + go pw.Render() - for idx := 0; idx < totalNum; idx++ { - sn := lsBackup.SsBackup.StorageNodes[idx] + + for i := 0; i < totalNum; i++ { + sn := lsBackup.SsBackup.StorageNodes[i] as := pkg.NewAgentServer(fmt.Sprintf("%s:%d", convertLocalhost(sn.IP), AgentPort)) dn := dataNodeMap[sn.IP] - go checkStatus(as, sn, dn, dnCh, pw) + backupInfo := &model.BackupInfo{} + task := &backuptask{ + As: as, + Sn: sn, + Dn: dn, + DnCh: dnCh, + Backup: backupInfo, + retries: defaultShowDetailRetryTimes, + } + tracker := &progress.Tracker{ + Message: fmt.Sprintf("Checking backup status # %s:%d", sn.IP, sn.Port), + Total: 0, + Units: progress.UnitsDefault, + } + pw.AppendTracker(tracker) + go pw.UpdateProgress(tracker, task.checkProgress) } - // wait for all data node backup finished - time.Sleep(time.Millisecond * 100) - for pw.IsRenderInProgress() { - time.Sleep(time.Millisecond * 100) - } + pw.BlockedRendered() close(dnCh) @@ -388,62 +403,48 @@ func checkBackupStatus(lsBackup *model.LsBackup) model.BackupStatus { return backupFinalStatus } -func checkStatus(as pkg.IAgentServer, sn *model.StorageNode, dn *model.DataNode, dnCh chan *model.DataNode, pw progress.Writer) { - var ( - // mark check status is done, time ticker should break. - done = make(chan struct{}) - // time ticker, try to doCheck request every 2 seconds. - ticker = time.Tick(time.Second * 2) - // progress bar. - tracker = progress.Tracker{Message: fmt.Sprintf("Checking backup status # %s:%d", sn.IP, sn.Port), Total: 0, Units: progress.UnitsDefault} - ) - - pw.AppendTracker(&tracker) +type backuptask struct { + As pkg.IAgentServer + Sn *model.StorageNode + Dn *model.DataNode + DnCh chan *model.DataNode - for !tracker.IsDone() { - select { - case <-done: - return - case <-ticker: - status, err := doCheck(as, sn, dn.BackupID, defaultShowDetailRetryTimes) - if err != nil { - tracker.MarkAsErrored() - dn.Status = status - dn.EndTime = timeutil.Now().String() - dnCh <- dn - done <- struct{}{} - } - if status == model.SsBackupStatusCompleted || status == model.SsBackupStatusFailed { - tracker.MarkAsDone() - dn.Status = status - dn.EndTime = timeutil.Now().String() - dnCh <- dn - done <- struct{}{} - } - } - } + Backup *model.BackupInfo + retries int } -func doCheck(as pkg.IAgentServer, sn *model.StorageNode, backupID string, retries int) (status model.BackupStatus, err error) { +func (t *backuptask) checkProgress() (bool, error) { + var err error in := &model.ShowDetailIn{ - DBPort: sn.Port, - DBName: sn.Database, - Username: sn.Username, - Password: sn.Password, - DnBackupID: backupID, + DBPort: t.Sn.Port, + DBName: t.Sn.Database, + Username: t.Sn.Username, + Password: t.Sn.Password, + DnBackupID: t.Dn.BackupID, DnBackupPath: BackupPath, Instance: defaultInstance, } - backupInfo, err := as.ShowDetail(in) + + t.Backup, err = t.As.ShowDetail(in) if err != nil { - if retries == 0 { - return model.SsBackupStatusCheckError, err + if t.retries == 0 { + t.Dn.Status = model.SsBackupStatusCheckError + t.DnCh <- t.Dn + return false, err } time.Sleep(time.Second * 1) - return doCheck(as, sn, backupID, retries-1) + t.retries-- + return t.checkProgress() } - return backupInfo.Status, nil + t.Dn.Status = t.Backup.Status + t.Dn.EndTime = timeutil.Now().String() + + if t.Backup.Status == model.SsBackupStatusCompleted || t.Backup.Status == model.SsBackupStatusFailed { + t.DnCh <- t.Dn + return true, nil + } + return false, nil } type deleteMode int diff --git a/pitr/cli/internal/cmd/backup_test.go b/pitr/cli/internal/cmd/backup_test.go index 65ab690a..ee72697a 100644 --- a/pitr/cli/internal/cmd/backup_test.go +++ b/pitr/cli/internal/cmd/backup_test.go @@ -41,42 +41,63 @@ var _ = Describe("Backup", func() { sn = &model.StorageNode{ IP: "127.0.0.1", } + + task = &backuptask{} ) BeforeEach(func() { ctrl = gomock.NewController(GinkgoT()) as = mock_pkg.NewMockIAgentServer(ctrl) + task = &backuptask{ + As: as, + Sn: sn, + Dn: &model.DataNode{}, + DnCh: make(chan *model.DataNode, 2), + + Backup: &model.BackupInfo{}, + } }) AfterEach(func() { ctrl.Finish() }) - It("agent server return err", func() { - as.EXPECT().ShowDetail(gomock.Any()).Return(nil, errors.New("timeout")) - status, err := doCheck(as, sn, "", 0) + It("mock agent server return err", func() { + as.EXPECT().ShowDetail(gomock.Any()).Return(nil, errors.New("mock agent timeout")) + + finished, err := task.checkProgress() Expect(err).To(HaveOccurred()) - Expect(status).To(Equal(model.SsBackupStatusCheckError)) + Expect(finished).To(BeFalse()) + Expect(task.Dn.Status).To(Equal(model.SsBackupStatusCheckError)) }) It("mock agent server and return failed status", func() { as.EXPECT().ShowDetail(gomock.Any()).Return(&model.BackupInfo{Status: model.SsBackupStatusFailed}, nil) - status, err := doCheck(as, sn, "", 0) + finished, err := task.checkProgress() Expect(err).ToNot(HaveOccurred()) - Expect(status).To(Equal(model.SsBackupStatusFailed)) + Expect(finished).To(BeTrue()) + Expect(task.Backup.Status).To(Equal(model.SsBackupStatusFailed)) }) It("mock agent server and return completed status", func() { as.EXPECT().ShowDetail(gomock.Any()).Return(&model.BackupInfo{Status: model.SsBackupStatusCompleted}, nil) - status, err := doCheck(as, sn, "", 0) + + finished, err := task.checkProgress() Expect(err).ToNot(HaveOccurred()) - Expect(status).To(Equal(model.SsBackupStatusCompleted)) + Expect(finished).To(BeTrue()) + Expect(task.Backup.Status).To(Equal(model.SsBackupStatusCompleted)) }) It("mock agent server and return check err first time and then success", func() { - as.EXPECT().ShowDetail(gomock.Any()).Return(nil, errors.New("timeout")) + as.EXPECT().ShowDetail(gomock.Any()).Return(nil, errors.New("mock agent timeout")) + finished, err := task.checkProgress() + Expect(err).To(HaveOccurred()) + Expect(finished).To(BeFalse()) + Expect(task.Dn.Status).To(Equal(model.SsBackupStatusCheckError)) + as.EXPECT().ShowDetail(gomock.Any()).Return(&model.BackupInfo{Status: model.SsBackupStatusCompleted}, nil) - status, err := doCheck(as, sn, "", 1) + finished, err = task.checkProgress() Expect(err).ToNot(HaveOccurred()) - Expect(status).To(Equal(model.SsBackupStatusCompleted)) + Expect(finished).To(BeTrue()) + Expect(task.Backup.Status).To(Equal(model.SsBackupStatusCompleted)) }) }) diff --git a/pitr/cli/pkg/prettyoutput/progress.go b/pitr/cli/pkg/prettyoutput/progress.go index e70552ab..ae4ebb4f 100644 --- a/pitr/cli/pkg/prettyoutput/progress.go +++ b/pitr/cli/pkg/prettyoutput/progress.go @@ -18,18 +18,83 @@ package prettyoutput import ( + "time" + "github.com/jedib0t/go-pretty/v6/progress" ) func NewPW(totalNum int) progress.Writer { pw := progress.NewWriter() + pw.SetTrackerLength(25) pw.SetAutoStop(true) pw.SetNumTrackersExpected(totalNum) pw.SetSortBy(progress.SortByPercentDsc) + pw.SetTrackerPosition(progress.PositionRight) + style := progress.StyleDefault style.Options.PercentIndeterminate = "running" pw.SetStyle(style) - pw.SetTrackerPosition(progress.PositionRight) + return pw } + +type ProgressPrintOption struct { + NumTrackersExpected int +} + +type ProgressPrinter struct { + progress.Writer +} + +func NewProgressPrinter(opt ProgressPrintOption) *ProgressPrinter { + p := &ProgressPrinter{ + Writer: progress.NewWriter(), + } + + // passed printer options + p.SetNumTrackersExpected(opt.NumTrackersExpected) + + // default printer options + p.SetTrackerLength(25) + p.SetAutoStop(true) + p.SetSortBy(progress.SortByPercentDsc) + p.SetTrackerPosition(progress.PositionRight) + style := progress.StyleDefault + style.Options.PercentIndeterminate = "running" + p.SetStyle(style) + + return p +} + +func (p *ProgressPrinter) BlockedRendered() { + time.Sleep(time.Millisecond * 100) + for p.IsRenderInProgress() { + time.Sleep(time.Millisecond * 100) + } +} + +func (p *ProgressPrinter) UpdateProgress(tracker *progress.Tracker, updateF func() (bool, error)) { + var ( + done = make(chan struct{}) + ticker = time.NewTicker(time.Second * 2) + ) + + for !tracker.IsDone() { + select { + case <-done: + return + case <-ticker.C: + finished, err := updateF() + if err != nil { + tracker.MarkAsErrored() + done <- struct{}{} + } + + if finished { + tracker.MarkAsDone() + done <- struct{}{} + } + } + } +}