From 75195a5873292167d017dc075c6258b06a51e8f5 Mon Sep 17 00:00:00 2001
From: Pierangelo Di Pilato
Date: Thu, 13 Feb 2025 16:08:52 +0100
Subject: [PATCH] EventTransform: Reconcile address and service (#8458)
- Set address only once endpoints are available
- Add debug logging
- Fix semantic comparisons to avoid loops
Signed-off-by: Pierangelo Di Pilato
---
cmd/webhook/main.go | 3 +-
config/core/deployments/controller.yaml | 3 +
docs/eventing-api.md | 12 +-
.../v1alpha1/eventtransform_defaults.go | 24 ++++
.../v1alpha1/eventtransform_lifecycle.go | 7 +-
.../v1alpha1/eventtransform_lifecycle_test.go | 2 +-
.../eventing/v1alpha1/eventtransform_types.go | 5 +-
.../v1alpha1/eventtransform_validation.go | 6 +-
.../eventtransform_validation_test.go | 46 ++++++
.../v1alpha1/zz_generated.deepcopy.go | 2 +-
pkg/reconciler/eventtransform/controller.go | 22 +++
.../eventtransform/eventtransform.go | 135 +++++++++++++++---
.../eventtransform/resources_jsonata.go | 60 +++++---
.../core/v1/service/filtered/service.go | 65 +++++++++
vendor/modules.txt | 1 +
15 files changed, 346 insertions(+), 47 deletions(-)
create mode 100644 pkg/apis/eventing/v1alpha1/eventtransform_defaults.go
create mode 100644 pkg/apis/eventing/v1alpha1/eventtransform_validation_test.go
create mode 100644 vendor/knative.dev/pkg/client/injection/kube/informers/core/v1/service/filtered/service.go
diff --git a/cmd/webhook/main.go b/cmd/webhook/main.go
index 76dc703e1e5..634a3927f16 100644
--- a/cmd/webhook/main.go
+++ b/cmd/webhook/main.go
@@ -79,7 +79,8 @@ func init() {
var ourTypes = map[schema.GroupVersionKind]resourcesemantics.GenericCRD{
// For group eventing.knative.dev.
// v1alpha1
- eventingv1alpha1.SchemeGroupVersion.WithKind("EventPolicy"): &eventingv1alpha1.EventPolicy{},
+ eventingv1alpha1.SchemeGroupVersion.WithKind("EventPolicy"): &eventingv1alpha1.EventPolicy{},
+ eventingv1alpha1.SchemeGroupVersion.WithKind("EventTransform"): &eventingv1alpha1.EventTransform{},
// v1beta1
eventingv1beta1.SchemeGroupVersion.WithKind("EventType"): &eventingv1beta1.EventType{},
// v1beta2
diff --git a/config/core/deployments/controller.yaml b/config/core/deployments/controller.yaml
index 77ff138b53c..efb5cab5432 100644
--- a/config/core/deployments/controller.yaml
+++ b/config/core/deployments/controller.yaml
@@ -77,6 +77,9 @@ spec:
fieldRef:
fieldPath: metadata.name
+ - name: EVENT_TRANSFORM_JSONATA_IMAGE
+ value: quay.io/pierdipi/jsonata-transform@sha256:6aafed0012b8a3c24a1cd72fc4521981ec7680cf64988c22ded0aafa2a431c93
+
## Adapter settings
# - name: K_LOGGING_CONFIG
diff --git a/docs/eventing-api.md b/docs/eventing-api.md
index 8e0861534c8..8f01fce7573 100644
--- a/docs/eventing-api.md
+++ b/docs/eventing-api.md
@@ -2858,7 +2858,7 @@ back to the broker.
-Transformations
+EventTransformations
EventTransformations
@@ -2867,9 +2867,9 @@ EventTransformations
|
-(Members of Transformations are embedded into this type.)
+(Members of EventTransformations are embedded into this type.)
-Transformations contain all possible transformations, only one “type” can be used.
+EventTransformations contain all possible transformations, only one “type” can be used.
|
@@ -3421,7 +3421,7 @@ back to the broker.
-Transformations
+EventTransformations
EventTransformations
@@ -3430,9 +3430,9 @@ EventTransformations
|
-(Members of Transformations are embedded into this type.)
+(Members of EventTransformations are embedded into this type.)
-Transformations contain all possible transformations, only one “type” can be used.
+EventTransformations contain all possible transformations, only one “type” can be used.
|
diff --git a/pkg/apis/eventing/v1alpha1/eventtransform_defaults.go b/pkg/apis/eventing/v1alpha1/eventtransform_defaults.go
new file mode 100644
index 00000000000..d061fbe60ac
--- /dev/null
+++ b/pkg/apis/eventing/v1alpha1/eventtransform_defaults.go
@@ -0,0 +1,24 @@
+/*
+Copyright 2025 The Knative Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package v1alpha1
+
+import (
+ "context"
+)
+
+func (t *EventTransform) SetDefaults(_ context.Context) {
+}
diff --git a/pkg/apis/eventing/v1alpha1/eventtransform_lifecycle.go b/pkg/apis/eventing/v1alpha1/eventtransform_lifecycle.go
index bb57a481fb9..05134cf3bc4 100644
--- a/pkg/apis/eventing/v1alpha1/eventtransform_lifecycle.go
+++ b/pkg/apis/eventing/v1alpha1/eventtransform_lifecycle.go
@@ -28,7 +28,8 @@ const (
TransformConditionAddressable apis.ConditionType = "Addressable"
TransformationConditionReady apis.ConditionType = "TransformationReady"
- TransformationAddressableEmptyURL string = "NoURL"
+ TransformationAddressableEmptyURL string = "NoURL"
+ TransformationAddressableWaitingForServiceEndpoints string = "WaitingForServiceEndpoints"
// Specific transformations conditions
@@ -159,6 +160,10 @@ func (ts *EventTransformStatus) propagateTransformationConditionStatus(cond *api
}
}
+func (ts *EventTransformStatus) MarkWaitingForServiceEndpoints() {
+ ts.GetConditionSet().Manage(ts).MarkFalse(TransformConditionAddressable, TransformationAddressableWaitingForServiceEndpoints, "URL is empty")
+}
+
func (ts *EventTransformStatus) SetAddresses(addresses ...duckv1.Addressable) {
if len(addresses) == 0 || addresses[0].URL.IsEmpty() {
ts.GetConditionSet().Manage(ts).MarkFalse(TransformConditionAddressable, TransformationAddressableEmptyURL, "URL is empty")
diff --git a/pkg/apis/eventing/v1alpha1/eventtransform_lifecycle_test.go b/pkg/apis/eventing/v1alpha1/eventtransform_lifecycle_test.go
index 3b5e4983b9d..df7a537318f 100644
--- a/pkg/apis/eventing/v1alpha1/eventtransform_lifecycle_test.go
+++ b/pkg/apis/eventing/v1alpha1/eventtransform_lifecycle_test.go
@@ -34,7 +34,7 @@ func TestFullLifecycle(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{},
Spec: EventTransformSpec{
Sink: nil,
- Transformations: EventTransformations{
+ EventTransformations: EventTransformations{
Jsonata: &JsonataEventTransformationSpec{
Expression: `
{
diff --git a/pkg/apis/eventing/v1alpha1/eventtransform_types.go b/pkg/apis/eventing/v1alpha1/eventtransform_types.go
index 8106a0261b8..a7a5d13e46f 100644
--- a/pkg/apis/eventing/v1alpha1/eventtransform_types.go
+++ b/pkg/apis/eventing/v1alpha1/eventtransform_types.go
@@ -49,6 +49,7 @@ type EventTransform struct {
var (
// Check that EventTransform can be validated, can be defaulted, and has immutable fields.
_ apis.Validatable = (*EventTransform)(nil)
+ _ apis.Defaultable = (*EventTransform)(nil)
// Check that EventTransform can return its spec untyped.
_ apis.HasSpec = (*EventTransform)(nil)
@@ -73,8 +74,8 @@ type EventTransformSpec struct {
// +optional
Sink *duckv1.Destination `json:"sink,omitempty"`
- // Transformations contain all possible transformations, only one "type" can be used.
- Transformations EventTransformations `json:",inline"`
+ // EventTransformations contain all possible transformations, only one "type" can be used.
+ EventTransformations `json:",inline"`
}
type EventTransformations struct {
diff --git a/pkg/apis/eventing/v1alpha1/eventtransform_validation.go b/pkg/apis/eventing/v1alpha1/eventtransform_validation.go
index 1dbc47a58cf..7954a42560e 100644
--- a/pkg/apis/eventing/v1alpha1/eventtransform_validation.go
+++ b/pkg/apis/eventing/v1alpha1/eventtransform_validation.go
@@ -36,7 +36,7 @@ func (ts *EventTransformSpec) Validate(ctx context.Context) *apis.FieldError {
// These are transformations field paths.
transformations := make([]string, 0, 2)
- if ts.Transformations.Jsonata != nil {
+ if ts.EventTransformations.Jsonata != nil {
transformations = append(transformations, "jsonata")
}
@@ -46,7 +46,7 @@ func (ts *EventTransformSpec) Validate(ctx context.Context) *apis.FieldError {
errs = errs.Also(apis.ErrMultipleOneOf(transformations...))
}
- errs = errs.Also(ts.Transformations.Jsonata.Validate(ctx).ViaField("jsonata"))
+ errs = errs.Also(ts.EventTransformations.Jsonata.Validate(ctx).ViaField("jsonata"))
errs = errs.Also(ts.Sink.Validate(ctx).ViaField("sink"))
if apis.IsInUpdate(ctx) {
@@ -71,7 +71,7 @@ func (in *EventTransformSpec) CheckImmutableFields(_ context.Context, original *
var errs *apis.FieldError
- if original.Spec.Transformations.Jsonata != nil && in.Transformations.Jsonata == nil {
+ if original.Spec.EventTransformations.Jsonata != nil && in.EventTransformations.Jsonata == nil {
errs = errs.Also(apis.ErrGeneric("transformations types are immutable, jsonata transformation cannot be changed to a different transformation type"))
}
diff --git a/pkg/apis/eventing/v1alpha1/eventtransform_validation_test.go b/pkg/apis/eventing/v1alpha1/eventtransform_validation_test.go
new file mode 100644
index 00000000000..754ac87c684
--- /dev/null
+++ b/pkg/apis/eventing/v1alpha1/eventtransform_validation_test.go
@@ -0,0 +1,46 @@
+/*
+Copyright 2025 The Knative Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package v1alpha1
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "knative.dev/pkg/webhook/json"
+)
+
+func TestJSONDecode(t *testing.T) {
+
+ et := &EventTransform{}
+
+ err := json.Decode([]byte(`
+{
+ "apiVersion": "eventing.knative.dev/v1alpha1",
+ "kind": "EventTransform",
+ "metadata": {
+ "name": "identity"
+ },
+ "spec": {
+ "jsonata": {
+ "expression": "{\n \"specversion\": \"1.0\",\n \"id\": id,\n \"type\": \"transformation.jsonata\",\n \"source\": \"transformation.json.identity\",\n \"data\": $\n}\n"
+ }
+ }
+}
+`), et, true)
+
+ assert.Nil(t, err)
+}
diff --git a/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go
index 23e5f8a1348..3df74897a33 100644
--- a/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go
+++ b/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go
@@ -328,7 +328,7 @@ func (in *EventTransformSpec) DeepCopyInto(out *EventTransformSpec) {
*out = new(duckv1.Destination)
(*in).DeepCopyInto(*out)
}
- in.Transformations.DeepCopyInto(&out.Transformations)
+ in.EventTransformations.DeepCopyInto(&out.EventTransformations)
return
}
diff --git a/pkg/reconciler/eventtransform/controller.go b/pkg/reconciler/eventtransform/controller.go
index b2cb16612f3..12750a24f32 100644
--- a/pkg/reconciler/eventtransform/controller.go
+++ b/pkg/reconciler/eventtransform/controller.go
@@ -19,10 +19,13 @@ package eventtransform
import (
"context"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
+ "k8s.io/client-go/informers"
kubeclient "knative.dev/pkg/client/injection/kube/client"
deploymentinformer "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment/filtered"
configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/filtered"
+ serviceinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/service/filtered"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/kmeta"
@@ -48,6 +51,17 @@ func NewController(
jsonataConfigMapInformer := configmapinformer.Get(ctx, JsonataResourcesSelector)
jsonataDeploymentInformer := deploymentinformer.Get(ctx, JsonataResourcesSelector)
jsonataSinkBindingInformer := sinkbindinginformer.Get(ctx, JsonataResourcesSelector)
+ jsonataServiceInformer := serviceinformer.Get(ctx, JsonataResourcesSelector)
+
+ // Create a custom informer as one in knative/pkg doesn't exist for endpoints.
+ factory := informers.NewSharedInformerFactoryWithOptions(
+ kubeclient.Get(ctx),
+ controller.DefaultResyncPeriod,
+ informers.WithTweakListOptions(func(options *metav1.ListOptions) {
+ options.LabelSelector = JsonataResourcesSelector
+ }),
+ )
+ jsonataEndpointInformer := factory.Core().V1().Endpoints()
var globalResync func()
@@ -63,6 +77,8 @@ func NewController(
client: eventingclient.Get(ctx),
jsonataConfigMapLister: jsonataConfigMapInformer.Lister(),
jsonataDeploymentsLister: jsonataDeploymentInformer.Lister(),
+ jsonataServiceLister: jsonataServiceInformer.Lister(),
+ jsonataEndpointLister: jsonataEndpointInformer.Lister(),
jsonataSinkBindingLister: jsonataSinkBindingInformer.Lister(),
}
@@ -79,9 +95,15 @@ func NewController(
eventTransformInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue))
jsonataDeploymentInformer.Informer().AddEventHandler(controller.HandleAll(enqueueUsingNameLabel(impl)))
+ jsonataServiceInformer.Informer().AddEventHandler(controller.HandleAll(enqueueUsingNameLabel(impl)))
+ jsonataEndpointInformer.Informer().AddEventHandler(controller.HandleAll(enqueueUsingNameLabel(impl)))
jsonataConfigMapInformer.Informer().AddEventHandler(controller.HandleAll(enqueueUsingNameLabel(impl)))
jsonataSinkBindingInformer.Informer().AddEventHandler(controller.HandleAll(enqueueUsingNameLabel(impl)))
+ // Start the factory after creating all necessary informers.
+ factory.Start(ctx.Done())
+ factory.WaitForCacheSync(ctx.Done())
+
return impl
}
diff --git a/pkg/reconciler/eventtransform/eventtransform.go b/pkg/reconciler/eventtransform/eventtransform.go
index 2500cdd88c4..467a9f4a626 100644
--- a/pkg/reconciler/eventtransform/eventtransform.go
+++ b/pkg/reconciler/eventtransform/eventtransform.go
@@ -28,7 +28,12 @@ import (
"k8s.io/client-go/kubernetes"
appslister "k8s.io/client-go/listers/apps/v1"
corelister "k8s.io/client-go/listers/core/v1"
+ "knative.dev/pkg/apis"
+ duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/controller"
+ "knative.dev/pkg/logging"
+ "knative.dev/pkg/network"
+ "knative.dev/pkg/ptr"
"knative.dev/pkg/reconciler"
eventing "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
@@ -43,6 +48,8 @@ type Reconciler struct {
jsonataConfigMapLister corelister.ConfigMapLister
jsonataDeploymentsLister appslister.DeploymentLister
+ jsonataServiceLister corelister.ServiceLister
+ jsonataEndpointLister corelister.EndpointsLister
jsonataSinkBindingLister sourceslisters.SinkBindingLister
}
@@ -54,21 +61,39 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, transform *eventing.Even
}
func (r *Reconciler) reconcileJsonataTransformation(ctx context.Context, transform *eventing.EventTransform) error {
- if transform.Spec.Transformations.Jsonata == nil {
+ logger := logging.FromContext(ctx)
+
+ if transform.Spec.EventTransformations.Jsonata == nil {
+ logger.Debug("No Jsonata transformation found")
return nil
}
+ logger.Debugw("Reconciling Jsonata transformation ConfigMap")
expressionCm, err := r.reconcileJsonataTransformationConfigMap(ctx, transform)
if err != nil {
return fmt.Errorf("failed to reconcile Jsonata transformation deployment: %w", err)
}
+
+ logger.Debugw("Reconciling Jsonata transformation Service")
+ if err := r.reconcileJsonataTransformationService(ctx, transform); err != nil {
+ return fmt.Errorf("failed to reconcile Jsonata transformation deployment: %w", err)
+ }
+
+ logger.Debugw("Reconciling Jsonata transformation Deployment")
if err := r.reconcileJsonataTransformationDeployment(ctx, expressionCm, transform); err != nil {
return fmt.Errorf("failed to reconcile Jsonata transformation deployment: %w", err)
}
+
+ logger.Debugw("Reconciling Jsonata transformation SinkBinding")
if err := r.reconcileJsonataTransformationSinkBinding(ctx, transform); err != nil {
return fmt.Errorf("failed to reconcile Jsonata transformation sink binding: %w", err)
}
+ logger.Debugw("Reconciling Jsonata transformation address")
+ if err := r.reconcileJsonataTransformationAddress(ctx, transform); err != nil {
+ return fmt.Errorf("failed to reconcile Jsonata transformation address: %w", err)
+ }
+
return nil
}
@@ -82,13 +107,42 @@ func (r *Reconciler) reconcileJsonataTransformationConfigMap(ctx context.Context
if err != nil {
return nil, fmt.Errorf("failed to get configmap %s/%s: %w", expected.GetNamespace(), expected.GetName(), err)
}
- if equality.Semantic.DeepDerivative(expected, curr) {
+ if equality.Semantic.DeepDerivative(expected.Data, curr.Data) &&
+ equality.Semantic.DeepDerivative(expected.Labels, curr.Labels) &&
+ equality.Semantic.DeepDerivative(expected.Annotations, curr.Annotations) {
return curr, nil
}
expected.ResourceVersion = curr.ResourceVersion
return r.updateConfigMap(ctx, transform, expected)
}
+func (r *Reconciler) reconcileJsonataTransformationService(ctx context.Context, transform *eventing.EventTransform) error {
+ expected := jsonataService(ctx, transform)
+
+ curr, err := r.jsonataServiceLister.Services(expected.GetNamespace()).Get(expected.GetName())
+ if apierrors.IsNotFound(err) {
+ _, err := r.createService(ctx, transform, expected)
+ if err != nil {
+ return err
+ }
+ return nil
+ }
+ if err != nil {
+ return fmt.Errorf("failed to get service %s/%s: %w", expected.GetNamespace(), expected.GetName(), err)
+ }
+ if equality.Semantic.DeepDerivative(expected.Spec, curr.Spec) &&
+ equality.Semantic.DeepDerivative(expected.Labels, curr.Labels) &&
+ equality.Semantic.DeepDerivative(expected.Annotations, curr.Annotations) {
+ return nil
+ }
+ expected.ResourceVersion = curr.ResourceVersion
+ _, err = r.updateService(ctx, transform, expected)
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
func (r *Reconciler) reconcileJsonataTransformationDeployment(ctx context.Context, expression *corev1.ConfigMap, transform *eventing.EventTransform) error {
expected := jsonataDeployment(ctx, expression, transform)
@@ -104,7 +158,9 @@ func (r *Reconciler) reconcileJsonataTransformationDeployment(ctx context.Contex
if err != nil {
return fmt.Errorf("failed to get deployment %s/%s: %w", expected.GetNamespace(), expected.GetName(), err)
}
- if equality.Semantic.DeepDerivative(expected, curr) {
+ if equality.Semantic.DeepDerivative(expected.Spec, curr.Spec) &&
+ equality.Semantic.DeepDerivative(expected.Labels, curr.Labels) &&
+ equality.Semantic.DeepDerivative(expected.Annotations, curr.Annotations) {
transform.Status.PropagateJsonataDeploymentStatus(curr.Status)
return nil
}
@@ -136,7 +192,9 @@ func (r *Reconciler) reconcileJsonataTransformationSinkBinding(ctx context.Conte
if err != nil {
return fmt.Errorf("failed to get deployment %s/%s: %w", expected.GetNamespace(), expected.GetName(), err)
}
- if equality.Semantic.DeepDerivative(expected, curr) {
+ if equality.Semantic.DeepDerivative(expected.Spec, curr.Spec) &&
+ equality.Semantic.DeepDerivative(expected.Labels, curr.Labels) &&
+ equality.Semantic.DeepDerivative(expected.Annotations, curr.Annotations) {
transform.Status.PropagateJsonataSinkBindingStatus(curr.Status)
return nil
}
@@ -149,21 +207,56 @@ func (r *Reconciler) reconcileJsonataTransformationSinkBinding(ctx context.Conte
return nil
}
-func (r *Reconciler) createDeployment(ctx context.Context, transform *eventing.EventTransform, expected appsv1.Deployment) (*appsv1.Deployment, error) {
- created, err := r.k8s.AppsV1().Deployments(expected.GetNamespace()).Create(ctx, &expected, metav1.CreateOptions{})
+func (r *Reconciler) reconcileJsonataTransformationAddress(ctx context.Context, transform *eventing.EventTransform) error {
+ service := jsonataService(ctx, transform)
+ endpoint, err := r.jsonataEndpointLister.Endpoints(transform.GetNamespace()).Get(service.GetName())
+ if apierrors.IsNotFound(err) {
+ transform.Status.MarkWaitingForServiceEndpoints()
+ return nil
+ }
if err != nil {
- return nil, fmt.Errorf("failed to create jsonata deployment %s/%s: %w", expected.GetNamespace(), expected.GetName(), err)
+ return fmt.Errorf("failed to list jsonata endpoints: %w", err)
}
- controller.GetEventRecorder(ctx).Event(transform, "JsonataDeploymentCreated", "", expected.GetName())
- return created, nil
+ if len(endpoint.Subsets) == 0 || len(endpoint.Subsets[0].Ports) == 0 {
+ transform.Status.MarkWaitingForServiceEndpoints()
+ return nil
+ }
+
+ // TODO: Support TLS and Authn/z
+
+ hostname := network.GetServiceHostname(service.GetName(), service.GetNamespace())
+ transform.Status.SetAddresses(duckv1.Addressable{
+ Name: ptr.String("http"),
+ URL: apis.HTTP(hostname),
+ })
+
+ return nil
}
-func (r *Reconciler) createConfigMap(ctx context.Context, transform *eventing.EventTransform, expected corev1.ConfigMap) (*corev1.ConfigMap, error) {
- created, err := r.k8s.CoreV1().ConfigMaps(expected.GetNamespace()).Create(ctx, &expected, metav1.CreateOptions{})
+func (r *Reconciler) createService(ctx context.Context, transform *eventing.EventTransform, expected corev1.Service) (*corev1.Service, error) {
+ created, err := r.k8s.CoreV1().Services(expected.GetNamespace()).Create(ctx, &expected, metav1.CreateOptions{})
if err != nil {
return nil, fmt.Errorf("failed to create jsonata configmap %s/%s: %w", expected.GetNamespace(), expected.GetName(), err)
}
- controller.GetEventRecorder(ctx).Event(transform, "JsonataConfigMapCreated", "", expected.GetName())
+ controller.GetEventRecorder(ctx).Event(transform, corev1.EventTypeNormal, "JsonataServiceCreated", expected.GetName())
+ return created, nil
+}
+
+func (r *Reconciler) updateService(ctx context.Context, transform *eventing.EventTransform, expected corev1.Service) (*corev1.Service, error) {
+ updated, err := r.k8s.CoreV1().Services(expected.GetNamespace()).Update(ctx, &expected, metav1.UpdateOptions{})
+ if err != nil {
+ return nil, fmt.Errorf("failed to update configmap %s/%s: %w", expected.GetNamespace(), expected.GetName(), err)
+ }
+ controller.GetEventRecorder(ctx).Event(transform, corev1.EventTypeNormal, "JsonataServiceUpdated", expected.GetName())
+ return updated, nil
+}
+
+func (r *Reconciler) createDeployment(ctx context.Context, transform *eventing.EventTransform, expected appsv1.Deployment) (*appsv1.Deployment, error) {
+ created, err := r.k8s.AppsV1().Deployments(expected.GetNamespace()).Create(ctx, &expected, metav1.CreateOptions{})
+ if err != nil {
+ return nil, fmt.Errorf("failed to create jsonata deployment %s/%s: %w", expected.GetNamespace(), expected.GetName(), err)
+ }
+ controller.GetEventRecorder(ctx).Event(transform, corev1.EventTypeNormal, "JsonataDeploymentCreated", expected.GetName())
return created, nil
}
@@ -172,16 +265,25 @@ func (r *Reconciler) updateDeployment(ctx context.Context, transform *eventing.E
if err != nil {
return nil, fmt.Errorf("failed to update deployment %s/%s: %w", expected.GetNamespace(), expected.GetName(), err)
}
- controller.GetEventRecorder(ctx).Event(transform, "JsonataDeploymentUpdated", "", expected.GetName())
+ controller.GetEventRecorder(ctx).Event(transform, corev1.EventTypeNormal, "JsonataDeploymentUpdated", expected.GetName())
return updated, nil
}
+func (r *Reconciler) createConfigMap(ctx context.Context, transform *eventing.EventTransform, expected corev1.ConfigMap) (*corev1.ConfigMap, error) {
+ created, err := r.k8s.CoreV1().ConfigMaps(expected.GetNamespace()).Create(ctx, &expected, metav1.CreateOptions{})
+ if err != nil {
+ return nil, fmt.Errorf("failed to create jsonata configmap %s/%s: %w", expected.GetNamespace(), expected.GetName(), err)
+ }
+ controller.GetEventRecorder(ctx).Event(transform, corev1.EventTypeNormal, "JsonataConfigMapCreated", expected.GetName())
+ return created, nil
+}
+
func (r *Reconciler) updateConfigMap(ctx context.Context, transform *eventing.EventTransform, expected corev1.ConfigMap) (*corev1.ConfigMap, error) {
updated, err := r.k8s.CoreV1().ConfigMaps(expected.GetNamespace()).Update(ctx, &expected, metav1.UpdateOptions{})
if err != nil {
return nil, fmt.Errorf("failed to update configmap %s/%s: %w", expected.GetNamespace(), expected.GetName(), err)
}
- controller.GetEventRecorder(ctx).Event(transform, "ConfigMapUpdated", "", expected.GetName())
+ controller.GetEventRecorder(ctx).Event(transform, corev1.EventTypeNormal, "JsonataConfigMapUpdated", expected.GetName())
return updated, nil
}
@@ -202,6 +304,7 @@ func (r *Reconciler) deleteJsonataTransformationSinkBinding(ctx context.Context,
if err != nil {
return fmt.Errorf("failed to delete sink binding %s/%s: %w", transform.GetNamespace(), sbName, err)
}
+ controller.GetEventRecorder(ctx).Event(transform, corev1.EventTypeNormal, "JsonataSinkBindingDeleted", sbName)
return nil
}
@@ -210,7 +313,7 @@ func (r *Reconciler) createSinkBinding(ctx context.Context, transform *eventing.
if err != nil {
return nil, fmt.Errorf("failed to create jsonata sink binding %s/%s: %w", expected.GetNamespace(), expected.GetName(), err)
}
- controller.GetEventRecorder(ctx).Event(transform, "JsonataSinkBindingCreated", "", expected.GetName())
+ controller.GetEventRecorder(ctx).Event(transform, corev1.EventTypeNormal, "JsonataSinkBindingCreated", expected.GetName())
return created, nil
}
@@ -219,6 +322,6 @@ func (r *Reconciler) updateSinkBinding(ctx context.Context, transform *eventing.
if err != nil {
return nil, fmt.Errorf("failed to update sink binding %s/%s: %w", expected.GetNamespace(), expected.GetName(), err)
}
- controller.GetEventRecorder(ctx).Event(transform, "JsonataSinkBindingUpdated", "", expected.GetName())
+ controller.GetEventRecorder(ctx).Event(transform, corev1.EventTypeNormal, "JsonataSinkBindingUpdated", expected.GetName())
return updated, nil
}
diff --git a/pkg/reconciler/eventtransform/resources_jsonata.go b/pkg/reconciler/eventtransform/resources_jsonata.go
index 6dc7e0ef980..9bb59435999 100644
--- a/pkg/reconciler/eventtransform/resources_jsonata.go
+++ b/pkg/reconciler/eventtransform/resources_jsonata.go
@@ -40,7 +40,7 @@ const (
JsonataResourcesLabelKey = "eventing.knative.dev/event-transform-jsonata"
JsonataResourcesLabelValue = "true"
JsonataExpressionHashKey = "eventing.knative.dev/event-transform-jsonata-expression-hash"
- JsonataResourcesNameSuffix = "jsonata"
+ JsonataResourcesNameSuffix = "-jsonata"
JsonataExpressionDataKey = "jsonata-expression"
JsonataExpressionPath = "/etc/jsonata"
@@ -50,16 +50,15 @@ const (
func jsonataExpressionConfigMap(_ context.Context, transform *eventing.EventTransform) corev1.ConfigMap {
expression := corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
- Name: kmeta.ChildName(transform.Name, JsonataResourcesNameSuffix),
- Namespace: transform.GetNamespace(),
- Labels: jsonataLabels(transform),
- Annotations: transform.Annotations,
+ Name: kmeta.ChildName(transform.Name, JsonataResourcesNameSuffix),
+ Namespace: transform.GetNamespace(),
+ Labels: jsonataLabels(transform),
OwnerReferences: []metav1.OwnerReference{
*kmeta.NewControllerRef(transform),
},
},
Data: map[string]string{
- JsonataExpressionDataKey: transform.Spec.Transformations.Jsonata.Expression,
+ JsonataExpressionDataKey: transform.Spec.EventTransformations.Jsonata.Expression,
},
}
return expression
@@ -76,7 +75,7 @@ func jsonataDeployment(_ context.Context, expression *corev1.ConfigMap, transfor
Name: kmeta.ChildName(transform.Name, JsonataResourcesNameSuffix),
Namespace: transform.GetNamespace(),
Labels: jsonataLabels(transform),
- Annotations: transform.Annotations,
+ Annotations: make(map[string]string, 2),
OwnerReferences: []metav1.OwnerReference{
*kmeta.NewControllerRef(transform),
},
@@ -143,12 +142,45 @@ func jsonataDeployment(_ context.Context, expression *corev1.ConfigMap, transfor
}
// When the expression changes, this rolls out a new deployment with the latest ConfigMap data.
- hash := sha256.Sum256([]byte(transform.Spec.Transformations.Jsonata.Expression))
+ hash := sha256.Sum256([]byte(transform.Spec.EventTransformations.Jsonata.Expression))
d.Annotations[JsonataExpressionHashKey] = base64.StdEncoding.EncodeToString(hash[:])
return d
}
+func jsonataService(_ context.Context, transform *eventing.EventTransform) corev1.Service {
+ s := corev1.Service{
+ TypeMeta: metav1.TypeMeta{},
+ ObjectMeta: metav1.ObjectMeta{
+ Name: kmeta.ChildName(transform.Name, JsonataResourcesNameSuffix),
+ Namespace: transform.GetNamespace(),
+ Labels: jsonataLabels(transform),
+ Annotations: make(map[string]string, 2),
+ OwnerReferences: []metav1.OwnerReference{
+ *kmeta.NewControllerRef(transform),
+ },
+ },
+ Spec: corev1.ServiceSpec{
+ Ports: []corev1.ServicePort{
+ {
+ Name: "http",
+ Protocol: corev1.ProtocolTCP,
+ AppProtocol: ptr.String("http"),
+ Port: 80,
+ TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: 8080},
+ },
+ },
+ Selector: map[string]string{
+ JsonataResourcesLabelKey: JsonataResourcesLabelValue,
+ NameLabelKey: transform.GetName(),
+ },
+ Type: corev1.ServiceTypeClusterIP,
+ },
+ }
+
+ return s
+}
+
func jsonataSinkBindingName(transform *eventing.EventTransform) string {
return kmeta.ChildName(transform.Name, JsonataResourcesNameSuffix)
}
@@ -157,10 +189,9 @@ func jsonataSinkBinding(_ context.Context, transform *eventing.EventTransform) s
name := jsonataSinkBindingName(transform)
sb := sourcesv1.SinkBinding{
ObjectMeta: metav1.ObjectMeta{
- Name: name,
- Namespace: transform.GetNamespace(),
- Labels: jsonataLabels(transform),
- Annotations: transform.Annotations,
+ Name: name,
+ Namespace: transform.GetNamespace(),
+ Labels: jsonataLabels(transform),
OwnerReferences: []metav1.OwnerReference{
*kmeta.NewControllerRef(transform),
},
@@ -184,10 +215,7 @@ func jsonataSinkBinding(_ context.Context, transform *eventing.EventTransform) s
}
func jsonataLabels(transform *eventing.EventTransform) map[string]string {
- labels := make(map[string]string, len(transform.Labels)+2)
- for k, v := range transform.Labels {
- labels[k] = v
- }
+ labels := make(map[string]string, 2)
labels[JsonataResourcesLabelKey] = JsonataResourcesLabelValue
labels[NameLabelKey] = transform.GetName()
return labels
diff --git a/vendor/knative.dev/pkg/client/injection/kube/informers/core/v1/service/filtered/service.go b/vendor/knative.dev/pkg/client/injection/kube/informers/core/v1/service/filtered/service.go
new file mode 100644
index 00000000000..f20068a9847
--- /dev/null
+++ b/vendor/knative.dev/pkg/client/injection/kube/informers/core/v1/service/filtered/service.go
@@ -0,0 +1,65 @@
+/*
+Copyright 2022 The Knative Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Code generated by injection-gen. DO NOT EDIT.
+
+package filtered
+
+import (
+ context "context"
+
+ v1 "k8s.io/client-go/informers/core/v1"
+ filtered "knative.dev/pkg/client/injection/kube/informers/factory/filtered"
+ controller "knative.dev/pkg/controller"
+ injection "knative.dev/pkg/injection"
+ logging "knative.dev/pkg/logging"
+)
+
+func init() {
+ injection.Default.RegisterFilteredInformers(withInformer)
+}
+
+// Key is used for associating the Informer inside the context.Context.
+type Key struct {
+ Selector string
+}
+
+func withInformer(ctx context.Context) (context.Context, []controller.Informer) {
+ untyped := ctx.Value(filtered.LabelKey{})
+ if untyped == nil {
+ logging.FromContext(ctx).Panic(
+ "Unable to fetch labelkey from context.")
+ }
+ labelSelectors := untyped.([]string)
+ infs := []controller.Informer{}
+ for _, selector := range labelSelectors {
+ f := filtered.Get(ctx, selector)
+ inf := f.Core().V1().Services()
+ ctx = context.WithValue(ctx, Key{Selector: selector}, inf)
+ infs = append(infs, inf.Informer())
+ }
+ return ctx, infs
+}
+
+// Get extracts the typed informer from the context.
+func Get(ctx context.Context, selector string) v1.ServiceInformer {
+ untyped := ctx.Value(Key{Selector: selector})
+ if untyped == nil {
+ logging.FromContext(ctx).Panicf(
+ "Unable to fetch k8s.io/client-go/informers/core/v1.ServiceInformer with selector %s from context.", selector)
+ }
+ return untyped.(v1.ServiceInformer)
+}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index de5e231e5e5..038dfbdeda6 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -1170,6 +1170,7 @@ knative.dev/pkg/client/injection/kube/informers/core/v1/namespace/fake
knative.dev/pkg/client/injection/kube/informers/core/v1/secret/filtered
knative.dev/pkg/client/injection/kube/informers/core/v1/service
knative.dev/pkg/client/injection/kube/informers/core/v1/service/fake
+knative.dev/pkg/client/injection/kube/informers/core/v1/service/filtered
knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount
knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount/fake
knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount/filtered