From 837eadc345f6bb7dac791ff1887767fe67326736 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=82=9F=E5=86=A5?= Date: Thu, 15 Oct 2020 12:09:23 +0800 Subject: [PATCH 1/3] update sls ingress predict metric --- Makefile | 2 + deploy/deploy.yaml | 6 ++- examples/sls.yaml | 35 +++---------- main.go | 12 +++-- pkg/metrics/sls/ingress.go | 104 +++++++++++++++++++++++++++++++++++++ pkg/metrics/sls/sls.go | 27 +++++++++- 6 files changed, 149 insertions(+), 37 deletions(-) diff --git a/Makefile b/Makefile index 323b0d8a2..3928c556e 100644 --- a/Makefile +++ b/Makefile @@ -34,6 +34,8 @@ test-unit-cov: clean sanitize build docker-container: docker build --pull -t $(PREFIX)/alibaba-cloud-metrics-adapter-$(ARCH):$(VERSION)-$(GIT_COMMIT) -f deploy/Dockerfile . + # docker build --pull -t registry.cn-beijing.aliyuncs.com/sls-model/auto-hpa:0.0 -f deploy/Dockerfile . + #docker push registry.cn-beijing.aliyuncs.com/sls-model/auto-hpa:0.0 clean: rm -f alibaba-cloud-metrics-adapter diff --git a/deploy/deploy.yaml b/deploy/deploy.yaml index f10a5b03b..7fda5c281 100644 --- a/deploy/deploy.yaml +++ b/deploy/deploy.yaml @@ -17,10 +17,12 @@ spec: name: alibaba-cloud-metrics-adapter spec: serviceAccountName: admin + imagePullSecrets: + - name: regsecret containers: - name: alibaba-cloud-metrics-adapter - image: registry.cn-beijing.aliyuncs.com/acs/alibaba-cloud-metrics-adapter-amd64:v0.2.0-alpha-e8f8c17f - imagePullPolicy: IfNotPresent + image: registry.cn-beijing.aliyuncs.com/sls-model/auto-hpa:0.0 + imagePullPolicy: Always ports: - containerPort: 443 name: https diff --git a/examples/sls.yaml b/examples/sls.yaml index e669a2c57..8b924674d 100644 --- a/examples/sls.yaml +++ b/examples/sls.yaml @@ -64,36 +64,13 @@ spec: - type: External external: metric: - name: sls_ingress_qps + name: sls_ingress_predict selector: matchLabels: - #sls.project: "k8s-log-c550367cdf1e84dfabab013b277cc6bc2" - sls.project: "" - #sls.logstore: "nginx-ingress" - sls.logstore: "" - #sls.ingress.route: "default-nginx-80" - sls.ingress.route: "" + sls.project: "k8s-log-c0ae5df15fbf34b47ba3a9684e6ee2bee" + sls.logstore: "nginx-ingress" + sls.ml.logstore: "ml-service" + sls.ingress.route: "prod-serviceA-8080" target: type: AverageValue - averageValue: 10 - - type: External - external: - metric: - name: sls_ingress_latency_p9999 - selector: - matchLabels: - # default ingress log project is k8s-log-clusterId - # sls.project: "k8s-log-c550367cdf1e84dfabab013b277cc6bc2" - sls.project: "" - # default ingress logstre is nginx-ingress - # sls.logstore: "nginx-ingress" - sls.logstore: "" - # namespace-svc-port - # sls.ingress.route: "default-nginx-80" - sls.ingress.route: "" - # sls vpc endpoint, default true - # sls.internal.endpoint: true - target: - type: Value - # sls_ingress_latency_p9999 > 10ms - value: 10 + averageValue: 100 diff --git a/main.go b/main.go index 875aaff24..471fcbdab 100644 --- a/main.go +++ b/main.go @@ -81,7 +81,8 @@ func main() { // alibabaCloudProvider, err := makeAlibabaCloudProvider(cmd) if err != nil { - klog.Fatalf("unable to construct alibabCloudProvider: %v", err) + //klog.Fatalf("unable to construct alibabCloudProvider: %v", err) + klog.Errorf("unable to construct alibabCloudProvider: %v", err) } if alibabaCloudProvider != nil { @@ -90,12 +91,14 @@ func main() { // load the config if err := cmd.loadConfig(); err != nil { - klog.Fatalf("unable to load metrics discovery config: %v", err) + //klog.Fatalf("unable to load metrics discovery config: %v", err) + klog.Errorf("unable to load metrics discovery config: %v", err) } prometheusProvider, err := makePrometheusProvider(cmd, stopCh) if err != nil { - klog.Fatalf("unable to construct prometheusProvider: %v", err) + //klog.Fatalf("unable to construct prometheusProvider: %v", err) + klog.Errorf("unable to construct prometheusProvider: %v", err) } if prometheusProvider != nil { @@ -107,7 +110,8 @@ func main() { }() if err := cmd.Run(stopCh); err != nil { - klog.Fatalf("Failed to run alibaba-cloud-metrics-adapter: %v", err) + //klog.Fatalf("Failed to run alibaba-cloud-metrics-adapter: %v", err) + klog.Errorf("Failed to run alibaba-cloud-metrics-adapter: %v", err) } } diff --git a/pkg/metrics/sls/ingress.go b/pkg/metrics/sls/ingress.go index f8afabc70..7f2cbfa93 100644 --- a/pkg/metrics/sls/ingress.go +++ b/pkg/metrics/sls/ingress.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "strconv" + "strings" "time" "regexp" @@ -33,6 +34,7 @@ func (ss *SLSMetricSource) getSLSIngressQuery(params *SLSIngressParams, metricNa queryRealBegin := now - int64(params.DelaySeconds) - int64(params.Interval) end = now - int64(params.DelaySeconds) begin = now - 100 + if len(params.Route) == 0 { params.Route = "*" } @@ -57,6 +59,7 @@ func (ss *SLSMetricSource) getSLSIngressQuery(params *SLSIngressParams, metricNa query = "" } query = fmt.Sprintf("* and proxy_upstream_name: %s | SELECT %s as value from log WHERE __time__ >= %d and __time__ < %d", params.Route, queryItem, queryRealBegin, end) + return } @@ -124,3 +127,104 @@ func (ss *SLSMetricSource) getSLSIngressMetrics(namespace string, requirements l } return values, errors.New("Query sls timeout,it might because of too many logs.") } + +func (ss *SLSMetricSource) getSLSIngressPredictQuery(params *SLSIngressParams, metricName string) (begin int64, end int64, query string) { + now := time.Now().Unix() + + switch metricName { + case SLS_INGRESS_QPM: + end = (now / 60) * 60 + begin = (now / 60 - 10) * 60 + var querySearch string = fmt.Sprintf(`proxy_upstream_name: '%v' or proxy_alternative_upstream_name: '%v'`, params.Route, params.Route) + query = fmt.Sprintf(`%v | select array_agg(to_unixtime(time)) as ts, array_agg(num) as ds from ( select date_trunc('minute', __time__) as time, COUNT(*) as num from log group by time ) limit 1000`, querySearch) + case SLS_INGRESS_PREDICT: + end = now / 60 * 60 - 60 + begin = end - 60 + if len(params.Route) == 0 { + params.Route = "*" + } + var querySearch string = fmt.Sprintf(`__tag__:__model_type__: predict and '%v'`, params.Route) + var queryItem string = fmt.Sprintf("json_extract(result, '$.ts') as ts, json_extract(result, '$.ds') as ds") + var filterItem string = fmt.Sprintf("json_extract_scalar(meta, '$.logstore_name') = '%v' and json_extract_scalar(meta, '$.project_name') = '%v'", params.LogStore, params.Project) + query = fmt.Sprintf("%v | SELECT %v from log WHERE __time__ >= %d and __time__ < %d and %v limit 1000", querySearch, queryItem, begin, end, filterItem) + } + return +} + +func (ss *SLSMetricSource) getSLSIngressPredictMetrics(namespace string, requirements labels.Requirements, metricName string) (values []external_metrics.ExternalMetricValue, err error) { + params, err := getSLSParams(requirements) + if err != nil { + return values, fmt.Errorf("failed to get sls params,because of %v", err) + } + + client, err := ss.Client(params.Project, params.Internal) + if err != nil { + log.Errorf("Failed to create sls client, because of %v", err) + return values, err + } + + begin, end, query := ss.getSLSIngressPredictQuery(params, metricName) + fmt.Println("begin", begin, "end", end, "query", query) + + if query == "" { + log.Errorf("The metric you specific is not supported.") + return values, errors.New("MetricNotSupport") + } + + var queryRsp *slssdk.GetLogsResponse + for i := 0; i < params.MaxRetry; i++ { + queryRsp, err = client.GetLogs(params.Project, params.MlLogStore, "", begin, end, query, 100, 0, false) + + if err != nil || len(queryRsp.Logs) == 0 { + return values, err + } + + // if there are too many logs in sls, query may be not completed, we should retry + if !queryRsp.IsComplete() { + continue + } + + expectScore := -1.0 + for _, logCell := range queryRsp.Logs { + ds := logCell["ds"] + content := ds[1:len(ds)-1] + if len(content) <= 0 { + if expectScore <= 0.0 { + expectScore = 0.0 + } + } else { + dsString := strings.Split(ds[1:len(ds)-1], ",") + + // 计算未来几分钟的最大值,需要提前预留对应的资源 + maxValue := -1.0 + n := len(dsString) + for i := 0; i < n; i++ { + if v, e := strconv.ParseFloat(dsString[i], 64); e == nil { + if maxValue < v { + maxValue = v + } + } else { + err = e + } + } + if maxValue > expectScore { + expectScore = maxValue + } + } + } + + fmt.Println("Project", params.Project, "MlLogStore", params.MlLogStore, "expectScore", expectScore, "timestamp", metav1.Now()) + if err != nil { + return values, err + } + + values = append(values, external_metrics.ExternalMetricValue{ + MetricName: metricName, + Value: *resource.NewQuantity(int64(expectScore), resource.DecimalSI), + Timestamp: metav1.Now(), + }) + + return values, err + } + return values, errors.New("Query sls timeout,it might because of too many logs.") +} \ No newline at end of file diff --git a/pkg/metrics/sls/sls.go b/pkg/metrics/sls/sls.go index a008c2c47..01b236256 100644 --- a/pkg/metrics/sls/sls.go +++ b/pkg/metrics/sls/sls.go @@ -14,15 +14,18 @@ import ( const ( SLS_INGRESS_QPS = "sls_ingress_qps" + SLS_INGRESS_QPM = "sls_ingress_qpm" SLS_INGRESS_LATENCY_AVG = "sls_ingress_latency_avg" SLS_INGRESS_LATENCY_P50 = "sls_ingress_latency_p50" SLS_INGRESS_LATENCY_P95 = "sls_ingress_latency_p95" SLS_INGRESS_LATENCY_P9999 = "sls_ingress_latency_p9999" SLS_INGRESS_LATENCY_P99 = "sls_ingress_latency_p99" - SLS_INGRESS_INFLOW = "sls_ingress_inflow" // byte per second + SLS_INGRESS_INFLOW = "sls_ingress_inflow" // byte per second + SLS_INGRESS_PREDICT = "sls_ingress_predict" // when use this metric type, should start sls predict job SLS_LABEL_PROJECT = "sls.project" SLS_LABEL_LOGSTORE = "sls.logstore" + SLS_LABEL_ML_LOGSTORE = "sls.ml.logstore" SLS_LABEL_QUERY_INTERVAL = "sls.query.interval" // query interval seconds, min val 15s SLS_LABEL_QUERY_DELAY = "sls.query.delay" // query delay seconds, default 0s SLS_LABEL_QUERY_MAX_RETRY = "sls.query.max_retry" // max retry, default 5 @@ -66,10 +69,24 @@ func (ss *SLSMetricSource) GetExternalMetricInfoList() []p.ExternalMetricInfo { metricInfoList = append(metricInfoList, p.ExternalMetricInfo{ Metric: SLS_INGRESS_INFLOW, }) + // ingress predict + metricInfoList = append(metricInfoList, p.ExternalMetricInfo{ + Metric: SLS_INGRESS_PREDICT, + }) return metricInfoList } + func (ss *SLSMetricSource) GetExternalMetric(info p.ExternalMetricInfo, namespace string, requirements labels.Requirements) (values []external_metrics.ExternalMetricValue, err error) { - values, err = ss.getSLSIngressMetrics(namespace, requirements, info.Metric) + switch info.Metric { + case SLS_INGRESS_PREDICT, SLS_INGRESS_QPM: + values, err = ss.getSLSIngressPredictMetrics(namespace, requirements, info.Metric) + if len(values) == 0 { + log.Warning("Failed to Get Ingress Predict Value from SLS, because of model not finished ! We should use ingress qps instead !") + values, err = ss.getSLSIngressPredictMetrics(namespace, requirements, SLS_INGRESS_QPS) + } + default: + values, err = ss.getSLSIngressMetrics(namespace, requirements, info.Metric) + } if err != nil { log.Warningf("Failed to GetExternalMetric %s,because of %v", info.Metric, err) } @@ -106,6 +123,7 @@ func getSLSParams(requirements labels.Requirements) (params *SLSIngressParams, e Internal: true, }, } + log.Info(requirements) for _, r := range requirements { if len(r.Values().List()) <= 0 { @@ -120,6 +138,8 @@ func getSLSParams(requirements labels.Requirements) (params *SLSIngressParams, e params.Project = value case SLS_LABEL_LOGSTORE: params.LogStore = value + case SLS_LABEL_ML_LOGSTORE: + params.MlLogStore = value case SLS_LABEL_INGRESS_ROUTE: params.Route = value case SLS_LABEL_QUERY_INTERVAL: @@ -160,6 +180,8 @@ func getSLSParams(requirements labels.Requirements) (params *SLSIngressParams, e } + log.Info(params) + return params, nil } @@ -167,6 +189,7 @@ func getSLSParams(requirements labels.Requirements) (params *SLSIngressParams, e type SLSGlobalParams struct { Project string LogStore string + MlLogStore string Interval int DelaySeconds int MaxRetry int From 1886c3f77fc935a6eb118eae171285a991a3985e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=82=9F=E5=86=A5?= Date: Mon, 9 Nov 2020 10:07:20 +0800 Subject: [PATCH 2/3] use Ingress QPM and Regression Predict QPM to reduce risk --- examples/sls.yaml | 5 +++-- pkg/metrics/sls/ingress.go | 39 ++++++++++++++++++++++---------------- pkg/metrics/sls/sls.go | 33 +++++++++++++++++++++++++++++--- 3 files changed, 56 insertions(+), 21 deletions(-) diff --git a/examples/sls.yaml b/examples/sls.yaml index 8b924674d..c859c0f93 100644 --- a/examples/sls.yaml +++ b/examples/sls.yaml @@ -69,8 +69,9 @@ spec: matchLabels: sls.project: "k8s-log-c0ae5df15fbf34b47ba3a9684e6ee2bee" sls.logstore: "nginx-ingress" - sls.ml.logstore: "ml-service" + sls.ml.logstore: "ml-res" sls.ingress.route: "prod-serviceA-8080" + sls.query.delay: "20" target: type: AverageValue - averageValue: 100 + averageValue: 50 diff --git a/pkg/metrics/sls/ingress.go b/pkg/metrics/sls/ingress.go index 7f2cbfa93..1de4d83f4 100644 --- a/pkg/metrics/sls/ingress.go +++ b/pkg/metrics/sls/ingress.go @@ -3,6 +3,7 @@ package sls import ( "errors" "fmt" + "math" "strconv" "strings" "time" @@ -151,16 +152,16 @@ func (ss *SLSMetricSource) getSLSIngressPredictQuery(params *SLSIngressParams, m return } -func (ss *SLSMetricSource) getSLSIngressPredictMetrics(namespace string, requirements labels.Requirements, metricName string) (values []external_metrics.ExternalMetricValue, err error) { +func (ss *SLSMetricSource) getSLSIngressPredictMetrics(namespace string, requirements labels.Requirements, metricName string) (string, int64, error) { params, err := getSLSParams(requirements) if err != nil { - return values, fmt.Errorf("failed to get sls params,because of %v", err) + return metricName, -1, fmt.Errorf("failed to get sls params,because of %v", err) } client, err := ss.Client(params.Project, params.Internal) if err != nil { log.Errorf("Failed to create sls client, because of %v", err) - return values, err + return metricName, -1, err } begin, end, query := ss.getSLSIngressPredictQuery(params, metricName) @@ -168,15 +169,21 @@ func (ss *SLSMetricSource) getSLSIngressPredictMetrics(namespace string, require if query == "" { log.Errorf("The metric you specific is not supported.") - return values, errors.New("MetricNotSupport") + return metricName, -1, errors.New("MetricNotSupport") } var queryRsp *slssdk.GetLogsResponse for i := 0; i < params.MaxRetry; i++ { - queryRsp, err = client.GetLogs(params.Project, params.MlLogStore, "", begin, end, query, 100, 0, false) - + logstoreName := "" + switch metricName { + case SLS_INGRESS_QPM: + logstoreName = params.LogStore + case SLS_INGRESS_PREDICT: + logstoreName = params.MlLogStore + } + queryRsp, err = client.GetLogs(params.Project, logstoreName, "", begin, end, query, 100, 0, false) if err != nil || len(queryRsp.Logs) == 0 { - return values, err + return metricName, -1, err } // if there are too many logs in sls, query may be not completed, we should retry @@ -213,18 +220,18 @@ func (ss *SLSMetricSource) getSLSIngressPredictMetrics(namespace string, require } } - fmt.Println("Project", params.Project, "MlLogStore", params.MlLogStore, "expectScore", expectScore, "timestamp", metav1.Now()) + fmt.Println("Project", params.Project, "LogstoreName", logstoreName, "expectScore", expectScore, "timestamp", metav1.Now()) if err != nil { - return values, err + return metricName, -1, err } - values = append(values, external_metrics.ExternalMetricValue{ - MetricName: metricName, - Value: *resource.NewQuantity(int64(expectScore), resource.DecimalSI), - Timestamp: metav1.Now(), - }) + //values = append(values, external_metrics.ExternalMetricValue{ + // MetricName: metricName, + // Value: *resource.NewQuantity(int64(expectScore), resource.DecimalSI), + // Timestamp: metav1.Now(), + //}) - return values, err + return metricName, int64(math.Ceil(expectScore)), err } - return values, errors.New("Query sls timeout,it might because of too many logs.") + return metricName, -1, errors.New("Query sls timeout,it might because of too many logs.") } \ No newline at end of file diff --git a/pkg/metrics/sls/sls.go b/pkg/metrics/sls/sls.go index 01b236256..8c964ec84 100644 --- a/pkg/metrics/sls/sls.go +++ b/pkg/metrics/sls/sls.go @@ -6,6 +6,8 @@ import ( "github.com/AliyunContainerService/alibaba-cloud-metrics-adapter/pkg/utils" "github.com/aliyun/aliyun-log-go-sdk" p "github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" log "k8s.io/klog" "k8s.io/metrics/pkg/apis/external_metrics" @@ -79,10 +81,35 @@ func (ss *SLSMetricSource) GetExternalMetricInfoList() []p.ExternalMetricInfo { func (ss *SLSMetricSource) GetExternalMetric(info p.ExternalMetricInfo, namespace string, requirements labels.Requirements) (values []external_metrics.ExternalMetricValue, err error) { switch info.Metric { case SLS_INGRESS_PREDICT, SLS_INGRESS_QPM: - values, err = ss.getSLSIngressPredictMetrics(namespace, requirements, info.Metric) - if len(values) == 0 { + var resErr error = nil + var predTargetVal int64 = -1 + if info.Metric == SLS_INGRESS_PREDICT { + _, predTargetVal, resErr = ss.getSLSIngressPredictMetrics(namespace, requirements, info.Metric) + } + _, baseVal, errBase := ss.getSLSIngressPredictMetrics(namespace, requirements, SLS_INGRESS_QPS) + + var targetVal int64 = -1 + if predTargetVal > baseVal { + targetVal = predTargetVal + } else { + targetVal = baseVal + } + + if targetVal <= 0 { log.Warning("Failed to Get Ingress Predict Value from SLS, because of model not finished ! We should use ingress qps instead !") - values, err = ss.getSLSIngressPredictMetrics(namespace, requirements, SLS_INGRESS_QPS) + if resErr != nil { + err = resErr + } + if errBase != nil { + err = errBase + } + } else { + values = append(values, external_metrics.ExternalMetricValue{ + MetricName: info.Metric, + Value: *resource.NewQuantity(targetVal, resource.DecimalSI), + Timestamp: metav1.Now(), + }) + err = nil } default: values, err = ss.getSLSIngressMetrics(namespace, requirements, info.Metric) From 914083cc1b5922729403ac0261054c8f2e364fd2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=82=9F=E5=86=A5?= Date: Sat, 21 Nov 2020 11:05:32 +0800 Subject: [PATCH 3/3] fix conflict about new ss.Client interface --- pkg/metrics/sls/ingress.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/metrics/sls/ingress.go b/pkg/metrics/sls/ingress.go index 99d28c6cd..17b42ec84 100644 --- a/pkg/metrics/sls/ingress.go +++ b/pkg/metrics/sls/ingress.go @@ -158,7 +158,7 @@ func (ss *SLSMetricSource) getSLSIngressPredictMetrics(namespace string, require return metricName, -1, fmt.Errorf("failed to get sls params,because of %v", err) } - client, err := ss.Client(params.Project, params.Internal) + client, err := ss.Client(params.Internal) if err != nil { log.Errorf("Failed to create sls client, because of %v", err) return metricName, -1, err