diff --git a/cmd/webhook/main.go b/cmd/webhook/main.go index 76dc703e1e5..634a3927f16 100644 --- a/cmd/webhook/main.go +++ b/cmd/webhook/main.go @@ -79,7 +79,8 @@ func init() { var ourTypes = map[schema.GroupVersionKind]resourcesemantics.GenericCRD{ // For group eventing.knative.dev. // v1alpha1 - eventingv1alpha1.SchemeGroupVersion.WithKind("EventPolicy"): &eventingv1alpha1.EventPolicy{}, + eventingv1alpha1.SchemeGroupVersion.WithKind("EventPolicy"): &eventingv1alpha1.EventPolicy{}, + eventingv1alpha1.SchemeGroupVersion.WithKind("EventTransform"): &eventingv1alpha1.EventTransform{}, // v1beta1 eventingv1beta1.SchemeGroupVersion.WithKind("EventType"): &eventingv1beta1.EventType{}, // v1beta2 diff --git a/config/core/deployments/controller.yaml b/config/core/deployments/controller.yaml index 77ff138b53c..efb5cab5432 100644 --- a/config/core/deployments/controller.yaml +++ b/config/core/deployments/controller.yaml @@ -77,6 +77,9 @@ spec: fieldRef: fieldPath: metadata.name + - name: EVENT_TRANSFORM_JSONATA_IMAGE + value: quay.io/pierdipi/jsonata-transform@sha256:6aafed0012b8a3c24a1cd72fc4521981ec7680cf64988c22ded0aafa2a431c93 + ## Adapter settings # - name: K_LOGGING_CONFIG diff --git a/docs/eventing-api.md b/docs/eventing-api.md index 8e0861534c8..8f01fce7573 100644 --- a/docs/eventing-api.md +++ b/docs/eventing-api.md @@ -2858,7 +2858,7 @@ back to the broker.

-Transformations
+EventTransformations
EventTransformations @@ -2867,9 +2867,9 @@ EventTransformations

-(Members of Transformations are embedded into this type.) +(Members of EventTransformations are embedded into this type.)

-

Transformations contain all possible transformations, only one “type” can be used.

+

EventTransformations contain all possible transformations, only one “type” can be used.

@@ -3421,7 +3421,7 @@ back to the broker.

-Transformations
+EventTransformations
EventTransformations @@ -3430,9 +3430,9 @@ EventTransformations

-(Members of Transformations are embedded into this type.) +(Members of EventTransformations are embedded into this type.)

-

Transformations contain all possible transformations, only one “type” can be used.

+

EventTransformations contain all possible transformations, only one “type” can be used.

diff --git a/pkg/apis/eventing/v1alpha1/eventtransform_defaults.go b/pkg/apis/eventing/v1alpha1/eventtransform_defaults.go new file mode 100644 index 00000000000..d061fbe60ac --- /dev/null +++ b/pkg/apis/eventing/v1alpha1/eventtransform_defaults.go @@ -0,0 +1,24 @@ +/* +Copyright 2025 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "context" +) + +func (t *EventTransform) SetDefaults(_ context.Context) { +} diff --git a/pkg/apis/eventing/v1alpha1/eventtransform_lifecycle.go b/pkg/apis/eventing/v1alpha1/eventtransform_lifecycle.go index bb57a481fb9..05134cf3bc4 100644 --- a/pkg/apis/eventing/v1alpha1/eventtransform_lifecycle.go +++ b/pkg/apis/eventing/v1alpha1/eventtransform_lifecycle.go @@ -28,7 +28,8 @@ const ( TransformConditionAddressable apis.ConditionType = "Addressable" TransformationConditionReady apis.ConditionType = "TransformationReady" - TransformationAddressableEmptyURL string = "NoURL" + TransformationAddressableEmptyURL string = "NoURL" + TransformationAddressableWaitingForServiceEndpoints string = "WaitingForServiceEndpoints" // Specific transformations conditions @@ -159,6 +160,10 @@ func (ts *EventTransformStatus) propagateTransformationConditionStatus(cond *api } } +func (ts *EventTransformStatus) MarkWaitingForServiceEndpoints() { + ts.GetConditionSet().Manage(ts).MarkFalse(TransformConditionAddressable, TransformationAddressableWaitingForServiceEndpoints, "URL is empty") +} + func (ts *EventTransformStatus) SetAddresses(addresses ...duckv1.Addressable) { if len(addresses) == 0 || addresses[0].URL.IsEmpty() { ts.GetConditionSet().Manage(ts).MarkFalse(TransformConditionAddressable, TransformationAddressableEmptyURL, "URL is empty") diff --git a/pkg/apis/eventing/v1alpha1/eventtransform_lifecycle_test.go b/pkg/apis/eventing/v1alpha1/eventtransform_lifecycle_test.go index 3b5e4983b9d..df7a537318f 100644 --- a/pkg/apis/eventing/v1alpha1/eventtransform_lifecycle_test.go +++ b/pkg/apis/eventing/v1alpha1/eventtransform_lifecycle_test.go @@ -34,7 +34,7 @@ func TestFullLifecycle(t *testing.T) { ObjectMeta: metav1.ObjectMeta{}, Spec: EventTransformSpec{ Sink: nil, - Transformations: EventTransformations{ + EventTransformations: EventTransformations{ Jsonata: &JsonataEventTransformationSpec{ Expression: ` { diff --git a/pkg/apis/eventing/v1alpha1/eventtransform_types.go b/pkg/apis/eventing/v1alpha1/eventtransform_types.go index 8106a0261b8..a7a5d13e46f 100644 --- a/pkg/apis/eventing/v1alpha1/eventtransform_types.go +++ b/pkg/apis/eventing/v1alpha1/eventtransform_types.go @@ -49,6 +49,7 @@ type EventTransform struct { var ( // Check that EventTransform can be validated, can be defaulted, and has immutable fields. _ apis.Validatable = (*EventTransform)(nil) + _ apis.Defaultable = (*EventTransform)(nil) // Check that EventTransform can return its spec untyped. _ apis.HasSpec = (*EventTransform)(nil) @@ -73,8 +74,8 @@ type EventTransformSpec struct { // +optional Sink *duckv1.Destination `json:"sink,omitempty"` - // Transformations contain all possible transformations, only one "type" can be used. - Transformations EventTransformations `json:",inline"` + // EventTransformations contain all possible transformations, only one "type" can be used. + EventTransformations `json:",inline"` } type EventTransformations struct { diff --git a/pkg/apis/eventing/v1alpha1/eventtransform_validation.go b/pkg/apis/eventing/v1alpha1/eventtransform_validation.go index 1dbc47a58cf..7954a42560e 100644 --- a/pkg/apis/eventing/v1alpha1/eventtransform_validation.go +++ b/pkg/apis/eventing/v1alpha1/eventtransform_validation.go @@ -36,7 +36,7 @@ func (ts *EventTransformSpec) Validate(ctx context.Context) *apis.FieldError { // These are transformations field paths. transformations := make([]string, 0, 2) - if ts.Transformations.Jsonata != nil { + if ts.EventTransformations.Jsonata != nil { transformations = append(transformations, "jsonata") } @@ -46,7 +46,7 @@ func (ts *EventTransformSpec) Validate(ctx context.Context) *apis.FieldError { errs = errs.Also(apis.ErrMultipleOneOf(transformations...)) } - errs = errs.Also(ts.Transformations.Jsonata.Validate(ctx).ViaField("jsonata")) + errs = errs.Also(ts.EventTransformations.Jsonata.Validate(ctx).ViaField("jsonata")) errs = errs.Also(ts.Sink.Validate(ctx).ViaField("sink")) if apis.IsInUpdate(ctx) { @@ -71,7 +71,7 @@ func (in *EventTransformSpec) CheckImmutableFields(_ context.Context, original * var errs *apis.FieldError - if original.Spec.Transformations.Jsonata != nil && in.Transformations.Jsonata == nil { + if original.Spec.EventTransformations.Jsonata != nil && in.EventTransformations.Jsonata == nil { errs = errs.Also(apis.ErrGeneric("transformations types are immutable, jsonata transformation cannot be changed to a different transformation type")) } diff --git a/pkg/apis/eventing/v1alpha1/eventtransform_validation_test.go b/pkg/apis/eventing/v1alpha1/eventtransform_validation_test.go new file mode 100644 index 00000000000..754ac87c684 --- /dev/null +++ b/pkg/apis/eventing/v1alpha1/eventtransform_validation_test.go @@ -0,0 +1,46 @@ +/* +Copyright 2025 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "knative.dev/pkg/webhook/json" +) + +func TestJSONDecode(t *testing.T) { + + et := &EventTransform{} + + err := json.Decode([]byte(` +{ + "apiVersion": "eventing.knative.dev/v1alpha1", + "kind": "EventTransform", + "metadata": { + "name": "identity" + }, + "spec": { + "jsonata": { + "expression": "{\n \"specversion\": \"1.0\",\n \"id\": id,\n \"type\": \"transformation.jsonata\",\n \"source\": \"transformation.json.identity\",\n \"data\": $\n}\n" + } + } +} +`), et, true) + + assert.Nil(t, err) +} diff --git a/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go index 23e5f8a1348..3df74897a33 100644 --- a/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go @@ -328,7 +328,7 @@ func (in *EventTransformSpec) DeepCopyInto(out *EventTransformSpec) { *out = new(duckv1.Destination) (*in).DeepCopyInto(*out) } - in.Transformations.DeepCopyInto(&out.Transformations) + in.EventTransformations.DeepCopyInto(&out.EventTransformations) return } diff --git a/pkg/reconciler/eventtransform/controller.go b/pkg/reconciler/eventtransform/controller.go index b2cb16612f3..12750a24f32 100644 --- a/pkg/reconciler/eventtransform/controller.go +++ b/pkg/reconciler/eventtransform/controller.go @@ -19,10 +19,13 @@ package eventtransform import ( "context" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/informers" kubeclient "knative.dev/pkg/client/injection/kube/client" deploymentinformer "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment/filtered" configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/filtered" + serviceinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/service/filtered" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" "knative.dev/pkg/kmeta" @@ -48,6 +51,17 @@ func NewController( jsonataConfigMapInformer := configmapinformer.Get(ctx, JsonataResourcesSelector) jsonataDeploymentInformer := deploymentinformer.Get(ctx, JsonataResourcesSelector) jsonataSinkBindingInformer := sinkbindinginformer.Get(ctx, JsonataResourcesSelector) + jsonataServiceInformer := serviceinformer.Get(ctx, JsonataResourcesSelector) + + // Create a custom informer as one in knative/pkg doesn't exist for endpoints. + factory := informers.NewSharedInformerFactoryWithOptions( + kubeclient.Get(ctx), + controller.DefaultResyncPeriod, + informers.WithTweakListOptions(func(options *metav1.ListOptions) { + options.LabelSelector = JsonataResourcesSelector + }), + ) + jsonataEndpointInformer := factory.Core().V1().Endpoints() var globalResync func() @@ -63,6 +77,8 @@ func NewController( client: eventingclient.Get(ctx), jsonataConfigMapLister: jsonataConfigMapInformer.Lister(), jsonataDeploymentsLister: jsonataDeploymentInformer.Lister(), + jsonataServiceLister: jsonataServiceInformer.Lister(), + jsonataEndpointLister: jsonataEndpointInformer.Lister(), jsonataSinkBindingLister: jsonataSinkBindingInformer.Lister(), } @@ -79,9 +95,15 @@ func NewController( eventTransformInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue)) jsonataDeploymentInformer.Informer().AddEventHandler(controller.HandleAll(enqueueUsingNameLabel(impl))) + jsonataServiceInformer.Informer().AddEventHandler(controller.HandleAll(enqueueUsingNameLabel(impl))) + jsonataEndpointInformer.Informer().AddEventHandler(controller.HandleAll(enqueueUsingNameLabel(impl))) jsonataConfigMapInformer.Informer().AddEventHandler(controller.HandleAll(enqueueUsingNameLabel(impl))) jsonataSinkBindingInformer.Informer().AddEventHandler(controller.HandleAll(enqueueUsingNameLabel(impl))) + // Start the factory after creating all necessary informers. + factory.Start(ctx.Done()) + factory.WaitForCacheSync(ctx.Done()) + return impl } diff --git a/pkg/reconciler/eventtransform/eventtransform.go b/pkg/reconciler/eventtransform/eventtransform.go index 2500cdd88c4..467a9f4a626 100644 --- a/pkg/reconciler/eventtransform/eventtransform.go +++ b/pkg/reconciler/eventtransform/eventtransform.go @@ -28,7 +28,12 @@ import ( "k8s.io/client-go/kubernetes" appslister "k8s.io/client-go/listers/apps/v1" corelister "k8s.io/client-go/listers/core/v1" + "knative.dev/pkg/apis" + duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/controller" + "knative.dev/pkg/logging" + "knative.dev/pkg/network" + "knative.dev/pkg/ptr" "knative.dev/pkg/reconciler" eventing "knative.dev/eventing/pkg/apis/eventing/v1alpha1" @@ -43,6 +48,8 @@ type Reconciler struct { jsonataConfigMapLister corelister.ConfigMapLister jsonataDeploymentsLister appslister.DeploymentLister + jsonataServiceLister corelister.ServiceLister + jsonataEndpointLister corelister.EndpointsLister jsonataSinkBindingLister sourceslisters.SinkBindingLister } @@ -54,21 +61,39 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, transform *eventing.Even } func (r *Reconciler) reconcileJsonataTransformation(ctx context.Context, transform *eventing.EventTransform) error { - if transform.Spec.Transformations.Jsonata == nil { + logger := logging.FromContext(ctx) + + if transform.Spec.EventTransformations.Jsonata == nil { + logger.Debug("No Jsonata transformation found") return nil } + logger.Debugw("Reconciling Jsonata transformation ConfigMap") expressionCm, err := r.reconcileJsonataTransformationConfigMap(ctx, transform) if err != nil { return fmt.Errorf("failed to reconcile Jsonata transformation deployment: %w", err) } + + logger.Debugw("Reconciling Jsonata transformation Service") + if err := r.reconcileJsonataTransformationService(ctx, transform); err != nil { + return fmt.Errorf("failed to reconcile Jsonata transformation deployment: %w", err) + } + + logger.Debugw("Reconciling Jsonata transformation Deployment") if err := r.reconcileJsonataTransformationDeployment(ctx, expressionCm, transform); err != nil { return fmt.Errorf("failed to reconcile Jsonata transformation deployment: %w", err) } + + logger.Debugw("Reconciling Jsonata transformation SinkBinding") if err := r.reconcileJsonataTransformationSinkBinding(ctx, transform); err != nil { return fmt.Errorf("failed to reconcile Jsonata transformation sink binding: %w", err) } + logger.Debugw("Reconciling Jsonata transformation address") + if err := r.reconcileJsonataTransformationAddress(ctx, transform); err != nil { + return fmt.Errorf("failed to reconcile Jsonata transformation address: %w", err) + } + return nil } @@ -82,13 +107,42 @@ func (r *Reconciler) reconcileJsonataTransformationConfigMap(ctx context.Context if err != nil { return nil, fmt.Errorf("failed to get configmap %s/%s: %w", expected.GetNamespace(), expected.GetName(), err) } - if equality.Semantic.DeepDerivative(expected, curr) { + if equality.Semantic.DeepDerivative(expected.Data, curr.Data) && + equality.Semantic.DeepDerivative(expected.Labels, curr.Labels) && + equality.Semantic.DeepDerivative(expected.Annotations, curr.Annotations) { return curr, nil } expected.ResourceVersion = curr.ResourceVersion return r.updateConfigMap(ctx, transform, expected) } +func (r *Reconciler) reconcileJsonataTransformationService(ctx context.Context, transform *eventing.EventTransform) error { + expected := jsonataService(ctx, transform) + + curr, err := r.jsonataServiceLister.Services(expected.GetNamespace()).Get(expected.GetName()) + if apierrors.IsNotFound(err) { + _, err := r.createService(ctx, transform, expected) + if err != nil { + return err + } + return nil + } + if err != nil { + return fmt.Errorf("failed to get service %s/%s: %w", expected.GetNamespace(), expected.GetName(), err) + } + if equality.Semantic.DeepDerivative(expected.Spec, curr.Spec) && + equality.Semantic.DeepDerivative(expected.Labels, curr.Labels) && + equality.Semantic.DeepDerivative(expected.Annotations, curr.Annotations) { + return nil + } + expected.ResourceVersion = curr.ResourceVersion + _, err = r.updateService(ctx, transform, expected) + if err != nil { + return err + } + return nil +} + func (r *Reconciler) reconcileJsonataTransformationDeployment(ctx context.Context, expression *corev1.ConfigMap, transform *eventing.EventTransform) error { expected := jsonataDeployment(ctx, expression, transform) @@ -104,7 +158,9 @@ func (r *Reconciler) reconcileJsonataTransformationDeployment(ctx context.Contex if err != nil { return fmt.Errorf("failed to get deployment %s/%s: %w", expected.GetNamespace(), expected.GetName(), err) } - if equality.Semantic.DeepDerivative(expected, curr) { + if equality.Semantic.DeepDerivative(expected.Spec, curr.Spec) && + equality.Semantic.DeepDerivative(expected.Labels, curr.Labels) && + equality.Semantic.DeepDerivative(expected.Annotations, curr.Annotations) { transform.Status.PropagateJsonataDeploymentStatus(curr.Status) return nil } @@ -136,7 +192,9 @@ func (r *Reconciler) reconcileJsonataTransformationSinkBinding(ctx context.Conte if err != nil { return fmt.Errorf("failed to get deployment %s/%s: %w", expected.GetNamespace(), expected.GetName(), err) } - if equality.Semantic.DeepDerivative(expected, curr) { + if equality.Semantic.DeepDerivative(expected.Spec, curr.Spec) && + equality.Semantic.DeepDerivative(expected.Labels, curr.Labels) && + equality.Semantic.DeepDerivative(expected.Annotations, curr.Annotations) { transform.Status.PropagateJsonataSinkBindingStatus(curr.Status) return nil } @@ -149,21 +207,56 @@ func (r *Reconciler) reconcileJsonataTransformationSinkBinding(ctx context.Conte return nil } -func (r *Reconciler) createDeployment(ctx context.Context, transform *eventing.EventTransform, expected appsv1.Deployment) (*appsv1.Deployment, error) { - created, err := r.k8s.AppsV1().Deployments(expected.GetNamespace()).Create(ctx, &expected, metav1.CreateOptions{}) +func (r *Reconciler) reconcileJsonataTransformationAddress(ctx context.Context, transform *eventing.EventTransform) error { + service := jsonataService(ctx, transform) + endpoint, err := r.jsonataEndpointLister.Endpoints(transform.GetNamespace()).Get(service.GetName()) + if apierrors.IsNotFound(err) { + transform.Status.MarkWaitingForServiceEndpoints() + return nil + } if err != nil { - return nil, fmt.Errorf("failed to create jsonata deployment %s/%s: %w", expected.GetNamespace(), expected.GetName(), err) + return fmt.Errorf("failed to list jsonata endpoints: %w", err) } - controller.GetEventRecorder(ctx).Event(transform, "JsonataDeploymentCreated", "", expected.GetName()) - return created, nil + if len(endpoint.Subsets) == 0 || len(endpoint.Subsets[0].Ports) == 0 { + transform.Status.MarkWaitingForServiceEndpoints() + return nil + } + + // TODO: Support TLS and Authn/z + + hostname := network.GetServiceHostname(service.GetName(), service.GetNamespace()) + transform.Status.SetAddresses(duckv1.Addressable{ + Name: ptr.String("http"), + URL: apis.HTTP(hostname), + }) + + return nil } -func (r *Reconciler) createConfigMap(ctx context.Context, transform *eventing.EventTransform, expected corev1.ConfigMap) (*corev1.ConfigMap, error) { - created, err := r.k8s.CoreV1().ConfigMaps(expected.GetNamespace()).Create(ctx, &expected, metav1.CreateOptions{}) +func (r *Reconciler) createService(ctx context.Context, transform *eventing.EventTransform, expected corev1.Service) (*corev1.Service, error) { + created, err := r.k8s.CoreV1().Services(expected.GetNamespace()).Create(ctx, &expected, metav1.CreateOptions{}) if err != nil { return nil, fmt.Errorf("failed to create jsonata configmap %s/%s: %w", expected.GetNamespace(), expected.GetName(), err) } - controller.GetEventRecorder(ctx).Event(transform, "JsonataConfigMapCreated", "", expected.GetName()) + controller.GetEventRecorder(ctx).Event(transform, corev1.EventTypeNormal, "JsonataServiceCreated", expected.GetName()) + return created, nil +} + +func (r *Reconciler) updateService(ctx context.Context, transform *eventing.EventTransform, expected corev1.Service) (*corev1.Service, error) { + updated, err := r.k8s.CoreV1().Services(expected.GetNamespace()).Update(ctx, &expected, metav1.UpdateOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to update configmap %s/%s: %w", expected.GetNamespace(), expected.GetName(), err) + } + controller.GetEventRecorder(ctx).Event(transform, corev1.EventTypeNormal, "JsonataServiceUpdated", expected.GetName()) + return updated, nil +} + +func (r *Reconciler) createDeployment(ctx context.Context, transform *eventing.EventTransform, expected appsv1.Deployment) (*appsv1.Deployment, error) { + created, err := r.k8s.AppsV1().Deployments(expected.GetNamespace()).Create(ctx, &expected, metav1.CreateOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to create jsonata deployment %s/%s: %w", expected.GetNamespace(), expected.GetName(), err) + } + controller.GetEventRecorder(ctx).Event(transform, corev1.EventTypeNormal, "JsonataDeploymentCreated", expected.GetName()) return created, nil } @@ -172,16 +265,25 @@ func (r *Reconciler) updateDeployment(ctx context.Context, transform *eventing.E if err != nil { return nil, fmt.Errorf("failed to update deployment %s/%s: %w", expected.GetNamespace(), expected.GetName(), err) } - controller.GetEventRecorder(ctx).Event(transform, "JsonataDeploymentUpdated", "", expected.GetName()) + controller.GetEventRecorder(ctx).Event(transform, corev1.EventTypeNormal, "JsonataDeploymentUpdated", expected.GetName()) return updated, nil } +func (r *Reconciler) createConfigMap(ctx context.Context, transform *eventing.EventTransform, expected corev1.ConfigMap) (*corev1.ConfigMap, error) { + created, err := r.k8s.CoreV1().ConfigMaps(expected.GetNamespace()).Create(ctx, &expected, metav1.CreateOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to create jsonata configmap %s/%s: %w", expected.GetNamespace(), expected.GetName(), err) + } + controller.GetEventRecorder(ctx).Event(transform, corev1.EventTypeNormal, "JsonataConfigMapCreated", expected.GetName()) + return created, nil +} + func (r *Reconciler) updateConfigMap(ctx context.Context, transform *eventing.EventTransform, expected corev1.ConfigMap) (*corev1.ConfigMap, error) { updated, err := r.k8s.CoreV1().ConfigMaps(expected.GetNamespace()).Update(ctx, &expected, metav1.UpdateOptions{}) if err != nil { return nil, fmt.Errorf("failed to update configmap %s/%s: %w", expected.GetNamespace(), expected.GetName(), err) } - controller.GetEventRecorder(ctx).Event(transform, "ConfigMapUpdated", "", expected.GetName()) + controller.GetEventRecorder(ctx).Event(transform, corev1.EventTypeNormal, "JsonataConfigMapUpdated", expected.GetName()) return updated, nil } @@ -202,6 +304,7 @@ func (r *Reconciler) deleteJsonataTransformationSinkBinding(ctx context.Context, if err != nil { return fmt.Errorf("failed to delete sink binding %s/%s: %w", transform.GetNamespace(), sbName, err) } + controller.GetEventRecorder(ctx).Event(transform, corev1.EventTypeNormal, "JsonataSinkBindingDeleted", sbName) return nil } @@ -210,7 +313,7 @@ func (r *Reconciler) createSinkBinding(ctx context.Context, transform *eventing. if err != nil { return nil, fmt.Errorf("failed to create jsonata sink binding %s/%s: %w", expected.GetNamespace(), expected.GetName(), err) } - controller.GetEventRecorder(ctx).Event(transform, "JsonataSinkBindingCreated", "", expected.GetName()) + controller.GetEventRecorder(ctx).Event(transform, corev1.EventTypeNormal, "JsonataSinkBindingCreated", expected.GetName()) return created, nil } @@ -219,6 +322,6 @@ func (r *Reconciler) updateSinkBinding(ctx context.Context, transform *eventing. if err != nil { return nil, fmt.Errorf("failed to update sink binding %s/%s: %w", expected.GetNamespace(), expected.GetName(), err) } - controller.GetEventRecorder(ctx).Event(transform, "JsonataSinkBindingUpdated", "", expected.GetName()) + controller.GetEventRecorder(ctx).Event(transform, corev1.EventTypeNormal, "JsonataSinkBindingUpdated", expected.GetName()) return updated, nil } diff --git a/pkg/reconciler/eventtransform/resources_jsonata.go b/pkg/reconciler/eventtransform/resources_jsonata.go index 6dc7e0ef980..9bb59435999 100644 --- a/pkg/reconciler/eventtransform/resources_jsonata.go +++ b/pkg/reconciler/eventtransform/resources_jsonata.go @@ -40,7 +40,7 @@ const ( JsonataResourcesLabelKey = "eventing.knative.dev/event-transform-jsonata" JsonataResourcesLabelValue = "true" JsonataExpressionHashKey = "eventing.knative.dev/event-transform-jsonata-expression-hash" - JsonataResourcesNameSuffix = "jsonata" + JsonataResourcesNameSuffix = "-jsonata" JsonataExpressionDataKey = "jsonata-expression" JsonataExpressionPath = "/etc/jsonata" @@ -50,16 +50,15 @@ const ( func jsonataExpressionConfigMap(_ context.Context, transform *eventing.EventTransform) corev1.ConfigMap { expression := corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ - Name: kmeta.ChildName(transform.Name, JsonataResourcesNameSuffix), - Namespace: transform.GetNamespace(), - Labels: jsonataLabels(transform), - Annotations: transform.Annotations, + Name: kmeta.ChildName(transform.Name, JsonataResourcesNameSuffix), + Namespace: transform.GetNamespace(), + Labels: jsonataLabels(transform), OwnerReferences: []metav1.OwnerReference{ *kmeta.NewControllerRef(transform), }, }, Data: map[string]string{ - JsonataExpressionDataKey: transform.Spec.Transformations.Jsonata.Expression, + JsonataExpressionDataKey: transform.Spec.EventTransformations.Jsonata.Expression, }, } return expression @@ -76,7 +75,7 @@ func jsonataDeployment(_ context.Context, expression *corev1.ConfigMap, transfor Name: kmeta.ChildName(transform.Name, JsonataResourcesNameSuffix), Namespace: transform.GetNamespace(), Labels: jsonataLabels(transform), - Annotations: transform.Annotations, + Annotations: make(map[string]string, 2), OwnerReferences: []metav1.OwnerReference{ *kmeta.NewControllerRef(transform), }, @@ -143,12 +142,45 @@ func jsonataDeployment(_ context.Context, expression *corev1.ConfigMap, transfor } // When the expression changes, this rolls out a new deployment with the latest ConfigMap data. - hash := sha256.Sum256([]byte(transform.Spec.Transformations.Jsonata.Expression)) + hash := sha256.Sum256([]byte(transform.Spec.EventTransformations.Jsonata.Expression)) d.Annotations[JsonataExpressionHashKey] = base64.StdEncoding.EncodeToString(hash[:]) return d } +func jsonataService(_ context.Context, transform *eventing.EventTransform) corev1.Service { + s := corev1.Service{ + TypeMeta: metav1.TypeMeta{}, + ObjectMeta: metav1.ObjectMeta{ + Name: kmeta.ChildName(transform.Name, JsonataResourcesNameSuffix), + Namespace: transform.GetNamespace(), + Labels: jsonataLabels(transform), + Annotations: make(map[string]string, 2), + OwnerReferences: []metav1.OwnerReference{ + *kmeta.NewControllerRef(transform), + }, + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + { + Name: "http", + Protocol: corev1.ProtocolTCP, + AppProtocol: ptr.String("http"), + Port: 80, + TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: 8080}, + }, + }, + Selector: map[string]string{ + JsonataResourcesLabelKey: JsonataResourcesLabelValue, + NameLabelKey: transform.GetName(), + }, + Type: corev1.ServiceTypeClusterIP, + }, + } + + return s +} + func jsonataSinkBindingName(transform *eventing.EventTransform) string { return kmeta.ChildName(transform.Name, JsonataResourcesNameSuffix) } @@ -157,10 +189,9 @@ func jsonataSinkBinding(_ context.Context, transform *eventing.EventTransform) s name := jsonataSinkBindingName(transform) sb := sourcesv1.SinkBinding{ ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: transform.GetNamespace(), - Labels: jsonataLabels(transform), - Annotations: transform.Annotations, + Name: name, + Namespace: transform.GetNamespace(), + Labels: jsonataLabels(transform), OwnerReferences: []metav1.OwnerReference{ *kmeta.NewControllerRef(transform), }, @@ -184,10 +215,7 @@ func jsonataSinkBinding(_ context.Context, transform *eventing.EventTransform) s } func jsonataLabels(transform *eventing.EventTransform) map[string]string { - labels := make(map[string]string, len(transform.Labels)+2) - for k, v := range transform.Labels { - labels[k] = v - } + labels := make(map[string]string, 2) labels[JsonataResourcesLabelKey] = JsonataResourcesLabelValue labels[NameLabelKey] = transform.GetName() return labels diff --git a/vendor/knative.dev/pkg/client/injection/kube/informers/core/v1/service/filtered/service.go b/vendor/knative.dev/pkg/client/injection/kube/informers/core/v1/service/filtered/service.go new file mode 100644 index 00000000000..f20068a9847 --- /dev/null +++ b/vendor/knative.dev/pkg/client/injection/kube/informers/core/v1/service/filtered/service.go @@ -0,0 +1,65 @@ +/* +Copyright 2022 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by injection-gen. DO NOT EDIT. + +package filtered + +import ( + context "context" + + v1 "k8s.io/client-go/informers/core/v1" + filtered "knative.dev/pkg/client/injection/kube/informers/factory/filtered" + controller "knative.dev/pkg/controller" + injection "knative.dev/pkg/injection" + logging "knative.dev/pkg/logging" +) + +func init() { + injection.Default.RegisterFilteredInformers(withInformer) +} + +// Key is used for associating the Informer inside the context.Context. +type Key struct { + Selector string +} + +func withInformer(ctx context.Context) (context.Context, []controller.Informer) { + untyped := ctx.Value(filtered.LabelKey{}) + if untyped == nil { + logging.FromContext(ctx).Panic( + "Unable to fetch labelkey from context.") + } + labelSelectors := untyped.([]string) + infs := []controller.Informer{} + for _, selector := range labelSelectors { + f := filtered.Get(ctx, selector) + inf := f.Core().V1().Services() + ctx = context.WithValue(ctx, Key{Selector: selector}, inf) + infs = append(infs, inf.Informer()) + } + return ctx, infs +} + +// Get extracts the typed informer from the context. +func Get(ctx context.Context, selector string) v1.ServiceInformer { + untyped := ctx.Value(Key{Selector: selector}) + if untyped == nil { + logging.FromContext(ctx).Panicf( + "Unable to fetch k8s.io/client-go/informers/core/v1.ServiceInformer with selector %s from context.", selector) + } + return untyped.(v1.ServiceInformer) +} diff --git a/vendor/modules.txt b/vendor/modules.txt index de5e231e5e5..038dfbdeda6 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1170,6 +1170,7 @@ knative.dev/pkg/client/injection/kube/informers/core/v1/namespace/fake knative.dev/pkg/client/injection/kube/informers/core/v1/secret/filtered knative.dev/pkg/client/injection/kube/informers/core/v1/service knative.dev/pkg/client/injection/kube/informers/core/v1/service/fake +knative.dev/pkg/client/injection/kube/informers/core/v1/service/filtered knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount/fake knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount/filtered