Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 68 additions & 2 deletions internal/controller/flp/flp_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import (
"context"
"fmt"
"slices"
"strings"

flowslatest "github.com/netobserv/network-observability-operator/api/flowcollector/v1beta2"
metricslatest "github.com/netobserv/network-observability-operator/api/flowmetrics/v1alpha1"
Expand All @@ -18,6 +20,9 @@
appsv1 "k8s.io/api/apps/v1"
ascv2 "k8s.io/api/autoscaling/v2"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -26,6 +31,8 @@
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

const netpolLabel = "netobserv.io/label-ipblocks"

// Reconciler reconciles the current flowlogs-pipeline state with the desired configuration
type Reconciler struct {
client.Client
Expand Down Expand Up @@ -62,6 +69,19 @@
return []reconcile.Request{}
}),
reconcilers.IgnoreStatusChange,
).
Watches(
&networkingv1.NetworkPolicy{},
handler.EnqueueRequestsFromMapFunc(func(_ context.Context, o client.Object) []reconcile.Request {
lbls := o.GetLabels()
if lbls != nil {
if _, ok := lbls[netpolLabel]; ok {
return []reconcile.Request{{NamespacedName: constants.FlowCollectorName}}
}
}
return []reconcile.Request{}
}),
reconcilers.IgnoreStatusChange,
)

ctrl, err := builder.Build(&r)
Expand Down Expand Up @@ -127,14 +147,24 @@
log.Error(err, "unable to obtain cluster ID")
}

// Get subnets from netpols
netpols := networkingv1.NetworkPolicyList{}
selector := labels.NewSelector()
req, _ := labels.NewRequirement(netpolLabel, selection.Exists, nil)
selector = selector.Add(*req)
if err := r.Client.List(ctx, &netpols, &client.ListOptions{LabelSelector: selector}); err != nil {
return r.status.Error("CantListNetworkPolicies", err)
}
subnetLabels := getSubnetsFromPolicies(netpols.Items)

// Auto-detect subnets
var subnetLabels []flowslatest.SubnetLabel
if r.mgr.ClusterInfo.IsOpenShift() && fc.Spec.Processor.HasAutoDetectOpenShiftNetworks() {
var err error
subnetLabels, err = r.getOpenShiftSubnets(ctx)
openshiftLabels, err := r.getOpenShiftSubnets(ctx)
if err != nil {
log.Error(err, "error while reading subnet definitions")
}
subnetLabels = append(subnetLabels, openshiftLabels...)
}

// List custom metrics
Expand Down Expand Up @@ -287,6 +317,42 @@
return subnets, nil
}

func getSubnetsFromPolicies(netpols []networkingv1.NetworkPolicy) []flowslatest.SubnetLabel {
var subnets []flowslatest.SubnetLabel
for _, np := range netpols {

Check failure on line 322 in internal/controller/flp/flp_controller.go

View workflow job for this annotation

GitHub Actions / Build, lint, test

rangeValCopy: each iteration copies 368 bytes (consider pointers or indexing) (gocritic)
if len(np.Labels) > 0 {
name := np.Labels[netpolLabel]
if len(name) > 0 {
var cidrs []string
for _, rule := range np.Spec.Ingress {
for _, peer := range rule.From {
if peer.IPBlock != nil && peer.IPBlock.CIDR != "" && !slices.Contains(cidrs, peer.IPBlock.CIDR) {
cidrs = append(cidrs, peer.IPBlock.CIDR)
}
}
}
for _, rule := range np.Spec.Egress {
for _, peer := range rule.To {
if peer.IPBlock != nil && peer.IPBlock.CIDR != "" && !slices.Contains(cidrs, peer.IPBlock.CIDR) {
cidrs = append(cidrs, peer.IPBlock.CIDR)
}
}
}
if len(cidrs) > 0 {
// sort CIDRs
slices.Sort(cidrs)
subnets = append(subnets, flowslatest.SubnetLabel{Name: name, CIDRs: cidrs})
}
}
}
}
// Sort subnet by name
slices.SortFunc(subnets, func(a, b flowslatest.SubnetLabel) int {
return strings.Compare(a.Name, b.Name)
})
return subnets
}

func readMachineNetworks(cm *corev1.ConfigMap) ([]flowslatest.SubnetLabel, error) {
var subnets []flowslatest.SubnetLabel

Expand Down
76 changes: 76 additions & 0 deletions internal/controller/flp/flp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
ascv2 "k8s.io/api/autoscaling/v2"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
Expand Down Expand Up @@ -809,3 +810,78 @@ func TestToleration(t *testing.T) {
assert.Len(ds.Spec.Template.Spec.Tolerations, 1)
assert.Equal(corev1.Toleration{Operator: "Exists"}, ds.Spec.Template.Spec.Tolerations[0])
}

func TestSubnetsFromNetpols(t *testing.T) {
netpols := []networkingv1.NetworkPolicy{
{
ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{
netpolLabel: "my-database",
}},
Spec: networkingv1.NetworkPolicySpec{
Egress: []networkingv1.NetworkPolicyEgressRule{
{
To: []networkingv1.NetworkPolicyPeer{
{
IPBlock: &networkingv1.IPBlock{
CIDR: "1.2.3.4/32",
},
},
{
IPBlock: &networkingv1.IPBlock{
CIDR: "1.2.3.3/32",
},
},
},
},
},
},
},
{
ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{
netpolLabel: "01-vm",
}},
Spec: networkingv1.NetworkPolicySpec{
Egress: []networkingv1.NetworkPolicyEgressRule{
{
To: []networkingv1.NetworkPolicyPeer{
{
IPBlock: &networkingv1.IPBlock{
CIDR: "20.20.20.20/24",
},
},
{
IPBlock: &networkingv1.IPBlock{
CIDR: "10.10.10.10/24",
},
},
},
},
},
},
},
{
// Ignored due to missing label
Spec: networkingv1.NetworkPolicySpec{
Egress: []networkingv1.NetworkPolicyEgressRule{
{
To: []networkingv1.NetworkPolicyPeer{
{
IPBlock: &networkingv1.IPBlock{
CIDR: "8.8.8.8/32",
},
},
},
},
},
},
},
}

subnetLabels := getSubnetsFromPolicies(netpols)

// Should be ordered by name, and nexted CIDRs should be sorted
assert.Equal(t, []flowslatest.SubnetLabel{
{Name: "01-vm", CIDRs: []string{"10.10.10.10/24", "20.20.20.20/24"}},
{Name: "my-database", CIDRs: []string{"1.2.3.3/32", "1.2.3.4/32"}},
}, subnetLabels)
}
Loading