Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion bundle/manifests/netobserv-operator.clusterserviceversion.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ metadata:
categories: Monitoring, Networking, Observability
console.openshift.io/plugins: '["netobserv-plugin"]'
containerImage: quay.io/netobserv/network-observability-operator:1.10.0-community
createdAt: "2025-11-25T09:57:07Z"
createdAt: "2025-11-26T13:16:01Z"
description: Network flows collector and monitoring solution
operatorframework.io/initialization-resource: '{"apiVersion":"flows.netobserv.io/v1beta2",
"kind":"FlowCollector","metadata":{"name":"cluster"},"spec": {}}'
Expand Down Expand Up @@ -901,6 +901,14 @@ spec:
- patch
- update
- watch
- apiGroups:
- discovery.k8s.io
resources:
- endpointslices
verbs:
- get
- list
- watch
- apiGroups:
- flows.netobserv.io
resources:
Expand Down
8 changes: 8 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,14 @@ rules:
- patch
- update
- watch
- apiGroups:
- discovery.k8s.io
resources:
- endpointslices
verbs:
- get
- list
- watch
- apiGroups:
- flows.netobserv.io
resources:
Expand Down
8 changes: 8 additions & 0 deletions helm/templates/clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,14 @@ rules:
- patch
- update
- watch
- apiGroups:
- discovery.k8s.io
resources:
- endpointslices
verbs:
- get
- list
- watch
- apiGroups:
- flows.netobserv.io
resources:
Expand Down
100 changes: 100 additions & 0 deletions internal/controller/networkpolicy/apiserver_endpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package networkpolicy

import (
"context"
"fmt"

corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
)

const (
kubernetesServiceName = "kubernetes"
kubernetesServiceNamespace = "default"
)

// GetAPIServerEndpointIPs retrieves the API server endpoint IP addresses.
// It first tries to use EndpointSlice API (v1), and falls back to Endpoints API if unavailable.
func GetAPIServerEndpointIPs(ctx context.Context, cl client.Client) ([]string, error) {
logger := log.FromContext(ctx)

// Try EndpointSlice first (discovery.k8s.io/v1, available since k8s 1.21)
ips, err := getEndpointIPsFromEndpointSlice(ctx, cl)
if err == nil && len(ips) > 0 {
logger.V(1).Info("Retrieved API server endpoint IPs from EndpointSlice", "ips", ips)
return ips, nil
}

if err != nil {
logger.V(1).Info("Failed to get EndpointSlice, falling back to Endpoints API", "error", err)
}

// Fallback to Endpoints API (core/v1, deprecated but widely available)
ips, err = getEndpointIPsFromEndpoints(ctx, cl)
if err != nil {
return nil, fmt.Errorf("failed to get API server endpoint IPs: %w", err)
}

if len(ips) == 0 {
return nil, fmt.Errorf("no API server endpoint IPs found")
}

logger.V(1).Info("Retrieved API server endpoint IPs from Endpoints", "ips", ips)
return ips, nil
}

// getEndpointIPsFromEndpointSlice retrieves endpoint IPs using the EndpointSlice API
func getEndpointIPsFromEndpointSlice(ctx context.Context, cl client.Client) ([]string, error) {
// Get the EndpointSlice directly by name
endpointSlice := &discoveryv1.EndpointSlice{}
err := cl.Get(ctx, types.NamespacedName{
Name: kubernetesServiceName,
Namespace: kubernetesServiceNamespace,
}, endpointSlice)
if err != nil {
if errors.IsNotFound(err) {
return nil, fmt.Errorf("EndpointSlice for kubernetes service not found")
}
return nil, err
}

var ips []string
for j := range endpointSlice.Endpoints {
endpoint := &endpointSlice.Endpoints[j]
// Only use ready endpoints
if endpoint.Conditions.Ready != nil && *endpoint.Conditions.Ready {
ips = append(ips, endpoint.Addresses...)
}
}

return ips, nil
}

// getEndpointIPsFromEndpoints retrieves endpoint IPs using the legacy Endpoints API
func getEndpointIPsFromEndpoints(ctx context.Context, cl client.Client) ([]string, error) {
//nolint:staticcheck // SA1019: Endpoints is deprecated but used as fallback for k8s < 1.21
endpoints := &corev1.Endpoints{}
err := cl.Get(ctx, types.NamespacedName{
Name: kubernetesServiceName,
Namespace: kubernetesServiceNamespace,
}, endpoints)
if err != nil {
if errors.IsNotFound(err) {
return nil, fmt.Errorf("endpoints for kubernetes service not found")
}
return nil, err
}

var ips []string
for _, subset := range endpoints.Subsets {
for _, address := range subset.Addresses {
ips = append(ips, address.IP)
}
}

return ips, nil
}
15 changes: 14 additions & 1 deletion internal/controller/networkpolicy/np_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,24 @@ func (r *Reconciler) Reconcile(ctx context.Context, _ ctrl.Request) (ctrl.Result
}

func (r *Reconciler) reconcile(ctx context.Context, clh *helper.Client, desired *flowslatest.FlowCollector) error {
l := log.FromContext(ctx)

cni, err := r.mgr.ClusterInfo.GetCNI()
if err != nil {
return err
}
npName, desiredNp := buildMainNetworkPolicy(desired, r.mgr, cni)

// Get API server endpoint IPs for network policy
var apiServerIPs []string
if r.mgr.ClusterInfo.IsOpenShift() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it is specific to Openshift? Is it?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok I see, this policy rule is only created for openshift; forget my comment :-)

apiServerIPs, err = GetAPIServerEndpointIPs(ctx, r.Client)
if err != nil {
l.Error(err, "Failed to get API server endpoint IPs")
return fmt.Errorf("cannot determine API server endpoint IPs: %w", err)
}
}

npName, desiredNp := buildMainNetworkPolicy(desired, r.mgr, cni, apiServerIPs)
if err := reconcilers.ReconcileNetworkPolicy(ctx, clh, npName, desiredNp); err != nil {
return err
}
Expand Down
47 changes: 45 additions & 2 deletions internal/controller/networkpolicy/np_objects.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package networkpolicy

import (
"net"

flowslatest "github.com/netobserv/network-observability-operator/api/flowcollector/v1beta2"
"github.com/netobserv/network-observability-operator/internal/controller/constants"
"github.com/netobserv/network-observability-operator/internal/pkg/cluster"
Expand All @@ -16,6 +18,22 @@ import (

const netpolName = "netobserv"

// ipToCIDR converts an IP address to a CIDR with proper prefix length
// IPv4 addresses get /32, IPv6 addresses get /128
func ipToCIDR(ipStr string) string {
ip := net.ParseIP(ipStr)
if ip == nil {
return ""
}

// Check if it's IPv4 (net.IP.To4() returns nil for IPv6)
if ip.To4() != nil {
return ipStr + "/32"
}
// IPv6
return ipStr + "/128"
}

func peerInNamespace(ns string) networkingv1.NetworkPolicyPeer {
return networkingv1.NetworkPolicyPeer{
NamespaceSelector: &metav1.LabelSelector{
Expand Down Expand Up @@ -50,7 +68,7 @@ func addAllowedNamespaces(np *networkingv1.NetworkPolicy, in, out []string) {
}
}

func buildMainNetworkPolicy(desired *flowslatest.FlowCollector, mgr *manager.Manager, cni cluster.NetworkType) (types.NamespacedName, *networkingv1.NetworkPolicy) {
func buildMainNetworkPolicy(desired *flowslatest.FlowCollector, mgr *manager.Manager, cni cluster.NetworkType, apiServerIPs []string) (types.NamespacedName, *networkingv1.NetworkPolicy) {
ns := desired.Spec.GetNamespace()

name := types.NamespacedName{Name: netpolName, Namespace: ns}
Expand Down Expand Up @@ -156,7 +174,8 @@ func buildMainNetworkPolicy(desired *flowslatest.FlowCollector, mgr *manager.Man
},
Ports: hostNetworkPorts,
})
// Allow fetching from apiserver

// Allow fetching from in-cluster apiserver namespaces
np.Spec.Egress = append(np.Spec.Egress, networkingv1.NetworkPolicyEgressRule{
To: []networkingv1.NetworkPolicyPeer{
peerInNamespaces([]string{constants.OpenShiftAPIServerNamespace, constants.OpenShiftKubeAPIServerNamespace}),
Expand All @@ -166,6 +185,30 @@ func buildMainNetworkPolicy(desired *flowslatest.FlowCollector, mgr *manager.Man
Port: ptr.To(intstr.FromInt32(constants.K8sAPIServerPort)),
}},
})

// Allow fetching from external apiserver (HyperShift and other external control planes)
// The kubernetes service may redirect to external endpoints on port 6443
if len(apiServerIPs) > 0 {
// Build a single egress rule with multiple IP peers
peers := []networkingv1.NetworkPolicyPeer{}
for _, ip := range apiServerIPs {
cidr := ipToCIDR(ip)
if cidr != "" {
peers = append(peers, networkingv1.NetworkPolicyPeer{
IPBlock: &networkingv1.IPBlock{
CIDR: cidr,
},
})
}
}
np.Spec.Egress = append(np.Spec.Egress, networkingv1.NetworkPolicyEgressRule{
To: peers,
Ports: []networkingv1.NetworkPolicyPort{{
Protocol: ptr.To(corev1.ProtocolTCP),
Port: ptr.To(intstr.FromInt32(constants.K8sAPIServerPort)),
}},
})
}
} else {
// Not OpenShift
// Allow fetching from apiserver / kube-system
Expand Down
Loading
Loading