Skip to content

Commit

Permalink
Add new v1 config helpers for storage setup machinery
Browse files Browse the repository at this point in the history
  • Loading branch information
kralicky committed Nov 21, 2023
1 parent c24ba68 commit b8e3360
Show file tree
Hide file tree
Showing 7 changed files with 233 additions and 82 deletions.
156 changes: 156 additions & 0 deletions pkg/config/v1/extensions.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package configv1

import (
"crypto/tls"
"crypto/x509"
"encoding/pem"
errors "errors"
"fmt"
"os"

cli "github.com/rancher/opni/internal/codegen/cli"
Expand Down Expand Up @@ -30,3 +35,154 @@ func init() {
BuildGatewayConfigResetDefaultConfigurationCmd(),
))
}

func (m *MTLSSpec) AsTlsConfig() (*tls.Config, error) {
var serverCaData, clientCaData, clientCertData, clientKeyData []byte
var err error
switch {
case m.ClientCA != nil:
clientCaData, err = os.ReadFile(m.GetClientCA())
if err != nil {
return nil, fmt.Errorf("failed to read client CA: %w", err)
}
case m.ClientCAData != nil:
clientCaData = []byte(m.GetClientCAData())
}

switch {
case m.ServerCA != nil:
serverCaData, err = os.ReadFile(m.GetServerCA())
if err != nil {
return nil, fmt.Errorf("failed to read server CA: %w", err)
}
case m.ServerCAData != nil:
serverCaData = []byte(m.GetServerCAData())
default:
return nil, errors.New("no server CA configured")
}

switch {
case m.ClientCert != nil:
clientCertData, err = os.ReadFile(m.GetClientCert())
if err != nil {
return nil, fmt.Errorf("failed to read client cert: %w", err)
}
case m.ClientCertData != nil:
clientCertData = []byte(m.GetClientCertData())
default:
return nil, errors.New("no client cert configured")
}

switch {
case m.ClientKey != nil:
clientKeyData, err = os.ReadFile(m.GetClientKey())
if err != nil {
return nil, fmt.Errorf("failed to read client key: %w", err)
}
case m.ClientKeyData != nil:
clientKeyData = []byte(m.GetClientKeyData())
default:
return nil, errors.New("no client key configured")
}

clientCert, err := tls.X509KeyPair(clientCertData, clientKeyData)
if err != nil {
return nil, err
}

var clientCAPool *x509.CertPool
if len(clientCaData) > 0 {
clientCAPool = x509.NewCertPool()
clientCAPool.AppendCertsFromPEM(clientCaData)
}

serverCAPool := x509.NewCertPool()
serverCAPool.AppendCertsFromPEM(serverCaData)

return &tls.Config{
MinVersion: tls.VersionTLS13,
Certificates: []tls.Certificate{clientCert},
ClientCAs: clientCAPool,
RootCAs: serverCAPool,
}, nil
}

func (c *CertsSpec) AsTlsConfig(clientAuth tls.ClientAuthType) (*tls.Config, error) {
var caCertData, servingCertData, servingKeyData []byte
switch {
case c.CaCert != nil:
data, err := os.ReadFile(c.GetCaCert())
if err != nil {
return nil, fmt.Errorf("failed to load CA cert: %w", err)
}
caCertData = data
case c.CaCertData != nil:
caCertData = []byte(c.GetCaCertData())
default:
return nil, errors.New("no CA cert configured")
}
switch {
case c.ServingCert != nil:
data, err := os.ReadFile(c.GetServingCert())
if err != nil {
return nil, fmt.Errorf("failed to load serving cert: %w", err)
}
servingCertData = data
case c.ServingCertData != nil:
servingCertData = []byte(c.GetServingCertData())
default:
return nil, errors.New("no serving cert configured")
}
switch {
case c.ServingKey != nil:
data, err := os.ReadFile(c.GetServingKey())
if err != nil {
return nil, fmt.Errorf("failed to load serving key: %w", err)
}
servingKeyData = data
case c.ServingKeyData != nil:
servingKeyData = []byte(c.GetServingKeyData())
default:
return nil, errors.New("no serving key configured")
}

var block *pem.Block
block, _ = pem.Decode(caCertData)
if block == nil {
return nil, errors.New("failed to decode CA cert PEM data")
}
rootCA, err := x509.ParseCertificate(block.Bytes)
if err != nil {
return nil, fmt.Errorf("failed to parse CA cert: %w", err)
}
servingCert, err := tls.X509KeyPair(servingCertData, servingKeyData)
if err != nil {
return nil, fmt.Errorf("failed to load TLS certificate: %w", err)
}
servingRootData := servingCert.Certificate[len(servingCert.Certificate)-1]
servingRoot, err := x509.ParseCertificate(servingRootData)
if err != nil {
return nil, fmt.Errorf("failed to parse serving root certificate: %w", err)
}
if !rootCA.Equal(servingRoot) {
servingCert.Certificate = append(servingCert.Certificate, rootCA.Raw)
}
caPool := x509.NewCertPool()
caPool.AddCert(rootCA)

if clientAuth == tls.NoClientCert {
return &tls.Config{
MinVersion: tls.VersionTLS12,
Certificates: []tls.Certificate{servingCert},
RootCAs: caPool,
}, nil
} else {
return &tls.Config{
MinVersion: tls.VersionTLS12,
Certificates: []tls.Certificate{servingCert},
RootCAs: caPool,
ClientCAs: caPool,
ClientAuth: clientAuth,
}, nil
}
}
32 changes: 32 additions & 0 deletions pkg/machinery/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"

configv1 "github.com/rancher/opni/pkg/config/v1"
"github.com/rancher/opni/pkg/config/v1beta1"
"github.com/rancher/opni/pkg/storage"
)
Expand Down Expand Up @@ -46,3 +47,34 @@ func ConfigureStorageBackend(ctx context.Context, cfg *v1beta1.StorageSpec) (sto
storageBackend.Use(store)
return storageBackend, nil
}

func ConfigureStorageBackendV1(ctx context.Context, cfg *configv1.StorageSpec) (storage.Backend, error) {
storageBackend := storage.CompositeBackend{}
builder := storage.GetStoreBuilder(cfg.GetBackend().String())
if builder == nil {
return nil, fmt.Errorf("unknown storage backend %s", cfg.GetBackend().String())
}
var store any
var err error
switch cfg.GetBackend() {
case configv1.StorageBackend_Etcd:
options := cfg.Etcd
if options == nil {
return nil, errors.New("etcd storage options are not set")
}
store, err = builder(ctx, cfg.Etcd, "gateway")
case configv1.StorageBackend_JetStream:
options := cfg.JetStream
if options == nil {
return nil, errors.New("jetstream storage options are not set")
}
store, err = builder(ctx, options)
default:
return nil, errors.New("unknown storage type")
}
if err != nil {
return nil, err
}
storageBackend.Use(store)
return storageBackend, nil
}
4 changes: 4 additions & 0 deletions pkg/plugins/driverutil/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ func (ct *DefaultingConfigTracker[T]) newDefaultSpec() (t T) {
return t
}

func (ct *DefaultingConfigTracker[T]) ActiveStore() storage.ValueStoreT[T] {
return ct.activeStore
}

// Gets the default config if one has been set, otherwise returns a new default
// config as defined by the type.
func (ct *DefaultingConfigTracker[T]) GetDefaultConfig(ctx context.Context, atRevision ...*corev1.Revision) (T, error) {
Expand Down
23 changes: 19 additions & 4 deletions pkg/storage/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ import (
"go.uber.org/zap"

corev1 "github.com/rancher/opni/pkg/apis/core/v1"
configv1 "github.com/rancher/opni/pkg/config/v1"
"github.com/rancher/opni/pkg/config/v1beta1"
"github.com/rancher/opni/pkg/logger"
"github.com/rancher/opni/pkg/storage"
"github.com/rancher/opni/pkg/util"
)

var (
Expand Down Expand Up @@ -64,14 +64,14 @@ func WithPrefix(prefix string) EtcdStoreOption {
}
}

func NewEtcdStore(ctx context.Context, conf *v1beta1.EtcdStorageSpec, opts ...EtcdStoreOption) (*EtcdStore, error) {
func NewEtcdStore(ctx context.Context, conf *configv1.EtcdSpec, opts ...EtcdStoreOption) (*EtcdStore, error) {
options := EtcdStoreOptions{}
options.apply(opts...)
lg := logger.New(logger.WithLogLevel(slog.LevelWarn)).WithGroup("etcd")
var tlsConfig *tls.Config
if conf.Certs != nil {
var err error
tlsConfig, err = util.LoadClientMTLSConfig(*conf.Certs)
tlsConfig, err = conf.Certs.AsTlsConfig()
if err != nil {
return nil, fmt.Errorf("failed to load client TLS config: %w", err)
}
Expand Down Expand Up @@ -137,7 +137,22 @@ func (e *EtcdStore) LockManager(prefix string) storage.LockManager {
func init() {
storage.RegisterStoreBuilder(v1beta1.StorageTypeEtcd, func(args ...any) (any, error) {
ctx := args[0].(context.Context)
conf := args[1].(*v1beta1.EtcdStorageSpec)

var conf *configv1.EtcdSpec
switch spec := args[1].(type) {
case *v1beta1.EtcdStorageSpec:
conf = &configv1.EtcdSpec{
Endpoints: spec.Endpoints,
Certs: &configv1.MTLSSpec{
ServerCA: &spec.Certs.ServerCA,
ClientCA: &spec.Certs.ClientCA,
ClientCert: &spec.Certs.ClientCert,
ClientKey: &spec.Certs.ClientKey,
},
}
case *configv1.EtcdSpec:
conf = spec
}

var opts []EtcdStoreOption
for _, arg := range args[2:] {
Expand Down
19 changes: 15 additions & 4 deletions pkg/storage/jetstream/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/lestrrat-go/backoff/v2"
"github.com/nats-io/nats.go"
corev1 "github.com/rancher/opni/pkg/apis/core/v1"
configv1 "github.com/rancher/opni/pkg/config/v1"
"github.com/rancher/opni/pkg/config/v1beta1"
"github.com/rancher/opni/pkg/logger"
"github.com/rancher/opni/pkg/storage"
Expand Down Expand Up @@ -71,19 +72,19 @@ func WithBucketPrefix(prefix string) JetStreamStoreOption {
}
}

func NewJetStreamStore(ctx context.Context, conf *v1beta1.JetStreamStorageSpec, opts ...JetStreamStoreOption) (*JetStreamStore, error) {
func NewJetStreamStore(ctx context.Context, conf *configv1.JetStreamSpec, opts ...JetStreamStoreOption) (*JetStreamStore, error) {
options := JetStreamStoreOptions{
BucketPrefix: "gateway",
}
options.apply(opts...)

lg := logger.New(logger.WithLogLevel(slog.LevelWarn)).WithGroup("jetstream")

nkeyOpt, err := nats.NkeyOptionFromSeed(conf.NkeySeedPath)
nkeyOpt, err := nats.NkeyOptionFromSeed(conf.GetNkeySeedPath())
if err != nil {
return nil, err
}
nc, err := nats.Connect(conf.Endpoint,
nc, err := nats.Connect(conf.GetEndpoint(),
nkeyOpt,
nats.MaxReconnects(-1),
nats.RetryOnFailedConnect(true),
Expand Down Expand Up @@ -264,7 +265,17 @@ func jetstreamGrpcError(err error) error {
func init() {
storage.RegisterStoreBuilder(v1beta1.StorageTypeJetStream, func(args ...any) (any, error) {
ctx := args[0].(context.Context)
conf := args[1].(*v1beta1.JetStreamStorageSpec)

var conf *configv1.JetStreamSpec
switch spec := args[1].(type) {
case *v1beta1.JetStreamStorageSpec:
conf = &configv1.JetStreamSpec{
Endpoint: &spec.Endpoint,
NkeySeedPath: &spec.NkeySeedPath,
}
case *configv1.JetStreamSpec:
conf = spec
}

var opts []JetStreamStoreOption
for _, arg := range args[2:] {
Expand Down
13 changes: 7 additions & 6 deletions pkg/test/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/rancher/opni/pkg/clients"
"github.com/rancher/opni/pkg/config"
"github.com/rancher/opni/pkg/config/meta"
configv1 "github.com/rancher/opni/pkg/config/v1"
"github.com/rancher/opni/pkg/config/v1beta1"
"github.com/rancher/opni/pkg/gateway"
"github.com/rancher/opni/pkg/ident"
Expand Down Expand Up @@ -2395,22 +2396,22 @@ func (e *Environment) EtcdClient() (*clientv3.Client, error) {
})
}

func (e *Environment) EtcdConfig() *v1beta1.EtcdStorageSpec {
func (e *Environment) EtcdConfig() *configv1.EtcdSpec {
if !e.enableEtcd {
panic("etcd disabled")
}
return &v1beta1.EtcdStorageSpec{
return &configv1.EtcdSpec{
Endpoints: []string{fmt.Sprintf("http://localhost:%d", e.ports.Etcd)},
}
}

func (e *Environment) JetStreamConfig() *v1beta1.JetStreamStorageSpec {
func (e *Environment) JetStreamConfig() *configv1.JetStreamSpec {
if !e.enableJetstream {
panic("JetStream disabled")
}
return &v1beta1.JetStreamStorageSpec{
Endpoint: fmt.Sprintf("http://localhost:%d", e.ports.Jetstream),
NkeySeedPath: path.Join(e.tempDir, "jetstream", "seed", "nats-auth.conf"),
return &configv1.JetStreamSpec{
Endpoint: lo.ToPtr(fmt.Sprintf("http://localhost:%d", e.ports.Jetstream)),
NkeySeedPath: lo.ToPtr(path.Join(e.tempDir, "jetstream", "seed", "nats-auth.conf")),
}
}

Expand Down
Loading

0 comments on commit b8e3360

Please sign in to comment.