Skip to content

Commit

Permalink
EventTransform: Reconcile address and service (#8458)
Browse files Browse the repository at this point in the history
- Set address only once endpoints are available
- Add debug logging
- Fix semantic comparisons to avoid loops

Signed-off-by: Pierangelo Di Pilato <[email protected]>
  • Loading branch information
pierDipi authored Feb 13, 2025
1 parent d9498eb commit 75195a5
Show file tree
Hide file tree
Showing 15 changed files with 346 additions and 47 deletions.
3 changes: 2 additions & 1 deletion cmd/webhook/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ func init() {
var ourTypes = map[schema.GroupVersionKind]resourcesemantics.GenericCRD{
// For group eventing.knative.dev.
// v1alpha1
eventingv1alpha1.SchemeGroupVersion.WithKind("EventPolicy"): &eventingv1alpha1.EventPolicy{},
eventingv1alpha1.SchemeGroupVersion.WithKind("EventPolicy"): &eventingv1alpha1.EventPolicy{},
eventingv1alpha1.SchemeGroupVersion.WithKind("EventTransform"): &eventingv1alpha1.EventTransform{},
// v1beta1
eventingv1beta1.SchemeGroupVersion.WithKind("EventType"): &eventingv1beta1.EventType{},
// v1beta2
Expand Down
3 changes: 3 additions & 0 deletions config/core/deployments/controller.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ spec:
fieldRef:
fieldPath: metadata.name

- name: EVENT_TRANSFORM_JSONATA_IMAGE
value: quay.io/pierdipi/jsonata-transform@sha256:6aafed0012b8a3c24a1cd72fc4521981ec7680cf64988c22ded0aafa2a431c93


## Adapter settings
# - name: K_LOGGING_CONFIG
Expand Down
12 changes: 6 additions & 6 deletions docs/eventing-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -2858,7 +2858,7 @@ back to the broker.</p>
</tr>
<tr>
<td>
<code>Transformations</code><br/>
<code>EventTransformations</code><br/>
<em>
<a href="#eventing.knative.dev/v1alpha1.EventTransformations">
EventTransformations
Expand All @@ -2867,9 +2867,9 @@ EventTransformations
</td>
<td>
<p>
(Members of <code>Transformations</code> are embedded into this type.)
(Members of <code>EventTransformations</code> are embedded into this type.)
</p>
<p>Transformations contain all possible transformations, only one &ldquo;type&rdquo; can be used.</p>
<p>EventTransformations contain all possible transformations, only one &ldquo;type&rdquo; can be used.</p>
</td>
</tr>
</table>
Expand Down Expand Up @@ -3421,7 +3421,7 @@ back to the broker.</p>
</tr>
<tr>
<td>
<code>Transformations</code><br/>
<code>EventTransformations</code><br/>
<em>
<a href="#eventing.knative.dev/v1alpha1.EventTransformations">
EventTransformations
Expand All @@ -3430,9 +3430,9 @@ EventTransformations
</td>
<td>
<p>
(Members of <code>Transformations</code> are embedded into this type.)
(Members of <code>EventTransformations</code> are embedded into this type.)
</p>
<p>Transformations contain all possible transformations, only one &ldquo;type&rdquo; can be used.</p>
<p>EventTransformations contain all possible transformations, only one &ldquo;type&rdquo; can be used.</p>
</td>
</tr>
</tbody>
Expand Down
24 changes: 24 additions & 0 deletions pkg/apis/eventing/v1alpha1/eventtransform_defaults.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
Copyright 2025 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v1alpha1

import (
"context"
)

func (t *EventTransform) SetDefaults(_ context.Context) {
}
7 changes: 6 additions & 1 deletion pkg/apis/eventing/v1alpha1/eventtransform_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ const (
TransformConditionAddressable apis.ConditionType = "Addressable"
TransformationConditionReady apis.ConditionType = "TransformationReady"

TransformationAddressableEmptyURL string = "NoURL"
TransformationAddressableEmptyURL string = "NoURL"
TransformationAddressableWaitingForServiceEndpoints string = "WaitingForServiceEndpoints"

// Specific transformations conditions

Expand Down Expand Up @@ -159,6 +160,10 @@ func (ts *EventTransformStatus) propagateTransformationConditionStatus(cond *api
}
}

func (ts *EventTransformStatus) MarkWaitingForServiceEndpoints() {
ts.GetConditionSet().Manage(ts).MarkFalse(TransformConditionAddressable, TransformationAddressableWaitingForServiceEndpoints, "URL is empty")
}

func (ts *EventTransformStatus) SetAddresses(addresses ...duckv1.Addressable) {
if len(addresses) == 0 || addresses[0].URL.IsEmpty() {
ts.GetConditionSet().Manage(ts).MarkFalse(TransformConditionAddressable, TransformationAddressableEmptyURL, "URL is empty")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestFullLifecycle(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{},
Spec: EventTransformSpec{
Sink: nil,
Transformations: EventTransformations{
EventTransformations: EventTransformations{
Jsonata: &JsonataEventTransformationSpec{
Expression: `
{
Expand Down
5 changes: 3 additions & 2 deletions pkg/apis/eventing/v1alpha1/eventtransform_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type EventTransform struct {
var (
// Check that EventTransform can be validated, can be defaulted, and has immutable fields.
_ apis.Validatable = (*EventTransform)(nil)
_ apis.Defaultable = (*EventTransform)(nil)

// Check that EventTransform can return its spec untyped.
_ apis.HasSpec = (*EventTransform)(nil)
Expand All @@ -73,8 +74,8 @@ type EventTransformSpec struct {
// +optional
Sink *duckv1.Destination `json:"sink,omitempty"`

// Transformations contain all possible transformations, only one "type" can be used.
Transformations EventTransformations `json:",inline"`
// EventTransformations contain all possible transformations, only one "type" can be used.
EventTransformations `json:",inline"`
}

type EventTransformations struct {
Expand Down
6 changes: 3 additions & 3 deletions pkg/apis/eventing/v1alpha1/eventtransform_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (ts *EventTransformSpec) Validate(ctx context.Context) *apis.FieldError {
// These are transformations field paths.
transformations := make([]string, 0, 2)

if ts.Transformations.Jsonata != nil {
if ts.EventTransformations.Jsonata != nil {
transformations = append(transformations, "jsonata")
}

Expand All @@ -46,7 +46,7 @@ func (ts *EventTransformSpec) Validate(ctx context.Context) *apis.FieldError {
errs = errs.Also(apis.ErrMultipleOneOf(transformations...))
}

errs = errs.Also(ts.Transformations.Jsonata.Validate(ctx).ViaField("jsonata"))
errs = errs.Also(ts.EventTransformations.Jsonata.Validate(ctx).ViaField("jsonata"))
errs = errs.Also(ts.Sink.Validate(ctx).ViaField("sink"))

if apis.IsInUpdate(ctx) {
Expand All @@ -71,7 +71,7 @@ func (in *EventTransformSpec) CheckImmutableFields(_ context.Context, original *

var errs *apis.FieldError

if original.Spec.Transformations.Jsonata != nil && in.Transformations.Jsonata == nil {
if original.Spec.EventTransformations.Jsonata != nil && in.EventTransformations.Jsonata == nil {
errs = errs.Also(apis.ErrGeneric("transformations types are immutable, jsonata transformation cannot be changed to a different transformation type"))
}

Expand Down
46 changes: 46 additions & 0 deletions pkg/apis/eventing/v1alpha1/eventtransform_validation_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
Copyright 2025 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v1alpha1

import (
"testing"

"github.com/stretchr/testify/assert"
"knative.dev/pkg/webhook/json"
)

func TestJSONDecode(t *testing.T) {

et := &EventTransform{}

err := json.Decode([]byte(`
{
"apiVersion": "eventing.knative.dev/v1alpha1",
"kind": "EventTransform",
"metadata": {
"name": "identity"
},
"spec": {
"jsonata": {
"expression": "{\n \"specversion\": \"1.0\",\n \"id\": id,\n \"type\": \"transformation.jsonata\",\n \"source\": \"transformation.json.identity\",\n \"data\": $\n}\n"
}
}
}
`), et, true)

assert.Nil(t, err)
}
2 changes: 1 addition & 1 deletion pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions pkg/reconciler/eventtransform/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ package eventtransform
import (
"context"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/informers"
kubeclient "knative.dev/pkg/client/injection/kube/client"
deploymentinformer "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment/filtered"
configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/filtered"
serviceinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/service/filtered"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/kmeta"
Expand All @@ -48,6 +51,17 @@ func NewController(
jsonataConfigMapInformer := configmapinformer.Get(ctx, JsonataResourcesSelector)
jsonataDeploymentInformer := deploymentinformer.Get(ctx, JsonataResourcesSelector)
jsonataSinkBindingInformer := sinkbindinginformer.Get(ctx, JsonataResourcesSelector)
jsonataServiceInformer := serviceinformer.Get(ctx, JsonataResourcesSelector)

// Create a custom informer as one in knative/pkg doesn't exist for endpoints.
factory := informers.NewSharedInformerFactoryWithOptions(
kubeclient.Get(ctx),
controller.DefaultResyncPeriod,
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.LabelSelector = JsonataResourcesSelector
}),
)
jsonataEndpointInformer := factory.Core().V1().Endpoints()

var globalResync func()

Expand All @@ -63,6 +77,8 @@ func NewController(
client: eventingclient.Get(ctx),
jsonataConfigMapLister: jsonataConfigMapInformer.Lister(),
jsonataDeploymentsLister: jsonataDeploymentInformer.Lister(),
jsonataServiceLister: jsonataServiceInformer.Lister(),
jsonataEndpointLister: jsonataEndpointInformer.Lister(),
jsonataSinkBindingLister: jsonataSinkBindingInformer.Lister(),
}

Expand All @@ -79,9 +95,15 @@ func NewController(
eventTransformInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue))

jsonataDeploymentInformer.Informer().AddEventHandler(controller.HandleAll(enqueueUsingNameLabel(impl)))
jsonataServiceInformer.Informer().AddEventHandler(controller.HandleAll(enqueueUsingNameLabel(impl)))
jsonataEndpointInformer.Informer().AddEventHandler(controller.HandleAll(enqueueUsingNameLabel(impl)))
jsonataConfigMapInformer.Informer().AddEventHandler(controller.HandleAll(enqueueUsingNameLabel(impl)))
jsonataSinkBindingInformer.Informer().AddEventHandler(controller.HandleAll(enqueueUsingNameLabel(impl)))

// Start the factory after creating all necessary informers.
factory.Start(ctx.Done())
factory.WaitForCacheSync(ctx.Done())

return impl
}

Expand Down
Loading

0 comments on commit 75195a5

Please sign in to comment.