Skip to content

Karmada client integration k8s #215

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
34 changes: 34 additions & 0 deletions cmd/k8s-dashboard-api/argswrappper/argswrapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package argswrapper

import (
"github.com/spf13/pflag"
"k8s.io/dashboard/api/pkg/helpers"
)

var (
argKubeConfigFile = pflag.String("kubeconfig", "", "path to kubeconfig file with control plane location information")
argNamespace = pflag.String("namespace", helpers.GetEnv("POD_NAMESPACE", "kubernetes-dashboard"), "Namespace to use when accessing Dashboard specific resources, i.e. metrics scraper service")
argMetricsScraperServiceName = pflag.String("metrics-scraper-service-name", "kubernetes-dashboard-metrics-scraper", "name of the dashboard metrics scraper service")

// Karmada specific flags
argKarmadaKubeConfigFile = pflag.String("karmada-kubeconfig", "", "path to karmada kubeconfig file with karmada control plane location information")
argKarmadaContext = pflag.String("karmada-context", "", "name of the karmada-kubeconfig context to use")
argKarmadaApiserverSkipTLSVerify = pflag.Bool("karmada-apiserver-skip-tls-verify", false, "enable if connection with remote Karmada API server should skip TLS verify")
)

func init() {
// Register flags on program start
pflag.Parse()
}

func KarmadaKubeConfigPath() string {
return *argKarmadaKubeConfigFile
}

func KarmadaContext() string {
return *argKarmadaContext
}

func KarmadaApiserverSkipTLSVerify() bool {
return *argKarmadaApiserverSkipTLSVerify
}
15 changes: 15 additions & 0 deletions cmd/k8s-dashboard-api/clientwrapper/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Copyright 2017 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

.tmp/
52 changes: 52 additions & 0 deletions cmd/k8s-dashboard-api/clientwrapper/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
ROOT_DIRECTORY = $(shell dirname $(realpath $(firstword $(MAKEFILE_LIST))))/../../..

# Global makefile partial config
include $(ROOT_DIRECTORY)/hack/include/config.mk

MODULE_NAME := "common.client"
COVERAGE_FILE := $(TMP_DIRECTORY)/$(MODULE_NAME).coverage.out

# ==================== GLOBAL ==================== #

.PHONY: build
build:
@echo "[$(MODULE_NAME)] Building"
@CGO_ENABLED=0 go build -trimpath -ldflags "-s -w" ./...

.PHONY: check
check:
@echo "[$(MODULE_NAME)] Running lint"
@golangci-lint run -c $(GOLANGCI_LINT_CONFIG) ./...

.PHONY: coverage
coverage: DIR := $(TMP_DIRECTORY)
coverage: --ensure-dir
@echo "[$(MODULE_NAME)] Running tests with coverage"
@go test -coverprofile=$(COVERAGE_FILE) -covermode=atomic ./...

# Mocked target to allow global clean target to work
.PHONY: clean
clean:
@:

.PHONY: fix
fix:
@echo "[$(MODULE_NAME)] Running lint --fix"
@golangci-lint run -c $(GOLANGCI_LINT_CONFIG) --fix ./...

.PHONY: test
test:
@echo "[$(MODULE_NAME)] Running tests"
@go test ./...


# ==================== PRIVATE ==================== #

.PHONY: --ensure-dir
--ensure-dir:
@if [ -z "$(DIR)" ]; then \
echo "DIR variable not set" ; \
exit 1 ; \
fi ; \
mkdir -p $(DIR) ; \

60 changes: 60 additions & 0 deletions cmd/k8s-dashboard-api/clientwrapper/args/args.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2017 The Kubernetes Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package args

import (
"time"

"github.com/spf13/pflag"
)

var (
argCacheEnabled = pflag.Bool("cache-enabled", true, "whether client cache should be enabled or not")
argClusterContextEnabled = pflag.Bool("cluster-context-enabled", false, "whether multi-cluster cache context support should be enabled or not")
argTokenExchangeEndpoint = pflag.String("token-exchange-endpoint", "", "endpoint used in multi-cluster cache to exchange tokens for context identifiers")
argCacheSize = pflag.Int("cache-size", 1000, "max number of cache entries")
argCacheTTL = pflag.Duration("cache-ttl", 10*time.Minute, "cache entry TTL")
argCacheRefreshDebounce = pflag.Duration("cache-refresh-debounce", 5*time.Second, "minimal time between cache refreshes in the background")
)

func Ensure() {
if *argClusterContextEnabled && len(*argTokenExchangeEndpoint) == 0 {
panic("token-exchange-endpoint must be set when cluster-context-enabled is set to true")
}
}

func CacheEnabled() bool {
return *argCacheEnabled
}

func ClusterContextEnabled() bool {
return *argClusterContextEnabled
}

func TokenExchangeEndpoint() string {
return *argTokenExchangeEndpoint
}

func CacheSize() int {
return *argCacheSize
}

func CacheTTL() time.Duration {
return *argCacheTTL
}

func CacheRefreshDebounce() time.Duration {
return *argCacheRefreshDebounce
}
53 changes: 53 additions & 0 deletions cmd/k8s-dashboard-api/clientwrapper/auth.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2017 The Kubernetes Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package client

import (
"net/http"
"strings"

"k8s.io/klog/v2"
)

const (
// authorizationHeader is the default authorization header name.
authorizationHeader = "Authorization"
// authorizationTokenPrefix is the default bearer token prefix.
authorizationTokenPrefix = "Bearer "
)

func HasAuthorizationHeader(req *http.Request) bool {
header := req.Header.Get(authorizationHeader)
if len(header) == 0 {
return false
}

token := extractBearerToken(header)
klog.V(5).InfoS("Bearer token", "size", len(token))
return strings.HasPrefix(header, authorizationTokenPrefix) && len(token) > 0
}

func GetBearerToken(req *http.Request) string {
header := req.Header.Get(authorizationHeader)
return extractBearerToken(header)
}

func SetAuthorizationHeader(req *http.Request, token string) {
req.Header.Set(authorizationHeader, authorizationTokenPrefix+token)
}

func extractBearerToken(header string) string {
return strings.TrimPrefix(header, authorizationTokenPrefix)
}
153 changes: 153 additions & 0 deletions cmd/k8s-dashboard-api/clientwrapper/cache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
// Copyright 2017 The Kubernetes Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cache

import (
"sync"
"time"

"github.com/Yiling-J/theine-go"
"k8s.io/klog/v2"

"k8s.io/dashboard/client/args"
)

var (
// cache is a global resource cache that maps our custom keys
// created based on information provided by the specific resource client.
// It stores resource lists to speed up the client response times.
cache *theine.Cache[string, any]

// cacheLocks is used as a set that holds information about
// cache keys that are currently fetching the latest data from the
// Kubernetes API server in the background.
// It allows us to avoid multiple concurrent update calls being sent
// to the Kubernetes API.
// Once the lock is removed, the next update call can be initiated.
cacheLocks sync.Map

// syncedLoadLocks is used to synchronize the initial cache hydration phase
// and avoid putting extra pressure on the API server.
// It maps cache keys to mutexes.
syncedLoadLocks sync.Map
)

func init() {
var err error
if cache, err = theine.NewBuilder[string, any](int64(args.CacheSize())).Build(); err != nil {
panic(err)
}
}

// Get gives access to cache entries.
// It requires a Key to be provided which is used to calculate cache key SHA.
func Get[T any](key Key) (*T, bool, error) {
typedValue := new(T)

cacheKey, err := key.SHA()
if err != nil {
return typedValue, false, err
}

value, exists := cache.Get(cacheKey)
if exists {
typedValue = value.(*T)
}

return typedValue, exists, nil
}

// Set allows updating cache with specific values.
// It requires a Key to be provided which is used to calculate cache key SHA.
func Set[T any](key Key, value T) error {
cacheKey, err := key.SHA()
if err != nil {
return err
}

set(cacheKey, value)
return nil
}

func set[T any](key string, value T) bool {
return cache.SetWithTTL(key, value, 1, args.CacheTTL())
}

// DeferredLoad updates cache in the background with the data fetched using the loadFunc.
func DeferredLoad[T any](key Key, loadFunc func() (T, error)) {
go func() {
cacheKey, err := key.SHA()
if err != nil {
klog.ErrorS(err, "failed loading cache key", "key", cacheKey)
return
}

if _, locked := cacheLocks.LoadOrStore(cacheKey, struct{}{}); locked {
klog.V(4).InfoS("cache is already being updated, skipping", "key", cacheKey)
return
}

defer time.AfterFunc(args.CacheRefreshDebounce(), func() {
cacheLocks.Delete(cacheKey)
klog.V(4).InfoS("released cache update lock", "key", cacheKey)
})

cacheValue, err := loadFunc()
if err != nil {
klog.ErrorS(err, "failed loading cache data", "key", cacheKey)
return
}

set(cacheKey, cacheValue)
klog.V(4).InfoS("cache updated successfully", "key", cacheKey)
}()
}

// SyncedLoad initializes the cache using the [loadFunc]. It ensures that there will be no concurrent
// calls to the [loadFunc]. The first call will call the [loadFunc] and initialize the cache while
// concurrent calls will be waiting for the first call to finish. Once cache is updated and lock is freed
// other routines will return the value from cache without making any extra calls to the [loadFunc].
func SyncedLoad[T any](key Key, loadFunc func() (*T, error)) (*T, error) {
cacheKey, err := key.SHA()
if err != nil {
klog.ErrorS(err, "failed loading cache key", "key", cacheKey)
return new(T), err
}

l, _ := syncedLoadLocks.LoadOrStore(cacheKey, &sync.Mutex{})
lock := l.(*sync.Mutex)
lock.Lock()

defer func() {
lock.Unlock()
syncedLoadLocks.Delete(cacheKey)
}()

if value, exists := cache.Get(cacheKey); exists {
klog.V(4).InfoS("synced from the cache", "key", cacheKey)
return value.(*T), nil
}

cacheValue, err := loadFunc()
if err != nil {
klog.ErrorS(err, "failed loading cache data", "key", cacheKey)
return new(T), err
}

set(cacheKey, cacheValue)
klog.V(4).InfoS("cache initialized successfully", "key", cacheKey)

return cacheValue, nil
}
Loading
Loading