From 19cf24903e2ce7f58b3ed5f396c02647b234a600 Mon Sep 17 00:00:00 2001 From: Joel Takvorian Date: Tue, 18 Nov 2025 16:46:33 +0100 Subject: [PATCH] NETOBSERV-2495: Generate subnet config from netpol IPBlock Based on label presence "netobserv.io/label-ipblocks" --- internal/controller/flp/flp_controller.go | 70 ++++++++++++++++++++- internal/controller/flp/flp_test.go | 76 +++++++++++++++++++++++ 2 files changed, 144 insertions(+), 2 deletions(-) diff --git a/internal/controller/flp/flp_controller.go b/internal/controller/flp/flp_controller.go index 1f2e82538..88f602da7 100644 --- a/internal/controller/flp/flp_controller.go +++ b/internal/controller/flp/flp_controller.go @@ -3,6 +3,8 @@ package flp import ( "context" "fmt" + "slices" + "strings" flowslatest "github.com/netobserv/network-observability-operator/api/flowcollector/v1beta2" metricslatest "github.com/netobserv/network-observability-operator/api/flowmetrics/v1alpha1" @@ -18,6 +20,9 @@ import ( appsv1 "k8s.io/api/apps/v1" ascv2 "k8s.io/api/autoscaling/v2" corev1 "k8s.io/api/core/v1" + networkingv1 "k8s.io/api/networking/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/selection" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -26,6 +31,8 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" ) +const netpolLabel = "netobserv.io/label-ipblocks" + // Reconciler reconciles the current flowlogs-pipeline state with the desired configuration type Reconciler struct { client.Client @@ -62,6 +69,19 @@ func Start(ctx context.Context, mgr *manager.Manager) error { return []reconcile.Request{} }), reconcilers.IgnoreStatusChange, + ). + Watches( + &networkingv1.NetworkPolicy{}, + handler.EnqueueRequestsFromMapFunc(func(_ context.Context, o client.Object) []reconcile.Request { + lbls := o.GetLabels() + if lbls != nil { + if _, ok := lbls[netpolLabel]; ok { + return []reconcile.Request{{NamespacedName: constants.FlowCollectorName}} + } + } + return []reconcile.Request{} + }), + reconcilers.IgnoreStatusChange, ) ctrl, err := builder.Build(&r) @@ -127,14 +147,24 @@ func (r *Reconciler) reconcile(ctx context.Context, clh *helper.Client, fc *flow log.Error(err, "unable to obtain cluster ID") } + // Get subnets from netpols + netpols := networkingv1.NetworkPolicyList{} + selector := labels.NewSelector() + req, _ := labels.NewRequirement(netpolLabel, selection.Exists, nil) + selector = selector.Add(*req) + if err := r.Client.List(ctx, &netpols, &client.ListOptions{LabelSelector: selector}); err != nil { + return r.status.Error("CantListNetworkPolicies", err) + } + subnetLabels := getSubnetsFromPolicies(netpols.Items) + // Auto-detect subnets - var subnetLabels []flowslatest.SubnetLabel if r.mgr.ClusterInfo.IsOpenShift() && fc.Spec.Processor.HasAutoDetectOpenShiftNetworks() { var err error - subnetLabels, err = r.getOpenShiftSubnets(ctx) + openshiftLabels, err := r.getOpenShiftSubnets(ctx) if err != nil { log.Error(err, "error while reading subnet definitions") } + subnetLabels = append(subnetLabels, openshiftLabels...) } // List custom metrics @@ -287,6 +317,42 @@ func (r *Reconciler) getOpenShiftSubnets(ctx context.Context) ([]flowslatest.Sub return subnets, nil } +func getSubnetsFromPolicies(netpols []networkingv1.NetworkPolicy) []flowslatest.SubnetLabel { + var subnets []flowslatest.SubnetLabel + for _, np := range netpols { + if len(np.Labels) > 0 { + name := np.Labels[netpolLabel] + if len(name) > 0 { + var cidrs []string + for _, rule := range np.Spec.Ingress { + for _, peer := range rule.From { + if peer.IPBlock != nil && peer.IPBlock.CIDR != "" && !slices.Contains(cidrs, peer.IPBlock.CIDR) { + cidrs = append(cidrs, peer.IPBlock.CIDR) + } + } + } + for _, rule := range np.Spec.Egress { + for _, peer := range rule.To { + if peer.IPBlock != nil && peer.IPBlock.CIDR != "" && !slices.Contains(cidrs, peer.IPBlock.CIDR) { + cidrs = append(cidrs, peer.IPBlock.CIDR) + } + } + } + if len(cidrs) > 0 { + // sort CIDRs + slices.Sort(cidrs) + subnets = append(subnets, flowslatest.SubnetLabel{Name: name, CIDRs: cidrs}) + } + } + } + } + // Sort subnet by name + slices.SortFunc(subnets, func(a, b flowslatest.SubnetLabel) int { + return strings.Compare(a.Name, b.Name) + }) + return subnets +} + func readMachineNetworks(cm *corev1.ConfigMap) ([]flowslatest.SubnetLabel, error) { var subnets []flowslatest.SubnetLabel diff --git a/internal/controller/flp/flp_test.go b/internal/controller/flp/flp_test.go index cf443fdbe..29a14a5b4 100644 --- a/internal/controller/flp/flp_test.go +++ b/internal/controller/flp/flp_test.go @@ -20,6 +20,7 @@ import ( appsv1 "k8s.io/api/apps/v1" ascv2 "k8s.io/api/autoscaling/v2" corev1 "k8s.io/api/core/v1" + networkingv1 "k8s.io/api/networking/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" @@ -809,3 +810,78 @@ func TestToleration(t *testing.T) { assert.Len(ds.Spec.Template.Spec.Tolerations, 1) assert.Equal(corev1.Toleration{Operator: "Exists"}, ds.Spec.Template.Spec.Tolerations[0]) } + +func TestSubnetsFromNetpols(t *testing.T) { + netpols := []networkingv1.NetworkPolicy{ + { + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{ + netpolLabel: "my-database", + }}, + Spec: networkingv1.NetworkPolicySpec{ + Egress: []networkingv1.NetworkPolicyEgressRule{ + { + To: []networkingv1.NetworkPolicyPeer{ + { + IPBlock: &networkingv1.IPBlock{ + CIDR: "1.2.3.4/32", + }, + }, + { + IPBlock: &networkingv1.IPBlock{ + CIDR: "1.2.3.3/32", + }, + }, + }, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{ + netpolLabel: "01-vm", + }}, + Spec: networkingv1.NetworkPolicySpec{ + Egress: []networkingv1.NetworkPolicyEgressRule{ + { + To: []networkingv1.NetworkPolicyPeer{ + { + IPBlock: &networkingv1.IPBlock{ + CIDR: "20.20.20.20/24", + }, + }, + { + IPBlock: &networkingv1.IPBlock{ + CIDR: "10.10.10.10/24", + }, + }, + }, + }, + }, + }, + }, + { + // Ignored due to missing label + Spec: networkingv1.NetworkPolicySpec{ + Egress: []networkingv1.NetworkPolicyEgressRule{ + { + To: []networkingv1.NetworkPolicyPeer{ + { + IPBlock: &networkingv1.IPBlock{ + CIDR: "8.8.8.8/32", + }, + }, + }, + }, + }, + }, + }, + } + + subnetLabels := getSubnetsFromPolicies(netpols) + + // Should be ordered by name, and nexted CIDRs should be sorted + assert.Equal(t, []flowslatest.SubnetLabel{ + {Name: "01-vm", CIDRs: []string{"10.10.10.10/24", "20.20.20.20/24"}}, + {Name: "my-database", CIDRs: []string{"1.2.3.3/32", "1.2.3.4/32"}}, + }, subnetLabels) +}