diff --git a/cmd/cloud-network-config-controller/main.go b/cmd/cloud-network-config-controller/main.go index 7fdbd5cf0..486479a57 100644 --- a/cmd/cloud-network-config-controller/main.go +++ b/cmd/cloud-network-config-controller/main.go @@ -153,6 +153,7 @@ func main() { cloudNetworkClient, cloudNetworkInformerFactory.Cloud().V1().CloudPrivateIPConfigs(), kubeInformerFactory.Core().V1().Nodes(), + platformCfg, ) if err != nil { klog.Fatalf("Error getting cloud private ip controller, err: %v", err) diff --git a/pkg/cloudprovider/azure.go b/pkg/cloudprovider/azure.go index 7215418fd..c0faf4bc9 100644 --- a/pkg/cloudprovider/azure.go +++ b/pkg/cloudprovider/azure.go @@ -9,23 +9,29 @@ import ( "sync" "time" - "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime" - "k8s.io/utils/ptr" - "github.com/Azure/azure-sdk-for-go/sdk/azcore" "github.com/Azure/azure-sdk-for-go/sdk/azcore/arm" "github.com/Azure/azure-sdk-for-go/sdk/azcore/cloud" "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime" "github.com/Azure/azure-sdk-for-go/sdk/azidentity" "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute" "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/network/armnetwork/v6" azureapi "github.com/Azure/go-autorest/autorest/azure" "github.com/Azure/msi-dataplane/pkg/dataplane" + cloudnetworkv1 "github.com/openshift/api/cloudnetwork/v1" configv1 "github.com/openshift/api/config/v1" + cloudnetworklisters "github.com/openshift/client-go/cloudnetwork/listers/cloudnetwork/v1" + "github.com/openshift/cloud-network-config-controller/pkg/cloudprivateipconfig" corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/sets" + corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/klog/v2" utilnet "k8s.io/utils/net" + "k8s.io/utils/ptr" ) const ( @@ -48,7 +54,6 @@ type Azure struct { vmClient *armcompute.VirtualMachinesClient virtualNetworkClient *armnetwork.VirtualNetworksClient networkClient *armnetwork.InterfacesClient - backendAddressPoolClient *armnetwork.LoadBalancerBackendAddressPoolsClient nodeMapLock sync.Mutex nodeLockMap map[string]*sync.Mutex azureWorkloadIdentityEnabled bool @@ -114,6 +119,7 @@ func (a *Azure) readAzureCredentialsConfig() (*azureCredentialsConfig, error) { return &cfg, nil } + func (a *Azure) initCredentials() error { cfg, err := a.readAzureCredentialsConfig() if err != nil { @@ -162,11 +168,6 @@ func (a *Azure) initCredentials() error { return fmt.Errorf("failed to initialize new VirtualNetworksClient: %w", err) } - a.backendAddressPoolClient, err = armnetwork.NewLoadBalancerBackendAddressPoolsClient(cfg.subscriptionID, cred, options) - if err != nil { - return fmt.Errorf("failed to initialize new LoadBalancerBackendAddressPoolsClient: %w", err) - } - return nil } @@ -178,11 +179,11 @@ func (a *Azure) AssignPrivateIP(ip net.IP, node *corev1.Node) error { defer nodeLock.Unlock() instance, err := a.getInstance(node) if err != nil { - return err + return fmt.Errorf("error while retrieving instance details from Azure: %w", err) } networkInterfaces, err := a.getNetworkInterfaces(instance) if err != nil { - return err + return fmt.Errorf("error while retrieving interface details from Azure: %w", err) } if networkInterfaces[0].Properties == nil { return fmt.Errorf("nil network interface properties") @@ -198,65 +199,14 @@ func (a *Azure) AssignPrivateIP(ip net.IP, node *corev1.Node) error { name := fmt.Sprintf("%s_%s", node.Name, ipc) untrue := false - // In some Azure setups (Azure private, public ARO, private ARO) outbound connectivity is achieved through - // outbound rules tied to the backend address pool of the primary IP of the VM NIC. An Azure constraint - // forbids the creation of a secondary IP tied to such address pool and would result in - // OutboundRuleCannotBeUsedWithBackendAddressPoolThatIsReferencedBySecondaryIpConfigs. - // Work around it by not specifying the backend address pool when an outbound rule is set, even though - // that means preventing outbound connectivity to the egress IP, which will be able to reach the - // infrastructure subnet nonetheless. In public Azure clusters, outbound connectivity is achieved through - // UserDefinedRouting, which doesn't impose such constraints on secondary IPs. - loadBalancerBackendAddressPoolsArgument := networkInterface.Properties.IPConfigurations[0].Properties.LoadBalancerBackendAddressPools - var attachedOutboundRule *armnetwork.SubResource -OuterLoop: - for _, ipconfig := range networkInterface.Properties.IPConfigurations { - if ipconfig.Properties.LoadBalancerBackendAddressPools != nil { - for _, pool := range ipconfig.Properties.LoadBalancerBackendAddressPools { - if pool.ID == nil { - continue - } - // for some reason, the struct for the pool above is not entirely filled out: - // BackendAddressPoolPropertiesFormat:(*network.BackendAddressPoolPropertiesFormat)(nil) - // Do a separate get for this pool in order to check whether there are any outbound rules - // attached to it - realPool, err := a.getBackendAddressPool(ptr.Deref(pool.ID, "")) - if err != nil { - return fmt.Errorf("error looking up backend address pool %s with ID %s: %v", ptr.Deref(pool.Name, ""), ptr.Deref(pool.ID, ""), err) - } - if len(realPool.Properties.LoadBalancerBackendAddresses) > 0 { - if realPool.Properties.OutboundRule != nil { - loadBalancerBackendAddressPoolsArgument = nil - attachedOutboundRule = realPool.Properties.OutboundRule - break OuterLoop - } - if len(realPool.Properties.OutboundRules) > 0 { - loadBalancerBackendAddressPoolsArgument = nil - attachedOutboundRule = (realPool.Properties.OutboundRules)[0] - break OuterLoop - } - } - } - } - } - if loadBalancerBackendAddressPoolsArgument == nil { - outboundRuleStr := "" - if attachedOutboundRule != nil && attachedOutboundRule.ID != nil { - // https://issues.redhat.com/browse/OCPBUGS-33617 showed that there can be a rule without an ID... - outboundRuleStr = fmt.Sprintf(": %s", ptr.Deref(attachedOutboundRule.ID, "")) - } - klog.Warningf("Egress IP %s will have no outbound connectivity except for the infrastructure subnet: "+ - "omitting backend address pool when adding secondary IP: it has an outbound rule already%s", - ipc, outboundRuleStr) - } newIPConfiguration := &armnetwork.InterfaceIPConfiguration{ Name: &name, Properties: &armnetwork.InterfaceIPConfigurationPropertiesFormat{ - PrivateIPAddress: &ipc, - PrivateIPAllocationMethod: ptr.To(armnetwork.IPAllocationMethodStatic), - Subnet: networkInterface.Properties.IPConfigurations[0].Properties.Subnet, - Primary: &untrue, - LoadBalancerBackendAddressPools: loadBalancerBackendAddressPoolsArgument, - ApplicationSecurityGroups: applicationSecurityGroups, + PrivateIPAddress: &ipc, + PrivateIPAllocationMethod: ptr.To(armnetwork.IPAllocationMethodStatic), + Subnet: networkInterface.Properties.IPConfigurations[0].Properties.Subnet, + Primary: &untrue, + ApplicationSecurityGroups: applicationSecurityGroups, }, } for _, ipCfg := range ipConfigurations { @@ -272,9 +222,11 @@ OuterLoop: ipConfigurations = append(ipConfigurations, newIPConfiguration) networkInterface.Properties.IPConfigurations = ipConfigurations // Send the request + klog.Warningf("Egress IP %s will have no outbound connectivity except for the infrastructure subnet: "+ + "omitting backend address pool when adding secondary IP", ipc) poller, err := a.createOrUpdate(networkInterface) if err != nil { - return err + return fmt.Errorf("error while updating network interface: %w", err) } return a.waitForCompletion(poller) } @@ -286,11 +238,11 @@ func (a *Azure) ReleasePrivateIP(ip net.IP, node *corev1.Node) error { defer nodeLock.Unlock() instance, err := a.getInstance(node) if err != nil { - return err + return fmt.Errorf("error while retrieving instance details from Azure: %w", err) } networkInterfaces, err := a.getNetworkInterfaces(instance) if err != nil { - return err + return fmt.Errorf("error while retrieving interface details from Azure: %w", err) } // Perform the operation against the first interface listed, which will be // the primary interface (if it's defined as such) or the first one returned @@ -317,7 +269,7 @@ func (a *Azure) ReleasePrivateIP(ip net.IP, node *corev1.Node) error { // Send the request poller, err := a.createOrUpdate(networkInterface) if err != nil { - return err + return fmt.Errorf("error while updating network interface: %w", err) } return a.waitForCompletion(poller) } @@ -357,6 +309,74 @@ func (a *Azure) GetNodeEgressIPConfiguration(node *corev1.Node, cpicIPs sets.Set return []*NodeEgressIPConfiguration{config}, nil } +// The consensus is to not add egress IP to public load balancer +// backend pool regardless of the presence of an OutBoundRule. +// During upgrade this function removes any egress IP added to +// public load balancer backend pool previously. +func (a *Azure) SyncLBBackend(cloudPrivateIPConfigLister cloudnetworklisters.CloudPrivateIPConfigLister, nodeLister corelisters.NodeLister) error { + cloudPrivateIPConfigs, err := cloudPrivateIPConfigLister.List(labels.Everything()) + if err != nil { + return fmt.Errorf("error listing cloud private ip config, err: %v", err) + } + for _, cloudPrivateIPConfig := range cloudPrivateIPConfigs { + if !isCloudPrivateIPConfigAssigned(cloudPrivateIPConfig) { + continue + } + ip, _, err := cloudprivateipconfig.NameToIP(cloudPrivateIPConfig.Name) + if err != nil { + return fmt.Errorf("error parsing CloudPrivateIPConfig %s: %v", cloudPrivateIPConfig.Name, err) + } + ipc := ip.String() + node, err := nodeLister.Get(cloudPrivateIPConfig.Spec.Node) + if err != nil && apierrors.IsNotFound(err) { + klog.Warningf("source node: %s no longer exists for CloudPrivateIPConfig: %q", + cloudPrivateIPConfig.Spec.Node, cloudPrivateIPConfig.Name) + continue + } else if err != nil { + return fmt.Errorf("error getting node %s for CloudPrivateIPConfig %q: %w", + cloudPrivateIPConfig.Spec.Node, cloudPrivateIPConfig.Name, err) + } + + instance, err := a.getInstance(node) + if err != nil { + return fmt.Errorf("error while retrieving instance details from Azure: %w", err) + } + networkInterfaces, err := a.getNetworkInterfaces(instance) + if err != nil { + return fmt.Errorf("error while retrieving interface details from Azure: %w", err) + } + if networkInterfaces[0].Properties == nil { + return fmt.Errorf("nil network interface properties") + } + // Perform the operation against the first interface listed, which will be + // the primary interface (if it's defined as such) or the first one returned + // following the order Azure specifies. + networkInterface := networkInterfaces[0] + var loadBalancerBackendPoolModified bool + // omit Egress IP from LB backend pool + ipConfigurations := networkInterface.Properties.IPConfigurations + for _, ipCfg := range ipConfigurations { + if ptr.Deref(ipCfg.Properties.PrivateIPAddress, "") == ipc && + ipCfg.Properties.LoadBalancerBackendAddressPools != nil { + klog.Infof("Removing Egress IP %s from Azure public load balancer backend pool", ipc) + ipCfg.Properties.LoadBalancerBackendAddressPools = nil + loadBalancerBackendPoolModified = true + } + } + if loadBalancerBackendPoolModified { + networkInterface.Properties.IPConfigurations = ipConfigurations + poller, err := a.createOrUpdate(networkInterface) + if err != nil { + return fmt.Errorf("error while updating network interface: %w", err) + } + if err = a.waitForCompletion(poller); err != nil { + return fmt.Errorf("error while updating network interface: %w", err) + } + } + } + return nil +} + func (a *Azure) createOrUpdate(networkInterface armnetwork.Interface) (*runtime.Poller[armnetwork.InterfacesClientCreateOrUpdateResponse], error) { ctx, cancel := context.WithTimeout(a.ctx, defaultAzureOperationTimeout) defer cancel() @@ -490,46 +510,6 @@ func (a *Azure) getNetworkInterfaces(instance *armcompute.VirtualMachine) ([]arm return networkInterfaces, nil } -func splitObjectID(azureResourceID string) (resourceGroupName, loadBalancerName, backendAddressPoolName string) { - // example of an azureResourceID: - // "/subscriptions/53b8f551-f0fc-4bea-8cba-6d1fefd54c8a/resourceGroups/huirwang-debug1-2qh9t-rg/providers/Microsoft.Network/loadBalancers/huirwang-debug1-2qh9t/backendAddressPools/huirwang-debug1-2qh9t" - - // Split the Azure resource ID into parts using "/" - parts := strings.Split(azureResourceID, "/") - - // Iterate through the parts to find the relevant subIDs - for i, part := range parts { - switch part { - case "resourceGroups": - if i+1 < len(parts) { - resourceGroupName = parts[i+1] - } - case "loadBalancers": - if i+1 < len(parts) { - loadBalancerName = parts[i+1] - } - case "backendAddressPools": - if i+1 < len(parts) { - backendAddressPoolName = parts[i+1] - } - } - } - return -} - -func (a *Azure) getBackendAddressPool(poolID string) (*armnetwork.BackendAddressPool, error) { - ctx, cancel := context.WithTimeout(a.ctx, defaultAzureOperationTimeout) - defer cancel() - resourceGroupName, loadBalancerName, backendAddressPoolName := splitObjectID(poolID) - response, err := a.backendAddressPoolClient.Get(ctx, resourceGroupName, loadBalancerName, backendAddressPoolName, nil) - if err != nil { - return nil, fmt.Errorf("failed to retrieve backend address pool for backendAddressPoolClient=%s, loadBalancerName=%s, backendAddressPoolName=%s: %w", - resourceGroupName, loadBalancerName, backendAddressPoolName, err) - } - return &response.BackendAddressPool, nil - -} - func (a *Azure) getNetworkInterface(id string) (armnetwork.Interface, error) { ctx, cancel := context.WithTimeout(a.ctx, defaultAzureOperationTimeout) defer cancel() @@ -699,3 +679,12 @@ func ParseCloudEnvironment(env azureapi.Environment) cloud.Configuration { } return cloudConfig } + +func isCloudPrivateIPConfigAssigned(cpic *cloudnetworkv1.CloudPrivateIPConfig) bool { + for _, condition := range cpic.Status.Conditions { + if condition.Type == string(cloudnetworkv1.Assigned) && condition.Status == v1.ConditionTrue { + return true + } + } + return false +} diff --git a/pkg/cloudprovider/cloudprovider.go b/pkg/cloudprovider/cloudprovider.go index 7f56cf636..ea4cc8fee 100644 --- a/pkg/cloudprovider/cloudprovider.go +++ b/pkg/cloudprovider/cloudprovider.go @@ -134,7 +134,9 @@ func (n *NodeEgressIPConfiguration) String() string { return fmt.Sprintf("%v", *n) } -func NewCloudProviderClient(cfg CloudProviderConfig, platformStatus *configv1.PlatformStatus, featureGates featuregates.FeatureGate) (CloudProviderIntf, error) { +func NewCloudProviderClient(cfg CloudProviderConfig, + platformStatus *configv1.PlatformStatus, + featureGates featuregates.FeatureGate) (CloudProviderIntf, error) { var cloudProviderIntf CloudProviderIntf // Initialize a separate context from the main context, rationale: cloud diff --git a/pkg/controller/cloudprivateipconfig/cloudprivateipconfig_controller.go b/pkg/controller/cloudprivateipconfig/cloudprivateipconfig_controller.go index d19545224..bd89fcf4b 100644 --- a/pkg/controller/cloudprivateipconfig/cloudprivateipconfig_controller.go +++ b/pkg/controller/cloudprivateipconfig/cloudprivateipconfig_controller.go @@ -56,6 +56,9 @@ type CloudPrivateIPConfigController struct { // down to all API client calls as to make sure all in-flight calls get // cancelled if the main context is ctx context.Context + // initialSyncHook is an optional function called during InitialSync for + // cloud-provider-specific cleanup. If nil, no cleanup is performed. + initialSyncHook func() error } // NewCloudPrivateIPConfigController returns a new CloudPrivateIPConfig controller @@ -64,7 +67,8 @@ func NewCloudPrivateIPConfigController( cloudProviderClient cloudprovider.CloudProviderIntf, cloudNetworkClientset cloudnetworkclientset.Interface, cloudPrivateIPConfigInformer cloudnetworkinformers.CloudPrivateIPConfigInformer, - nodeInformer coreinformers.NodeInformer) (*controller.CloudNetworkConfigController, error) { + nodeInformer coreinformers.NodeInformer, + cfg cloudprovider.CloudProviderConfig) (*controller.CloudNetworkConfigController, error) { cloudPrivateIPConfigController := &CloudPrivateIPConfigController{ nodesLister: nodeInformer.Lister(), @@ -73,6 +77,14 @@ func NewCloudPrivateIPConfigController( cloudPrivateIPConfigLister: cloudPrivateIPConfigInformer.Lister(), ctx: controllerContext, } + + if cfg.PlatformType == cloudprovider.PlatformTypeAzure { + azureClient := cloudProviderClient.(*cloudprovider.Azure) + cloudPrivateIPConfigController.initialSyncHook = func() error { + return azureClient.SyncLBBackend(cloudPrivateIPConfigInformer.Lister(), nodeInformer.Lister()) + } + } + controller := controller.NewCloudNetworkConfigController( []cache.InformerSynced{cloudPrivateIPConfigInformer.Informer().HasSynced, nodeInformer.Informer().HasSynced}, cloudPrivateIPConfigController, @@ -116,6 +128,17 @@ func NewCloudPrivateIPConfigController( return controller, nil } +// InitialSync performs one-time cleanup on startup. +// This is called after informer caches are synced but before workers start processing items. +// If an initialSyncHook was provided, it will be called to perform cloud-provider-specific cleanup. +func (c *CloudPrivateIPConfigController) InitialSync() error { + if c.initialSyncHook == nil { + return nil + } + klog.Info("Running initial CloudPrivateIPConfig sync/cleanup") + return c.initialSyncHook() +} + // syncHandler compares the actual state with the desired, and attempts to // converge the two. It then updates the Status block of the CloudPrivateIPConfig // resource with the current status of the resource. diff --git a/pkg/controller/cloudprivateipconfig/cloudprivateipconfig_controller_test.go b/pkg/controller/cloudprivateipconfig/cloudprivateipconfig_controller_test.go index 38531b34b..2ef0c450c 100644 --- a/pkg/controller/cloudprivateipconfig/cloudprivateipconfig_controller_test.go +++ b/pkg/controller/cloudprivateipconfig/cloudprivateipconfig_controller_test.go @@ -97,6 +97,7 @@ func (t *CloudPrivateIPConfigTestCase) NewFakeCloudPrivateIPConfigController() ( fakeCloudNetworkClient, cloudNetworkInformerFactory.Cloud().V1().CloudPrivateIPConfigs(), kubeInformerFactory.Core().V1().Nodes(), + cloudprovider.CloudProviderConfig{}, ) if err != nil { return nil, err diff --git a/pkg/controller/configmap/configmap_controller.go b/pkg/controller/configmap/configmap_controller.go index f8c7279ff..095e6745d 100644 --- a/pkg/controller/configmap/configmap_controller.go +++ b/pkg/controller/configmap/configmap_controller.go @@ -103,6 +103,10 @@ func (s *ConfigMapController) SyncHandler(key string) error { return nil } +func (s *ConfigMapController) InitialSync() error { + return nil +} + // shutdown is called in case we hit a configMap rotation. We need to: process all // in-flight requests and pause all our controllers for any further ones (since // we can't communicate with the cloud API using the old data anymore). I don't diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 5abb26b50..3cccdacef 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -39,6 +39,10 @@ const ( type CloudNetworkConfigControllerIntf interface { SyncHandler(key string) error + // InitialSync is called once after informer caches are synced but before workers start. + // Use this for one-time cleanup or initialization that requires access to the full + // state of the cluster (e.g., listing all existing resources). + InitialSync() error } type CloudNetworkConfigController struct { @@ -97,6 +101,12 @@ func (c *CloudNetworkConfigController) Run(stopCh <-chan struct{}) error { return fmt.Errorf("failed to wait for caches to sync for %s workqueue", c.controllerKey) } + // Perform one-time initial sync/cleanup before starting workers + klog.Infof("Running initial sync for %s controller", c.controllerKey) + if err := c.InitialSync(); err != nil { + return fmt.Errorf("initial sync failed for %s controller: %v", c.controllerKey, err) + } + klog.Infof("Starting %s workers", c.controllerKey) // Launch default amount of workers to process resources for i := 0; i < defaultWorkerThreadiness; i++ { diff --git a/pkg/controller/node/node_controller.go b/pkg/controller/node/node_controller.go index 1b90520ce..bfaf1ea3c 100644 --- a/pkg/controller/node/node_controller.go +++ b/pkg/controller/node/node_controller.go @@ -21,8 +21,8 @@ import ( cloudnetworkinformers "github.com/openshift/client-go/cloudnetwork/informers/externalversions/cloudnetwork/v1" cloudnetworklisters "github.com/openshift/client-go/cloudnetwork/listers/cloudnetwork/v1" - cloudprovider "github.com/openshift/cloud-network-config-controller/pkg/cloudprovider" "github.com/openshift/cloud-network-config-controller/pkg/cloudprivateipconfig" + cloudprovider "github.com/openshift/cloud-network-config-controller/pkg/cloudprovider" controller "github.com/openshift/cloud-network-config-controller/pkg/controller" ) @@ -178,6 +178,10 @@ func (n *NodeController) generateAnnotation(nodeEgressIPConfigs []*cloudprovider return string(serialized), nil } +func (n *NodeController) InitialSync() error { + return nil +} + // TaintKeyExists checks if the given taint key exists in list of taints. Returns true if exists false otherwise. // Copied from k8s.io/kubernetes/pkg/util/taints/taints.go to avoid dependency hell. func taintKeyExists(taints []v1.Taint, taintKeyToMatch string) bool { diff --git a/pkg/controller/secret/secret_controller.go b/pkg/controller/secret/secret_controller.go index 7b086a057..5d7d47ef3 100644 --- a/pkg/controller/secret/secret_controller.go +++ b/pkg/controller/secret/secret_controller.go @@ -103,6 +103,10 @@ func (s *SecretController) SyncHandler(key string) error { return nil } +func (s *SecretController) InitialSync() error { + return nil +} + // shutdown is called in case we hit a secret rotation. We need to: process all // in-flight requests and pause all our controllers for any further ones (since // we can't communicate with the cloud API using the old data anymore). I don't