Skip to content

Commit

Permalink
Initial gateway config manager implementation and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
kralicky committed Nov 21, 2023
1 parent bc973a3 commit c24ba68
Show file tree
Hide file tree
Showing 25 changed files with 3,568 additions and 1,083 deletions.
152 changes: 152 additions & 0 deletions pkg/config/adapt/conversion.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package adapt

import (
configv1 "github.com/rancher/opni/pkg/config/v1"
"github.com/rancher/opni/pkg/config/v1beta1"
"github.com/samber/lo"
)

func V1GatewayConfigOf[T *v1beta1.GatewayConfig | *configv1.GatewayConfigSpec](in T) *configv1.GatewayConfigSpec {
switch in := any(in).(type) {
case *configv1.GatewayConfigSpec:
return in
case *v1beta1.GatewayConfig:
return &configv1.GatewayConfigSpec{
Server: &configv1.ServerSpec{
HttpListenAddress: &in.Spec.HTTPListenAddress,
GrpcListenAddress: &in.Spec.GRPCListenAddress,
},
Management: &configv1.ManagementServerSpec{
HttpListenAddress: lo.ToPtr(in.Spec.Management.GetHTTPListenAddress()),
GrpcListenAddress: lo.ToPtr(in.Spec.Management.GetGRPCListenAddress()),
},
Relay: &configv1.RelayServerSpec{
GrpcListenAddress: &in.Spec.Management.RelayListenAddress,
AdvertiseAddress: &in.Spec.Management.RelayAdvertiseAddress,
},
Health: &configv1.HealthServerSpec{
HttpListenAddress: &in.Spec.HTTPListenAddress,
},
Dashboard: &configv1.DashboardServerSpec{
HttpListenAddress: &in.Spec.Management.WebListenAddress,
Hostname: &in.Spec.Hostname,
AdvertiseAddress: &in.Spec.Management.WebAdvertiseAddress,
TrustedProxies: in.Spec.TrustedProxies,
},
Storage: &configv1.StorageSpec{
Backend: func() *configv1.StorageBackend {
switch in.Spec.Storage.Type {
case v1beta1.StorageTypeEtcd:
fallthrough
default:
return configv1.StorageBackend_Etcd.Enum()
case v1beta1.StorageTypeJetStream:
return configv1.StorageBackend_JetStream.Enum()
}
}(),
Etcd: func() *configv1.EtcdSpec {
if in.Spec.Storage.Etcd == nil {
return nil
}
return &configv1.EtcdSpec{
Endpoints: in.Spec.Storage.Etcd.Endpoints,
Certs: func() *configv1.MTLSSpec {
if in.Spec.Storage.Etcd.Certs == nil {
return nil
}
return &configv1.MTLSSpec{
ServerCA: &in.Spec.Storage.Etcd.Certs.ServerCA,
ClientCA: &in.Spec.Storage.Etcd.Certs.ClientCA,
ClientCert: &in.Spec.Storage.Etcd.Certs.ClientCert,
ClientKey: &in.Spec.Storage.Etcd.Certs.ClientKey,
}
}(),
}
}(),
JetStream: func() *configv1.JetStreamSpec {
if in.Spec.Storage.JetStream == nil {
return nil
}
return &configv1.JetStreamSpec{
Endpoint: &in.Spec.Storage.JetStream.Endpoint,
NkeySeedPath: &in.Spec.Storage.JetStream.NkeySeedPath,
}
}(),
},
Certs: &configv1.CertsSpec{
CaCert: in.Spec.Certs.CACert,
CaCertData: lo.Ternary(len(in.Spec.Certs.CACertData) > 0, lo.ToPtr(string(in.Spec.Certs.CACertData)), nil),
ServingCert: in.Spec.Certs.ServingCert,
ServingCertData: lo.Ternary(len(in.Spec.Certs.ServingCertData) > 0, lo.ToPtr(string(in.Spec.Certs.ServingCertData)), nil),
ServingKey: in.Spec.Certs.ServingKey,
ServingKeyData: lo.Ternary(len(in.Spec.Certs.ServingKeyData) > 0, lo.ToPtr(string(in.Spec.Certs.ServingKeyData)), nil),
},
Plugins: &configv1.PluginsSpec{
Dir: &in.Spec.Plugins.Dir,
Cache: &configv1.CacheSpec{
Backend: configv1.CacheBackend_Filesystem.Enum(),
Filesystem: &configv1.FilesystemCacheSpec{
Dir: &in.Spec.Plugins.Binary.Cache.Filesystem.Dir,
},
},
},
Keyring: &configv1.KeyringSpec{
RuntimeKeyDirs: in.Spec.Keyring.EphemeralKeyDirs,
},
Upgrades: &configv1.UpgradesSpec{
Agents: &configv1.AgentUpgradesSpec{
Driver: func() *configv1.AgentUpgradesSpec_Driver {
switch in.Spec.AgentUpgrades.Kubernetes.ImageResolver {
case v1beta1.ImageResolverNoop:
fallthrough
default:
return configv1.AgentUpgradesSpec_Noop.Enum()
case v1beta1.ImageResolverKubernetes:
return configv1.AgentUpgradesSpec_Kubernetes.Enum()
}
}(),
Kubernetes: &configv1.KubernetesAgentUpgradeSpec{
ImageResolver: func() *configv1.KubernetesAgentUpgradeSpec_ImageResolver {
switch in.Spec.AgentUpgrades.Kubernetes.ImageResolver {
case v1beta1.ImageResolverNoop:
fallthrough
default:
return configv1.KubernetesAgentUpgradeSpec_Noop.Enum()
case v1beta1.ImageResolverKubernetes:
return configv1.KubernetesAgentUpgradeSpec_Kubernetes.Enum()
}
}(),
},
},
Plugins: &configv1.PluginUpgradesSpec{
Driver: configv1.PluginUpgradesSpec_Binary.Enum(),
Binary: &configv1.BinaryPluginUpgradeSpec{
PatchEngine: func() *configv1.PatchEngine {
switch in.Spec.Plugins.Binary.Cache.PatchEngine {
case v1beta1.PatchEngineBsdiff:
return configv1.PatchEngine_Bsdiff.Enum()
case v1beta1.PatchEngineZstd:
fallthrough
default:
return configv1.PatchEngine_Zstd.Enum()
}
}(),
},
},
},
RateLimiting: func() *configv1.RateLimitingSpec {
if in.Spec.RateLimit == nil {
return nil
}
return &configv1.RateLimitingSpec{
Rate: &in.Spec.RateLimit.Rate,
Burst: func() *int32 {
burst := int32(in.Spec.RateLimit.Burst)
return &burst
}(),
}
}(),
}
}
panic("unreachable")
}
82 changes: 82 additions & 0 deletions pkg/config/reactive/bind.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package reactive

import (
"context"
"sync"

gsync "github.com/kralicky/gpkg/sync"

"google.golang.org/protobuf/reflect/protoreflect"
)

// Bind groups multiple reactive.Value instances together, de-duplicating
// updates using the revision of the underlying config.
//
// The callback is invoked when one or more reactive.Values change,
// and is passed the current or updated value of each reactive value, in the
// order they were passed to Bind. Values that have never been set will be
// invalid (protoreflect.Value.IsValid() returns false).
//
// For partial updates, the values passed to the callback will be either the
// updated value or the current value, depending on whether the value was
// updated in the current revision.
//
// The callback is guaranteed to be invoked exactly once for a single change
// to the active config, even if multiple values in the group change at the
// same time. The values must all be created from the same controller,
// otherwise the behavior is undefined.
func Bind(ctx context.Context, callback func([]protoreflect.Value), reactiveValues ...Value) {
b := &binder{
reactiveValues: reactiveValues,
callback: callback,
}
for i, rv := range reactiveValues {
i := i
rv.watchFuncWithRev(ctx, func(rev int64, v protoreflect.Value) {
b.onUpdate(i, rev, v)
})
}
}

type binder struct {
callback func([]protoreflect.Value)
reactiveValues []Value
queues gsync.Map[int64, *queuedUpdate]
}

type queuedUpdate struct {
lazyInit sync.Once
values []protoreflect.Value
resolve sync.Once
}

func (q *queuedUpdate) doLazyInit(size int) {
q.lazyInit.Do(func() {
q.values = make([]protoreflect.Value, size)
})
}

func (b *binder) onUpdate(i int, rev int64, v protoreflect.Value) {
q, _ := b.queues.LoadOrStore(rev, &queuedUpdate{})
q.doLazyInit(len(b.reactiveValues))
// this *must* happen synchronously, since the group channel is closed
// once all callbacks have returned.
q.values[i] = v

go func() {
b.reactiveValues[i].wait()
q.resolve.Do(func() {
b.queues.Delete(rev)
b.doResolve(q)
})
}()
}

func (b *binder) doResolve(q *queuedUpdate) {
for i, v := range q.values {
if !v.IsValid() {
q.values[i] = b.reactiveValues[i].Value()
}
}
b.callback(q.values)
}
141 changes: 141 additions & 0 deletions pkg/config/reactive/bind_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package reactive_test

import (
"context"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"google.golang.org/protobuf/reflect/protoreflect"

"github.com/rancher/opni/pkg/config/reactive"
"github.com/rancher/opni/pkg/plugins/driverutil"
"github.com/rancher/opni/pkg/storage"
"github.com/rancher/opni/pkg/storage/inmemory"
"github.com/rancher/opni/pkg/test/testdata/plugins/ext"
"github.com/rancher/opni/pkg/util"
"github.com/rancher/opni/pkg/util/flagutil"
)

var _ = Describe("Bind", Label("unit"), func() {
var ctrl *reactive.Controller[*ext.SampleConfiguration]
var defaultStore, activeStore storage.ValueStoreT[*ext.SampleConfiguration]

BeforeEach(func() {
defaultStore = inmemory.NewValueStore[*ext.SampleConfiguration](util.ProtoClone)
activeStore = inmemory.NewValueStore[*ext.SampleConfiguration](util.ProtoClone)
ctrl = reactive.NewController(driverutil.NewDefaultingConfigTracker(defaultStore, activeStore, flagutil.LoadDefaults))
ctx, ca := context.WithCancel(context.Background())
Expect(ctrl.Start(ctx)).To(Succeed())
DeferCleanup(ca)
})

It("should bind reactive values", func(ctx SpecContext) {
called := make(chan struct{})
reactive.Bind(ctx,
func(v []protoreflect.Value) {
defer close(called)
Expect(v).To(HaveLen(6))
Expect(v[0].Int()).To(Equal(int64(100)))
Expect(v[1].Int()).To(Equal(int64(200)))
Expect(v[2].Int()).To(Equal(int64(300)))
Expect(v[3].Int()).To(Equal(int64(400)))
Expect(v[4].Int()).To(Equal(int64(500)))
Expect(v[5].Int()).To(Equal(int64(600)))
},
ctrl.Reactive((&ext.SampleConfiguration{}).ProtoPath().MessageField().Field6().Field1()),
ctrl.Reactive((&ext.SampleConfiguration{}).ProtoPath().MessageField().Field6().Field2()),
ctrl.Reactive((&ext.SampleConfiguration{}).ProtoPath().MessageField().Field6().Field3()),
ctrl.Reactive((&ext.SampleConfiguration{}).ProtoPath().MessageField().Field6().Field4()),
ctrl.Reactive((&ext.SampleConfiguration{}).ProtoPath().MessageField().Field6().Field5()),
ctrl.Reactive((&ext.SampleConfiguration{}).ProtoPath().MessageField().Field6().Field6()),
)

Expect(activeStore.Put(ctx, &ext.SampleConfiguration{
MessageField: &ext.SampleMessage{
Field6: &ext.Sample6FieldMsg{
Field1: 100,
Field2: 200,
Field3: 300,
Field4: 400,
Field5: 500,
Field6: 600,
},
},
})).To(Succeed())

Eventually(called).Should(BeClosed())
// ensure no more updates are received
Consistently(called).Should(BeClosed())
})

It("should handle partial updates", func(ctx SpecContext) {
callback := new(func(v []protoreflect.Value))
reactive.Bind(ctx,
func(v []protoreflect.Value) {
(*callback)(v)
},
ctrl.Reactive((&ext.SampleConfiguration{}).ProtoPath().MessageField().Field6().Field1()),
ctrl.Reactive((&ext.SampleConfiguration{}).ProtoPath().MessageField().Field6().Field2()),
ctrl.Reactive((&ext.SampleConfiguration{}).ProtoPath().MessageField().Field6().Field3()),
ctrl.Reactive((&ext.SampleConfiguration{}).ProtoPath().MessageField().Field6().Field4()),
ctrl.Reactive((&ext.SampleConfiguration{}).ProtoPath().MessageField().Field6().Field5()),
ctrl.Reactive((&ext.SampleConfiguration{}).ProtoPath().MessageField().Field6().Field6()),
)

called := make(chan struct{})
*callback = func(v []protoreflect.Value) {
defer close(called)
Expect(v).To(HaveLen(6))
Expect(v[0].Int()).To(Equal(int64(100)))
Expect(v[1].Int()).To(Equal(int64(200)))
Expect(v[2].Int()).To(Equal(int64(300)))
Expect(v[3].Int()).To(Equal(int64(400)))
Expect(v[4].Int()).To(Equal(int64(500)))
Expect(v[5].Int()).To(Equal(int64(600)))
}

Expect(activeStore.Put(ctx, &ext.SampleConfiguration{
MessageField: &ext.SampleMessage{
Field6: &ext.Sample6FieldMsg{
Field1: 100,
Field2: 200,
Field3: 300,
Field4: 400,
Field5: 500,
Field6: 600,
},
},
})).To(Succeed())

Eventually(called).Should(BeClosed())
Consistently(called).Should(BeClosed())

called = make(chan struct{})
*callback = func(v []protoreflect.Value) {
defer close(called)
Expect(v).To(HaveLen(6))
Expect(v[0].Int()).To(Equal(int64(1000)))
Expect(v[1].Int()).To(Equal(int64(2000)))
Expect(v[2].Int()).To(Equal(int64(3000)))
Expect(v[3].Int()).To(Equal(int64(400)))
Expect(v[4].Int()).To(Equal(int64(500)))
Expect(v[5].Int()).To(Equal(int64(600)))
}

Expect(activeStore.Put(ctx, &ext.SampleConfiguration{
MessageField: &ext.SampleMessage{
Field6: &ext.Sample6FieldMsg{
Field1: 1000,
Field2: 2000,
Field3: 3000,
Field4: 400,
Field5: 500,
Field6: 600,
},
},
})).To(Succeed())

Eventually(called).Should(BeClosed())
Consistently(called).Should(BeClosed())
})
})
Loading

0 comments on commit c24ba68

Please sign in to comment.