update sls ingress predict metric #33

Open: wants to merge 4 commits into master
2 changes: 2 additions & 0 deletions Makefile
@@ -34,6 +34,8 @@ test-unit-cov: clean sanitize build

docker-container:
docker build --pull -t $(PREFIX)/alibaba-cloud-metrics-adapter-$(ARCH):$(VERSION)-$(GIT_COMMIT) -f deploy/Dockerfile .
# docker build --pull -t registry.cn-beijing.aliyuncs.com/sls-model/auto-hpa:0.0 -f deploy/Dockerfile .
#docker push registry.cn-beijing.aliyuncs.com/sls-model/auto-hpa:0.0

clean:
rm -f alibaba-cloud-metrics-adapter
6 changes: 4 additions & 2 deletions deploy/deploy.yaml
@@ -17,10 +17,12 @@ spec:
name: alibaba-cloud-metrics-adapter
spec:
serviceAccountName: admin
imagePullSecrets:
- name: regsecret
containers:
- name: alibaba-cloud-metrics-adapter
image: registry.cn-beijing.aliyuncs.com/acs/alibaba-cloud-metrics-adapter-amd64:v0.2.0-alpha-e8f8c17f
imagePullPolicy: IfNotPresent
image: registry.cn-beijing.aliyuncs.com/sls-model/auto-hpa:0.0
imagePullPolicy: Always
ports:
- containerPort: 443
name: https
36 changes: 7 additions & 29 deletions examples/sls.yaml
@@ -64,36 +64,14 @@ spec:
- type: External
external:
metric:
name: sls_ingress_qps
name: sls_ingress_predict
selector:
matchLabels:
#sls.project: "k8s-log-c550367cdf1e84dfabab013b277cc6bc2"
sls.project: ""
#sls.logstore: "nginx-ingress"
sls.logstore: ""
#sls.ingress.route: "default-nginx-80"
sls.ingress.route: ""
sls.project: "k8s-log-c0ae5df15fbf34b47ba3a9684e6ee2bee"
sls.logstore: "nginx-ingress"
sls.ml.logstore: "ml-res"
sls.ingress.route: "prod-serviceA-8080"
sls.query.delay: "20"
target:
type: AverageValue
averageValue: 10
- type: External
external:
metric:
name: sls_ingress_latency_p9999
selector:
matchLabels:
# default ingress log project is k8s-log-clusterId
# sls.project: "k8s-log-c550367cdf1e84dfabab013b277cc6bc2"
sls.project: ""
# default ingress logstore is nginx-ingress
# sls.logstore: "nginx-ingress"
sls.logstore: ""
# namespace-svc-port
# sls.ingress.route: "default-nginx-80"
sls.ingress.route: ""
# sls vpc endpoint, default true
# sls.internal.endpoint: true
target:
type: Value
# sls_ingress_latency_p9999 > 10ms
value: 10
averageValue: 50
12 changes: 8 additions & 4 deletions main.go
@@ -81,7 +81,8 @@ func main() {
//
alibabaCloudProvider, err := makeAlibabaCloudProvider(cmd)
if err != nil {
klog.Fatalf("unable to construct alibabCloudProvider: %v", err)
//klog.Fatalf("unable to construct alibabCloudProvider: %v", err)
klog.Errorf("unable to construct alibabCloudProvider: %v", err)
}

if alibabaCloudProvider != nil {
@@ -90,12 +91,14 @@ func main() {

// load the config
if err := cmd.loadConfig(); err != nil {
klog.Fatalf("unable to load metrics discovery config: %v", err)
//klog.Fatalf("unable to load metrics discovery config: %v", err)
klog.Errorf("unable to load metrics discovery config: %v", err)
}

prometheusProvider, err := makePrometheusProvider(cmd, stopCh)
if err != nil {
klog.Fatalf("unable to construct prometheusProvider: %v", err)
//klog.Fatalf("unable to construct prometheusProvider: %v", err)
klog.Errorf("unable to construct prometheusProvider: %v", err)
}

if prometheusProvider != nil {
@@ -107,7 +110,8 @@ func main() {
}()

if err := cmd.Run(stopCh); err != nil {
klog.Fatalf("Failed to run alibaba-cloud-metrics-adapter: %v", err)
//klog.Fatalf("Failed to run alibaba-cloud-metrics-adapter: %v", err)
klog.Errorf("Failed to run alibaba-cloud-metrics-adapter: %v", err)
}
}

111 changes: 111 additions & 0 deletions pkg/metrics/sls/ingress.go
@@ -3,7 +3,9 @@ package sls
import (
"errors"
"fmt"
"math"
"strconv"
"strings"
"time"

"regexp"
@@ -33,6 +35,7 @@ func (ss *SLSMetricSource) getSLSIngressQuery(params *SLSIngressParams, metricNa
queryRealBegin := now - int64(params.DelaySeconds) - int64(params.Interval)
end = now - int64(params.DelaySeconds)
begin = now - 100

if len(params.Route) == 0 {
params.Route = "*"
}
@@ -57,6 +60,7 @@ func (ss *SLSMetricSource) getSLSIngressQuery(params *SLSIngressParams, metricNa
query = ""
}
query = fmt.Sprintf("* and proxy_upstream_name: %s | SELECT %s as value from log WHERE __time__ >= %d and __time__ < %d", params.Route, queryItem, queryRealBegin, end)

return
}

@@ -124,3 +128,110 @@ func (ss *SLSMetricSource) getSLSIngressMetrics(namespace string, requirements l
}
return values, errors.New("Query sls timeout,it might because of too many logs.")
}

func (ss *SLSMetricSource) getSLSIngressPredictQuery(params *SLSIngressParams, metricName string) (begin int64, end int64, query string) {
now := time.Now().Unix()

switch metricName {
case SLS_INGRESS_QPM:
end = (now / 60) * 60
begin = (now / 60 - 10) * 60
var querySearch string = fmt.Sprintf(`proxy_upstream_name: '%v' or proxy_alternative_upstream_name: '%v'`, params.Route, params.Route)
query = fmt.Sprintf(`%v | select array_agg(to_unixtime(time)) as ts, array_agg(num) as ds from ( select date_trunc('minute', __time__) as time, COUNT(*) as num from log group by time ) limit 1000`, querySearch)
case SLS_INGRESS_PREDICT:
end = now / 60 * 60 - 60
begin = end - 60
if len(params.Route) == 0 {
params.Route = "*"
}
var querySearch string = fmt.Sprintf(`__tag__:__model_type__: predict and '%v'`, params.Route)
var queryItem string = fmt.Sprintf("json_extract(result, '$.ts') as ts, json_extract(result, '$.ds') as ds")
var filterItem string = fmt.Sprintf("json_extract_scalar(meta, '$.logstore_name') = '%v' and json_extract_scalar(meta, '$.project_name') = '%v'", params.LogStore, params.Project)
query = fmt.Sprintf("%v | SELECT %v from log WHERE __time__ >= %d and __time__ < %d and %v limit 1000", querySearch, queryItem, begin, end, filterItem)
}
return
}

func (ss *SLSMetricSource) getSLSIngressPredictMetrics(namespace string, requirements labels.Requirements, metricName string) (string, int64, error) {
params, err := getSLSParams(requirements)
if err != nil {
return metricName, -1, fmt.Errorf("failed to get sls params, because of %v", err)
}

client, err := ss.Client(params.Internal)
if err != nil {
log.Errorf("Failed to create sls client, because of %v", err)
return metricName, -1, err
}

begin, end, query := ss.getSLSIngressPredictQuery(params, metricName)
fmt.Println("begin", begin, "end", end, "query", query)

if query == "" {
log.Errorf("The metric you specific is not supported.")
return metricName, -1, errors.New("MetricNotSupport")
}

var queryRsp *slssdk.GetLogsResponse
for i := 0; i < params.MaxRetry; i++ {
logstoreName := ""
switch metricName {
case SLS_INGRESS_QPM:
logstoreName = params.LogStore
case SLS_INGRESS_PREDICT:
logstoreName = params.MlLogStore
}
queryRsp, err = client.GetLogs(params.Project, logstoreName, "", begin, end, query, 100, 0, false)
if err != nil || len(queryRsp.Logs) == 0 {
return metricName, -1, err
}

// if there are too many logs in sls, the query may not be complete, so we should retry
if !queryRsp.IsComplete() {
continue
}

expectScore := -1.0
for _, logCell := range queryRsp.Logs {
ds := logCell["ds"]
content := ds[1:len(ds)-1]
if len(content) <= 0 {
if expectScore <= 0.0 {
expectScore = 0.0
}
} else {
dsString := strings.Split(ds[1:len(ds)-1], ",")

// take the maximum over the next few minutes so that enough resources are reserved in advance
maxValue := -1.0
n := len(dsString)
for i := 0; i < n; i++ {
if v, e := strconv.ParseFloat(dsString[i], 64); e == nil {
if maxValue < v {
maxValue = v
}
} else {
err = e
}
}
if maxValue > expectScore {
expectScore = maxValue
}
}
}

fmt.Println("Project", params.Project, "LogstoreName", logstoreName, "expectScore", expectScore, "timestamp", metav1.Now())
if err != nil {
return metricName, -1, err
}

//values = append(values, external_metrics.ExternalMetricValue{
// MetricName: metricName,
// Value: *resource.NewQuantity(int64(expectScore), resource.DecimalSI),
// Timestamp: metav1.Now(),
//})

return metricName, int64(math.Ceil(expectScore)), err
}
return metricName, -1, errors.New("Query sls timeout, it might be because of too many logs.")
}
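
Note: the new getSLSIngressPredictMetrics reads the predicted series from the "ds" field of each SLS log row as a bracketed, comma-separated string, strips the brackets, parses each element, and keeps the maximum. Below is a minimal standalone sketch of that parsing, assuming the same "[v1,v2,...]" format; maxOfDsArray is a hypothetical helper name (not part of this PR) and it adds a length guard that the diff above does not have.

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// maxOfDsArray mirrors the parsing in getSLSIngressPredictMetrics: it takes
// the "ds" field returned by the SLS query, e.g. "[12.0,15.0,9.0]", strips
// the surrounding brackets, splits on commas, and returns the maximum value.
// A missing or empty array yields 0 so the caller can fall back to the live
// qps/qpm value.
func maxOfDsArray(ds string) (float64, error) {
	if len(ds) < 2 {
		return 0, fmt.Errorf("unexpected ds value: %q", ds)
	}
	content := strings.TrimSpace(ds[1 : len(ds)-1])
	if content == "" {
		return 0, nil
	}
	maxValue := -1.0
	var lastErr error
	for _, s := range strings.Split(content, ",") {
		v, err := strconv.ParseFloat(strings.TrimSpace(s), 64)
		if err != nil {
			lastErr = err
			continue
		}
		if v > maxValue {
			maxValue = v
		}
	}
	return maxValue, lastErr
}

func main() {
	// Expected output: 15
	v, _ := maxOfDsArray("[12.0,15.0,9.0]")
	fmt.Println(v)
}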
54 changes: 52 additions & 2 deletions pkg/metrics/sls/sls.go
@@ -6,6 +6,8 @@ import (
"github.com/AliyunContainerService/alibaba-cloud-metrics-adapter/pkg/utils"
"github.com/aliyun/aliyun-log-go-sdk"
p "github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
log "k8s.io/klog"
"k8s.io/metrics/pkg/apis/external_metrics"
@@ -14,15 +16,18 @@ import (

const (
SLS_INGRESS_QPS = "sls_ingress_qps"
SLS_INGRESS_QPM = "sls_ingress_qpm"
SLS_INGRESS_LATENCY_AVG = "sls_ingress_latency_avg"
SLS_INGRESS_LATENCY_P50 = "sls_ingress_latency_p50"
SLS_INGRESS_LATENCY_P95 = "sls_ingress_latency_p95"
SLS_INGRESS_LATENCY_P9999 = "sls_ingress_latency_p9999"
SLS_INGRESS_LATENCY_P99 = "sls_ingress_latency_p99"
SLS_INGRESS_INFLOW = "sls_ingress_inflow" // byte per second
SLS_INGRESS_INFLOW = "sls_ingress_inflow" // byte per second
SLS_INGRESS_PREDICT = "sls_ingress_predict" // using this metric type requires the sls predict job to be started first

SLS_LABEL_PROJECT = "sls.project"
SLS_LABEL_LOGSTORE = "sls.logstore"
SLS_LABEL_ML_LOGSTORE = "sls.ml.logstore"
SLS_LABEL_QUERY_INTERVAL = "sls.query.interval" // query interval seconds, min val 15s
SLS_LABEL_QUERY_DELAY = "sls.query.delay" // query delay seconds, default 0s
SLS_LABEL_QUERY_MAX_RETRY = "sls.query.max_retry" // max retry, default 5
@@ -66,10 +71,49 @@ func (ss *SLSMetricSource) GetExternalMetricInfoList() []p.ExternalMetricInfo {
metricInfoList = append(metricInfoList, p.ExternalMetricInfo{
Metric: SLS_INGRESS_INFLOW,
})
// ingress predict
metricInfoList = append(metricInfoList, p.ExternalMetricInfo{
Metric: SLS_INGRESS_PREDICT,
})
return metricInfoList
}

func (ss *SLSMetricSource) GetExternalMetric(info p.ExternalMetricInfo, namespace string, requirements labels.Requirements) (values []external_metrics.ExternalMetricValue, err error) {
values, err = ss.getSLSIngressMetrics(namespace, requirements, info.Metric)
switch info.Metric {
case SLS_INGRESS_PREDICT, SLS_INGRESS_QPM:
var resErr error = nil
var predTargetVal int64 = -1
if info.Metric == SLS_INGRESS_PREDICT {
_, predTargetVal, resErr = ss.getSLSIngressPredictMetrics(namespace, requirements, info.Metric)
}
_, baseVal, errBase := ss.getSLSIngressPredictMetrics(namespace, requirements, SLS_INGRESS_QPM)

var targetVal int64 = -1
if predTargetVal > baseVal {
targetVal = predTargetVal
} else {
targetVal = baseVal
}

if targetVal <= 0 {
log.Warning("Failed to Get Ingress Predict Value from SLS, because of model not finished ! We should use ingress qps instead !")
if resErr != nil {
err = resErr
}
if errBase != nil {
err = errBase
}
} else {
values = append(values, external_metrics.ExternalMetricValue{
MetricName: info.Metric,
Value: *resource.NewQuantity(targetVal, resource.DecimalSI),
Timestamp: metav1.Now(),
})
err = nil
}
default:
values, err = ss.getSLSIngressMetrics(namespace, requirements, info.Metric)
}
if err != nil {
log.Warningf("Failed to GetExternalMetric %s,because of %v", info.Metric, err)
}
@@ -106,6 +150,7 @@ func getSLSParams(requirements labels.Requirements) (params *SLSIngressParams, e
Internal: true,
},
}
log.Info(requirements)
for _, r := range requirements {

if len(r.Values().List()) <= 0 {
@@ -120,6 +165,8 @@ func getSLSParams(requirements labels.Requirements) (params *SLSIngressParams, e
params.Project = value
case SLS_LABEL_LOGSTORE:
params.LogStore = value
case SLS_LABEL_ML_LOGSTORE:
params.MlLogStore = value
case SLS_LABEL_INGRESS_ROUTE:
params.Route = value
case SLS_LABEL_QUERY_INTERVAL:
@@ -160,13 +207,16 @@ func getSLSParams(requirements labels.Requirements) (params *SLSIngressParams, e

}

log.Info(params)

return params, nil
}

// Global params
type SLSGlobalParams struct {
Project string
LogStore string
MlLogStore string
Interval int
DelaySeconds int
MaxRetry int
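
Note: GetExternalMetric now reports the larger of the predicted value and the observed baseline, and only warns and surfaces the underlying error when neither is positive. The following is a minimal sketch of that selection rule, assuming (as in the diff above) that -1 marks a failed query; chooseIngressTarget is a hypothetical helper, not part of this PR.

package main

import "fmt"

// chooseIngressTarget isolates the selection logic used in GetExternalMetric:
// prefer the predicted value but never report less than the observed baseline,
// and signal a fallback when neither source produced a usable (positive)
// number, e.g. because the sls predict job has not finished yet.
func chooseIngressTarget(predicted, observed int64) (target int64, usable bool) {
	target = observed
	if predicted > target {
		target = predicted
	}
	return target, target > 0
}

func main() {
	// Prediction available and higher than the current qpm baseline.
	fmt.Println(chooseIngressTarget(120, 80)) // 120 true
	// Prediction missing (-1) and no baseline: the caller should warn and
	// return the underlying error instead of emitting a metric value.
	fmt.Println(chooseIngressTarget(-1, -1)) // -1 false
}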