Skip to content

Commit d7e5d0f

Browse files
committed
diagnosis job return create db/table sql
1 parent 6e7925b commit d7e5d0f

File tree

2 files changed

+81
-3
lines changed

2 files changed

+81
-3
lines changed

api/handler/v2/job.go

+68-3
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package v2
33
import (
44
"archive/tar"
55
"compress/gzip"
6+
"context"
67
"encoding/json"
78
"errors"
89
"fmt"
@@ -20,6 +21,7 @@ import (
2021
"syscall"
2122
"time"
2223

24+
usql "github.com/actiontech/dtle/driver/mysql/sql"
2325
nomadApi "github.com/hashicorp/nomad/api"
2426
"github.com/hashicorp/nomad/nomad/structs"
2527
"github.com/labstack/echo/v4"
@@ -29,6 +31,7 @@ import (
2931
"github.com/actiontech/dtle/driver/common"
3032
"github.com/actiontech/dtle/driver/kafka"
3133
mysql "github.com/actiontech/dtle/driver/mysql"
34+
"github.com/actiontech/dtle/driver/mysql/mysqlconfig"
3235
"github.com/actiontech/dtle/g"
3336
)
3437

@@ -2162,6 +2165,14 @@ func DiagnosisJobAndTarFile(logger g.LoggerType, jobId, src, dst string) (err er
21622165
tw := tar.NewWriter(gw)
21632166
defer tw.Close()
21642167

2168+
// schema and table info
2169+
jobSchemasName := path.Join(src, "dtle-"+jobId+"-schemas")
2170+
err = createSchemaInfoFile(logger, jobId, jobSchemasName)
2171+
if err != nil {
2172+
return err
2173+
}
2174+
defer os.Remove(jobSchemasName)
2175+
21652176
// create job info file for tar
21662177
jobInfoFileName := path.Join(src, "dtle-"+jobId+"-info")
21672178
err = createJobInfoFile(logger, jobId, jobInfoFileName)
@@ -2194,7 +2205,7 @@ func DiagnosisJobAndTarFile(logger g.LoggerType, jobId, src, dst string) (err er
21942205
if err != nil {
21952206
return err
21962207
}
2197-
hdr.Name = strings.TrimPrefix(filePath, string(filepath.Separator))
2208+
hdr.Name = path.Join("dump", hdr.Name)
21982209

21992210
if err := tw.WriteHeader(hdr); err != nil {
22002211
return err
@@ -2262,7 +2273,7 @@ func recordProcessStatus(logger g.LoggerType, w io.Writer) error {
22622273
}
22632274

22642275
func createJobInfoFile(logger g.LoggerType, jobId, fileName string) error {
2265-
logger.Info("create job %v info file", jobId)
2276+
logger.Info("create job info file", "jobId", jobId)
22662277
f, err := os.OpenFile(fileName, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640)
22672278
if nil != err {
22682279
return fmt.Errorf("capture job info error(%v)", err)
@@ -2273,7 +2284,6 @@ func createJobInfoFile(logger g.LoggerType, jobId, fileName string) error {
22732284
if err != nil {
22742285
return fmt.Errorf("get job detail failed")
22752286
}
2276-
22772287
jobByts, err := json.Marshal(jobInfo)
22782288
if err != nil {
22792289
return err
@@ -2291,3 +2301,58 @@ func createJobInfoFile(logger g.LoggerType, jobId, fileName string) error {
22912301
}
22922302
return nil
22932303
}
2304+
func createSchemaInfoFile(logger g.LoggerType, jobId, fileName string) error {
2305+
logger.Info("create job schema file", "jobId", jobId)
2306+
f, err := os.OpenFile(fileName, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640)
2307+
if nil != err {
2308+
return fmt.Errorf("capture job info error(%v)", err)
2309+
}
2310+
defer f.Close()
2311+
var fileInfo []byte
2312+
// get job detail
2313+
resp, err := getJobDetail(logger, jobId, GetJobTypeFromJobId(jobId))
2314+
if err != nil {
2315+
return err
2316+
}
2317+
connectionConfig := mysqlconfig.ConnectionConfig{
2318+
Host: resp.BasicTaskProfile.ConnectionInfo.SrcDataBase.Host,
2319+
Port: resp.BasicTaskProfile.ConnectionInfo.SrcDataBase.Port,
2320+
Password: resp.BasicTaskProfile.ConnectionInfo.SrcDataBase.Password,
2321+
User: resp.BasicTaskProfile.ConnectionInfo.SrcDataBase.User,
2322+
}
2323+
db, err := usql.CreateDB(connectionConfig.GetDBUri())
2324+
if err != nil {
2325+
return err
2326+
}
2327+
2328+
// query create db/table sql
2329+
databases, err := usql.ShowDatabases(db)
2330+
if err != nil {
2331+
return err
2332+
}
2333+
for _, database := range databases {
2334+
fileInfo = append(fileInfo, []byte(" schemaName : "+database+" \n")...)
2335+
createSchemas, err := usql.ShowCreateSchema(context.TODO(), db, database)
2336+
if err != nil {
2337+
return err
2338+
}
2339+
fileInfo = append(fileInfo, []byte(createSchemas+" \n")...)
2340+
tables, err := usql.ShowTables(db, database, true)
2341+
if err != nil {
2342+
return err
2343+
}
2344+
for _, table := range tables {
2345+
fileInfo = append(fileInfo, []byte(" tableName : "+table.TableName+" \n")...)
2346+
createTable, err := usql.ShowCreateTable(context.TODO(), db, database, table.TableName)
2347+
if err != nil {
2348+
return err
2349+
}
2350+
fileInfo = append(fileInfo, []byte(createTable+" \n")...)
2351+
}
2352+
}
2353+
err = os.WriteFile(fileName, fileInfo, 0644)
2354+
if err != nil {
2355+
return fmt.Errorf("write info to file err: %v", err)
2356+
}
2357+
return nil
2358+
}

driver/mysql/sql/sqlutils.go

+13
Original file line numberDiff line numberDiff line change
@@ -426,3 +426,16 @@ func GetSetWaitTimeout(db QueryAble) (originVal int, err error) {
426426
}
427427
return originVal, nil
428428
}
429+
430+
func ShowCreateTable(ctx context.Context, db QueryAble, dbName, tbName string) (r string, err error) {
431+
query := fmt.Sprintf("SHOW CREATE TABLE %s.%s", mysqlconfig.EscapeName(dbName), mysqlconfig.EscapeName(tbName))
432+
g.Logger.Debug("ShowCreateTable", "query", query)
433+
row := db.QueryRowContext(ctx, query)
434+
var dummy interface{}
435+
// | Table | Create Table |
436+
err = row.Scan(&dummy, &r)
437+
if err != nil {
438+
return "", errors.Wrap(err, "ShowCreateTable")
439+
}
440+
return r, nil
441+
}

0 commit comments

Comments
 (0)