diff --git a/README.md b/README.md index b8a80a9a..29742e25 100644 --- a/README.md +++ b/README.md @@ -108,6 +108,50 @@ make docker-build docker-push IMG=/operator:tag And it is required to have access to pull the image from the working environment. Make sure you have the proper permission to the registry if the above commands don’t work. +**Enable audit for a stack** + +```sh +cat < + # Used for consumer group + queriedBy: admin +--- +apiVersion: formance.com/v1beta1 +kind: Benthos +metadata: + name: benthos +spec: + stack: +--- +# Create a stream +apiVersion: formance.com/v1beta1 +kind: BenthosStream +metadata: + name: audit-stream +spec: + name: audit + stack: + data: | + input: + event_bus: + topic: gateway + consumer_group: audit + pipeline: {} + output: + label: "debug" + stdout: + codec: lines +EOF +``` + **Install the CRDs into the cluster:** ```sh diff --git a/api/formance.com/v1beta1/benthos_types.go b/api/formance.com/v1beta1/benthos_types.go index edeb7208..4c00c2ce 100644 --- a/api/formance.com/v1beta1/benthos_types.go +++ b/api/formance.com/v1beta1/benthos_types.go @@ -21,24 +21,16 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -// Batching allow to define custom batching configuration -type Batching struct { - // Count indicates the number of messages that can be kept in memory before being flushed to ElasticSearch - Count int `json:"count"` - // Period indicates the maximum duration messages can be kept in memory before being flushed to ElasticSearch - Period string `json:"period"` -} - type BenthosSpec struct { StackDependency `json:",inline"` DevProperties `json:",inline"` //+optional ResourceProperties *corev1.ResourceRequirements `json:"resourceRequirements,omitempty"` //+optional - Batching *Batching `json:"batching,omitempty"` - //+optional InitContainers []corev1.Container `json:"initContainers"` ImagePullSecrets []corev1.LocalObjectReference `json:"imagePullSecrets,omitempty"` + Resources map[string]string 
`json:"resources,omitempty"` + Templates map[string]string `json:"templates,omitempty"` } type BenthosStatus struct { diff --git a/api/formance.com/v1beta1/benthosstream_types.go b/api/formance.com/v1beta1/benthosstream_types.go index 3c10c34e..f83af066 100644 --- a/api/formance.com/v1beta1/benthosstream_types.go +++ b/api/formance.com/v1beta1/benthosstream_types.go @@ -27,7 +27,8 @@ type BenthosStreamSpec struct { } type BenthosStreamStatus struct { - Status `json:",inline"` + Status `json:",inline"` + ConfigMapHash string `json:"configMapHash,omitempty"` } //+kubebuilder:object:root=true diff --git a/api/formance.com/v1beta1/ledger_types.go b/api/formance.com/v1beta1/ledger_types.go index cbd80deb..3062ccc4 100644 --- a/api/formance.com/v1beta1/ledger_types.go +++ b/api/formance.com/v1beta1/ledger_types.go @@ -17,54 +17,14 @@ limitations under the License. package v1beta1 import ( - "time" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -type LockingStrategyRedisConfig struct { - Uri string `json:"uri,omitempty"` - // +optional - // +kubebuilder:default:=false - TLS bool `json:"tls"` - // +optional - // +kubebuilder:default:=false - InsecureTLS bool `json:"insecure,omitempty"` - // +optional - Duration time.Duration `json:"duration,omitempty"` - // +optional - Retry time.Duration `json:"retry,omitempty"` -} - -type LockingStrategy struct { - // +kubebuilder:Enum:={memory,redis} - // +kubebuilder:default:=memory - // +optional - Strategy string `json:"strategy,omitempty"` - // +optional - Redis *LockingStrategyRedisConfig `json:"redis"` -} - -type DeploymentStrategy string - -const ( - DeploymentStrategySingle = "single" - DeploymentStrategyMonoWriterMultipleReader = "single-writer" -) - type LedgerSpec struct { ModuleProperties `json:",inline"` StackDependency `json:",inline"` // +optional Auth *AuthConfig `json:"auth,omitempty"` - //+kubebuilder:Enum:={single, single-writer} - //+kubebuilder:default:=single - //+optional - // Deprecated. 
- DeploymentStrategy DeploymentStrategy `json:"deploymentStrategy,omitempty"` - // Locking is intended for ledger v1 only - //+optional - Locking *LockingStrategy `json:"locking,omitempty"` } type LedgerStatus struct { @@ -73,18 +33,8 @@ type LedgerStatus struct { // Ledger is the module allowing to install a ledger instance. // -// The ledger is actually a stateful application on the writer part. -// So we cannot scale the ledger as we want without prior configuration. -// -// So, the ledger can run in two modes : -// * single instance: Only one instance will be deployed. We cannot scale in that mode. -// * single writer / multiple reader: In this mode, we will have a single writer and multiple readers if needed. -// -// Use setting `ledger.deployment-strategy` with either the value : -// - single : For the single instance mode. -// - single-writer: For the single writer / multiple reader mode. -// Under the hood, the operator create two deployments and force the scaling of the writer to stay at 1. -// Then you can scale the deployment of the reader to the value you want. +// The ledger is a stateful application that manages financial transactions +// and maintains an immutable audit trail. 
// // +kubebuilder:object:root=true // +kubebuilder:subresource:status diff --git a/api/formance.com/v1beta1/search_types.go b/api/formance.com/v1beta1/search_types.go index 28c96e11..880d99f9 100644 --- a/api/formance.com/v1beta1/search_types.go +++ b/api/formance.com/v1beta1/search_types.go @@ -23,8 +23,6 @@ import ( type SearchSpec struct { StackDependency `json:",inline"` ModuleProperties `json:",inline"` - //+optional - Batching *Batching `json:"batching,omitempty"` // +optional Auth *AuthConfig `json:"auth,omitempty"` } @@ -33,10 +31,6 @@ type SearchStatus struct { Status `json:",inline"` //+optional ElasticSearchURI *URI `json:"elasticSearchURI,omitempty"` - // TopicCleaned is used to flag stacks where the topics have been cleaned (still search-ledgerv2 and co consumers) - //+optional - // +kubebuilder:default:=false - TopicCleaned bool `json:"topicCleaned,omitempty"` } // +kubebuilder:object:root=true diff --git a/api/formance.com/v1beta1/stack_types.go b/api/formance.com/v1beta1/stack_types.go index e355985e..5484c3fb 100644 --- a/api/formance.com/v1beta1/stack_types.go +++ b/api/formance.com/v1beta1/stack_types.go @@ -35,6 +35,7 @@ type StackSpec struct { // +kubebuilder:default:=false // EnableAudit enable audit at the stack level. // Actually, it enables audit on [Gateway](#gateway) + // deprecated EnableAudit bool `json:"enableAudit,omitempty"` // +optional // +kubebuilder:default:=false diff --git a/api/formance.com/v1beta1/zz_generated.deepcopy.go b/api/formance.com/v1beta1/zz_generated.deepcopy.go index 40751643..9c899c31 100644 --- a/api/formance.com/v1beta1/zz_generated.deepcopy.go +++ b/api/formance.com/v1beta1/zz_generated.deepcopy.go @@ -259,21 +259,6 @@ func (in *AuthStatus) DeepCopy() *AuthStatus { return out } -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
-func (in *Batching) DeepCopyInto(out *Batching) { - *out = *in -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Batching. -func (in *Batching) DeepCopy() *Batching { - if in == nil { - return nil - } - out := new(Batching) - in.DeepCopyInto(out) - return out -} - // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Benthos) DeepCopyInto(out *Benthos) { *out = *in @@ -343,11 +328,6 @@ func (in *BenthosSpec) DeepCopyInto(out *BenthosSpec) { *out = new(v1.ResourceRequirements) (*in).DeepCopyInto(*out) } - if in.Batching != nil { - in, out := &in.Batching, &out.Batching - *out = new(Batching) - **out = **in - } if in.InitContainers != nil { in, out := &in.InitContainers, &out.InitContainers *out = make([]v1.Container, len(*in)) @@ -360,6 +340,20 @@ func (in *BenthosSpec) DeepCopyInto(out *BenthosSpec) { *out = make([]v1.LocalObjectReference, len(*in)) copy(*out, *in) } + if in.Resources != nil { + in, out := &in.Resources, &out.Resources + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + if in.Templates != nil { + in, out := &in.Templates, &out.Templates + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BenthosSpec. @@ -1273,11 +1267,6 @@ func (in *LedgerSpec) DeepCopyInto(out *LedgerSpec) { *out = new(AuthConfig) **out = **in } - if in.Locking != nil { - in, out := &in.Locking, &out.Locking - *out = new(LockingStrategy) - (*in).DeepCopyInto(*out) - } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LedgerSpec. @@ -1306,41 +1295,6 @@ func (in *LedgerStatus) DeepCopy() *LedgerStatus { return out } -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
-func (in *LockingStrategy) DeepCopyInto(out *LockingStrategy) { - *out = *in - if in.Redis != nil { - in, out := &in.Redis, &out.Redis - *out = new(LockingStrategyRedisConfig) - **out = **in - } -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LockingStrategy. -func (in *LockingStrategy) DeepCopy() *LockingStrategy { - if in == nil { - return nil - } - out := new(LockingStrategy) - in.DeepCopyInto(out) - return out -} - -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *LockingStrategyRedisConfig) DeepCopyInto(out *LockingStrategyRedisConfig) { - *out = *in -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LockingStrategyRedisConfig. -func (in *LockingStrategyRedisConfig) DeepCopy() *LockingStrategyRedisConfig { - if in == nil { - return nil - } - out := new(LockingStrategyRedisConfig) - in.DeepCopyInto(out) - return out -} - // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *ModuleProperties) DeepCopyInto(out *ModuleProperties) { *out = *in @@ -1812,11 +1766,6 @@ func (in *SearchSpec) DeepCopyInto(out *SearchSpec) { *out = *in out.StackDependency = in.StackDependency out.ModuleProperties = in.ModuleProperties - if in.Batching != nil { - in, out := &in.Batching, &out.Batching - *out = new(Batching) - **out = **in - } if in.Auth != nil { in, out := &in.Auth, &out.Auth *out = new(AuthConfig) diff --git a/config/crd/bases/formance.com_benthos.yaml b/config/crd/bases/formance.com_benthos.yaml index 207c3c4d..d5143a1f 100644 --- a/config/crd/bases/formance.com_benthos.yaml +++ b/config/crd/bases/formance.com_benthos.yaml @@ -51,21 +51,6 @@ spec: type: object spec: properties: - batching: - description: Batching allow to define custom batching configuration - properties: - count: - description: Count indicates the number of messages that can be - kept in memory before being flushed to ElasticSearch - type: integer - period: - description: Period indicates the maximum duration messages can - be kept in memory before being flushed to ElasticSearch - type: string - required: - - count - - period - type: object debug: default: false description: Allow to enable debug mode on the module @@ -1480,9 +1465,17 @@ spec: More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/ type: object type: object + resources: + additionalProperties: + type: string + type: object stack: description: Stack indicates the stack on which the module is installed type: string + templates: + additionalProperties: + type: string + type: object type: object status: properties: diff --git a/config/crd/bases/formance.com_benthosstreams.yaml b/config/crd/bases/formance.com_benthosstreams.yaml index f6ca538f..d1d9fcc7 100644 --- a/config/crd/bases/formance.com_benthosstreams.yaml +++ b/config/crd/bases/formance.com_benthosstreams.yaml @@ -123,6 +123,8 @@ spec: - type type: object type: array + configMapHash: + type: string info: 
description: Info can contain any additional like reconciliation errors type: string diff --git a/config/crd/bases/formance.com_ledgers.yaml b/config/crd/bases/formance.com_ledgers.yaml index f54870df..60dff8bf 100644 --- a/config/crd/bases/formance.com_ledgers.yaml +++ b/config/crd/bases/formance.com_ledgers.yaml @@ -40,20 +40,8 @@ spec: Ledger is the module allowing to install a ledger instance. - The ledger is actually a stateful application on the writer part. - So we cannot scale the ledger as we want without prior configuration. - - - So, the ledger can run in two modes : - * single instance: Only one instance will be deployed. We cannot scale in that mode. - * single writer / multiple reader: In this mode, we will have a single writer and multiple readers if needed. - - - Use setting `ledger.deployment-strategy` with either the value : - - single : For the single instance mode. - - single-writer: For the single writer / multiple reader mode. - Under the hood, the operator create two deployments and force the scaling of the writer to stay at 1. - Then you can scale the deployment of the reader to the value you want. + The ledger is a stateful application that manages financial transactions + and maintains an immutable audit trail. properties: apiVersion: description: |- @@ -85,48 +73,12 @@ spec: default: false description: Allow to enable debug mode on the module type: boolean - deploymentStrategy: - default: single - description: Deprecated. - type: string dev: default: false description: |- Allow to enable dev mode on the module Dev mode is used to allow some application to do custom setup in development mode (allow insecure certificates for example) type: boolean - locking: - description: Locking is intended for ledger v1 only - properties: - redis: - properties: - duration: - description: |- - A Duration represents the elapsed time between two instants - as an int64 nanosecond count. 
The representation limits the - largest representable duration to approximately 290 years. - format: int64 - type: integer - insecure: - default: false - type: boolean - retry: - description: |- - A Duration represents the elapsed time between two instants - as an int64 nanosecond count. The representation limits the - largest representable duration to approximately 290 years. - format: int64 - type: integer - tls: - default: false - type: boolean - uri: - type: string - type: object - strategy: - default: memory - type: string - type: object stack: description: Stack indicates the stack on which the module is installed type: string diff --git a/config/crd/bases/formance.com_searches.yaml b/config/crd/bases/formance.com_searches.yaml index 0371d2e1..d664ffa1 100644 --- a/config/crd/bases/formance.com_searches.yaml +++ b/config/crd/bases/formance.com_searches.yaml @@ -65,21 +65,6 @@ spec: readKeySetMaxRetries: type: integer type: object - batching: - description: Batching allow to define custom batching configuration - properties: - count: - description: Count indicates the number of messages that can be - kept in memory before being flushed to ElasticSearch - type: integer - period: - description: Period indicates the maximum duration messages can - be kept in memory before being flushed to ElasticSearch - type: string - required: - - count - - period - type: object debug: default: false description: Allow to enable debug mode on the module @@ -176,11 +161,6 @@ spec: description: Ready indicates if the resource is seen as completely reconciled type: boolean - topicCleaned: - default: false - description: TopicCleaned is used to flag stacks where the topics - have been cleaned (still search-ledgerv2 and co consumers) - type: boolean type: object type: object served: true diff --git a/config/crd/bases/formance.com_stacks.yaml b/config/crd/bases/formance.com_stacks.yaml index c08deb3c..4e8e3b5b 100644 --- a/config/crd/bases/formance.com_stacks.yaml +++ 
b/config/crd/bases/formance.com_stacks.yaml @@ -107,6 +107,7 @@ spec: description: |- EnableAudit enable audit at the stack level. Actually, it enables audit on [Gateway](#gateway) + deprecated type: boolean version: description: |- diff --git a/docs/09-Configuration reference/01-Settings.md b/docs/09-Configuration reference/01-Settings.md index 89e01847..a43f8522 100644 --- a/docs/09-Configuration reference/01-Settings.md +++ b/docs/09-Configuration reference/01-Settings.md @@ -20,7 +20,6 @@ While we have some basic types (string, number, bool ...), we also have some com | opentelemetry.traces.dsn | URI | | OpenTelemetry collector URI | | opentelemetry.traces.resource-attributes | Map | key1=value1,key2=value2 | Opentelemetry additional resource attributes | | clear-database | bool | true | Whether to remove databases on stack deletion | -| ledger.deployment-strategy | string | single | Ledger deployment type | | ledger.logs.max-batch-size | Int | 1024 | Ledger logs batching max size | | ledger.api.bulk-max-size | Int | 100 | Max bulk size | | ledger.api.default-page-size | Int | | Default api page size | @@ -46,7 +45,6 @@ While we have some basic types (string, number, bool ...), we also have some com | jobs.``.containers.``.run-as | Map | user=X, group=X | Configure the security context for containers in jobs by specifying the user and group IDs to run as | | registries.``.endpoint | string | example.com?pullSecret=foo | Specify a custom endpoint for a specific docker repository | | registries.``.images.``.rewrite | string | formancehq/example | Allow to rewrite the image path | -| search.batching | Map | period=1s, count=10 | Override default batching parameters | | services.``.annotations | Map | | Allow to specify custom annotations to apply on created k8s services | | gateway.ingress.annotations | Map | | Allow to specify custom annotations to apply on the gateway ingress | | gateway.ingress.labels | Map | | Allow to specify custom labels to apply on the 
gateways ingress | diff --git a/docs/09-Configuration reference/02-Custom Resource Definitions.md b/docs/09-Configuration reference/02-Custom Resource Definitions.md index cdd6899b..0d9227ff 100644 --- a/docs/09-Configuration reference/02-Custom Resource Definitions.md +++ b/docs/09-Configuration reference/02-Custom Resource Definitions.md @@ -116,7 +116,7 @@ If `versions` and `versionsFromFile` are not specified, "latest" will be used. | `dev` _boolean_ | Allow to enable dev mode on the module
Dev mode is used to allow some applications to do custom setup in development mode (allow insecure certificates for example) | false |  |
| `version` _string_ | Version allows specifying the version of the components<br />
Must be a valid docker tag |  |  |
| `versionsFromFile` _string_ | VersionsFromFile allows specifying a formance.com/Versions object which contains individual versions<br />
for each component.
Must reference a valid formance.com/Versions object | | | -| `enableAudit` _boolean_ | EnableAudit enable audit at the stack level.
Actually, it enables audit on [Gateway](#gateway) | false | | +| `enableAudit` _boolean_ | EnableAudit enable audit at the stack level.
Actually, it enables audit on [Gateway](#gateway)
deprecated | false |  |
| `disabled` _boolean_ | Disabled indicates the stack is disabled.<br />
A disabled stack disables everything<br />
It just keeps the namespace and the [Database](#database) resources. | false | | @@ -707,20 +707,8 @@ Gateway is the Schema for the gateways API Ledger is the module allowing to install a ledger instance. -The ledger is actually a stateful application on the writer part. -So we cannot scale the ledger as we want without prior configuration. - - -So, the ledger can run in two modes : -* single instance: Only one instance will be deployed. We cannot scale in that mode. -* single writer / multiple reader: In this mode, we will have a single writer and multiple readers if needed. - - -Use setting `ledger.deployment-strategy` with either the value : - - single : For the single instance mode. - - single-writer: For the single writer / multiple reader mode. - Under the hood, the operator create two deployments and force the scaling of the writer to stay at 1. - Then you can scale the deployment of the reader to the value you want. +The ledger is a stateful application that manages financial transactions +and maintains an immutable audit trail. @@ -772,82 +760,6 @@ Use setting `ledger.deployment-strategy` with either the value : | `dev` _boolean_ | Allow to enable dev mode on the module
Dev mode is used to allow some application to do custom setup in development mode (allow insecure certificates for example) | false | | | `version` _string_ | Version allow to override global version defined at stack level for a specific module | | | | `stack` _string_ | Stack indicates the stack on which the module is installed | | | -| `deploymentStrategy` _[DeploymentStrategy](#deploymentstrategy)_ | Deprecated. | single | | -| `locking` _[LockingStrategy](#lockingstrategy)_ | Locking is intended for ledger v1 only | | | - -###### DeploymentStrategy - -_Underlying type:_ _string_ - - - - - - - - - - - - - - - - - - -###### LockingStrategy - - - - - - - - - - - - - - - - - - - -| Field | Description | Default | Validation | -| --- | --- | --- | --- | -| `strategy` _string_ | | memory | | -| `redis` _[LockingStrategyRedisConfig](#lockingstrategyredisconfig)_ | | | | - -###### LockingStrategyRedisConfig - - - - - - - - - - - - - - - - - - - -| Field | Description | Default | Validation | -| --- | --- | --- | --- | -| `uri` _string_ | | | | -| `tls` _boolean_ | | false | | -| `insecure` _boolean_ | | false | | -| `duration` _string_ | | | | -| `retry` _string_ | | | | @@ -1198,32 +1110,6 @@ Search is the Schema for the searches API | `debug` _boolean_ | Allow to enable debug mode on the module | false | | | `dev` _boolean_ | Allow to enable dev mode on the module
Dev mode is used to allow some application to do custom setup in development mode (allow insecure certificates for example) | false | | | `version` _string_ | Version allow to override global version defined at stack level for a specific module | | | -| `batching` _[Batching](#batching)_ | | | | - -###### Batching - - - -Batching allow to define custom batching configuration - - - - - - - - - - - - - - - -| Field | Description | Default | Validation | -| --- | --- | --- | --- | -| `count` _integer_ | Count indicates the number of messages that can be kept in memory before being flushed to ElasticSearch | | | -| `period` _string_ | Period indicates the maximum duration messages can be kept in memory before being flushed to ElasticSearch | | | @@ -1254,7 +1140,6 @@ Batching allow to define custom batching configuration | `ready` _boolean_ | Ready indicates if the resource is seen as completely reconciled | | | | `info` _string_ | Info can contain any additional like reconciliation errors | | | | `elasticSearchURI` _string_ | | | Type: string
| -| `topicCleaned` _boolean_ | TopicCleaned is used to flag stacks where the topics have been cleaned (still search-ledgerv2 and co consumers) | false | | #### Stargate @@ -1724,34 +1609,10 @@ Benthos is the Schema for the benthos API | `debug` _boolean_ | Allow to enable debug mode on the module | false | | | `dev` _boolean_ | Allow to enable dev mode on the module
Dev mode is used to allow some application to do custom setup in development mode (allow insecure certificates for example) | false | | | `resourceRequirements` _[ResourceRequirements](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.27/#resourcerequirements-v1-core)_ | | | | -| `batching` _[Batching](#batching)_ | | | | | `initContainers` _[Container](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.27/#container-v1-core) array_ | | | | | `imagePullSecrets` _[LocalObjectReference](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.27/#localobjectreference-v1-core) array_ | | | | - -###### Batching - - - -Batching allow to define custom batching configuration - - - - - - - - - - - - - - - -| Field | Description | Default | Validation | -| --- | --- | --- | --- | -| `count` _integer_ | Count indicates the number of messages that can be kept in memory before being flushed to ElasticSearch | | | -| `period` _string_ | Period indicates the maximum duration messages can be kept in memory before being flushed to ElasticSearch | | | +| `resources` _object (keys:string, values:string)_ | | | | +| `templates` _object (keys:string, values:string)_ | | | | @@ -1868,6 +1729,7 @@ BenthosStream is the Schema for the benthosstreams API | --- | --- | --- | --- | | `ready` _boolean_ | Ready indicates if the resource is seen as completely reconciled | | | | `info` _string_ | Info can contain any additional like reconciliation errors | | | +| `configMapHash` _string_ | | | | #### Broker diff --git a/go.mod b/go.mod index 799fb65e..47d401f4 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,6 @@ toolchain go1.24.9 require ( github.com/formancehq/go-libs/v2 v2.2.3 - github.com/formancehq/search v0.0.0-20240926085257-6b5288dc2576 github.com/go-logr/logr v1.4.3 github.com/google/go-cmp v0.7.0 github.com/google/uuid v1.6.0 @@ -35,7 +34,7 @@ require ( require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect 
- github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect github.com/evanphx/json-patch/v5 v5.8.0 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect diff --git a/go.sum b/go.sum index af37ca5d..00920b52 100644 --- a/go.sum +++ b/go.sum @@ -15,8 +15,6 @@ github.com/evanphx/json-patch/v5 v5.8.0 h1:lRj6N9Nci7MvzrXuX6HFzU8XjmhPiXPlsKEy1 github.com/evanphx/json-patch/v5 v5.8.0/go.mod h1:VNkHZ/282BpEyt/tObQO8s5CMPmYYq14uClGH4abBuQ= github.com/formancehq/go-libs/v2 v2.2.3 h1:7irBJ4NfSkJdVKxXPktqVAoQa7hJcejmrXgjdgh27Zk= github.com/formancehq/go-libs/v2 v2.2.3/go.mod h1:JvBjEDWNf7izCy2dq/eI3aMc9d28gChBe1rjw5yYlAs= -github.com/formancehq/search v0.0.0-20240926085257-6b5288dc2576 h1:2PuiQ5FPsyseXDs+8UoyvpS1Qgysh597ReyapnHB3Ig= -github.com/formancehq/search v0.0.0-20240926085257-6b5288dc2576/go.mod h1:vmUx4wanfOqGBZk0t2bhAXx+r+Pp9WaIA1UQQNc9Zds= github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= diff --git a/helm/crds/Chart.yaml b/helm/crds/Chart.yaml index b46448f3..050b3229 100644 --- a/helm/crds/Chart.yaml +++ b/helm/crds/Chart.yaml @@ -12,9 +12,9 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: "2.8.5" +version: "3.0.0" # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. Versions are not expected to # follow Semantic Versioning. They should reflect the version the application is using. 
# It is recommended to use it with quotes. -appVersion: v2.8.5 +appVersion: v3.0.0 diff --git a/helm/crds/templates/crds/apiextensions.k8s.io_v1_customresourcedefinition_benthos.formance.com.yaml b/helm/crds/templates/crds/apiextensions.k8s.io_v1_customresourcedefinition_benthos.formance.com.yaml index 9e6c0059..d08da896 100644 --- a/helm/crds/templates/crds/apiextensions.k8s.io_v1_customresourcedefinition_benthos.formance.com.yaml +++ b/helm/crds/templates/crds/apiextensions.k8s.io_v1_customresourcedefinition_benthos.formance.com.yaml @@ -51,21 +51,6 @@ spec: type: object spec: properties: - batching: - description: Batching allow to define custom batching configuration - properties: - count: - description: Count indicates the number of messages that can be - kept in memory before being flushed to ElasticSearch - type: integer - period: - description: Period indicates the maximum duration messages can - be kept in memory before being flushed to ElasticSearch - type: string - required: - - count - - period - type: object debug: default: false description: Allow to enable debug mode on the module @@ -1480,9 +1465,17 @@ spec: More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/ type: object type: object + resources: + additionalProperties: + type: string + type: object stack: description: Stack indicates the stack on which the module is installed type: string + templates: + additionalProperties: + type: string + type: object type: object status: properties: diff --git a/helm/crds/templates/crds/apiextensions.k8s.io_v1_customresourcedefinition_benthosstreams.formance.com.yaml b/helm/crds/templates/crds/apiextensions.k8s.io_v1_customresourcedefinition_benthosstreams.formance.com.yaml index 70d1c8d8..a82c000b 100644 --- a/helm/crds/templates/crds/apiextensions.k8s.io_v1_customresourcedefinition_benthosstreams.formance.com.yaml +++ 
b/helm/crds/templates/crds/apiextensions.k8s.io_v1_customresourcedefinition_benthosstreams.formance.com.yaml @@ -123,6 +123,8 @@ spec: - type type: object type: array + configMapHash: + type: string info: description: Info can contain any additional like reconciliation errors type: string diff --git a/helm/crds/templates/crds/apiextensions.k8s.io_v1_customresourcedefinition_ledgers.formance.com.yaml b/helm/crds/templates/crds/apiextensions.k8s.io_v1_customresourcedefinition_ledgers.formance.com.yaml index ae2478c4..b06c46bd 100644 --- a/helm/crds/templates/crds/apiextensions.k8s.io_v1_customresourcedefinition_ledgers.formance.com.yaml +++ b/helm/crds/templates/crds/apiextensions.k8s.io_v1_customresourcedefinition_ledgers.formance.com.yaml @@ -40,20 +40,8 @@ spec: Ledger is the module allowing to install a ledger instance. - The ledger is actually a stateful application on the writer part. - So we cannot scale the ledger as we want without prior configuration. - - - So, the ledger can run in two modes : - * single instance: Only one instance will be deployed. We cannot scale in that mode. - * single writer / multiple reader: In this mode, we will have a single writer and multiple readers if needed. - - - Use setting `ledger.deployment-strategy` with either the value : - - single : For the single instance mode. - - single-writer: For the single writer / multiple reader mode. - Under the hood, the operator create two deployments and force the scaling of the writer to stay at 1. - Then you can scale the deployment of the reader to the value you want. + The ledger is a stateful application that manages financial transactions + and maintains an immutable audit trail. properties: apiVersion: description: |- @@ -85,48 +73,12 @@ spec: default: false description: Allow to enable debug mode on the module type: boolean - deploymentStrategy: - default: single - description: Deprecated. 
- type: string dev: default: false description: |- Allow to enable dev mode on the module Dev mode is used to allow some application to do custom setup in development mode (allow insecure certificates for example) type: boolean - locking: - description: Locking is intended for ledger v1 only - properties: - redis: - properties: - duration: - description: |- - A Duration represents the elapsed time between two instants - as an int64 nanosecond count. The representation limits the - largest representable duration to approximately 290 years. - format: int64 - type: integer - insecure: - default: false - type: boolean - retry: - description: |- - A Duration represents the elapsed time between two instants - as an int64 nanosecond count. The representation limits the - largest representable duration to approximately 290 years. - format: int64 - type: integer - tls: - default: false - type: boolean - uri: - type: string - type: object - strategy: - default: memory - type: string - type: object stack: description: Stack indicates the stack on which the module is installed type: string diff --git a/helm/crds/templates/crds/apiextensions.k8s.io_v1_customresourcedefinition_searches.formance.com.yaml b/helm/crds/templates/crds/apiextensions.k8s.io_v1_customresourcedefinition_searches.formance.com.yaml index 27048afe..69491994 100644 --- a/helm/crds/templates/crds/apiextensions.k8s.io_v1_customresourcedefinition_searches.formance.com.yaml +++ b/helm/crds/templates/crds/apiextensions.k8s.io_v1_customresourcedefinition_searches.formance.com.yaml @@ -65,21 +65,6 @@ spec: readKeySetMaxRetries: type: integer type: object - batching: - description: Batching allow to define custom batching configuration - properties: - count: - description: Count indicates the number of messages that can be - kept in memory before being flushed to ElasticSearch - type: integer - period: - description: Period indicates the maximum duration messages can - be kept in memory before being flushed to 
ElasticSearch - type: string - required: - - count - - period - type: object debug: default: false description: Allow to enable debug mode on the module @@ -176,11 +161,6 @@ spec: description: Ready indicates if the resource is seen as completely reconciled type: boolean - topicCleaned: - default: false - description: TopicCleaned is used to flag stacks where the topics - have been cleaned (still search-ledgerv2 and co consumers) - type: boolean type: object type: object served: true diff --git a/helm/crds/templates/crds/apiextensions.k8s.io_v1_customresourcedefinition_stacks.formance.com.yaml b/helm/crds/templates/crds/apiextensions.k8s.io_v1_customresourcedefinition_stacks.formance.com.yaml index 82bc4ccf..1bbde333 100644 --- a/helm/crds/templates/crds/apiextensions.k8s.io_v1_customresourcedefinition_stacks.formance.com.yaml +++ b/helm/crds/templates/crds/apiextensions.k8s.io_v1_customresourcedefinition_stacks.formance.com.yaml @@ -107,6 +107,7 @@ spec: description: |- EnableAudit enable audit at the stack level. Actually, it enables audit on [Gateway](#gateway) + deprecated type: boolean version: description: |- diff --git a/helm/operator/Chart.lock b/helm/operator/Chart.lock index e312f49b..68ca0453 100644 --- a/helm/operator/Chart.lock +++ b/helm/operator/Chart.lock @@ -1,6 +1,6 @@ dependencies: - name: operator-crds repository: file://../crds - version: 2.8.5 -digest: sha256:93cb2facc30c36f1f216fbd06095ee4e6320bd67bdfa55e5e9cb6bb957d7233a -generated: "2025-11-05T14:58:04.62803+01:00" + version: 3.0.0 +digest: sha256:f339ac5b9b62e4f9c0f9aaf2bc06ccbd534f89170a47ca6da1489411d313a373 +generated: "2025-11-10T16:22:25.461931+01:00" diff --git a/helm/operator/Chart.yaml b/helm/operator/Chart.yaml index c1417604..2405a1bc 100644 --- a/helm/operator/Chart.yaml +++ b/helm/operator/Chart.yaml @@ -12,14 +12,14 @@ type: application # This is the chart version. 
This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: "2.17.2" +version: "3.0.0" # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. Versions are not expected to # follow Semantic Versioning. They should reflect the version the application is using. # It is recommended to use it with quotes. -appVersion: "v2.17.2" +appVersion: "v3.0.0" dependencies: - name: operator-crds - version: "2.X" + version: "3.X" repository: "file://../crds" condition: operator-crds.create diff --git a/internal/core/env.go b/internal/core/env.go index 5eba8189..7f72c11b 100644 --- a/internal/core/env.go +++ b/internal/core/env.go @@ -69,16 +69,9 @@ func GetDevEnvVars(stack *v1beta1.Stack, service interface { IsDebug() bool IsDev() bool }) []corev1.EnvVar { - return GetDevEnvVarsWithPrefix(stack, service, "") -} - -func GetDevEnvVarsWithPrefix(stack *v1beta1.Stack, service interface { - IsDebug() bool - IsDev() bool -}, prefix string) []corev1.EnvVar { return []corev1.EnvVar{ - EnvFromBool(fmt.Sprintf("%sDEBUG", prefix), stack.Spec.Debug || service.IsDebug()), - EnvFromBool(fmt.Sprintf("%sDEV", prefix), stack.Spec.Dev || service.IsDev()), - Env(fmt.Sprintf("%sSTACK", prefix), stack.Name), + EnvFromBool("DEBUG", stack.Spec.Debug || service.IsDebug()), + EnvFromBool("DEV", stack.Spec.Dev || service.IsDev()), + Env("STACK", stack.Name), } } diff --git a/internal/core/reconciler.go b/internal/core/reconciler.go index 1076f2bb..8478df05 100644 --- a/internal/core/reconciler.go +++ b/internal/core/reconciler.go @@ -135,7 +135,7 @@ func WithWatchSettings[T client.Object]() ReconcilerOption[T] { func WithWatchDependency[T client.Object](t v1beta1.Dependent) ReconcilerOption[T] { return func(options *ReconcilerOptions[T]) { 
options.Watchers[t] = ReconcilerOptionsWatch{ - Handler: func(mgr Manager, builder *builder.Builder, target client.Object) (handler.EventHandler, []builder.WatchesOption) { + Handler: func(mgr Manager, b *builder.Builder, target client.Object) (handler.EventHandler, []builder.WatchesOption) { return handler.EnqueueRequestsFromMapFunc(WatchDependents(mgr, target)), nil }, } @@ -230,12 +230,6 @@ func withReconciler[T client.Object](controller ObjectController[T], opts ...Rec } } -func WithReconciler[T client.Object](controller func(ctx Context, req T) error, opts ...ReconcilerOption[T]) Initializer { - return withReconciler(func(ctx Context, reconcilerOptions *ReconcilerOptions[T], req T) error { - return controller(ctx, req) - }, opts...) -} - func reconcileObject[T client.Object](mgr Manager, controller ObjectController[T], reconcilerOptions ReconcilerOptions[T]) func(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { return func(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { diff --git a/internal/resources/auths/env.go b/internal/resources/auths/env.go index 5dc35be4..84fea02f 100644 --- a/internal/resources/auths/env.go +++ b/internal/resources/auths/env.go @@ -1,7 +1,6 @@ package auths import ( - "fmt" "strconv" "github.com/formancehq/operator/api/formance.com/v1beta1" @@ -11,10 +10,6 @@ import ( ) func ProtectedEnvVars(ctx Context, stack *v1beta1.Stack, moduleName string, auth *v1beta1.AuthConfig) ([]v1.EnvVar, error) { - return ProtectedAPIEnvVarsWithPrefix(ctx, stack, moduleName, auth, "") -} - -func ProtectedAPIEnvVarsWithPrefix(ctx Context, stack *v1beta1.Stack, moduleName string, auth *v1beta1.AuthConfig, prefix string) ([]v1.EnvVar, error) { ret := make([]v1.EnvVar, 0) hasAuth, err := HasDependency(ctx, stack.Name, &v1beta1.Auth{}) @@ -31,14 +26,14 @@ func ProtectedAPIEnvVarsWithPrefix(ctx Context, stack *v1beta1.Stack, moduleName } ret = append(ret, - Env(fmt.Sprintf("%sAUTH_ENABLED", prefix), 
"true"), - Env(fmt.Sprintf("%sAUTH_ISSUER", prefix), url), + Env("AUTH_ENABLED", "true"), + Env("AUTH_ISSUER", url), ) if auth != nil { if auth.ReadKeySetMaxRetries != 0 { ret = append(ret, - Env(fmt.Sprintf("%sAUTH_READ_KEY_SET_MAX_RETRIES", prefix), strconv.Itoa(auth.ReadKeySetMaxRetries)), + Env("AUTH_READ_KEY_SET_MAX_RETRIES", strconv.Itoa(auth.ReadKeySetMaxRetries)), ) } } @@ -51,8 +46,8 @@ func ProtectedAPIEnvVarsWithPrefix(ctx Context, stack *v1beta1.Stack, moduleName if checkScopes { ret = append(ret, - Env(fmt.Sprintf("%sAUTH_CHECK_SCOPES", prefix), "true"), - Env(fmt.Sprintf("%sAUTH_SERVICE", prefix), moduleName), + Env("AUTH_CHECK_SCOPES", "true"), + Env("AUTH_SERVICE", moduleName), ) } diff --git a/internal/resources/auths/init.go b/internal/resources/auths/init.go index 00da7f70..9fcc2bc4 100644 --- a/internal/resources/auths/init.go +++ b/internal/resources/auths/init.go @@ -17,7 +17,6 @@ limitations under the License. package auths import ( - "github.com/davecgh/go-spew/spew" . "github.com/formancehq/go-libs/v2/collectionutils" "github.com/formancehq/operator/api/formance.com/v1beta1" . 
"github.com/formancehq/operator/internal/core" @@ -82,9 +81,8 @@ func Reconcile(ctx Context, stack *v1beta1.Stack, auth *v1beta1.Auth, version st if err != nil { return errors.Wrap(err, "resolving image configuration") } - spew.Dump(imageConfiguration) - if IsGreaterOrEqual(version, "v2.0.0-rc.5") && databases.GetSavedModuleVersion(database) != version { + if databases.GetSavedModuleVersion(database) != version { if err := databases.Migrate(ctx, stack, auth, imageConfiguration, database); err != nil { return err } diff --git a/internal/resources/benthos/builtin-templates/event_bus.yaml b/internal/resources/benthos/builtin-templates/event_bus.yaml new file mode 100644 index 00000000..51056d7a --- /dev/null +++ b/internal/resources/benthos/builtin-templates/event_bus.yaml @@ -0,0 +1,46 @@ +name: event_bus +type: input + +fields: + - name: topic + type: string + - name: consumer_group + type: string + +mapping: | + root = if env("BROKER") == "kafka" {{ + "kafka_franz": { + "seed_brokers": [ env("KAFKA_ADDRESS") ], + "topics": [ env("TOPIC_PREFIX") + this.topic ], + "consumer_group": this.consumer_group, + "checkpoint_limit": 1024, + "sasl": [ + { + "mechanism": env("KAFKA_SASL_MECHANISM"), + "password": env("KAFKA_SASL_PASSWORD"), + "username": env("KAFKA_SASL_USERNAME"), + "aws": { + "region": env("AWS_REGION"), + "credentials": { + "profile": env("AWS_PROFILE"), + "id": env("AWS_ACCESS_KEY_ID"), + "secret": env("AWS_SECRET_ACCESS_KEY"), + "token": env("AWS_SESSION_TOKEN"), + "role": env("AWS_ROLE_ARN") + } + } + } + ], + "tls": { + "enabled": env("KAFKA_TLS_ENABLED") == "true" + } + } + }} else {{ + "nats_jetstream": { + "urls": [env("NATS_URL")], + "queue": this.consumer_group, + "subject": env("TOPIC_PREFIX") + this.topic, + "durable": if env("NATS_BIND") == "true" { this.consumer_group + "_" + this.topic } else { this.consumer_group }, + "bind": env("NATS_BIND") == "true" + } + }} \ No newline at end of file diff --git a/internal/resources/benthos/controller.go 
b/internal/resources/benthos/controller.go index 477c2f87..325e1097 100644 --- a/internal/resources/benthos/controller.go +++ b/internal/resources/benthos/controller.go @@ -1,7 +1,10 @@ package benthos import ( + "crypto/sha256" "embed" + "encoding/base64" + "encoding/json" "fmt" "sort" @@ -11,10 +14,8 @@ import ( "github.com/formancehq/operator/internal/resources/applications" "github.com/formancehq/operator/internal/resources/registries" "github.com/formancehq/operator/internal/resources/resourcereferences" - benthosOperator "github.com/formancehq/operator/internal/resources/searches/benthos" "github.com/formancehq/operator/internal/resources/services" "github.com/formancehq/operator/internal/resources/settings" - "github.com/formancehq/search/benthos" "github.com/pkg/errors" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -25,6 +26,9 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" ) +//go:embed builtin-templates +var builtinTemplates embed.FS + //+kubebuilder:rbac:groups=formance.com,resources=benthos,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=formance.com,resources=benthos/status,verbs=get;update;patch //+kubebuilder:rbac:groups=formance.com,resources=benthos/finalizers,verbs=update @@ -64,35 +68,22 @@ func createService(ctx Context, b *v1beta1.Benthos) error { return err } -// TODO(gfyrag): there is a ton of search related configuration // We need to this controller and keep it focused on benthos func createDeployment(ctx Context, stack *v1beta1.Stack, b *v1beta1.Benthos) error { - brokerURI, err := settings.RequireURL(ctx, stack.Name, "broker", "dsn") - if err != nil { - return errors.Wrap(err, "searching broker configuration") - } - - elasticSearchURI, err := settings.RequireURL(ctx, stack.Name, "elasticsearch", "dsn") - if err != nil { - return errors.Wrap(err, "searching elasticsearch configuration") - } - serviceAccountName, err := settings.GetAWSServiceAccount(ctx, stack.Name) if err != nil { return err 
} - awsIAMEnabled := serviceAccountName != "" - var resourceReference *v1beta1.ResourceReference - if secret := elasticSearchURI.Query().Get("secret"); !awsIAMEnabled && secret != "" { - resourceReference, err = resourcereferences.Create(ctx, b, "elasticsearch", secret, &corev1.Secret{}) - } else { - err = resourcereferences.Delete(ctx, b, "elasticsearch") - } + // Cleanup potential old resource reference (pre v3.0.0) + // todo(next-minor): remove + err = resourcereferences.Delete(ctx, b, "elasticsearch") if err != nil { return err } + awsIAMEnabled := serviceAccountName != "" + broker := &v1beta1.Broker{} if err := ctx.GetClient().Get(ctx, types.NamespacedName{ Name: stack.Name, @@ -112,66 +103,33 @@ func createDeployment(ctx Context, stack *v1beta1.Stack, b *v1beta1.Benthos) err } env := []corev1.EnvVar{ - Env("OPENSEARCH_URL", elasticSearchURI.WithoutQuery().String()), Env("TOPIC_PREFIX", topicPrefix), - Env("OPENSEARCH_INDEX", "stacks"), Env("STACK", b.Spec.Stack), + Env("BROKER", broker.Status.URI.Scheme), } if awsIAMEnabled { env = append(env, Env("AWS_IAM_ENABLED", "true")) } - if b.Spec.Batching != nil { - if b.Spec.Batching.Count != 0 { - env = append(env, Env("OPENSEARCH_BATCHING_COUNT", fmt.Sprint(b.Spec.Batching.Count))) - } - if b.Spec.Batching.Period != "" { - env = append(env, Env("OPENSEARCH_BATCHING_PERIOD", b.Spec.Batching.Period)) - } - } - - if brokerURI.Scheme == "kafka" { - env = append(env, Env("KAFKA_ADDRESS", brokerURI.Host)) - if settings.IsTrue(brokerURI.Query().Get("tls")) { + if broker.Status.URI.Scheme == "kafka" { + env = append(env, Env("KAFKA_ADDRESS", broker.Status.URI.Host)) + if settings.IsTrue(broker.Status.URI.Query().Get("tls")) { env = append(env, Env("KAFKA_TLS_ENABLED", "true")) } - if settings.IsTrue(brokerURI.Query().Get("saslEnabled")) { + if settings.IsTrue(broker.Status.URI.Query().Get("saslEnabled")) { env = append(env, - Env("KAFKA_SASL_USERNAME", brokerURI.Query().Get("saslUsername")), - Env("KAFKA_SASL_PASSWORD", 
brokerURI.Query().Get("saslPassword")), - Env("KAFKA_SASL_MECHANISM", brokerURI.Query().Get("saslMechanism")), + Env("KAFKA_SASL_USERNAME", broker.Status.URI.Query().Get("saslUsername")), + Env("KAFKA_SASL_PASSWORD", broker.Status.URI.Query().Get("saslPassword")), + Env("KAFKA_SASL_MECHANISM", broker.Status.URI.Query().Get("saslMechanism")), ) } } - if brokerURI.Scheme == "nats" { - env = append(env, Env("NATS_URL", brokerURI.Host)) + if broker.Status.URI.Scheme == "nats" { + env = append(env, Env("NATS_URL", broker.Status.URI.Host)) if broker.Status.Mode == v1beta1.ModeOneStreamByStack { env = append(env, Env("NATS_BIND", "true")) } } - if secret := elasticSearchURI.Query().Get("secret"); elasticSearchURI.User != nil || secret != "" { - env = append(env, Env("BASIC_AUTH_ENABLED", "true")) - if secret == "" { - password, _ := brokerURI.User.Password() - env = append(env, - Env("BASIC_AUTH_USERNAME", brokerURI.User.Username()), - Env("BASIC_AUTH_PASSWORD", password), - ) - } else { - env = append(env, - EnvFromSecret("BASIC_AUTH_USERNAME", secret, "username"), - EnvFromSecret("BASIC_AUTH_PASSWORD", secret, "password"), - ) - } - } else { - // Even if basic auth is not enabled, we need to set the env vars - // to avoid benthos to crash due to linting errors - env = append(env, - Env("BASIC_AUTH_ENABLED", "false"), - Env("BASIC_AUTH_USERNAME", "username"), - Env("BASIC_AUTH_PASSWORD", "password"), - ) - } cmd := []string{ "/benthos", @@ -181,58 +139,69 @@ func createDeployment(ctx Context, stack *v1beta1.Stack, b *v1beta1.Benthos) err cmd = append(cmd, "--log.level", "trace", "streams", "/streams/*.yaml") - volumes := make([]corev1.Volume, 0) - volumeMounts := make([]corev1.VolumeMount, 0) + // Drop config map if exists (pre v3.0.0) + kinds, _, err := ctx.GetScheme().ObjectKinds(&corev1.ConfigMap{}) + if err != nil { + return err + } - type directory struct { - name string - fs embed.FS + object := &unstructured.Unstructured{} + object.SetGroupVersionKind(kinds[0]) + 
object.SetNamespace(stack.Name) + object.SetName("benthos-audit") + if err := client.IgnoreNotFound(ctx.GetClient().Delete(ctx, object)); err != nil { + return errors.Wrap(err, "deleting audit config map") } - directories := []directory{ + volumes := make([]corev1.Volume, 0) + volumeMounts := make([]corev1.VolumeMount, 0) + configMaps := make([]*corev1.ConfigMap, 0) + + for _, object := range []struct { + discr string + files map[string]string + }{ { - name: "templates", - fs: benthos.Templates, + discr: "resources", + files: b.Spec.Resources, }, { - name: "resources", - fs: benthos.Resources, + discr: "templates", + files: func() map[string]string { + ret := b.Spec.Templates + if ret == nil { + ret = make(map[string]string) + } + + files, err := builtinTemplates.ReadDir("builtin-templates") + if err != nil { + panic(err) + } + + for _, file := range files { + data, err := builtinTemplates.ReadFile("builtin-templates/" + file.Name()) + if err != nil { + panic(err) + } + + ret[file.Name()] = string(data) + } + + return ret + }(), }, - } - - if stack.Spec.EnableAudit { - directories = append(directories, directory{ - name: "audit", - fs: benthosOperator.Audit, - }) - } else { - kinds, _, err := ctx.GetScheme().ObjectKinds(&corev1.ConfigMap{}) - if err != nil { - return err - } - - object := &unstructured.Unstructured{} - object.SetGroupVersionKind(kinds[0]) - object.SetNamespace(stack.Name) - object.SetName("benthos-audit") - if err := client.IgnoreNotFound(ctx.GetClient().Delete(ctx, object)); err != nil { - return errors.Wrap(err, "deleting audit config map") - } - } - - configMaps := make([]*corev1.ConfigMap, 0) - - for _, x := range directories { - data := make(map[string]string) - - CopyDir(x.fs, x.name, x.name, &data) + } { + configMapName := fmt.Sprintf("benthos-%s", object.discr) configMap, _, err := CreateOrUpdate[*corev1.ConfigMap](ctx, types.NamespacedName{ Namespace: b.Spec.Stack, - Name: "benthos-" + x.name, + Name: configMapName, }, func(t 
*corev1.ConfigMap) error { - t.Data = data + t.Data = object.files + if t.Data == nil { + t.Data = make(map[string]string) + } return nil }, @@ -244,27 +213,24 @@ func createDeployment(ctx Context, stack *v1beta1.Stack, b *v1beta1.Benthos) err configMaps = append(configMaps, configMap) + volumeName := object.discr volumes = append(volumes, corev1.Volume{ - Name: x.name, + Name: volumeName, VolumeSource: corev1.VolumeSource{ ConfigMap: &corev1.ConfigMapVolumeSource{ LocalObjectReference: corev1.LocalObjectReference{ - Name: "benthos-" + x.name, + Name: configMapName, }, }, }, }) volumeMounts = append(volumeMounts, corev1.VolumeMount{ - Name: x.name, + Name: volumeName, ReadOnly: true, - MountPath: "/" + x.name, + MountPath: fmt.Sprintf("/%s", object.discr), }) } - if stack.Spec.EnableAudit { - cmd = append(cmd, "/audit/gateway_audit.yaml") - } - streamList := &v1beta1.BenthosStreamList{} if err := ctx.GetClient().List(ctx, streamList, client.MatchingFields{ "stack": b.Spec.Stack, @@ -282,11 +248,19 @@ func createDeployment(ctx Context, stack *v1beta1.Stack, b *v1beta1.Benthos) err return err } - podAnnotations := map[string]string{ - "config-hash": HashFromConfigMaps(configMaps...), + digest := sha256.New() + for _, configMap := range configMaps { + if err := json.NewEncoder(digest).Encode(configMap.Data); err != nil { + panic(err) + } } - if resourceReference != nil { - podAnnotations["elasticsearch-secret-hash"] = resourceReference.Status.Hash + for _, stream := range streams { + digest.Write([]byte(stream.Status.ConfigMapHash)) + } + configHash := base64.StdEncoding.EncodeToString(digest.Sum(nil)) + + podAnnotations := map[string]string{ + "config-hash": configHash, } return applications. 
diff --git a/internal/resources/benthosstreams/create.go b/internal/resources/benthosstreams/create.go deleted file mode 100644 index 87987116..00000000 --- a/internal/resources/benthosstreams/create.go +++ /dev/null @@ -1,90 +0,0 @@ -package benthosstreams - -import ( - "embed" - "fmt" - - "github.com/formancehq/go-libs/v2/collectionutils" - "sigs.k8s.io/controller-runtime/pkg/client" - - "github.com/formancehq/operator/api/formance.com/v1beta1" - . "github.com/formancehq/operator/internal/core" - - "strings" - - "github.com/pkg/errors" - "k8s.io/apimachinery/pkg/types" -) - -func LoadFromFileSystem(ctx Context, fs embed.FS, - owner v1beta1.Module, streamDirectory, discr string, opts ...ObjectMutator[*v1beta1.BenthosStream]) error { - streamFiles, err := fs.ReadDir(streamDirectory) - if err != nil { - return err - } - - search := &v1beta1.Search{} - hasSearch, err := HasDependency(ctx, owner.GetStack(), search) - if err != nil { - return err - } - - if hasSearch { - names := make([]string, 0) - for _, file := range streamFiles { - streamContent, err := fs.ReadFile(streamDirectory + "/" + file.Name()) - if err != nil { - return err - } - - sanitizedName := strings.ReplaceAll(file.Name(), "_", "-") - - opts = append(opts, - func(stream *v1beta1.BenthosStream) error { - stream.Spec.Data = string(streamContent) - stream.Spec.Stack = owner.GetStack() - stream.Spec.Name = strings.TrimSuffix(file.Name(), ".yaml") - - return nil - }, - WithLabels[*v1beta1.BenthosStream](map[string]string{ - "service": string(owner.GetUID()) + "-" + discr, - }), - WithController[*v1beta1.BenthosStream](ctx.GetScheme(), owner)) - - name := fmt.Sprintf("%s-%s", owner.GetStack(), sanitizedName) - _, _, err = CreateOrUpdate[*v1beta1.BenthosStream](ctx, types.NamespacedName{ - Name: name, - }, opts...) 
- if err != nil { - return errors.Wrap(err, "creating stream") - } - - names = append(names, name) - } - - // Clean potential orphan streams - l := &v1beta1.BenthosStreamList{} - if err := ctx.GetClient().List(ctx, l, client.MatchingLabels{ - "service": string(owner.GetUID()) + "-" + discr, - }); err != nil { - return err - } - - for _, stream := range l.Items { - if !collectionutils.Contains(names, stream.Name) { - if err := ctx.GetClient().Delete(ctx, &stream); err != nil { - return err - } - } - } - } else { - if err := ctx.GetClient().DeleteAllOf(ctx, &v1beta1.BenthosStream{}, client.MatchingLabels{ - "service": string(owner.GetUID()) + "-" + discr, - }); err != nil { - return err - } - } - - return nil -} diff --git a/internal/resources/benthosstreams/init.go b/internal/resources/benthosstreams/init.go index b95d9f0d..c59c672f 100644 --- a/internal/resources/benthosstreams/init.go +++ b/internal/resources/benthosstreams/init.go @@ -30,7 +30,16 @@ import ( //+kubebuilder:rbac:groups=formance.com,resources=benthosstreams/finalizers,verbs=update func Reconcile(ctx Context, _ *v1beta1.Stack, stream *v1beta1.BenthosStream) error { - _, _, err := CreateOrUpdate[*corev1.ConfigMap](ctx, types.NamespacedName{ + + // todo(next-minor): remove that cleanup code + // clean legacy + // since we don't create any BenthosStream in the operator + // we can clean all streams owned by one of our component + if len(stream.GetOwnerReferences()) == 1 && stream.GetOwnerReferences()[0].APIVersion == "formance.com/v1beta1" { + return ctx.GetClient().Delete(ctx, stream) + } + + cm, _, err := CreateOrUpdate[*corev1.ConfigMap](ctx, types.NamespacedName{ Namespace: stream.Spec.Stack, Name: fmt.Sprintf("stream-%s", stream.Name), }, @@ -47,7 +56,9 @@ func Reconcile(ctx Context, _ *v1beta1.Stack, stream *v1beta1.BenthosStream) err return err } - return err + stream.Status.ConfigMapHash = HashFromConfigMaps(cm) + + return nil } func init() { diff --git 
a/internal/resources/brokerconsumers/create.go b/internal/resources/brokerconsumers/create.go index 49f01760..eb8f9e0b 100644 --- a/internal/resources/brokerconsumers/create.go +++ b/internal/resources/brokerconsumers/create.go @@ -8,7 +8,7 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" . "github.com/formancehq/go-libs/v2/collectionutils" - v1beta1 "github.com/formancehq/operator/api/formance.com/v1beta1" + "github.com/formancehq/operator/api/formance.com/v1beta1" "github.com/formancehq/operator/internal/core" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" diff --git a/internal/resources/brokers/utils.go b/internal/resources/brokers/utils.go index 2f988357..c4f67a84 100644 --- a/internal/resources/brokers/utils.go +++ b/internal/resources/brokers/utils.go @@ -14,35 +14,31 @@ import ( ) func GetBrokerEnvVars(ctx core.Context, brokerURI *v1beta1.URI, stackName, serviceName string) ([]v1.EnvVar, error) { - return GetEnvVarsWithPrefix(ctx, brokerURI, stackName, serviceName, "") -} - -func GetEnvVarsWithPrefix(ctx core.Context, brokerURI *v1beta1.URI, stackName, serviceName, prefix string) ([]v1.EnvVar, error) { ret := make([]v1.EnvVar, 0) - ret = append(ret, core.Env(fmt.Sprintf("%sBROKER", prefix), brokerURI.Scheme)) + ret = append(ret, core.Env("BROKER", brokerURI.Scheme)) if brokerURI.Query().Get("circuitBreakerEnabled") == "true" { - ret = append(ret, core.Env(fmt.Sprintf("%sPUBLISHER_CIRCUIT_BREAKER_ENABLED", prefix), "true")) + ret = append(ret, core.Env("PUBLISHER_CIRCUIT_BREAKER_ENABLED", "true")) if openInterval := brokerURI.Query().Get("circuitBreakerOpenInterval"); openInterval != "" { - ret = append(ret, core.Env(fmt.Sprintf("%sPUBLISHER_CIRCUIT_BREAKER_OPEN_INTERVAL_DURATION", prefix), openInterval)) + ret = append(ret, core.Env("PUBLISHER_CIRCUIT_BREAKER_OPEN_INTERVAL_DURATION", openInterval)) } } switch { case brokerURI.Scheme == "kafka": ret = append(ret, - core.Env(fmt.Sprintf("%sBROKER", prefix), "kafka"), 
- core.Env(fmt.Sprintf("%sPUBLISHER_KAFKA_ENABLED", prefix), "true"), - core.Env(fmt.Sprintf("%sPUBLISHER_KAFKA_BROKER", prefix), brokerURI.Host), + core.Env("BROKER", "kafka"), + core.Env("PUBLISHER_KAFKA_ENABLED", "true"), + core.Env("PUBLISHER_KAFKA_BROKER", brokerURI.Host), ) if settings.IsTrue(brokerURI.Query().Get("saslEnabled")) { ret = append(ret, - core.Env(fmt.Sprintf("%sPUBLISHER_KAFKA_SASL_ENABLED", prefix), "true"), - core.Env(fmt.Sprintf("%sPUBLISHER_KAFKA_SASL_USERNAME", prefix), brokerURI.Query().Get("saslUsername")), - core.Env(fmt.Sprintf("%sPUBLISHER_KAFKA_SASL_PASSWORD", prefix), brokerURI.Query().Get("saslPassword")), - core.Env(fmt.Sprintf("%sPUBLISHER_KAFKA_SASL_MECHANISM", prefix), brokerURI.Query().Get("saslMechanism")), - core.Env(fmt.Sprintf("%sPUBLISHER_KAFKA_SASL_SCRAM_SHA_SIZE", prefix), brokerURI.Query().Get("saslSCRAMSHASize")), + core.Env("PUBLISHER_KAFKA_SASL_ENABLED", "true"), + core.Env("PUBLISHER_KAFKA_SASL_USERNAME", brokerURI.Query().Get("saslUsername")), + core.Env("PUBLISHER_KAFKA_SASL_PASSWORD", brokerURI.Query().Get("saslPassword")), + core.Env("PUBLISHER_KAFKA_SASL_MECHANISM", brokerURI.Query().Get("saslMechanism")), + core.Env("PUBLISHER_KAFKA_SASL_SCRAM_SHA_SIZE", brokerURI.Query().Get("saslSCRAMSHASize")), ) serviceAccount, err := settings.GetAWSServiceAccount(ctx, stackName) @@ -51,39 +47,39 @@ func GetEnvVarsWithPrefix(ctx core.Context, brokerURI *v1beta1.URI, stackName, s } if serviceAccount != "" { - ret = append(ret, core.Env(fmt.Sprintf("%sPUBLISHER_KAFKA_SASL_IAM_ENABLED", prefix), "true")) + ret = append(ret, core.Env("PUBLISHER_KAFKA_SASL_IAM_ENABLED", "true")) } } if settings.IsTrue(brokerURI.Query().Get("tls")) { ret = append(ret, - core.Env(fmt.Sprintf("%sPUBLISHER_KAFKA_TLS_ENABLED", prefix), "true"), + core.Env("PUBLISHER_KAFKA_TLS_ENABLED", "true"), ) } case brokerURI.Scheme == "nats": ret = append(ret, - core.Env(fmt.Sprintf("%sPUBLISHER_NATS_ENABLED", prefix), "true"), - 
core.Env(fmt.Sprintf("%sPUBLISHER_NATS_URL", prefix), brokerURI.Host), - core.Env(fmt.Sprintf("%sPUBLISHER_NATS_CLIENT_ID", prefix), fmt.Sprintf("%s-%s", stackName, serviceName)), + core.Env("PUBLISHER_NATS_ENABLED", "true"), + core.Env("PUBLISHER_NATS_URL", brokerURI.Host), + core.Env("PUBLISHER_NATS_CLIENT_ID", fmt.Sprintf("%s-%s", stackName, serviceName)), ) } return ret, nil } -func GetPublisherEnvVars(stack *v1beta1.Stack, broker *v1beta1.Broker, service, prefix string) []v1.EnvVar { +func GetPublisherEnvVars(stack *v1beta1.Stack, broker *v1beta1.Broker, service string) []v1.EnvVar { switch broker.Status.Mode { case v1beta1.ModeOneStreamByService: return []v1.EnvVar{ - core.Env(fmt.Sprintf("%sPUBLISHER_TOPIC_MAPPING", prefix), "*:"+core.GetObjectName(stack.Name, service)), + core.Env("PUBLISHER_TOPIC_MAPPING", "*:"+core.GetObjectName(stack.Name, service)), } case v1beta1.ModeOneStreamByStack: ret := []v1.EnvVar{ - core.Env(fmt.Sprintf("%sPUBLISHER_TOPIC_MAPPING", prefix), fmt.Sprintf("*:%s.%s", stack.Name, service)), + core.Env("PUBLISHER_TOPIC_MAPPING", fmt.Sprintf("*:%s.%s", stack.Name, service)), } if broker.Status.URI.Scheme == "nats" { - ret = append(ret, core.Env(fmt.Sprintf("%sPUBLISHER_NATS_AUTO_PROVISION", prefix), "false")) + ret = append(ret, core.Env("PUBLISHER_NATS_AUTO_PROVISION", "false")) } return ret default: diff --git a/internal/resources/databases/env.go b/internal/resources/databases/env.go index c148479f..dee45afe 100644 --- a/internal/resources/databases/env.go +++ b/internal/resources/databases/env.go @@ -1,7 +1,6 @@ package databases import ( - "fmt" "net/url" "strconv" "time" @@ -13,43 +12,39 @@ import ( corev1 "k8s.io/api/core/v1" ) -func GetPostgresEnvVars(ctx core.Context, stack *v1beta1.Stack, db *v1beta1.Database) ([]corev1.EnvVar, error) { - return PostgresEnvVarsWithPrefix(ctx, stack, db, "") -} - -func PostgresEnvVarsWithPrefix(ctx core.Context, stack *v1beta1.Stack, database *v1beta1.Database, prefix string) ([]corev1.EnvVar, 
error) { +func GetPostgresEnvVars(ctx core.Context, stack *v1beta1.Stack, database *v1beta1.Database) ([]corev1.EnvVar, error) { ret := []corev1.EnvVar{ - core.Env(fmt.Sprintf("%sPOSTGRES_HOST", prefix), database.Status.URI.Hostname()), - core.Env(fmt.Sprintf("%sPOSTGRES_PORT", prefix), database.Status.URI.Port()), - core.Env(fmt.Sprintf("%sPOSTGRES_DATABASE", prefix), database.Status.Database), + core.Env("POSTGRES_HOST", database.Status.URI.Hostname()), + core.Env("POSTGRES_PORT", database.Status.URI.Port()), + core.Env("POSTGRES_DATABASE", database.Status.Database), } if database.Status.URI.User.Username() != "" || database.Status.URI.Query().Get("secret") != "" { if database.Status.URI.User.Username() != "" { password, _ := database.Status.URI.User.Password() ret = append(ret, - core.Env(fmt.Sprintf("%sPOSTGRES_USERNAME", prefix), database.Status.URI.User.Username()), - core.Env(fmt.Sprintf("%sPOSTGRES_PASSWORD", prefix), url.QueryEscape(password)), + core.Env("POSTGRES_USERNAME", database.Status.URI.User.Username()), + core.Env("POSTGRES_PASSWORD", url.QueryEscape(password)), ) } else { secret := database.Status.URI.Query().Get("secret") ret = append(ret, - core.EnvFromSecret(fmt.Sprintf("%sPOSTGRES_USERNAME", prefix), secret, "username"), - core.EnvFromSecret(fmt.Sprintf("%sPOSTGRES_PASSWORD", prefix), secret, "password"), + core.EnvFromSecret("POSTGRES_USERNAME", secret, "username"), + core.EnvFromSecret("POSTGRES_PASSWORD", secret, "password"), ) } ret = append(ret, - core.Env(fmt.Sprintf("%sPOSTGRES_NO_DATABASE_URI", prefix), core.ComputeEnvVar("postgresql://%s:%s@%s:%s", - fmt.Sprintf("%sPOSTGRES_USERNAME", prefix), - fmt.Sprintf("%sPOSTGRES_PASSWORD", prefix), - fmt.Sprintf("%sPOSTGRES_HOST", prefix), - fmt.Sprintf("%sPOSTGRES_PORT", prefix), + core.Env("POSTGRES_NO_DATABASE_URI", core.ComputeEnvVar("postgresql://%s:%s@%s:%s", + "POSTGRES_USERNAME", + "POSTGRES_PASSWORD", + "POSTGRES_HOST", + "POSTGRES_PORT", )), ) } else { ret = append(ret, - 
core.Env(fmt.Sprintf("%sPOSTGRES_NO_DATABASE_URI", prefix), core.ComputeEnvVar("postgresql://%s:%s", - fmt.Sprintf("%sPOSTGRES_HOST", prefix), - fmt.Sprintf("%sPOSTGRES_PORT", prefix), + core.Env("POSTGRES_NO_DATABASE_URI", core.ComputeEnvVar("postgresql://%s:%s", + "POSTGRES_HOST", + "POSTGRES_PORT", )), ) } @@ -60,7 +55,7 @@ func PostgresEnvVarsWithPrefix(ctx core.Context, stack *v1beta1.Stack, database } if awsRole != "" { - ret = append(ret, core.Env(fmt.Sprintf("%sPOSTGRES_AWS_ENABLE_IAM", prefix), "true")) + ret = append(ret, core.Env("POSTGRES_AWS_ENABLE_IAM", "true")) } f := "%s/%s" @@ -68,9 +63,9 @@ func PostgresEnvVarsWithPrefix(ctx core.Context, stack *v1beta1.Stack, database f += "?sslmode=disable" } ret = append(ret, - core.Env(fmt.Sprintf("%sPOSTGRES_URI", prefix), core.ComputeEnvVar(f, - fmt.Sprintf("%sPOSTGRES_NO_DATABASE_URI", prefix), - fmt.Sprintf("%sPOSTGRES_DATABASE", prefix))), + core.Env("POSTGRES_URI", core.ComputeEnvVar(f, + "POSTGRES_NO_DATABASE_URI", + "POSTGRES_DATABASE")), ) config, err := settings.GetAs[connectionPoolConfiguration](ctx, stack.Name, "modules", database.Spec.Service, "database", "connection-pool") @@ -83,28 +78,28 @@ func PostgresEnvVarsWithPrefix(ctx core.Context, stack *v1beta1.Stack, database if err != nil { return nil, errors.Wrap(err, "cannot parse max idle value") } - ret = append(ret, core.Env(fmt.Sprintf("%sPOSTGRES_MAX_IDLE_CONNS", prefix), config.MaxIdle)) + ret = append(ret, core.Env("POSTGRES_MAX_IDLE_CONNS", config.MaxIdle)) } if config.MaxIdleTime != "" { _, err := time.ParseDuration(config.MaxIdleTime) if err != nil { return nil, errors.Wrap(err, "cannot parse max idle time value") } - ret = append(ret, core.Env(fmt.Sprintf("%sPOSTGRES_CONN_MAX_IDLE_TIME", prefix), config.MaxIdleTime)) + ret = append(ret, core.Env("POSTGRES_CONN_MAX_IDLE_TIME", config.MaxIdleTime)) } if config.MaxOpen != "" { _, err := strconv.ParseUint(config.MaxOpen, 10, 64) if err != nil { return nil, errors.Wrap(err, "cannot parse max 
open conns value") } - ret = append(ret, core.Env(fmt.Sprintf("%sPOSTGRES_MAX_OPEN_CONNS", prefix), config.MaxOpen)) + ret = append(ret, core.Env("POSTGRES_MAX_OPEN_CONNS", config.MaxOpen)) } if config.MaxLifetime != "" { _, err := time.ParseDuration(config.MaxLifetime) if err != nil { return nil, errors.Wrap(err, "cannot parse max lifetime value") } - ret = append(ret, core.Env(fmt.Sprintf("%sPOSTGRES_CONN_MAX_LIFETIME", prefix), config.MaxLifetime)) + ret = append(ret, core.Env("POSTGRES_CONN_MAX_LIFETIME", config.MaxLifetime)) } return ret, nil diff --git a/internal/resources/gateways/Caddyfile.gotpl b/internal/resources/gateways/Caddyfile.gotpl index a581cf7b..3bdc7bc9 100644 --- a/internal/resources/gateways/Caddyfile.gotpl +++ b/internal/resources/gateways/Caddyfile.gotpl @@ -11,6 +11,14 @@ {{- if .EnableAudit }} (audit) { audit { + topic_name {$TOPIC_NAME} + auto_provision false + auth_enabled {$AUTH_ENABLED:false} + auth_url {$AUTH_URL:http://auth:8080} + auth_issuer {$AUTH_ISSUER:http://auth:8080} + organization_id {$ORGANIZATION_ID:""} + stack_id {$STACK_ID:""} + # Kafka publisher {{- if (eq .Broker "kafka") }} publisher_kafka_broker {$PUBLISHER_KAFKA_BROKER:redpanda:29092} @@ -51,7 +59,7 @@ # c.f. 
https://caddyserver.com/docs/caddyfile/directives#directive-order order versions after metrics {{- if .EnableAudit }} - order audit after encode + order audit before handle {{- end }} } diff --git a/internal/resources/gateways/caddyfile.go b/internal/resources/gateways/caddyfile.go index 82beb698..6d4731ad 100644 --- a/internal/resources/gateways/caddyfile.go +++ b/internal/resources/gateways/caddyfile.go @@ -22,8 +22,7 @@ func CreateCaddyfile(ctx core.Context, stack *v1beta1.Stack, }, } - // TODO(gfyrag): Check if search is enabled - if stack.Spec.EnableAudit && broker != nil { + if broker != nil { data["EnableAudit"] = true data["Broker"] = broker.Status.URI.Scheme } diff --git a/internal/resources/gateways/deployment.go b/internal/resources/gateways/deployment.go index e40c9271..4918aece 100644 --- a/internal/resources/gateways/deployment.go +++ b/internal/resources/gateways/deployment.go @@ -8,22 +8,47 @@ import ( "github.com/formancehq/operator/internal/resources/caddy" "github.com/formancehq/operator/internal/resources/registries" v1 "k8s.io/api/core/v1" + "strings" ) -func createDeployment(ctx core.Context, stack *v1beta1.Stack, - gateway *v1beta1.Gateway, caddyfileConfigMap *v1.ConfigMap, - broker *v1beta1.Broker, version string) error { +func createDeployment( + ctx core.Context, + stack *v1beta1.Stack, + gateway *v1beta1.Gateway, + caddyfileConfigMap *v1.ConfigMap, + broker *v1beta1.Broker, + version string, +) error { env := GetEnvVars(gateway) env = append(env, core.GetDevEnvVars(stack, gateway)...) - if stack.Spec.EnableAudit && broker != nil { + if broker != nil { brokerEnvVar, err := brokers.GetBrokerEnvVars(ctx, broker.Status.URI, stack.Name, "gateway") if err != nil { return err } env = append(env, brokerEnvVar...) 
+ + parts := strings.SplitN(stack.Name, "-", 2) + if len(parts) == 2 { + env = append(env, + core.Env("ORGANIZATION_ID", parts[0]), + core.Env("STACK_ID", parts[1]), + ) + } + + hasDependency, err := core.HasDependency(ctx, stack.Name, &v1beta1.Auth{}) + if err != nil { + return err + } + if hasDependency { + env = append(env, + core.Env("AUTH_ENABLED", "true"), + core.Env("AUTH_ISSUER", URL(gateway)+"/api/auth"), + ) + } } imageConfiguration, err := registries.GetFormanceImage(ctx, stack, "gateway", version) @@ -36,6 +61,21 @@ func createDeployment(ctx core.Context, stack *v1beta1.Stack, return err } + if broker != nil { + var topicPrefix string + switch broker.Status.Mode { + case v1beta1.ModeOneStreamByService: + topicPrefix = broker.Spec.Stack + "-" + case v1beta1.ModeOneStreamByStack: + topicPrefix = broker.Spec.Stack + "." + } + + caddyTpl.Spec.Template.Spec.Containers[0].Env = append( + caddyTpl.Spec.Template.Spec.Containers[0].Env, + core.Env("TOPIC_NAME", topicPrefix+"gateway"), + ) + } + caddyTpl.Name = "gateway" return applications. New(gateway, caddyTpl). 
diff --git a/internal/resources/gateways/gateways.go b/internal/resources/gateways/gateways.go index 0bce6f77..091d5529 100644 --- a/internal/resources/gateways/gateways.go +++ b/internal/resources/gateways/gateways.go @@ -13,10 +13,6 @@ import ( var Caddyfile string func EnvVarsIfEnabled(ctx core.Context, stackName string) ([]v1.EnvVar, error) { - return EnvVarsIfEnabledWithPrefix(ctx, stackName, "") -} - -func EnvVarsIfEnabledWithPrefix(ctx core.Context, stackName, prefix string) ([]v1.EnvVar, error) { gateway := &v1beta1.Gateway{} ok, err := core.GetIfExists(ctx, stackName, gateway) if err != nil { @@ -26,21 +22,17 @@ func EnvVarsIfEnabledWithPrefix(ctx core.Context, stackName, prefix string) ([]v return nil, nil } - return GetEnvVarsWithPrefix(gateway, prefix), nil + return GetEnvVars(gateway), nil } func GetEnvVars(gateway *v1beta1.Gateway) []v1.EnvVar { - return GetEnvVarsWithPrefix(gateway, "") -} - -func GetEnvVarsWithPrefix(gateway *v1beta1.Gateway, prefix string) []v1.EnvVar { ret := []v1.EnvVar{{ - Name: fmt.Sprintf("%sSTACK_URL", prefix), + Name: "STACK_URL", Value: "http://gateway:8080", }} if gateway.Spec.Ingress != nil { ret = append(ret, v1.EnvVar{ - Name: fmt.Sprintf("%sSTACK_PUBLIC_URL", prefix), + Name: "STACK_PUBLIC_URL", Value: fmt.Sprintf("%s://%s", gateway.Spec.Ingress.Scheme, gateway.Spec.Ingress.Host), }) } diff --git a/internal/resources/gateways/init.go b/internal/resources/gateways/init.go index 15d8b41d..8dd3de3a 100644 --- a/internal/resources/gateways/init.go +++ b/internal/resources/gateways/init.go @@ -60,6 +60,12 @@ func Reconcile(ctx Context, stack *v1beta1.Stack, gateway *v1beta1.Gateway, vers } } + if broker != nil { + if !broker.Status.Ready { + return NewPendingError().WithMessage("broker not ready") + } + } + configMap, err := createConfigMap(ctx, stack, gateway, httpAPIs, broker) if err != nil { return err diff --git a/internal/resources/ledgers/assets.go b/internal/resources/ledgers/assets.go deleted file mode 100644 index 
1363d6bf..00000000 --- a/internal/resources/ledgers/assets.go +++ /dev/null @@ -1,12 +0,0 @@ -package ledgers - -import ( - "embed" - _ "embed" -) - -//go:embed assets/Caddyfile.gotpl -var Caddyfile string - -//go:embed assets/reindex -var reindexStreams embed.FS diff --git a/internal/resources/ledgers/assets/Caddyfile.gotpl b/internal/resources/ledgers/assets/Caddyfile.gotpl deleted file mode 100644 index 6ceffd83..00000000 --- a/internal/resources/ledgers/assets/Caddyfile.gotpl +++ /dev/null @@ -1,30 +0,0 @@ -{ - {{ if .Debug }}debug{{ end }} -} - -:8080 { - {{- if .EnableOpenTelemetry }} - tracing { - span gateway - } - {{- end }} - log { - output stdout - {{- if .Debug }} - level DEBUG - {{- end }} - } - - handle { - method GET - reverse_proxy ledger-read:8080 { - header_up Host {upstream_hostport} - } - } - - handle { - reverse_proxy ledger-write:8080 { - header_up Host {upstream_hostport} - } - } -} diff --git a/internal/resources/ledgers/assets/reindex/v1.0.0/ledger_reindex.yaml b/internal/resources/ledgers/assets/reindex/v1.0.0/ledger_reindex.yaml deleted file mode 100644 index 8db8e175..00000000 --- a/internal/resources/ledgers/assets/reindex/v1.0.0/ledger_reindex.yaml +++ /dev/null @@ -1,16 +0,0 @@ -input: - http_server: - path: / - -output: - broker: - outputs: - - http_client: - verb: POST - url: http://localhost:4195/ledger_reindex_volumes - - http_client: - verb: POST - url: http://localhost:4195/ledger_reindex_transactions - - http_client: - verb: POST - url: http://localhost:4195/ledger_reindex_accounts diff --git a/internal/resources/ledgers/assets/reindex/v1.0.0/ledger_reindex_accounts.yaml b/internal/resources/ledgers/assets/reindex/v1.0.0/ledger_reindex_accounts.yaml deleted file mode 100644 index 56dfae15..00000000 --- a/internal/resources/ledgers/assets/reindex/v1.0.0/ledger_reindex_accounts.yaml +++ /dev/null @@ -1,60 +0,0 @@ -input: - http_server: - path: / - -pipeline: - processors: - - bloblang: | - meta ledger = this.ledger - meta 
batchSize = 100 - - postgres_query: - service: ledger - query: 'select count(*) as accounts_count from "${! meta("ledger") }".accounts' - - unarchive: - format: json_array - - bloblang: | - meta loopCount = (this.accounts_count.number() / meta("batchSize").number()).ceil() - meta loopIndex = 0 - - bloblang: | - root = if meta("loopCount") == "0" { - deleted() - } - - while: - check: 'meta("loopIndex") < meta("loopCount")' - processors: - - postgres_query: - service: ledger - query: | - select address, metadata - from "${! meta("ledger") }".accounts - offset ${! meta("loopIndex").number() * meta("batchSize").number() } - limit ${! meta("batchSize") } - - bloblang: - meta loopIndex = meta("loopIndex").number() + 1 - - unarchive: - format: json_array - - bloblang: | - root = this.assign({ - "metadata": this.metadata.parse_json() - }) - - bloblang: | - root = { - "document": { - "data": { - "address": this.address, - "ledger": meta("ledger"), - "metadata": this.metadata - }, - "indexed": { - "address": this.address, - "ledger": meta("ledger") - }, - "kind": "ACCOUNT", - "ledger": meta("ledger") - }, - "id": "ACCOUNT-%s-%s".format(meta("ledger"), this.address), - "action": "upsert" - } - -output: - resource: elasticsearch diff --git a/internal/resources/ledgers/assets/reindex/v1.0.0/ledger_reindex_all.yaml b/internal/resources/ledgers/assets/reindex/v1.0.0/ledger_reindex_all.yaml deleted file mode 100644 index 1a86110b..00000000 --- a/internal/resources/ledgers/assets/reindex/v1.0.0/ledger_reindex_all.yaml +++ /dev/null @@ -1,18 +0,0 @@ -input: - http_server: - path: / - -pipeline: - processors: - - postgres_query: - service: ledger - query: 'select * from "_system".ledgers' - - unarchive: - format: json_array - -output: - broker: - outputs: - - http_client: - verb: POST - url: http://localhost:4195/ledger_reindex diff --git a/internal/resources/ledgers/assets/reindex/v1.0.0/ledger_reindex_transactions.yaml 
b/internal/resources/ledgers/assets/reindex/v1.0.0/ledger_reindex_transactions.yaml deleted file mode 100644 index a8b5cefe..00000000 --- a/internal/resources/ledgers/assets/reindex/v1.0.0/ledger_reindex_transactions.yaml +++ /dev/null @@ -1,76 +0,0 @@ -input: - http_server: - path: / - -pipeline: - processors: - - bloblang: | - meta ledger = this.ledger - meta batchSize = 100 - - postgres_query: - service: ledger - query: 'select count(*) as transactions_count from "${! meta("ledger") }".transactions' - - unarchive: - format: json_array - - bloblang: | - meta loopCount = (this.transactions_count.number() / meta("batchSize").number()).ceil() - meta loopIndex = 0 - - bloblang: | - root = if meta("loopCount") == "0" { - deleted() - } - - while: - check: 'meta("loopIndex") < meta("loopCount")' - processors: - - postgres_query: - service: ledger - query: | - select id, timestamp, reference, metadata, postings - from "${! meta("ledger") }".transactions - offset ${! meta("loopIndex").number() * meta("batchSize").number() } - limit ${! 
meta("batchSize") } - - bloblang: - meta loopIndex = meta("loopIndex").number() + 1 - - unarchive: - format: json_array - - bloblang: | - root = this.assign({ - "postings": this.postings.parse_json(), - "metadata": this.metadata.parse_json() - }) - - bloblang: | - root = { - "id": "TRANSACTION-%s-%s".format(meta("ledger"), this.id), - "action": "upsert", - "document": { - "data": { - "postings": this.postings, - "reference": this.reference, - "txid": this.txid, - "timestamp": this.timestamp, - "metadata": if this.metadata { this.metadata } else {{}}, - "ledger": meta("ledger") - }, - "indexed": { - "reference": this.reference, - "txid": this.id, - "timestamp": this.timestamp, - "asset": this.postings.map_each(p -> p.asset), - "source": this.postings.map_each(p -> p.source), - "destination": this.postings.map_each(p -> p.destination), - "amount": this.postings.map_each(p -> if p.asset.contains("/") { - [ - p.amount, - p.amount / range(0, p.asset.split("/").index(1).number()).fold(1, t -> t.tally * 10) # amount / pow(10, decimal part of asset) - ] - } else { [ p.amount ] }).flatten().map_each(v -> "%v".format(v)), - "ledger": meta("ledger") - }, - "kind": "TRANSACTION", - "ledger": meta("ledger"), - "when": this.date - } - } - -output: - resource: elasticsearch diff --git a/internal/resources/ledgers/assets/reindex/v1.0.0/ledger_reindex_volumes.yaml b/internal/resources/ledgers/assets/reindex/v1.0.0/ledger_reindex_volumes.yaml deleted file mode 100644 index 62223860..00000000 --- a/internal/resources/ledgers/assets/reindex/v1.0.0/ledger_reindex_volumes.yaml +++ /dev/null @@ -1,60 +0,0 @@ -input: - http_server: - path: / - -pipeline: - processors: - - bloblang: | - meta ledger = this.ledger - meta batchSize = 100 - - postgres_query: - service: ledger - query: 'select count(*) as volumes_count from "${! 
meta("ledger") }".volumes' - - unarchive: - format: json_array - - bloblang: | - meta loopCount = (this.volumes_count.number() / meta("batchSize").number()).ceil() - meta loopIndex = 0 - - bloblang: | - root = if meta("loopCount") == "0" { - deleted() - } - - while: - check: 'meta("loopIndex") < meta("loopCount")' - processors: - - postgres_query: - service: ledger - query: | - select account, asset, input, output - from "${! meta("ledger") }".volumes - offset ${! meta("loopIndex").number() * meta("batchSize").number() } - limit ${! meta("batchSize") } - - bloblang: - meta loopIndex = meta("loopIndex").number() + 1 - - unarchive: - format: json_array - - bloblang: | - root = { - "id": "ASSET-%s-%s-%s".format(meta("ledger"), this.account, this.asset), - "action": "upsert", - "document": { - "data": { - "name": this.asset, - "input": this.input, - "output": this.output, - "account": this.account, - "ledger": meta("ledger") - }, - "indexed": { - "account": this.account, - "name": this.asset, - "ledger": meta("ledger") - }, - "kind": "ASSET", - "ledger": meta("ledger"), - "when": this.date - } - } - -output: - resource: elasticsearch diff --git a/internal/resources/ledgers/assets/reindex/v2.0.0/ledger_reindex.yaml b/internal/resources/ledgers/assets/reindex/v2.0.0/ledger_reindex.yaml deleted file mode 100644 index 00e8546f..00000000 --- a/internal/resources/ledgers/assets/reindex/v2.0.0/ledger_reindex.yaml +++ /dev/null @@ -1,13 +0,0 @@ -input: - http_server: - path: / - -output: - broker: - outputs: - - http_client: - verb: POST - url: http://localhost:4195/ledger_reindex_transactions - - http_client: - verb: POST - url: http://localhost:4195/ledger_reindex_accounts diff --git a/internal/resources/ledgers/assets/reindex/v2.0.0/ledger_reindex_accounts.yaml b/internal/resources/ledgers/assets/reindex/v2.0.0/ledger_reindex_accounts.yaml deleted file mode 100644 index 28532145..00000000 --- a/internal/resources/ledgers/assets/reindex/v2.0.0/ledger_reindex_accounts.yaml 
+++ /dev/null @@ -1,41 +0,0 @@ -input: - http_server: - path: / - -pipeline: - processors: - - bloblang: | - meta ledger = this.ledger - - postgres_query: - service: ledger - query: | - select address, metadata - from "${! meta("ledger") }".accounts - - unarchive: - format: json_array - - bloblang: | - root = this.assign({ - "metadata": this.metadata.parse_json() - }) - - bloblang: | - root = { - "document": { - "data": { - "address": this.address, - "ledger": meta("ledger"), - "metadata": this.metadata - }, - "indexed": { - "address": this.address, - "ledger": meta("ledger") - }, - "kind": "ACCOUNT", - "ledger": meta("ledger"), - "when": this.date - }, - "id": "ACCOUNT-%s-%s".format(meta("ledger"), this.address), - "action": "upsert" - } - -output: - resource: elasticsearch diff --git a/internal/resources/ledgers/assets/reindex/v2.0.0/ledger_reindex_all.yaml b/internal/resources/ledgers/assets/reindex/v2.0.0/ledger_reindex_all.yaml deleted file mode 100644 index 6b8a2b04..00000000 --- a/internal/resources/ledgers/assets/reindex/v2.0.0/ledger_reindex_all.yaml +++ /dev/null @@ -1,20 +0,0 @@ -input: - http_server: - path: / - -pipeline: - processors: - - postgres_query: - service: ledger - query: 'select * from "_system".ledgers' - - unarchive: - format: json_array - - log: - message: "Process ledger: ${! 
this.ledger }" - -output: - broker: - outputs: - - http_client: - verb: POST - url: http://localhost:4195/ledger_reindex diff --git a/internal/resources/ledgers/assets/reindex/v2.0.0/ledger_reindex_transactions.yaml b/internal/resources/ledgers/assets/reindex/v2.0.0/ledger_reindex_transactions.yaml deleted file mode 100644 index 1052d7ac..00000000 --- a/internal/resources/ledgers/assets/reindex/v2.0.0/ledger_reindex_transactions.yaml +++ /dev/null @@ -1,56 +0,0 @@ -input: - http_server: - path: / - -pipeline: - processors: - - bloblang: | - meta ledger = this.ledger - - postgres_query: - service: ledger - query: | - select id::varchar as id, timestamp, reference, metadata, postings - from "${! meta("ledger") }".transactions; - - unarchive: - format: json_array - - bloblang: | - root = this.assign({ - "postings": this.postings.parse_json(), - "metadata": this.metadata.parse_json() - }) - - bloblang: | - root = { - "id": "TRANSACTION-%s-%s".format(meta("ledger"), this.id), - "action": "upsert", - "document": { - "data": { - "postings": this.postings, - "reference": this.reference, - "txid": this.id, - "timestamp": this.timestamp, - "metadata": if this.metadata { this.metadata } else {{}}, - "ledger": meta("ledger") - }, - "indexed": { - "reference": this.reference, - "txid": this.id, - "timestamp": this.timestamp, - "asset": this.postings.map_each(p -> p.asset), - "source": this.postings.map_each(p -> p.source), - "destination": this.postings.map_each(p -> p.destination), - "amount": this.postings.map_each(p -> if p.asset.contains("/") { - [ - p.amount, - p.amount / range(0, p.asset.split("/").index(1).number()).fold(1, t -> t.tally * 10) # amount / pow(10, decimal part of asset) - ] - } else { [ p.amount ] }).flatten().map_each(v -> "%v".format(v)), - "ledger": meta("ledger") - }, - "kind": "TRANSACTION", - "ledger": meta("ledger"), - "when": this.date - } - } - -output: - resource: elasticsearch diff --git a/internal/resources/ledgers/deployments.go 
b/internal/resources/ledgers/deployments.go index c3127085..ab86097e 100644 --- a/internal/resources/ledgers/deployments.go +++ b/internal/resources/ledgers/deployments.go @@ -3,12 +3,10 @@ package ledgers import ( "fmt" "github.com/formancehq/operator/internal/resources/auths" - "golang.org/x/mod/semver" - "strconv" - "github.com/formancehq/operator/internal/resources/brokers" "github.com/formancehq/operator/internal/resources/brokertopics" - "github.com/formancehq/operator/internal/resources/caddy" + "github.com/formancehq/operator/internal/resources/services" + "golang.org/x/mod/semver" "k8s.io/apimachinery/pkg/types" "github.com/formancehq/operator/api/formance.com/v1beta1" @@ -17,62 +15,13 @@ import ( "github.com/formancehq/operator/internal/resources/databases" "github.com/formancehq/operator/internal/resources/gateways" "github.com/formancehq/operator/internal/resources/registries" - "github.com/formancehq/operator/internal/resources/services" "github.com/formancehq/operator/internal/resources/settings" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -const ( - ConditionTypeDeploymentStrategy = "LedgerDeploymentStrategy" - ReasonLedgerSingle = "Single" - ReasonLedgerMonoWriterMultipleReader = "MonoWriterMultipleReader" -) - -func hasDeploymentStrategyChanged(ctx core.Context, stack *v1beta1.Stack, ledger *v1beta1.Ledger, strategy string) (err error) { - condition := v1beta1.NewCondition(ConditionTypeDeploymentStrategy, ledger.Generation).SetReason( - func() string { - switch strategy { - case v1beta1.DeploymentStrategySingle: - return ReasonLedgerSingle - case v1beta1.DeploymentStrategyMonoWriterMultipleReader: - return ReasonLedgerMonoWriterMultipleReader - default: - return "unknown strategy" - } - }(), - ).SetMessage("Deployment strategy initialized") - - defer func() { - ledger.GetConditions().AppendOrReplace(*condition, v1beta1.AndConditions( - 
v1beta1.ConditionTypeMatch(ConditionTypeDeploymentStrategy), - v1beta1.ConditionGenerationMatch(ledger.Generation), - )) - }() - - // There is no generation 0, so we can't check for a change in strategy - // Uninstall is useless if the ledger deployment strategy has not changed - if ledger.GetConditions().Check(v1beta1.AndConditions( - v1beta1.ConditionTypeMatch(ConditionTypeDeploymentStrategy), - v1beta1.ConditionReasonMatch(condition.Reason), - v1beta1.ConditionGenerationMatch(ledger.Generation-1), - )) || ledger.GetGeneration() == 1 { - return - } - - condition.SetMessage("Deployment strategy has changed") - switch strategy { - case v1beta1.DeploymentStrategySingle: - return uninstallLedgerMonoWriterMultipleReader(ctx, stack) - case v1beta1.DeploymentStrategyMonoWriterMultipleReader: - return core.DeleteIfExists[*appsv1.Deployment](ctx, core.GetNamespacedResourceName(stack.Name, "ledger")) - default: - return fmt.Errorf("unknown deployment strategy %s", strategy) - } -} - -func installLedger(ctx core.Context, stack *v1beta1.Stack, ledger *v1beta1.Ledger, database *v1beta1.Database, imageConfiguration *registries.ImageConfiguration, version string, isV2 bool) (err error) { +func installLedger(ctx core.Context, stack *v1beta1.Stack, ledger *v1beta1.Ledger, database *v1beta1.Database, imageConfiguration *registries.ImageConfiguration, version string) (err error) { if !semver.IsValid(version) || semver.Compare(version, "v2.2.0-alpha") > 0 { if err := uninstallLedgerMonoWriterMultipleReader(ctx, stack); err != nil { @@ -89,65 +38,23 @@ func installLedger(ctx core.Context, stack *v1beta1.Stack, ledger *v1beta1.Ledge return nil } - deploymentStrategySettings, err := settings.GetStringOrDefault(ctx, stack.Name, v1beta1.DeploymentStrategySingle, "ledger", "deployment-strategy") - if err != nil { - return err - } - - if ledger.Spec.DeploymentStrategy == v1beta1.DeploymentStrategyMonoWriterMultipleReader { - deploymentStrategySettings = 
v1beta1.DeploymentStrategyMonoWriterMultipleReader - } - - if err = hasDeploymentStrategyChanged(ctx, stack, ledger, deploymentStrategySettings); err != nil { - return err - } - - switch deploymentStrategySettings { - case v1beta1.DeploymentStrategySingle: - return installLedgerSingleInstance(ctx, stack, ledger, database, imageConfiguration, isV2) - case v1beta1.DeploymentStrategyMonoWriterMultipleReader: - return installLedgerMonoWriterMultipleReader(ctx, stack, ledger, database, imageConfiguration, isV2) - default: - return fmt.Errorf("unknown deployment strategy %s", deploymentStrategySettings) - } + // For older versions, just use single instance deployment + return installLedgerSingleInstance(ctx, stack, ledger, database, imageConfiguration) } -func installLedgerSingleInstance( - ctx core.Context, - stack *v1beta1.Stack, - ledger *v1beta1.Ledger, - database *v1beta1.Database, - imageConfiguration *registries.ImageConfiguration, - v2 bool, -) error { - container, err := createLedgerContainerFull(ctx, stack, v2) +func installLedgerSingleInstance(ctx core.Context, stack *v1beta1.Stack, + ledger *v1beta1.Ledger, database *v1beta1.Database, imageConfiguration *registries.ImageConfiguration) error { + container, err := createLedgerContainerFull(ctx, stack) if err != nil { return err } - err = setCommonAPIContainerConfiguration(ctx, stack, ledger, imageConfiguration, database, container, v2) + err = setCommonAPIContainerConfiguration(ctx, stack, ledger, imageConfiguration, database, container) if err != nil { return err } - if !v2 && ledger.Spec.Locking != nil && ledger.Spec.Locking.Strategy == "redis" { - container.Env = append(container.Env, - core.Env("NUMARY_LOCK_STRATEGY", "redis"), - core.Env("NUMARY_LOCK_STRATEGY_REDIS_URL", ledger.Spec.Locking.Redis.Uri), - core.Env("NUMARY_LOCK_STRATEGY_REDIS_TLS_ENABLED", strconv.FormatBool(ledger.Spec.Locking.Redis.TLS)), - core.Env("NUMARY_LOCK_STRATEGY_REDIS_TLS_INSECURE", 
strconv.FormatBool(ledger.Spec.Locking.Redis.InsecureTLS)), - ) - - if ledger.Spec.Locking.Redis.Duration != 0 { - container.Env = append(container.Env, core.Env("NUMARY_LOCK_STRATEGY_REDIS_DURATION", ledger.Spec.Locking.Redis.Duration.String())) - } - - if ledger.Spec.Locking.Redis.Retry != 0 { - container.Env = append(container.Env, core.Env("NUMARY_LOCK_STRATEGY_REDIS_RETRY", ledger.Spec.Locking.Redis.Retry.String())) - } - } - - if err := createDeployment(ctx, stack, ledger, "ledger", *container, v2, 1, imageConfiguration); err != nil { + if err := createDeployment(ctx, stack, ledger, "ledger", *container, 1, imageConfiguration); err != nil { return err } @@ -225,7 +132,7 @@ func installLedgerStateless(ctx core.Context, stack *v1beta1.Stack, ledger *v1be } container.Env = append(container.Env, brokerEnvVar...) - container.Env = append(container.Env, brokers.GetPublisherEnvVars(stack, broker, "ledger", "")...) + container.Env = append(container.Env, brokers.GetPublisherEnvVars(stack, broker, "ledger")...) 
} bulkMaxSize, err := settings.GetInt(ctx, stack.Name, "ledger", "api", "bulk-max-size") @@ -236,7 +143,7 @@ func installLedgerStateless(ctx core.Context, stack *v1beta1.Stack, ledger *v1be container.Env = append(container.Env, core.Env("BULK_MAX_SIZE", fmt.Sprint(*bulkMaxSize))) } - err = setCommonAPIContainerConfiguration(ctx, stack, ledger, imageConfiguration, database, &container, true) + err = setCommonAPIContainerConfiguration(ctx, stack, ledger, imageConfiguration, database, &container) if err != nil { return err } @@ -283,7 +190,7 @@ func installLedgerWorker(ctx core.Context, stack *v1beta1.Stack, ledger *v1beta1 Args: []string{"worker"}, } - err := setCommonContainerConfiguration(ctx, stack, ledger, imageConfiguration, database, &container, true) + err := setCommonContainerConfiguration(ctx, stack, ledger, imageConfiguration, database, &container) if err != nil { return err } @@ -343,45 +250,6 @@ func installLedgerWorker(ctx core.Context, stack *v1beta1.Stack, ledger *v1beta1 return nil } -func installLedgerMonoWriterMultipleReader(ctx core.Context, stack *v1beta1.Stack, ledger *v1beta1.Ledger, database *v1beta1.Database, imageConfiguration *registries.ImageConfiguration, v2 bool) error { - - createDeployment := func(name string, container corev1.Container, replicas uint64) error { - err := setCommonAPIContainerConfiguration(ctx, stack, ledger, imageConfiguration, database, &container, v2) - if err != nil { - return err - } - - if err := createDeployment(ctx, stack, ledger, name, container, v2, replicas, imageConfiguration); err != nil { - return err - } - - if _, err := services.Create(ctx, ledger, name, services.WithDefault(name)); err != nil { - return err - } - - return nil - } - - container, err := createLedgerContainerWriteOnly(ctx, stack, v2) - if err != nil { - return err - } - if err := createDeployment("ledger-write", *container, 1); err != nil { - return err - } - - container = createLedgerContainerReadOnly(v2) - if err := 
createDeployment("ledger-read", *container, 0); err != nil { - return err - } - - if err := createGatewayDeployment(ctx, stack, ledger); err != nil { - return err - } - - return nil -} - func uninstallLedgerMonoWriterMultipleReader(ctx core.Context, stack *v1beta1.Stack) error { remove := func(name string) error { @@ -410,22 +278,13 @@ func uninstallLedgerMonoWriterMultipleReader(ctx core.Context, stack *v1beta1.St return nil } -func createDeployment(ctx core.Context, stack *v1beta1.Stack, ledger *v1beta1.Ledger, name string, container corev1.Container, v2 bool, replicas uint64, imageConfiguration *registries.ImageConfiguration) error { +func createDeployment(ctx core.Context, stack *v1beta1.Stack, ledger *v1beta1.Ledger, name string, container corev1.Container, replicas uint64, imageConfiguration *registries.ImageConfiguration) error { serviceAccountName, err := settings.GetAWSServiceAccount(ctx, stack.Name) if err != nil { return err } - var volumes []corev1.Volume - if !v2 { - volumes = []corev1.Volume{{ - Name: "config", - VolumeSource: corev1.VolumeSource{ - EmptyDir: &corev1.EmptyDirVolumeSource{}, - }, - }} - } - + // No volumes needed for v2 tpl := &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Name: name, @@ -435,7 +294,6 @@ func createDeployment(ctx core.Context, stack *v1beta1.Stack, ledger *v1beta1.Le Spec: corev1.PodSpec{ ImagePullSecrets: imageConfiguration.PullSecrets, Containers: []corev1.Container{container}, - Volumes: volumes, ServiceAccountName: serviceAccountName, }, }, @@ -448,21 +306,17 @@ func createDeployment(ctx core.Context, stack *v1beta1.Stack, ledger *v1beta1.Le Install(ctx) } -func setCommonContainerConfiguration(ctx core.Context, stack *v1beta1.Stack, ledger *v1beta1.Ledger, imageConfiguration *registries.ImageConfiguration, database *v1beta1.Database, container *corev1.Container, v2 bool) error { +func setCommonContainerConfiguration(ctx core.Context, stack *v1beta1.Stack, ledger *v1beta1.Ledger, imageConfiguration 
*registries.ImageConfiguration, database *v1beta1.Database, container *corev1.Container) error { - prefix := "" - if !v2 { - prefix = "NUMARY_" - } env := make([]corev1.EnvVar, 0) - otlpEnv, err := settings.GetOTELEnvVarsWithPrefix(ctx, stack.Name, core.LowerCamelCaseKind(ctx, ledger), prefix, " ") + otlpEnv, err := settings.GetOTELEnvVars(ctx, stack.Name, core.LowerCamelCaseKind(ctx, ledger), " ") if err != nil { return err } env = append(env, otlpEnv...) - env = append(env, core.GetDevEnvVarsWithPrefix(stack, ledger, prefix)...) + env = append(env, core.GetDevEnvVars(stack, ledger)...) - postgresEnvVar, err := databases.PostgresEnvVarsWithPrefix(ctx, stack, database, prefix) + postgresEnvVar, err := databases.GetPostgresEnvVars(ctx, stack, database) if err != nil { return err } @@ -470,30 +324,25 @@ func setCommonContainerConfiguration(ctx core.Context, stack *v1beta1.Stack, led container.Image = imageConfiguration.GetFullImageName() container.Env = append(container.Env, env...) - container.Env = append(container.Env, core.Env(fmt.Sprintf("%sSTORAGE_POSTGRES_CONN_STRING", prefix), fmt.Sprintf("$(%sPOSTGRES_URI)", prefix))) - container.Env = append(container.Env, core.Env(fmt.Sprintf("%sSTORAGE_DRIVER", prefix), "postgres")) + container.Env = append(container.Env, core.Env("STORAGE_POSTGRES_CONN_STRING", "$(POSTGRES_URI)")) + container.Env = append(container.Env, core.Env("STORAGE_DRIVER", "postgres")) return nil } -func setCommonAPIContainerConfiguration(ctx core.Context, stack *v1beta1.Stack, ledger *v1beta1.Ledger, imageConfiguration *registries.ImageConfiguration, database *v1beta1.Database, container *corev1.Container, v2 bool) error { +func setCommonAPIContainerConfiguration(ctx core.Context, stack *v1beta1.Stack, ledger *v1beta1.Ledger, imageConfiguration *registries.ImageConfiguration, database *v1beta1.Database, container *corev1.Container) error { - prefix := "" - if !v2 { - prefix = "NUMARY_" - } - - if err := setCommonContainerConfiguration(ctx, stack, 
ledger, imageConfiguration, database, container, v2); err != nil { + if err := setCommonContainerConfiguration(ctx, stack, ledger, imageConfiguration, database, container); err != nil { return err } - authEnvVars, err := auths.ProtectedAPIEnvVarsWithPrefix(ctx, stack, "ledger", ledger.Spec.Auth, prefix) + authEnvVars, err := auths.ProtectedEnvVars(ctx, stack, "ledger", ledger.Spec.Auth) if err != nil { return err } container.Env = append(container.Env, authEnvVars...) - gatewayEnv, err := gateways.EnvVarsIfEnabledWithPrefix(ctx, stack.Name, prefix) + gatewayEnv, err := gateways.EnvVarsIfEnabled(ctx, stack.Name) if err != nil { return err } @@ -504,28 +353,16 @@ func setCommonAPIContainerConfiguration(ctx core.Context, stack *v1beta1.Stack, return nil } -func createBaseLedgerContainer(v2 bool) *corev1.Container { +func createBaseLedgerContainer() *corev1.Container { ret := &corev1.Container{ Name: "ledger", } - var bindFlag = "BIND" - if !v2 { - bindFlag = "NUMARY_SERVER_HTTP_BIND_ADDRESS" - } - ret.Env = append(ret.Env, core.Env(bindFlag, ":8080")) - if !v2 { - ret.VolumeMounts = []corev1.VolumeMount{{ - Name: "config", - ReadOnly: false, - MountPath: "/root/.numary", - }} - } - + ret.Env = append(ret.Env, core.Env("BIND", ":8080")) return ret } -func createLedgerContainerFull(ctx core.Context, stack *v1beta1.Stack, v2 bool) (*corev1.Container, error) { - container := createBaseLedgerContainer(v2) +func createLedgerContainerFull(ctx core.Context, stack *v1beta1.Stack) (*corev1.Container, error) { + container := createBaseLedgerContainer() var broker *v1beta1.Broker if t, err := brokertopics.Find(ctx, stack, "ledger"); err != nil { @@ -543,67 +380,23 @@ func createLedgerContainerFull(ctx core.Context, stack *v1beta1.Stack, v2 bool) if !broker.Status.Ready { return nil, core.NewPendingError().WithMessage("broker not ready") } - prefix := "" - if !v2 { - prefix = "NUMARY_" - } - brokerEnvVar, err := brokers.GetEnvVarsWithPrefix(ctx, broker.Status.URI, stack.Name, 
"ledger", prefix) + brokerEnvVar, err := brokers.GetBrokerEnvVars(ctx, broker.Status.URI, stack.Name, "ledger") if err != nil { return nil, err } container.Env = append(container.Env, brokerEnvVar...) - container.Env = append(container.Env, brokers.GetPublisherEnvVars(stack, broker, "ledger", prefix)...) + container.Env = append(container.Env, brokers.GetPublisherEnvVars(stack, broker, "ledger")...) } - if v2 { - logsBatchSize, err := settings.GetInt(ctx, stack.Name, "ledger", "logs", "max-batch-size") - if err != nil { - return nil, err - } - if logsBatchSize != nil && *logsBatchSize != 0 { - container.Env = append(container.Env, core.Env("LEDGER_BATCH_SIZE", fmt.Sprint(*logsBatchSize))) - } - } - - return container, nil -} - -func createLedgerContainerWriteOnly(ctx core.Context, stack *v1beta1.Stack, v2 bool) (*corev1.Container, error) { - return createLedgerContainerFull(ctx, stack, v2) -} - -func createLedgerContainerReadOnly(v2 bool) *corev1.Container { - container := createBaseLedgerContainer(v2) - container.Env = append(container.Env, core.Env("READ_ONLY", "true")) - return container -} - -func createGatewayDeployment(ctx core.Context, stack *v1beta1.Stack, ledger *v1beta1.Ledger) error { - - caddyfileConfigMap, err := caddy.CreateCaddyfileConfigMap(ctx, stack, "ledger", Caddyfile, map[string]any{ - "Debug": stack.Spec.Debug || ledger.Spec.Debug, - }, core.WithController[*corev1.ConfigMap](ctx.GetScheme(), ledger)) + logsBatchSize, err := settings.GetInt(ctx, stack.Name, "ledger", "logs", "max-batch-size") if err != nil { - return err - } - - env := make([]corev1.EnvVar, 0) - env = append(env, core.GetDevEnvVars(stack, ledger)...) 
- - caddyImage, err := registries.GetCaddyImage(ctx, stack) - if err != nil { - return err + return nil, err } - - tpl, err := caddy.DeploymentTemplate(ctx, stack, ledger, caddyfileConfigMap, caddyImage, env) - if err != nil { - return err + if logsBatchSize != nil && *logsBatchSize != 0 { + container.Env = append(container.Env, core.Env("LEDGER_BATCH_SIZE", fmt.Sprint(*logsBatchSize))) } - tpl.Name = "ledger-gateway" - return applications. - New(ledger, tpl). - Install(ctx) + return container, nil } diff --git a/internal/resources/ledgers/init.go b/internal/resources/ledgers/init.go index 2f653211..5baf64b9 100644 --- a/internal/resources/ledgers/init.go +++ b/internal/resources/ledgers/init.go @@ -18,24 +18,20 @@ package ledgers import ( _ "embed" - "fmt" - "github.com/formancehq/operator/internal/resources/jobs" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "sigs.k8s.io/controller-runtime/pkg/client" - "github.com/formancehq/operator/api/formance.com/v1beta1" . "github.com/formancehq/operator/internal/core" - "github.com/formancehq/operator/internal/resources/benthosstreams" "github.com/formancehq/operator/internal/resources/brokertopics" "github.com/formancehq/operator/internal/resources/databases" "github.com/formancehq/operator/internal/resources/gatewayhttpapis" + "github.com/formancehq/operator/internal/resources/jobs" "github.com/formancehq/operator/internal/resources/registries" - "github.com/formancehq/search/benthos" "github.com/pkg/errors" "golang.org/x/mod/semver" appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" ) //+kubebuilder:rbac:groups=formance.com,resources=ledgers,verbs=get;list;watch;create;update;patch;delete @@ -58,23 +54,6 @@ func Reconcile(ctx Context, stack *v1beta1.Stack, ledger *v1beta1.Ledger, versio return err } - isV2 := false - if !semver.IsValid(version) || semver.Compare(version, "v2.0.0-alpha") > 
0 { - isV2 = true - } - - if err := benthosstreams.LoadFromFileSystem(ctx, benthos.Streams, ledger, "streams/ledger", "ingestion"); err != nil { - return err - } - - streamsVersion := "v1.0.0" - if isV2 { - streamsVersion = "v2.0.0" - } - if err := benthosstreams.LoadFromFileSystem(ctx, reindexStreams, ledger, fmt.Sprintf("assets/reindex/%s", streamsVersion), "reindex"); err != nil { - return err - } - hasDependency, err := HasDependency(ctx, stack.Name, &v1beta1.Search{}) if err != nil { return err @@ -95,7 +74,7 @@ func Reconcile(ctx Context, stack *v1beta1.Stack, ledger *v1beta1.Ledger, versio return NewPendingError().WithMessage("database not ready") } - if isV2 && databases.GetSavedModuleVersion(database) != version { + if databases.GetSavedModuleVersion(database) != version { err := databases.Migrate( ctx, stack, @@ -103,9 +82,6 @@ func Reconcile(ctx Context, stack *v1beta1.Stack, ledger *v1beta1.Ledger, versio imageConfiguration, database, jobs.Mutator(func(t *batchv1.Job) error { - if IsLower(version, "v2.0.0-rc.6") { - t.Spec.Template.Spec.Containers[0].Command = []string{"buckets", "upgrade-all"} - } t.Spec.Template.Spec.Containers[0].Env = append(t.Spec.Template.Spec.Containers[0].Env, Env("STORAGE_POSTGRES_CONN_STRING", "$(POSTGRES_URI)")) return nil @@ -134,7 +110,7 @@ func Reconcile(ctx Context, stack *v1beta1.Stack, ledger *v1beta1.Ledger, versio } if IsApplicationError(err) { // Start the ledger even if migrations are not terminated - return installLedger(ctx, stack, ledger, database, imageConfiguration, version, isV2) + return installLedger(ctx, stack, ledger, database, imageConfiguration, version) } return err @@ -144,7 +120,7 @@ func Reconcile(ctx Context, stack *v1beta1.Stack, ledger *v1beta1.Ledger, versio } } - return installLedger(ctx, stack, ledger, database, imageConfiguration, version, isV2) + return installLedger(ctx, stack, ledger, database, imageConfiguration, version) } func init() { diff --git 
a/internal/resources/orchestrations/deployments.go b/internal/resources/orchestrations/deployments.go index 80c21bf8..c4b70512 100644 --- a/internal/resources/orchestrations/deployments.go +++ b/internal/resources/orchestrations/deployments.go @@ -159,7 +159,7 @@ func createDeployment( return err } env = append(env, brokerEnvVars...) - env = append(env, brokers.GetPublisherEnvVars(stack, broker, "orchestration", "")...) + env = append(env, brokers.GetPublisherEnvVars(stack, broker, "orchestration")...) serviceAccountName, err := settings.GetAWSServiceAccount(ctx, stack.Name) if err != nil { diff --git a/internal/resources/orchestrations/init.go b/internal/resources/orchestrations/init.go index a9deef06..ea3a4e21 100644 --- a/internal/resources/orchestrations/init.go +++ b/internal/resources/orchestrations/init.go @@ -63,7 +63,7 @@ func Reconcile(ctx Context, stack *v1beta1.Stack, o *v1beta1.Orchestration, vers return errors.Wrap(err, "resolving image") } - if IsGreaterOrEqual(version, "v2.0.0-rc.5") && databases.GetSavedModuleVersion(database) != version { + if databases.GetSavedModuleVersion(database) != version { if err := databases.Migrate(ctx, stack, o, imageConfiguration, database); err != nil { return err diff --git a/internal/resources/payments/deployments.go b/internal/resources/payments/deployments.go index 00bacaae..0e2feb2c 100644 --- a/internal/resources/payments/deployments.go +++ b/internal/resources/payments/deployments.go @@ -229,7 +229,7 @@ func createFullDeployment( } env = append(env, brokerEnvVar...) - env = append(env, brokers.GetPublisherEnvVars(stack, broker, "payments", "")...) + env = append(env, brokers.GetPublisherEnvVars(stack, broker, "payments")...) } if v3 { @@ -412,7 +412,7 @@ func createConnectorsDeployment(ctx core.Context, stack *v1beta1.Stack, payments } env = append(env, brokerEnvVar...) - env = append(env, brokers.GetPublisherEnvVars(stack, broker, "payments", "")...) 
+ env = append(env, brokers.GetPublisherEnvVars(stack, broker, "payments")...) } serviceAccountName, err := settings.GetAWSServiceAccount(ctx, stack.Name) diff --git a/internal/resources/payments/init.go b/internal/resources/payments/init.go index 82056c56..f7c10be2 100644 --- a/internal/resources/payments/init.go +++ b/internal/resources/payments/init.go @@ -27,11 +27,9 @@ import ( "github.com/formancehq/operator/api/formance.com/v1beta1" . "github.com/formancehq/operator/internal/core" - "github.com/formancehq/operator/internal/resources/benthosstreams" "github.com/formancehq/operator/internal/resources/brokertopics" "github.com/formancehq/operator/internal/resources/databases" "github.com/formancehq/operator/internal/resources/gatewayhttpapis" - "github.com/formancehq/search/benthos" "golang.org/x/mod/semver" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -76,10 +74,6 @@ func Reconcile(ctx Context, stack *v1beta1.Stack, p *v1beta1.Payments, version s healthEndpoint := "_health" switch { - case semver.IsValid(version) && semver.Compare(version, "v1.0.0-alpha") < 0: - if err := createFullDeployment(ctx, stack, p, database, imageConfiguration, false); err != nil { - return err - } case semver.IsValid(version) && semver.Compare(version, "v1.0.0-alpha") >= 0 && semver.Compare(version, "v3.0.0") < 0: if err := createReadDeployment(ctx, stack, p, database, imageConfiguration); err != nil { @@ -103,10 +97,6 @@ func Reconcile(ctx Context, stack *v1beta1.Stack, p *v1beta1.Payments, version s } } - if err := benthosstreams.LoadFromFileSystem(ctx, benthos.Streams, p, "streams/payments", "ingestion"); err != nil { - return err - } - if err := gatewayhttpapis.Create(ctx, p, gatewayhttpapis.WithHealthCheckEndpoint(healthEndpoint), gatewayhttpapis.WithRules( diff --git a/internal/resources/searches/benthos/audit/gateway_audit.yaml b/internal/resources/searches/benthos/audit/gateway_audit.yaml deleted file mode 100644 index 431d364d..00000000 --- 
a/internal/resources/searches/benthos/audit/gateway_audit.yaml +++ /dev/null @@ -1,34 +0,0 @@ -input: - event_bus: - topic: gateway - consumer_group: search - -pipeline: - processors: - - log: - message: "receive audit message: ${! this.payload.id }" - - switch_event_type: - events: - - label: AUDIT - version: v1 - processors: - - bloblang: | - root = { - "document": { - "data": this.payload, - "indexed": { - "identity": this.payload.identity, - "requestPath": this.payload.request.path, - "requestMethod": this.payload.request.method, - "responseStatusCode": this.payload.response.status_code, - }, - "kind": "AUDIT", - "when": this.date - }, - "action": "index", - "id": "AUDIT-%s".format(this.payload.id) - } - - -output: - resource: elasticsearch diff --git a/internal/resources/searches/benthos/fs.go b/internal/resources/searches/benthos/fs.go deleted file mode 100644 index 4d93f33a..00000000 --- a/internal/resources/searches/benthos/fs.go +++ /dev/null @@ -1,8 +0,0 @@ -package benthos - -import ( - "embed" -) - -//go:embed audit -var Audit embed.FS diff --git a/internal/resources/searches/clean_legacy_consumers.go b/internal/resources/searches/clean_legacy_consumers.go deleted file mode 100644 index 54b6fa7d..00000000 --- a/internal/resources/searches/clean_legacy_consumers.go +++ /dev/null @@ -1,44 +0,0 @@ -package searches - -import ( - "fmt" - - "github.com/formancehq/operator/api/formance.com/v1beta1" - . 
"github.com/formancehq/operator/internal/core" - "github.com/formancehq/operator/internal/resources/jobs" - "github.com/formancehq/operator/internal/resources/settings" - corev1 "k8s.io/api/core/v1" -) - -func cleanConsumers(ctx Context, search *v1beta1.Search) error { - - brokerURI, err := settings.RequireURL(ctx, search.Spec.Stack, "broker", "dsn") - if err != nil { - return err - } - - if brokerURI == nil { - return nil - } - - //todo: better handle errors - const script = ` - for service in ledger payments audit; do - for consumer in search-ledgerv2 search-payments-resets search-audit; do - index=$(nats --server $NATS_URI consumer ls $STACK-$service -j | jq "index(\"$consumer\")") - if [ "$index" != "null" ]; then - nats --server $NATS_URI consumer rm $STACK-$service $consumer -f || true - fi - done - done -` - return jobs.Handle(ctx, search, "clean-consumers", corev1.Container{ - Image: "natsio/nats-box:0.14.1", - Name: "delete-consumer", - Args: ShellScript(script), - Env: []corev1.EnvVar{ - Env("NATS_URI", fmt.Sprintf("nats://%s", brokerURI.Host)), - Env("STACK", search.Spec.Stack), - }, - }) -} diff --git a/internal/resources/searches/init.go b/internal/resources/searches/init.go index bed2b102..8b75a808 100644 --- a/internal/resources/searches/init.go +++ b/internal/resources/searches/init.go @@ -16,234 +16,6 @@ limitations under the License. package searches -import ( - "fmt" - "strconv" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - - "github.com/formancehq/operator/api/formance.com/v1beta1" - . "github.com/formancehq/operator/internal/core" - "github.com/formancehq/operator/internal/resources/applications" - "github.com/formancehq/operator/internal/resources/auths" - "github.com/formancehq/operator/internal/resources/brokerconsumers" - "github.com/formancehq/operator/internal/resources/gatewayhttpapis" - "github.com/formancehq/operator/internal/resources/gateways" - . 
"github.com/formancehq/operator/internal/resources/registries" - "github.com/formancehq/operator/internal/resources/resourcereferences" - "github.com/formancehq/operator/internal/resources/settings" - appsv1 "k8s.io/api/apps/v1" - v1 "k8s.io/api/batch/v1" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/types" -) - //+kubebuilder:rbac:groups=formance.com,resources=searches,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=formance.com,resources=searches/status,verbs=get;update;patch //+kubebuilder:rbac:groups=formance.com,resources=searches/finalizers,verbs=update - -func Reconcile(ctx Context, stack *v1beta1.Stack, search *v1beta1.Search, version string) error { - elasticSearchURI, err := settings.RequireURL(ctx, stack.Name, "elasticsearch", "dsn") - if err != nil { - return err - } - - serviceAccountName, err := settings.GetAWSServiceAccount(ctx, stack.Name) - if err != nil { - return err - } - - awsIAMEnabled := serviceAccountName != "" - - var elasticSearchSecretResourceRef *v1beta1.ResourceReference - if secret := elasticSearchURI.Query().Get("secret"); !awsIAMEnabled && secret != "" { - elasticSearchSecretResourceRef, err = resourcereferences.Create(ctx, search, "elasticsearch", secret, &corev1.Secret{}) - } else { - err = resourcereferences.Delete(ctx, search, "elasticsearch") - } - if err != nil { - return err - } - - env := make([]corev1.EnvVar, 0) - if awsIAMEnabled { - env = append(env, Env("AWS_IAM_ENABLED", "true")) - } - - otlpEnv, err := settings.GetOTELEnvVars(ctx, stack.Name, LowerCamelCaseKind(ctx, search), " ") - if err != nil { - return err - } - env = append(env, otlpEnv...) - env = append(env, GetDevEnvVars(stack, search)...) - - gatewayEnvVars, err := gateways.EnvVarsIfEnabled(ctx, stack.Name) - if err != nil { - return err - } - env = append(env, gatewayEnvVars...) 
- - env = append(env, - Env("OPEN_SEARCH_SERVICE", elasticSearchURI.Host), - Env("OPEN_SEARCH_SCHEME", elasticSearchURI.Scheme), - Env("ES_INDICES", "stacks"), - ) - if secret := elasticSearchURI.Query().Get("secret"); elasticSearchURI.User != nil || secret != "" { - if secret == "" { - password, _ := elasticSearchURI.User.Password() - env = append(env, - Env("OPEN_SEARCH_USERNAME", elasticSearchURI.User.Username()), - Env("OPEN_SEARCH_PASSWORD", password), - ) - } else { - env = append(env, - EnvFromSecret("OPEN_SEARCH_USERNAME", secret, "username"), - EnvFromSecret("OPEN_SEARCH_PASSWORD", secret, "password"), - ) - } - } - - authEnvVars, err := auths.ProtectedEnvVars(ctx, stack, "search", search.Spec.Auth) - if err != nil { - return err - } - env = append(env, authEnvVars...) - - imageConfiguration, err := GetFormanceImage(ctx, stack, "search", version) - if err != nil { - return err - } - - if err := createConsumers(ctx, search); err != nil { - return err - } - - batching := search.Spec.Batching - if batching == nil { - - batchingMap, err := settings.GetMapOrEmpty(ctx, stack.Name, "search", "batching") - if err != nil { - return err - } - - batching = &v1beta1.Batching{} - if countString, ok := batchingMap["count"]; ok { - count, err := strconv.ParseUint(countString, 10, 64) - if err != nil { - return err - } - batching.Count = int(count) - } - - if period, ok := batchingMap["period"]; ok { - batching.Period = period - } - } - - _, _, err = CreateOrUpdate[*v1beta1.Benthos](ctx, types.NamespacedName{ - Name: GetObjectName(stack.Name, "benthos"), - }, - WithController[*v1beta1.Benthos](ctx.GetScheme(), search), - func(t *v1beta1.Benthos) error { - t.Spec.Stack = stack.Name - t.Spec.Batching = batching - t.Spec.DevProperties = search.Spec.DevProperties - t.Spec.InitContainers = []corev1.Container{{ - Name: "init-mapping", - Image: imageConfiguration.GetFullImageName(), - Args: []string{"init-mapping"}, - Env: env, - }} - t.Spec.ImagePullSecrets = 
imageConfiguration.PullSecrets - - return nil - }, - ) - if err != nil { - return err - } - - annotations := map[string]string{} - if elasticSearchSecretResourceRef != nil { - annotations["elasticsearch-secret-hash"] = elasticSearchSecretResourceRef.Status.Hash - } - - err = applications. - New(search, &appsv1.Deployment{ - ObjectMeta: metav1.ObjectMeta{ - Name: "search", - }, - Spec: appsv1.DeploymentSpec{ - Template: corev1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Annotations: annotations, - }, - Spec: corev1.PodSpec{ - ServiceAccountName: serviceAccountName, - ImagePullSecrets: imageConfiguration.PullSecrets, - Containers: []corev1.Container{{ - Name: "search", - Image: imageConfiguration.GetFullImageName(), - Ports: []corev1.ContainerPort{applications.StandardHTTPPort()}, - Env: env, - LivenessProbe: applications.DefaultLiveness("http"), - }}, - }, - }, - }, - }). - IsEE(). - Install(ctx) - if err != nil { - return err - } - - if err := gatewayhttpapis.Create(ctx, search, gatewayhttpapis.WithHealthCheckEndpoint("_healthcheck")); err != nil { - return err - } - - if !search.Status.TopicCleaned { - if err := cleanConsumers(ctx, search); err != nil { - return fmt.Errorf("failed to clean consumers for search: %w", err) - } - search.Status.TopicCleaned = true - } - - return err -} - -func createConsumers(ctx Context, search *v1beta1.Search) error { - for _, o := range []v1beta1.Module{ - &v1beta1.Payments{}, - &v1beta1.Ledger{}, - &v1beta1.Gateway{}, - } { - if ok, err := HasDependency(ctx, search.Spec.Stack, o); err != nil { - return err - } else if ok { - consumer, err := brokerconsumers.Create(ctx, search, LowerCamelCaseKind(ctx, o), LowerCamelCaseKind(ctx, o)) - if err != nil { - return err - } - if !consumer.Status.Ready { - return NewPendingError().WithMessage("waiting for consumer %s to be ready", consumer.Name) - } - } - } - - return nil -} - -func init() { - Init( - WithModuleReconciler(Reconcile, - WithWatchSettings[*v1beta1.Search](), - 
WithOwn[*v1beta1.Search](&v1beta1.BrokerConsumer{}), - WithOwn[*v1beta1.Search](&v1beta1.ResourceReference{}), - WithOwn[*v1beta1.Search](&v1beta1.Benthos{}), - WithOwn[*v1beta1.Search](&v1beta1.GatewayHTTPAPI{}), - WithOwn[*v1beta1.Search](&appsv1.Deployment{}), - WithOwn[*v1beta1.Search](&v1.Job{}), - ), - ) -} diff --git a/internal/resources/settings/opentelemetry.go b/internal/resources/settings/opentelemetry.go index 57b4f60f..66bc954f 100644 --- a/internal/resources/settings/opentelemetry.go +++ b/internal/resources/settings/opentelemetry.go @@ -17,22 +17,17 @@ const ( ) func GetOTELEnvVars(ctx core.Context, stack, serviceName string, sliceStringSeparator string) ([]v1.EnvVar, error) { - return GetOTELEnvVarsWithPrefix(ctx, stack, serviceName, "", sliceStringSeparator) -} - -func GetOTELEnvVarsWithPrefix(ctx core.Context, stack, serviceName, prefix, sliceStringSeparator string) ([]v1.EnvVar, error) { - - traces, err := otelEnvVars(ctx, stack, MonitoringTypeTraces, serviceName, prefix, sliceStringSeparator) + traces, err := otelEnvVars(ctx, stack, MonitoringTypeTraces, serviceName, sliceStringSeparator) if err != nil { return nil, err } - metrics, err := otelEnvVars(ctx, stack, MonitoringTypeMetrics, serviceName, prefix, sliceStringSeparator) + metrics, err := otelEnvVars(ctx, stack, MonitoringTypeMetrics, serviceName, sliceStringSeparator) if err != nil { return nil, err } if len(metrics) > 0 { - metrics = append(metrics, core.Env(fmt.Sprintf("%sOTEL_METRICS_RUNTIME", prefix), "true")) + metrics = append(metrics, core.Env("OTEL_METRICS_RUNTIME", "true")) } return append(traces, metrics...), nil @@ -51,7 +46,7 @@ func HasOpenTelemetryTracesEnabled(ctx core.Context, stack string) (bool, error) return true, nil } -func otelEnvVars(ctx core.Context, stack string, monitoringType MonitoringType, serviceName, prefix, sliceStringSeparator string) ([]v1.EnvVar, error) { +func otelEnvVars(ctx core.Context, stack string, monitoringType MonitoringType, serviceName, 
sliceStringSeparator string) ([]v1.EnvVar, error) { otlp, err := GetURL(ctx, stack, "opentelemetry", strings.ToLower(string(monitoringType)), "dsn") if err != nil { @@ -62,12 +57,12 @@ func otelEnvVars(ctx core.Context, stack string, monitoringType MonitoringType, } ret := []v1.EnvVar{ - core.Env(fmt.Sprintf("%sOTEL_%s", prefix, string(monitoringType)), "true"), - core.Env(fmt.Sprintf("%sOTEL_%s_BATCH", prefix, string(monitoringType)), "true"), - core.Env(fmt.Sprintf("%sOTEL_%s_EXPORTER", prefix, string(monitoringType)), "otlp"), - core.EnvFromBool(fmt.Sprintf("%sOTEL_%s_EXPORTER_OTLP_INSECURE", prefix, string(monitoringType)), IsTrue(otlp.Query().Get("insecure"))), - core.Env(fmt.Sprintf("%sOTEL_SERVICE_NAME", prefix), serviceName), - core.Env(fmt.Sprintf("%sOTEL_%s_EXPORTER_OTLP_MODE", prefix, string(monitoringType)), otlp.Scheme), + core.Env(fmt.Sprintf("OTEL_%s", string(monitoringType)), "true"), + core.Env(fmt.Sprintf("OTEL_%s_BATCH", string(monitoringType)), "true"), + core.Env(fmt.Sprintf("OTEL_%s_EXPORTER", string(monitoringType)), "otlp"), + core.EnvFromBool(fmt.Sprintf("OTEL_%s_EXPORTER_OTLP_INSECURE", string(monitoringType)), IsTrue(otlp.Query().Get("insecure"))), + core.Env("OTEL_SERVICE_NAME", serviceName), + core.Env(fmt.Sprintf("OTEL_%s_EXPORTER_OTLP_MODE", string(monitoringType)), otlp.Scheme), { Name: "POD_NAME", ValueFrom: &v1.EnvVarSource{ @@ -80,18 +75,18 @@ func otelEnvVars(ctx core.Context, stack string, monitoringType MonitoringType, // If the path is not empty, we use the full URL as the endpoint. 
var otlpEndpoint v1.EnvVar - otlpEndpointEnvName := fmt.Sprintf("%sOTEL_%s_EXPORTER_OTLP_ENDPOINT", prefix, string(monitoringType)) + otlpEndpointEnvName := fmt.Sprintf("OTEL_%s_EXPORTER_OTLP_ENDPOINT", string(monitoringType)) if otlp.Path != "" { otlpEndpoint = core.Env(otlpEndpointEnvName, otlp.String()) } else { - ret = append(ret, core.Env(fmt.Sprintf("%sOTEL_%s_PORT", prefix, string(monitoringType)), otlp.Port())) - ret = append(ret, core.Env(fmt.Sprintf("%sOTEL_%s_ENDPOINT", prefix, string(monitoringType)), otlp.Hostname())) + ret = append(ret, core.Env(fmt.Sprintf("OTEL_%s_PORT", string(monitoringType)), otlp.Port())) + ret = append(ret, core.Env(fmt.Sprintf("OTEL_%s_ENDPOINT", string(monitoringType)), otlp.Hostname())) otlpEndpoint = core.Env( otlpEndpointEnvName, core.ComputeEnvVar( "%s:%s", - fmt.Sprintf("%sOTEL_%s_ENDPOINT", prefix, string(monitoringType)), - fmt.Sprintf("%sOTEL_%s_PORT", prefix, string(monitoringType)), + fmt.Sprintf("OTEL_%s_ENDPOINT", string(monitoringType)), + fmt.Sprintf("OTEL_%s_PORT", string(monitoringType)), ), ) } @@ -115,7 +110,7 @@ func otelEnvVars(ctx core.Context, stack string, monitoringType MonitoringType, slices.Sort(resourceAttributesArray) ret = append(ret, core.Env( - fmt.Sprintf("%sOTEL_RESOURCE_ATTRIBUTES", prefix), + "OTEL_RESOURCE_ATTRIBUTES", strings.Join(resourceAttributesArray, sliceStringSeparator), )) diff --git a/internal/resources/webhooks/deployment.go b/internal/resources/webhooks/deployment.go index 9d1d64a2..543f86dc 100644 --- a/internal/resources/webhooks/deployment.go +++ b/internal/resources/webhooks/deployment.go @@ -1,14 +1,10 @@ package webhooks import ( - "fmt" - "strings" - "github.com/formancehq/operator/internal/resources/brokers" appsv1 "k8s.io/api/apps/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - . 
"github.com/formancehq/go-libs/v2/collectionutils" "github.com/formancehq/operator/api/formance.com/v1beta1" "github.com/formancehq/operator/internal/core" "github.com/formancehq/operator/internal/resources/applications" @@ -128,70 +124,6 @@ func createAPIDeployment(ctx core.Context, stack *v1beta1.Stack, webhooks *v1bet Install(ctx) } -func createWorkerDeployment( - ctx core.Context, - stack *v1beta1.Stack, - webhooks *v1beta1.Webhooks, - database *v1beta1.Database, - consumer *v1beta1.BrokerConsumer, - version string, -) error { - - imageConfiguration, err := registries.GetFormanceImage(ctx, stack, "webhooks", version) - if err != nil { - return err - } - - env, err := deploymentEnvVars(ctx, stack, webhooks, database) - if err != nil { - return err - } - - env = append(env, core.Env("WORKER", "true")) - env = append(env, core.Env("KAFKA_TOPICS", strings.Join(Map(consumer.Spec.Services, func(from string) string { - return fmt.Sprintf("%s-%s", stack.Name, from) - }), " "))) - - serviceAccountName, err := settings.GetAWSServiceAccount(ctx, stack.Name) - if err != nil { - return err - } - - return applications. - New(webhooks, &appsv1.Deployment{ - ObjectMeta: metav1.ObjectMeta{ - Name: "webhooks-worker", - }, - Spec: appsv1.DeploymentSpec{ - Template: v1.PodTemplateSpec{ - Spec: v1.PodSpec{ - ImagePullSecrets: imageConfiguration.PullSecrets, - ServiceAccountName: serviceAccountName, - Containers: []v1.Container{{ - Name: "worker", - Env: env, - Image: imageConfiguration.GetFullImageName(), - Args: []string{"worker"}, - }}, - }, - }, - }, - }). - IsEE(). 
- Install(ctx) -} - func createSingleDeployment(ctx core.Context, stack *v1beta1.Stack, webhooks *v1beta1.Webhooks, database *v1beta1.Database, consumer *v1beta1.BrokerConsumer, version string) error { return createAPIDeployment(ctx, stack, webhooks, database, consumer, version, true) } - -func createDualDeployment(ctx core.Context, stack *v1beta1.Stack, webhooks *v1beta1.Webhooks, database *v1beta1.Database, consumer *v1beta1.BrokerConsumer, version string) error { - if err := createAPIDeployment(ctx, stack, webhooks, database, consumer, version, false); err != nil { - return err - } - if err := createWorkerDeployment(ctx, stack, webhooks, database, consumer, version); err != nil { - return err - } - - return nil -} diff --git a/internal/resources/webhooks/init.go b/internal/resources/webhooks/init.go index b354291a..8b26aeaf 100644 --- a/internal/resources/webhooks/init.go +++ b/internal/resources/webhooks/init.go @@ -68,14 +68,8 @@ func Reconcile(ctx Context, stack *v1beta1.Stack, webhooks *v1beta1.Webhooks, ve } if consumer.Status.Ready { - if IsGreaterOrEqual(version, "v0.7.1") { - if err := createSingleDeployment(ctx, stack, webhooks, database, consumer, version); err != nil { - return err - } - } else { - if err := createDualDeployment(ctx, stack, webhooks, database, consumer, version); err != nil { - return err - } + if err := createSingleDeployment(ctx, stack, webhooks, database, consumer, version); err != nil { + return err } } diff --git a/internal/tests/benthos_controller_test.go b/internal/tests/benthos_controller_test.go index e79b5b7d..fe5d1d5f 100644 --- a/internal/tests/benthos_controller_test.go +++ b/internal/tests/benthos_controller_test.go @@ -1,7 +1,7 @@ package tests_test import ( - v1beta1 "github.com/formancehq/operator/api/formance.com/v1beta1" + "github.com/formancehq/operator/api/formance.com/v1beta1" "github.com/formancehq/operator/internal/core" "github.com/formancehq/operator/internal/resources/settings" . 
"github.com/formancehq/operator/internal/tests/internal" @@ -11,26 +11,22 @@ import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "sigs.k8s.io/controller-runtime/pkg/client" ) var _ = Describe("BenthosController", func() { Context("When creating a Benthos", func() { var ( - benthos *v1beta1.Benthos - broker *v1beta1.Broker - stack *v1beta1.Stack - brokerDSNSettings *v1beta1.Settings - elasticSearchDSNSettings *v1beta1.Settings + benthos *v1beta1.Benthos + broker *v1beta1.Broker + stack *v1beta1.Stack + brokerDSNSetting *v1beta1.Settings ) BeforeEach(func() { stack = &v1beta1.Stack{ ObjectMeta: RandObjectMeta(), Spec: v1beta1.StackSpec{}, } - brokerDSNSettings = settings.New(uuid.NewString(), "broker.dsn", "nats://localhost:1234", stack.Name) - elasticSearchDSNSettings = settings.New(uuid.NewString(), "elasticsearch.dsn", "https://localhost", stack.Name) broker = &v1beta1.Broker{ ObjectMeta: v1.ObjectMeta{ Name: stack.Name, @@ -51,16 +47,14 @@ var _ = Describe("BenthosController", func() { } }) JustBeforeEach(func() { - Expect(Create(brokerDSNSettings)).To(BeNil()) Expect(Create(stack)).To(Succeed()) - Expect(Create(elasticSearchDSNSettings)).To(Succeed()) + brokerDSNSetting = settings.New(uuid.NewString(), "broker.dsn", "noop://do-nothing", stack.Name) + Expect(Create(brokerDSNSetting)).To(BeNil()) Expect(Create(broker)).To(Succeed()) Expect(Create(benthos)).To(Succeed()) }) JustAfterEach(func() { Expect(Delete(stack)).To(Succeed()) - Expect(Delete(elasticSearchDSNSettings)).To(Succeed()) - Expect(Delete(brokerDSNSettings)).To(Succeed()) }) It("Should create appropriate resources", func() { By("Should create a deployment", func() { @@ -82,41 +76,5 @@ var _ = Describe("BenthosController", func() { }).Should(BeNil()) }) }) - Context("with audit enabled on stack", func() { - BeforeEach(func() { - stack.Spec.EnableAudit = true - }) - It("Should properly configure the service", func() { - By("should add a config 
map for the stream", func() { - Eventually(func() error { - cm := &corev1.ConfigMap{} - return LoadResource(stack.Name, "benthos-audit", cm) - }).Should(Succeed()) - }) - By("should add a cmd args to the deployment", func() { - t := &appsv1.Deployment{} - Eventually(func(g Gomega) []string { - g.Expect(LoadResource(stack.Name, "benthos", t)).To(Succeed()) - return t.Spec.Template.Spec.Containers[0].Command - }).Should(ContainElement("/audit/gateway_audit.yaml")) - }) - }) - Context("then disabling audit", func() { - JustBeforeEach(func() { - Eventually(func() error { - cm := &corev1.ConfigMap{} - return LoadResource(stack.Name, "benthos-audit", cm) - }).Should(Succeed()) - patch := client.MergeFrom(stack.DeepCopy()) - stack.Spec.EnableAudit = false - Expect(Patch(stack, patch)).To(Succeed()) - }) - It("should remove the associated config map", func() { - Eventually(func() error { - return LoadResource(stack.Name, "benthos-audit", &corev1.ConfigMap{}) - }).Should(BeNotFound()) - }) - }) - }) }) }) diff --git a/internal/tests/gateway_controller_test.go b/internal/tests/gateway_controller_test.go index f25f894a..32c5a7c3 100644 --- a/internal/tests/gateway_controller_test.go +++ b/internal/tests/gateway_controller_test.go @@ -193,13 +193,12 @@ var _ = Describe("GatewayController", func() { MatchGoldenFile("gateway-controller", "configmap-with-ledger-and-another-service.yaml")) }) }) - Context("With audit enabled", func() { + Context("With a consumer on gateway", func() { var ( brokerNatsDSNSettings *v1beta1.Settings consumer *v1beta1.BrokerConsumer ) BeforeEach(func() { - stack.Spec.EnableAudit = true brokerNatsDSNSettings = settings.New(uuid.NewString(), "broker.dsn", "nats://localhost:1234", stack.Name) consumer = &v1beta1.BrokerConsumer{ ObjectMeta: RandObjectMeta(), diff --git a/internal/tests/ledger_controller_test.go b/internal/tests/ledger_controller_test.go index b1698900..4f777ac6 100644 --- a/internal/tests/ledger_controller_test.go +++ 
b/internal/tests/ledger_controller_test.go @@ -1,7 +1,6 @@ package tests_test import ( - "github.com/formancehq/go-libs/v2/collectionutils" "github.com/formancehq/operator/api/formance.com/v1beta1" "github.com/formancehq/operator/internal/core" "github.com/formancehq/operator/internal/resources/settings" @@ -10,9 +9,7 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" appsv1 "k8s.io/api/apps/v1" - v1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" ) @@ -170,64 +167,6 @@ var _ = Describe("LedgerController", func() { }) }) }) - Context("With Search enabled", func() { - var search *v1beta1.Search - BeforeEach(func() { - search = &v1beta1.Search{ - ObjectMeta: RandObjectMeta(), - Spec: v1beta1.SearchSpec{ - StackDependency: v1beta1.StackDependency{ - Stack: stack.Name, - }, - }, - } - }) - JustBeforeEach(func() { - Expect(Create(search)).To(Succeed()) - }) - JustAfterEach(func() { - Expect(client.IgnoreNotFound(Delete(search))).To(Succeed()) - }) - checkResourcesExists := func() { - l := &v1beta1.BenthosStreamList{} - Eventually(func(g Gomega) int { - g.Expect(List(l)).To(Succeed()) - return len(collectionutils.Filter(l.Items, func(stream v1beta1.BenthosStream) bool { - return stream.Spec.Stack == stack.Name - })) - }).Should(BeNumerically(">", 0)) - - cronJob := &v1.CronJob{} - Eventually(func() error { - return Get(types.NamespacedName{ - Namespace: stack.Name, - Name: "reindex-ledger", - }, cronJob) - }).Should(BeNil()) - } - It("Should create appropriate resources", checkResourcesExists) - Context("Then when removing search", func() { - JustBeforeEach(func() { - checkResourcesExists() - Expect(Delete(search)).To(Succeed()) - }) - It("Should remove resources", func() { - l := &v1beta1.BenthosStreamList{} - Eventually(func(g Gomega) int { - g.Expect(List(l)).To(Succeed()) - return 
len(collectionutils.Filter(l.Items, func(stream v1beta1.BenthosStream) bool { - return stream.Spec.Stack == stack.Name - })) - }).Should(BeZero()) - Eventually(func() error { - return Get(types.NamespacedName{ - Namespace: stack.Name, - Name: "reindex-ledger", - }, &v1.CronJob{}) - }).Should(BeNotFound()) - }) - }) - }) Context("With database connection pool settings", func() { var connectionPoolSettings *v1beta1.Settings BeforeEach(func() { diff --git a/internal/tests/payments_controller_test.go b/internal/tests/payments_controller_test.go index 61fafa8a..c3804103 100644 --- a/internal/tests/payments_controller_test.go +++ b/internal/tests/payments_controller_test.go @@ -1,7 +1,6 @@ package tests_test import ( - "github.com/formancehq/go-libs/v2/collectionutils" v1beta1 "github.com/formancehq/operator/api/formance.com/v1beta1" "github.com/formancehq/operator/internal/core" "github.com/formancehq/operator/internal/resources/settings" @@ -10,7 +9,6 @@ import ( . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" appsv1 "k8s.io/api/apps/v1" - "sigs.k8s.io/controller-runtime/pkg/client" ) var _ = Describe("PaymentsController", func() { @@ -72,49 +70,5 @@ var _ = Describe("PaymentsController", func() { Expect(deployment).To(BeControlledBy(payments)) }) }) - Context("With Search enabled", func() { - var search *v1beta1.Search - BeforeEach(func() { - search = &v1beta1.Search{ - ObjectMeta: RandObjectMeta(), - Spec: v1beta1.SearchSpec{ - StackDependency: v1beta1.StackDependency{ - Stack: stack.Name, - }, - }, - } - }) - JustBeforeEach(func() { - Expect(Create(search)).To(Succeed()) - }) - JustAfterEach(func() { - Expect(client.IgnoreNotFound(Delete(search))).To(Succeed()) - }) - checkStreamsExists := func() { - l := &v1beta1.BenthosStreamList{} - Eventually(func(g Gomega) int { - g.Expect(List(l)).To(Succeed()) - return len(collectionutils.Filter(l.Items, func(stream v1beta1.BenthosStream) bool { - return stream.Spec.Stack == stack.Name - })) - }).Should(BeNumerically(">", 0)) - } - It("Should create streams", checkStreamsExists) - Context("Then when removing search", func() { - JustBeforeEach(func() { - checkStreamsExists() - Expect(Delete(search)).To(Succeed()) - }) - It("Should remove streams", func() { - l := &v1beta1.BenthosStreamList{} - Eventually(func(g Gomega) int { - g.Expect(List(l)).To(Succeed()) - return len(collectionutils.Filter(l.Items, func(stream v1beta1.BenthosStream) bool { - return stream.Spec.Stack == stack.Name - })) - }).Should(BeZero()) - }) - }) - }) }) }) diff --git a/internal/tests/searches_controller_test.go b/internal/tests/searches_controller_test.go deleted file mode 100644 index 96ce7d86..00000000 --- a/internal/tests/searches_controller_test.go +++ /dev/null @@ -1,90 +0,0 @@ -package tests_test - -import ( - "fmt" - - "github.com/formancehq/operator/internal/core" - - v1beta1 "github.com/formancehq/operator/api/formance.com/v1beta1" - "github.com/formancehq/operator/internal/resources/settings" - . 
"github.com/formancehq/operator/internal/tests/internal" - "github.com/google/uuid" - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" -) - -var _ = Describe("SearchesController", func() { - Context("When creating a Search object", func() { - var ( - stack *v1beta1.Stack - search *v1beta1.Search - elasticSearchDSNSetting *v1beta1.Settings - brokerDSNSettings *v1beta1.Settings - ) - BeforeEach(func() { - stack = &v1beta1.Stack{ - ObjectMeta: RandObjectMeta(), - Spec: v1beta1.StackSpec{}, - } - search = &v1beta1.Search{ - ObjectMeta: RandObjectMeta(), - Spec: v1beta1.SearchSpec{ - StackDependency: v1beta1.StackDependency{ - Stack: stack.Name, - }, - }, - } - elasticSearchDSNSetting = settings.New(uuid.NewString(), "elasticsearch.dsn", "https://localhost", stack.Name) - brokerDSNSettings = settings.New(uuid.NewString(), "broker.dsn", "nats://localhost:1234", stack.Name) - }) - JustBeforeEach(func() { - Expect(Create(stack)).To(Succeed()) - Expect(Create(elasticSearchDSNSetting)).To(Succeed()) - Expect(Create(brokerDSNSettings)).To(Succeed()) - Expect(Create(search)).To(Succeed()) - }) - AfterEach(func() { - Expect(Delete(search)).To(Succeed()) - Expect(Delete(elasticSearchDSNSetting)).To(Succeed()) - Expect(Delete(stack)).To(Succeed()) - Expect(Delete(brokerDSNSettings)).To(Succeed()) - }) - It("Should create resources", func() { - By("Should add an owner reference on the stack", func() { - Eventually(func(g Gomega) bool { - g.Expect(LoadResource("", search.Name, search)).To(Succeed()) - reference, err := core.HasOwnerReference(TestContext(), stack, search) - g.Expect(err).To(BeNil()) - return reference - }).Should(BeTrue()) - }) - By("Should create a Benthos", func() { - benthos := &v1beta1.Benthos{} - Eventually(func() error { - return LoadResource(stack.Name, fmt.Sprintf("%s-benthos", stack.Name), benthos) - }).Should(Succeed()) - Expect(benthos).To(BeControlledBy(search)) - }) - }) - Context("Then when creating a SearchBatchingConfiguration object", 
func() { - var searchBatchingCountSettings *v1beta1.Settings - JustBeforeEach(func() { - searchBatchingCountSettings = settings.New(uuid.NewString(), "search.batching", "count=10", stack.Name) - Expect(Create(searchBatchingCountSettings)).To(Succeed()) - }) - JustAfterEach(func() { - Expect(Delete(searchBatchingCountSettings)).To(Succeed()) - }) - It("Should update the Benthos with the new batching configuration", func() { - benthos := &v1beta1.Benthos{} - Eventually(func(g Gomega) v1beta1.Batching { - g.Expect(LoadResource(stack.Name, fmt.Sprintf("%s-benthos", stack.Name), benthos)).To(Succeed()) - g.Expect(benthos.Spec.Batching).NotTo(BeNil()) - return *benthos.Spec.Batching - }).Should(Equal(v1beta1.Batching{ - Count: 10, - })) - }) - }) - }) -})