Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

提交pg库表元数据功能代码 #2086

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -62,6 +62,8 @@ require (
github.com/swaggo/swag v1.6.7
github.com/ungerik/go-dry v0.0.0-20210209114055-a3e162a9e62e
github.com/urfave/cli/v2 v2.8.1
github.com/vektah/gqlparser/v2 v2.5.1
github.com/lib/pq v1.10.2
golang.org/x/net v0.15.0
google.golang.org/grpc v1.50.1
gopkg.in/natefinch/lumberjack.v2 v2.0.0
55 changes: 38 additions & 17 deletions sqle/server/auditplan/meta.go
Original file line number Diff line number Diff line change
@@ -19,26 +19,27 @@ type Meta struct {
}

const (
TypeDefault = "default"
TypeMySQLSlowLog = "mysql_slow_log"
TypeMySQLMybatis = "mysql_mybatis"
TypeMySQLSchemaMeta = "mysql_schema_meta"
TypeMySQLProcesslist = "mysql_processlist"
TypeAliRdsMySQLSlowLog = "ali_rds_mysql_slow_log"
TypeAliRdsMySQLAuditLog = "ali_rds_mysql_audit_log"
TypeHuaweiRdsMySQLSlowLog = "huawei_rds_mysql_slow_log"
TypeOracleTopSQL = "oracle_top_sql"
TypeTiDBAuditLog = "tidb_audit_log"
TypeAllAppExtract = "all_app_extract"
TypeBaiduRdsMySQLSlowLog = "baidu_rds_mysql_slow_log"
TypeSQLFile = "sql_file"
TypeDefault = "default"
TypeMySQLSlowLog = "mysql_slow_log"
TypeMySQLMybatis = "mysql_mybatis"
TypeMySQLSchemaMeta = "mysql_schema_meta"
TypeMySQLProcesslist = "mysql_processlist"
TypeAliRdsMySQLSlowLog = "ali_rds_mysql_slow_log"
TypeAliRdsMySQLAuditLog = "ali_rds_mysql_audit_log"
TypeOracleTopSQL = "oracle_top_sql"
TypeTiDBAuditLog = "tidb_audit_log"
TypeAllAppExtract = "all_app_extract"
TypeBaiduRdsMySQLSlowLog = "baidu_rds_mysql_slow_log"
TypeSQLFile = "sql_file"
TypePostgreSQLSchemaMeta = "Postgresql_schema_meta"
)

const (
InstanceTypeAll = ""
InstanceTypeMySQL = "MySQL"
InstanceTypeOracle = "Oracle"
InstanceTypeTiDB = "TiDB"
InstanceTypeAll = ""
InstanceTypeMySQL = "MySQL"
InstanceTypeOracle = "Oracle"
InstanceTypeTiDB = "TiDB"
InstanceTypePostgreSQL = "PostgreSQL"
)

const (
@@ -371,6 +372,26 @@ var Metas = []Meta{
InstanceType: InstanceTypeAll,
CreateTask: NewDefaultTask,
},
{
Type: TypePostgreSQLSchemaMeta,
Desc: "库表元数据",
InstanceType: InstanceTypePostgreSQL,
CreateTask: NewPostgreSQLSchemaMetaTask,
Params: []*params.Param{
{
Key: paramKeyCollectIntervalMinute,
Desc: "采集周期(分钟)",
Value: "60",
Type: params.ParamTypeInt,
},
{
Key: "collect_view",
Desc: "是否采集视图信息",
Value: "0",
Type: params.ParamTypeBool,
},
},
},
}

var MetaMap = map[string]Meta{}
443 changes: 443 additions & 0 deletions sqle/server/auditplan/task.go
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package auditplan

import "C"
import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/actiontech/sqle/sqle/driver"
"net/http"
"strconv"
"strings"
@@ -1550,3 +1552,444 @@ func NewBaiduRdsMySQLSlowLogTask(entry *logrus.Entry, ap *model.AuditPlan) Task

return b
}

// PostgreSQLSchemaMetaTask : PostgreSQL库表元数据
type PostgreSQLSchemaMetaTask struct {
*sqlCollector
}

type PostgreSQLSchema struct {
schemaName string
}

type PostgreSQLTablesAndViews struct {
schemaName string
tableName string
tableType string
}

type PostgreSQLViewInfo struct {
schemaName string
tableName string
viewSql string
}

type PostgreSQLCreateTableSql struct {
createTableSql string
createIndexSql string
}

type PostgreSQLTableColumnInfo struct {
schemaName string
tableName string
columnSql string
}

type PostgreSQLConstraint struct {
schemaName string
tableName string
constraintDefinition string
}

type PostgreSQLIndex struct {
schemaName string
tableName string
indexName string
indexDefinition string
}

func NewPostgreSQLSchemaMetaTask(entry *logrus.Entry, ap *model.AuditPlan) Task {
sqlCollector := newSQLCollector(entry, ap)
task := &PostgreSQLSchemaMetaTask{
sqlCollector,
}
sqlCollector.do = task.collectorDo
return task
}

func (at *PostgreSQLSchemaMetaTask) collectorDo() {
if at.ap.InstanceName == "" {
at.logger.Warnf("instance is not configured")
return
}
if at.ap.InstanceDatabase == "" {
at.logger.Warnf("instance database is not configured")
return
}
instance, _, err := dms.GetInstanceInProjectByName(context.Background(), string(at.ap.ProjectId), at.ap.InstanceName)
if err != nil {
at.logger.Errorf("get pg instance in project by instanceName failed: %s\n", err)
return
}

pluginMgr := driver.GetPluginManager()
if !pluginMgr.IsOptionalModuleEnabled(instance.DbType, driverV2.OptionalModuleQuery) {
at.logger.Errorf("collect pg schema meta failed: %v",
driver.NewErrPluginAPINotImplement(driverV2.OptionalModuleQuery))
return
}
plugin, err := pluginMgr.OpenPlugin(at.logger, instance.DbType, &driverV2.Config{DSN: &driverV2.DSN{
Host: instance.Host,
Port: instance.Port,
User: instance.User,
Password: instance.Password,
AdditionalParams: instance.AdditionalParams,
DatabaseName: at.ap.InstanceDatabase,
}})
if err != nil {
at.logger.Errorf("connect to instance fail, error: %v", err)
return
}
defer pluginMgr.Stop()

// 获取所有的数据库及对应的schema
schemas, err := at.GetAllUserSchemas(plugin, at.ap.InstanceDatabase)
if err != nil {
at.logger.Errorf("get databases=%s schemas fail: %s", at.ap.InstanceDatabase, err)
return
}

// 获取表和视图
tablesAndViews, err := at.GetAllTablesAndViewsForPg(plugin, at.ap.InstanceDatabase)
if err != nil {
at.logger.Errorf("get all table and view fail, error: %s", err)
}

// 获取列信息
columnsInfo, err := at.GetAllColumnsInfoForPg(plugin, at.ap.InstanceDatabase)
if err != nil {
at.logger.Errorf("get all columns information fail, error: %s", err)
}

// 获取约束信息
constraints, err := at.GetAllConstraintsForPg(plugin)
if err != nil {
at.logger.Errorf("get all constraints fail, error: %s", err)
}

// 获取索引信息
indexes, err := at.GetAllIndexesForPg(plugin)
if err != nil {
at.logger.Errorf("get all indexes fail, error: %s", err)
}

// 是否收集视图sql
isCollectView := false
if at.ap.Params.GetParam("collect_view").Bool() {
isCollectView = true
}

var viewsSql []*PostgreSQLViewInfo
if isCollectView {
// 获取视图创建sql
viewsSql, err = at.GetAllViewsSqlForPg(plugin, at.ap.InstanceDatabase)
if err != nil {
at.logger.Errorf("get all views sql fail, error: %s", err)
}
}

auditPlanSQLV2Slice := make([]*model.AuditPlanSQLV2, 0)
for _, schema := range schemas {
currentSchema := schema.schemaName
createTableSqls := make([]string, 0)
createViewSqls := make([]string, 0)
for _, tableOrView := range tablesAndViews {
if tableOrView.schemaName != currentSchema {
continue
}
dataObjectType := ""
tableOrViewName := tableOrView.tableName
if tableOrView.tableType == "BASE TABLE" || tableOrView.tableType == "SYSTEM VIEW" { // 表
dataObjectType = "table"
} else if tableOrView.tableType == "VIEW" { // 视图
dataObjectType = "view"
}
if dataObjectType == "table" {
createDDL := createTableSqlForPg(currentSchema, tableOrViewName, columnsInfo, constraints, indexes)
tableDDl := fmt.Sprintf("%s;%s", createDDL.createTableSql, createDDL.createIndexSql)
createTableSqls = append(createTableSqls, tableDDl)
} else if dataObjectType == "view" {
if !isCollectView {
continue
}
// 视图sql
for _, view := range viewsSql {
schemaName, tableName := view.schemaName, view.tableName
if schemaName != schema.schemaName || tableName != tableOrViewName {
continue
}
createViewSqls = append(createViewSqls, view.viewSql)
}
}
}

if len(createTableSqls) > 0 {
auditPlanSQLV2Slice = append(auditPlanSQLV2Slice, convertRawSQLToModelSQLs(createTableSqls, currentSchema)...)
}

if len(createViewSqls) > 0 {
auditPlanSQLV2Slice = append(auditPlanSQLV2Slice, convertRawSQLToModelSQLs(createViewSqls, currentSchema)...)
}
}

if len(auditPlanSQLV2Slice) > 0 {
err = at.persist.OverrideAuditPlanSQLs(at.ap.ID, auditPlanSQLV2Slice)
if err != nil {
at.logger.Errorf("save table and view schema meta to storage fail, error: %s", err)
}
}
}

func createTableSqlForPg(schema, tableOrViewName string, columnsInfo []*PostgreSQLTableColumnInfo, constraints []*PostgreSQLConstraint, indexes []*PostgreSQLIndex) *PostgreSQLCreateTableSql {
tableDDl := fmt.Sprintf("CREATE TABLE %s.%s(", schema, tableOrViewName)
// 列信息
for _, columnInfo := range columnsInfo {
schemaName, tableName := columnInfo.schemaName, columnInfo.tableName
if schemaName != schema || tableName != tableOrViewName {
continue
}
tableDDl += columnInfo.columnSql
}
// 约束信息
for _, constraintInfo := range constraints {
schemaName, tableName := constraintInfo.schemaName, constraintInfo.tableName
if schemaName != schema || tableName != tableOrViewName {
continue
}
tableDDl += fmt.Sprintf(",%s\n", constraintInfo.constraintDefinition)
}
tableDDl += ")"
indexDDl := ""
// 索引信息
for _, indexInfo := range indexes {
schemaName, tableName := indexInfo.schemaName, indexInfo.tableName
if schemaName != schema || tableName != tableOrViewName {
continue
}
indexDDl += fmt.Sprintf("%s;\n", indexInfo.indexDefinition)
}
return &PostgreSQLCreateTableSql{
createTableSql: tableDDl,
createIndexSql: indexDDl,
}
}

func (at *PostgreSQLSchemaMetaTask) GetAllUserSchemas(plugin driver.Plugin, database string) ([]*PostgreSQLSchema, error) {
result := make([]*PostgreSQLSchema, 0)
querySql := fmt.Sprintf(`
SELECT schema_name FROM information_schema.schemata WHERE catalog_name = '%s'
AND schema_name NOT LIKE 'pg_%%' AND schema_name != 'information_schema' ORDER BY schema_name`, database)
res, err := at.GetResult(plugin, querySql)
if err != nil {
return result, err
}
for _, value := range res {
if len(value) == 0 {
continue
}
result = append(result, &PostgreSQLSchema{value[0]})
}
if len(result) == 0 {
return result, fmt.Errorf("database=%s has no schema", database)
}
return result, nil
}

func (at *PostgreSQLSchemaMetaTask) GetAllTablesAndViewsForPg(plugin driver.Plugin, database string) ([]*PostgreSQLTablesAndViews, error) {
querySql := fmt.Sprintf(`select table_schema, table_name, table_type from information_schema.tables
where table_catalog = '%s' and table_schema not like 'pg_%%' AND table_schema != 'information_schema'
ORDER BY table_name`, database)
result := make([]*PostgreSQLTablesAndViews, 0)
ret, err := at.GetResult(plugin, querySql)
if err != nil {
return result, err
}
for _, value := range ret {
if len(value) < 3 {
return result, fmt.Errorf("get tables and views error, column length is not three")
}
result = append(result, &PostgreSQLTablesAndViews{
schemaName: value[0],
tableName: value[1],
tableType: value[2],
})
}
return result, nil
}

func (at *PostgreSQLSchemaMetaTask) GetAllColumnsInfoForPg(plugin driver.Plugin, database string) ([]*PostgreSQLTableColumnInfo, error) {
columns := fmt.Sprintf(`select table_schema, table_name,
string_agg(
concat(
column_name, ' ',
case
when lower(data_type) in ('char', 'varchar', 'character', 'character varying') then concat(data_type, '(', coalesce(character_maximum_length, 0), ')')
when lower(data_type) in ('numeric', 'decimal') then concat(data_type, '(', coalesce(numeric_precision, 0), ',', coalesce(numeric_scale, 0), ')')
when lower(data_type) in ('integer', 'smallint', 'bigint', 'text') then data_type
else udt_name
end,
case
when column_default != '' then concat(' default ', column_default) else ''
end,
case
when is_nullable = 'no' then ' not null' else ''
end
), ', ' order by ordinal_position
) as columns_sql
from information_schema.columns
where table_catalog = '%s' and table_schema not like 'pg_%%' and table_schema != 'information_schema'
group by table_schema, table_name`, database)
result := make([]*PostgreSQLTableColumnInfo, 0)
ret, err := at.GetResult(plugin, columns)
if err != nil {
return result, err
}
for _, value := range ret {
if len(value) < 3 {
return result, fmt.Errorf("get column info error, column length is not three")
}
result = append(result, &PostgreSQLTableColumnInfo{
schemaName: value[0],
tableName: value[1],
columnSql: value[2],
})
}
return result, nil
}

func (at *PostgreSQLSchemaMetaTask) GetAllConstraintsForPg(plugin driver.Plugin) ([]*PostgreSQLConstraint, error) {
querySql := `select
n.nspname as schema_name,
c.relname as table_name,
concat ( 'constraint ', r.conname, ' ', pg_catalog.pg_get_constraintdef ( r.oid, true ) ) as constraint_definition
from
pg_catalog.pg_constraint r
join pg_catalog.pg_class c on c.oid = r.conrelid
join pg_catalog.pg_namespace n on n.oid = c.relnamespace
where
n.nspname not like'pg_%'
and n.nspname != 'information_schema'`
result := make([]*PostgreSQLConstraint, 0)
ret, err := at.GetResult(plugin, querySql)
if err != nil {
return result, err
}
for _, value := range ret {
if len(value) < 3 {
return result, fmt.Errorf("get constraint error, column length is not three")
}
result = append(result, &PostgreSQLConstraint{
schemaName: value[0],
tableName: value[1],
constraintDefinition: value[2],
})
}
return result, nil
}

func (at *PostgreSQLSchemaMetaTask) GetAllIndexesForPg(plugin driver.Plugin) ([]*PostgreSQLIndex, error) {
querySql := `select schemaname, tablename, indexname, indexdef from pg_indexes
where schemaname not like 'pg_%' and schemaname != 'information_schema'`
result := make([]*PostgreSQLIndex, 0)
ret, err := at.GetResult(plugin, querySql)
if err != nil {
return result, err
}
for _, value := range ret {
if len(value) < 4 {
return result, fmt.Errorf("get index error, column length is not four")
}
result = append(result, &PostgreSQLIndex{
schemaName: value[0],
tableName: value[1],
indexName: value[2],
indexDefinition: value[3],
})
}
return result, nil
}

func (at *PostgreSQLSchemaMetaTask) GetAllViewsSqlForPg(plugin driver.Plugin, database string) ([]*PostgreSQLViewInfo, error) {
querySql := fmt.Sprintf(`select table_schema, table_name,
concat('create or replace view ', table_schema, '.', table_name, ' as ', view_definition) as create_view_statement
from information_schema.views
where table_catalog = '%s'
and table_schema not like 'pg_%%'
and table_schema != 'information_schema'
order by table_name`, database)
result := make([]*PostgreSQLViewInfo, 0)
ret, err := at.GetResult(plugin, querySql)
if err != nil {
return result, err
}
for _, value := range ret {
if len(value) < 3 {
return result, fmt.Errorf("get view sql error, column length is not three")
}
result = append(result, &PostgreSQLViewInfo{
schemaName: value[0],
tableName: value[1],
viewSql: value[2],
})
}
return result, nil
}

func (at *PostgreSQLSchemaMetaTask) GetResult(plugin driver.Plugin, sql string) ([][]string, error) {
var ret [][]string
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*2)
defer cancel()

result, err := plugin.Query(ctx, sql, &driverV2.QueryConf{TimeOutSecond: 120})
if err != nil {
at.logger.Errorf("plugin.Query() failed:%s\n", err)
return nil, err
}
rows := result.Rows
for _, row := range rows {
values := row.Values
if len(values) == 0 {
continue
}
var valueArr []string
for _, value := range values {
valueArr = append(valueArr, value.Value)
}
ret = append(ret, valueArr)
}
return ret, nil
}

func (at *PostgreSQLSchemaMetaTask) Audit() (*AuditResultResp, error) {
task, err := getTaskWithInstanceByAuditPlan(at.ap, at.persist)
if err != nil {
return nil, err
}
return at.baseTask.audit(task)
}

func (at *PostgreSQLSchemaMetaTask) GetSQLs(args map[string]interface{}) ([]Head, []map[string] /* head name */ string, uint64, error) {
auditPlanSQLs, count, err := at.persist.GetAuditPlanSQLsByReq(args)
if err != nil {
return nil, nil, count, err
}
head, rows := buildPostgreSQLSchemaMetaSQLsResult(auditPlanSQLs)
return head, rows, count, nil
}

func buildPostgreSQLSchemaMetaSQLsResult(auditPlanSQLs []*model.AuditPlanSQLListDetail) ([]Head, []map[string] /* head name */ string) {
head := []Head{
{
Name: "sql",
Desc: "SQL语句",
Type: "sql",
},
}
rows := make([]map[string]string, 0, len(auditPlanSQLs))
for _, sql := range auditPlanSQLs {
rows = append(rows, map[string]string{
"sql": sql.SQLContent,
})
}
return head, rows
}