-
Notifications
You must be signed in to change notification settings - Fork 57
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Initial gateway config manager implementation and tests
- Loading branch information
Showing
24 changed files
with
3,587 additions
and
1,091 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,152 @@ | ||
package adapt | ||
|
||
import ( | ||
configv1 "github.com/rancher/opni/pkg/config/v1" | ||
"github.com/rancher/opni/pkg/config/v1beta1" | ||
"github.com/samber/lo" | ||
) | ||
|
||
func V1GatewayConfigOf[T *v1beta1.GatewayConfig | *configv1.GatewayConfigSpec](in T) *configv1.GatewayConfigSpec { | ||
switch in := any(in).(type) { | ||
case *configv1.GatewayConfigSpec: | ||
return in | ||
case *v1beta1.GatewayConfig: | ||
return &configv1.GatewayConfigSpec{ | ||
Server: &configv1.ServerSpec{ | ||
HttpListenAddress: &in.Spec.HTTPListenAddress, | ||
GrpcListenAddress: &in.Spec.GRPCListenAddress, | ||
}, | ||
Management: &configv1.ManagementServerSpec{ | ||
HttpListenAddress: lo.ToPtr(in.Spec.Management.GetHTTPListenAddress()), | ||
GrpcListenAddress: lo.ToPtr(in.Spec.Management.GetGRPCListenAddress()), | ||
}, | ||
Relay: &configv1.RelayServerSpec{ | ||
GrpcListenAddress: &in.Spec.Management.RelayListenAddress, | ||
AdvertiseAddress: &in.Spec.Management.RelayAdvertiseAddress, | ||
}, | ||
Health: &configv1.HealthServerSpec{ | ||
HttpListenAddress: &in.Spec.HTTPListenAddress, | ||
}, | ||
Dashboard: &configv1.DashboardServerSpec{ | ||
HttpListenAddress: &in.Spec.Management.WebListenAddress, | ||
Hostname: &in.Spec.Hostname, | ||
AdvertiseAddress: &in.Spec.Management.WebAdvertiseAddress, | ||
TrustedProxies: in.Spec.TrustedProxies, | ||
}, | ||
Storage: &configv1.StorageSpec{ | ||
Backend: func() *configv1.StorageBackend { | ||
switch in.Spec.Storage.Type { | ||
case v1beta1.StorageTypeEtcd: | ||
fallthrough | ||
default: | ||
return configv1.StorageBackend_Etcd.Enum() | ||
case v1beta1.StorageTypeJetStream: | ||
return configv1.StorageBackend_JetStream.Enum() | ||
} | ||
}(), | ||
Etcd: func() *configv1.EtcdSpec { | ||
if in.Spec.Storage.Etcd == nil { | ||
return nil | ||
} | ||
return &configv1.EtcdSpec{ | ||
Endpoints: in.Spec.Storage.Etcd.Endpoints, | ||
Certs: func() *configv1.MTLSSpec { | ||
if in.Spec.Storage.Etcd.Certs == nil { | ||
return nil | ||
} | ||
return &configv1.MTLSSpec{ | ||
ServerCA: &in.Spec.Storage.Etcd.Certs.ServerCA, | ||
ClientCA: &in.Spec.Storage.Etcd.Certs.ClientCA, | ||
ClientCert: &in.Spec.Storage.Etcd.Certs.ClientCert, | ||
ClientKey: &in.Spec.Storage.Etcd.Certs.ClientKey, | ||
} | ||
}(), | ||
} | ||
}(), | ||
JetStream: func() *configv1.JetStreamSpec { | ||
if in.Spec.Storage.JetStream == nil { | ||
return nil | ||
} | ||
return &configv1.JetStreamSpec{ | ||
Endpoint: &in.Spec.Storage.JetStream.Endpoint, | ||
NkeySeedPath: &in.Spec.Storage.JetStream.NkeySeedPath, | ||
} | ||
}(), | ||
}, | ||
Certs: &configv1.CertsSpec{ | ||
CaCert: in.Spec.Certs.CACert, | ||
CaCertData: lo.Ternary(len(in.Spec.Certs.CACertData) > 0, lo.ToPtr(string(in.Spec.Certs.CACertData)), nil), | ||
ServingCert: in.Spec.Certs.ServingCert, | ||
ServingCertData: lo.Ternary(len(in.Spec.Certs.ServingCertData) > 0, lo.ToPtr(string(in.Spec.Certs.ServingCertData)), nil), | ||
ServingKey: in.Spec.Certs.ServingKey, | ||
ServingKeyData: lo.Ternary(len(in.Spec.Certs.ServingKeyData) > 0, lo.ToPtr(string(in.Spec.Certs.ServingKeyData)), nil), | ||
}, | ||
Plugins: &configv1.PluginsSpec{ | ||
Dir: &in.Spec.Plugins.Dir, | ||
Cache: &configv1.CacheSpec{ | ||
Backend: configv1.CacheBackend_Filesystem.Enum(), | ||
Filesystem: &configv1.FilesystemCacheSpec{ | ||
Dir: &in.Spec.Plugins.Binary.Cache.Filesystem.Dir, | ||
}, | ||
}, | ||
}, | ||
Keyring: &configv1.KeyringSpec{ | ||
RuntimeKeyDirs: in.Spec.Keyring.EphemeralKeyDirs, | ||
}, | ||
Upgrades: &configv1.UpgradesSpec{ | ||
Agents: &configv1.AgentUpgradesSpec{ | ||
Driver: func() *configv1.AgentUpgradesSpec_Driver { | ||
switch in.Spec.AgentUpgrades.Kubernetes.ImageResolver { | ||
case v1beta1.ImageResolverNoop: | ||
fallthrough | ||
default: | ||
return configv1.AgentUpgradesSpec_Noop.Enum() | ||
case v1beta1.ImageResolverKubernetes: | ||
return configv1.AgentUpgradesSpec_Kubernetes.Enum() | ||
} | ||
}(), | ||
Kubernetes: &configv1.KubernetesAgentUpgradeSpec{ | ||
ImageResolver: func() *configv1.KubernetesAgentUpgradeSpec_ImageResolver { | ||
switch in.Spec.AgentUpgrades.Kubernetes.ImageResolver { | ||
case v1beta1.ImageResolverNoop: | ||
fallthrough | ||
default: | ||
return configv1.KubernetesAgentUpgradeSpec_Noop.Enum() | ||
case v1beta1.ImageResolverKubernetes: | ||
return configv1.KubernetesAgentUpgradeSpec_Kubernetes.Enum() | ||
} | ||
}(), | ||
}, | ||
}, | ||
Plugins: &configv1.PluginUpgradesSpec{ | ||
Driver: configv1.PluginUpgradesSpec_Binary.Enum(), | ||
Binary: &configv1.BinaryPluginUpgradeSpec{ | ||
PatchEngine: func() *configv1.PatchEngine { | ||
switch in.Spec.Plugins.Binary.Cache.PatchEngine { | ||
case v1beta1.PatchEngineBsdiff: | ||
return configv1.PatchEngine_Bsdiff.Enum() | ||
case v1beta1.PatchEngineZstd: | ||
fallthrough | ||
default: | ||
return configv1.PatchEngine_Zstd.Enum() | ||
} | ||
}(), | ||
}, | ||
}, | ||
}, | ||
RateLimiting: func() *configv1.RateLimitingSpec { | ||
if in.Spec.RateLimit == nil { | ||
return nil | ||
} | ||
return &configv1.RateLimitingSpec{ | ||
Rate: &in.Spec.RateLimit.Rate, | ||
Burst: func() *int32 { | ||
burst := int32(in.Spec.RateLimit.Burst) | ||
return &burst | ||
}(), | ||
} | ||
}(), | ||
} | ||
} | ||
panic("unreachable") | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
package reactive | ||
|
||
import ( | ||
"context" | ||
"sync" | ||
|
||
gsync "github.com/kralicky/gpkg/sync" | ||
|
||
"google.golang.org/protobuf/reflect/protoreflect" | ||
) | ||
|
||
// Bind groups multiple reactive.Value instances together, de-duplicating | ||
// updates using the revision of the underlying config. | ||
// | ||
// The callback is invoked when one or more reactive.Values change, | ||
// and is passed the current or updated value of each reactive value, in the | ||
// order they were passed to Bind. Values that have never been set will be | ||
// invalid (protoreflect.Value.IsValid() returns false). | ||
// | ||
// For partial updates, the values passed to the callback will be either the | ||
// updated value or the current value, depending on whether the value was | ||
// updated in the current revision. | ||
// | ||
// The callback is guaranteed to be invoked exactly once for a single change | ||
// to the active config, even if multiple values in the group change at the | ||
// same time. The values must all be created from the same controller, | ||
// otherwise the behavior is undefined. | ||
func Bind(ctx context.Context, callback func([]protoreflect.Value), reactiveValues ...Value) { | ||
b := &binder{ | ||
reactiveValues: reactiveValues, | ||
callback: callback, | ||
} | ||
for i, rv := range reactiveValues { | ||
i := i | ||
rv.watchFuncWithRev(ctx, func(rev int64, v protoreflect.Value) { | ||
b.onUpdate(i, rev, v) | ||
}) | ||
} | ||
} | ||
|
||
type binder struct { | ||
callback func([]protoreflect.Value) | ||
reactiveValues []Value | ||
queues gsync.Map[int64, *queuedUpdate] | ||
} | ||
|
||
type queuedUpdate struct { | ||
lazyInit sync.Once | ||
values []protoreflect.Value | ||
resolve sync.Once | ||
} | ||
|
||
func (q *queuedUpdate) doLazyInit(size int) { | ||
q.lazyInit.Do(func() { | ||
q.values = make([]protoreflect.Value, size) | ||
}) | ||
} | ||
|
||
func (b *binder) onUpdate(i int, rev int64, v protoreflect.Value) { | ||
q, _ := b.queues.LoadOrStore(rev, &queuedUpdate{}) | ||
q.doLazyInit(len(b.reactiveValues)) | ||
// this *must* happen synchronously, since the group channel is closed | ||
// once all callbacks have returned. | ||
q.values[i] = v | ||
|
||
go func() { | ||
b.reactiveValues[i].wait() | ||
q.resolve.Do(func() { | ||
b.queues.Delete(rev) | ||
b.doResolve(q) | ||
}) | ||
}() | ||
} | ||
|
||
func (b *binder) doResolve(q *queuedUpdate) { | ||
for i, v := range q.values { | ||
if !v.IsValid() { | ||
q.values[i] = b.reactiveValues[i].Value() | ||
} | ||
} | ||
b.callback(q.values) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,141 @@ | ||
package reactive_test | ||
|
||
import ( | ||
"context" | ||
|
||
. "github.com/onsi/ginkgo/v2" | ||
. "github.com/onsi/gomega" | ||
"google.golang.org/protobuf/reflect/protoreflect" | ||
|
||
"github.com/rancher/opni/pkg/config/reactive" | ||
"github.com/rancher/opni/pkg/plugins/driverutil" | ||
"github.com/rancher/opni/pkg/storage" | ||
"github.com/rancher/opni/pkg/storage/inmemory" | ||
"github.com/rancher/opni/pkg/test/testdata/plugins/ext" | ||
"github.com/rancher/opni/pkg/util" | ||
"github.com/rancher/opni/pkg/util/flagutil" | ||
) | ||
|
||
var _ = Describe("Bind", Label("unit"), func() { | ||
var ctrl *reactive.Controller[*ext.SampleConfiguration] | ||
var defaultStore, activeStore storage.ValueStoreT[*ext.SampleConfiguration] | ||
|
||
BeforeEach(func() { | ||
defaultStore = inmemory.NewValueStore[*ext.SampleConfiguration](util.ProtoClone) | ||
activeStore = inmemory.NewValueStore[*ext.SampleConfiguration](util.ProtoClone) | ||
ctrl = reactive.NewController(driverutil.NewDefaultingConfigTracker(defaultStore, activeStore, flagutil.LoadDefaults)) | ||
ctx, ca := context.WithCancel(context.Background()) | ||
Expect(ctrl.Start(ctx)).To(Succeed()) | ||
DeferCleanup(ca) | ||
}) | ||
|
||
It("should bind reactive values", func(ctx SpecContext) { | ||
called := make(chan struct{}) | ||
reactive.Bind(ctx, | ||
func(v []protoreflect.Value) { | ||
defer close(called) | ||
Expect(v).To(HaveLen(6)) | ||
Expect(v[0].Int()).To(Equal(int64(100))) | ||
Expect(v[1].Int()).To(Equal(int64(200))) | ||
Expect(v[2].Int()).To(Equal(int64(300))) | ||
Expect(v[3].Int()).To(Equal(int64(400))) | ||
Expect(v[4].Int()).To(Equal(int64(500))) | ||
Expect(v[5].Int()).To(Equal(int64(600))) | ||
}, | ||
ctrl.Reactive((&ext.SampleConfiguration{}).ProtoPath().MessageField().Field6().Field1()), | ||
ctrl.Reactive((&ext.SampleConfiguration{}).ProtoPath().MessageField().Field6().Field2()), | ||
ctrl.Reactive((&ext.SampleConfiguration{}).ProtoPath().MessageField().Field6().Field3()), | ||
ctrl.Reactive((&ext.SampleConfiguration{}).ProtoPath().MessageField().Field6().Field4()), | ||
ctrl.Reactive((&ext.SampleConfiguration{}).ProtoPath().MessageField().Field6().Field5()), | ||
ctrl.Reactive((&ext.SampleConfiguration{}).ProtoPath().MessageField().Field6().Field6()), | ||
) | ||
|
||
Expect(activeStore.Put(ctx, &ext.SampleConfiguration{ | ||
MessageField: &ext.SampleMessage{ | ||
Field6: &ext.Sample6FieldMsg{ | ||
Field1: 100, | ||
Field2: 200, | ||
Field3: 300, | ||
Field4: 400, | ||
Field5: 500, | ||
Field6: 600, | ||
}, | ||
}, | ||
})).To(Succeed()) | ||
|
||
Eventually(called).Should(BeClosed()) | ||
// ensure no more updates are received | ||
Consistently(called).Should(BeClosed()) | ||
}) | ||
|
||
It("should handle partial updates", func(ctx SpecContext) { | ||
callback := new(func(v []protoreflect.Value)) | ||
reactive.Bind(ctx, | ||
func(v []protoreflect.Value) { | ||
(*callback)(v) | ||
}, | ||
ctrl.Reactive((&ext.SampleConfiguration{}).ProtoPath().MessageField().Field6().Field1()), | ||
ctrl.Reactive((&ext.SampleConfiguration{}).ProtoPath().MessageField().Field6().Field2()), | ||
ctrl.Reactive((&ext.SampleConfiguration{}).ProtoPath().MessageField().Field6().Field3()), | ||
ctrl.Reactive((&ext.SampleConfiguration{}).ProtoPath().MessageField().Field6().Field4()), | ||
ctrl.Reactive((&ext.SampleConfiguration{}).ProtoPath().MessageField().Field6().Field5()), | ||
ctrl.Reactive((&ext.SampleConfiguration{}).ProtoPath().MessageField().Field6().Field6()), | ||
) | ||
|
||
called := make(chan struct{}) | ||
*callback = func(v []protoreflect.Value) { | ||
defer close(called) | ||
Expect(v).To(HaveLen(6)) | ||
Expect(v[0].Int()).To(Equal(int64(100))) | ||
Expect(v[1].Int()).To(Equal(int64(200))) | ||
Expect(v[2].Int()).To(Equal(int64(300))) | ||
Expect(v[3].Int()).To(Equal(int64(400))) | ||
Expect(v[4].Int()).To(Equal(int64(500))) | ||
Expect(v[5].Int()).To(Equal(int64(600))) | ||
} | ||
|
||
Expect(activeStore.Put(ctx, &ext.SampleConfiguration{ | ||
MessageField: &ext.SampleMessage{ | ||
Field6: &ext.Sample6FieldMsg{ | ||
Field1: 100, | ||
Field2: 200, | ||
Field3: 300, | ||
Field4: 400, | ||
Field5: 500, | ||
Field6: 600, | ||
}, | ||
}, | ||
})).To(Succeed()) | ||
|
||
Eventually(called).Should(BeClosed()) | ||
Consistently(called).Should(BeClosed()) | ||
|
||
called = make(chan struct{}) | ||
*callback = func(v []protoreflect.Value) { | ||
defer close(called) | ||
Expect(v).To(HaveLen(6)) | ||
Expect(v[0].Int()).To(Equal(int64(1000))) | ||
Expect(v[1].Int()).To(Equal(int64(2000))) | ||
Expect(v[2].Int()).To(Equal(int64(3000))) | ||
Expect(v[3].Int()).To(Equal(int64(400))) | ||
Expect(v[4].Int()).To(Equal(int64(500))) | ||
Expect(v[5].Int()).To(Equal(int64(600))) | ||
} | ||
|
||
Expect(activeStore.Put(ctx, &ext.SampleConfiguration{ | ||
MessageField: &ext.SampleMessage{ | ||
Field6: &ext.Sample6FieldMsg{ | ||
Field1: 1000, | ||
Field2: 2000, | ||
Field3: 3000, | ||
Field4: 400, | ||
Field5: 500, | ||
Field6: 600, | ||
}, | ||
}, | ||
})).To(Succeed()) | ||
|
||
Eventually(called).Should(BeClosed()) | ||
Consistently(called).Should(BeClosed()) | ||
}) | ||
}) |
Oops, something went wrong.