Skip to content

Commit

Permalink
Merge pull request kosmos-io#814 from JimDevil/feat-proxy
Browse files Browse the repository at this point in the history
feat: clusterlink-proxy aggregator apiserver provider resource cache and proxy services
  • Loading branch information
duanmengkk authored Feb 13, 2025
2 parents be465bf + 567829e commit 642c918
Show file tree
Hide file tree
Showing 53 changed files with 3,171 additions and 527 deletions.
57 changes: 35 additions & 22 deletions cmd/clusterlink/proxy/app/clusterlink-proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,26 @@ import (
"fmt"

"github.com/spf13/cobra"
genericapiserver "k8s.io/apiserver/pkg/server"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/term"
"k8s.io/klog/v2"

"github.com/kosmos.io/kosmos/cmd/clusterlink/proxy/app/options"
"github.com/kosmos.io/kosmos/pkg/sharedcli/klogflag"
profileflag "github.com/kosmos.io/kosmos/pkg/sharedcli/profileflag"
"github.com/kosmos.io/kosmos/pkg/utils"
)

// NewClusterLinkProxyCommand creates a *cobra.Command object with default parameters
func NewClusterLinkProxyCommand(ctx context.Context) *cobra.Command {
opts := options.NewOptions()

cmd := &cobra.Command{
Use: "proxy",
Long: `The proxy starts a apiserver for agent access the backend proxy`,
RunE: func(cmd *cobra.Command, args []string) error {
// validate options
/*
if errs := opts.Validate(); len(errs) != 0 {
return errs.ToAggregate()
}
*/
Use: utils.KosmosClusrerLinkRroxyComponentName,
Long: `starts a server for agent kube-apiserver`,
RunE: func(_ *cobra.Command, _ []string) error {
if err := opts.Validate(); err != nil {
return err
}
return run(ctx, opts)
},
Args: func(cmd *cobra.Command, args []string) error {
Expand All @@ -37,24 +36,26 @@ func NewClusterLinkProxyCommand(ctx context.Context) *cobra.Command {
return nil
},
}
namedFlagSets := opts.Flags()

fs := cmd.Flags()
for _, f := range namedFlagSets.FlagSets {
fs.AddFlagSet(f)
}
flags := cmd.Flags()

cols, _, err := term.TerminalSize(cmd.OutOrStdout())
if err != nil {
klog.Warning("term.TerminalSize err: %v", err)
} else {
cliflag.SetUsageAndHelpFunc(cmd, namedFlagSets, cols)
}
fss := cliflag.NamedFlagSets{}
genericFlagSet := fss.FlagSet("generic")
opts.AddFlags(genericFlagSet)

logsFlagSet := fss.FlagSet("logs")
klogflag.Add(logsFlagSet)

flags.AddFlagSet(genericFlagSet)
flags.AddFlagSet(logsFlagSet)

return cmd
}

func run(ctx context.Context, opts *options.Options) error {
// pprof
profileflag.ListenAndServe(opts.ProfileOpts)

config, err := opts.Config()
if err != nil {
return err
Expand All @@ -65,5 +66,17 @@ func run(ctx context.Context, opts *options.Options) error {
return err
}

server.GenericAPIServer.AddPostStartHookOrDie("start-proxy-controller", func(context genericapiserver.PostStartHookContext) error {
go func() {
config.ExtraConfig.ProxyController.Run(context.StopCh, 1)
}()
return nil
})

server.GenericAPIServer.AddPostStartHookOrDie("start-apiserver-informer", func(context genericapiserver.PostStartHookContext) error {
config.ExtraConfig.KosmosInformerFactory.Start(context.StopCh)
return nil
})

return server.GenericAPIServer.PrepareRun().Run(ctx.Done())
}
206 changes: 98 additions & 108 deletions cmd/clusterlink/proxy/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,118 +2,81 @@ package options

import (
"fmt"
"log"
"net"
"net/http"
"strings"

"github.com/spf13/pflag"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apiserver/pkg/admission/plugin/namespace/lifecycle"
"k8s.io/apiserver/pkg/endpoints/openapi"
genericrequest "k8s.io/apiserver/pkg/endpoints/request"
genericapiserver "k8s.io/apiserver/pkg/server"
genericserver "k8s.io/apiserver/pkg/server"
genericoptions "k8s.io/apiserver/pkg/server/options"
"k8s.io/apiserver/pkg/util/feature"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/featuregate"
"k8s.io/component-base/logs"
logsapi "k8s.io/component-base/logs/api/v1"
"k8s.io/client-go/dynamic"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"

"github.com/kosmos.io/kosmos/pkg/apis/proxy/scheme"
proxyScheme "github.com/kosmos.io/kosmos/pkg/apis/proxy/scheme"
"github.com/kosmos.io/kosmos/pkg/clusterlink/proxy"
proxyctl "github.com/kosmos.io/kosmos/pkg/clusterlink/proxy/controller"
kosmosclientset "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned"
informerfactory "github.com/kosmos.io/kosmos/pkg/generated/informers/externalversions"
generatedopenapi "github.com/kosmos.io/kosmos/pkg/generated/openapi"
profileflag "github.com/kosmos.io/kosmos/pkg/sharedcli/profileflag"
"github.com/kosmos.io/kosmos/pkg/utils"
)

// Options contains command line parameters for clusterlink-proxy
type Options struct {
MaxRequestsInFlight int
MaxMutatingRequestsInFlight int
utils.KubernetesOptions

Logs *logs.Options
SecureServing *genericoptions.SecureServingOptionsWithLoopback
Authentication *genericoptions.DelegatingAuthenticationOptions
Authorization *genericoptions.DelegatingAuthorizationOptions
Audit *genericoptions.AuditOptions
Features *genericoptions.FeatureOptions
CoreAPI *genericoptions.CoreAPIOptions
FeatureGate featuregate.FeatureGate
Admission *genericoptions.AdmissionOptions
// RecommendedOptions *genericoptions.RecommendedOptions
GenericServerRunOptions *genericoptions.ServerRunOptions
SecureServing *genericoptions.SecureServingOptionsWithLoopback
Authentication *genericoptions.DelegatingAuthenticationOptions
Authorization *genericoptions.DelegatingAuthorizationOptions
Audit *genericoptions.AuditOptions
Features *genericoptions.FeatureOptions
CoreAPI *genericoptions.CoreAPIOptions
ServerRunOptions *genericoptions.ServerRunOptions

ProfileOpts profileflag.Options
}

// nolint
func NewOptions() *Options {
sso := genericoptions.NewSecureServingOptions()

// We are composing recommended options for an aggregated api-server,
// whose client is typically a proxy multiplexing many operations ---
// notably including long-running ones --- into one HTTP/2 connection
// into this server. So allow many concurrent operations.
sso.HTTP2MaxStreamsPerConnection = 1000

return &Options{
MaxRequestsInFlight: 0,
MaxMutatingRequestsInFlight: 0,

Logs: logs.NewOptions(),
SecureServing: sso.WithLoopback(),
Authentication: genericoptions.NewDelegatingAuthenticationOptions(),
Authorization: genericoptions.NewDelegatingAuthorizationOptions(),
Audit: genericoptions.NewAuditOptions(),
Features: genericoptions.NewFeatureOptions(),
CoreAPI: genericoptions.NewCoreAPIOptions(),
FeatureGate: feature.DefaultFeatureGate,
Admission: genericoptions.NewAdmissionOptions(),
}
func (o *Options) AddFlags(flags *pflag.FlagSet) {
o.SecureServing.AddFlags(flags)
o.Authentication.AddFlags(flags)
o.Authorization.AddFlags(flags)
o.Audit.AddFlags(flags)
o.Features.AddFlags(flags)
o.CoreAPI.AddFlags(flags)
o.ServerRunOptions.AddUniversalFlags(flags)
o.ProfileOpts.AddFlags(flags)
}

// nolint
func (o *Options) Validate() error {
errors := []error{}
errors = append(errors, o.validateGenericOptions()...)
return utilerrors.NewAggregate(errors)
}

func (o *Options) validateGenericOptions() []error {
errors := []error{}
if o.MaxRequestsInFlight < 0 {
errors = append(errors, fmt.Errorf("--max-requests-inflight can not be negative value"))
}
if o.MaxMutatingRequestsInFlight < 0 {
errors = append(errors, fmt.Errorf("--max-mutating-requests-inflight can not be negative value"))
func NewOptions() *Options {
o := &Options{
GenericServerRunOptions: genericoptions.NewServerRunOptions(),
SecureServing: genericoptions.NewSecureServingOptions().WithLoopback(),
Authentication: genericoptions.NewDelegatingAuthenticationOptions(),
Authorization: genericoptions.NewDelegatingAuthorizationOptions(),
Audit: genericoptions.NewAuditOptions(),
Features: genericoptions.NewFeatureOptions(),
CoreAPI: genericoptions.NewCoreAPIOptions(),
ServerRunOptions: genericoptions.NewServerRunOptions(),
}

errors = append(errors, o.CoreAPI.Validate()...)
errors = append(errors, o.SecureServing.Validate()...)
errors = append(errors, o.Authentication.Validate()...)
errors = append(errors, o.Authorization.Validate()...)
errors = append(errors, o.Audit.Validate()...)
errors = append(errors, o.Features.Validate()...)
return errors
return o
}

// nolint
func (o *Options) Flags() cliflag.NamedFlagSets {
var fss cliflag.NamedFlagSets

genericfs := fss.FlagSet("generic")
genericfs.IntVar(&o.MaxRequestsInFlight, "max-requests-inflight", o.MaxRequestsInFlight, ""+
"Otherwise, this flag limits the maximum number of non-mutating requests in flight, or a zero value disables the limit completely.")
genericfs.IntVar(&o.MaxMutatingRequestsInFlight, "max-mutating-requests-inflight", o.MaxMutatingRequestsInFlight, ""+
"this flag limits the maximum number of mutating requests in flight, or a zero value disables the limit completely.")

globalcfs := fss.FlagSet("global")
globalcfs.Float32Var(&o.KubernetesOptions.QPS, "kube-qps", utils.DefaultKubeQPS, "QPS to use while talking with kube-apiserver.")
globalcfs.IntVar(&o.KubernetesOptions.Burst, "kube-burst", utils.DefaultKubeBurst, "Burst to use while talking with kube-apiserver.")
o.CoreAPI.AddFlags(globalcfs)
o.SecureServing.AddFlags(fss.FlagSet("secure serving"))
o.Authentication.AddFlags(fss.FlagSet("authentication"))
o.Authorization.AddFlags(fss.FlagSet("authorization"))
o.Audit.AddFlags(fss.FlagSet("auditing"))
o.Features.AddFlags(fss.FlagSet("features"))
logsapi.AddFlags(o.Logs, fss.FlagSet("logs"))

// o.Admission.AddFlags(fss.FlagSet("admission"))
// o.Traces.AddFlags(fss.FlagSet("traces"))

return fss
func (o *Options) Validate() error {
errs := []error{}
errs = append(errs, o.SecureServing.Validate()...)
errs = append(errs, o.Authentication.Validate()...)
errs = append(errs, o.Authorization.Validate()...)
errs = append(errs, o.Audit.Validate()...)
errs = append(errs, o.Features.Validate()...)
return utilerrors.NewAggregate(errs)
}

// nolint
Expand All @@ -126,52 +89,79 @@ func (o *Options) Config() (*proxy.Config, error) {
return nil, fmt.Errorf("error create self-signed certificates: %v", err)
}

// remove NamespaceLifecycle admission plugin explicitly
// current admission plugins: mutatingwebhook, validatingwebhook
o.Admission.DisablePlugins = append(o.Admission.DisablePlugins, lifecycle.PluginName)
// o.Admission.DisablePlugins = append(o.RecommendedOptions.Admission.DisablePlugins, lifecycle.PluginName)

genericConfig := genericapiserver.NewRecommendedConfig(proxy.Codecs)
// genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(openapi.GetOpenAPIDefinitions, openapi.NewDefinitionNamer(apiserver.Scheme))
// genericConfig.OpenAPIConfig.Info.Title = openAPITitle
// genericConfig.OpenAPIConfig.Info.Version= openAPIVersion
genericConfig := genericserver.NewRecommendedConfig(proxyScheme.Codecs)
genericConfig.OpenAPIConfig = genericserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, openapi.NewDefinitionNamer(scheme.Scheme))
genericConfig.OpenAPIConfig.Info.Title = utils.KosmosClusrerLinkRroxyComponentName
genericConfig.OpenAPIConfig.Info.Version = utils.ClusterLinkOpenAPIVersion

// support watch to LongRunningFunc
genericConfig.LongRunningFunc = func(r *http.Request, requestInfo *genericrequest.RequestInfo) bool {
return strings.Contains(r.RequestURI, "watch")
}

if err := o.genericOptionsApplyTo(genericConfig); err != nil {
if err := o.ApplyTo(genericConfig); err != nil {
return nil, err
}

restMapper, err := apiutil.NewDynamicRESTMapper(genericConfig.ClientConfig)
if err != nil {
klog.Errorf("Failed to create REST mapper: %v", err)
return nil, err
}
kosmosClient := kosmosclientset.NewForConfigOrDie(genericConfig.ClientConfig)
kosmosInformerFactory := informerfactory.NewSharedInformerFactory(kosmosClient, 0)

dynamicClient, err := dynamic.NewForConfig(genericConfig.ClientConfig)
if err != nil {
log.Fatal(err)
}

proxyCtl, err := proxyctl.NewResourceCacheController(proxyctl.NewControllerOption{
RestConfig: genericConfig.ClientConfig,
RestMapper: restMapper,
KosmosFactory: kosmosInformerFactory,
DynamicClient: dynamicClient,
})
if err != nil {
return nil, err
}

return &proxy.Config{
GenericConfig: genericConfig,
ExtraConfig: proxy.ExtraConfig{
ProxyController: proxyCtl,
KosmosInformerFactory: kosmosInformerFactory,
},
}, nil
}

func (o *Options) genericOptionsApplyTo(config *genericapiserver.RecommendedConfig) error {
config.MaxRequestsInFlight = o.MaxRequestsInFlight
config.MaxMutatingRequestsInFlight = o.MaxMutatingRequestsInFlight

if err := o.SecureServing.ApplyTo(&config.SecureServing, &config.LoopbackClientConfig); err != nil {
func (o *Options) ApplyTo(config *genericserver.RecommendedConfig) error {
if err := o.SecureServing.ApplyTo(&config.Config.SecureServing, &config.Config.LoopbackClientConfig); err != nil {
return err
}
if err := o.Authentication.ApplyTo(&config.Authentication, config.SecureServing, config.OpenAPIConfig); err != nil {
if err := o.Authentication.ApplyTo(&config.Config.Authentication, config.SecureServing, config.OpenAPIConfig); err != nil {
return err
}
if err := o.Authorization.ApplyTo(&config.Authorization); err != nil {
if err := o.Authorization.ApplyTo(&config.Config.Authorization); err != nil {
return err
}
if err := o.Audit.ApplyTo(&config.Config); err != nil {
return err
}

if err := o.Features.ApplyTo(&config.Config); err != nil {
return err
}
if err := o.CoreAPI.ApplyTo(config); err != nil {
return err
}

utils.SetQPSBurst(config.ClientConfig, o.KubernetesOptions)
return o.Admission.ApplyTo(&config.Config, config.SharedInformerFactory, config.ClientConfig, o.FeatureGate)
if err := o.ServerRunOptions.ApplyTo(&config.Config); err != nil {
return err
}
if err := o.Features.ApplyTo(&config.Config); err != nil {
return err
}
return nil
}
Loading

0 comments on commit 642c918

Please sign in to comment.