Skip to content

Commit

Permalink
feat(metrics): implement metrics for kubernetes and hazelcast data-count
Browse files Browse the repository at this point in the history
  • Loading branch information
Th3Shadowbroker committed Oct 28, 2024
1 parent 6bb51a1 commit 01d5622
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 1 deletion.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
repos:
- repo: https://github.com/fsfe/reuse-tool
rev: v1.0.0
rev: v4.0.3
hooks:
- id: reuse
stages:
Expand Down
30 changes: 30 additions & 0 deletions internal/k8s/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
package k8s

import (
"context"
"fmt"
"github.com/rs/zerolog/log"
"github.com/telekom/quasar/internal/config"
"github.com/telekom/quasar/internal/metrics"
"github.com/telekom/quasar/internal/mongo"
"github.com/telekom/quasar/internal/store"
"github.com/telekom/quasar/internal/utils"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -67,6 +69,8 @@ func NewResourceWatcher(
DeleteFunc: watcher.delete,
})

go watcher.collectMetrics(client, resourceConfig)

return &watcher, err
}

Expand Down Expand Up @@ -147,3 +151,29 @@ func (w *ResourceWatcher) Start() {
// Stop closes the watcher's stop channel (w.stopChan).
//
// NOTE(review): presumably this is what signals the informer started by Start
// to shut down — confirm against Start. Closing an already-closed channel
// panics in Go, so Stop must not be called more than once per watcher.
func (w *ResourceWatcher) Stop() {
close(w.stopChan)
}

// collectMetrics periodically counts the Kubernetes resources described by
// resourceConfig and publishes the count as the gauge
// "<cache name>_kubernetes_count".
//
// It is meant to run in its own goroutine. The first collection happens
// immediately; afterwards the resources are listed again every 15 seconds.
// The loop ends when the watcher's stop channel is closed (see Stop), so the
// goroutine does not outlive the watcher. A panic raised while collecting is
// recovered and logged, which also ends the collection for this watcher.
func (w *ResourceWatcher) collectMetrics(client dynamic.Interface, resourceConfig *config.ResourceConfiguration) {
	// recover only has an effect when called directly by a deferred function.
	defer func() {
		if r := recover(); r != nil {
			log.Error().Msgf("Recovered from %v during kubernetes metric collection", r)
		}
	}()

	const interval = 15 * time.Second

	cacheName := resourceConfig.GetCacheName()
	gaugeName := cacheName + "_kubernetes_count"

	for {
		list, err := client.Resource(resourceConfig.GetGroupVersionResource()).
			Namespace(resourceConfig.Kubernetes.Namespace).
			List(context.Background(), v1.ListOptions{})

		if err != nil {
			log.Error().Err(err).Fields(map[string]any{
				"resource": cacheName,
			}).Msg("Could not retrieve resource count")
		} else {
			metrics.GetOrCreateCustom(gaugeName).WithLabelValues().Set(float64(len(list.Items)))
		}

		// Wait for the next collection cycle, but return as soon as the
		// watcher is stopped. A nil stop channel simply never fires.
		select {
		case <-w.stopChan:
			return
		case <-time.After(interval):
		}
	}
}
23 changes: 23 additions & 0 deletions internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,26 @@ func GetOrCreate(resourceConfig *config.ResourceConfiguration) *prometheus.Gauge

return gauge
}

// GetOrCreateCustom returns the label-less gauge vector cached under the
// lower-cased form of name, creating it and registering it with the package
// registry on first use.
//
// The cache is both read and written with the lower-cased name, so repeated
// calls with the same name (in any casing) yield the same gauge instead of
// creating a new, unregistered one on every call.
//
// If registration fails the error is logged and the (unregistered) gauge is
// still cached and returned, so callers never receive nil.
//
// NOTE(review): the gauges map is accessed here without any synchronization
// visible in this function; if it is called from several goroutines the map
// needs to be guarded — confirm whether the package does so elsewhere.
func GetOrCreateCustom(name string) *prometheus.GaugeVec {
	gaugeName := strings.ToLower(name)

	if gauge, ok := gauges[gaugeName]; ok {
		return gauge
	}

	gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: namespace,
		// Use the name verbatim: passing it through Sprintf as a format
		// string would mangle names containing '%'.
		Name: gaugeName,
	}, []string{})

	gauges[gaugeName] = gauge
	if err := registry.Register(gauge); err != nil {
		log.Error().Err(err).
			Fields(map[string]any{
				"name": fmt.Sprintf("%s_%s", namespace, gaugeName),
			}).
			Msg("Could not create metric")
	}

	return gauge
}
33 changes: 33 additions & 0 deletions internal/store/hazelcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ import (
"github.com/hazelcast/hazelcast-go-client/serialization"
"github.com/rs/zerolog/log"
"github.com/telekom/quasar/internal/config"
"github.com/telekom/quasar/internal/metrics"
"github.com/telekom/quasar/internal/mongo"
"github.com/telekom/quasar/internal/utils"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"time"
)

type HazelcastStore struct {
Expand Down Expand Up @@ -63,6 +65,8 @@ func (s *HazelcastStore) InitializeResource(resourceConfig *config.ResourceConfi
}).Err(err).Msg("Could not create hazelcast index")
}
}

go s.collectMetrics(resourceConfig.GetCacheName())
}

func (s *HazelcastStore) OnAdd(obj *unstructured.Unstructured) {
Expand Down Expand Up @@ -133,3 +137,32 @@ func (s *HazelcastStore) getMap(obj *unstructured.Unstructured) *hazelcast.Map {

return cacheMap
}

// collectMetrics periodically reads the size of the Hazelcast map named
// resourceName and publishes it as the gauge "<resourceName>_hazelcast_count".
//
// It is meant to run in its own goroutine. The first collection happens
// immediately and is repeated every 15 seconds, whether or not the previous
// attempt succeeded. A panic raised while collecting is recovered and logged,
// which ends the collection for this resource.
//
// NOTE(review): nothing visible here stops this loop; it runs for the
// lifetime of the process unless it panics.
func (s *HazelcastStore) collectMetrics(resourceName string) {
	// recover only has an effect when called directly by a deferred function.
	defer func() {
		if r := recover(); r != nil {
			log.Error().Msgf("Recovered from %v during hazelcast metric collection", r)
		}
	}()

	const interval = 15 * time.Second

	gaugeName := resourceName + "_hazelcast_count"

	// The post statement runs after every iteration, including those ended
	// by continue, so each attempt is followed by exactly one pause.
	for ; ; time.Sleep(interval) {
		hzMap, err := s.client.GetMap(context.Background(), resourceName)
		if err != nil {
			// hzMap must not be used when GetMap failed; log the requested
			// name instead and retry on the next cycle.
			log.Error().Err(err).Fields(map[string]any{
				"map": resourceName,
			}).Msg("Could not collect data")
			continue
		}

		size, err := hzMap.Size(context.Background())
		if err != nil {
			log.Error().Err(err).Fields(map[string]any{
				"map": resourceName,
			}).Msg("Could not retrieve size")
			continue
		}

		metrics.GetOrCreateCustom(gaugeName).WithLabelValues().Set(float64(size))
	}
}

0 comments on commit 01d5622

Please sign in to comment.