diff --git a/go.mod b/go.mod index c4a9f337ef03e..f6b5af8bb611e 100644 --- a/go.mod +++ b/go.mod @@ -49,7 +49,7 @@ require ( github.com/google/uuid v1.2.0 github.com/gorilla/mux v1.8.0 github.com/gorilla/websocket v1.4.2 - github.com/grafana/dskit v0.0.0-20220331160727-49faf69f72ca + github.com/grafana/dskit v0.0.0-20220708141012-99f3d0043c23 github.com/grafana/go-gelf/v2 v2.0.1 github.com/grafana/regexp v0.0.0-20220304100321-149c8afcd6cb github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 @@ -92,7 +92,7 @@ require ( github.com/thanos-io/thanos v0.22.0 github.com/tonistiigi/fifo v0.0.0-20190226154929-a9fb20d87448 github.com/uber/jaeger-client-go v2.30.0+incompatible - github.com/weaveworks/common v0.0.0-20211015155308-ebe5bdc2c89e + github.com/weaveworks/common v0.0.0-20220629114710-e3b70df0f08b github.com/xdg-go/scram v1.0.2 go.etcd.io/bbolt v1.3.6 go.uber.org/atomic v1.9.0 @@ -232,7 +232,7 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/alertmanager v0.23.1-0.20210914172521-e35efbddb66a // indirect github.com/prometheus/common/sigv4 v0.1.0 // indirect - github.com/prometheus/node_exporter v1.0.0-rc.0.0.20200428091818-01054558c289 // indirect + github.com/prometheus/exporter-toolkit v0.7.1 // indirect github.com/prometheus/procfs v0.7.3 // indirect github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect github.com/rs/xid v1.2.1 // indirect @@ -326,4 +326,4 @@ exclude k8s.io/client-go v8.0.0+incompatible // Replace memberlist with our fork which includes some fixes that haven't been // merged upstream yet. -replace github.com/hashicorp/memberlist => github.com/grafana/memberlist v0.3.1-0.20220425183535-6b97a09b7167 +replace github.com/hashicorp/memberlist => github.com/grafana/memberlist v0.3.1-0.20220708130638-bd88e10a3d91 diff --git a/go.sum b/go.sum index 9f219fdd26508..bf4a83f17d063 100644 --- a/go.sum +++ b/go.sum @@ -1039,14 +1039,14 @@ github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0U github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gotestyourself/gotestyourself v2.2.0+incompatible/go.mod h1:zZKM6oeNM8k+FRljX1mnzVYeS8wiGgQyvST1/GafPbY= github.com/grafana/dskit v0.0.0-20211021180445-3bd016e9d7f1/go.mod h1:uPG2nyK4CtgNDmWv7qyzYcdI+S90kHHRWvHnBtEMBXM= -github.com/grafana/dskit v0.0.0-20220331160727-49faf69f72ca h1:0qHzm6VS0bCsSWKHuyfpt+pdpyScdZbzY/IFIyKSYOk= -github.com/grafana/dskit v0.0.0-20220331160727-49faf69f72ca/go.mod h1:q51XdMLLHNZJSG6KOGujC20ed2OoLFdx0hBmOEVfRs0= +github.com/grafana/dskit v0.0.0-20220708141012-99f3d0043c23 h1:VF+BC/NBcxMP33P04x9j+4cSa1aBKafoS5RSVYtT4ic= +github.com/grafana/dskit v0.0.0-20220708141012-99f3d0043c23/go.mod h1:D5GdDQDsPN12+eGhq+lSCY4o/glBYO6NC8CRkzb23gs= github.com/grafana/go-gelf/v2 v2.0.1 h1:BOChP0h/jLeD+7F9mL7tq10xVkDG15he3T1zHuQaWak= github.com/grafana/go-gelf/v2 v2.0.1/go.mod h1:lexHie0xzYGwCgiRGcvZ723bSNyNI8ZRD4s0CLobh90= github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85 h1:xLuzPoOzdfNb/RF/IENCw+oLVdZB4G21VPhkHBgwSHY= github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85/go.mod h1:crI9WX6p0IhrqB+DqIUHulRW853PaNFf7o4UprV//3I= -github.com/grafana/memberlist v0.3.1-0.20220425183535-6b97a09b7167 h1:PgEQkGHR4YimSCEGT5IoswN9gJKZDVskf+he6UClCLw= -github.com/grafana/memberlist v0.3.1-0.20220425183535-6b97a09b7167/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE= +github.com/grafana/memberlist v0.3.1-0.20220708130638-bd88e10a3d91 h1:/NipyHnOmvRsVzj81j2qE0VxsvsqhOB0f4vJIhk2qCQ= 
+github.com/grafana/memberlist v0.3.1-0.20220708130638-bd88e10a3d91/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE= github.com/grafana/regexp v0.0.0-20220202152315-e74e38789280/go.mod h1:M5qHK+eWfAv8VR/265dIuEpL3fNfeC21tXXp9itM24A= github.com/grafana/regexp v0.0.0-20220304100321-149c8afcd6cb h1:wwzNkyaQwcXCzQuKoWz3lwngetmcyg+EhW0fF5lz73M= github.com/grafana/regexp v0.0.0-20220304100321-149c8afcd6cb/go.mod h1:M5qHK+eWfAv8VR/265dIuEpL3fNfeC21tXXp9itM24A= @@ -1673,7 +1673,6 @@ github.com/prometheus/exporter-toolkit v0.6.1/go.mod h1:ZUBIj498ePooX9t/2xtDjeQY github.com/prometheus/exporter-toolkit v0.7.0/go.mod h1:ZUBIj498ePooX9t/2xtDjeQYwvRpiPP2lh5u4iblj2g= github.com/prometheus/exporter-toolkit v0.7.1 h1:c6RXaK8xBVercEeUQ4tRNL8UGWzDHfvj9dseo1FcK1Y= github.com/prometheus/exporter-toolkit v0.7.1/go.mod h1:ZUBIj498ePooX9t/2xtDjeQYwvRpiPP2lh5u4iblj2g= -github.com/prometheus/node_exporter v1.0.0-rc.0.0.20200428091818-01054558c289 h1:dTUS1vaLWq+Y6XKOTnrFpoVsQKLCbCp1OLj24TDi7oM= github.com/prometheus/node_exporter v1.0.0-rc.0.0.20200428091818-01054558c289/go.mod h1:FGbBv5OPKjch+jNUJmEQpMZytIdyW0NdBtWFcfSKusc= github.com/prometheus/procfs v0.0.0-20180125133057-cb4147076ac7/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20180408092902-8b1c2da0d56d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= @@ -1888,8 +1887,8 @@ github.com/vmware/govmomi v0.18.0/go.mod h1:URlwyTFZX72RmxtxuaFL2Uj3fD1JTvZdx59b github.com/vmware/govmomi v0.19.0/go.mod h1:URlwyTFZX72RmxtxuaFL2Uj3fD1JTvZdx59bHWk6aFU= github.com/wavefronthq/wavefront-sdk-go v0.9.2/go.mod h1:hQI6y8M9OtTCtc0xdwh+dCER4osxXdEAeCpacjpDZEU= github.com/weaveworks/common v0.0.0-20210913144402-035033b78a78/go.mod h1:YU9FvnS7kUnRt6HY10G+2qHkwzP3n3Vb1XsXDsJTSp8= -github.com/weaveworks/common v0.0.0-20211015155308-ebe5bdc2c89e h1:B0gVGyVpjfWJWSRe027EkhmEype0a0Dt2uHVxcPrhfs= -github.com/weaveworks/common v0.0.0-20211015155308-ebe5bdc2c89e/go.mod h1:GWX2dQ7yjrgvqH0+d3kCJC5bsY8oOFwqjxFMHaRK4/k= +github.com/weaveworks/common v0.0.0-20220629114710-e3b70df0f08b h1:SJ5hUUQFMfIQ55XT06vnCUqwpxz3HddMkLsYl/nGOvU= +github.com/weaveworks/common v0.0.0-20220629114710-e3b70df0f08b/go.mod h1:YfOOLoW1Q/jIIu0WLeSwgStmrKjuJEZSKTAUc+0KFvE= github.com/weaveworks/promrus v1.2.0 h1:jOLf6pe6/vss4qGHjXmGz4oDJQA+AOCqEL3FvvZGz7M= github.com/weaveworks/promrus v1.2.0/go.mod h1:SaE82+OJ91yqjrE1rsvBWVzNZKcHYFtMUyS1+Ogs/KA= github.com/willf/bitset v1.1.3/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4= diff --git a/pkg/ingester/client/client.go b/pkg/ingester/client/client.go index ab0a2526fb7f5..b62c4980ab630 100644 --- a/pkg/ingester/client/client.go +++ b/pkg/ingester/client/client.go @@ -6,7 +6,6 @@ import ( "time" "github.com/grafana/dskit/grpcclient" - dsmiddleware "github.com/grafana/dskit/middleware" "github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc" "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" @@ -88,14 +87,14 @@ func instrumentation(cfg *Config) ([]grpc.UnaryClientInterceptor, []grpc.StreamC unaryInterceptors = append(unaryInterceptors, otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()), middleware.ClientUserHeaderInterceptor, - dsmiddleware.PrometheusGRPCUnaryInstrumentation(ingesterClientRequestDuration), + middleware.UnaryClientInstrumentInterceptor(ingesterClientRequestDuration), ) var streamInterceptors []grpc.StreamClientInterceptor streamInterceptors = append(streamInterceptors, cfg.GRCPStreamClientInterceptors...) 
streamInterceptors = append(streamInterceptors, otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer()), middleware.StreamClientUserHeaderInterceptor, - dsmiddleware.PrometheusGRPCStreamInstrumentation(ingesterClientRequestDuration), + middleware.StreamClientInstrumentInterceptor(ingesterClientRequestDuration), ) return unaryInterceptors, streamInterceptors diff --git a/pkg/querier/worker/scheduler_processor.go b/pkg/querier/worker/scheduler_processor.go index ed18897ec44d1..e0d4dc8ff1715 100644 --- a/pkg/querier/worker/scheduler_processor.go +++ b/pkg/querier/worker/scheduler_processor.go @@ -11,7 +11,6 @@ import ( "github.com/go-kit/log/level" "github.com/grafana/dskit/backoff" "github.com/grafana/dskit/grpcclient" - dskit_middleware "github.com/grafana/dskit/middleware" "github.com/grafana/dskit/ring/client" "github.com/grafana/dskit/services" otgrpc "github.com/opentracing-contrib/go-grpc" @@ -188,7 +187,7 @@ func (sp *schedulerProcessor) createFrontendClient(addr string) (client.PoolClie opts, err := sp.grpcConfig.DialOption([]grpc.UnaryClientInterceptor{ otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()), middleware.ClientUserHeaderInterceptor, - dskit_middleware.PrometheusGRPCUnaryInstrumentation(sp.metrics.frontendClientRequestDuration), + middleware.UnaryClientInstrumentInterceptor(sp.metrics.frontendClientRequestDuration), }, nil) if err != nil { return nil, err diff --git a/pkg/storage/chunk/client/gcp/instrumentation.go b/pkg/storage/chunk/client/gcp/instrumentation.go index 2d217d8803c3e..c2d703fcbcddf 100644 --- a/pkg/storage/chunk/client/gcp/instrumentation.go +++ b/pkg/storage/chunk/client/gcp/instrumentation.go @@ -5,11 +5,11 @@ import ( "strconv" "time" - "github.com/grafana/dskit/middleware" otgrpc "github.com/opentracing-contrib/go-grpc" opentracing "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/weaveworks/common/middleware" "google.golang.org/api/option" "google.golang.org/grpc" ) @@ -39,11 +39,11 @@ var ( func bigtableInstrumentation() ([]grpc.UnaryClientInterceptor, []grpc.StreamClientInterceptor) { return []grpc.UnaryClientInterceptor{ otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()), - middleware.PrometheusGRPCUnaryInstrumentation(bigtableRequestDuration), + middleware.UnaryClientInstrumentInterceptor(bigtableRequestDuration), }, []grpc.StreamClientInterceptor{ otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer()), - middleware.PrometheusGRPCStreamInstrumentation(bigtableRequestDuration), + middleware.StreamClientInstrumentInterceptor(bigtableRequestDuration), } } diff --git a/production/ksonnet/loki/memberlist.libsonnet b/production/ksonnet/loki/memberlist.libsonnet index c393990c0efbf..38ed04f6c35ca 100644 --- a/production/ksonnet/loki/memberlist.libsonnet +++ b/production/ksonnet/loki/memberlist.libsonnet @@ -3,7 +3,15 @@ 'memberlist.abort-if-join-fails': false, 'memberlist.bind-port': gossipRingPort, 'memberlist.join': 'gossip-ring.%s.svc.cluster.local:%d' % [$._config.namespace, gossipRingPort], - }, + } + ( + if $._config.memberlist_cluster_label == '' then {} else { + 'memberlist.cluster-label': $._config.memberlist_cluster_label, + } + ) + ( + if !$._config.memberlist_cluster_label_verification_disabled then {} else { + 'memberlist.cluster-label-verification-disabled': true, + } + ), local setupGossipRing(storeOption, consulHostnameOption, multiStoreOptionsPrefix) = if 
$._config.multikv_migration_enabled then { [storeOption]: 'multi', @@ -21,6 +29,11 @@ // but "primary" KV depends on value of multikv_primary. memberlist_ring_enabled: false, + // Configures the memberlist cluster label. When verification is enabled, a memberlist member rejects any packet or stream + // with a mismatching cluster label. + memberlist_cluster_label: '', + memberlist_cluster_label_verification_disabled: false, + // Migrating from consul to memberlist is a multi-step process: // 1) Enable multikv_migration_enabled, with primary=consul, secondary=memberlist, and multikv_mirror_enabled=false, restart components. // 2) Set multikv_mirror_enabled=true. This doesn't require restart. diff --git a/vendor/github.com/grafana/dskit/grpcclient/instrumentation.go b/vendor/github.com/grafana/dskit/grpcclient/instrumentation.go index b22a58834058b..c8d3528895a27 100644 --- a/vendor/github.com/grafana/dskit/grpcclient/instrumentation.go +++ b/vendor/github.com/grafana/dskit/grpcclient/instrumentation.go @@ -6,18 +6,16 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/weaveworks/common/middleware" "google.golang.org/grpc" - - dsmiddleware "github.com/grafana/dskit/middleware" ) func Instrument(requestDuration *prometheus.HistogramVec) ([]grpc.UnaryClientInterceptor, []grpc.StreamClientInterceptor) { return []grpc.UnaryClientInterceptor{ otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()), middleware.ClientUserHeaderInterceptor, - dsmiddleware.PrometheusGRPCUnaryInstrumentation(requestDuration), + middleware.UnaryClientInstrumentInterceptor(requestDuration), }, []grpc.StreamClientInterceptor{ otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer()), middleware.StreamClientUserHeaderInterceptor, - dsmiddleware.PrometheusGRPCStreamInstrumentation(requestDuration), + middleware.StreamClientInstrumentInterceptor(requestDuration), } } diff --git a/vendor/github.com/grafana/dskit/kv/consul/client.go b/vendor/github.com/grafana/dskit/kv/consul/client.go index 0fefe4314cccf..c7ef8d372e709 100644 --- a/vendor/github.com/grafana/dskit/kv/consul/client.go +++ b/vendor/github.com/grafana/dskit/kv/consul/client.go @@ -46,10 +46,10 @@ type Config struct { ConsistentReads bool `yaml:"consistent_reads" category:"advanced"` WatchKeyRateLimit float64 `yaml:"watch_rate_limit" category:"advanced"` // Zero disables rate limit WatchKeyBurstSize int `yaml:"watch_burst_size" category:"advanced"` // Burst when doing rate-limit, defaults to 1 + CasRetryDelay time.Duration `yaml:"cas_retry_delay" category:"advanced"` // Used in tests only. - MaxCasRetries int `yaml:"-"` - CasRetryDelay time.Duration `yaml:"-"` + MaxCasRetries int `yaml:"-"` } type kv interface { @@ -78,6 +78,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, prefix string) { f.BoolVar(&cfg.ConsistentReads, prefix+"consul.consistent-reads", false, "Enable consistent reads to Consul.") f.Float64Var(&cfg.WatchKeyRateLimit, prefix+"consul.watch-rate-limit", 1, "Rate limit when watching key or prefix in Consul, in requests per second. 0 disables the rate limit.") f.IntVar(&cfg.WatchKeyBurstSize, prefix+"consul.watch-burst-size", 1, "Burst size used in rate limit. Values less than 1 are treated as 1.") + f.DurationVar(&cfg.CasRetryDelay, prefix+"consul.cas-retry-delay", 1*time.Second, "Maximum duration to wait before retrying a Compare And Swap (CAS) operation.") } // NewClient returns a new Client. 
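Not part of the diff itself: the memberlist cluster label introduced by this bump is surfaced in the ksonnet library above (`memberlist_cluster_label` / `memberlist_cluster_label_verification_disabled`) and registered in the vendored dskit memberlist client further down as `-memberlist.cluster-label` and `-memberlist.cluster-label-verification-disabled`. A minimal sketch of exercising those flags directly against dskit's `KVConfig` follows; the empty flag prefix and the `loki-dev` label value are illustrative assumptions, not values taken from this change.

```go
package main

import (
	"flag"
	"fmt"

	"github.com/grafana/dskit/kv/memberlist"
)

func main() {
	// Register the memberlist KV flags from the bumped dskit and set the new
	// cluster label options. Prefix and label value are illustrative only.
	var cfg memberlist.KVConfig
	fs := flag.NewFlagSet("example", flag.ContinueOnError)
	cfg.RegisterFlagsWithPrefix(fs, "")

	_ = fs.Parse([]string{
		"-memberlist.cluster-label=loki-dev",
		"-memberlist.cluster-label-verification-disabled=false",
	})

	fmt.Printf("cluster label: %q, verification disabled: %v\n",
		cfg.ClusterLabel, cfg.ClusterLabelVerificationDisabled)
}
```

Per the flag help text in the vendored change, label verification should be disabled while rolling the new label out to a live memberlist cluster, then re-enabled once every member carries the same label.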
diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/http_status_handler.go b/vendor/github.com/grafana/dskit/kv/memberlist/http_status_handler.go new file mode 100644 index 0000000000000..3ecd83e387b64 --- /dev/null +++ b/vendor/github.com/grafana/dskit/kv/memberlist/http_status_handler.go @@ -0,0 +1,219 @@ +package memberlist + +import ( + _ "embed" + "encoding/json" + "fmt" + "html/template" + "net/http" + "sort" + "strconv" + "strings" + "time" + + "github.com/hashicorp/memberlist" +) + +// HTTPStatusHandler is a http.Handler with status information about memberlist. +type HTTPStatusHandler struct { + kvs *KVInitService + tpl *template.Template +} + +// StatusPageData represents the data passed to the template rendered by HTTPStatusHandler +type StatusPageData struct { + Now time.Time + Memberlist *memberlist.Memberlist + SortedMembers []*memberlist.Node + Store map[string]ValueDesc + MessageHistoryBufferBytes int + SentMessages []Message + ReceivedMessages []Message +} + +// NewHTTPStatusHandler creates a new HTTPStatusHandler that will render the provided template using the data from StatusPageData. +func NewHTTPStatusHandler(kvs *KVInitService, tpl *template.Template) HTTPStatusHandler { + return HTTPStatusHandler{kvs, tpl} +} + +func (h HTTPStatusHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + kv := h.kvs.getKV() + if kv == nil { + w.Header().Set("Content-Type", "text/plain") + // Ignore inactionable errors. + _, _ = w.Write([]byte("This instance doesn't use memberlist.")) + return + } + + const ( + downloadKeyParam = "downloadKey" + viewKeyParam = "viewKey" + viewMsgParam = "viewMsg" + deleteMessagesParam = "deleteMessages" + ) + + if err := req.ParseForm(); err == nil { + if req.Form[downloadKeyParam] != nil { + downloadKey(w, kv, kv.storeCopy(), req.Form[downloadKeyParam][0]) // Use first value, ignore the rest. + return + } + + if req.Form[viewKeyParam] != nil { + viewKey(w, kv.storeCopy(), req.Form[viewKeyParam][0], getFormat(req)) + return + } + + if req.Form[viewMsgParam] != nil { + msgID, err := strconv.Atoi(req.Form[viewMsgParam][0]) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + sent, received := kv.getSentAndReceivedMessages() + + for _, m := range append(sent, received...) { + if m.ID == msgID { + viewMessage(w, kv, m, getFormat(req)) + return + } + } + + http.Error(w, "message not found", http.StatusNotFound) + return + } + + if len(req.Form[deleteMessagesParam]) > 0 && req.Form[deleteMessagesParam][0] == "true" { + kv.deleteSentReceivedMessages() + + // Redirect back. 
+ w.Header().Set("Location", "?"+deleteMessagesParam+"=false") + w.WriteHeader(http.StatusFound) + return + } + } + + members := kv.memberlist.Members() + sort.Slice(members, func(i, j int) bool { + return members[i].Name < members[j].Name + }) + + sent, received := kv.getSentAndReceivedMessages() + + v := StatusPageData{ + Now: time.Now(), + Memberlist: kv.memberlist, + SortedMembers: members, + Store: kv.storeCopy(), + MessageHistoryBufferBytes: kv.cfg.MessageHistoryBufferBytes, + SentMessages: sent, + ReceivedMessages: received, + } + + accept := req.Header.Get("Accept") + if strings.Contains(accept, "application/json") { + w.Header().Set("Content-Type", "application/json") + + if err := json.NewEncoder(w).Encode(v); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } + return + } + + w.Header().Set("Content-Type", "text/html") + if err := h.tpl.Execute(w, v); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } +} + +func getFormat(req *http.Request) string { + const viewFormat = "format" + + format := "" + if len(req.Form[viewFormat]) > 0 { + format = req.Form[viewFormat][0] + } + return format +} + +func viewMessage(w http.ResponseWriter, kv *KV, msg Message, format string) { + c := kv.GetCodec(msg.Pair.Codec) + if c == nil { + http.Error(w, "codec not found", http.StatusNotFound) + return + } + + val, err := c.Decode(msg.Pair.Value) + if err != nil { + http.Error(w, fmt.Sprintf("failed to decode: %v", err), http.StatusInternalServerError) + return + } + + formatValue(w, val, format) +} + +func viewKey(w http.ResponseWriter, store map[string]ValueDesc, key string, format string) { + if store[key].value == nil { + http.Error(w, "value not found", http.StatusNotFound) + return + } + + formatValue(w, store[key].value, format) +} + +func formatValue(w http.ResponseWriter, val interface{}, format string) { + w.WriteHeader(200) + w.Header().Add("content-type", "text/plain") + + switch format { + case "json", "json-pretty": + enc := json.NewEncoder(w) + if format == "json-pretty" { + enc.SetIndent("", " ") + } + + err := enc.Encode(val) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } + + default: + _, _ = fmt.Fprintf(w, "%#v", val) + } +} + +func downloadKey(w http.ResponseWriter, kv *KV, store map[string]ValueDesc, key string) { + if store[key].value == nil { + http.Error(w, "value not found", http.StatusNotFound) + return + } + + val := store[key] + + c := kv.GetCodec(store[key].CodecID) + if c == nil { + http.Error(w, "codec not found", http.StatusNotFound) + return + } + + encoded, err := c.Encode(val.value) + if err != nil { + http.Error(w, fmt.Sprintf("failed to encode: %v", err), http.StatusInternalServerError) + return + } + + w.Header().Add("content-type", "application/octet-stream") + // Set content-length so that client knows whether it has received full response or not. + w.Header().Add("content-length", strconv.Itoa(len(encoded))) + w.Header().Add("content-disposition", fmt.Sprintf("attachment; filename=%d-%s", val.Version, key)) + w.WriteHeader(200) + + // Ignore errors, we cannot do anything about them. 
+ _, _ = w.Write(encoded) +} + +//go:embed status.gohtml +var defaultPageContent string +var defaultPageTemplate = template.Must(template.New("webpage").Funcs(template.FuncMap{ + "StringsJoin": strings.Join, +}).Parse(defaultPageContent)) diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/kv_init_service.go b/vendor/github.com/grafana/dskit/kv/memberlist/kv_init_service.go index 1a8313cded151..5b505a54882d8 100644 --- a/vendor/github.com/grafana/dskit/kv/memberlist/kv_init_service.go +++ b/vendor/github.com/grafana/dskit/kv/memberlist/kv_init_service.go @@ -2,19 +2,10 @@ package memberlist import ( "context" - _ "embed" - "encoding/json" - "fmt" - "html/template" "net/http" - "sort" - "strconv" - "strings" "sync" - "time" "github.com/go-kit/log" - "github.com/hashicorp/memberlist" "github.com/prometheus/client_golang/prometheus" "go.uber.org/atomic" @@ -95,191 +86,5 @@ func (kvs *KVInitService) stopping(_ error) error { } func (kvs *KVInitService) ServeHTTP(w http.ResponseWriter, req *http.Request) { - kv := kvs.getKV() - if kv == nil { - w.Header().Set("Content-Type", "text/plain") - // Ignore inactionable errors. - _, _ = w.Write([]byte("This instance doesn't use memberlist.")) - return - } - - const ( - downloadKeyParam = "downloadKey" - viewKeyParam = "viewKey" - viewMsgParam = "viewMsg" - deleteMessagesParam = "deleteMessages" - ) - - if err := req.ParseForm(); err == nil { - if req.Form[downloadKeyParam] != nil { - downloadKey(w, kv, kv.storeCopy(), req.Form[downloadKeyParam][0]) // Use first value, ignore the rest. - return - } - - if req.Form[viewKeyParam] != nil { - viewKey(w, kv.storeCopy(), req.Form[viewKeyParam][0], getFormat(req)) - return - } - - if req.Form[viewMsgParam] != nil { - msgID, err := strconv.Atoi(req.Form[viewMsgParam][0]) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - sent, received := kv.getSentAndReceivedMessages() - - for _, m := range append(sent, received...) { - if m.ID == msgID { - viewMessage(w, kv, m, getFormat(req)) - return - } - } - - http.Error(w, "message not found", http.StatusNotFound) - return - } - - if len(req.Form[deleteMessagesParam]) > 0 && req.Form[deleteMessagesParam][0] == "true" { - kv.deleteSentReceivedMessages() - - // Redirect back. 
- w.Header().Set("Location", "?"+deleteMessagesParam+"=false") - w.WriteHeader(http.StatusFound) - return - } - } - - members := kv.memberlist.Members() - sort.Slice(members, func(i, j int) bool { - return members[i].Name < members[j].Name - }) - - sent, received := kv.getSentAndReceivedMessages() - - v := pageData{ - Now: time.Now(), - Memberlist: kv.memberlist, - SortedMembers: members, - Store: kv.storeCopy(), - SentMessages: sent, - ReceivedMessages: received, - } - - accept := req.Header.Get("Accept") - if strings.Contains(accept, "application/json") { - w.Header().Set("Content-Type", "application/json") - - if err := json.NewEncoder(w).Encode(v); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - } - return - } - - w.Header().Set("Content-Type", "text/html") - if err := defaultPageTemplate.Execute(w, v); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - } -} - -func getFormat(req *http.Request) string { - const viewFormat = "format" - - format := "" - if len(req.Form[viewFormat]) > 0 { - format = req.Form[viewFormat][0] - } - return format -} - -func viewMessage(w http.ResponseWriter, kv *KV, msg message, format string) { - c := kv.GetCodec(msg.Pair.Codec) - if c == nil { - http.Error(w, "codec not found", http.StatusNotFound) - return - } - - val, err := c.Decode(msg.Pair.Value) - if err != nil { - http.Error(w, fmt.Sprintf("failed to decode: %v", err), http.StatusInternalServerError) - return - } - - formatValue(w, val, format) -} - -func viewKey(w http.ResponseWriter, store map[string]valueDesc, key string, format string) { - if store[key].value == nil { - http.Error(w, "value not found", http.StatusNotFound) - return - } - - formatValue(w, store[key].value, format) -} - -func formatValue(w http.ResponseWriter, val interface{}, format string) { - w.WriteHeader(200) - w.Header().Add("content-type", "text/plain") - - switch format { - case "json", "json-pretty": - enc := json.NewEncoder(w) - if format == "json-pretty" { - enc.SetIndent("", " ") - } - - err := enc.Encode(val) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - } - - default: - _, _ = fmt.Fprintf(w, "%#v", val) - } -} - -func downloadKey(w http.ResponseWriter, kv *KV, store map[string]valueDesc, key string) { - if store[key].value == nil { - http.Error(w, "value not found", http.StatusNotFound) - return - } - - val := store[key] - - c := kv.GetCodec(store[key].codecID) - if c == nil { - http.Error(w, "codec not found", http.StatusNotFound) - return - } - - encoded, err := c.Encode(val.value) - if err != nil { - http.Error(w, fmt.Sprintf("failed to encode: %v", err), http.StatusInternalServerError) - return - } - - w.Header().Add("content-type", "application/octet-stream") - // Set content-length so that client knows whether it has received full response or not. - w.Header().Add("content-length", strconv.Itoa(len(encoded))) - w.Header().Add("content-disposition", fmt.Sprintf("attachment; filename=%d-%s", val.version, key)) - w.WriteHeader(200) - - // Ignore errors, we cannot do anything about them. 
- _, _ = w.Write(encoded) + NewHTTPStatusHandler(kvs, defaultPageTemplate).ServeHTTP(w, req) } - -type pageData struct { - Now time.Time - Memberlist *memberlist.Memberlist - SortedMembers []*memberlist.Node - Store map[string]valueDesc - SentMessages []message - ReceivedMessages []message -} - -//go:embed status.gohtml -var defaultPageContent string -var defaultPageTemplate = template.Must(template.New("webpage").Funcs(template.FuncMap{ - "StringsJoin": strings.Join, -}).Parse(defaultPageContent)) diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go b/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go index 30f0992d35213..87d0485e52597 100644 --- a/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go +++ b/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go @@ -27,6 +27,7 @@ import ( const ( maxCasRetries = 10 // max retries in CAS operation noChangeDetectedRetrySleep = time.Second // how long to sleep after no change was detected in CAS + notifyMsgQueueSize = 1024 // size of buffered channels to handle memberlist messages ) // Client implements kv.Client interface, by using memberlist.KV @@ -139,6 +140,9 @@ type KVConfig struct { AdvertiseAddr string `yaml:"advertise_addr"` AdvertisePort int `yaml:"advertise_port"` + ClusterLabel string `yaml:"cluster_label" category:"experimental"` + ClusterLabelVerificationDisabled bool `yaml:"cluster_label_verification_disabled" category:"experimental"` + // List of members to join JoinMembers flagext.StringSlice `yaml:"join_members"` MinJoinBackoff time.Duration `yaml:"min_join_backoff" category:"advanced"` @@ -179,7 +183,7 @@ func (cfg *KVConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { f.DurationVar(&cfg.MinJoinBackoff, prefix+"memberlist.min-join-backoff", 1*time.Second, "Min backoff duration to join other cluster members.") f.DurationVar(&cfg.MaxJoinBackoff, prefix+"memberlist.max-join-backoff", 1*time.Minute, "Max backoff duration to join other cluster members.") f.IntVar(&cfg.MaxJoinRetries, prefix+"memberlist.max-join-retries", 10, "Max number of retries to join other cluster members.") - f.BoolVar(&cfg.AbortIfJoinFails, prefix+"memberlist.abort-if-join-fails", true, "If this node fails to join memberlist cluster, abort.") + f.BoolVar(&cfg.AbortIfJoinFails, prefix+"memberlist.abort-if-join-fails", cfg.AbortIfJoinFails, "If this node fails to join memberlist cluster, abort.") f.DurationVar(&cfg.RejoinInterval, prefix+"memberlist.rejoin-interval", 0, "If not 0, how often to rejoin the cluster. Occasional rejoin can help to fix the cluster split issue, and is harmless otherwise. For example when using only few components as a seed nodes (via -memberlist.join), then it's recommended to use rejoin. If -memberlist.join points to dynamic service that resolves to all gossiping nodes (eg. Kubernetes headless service), then rejoin is not needed.") f.DurationVar(&cfg.LeftIngestersTimeout, prefix+"memberlist.left-ingesters-timeout", 5*time.Minute, "How long to keep LEFT ingesters in the ring.") f.DurationVar(&cfg.LeaveTimeout, prefix+"memberlist.leave-timeout", 5*time.Second, "Timeout for leaving memberlist cluster.") @@ -192,6 +196,8 @@ func (cfg *KVConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { f.BoolVar(&cfg.EnableCompression, prefix+"memberlist.compression-enabled", mlDefaults.EnableCompression, "Enable message compression. 
This can be used to reduce bandwidth usage at the cost of slightly more CPU utilization.") f.StringVar(&cfg.AdvertiseAddr, prefix+"memberlist.advertise-addr", mlDefaults.AdvertiseAddr, "Gossip address to advertise to other members in the cluster. Used for NAT traversal.") f.IntVar(&cfg.AdvertisePort, prefix+"memberlist.advertise-port", mlDefaults.AdvertisePort, "Gossip port to advertise to other members in the cluster. Used for NAT traversal.") + f.StringVar(&cfg.ClusterLabel, prefix+"memberlist.cluster-label", mlDefaults.Label, "The cluster label is an optional string to include in outbound packets and gossip streams. Other members in the memberlist cluster will discard any message whose label doesn't match the configured one, unless the 'cluster-label-verification-disabled' configuration option is set to true.") + f.BoolVar(&cfg.ClusterLabelVerificationDisabled, prefix+"memberlist.cluster-label-verification-disabled", mlDefaults.SkipInboundLabelCheck, "When true, memberlist doesn't verify that inbound packets and gossip streams have the cluster label matching the configured one. This verification should be disabled while rolling out the change to the configured cluster label in a live memberlist cluster.") cfg.TCPTransport.RegisterFlagsWithPrefix(f, prefix) } @@ -232,7 +238,7 @@ type KV struct { // KV Store. storeMu sync.Mutex - store map[string]valueDesc + store map[string]ValueDesc // Codec registry codecs map[string]codec.Codec @@ -245,12 +251,16 @@ type KV struct { // Buffers with sent and received messages. Used for troubleshooting only. // New messages are appended, old messages (based on configured size limit) removed from the front. messagesMu sync.Mutex - sentMessages []message + sentMessages []Message sentMessagesSize int - receivedMessages []message + receivedMessages []Message receivedMessagesSize int messageCounter int // Used to give each message in the sentMessages and receivedMessages a unique ID, for UI. + // Per-key value update workers + workersMu sync.Mutex + workersChannels map[string]chan valueUpdate + // closed on shutdown shutdown chan struct{} @@ -258,6 +268,7 @@ type KV struct { numberOfReceivedMessages prometheus.Counter totalSizeOfReceivedMessages prometheus.Counter numberOfInvalidReceivedMessages prometheus.Counter + numberOfDroppedMessages prometheus.Counter numberOfPulls prometheus.Counter numberOfPushes prometheus.Counter totalSizeOfPulls prometheus.Counter @@ -285,7 +296,7 @@ type KV struct { // Message describes incoming or outgoing message, and local state after applying incoming message, or state when sending message. // Fields are exported for templating to work. -type message struct { +type Message struct { ID int // Unique local ID of the message. Time time.Time // Time when message was sent or received. Size int // Message size @@ -296,21 +307,22 @@ type message struct { Changes []string // List of changes in this message (as computed by *this* node). } -type valueDesc struct { +// ValueDesc stores the value along with it's codec and local version. +type ValueDesc struct { // We store the decoded value here to prevent decoding the entire state for every // update we receive. Whilst the updates are small and fast to decode, // the total state can be quite large. // The CAS function is passed a deep copy because it modifies in-place. 
value Mergeable - // version (local only) is used to keep track of what we're gossiping about, and invalidate old messages - version uint + // Version (local only) is used to keep track of what we're gossiping about, and invalidate old messages. + Version uint // ID of codec used to write this value. Only used when sending full state. - codecID string + CodecID string } -func (v valueDesc) Clone() (result valueDesc) { +func (v ValueDesc) Clone() (result ValueDesc) { result = v if v.value != nil { result.value = v.value.Clone() @@ -318,8 +330,14 @@ func (v valueDesc) Clone() (result valueDesc) { return } -func (v valueDesc) String() string { - return fmt.Sprintf("version: %d, codec: %s", v.version, v.codecID) +type valueUpdate struct { + value []byte + codec codec.Codec + messageSize int +} + +func (v ValueDesc) String() string { + return fmt.Sprintf("version: %d, codec: %s", v.Version, v.CodecID) } var ( @@ -338,17 +356,17 @@ func NewKV(cfg KVConfig, logger log.Logger, dnsProvider DNSProvider, registerer cfg.TCPTransport.MetricsNamespace = cfg.MetricsNamespace mlkv := &KV{ - cfg: cfg, - logger: logger, - registerer: registerer, - provider: dnsProvider, - - store: make(map[string]valueDesc), - codecs: make(map[string]codec.Codec), - watchers: make(map[string][]chan string), - prefixWatchers: make(map[string][]chan string), - shutdown: make(chan struct{}), - maxCasRetries: maxCasRetries, + cfg: cfg, + logger: logger, + registerer: registerer, + provider: dnsProvider, + store: make(map[string]ValueDesc), + codecs: make(map[string]codec.Codec), + watchers: make(map[string][]chan string), + prefixWatchers: make(map[string][]chan string), + workersChannels: make(map[string]chan valueUpdate), + shutdown: make(chan struct{}), + maxCasRetries: maxCasRetries, } mlkv.createAndRegisterMetrics() @@ -386,12 +404,14 @@ func (m *KV) buildMemberlistConfig() (*memberlist.Config, error) { mlCfg.AdvertiseAddr = m.cfg.AdvertiseAddr mlCfg.AdvertisePort = m.cfg.AdvertisePort + mlCfg.Label = m.cfg.ClusterLabel + mlCfg.SkipInboundLabelCheck = m.cfg.ClusterLabelVerificationDisabled + if m.cfg.NodeName != "" { mlCfg.Name = m.cfg.NodeName } if m.cfg.RandomizeNodeName { mlCfg.Name = mlCfg.Name + "-" + generateRandomSuffix(m.logger) - level.Info(m.logger).Log("msg", "Using memberlist cluster node name", "name", mlCfg.Name) } mlCfg.LogOutput = newMemberlistLoggerAdapter(m.logger, false) @@ -407,6 +427,8 @@ func (m *KV) buildMemberlistConfig() (*memberlist.Config, error) { mlCfg.ProbeInterval = 5 * time.Second // Probe a random node every this interval. This setting is also the total timeout for the direct + indirect probes. mlCfg.ProbeTimeout = 2 * time.Second // Timeout for the direct probe. + level.Info(m.logger).Log("msg", "Using memberlist cluster label %q and node name %q", mlCfg.Label, mlCfg.Name) + return mlCfg, nil } @@ -428,7 +450,6 @@ func (m *KV) starting(_ context.Context) error { if err != nil { return fmt.Errorf("failed to create memberlist: %v", err) } - // Finish delegate initialization. m.memberlist = list m.broadcasts = &memberlist.TransmitLimitedQueue{ @@ -642,7 +663,7 @@ func (m *KV) get(key string, codec codec.Codec) (out interface{}, version uint, _, _ = v.value.RemoveTombstones(time.Time{}) } - return v.value, v.version, nil + return v.value, v.Version, nil } // WatchKey watches for value changes for given key. 
When value changes, 'f' function is called with the @@ -909,7 +930,7 @@ func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec return } - m.addSentMessage(message{ + m.addSentMessage(Message{ Time: time.Now(), Size: len(pairData), Pair: kvPair, @@ -930,8 +951,6 @@ func (m *KV) NodeMeta(limit int) []byte { // NotifyMsg is method from Memberlist Delegate interface // Called when single message is received, i.e. what our broadcastNewValue has sent. func (m *KV) NotifyMsg(msg []byte) { - m.initWG.Wait() - m.numberOfReceivedMessages.Inc() m.totalSizeOfReceivedMessages.Add(float64(len(msg))) @@ -956,29 +975,67 @@ func (m *KV) NotifyMsg(msg []byte) { return } - // we have a ring update! Let's merge it with our version of the ring for given key - mod, version, err := m.mergeBytesValueForKey(kvPair.Key, kvPair.Value, codec) + ch := m.getKeyWorkerChannel(kvPair.Key) + select { + case ch <- valueUpdate{value: kvPair.Value, codec: codec, messageSize: len(msg)}: + default: + m.numberOfDroppedMessages.Inc() + level.Warn(m.logger).Log("msg", "notify queue full, dropping message", "key", kvPair.Key) + } +} + +func (m *KV) getKeyWorkerChannel(key string) chan<- valueUpdate { + m.workersMu.Lock() + defer m.workersMu.Unlock() - changes := []string(nil) - if mod != nil { - changes = mod.MergeContent() + ch := m.workersChannels[key] + if ch == nil { + // spawn a key associated worker goroutine to process updates in background + ch = make(chan valueUpdate, notifyMsgQueueSize) + go m.processValueUpdate(ch, key) + + m.workersChannels[key] = ch } + return ch +} - m.addReceivedMessage(message{ - Time: time.Now(), - Size: len(msg), - Pair: kvPair, - Version: version, - Changes: changes, - }) +func (m *KV) processValueUpdate(workerCh <-chan valueUpdate, key string) { + for { + select { + case update := <-workerCh: + // we have a value update! Let's merge it with our current version for given key + mod, version, err := m.mergeBytesValueForKey(key, update.value, update.codec) - if err != nil { - level.Error(m.logger).Log("msg", "failed to store received value", "key", kvPair.Key, "err", err) - } else if version > 0 { - m.notifyWatchers(kvPair.Key) + changes := []string(nil) + if mod != nil { + changes = mod.MergeContent() + } - // Don't resend original message, but only changes. - m.broadcastNewValue(kvPair.Key, mod, version, codec) + m.addReceivedMessage(Message{ + Time: time.Now(), + Size: update.messageSize, + Pair: KeyValuePair{ + Key: key, + Value: update.value, + Codec: update.codec.CodecID(), + }, + Version: version, + Changes: changes, + }) + + if err != nil { + level.Error(m.logger).Log("msg", "failed to store received value", "key", key, "err", err) + } else if version > 0 { + m.notifyWatchers(key) + + // Don't resend original message, but only changes. 
+ m.broadcastNewValue(key, mod, version, update.codec) + } + + case <-m.shutdown: + // stop running on shutdown + return + } } } @@ -1033,9 +1090,9 @@ func (m *KV) LocalState(join bool) []byte { continue } - codec := m.GetCodec(val.codecID) + codec := m.GetCodec(val.CodecID) if codec == nil { - level.Error(m.logger).Log("msg", "failed to encode remote state: unknown codec for key", "codec", val.codecID, "key", key) + level.Error(m.logger).Log("msg", "failed to encode remote state: unknown codec for key", "codec", val.CodecID, "key", key) continue } @@ -1048,7 +1105,7 @@ func (m *KV) LocalState(join bool) []byte { kvPair.Reset() kvPair.Key = key kvPair.Value = encoded - kvPair.Codec = val.codecID + kvPair.Codec = val.CodecID ser, err := kvPair.Marshal() if err != nil { @@ -1068,11 +1125,11 @@ func (m *KV) LocalState(join bool) []byte { } buf.Write(ser) - m.addSentMessage(message{ + m.addSentMessage(Message{ Time: sent, Size: len(ser), Pair: kvPair, // Makes a copy of kvPair. - Version: val.version, + Version: val.Version, }) } @@ -1136,7 +1193,7 @@ func (m *KV) MergeRemoteState(data []byte, join bool) { changes = change.MergeContent() } - m.addReceivedMessage(message{ + m.addReceivedMessage(Message{ Time: received, Size: int(kvPairLength), Pair: kvPair, // Makes a copy of kvPair. @@ -1184,7 +1241,7 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, casVersion ui // the full state anywhere as is done elsewhere (i.e. Get/WatchKey/CAS). curr := m.store[key] // if casVersion is 0, then there was no previous value, so we will just do normal merge, without localCAS flag set. - if casVersion > 0 && curr.version != casVersion { + if casVersion > 0 && curr.Version != casVersion { return nil, 0, errVersionMismatch } result, change, err := computeNewValue(incomingValue, curr.value, casVersion > 0) @@ -1215,11 +1272,11 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, casVersion ui } } - newVersion := curr.version + 1 - m.store[key] = valueDesc{ + newVersion := curr.Version + 1 + m.store[key] = ValueDesc{ value: result, - version: newVersion, - codecID: codec.CodecID(), + Version: newVersion, + CodecID: codec.CodecID(), } // The "changes" returned by Merge() can contain references to the "result" @@ -1240,17 +1297,17 @@ func computeNewValue(incoming Mergeable, oldVal Mergeable, cas bool) (Mergeable, return oldVal, change, err } -func (m *KV) storeCopy() map[string]valueDesc { +func (m *KV) storeCopy() map[string]ValueDesc { m.storeMu.Lock() defer m.storeMu.Unlock() - result := make(map[string]valueDesc, len(m.store)) + result := make(map[string]ValueDesc, len(m.store)) for k, v := range m.store { result[k] = v.Clone() } return result } -func (m *KV) addReceivedMessage(msg message) { +func (m *KV) addReceivedMessage(msg Message) { if m.cfg.MessageHistoryBufferBytes == 0 { return } @@ -1264,7 +1321,7 @@ func (m *KV) addReceivedMessage(msg message) { m.receivedMessages, m.receivedMessagesSize = addMessageToBuffer(m.receivedMessages, m.receivedMessagesSize, m.cfg.MessageHistoryBufferBytes, msg) } -func (m *KV) addSentMessage(msg message) { +func (m *KV) addSentMessage(msg Message) { if m.cfg.MessageHistoryBufferBytes == 0 { return } @@ -1278,12 +1335,12 @@ func (m *KV) addSentMessage(msg message) { m.sentMessages, m.sentMessagesSize = addMessageToBuffer(m.sentMessages, m.sentMessagesSize, m.cfg.MessageHistoryBufferBytes, msg) } -func (m *KV) getSentAndReceivedMessages() (sent, received []message) { +func (m *KV) getSentAndReceivedMessages() (sent, received []Message) 
{ m.messagesMu.Lock() defer m.messagesMu.Unlock() // Make copy of both slices. - return append([]message(nil), m.sentMessages...), append([]message(nil), m.receivedMessages...) + return append([]Message(nil), m.sentMessages...), append([]Message(nil), m.receivedMessages...) } func (m *KV) deleteSentReceivedMessages() { @@ -1296,7 +1353,7 @@ func (m *KV) deleteSentReceivedMessages() { m.receivedMessagesSize = 0 } -func addMessageToBuffer(msgs []message, size int, limit int, msg message) ([]message, int) { +func addMessageToBuffer(msgs []Message, size int, limit int, msg Message) ([]Message, int) { msgs = append(msgs, msg) size += msg.Size diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/metrics.go b/vendor/github.com/grafana/dskit/kv/memberlist/metrics.go index c7d3f01c277b6..9ab56a662db41 100644 --- a/vendor/github.com/grafana/dskit/kv/memberlist/metrics.go +++ b/vendor/github.com/grafana/dskit/kv/memberlist/metrics.go @@ -36,6 +36,13 @@ func (m *KV) createAndRegisterMetrics() { Help: "Number of received broadcast user messages that were invalid. Hopefully 0.", }) + m.numberOfDroppedMessages = promauto.With(m.registerer).NewCounter(prometheus.CounterOpts{ + Namespace: m.cfg.MetricsNamespace, + Subsystem: subsystem, + Name: "received_broadcasts_dropped_total", + Help: "Number of received broadcast user messages that were dropped. Hopefully 0.", + }) + m.numberOfPushes = promauto.With(m.registerer).NewCounter(prometheus.CounterOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/status.gohtml b/vendor/github.com/grafana/dskit/kv/memberlist/status.gohtml index 3ab6d0936374b..6f845b6e06033 100644 --- a/vendor/github.com/grafana/dskit/kv/memberlist/status.gohtml +++ b/vendor/github.com/grafana/dskit/kv/memberlist/status.gohtml @@ -1,4 +1,4 @@ -{{- /*gotype: github.com/grafana/dskit/kv/memberlist.statusPageData */ -}} +{{- /*gotype: github.com/grafana/dskit/kv/memberlist.StatusPageData */ -}} @@ -20,7 +20,8 @@ Key - Value Details + Codec + Version Actions @@ -29,7 +30,8 @@ {{ range $k, $v := .Store }} {{ $k }} - {{ $v }} + {{ $v.CodecID }} + {{ $v.Version }} json | json-pretty @@ -68,76 +70,83 @@

 State: 0 = Alive, 1 = Suspect, 2 = Dead, 3 = Left
[Remainder of this status.gohtml hunk: the HTML markup was lost in extraction and only the textual content is recoverable. The hunk groups the existing "Received Messages" and "Sent Messages" sections under a new "Message History" heading and wraps them in {{ if .MessageHistoryBufferBytes }}. Inside that block, the "Delete All Messages (received and sent)" link and both message tables (columns ID, Time, Key, Value, Version, Changes, Actions, with per-row json | json-pretty | struct links) are kept and re-indented. The {{ else }} branch renders: "Message history buffer is disabled, refer to the configuration to enable it in order to troubleshoot the message history."]
+{{ end }} \ No newline at end of file diff --git a/vendor/github.com/grafana/dskit/middleware/grpc.go b/vendor/github.com/grafana/dskit/middleware/grpc.go deleted file mode 100644 index 66f0d376608b4..0000000000000 --- a/vendor/github.com/grafana/dskit/middleware/grpc.go +++ /dev/null @@ -1,101 +0,0 @@ -package middleware - -import ( - "context" - "io" - "strconv" - "time" - - "github.com/prometheus/client_golang/prometheus" - grpcUtils "github.com/weaveworks/common/grpc" - "github.com/weaveworks/common/httpgrpc" - "google.golang.org/grpc" - "google.golang.org/grpc/metadata" -) - -// PrometheusGRPCUnaryInstrumentation records duration of gRPC requests client side. -func PrometheusGRPCUnaryInstrumentation(metric *prometheus.HistogramVec) grpc.UnaryClientInterceptor { - return func(ctx context.Context, method string, req, resp interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { - start := time.Now() - err := invoker(ctx, method, req, resp, cc, opts...) - metric.WithLabelValues(method, errorCode(err)).Observe(time.Since(start).Seconds()) - return err - } -} - -// PrometheusGRPCStreamInstrumentation records duration of streaming gRPC requests client side. -func PrometheusGRPCStreamInstrumentation(metric *prometheus.HistogramVec) grpc.StreamClientInterceptor { - return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, - streamer grpc.Streamer, opts ...grpc.CallOption, - ) (grpc.ClientStream, error) { - start := time.Now() - stream, err := streamer(ctx, desc, cc, method, opts...) - return &instrumentedClientStream{ - metric: metric, - start: start, - method: method, - ClientStream: stream, - }, err - } -} - -type instrumentedClientStream struct { - metric *prometheus.HistogramVec - start time.Time - method string - grpc.ClientStream -} - -func (s *instrumentedClientStream) SendMsg(m interface{}) error { - err := s.ClientStream.SendMsg(m) - if err == nil { - return err - } - - if err == io.EOF { - s.metric.WithLabelValues(s.method, errorCode(nil)).Observe(time.Since(s.start).Seconds()) - } else { - s.metric.WithLabelValues(s.method, errorCode(err)).Observe(time.Since(s.start).Seconds()) - } - - return err -} - -func (s *instrumentedClientStream) RecvMsg(m interface{}) error { - err := s.ClientStream.RecvMsg(m) - if err == nil { - return err - } - - if err == io.EOF { - s.metric.WithLabelValues(s.method, errorCode(nil)).Observe(time.Since(s.start).Seconds()) - } else { - s.metric.WithLabelValues(s.method, errorCode(err)).Observe(time.Since(s.start).Seconds()) - } - - return err -} - -func (s *instrumentedClientStream) Header() (metadata.MD, error) { - md, err := s.ClientStream.Header() - if err != nil { - s.metric.WithLabelValues(s.method, errorCode(err)).Observe(time.Since(s.start).Seconds()) - } - return md, err -} - -func errorCode(err error) string { - respStatus := "2xx" - if err != nil { - if errResp, ok := httpgrpc.HTTPResponseFromError(err); ok { - statusFamily := int(errResp.Code / 100) - respStatus = strconv.Itoa(statusFamily) + "xx" - } else if grpcUtils.IsCanceled(err) { - respStatus = "cancel" - } else { - respStatus = "error" - } - } - - return respStatus -} diff --git a/vendor/github.com/grafana/dskit/modules/module_service.go b/vendor/github.com/grafana/dskit/modules/module_service.go index b27d0244ab7fa..8ca4e25714de4 100644 --- a/vendor/github.com/grafana/dskit/modules/module_service.go +++ b/vendor/github.com/grafana/dskit/modules/module_service.go @@ -27,6 +27,16 @@ type moduleService struct { 
startDeps, stopDeps func(string) map[string]services.Service } +type delegatedNamedService struct { + services.Service + + delegate services.NamedService +} + +func (n delegatedNamedService) ServiceName() string { + return n.delegate.ServiceName() +} + // NewModuleService wraps a module service, and makes sure that dependencies are started/stopped before module service starts or stops. // If any dependency fails to start, this service fails as well. // On stop, errors from failed dependencies are ignored. @@ -40,6 +50,14 @@ func NewModuleService(name string, logger log.Logger, service services.Service, } w.Service = services.NewBasicService(w.start, w.run, w.stop) + + if namedService, isNamed := service.(services.NamedService); isNamed { + // return a value that implements services.NamedService only if the wrapped service implements services.NamedService + return delegatedNamedService{ + Service: w, + delegate: namedService, + } + } return w } diff --git a/vendor/github.com/grafana/dskit/netutil/netutil.go b/vendor/github.com/grafana/dskit/netutil/netutil.go index a1b7c1d40f67d..232317d4b5439 100644 --- a/vendor/github.com/grafana/dskit/netutil/netutil.go +++ b/vendor/github.com/grafana/dskit/netutil/netutil.go @@ -2,6 +2,7 @@ package netutil import ( "net" + "strings" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -52,6 +53,6 @@ func privateNetworkInterfaces(all []net.Interface, fallback []string, logger log if len(privInts) == 0 { return fallback } - level.Debug(logger).Log("msg", "found network interfaces with private IP addresses assigned", "interfaces", privInts) + level.Debug(logger).Log("msg", "found network interfaces with private IP addresses assigned", "interfaces", strings.Join(privInts, " ")) return privInts } diff --git a/vendor/github.com/grafana/dskit/ring/basic_lifecycler.go b/vendor/github.com/grafana/dskit/ring/basic_lifecycler.go index 32775c98291ce..780926c0d755b 100644 --- a/vendor/github.com/grafana/dskit/ring/basic_lifecycler.go +++ b/vendor/github.com/grafana/dskit/ring/basic_lifecycler.go @@ -49,6 +49,7 @@ type BasicLifecyclerConfig struct { Zone string HeartbeatPeriod time.Duration + HeartbeatTimeout time.Duration TokensObservePeriod time.Duration NumTokens int @@ -405,8 +406,13 @@ func (l *BasicLifecycler) updateInstance(ctx context.Context, update func(*Desc, // This could happen if the backend store restarted (and content deleted) // or the instance has been forgotten. In this case, we do re-insert it. if !ok { - level.Warn(l.logger).Log("msg", "instance missing in the ring, adding it back", "ring", l.ringName) - instanceDesc = ringDesc.AddIngester(l.cfg.ID, l.cfg.Addr, l.cfg.Zone, l.GetTokens(), l.GetState(), l.GetRegisteredAt()) + level.Warn(l.logger).Log("msg", "instance is missing in the ring (e.g. the ring backend storage has been reset), registering the instance with an updated registration timestamp", "ring", l.ringName) + + // Due to how shuffle sharding work, the missing instance for some period of time could have cause + // a resharding of tenants among instances: to guarantee query correctness we need to update the + // registration timestamp to current time. 
+ registeredAt := time.Now() + instanceDesc = ringDesc.AddIngester(l.cfg.ID, l.cfg.Addr, l.cfg.Zone, l.GetTokens(), l.GetState(), registeredAt) } prevTimestamp := instanceDesc.Timestamp @@ -507,5 +513,5 @@ func (l *BasicLifecycler) getRing(ctx context.Context) (*Desc, error) { } func (l *BasicLifecycler) ServeHTTP(w http.ResponseWriter, req *http.Request) { - newRingPageHandler(l, l.cfg.HeartbeatPeriod).handle(w, req) + newRingPageHandler(l, l.cfg.HeartbeatTimeout).handle(w, req) } diff --git a/vendor/github.com/grafana/dskit/ring/basic_lifecycler_delegates.go b/vendor/github.com/grafana/dskit/ring/basic_lifecycler_delegates.go index 26e3cfa41ddd2..177962f697dc4 100644 --- a/vendor/github.com/grafana/dskit/ring/basic_lifecycler_delegates.go +++ b/vendor/github.com/grafana/dskit/ring/basic_lifecycler_delegates.go @@ -150,3 +150,38 @@ func (d *AutoForgetDelegate) OnRingInstanceHeartbeat(lifecycler *BasicLifecycler d.next.OnRingInstanceHeartbeat(lifecycler, ringDesc, instanceDesc) } + +// InstanceRegisterDelegate generates a new set of tokenCount tokens on instance register, and returns the registerState InstanceState. +type InstanceRegisterDelegate struct { + registerState InstanceState + tokenCount int +} + +func NewInstanceRegisterDelegate(state InstanceState, tokenCount int) InstanceRegisterDelegate { + return InstanceRegisterDelegate{ + registerState: state, + tokenCount: tokenCount, + } +} + +func (d InstanceRegisterDelegate) OnRingInstanceRegister(_ *BasicLifecycler, ringDesc Desc, instanceExists bool, instanceID string, instanceDesc InstanceDesc) (InstanceState, Tokens) { + // Keep the existing tokens if any, otherwise start with a clean situation. + var tokens []uint32 + if instanceExists { + tokens = instanceDesc.GetTokens() + } + + takenTokens := ringDesc.GetTokens() + newTokens := GenerateTokens(d.tokenCount-len(tokens), takenTokens) + + // Tokens sorting will be enforced by the parent caller. + tokens = append(tokens, newTokens...) 
+ + return d.registerState, tokens +} + +func (d InstanceRegisterDelegate) OnRingInstanceTokens(*BasicLifecycler, Tokens) {} + +func (d InstanceRegisterDelegate) OnRingInstanceStopping(*BasicLifecycler) {} + +func (d InstanceRegisterDelegate) OnRingInstanceHeartbeat(*BasicLifecycler, *Desc, *InstanceDesc) {} diff --git a/vendor/github.com/grafana/dskit/ring/http.go b/vendor/github.com/grafana/dskit/ring/http.go index 18a56177cbf2c..26d28e3e5d05a 100644 --- a/vendor/github.com/grafana/dskit/ring/http.go +++ b/vendor/github.com/grafana/dskit/ring/http.go @@ -53,14 +53,14 @@ type ringAccess interface { } type ringPageHandler struct { - r ringAccess - heartbeatPeriod time.Duration + r ringAccess + heartbeatTimeout time.Duration } -func newRingPageHandler(r ringAccess, heartbeatPeriod time.Duration) *ringPageHandler { +func newRingPageHandler(r ringAccess, heartbeatTimeout time.Duration) *ringPageHandler { return &ringPageHandler{ - r: r, - heartbeatPeriod: heartbeatPeriod, + r: r, + heartbeatTimeout: heartbeatTimeout, } } @@ -93,7 +93,7 @@ func (h *ringPageHandler) handle(w http.ResponseWriter, req *http.Request) { http.Error(w, err.Error(), http.StatusInternalServerError) return } - _, ownedTokens := ringDesc.countTokens() + ownedTokens := ringDesc.countTokens() var ingesterIDs []string for id := range ringDesc.Ingesters { @@ -106,7 +106,7 @@ func (h *ringPageHandler) handle(w http.ResponseWriter, req *http.Request) { for _, id := range ingesterIDs { ing := ringDesc.Ingesters[id] state := ing.State.String() - if !ing.IsHealthy(Reporting, h.heartbeatPeriod, now) { + if !ing.IsHealthy(Reporting, h.heartbeatTimeout, now) { state = "UNHEALTHY" } diff --git a/vendor/github.com/grafana/dskit/ring/lifecycler.go b/vendor/github.com/grafana/dskit/ring/lifecycler.go index 602e1fdb58af8..dfef6afb64278 100644 --- a/vendor/github.com/grafana/dskit/ring/lifecycler.go +++ b/vendor/github.com/grafana/dskit/ring/lifecycler.go @@ -13,7 +13,6 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/pkg/errors" - perrors "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "go.uber.org/atomic" @@ -30,6 +29,7 @@ type LifecyclerConfig struct { // Config for the ingester lifecycle control NumTokens int `yaml:"num_tokens" category:"advanced"` HeartbeatPeriod time.Duration `yaml:"heartbeat_period" category:"advanced"` + HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout" category:"advanced"` ObservePeriod time.Duration `yaml:"observe_period" category:"advanced"` JoinAfter time.Duration `yaml:"join_after" category:"advanced"` MinReadyDuration time.Duration `yaml:"min_ready_duration" category:"advanced"` @@ -71,6 +71,7 @@ func (cfg *LifecyclerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.Flag f.IntVar(&cfg.NumTokens, prefix+"num-tokens", 128, "Number of tokens for each ingester.") f.DurationVar(&cfg.HeartbeatPeriod, prefix+"heartbeat-period", 5*time.Second, "Period at which to heartbeat to consul. 0 = disabled.") + f.DurationVar(&cfg.HeartbeatTimeout, prefix+"heartbeat-timeout", 1*time.Minute, "Heartbeat timeout after which instance is assumed to be unhealthy. 0 = disabled.") f.DurationVar(&cfg.JoinAfter, prefix+"join-after", 0*time.Second, "Period to wait for a claim from another member; will join automatically after this.") f.DurationVar(&cfg.ObservePeriod, prefix+"observe-period", 0*time.Second, "Observe tokens after generating to resolve collisions. 
Useful when using gossiping ring.") f.DurationVar(&cfg.MinReadyDuration, prefix+"min-ready-duration", 15*time.Second, "Minimum duration to wait after the internal readiness checks have passed but before succeeding the readiness endpoint. This is used to slowdown deployment controllers (eg. Kubernetes) after an instance is ready and before they proceed with a rolling update, to give the rest of the cluster instances enough time to receive ring updates.") @@ -110,8 +111,9 @@ type Lifecycler struct { Zone string // Whether to flush if transfer fails on shutdown. - flushOnShutdown *atomic.Bool - unregisterOnShutdown *atomic.Bool + flushOnShutdown *atomic.Bool + unregisterOnShutdown *atomic.Bool + clearTokensOnShutdown *atomic.Bool // We need to remember the ingester state, tokens and registered timestamp just in case the KV store // goes away and comes back empty. The state changes during lifecycle of instance. @@ -160,23 +162,22 @@ func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringNa } l := &Lifecycler{ - cfg: cfg, - flushTransferer: flushTransferer, - KVStore: store, - Addr: fmt.Sprintf("%s:%d", addr, port), - ID: cfg.ID, - RingName: ringName, - RingKey: ringKey, - flushOnShutdown: atomic.NewBool(flushOnShutdown), - unregisterOnShutdown: atomic.NewBool(cfg.UnregisterOnShutdown), - Zone: cfg.Zone, - actorChan: make(chan func()), - state: PENDING, - lifecyclerMetrics: NewLifecyclerMetrics(ringName, reg), - logger: logger, - } - - l.lifecyclerMetrics.tokensToOwn.Set(float64(cfg.NumTokens)) + cfg: cfg, + flushTransferer: flushTransferer, + KVStore: store, + Addr: fmt.Sprintf("%s:%d", addr, port), + ID: cfg.ID, + RingName: ringName, + RingKey: ringKey, + flushOnShutdown: atomic.NewBool(flushOnShutdown), + unregisterOnShutdown: atomic.NewBool(cfg.UnregisterOnShutdown), + clearTokensOnShutdown: atomic.NewBool(false), + Zone: cfg.Zone, + actorChan: make(chan func()), + state: PENDING, + lifecyclerMetrics: NewLifecyclerMetrics(ringName, reg), + logger: logger, + } l.BasicService = services. NewBasicService(nil, l.loop, l.stopping). @@ -304,8 +305,6 @@ func (i *Lifecycler) getTokens() Tokens { } func (i *Lifecycler) setTokens(tokens Tokens) { - i.lifecyclerMetrics.tokensOwned.Set(float64(len(tokens))) - i.stateMtx.Lock() defer i.stateMtx.Unlock() @@ -397,7 +396,7 @@ func (i *Lifecycler) loop(ctx context.Context) error { // First, see if we exist in the cluster, update our state to match if we do, // and add ourselves (without tokens) if we don't. if err := i.initRing(context.Background()); err != nil { - return perrors.Wrapf(err, "failed to join the ring %s", i.RingName) + return errors.Wrapf(err, "failed to join the ring %s", i.RingName) } // We do various period tasks @@ -420,14 +419,14 @@ func (i *Lifecycler) loop(ctx context.Context) error { // let's observe the ring. By using JOINING state, this ingester will be ignored by LEAVING // ingesters, but we also signal that it is not fully functional yet. 
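The lifecycler now registers a separate `heartbeat-timeout` flag next to `heartbeat-period`: the period controls how often the instance writes its heartbeat to the KV store, while the timeout controls how stale that heartbeat may become before the ring status page reports the instance as UNHEALTHY (the page now passes the timeout, not the period, to `IsHealthy`). A rough sketch of that distinction, where `isStale` is a hypothetical stand-in for the timestamp comparison done inside `IsHealthy`:

```go
package main

import (
	"fmt"
	"time"
)

// isStale is a hypothetical stand-in for the timestamp check performed by
// InstanceDesc.IsHealthy: an instance is shown as UNHEALTHY once its last
// heartbeat is older than heartbeat-timeout (0 = disabled, per the flag help).
func isStale(lastHeartbeat time.Time, heartbeatTimeout time.Duration, now time.Time) bool {
	if heartbeatTimeout == 0 {
		return false
	}
	return now.Sub(lastHeartbeat) > heartbeatTimeout
}

func main() {
	now := time.Now()
	// Heartbeats every few seconds are fine; only a gap larger than the 1m timeout matters.
	fmt.Println(isStale(now.Add(-10*time.Second), time.Minute, now)) // false
	fmt.Println(isStale(now.Add(-2*time.Minute), time.Minute, now))  // true
}
```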
if err := i.autoJoin(context.Background(), JOINING); err != nil { - return perrors.Wrapf(err, "failed to pick tokens in the KV store, ring: %s", i.RingName) + return errors.Wrapf(err, "failed to pick tokens in the KV store, ring: %s", i.RingName) } level.Info(i.logger).Log("msg", "observing tokens before going ACTIVE", "ring", i.RingName) observeChan = time.After(i.cfg.ObservePeriod) } else { if err := i.autoJoin(context.Background(), ACTIVE); err != nil { - return perrors.Wrapf(err, "failed to pick tokens in the KV store, ring: %s", i.RingName) + return errors.Wrapf(err, "failed to pick tokens in the KV store, ring: %s", i.RingName) } } } @@ -514,11 +513,18 @@ heartbeatLoop: if i.ShouldUnregisterOnShutdown() { if err := i.unregister(context.Background()); err != nil { - return perrors.Wrapf(err, "failed to unregister from the KV store, ring: %s", i.RingName) + return errors.Wrapf(err, "failed to unregister from the KV store, ring: %s", i.RingName) } level.Info(i.logger).Log("msg", "instance removed from the KV store", "ring", i.RingName) } + if i.cfg.TokensFilePath != "" && i.ClearTokensOnShutdown() { + if err := os.Remove(i.cfg.TokensFilePath); err != nil { + return errors.Wrapf(err, "failed to delete tokens file %s", i.cfg.TokensFilePath) + } + level.Info(i.logger).Log("msg", "removed tokens file from disk", "path", i.cfg.TokensFilePath) + } + return nil } @@ -594,6 +600,10 @@ func (i *Lifecycler) initRing(ctx context.Context) error { instanceDesc.State = ACTIVE } + // We're taking over this entry, update instanceDesc with our values + instanceDesc.Addr = i.Addr + instanceDesc.Zone = i.Zone + // We exist in the ring, so assume the ring is right and copy out tokens & state out of there. i.setState(instanceDesc.State) tokens, _ := ringDesc.TokensFor(i.ID) @@ -601,10 +611,9 @@ func (i *Lifecycler) initRing(ctx context.Context) error { level.Info(i.logger).Log("msg", "existing entry found in ring", "state", i.GetState(), "tokens", len(tokens), "ring", i.RingName) - // Update the ring if the instance has been changed and the heartbeat is disabled. - // We dont need to update KV here when heartbeat is enabled as this info will eventually be update on KV - // on the next heartbeat - if i.cfg.HeartbeatPeriod == 0 && !instanceDesc.Equal(ringDesc.Ingesters[i.ID]) { + // Update the ring if the instance has been changed. We don't want to rely on heartbeat update, as heartbeat + // can be configured to long time, and until then lifecycler would not report this instance as ready in CheckReady. + if !instanceDesc.Equal(ringDesc.Ingesters[i.ID]) { // Update timestamp to give gossiping client a chance register ring change. instanceDesc.Timestamp = time.Now().Unix() ringDesc.Ingesters[i.ID] = instanceDesc @@ -738,9 +747,13 @@ func (i *Lifecycler) updateConsul(ctx context.Context) error { } instanceDesc, ok := ringDesc.Ingesters[i.ID] + if !ok { - // consul must have restarted - level.Info(i.logger).Log("msg", "found empty ring, inserting tokens", "ring", i.RingName) + // If the instance is missing in the ring, we need to add it back. However, due to how shuffle sharding work, + // the missing instance for some period of time could have cause a resharding of tenants among instances: + // to guarantee query correctness we need to update the registration timestamp to current time. + level.Info(i.logger).Log("msg", "instance is missing in the ring (e.g. 
the ring backend storage has been reset), registering the instance with an updated registration timestamp", "ring", i.RingName) + i.setRegisteredAt(time.Now()) ringDesc.AddIngester(i.ID, i.Addr, i.Zone, i.getTokens(), i.GetState(), i.getRegisteredAt()) } else { instanceDesc.Timestamp = time.Now().Unix() @@ -825,8 +838,20 @@ func (i *Lifecycler) SetUnregisterOnShutdown(enabled bool) { i.unregisterOnShutdown.Store(enabled) } +// ClearTokensOnShutdown returns if persisted tokens should be cleared on shutdown. +func (i *Lifecycler) ClearTokensOnShutdown() bool { + return i.clearTokensOnShutdown.Load() +} + +// SetClearTokensOnShutdown enables/disables deletions of tokens on shutdown. +// Set to `true` in case one wants to clear tokens on shutdown which are +// otherwise persisted, e.g. useful in custom shutdown handlers. +func (i *Lifecycler) SetClearTokensOnShutdown(enabled bool) { + i.clearTokensOnShutdown.Store(enabled) +} + func (i *Lifecycler) processShutdown(ctx context.Context) { - flushRequired := i.flushOnShutdown.Load() + flushRequired := i.FlushOnShutdown() transferStart := time.Now() if err := i.flushTransferer.TransferOut(ctx); err != nil { if err == ErrTransferDisabled { @@ -865,7 +890,7 @@ func (i *Lifecycler) getRing(ctx context.Context) (*Desc, error) { } func (i *Lifecycler) ServeHTTP(w http.ResponseWriter, req *http.Request) { - newRingPageHandler(i, i.cfg.HeartbeatPeriod).handle(w, req) + newRingPageHandler(i, i.cfg.HeartbeatTimeout).handle(w, req) } // unregister removes our entry from consul. diff --git a/vendor/github.com/grafana/dskit/ring/lifecycler_metrics.go b/vendor/github.com/grafana/dskit/ring/lifecycler_metrics.go index 422a564c18b7a..fe29cdfd5fc80 100644 --- a/vendor/github.com/grafana/dskit/ring/lifecycler_metrics.go +++ b/vendor/github.com/grafana/dskit/ring/lifecycler_metrics.go @@ -7,8 +7,6 @@ import ( type LifecyclerMetrics struct { consulHeartbeats prometheus.Counter - tokensOwned prometheus.Gauge - tokensToOwn prometheus.Gauge shutdownDuration *prometheus.HistogramVec } @@ -19,16 +17,6 @@ func NewLifecyclerMetrics(ringName string, reg prometheus.Registerer) *Lifecycle Help: "The total number of heartbeats sent to consul.", ConstLabels: prometheus.Labels{"name": ringName}, }), - tokensOwned: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ - Name: "member_ring_tokens_owned", - Help: "The number of tokens owned in the ring.", - ConstLabels: prometheus.Labels{"name": ringName}, - }), - tokensToOwn: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ - Name: "member_ring_tokens_to_own", - Help: "The number of tokens to own in the ring.", - ConstLabels: prometheus.Labels{"name": ringName}, - }), shutdownDuration: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: "shutdown_duration_seconds", Help: "Duration (in seconds) of shutdown procedure (ie transfer or flush).", diff --git a/vendor/github.com/grafana/dskit/ring/model.go b/vendor/github.com/grafana/dskit/ring/model.go index 4166d9e6f8ed7..fd58e534cb76c 100644 --- a/vendor/github.com/grafana/dskit/ring/model.go +++ b/vendor/github.com/grafana/dskit/ring/model.go @@ -489,6 +489,27 @@ func (d *Desc) getTokensByZone() map[string][]uint32 { return MergeTokensByZone(zones) } +// getOldestRegisteredTimestamp returns unix timestamp of oldest "RegisteredTimestamp" value from all instances. +// If any instance has 0 value of RegisteredTimestamp, this function returns 0. 
+func (d *Desc) getOldestRegisteredTimestamp() int64 { + var result int64 + + for _, instance := range d.Ingesters { + switch { + case instance.RegisteredTimestamp == 0: + return 0 + + case result == 0: + result = instance.RegisteredTimestamp + + case instance.RegisteredTimestamp < result: + result = instance.RegisteredTimestamp + } + } + + return result +} + type CompareResult int // CompareResult responses diff --git a/vendor/github.com/grafana/dskit/ring/ring.go b/vendor/github.com/grafana/dskit/ring/ring.go index 6c7e4a49fc0d3..1f7c2c928d3a8 100644 --- a/vendor/github.com/grafana/dskit/ring/ring.go +++ b/vendor/github.com/grafana/dskit/ring/ring.go @@ -167,6 +167,10 @@ type Ring struct { ringTokens []uint32 ringTokensByZone map[string][]uint32 + // Oldest value of RegisteredTimestamp from all instances. If any instance had RegisteredTimestamp == 0, + // then this value will be 0. + oldestRegisteredTimestamp int64 + // Maps a token with the information of the instance holding it. This map is immutable and // cannot be chanced in place because it's shared "as is" between subrings (the only way to // change it is to create a new one and replace it). @@ -183,12 +187,9 @@ type Ring struct { // If set to nil, no caching is done (used by tests, and subrings). shuffledSubringCache map[subringCacheKey]*Ring - memberOwnershipGaugeVec *prometheus.GaugeVec numMembersGaugeVec *prometheus.GaugeVec totalTokensGauge prometheus.Gauge - numTokensGaugeVec *prometheus.GaugeVec oldestTimestampGaugeVec *prometheus.GaugeVec - reportedOwners map[string]struct{} logger log.Logger } @@ -227,11 +228,6 @@ func NewWithStoreClientAndStrategy(cfg Config, name, key string, store kv.Client strategy: strategy, ringDesc: &Desc{}, shuffledSubringCache: map[subringCacheKey]*Ring{}, - memberOwnershipGaugeVec: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ - Name: "ring_member_ownership_percent", - Help: "The percent ownership of the ring by member", - ConstLabels: map[string]string{"name": name}}, - []string{"member"}), numMembersGaugeVec: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ Name: "ring_members", Help: "Number of members in the ring", @@ -241,11 +237,6 @@ func NewWithStoreClientAndStrategy(cfg Config, name, key string, store kv.Client Name: "ring_tokens_total", Help: "Number of tokens in the ring", ConstLabels: map[string]string{"name": name}}), - numTokensGaugeVec: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ - Name: "ring_tokens_owned", - Help: "The number of tokens in the ring owned by the member", - ConstLabels: map[string]string{"name": name}}, - []string{"member"}), oldestTimestampGaugeVec: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ Name: "ring_oldest_member_timestamp", Help: "Timestamp of the oldest member in the ring.", @@ -323,6 +314,7 @@ func (r *Ring) updateRingState(ringDesc *Desc) { ringTokensByZone := ringDesc.getTokensByZone() ringInstanceByToken := ringDesc.getTokensInfo() ringZones := getZones(ringTokensByZone) + oldestRegisteredTimestamp := ringDesc.getOldestRegisteredTimestamp() r.mtx.Lock() defer r.mtx.Unlock() @@ -331,6 +323,7 @@ func (r *Ring) updateRingState(ringDesc *Desc) { r.ringTokensByZone = ringTokensByZone r.ringInstanceByToken = ringInstanceByToken r.ringZones = ringZones + r.oldestRegisteredTimestamp = oldestRegisteredTimestamp r.lastTopologyChange = now if r.shuffledSubringCache != nil { // Invalidate all cached subrings. 
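The new `getOldestRegisteredTimestamp` helper feeds the ring's `oldestRegisteredTimestamp` field that `updateRingState` now maintains. A tiny worked example of its semantics over a plain map, mirroring the vendored logic: the oldest registration wins, but a single instance with a zero (unknown) `RegisteredTimestamp` forces the result to zero, which in turn disables the shuffle-shard shortcut introduced further below.

```go
package main

import "fmt"

// oldestRegistered mirrors Desc.getOldestRegisteredTimestamp for a plain map of
// instance -> registered timestamp: the minimum wins, but a single zero value
// (timestamp unknown) forces the result to zero.
func oldestRegistered(registered map[string]int64) int64 {
	var result int64
	for _, ts := range registered {
		switch {
		case ts == 0:
			return 0
		case result == 0, ts < result:
			result = ts
		}
	}
	return result
}

func main() {
	fmt.Println(oldestRegistered(map[string]int64{"a": 1700, "b": 1600})) // 1600
	fmt.Println(oldestRegistered(map[string]int64{"a": 1700, "b": 0}))    // 0: shortcut disabled
}
```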
@@ -514,12 +507,10 @@ func (r *Ring) GetReplicationSetForOperation(op Operation) (ReplicationSet, erro }, nil } -// countTokens returns the number of tokens and tokens within the range for each instance. -func (r *Desc) countTokens() (map[string]uint32, map[string]uint32) { +// countTokens returns the number tokens within the range for each instance. +func (r *Desc) countTokens() map[string]uint32 { var ( - owned = map[string]uint32{} - numTokens = map[string]uint32{} - + owned = map[string]uint32{} ringTokens = r.GetTokens() ringInstanceByToken = r.getTokensInfo() ) @@ -535,7 +526,6 @@ func (r *Desc) countTokens() (map[string]uint32, map[string]uint32) { } info := ringInstanceByToken[token] - numTokens[info.InstanceID] = numTokens[info.InstanceID] + 1 owned[info.InstanceID] = owned[info.InstanceID] + diff } @@ -543,11 +533,10 @@ func (r *Desc) countTokens() (map[string]uint32, map[string]uint32) { for id := range r.Ingesters { if _, ok := owned[id]; !ok { owned[id] = 0 - numTokens[id] = 0 } } - return numTokens, owned + return owned } // updateRingMetrics updates ring metrics. Caller must be holding the Write lock! @@ -587,21 +576,6 @@ func (r *Ring) updateRingMetrics(compareResult CompareResult) { return } - prevOwners := r.reportedOwners - r.reportedOwners = make(map[string]struct{}) - numTokens, ownedRange := r.ringDesc.countTokens() - for id, totalOwned := range ownedRange { - r.memberOwnershipGaugeVec.WithLabelValues(id).Set(float64(totalOwned) / float64(math.MaxUint32)) - r.numTokensGaugeVec.WithLabelValues(id).Set(float64(numTokens[id])) - delete(prevOwners, id) - r.reportedOwners[id] = struct{}{} - } - - for k := range prevOwners { - r.memberOwnershipGaugeVec.DeleteLabelValues(k) - r.numTokensGaugeVec.DeleteLabelValues(k) - } - r.totalTokensGauge.Set(float64(len(r.ringTokens))) } @@ -635,8 +609,11 @@ func (r *Ring) ShuffleShard(identifier string, size int) ReadRing { } result := r.shuffleShard(identifier, size, 0, time.Now()) - - r.setCachedShuffledSubring(identifier, size, result) + // Only cache subring if it is different from this ring, to avoid deadlocks in getCachedShuffledSubring, + // when we update the cached ring. + if result != r { + r.setCachedShuffledSubring(identifier, size, result) + } return result } @@ -662,6 +639,16 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur r.mtx.RLock() defer r.mtx.RUnlock() + // If all instances have RegisteredTimestamp within the lookback period, + // then all instances would be included in the resulting ring, so we can + // simply return this ring. + // + // If any instance had RegisteredTimestamp equal to 0 (it would not cause additional lookup of next instance), + // then r.oldestRegisteredTimestamp is zero too, and we skip this optimization. + if lookbackPeriod > 0 && r.oldestRegisteredTimestamp > 0 && r.oldestRegisteredTimestamp >= lookbackUntil { + return r + } + var numInstancesPerZone int var actualZones []string @@ -753,6 +740,8 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur ringTokensByZone: shardTokensByZone, ringZones: getZones(shardTokensByZone), + oldestRegisteredTimestamp: shardDesc.getOldestRegisteredTimestamp(), + // We reference the original map as is in order to avoid copying. It's safe to do // because this map is immutable by design and it's a superset of the actual instances // with the subring. 
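That field pays off in `shuffleShard`: with a lookback period configured, every instance whose registration falls inside the lookback window pulls additional instances into the shard, so if even the oldest registration is inside the window the resulting shard is the whole ring and the original ring can be returned as-is (which also avoids caching it as a subring). A small worked example of the condition; all values are hypothetical:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Hypothetical values: a 24h lookback period and a ring whose oldest
	// instance registered two hours ago.
	now := time.Now()
	lookbackPeriod := 24 * time.Hour
	lookbackUntil := now.Add(-lookbackPeriod).Unix()

	oldestRegisteredTimestamp := now.Add(-2 * time.Hour).Unix()

	// Mirrors the new fast path: if even the oldest registration falls inside
	// the lookback window, lookback would extend the shard to every instance,
	// so the full ring is returned without building (or caching) a subring.
	if lookbackPeriod > 0 && oldestRegisteredTimestamp > 0 && oldestRegisteredTimestamp >= lookbackUntil {
		fmt.Println("all instances registered within the lookback period: using the full ring")
	}
}
```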
@@ -802,6 +791,11 @@ func (r *Ring) getCachedShuffledSubring(identifier string, size int) *Ring { return nil } + // No need to update cached subring, if it is the original ring itself. + if r == cached { + return cached + } + cached.mtx.Lock() defer cached.mtx.Unlock() diff --git a/vendor/github.com/grafana/dskit/services/basic_service.go b/vendor/github.com/grafana/dskit/services/basic_service.go index ead611a3f97c7..6ced33aabf924 100644 --- a/vendor/github.com/grafana/dskit/services/basic_service.go +++ b/vendor/github.com/grafana/dskit/services/basic_service.go @@ -15,7 +15,7 @@ import ( type StartingFn func(serviceContext context.Context) error // RunningFn function is called when service enters Running state. When it returns, service will move to Stopping state. -// If RunningFn or Stopping return error, Service will end in Failed state, otherwise if both functions return without +// If RunningFn or StoppingFn return error, Service will end in Failed state, otherwise if both functions return without // error, service will end in Terminated state. type RunningFn func(serviceContext context.Context) error diff --git a/vendor/github.com/grafana/dskit/services/service.go b/vendor/github.com/grafana/dskit/services/service.go index c559ef96a3512..6170951a10663 100644 --- a/vendor/github.com/grafana/dskit/services/service.go +++ b/vendor/github.com/grafana/dskit/services/service.go @@ -100,6 +100,8 @@ type NamedService interface { Service // ServiceName returns name of the service, if it has one. + // Subsequent calls to ServiceName can return different values, + // for example service may update its name based on its state. ServiceName() string } diff --git a/vendor/github.com/grafana/dskit/spanlogger/spanlogger.go b/vendor/github.com/grafana/dskit/spanlogger/spanlogger.go index 91876a7281575..a639460bbe6f3 100644 --- a/vendor/github.com/grafana/dskit/spanlogger/spanlogger.go +++ b/vendor/github.com/grafana/dskit/spanlogger/spanlogger.go @@ -97,11 +97,9 @@ func (s *SpanLogger) Error(err error) error { } func withContext(ctx context.Context, logger log.Logger, resolver TenantResolver) log.Logger { - // Weaveworks uses "orgs" and "orgID" to represent Cortex users, - // even though the code-base generally uses `userID` to refer to the same thing. userID, err := resolver.TenantID(ctx) if err == nil && userID != "" { - logger = log.With(logger, "org_id", userID) + logger = log.With(logger, "user", userID) } traceID, ok := tracing.ExtractSampledTraceID(ctx) diff --git a/vendor/github.com/hashicorp/memberlist/net.go b/vendor/github.com/hashicorp/memberlist/net.go index 66c1dcd94d27b..609e01dd9d031 100644 --- a/vendor/github.com/hashicorp/memberlist/net.go +++ b/vendor/github.com/hashicorp/memberlist/net.go @@ -249,10 +249,6 @@ func (m *Memberlist) handleConn(conn net.Conn) { } if m.config.SkipInboundLabelCheck { - if streamLabel != "" { - m.logger.Printf("[ERR] memberlist: unexpected double stream label header: %s", LogConn(conn)) - return - } // Set this from config so that the auth data assertions work below. streamLabel = m.config.Label } @@ -369,10 +365,6 @@ func (m *Memberlist) ingestPacket(buf []byte, from net.Addr, timestamp time.Time } if m.config.SkipInboundLabelCheck { - if packetLabel != "" { - m.logger.Printf("[ERR] memberlist: unexpected double packet label header: %s", LogAddress(from)) - return - } // Set this from config so that the auth data assertions work below. 
packetLabel = m.config.Label } diff --git a/vendor/github.com/prometheus/node_exporter/LICENSE b/vendor/github.com/prometheus/exporter-toolkit/LICENSE similarity index 100% rename from vendor/github.com/prometheus/node_exporter/LICENSE rename to vendor/github.com/prometheus/exporter-toolkit/LICENSE diff --git a/vendor/github.com/prometheus/exporter-toolkit/web/README.md b/vendor/github.com/prometheus/exporter-toolkit/web/README.md new file mode 100644 index 0000000000000..1e16644bedcb9 --- /dev/null +++ b/vendor/github.com/prometheus/exporter-toolkit/web/README.md @@ -0,0 +1,10 @@ +# web package + +This package can be used by Prometheus exporters to enable TLS and +authentication. + +We actively encourage the community to use this repository, to provide a +consistent experience across the ecosystem. + +Developers documentation can be found on +[pkg.go.dev](https://pkg.go.dev/github.com/prometheus/exporter-toolkit/). diff --git a/vendor/github.com/prometheus/exporter-toolkit/web/cache.go b/vendor/github.com/prometheus/exporter-toolkit/web/cache.go new file mode 100644 index 0000000000000..9425e7ac9186c --- /dev/null +++ b/vendor/github.com/prometheus/exporter-toolkit/web/cache.go @@ -0,0 +1,91 @@ +// Copyright 2021 The Prometheus Authors +// This code is partly borrowed from Caddy: +// Copyright 2015 Matthew Holt and The Caddy Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package web + +import ( + weakrand "math/rand" + "sync" + "time" +) + +var cacheSize = 100 + +func init() { + weakrand.Seed(time.Now().UnixNano()) +} + +type cache struct { + cache map[string]bool + mtx sync.Mutex +} + +// newCache returns a cache that contains a mapping of plaintext passwords +// to their hashes (with random eviction). This can greatly improve the +// performance of traffic-heavy servers that use secure password hashing +// algorithms, with the downside that plaintext passwords will be stored in +// memory for a longer time (this should not be a problem as long as your +// machine is not compromised, at which point all bets are off, since basicauth +// necessitates plaintext passwords being received over the wire anyway). +func newCache() *cache { + return &cache{ + cache: make(map[string]bool), + } +} + +func (c *cache) get(key string) (bool, bool) { + c.mtx.Lock() + defer c.mtx.Unlock() + v, ok := c.cache[key] + return v, ok +} + +func (c *cache) set(key string, value bool) { + c.mtx.Lock() + defer c.mtx.Unlock() + c.makeRoom() + c.cache[key] = value +} + +func (c *cache) makeRoom() { + if len(c.cache) < cacheSize { + return + } + // We delete more than just 1 entry so that we don't have + // to do this on every request; assuming the capacity of + // the cache is on a long tail, we can save a lot of CPU + // time by doing a whole bunch of deletions now and then + // we won't have to do them again for a while. 
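The comment above is the whole motivation for this small cache: a bcrypt comparison is deliberately slow, so re-verifying the same credentials on every scrape would dominate request latency, while a map lookup keyed on user, hash and password is effectively free (the deletion loop that follows keeps the map bounded). A quick, self-contained illustration of the gap; the credentials are made up:

```go
package main

import (
	"fmt"
	"time"

	"golang.org/x/crypto/bcrypt"
)

func main() {
	// Made-up credentials: the point is only the cost gap between a bcrypt
	// comparison and a map lookup on a hot request path.
	hash, err := bcrypt.GenerateFromPassword([]byte("fakepassword"), bcrypt.DefaultCost)
	if err != nil {
		panic(err)
	}

	start := time.Now()
	_ = bcrypt.CompareHashAndPassword(hash, []byte("fakepassword"))
	fmt.Println("bcrypt compare:", time.Since(start)) // typically tens of milliseconds

	cache := map[string]bool{"user+hash+password": true}
	start = time.Now()
	_ = cache["user+hash+password"]
	fmt.Println("map lookup:    ", time.Since(start)) // nanoseconds
}
```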
+ numToDelete := len(c.cache) / 10 + if numToDelete < 1 { + numToDelete = 1 + } + for deleted := 0; deleted <= numToDelete; deleted++ { + // Go maps are "nondeterministic" not actually random, + // so although we could just chop off the "front" of the + // map with less code, this is a heavily skewed eviction + // strategy; generating random numbers is cheap and + // ensures a much better distribution. + rnd := weakrand.Intn(len(c.cache)) + i := 0 + for key := range c.cache { + if i == rnd { + delete(c.cache, key) + break + } + i++ + } + } +} diff --git a/vendor/github.com/prometheus/exporter-toolkit/web/handler.go b/vendor/github.com/prometheus/exporter-toolkit/web/handler.go new file mode 100644 index 0000000000000..ae3ebc03b97f6 --- /dev/null +++ b/vendor/github.com/prometheus/exporter-toolkit/web/handler.go @@ -0,0 +1,137 @@ +// Copyright 2020 The Prometheus Authors +// This code is partly borrowed from Caddy: +// Copyright 2015 Matthew Holt and The Caddy Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package web + +import ( + "encoding/hex" + "fmt" + "net/http" + "sync" + + "github.com/go-kit/log" + "golang.org/x/crypto/bcrypt" +) + +// extraHTTPHeaders is a map of HTTP headers that can be added to HTTP +// responses. +// This is private on purpose to ensure consistency in the Prometheus ecosystem. +var extraHTTPHeaders = map[string][]string{ + "Strict-Transport-Security": nil, + "X-Content-Type-Options": {"nosniff"}, + "X-Frame-Options": {"deny", "sameorigin"}, + "X-XSS-Protection": nil, + "Content-Security-Policy": nil, +} + +func validateUsers(configPath string) error { + c, err := getConfig(configPath) + if err != nil { + return err + } + + for _, p := range c.Users { + _, err = bcrypt.Cost([]byte(p)) + if err != nil { + return err + } + } + + return nil +} + +// validateHeaderConfig checks that the provided header configuration is correct. +// It does not check the validity of all the values, only the ones which are +// well-defined enumerations. +func validateHeaderConfig(headers map[string]string) error { +HeadersLoop: + for k, v := range headers { + values, ok := extraHTTPHeaders[k] + if !ok { + return fmt.Errorf("HTTP header %q can not be configured", k) + } + for _, allowedValue := range values { + if v == allowedValue { + continue HeadersLoop + } + } + if len(values) > 0 { + return fmt.Errorf("invalid value for %s. Expected one of: %q, but got: %q", k, values, v) + } + } + return nil +} + +type webHandler struct { + tlsConfigPath string + handler http.Handler + logger log.Logger + cache *cache + // bcryptMtx is there to ensure that bcrypt.CompareHashAndPassword is run + // only once in parallel as this is CPU intensive. 
+ bcryptMtx sync.Mutex +} + +func (u *webHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + c, err := getConfig(u.tlsConfigPath) + if err != nil { + u.logger.Log("msg", "Unable to parse configuration", "err", err) + http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) + return + } + + // Configure http headers. + for k, v := range c.HTTPConfig.Header { + w.Header().Set(k, v) + } + + if len(c.Users) == 0 { + u.handler.ServeHTTP(w, r) + return + } + + user, pass, auth := r.BasicAuth() + if auth { + hashedPassword, validUser := c.Users[user] + + if !validUser { + // The user is not found. Use a fixed password hash to + // prevent user enumeration by timing requests. + // This is a bcrypt-hashed version of "fakepassword". + hashedPassword = "$2y$10$QOauhQNbBCuQDKes6eFzPeMqBSjb7Mr5DUmpZ/VcEd00UAV/LDeSi" + } + + cacheKey := hex.EncodeToString(append(append([]byte(user), []byte(hashedPassword)...), []byte(pass)...)) + authOk, ok := u.cache.get(cacheKey) + + if !ok { + // This user, hashedPassword, password is not cached. + u.bcryptMtx.Lock() + err := bcrypt.CompareHashAndPassword([]byte(hashedPassword), []byte(pass)) + u.bcryptMtx.Unlock() + + authOk = err == nil + u.cache.set(cacheKey, authOk) + } + + if authOk && validUser { + u.handler.ServeHTTP(w, r) + return + } + } + + w.Header().Set("WWW-Authenticate", "Basic") + http.Error(w, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized) +} diff --git a/vendor/github.com/prometheus/exporter-toolkit/web/tls_config.go b/vendor/github.com/prometheus/exporter-toolkit/web/tls_config.go new file mode 100644 index 0000000000000..328c5e0efa60b --- /dev/null +++ b/vendor/github.com/prometheus/exporter-toolkit/web/tls_config.go @@ -0,0 +1,361 @@ +// Copyright 2019 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package web + +import ( + "crypto/tls" + "crypto/x509" + "fmt" + "io/ioutil" + "net" + "net/http" + "path/filepath" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/pkg/errors" + config_util "github.com/prometheus/common/config" + "gopkg.in/yaml.v2" +) + +var ( + errNoTLSConfig = errors.New("TLS config is not present") +) + +type Config struct { + TLSConfig TLSStruct `yaml:"tls_server_config"` + HTTPConfig HTTPStruct `yaml:"http_server_config"` + Users map[string]config_util.Secret `yaml:"basic_auth_users"` +} + +type TLSStruct struct { + TLSCertPath string `yaml:"cert_file"` + TLSKeyPath string `yaml:"key_file"` + ClientAuth string `yaml:"client_auth_type"` + ClientCAs string `yaml:"client_ca_file"` + CipherSuites []cipher `yaml:"cipher_suites"` + CurvePreferences []curve `yaml:"curve_preferences"` + MinVersion tlsVersion `yaml:"min_version"` + MaxVersion tlsVersion `yaml:"max_version"` + PreferServerCipherSuites bool `yaml:"prefer_server_cipher_suites"` +} + +// SetDirectory joins any relative file paths with dir. 
+func (t *TLSStruct) SetDirectory(dir string) { + t.TLSCertPath = config_util.JoinDir(dir, t.TLSCertPath) + t.TLSKeyPath = config_util.JoinDir(dir, t.TLSKeyPath) + t.ClientCAs = config_util.JoinDir(dir, t.ClientCAs) +} + +type HTTPStruct struct { + HTTP2 bool `yaml:"http2"` + Header map[string]string `yaml:"headers,omitempty"` +} + +func getConfig(configPath string) (*Config, error) { + content, err := ioutil.ReadFile(configPath) + if err != nil { + return nil, err + } + c := &Config{ + TLSConfig: TLSStruct{ + MinVersion: tls.VersionTLS12, + MaxVersion: tls.VersionTLS13, + PreferServerCipherSuites: true, + }, + HTTPConfig: HTTPStruct{HTTP2: true}, + } + err = yaml.UnmarshalStrict(content, c) + if err == nil { + err = validateHeaderConfig(c.HTTPConfig.Header) + } + c.TLSConfig.SetDirectory(filepath.Dir(configPath)) + return c, err +} + +func getTLSConfig(configPath string) (*tls.Config, error) { + c, err := getConfig(configPath) + if err != nil { + return nil, err + } + return ConfigToTLSConfig(&c.TLSConfig) +} + +// ConfigToTLSConfig generates the golang tls.Config from the TLSStruct config. +func ConfigToTLSConfig(c *TLSStruct) (*tls.Config, error) { + if c.TLSCertPath == "" && c.TLSKeyPath == "" && c.ClientAuth == "" && c.ClientCAs == "" { + return nil, errNoTLSConfig + } + + if c.TLSCertPath == "" { + return nil, errors.New("missing cert_file") + } + + if c.TLSKeyPath == "" { + return nil, errors.New("missing key_file") + } + + loadCert := func() (*tls.Certificate, error) { + cert, err := tls.LoadX509KeyPair(c.TLSCertPath, c.TLSKeyPath) + if err != nil { + return nil, errors.Wrap(err, "failed to load X509KeyPair") + } + return &cert, nil + } + + // Confirm that certificate and key paths are valid. + if _, err := loadCert(); err != nil { + return nil, err + } + + cfg := &tls.Config{ + MinVersion: (uint16)(c.MinVersion), + MaxVersion: (uint16)(c.MaxVersion), + PreferServerCipherSuites: c.PreferServerCipherSuites, + } + + cfg.GetCertificate = func(*tls.ClientHelloInfo) (*tls.Certificate, error) { + return loadCert() + } + + var cf []uint16 + for _, c := range c.CipherSuites { + cf = append(cf, (uint16)(c)) + } + if len(cf) > 0 { + cfg.CipherSuites = cf + } + + var cp []tls.CurveID + for _, c := range c.CurvePreferences { + cp = append(cp, (tls.CurveID)(c)) + } + if len(cp) > 0 { + cfg.CurvePreferences = cp + } + + if c.ClientCAs != "" { + clientCAPool := x509.NewCertPool() + clientCAFile, err := ioutil.ReadFile(c.ClientCAs) + if err != nil { + return nil, err + } + clientCAPool.AppendCertsFromPEM(clientCAFile) + cfg.ClientCAs = clientCAPool + } + + switch c.ClientAuth { + case "RequestClientCert": + cfg.ClientAuth = tls.RequestClientCert + case "RequireAnyClientCert", "RequireClientCert": // Preserved for backwards compatibility. + cfg.ClientAuth = tls.RequireAnyClientCert + case "VerifyClientCertIfGiven": + cfg.ClientAuth = tls.VerifyClientCertIfGiven + case "RequireAndVerifyClientCert": + cfg.ClientAuth = tls.RequireAndVerifyClientCert + case "", "NoClientCert": + cfg.ClientAuth = tls.NoClientCert + default: + return nil, errors.New("Invalid ClientAuth: " + c.ClientAuth) + } + + if c.ClientCAs != "" && cfg.ClientAuth == tls.NoClientCert { + return nil, errors.New("Client CA's have been configured without a Client Auth Policy") + } + + return cfg, nil +} + +// ListenAndServe starts the server on the given address. Based on the file +// tlsConfigPath, TLS or basic auth could be enabled. 
+func ListenAndServe(server *http.Server, tlsConfigPath string, logger log.Logger) error { + listener, err := net.Listen("tcp", server.Addr) + if err != nil { + return err + } + defer listener.Close() + return Serve(listener, server, tlsConfigPath, logger) +} + +// Server starts the server on the given listener. Based on the file +// tlsConfigPath, TLS or basic auth could be enabled. +func Serve(l net.Listener, server *http.Server, tlsConfigPath string, logger log.Logger) error { + if tlsConfigPath == "" { + level.Info(logger).Log("msg", "TLS is disabled.", "http2", false) + return server.Serve(l) + } + + if err := validateUsers(tlsConfigPath); err != nil { + return err + } + + // Setup basic authentication. + var handler http.Handler = http.DefaultServeMux + if server.Handler != nil { + handler = server.Handler + } + + c, err := getConfig(tlsConfigPath) + if err != nil { + return err + } + + server.Handler = &webHandler{ + tlsConfigPath: tlsConfigPath, + logger: logger, + handler: handler, + cache: newCache(), + } + + config, err := ConfigToTLSConfig(&c.TLSConfig) + switch err { + case nil: + if !c.HTTPConfig.HTTP2 { + server.TLSNextProto = make(map[string]func(*http.Server, *tls.Conn, http.Handler)) + } + // Valid TLS config. + level.Info(logger).Log("msg", "TLS is enabled.", "http2", c.HTTPConfig.HTTP2) + case errNoTLSConfig: + // No TLS config, back to plain HTTP. + level.Info(logger).Log("msg", "TLS is disabled.", "http2", false) + return server.Serve(l) + default: + // Invalid TLS config. + return err + } + + server.TLSConfig = config + + // Set the GetConfigForClient method of the HTTPS server so that the config + // and certs are reloaded on new connections. + server.TLSConfig.GetConfigForClient = func(*tls.ClientHelloInfo) (*tls.Config, error) { + config, err := getTLSConfig(tlsConfigPath) + if err != nil { + return nil, err + } + config.NextProtos = server.TLSConfig.NextProtos + return config, nil + } + return server.ServeTLS(l, "", "") +} + +// Validate configuration file by reading the configuration and the certificates. 
+func Validate(tlsConfigPath string) error { + if tlsConfigPath == "" { + return nil + } + if err := validateUsers(tlsConfigPath); err != nil { + return err + } + c, err := getConfig(tlsConfigPath) + if err != nil { + return err + } + _, err = ConfigToTLSConfig(&c.TLSConfig) + if err == errNoTLSConfig { + return nil + } + return err +} + +type cipher uint16 + +func (c *cipher) UnmarshalYAML(unmarshal func(interface{}) error) error { + var s string + err := unmarshal((*string)(&s)) + if err != nil { + return err + } + for _, cs := range tls.CipherSuites() { + if cs.Name == s { + *c = (cipher)(cs.ID) + return nil + } + } + return errors.New("unknown cipher: " + s) +} + +func (c cipher) MarshalYAML() (interface{}, error) { + return tls.CipherSuiteName((uint16)(c)), nil +} + +type curve tls.CurveID + +var curves = map[string]curve{ + "CurveP256": (curve)(tls.CurveP256), + "CurveP384": (curve)(tls.CurveP384), + "CurveP521": (curve)(tls.CurveP521), + "X25519": (curve)(tls.X25519), +} + +func (c *curve) UnmarshalYAML(unmarshal func(interface{}) error) error { + var s string + err := unmarshal((*string)(&s)) + if err != nil { + return err + } + if curveid, ok := curves[s]; ok { + *c = curveid + return nil + } + return errors.New("unknown curve: " + s) +} + +func (c *curve) MarshalYAML() (interface{}, error) { + for s, curveid := range curves { + if *c == curveid { + return s, nil + } + } + return fmt.Sprintf("%v", c), nil +} + +type tlsVersion uint16 + +var tlsVersions = map[string]tlsVersion{ + "TLS13": (tlsVersion)(tls.VersionTLS13), + "TLS12": (tlsVersion)(tls.VersionTLS12), + "TLS11": (tlsVersion)(tls.VersionTLS11), + "TLS10": (tlsVersion)(tls.VersionTLS10), +} + +func (tv *tlsVersion) UnmarshalYAML(unmarshal func(interface{}) error) error { + var s string + err := unmarshal((*string)(&s)) + if err != nil { + return err + } + if v, ok := tlsVersions[s]; ok { + *tv = v + return nil + } + return errors.New("unknown TLS version: " + s) +} + +func (tv *tlsVersion) MarshalYAML() (interface{}, error) { + for s, v := range tlsVersions { + if *tv == v { + return s, nil + } + } + return fmt.Sprintf("%v", tv), nil +} + +// Listen starts the server on the given address. Based on the file +// tlsConfigPath, TLS or basic auth could be enabled. +// +// Deprecated: Use ListenAndServe instead. +func Listen(server *http.Server, tlsConfigPath string, logger log.Logger) error { + return ListenAndServe(server, tlsConfigPath, logger) +} diff --git a/vendor/github.com/prometheus/exporter-toolkit/web/web-config.yml b/vendor/github.com/prometheus/exporter-toolkit/web/web-config.yml new file mode 100644 index 0000000000000..7d40d9b70868f --- /dev/null +++ b/vendor/github.com/prometheus/exporter-toolkit/web/web-config.yml @@ -0,0 +1,6 @@ +# Minimal TLS configuration example. Additionally, a certificate and a key file +# are needed. +tls_server_config: + cert_file: server.crt + key_file: server.key + diff --git a/vendor/github.com/prometheus/node_exporter/NOTICE b/vendor/github.com/prometheus/node_exporter/NOTICE deleted file mode 100644 index 970a9b237a9f0..0000000000000 --- a/vendor/github.com/prometheus/node_exporter/NOTICE +++ /dev/null @@ -1,17 +0,0 @@ -Configurable modular Prometheus exporter for various node metrics. -Copyright 2013-2015 The Prometheus Authors - -This product includes software developed at -SoundCloud Ltd. (http://soundcloud.com/). 
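For callers, the replacement package is wired in through `web.ListenAndServe` (or `web.Serve` with an existing listener). A minimal sketch of a server protected by it; the address, handler and the contents of `web-config.yml` are illustrative assumptions, only the function signature comes from the vendored code above:

```go
package main

import (
	"net/http"
	"os"

	"github.com/go-kit/log"
	"github.com/prometheus/exporter-toolkit/web"
)

func main() {
	logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))

	mux := http.NewServeMux()
	mux.Handle("/metrics", http.NotFoundHandler()) // stand-in for a real metrics handler

	server := &http.Server{Addr: ":9090", Handler: mux}

	// web-config.yml is assumed to look roughly like the sample above, e.g.:
	//
	//   tls_server_config:
	//     cert_file: server.crt
	//     key_file: server.key
	//   basic_auth_users:
	//     admin: <bcrypt hash>
	//
	// An empty path keeps plain HTTP with no authentication.
	if err := web.ListenAndServe(server, "web-config.yml", logger); err != nil {
		logger.Log("msg", "server exited", "err", err)
		os.Exit(1)
	}
}
```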
- -The following components are included in this product: - -wifi -https://github.com/mdlayher/wifi -Copyright 2016-2017 Matt Layher -Licensed under the MIT License - -netlink -https://github.com/mdlayher/netlink -Copyright 2016-2017 Matt Layher -Licensed under the MIT License diff --git a/vendor/github.com/prometheus/node_exporter/https/README.md b/vendor/github.com/prometheus/node_exporter/https/README.md deleted file mode 100644 index e8e4504c9164b..0000000000000 --- a/vendor/github.com/prometheus/node_exporter/https/README.md +++ /dev/null @@ -1,28 +0,0 @@ -# HTTPS Package for Prometheus - -The `https` directory contains a Go package and a sample configuration file for -running `node_exporter` with HTTPS instead of HTTP. We currently support TLS 1.3 -and TLS 1.2. - -To run a server with TLS, use the flag `--web.config`. - -e.g. `./node_exporter --web.config="web-config.yml"` -If the config is kept within the https directory. - -The config file should be written in YAML format, and is reloaded on each connection to check for new certificates and/or authentication policy. - -## Sample Config - -``` -tls_config: - # Certificate and key files for server to use to authenticate to client - cert_file: - key_file: - - # Server policy for client authentication. Maps to ClientAuth Policies - # For more detail on clientAuth options: [ClientAuthType](https://golang.org/pkg/crypto/tls/#ClientAuthType) - [ client_auth_type: | default = "NoClientCert" ] - - # CA certificate for client certificate authentication to the server - [ client_ca_file: ] -``` diff --git a/vendor/github.com/prometheus/node_exporter/https/tls_config.go b/vendor/github.com/prometheus/node_exporter/https/tls_config.go deleted file mode 100644 index 4b2986272e574..0000000000000 --- a/vendor/github.com/prometheus/node_exporter/https/tls_config.go +++ /dev/null @@ -1,126 +0,0 @@ -// Copyright 2019 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package https allows the implementation of TLS. -package https - -import ( - "crypto/tls" - "crypto/x509" - "io/ioutil" - "net/http" - - "github.com/pkg/errors" - "gopkg.in/yaml.v2" -) - -type Config struct { - TLSConfig TLSStruct `yaml:"tls_config"` -} - -type TLSStruct struct { - TLSCertPath string `yaml:"cert_file"` - TLSKeyPath string `yaml:"key_file"` - ClientAuth string `yaml:"client_auth_type"` - ClientCAs string `yaml:"client_ca_file"` -} - -func getTLSConfig(configPath string) (*tls.Config, error) { - content, err := ioutil.ReadFile(configPath) - if err != nil { - return nil, err - } - c := &Config{} - err = yaml.Unmarshal(content, c) - if err != nil { - return nil, err - } - return ConfigToTLSConfig(&c.TLSConfig) -} - -// ConfigToTLSConfig generates the golang tls.Config from the TLSStruct config. 
-func ConfigToTLSConfig(c *TLSStruct) (*tls.Config, error) { - cfg := &tls.Config{ - MinVersion: tls.VersionTLS12, - } - if len(c.TLSCertPath) == 0 { - return nil, errors.New("missing TLSCertPath") - } - if len(c.TLSKeyPath) == 0 { - return nil, errors.New("missing TLSKeyPath") - } - loadCert := func() (*tls.Certificate, error) { - cert, err := tls.LoadX509KeyPair(c.TLSCertPath, c.TLSKeyPath) - if err != nil { - return nil, errors.Wrap(err, "failed to load X509KeyPair") - } - return &cert, nil - } - // Confirm that certificate and key paths are valid. - if _, err := loadCert(); err != nil { - return nil, err - } - cfg.GetCertificate = func(*tls.ClientHelloInfo) (*tls.Certificate, error) { - return loadCert() - } - - if len(c.ClientCAs) > 0 { - clientCAPool := x509.NewCertPool() - clientCAFile, err := ioutil.ReadFile(c.ClientCAs) - if err != nil { - return nil, err - } - clientCAPool.AppendCertsFromPEM(clientCAFile) - cfg.ClientCAs = clientCAPool - } - if len(c.ClientAuth) > 0 { - switch s := (c.ClientAuth); s { - case "NoClientCert": - cfg.ClientAuth = tls.NoClientCert - case "RequestClientCert": - cfg.ClientAuth = tls.RequestClientCert - case "RequireClientCert": - cfg.ClientAuth = tls.RequireAnyClientCert - case "VerifyClientCertIfGiven": - cfg.ClientAuth = tls.VerifyClientCertIfGiven - case "RequireAndVerifyClientCert": - cfg.ClientAuth = tls.RequireAndVerifyClientCert - case "": - cfg.ClientAuth = tls.NoClientCert - default: - return nil, errors.New("Invalid ClientAuth: " + s) - } - } - if len(c.ClientCAs) > 0 && cfg.ClientAuth == tls.NoClientCert { - return nil, errors.New("Client CA's have been configured without a Client Auth Policy") - } - return cfg, nil -} - -// Listen starts the server on the given address. If tlsConfigPath isn't empty the server connection will be started using TLS. -func Listen(server *http.Server, tlsConfigPath string) error { - if (tlsConfigPath) == "" { - return server.ListenAndServe() - } - var err error - server.TLSConfig, err = getTLSConfig(tlsConfigPath) - if err != nil { - return err - } - // Set the GetConfigForClient method of the HTTPS server so that the config - // and certs are reloaded on new connections. - server.TLSConfig.GetConfigForClient = func(*tls.ClientHelloInfo) (*tls.Config, error) { - return getTLSConfig(tlsConfigPath) - } - return server.ListenAndServeTLS("", "") -} diff --git a/vendor/github.com/prometheus/node_exporter/https/web-config.yml b/vendor/github.com/prometheus/node_exporter/https/web-config.yml deleted file mode 100644 index 9937291cdcd16..0000000000000 --- a/vendor/github.com/prometheus/node_exporter/https/web-config.yml +++ /dev/null @@ -1,11 +0,0 @@ -tls_config: - # Certificate and key files for server to use to authenticate to client - cert_file: - key_file: - - # Server policy for client authentication. 
Maps to ClientAuth Policies - # For more detail on clientAuth options: [ClientAuthType](https://golang.org/pkg/crypto/tls/#ClientAuthType) - [ client_auth_type: | default = "NoClientCert" ] - - # CA certificate for client certificate authentication to the server - [ client_ca_file: ] diff --git a/vendor/github.com/weaveworks/common/instrument/instrument.go b/vendor/github.com/weaveworks/common/instrument/instrument.go index 07aa033c0c9ab..1a1352c842c91 100644 --- a/vendor/github.com/weaveworks/common/instrument/instrument.go +++ b/vendor/github.com/weaveworks/common/instrument/instrument.go @@ -68,9 +68,16 @@ func (c *HistogramCollector) After(ctx context.Context, method, statusCode strin // (this will always work for a HistogramVec). func ObserveWithExemplar(ctx context.Context, histogram prometheus.Observer, seconds float64) { if traceID, ok := tracing.ExtractSampledTraceID(ctx); ok { + lbls := prometheus.Labels{"traceID": traceID} + if userID, err := user.ExtractUserID(ctx); err == nil { + lbls["user"] = userID + } + if orgID, err := user.ExtractOrgID(ctx); err == nil { + lbls["organization"] = orgID + } histogram.(prometheus.ExemplarObserver).ObserveWithExemplar( seconds, - prometheus.Labels{"traceID": traceID}, + lbls, ) return } diff --git a/vendor/github.com/weaveworks/common/logging/dedupe.go b/vendor/github.com/weaveworks/common/logging/dedupe.go deleted file mode 100644 index caa523ef93f18..0000000000000 --- a/vendor/github.com/weaveworks/common/logging/dedupe.go +++ /dev/null @@ -1,137 +0,0 @@ -package logging - -import ( - "fmt" - "strings" - "sync" - "time" - - log "github.com/sirupsen/logrus" -) - -const ( - defaultDedupeInterval = time.Minute -) - -// SetupDeduplication should be performed after any other logging setup. -// For all logs less severe or equal to the given log level (but still higher than the logger's configured log level), -// these logs will be 'deduplicated'. What this means is that, excluding certain special fields like time, multiple -// identical log entries will be grouped up and a summary message emitted. -// For example, instead of: -// 00:00:00 INFO User 123 did xyz -// 00:00:10 INFO User 123 did xyz -// 00:00:25 INFO User 123 did xyz -// 00:00:55 INFO User 123 did xyz -// you would get: -// 00:00:00 INFO User 123 did xyz -// 00:01:00 INFO Repeated 3 times: User 123 did xyz -// The interval argument controls how long to wait for additional messages to arrive before reporting. -// Increase it to deduplicate more aggressively, decrease it to lower latency from a log occurring to it appearing. -// Set it to 0 to pick a sensible default value (recommended). -// NOTE: For simplicity and efficiency, fields are considered 'equal' if and only if their string representations (%v) are equal. -func SetupDeduplication(logLevel string, interval time.Duration) error { - dedupeLevel, err := log.ParseLevel(logLevel) - if err != nil { - return fmt.Errorf("Error parsing log level: %v", err) - } - if interval <= 0 { - interval = defaultDedupeInterval - } - - // We use a special Formatter to either format the log using the original formatter, or to return "" - // so nothing will be written for that event. The repeated entries are later logged along with a field flag - // that tells the formatter to ignore the message. 
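Back to the `instrument.ObserveWithExemplar` change in instrument.go above: when the context carries a sampled trace ID, the exemplar now also picks up `user` and `organization` labels whenever a tenant or org ID can be extracted from the same context. A minimal sketch of a caller; the metric name, label values and tenant ID are invented for illustration:

```go
package main

import (
	"context"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/weaveworks/common/instrument"
	"github.com/weaveworks/common/user"
)

var requestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
	Name:    "example_request_duration_seconds", // hypothetical metric
	Help:    "Time spent serving requests.",
	Buckets: prometheus.DefBuckets,
}, []string{"method", "status_code"})

func observe(ctx context.Context, start time.Time) {
	// If ctx carries a sampled trace ID, an exemplar is attached with a
	// "traceID" label plus "user"/"organization" when they can be extracted
	// from the same context; otherwise this is a plain observation.
	instrument.ObserveWithExemplar(ctx, requestDuration.WithLabelValues("GET", "200"), time.Since(start).Seconds())
}

func main() {
	ctx := user.InjectOrgID(context.Background(), "tenant-1") // illustrative tenant
	observe(ctx, time.Now())
}
```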
- stdLogger := log.StandardLogger() - stdLogger.Formatter = newDedupeFormatter(stdLogger.Formatter, dedupeLevel, interval) - return nil -} - -type entryCount struct { - entry log.Entry - count int -} - -type dedupeFormatter struct { - innerFormatter log.Formatter - level log.Level - interval time.Duration - seen map[string]entryCount - lock sync.Mutex -} - -func newDedupeFormatter(innerFormatter log.Formatter, level log.Level, interval time.Duration) *dedupeFormatter { - return &dedupeFormatter{ - innerFormatter: innerFormatter, - level: level, - interval: interval, - seen: map[string]entryCount{}, - } -} - -func (f *dedupeFormatter) Format(entry *log.Entry) ([]byte, error) { - if f.shouldLog(entry) { - b, err := f.innerFormatter.Format(entry) - return b, err - } - return []byte{}, nil -} - -func (f *dedupeFormatter) shouldLog(entry *log.Entry) bool { - if _, ok := entry.Data["deduplicated"]; ok { - // ignore our own logs about deduped messages - return true - } - if entry.Level < f.level { - // ignore logs more severe than our level - return true - } - key := fmt.Sprintf("%s %s", entry.Message, fieldsToString(entry.Data)) - f.lock.Lock() - defer f.lock.Unlock() - if ec, ok := f.seen[key]; ok { - // already seen, increment count and do not log - ec.count++ - f.seen[key] = ec - return false - } - // New message, log it but add it to seen. - // We need to copy because the pointer ceases to be valid after we return from Format - f.seen[key] = entryCount{entry: *entry} - go f.evictEntry(key) // queue to evict later - return true -} - -// Wait for interval seconds then evict the entry and send the log -func (f *dedupeFormatter) evictEntry(key string) { - time.Sleep(f.interval) - var ec entryCount - func() { - f.lock.Lock() - defer f.lock.Unlock() - ec = f.seen[key] - delete(f.seen, key) - }() - if ec.count == 0 { - return - } - entry := log.WithFields(ec.entry.Data).WithField("deduplicated", ec.count) - message := fmt.Sprintf("Repeated %d times: %s", ec.count, ec.entry.Message) - // There's no way to choose the log level dynamically, so we have to do this hack - map[log.Level]func(args ...interface{}){ - log.PanicLevel: entry.Panic, - log.FatalLevel: entry.Fatal, - log.ErrorLevel: entry.Error, - log.WarnLevel: entry.Warn, - log.InfoLevel: entry.Info, - log.DebugLevel: entry.Debug, - }[ec.entry.Level](message) -} - -func fieldsToString(data log.Fields) string { - parts := make([]string, 0, len(data)) - // traversal order here is arbitrary but stable, which is fine for our purposes - for k, v := range data { - parts = append(parts, fmt.Sprintf("%s=%v", k, v)) - } - return strings.Join(parts, " ") -} diff --git a/vendor/github.com/weaveworks/common/logging/gokit.go b/vendor/github.com/weaveworks/common/logging/gokit.go index 9384d30a115bd..e4b6fdc032cd2 100644 --- a/vendor/github.com/weaveworks/common/logging/gokit.go +++ b/vendor/github.com/weaveworks/common/logging/gokit.go @@ -17,8 +17,13 @@ func NewGoKitFormat(l Level, f Format) Interface { } else { logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)) } + return addStandardFields(logger, l) +} + +// stand-alone for test purposes +func addStandardFields(logger log.Logger, l Level) Interface { + logger = log.With(logger, "ts", log.DefaultTimestampUTC, "caller", log.Caller(5)) logger = level.NewFilter(logger, l.Gokit) - logger = log.With(logger, "ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller) return gokit{logger} } @@ -36,32 +41,52 @@ type gokit struct { log.Logger } +// Helper to defer sprintf until it is needed. 
+type sprintf struct { + format string + args []interface{} +} + +func (s *sprintf) String() string { + return fmt.Sprintf(s.format, s.args...) +} + +// Helper to defer sprint until it is needed. +// Note we don't use Sprintln because the output is passed to go-kit as one value among many on a line +type sprint struct { + args []interface{} +} + +func (s *sprint) String() string { + return fmt.Sprint(s.args...) +} + func (g gokit) Debugf(format string, args ...interface{}) { - level.Debug(g.Logger).Log("msg", fmt.Sprintf(format, args...)) + level.Debug(g.Logger).Log("msg", &sprintf{format: format, args: args}) } func (g gokit) Debugln(args ...interface{}) { - level.Debug(g.Logger).Log("msg", fmt.Sprintln(args...)) + level.Debug(g.Logger).Log("msg", &sprint{args: args}) } func (g gokit) Infof(format string, args ...interface{}) { - level.Info(g.Logger).Log("msg", fmt.Sprintf(format, args...)) + level.Info(g.Logger).Log("msg", &sprintf{format: format, args: args}) } func (g gokit) Infoln(args ...interface{}) { - level.Info(g.Logger).Log("msg", fmt.Sprintln(args...)) + level.Info(g.Logger).Log("msg", &sprint{args: args}) } func (g gokit) Warnf(format string, args ...interface{}) { - level.Warn(g.Logger).Log("msg", fmt.Sprintf(format, args...)) + level.Warn(g.Logger).Log("msg", &sprintf{format: format, args: args}) } func (g gokit) Warnln(args ...interface{}) { - level.Warn(g.Logger).Log("msg", fmt.Sprintln(args...)) + level.Warn(g.Logger).Log("msg", &sprint{args: args}) } func (g gokit) Errorf(format string, args ...interface{}) { - level.Error(g.Logger).Log("msg", fmt.Sprintf(format, args...)) + level.Error(g.Logger).Log("msg", &sprintf{format: format, args: args}) } func (g gokit) Errorln(args ...interface{}) { - level.Error(g.Logger).Log("msg", fmt.Sprintln(args...)) + level.Error(g.Logger).Log("msg", &sprint{args: args}) } func (g gokit) WithField(key string, value interface{}) Interface { diff --git a/vendor/github.com/weaveworks/common/middleware/grpc_instrumentation.go b/vendor/github.com/weaveworks/common/middleware/grpc_instrumentation.go index 5ced3989e8034..90c955f1e82dc 100644 --- a/vendor/github.com/weaveworks/common/middleware/grpc_instrumentation.go +++ b/vendor/github.com/weaveworks/common/middleware/grpc_instrumentation.go @@ -1,6 +1,8 @@ package middleware import ( + "context" + "io" "strconv" "time" @@ -8,8 +10,8 @@ import ( grpcUtils "github.com/weaveworks/common/grpc" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/instrument" - "golang.org/x/net/context" "google.golang.org/grpc" + "google.golang.org/grpc/metadata" ) func observe(ctx context.Context, hist *prometheus.HistogramVec, method string, err error, duration time.Duration) { @@ -45,3 +47,90 @@ func StreamServerInstrumentInterceptor(hist *prometheus.HistogramVec) grpc.Strea return err } } + +// UnaryClientInstrumentInterceptor records duration of gRPC requests client side. +func UnaryClientInstrumentInterceptor(metric *prometheus.HistogramVec) grpc.UnaryClientInterceptor { + return func(ctx context.Context, method string, req, resp interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + start := time.Now() + err := invoker(ctx, method, req, resp, cc, opts...) + metric.WithLabelValues(method, errorCode(err)).Observe(time.Since(start).Seconds()) + return err + } +} + +// StreamClientInstrumentInterceptor records duration of streaming gRPC requests client side. 
+func StreamClientInstrumentInterceptor(metric *prometheus.HistogramVec) grpc.StreamClientInterceptor {
+    return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string,
+        streamer grpc.Streamer, opts ...grpc.CallOption,
+    ) (grpc.ClientStream, error) {
+        start := time.Now()
+        stream, err := streamer(ctx, desc, cc, method, opts...)
+        return &instrumentedClientStream{
+            metric:       metric,
+            start:        start,
+            method:       method,
+            ClientStream: stream,
+        }, err
+    }
+}
+
+type instrumentedClientStream struct {
+    metric *prometheus.HistogramVec
+    start  time.Time
+    method string
+    grpc.ClientStream
+}
+
+func (s *instrumentedClientStream) SendMsg(m interface{}) error {
+    err := s.ClientStream.SendMsg(m)
+    if err == nil {
+        return nil
+    }
+
+    if err == io.EOF {
+        s.metric.WithLabelValues(s.method, errorCode(nil)).Observe(time.Since(s.start).Seconds())
+    } else {
+        s.metric.WithLabelValues(s.method, errorCode(err)).Observe(time.Since(s.start).Seconds())
+    }
+
+    return err
+}
+
+func (s *instrumentedClientStream) RecvMsg(m interface{}) error {
+    err := s.ClientStream.RecvMsg(m)
+    if err == nil {
+        return nil
+    }
+
+    if err == io.EOF {
+        s.metric.WithLabelValues(s.method, errorCode(nil)).Observe(time.Since(s.start).Seconds())
+    } else {
+        s.metric.WithLabelValues(s.method, errorCode(err)).Observe(time.Since(s.start).Seconds())
+    }
+
+    return err
+}
+
+func (s *instrumentedClientStream) Header() (metadata.MD, error) {
+    md, err := s.ClientStream.Header()
+    if err != nil {
+        s.metric.WithLabelValues(s.method, errorCode(err)).Observe(time.Since(s.start).Seconds())
+    }
+    return md, err
+}
+
+// errorCode converts an error into an error code string.
+func errorCode(err error) string {
+    if err == nil {
+        return "2xx"
+    }
+
+    if errResp, ok := httpgrpc.HTTPResponseFromError(err); ok {
+        statusFamily := int(errResp.Code / 100)
+        return strconv.Itoa(statusFamily) + "xx"
+    } else if grpcUtils.IsCanceled(err) {
+        return "cancel"
+    } else {
+        return "error"
+    }
+}
diff --git a/vendor/github.com/weaveworks/common/middleware/grpc_logging.go b/vendor/github.com/weaveworks/common/middleware/grpc_logging.go
index 5a3a393c8aa6c..59994413cd8c8 100644
--- a/vendor/github.com/weaveworks/common/middleware/grpc_logging.go
+++ b/vendor/github.com/weaveworks/common/middleware/grpc_logging.go
@@ -20,13 +20,18 @@ const (
 type GRPCServerLog struct {
     Log logging.Interface
     // WithRequest will log the entire request rather than just the error
-    WithRequest bool
+    WithRequest              bool
+    DisableRequestSuccessLog bool
 }
 
 // UnaryServerInterceptor returns an interceptor that logs gRPC requests
 func (s GRPCServerLog) UnaryServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
     begin := time.Now()
     resp, err := handler(ctx, req)
+    if err == nil && s.DisableRequestSuccessLog {
+        return resp, nil
+    }
+
     entry := user.LogWith(ctx, s.Log).WithFields(logging.Fields{"method": info.FullMethod, "duration": time.Since(begin)})
     if err != nil {
         if s.WithRequest {
@@ -47,6 +52,10 @@
 func (s GRPCServerLog) StreamServerInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
     begin := time.Now()
     err := handler(srv, ss)
+    if err == nil && s.DisableRequestSuccessLog {
+        return nil
+    }
+
     entry := user.LogWith(ss.Context(), s.Log).WithFields(logging.Fields{"method": info.FullMethod, "duration": time.Since(begin)})
     if err != nil {
         if grpcUtils.IsCanceled(err) {
diff --git a/vendor/github.com/weaveworks/common/middleware/http_tracing.go b/vendor/github.com/weaveworks/common/middleware/http_tracing.go
index ded22aaa517a5..7b55557677e2e 100644
--- a/vendor/github.com/weaveworks/common/middleware/http_tracing.go
+++ b/vendor/github.com/weaveworks/common/middleware/http_tracing.go
@@ -29,11 +29,19 @@ func (t Tracer) Wrap(next http.Handler) http.Handler {
             return fmt.Sprintf("HTTP %s - %s", r.Method, op)
         }),
-    }
-    if t.SourceIPs != nil {
-        options = append(options, nethttp.MWSpanObserver(func(sp opentracing.Span, r *http.Request) {
-            sp.SetTag("sourceIPs", t.SourceIPs.Get(r))
-        }))
+        nethttp.MWSpanObserver(func(sp opentracing.Span, r *http.Request) {
+            // add a tag with the client's user agent to the span
+            userAgent := r.Header.Get("User-Agent")
+            if userAgent != "" {
+                sp.SetTag("http.user_agent", userAgent)
+            }
+
+            // add a tag with the client's sourceIPs to the span, if a
+            // SourceIPExtractor is given.
+            if t.SourceIPs != nil {
+                sp.SetTag("sourceIPs", t.SourceIPs.Get(r))
+            }
+        }),
     }
 
     return nethttp.Middleware(opentracing.GlobalTracer(), next, options...)
diff --git a/vendor/github.com/weaveworks/common/middleware/logging.go b/vendor/github.com/weaveworks/common/middleware/logging.go
index 81e1e48a271de..015cc3b585ff7 100644
--- a/vendor/github.com/weaveworks/common/middleware/logging.go
+++ b/vendor/github.com/weaveworks/common/middleware/logging.go
@@ -14,9 +14,10 @@ import (
 // Log middleware logs http requests
 type Log struct {
-    Log               logging.Interface
-    LogRequestHeaders bool // LogRequestHeaders true -> dump http headers at debug log level
-    SourceIPs         *SourceIPExtractor
+    Log                   logging.Interface
+    LogRequestHeaders     bool // LogRequestHeaders true -> dump http headers at debug log level
+    LogRequestAtInfoLevel bool // LogRequestAtInfoLevel true -> log requests at info log level
+    SourceIPs             *SourceIPExtractor
 }
 
 // logWithRequest information from the request and context as fields.
@@ -42,11 +43,12 @@ func (l Log) Wrap(next http.Handler) http.Handler {
     return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
         begin := time.Now()
         uri := r.RequestURI // capture the URI before running next, as it may get rewritten
+        requestLog := l.logWithRequest(r)
         // Log headers before running 'next' in case other interceptors change the data.
         headers, err := dumpRequest(r)
         if err != nil {
             headers = nil
-            l.logWithRequest(r).Errorf("Could not dump request headers: %v", err)
+            requestLog.Errorf("Could not dump request headers: %v", err)
         }
         var buf bytes.Buffer
         wrapped := newBadResponseLoggingWriter(w, &buf)
@@ -56,20 +58,32 @@ func (l Log) Wrap(next http.Handler) http.Handler {
         if writeErr != nil {
             if errors.Is(writeErr, context.Canceled) {
-                l.logWithRequest(r).Debugf("%s %s %s, request cancelled: %s ws: %v; %s", r.Method, uri, time.Since(begin), writeErr, IsWSHandshakeRequest(r), headers)
+                if l.LogRequestAtInfoLevel {
+                    requestLog.Infof("%s %s %s, request cancelled: %s ws: %v; %s", r.Method, uri, time.Since(begin), writeErr, IsWSHandshakeRequest(r), headers)
+                } else {
+                    requestLog.Debugf("%s %s %s, request cancelled: %s ws: %v; %s", r.Method, uri, time.Since(begin), writeErr, IsWSHandshakeRequest(r), headers)
+                }
             } else {
-                l.logWithRequest(r).Warnf("%s %s %s, error: %s ws: %v; %s", r.Method, uri, time.Since(begin), writeErr, IsWSHandshakeRequest(r), headers)
+                requestLog.Warnf("%s %s %s, error: %s ws: %v; %s", r.Method, uri, time.Since(begin), writeErr, IsWSHandshakeRequest(r), headers)
             }
             return
         }
         if 100 <= statusCode && statusCode < 500 || statusCode == http.StatusBadGateway || statusCode == http.StatusServiceUnavailable {
-            l.logWithRequest(r).Debugf("%s %s (%d) %s", r.Method, uri, statusCode, time.Since(begin))
+            if l.LogRequestAtInfoLevel {
+                requestLog.Infof("%s %s (%d) %s", r.Method, uri, statusCode, time.Since(begin))
+            } else {
+                requestLog.Debugf("%s %s (%d) %s", r.Method, uri, statusCode, time.Since(begin))
+            }
             if l.LogRequestHeaders && headers != nil {
-                l.logWithRequest(r).Debugf("ws: %v; %s", IsWSHandshakeRequest(r), string(headers))
+                if l.LogRequestAtInfoLevel {
+                    requestLog.Infof("ws: %v; %s", IsWSHandshakeRequest(r), string(headers))
+                } else {
+                    requestLog.Debugf("ws: %v; %s", IsWSHandshakeRequest(r), string(headers))
+                }
             }
         } else {
-            l.logWithRequest(r).Warnf("%s %s (%d) %s Response: %q ws: %v; %s",
+            requestLog.Warnf("%s %s (%d) %s Response: %q ws: %v; %s",
                 r.Method, uri, statusCode, time.Since(begin), buf.Bytes(), IsWSHandshakeRequest(r), headers)
         }
     })
diff --git a/vendor/github.com/weaveworks/common/server/server.go b/vendor/github.com/weaveworks/common/server/server.go
index 58da4ae3acd7d..6f6840cc459eb 100644
--- a/vendor/github.com/weaveworks/common/server/server.go
+++ b/vendor/github.com/weaveworks/common/server/server.go
@@ -16,7 +16,7 @@ import (
     "github.com/opentracing/opentracing-go"
     "github.com/prometheus/client_golang/prometheus"
     "github.com/prometheus/client_golang/prometheus/promhttp"
-    node_https "github.com/prometheus/node_exporter/https"
+    "github.com/prometheus/exporter-toolkit/web"
     "golang.org/x/net/context"
     "golang.org/x/net/netutil"
     "google.golang.org/grpc"
@@ -62,11 +62,12 @@ type Config struct {
     GRPCListenPort int `yaml:"grpc_listen_port"`
     GRPCConnLimit  int `yaml:"grpc_listen_conn_limit"`
 
-    HTTPTLSConfig node_https.TLSStruct `yaml:"http_tls_config"`
-    GRPCTLSConfig node_https.TLSStruct `yaml:"grpc_tls_config"`
+    HTTPTLSConfig web.TLSStruct `yaml:"http_tls_config"`
+    GRPCTLSConfig web.TLSStruct `yaml:"grpc_tls_config"`
 
-    RegisterInstrumentation bool `yaml:"register_instrumentation"`
-    ExcludeRequestInLog     bool `yaml:"-"`
+    RegisterInstrumentation  bool `yaml:"register_instrumentation"`
+    ExcludeRequestInLog      bool `yaml:"-"`
+    DisableRequestSuccessLog bool `yaml:"-"`
 
     ServerGracefulShutdownTimeout time.Duration `yaml:"graceful_shutdown_timeout"`
     HTTPServerReadTimeout         time.Duration `yaml:"http_server_read_timeout"`
@@ -91,16 +92,21 @@ type Config struct {
     GRPCServerMinTimeBetweenPings      time.Duration `yaml:"grpc_server_min_time_between_pings"`
     GRPCServerPingWithoutStreamAllowed bool          `yaml:"grpc_server_ping_without_stream_allowed"`
 
-    LogFormat          logging.Format    `yaml:"log_format"`
-    LogLevel           logging.Level     `yaml:"log_level"`
-    Log                logging.Interface `yaml:"-"`
-    LogSourceIPs       bool              `yaml:"log_source_ips_enabled"`
-    LogSourceIPsHeader string            `yaml:"log_source_ips_header"`
-    LogSourceIPsRegex  string            `yaml:"log_source_ips_regex"`
+    LogFormat             logging.Format    `yaml:"log_format"`
+    LogLevel              logging.Level     `yaml:"log_level"`
+    Log                   logging.Interface `yaml:"-"`
+    LogSourceIPs          bool              `yaml:"log_source_ips_enabled"`
+    LogSourceIPsHeader    string            `yaml:"log_source_ips_header"`
+    LogSourceIPsRegex     string            `yaml:"log_source_ips_regex"`
+    LogRequestAtInfoLevel bool              `yaml:"log_request_at_info_level_enabled"`
 
     // If not set, default signal handler is used.
     SignalHandler SignalHandler `yaml:"-"`
 
+    // If not set, default Prometheus registry is used.
+    Registerer prometheus.Registerer `yaml:"-"`
+    Gatherer   prometheus.Gatherer   `yaml:"-"`
+
     PathPrefix string `yaml:"http_path_prefix"`
 }
@@ -145,6 +151,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
     f.BoolVar(&cfg.LogSourceIPs, "server.log-source-ips-enabled", false, "Optionally log the source IPs.")
     f.StringVar(&cfg.LogSourceIPsHeader, "server.log-source-ips-header", "", "Header field storing the source IPs. Only used if server.log-source-ips-enabled is true. If not set the default Forwarded, X-Real-IP and X-Forwarded-For headers are used")
     f.StringVar(&cfg.LogSourceIPsRegex, "server.log-source-ips-regex", "", "Regex for matching the source IPs. Only used if server.log-source-ips-enabled is true. If not set the default Forwarded, X-Real-IP and X-Forwarded-For headers are used")
+    f.BoolVar(&cfg.LogRequestAtInfoLevel, "server.log-request-at-info-level-enabled", false, "Optionally log requests at info level instead of debug level.")
 }
 
 // Server wraps a HTTP and gRPC server, and some common initialization.
@@ -160,6 +167,8 @@ type Server struct {
     HTTPServer *http.Server
     GRPC       *grpc.Server
     Log        logging.Interface
+    Registerer prometheus.Registerer
+    Gatherer   prometheus.Gatherer
 }
 
 // New makes a new Server
@@ -171,6 +180,13 @@ func New(cfg Config) (*Server, error) {
     }, []string{"protocol"})
     prometheus.MustRegister(tcpConnections)
 
+    tcpConnectionsLimit := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+        Namespace: cfg.MetricsNamespace,
+        Name:      "tcp_connections_limit",
+        Help:      "The max number of TCP connections that can be accepted (0 means no limit).",
+    }, []string{"protocol"})
+    prometheus.MustRegister(tcpConnectionsLimit)
+
     network := cfg.HTTPListenNetwork
     if network == "" {
         network = DefaultNetwork
@@ -182,6 +198,7 @@ func New(cfg Config) (*Server, error) {
     }
     httpListener = middleware.CountingListener(httpListener, tcpConnections.WithLabelValues("http"))
 
+    tcpConnectionsLimit.WithLabelValues("http").Set(float64(cfg.HTTPConnLimit))
     if cfg.HTTPConnLimit > 0 {
         httpListener = netutil.LimitListener(httpListener, cfg.HTTPConnLimit)
     }
@@ -196,6 +213,7 @@ func New(cfg Config) (*Server, error) {
     }
     grpcListener = middleware.CountingListener(grpcListener, tcpConnections.WithLabelValues("grpc"))
 
+    tcpConnectionsLimit.WithLabelValues("grpc").Set(float64(cfg.GRPCConnLimit))
     if cfg.GRPCConnLimit > 0 {
         grpcListener = netutil.LimitListener(grpcListener, cfg.GRPCConnLimit)
     }
@@ -207,19 +225,29 @@ func New(cfg Config) (*Server, error) {
         log = logging.NewLogrus(cfg.LogLevel)
     }
 
+    // If user doesn't supply a registerer/gatherer, use Prometheus' by default.
+    reg := cfg.Registerer
+    if reg == nil {
+        reg = prometheus.DefaultRegisterer
+    }
+    gatherer := cfg.Gatherer
+    if gatherer == nil {
+        gatherer = prometheus.DefaultGatherer
+    }
+
     // Setup TLS
     var httpTLSConfig *tls.Config
     if len(cfg.HTTPTLSConfig.TLSCertPath) > 0 && len(cfg.HTTPTLSConfig.TLSKeyPath) > 0 {
-        // Note: ConfigToTLSConfig from prometheus/node_exporter is awaiting security review.
-        httpTLSConfig, err = node_https.ConfigToTLSConfig(&cfg.HTTPTLSConfig)
+        // Note: ConfigToTLSConfig from prometheus/exporter-toolkit is awaiting security review.
+        httpTLSConfig, err = web.ConfigToTLSConfig(&cfg.HTTPTLSConfig)
         if err != nil {
             return nil, fmt.Errorf("error generating http tls config: %v", err)
         }
     }
     var grpcTLSConfig *tls.Config
     if len(cfg.GRPCTLSConfig.TLSCertPath) > 0 && len(cfg.GRPCTLSConfig.TLSKeyPath) > 0 {
-        // Note: ConfigToTLSConfig from prometheus/node_exporter is awaiting security review.
-        grpcTLSConfig, err = node_https.ConfigToTLSConfig(&cfg.GRPCTLSConfig)
+        // Note: ConfigToTLSConfig from prometheus/exporter-toolkit is awaiting security review.
+        grpcTLSConfig, err = web.ConfigToTLSConfig(&cfg.GRPCTLSConfig)
         if err != nil {
             return nil, fmt.Errorf("error generating grpc tls config: %v", err)
         }
@@ -232,7 +260,7 @@ func New(cfg Config) (*Server, error) {
         Help:    "Time (in seconds) spent serving HTTP requests.",
         Buckets: instrument.DefBuckets,
     }, []string{"method", "route", "status_code", "ws"})
-    prometheus.MustRegister(requestDuration)
+    reg.MustRegister(requestDuration)
 
     receivedMessageSize := prometheus.NewHistogramVec(prometheus.HistogramOpts{
         Namespace: cfg.MetricsNamespace,
@@ -240,7 +268,7 @@ func New(cfg Config) (*Server, error) {
         Help:    "Size (in bytes) of messages received in the request.",
         Buckets: middleware.BodySizeBuckets,
     }, []string{"method", "route"})
-    prometheus.MustRegister(receivedMessageSize)
+    reg.MustRegister(receivedMessageSize)
 
     sentMessageSize := prometheus.NewHistogramVec(prometheus.HistogramOpts{
         Namespace: cfg.MetricsNamespace,
@@ -248,21 +276,22 @@ func New(cfg Config) (*Server, error) {
         Help:    "Size (in bytes) of messages sent in response.",
         Buckets: middleware.BodySizeBuckets,
     }, []string{"method", "route"})
-    prometheus.MustRegister(sentMessageSize)
+    reg.MustRegister(sentMessageSize)
 
     inflightRequests := prometheus.NewGaugeVec(prometheus.GaugeOpts{
         Namespace: cfg.MetricsNamespace,
         Name:      "inflight_requests",
         Help:      "Current number of inflight requests.",
     }, []string{"method", "route"})
-    prometheus.MustRegister(inflightRequests)
+    reg.MustRegister(inflightRequests)
 
     log.WithField("http", httpListener.Addr()).WithField("grpc", grpcListener.Addr()).Infof("server listening on addresses")
 
     // Setup gRPC server
     serverLog := middleware.GRPCServerLog{
-        WithRequest: !cfg.ExcludeRequestInLog,
-        Log:         log,
+        Log:                      log,
+        WithRequest:              !cfg.ExcludeRequestInLog,
+        DisableRequestSuccessLog: cfg.DisableRequestSuccessLog,
     }
     grpcMiddleware := []grpc.UnaryServerInterceptor{
         serverLog.UnaryServerInterceptor,
@@ -325,7 +354,7 @@ func New(cfg Config) (*Server, error) {
         router = router.PathPrefix(cfg.PathPrefix).Subrouter()
     }
     if cfg.RegisterInstrumentation {
-        RegisterInstrumentation(router)
+        RegisterInstrumentationWithGatherer(router, gatherer)
     }
 
     var sourceIPs *middleware.SourceIPExtractor
@@ -342,8 +371,9 @@ func New(cfg Config) (*Server, error) {
             SourceIPs: sourceIPs,
         },
         middleware.Log{
-            Log:       log,
-            SourceIPs: sourceIPs,
+            Log:                   log,
+            SourceIPs:             sourceIPs,
+            LogRequestAtInfoLevel: cfg.LogRequestAtInfoLevel,
         },
         middleware.Instrument{
             RouteMatcher: router,
@@ -385,12 +415,19 @@ func New(cfg Config) (*Server, error) {
         HTTPServer: httpServer,
         GRPC:       grpcServer,
         Log:        log,
+        Registerer: reg,
+        Gatherer:   gatherer,
     }, nil
 }
 
 // RegisterInstrumentation on the given router.
 func RegisterInstrumentation(router *mux.Router) {
-    router.Handle("/metrics", promhttp.HandlerFor(prometheus.DefaultGatherer, promhttp.HandlerOpts{
+    RegisterInstrumentationWithGatherer(router, prometheus.DefaultGatherer)
+}
+
+// RegisterInstrumentationWithGatherer on the given router.
+func RegisterInstrumentationWithGatherer(router *mux.Router, gatherer prometheus.Gatherer) {
+    router.Handle("/metrics", promhttp.HandlerFor(gatherer, promhttp.HandlerOpts{
         EnableOpenMetrics: true,
     }))
     router.PathPrefix("/debug/pprof").Handler(http.DefaultServeMux)
diff --git a/vendor/modules.txt b/vendor/modules.txt
index ad12b9513922c..7bef3766d3219 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -528,7 +528,7 @@ github.com/gorilla/mux
 # github.com/gorilla/websocket v1.4.2
 ## explicit; go 1.12
 github.com/gorilla/websocket
-# github.com/grafana/dskit v0.0.0-20220331160727-49faf69f72ca
+# github.com/grafana/dskit v0.0.0-20220708141012-99f3d0043c23
 ## explicit; go 1.17
 github.com/grafana/dskit/backoff
 github.com/grafana/dskit/concurrency
@@ -544,7 +544,6 @@ github.com/grafana/dskit/kv/consul
 github.com/grafana/dskit/kv/etcd
 github.com/grafana/dskit/kv/memberlist
 github.com/grafana/dskit/limiter
-github.com/grafana/dskit/middleware
 github.com/grafana/dskit/modules
 github.com/grafana/dskit/multierror
 github.com/grafana/dskit/netutil
@@ -614,7 +613,7 @@ github.com/hashicorp/go-uuid
 ## explicit; go 1.12
 github.com/hashicorp/golang-lru
 github.com/hashicorp/golang-lru/simplelru
-# github.com/hashicorp/memberlist v0.3.0 => github.com/grafana/memberlist v0.3.1-0.20220425183535-6b97a09b7167
+# github.com/hashicorp/memberlist v0.3.0 => github.com/grafana/memberlist v0.3.1-0.20220708130638-bd88e10a3d91
 ## explicit; go 1.12
 github.com/hashicorp/memberlist
 # github.com/hashicorp/serf v0.9.6
@@ -869,9 +868,9 @@ github.com/prometheus/common/version
 # github.com/prometheus/common/sigv4 v0.1.0
 ## explicit; go 1.15
 github.com/prometheus/common/sigv4
-# github.com/prometheus/node_exporter v1.0.0-rc.0.0.20200428091818-01054558c289
-## explicit; go 1.13
-github.com/prometheus/node_exporter/https
+# github.com/prometheus/exporter-toolkit v0.7.1
+## explicit; go 1.14
+github.com/prometheus/exporter-toolkit/web
 # github.com/prometheus/procfs v0.7.3
 ## explicit; go 1.13
 github.com/prometheus/procfs
@@ -1036,8 +1035,8 @@ github.com/uber/jaeger-lib/metrics/prometheus
 # github.com/ugorji/go/codec v1.1.7
 ## explicit
 github.com/ugorji/go/codec
-# github.com/weaveworks/common v0.0.0-20211015155308-ebe5bdc2c89e
-## explicit; go 1.13
+# github.com/weaveworks/common v0.0.0-20220629114710-e3b70df0f08b
+## explicit; go 1.14
 github.com/weaveworks/common/aws
 github.com/weaveworks/common/errors
 github.com/weaveworks/common/grpc
@@ -1712,4 +1711,4 @@ sigs.k8s.io/yaml
 # github.com/gocql/gocql => github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85
 # github.com/bradfitz/gomemcache => github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab
 # github.com/cloudflare/cloudflare-go => github.com/cyriltovena/cloudflare-go v0.27.1-0.20211118103540-ff77400bcb93
-# github.com/hashicorp/memberlist => github.com/grafana/memberlist v0.3.1-0.20220425183535-6b97a09b7167
+# github.com/hashicorp/memberlist => github.com/grafana/memberlist v0.3.1-0.20220708130638-bd88e10a3d91
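
Note (not part of the patch): the vendored gokit adapter above now wraps fmt.Sprintf/fmt.Sprint in a fmt.Stringer so the formatting cost is only paid when a log line is actually emitted. A minimal sketch of that idea, assuming go-kit's level filter and logfmt logger; all names below are illustrative, not taken from this repository:

package main

import (
    "fmt"
    "os"

    "github.com/go-kit/log"
    "github.com/go-kit/log/level"
)

// lazyf mirrors the deferred-formatting trick: String() runs Sprintf only
// when the logger actually encodes the value.
type lazyf struct {
    format string
    args   []interface{}
}

func (l *lazyf) String() string { return fmt.Sprintf(l.format, l.args...) }

func main() {
    // Debug lines are dropped by the filter before the value is encoded,
    // so their Sprintf never runs; the warn line is rendered normally.
    logger := level.NewFilter(log.NewLogfmtLogger(os.Stderr), level.AllowWarn())
    level.Debug(logger).Log("msg", &lazyf{format: "expensive debug detail %d", args: []interface{}{42}})
    level.Warn(logger).Log("msg", &lazyf{format: "rendered at %s level", args: []interface{}{"warn"}})
}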
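
Also not part of the patch: a sketch of how a caller might wire the new client-side instrumentation interceptors (UnaryClientInstrumentInterceptor, StreamClientInstrumentInterceptor) into a gRPC connection. The histogram name, label names, and target address are assumptions for illustration; the interceptors fill the two labels positionally with the gRPC method and the status-code family computed by errorCode.

package main

import (
    "github.com/prometheus/client_golang/prometheus"
    "github.com/weaveworks/common/middleware"
    "google.golang.org/grpc"
)

func main() {
    // Duration histogram observed by both interceptors; it needs exactly
    // two labels (method, status code), filled in positionally.
    requestDuration := prometheus.NewHistogramVec(prometheus.HistogramOpts{
        Name:    "example_grpc_client_request_duration_seconds", // assumed name
        Help:    "Time spent doing gRPC client requests.",
        Buckets: prometheus.DefBuckets,
    }, []string{"operation", "status_code"})
    prometheus.MustRegister(requestDuration)

    conn, err := grpc.Dial("localhost:9095", // assumed target
        grpc.WithInsecure(),
        grpc.WithUnaryInterceptor(middleware.UnaryClientInstrumentInterceptor(requestDuration)),
        grpc.WithStreamInterceptor(middleware.StreamClientInstrumentInterceptor(requestDuration)),
    )
    if err != nil {
        panic(err)
    }
    defer conn.Close()
}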
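
Finally, a sketch (again not from this PR) of the new server.Config knobs this bump exposes: DisableRequestSuccessLog, LogRequestAtInfoLevel, and the optional Registerer/Gatherer pair that replaces the implicit use of the global Prometheus registry. The namespace and port values are made up; Run and Shutdown are the package's pre-existing lifecycle methods.

package main

import (
    "github.com/prometheus/client_golang/prometheus"
    "github.com/weaveworks/common/server"
)

func main() {
    // Private registry: server metrics are registered here and /metrics
    // serves it, instead of prometheus.DefaultRegisterer/DefaultGatherer.
    reg := prometheus.NewRegistry()

    cfg := server.Config{
        MetricsNamespace:         "example", // assumed
        HTTPListenPort:           8080,      // assumed
        RegisterInstrumentation:  true,      // expose /metrics from the gatherer below
        DisableRequestSuccessLog: true,      // skip per-request logs for successful gRPC calls
        LogRequestAtInfoLevel:    true,      // HTTP request logs at info instead of debug
        Registerer:               reg,
        Gatherer:                 reg,
    }

    srv, err := server.New(cfg)
    if err != nil {
        panic(err)
    }
    defer srv.Shutdown()

    if err := srv.Run(); err != nil {
        panic(err)
    }
}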