diff --git a/Makefile b/Makefile index 323b0d8a2..3928c556e 100644 --- a/Makefile +++ b/Makefile @@ -34,6 +34,8 @@ test-unit-cov: clean sanitize build docker-container: docker build --pull -t $(PREFIX)/alibaba-cloud-metrics-adapter-$(ARCH):$(VERSION)-$(GIT_COMMIT) -f deploy/Dockerfile . + # docker build --pull -t registry.cn-beijing.aliyuncs.com/sls-model/auto-hpa:0.0 -f deploy/Dockerfile . + #docker push registry.cn-beijing.aliyuncs.com/sls-model/auto-hpa:0.0 clean: rm -f alibaba-cloud-metrics-adapter diff --git a/deploy/deploy.yaml b/deploy/deploy.yaml index f10a5b03b..7fda5c281 100644 --- a/deploy/deploy.yaml +++ b/deploy/deploy.yaml @@ -17,10 +17,12 @@ spec: name: alibaba-cloud-metrics-adapter spec: serviceAccountName: admin + imagePullSecrets: + - name: regsecret containers: - name: alibaba-cloud-metrics-adapter - image: registry.cn-beijing.aliyuncs.com/acs/alibaba-cloud-metrics-adapter-amd64:v0.2.0-alpha-e8f8c17f - imagePullPolicy: IfNotPresent + image: registry.cn-beijing.aliyuncs.com/sls-model/auto-hpa:0.0 + imagePullPolicy: Always ports: - containerPort: 443 name: https diff --git a/examples/sls.yaml b/examples/sls.yaml index e669a2c57..c859c0f93 100644 --- a/examples/sls.yaml +++ b/examples/sls.yaml @@ -64,36 +64,14 @@ spec: - type: External external: metric: - name: sls_ingress_qps + name: sls_ingress_predict selector: matchLabels: - #sls.project: "k8s-log-c550367cdf1e84dfabab013b277cc6bc2" - sls.project: "" - #sls.logstore: "nginx-ingress" - sls.logstore: "" - #sls.ingress.route: "default-nginx-80" - sls.ingress.route: "" + sls.project: "k8s-log-c0ae5df15fbf34b47ba3a9684e6ee2bee" + sls.logstore: "nginx-ingress" + sls.ml.logstore: "ml-res" + sls.ingress.route: "prod-serviceA-8080" + sls.query.delay: "20" target: type: AverageValue - averageValue: 10 - - type: External - external: - metric: - name: sls_ingress_latency_p9999 - selector: - matchLabels: - # default ingress log project is k8s-log-clusterId - # sls.project: "k8s-log-c550367cdf1e84dfabab013b277cc6bc2" - sls.project: "" - # default ingress logstre is nginx-ingress - # sls.logstore: "nginx-ingress" - sls.logstore: "" - # namespace-svc-port - # sls.ingress.route: "default-nginx-80" - sls.ingress.route: "" - # sls vpc endpoint, default true - # sls.internal.endpoint: true - target: - type: Value - # sls_ingress_latency_p9999 > 10ms - value: 10 + averageValue: 50 diff --git a/main.go b/main.go index 875aaff24..471fcbdab 100644 --- a/main.go +++ b/main.go @@ -81,7 +81,8 @@ func main() { // alibabaCloudProvider, err := makeAlibabaCloudProvider(cmd) if err != nil { - klog.Fatalf("unable to construct alibabCloudProvider: %v", err) + //klog.Fatalf("unable to construct alibabCloudProvider: %v", err) + klog.Errorf("unable to construct alibabCloudProvider: %v", err) } if alibabaCloudProvider != nil { @@ -90,12 +91,14 @@ func main() { // load the config if err := cmd.loadConfig(); err != nil { - klog.Fatalf("unable to load metrics discovery config: %v", err) + //klog.Fatalf("unable to load metrics discovery config: %v", err) + klog.Errorf("unable to load metrics discovery config: %v", err) } prometheusProvider, err := makePrometheusProvider(cmd, stopCh) if err != nil { - klog.Fatalf("unable to construct prometheusProvider: %v", err) + //klog.Fatalf("unable to construct prometheusProvider: %v", err) + klog.Errorf("unable to construct prometheusProvider: %v", err) } if prometheusProvider != nil { @@ -107,7 +110,8 @@ func main() { }() if err := cmd.Run(stopCh); err != nil { - klog.Fatalf("Failed to run alibaba-cloud-metrics-adapter: %v", err) + //klog.Fatalf("Failed to run alibaba-cloud-metrics-adapter: %v", err) + klog.Errorf("Failed to run alibaba-cloud-metrics-adapter: %v", err) } } diff --git a/pkg/metrics/sls/ingress.go b/pkg/metrics/sls/ingress.go index bc5632956..17b42ec84 100644 --- a/pkg/metrics/sls/ingress.go +++ b/pkg/metrics/sls/ingress.go @@ -3,7 +3,9 @@ package sls import ( "errors" "fmt" + "math" "strconv" + "strings" "time" "regexp" @@ -33,6 +35,7 @@ func (ss *SLSMetricSource) getSLSIngressQuery(params *SLSIngressParams, metricNa queryRealBegin := now - int64(params.DelaySeconds) - int64(params.Interval) end = now - int64(params.DelaySeconds) begin = now - 100 + if len(params.Route) == 0 { params.Route = "*" } @@ -57,6 +60,7 @@ func (ss *SLSMetricSource) getSLSIngressQuery(params *SLSIngressParams, metricNa query = "" } query = fmt.Sprintf("* and proxy_upstream_name: %s | SELECT %s as value from log WHERE __time__ >= %d and __time__ < %d", params.Route, queryItem, queryRealBegin, end) + return } @@ -124,3 +128,110 @@ func (ss *SLSMetricSource) getSLSIngressMetrics(namespace string, requirements l } return values, errors.New("Query sls timeout,it might because of too many logs.") } + +func (ss *SLSMetricSource) getSLSIngressPredictQuery(params *SLSIngressParams, metricName string) (begin int64, end int64, query string) { + now := time.Now().Unix() + + switch metricName { + case SLS_INGRESS_QPM: + end = (now / 60) * 60 + begin = (now / 60 - 10) * 60 + var querySearch string = fmt.Sprintf(`proxy_upstream_name: '%v' or proxy_alternative_upstream_name: '%v'`, params.Route, params.Route) + query = fmt.Sprintf(`%v | select array_agg(to_unixtime(time)) as ts, array_agg(num) as ds from ( select date_trunc('minute', __time__) as time, COUNT(*) as num from log group by time ) limit 1000`, querySearch) + case SLS_INGRESS_PREDICT: + end = now / 60 * 60 - 60 + begin = end - 60 + if len(params.Route) == 0 { + params.Route = "*" + } + var querySearch string = fmt.Sprintf(`__tag__:__model_type__: predict and '%v'`, params.Route) + var queryItem string = fmt.Sprintf("json_extract(result, '$.ts') as ts, json_extract(result, '$.ds') as ds") + var filterItem string = fmt.Sprintf("json_extract_scalar(meta, '$.logstore_name') = '%v' and json_extract_scalar(meta, '$.project_name') = '%v'", params.LogStore, params.Project) + query = fmt.Sprintf("%v | SELECT %v from log WHERE __time__ >= %d and __time__ < %d and %v limit 1000", querySearch, queryItem, begin, end, filterItem) + } + return +} + +func (ss *SLSMetricSource) getSLSIngressPredictMetrics(namespace string, requirements labels.Requirements, metricName string) (string, int64, error) { + params, err := getSLSParams(requirements) + if err != nil { + return metricName, -1, fmt.Errorf("failed to get sls params,because of %v", err) + } + + client, err := ss.Client(params.Internal) + if err != nil { + log.Errorf("Failed to create sls client, because of %v", err) + return metricName, -1, err + } + + begin, end, query := ss.getSLSIngressPredictQuery(params, metricName) + fmt.Println("begin", begin, "end", end, "query", query) + + if query == "" { + log.Errorf("The metric you specific is not supported.") + return metricName, -1, errors.New("MetricNotSupport") + } + + var queryRsp *slssdk.GetLogsResponse + for i := 0; i < params.MaxRetry; i++ { + logstoreName := "" + switch metricName { + case SLS_INGRESS_QPM: + logstoreName = params.LogStore + case SLS_INGRESS_PREDICT: + logstoreName = params.MlLogStore + } + queryRsp, err = client.GetLogs(params.Project, logstoreName, "", begin, end, query, 100, 0, false) + if err != nil || len(queryRsp.Logs) == 0 { + return metricName, -1, err + } + + // if there are too many logs in sls, query may be not completed, we should retry + if !queryRsp.IsComplete() { + continue + } + + expectScore := -1.0 + for _, logCell := range queryRsp.Logs { + ds := logCell["ds"] + content := ds[1:len(ds)-1] + if len(content) <= 0 { + if expectScore <= 0.0 { + expectScore = 0.0 + } + } else { + dsString := strings.Split(ds[1:len(ds)-1], ",") + + // 计算未来几分钟的最大值,需要提前预留对应的资源 + maxValue := -1.0 + n := len(dsString) + for i := 0; i < n; i++ { + if v, e := strconv.ParseFloat(dsString[i], 64); e == nil { + if maxValue < v { + maxValue = v + } + } else { + err = e + } + } + if maxValue > expectScore { + expectScore = maxValue + } + } + } + + fmt.Println("Project", params.Project, "LogstoreName", logstoreName, "expectScore", expectScore, "timestamp", metav1.Now()) + if err != nil { + return metricName, -1, err + } + + //values = append(values, external_metrics.ExternalMetricValue{ + // MetricName: metricName, + // Value: *resource.NewQuantity(int64(expectScore), resource.DecimalSI), + // Timestamp: metav1.Now(), + //}) + + return metricName, int64(math.Ceil(expectScore)), err + } + return metricName, -1, errors.New("Query sls timeout,it might because of too many logs.") +} \ No newline at end of file diff --git a/pkg/metrics/sls/sls.go b/pkg/metrics/sls/sls.go index 94c0da93a..991c64d9a 100644 --- a/pkg/metrics/sls/sls.go +++ b/pkg/metrics/sls/sls.go @@ -6,6 +6,8 @@ import ( "github.com/AliyunContainerService/alibaba-cloud-metrics-adapter/pkg/utils" "github.com/aliyun/aliyun-log-go-sdk" p "github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" log "k8s.io/klog" "k8s.io/metrics/pkg/apis/external_metrics" @@ -14,15 +16,18 @@ import ( const ( SLS_INGRESS_QPS = "sls_ingress_qps" + SLS_INGRESS_QPM = "sls_ingress_qpm" SLS_INGRESS_LATENCY_AVG = "sls_ingress_latency_avg" SLS_INGRESS_LATENCY_P50 = "sls_ingress_latency_p50" SLS_INGRESS_LATENCY_P95 = "sls_ingress_latency_p95" SLS_INGRESS_LATENCY_P9999 = "sls_ingress_latency_p9999" SLS_INGRESS_LATENCY_P99 = "sls_ingress_latency_p99" - SLS_INGRESS_INFLOW = "sls_ingress_inflow" // byte per second + SLS_INGRESS_INFLOW = "sls_ingress_inflow" // byte per second + SLS_INGRESS_PREDICT = "sls_ingress_predict" // when use this metric type, should start sls predict job SLS_LABEL_PROJECT = "sls.project" SLS_LABEL_LOGSTORE = "sls.logstore" + SLS_LABEL_ML_LOGSTORE = "sls.ml.logstore" SLS_LABEL_QUERY_INTERVAL = "sls.query.interval" // query interval seconds, min val 15s SLS_LABEL_QUERY_DELAY = "sls.query.delay" // query delay seconds, default 0s SLS_LABEL_QUERY_MAX_RETRY = "sls.query.max_retry" // max retry, default 5 @@ -66,10 +71,49 @@ func (ss *SLSMetricSource) GetExternalMetricInfoList() []p.ExternalMetricInfo { metricInfoList = append(metricInfoList, p.ExternalMetricInfo{ Metric: SLS_INGRESS_INFLOW, }) + // ingress predict + metricInfoList = append(metricInfoList, p.ExternalMetricInfo{ + Metric: SLS_INGRESS_PREDICT, + }) return metricInfoList } + func (ss *SLSMetricSource) GetExternalMetric(info p.ExternalMetricInfo, namespace string, requirements labels.Requirements) (values []external_metrics.ExternalMetricValue, err error) { - values, err = ss.getSLSIngressMetrics(namespace, requirements, info.Metric) + switch info.Metric { + case SLS_INGRESS_PREDICT, SLS_INGRESS_QPM: + var resErr error = nil + var predTargetVal int64 = -1 + if info.Metric == SLS_INGRESS_PREDICT { + _, predTargetVal, resErr = ss.getSLSIngressPredictMetrics(namespace, requirements, info.Metric) + } + _, baseVal, errBase := ss.getSLSIngressPredictMetrics(namespace, requirements, SLS_INGRESS_QPS) + + var targetVal int64 = -1 + if predTargetVal > baseVal { + targetVal = predTargetVal + } else { + targetVal = baseVal + } + + if targetVal <= 0 { + log.Warning("Failed to Get Ingress Predict Value from SLS, because of model not finished ! We should use ingress qps instead !") + if resErr != nil { + err = resErr + } + if errBase != nil { + err = errBase + } + } else { + values = append(values, external_metrics.ExternalMetricValue{ + MetricName: info.Metric, + Value: *resource.NewQuantity(targetVal, resource.DecimalSI), + Timestamp: metav1.Now(), + }) + err = nil + } + default: + values, err = ss.getSLSIngressMetrics(namespace, requirements, info.Metric) + } if err != nil { log.Warningf("Failed to GetExternalMetric %s,because of %v", info.Metric, err) } @@ -106,6 +150,7 @@ func getSLSParams(requirements labels.Requirements) (params *SLSIngressParams, e Internal: true, }, } + log.Info(requirements) for _, r := range requirements { if len(r.Values().List()) <= 0 { @@ -120,6 +165,8 @@ func getSLSParams(requirements labels.Requirements) (params *SLSIngressParams, e params.Project = value case SLS_LABEL_LOGSTORE: params.LogStore = value + case SLS_LABEL_ML_LOGSTORE: + params.MlLogStore = value case SLS_LABEL_INGRESS_ROUTE: params.Route = value case SLS_LABEL_QUERY_INTERVAL: @@ -160,6 +207,8 @@ func getSLSParams(requirements labels.Requirements) (params *SLSIngressParams, e } + log.Info(params) + return params, nil } @@ -167,6 +216,7 @@ func getSLSParams(requirements labels.Requirements) (params *SLSIngressParams, e type SLSGlobalParams struct { Project string LogStore string + MlLogStore string Interval int DelaySeconds int MaxRetry int