From c26829fbb12c5bb1153a26c57bbffee8ac546b30 Mon Sep 17 00:00:00 2001 From: AlinsRan Date: Thu, 16 Oct 2025 15:37:01 +0800 Subject: [PATCH] feat: support resolve svc.ports[].appProtocol (#2601) (cherry picked from commit f28b34292be45587184d9faa8cb5bfc477d5d1b8) --- Makefile | 4 + api/v2/apisixroute_types.go | 2 +- api/v2/zz_generated.deepcopy.go | 5 + internal/adc/translator/apisixroute.go | 156 +++++++++++-- internal/adc/translator/gatewayproxy.go | 2 +- internal/adc/translator/grpcroute.go | 2 +- internal/adc/translator/httproute.go | 43 +++- internal/adc/translator/ingress.go | 288 ++++++++++++++---------- internal/adc/translator/tcproute.go | 163 ++++++++++++++ internal/adc/translator/tlsroute.go | 159 +++++++++++++ internal/adc/translator/udproute.go | 152 +++++++++++++ internal/types/k8s.go | 7 + test/e2e/crds/v2/route.go | 143 ++++++++++++ test/e2e/framework/manifests/nginx.yaml | 73 +++++- test/e2e/gatewayapi/httproute.go | 103 +++++++++ test/e2e/ingress/ingress.go | 127 +++++++++++ 16 files changed, 1282 insertions(+), 147 deletions(-) create mode 100644 internal/adc/translator/tcproute.go create mode 100644 internal/adc/translator/tlsroute.go create mode 100644 internal/adc/translator/udproute.go diff --git a/Makefile b/Makefile index d462fe27c..71faf0c1c 100644 --- a/Makefile +++ b/Makefile @@ -59,7 +59,11 @@ GO_LDFLAGS ?= "-X=$(VERSYM)=$(VERSION) -X=$(GITSHASYM)=$(GITSHA) -X=$(BUILDOSSYM # gateway-api GATEAY_API_VERSION ?= v1.3.0 ## https://github.com/kubernetes-sigs/gateway-api/blob/v1.3.0/pkg/features/httproute.go +<<<<<<< HEAD SUPPORTED_EXTENDED_FEATURES = "HTTPRouteDestinationPortMatching,HTTPRouteMethodMatching,HTTPRoutePortRedirect,HTTPRouteRequestMirror,HTTPRouteSchemeRedirect,GatewayAddressEmpty,HTTPRouteResponseHeaderModification,GatewayPort8080" +======= +SUPPORTED_EXTENDED_FEATURES = "HTTPRouteDestinationPortMatching,HTTPRouteMethodMatching,HTTPRoutePortRedirect,HTTPRouteRequestMirror,HTTPRouteSchemeRedirect,GatewayAddressEmpty,HTTPRouteResponseHeaderModification,GatewayPort8080,HTTPRouteHostRewrite,HTTPRouteQueryParamMatching,HTTPRoutePathRewrite,HTTPRouteBackendProtocolWebSocket" +>>>>>>> f28b3429 (feat: support resolve svc.ports[].appProtocol (#2601)) CONFORMANCE_TEST_REPORT_OUTPUT ?= $(DIR)/apisix-ingress-controller-conformance-report.yaml ## https://github.com/kubernetes-sigs/gateway-api/blob/v1.3.0/conformance/utils/suite/profiles.go CONFORMANCE_PROFILES ?= GATEWAY-HTTP,GATEWAY-GRPC diff --git a/api/v2/apisixroute_types.go b/api/v2/apisixroute_types.go index 14776b6c2..5b77accc0 100644 --- a/api/v2/apisixroute_types.go +++ b/api/v2/apisixroute_types.go @@ -94,7 +94,7 @@ type ApisixRouteHTTP struct { // Websocket enables or disables websocket support for this route. // +kubebuilder:validation:Optional - Websocket bool `json:"websocket" yaml:"websocket"` + Websocket *bool `json:"websocket" yaml:"websocket"` // PluginConfigName specifies the name of the plugin config to apply. PluginConfigName string `json:"plugin_config_name,omitempty" yaml:"plugin_config_name,omitempty"` // PluginConfigNamespace specifies the namespace of the plugin config. diff --git a/api/v2/zz_generated.deepcopy.go b/api/v2/zz_generated.deepcopy.go index 9a8c078cb..06675b10d 100644 --- a/api/v2/zz_generated.deepcopy.go +++ b/api/v2/zz_generated.deepcopy.go @@ -756,6 +756,11 @@ func (in *ApisixRouteHTTP) DeepCopyInto(out *ApisixRouteHTTP) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.Websocket != nil { + in, out := &in.Websocket, &out.Websocket + *out = new(bool) + **out = **in + } if in.Plugins != nil { in, out := &in.Plugins, &out.Plugins *out = make([]ApisixRoutePlugin, len(*in)) diff --git a/internal/adc/translator/apisixroute.go b/internal/adc/translator/apisixroute.go index 505ba35f9..9355f4f7d 100644 --- a/internal/adc/translator/apisixroute.go +++ b/internal/adc/translator/apisixroute.go @@ -25,8 +25,12 @@ import ( "github.com/api7/gopkg/pkg/log" "github.com/pkg/errors" +<<<<<<< HEAD "go.uber.org/zap" v1 "k8s.io/api/core/v1" +======= + corev1 "k8s.io/api/core/v1" +>>>>>>> f28b3429 (feat: support resolve svc.ports[].appProtocol (#2601)) metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" @@ -72,10 +76,16 @@ func (t *Translator) translateHTTPRule(tctx *provider.TranslateContext, ar *apiv return nil, err } + var enableWebsocket *bool service := t.buildService(ar, rule, ruleIndex) +<<<<<<< HEAD t.buildRoute(ar, service, rule, plugins, timeout, vars) t.buildUpstream(tctx, service, ar, rule, ruleIndex) +======= + t.buildUpstream(tctx, service, ar, rule, &enableWebsocket) + t.buildRoute(ar, service, rule, plugins, timeout, vars, &enableWebsocket) +>>>>>>> f28b3429 (feat: support resolve svc.ports[].appProtocol (#2601)) return service, nil } @@ -141,7 +151,7 @@ func (t *Translator) loadRoutePlugins(tctx *provider.TranslateContext, ar *apiv2 } } -func (t *Translator) buildPluginConfig(plugin apiv2.ApisixRoutePlugin, namespace string, secrets map[types.NamespacedName]*v1.Secret) map[string]any { +func (t *Translator) buildPluginConfig(plugin apiv2.ApisixRoutePlugin, namespace string, secrets map[types.NamespacedName]*corev1.Secret) map[string]any { config := make(map[string]any) if len(plugin.Config.Raw) > 0 { if err := json.Unmarshal(plugin.Config.Raw, &config); err != nil { @@ -181,13 +191,16 @@ func (t *Translator) addAuthenticationPlugins(rule apiv2.ApisixRouteHTTP, plugin } } -func (t *Translator) buildRoute(ar *apiv2.ApisixRoute, service *adc.Service, rule apiv2.ApisixRouteHTTP, plugins adc.Plugins, timeout *adc.Timeout, vars adc.Vars) { +func (t *Translator) buildRoute(ar *apiv2.ApisixRoute, service *adc.Service, rule apiv2.ApisixRouteHTTP, plugins adc.Plugins, timeout *adc.Timeout, vars adc.Vars, enableWebsocket **bool) { route := adc.NewDefaultRoute() route.Name = adc.ComposeRouteName(ar.Namespace, ar.Name, rule.Name) route.ID = id.GenID(route.Name) route.Desc = "Created by apisix-ingress-controller, DO NOT modify it manually" route.Labels = label.GenLabel(ar) - route.EnableWebsocket = ptr.To(rule.Websocket) + route.EnableWebsocket = rule.Websocket + if route.EnableWebsocket == nil && *enableWebsocket != nil { + route.EnableWebsocket = *enableWebsocket + } route.FilterFunc = rule.Match.FilterFunc route.Hosts = rule.Match.Hosts route.Methods = rule.Match.Methods @@ -204,7 +217,11 @@ func (t *Translator) buildRoute(ar *apiv2.ApisixRoute, service *adc.Service, rul service.Routes = []*adc.Route{route} } +<<<<<<< HEAD func (t *Translator) buildUpstream(tctx *provider.TranslateContext, service *adc.Service, ar *apiv2.ApisixRoute, rule apiv2.ApisixRouteHTTP, ruleIndex int) { +======= +func (t *Translator) buildUpstream(tctx *provider.TranslateContext, service *adc.Service, ar *apiv2.ApisixRoute, rule apiv2.ApisixRouteHTTP, enableWebsocket **bool) { +>>>>>>> f28b3429 (feat: support resolve svc.ports[].appProtocol (#2601)) var ( upstreams = make([]*adc.Upstream, 0) weightedUpstreams = make([]adc.TrafficSplitConfigRuleWeightedUpstream, 0) @@ -215,9 +232,16 @@ func (t *Translator) buildUpstream(tctx *provider.TranslateContext, service *adc upstream := adc.NewDefaultUpstream() // try to get the apisixupstream with the same name as the backend service to be upstream config. // err is ignored because it does not care about the externalNodes of the apisixupstream. +<<<<<<< HEAD auNN := types.NamespacedName{Namespace: ar.GetNamespace(), Name: backend.ServiceName} if au, ok := tctx.Upstreams[auNN]; ok { upstream, _ = t.translateApisixUpstream(tctx, au) +======= + upstream, err := t.translateApisixRouteHTTPBackend(tctx, ar, backend, enableWebsocket) + if err != nil { + t.Log.Error(err, "failed to translate ApisixRoute backend", "backend", backend) + continue +>>>>>>> f28b3429 (feat: support resolve svc.ports[].appProtocol (#2601)) } if backend.ResolveGranularity == apiv2.ResolveGranularityService { @@ -332,7 +356,7 @@ func (t *Translator) buildService(ar *apiv2.ApisixRoute, rule apiv2.ApisixRouteH return service } -func getPortFromService(svc *v1.Service, backendSvcPort intstr.IntOrString) (int32, error) { +func getPortFromService(svc *corev1.Service, backendSvcPort intstr.IntOrString) (int32, error) { var port int32 if backendSvcPort.Type == intstr.Int { port = int32(backendSvcPort.IntValue()) @@ -352,32 +376,107 @@ func getPortFromService(svc *v1.Service, backendSvcPort intstr.IntOrString) (int return port, nil } +<<<<<<< HEAD func (t *Translator) translateApisixRouteBackendResolveGranularityService(tctx *provider.TranslateContext, arNN types.NamespacedName, backend apiv2.ApisixRouteHTTPBackend) (adc.UpstreamNodes, error) { +======= +func findMatchingServicePort(svc *corev1.Service, backendSvcPort intstr.IntOrString) (*corev1.ServicePort, error) { + var servicePort *corev1.ServicePort + var portNumber int32 = -1 + var servicePortName string + switch backendSvcPort.Type { + case intstr.Int: + portNumber = backendSvcPort.IntVal + case intstr.String: + servicePortName = backendSvcPort.StrVal + } + for _, svcPort := range svc.Spec.Ports { + p := svcPort + if p.Port == portNumber || (p.Name != "" && p.Name == servicePortName) { + servicePort = &p + break + } + } + if servicePort == nil { + return nil, errors.Errorf("service port %s not found in service %s", backendSvcPort.String(), svc.Name) + } + + return servicePort, nil +} + +func (t *Translator) translateApisixRouteHTTPBackend(tctx *provider.TranslateContext, ar *apiv2.ApisixRoute, backend apiv2.ApisixRouteHTTPBackend, enableWebsocket **bool) (*adc.Upstream, error) { + auNN := types.NamespacedName{ + Namespace: ar.Namespace, + Name: backend.ServiceName, + } + upstream := adc.NewDefaultUpstream() + if au, ok := tctx.Upstreams[auNN]; ok { + svc := tctx.Services[auNN] + if svc == nil { + return nil, errors.Errorf("service not found, ApisixRoute: %s, Service: %s", utils.NamespacedName(ar).String(), auNN) + } + port, err := getPortFromService(svc, backend.ServicePort) + if err != nil { + return nil, err + } + u, err := t.translateApisixUpstreamForPort(tctx, au, ptr.To(port)) + if err != nil { + return nil, err + } + upstream = u + } + var ( + err error + nodes adc.UpstreamNodes + protocol string + ) + if backend.ResolveGranularity == apiv2.ResolveGranularityService { + nodes, protocol, err = t.translateApisixRouteBackendResolveGranularityService(tctx, auNN, backend) + } else { + nodes, protocol, err = t.translateApisixRouteBackendResolveGranularityEndpoint(tctx, auNN, backend) + } + if err != nil { + return nil, err + } + upstream.Nodes = nodes + if upstream.Scheme == "" { + upstream.Scheme = appProtocolToUpstreamScheme(protocol) + } + if protocol == internaltypes.AppProtocolWS || protocol == internaltypes.AppProtocolWSS { + *enableWebsocket = ptr.To(true) + } + if backend.Weight != nil { + upstream.Labels["meta_weight"] = strconv.FormatInt(int64(*backend.Weight), 10) + } + return upstream, nil +} + +func (t *Translator) translateApisixRouteBackendResolveGranularityService(tctx *provider.TranslateContext, arNN types.NamespacedName, backend apiv2.ApisixRouteHTTPBackend) (adc.UpstreamNodes, string, error) { +>>>>>>> f28b3429 (feat: support resolve svc.ports[].appProtocol (#2601)) serviceNN := types.NamespacedName{ Namespace: arNN.Namespace, Name: backend.ServiceName, } svc, ok := tctx.Services[serviceNN] if !ok { - return nil, errors.Errorf("service not found, ApisixRoute: %s, Service: %s", arNN, serviceNN) + return nil, "", errors.Errorf("service not found, ApisixRoute: %s, Service: %s", arNN, serviceNN) } if svc.Spec.ClusterIP == "" { - return nil, errors.Errorf("conflict headless service and backend resolve granularity, ApisixRoute: %s, Service: %s", arNN, serviceNN) + return nil, "", errors.Errorf("conflict headless service and backend resolve granularity, ApisixRoute: %s, Service: %s", arNN, serviceNN) } - port, err := getPortFromService(svc, backend.ServicePort) + port, err := findMatchingServicePort(svc, backend.ServicePort) if err != nil { - return nil, err + return nil, "", err } return adc.UpstreamNodes{ { Host: svc.Spec.ClusterIP, - Port: int(port), + Port: int(port.Port), Weight: *cmp.Or(backend.Weight, ptr.To(apiv2.DefaultWeight)), }, - }, nil + }, ptr.Deref(port.AppProtocol, ""), nil } -func (t *Translator) translateApisixRouteStreamBackendResolveGranularity(tctx *provider.TranslateContext, arNN types.NamespacedName, backend apiv2.ApisixRouteStreamBackend) (adc.UpstreamNodes, error) { +func (t *Translator) translateApisixRouteStreamBackendResolveGranularity(tctx *provider.TranslateContext, arNN types.NamespacedName, backend apiv2.ApisixRouteStreamBackend) (adc.UpstreamNodes, string, error) { tsBackend := apiv2.ApisixRouteHTTPBackend{ ServiceName: backend.ServiceName, ServicePort: backend.ServicePort, @@ -391,18 +490,18 @@ func (t *Translator) translateApisixRouteStreamBackendResolveGranularity(tctx *p } } -func (t *Translator) translateApisixRouteBackendResolveGranularityEndpoint(tctx *provider.TranslateContext, arNN types.NamespacedName, backend apiv2.ApisixRouteHTTPBackend) (adc.UpstreamNodes, error) { +func (t *Translator) translateApisixRouteBackendResolveGranularityEndpoint(tctx *provider.TranslateContext, arNN types.NamespacedName, backend apiv2.ApisixRouteHTTPBackend) (adc.UpstreamNodes, string, error) { serviceNN := types.NamespacedName{ Namespace: arNN.Namespace, Name: backend.ServiceName, } svc, ok := tctx.Services[serviceNN] if !ok { - return nil, errors.Errorf("service not found, ApisixRoute: %s, Service: %s", arNN, serviceNN) + return nil, "", errors.Errorf("service not found, ApisixRoute: %s, Service: %s", arNN, serviceNN) } port, err := getPortFromService(svc, backend.ServicePort) if err != nil { - return nil, err + return nil, "", err } weight := int32(*cmp.Or(backend.Weight, ptr.To(apiv2.DefaultWeight))) backendRef := gatewayv1.BackendRef{ @@ -451,3 +550,32 @@ func (t *Translator) translateStreamRule(tctx *provider.TranslateContext, ar *ap svc.Upstream = upstream return svc, nil } +<<<<<<< HEAD +======= + +func (t *Translator) translateApisixRouteStreamBackend(tctx *provider.TranslateContext, ar *apiv2.ApisixRoute, backend apiv2.ApisixRouteStreamBackend) (*adc.Upstream, error) { + auNN := types.NamespacedName{Namespace: ar.GetNamespace(), Name: backend.ServiceName} + upstream := adc.NewDefaultUpstream() + if au, ok := tctx.Upstreams[auNN]; ok { + service := tctx.Services[auNN] + if service == nil { + return nil, errors.Errorf("service not found, ApisixRoute: %s, Service: %s", utils.NamespacedName(ar), auNN) + } + port, err := getPortFromService(service, backend.ServicePort) + if err != nil { + return nil, err + } + u, err := t.translateApisixUpstreamForPort(tctx, au, ptr.To(port)) + if err != nil { + return nil, err + } + upstream = u + } + nodes, _, err := t.translateApisixRouteStreamBackendResolveGranularity(tctx, utils.NamespacedName(ar), backend) + if err != nil { + return nil, err + } + upstream.Nodes = nodes + return upstream, nil +} +>>>>>>> f28b3429 (feat: support resolve svc.ports[].appProtocol (#2601)) diff --git a/internal/adc/translator/gatewayproxy.go b/internal/adc/translator/gatewayproxy.go index 8b7fb673e..64edf2462 100644 --- a/internal/adc/translator/gatewayproxy.go +++ b/internal/adc/translator/gatewayproxy.go @@ -100,7 +100,7 @@ func (t *Translator) TranslateGatewayProxyToConfig(tctx *provider.TranslateConte if endpoint == nil { return nil, nil } - upstreamNodes, err := t.TranslateBackendRefWithFilter(tctx, gatewayv1.BackendRef{ + upstreamNodes, _, err := t.TranslateBackendRefWithFilter(tctx, gatewayv1.BackendRef{ BackendObjectReference: gatewayv1.BackendObjectReference{ Name: gatewayv1.ObjectName(provider.ControlPlane.Service.Name), Namespace: (*gatewayv1.Namespace)(&gatewayProxy.Namespace), diff --git a/internal/adc/translator/grpcroute.go b/internal/adc/translator/grpcroute.go index d24fc8bc7..abe6dfab0 100644 --- a/internal/adc/translator/grpcroute.go +++ b/internal/adc/translator/grpcroute.go @@ -183,7 +183,7 @@ func (t *Translator) TranslateGRPCRoute(tctx *provider.TranslateContext, grpcRou backend.Namespace = &namespace } upstream := adctypes.NewDefaultUpstream() - upNodes, err := t.translateBackendRef(tctx, backend.BackendRef, DefaultEndpointFilter) + upNodes, _, err := t.translateBackendRef(tctx, backend.BackendRef, DefaultEndpointFilter) if err != nil { backendErr = err continue diff --git a/internal/adc/translator/httproute.go b/internal/adc/translator/httproute.go index ddb5d329e..9e4adf209 100644 --- a/internal/adc/translator/httproute.go +++ b/internal/adc/translator/httproute.go @@ -373,13 +373,15 @@ func DefaultEndpointFilter(endpoint *discoveryv1.Endpoint) bool { return true } -func (t *Translator) TranslateBackendRefWithFilter(tctx *provider.TranslateContext, ref gatewayv1.BackendRef, endpointFilter func(*discoveryv1.Endpoint) bool) (adctypes.UpstreamNodes, error) { +func (t *Translator) TranslateBackendRefWithFilter(tctx *provider.TranslateContext, ref gatewayv1.BackendRef, endpointFilter func(*discoveryv1.Endpoint) bool) (adctypes.UpstreamNodes, string, error) { return t.translateBackendRef(tctx, ref, endpointFilter) } -func (t *Translator) translateBackendRef(tctx *provider.TranslateContext, ref gatewayv1.BackendRef, endpointFilter func(*discoveryv1.Endpoint) bool) (adctypes.UpstreamNodes, error) { +func (t *Translator) translateBackendRef(tctx *provider.TranslateContext, ref gatewayv1.BackendRef, endpointFilter func(*discoveryv1.Endpoint) bool) (adctypes.UpstreamNodes, string, error) { + nodes := adctypes.UpstreamNodes{} + var protocol string if ref.Kind != nil && *ref.Kind != internaltypes.KindService { - return adctypes.UpstreamNodes{}, fmt.Errorf("kind %s is not supported", *ref.Kind) + return nodes, protocol, fmt.Errorf("kind %s is not supported", *ref.Kind) } key := types.NamespacedName{ @@ -388,7 +390,7 @@ func (t *Translator) translateBackendRef(tctx *provider.TranslateContext, ref ga } service, ok := tctx.Services[key] if !ok { - return adctypes.UpstreamNodes{}, fmt.Errorf("service %s not found", key) + return nodes, protocol, fmt.Errorf("service %s not found", key) } weight := 1 @@ -407,7 +409,7 @@ func (t *Translator) translateBackendRef(tctx *provider.TranslateContext, ref ga Port: port, Weight: weight, }, - }, nil + }, protocol, nil } var portName *string @@ -415,16 +417,18 @@ func (t *Translator) translateBackendRef(tctx *provider.TranslateContext, ref ga for _, p := range service.Spec.Ports { if int(p.Port) == int(*ref.Port) { portName = ptr.To(p.Name) + protocol = ptr.Deref(p.AppProtocol, "") break } } if portName == nil { - return adctypes.UpstreamNodes{}, nil + return adctypes.UpstreamNodes{}, protocol, nil } } endpointSlices := tctx.EndpointSlices[key] - return t.translateEndpointSlice(portName, weight, endpointSlices, endpointFilter), nil + nodes = t.translateEndpointSlice(portName, weight, endpointSlices, endpointFilter) + return nodes, protocol, nil } // calculateHTTPRoutePriority calculates the priority of the HTTP route. @@ -526,6 +530,7 @@ func (t *Translator) TranslateHTTPRoute(tctx *provider.TranslateContext, httpRou upstreams = make([]*adctypes.Upstream, 0) weightedUpstreams = make([]adctypes.TrafficSplitConfigRuleWeightedUpstream, 0) backendErr error + enableWebsocket *bool ) for _, backend := range rule.BackendRefs { @@ -534,7 +539,7 @@ func (t *Translator) TranslateHTTPRoute(tctx *provider.TranslateContext, httpRou backend.Namespace = &namespace } upstream := adctypes.NewDefaultUpstream() - upNodes, err := t.translateBackendRef(tctx, backend.BackendRef, DefaultEndpointFilter) + upNodes, protocol, err := t.translateBackendRef(tctx, backend.BackendRef, DefaultEndpointFilter) if err != nil { backendErr = err continue @@ -542,10 +547,13 @@ func (t *Translator) TranslateHTTPRoute(tctx *provider.TranslateContext, httpRou if len(upNodes) == 0 { continue } + if protocol == internaltypes.AppProtocolWS || protocol == internaltypes.AppProtocolWSS { + enableWebsocket = ptr.To(true) + } t.AttachBackendTrafficPolicyToUpstream(backend.BackendRef, tctx.BackendTrafficPolicies, upstream) upstream.Nodes = upNodes - + upstream.Scheme = appProtocolToUpstreamScheme(protocol) var ( kind string port int32 @@ -666,7 +674,7 @@ func (t *Translator) TranslateHTTPRoute(tctx *provider.TranslateContext, httpRou route.Name = name route.ID = id.GenID(name) route.Labels = labels - route.EnableWebsocket = ptr.To(true) + route.EnableWebsocket = enableWebsocket // Set the route priority priority := calculateHTTPRoutePriority(&match, ruleIndex, hosts) @@ -809,3 +817,18 @@ func (t *Translator) translateHTTPRouteHeaderMatchToVars(header gatewayv1.HTTPHe } return HeaderMatchToVars(matchType, string(header.Name), header.Value) } + +func appProtocolToUpstreamScheme(appProtocol string) string { + switch appProtocol { + case internaltypes.AppProtocolHTTP: + return apiv2.SchemeHTTP + case internaltypes.AppProtocolHTTPS: + return apiv2.SchemeHTTPS + case internaltypes.AppProtocolWS: + return apiv2.SchemeHTTP + case internaltypes.AppProtocolWSS: + return apiv2.SchemeHTTPS + default: + return "" + } +} diff --git a/internal/adc/translator/ingress.go b/internal/adc/translator/ingress.go index f17b159fa..fc8be571b 100644 --- a/internal/adc/translator/ingress.go +++ b/internal/adc/translator/ingress.go @@ -25,6 +25,7 @@ import ( discoveryv1 "k8s.io/api/discovery/v1" networkingv1 "k8s.io/api/networking/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/utils/ptr" adctypes "github.com/apache/apisix-ingress-controller/api/adc" "github.com/apache/apisix-ingress-controller/internal/controller/label" @@ -69,13 +70,51 @@ func (t *Translator) translateIngressTLS(ingressTLS *networkingv1.IngressTLS, se return ssl, nil } -func (t *Translator) TranslateIngress(tctx *provider.TranslateContext, obj *networkingv1.Ingress) (*TranslateResult, error) { +func (t *Translator) TranslateIngress( + tctx *provider.TranslateContext, + obj *networkingv1.Ingress, +) (*TranslateResult, error) { result := &TranslateResult{} labels := label.GenLabel(obj) // handle TLS configuration, convert to SSL objects +<<<<<<< HEAD for _, tls := range obj.Spec.TLS { +======= + if err := t.translateIngressTLSSection(tctx, obj, result, labels); err != nil { + return nil, err + } + + // process Ingress rules, convert to Service and Route objects + for i, rule := range obj.Spec.Rules { + if rule.HTTP == nil { + continue + } + + hosts := []string{} + if rule.Host != "" { + hosts = append(hosts, rule.Host) + } + + for j, path := range rule.HTTP.Paths { + if svc := t.buildServiceFromIngressPath(tctx, obj, &path, i, j, hosts, labels); svc != nil { + result.Services = append(result.Services, svc) + } + } + } + + return result, nil +} + +func (t *Translator) translateIngressTLSSection( + tctx *provider.TranslateContext, + obj *networkingv1.Ingress, + result *TranslateResult, + labels map[string]string, +) error { + for tlsIndex, tls := range obj.Spec.TLS { +>>>>>>> f28b3429 (feat: support resolve svc.ports[].appProtocol (#2601)) if tls.SecretName == "" { continue } @@ -88,137 +127,150 @@ func (t *Translator) TranslateIngress(tctx *provider.TranslateContext, obj *netw } ssl, err := t.translateIngressTLS(&tls, secret, labels) if err != nil { - return nil, err + return err } - result.SSL = append(result.SSL, ssl) } + return nil +} - // process Ingress rules, convert to Service and Route objects - for i, rule := range obj.Spec.Rules { - // extract hostnames - var hosts []string - if rule.Host != "" { - hosts = append(hosts, rule.Host) - } - // if there is no HTTP path, skip - if rule.HTTP == nil { - continue - } +func (t *Translator) buildServiceFromIngressPath( + tctx *provider.TranslateContext, + obj *networkingv1.Ingress, + path *networkingv1.HTTPIngressPath, + ruleIndex, pathIndex int, + hosts []string, + labels map[string]string, +) *adctypes.Service { + if path.Backend.Service == nil { + return nil + } - // create a service for each path - for j, path := range rule.HTTP.Paths { - if path.Backend.Service == nil { - continue - } + service := adctypes.NewDefaultService() + service.Labels = labels + service.Name = adctypes.ComposeServiceNameWithRule(obj.Namespace, obj.Name, fmt.Sprintf("%d-%d", ruleIndex, pathIndex)) + service.ID = id.GenID(service.Name) + service.Hosts = hosts - service := adctypes.NewDefaultService() - service.Labels = labels - service.Name = adctypes.ComposeServiceNameWithRule(obj.Namespace, obj.Name, fmt.Sprintf("%d-%d", i, j)) - service.ID = id.GenID(service.Name) - service.Hosts = hosts + upstream := adctypes.NewDefaultUpstream() + protocol := t.resolveIngressUpstream(tctx, obj, path.Backend.Service, upstream) + service.Upstream = upstream - // create an upstream - upstream := adctypes.NewDefaultUpstream() + route := buildRouteFromIngressPath(obj, path, ruleIndex, pathIndex, labels) + if protocol == internaltypes.AppProtocolWS || protocol == internaltypes.AppProtocolWSS { + route.EnableWebsocket = ptr.To(true) + } + service.Routes = []*adctypes.Route{route} - // get the EndpointSlice of the backend service - backendService := path.Backend.Service - if backendService != nil { - backendRef := convertBackendRef(obj.Namespace, backendService.Name, internaltypes.KindService) - t.AttachBackendTrafficPolicyToUpstream(backendRef, tctx.BackendTrafficPolicies, upstream) - } + t.fillHTTPRoutePoliciesForIngress(tctx, service.Routes) + return service +} - // get the service port configuration - var servicePort int32 = 0 - var servicePortName string - if backendService.Port.Number != 0 { - servicePort = backendService.Port.Number - } else if backendService.Port.Name != "" { - servicePortName = backendService.Port.Name - } +func (t *Translator) resolveIngressUpstream( + tctx *provider.TranslateContext, + obj *networkingv1.Ingress, + backendService *networkingv1.IngressServiceBackend, + upstream *adctypes.Upstream, +) string { + backendRef := convertBackendRef(obj.Namespace, backendService.Name, internaltypes.KindService) + t.AttachBackendTrafficPolicyToUpstream(backendRef, tctx.BackendTrafficPolicies, upstream) + // determine service port/port name + var protocol string + var servicePort int32 = 0 + var servicePortName string + if backendService.Port.Number != 0 { + servicePort = backendService.Port.Number + } else if backendService.Port.Name != "" { + servicePortName = backendService.Port.Name + } - getService := tctx.Services[types.NamespacedName{ - Namespace: obj.Namespace, - Name: backendService.Name, - }] - if getService == nil { - continue - } - if getService.Spec.Type == corev1.ServiceTypeExternalName { - defaultServicePort := 80 - if servicePort > 0 { - defaultServicePort = int(servicePort) - } - upstream.Nodes = adctypes.UpstreamNodes{ - { - Host: getService.Spec.ExternalName, - Port: defaultServicePort, - Weight: 1, - }, - } - } else { - var getServicePort *corev1.ServicePort - for _, port := range getService.Spec.Ports { - port := port - if servicePort > 0 && port.Port == servicePort { - getServicePort = &port - break - } - if servicePortName != "" && port.Name == servicePortName { - getServicePort = &port - break - } - } - endpointSlices := tctx.EndpointSlices[types.NamespacedName{ - Namespace: obj.Namespace, - Name: backendService.Name, - }] - // convert the EndpointSlice to upstream nodes - if len(endpointSlices) > 0 { - upstream.Nodes = t.translateEndpointSliceForIngress(1, endpointSlices, getServicePort) - } - } + getService := tctx.Services[types.NamespacedName{ + Namespace: obj.Namespace, + Name: backendService.Name, + }] + if getService == nil { + return protocol + } - service.Upstream = upstream - - // create a route - route := adctypes.NewDefaultRoute() - route.Name = adctypes.ComposeRouteName(obj.Namespace, obj.Name, fmt.Sprintf("%d-%d", i, j)) - route.ID = id.GenID(route.Name) - route.Labels = labels - - uris := []string{path.Path} - if path.PathType != nil { - switch *path.PathType { - case networkingv1.PathTypePrefix: - // As per the specification of Ingress path matching rule: - // if the last element of the path is a substring of the - // last element in request path, it is not a match, e.g. /foo/bar - // matches /foo/bar/baz, but does not match /foo/barbaz. - // While in APISIX, /foo/bar matches both /foo/bar/baz and - // /foo/barbaz. - // In order to be conformant with Ingress specification, here - // we create two paths here, the first is the path itself - // (exact match), the other is path + "/*" (prefix match). - prefix := path.Path - if strings.HasSuffix(prefix, "/") { - prefix += "*" - } else { - prefix += "/*" - } - uris = append(uris, prefix) - case networkingv1.PathTypeImplementationSpecific: - uris = []string{"/*"} - } - } - route.Uris = uris - service.Routes = []*adctypes.Route{route} - t.fillHTTPRoutePoliciesForIngress(tctx, service.Routes) - result.Services = append(result.Services, service) + if getService.Spec.Type == corev1.ServiceTypeExternalName { + defaultServicePort := 80 + if servicePort > 0 { + defaultServicePort = int(servicePort) } + upstream.Nodes = adctypes.UpstreamNodes{ + { + Host: getService.Spec.ExternalName, + Port: defaultServicePort, + Weight: 1, + }, + } + return protocol } - return result, nil + // find matching service port object + var getServicePort *corev1.ServicePort + for _, port := range getService.Spec.Ports { + p := port + if servicePort > 0 && p.Port == servicePort { + getServicePort = &p + break + } + if servicePortName != "" && p.Name == servicePortName { + getServicePort = &p + break + } + } + + if getServicePort != nil && getServicePort.AppProtocol != nil { + protocol = *getServicePort.AppProtocol + if upstream.Scheme == "" { + upstream.Scheme = appProtocolToUpstreamScheme(*getServicePort.AppProtocol) + } + } + + endpointSlices := tctx.EndpointSlices[types.NamespacedName{ + Namespace: obj.Namespace, + Name: backendService.Name, + }] + if len(endpointSlices) > 0 { + upstream.Nodes = t.translateEndpointSliceForIngress(1, endpointSlices, getServicePort) + } + + return protocol +} + +func buildRouteFromIngressPath( + obj *networkingv1.Ingress, + path *networkingv1.HTTPIngressPath, + ruleIndex, pathIndex int, + labels map[string]string, +) *adctypes.Route { + route := adctypes.NewDefaultRoute() + route.Name = adctypes.ComposeRouteName(obj.Namespace, obj.Name, fmt.Sprintf("%d-%d", ruleIndex, pathIndex)) + route.ID = id.GenID(route.Name) + route.Labels = labels + + uris := []string{path.Path} + if path.PathType != nil { + switch *path.PathType { + case networkingv1.PathTypePrefix: + // As per the specification of Ingress path matching rule: + // if the last element of the path is a substring of the + // last element in request path, it is not a match, e.g. /foo/bar + // matches /foo/bar/baz, but does not match /foo/barbaz. + // While in APISIX, /foo/bar matches both /foo/bar/baz and + // /foo/barbaz. + // In order to be conformant with Ingress specification, here + // we create two paths here, the first is the path itself + // (exact match), the other is path + "/*" (prefix match). + prefix := strings.TrimSuffix(path.Path, "/") + "/*" + uris = append(uris, prefix) + case networkingv1.PathTypeImplementationSpecific: + uris = []string{"/*"} + } + } + route.Uris = uris + return route } // translateEndpointSliceForIngress create upstream nodes from EndpointSlice diff --git a/internal/adc/translator/tcproute.go b/internal/adc/translator/tcproute.go new file mode 100644 index 000000000..fc9c0b133 --- /dev/null +++ b/internal/adc/translator/tcproute.go @@ -0,0 +1,163 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package translator + +import ( + "fmt" + + gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" + gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" + + adctypes "github.com/apache/apisix-ingress-controller/api/adc" + apiv2 "github.com/apache/apisix-ingress-controller/api/v2" + "github.com/apache/apisix-ingress-controller/internal/controller/label" + "github.com/apache/apisix-ingress-controller/internal/id" + "github.com/apache/apisix-ingress-controller/internal/provider" + "github.com/apache/apisix-ingress-controller/internal/types" +) + +func newDefaultUpstreamWithoutScheme() *adctypes.Upstream { + return &adctypes.Upstream{ + Metadata: adctypes.Metadata{ + Labels: map[string]string{ + "managed-by": "apisix-ingress-controller", + }, + }, + Nodes: make(adctypes.UpstreamNodes, 0), + } +} + +func (t *Translator) TranslateTCPRoute(tctx *provider.TranslateContext, tcpRoute *gatewayv1alpha2.TCPRoute) (*TranslateResult, error) { + result := &TranslateResult{} + rules := tcpRoute.Spec.Rules + labels := label.GenLabel(tcpRoute) + for ruleIndex, rule := range rules { + service := adctypes.NewDefaultService() + service.Labels = labels + service.Name = adctypes.ComposeServiceNameWithStream(tcpRoute.Namespace, tcpRoute.Name, fmt.Sprintf("%d", ruleIndex), "TCP") + service.ID = id.GenID(service.Name) + var ( + upstreams = make([]*adctypes.Upstream, 0) + weightedUpstreams = make([]adctypes.TrafficSplitConfigRuleWeightedUpstream, 0) + ) + for _, backend := range rule.BackendRefs { + if backend.Namespace == nil { + namespace := gatewayv1.Namespace(tcpRoute.Namespace) + backend.Namespace = &namespace + } + upstream := newDefaultUpstreamWithoutScheme() + upNodes, _, err := t.translateBackendRef(tctx, backend, DefaultEndpointFilter) + if err != nil { + continue + } + if len(upNodes) == 0 { + continue + } + // TODO: Confirm BackendTrafficPolicy attachment with e2e test case. + t.AttachBackendTrafficPolicyToUpstream(backend, tctx.BackendTrafficPolicies, upstream) + upstream.Nodes = upNodes + var ( + kind string + port int32 + ) + if backend.Kind == nil { + kind = types.KindService + } else { + kind = string(*backend.Kind) + } + if backend.Port != nil { + port = int32(*backend.Port) + } + namespace := string(*backend.Namespace) + name := string(backend.Name) + upstreamName := adctypes.ComposeUpstreamNameForBackendRef(kind, namespace, name, port) + upstream.Name = upstreamName + upstream.ID = id.GenID(upstreamName) + upstreams = append(upstreams, upstream) + } + + // Handle multiple backends with traffic-split plugin + if len(upstreams) == 0 { + // Create a default upstream if no valid backends + upstream := adctypes.NewDefaultUpstream() + service.Upstream = upstream + } else if len(upstreams) == 1 { + // Single backend - use directly as service upstream + service.Upstream = upstreams[0] + // remove the id and name of the service.upstream, adc schema does not need id and name for it + service.Upstream.ID = "" + service.Upstream.Name = "" + } else { + // Multiple backends - use traffic-split plugin + service.Upstream = upstreams[0] + // remove the id and name of the service.upstream, adc schema does not need id and name for it + service.Upstream.ID = "" + service.Upstream.Name = "" + + upstreams = upstreams[1:] + + if len(upstreams) > 0 { + service.Upstreams = upstreams + } + + // Set weight in traffic-split for the default upstream + weight := apiv2.DefaultWeight + if rule.BackendRefs[0].Weight != nil { + weight = int(*rule.BackendRefs[0].Weight) + } + weightedUpstreams = append(weightedUpstreams, adctypes.TrafficSplitConfigRuleWeightedUpstream{ + Weight: weight, + }) + + // Set other upstreams in traffic-split using upstream_id + for i, upstream := range upstreams { + weight := apiv2.DefaultWeight + // get weight from the backend refs starting from the second backend + if i+1 < len(rule.BackendRefs) && rule.BackendRefs[i+1].Weight != nil { + weight = int(*rule.BackendRefs[i+1].Weight) + } + weightedUpstreams = append(weightedUpstreams, adctypes.TrafficSplitConfigRuleWeightedUpstream{ + UpstreamID: upstream.ID, + Weight: weight, + }) + } + + if len(weightedUpstreams) > 0 { + if service.Plugins == nil { + service.Plugins = make(map[string]any) + } + service.Plugins["traffic-split"] = &adctypes.TrafficSplitConfig{ + Rules: []adctypes.TrafficSplitConfigRule{ + { + WeightedUpstreams: weightedUpstreams, + }, + }, + } + } + } + streamRoute := adctypes.NewDefaultStreamRoute() + streamRouteName := adctypes.ComposeStreamRouteName(tcpRoute.Namespace, tcpRoute.Name, fmt.Sprintf("%d", ruleIndex), "TCP") + streamRoute.Name = streamRouteName + streamRoute.ID = id.GenID(streamRouteName) + streamRoute.Labels = labels + // TODO: support remote_addr, server_addr, sni, server_port + service.StreamRoutes = append(service.StreamRoutes, streamRoute) + result.Services = append(result.Services, service) + } + return result, nil +} diff --git a/internal/adc/translator/tlsroute.go b/internal/adc/translator/tlsroute.go new file mode 100644 index 000000000..b1eb5fa0b --- /dev/null +++ b/internal/adc/translator/tlsroute.go @@ -0,0 +1,159 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package translator + +import ( + "fmt" + + gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" + gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" + + adctypes "github.com/apache/apisix-ingress-controller/api/adc" + apiv2 "github.com/apache/apisix-ingress-controller/api/v2" + "github.com/apache/apisix-ingress-controller/internal/controller/label" + "github.com/apache/apisix-ingress-controller/internal/id" + "github.com/apache/apisix-ingress-controller/internal/provider" + "github.com/apache/apisix-ingress-controller/internal/types" +) + +func (t *Translator) TranslateTLSRoute(tctx *provider.TranslateContext, tlsRoute *gatewayv1alpha2.TLSRoute) (*TranslateResult, error) { + result := &TranslateResult{} + rules := tlsRoute.Spec.Rules + labels := label.GenLabel(tlsRoute) + hosts := make([]string, 0, len(tlsRoute.Spec.Hostnames)) + for _, hostname := range tlsRoute.Spec.Hostnames { + hosts = append(hosts, string(hostname)) + } + for ruleIndex, rule := range rules { + service := adctypes.NewDefaultService() + service.Labels = labels + service.Name = adctypes.ComposeServiceNameWithStream(tlsRoute.Namespace, tlsRoute.Name, fmt.Sprintf("%d", ruleIndex), "TLS") + service.ID = id.GenID(service.Name) + var ( + upstreams = make([]*adctypes.Upstream, 0) + weightedUpstreams = make([]adctypes.TrafficSplitConfigRuleWeightedUpstream, 0) + ) + for _, backend := range rule.BackendRefs { + if backend.Namespace == nil { + namespace := gatewayv1.Namespace(tlsRoute.Namespace) + backend.Namespace = &namespace + } + upstream := newDefaultUpstreamWithoutScheme() + upNodes, _, err := t.translateBackendRef(tctx, backend, DefaultEndpointFilter) + if err != nil { + continue + } + if len(upNodes) == 0 { + continue + } + // TODO: Confirm BackendTrafficPolicy attachment with e2e test case. + t.AttachBackendTrafficPolicyToUpstream(backend, tctx.BackendTrafficPolicies, upstream) + upstream.Nodes = upNodes + var ( + kind string + port int32 + ) + if backend.Kind == nil { + kind = types.KindService + } else { + kind = string(*backend.Kind) + } + if backend.Port != nil { + port = int32(*backend.Port) + } + namespace := string(*backend.Namespace) + name := string(backend.Name) + upstreamName := adctypes.ComposeUpstreamNameForBackendRef(kind, namespace, name, port) + upstream.Name = upstreamName + upstream.ID = id.GenID(upstreamName) + upstreams = append(upstreams, upstream) + } + + // Handle multiple backends with traffic-split plugin + if len(upstreams) == 0 { + // Create a default upstream if no valid backends + upstream := adctypes.NewDefaultUpstream() + service.Upstream = upstream + } else if len(upstreams) == 1 { + // Single backend - use directly as service upstream + service.Upstream = upstreams[0] + // remove the id and name of the service.upstream, adc schema does not need id and name for it + service.Upstream.ID = "" + service.Upstream.Name = "" + } else { + // Multiple backends - use traffic-split plugin + service.Upstream = upstreams[0] + // remove the id and name of the service.upstream, adc schema does not need id and name for it + service.Upstream.ID = "" + service.Upstream.Name = "" + + upstreams = upstreams[1:] + + if len(upstreams) > 0 { + service.Upstreams = upstreams + } + + // Set weight in traffic-split for the default upstream + weight := apiv2.DefaultWeight + if rule.BackendRefs[0].Weight != nil { + weight = int(*rule.BackendRefs[0].Weight) + } + weightedUpstreams = append(weightedUpstreams, adctypes.TrafficSplitConfigRuleWeightedUpstream{ + Weight: weight, + }) + + // Set other upstreams in traffic-split using upstream_id + for i, upstream := range upstreams { + weight := apiv2.DefaultWeight + // get weight from the backend refs starting from the second backend + if i+1 < len(rule.BackendRefs) && rule.BackendRefs[i+1].Weight != nil { + weight = int(*rule.BackendRefs[i+1].Weight) + } + weightedUpstreams = append(weightedUpstreams, adctypes.TrafficSplitConfigRuleWeightedUpstream{ + UpstreamID: upstream.ID, + Weight: weight, + }) + } + + if len(weightedUpstreams) > 0 { + if service.Plugins == nil { + service.Plugins = make(map[string]any) + } + service.Plugins["traffic-split"] = &adctypes.TrafficSplitConfig{ + Rules: []adctypes.TrafficSplitConfigRule{ + { + WeightedUpstreams: weightedUpstreams, + }, + }, + } + } + } + + for _, host := range hosts { + streamRoute := adctypes.NewDefaultStreamRoute() + streamRouteName := adctypes.ComposeStreamRouteName(tlsRoute.Namespace, tlsRoute.Name, fmt.Sprintf("%d", ruleIndex), "TLS") + streamRoute.Name = streamRouteName + streamRoute.ID = id.GenID(streamRouteName) + streamRoute.SNI = host + streamRoute.Labels = labels + service.StreamRoutes = append(service.StreamRoutes, streamRoute) + } + result.Services = append(result.Services, service) + } + return result, nil +} diff --git a/internal/adc/translator/udproute.go b/internal/adc/translator/udproute.go new file mode 100644 index 000000000..5cc09a109 --- /dev/null +++ b/internal/adc/translator/udproute.go @@ -0,0 +1,152 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package translator + +import ( + "fmt" + + gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" + gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" + + adctypes "github.com/apache/apisix-ingress-controller/api/adc" + apiv2 "github.com/apache/apisix-ingress-controller/api/v2" + "github.com/apache/apisix-ingress-controller/internal/controller/label" + "github.com/apache/apisix-ingress-controller/internal/id" + "github.com/apache/apisix-ingress-controller/internal/provider" + "github.com/apache/apisix-ingress-controller/internal/types" +) + +func (t *Translator) TranslateUDPRoute(tctx *provider.TranslateContext, udpRoute *gatewayv1alpha2.UDPRoute) (*TranslateResult, error) { + result := &TranslateResult{} + rules := udpRoute.Spec.Rules + labels := label.GenLabel(udpRoute) + for ruleIndex, rule := range rules { + service := adctypes.NewDefaultService() + service.Labels = labels + service.Name = adctypes.ComposeServiceNameWithStream(udpRoute.Namespace, udpRoute.Name, fmt.Sprintf("%d", ruleIndex), "UDP") + service.ID = id.GenID(service.Name) + var ( + upstreams = make([]*adctypes.Upstream, 0) + weightedUpstreams = make([]adctypes.TrafficSplitConfigRuleWeightedUpstream, 0) + ) + for _, backend := range rule.BackendRefs { + if backend.Namespace == nil { + namespace := gatewayv1.Namespace(udpRoute.Namespace) + backend.Namespace = &namespace + } + upstream := newDefaultUpstreamWithoutScheme() + upNodes, _, err := t.translateBackendRef(tctx, backend, DefaultEndpointFilter) + if err != nil { + continue + } + if len(upNodes) == 0 { + continue + } + // TODO: Confirm BackendTrafficPolicy attachment with e2e test case. + t.AttachBackendTrafficPolicyToUpstream(backend, tctx.BackendTrafficPolicies, upstream) + upstream.Nodes = upNodes + var ( + kind string + port int32 + ) + if backend.Kind == nil { + kind = types.KindService + } else { + kind = string(*backend.Kind) + } + if backend.Port != nil { + port = int32(*backend.Port) + } + namespace := string(*backend.Namespace) + name := string(backend.Name) + upstreamName := adctypes.ComposeUpstreamNameForBackendRef(kind, namespace, name, port) + upstream.Name = upstreamName + upstream.ID = id.GenID(upstreamName) + upstreams = append(upstreams, upstream) + } + + // Handle multiple backends with traffic-split plugin + if len(upstreams) == 0 { + // Create a default upstream if no valid backends + upstream := adctypes.NewDefaultUpstream() + service.Upstream = upstream + } else if len(upstreams) == 1 { + // Single backend - use directly as service upstream + service.Upstream = upstreams[0] + // remove the id and name of the service.upstream, adc schema does not need id and name for it + service.Upstream.ID = "" + service.Upstream.Name = "" + } else { + // Multiple backends - use traffic-split plugin + service.Upstream = upstreams[0] + // remove the id and name of the service.upstream, adc schema does not need id and name for it + service.Upstream.ID = "" + service.Upstream.Name = "" + + upstreams = upstreams[1:] + + if len(upstreams) > 0 { + service.Upstreams = upstreams + } + + // Set weight in traffic-split for the default upstream + weight := apiv2.DefaultWeight + if rule.BackendRefs[0].Weight != nil { + weight = int(*rule.BackendRefs[0].Weight) + } + weightedUpstreams = append(weightedUpstreams, adctypes.TrafficSplitConfigRuleWeightedUpstream{ + Weight: weight, + }) + + // Set other upstreams in traffic-split using upstream_id + for i, upstream := range upstreams { + weight := apiv2.DefaultWeight + // get weight from the backend refs starting from the second backend + if i+1 < len(rule.BackendRefs) && rule.BackendRefs[i+1].Weight != nil { + weight = int(*rule.BackendRefs[i+1].Weight) + } + weightedUpstreams = append(weightedUpstreams, adctypes.TrafficSplitConfigRuleWeightedUpstream{ + UpstreamID: upstream.ID, + Weight: weight, + }) + } + + if len(weightedUpstreams) > 0 { + if service.Plugins == nil { + service.Plugins = make(map[string]any) + } + service.Plugins["traffic-split"] = &adctypes.TrafficSplitConfig{ + Rules: []adctypes.TrafficSplitConfigRule{ + { + WeightedUpstreams: weightedUpstreams, + }, + }, + } + } + } + streamRoute := adctypes.NewDefaultStreamRoute() + streamRouteName := adctypes.ComposeStreamRouteName(udpRoute.Namespace, udpRoute.Name, fmt.Sprintf("%d", ruleIndex), "UDP") + streamRoute.Name = streamRouteName + streamRoute.ID = id.GenID(streamRouteName) + streamRoute.Labels = labels + // TODO: support remote_addr, server_addr, sni, server_port + service.StreamRoutes = append(service.StreamRoutes, streamRoute) + result.Services = append(result.Services, service) + } + return result, nil +} diff --git a/internal/types/k8s.go b/internal/types/k8s.go index 047fd2a8e..4084ac9a8 100644 --- a/internal/types/k8s.go +++ b/internal/types/k8s.go @@ -56,6 +56,13 @@ const ( KindApisixUpstream = "ApisixUpstream" ) +const ( + AppProtocolHTTP = "http" + AppProtocolHTTPS = "https" + AppProtocolWS = "kubernetes.io/ws" + AppProtocolWSS = "kubernetes.io/wss" +) + func KindOf(obj any) string { switch obj.(type) { case *gatewayv1.Gateway: diff --git a/test/e2e/crds/v2/route.go b/test/e2e/crds/v2/route.go index 349bc0490..e7c5b1a52 100644 --- a/test/e2e/crds/v2/route.go +++ b/test/e2e/crds/v2/route.go @@ -19,6 +19,7 @@ package v2 import ( "context" + "crypto/tls" "fmt" "io" "math" @@ -2141,6 +2142,7 @@ spec: }) }) +<<<<<<< HEAD Context("Test ApisixRoute with multiple backends", func() { It("create ApisixRoute with multiple backends", func() { var httpService = ` @@ -2202,13 +2204,154 @@ spec: s.RequestAssert(&scaffold.RequestAssert{ Method: "GET", Path: "/get", +======= + Context("Test Services With AppProtocol", func() { + const apisixRoute = ` +apiVersion: apisix.apache.org/v2 +kind: ApisixRoute +metadata: + name: nginx +spec: + ingressClassName: %s + http: + - name: rule0 + match: + hosts: + - nginx.example + paths: + - /v1 + backends: + - serviceName: nginx + servicePort: 443 +` + const apisixRouteWithGranularityService = ` +apiVersion: apisix.apache.org/v2 +kind: ApisixRoute +metadata: + name: nginx-v2 +spec: + ingressClassName: %s + http: + - name: rule0 + match: + hosts: + - nginx.example + paths: + - /v2 + backends: + - serviceName: nginx + servicePort: 443 + resolveGranularity: service +` + const apisixRouteWithBackendWSS = ` +apiVersion: apisix.apache.org/v2 +kind: ApisixRoute +metadata: + name: default +spec: + ingressClassName: %s + http: + - name: rule0 + match: + hosts: + - api6.com + paths: + - /ws + backends: + - serviceName: nginx + servicePort: 8443 + resolveGranularity: service +` + + const apisixTlsSpec = ` +apiVersion: apisix.apache.org/v2 +kind: ApisixTls +metadata: + name: test-tls +spec: + ingressClassName: %s + hosts: + - api6.com + secret: + name: test-tls-secret + namespace: %s +` + BeforeEach(func() { + s.DeployNginx(framework.NginxOptions{ + Namespace: s.Namespace(), + Replicas: ptr.To(int32(1)), + }) + }) + + It("HTTPS Backend", func() { + applier.MustApplyAPIv2(types.NamespacedName{Namespace: s.Namespace(), Name: "nginx"}, + new(apiv2.ApisixRoute), fmt.Sprintf(apisixRoute, s.Namespace())) + applier.MustApplyAPIv2(types.NamespacedName{Namespace: s.Namespace(), Name: "nginx-v2"}, + new(apiv2.ApisixRoute), fmt.Sprintf(apisixRouteWithGranularityService, s.Namespace())) + + s.RequestAssert(&scaffold.RequestAssert{ + Method: "GET", + Path: "/v1", + Host: "nginx.example", +>>>>>>> f28b3429 (feat: support resolve svc.ports[].appProtocol (#2601)) Check: scaffold.WithExpectedStatus(http.StatusOK), }) s.RequestAssert(&scaffold.RequestAssert{ Method: "GET", +<<<<<<< HEAD Path: "/ip", Check: scaffold.WithExpectedStatus(http.StatusOK), }) }) +======= + Path: "/v2", + Host: "nginx.example", + Check: scaffold.WithExpectedStatus(http.StatusOK), + }) + }) + + It("WSS Backend", func() { + err := s.NewKubeTlsSecret("test-tls-secret", Cert, Key) + Expect(err).NotTo(HaveOccurred(), "creating TLS secret") + applier.MustApplyAPIv2(types.NamespacedName{Namespace: s.Namespace(), Name: "test-tls"}, + &apiv2.ApisixTls{}, fmt.Sprintf(apisixTlsSpec, s.Namespace(), s.Namespace())) + + applier.MustApplyAPIv2(types.NamespacedName{Namespace: s.Namespace(), Name: "default"}, + new(apiv2.ApisixRoute), fmt.Sprintf(apisixRouteWithBackendWSS, s.Namespace())) + time.Sleep(6 * time.Second) + + By("verify wss connection") + u := url.URL{ + Scheme: "wss", + Host: s.GetAPISIXHTTPSEndpoint(), + Path: "/ws", + } + headers := http.Header{"Host": []string{"api6.com"}} + dialer := websocket.Dialer{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, + ServerName: "api6.com", + }, + } + + conn, resp, err := dialer.Dial(u.String(), headers) + Expect(err).ShouldNot(HaveOccurred(), "WebSocket handshake") + Expect(resp.StatusCode).Should(Equal(http.StatusSwitchingProtocols)) + + defer func() { + _ = conn.Close() + }() + + By("send and receive message through WebSocket") + testMessage := "hello, this is APISIX" + err = conn.WriteMessage(websocket.TextMessage, []byte(testMessage)) + Expect(err).ShouldNot(HaveOccurred(), "writing WebSocket message") + + // Then our echo + _, msg, err := conn.ReadMessage() + Expect(err).ShouldNot(HaveOccurred(), "reading echo message") + Expect(string(msg)).To(Equal(testMessage), "message content verification") + }) +>>>>>>> f28b3429 (feat: support resolve svc.ports[].appProtocol (#2601)) }) }) diff --git a/test/e2e/framework/manifests/nginx.yaml b/test/e2e/framework/manifests/nginx.yaml index 7fb93f08e..654fce67a 100644 --- a/test/e2e/framework/manifests/nginx.yaml +++ b/test/e2e/framework/manifests/nginx.yaml @@ -35,6 +35,39 @@ data: location / { return 200 'Hello, World!'; } + + location /ws { + content_by_lua_block { + local server = require "resty.websocket.server" + local wb, err = server:new { + timeout = 5000, -- 5 seconds timeout + max_payload_len = 65535 -- max message length + } + if not wb then + ngx.log(ngx.ERR, "failed to create websocket: ", err) + return ngx.exit(444) + end + + while true do + local data, typ, err = wb:recv_frame() + if wb.fatal then + ngx.log(ngx.ERR, "failed to receive frame: ", err) + break + end + + if typ == "close" then + wb:send_close() + break + elseif typ == "text" then + wb:send_text(data) -- echo text + elseif typ == "binary" then + wb:send_binary(data) -- echo binary + elseif typ == "ping" then + wb:send_pong() + end + end + } + } } } @@ -67,7 +100,7 @@ spec: path: /healthz port: 80 timeoutSeconds: 2 - image: "nginx:1.21.4" + image: "openresty/openresty:1.27.1.2-4-bullseye-fat" imagePullPolicy: IfNotPresent name: nginx ports: @@ -75,7 +108,7 @@ spec: name: "http" protocol: "TCP" volumeMounts: - - mountPath: /etc/nginx/nginx.conf + - mountPath: /usr/local/openresty/nginx/conf/nginx.conf name: nginx-config subPath: nginx.conf --- @@ -91,4 +124,40 @@ spec: port: 80 protocol: TCP targetPort: 80 +<<<<<<< HEAD +======= + - name: https + port: 443 + protocol: TCP + targetPort: 443 + appProtocol: https + - name: ws + port: 8080 + protocol: TCP + targetPort: 80 + appProtocol: kubernetes.io/ws + - name: wss + port: 8443 + protocol: TCP + targetPort: 443 + appProtocol: kubernetes.io/wss + type: ClusterIP +--- +apiVersion: v1 +kind: Service +metadata: + name: nginx2 +spec: + selector: + app: nginx + ports: + - name: http + port: 80 + protocol: TCP + targetPort: 80 + - name: https + port: 443 + protocol: TCP + targetPort: 443 +>>>>>>> f28b3429 (feat: support resolve svc.ports[].appProtocol (#2601)) type: ClusterIP diff --git a/test/e2e/gatewayapi/httproute.go b/test/e2e/gatewayapi/httproute.go index 5b4a262a8..35f7c2bea 100644 --- a/test/e2e/gatewayapi/httproute.go +++ b/test/e2e/gatewayapi/httproute.go @@ -19,11 +19,14 @@ package gatewayapi import ( "context" + "crypto/tls" "fmt" "net/http" + "net/url" "strings" "time" + "github.com/gorilla/websocket" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/pkg/errors" @@ -2439,4 +2442,104 @@ spec: }) }) + + Context("Test Service With AppProtocol", func() { + var ( + httproute = ` +apiVersion: gateway.networking.k8s.io/v1 +kind: HTTPRoute +metadata: + name: nginx +spec: + parentRefs: + - name: %s + hostnames: + - api6.com + rules: + - matches: + - path: + type: Exact + value: /get + backendRefs: + - name: nginx + port: 443 + ` + httprouteWithWSS = ` +apiVersion: gateway.networking.k8s.io/v1 +kind: HTTPRoute +metadata: + name: nginx-wss +spec: + parentRefs: + - name: %s + hostnames: + - api6.com + rules: + - matches: + - path: + type: Exact + value: /ws + backendRefs: + - name: nginx + port: 8443 + ` + ) + + BeforeEach(func() { + beforeEachHTTPS() + s.DeployNginx(framework.NginxOptions{ + Namespace: s.Namespace(), + Replicas: ptr.To(int32(1)), + }) + }) + It("HTTPS backend", func() { + s.ResourceApplied("HTTPRoute", "nginx", fmt.Sprintf(httproute, s.Namespace()), 1) + s.RequestAssert(&scaffold.RequestAssert{ + Method: "GET", + Path: "/get", + Host: "api6.com", + Check: scaffold.WithExpectedStatus(http.StatusOK), + }) + }) + + It("WSS backend", func() { + s.ResourceApplied("HTTPRoute", "nginx-wss", fmt.Sprintf(httprouteWithWSS, s.Namespace()), 1) + time.Sleep(6 * time.Second) + + By("verify wss connection") + u := url.URL{ + Scheme: "wss", + Host: s.GetAPISIXHTTPSEndpoint(), + Path: "/ws", + } + headers := http.Header{"Host": []string{"api6.com"}} + + hostname := "api6.com" + + dialer := websocket.Dialer{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, + ServerName: hostname, + }, + } + + conn, resp, err := dialer.Dial(u.String(), headers) + Expect(err).ShouldNot(HaveOccurred(), "WebSocket handshake") + Expect(resp.StatusCode).Should(Equal(http.StatusSwitchingProtocols)) + + defer func() { + _ = conn.Close() + }() + + By("send and receive message through WebSocket") + testMessage := "hello, this is APISIX" + err = conn.WriteMessage(websocket.TextMessage, []byte(testMessage)) + Expect(err).ShouldNot(HaveOccurred(), "writing WebSocket message") + + // Then our echo + _, msg, err := conn.ReadMessage() + Expect(err).ShouldNot(HaveOccurred(), "reading echo message") + Expect(string(msg)).To(Equal(testMessage), "message content verification") + }) + }) }) diff --git a/test/e2e/ingress/ingress.go b/test/e2e/ingress/ingress.go index b61e31096..1e3e3a502 100644 --- a/test/e2e/ingress/ingress.go +++ b/test/e2e/ingress/ingress.go @@ -19,16 +19,20 @@ package ingress import ( "context" + "crypto/tls" "encoding/base64" "fmt" "net/http" + "net/url" "strings" "time" + "github.com/gorilla/websocket" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/stretchr/testify/assert" "k8s.io/apimachinery/pkg/types" + "k8s.io/utils/ptr" "github.com/apache/apisix-ingress-controller/api/v1alpha1" "github.com/apache/apisix-ingress-controller/test/e2e/framework" @@ -929,6 +933,129 @@ spec: }) }) + Context("Test Services With AppProtocol", func() { + var ingressClass = ` +apiVersion: networking.k8s.io/v1 +kind: IngressClass +metadata: + name: %s +spec: + controller: "%s" + parameters: + apiGroup: "apisix.apache.org" + kind: "GatewayProxy" + name: "apisix-proxy-config" + namespace: "%s" + scope: "Namespace" +` + var ingress = ` +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + name: apisix-ingress-tls +spec: + ingressClassName: %s + rules: + - host: nginx.example + http: + paths: + - path: /get + pathType: Exact + backend: + service: + name: nginx + port: + number: 443 +` + var ingressWithWSS = ` +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + name: apisix-ingress-wss +spec: + ingressClassName: %s + tls: + - hosts: + - api6.com + secretName: test-ingress-tls + rules: + - host: api6.com + http: + paths: + - path: /ws + pathType: Exact + backend: + service: + name: nginx + port: + number: 8443 +` + BeforeEach(func() { + s.DeployNginx(framework.NginxOptions{ + Namespace: s.Namespace(), + Replicas: ptr.To(int32(1)), + }) + By("create GatewayProxy") + Expect(s.CreateResourceFromString(s.GetGatewayProxySpec())).NotTo(HaveOccurred(), "creating GatewayProxy") + + By("create IngressClass") + err := s.CreateResourceFromStringWithNamespace(fmt.Sprintf(ingressClass, s.Namespace(), s.GetControllerName(), s.Namespace()), s.Namespace()) + Expect(err).NotTo(HaveOccurred(), "creating IngressClass") + time.Sleep(5 * time.Second) + }) + + It("Ingress With HTTPS Backend", func() { + By("create Ingress") + Expect(s.CreateResourceFromString(fmt.Sprintf(ingress, s.Namespace()))).ShouldNot(HaveOccurred(), "creating Ingress") + + s.RequestAssert(&scaffold.RequestAssert{ + Method: "GET", + Path: "/get", + Host: "nginx.example", + Check: scaffold.WithExpectedStatus(http.StatusOK), + }) + }) + + It("Ingress With WSS Backend", func() { + createSecret(s, _secretName) + By("create Ingress") + Expect(s.CreateResourceFromString(fmt.Sprintf(ingressWithWSS, s.Namespace()))).ShouldNot(HaveOccurred(), "creating Ingress") + time.Sleep(6 * time.Second) + + By("verify wss connection") + u := url.URL{ + Scheme: "wss", + Host: s.GetAPISIXHTTPSEndpoint(), + Path: "/ws", + } + headers := http.Header{"Host": []string{"api6.com"}} + dialer := websocket.Dialer{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, + ServerName: "api6.com", + }, + } + + conn, resp, err := dialer.Dial(u.String(), headers) + Expect(err).ShouldNot(HaveOccurred(), "WebSocket handshake") + Expect(resp.StatusCode).Should(Equal(http.StatusSwitchingProtocols)) + + defer func() { + _ = conn.Close() + }() + + By("send and receive message through WebSocket") + testMessage := "hello, this is APISIX" + err = conn.WriteMessage(websocket.TextMessage, []byte(testMessage)) + Expect(err).ShouldNot(HaveOccurred(), "writing WebSocket message") + + // Then our echo + _, msg, err := conn.ReadMessage() + Expect(err).ShouldNot(HaveOccurred(), "reading echo message") + Expect(string(msg)).To(Equal(testMessage), "message content verification") + }) + }) + PContext("GatewayProxy reference Secret", func() { const secretSpec = ` apiVersion: v1