Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 69 additions & 17 deletions pkg/multicluster/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"
"fmt"
"strings"
"sync"

apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -234,9 +235,29 @@ func (c *Client) clusterForWrite(gvk schema.GroupVersionKind, obj any) (cluster.
return nil, fmt.Errorf("no cluster matched for GVK %s", gvk)
}

type duplicateError struct{ msg string }

func (e *duplicateError) Error() string { return e.msg }

// IsDuplicateError returns true if the error indicates that a resource was
// found in multiple clusters. This can be used by callers of the Get and List
// methods to keep using the result even if a duplicate exists, as long as they
// don't mind that the result is potentially inconsistent.
func IsDuplicateError(err error) bool {
var de *duplicateError
return errors.As(err, &de)
}

// Get iterates over all clusters with the GVK and returns the result.
// Returns an error if the resource is found in multiple clusters (duplicate).
//
// If the requested resource is encountered in multiple clusters, this function
// will return the first one, but will set an error message that can be checked
// with IsDuplicateError. In that way the result can be used if the caller
// just cares about the resource existing in at least one cluster, and doesn't
// mind which one is returned.
//
// If no cluster has the resource, a NotFound error is returned.
//
// Non-NotFound errors from individual clusters are logged and silently skipped
// so that a single unavailable cluster does not block the entire read path.
func (c *Client) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error {
Expand All @@ -256,10 +277,13 @@ func (c *Client) Get(ctx context.Context, key client.ObjectKey, obj client.Objec
candidate := obj.DeepCopyObject().(client.Object)
err := cl.GetClient().Get(ctx, key, candidate, opts...)
if err == nil {
return fmt.Errorf("duplicate resource found: %s %s/%s exists in multiple clusters", gvk, key.Namespace, key.Name)
// In this case Get() was already called and the object set.
return &duplicateError{msg: fmt.Sprintf("duplicate %s %s/%s in multiple clusters",
gvk, key.Namespace, key.Name)}
}
if !apierrors.IsNotFound(err) {
log.Error(err, "error checking for duplicate resource in cluster", "gvk", gvk, "namespace", key.Namespace, "name", key.Name)
log.Error(err, "error checking for duplicate resource in cluster",
"gvk", gvk, "namespace", key.Namespace, "name", key.Name)
}
continue
}
Expand All @@ -270,7 +294,8 @@ func (c *Client) Get(ctx context.Context, key client.ObjectKey, obj client.Objec
continue
}
if !apierrors.IsNotFound(err) {
log.Error(err, "error getting resource from cluster", "gvk", gvk, "namespace", key.Namespace, "name", key.Name)
log.Error(err, "error getting resource from cluster", "gvk", gvk,
"namespace", key.Namespace, "name", key.Name)
}
}
if !found {
Expand All @@ -279,8 +304,15 @@ func (c *Client) Get(ctx context.Context, key client.ObjectKey, obj client.Objec
return nil
}

// List iterates over all clusters with the GVK and returns a combined list.
// Returns an error if any resources share the same namespace/name across clusters.
// List iterates over all clusters with the GVK and returns a combined list
// containing all resources found in any cluster.
//
// If resources are encountered in multiple clusters with the same
// namespace/name, this function will still return a combined list of all
// resources, but will set an error message that can be checked with
// IsDuplicateError. In that way the result can be used if duplicates are ok
// and disambiguated by the caller.
//
// Errors from individual clusters are logged and silently skipped so that a
// single unavailable cluster does not block the entire read path.
func (c *Client) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error {
Expand Down Expand Up @@ -323,11 +355,14 @@ func (c *Client) List(ctx context.Context, list client.ObjectList, opts ...clien
}
seen[key] = true
}
if err := meta.SetList(list, allItems); err != nil {
return err
}
if len(duplicates) > 0 {
return fmt.Errorf("duplicate resources found in multiple clusters for %s: %v", gvk, duplicates)
return &duplicateError{msg: fmt.Sprintf("duplicate %s [%s] in multiple clusters",
gvk, strings.Join(duplicates, ", "))}
}

return meta.SetList(list, allItems)
return nil
}

// Apply is not supported in the multicluster client as the group version kind
Expand Down Expand Up @@ -498,10 +533,19 @@ type subResourceClient struct {
}

// Get iterates over all clusters with the GVK and returns the result.
// Returns an error if the resource is found in multiple clusters (duplicate).
//
// If the requested resource is encountered in multiple clusters, this function
// will return the first one, but will set an error message that can be checked
// with IsDuplicateError. In that way the result can be used if the caller
// just cares about the resource existing in at least one cluster, and doesn't
// mind which one is returned.
//
// If no cluster has the resource, a NotFound error is returned.
//
// Non-NotFound errors from individual clusters are logged and silently skipped
// so that a single unavailable cluster does not block the entire read path.
func (c *subResourceClient) Get(ctx context.Context, obj, subResource client.Object, opts ...client.SubResourceGetOption) error {
log := ctrl.LoggerFrom(ctx)

gvk, err := c.multiclusterClient.GVKFromHomeScheme(obj)
if err != nil {
return err
Expand All @@ -510,29 +554,37 @@ func (c *subResourceClient) Get(ctx context.Context, obj, subResource client.Obj
if err != nil {
return err
}

found := false
for _, cl := range clusters {
// If we already found the resource in a previous cluster, we want to check if it also exists in this cluster to detect duplicates.
if found {
candidateObj := obj.DeepCopyObject().(client.Object)
candidateSub := subResource.DeepCopyObject().(client.Object)
err := cl.GetClient().SubResource(c.subResource).Get(ctx, candidateObj, candidateSub, opts...)
err := cl.GetClient().SubResource(c.subResource).
Get(ctx, candidateObj, candidateSub, opts...)
if err == nil {
return fmt.Errorf("duplicate sub-resource found: %s %s/%s exists in multiple clusters", gvk, obj.GetNamespace(), obj.GetName())
// In this case Get() was already called and the object set.
return &duplicateError{msg: fmt.Sprintf("duplicate %s %s/%s subresource %s in multiple clusters",
gvk, candidateObj.GetNamespace(), candidateObj.GetName(), c.subResource)}
}
if !apierrors.IsNotFound(err) {
log.Error(err, "error checking for duplicate sub-resource in cluster", "gvk", gvk, "namespace", obj.GetNamespace(), "name", obj.GetName(), "subresource", c.subResource)
log.Error(err, "error checking for duplicate sub-resource in cluster",
"gvk", gvk, "namespace", obj.GetNamespace(), "name", obj.GetName(),
"subresource", c.subResource)
}
continue
}

err := cl.GetClient().SubResource(c.subResource).Get(ctx, obj, subResource, opts...)
err := cl.GetClient().SubResource(c.subResource).
Get(ctx, obj, subResource, opts...)
if err == nil {
found = true
continue
}
if !apierrors.IsNotFound(err) {
log.Error(err, "error getting sub-resource from cluster", "gvk", gvk, "namespace", obj.GetNamespace(), "name", obj.GetName(), "subresource", c.subResource)
log.Error(err, "error getting sub-resource from cluster", "gvk", gvk,
"namespace", obj.GetNamespace(), "name", obj.GetName(),
"subresource", c.subResource)
}
}
if !found {
Expand Down
70 changes: 62 additions & 8 deletions pkg/multicluster/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ package multicluster

import (
"context"
"errors"
"strings"
"sync"
"testing"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -157,6 +159,42 @@ var configMapGVK = schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Conf
var configMapListGVK = schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMapList"}
var podGVK = schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"}

func TestIsDuplicateError(t *testing.T) {
tests := []struct {
name string
err error
expected bool
}{
{
name: "nil error",
err: nil,
expected: false,
},
{
name: "duplicate error",
err: &duplicateError{msg: "duplicate /v1, Kind=ConfigMap default/foo in multiple clusters"},
expected: true,
},
{
name: "unrelated error",
err: errors.New("something went wrong"),
expected: false,
},
{
name: "not found error",
err: apierrors.NewNotFound(schema.GroupResource{Group: "", Resource: "ConfigMap"}, "foo"),
expected: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := IsDuplicateError(tt.err); got != tt.expected {
t.Errorf("IsDuplicateError(%v) = %v, want %v", tt.err, got, tt.expected)
}
})
}
}

func TestClient_Apply(t *testing.T) {
c := &Client{HomeScheme: newTestScheme(t)}

Expand Down Expand Up @@ -684,8 +722,12 @@ func TestClient_Get_MultiCluster_DuplicateError(t *testing.T) {
if err == nil {
t.Fatal("expected duplicate error, got nil")
}
if !strings.Contains(err.Error(), "duplicate") {
t.Errorf("expected duplicate error, got: %v", err)
if !IsDuplicateError(err) {
t.Errorf("expected IsDuplicateError to return true, got false for error: %v", err)
}
// The result should still be populated with the object from the first cluster.
if result.Name != "shared-cm" {
t.Errorf("expected result to be populated, got name %q", result.Name)
}
}

Expand Down Expand Up @@ -713,8 +755,12 @@ func TestClient_Get_HomeAndRemote_DuplicateError(t *testing.T) {
if err == nil {
t.Fatal("expected duplicate error, got nil")
}
if !strings.Contains(err.Error(), "duplicate") {
t.Errorf("expected duplicate error, got: %v", err)
if !IsDuplicateError(err) {
t.Errorf("expected IsDuplicateError to return true, got false for error: %v", err)
}
// The result should still be populated with the object from the first cluster.
if result.Name != "shared-cm" {
t.Errorf("expected result to be populated, got name %q", result.Name)
}
}

Expand Down Expand Up @@ -850,12 +896,16 @@ func TestClient_List_MultipleClusters_DuplicateError(t *testing.T) {
if err == nil {
t.Fatal("expected duplicate error, got nil")
}
if !strings.Contains(err.Error(), "duplicate") {
t.Errorf("expected duplicate error, got: %v", err)
if !IsDuplicateError(err) {
t.Errorf("expected IsDuplicateError to return true, got false for error: %v", err)
}
if !strings.Contains(err.Error(), "default/shared-cm") {
t.Errorf("expected error to contain duplicated resource name, got: %v", err)
}
// The list should still be populated (all items from all clusters).
if len(cmList.Items) == 0 {
t.Error("expected list to be populated even when duplicate error is returned")
}
}

func TestClient_List_HomeAndRemote_DuplicateError(t *testing.T) {
Expand All @@ -882,8 +932,12 @@ func TestClient_List_HomeAndRemote_DuplicateError(t *testing.T) {
if err == nil {
t.Fatal("expected duplicate error, got nil")
}
if !strings.Contains(err.Error(), "duplicate") {
t.Errorf("expected duplicate error, got: %v", err)
if !IsDuplicateError(err) {
t.Errorf("expected IsDuplicateError to return true, got false for error: %v", err)
}
// The list should still be populated (all items from all clusters).
if len(cmList.Items) == 0 {
t.Error("expected list to be populated even when duplicate error is returned")
}
}

Expand Down
Loading