Skip to content

Commit

Permalink
Merge pull request #471 from mlycore/ref-pretty
Browse files Browse the repository at this point in the history
  • Loading branch information
tristaZero authored Nov 20, 2023
2 parents bb1f000 + 634ca60 commit 23554ab
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 36 deletions.
70 changes: 62 additions & 8 deletions pitr/cli/internal/cmd/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ package cmd
import (
"fmt"
"os"
"time"

"github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg"
"github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg/model"
"github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg/xerr"
"github.com/apache/shardingsphere-on-cloud/pitr/cli/pkg/logging"
"github.com/apache/shardingsphere-on-cloud/pitr/cli/pkg/prettyoutput"
"github.com/apache/shardingsphere-on-cloud/pitr/cli/pkg/promptutil"
"github.com/jedib0t/go-pretty/v6/progress"
"github.com/jedib0t/go-pretty/v6/table"

"github.com/spf13/cobra"
Expand Down Expand Up @@ -152,7 +152,10 @@ func _execDelete(lsBackup *model.LsBackup) error {
return nil
}

pw := prettyoutput.NewPW(totalNum)
pw := prettyoutput.NewProgressPrinter(prettyoutput.ProgressPrintOption{
NumTrackersExpected: totalNum,
})

go pw.Render()

for _, storagenode := range lsBackup.SsBackup.StorageNodes {
Expand All @@ -162,15 +165,26 @@ func _execDelete(lsBackup *model.LsBackup) error {
continue
} else {
as := pkg.NewAgentServer(fmt.Sprintf("%s:%d", convertLocalhost(sn.IP), AgentPort))
go doDelete(as, sn, dn, resultCh, pw)
backupInfo := &model.BackupInfo{
ID: dn.BackupID,
}
task := &deletetask{
As: as,
Sn: sn,
Dn: dn,
ResultCh: resultCh,
Backup: backupInfo,
}

tracker := &progress.Tracker{
Message: fmt.Sprintf("Deleting backup files # %s:%d", sn.IP, sn.Port),
}
pw.AppendTracker(tracker)
go pw.UpdateProgress(tracker, task.checkProgress)
}
}

time.Sleep(time.Millisecond * 100)

for pw.IsRenderInProgress() {
time.Sleep(time.Millisecond * 100)
}
pw.BlockedRendered()

close(resultCh)

Expand Down Expand Up @@ -200,3 +214,43 @@ func _execDelete(lsBackup *model.LsBackup) error {

return nil
}

type deletetask struct {
As pkg.IAgentServer
Sn *model.StorageNode
Dn *model.DataNode
ResultCh chan *model.DeleteBackupResult
Backup *model.BackupInfo
}

func (t *deletetask) checkProgress() (bool, error) {
var (
err error
)
in := &model.DeleteBackupIn{
DBPort: t.Sn.Port,
DBName: t.Sn.Database,
Username: t.Sn.Username,
Password: t.Sn.Password,
BackupID: t.Backup.ID,
DnBackupPath: BackupPath,
Instance: defaultInstance,
}

r := &model.DeleteBackupResult{
IP: t.Sn.IP,
Port: t.Sn.Port,
}

if err = t.As.DeleteBackup(in); err != nil {
r.Status = model.SsBackupStatusFailed
r.Msg = err.Error()
t.ResultCh <- r
return false, err
}

r.Status = model.SsBackupStatusCompleted
t.ResultCh <- r

return true, nil
}
78 changes: 50 additions & 28 deletions pitr/cli/internal/cmd/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"fmt"
"os"
"strings"
"time"

"github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg"
"github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg/model"
Expand Down Expand Up @@ -199,20 +198,35 @@ func execRestore(lsBackup *model.LsBackup) error {
return xerr.NewCliErr(fmt.Sprintf("no storage node found, please check backup record [%s].", lsBackup.Info.ID))
}

pw := prettyoutput.NewPW(totalNum)
pw := prettyoutput.NewProgressPrinter(prettyoutput.ProgressPrintOption{
NumTrackersExpected: totalNum,
})

go pw.Render()
for i := 0; i < totalNum; i++ {
sn := lsBackup.SsBackup.StorageNodes[i]
dn := dataNodeMap[sn.IP]
as := pkg.NewAgentServer(fmt.Sprintf("%s:%d", convertLocalhost(sn.IP), AgentPort))
go doRestore(as, sn, dn.BackupID, resultCh, pw)
}
backupInfo := &model.BackupInfo{
ID: dn.BackupID,
}
task := &restoretask{
As: as,
Sn: sn,
Dn: dn,
ResultCh: resultCh,
Backup: backupInfo,
}

time.Sleep(time.Millisecond * 100)
for pw.IsRenderInProgress() {
time.Sleep(time.Millisecond * 100)
tracker := &progress.Tracker{
Message: fmt.Sprintf("Restore data to openGauss: %s", sn.IP),
}
pw.AppendTracker(tracker)
go pw.UpdateProgress(tracker, task.checkProgress)
}

pw.BlockedRendered()

close(resultCh)

for result := range resultCh {
Expand Down Expand Up @@ -242,34 +256,42 @@ func execRestore(lsBackup *model.LsBackup) error {
return nil
}

func doRestore(as pkg.IAgentServer, sn *model.StorageNode, backupID string, resultCh chan *model.RestoreResult, pw progress.Writer) {
tracker := &progress.Tracker{Message: fmt.Sprintf("Restore data to openGauss: %s", sn.IP)}
result := ""
type restoretask struct {
As pkg.IAgentServer
Sn *model.StorageNode
Dn *model.DataNode
ResultCh chan *model.RestoreResult
Backup *model.BackupInfo
}

func (t *restoretask) checkProgress() (bool, error) {
var (
err error
)
in := &model.RestoreIn{
DBPort: sn.Port,
DBName: sn.Database,
Username: sn.Username,
Password: sn.Password,
Instance: defaultInstance,
DBPort: t.Sn.Port,
DBName: t.Sn.Database,
Username: t.Sn.Username,
Password: t.Sn.Password,
DnBackupID: t.Backup.ID,
DnBackupPath: BackupPath,
DnBackupID: backupID,
Instance: defaultInstance,
DnThreadsNum: ThreadsNum,
}

pw.AppendTracker(tracker)

if err := as.Restore(in); err != nil {
tracker.MarkAsErrored()
result = "Failed"
} else {
tracker.MarkAsDone()
result = "Completed"
r := &model.RestoreResult{
IP: t.Sn.IP,
Port: t.Sn.Port,
}

resultCh <- &model.RestoreResult{
IP: sn.IP,
Port: sn.Port,
Status: result,
if err = t.As.Restore(in); err != nil {
r.Status = "Failed"
t.ResultCh <- r
return false, err
}

r.Status = "Completed"
t.ResultCh <- r

return true, nil
}

0 comments on commit 23554ab

Please sign in to comment.