Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add EventTransform Jsonata reconciler and CRD #8456

Merged
merged 7 commits into from
Feb 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,13 @@ import (
filteredFactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered"
"knative.dev/pkg/signals"

eventingfilteredfactory "knative.dev/eventing/pkg/client/injection/informers/factory/filtered"

"knative.dev/eventing/pkg/apis/sinks"
"knative.dev/eventing/pkg/auth"
"knative.dev/eventing/pkg/eventingtls"
"knative.dev/eventing/pkg/reconciler/eventpolicy"
"knative.dev/eventing/pkg/reconciler/eventtransform"
"knative.dev/eventing/pkg/reconciler/jobsink"

"knative.dev/eventing/pkg/reconciler/apiserversource"
Expand All @@ -53,6 +56,11 @@ func main() {
auth.OIDCLabelSelector,
eventingtls.TrustBundleLabelSelector,
sinks.JobSinkJobsLabelSelector,
eventtransform.JsonataResourcesSelector,
)

ctx = eventingfilteredfactory.WithSelectors(ctx,
eventtransform.JsonataResourcesSelector,
)

sharedmain.MainWithContext(ctx, "controller",
Expand Down Expand Up @@ -84,5 +92,8 @@ func main() {
// Sugar
sugarnamespace.NewController,
sugartrigger.NewController,

// Transform
eventtransform.NewController,
)
}
1 change: 1 addition & 0 deletions cmd/schema/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func main() {
registry.Register(&flowsv1.Sequence{})
registry.Register(&flowsv1.Parallel{})
registry.Register(&eventingv1alpha1.EventPolicy{})
registry.Register(&eventingv1alpha1.EventTransform{})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was this used to generate the actual CRD?

Asking because other new CRDs (e.g. JobSink or Integration source/sink are missing.
(I can add them in a separate PR)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Partially, this is to generate the schema part of the CRD

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but it's only for bootstrapping it to give an head start

go run ./cmd/schema/ dump EventTransform


if err := commands.New("knative.dev/eventing").Execute(); err != nil {
log.Fatal("Error during command execution: ", err)
Expand Down
1 change: 1 addition & 0 deletions config/300-eventtransform.yaml
259 changes: 259 additions & 0 deletions config/core/resources/eventtransform.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
# Copyright 2025 The Knative Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: eventtransforms.eventing.knative.dev
labels:
knative.dev/crd-install: "true"
app.kubernetes.io/version: devel
app.kubernetes.io/name: knative-eventing
spec:
group: eventing.knative.dev
versions:
- name: v1alpha1
served: true
storage: true
subresources:
status: { }
schema:
openAPIV3Schema:
type: object
properties:
spec:
description: Spec defines the desired state of the EventTransform.
type: object
properties:
jsonata:
type: object
properties:
expression:
description: Expression is the JSONata expression.
type: string
sink:
description: 'Sink is a reference to an object that will resolve to a uri to use as the sink. If not present, the transformation will send back the transformed event as response, this is useful to leverage the built-in Broker reply feature to re-publish a transformed event back to the broker. '
type: object
properties:
CACerts:
description: CACerts are Certification Authority (CA) certificates in PEM format according to https://www.rfc-editor.org/rfc/rfc7468. If set, these CAs are appended to the set of CAs provided by the Addressable target, if any.
type: string
audience:
description: Audience is the OIDC audience. This need only be set, if the target is not an Addressable and thus the Audience can't be received from the Addressable itself. In case the Addressable specifies an Audience too, the Destinations Audience takes preference.
type: string
ref:
description: Ref points to an Addressable.
type: object
properties:
address:
description: Address points to a specific Address Name.
type: string
apiVersion:
description: API version of the referent.
type: string
group:
description: 'Group of the API, without the version of the group. This can be used as an alternative to the APIVersion, and then resolved using ResolveGroup. Note: This API is EXPERIMENTAL and might break anytime. For more details: https://github.com/knative/eventing/issues/5086'
type: string
kind:
description: 'Kind of the referent. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
type: string
name:
description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names'
type: string
namespace:
description: 'Namespace of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/ This is optional field, it gets defaulted to the object holding it if left out.'
type: string
uri:
description: URI can be an absolute URL(non-empty scheme and non-empty host) pointing to the target or a relative URI. Relative URIs will be resolved using the base URI retrieved from Ref.
type: string
status:
description: Status represents the current state of the EventTransform. This data may be out of date.
type: object
properties:
address:
description: Address is a single Addressable address. If Addresses is present, Address will be ignored by clients.
type: object
required:
- url
properties:
CACerts:
description: CACerts is the Certification Authority (CA) certificates in PEM format according to https://www.rfc-editor.org/rfc/rfc7468.
type: string
audience:
description: Audience is the OIDC audience for this address.
type: string
name:
description: Name is the name of the address.
type: string
url:
type: string
addresses:
description: Addresses is a list of addresses for different protocols (HTTP and HTTPS) If Addresses is present, Address must be ignored by clients.
type: array
items:
type: object
required:
- url
properties:
CACerts:
description: CACerts is the Certification Authority (CA) certificates in PEM format according to https://www.rfc-editor.org/rfc/rfc7468.
type: string
audience:
description: Audience is the OIDC audience for this address.
type: string
name:
description: Name is the name of the address.
type: string
url:
type: string
annotations:
description: Annotations is additional Status fields for the Resource to save some additional State as well as convey more information to the user. This is roughly akin to Annotations on any k8s resource, just the reconciler conveying richer information outwards.
type: object
x-kubernetes-preserve-unknown-fields: true
auth:
description: Auth defines the attributes that provide the generated service account name in the resource status.
type: object
required:
- serviceAccountName
properties:
serviceAccountName:
description: ServiceAccountName is the name of the generated service account used for this components OIDC authentication.
type: string
serviceAccountNames:
description: ServiceAccountNames is the list of names of the generated service accounts used for this components OIDC authentication. This list can have len() > 1, when the component uses multiple identities (e.g. in case of a Parallel).
type: array
items:
type: string
conditions:
description: Conditions the latest available observations of a resource's current state.
type: array
items:
type: object
required:
- type
- status
properties:
lastTransitionTime:
description: LastTransitionTime is the last time the condition transitioned from one status to another. We use VolatileTime in place of metav1.Time to exclude this from creating equality.Semantic differences (all other things held constant).
type: string
message:
description: A human readable message indicating details about the transition.
type: string
reason:
description: The reason for the condition's last transition.
type: string
severity:
description: Severity with which to treat failures of this type of condition. When this is not specified, it defaults to Error.
type: string
status:
description: Status of the condition, one of True, False, Unknown.
type: string
type:
description: Type of condition.
type: string
jsonata:
description: JsonataTransformationStatus is the status associated with JsonataEventTransformationSpec.
type: object
properties:
deployment:
type: object
properties:
availableReplicas:
description: Total number of available pods (ready for at least minReadySeconds) targeted by this deployment.
type: integer
format: int32
collisionCount:
description: Count of hash collisions for the Deployment. The Deployment controller uses this field as a collision avoidance mechanism when it needs to create the name for the newest ReplicaSet.
type: integer
format: int32
conditions:
description: Represents the latest available observations of a deployment's current state.
type: array
items:
type: object
properties:
lastTransitionTime:
description: Last time the condition transitioned from one status to another.
type: string
lastUpdateTime:
description: The last time this condition was updated.
type: string
message:
description: A human readable message indicating details about the transition.
type: string
reason:
description: The reason for the condition's last transition.
type: string
status:
description: Status of the condition, one of True, False, Unknown.
type: string
type:
description: Type of deployment condition.
type: string
observedGeneration:
description: The generation observed by the deployment controller.
type: integer
format: int64
readyReplicas:
description: readyReplicas is the number of pods targeted by this Deployment with a Ready Condition.
type: integer
format: int32
replicas:
description: Total number of non-terminated pods targeted by this deployment (their labels match the selector).
type: integer
format: int32
unavailableReplicas:
description: Total number of unavailable pods targeted by this deployment. This is the total number of pods that are still required for the deployment to have 100% available capacity. They may either be pods that are running but not yet available or pods that still have not been created.
type: integer
format: int32
updatedReplicas:
description: Total number of non-terminated pods targeted by this deployment that have the desired template spec.
type: integer
format: int32
observedGeneration:
description: ObservedGeneration is the 'Generation' of the Service that was last processed by the controller.
type: integer
format: int64
sinkAudience:
description: SinkAudience is the OIDC audience of the sink.
type: string
sinkCACerts:
description: SinkCACerts are Certification Authority (CA) certificates in PEM format according to https://www.rfc-editor.org/rfc/rfc7468.
type: string
sinkUri:
description: SinkURI is the current active sink URI that has been configured for the Source.
type: string

additionalPrinterColumns:
- name: URL
type: string
jsonPath: ".status.address.url"
- name: Sink
type: string
jsonPath: ".status.sinkUri"
- name: Ready
type: string
jsonPath: ".status.conditions[?(@.type==\"Ready\")].status"
- name: Reason
type: string
jsonPath: ".status.conditions[?(@.type==\"Ready\")].reason"
names:
kind: EventTransform
plural: eventtransforms
singular: eventtransform
categories:
- all
- knative
- eventing
scope: Namespaced
22 changes: 22 additions & 0 deletions config/core/roles/addressable-resolvers-clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -188,3 +188,25 @@ rules:
- get
- list
- watch

---

kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: eventtransforms-addressable-resolver
labels:
duck.knative.dev/addressable: "true"
app.kubernetes.io/version: devel
app.kubernetes.io/name: knative-eventing
# Do not use this role directly. These rules will be added to the "addressable-resolver" role.
rules:
- apiGroups:
- eventing.knative.dev
resources:
- eventtransforms
- eventtransforms/status
verbs:
- get
- list
- watch
2 changes: 2 additions & 0 deletions config/core/roles/controller-clusterroles.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ rules:
- "eventtypes/status"
- "eventpolicies"
- "eventpolicies/status"
- "eventtransforms"
- "eventtransforms/status"
verbs:
- "get"
- "list"
Expand Down
Loading
Loading