Skip to content

Commit

Permalink
k8s: Support user-provided CA certs
Browse files Browse the repository at this point in the history
  • Loading branch information
0x5d committed Mar 21, 2023
1 parent 35769bc commit 5c9f8af
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 24 deletions.
1 change: 1 addition & 0 deletions src/go/k8s/apis/redpanda/v1alpha1/cluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,7 @@ type AdminAPITLS struct {

// PandaproxyAPITLS configures the TLS of the Pandaproxy API
//
// TODO: Don't generate CA if ClientCACertRef isn't nil.
// If Enabled is set to true, one-way TLS verification is enabled.
// In that case, a key pair ('tls.crt', 'tls.key') and CA certificate 'ca.crt'
// are generated and stored in a Secret named '<redpanda-cluster-name>-proxy-api-node'
Expand Down
1 change: 1 addition & 0 deletions src/go/k8s/apis/redpanda/v1alpha1/cluster_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -824,6 +824,7 @@ func hasDifferentNodeSecret(listener1, listener2 KafkaAPITLS) bool {
listener1.NodeSecretRef.Name != listener2.NodeSecretRef.Name
}

// TODO: might need to change this.
func validateListener(
tlsEnabled, requireClientAuth bool,
issuerRef *cmmeta.ObjectReference,
Expand Down
155 changes: 131 additions & 24 deletions src/go/k8s/pkg/resources/certmanager/type_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ type apiCertificates struct {
selfSignedNodeCertificate bool

// CR allows to specify node certificate, if not provided this will be nil
externalNodeCertificate *corev1.ObjectReference
externalNodeCertificate *corev1.ObjectReference
externalClientCACertificate *corev1.ObjectReference

// all certificates need to exist in this namespace for mounting of secrets to work
clusterNamespace string
Expand Down Expand Up @@ -206,28 +207,28 @@ func NewClusterCertificates(
}
var err error
if kafkaListeners := kafkaAPIListeners(cluster); len(kafkaListeners) > 0 {
cc.kafkaAPI, err = cc.prepareAPI(ctx, kafkaAPI, RedpandaNodeCert, []string{OperatorClientCert, UserClientCert, AdminClientCert}, kafkaListeners, &keystoreSecret)
cc.kafkaAPI, err = cc.prepareAPI(ctx, kafkaAPI, RedpandaNodeCert, []string{OperatorClientCert, UserClientCert, AdminClientCert}, kafkaListeners, &keystoreSecret, cc.logger, "")
if err != nil {
return nil, fmt.Errorf("kafka api certificates %w", err)
}
}

if adminListeners := adminAPIListeners(cluster); len(adminListeners) > 0 {
cc.adminAPI, err = cc.prepareAPI(ctx, adminAPI, adminAPINodeCert, []string{adminAPIClientCert}, adminListeners, &keystoreSecret)
cc.adminAPI, err = cc.prepareAPI(ctx, adminAPI, adminAPINodeCert, []string{adminAPIClientCert}, adminListeners, &keystoreSecret, cc.logger, "")
if err != nil {
return nil, fmt.Errorf("admin api certificates %w", err)
}
}

if pandaProxyListeners := pandaProxyAPIListeners(cluster); len(pandaProxyListeners) > 0 {
cc.pandaProxyAPI, err = cc.prepareAPI(ctx, pandaproxyAPI, pandaproxyAPINodeCert, []string{pandaproxyAPIClientCert}, pandaProxyListeners, &keystoreSecret)
cc.pandaProxyAPI, err = cc.prepareAPI(ctx, pandaproxyAPI, pandaproxyAPINodeCert, []string{pandaproxyAPIClientCert}, pandaProxyListeners, &keystoreSecret, cc.logger, "pandaproxy")
if err != nil {
return nil, fmt.Errorf("panda proxy certificates %w", err)
}
}

if schemaRegistryListeners := schemaRegistryAPIListeners(cluster); len(schemaRegistryListeners) > 0 {
cc.schemaRegistryAPI, err = cc.prepareAPI(ctx, schemaRegistryAPI, schemaRegistryAPINodeCert, []string{schemaRegistryAPIClientCert}, schemaRegistryListeners, &keystoreSecret)
cc.schemaRegistryAPI, err = cc.prepareAPI(ctx, schemaRegistryAPI, schemaRegistryAPINodeCert, []string{schemaRegistryAPIClientCert}, schemaRegistryListeners, &keystoreSecret, cc.logger, "")
if err != nil {
return nil, fmt.Errorf("schema registry certificates %w", err)
}
Expand All @@ -243,6 +244,8 @@ func (cc *ClusterCertificates) prepareAPI(
clientCerts []string,
listeners []APIListener,
keystoreSecret *types.NamespacedName,
log logr.Logger,
name string,
) (*apiCertificates, error) {
tlsListeners := getTLSListeners(listeners)
externalTLSListener := getExternalTLSListener(listeners)
Expand Down Expand Up @@ -270,11 +273,21 @@ func (cc *ClusterCertificates) prepareAPI(

// for now we disallow having different issuer for each listener so that
// every time both listeners share the same set of certificates
nodeSecretRef := tlsListeners[0].GetTLS().NodeSecretRef
mainTLSListener := tlsListeners[0].GetTLS()

for i, list := range tlsListeners {
if list.GetTLS().ClientCACertRef != nil {
log.Info(fmt.Sprintf("found listener with ClientCACertRef for %q. Index: %d Name: %q", name, i, list.GetTLS().ClientCACertRef.Name))
}
}

result.externalClientCACertificate = mainTLSListener.ClientCACertRef

nodeSecretRef := mainTLSListener.NodeSecretRef
result.externalNodeCertificate = nodeSecretRef
isSelfSigned, err := isSelfSigned(ctx,
nodeSecretRef,
tlsListeners[0].GetTLS().IssuerRef,
mainTLSListener.IssuerRef,
cc.pandaCluster.Namespace,
cc.client)
if err != nil {
Expand Down Expand Up @@ -307,13 +320,17 @@ func (cc *ClusterCertificates) prepareAPI(
result.nodeCertificate = nodeCert
}

anyListenerWithMutualTLS := false
generateClientCerts := false
for _, l := range tlsListeners {
if l.GetTLS().RequireClientAuth {
anyListenerWithMutualTLS = true
tls := l.GetTLS()
// Trigger client cert generation only if RequireClientAuth is true and
// ClientCACertRef hasn't been set.
if tls.RequireClientAuth && tls.ClientCACertRef == nil {
generateClientCerts = true
break
}
}
if anyListenerWithMutualTLS {
if generateClientCerts {
// if there is at least one listener with mutual tls, we are going to
// generate the client certificates
for _, clientCertName := range clientCerts {
Expand Down Expand Up @@ -421,6 +438,13 @@ func (ac *apiCertificates) resources(
}
}

clientCACertSecretRef := ac.externalClientCACertificate
if clientCACertSecretRef != nil && clientCACertSecretRef.Name != "" && clientCACertSecretRef.Namespace != "" && clientCACertSecretRef.Namespace != ac.clusterNamespace {
if err := copyNodeSecretToLocalNamespace(ctx, clientCACertSecretRef, ac.clusterNamespace, k8sClient, logger); err != nil {
return nil, fmt.Errorf("copy node secret for %s cert group in namespace %s: %w", ac.clientCACertificateName().Name, clientCACertSecretRef.Namespace, err)
}
}

res := []resources.Resource{}
res = append(res, ac.rootResources...)
if ac.nodeCertificate != nil {
Expand Down Expand Up @@ -483,6 +507,16 @@ func (ac *apiCertificates) nodeCertificateName() *types.NamespacedName {
return nil
}

func (ac *apiCertificates) clientCACertificateName() *types.NamespacedName {
if ac.externalClientCACertificate == nil {
return nil
}
return &types.NamespacedName{
Name: ac.externalClientCACertificate.Name,
Namespace: ac.externalClientCACertificate.Namespace,
}
}

func (ac *apiCertificates) clientCertificateNames() []types.NamespacedName {
names := []types.NamespacedName{}
for _, c := range ac.clientCertificates {
Expand Down Expand Up @@ -521,6 +555,7 @@ func (cc *ClusterCertificates) Resources(
return res, nil
}

// Here, certs are mounted.
// Volumes returns volumes and mounts that statefulset has to define to have
// access to all TLS certificates redpanda has enabled
func (cc *ClusterCertificates) Volumes() (
Expand All @@ -533,19 +568,63 @@ func (cc *ClusterCertificates) Volumes() (

// kafka client certs are needed for pandaproxy and schema registry if enabled
shouldIncludeKafkaClientCerts := len(cc.kafkaAPI.clientCertificates) > 0
vol, mount := secretVolumesForTLS(cc.kafkaAPI.nodeCertificateName(), cc.kafkaAPI.clientCertificates, redpandaCertVolName, redpandaClientVolName, mountPoints.KafkaAPI.NodeCertMountDir, mountPoints.KafkaAPI.ClientCAMountDir, cc.kafkaAPI.selfSignedNodeCertificate, shouldIncludeKafkaClientCerts)
vol, mount := secretVolumesForTLS(
cc.kafkaAPI.nodeCertificateName(),
cc.kafkaAPI.clientCACertificateName(),
cc.kafkaAPI.clientCertificates,
redpandaCertVolName,
redpandaClientVolName,
mountPoints.KafkaAPI.NodeCertMountDir,
mountPoints.KafkaAPI.ClientCAMountDir,
cc.kafkaAPI.selfSignedNodeCertificate,
shouldIncludeKafkaClientCerts,
cc.logger,
)
vols = append(vols, vol...)
mounts = append(mounts, mount...)

vol, mount = secretVolumesForTLS(cc.adminAPI.nodeCertificateName(), cc.adminAPI.clientCertificates, adminAPICertVolName, adminAPIClientCAVolName, mountPoints.AdminAPI.NodeCertMountDir, mountPoints.AdminAPI.ClientCAMountDir, false, false)
vol, mount = secretVolumesForTLS(
cc.adminAPI.nodeCertificateName(),
cc.adminAPI.clientCACertificateName(),
cc.adminAPI.clientCertificates,
adminAPICertVolName,
adminAPIClientCAVolName,
mountPoints.AdminAPI.NodeCertMountDir,
mountPoints.AdminAPI.ClientCAMountDir,
false,
false,
cc.logger,
)
vols = append(vols, vol...)
mounts = append(mounts, mount...)

vol, mount = secretVolumesForTLS(cc.pandaProxyAPI.nodeCertificateName(), cc.pandaProxyAPI.clientCertificates, pandaProxyCertVolName, pandaProxyClientCAVolName, mountPoints.PandaProxyAPI.NodeCertMountDir, mountPoints.PandaProxyAPI.ClientCAMountDir, false, false)
vol, mount = secretVolumesForTLS(
cc.pandaProxyAPI.nodeCertificateName(),
cc.pandaProxyAPI.clientCACertificateName(),
cc.pandaProxyAPI.clientCertificates,
pandaProxyCertVolName,
pandaProxyClientCAVolName,
mountPoints.PandaProxyAPI.NodeCertMountDir,
mountPoints.PandaProxyAPI.ClientCAMountDir,
cc.pandaProxyAPI.externalClientCACertificate != nil,
false,
cc.logger,
)
vols = append(vols, vol...)
mounts = append(mounts, mount...)

vol, mount = secretVolumesForTLS(cc.schemaRegistryAPI.nodeCertificateName(), cc.schemaRegistryAPI.clientCertificates, schemaRegistryCertVolName, schemaRegistryClientCAVolName, mountPoints.SchemaRegistryAPI.NodeCertMountDir, mountPoints.SchemaRegistryAPI.ClientCAMountDir, false, false)
vol, mount = secretVolumesForTLS(
cc.schemaRegistryAPI.nodeCertificateName(),
cc.schemaRegistryAPI.clientCACertificateName(),
cc.schemaRegistryAPI.clientCertificates,
schemaRegistryCertVolName,
schemaRegistryClientCAVolName,
mountPoints.SchemaRegistryAPI.NodeCertMountDir,
mountPoints.SchemaRegistryAPI.ClientCAMountDir,
cc.schemaRegistryAPI.externalClientCACertificate != nil,
false,
cc.logger,
)
vols = append(vols, vol...)
mounts = append(mounts, mount...)

Expand All @@ -554,9 +633,11 @@ func (cc *ClusterCertificates) Volumes() (

func secretVolumesForTLS(
nodeCertificate *types.NamespacedName,
clientCACertificate *types.NamespacedName,
clientCertificates []resources.Resource,
volumeName, clientVolumeName, mountDir, caMountDir string,
volumeName, clientVolumeName, mountDir, clientMountDir string,
shouldIncludeNodeCa, shouldIncludeClientCert bool,
log logr.Logger,
) ([]corev1.Volume, []corev1.VolumeMount) {
var vols []corev1.Volume
var mounts []corev1.VolumeMount
Expand Down Expand Up @@ -584,19 +665,45 @@ func secretVolumesForTLS(
},
}

if shouldIncludeNodeCa {
nodeVolume.VolumeSource.Secret.Items = append(nodeVolume.VolumeSource.Secret.Items, corev1.KeyToPath{
Key: cmmetav1.TLSCAKey,
Path: cmmetav1.TLSCAKey,
})
}

vols = append(vols, nodeVolume)
mounts = append(mounts, corev1.VolumeMount{
Name: volumeName,
MountPath: mountDir,
})

log.Info(fmt.Sprintf("clientCACert: %q, shouldIncludeNodeCa: %t", clientCACertificate, shouldIncludeClientCert))
if shouldIncludeNodeCa {
// If the client CA cert wasn't provided, use the default one generated.
if clientCACertificate == nil {
nodeVolume.VolumeSource.Secret.Items = append(
nodeVolume.VolumeSource.Secret.Items,
corev1.KeyToPath{
Key: cmmetav1.TLSCAKey,
Path: cmmetav1.TLSCAKey,
},
)
} else {
// Otherwise, create a volume & mount for the secret where the provided cert is stored.
caVolume := corev1.Volume{
Name: clientVolumeName,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: clientCACertificate.Name,
Items: []corev1.KeyToPath{{
Key: cmmetav1.TLSCAKey,
Path: cmmetav1.TLSCAKey,
}},
},
},
}
vols = append(vols, caVolume)
mounts = append(mounts, corev1.VolumeMount{
Name: clientVolumeName,
MountPath: clientMountDir,
})
}
}

// if mutual TLS is enabled, mount also client cerificate CA to be able to
// verify client certificates
if len(clientCertificates) > 0 {
Expand Down Expand Up @@ -626,7 +733,7 @@ func secretVolumesForTLS(
vols = append(vols, clientCertVolume)
mounts = append(mounts, corev1.VolumeMount{
Name: clientVolumeName,
MountPath: caMountDir,
MountPath: clientMountDir,
})
}

Expand Down
3 changes: 3 additions & 0 deletions src/go/k8s/pkg/resources/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,7 @@ func (r *ConfigMapResource) prepareSchemaRegistryClient(
return cfg.AppendToAdditionalRedpandaProperty(superusersConfigurationKey, username)
}

// TODO: might need to change this
func (r *ConfigMapResource) preparePandaproxyTLS(
cfgRpk *config.Config, mountPoints *resourcetypes.TLSMountPoints,
) {
Expand All @@ -608,12 +609,14 @@ func (r *ConfigMapResource) preparePandaproxyTLS(
RequireClientAuth: tlsListener.TLS.RequireClientAuth,
}
if tlsListener.TLS.RequireClientAuth {
// This path!
tls.TruststoreFile = fmt.Sprintf("%s/%s", mountPoints.PandaProxyAPI.ClientCAMountDir, cmetav1.TLSCAKey)
}
cfgRpk.Pandaproxy.PandaproxyAPITLS = []config.ServerTLS{tls}
}
}

// here
func (r *ConfigMapResource) prepareSchemaRegistryTLS(
cfgRpk *config.Config, mountPoints *resourcetypes.TLSMountPoints,
) {
Expand Down

0 comments on commit 5c9f8af

Please sign in to comment.