diff --git a/.gitignore b/.gitignore index cef2285c5..8d309fee9 100644 --- a/.gitignore +++ b/.gitignore @@ -24,4 +24,5 @@ docs-site/node_modules docs-site/.hugo_build.lock api/pkg/apis/v1alpha1/providers/target/rust/target -Cargo.lock \ No newline at end of file +Cargo.lock +**/mtls-certs/ \ No newline at end of file diff --git a/api/pkg/apis/v1alpha1/providers/target/mqtt/README.md b/api/pkg/apis/v1alpha1/providers/target/mqtt/README.md new file mode 100644 index 000000000..5c3e5580f --- /dev/null +++ b/api/pkg/apis/v1alpha1/providers/target/mqtt/README.md @@ -0,0 +1,221 @@ +### Running MQTT provider tests locally with Mosquitto (Docker) + +This guide shows how to run the unit/integration tests for the MQTT target provider against a local Mosquitto MQTT broker running in Docker. It covers both plain TCP (port 1883) and mutual TLS (mTLS, port 8883). + +Prerequisites +- Docker (or Docker Desktop) +- OpenSSL (for generating test certificates if you want to run the mTLS test) +- Go 1.21+ (matching this repo’s go.mod) + +Repository layout of interest +- `api/pkg/apis/v1alpha1/providers/target/mqtt/mqtt.go` +- `api/pkg/apis/v1alpha1/providers/target/mqtt/mqtt_test.go` + +By default the tests are gated behind environment variables and will be skipped unless explicitly enabled. + +Plain TCP broker (1883) +1) Create a Mosquitto config that enables a listener and allows anonymous access (required for Mosquitto 2.x): + +Create `mosquitto.conf` with: + +```conf +listener 1883 +protocol mqtt +allow_anonymous true +``` + +2) Start Mosquitto with the config mounted: + +```bash +docker run --rm -it --name mosquitto -p 1883:1883 -v $(pwd)/mosquitto.conf:/mosquitto/config/mosquitto.conf -v $(pwd)/mtls-certs:/certs eclipse-mosquitto:2 +``` + +3) In a separate shell, enable the tests that expect a locally running broker: + +- Linux/macOS (bash/zsh): + +```bash +export TEST_MQTT_LOCAL_ENABLED=1 +export TEST_MQTT=1 +``` + +- Windows PowerShell: + +```powershell +$env:TEST_MQTT_LOCAL_ENABLED = "1" +$env:TEST_MQTT = "1" +``` + +4) Run the tests for the MQTT provider package: + +```bash +cd api +go test ./pkg/apis/v1alpha1/providers/target/mqtt -v +``` + +Notes +- The tests publish and subscribe on topics `coa-request` and `coa-response` by default. +- The tests create a responder client within the test itself; Mosquitto simply routes messages. +- If Mosquitto logs show "Starting in local only mode... Create a configuration file which defines a listener", you did not mount a config; use the steps above. + +mTLS broker (8883) + +The file `mqtt_test.go` contains an optional mTLS integration-style test guarded by environment variables. To run it, stand up Mosquitto with TLS and client-certificate auth and point the test to your CA, client cert, and client key. + +1) Generate a simple CA, server, and client certificates (for local testing only): + +```bash +# Clean slate (optional) +rm -f ca.* server.* client.* *.srl + +# 1) CA +openssl genrsa -out ca.key 2048 +openssl req -x509 -new -nodes -key ca.key -sha256 -days 365 \ + -subj "/CN=test-ca" -out ca.crt + +# 2) Keys & CSR config with extensions +openssl genrsa -out server.key 2048 + +cat > server.cnf <<'EOF' +[ req ] +distinguished_name = dn +prompt = no +req_extensions = v3_req + +[ dn ] +CN = localhost + +[ v3_req ] +subjectAltName = @alt_names +extendedKeyUsage = serverAuth + +[ alt_names ] +DNS.1 = localhost +IP.1 = 127.0.0.1 +EOF + +# CSR with req extensions present +openssl req -new -key server.key -out server.csr -config server.cnf + +# Sign CSR and COPY THE SAME EXTENSIONS into the cert +openssl x509 -req -in server.csr -CA ca.crt -CAkey ca.key -CAcreateserial \ + -out server.crt -days 365 -sha256 \ + -extensions v3_req -extfile server.cnf + +openssl genrsa -out client.key 2048 + +cat > client.cnf <<'EOF' +[ req ] +distinguished_name = dn +prompt = no +req_extensions = v3_req + +[ dn ] +CN = mtls-client + +[ v3_req ] +extendedKeyUsage = clientAuth +EOF + +openssl req -new -key client.key -out client.csr -config client.cnf + +openssl x509 -req -in client.csr -CA ca.crt -CAkey ca.key -CAcreateserial \ + -out client.crt -days 365 -sha256 \ + -extensions v3_req -extfile client.cnf + +chmod 600 server.key client.key +chmod 644 ca.crt server.crt client.crt + +echo "---- SERVER ----" +openssl x509 -in server.crt -noout -text | sed -n '/Subject:/p;/Subject Alternative Name/,+1p;/Extended Key Usage/p' +echo "---- CLIENT ----" +openssl x509 -in client.crt -noout -text | sed -n '/Subject:/p;/Extended Key Usage/p' + +``` + +2) Create a Mosquitto config `mosquitto.conf` next to the certs with both 1883 and 8883 enabled and require client certs on 8883: + +```conf +# Plain TCP for anonymous tests +listener 1883 +protocol mqtt +allow_anonymous true + +# TLS (server-auth only) for TestGet_TLS +listener 8883 +protocol mqtt +cafile /certs/ca.crt +certfile /certs/server.crt +keyfile /certs/server.key +allow_anonymous true + +# mTLS (client certs required) for TestGet_mTLS +listener 8884 +protocol mqtt +cafile /certs/ca.crt +certfile /certs/server.crt +keyfile /certs/server.key +require_certificate true +use_identity_as_username true +allow_anonymous true +``` + +3) Start Mosquitto with the config and certs mounted: + +```bash +# From the folder containing mosquitto.conf and the *.crt/*.key files +docker run --rm -it \ + --name mosquitto \ + -p 1883:1883 \ + -p 8883:8883 \ + -p 8884:8884 \ + -v $(pwd)/mosquitto.conf:/mosquitto/config/mosquitto.conf \ + -v $(pwd)/mtls-certs:/certs \ + eclipse-mosquitto:2 +``` + +4) In a separate shell, export the mTLS test environment variables: + +```bash +export TEST_MQTT_TLS=1 +export TEST_MQTT_TLS_BROKER="ssl://127.0.0.1:8883" +export TEST_MQTT_TLS_CA="$(pwd)/mtls-certs/ca.crt" +export TEST_MQTT_TLS_REQUEST_TOPIC="coa-request" +export TEST_MQTT_TLS_RESPONSE_TOPIC="coa-response" + +export TEST_MQTT_MTLS=1 +export TEST_MQTT_MTLS_BROKER="ssl://127.0.0.1:8884" +export TEST_MQTT_MTLS_CA="$(pwd)/mtls-certs/ca.crt" +export TEST_MQTT_MTLS_CERT="$(pwd)/mtls-certs/client.crt" +export TEST_MQTT_MTLS_KEY="$(pwd)/mtls-certs/client.key" +export TEST_MQTT_MTLS_REQUEST_TOPIC="coa-request" +export TEST_MQTT_MTLS_RESPONSE_TOPIC="coa-response" +``` + +5) Run tests: + +```bash +cd api +go test ./pkg/apis/v1alpha1/providers/target/mqtt -v +``` + +Tips +- If you only want to run the mTLS test, use `-run` to filter: + +```bash +go test ./pkg/apis/v1alpha1/providers/target/mqtt -run TestGet_mTLS -v +``` + +- If you see certificate errors, double-check that: + - `TEST_MQTT_MTLS_CA` points to the CA that signed both the server and client certs. + - `server.crt`/`server.key` match, and CN/SAN includes `localhost` or you connect by the same name you issued. + - Mosquitto is actually listening on 8883 (check container logs). + +Environment variables used by tests +- Plain TCP tests (skip unless set): `TEST_MQTT_LOCAL_ENABLED=1`, `TEST_MQTT=1` +- mTLS test (skip unless set): `TEST_MQTT_MTLS=1`, `TEST_MQTT_MTLS_BROKER`, `TEST_MQTT_MTLS_CA`, `TEST_MQTT_MTLS_CERT`, `TEST_MQTT_MTLS_KEY`, `TEST_MQTT_MTLS_REQUEST_TOPIC`, `TEST_MQTT_MTLS_RESPONSE_TOPIC` + +Troubleshooting +- On Windows with WSL, run Docker Desktop and expose the ports to the host. The tests connect to `127.0.0.1`. +- If ports are in use, stop other MQTT brokers or change the exposed ports in `docker run` and env vars accordingly. + diff --git a/api/pkg/apis/v1alpha1/providers/target/mqtt/mosquitto.conf b/api/pkg/apis/v1alpha1/providers/target/mqtt/mosquitto.conf new file mode 100644 index 000000000..e95134ff3 --- /dev/null +++ b/api/pkg/apis/v1alpha1/providers/target/mqtt/mosquitto.conf @@ -0,0 +1,22 @@ +# Plain TCP for anonymous tests +listener 1883 +protocol mqtt +allow_anonymous true + +# TLS (server-auth only) for TestGet_TLS +listener 8883 +protocol mqtt +cafile /certs/ca.crt +certfile /certs/server.crt +keyfile /certs/server.key +allow_anonymous true + +# mTLS (client certs required) for TestGet_mTLS +listener 8884 +protocol mqtt +cafile /certs/ca.crt +certfile /certs/server.crt +keyfile /certs/server.key +require_certificate true +use_identity_as_username true +allow_anonymous true \ No newline at end of file diff --git a/api/pkg/apis/v1alpha1/providers/target/mqtt/mqtt.go b/api/pkg/apis/v1alpha1/providers/target/mqtt/mqtt.go index 85778dc85..0c52bf2ed 100644 --- a/api/pkg/apis/v1alpha1/providers/target/mqtt/mqtt.go +++ b/api/pkg/apis/v1alpha1/providers/target/mqtt/mqtt.go @@ -4,13 +4,43 @@ * SPDX-License-Identifier: MIT */ +/* +Client certificate authentication for secure MQTT connections. + +Basic MQTT settings: +- brokerAddress: MQTT broker URL (required) +- clientID: MQTT client identifier (required) +- requestTopic: Topic for sending requests (required) +- responseTopic: Topic for receiving responses (required) +- timeoutSeconds: Request timeout in seconds (default: 8) +- keepAliveSeconds: Keep-alive interval in seconds (default: 2) +- pingTimeoutSeconds: Ping timeout in seconds (default: 1) + +Authentication settings: +- username: MQTT username for basic authentication +- password: MQTT password for basic authentication + +TLS/Certificate settings: +- useTLS: Enable TLS connection (default: false) +- caCertPath: Path to CA certificate file for server verification +- clientCertPath: Path to client certificate file for mutual TLS authentication +- clientKeyPath: Path to client private key file for mutual TLS authentication +- insecureSkipVerify: Skip TLS certificate verification (default: false, use with caution) + +*/ + package mqtt import ( "context" + "crypto/tls" + "crypto/x509" "encoding/json" + "encoding/pem" "fmt" + "io/ioutil" "strconv" + "strings" "sync" "time" @@ -49,6 +79,14 @@ type MQTTTargetProviderConfig struct { TimeoutSeconds int `json:"timeoutSeconds,omitempty"` KeepAliveSeconds int `json:"keepAliveSeconds,omitempty"` PingTimeoutSeconds int `json:"pingTimeoutSeconds,omitempty"` + // TLS/Certificate configuration fields + UseTLS bool `json:"useTLS,omitempty"` + CACertPath string `json:"caCertPath,omitempty"` + ClientCertPath string `json:"clientCertPath,omitempty"` + ClientKeyPath string `json:"clientKeyPath,omitempty"` + InsecureSkipVerify bool `json:"insecureSkipVerify,omitempty"` + Username string `json:"username,omitempty"` + Password string `json:"password,omitempty"` } var lock sync.Mutex @@ -121,6 +159,30 @@ func MQTTTargetProviderConfigFromMap(properties map[string]string) (MQTTTargetPr if ret.TimeoutSeconds <= 0 { ret.TimeoutSeconds = 8 } + + // Handle TLS/Certificate configuration + if v, ok := properties["useTLS"]; ok { + ret.UseTLS = v == "true" + } + if v, ok := properties["caCertPath"]; ok { + ret.CACertPath = v + } + if v, ok := properties["clientCertPath"]; ok { + ret.ClientCertPath = v + } + if v, ok := properties["clientKeyPath"]; ok { + ret.ClientKeyPath = v + } + if v, ok := properties["insecureSkipVerify"]; ok { + ret.InsecureSkipVerify = v == "true" + } + if v, ok := properties["username"]; ok { + ret.Username = v + } + if v, ok := properties["password"]; ok { + ret.Password = v + } + return ret, nil } @@ -164,10 +226,44 @@ func (i *MQTTTargetProvider) Init(config providers.IProviderConfig) error { opts.SetKeepAlive(time.Duration(i.Config.KeepAliveSeconds) * time.Second) opts.SetPingTimeout(time.Duration(i.Config.PingTimeoutSeconds) * time.Second) opts.CleanSession = true + + // Configure authentication + if i.Config.Username != "" { + opts.SetUsername(i.Config.Username) + } + if i.Config.Password != "" { + opts.SetPassword(i.Config.Password) + } + + // Configure TLS if enabled + if i.Config.UseTLS { + tlsConfig, err := i.createTLSConfig(ctx) + if err != nil { + sLog.ErrorfCtx(ctx, " P (MQTT Target): failed to create TLS config - %+v", err) + return v1alpha2.NewCOAError(err, "failed to create TLS config", v1alpha2.InternalError) + } + opts.SetTLSConfig(tlsConfig) + } + i.MQTTClient = gmqtt.NewClient(opts) if token := i.MQTTClient.Connect(); token.Wait() && token.Error() != nil { - sLog.ErrorfCtx(ctx, " P (MQTT Target): faild to connect to MQTT broker - %+v", err) - return v1alpha2.NewCOAError(token.Error(), "failed to connect to MQTT broker", v1alpha2.InternalError) + connErr := token.Error() + sLog.ErrorfCtx(ctx, " P (MQTT Target): failed to connect to MQTT broker - %+v", connErr) + + // Provide specific guidance for common TLS errors + if strings.Contains(connErr.Error(), "certificate signed by unknown authority") { + sLog.ErrorfCtx(ctx, " P (MQTT Target): TLS certificate verification failed. Common solutions:") + sLog.ErrorfCtx(ctx, " P (MQTT Target): 1. Set 'caCertPath' to the path of your broker's CA certificate") + sLog.ErrorfCtx(ctx, " P (MQTT Target): 2. Set 'insecureSkipVerify' to 'true' for testing (not recommended for production)") + sLog.ErrorfCtx(ctx, " P (MQTT Target): 3. Ensure your broker certificate is issued by a trusted CA") + } else if strings.Contains(connErr.Error(), "tls:") { + sLog.ErrorfCtx(ctx, " P (MQTT Target): TLS connection error. Check your TLS configuration:") + sLog.ErrorfCtx(ctx, " P (MQTT Target): - Broker address should use 'ssl://' or 'tls://' prefix for TLS connections") + sLog.ErrorfCtx(ctx, " P (MQTT Target): - Verify CA certificate path and format") + sLog.ErrorfCtx(ctx, " P (MQTT Target): - Check client certificate and key paths if using mutual TLS") + } + + return v1alpha2.NewCOAError(connErr, "failed to connect to MQTT broker", v1alpha2.InternalError) } if token := i.MQTTClient.Subscribe(i.Config.ResponseTopic, 0, func(client gmqtt.Client, msg gmqtt.Message) { @@ -176,7 +272,7 @@ func (i *MQTTTargetProvider) Init(config providers.IProviderConfig) error { proxyResponse := ProxyResponse{ IsOK: response.State == v1alpha2.OK || response.State == v1alpha2.Accepted, State: response.State, - Payload: response.String(), + Payload: response.Body, } if !proxyResponse.IsOK { @@ -259,7 +355,7 @@ func (i *MQTTTargetProvider) Get(ctx context.Context, deployment model.Deploymen select { case resp := <-responseChan: if resp.IsOK { - data := []byte(resp.Payload.(string)) + data := resp.Payload.([]byte) var ret []model.ComponentSpec err = json.Unmarshal(data, &ret) if err != nil { @@ -411,7 +507,7 @@ func (i *MQTTTargetProvider) Apply(ctx context.Context, deployment model.Deploym select { case resp := <-responseChan: if resp.IsOK { - data := []byte(resp.Payload.(string)) + data := resp.Payload.([]byte) var summary model.SummarySpec err = json.Unmarshal(data, &summary) if err == nil { @@ -517,6 +613,81 @@ func (i *MQTTTargetProvider) Apply(ctx context.Context, deployment model.Deploym return ret, nil } +// createTLSConfig creates a TLS configuration for MQTT client authentication +func (i *MQTTTargetProvider) createTLSConfig(ctx context.Context) (*tls.Config, error) { + tlsConfig := &tls.Config{ + InsecureSkipVerify: i.Config.InsecureSkipVerify, + } + + // Load CA certificate if provided + if i.Config.CACertPath != "" { + sLog.InfofCtx(ctx, " P (MQTT Target): attempting to load CA certificate from %s", i.Config.CACertPath) + + caCert, err := ioutil.ReadFile(i.Config.CACertPath) + if err != nil { + sLog.ErrorfCtx(ctx, " P (MQTT Target): failed to read CA certificate - %+v", err) + return nil, fmt.Errorf("failed to read CA certificate: %w", err) + } + + // Verify the CA cert content + sLog.InfofCtx(ctx, " P (MQTT Target): CA certificate file size: %d bytes", len(caCert)) + if len(caCert) == 0 { + return nil, fmt.Errorf("CA certificate file is empty") + } + + // Validate that the file contains valid PEM data + if !isCertificatePEM(caCert) { + sLog.ErrorfCtx(ctx, " P (MQTT Target): CA certificate file does not contain valid PEM data") + return nil, fmt.Errorf("CA certificate file does not contain valid PEM data") + } + + caCertPool := x509.NewCertPool() + if !caCertPool.AppendCertsFromPEM(caCert) { + sLog.ErrorfCtx(ctx, " P (MQTT Target): failed to parse CA certificate - invalid PEM format or corrupted certificate") + return nil, fmt.Errorf("failed to parse CA certificate - invalid PEM format or corrupted certificate") + } + tlsConfig.RootCAs = caCertPool + sLog.InfofCtx(ctx, " P (MQTT Target): successfully loaded CA certificate from %s", i.Config.CACertPath) + } else { + if !i.Config.InsecureSkipVerify { + sLog.WarnCtx(ctx, " P (MQTT Target): no CA certificate path provided - using system CA pool. If connection fails with 'certificate signed by unknown authority', either provide a CA certificate or set insecureSkipVerify to true") + } else { + sLog.InfofCtx(ctx, " P (MQTT Target): TLS certificate verification disabled (insecureSkipVerify=true)") + } + } + + // Load client certificate and key if provided + if i.Config.ClientCertPath != "" && i.Config.ClientKeyPath != "" { + clientCert, err := tls.LoadX509KeyPair(i.Config.ClientCertPath, i.Config.ClientKeyPath) + if err != nil { + sLog.ErrorfCtx(ctx, " P (MQTT Target): failed to load client certificate and key - %+v", err) + return nil, fmt.Errorf("failed to load client certificate and key: %w", err) + } + tlsConfig.Certificates = []tls.Certificate{clientCert} + sLog.InfofCtx(ctx, " P (MQTT Target): loaded client certificate from %s and key from %s", + i.Config.ClientCertPath, i.Config.ClientKeyPath) + } else if i.Config.ClientCertPath != "" || i.Config.ClientKeyPath != "" { + // Both cert and key must be provided together + return nil, fmt.Errorf("both clientCertPath and clientKeyPath must be provided for client certificate authentication") + } + + return tlsConfig, nil +} + +// isCertificatePEM checks if the given data contains valid PEM formatted certificate data +func isCertificatePEM(data []byte) bool { + // Check if the data contains PEM headers + dataStr := string(data) + if !strings.Contains(dataStr, "-----BEGIN CERTIFICATE-----") || + !strings.Contains(dataStr, "-----END CERTIFICATE-----") { + return false + } + + // Try to decode the PEM block + block, _ := pem.Decode(data) + return block != nil && block.Type == "CERTIFICATE" +} + func (*MQTTTargetProvider) GetValidationRule(ctx context.Context) model.ValidationRule { return model.ValidationRule{ AllowSidecar: false, diff --git a/api/pkg/apis/v1alpha1/providers/target/mqtt/mqtt_test.go b/api/pkg/apis/v1alpha1/providers/target/mqtt/mqtt_test.go index 2910784b2..c928deed1 100644 --- a/api/pkg/apis/v1alpha1/providers/target/mqtt/mqtt_test.go +++ b/api/pkg/apis/v1alpha1/providers/target/mqtt/mqtt_test.go @@ -8,8 +8,16 @@ package mqtt import ( "context" + "crypto/rand" + "crypto/rsa" + "crypto/tls" + "crypto/x509" + "crypto/x509/pkix" "encoding/json" + "encoding/pem" + "math/big" "os" + "path/filepath" "testing" "time" @@ -160,32 +168,59 @@ func TestGet(t *testing.T) { assert.Nil(t, err) opts := gmqtt.NewClientOptions().AddBroker(config.BrokerAddress).SetClientID("test-sender") - opts.SetKeepAlive(2 * time.Second) - opts.SetPingTimeout(1 * time.Second) + opts.SetKeepAlive(30 * time.Second) + opts.SetAutoReconnect(false) + opts.SetPingTimeout(10 * time.Second) c := gmqtt.NewClient(opts) - if token := c.Connect(); token.Wait() && token.Error() != nil { - panic(token.Error()) + // Connect with retry + for attempts := 0; attempts < 10; attempts++ { + tok := c.Connect() + if tok.Wait() && tok.Error() != nil { + if attempts == 9 { + t.Fatalf("failed to connect mqtt responder: %v", tok.Error()) + } + time.Sleep(200 * time.Millisecond) + continue + } + break } - if token := c.Subscribe(config.RequestTopic, 0, func(client gmqtt.Client, msg gmqtt.Message) { - var request v1alpha2.COARequest - err := json.Unmarshal(msg.Payload(), &request) - assert.Nil(t, err) - var response v1alpha2.COAResponse - ret := make([]model.ComponentSpec, 0) - data, _ := json.Marshal(ret) - response.State = v1alpha2.OK - response.Metadata = make(map[string]string) - response.Metadata["request-id"] = request.Metadata["request-id"] - response.Body = data - data, _ = json.Marshal(response) - token := c.Publish(config.ResponseTopic, 0, false, data) //sending COARequest directly doesn't seem to work - token.Wait() - - }); token.Wait() && token.Error() != nil { - if token.Error().Error() != "subscription exists" { - panic(token.Error()) + // Wait until connected + for i := 0; i < 25 && !c.IsConnected(); i++ { + time.Sleep(100 * time.Millisecond) + } + if !c.IsConnected() { + t.Fatalf("mqtt responder not connected") + } + // Subscribe with retry + for attempts := 0; attempts < 10; attempts++ { + tok := c.Subscribe(config.RequestTopic, 0, func(client gmqtt.Client, msg gmqtt.Message) { + var request v1alpha2.COARequest + err := json.Unmarshal(msg.Payload(), &request) + assert.Nil(t, err) + var response v1alpha2.COAResponse + ret := make([]model.ComponentSpec, 0) + data, _ := json.Marshal(ret) + response.State = v1alpha2.OK + response.Metadata = make(map[string]string) + response.Metadata["request-id"] = request.Metadata["request-id"] + response.Body = data + data, _ = json.Marshal(response) + token := c.Publish(config.ResponseTopic, 0, false, data) //sending COARequest directly doesn't seem to work + token.Wait() + + }) + if tok.Wait() && tok.Error() != nil { + if tok.Error().Error() == "subscription exists" { + break + } + if attempts == 9 { + t.Fatalf("failed to subscribe mqtt responder: %v", tok.Error()) + } + time.Sleep(200 * time.Millisecond) + continue } + break } arr, err := provider.Get(context.Background(), model.DeploymentSpec{ @@ -214,30 +249,57 @@ func TestGetBad(t *testing.T) { assert.Nil(t, err) opts := gmqtt.NewClientOptions().AddBroker(config.BrokerAddress).SetClientID("test-sender") - opts.SetKeepAlive(2 * time.Second) - opts.SetPingTimeout(1 * time.Second) + opts.SetKeepAlive(30 * time.Second) + opts.SetAutoReconnect(false) + opts.SetPingTimeout(10 * time.Second) c := gmqtt.NewClient(opts) - if token := c.Connect(); token.Wait() && token.Error() != nil { - panic(token.Error()) + // Connect with retry + for attempts := 0; attempts < 10; attempts++ { + tok := c.Connect() + if tok.Wait() && tok.Error() != nil { + if attempts == 9 { + t.Fatalf("failed to connect mqtt responder: %v", tok.Error()) + } + time.Sleep(200 * time.Millisecond) + continue + } + break } - if token := c.Subscribe(config.RequestTopic, 0, func(client gmqtt.Client, msg gmqtt.Message) { - var request v1alpha2.COARequest - err := json.Unmarshal(msg.Payload(), &request) - assert.Nil(t, err) - var response v1alpha2.COAResponse - response.State = v1alpha2.InternalError - response.Metadata = make(map[string]string) - response.Metadata["request-id"] = request.Metadata["request-id"] - response.Body = []byte("BAD!!") - data, _ := json.Marshal(response) - token := c.Publish(config.ResponseTopic, 0, false, data) //sending COARequest directly doesn't seem to work - token.Wait() - - }); token.Wait() && token.Error() != nil { - if token.Error().Error() != "subscription exists" { - panic(token.Error()) + // Wait until connected + for i := 0; i < 25 && !c.IsConnected(); i++ { + time.Sleep(100 * time.Millisecond) + } + if !c.IsConnected() { + t.Fatalf("mqtt responder not connected") + } + // Subscribe with retry + for attempts := 0; attempts < 10; attempts++ { + tok := c.Subscribe(config.RequestTopic, 0, func(client gmqtt.Client, msg gmqtt.Message) { + var request v1alpha2.COARequest + err := json.Unmarshal(msg.Payload(), &request) + assert.Nil(t, err) + var response v1alpha2.COAResponse + response.State = v1alpha2.InternalError + response.Metadata = make(map[string]string) + response.Metadata["request-id"] = request.Metadata["request-id"] + response.Body = []byte("didn't get response to Get() call over MQTT") + data, _ := json.Marshal(response) + token := c.Publish(config.ResponseTopic, 0, false, data) //sending COARequest directly doesn't seem to work + token.Wait() + + }) + if tok.Wait() && tok.Error() != nil { + if tok.Error().Error() == "subscription exists" { + break + } + if attempts == 9 { + t.Fatalf("failed to subscribe mqtt responder: %v", tok.Error()) + } + time.Sleep(200 * time.Millisecond) + continue } + break } _, err = provider.Get(context.Background(), model.DeploymentSpec{ @@ -247,7 +309,7 @@ func TestGetBad(t *testing.T) { }, nil) assert.NotNil(t, err) - assert.Equal(t, "Internal Error: BAD!!", err.Error()) + assert.Equal(t, "Internal Error: didn't get response to Get() call over MQTT", err.Error()) } func TestApply(t *testing.T) { testMQTT := os.Getenv("TEST_MQTT") @@ -262,7 +324,7 @@ func TestApply(t *testing.T) { MQTTRequestTopic string = "coa-request" MQTTResponseTopic string = "coa-response" - TestTargetSuccessMessage string = "Success" + TestTargetSuccessMessage string = "" ) config := MQTTTargetProviderConfig{ @@ -277,14 +339,17 @@ func TestApply(t *testing.T) { assert.Nil(t, err) opts := gmqtt.NewClientOptions().AddBroker(config.BrokerAddress).SetClientID("test-sender") - opts.SetKeepAlive(2 * time.Second) - opts.SetPingTimeout(1 * time.Second) + opts.SetKeepAlive(30 * time.Second) + opts.SetAutoReconnect(false) + opts.SetPingTimeout(10 * time.Second) c := gmqtt.NewClient(opts) + // Connect with simple retry to avoid transient broker readiness issues if token := c.Connect(); token.Wait() && token.Error() != nil { - panic(token.Error()) + t.Fatalf("failed to connect mqtt responder: %v", token.Error()) } - if token := c.Subscribe(config.RequestTopic, 0, func(client gmqtt.Client, msg gmqtt.Message) { + // Subscribe with simple retry, tolerating existing subscription + token := c.Subscribe(config.RequestTopic, 0, func(client gmqtt.Client, msg gmqtt.Message) { var request v1alpha2.COARequest err := json.Unmarshal(msg.Payload(), &request) assert.Nil(t, err) @@ -315,10 +380,9 @@ func TestApply(t *testing.T) { token := c.Publish(config.ResponseTopic, 0, false, data) //sending COARequest directly doesn't seem to work token.Wait() - }); token.Wait() && token.Error() != nil { - if token.Error().Error() != "subscription exists" { - panic(token.Error()) - } + }) + if token.Wait() && token.Error() != nil { + t.Fatalf("failed to subscribe mqtt responder: %v", token.Error()) } deploymentSpec := model.DeploymentSpec{ @@ -408,8 +472,9 @@ func TestApplyBad(t *testing.T) { assert.Nil(t, err) opts := gmqtt.NewClientOptions().AddBroker(config.BrokerAddress).SetClientID("test-sender") - opts.SetKeepAlive(2 * time.Second) - opts.SetPingTimeout(1 * time.Second) + opts.SetKeepAlive(30 * time.Second) + opts.SetAutoReconnect(false) + opts.SetPingTimeout(10 * time.Second) c := gmqtt.NewClient(opts) if token := c.Connect(); token.Wait() && token.Error() != nil { @@ -468,29 +533,56 @@ func TestARemove(t *testing.T) { assert.Nil(t, err) opts := gmqtt.NewClientOptions().AddBroker(config.BrokerAddress).SetClientID("test-sender") - opts.SetKeepAlive(2 * time.Second) - opts.SetPingTimeout(1 * time.Second) + opts.SetKeepAlive(30 * time.Second) + opts.SetAutoReconnect(false) + opts.SetPingTimeout(10 * time.Second) c := gmqtt.NewClient(opts) - if token := c.Connect(); token.Wait() && token.Error() != nil { - panic(token.Error()) + // Connect with retry + for attempts := 0; attempts < 10; attempts++ { + tok := c.Connect() + if tok.Wait() && tok.Error() != nil { + if attempts == 9 { + t.Fatalf("failed to connect mqtt responder: %v", tok.Error()) + } + time.Sleep(200 * time.Millisecond) + continue + } + break } - if token := c.Subscribe(config.RequestTopic, 0, func(client gmqtt.Client, msg gmqtt.Message) { - var request v1alpha2.COARequest - err := json.Unmarshal(msg.Payload(), &request) - assert.Nil(t, err) - var response v1alpha2.COAResponse - response.State = v1alpha2.OK - response.Metadata = make(map[string]string) - response.Metadata["request-id"] = request.Metadata["request-id"] - data, _ := json.Marshal(response) - token := c.Publish(config.ResponseTopic, 0, false, data) //sending COARequest directly doesn't seem to work - token.Wait() - - }); token.Wait() && token.Error() != nil { - if token.Error().Error() != "subscription exists" { - panic(token.Error()) + // Wait until connected + for i := 0; i < 25 && !c.IsConnected(); i++ { + time.Sleep(100 * time.Millisecond) + } + if !c.IsConnected() { + t.Fatalf("mqtt responder not connected") + } + // Subscribe with retry + for attempts := 0; attempts < 10; attempts++ { + tok := c.Subscribe(config.RequestTopic, 0, func(client gmqtt.Client, msg gmqtt.Message) { + var request v1alpha2.COARequest + err := json.Unmarshal(msg.Payload(), &request) + assert.Nil(t, err) + var response v1alpha2.COAResponse + response.State = v1alpha2.OK + response.Metadata = make(map[string]string) + response.Metadata["request-id"] = request.Metadata["request-id"] + data, _ := json.Marshal(response) + token := c.Publish(config.ResponseTopic, 0, false, data) //sending COARequest directly doesn't seem to work + token.Wait() + + }) + if tok.Wait() && tok.Error() != nil { + if tok.Error().Error() == "subscription exists" { + break + } + if attempts == 9 { + t.Fatalf("failed to subscribe mqtt responder: %v", tok.Error()) + } + time.Sleep(200 * time.Millisecond) + continue } + break } _, err = provider.Apply(context.Background(), model.DeploymentSpec{ @@ -526,8 +618,9 @@ func TestARemoveBad(t *testing.T) { assert.Nil(t, err) opts := gmqtt.NewClientOptions().AddBroker(config.BrokerAddress).SetClientID("test-sender") - opts.SetKeepAlive(2 * time.Second) - opts.SetPingTimeout(1 * time.Second) + opts.SetKeepAlive(30 * time.Second) + opts.SetAutoReconnect(false) + opts.SetPingTimeout(10 * time.Second) c := gmqtt.NewClient(opts) if token := c.Connect(); token.Wait() && token.Error() != nil { @@ -585,36 +678,63 @@ func TestGetApply(t *testing.T) { assert.Nil(t, err) opts := gmqtt.NewClientOptions().AddBroker(config.BrokerAddress).SetClientID("test-sender") - opts.SetKeepAlive(2 * time.Second) - opts.SetPingTimeout(1 * time.Second) + opts.SetKeepAlive(30 * time.Second) + opts.SetAutoReconnect(false) + opts.SetPingTimeout(10 * time.Second) c := gmqtt.NewClient(opts) - if token := c.Connect(); token.Wait() && token.Error() != nil { - panic(token.Error()) - } - if token := c.Subscribe(config.RequestTopic, 0, func(client gmqtt.Client, msg gmqtt.Message) { - var request v1alpha2.COARequest - json.Unmarshal(msg.Payload(), &request) - var response v1alpha2.COAResponse - response.Metadata = make(map[string]string) - response.Metadata["request-id"] = request.Metadata["request-id"] - if request.Method == "GET" { - ret := make([]model.ComponentSpec, 0) - data, _ := json.Marshal(ret) - response.State = v1alpha2.OK - response.Body = data - } else { - response.State = v1alpha2.OK + // Connect with retry + for attempts := 0; attempts < 10; attempts++ { + tok := c.Connect() + if tok.Wait() && tok.Error() != nil { + if attempts == 9 { + t.Fatalf("failed to connect mqtt responder: %v", tok.Error()) + } + time.Sleep(200 * time.Millisecond) + continue } - - data, _ := json.Marshal(response) - token := c.Publish(config.ResponseTopic, 0, false, data) //sending COARequest directly doesn't seem to work - token.Wait() - - }); token.Wait() && token.Error() != nil { - if token.Error().Error() != "subscription exists" { - panic(token.Error()) + break + } + // Wait until connected + for i := 0; i < 25 && !c.IsConnected(); i++ { + time.Sleep(100 * time.Millisecond) + } + if !c.IsConnected() { + t.Fatalf("mqtt responder not connected") + } + // Subscribe with retry + for attempts := 0; attempts < 10; attempts++ { + tok := c.Subscribe(config.RequestTopic, 0, func(client gmqtt.Client, msg gmqtt.Message) { + var request v1alpha2.COARequest + json.Unmarshal(msg.Payload(), &request) + var response v1alpha2.COAResponse + response.Metadata = make(map[string]string) + response.Metadata["request-id"] = request.Metadata["request-id"] + if request.Method == "GET" { + ret := make([]model.ComponentSpec, 0) + data, _ := json.Marshal(ret) + response.State = v1alpha2.OK + response.Body = data + } else { + response.State = v1alpha2.OK + } + + data, _ := json.Marshal(response) + token := c.Publish(config.ResponseTopic, 0, false, data) //sending COARequest directly doesn't seem to work + token.Wait() + + }) + if tok.Wait() && tok.Error() != nil { + if tok.Error().Error() == "subscription exists" { + break + } + if attempts == 9 { + t.Fatalf("failed to subscribe mqtt responder: %v", tok.Error()) + } + time.Sleep(200 * time.Millisecond) + continue } + break } arr, err := provider.Get(context.Background(), model.DeploymentSpec{ @@ -664,8 +784,9 @@ func TestLocalApplyGet(t *testing.T) { assert.Nil(t, err) opts := gmqtt.NewClientOptions().AddBroker(config.BrokerAddress).SetClientID("test-sender") - opts.SetKeepAlive(2 * time.Second) - opts.SetPingTimeout(1 * time.Second) + opts.SetKeepAlive(30 * time.Second) + opts.SetAutoReconnect(false) + opts.SetPingTimeout(10 * time.Second) c := gmqtt.NewClient(opts) if token := c.Connect(); token.Wait() && token.Error() != nil { @@ -744,3 +865,260 @@ func TestConformanceSuite(t *testing.T) { // assert.Nil(t, err) okay if provider is not fully initialized conformance.ConformanceSuite(t, provider) } + +// --- TLS/mTLS unit tests --- + +// generateSelfSignedCert creates a temporary self-signed certificate and key. +// Returns paths to cert and key files and the certificate bytes. +func generateSelfSignedCert(t *testing.T) (string, string, []byte) { + t.Helper() + privKey, err := rsa.GenerateKey(rand.Reader, 2048) + assert.Nil(t, err) + + tmpl := x509.Certificate{ + SerialNumber: bigIntOne(t), + Subject: pkix.Name{CommonName: "localhost"}, + NotBefore: time.Now().Add(-time.Hour), + NotAfter: time.Now().Add(24 * time.Hour), + KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature | x509.KeyUsageCertSign, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth}, + BasicConstraintsValid: true, + IsCA: true, + } + + certDER, err := x509.CreateCertificate(rand.Reader, &tmpl, &tmpl, &privKey.PublicKey, privKey) + assert.Nil(t, err) + + // Write cert + certFile, err := os.CreateTemp("", "mtls-cert-*.pem") + assert.Nil(t, err) + defer certFile.Close() + assert.Nil(t, pem.Encode(certFile, &pem.Block{Type: "CERTIFICATE", Bytes: certDER})) + + // Write key + keyFile, err := os.CreateTemp("", "mtls-key-*.pem") + assert.Nil(t, err) + defer keyFile.Close() + assert.Nil(t, pem.Encode(keyFile, &pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(privKey)})) + + return certFile.Name(), keyFile.Name(), pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: certDER}) +} + +func bigIntOne(t *testing.T) *big.Int { + t.Helper() + return big.NewInt(1) +} + +func TestCreateTLSConfig_InvalidCAPath(t *testing.T) { + provider := &MQTTTargetProvider{Config: MQTTTargetProviderConfig{ + UseTLS: true, + CACertPath: filepath.Join(os.TempDir(), "non-existent-ca.pem"), + }} + _, err := provider.createTLSConfig(context.Background()) + assert.NotNil(t, err) +} + +func TestCreateTLSConfig_InvalidCAPEM(t *testing.T) { + caFile, err := os.CreateTemp("", "invalid-ca-*.pem") + assert.Nil(t, err) + defer os.Remove(caFile.Name()) + defer caFile.Close() + _, _ = caFile.Write([]byte("not a pem")) + + provider := &MQTTTargetProvider{Config: MQTTTargetProviderConfig{ + UseTLS: true, + CACertPath: caFile.Name(), + }} + _, cfgErr := provider.createTLSConfig(context.Background()) + assert.NotNil(t, cfgErr) +} + +func TestCreateTLSConfig_ClientCertWithoutKey(t *testing.T) { + certPath, _, _ := generateSelfSignedCert(t) + defer os.Remove(certPath) + + provider := &MQTTTargetProvider{Config: MQTTTargetProviderConfig{ + UseTLS: true, + ClientCertPath: certPath, + // missing key path + }} + _, err := provider.createTLSConfig(context.Background()) + assert.NotNil(t, err) +} + +func TestCreateTLSConfig_ClientCertAndKey_Success(t *testing.T) { + certPath, keyPath, caBytes := generateSelfSignedCert(t) + defer os.Remove(certPath) + defer os.Remove(keyPath) + + // Use the same self-signed cert as CA to exercise RootCAs path + caFile, err := os.CreateTemp("", "ca-*.pem") + assert.Nil(t, err) + defer os.Remove(caFile.Name()) + defer caFile.Close() + _, _ = caFile.Write(caBytes) + + provider := &MQTTTargetProvider{Config: MQTTTargetProviderConfig{ + UseTLS: true, + CACertPath: caFile.Name(), + ClientCertPath: certPath, + ClientKeyPath: keyPath, + }} + cfg, err := provider.createTLSConfig(context.Background()) + assert.Nil(t, err) + assert.NotNil(t, cfg) + assert.True(t, len(cfg.Certificates) == 1) +} + +// Optional integration-style test to actually run MQTT with mTLS against a live broker. +// Requires environment variables: +// - TEST_MQTT_MTLS=1 (enables the test) +// - TEST_MQTT_MTLS_BROKER (e.g., ssl://127.0.0.1:8883) +// - TEST_MQTT_MTLS_CA, TEST_MQTT_MTLS_CERT, TEST_MQTT_MTLS_KEY (paths to PEM files) +// - TEST_MQTT_MTLS_REQUEST_TOPIC, TEST_MQTT_MTLS_RESPONSE_TOPIC +func TestGet_mTLS(t *testing.T) { + if os.Getenv("TEST_MQTT_MTLS") == "" { + t.Skip("Skipping mTLS test; set TEST_MQTT_MTLS and related env vars to enable") + } + broker := os.Getenv("TEST_MQTT_MTLS_BROKER") + ca := os.Getenv("TEST_MQTT_MTLS_CA") + cert := os.Getenv("TEST_MQTT_MTLS_CERT") + key := os.Getenv("TEST_MQTT_MTLS_KEY") + reqTopic := os.Getenv("TEST_MQTT_MTLS_REQUEST_TOPIC") + respTopic := os.Getenv("TEST_MQTT_MTLS_RESPONSE_TOPIC") + if broker == "" || ca == "" || cert == "" || key == "" || reqTopic == "" || respTopic == "" { + t.Skip("Skipping mTLS test; missing required TEST_MQTT_MTLS_* env vars") + } + + provider := &MQTTTargetProvider{} + err := provider.Init(MQTTTargetProviderConfig{ + Name: "mtls-test", + BrokerAddress: broker, + ClientID: "mtls-provider", + RequestTopic: reqTopic, + ResponseTopic: respTopic, + UseTLS: true, + CACertPath: ca, + ClientCertPath: cert, + ClientKeyPath: key, + }) + assert.Nil(t, err) + + // Separate client to respond to requests, also using mTLS + respTLS := newTLSConfigFromFiles(t, ca, cert, key) + + opts := gmqtt.NewClientOptions().AddBroker(broker).SetClientID("mtls-responder") + opts.SetTLSConfig(respTLS) + opts.SetKeepAlive(30 * time.Second) + opts.SetAutoReconnect(false) + opts.SetPingTimeout(10 * time.Second) + + c := gmqtt.NewClient(opts) + if token := c.Connect(); token.Wait() && token.Error() != nil { + t.Fatalf("failed to connect mtls responder: %v", token.Error()) + } + if token := c.Subscribe(reqTopic, 0, func(client gmqtt.Client, msg gmqtt.Message) { + var request v1alpha2.COARequest + _ = json.Unmarshal(msg.Payload(), &request) + var response v1alpha2.COAResponse + ret := make([]model.ComponentSpec, 0) + data, _ := json.Marshal(ret) + response.State = v1alpha2.OK + response.Metadata = map[string]string{"request-id": request.Metadata["request-id"]} + response.Body = data + data, _ = json.Marshal(response) + tok := c.Publish(respTopic, 0, false, data) + tok.Wait() + }); token.Wait() && token.Error() != nil { + if token.Error().Error() != "subscription exists" { + t.Fatalf("subscribe failed: %v", token.Error()) + } + } + + arr, err := provider.Get(context.Background(), model.DeploymentSpec{Instance: model.InstanceState{Spec: &model.InstanceSpec{}}}, nil) + assert.Nil(t, err) + assert.Equal(t, 0, len(arr)) +} + +// TLS server-auth only (no client cert). Requires a TLS listener without mTLS on the broker. +// Env vars: +// - TEST_MQTT_TLS=1 (enables the test) +// - TEST_MQTT_TLS_BROKER (e.g., ssl://127.0.0.1:8883) +// - TEST_MQTT_TLS_CA (path to broker CA cert) +// - TEST_MQTT_TLS_REQUEST_TOPIC, TEST_MQTT_TLS_RESPONSE_TOPIC +func TestGet_TLS(t *testing.T) { + if os.Getenv("TEST_MQTT_TLS") == "" { + t.Skip("Skipping TLS test; set TEST_MQTT_TLS and related env vars to enable") + } + broker := os.Getenv("TEST_MQTT_TLS_BROKER") + ca := os.Getenv("TEST_MQTT_TLS_CA") + reqTopic := os.Getenv("TEST_MQTT_TLS_REQUEST_TOPIC") + respTopic := os.Getenv("TEST_MQTT_TLS_RESPONSE_TOPIC") + if broker == "" || ca == "" || reqTopic == "" || respTopic == "" { + t.Skip("Skipping TLS test; missing required TEST_MQTT_TLS_* env vars") + } + + provider := &MQTTTargetProvider{} + err := provider.Init(MQTTTargetProviderConfig{ + Name: "tls-test", + BrokerAddress: broker, + ClientID: "tls-provider", + RequestTopic: reqTopic, + ResponseTopic: respTopic, + UseTLS: true, + CACertPath: ca, + }) + assert.Nil(t, err) + + // TLS responder without client certificate + caBytes, err := os.ReadFile(ca) + assert.Nil(t, err) + pool := x509.NewCertPool() + assert.True(t, pool.AppendCertsFromPEM(caBytes)) + tlsCfg := &tls.Config{RootCAs: pool} + + opts := gmqtt.NewClientOptions().AddBroker(broker).SetClientID("tls-responder") + opts.SetTLSConfig(tlsCfg) + opts.SetKeepAlive(30 * time.Second) + opts.SetAutoReconnect(false) + opts.SetPingTimeout(10 * time.Second) + + c := gmqtt.NewClient(opts) + if token := c.Connect(); token.Wait() && token.Error() != nil { + t.Fatalf("failed to connect tls responder: %v", token.Error()) + } + if token := c.Subscribe(reqTopic, 0, func(client gmqtt.Client, msg gmqtt.Message) { + var request v1alpha2.COARequest + _ = json.Unmarshal(msg.Payload(), &request) + var response v1alpha2.COAResponse + ret := make([]model.ComponentSpec, 0) + data, _ := json.Marshal(ret) + response.State = v1alpha2.OK + response.Metadata = map[string]string{"request-id": request.Metadata["request-id"]} + response.Body = data + data, _ = json.Marshal(response) + tok := c.Publish(respTopic, 0, false, data) + tok.Wait() + }); token.Wait() && token.Error() != nil { + if token.Error().Error() != "subscription exists" { + t.Fatalf("subscribe failed: %v", token.Error()) + } + } + + arr, err := provider.Get(context.Background(), model.DeploymentSpec{Instance: model.InstanceState{Spec: &model.InstanceSpec{}}}, nil) + assert.Nil(t, err) + assert.Equal(t, 0, len(arr)) +} + +func newTLSConfigFromFiles(t *testing.T, caPath, certPath, keyPath string) *tls.Config { + t.Helper() + caBytes, err := os.ReadFile(caPath) + assert.Nil(t, err) + pool := x509.NewCertPool() + assert.True(t, pool.AppendCertsFromPEM(caBytes)) + + crt, err := tls.LoadX509KeyPair(certPath, keyPath) + assert.Nil(t, err) + + return &tls.Config{RootCAs: pool, Certificates: []tls.Certificate{crt}} +} diff --git a/api/symphony-api-no-k8s-secure-mqtt.json b/api/symphony-api-no-k8s-secure-mqtt.json new file mode 100644 index 000000000..86889f53f --- /dev/null +++ b/api/symphony-api-no-k8s-secure-mqtt.json @@ -0,0 +1,703 @@ +{ + "siteInfo": { + "siteId": "laptop", + "properties": { + "name": "My Laptop", + "address": "1 Main Street", + "city": "Carnation", + "state": "WA", + "zip": "98014", + "country": "USA", + "phone": "425-555-1212", + "version": "0.45.1" + }, + "currentSite": { + "baseUrl": "http://localhost:8082/v1alpha2/", + "username": "admin", + "password": "" + } + }, + "api": { + "pubsub": { + "shared": true, + "provider": { + "type": "providers.pubsub.memory", + "config": {} + } + }, + "keylock": { + "shared": true, + "provider": { + "type": "providers.keylock.memory", + "config": { + "mode": "Global", + "cleanInterval" : 30, + "purgeDuration" : 43200 + } + } + }, + "vendors": [ + { + "type": "vendors.settings", + "managers": [ + { + "name": "config-manager", + "type": "managers.symphony.configs", + "properties": { + "singleton": "true" + }, + "providers": { + "catalog": { + "type": "providers.config.catalog", + "config": { + "user": "admin", + "password": "" + } + } + } + } + ] + }, + { + "type": "vendors.stage", + "route": "stage", + "managers": [ + { + "name": "stage-manager", + "type": "managers.symphony.stage", + "properties": { + "user": "admin", + "password": "", + "providers.volatilestate": "memory" + }, + "providers": { + "memory": { + "type": "providers.state.memory", + "config": {} + } + } + }, + { + "name": "campaigns-manager", + "type": "managers.symphony.campaigns", + "properties": { + "providers.persistentstate": "k8s-state", + "singleton": "true" + }, + "providers": { + "k8s-state": { + "type": "providers.state.memory", + "config": {} + } + } + }, + { + "name": "activations-manager", + "type": "managers.symphony.activations", + "properties": { + "providers.persistentstate": "k8s-state", + "singleton": "true" + }, + "providers": { + "k8s-state": { + "type": "providers.state.memory", + "config": {} + } + } + } + ], + "properties": { + "wait.user": "admin", + "wait.password": "", + "wait.wait.interval": "15", + "wait.wait.count": "10" + } + }, + { + "type": "vendors.activations", + "route": "activations", + "managers": [ + { + "name": "activations-manager", + "type": "managers.symphony.activations", + "properties": { + "providers.persistentstate": "k8s-state", + "useJobManager": "true", + "singleton": "true" + }, + "providers": { + "k8s-state": { + "type": "providers.state.memory", + "config": {} + } + } + } + ] + }, + { + "type": "vendors.backgroundjob", + "route": "backgroundjob", + "loopInterval": 3600, + "managers": [ + { + "name": "activations-cleanup-manager", + "type": "managers.symphony.activationscleanup", + "properties": { + "providers.persistentstate": "k8s-state", + "singleton": "true", + "RetentionDuration": "4320h" + }, + "providers": { + "k8s-state": { + "type": "providers.state.memory", + "config": {} + } + } + } + ] + }, + { + "type": "vendors.campaigns", + "route": "campaigns", + "managers": [ + { + "name": "campaigns-manager", + "type": "managers.symphony.campaigns", + "properties": { + "providers.persistentstate": "k8s-state", + "singleton": "true" + }, + "providers": { + "k8s-state": { + "type": "providers.state.memory", + "config": {} + } + } + } + ] + }, + { + "type": "vendors.campaigncontainers", + "route": "campaigncontainers", + "managers": [ + { + "name": "campaign-container-manager", + "type": "managers.symphony.campaigncontainers", + "properties": { + "providers.persistentstate": "k8s-state" + }, + "providers": { + "k8s-state": { + "type": "providers.state.memory", + "config": {} + } + } + } + ] + }, + { + "type": "vendors.echo", + "route": "greetings", + "managers": [] + }, + { + "type": "vendors.jobs", + "route": "jobs", + "loopInterval": 15, + "managers": [ + { + "name": "jobs-manager", + "type": "managers.symphony.jobs", + "properties": { + "providers.volatilestate": "mem-state", + "providers.persistentstate": "mem-state", + "user": "admin", + "password": "", + "interval": "#15", + "poll.enabled": "true", + "schedule.enabled": "true" + }, + "providers": { + "mem-state": { + "type": "providers.state.memory", + "config": {} + } + } + } + ] + }, + { + "type": "vendors.targets", + "loopInterval": 15, + "route": "targets", + "managers": [ + { + "name": "targets-manager", + "type": "managers.symphony.targets", + "properties": { + "providers.persistentstate": "k8s-state" + }, + "providers": { + "k8s-state": { + "type": "providers.state.memory", + "config": {} + } + } + } + ], + "properties": { + "useJobManager": "true" + } + }, + { + "type": "vendors.solutions", + "loopInterval": 15, + "route": "solutions", + "managers": [ + { + "name": "solutions-manager", + "type": "managers.symphony.solutions", + "properties": { + "providers.persistentstate": "k8s-state" + }, + "providers": { + "k8s-state": { + "type": "providers.state.memory", + "config": {} + } + } + } + ] + }, + { + "type": "vendors.instances", + "loopInterval": 15, + "route": "instances", + "managers": [ + { + "name": "instances-manager", + "type": "managers.symphony.instances", + "properties": { + "providers.persistentstate": "k8s-state" + }, + "providers": { + "k8s-state": { + "type": "providers.state.memory", + "config": {} + } + } + } + ], + "properties": { + "useJobManager": "true" + } + }, + { + "type": "vendors.solutioncontainers", + "route": "solutioncontainers", + "managers": [ + { + "name": "solution-container-manager", + "type": "managers.symphony.solutioncontainers", + "properties": { + "providers.persistentstate": "k8s-state" + }, + "providers": { + "k8s-state": { + "type": "providers.state.memory", + "config": {} + } + } + } + ] + }, + { + "type": "vendors.devices", + "loopInterval": 15, + "route": "devices", + "managers": [ + { + "name": "devices-manager", + "type": "managers.symphony.devices", + "properties": { + "providers.persistentstate": "k8s-state" + }, + "providers": { + "k8s-state": { + "type": "providers.state.memory", + "config": {} + } + } + } + ] + }, + { + "type": "vendors.models", + "loopInterval": 15, + "route": "models", + "managers": [ + { + "name": "models-manager", + "type": "managers.symphony.models", + "properties": { + "providers.persistentstate": "k8s-state" + }, + "providers": { + "k8s-state": { + "type": "providers.state.memory", + "config": {} + } + } + } + ] + }, + { + "type": "vendors.skills", + "loopInterval": 15, + "route": "skills", + "managers": [ + { + "name": "skills-manager", + "type": "managers.symphony.skills", + "properties": { + "providers.persistentstate": "k8s-state" + }, + "providers": { + "k8s-state": { + "type": "providers.state.memory", + "config": {} + } + } + } + ] + }, + { + "type": "vendors.users", + "loopInterval": 15, + "route": "users", + "properties": { + "test-users": "true" + }, + "managers": [ + { + "name": "users-manager", + "type": "managers.symphony.users", + "properties": { + "providers.volatilestate": "mem-state" + }, + "providers": { + "mem-state": { + "type": "providers.state.memory", + "config": {} + } + } + } + ] + }, + { + "type": "vendors.solution", + "loopInterval": 15, + "route": "solution", + "managers": [ + { + "name": "solution-manager", + "type": "managers.symphony.solution", + "properties": { + "providers.persistentstate": "mem-state", + "providers.config": "mock-config", + "providers.secret": "mock-secret", + "providers.keylock": "mem-keylock" + }, + "providers": { + "mem-state": { + "type": "providers.state.memory", + "config": {} + }, + "mem-keylock": { + "type": "providers.keylock.memory", + "config": { + "mode" : "Shared" + } + }, + "mock-config": { + "type": "providers.config.mock", + "config": {} + }, + "mock-secret": { + "type": "providers.secret.mock", + "config": {} + } + } + } + ] + }, + { + "type": "vendors.agent", + "loopInterval": 15, + "route": "agent", + "managers": [ + { + "name": "reference-manager", + "type": "managers.symphony.reference", + "properties": { + "providers.reference": "http-reference", + "providers.volatilestate": "memory", + "providers.reporter": "http-reporter" + }, + "providers": { + "memory": { + "type": "providers.state.memory", + "config": {} + }, + "http-reference": { + "type": "providers.reference.http", + "config": {} + }, + "http-reporter": { + "type": "providers.reporter.http", + "config": {} + } + } + } + ] + }, + { + "type": "vendors.federation", + "route": "federation", + "loopInterval": 15, + "managers": [ + { + "name": "trails-manager", + "type": "managers.symphony.trails", + "providers": { + "mock": { + "type": "providers.ledger.mock", + "config": {} + } + } + }, + { + "name": "sites-manager", + "type": "managers.symphony.sites", + "properties": { + "providers.persistentstate": "memory" + }, + "providers": { + "memory": { + "type": "providers.state.memory", + "config": {} + } + } + }, + { + "name": "catalogs-manager", + "type": "managers.symphony.catalogs", + "properties": { + "providers.persistentstate": "memory", + "singleton": "true" + }, + "providers": { + "memory": { + "type": "providers.state.memory", + "config": {} + } + } + }, + { + "name": "staging-manager", + "type": "managers.symphony.staging", + "properties": { + "poll.enabled": "true", + "interval": "#15", + "providers.queue": "memory-queue", + "providers.volatilestate": "memory-state" + }, + "providers": { + "memory-queue": { + "type": "providers.queue.memory", + "config": {} + }, + "memory-state": { + "type": "providers.state.memory", + "config": {} + } + } + }, + { + "name": "sync-manager", + "type": "managers.symphony.sync", + "properties": { + "baseUrl": "http://localhost:8080/v1alpha2/", + "user": "admin", + "password": "", + "interval": "#15", + "sync.enabled": "true" + } + } + ] + }, + { + "type": "vendors.catalogs", + "route": "catalogs", + "managers": [ + { + "name": "catalogs-manager", + "type": "managers.symphony.catalogs", + "properties": { + "providers.persistentstate": "memory", + "singleton": "true" + }, + "providers": { + "memory": { + "type": "providers.state.memory", + "config": {} + }, + "graph": { + "type": "providers.graph.memory", + "config": {} + } + } + } + ] + }, + { + "type": "vendors.catalogcontainers", + "route": "catalogcontainers", + "managers": [ + { + "name": "catalog-container-manager", + "type": "managers.symphony.catalogcontainers", + "properties": { + "providers.persistentstate": "memory", + "singleton": "true" + }, + "providers": { + "memory": { + "type": "providers.state.memory", + "config": {} + } + } + } + ] + }, + { + "type": "vendors.visualization", + "route": "visualization", + "managers": [ + { + "name": "catalogs-manager", + "type": "managers.symphony.catalogs", + "properties": { + "providers.persistentstate": "memory", + "singleton": "true" + }, + "providers": { + "memory": { + "type": "providers.state.memory", + "config": {} + }, + "graph": { + "type": "providers.graph.memory", + "config": {} + } + } + } + ] + } + ] + }, + "bindings": [ + { + "type": "bindings.mqtt", + "config": { + "brokerAddress": "ssl://MQTT_BROKER_ADDRESS:8883", + "clientID": "symphony-client", + "requestTopic": "symphony-request", + "responseTopic": "symphony-response", + "useTLS": "true", + "caCertPath": "/path/to/ca/cert/ca.crt", + "clientCertPath": "/path/to/client/cert/client.crt", + "clientKeyPath": "/path/to/client/key/client.key" + } + }, + { + "type": "bindings.http", + "config": { + "port": 8082, + "pipeline": [ + { + "type": "middleware.http.cors", + "properties": { + "Access-Control-Allow-Headers": "authorization,Content-Type", + "Access-Control-Allow-Credentials": "true", + "Access-Control-Allow-Methods": "HEAD,GET,POST,PUT,DELETE,OPTIONS", + "Access-Control-Allow-Origin": "*" + } + }, + { + "type": "middleware.http.jwt", + "properties": { + "ignorePaths": ["/v1alpha2/users/auth", "/v1alpha2/solution/instances", "/v1alpha2/agent/references", "/v1alpha2/greetings"], + "verifyKey": "SymphonyKey", + "enableRBAC": true, + "roles": [ + { + "role": "administrator", + "claim": "user", + "value": "admin" + }, + { + "role": "reader", + "claim": "user", + "value": "*" + }, + { + "role": "solution-creator", + "claim": "user", + "value": "developer" + }, + { + "role": "target-manager", + "claim": "user", + "value": "device-manager" + }, + { + "role": "operator", + "claim": "user", + "value": "solution-operator" + } + ], + "policy": { + "administrator": { + "items": { + "*": "*" + } + }, + "reader": { + "items": { + "*": "GET" + } + }, + "solution-creator": { + "items": { + "/v1alpha2/solutions": "*" + } + }, + "target-manager": { + "items": { + "/v1alpha2/targets": "*" + } + }, + "solution-operator": { + "items": { + "/v1alpha2/instances": "*" + } + } + } + } + } + ] + } + } + ] +} diff --git a/coa/pkg/apis/v1alpha2/bindings/mqtt/mqtt.go b/coa/pkg/apis/v1alpha2/bindings/mqtt/mqtt.go index 7006eface..be7fb4934 100644 --- a/coa/pkg/apis/v1alpha2/bindings/mqtt/mqtt.go +++ b/coa/pkg/apis/v1alpha2/bindings/mqtt/mqtt.go @@ -8,7 +8,12 @@ package mqtt import ( "context" + "crypto/tls" + "crypto/x509" "encoding/json" + "encoding/pem" + "fmt" + "io/ioutil" "strings" "time" @@ -21,10 +26,21 @@ import ( var log = logger.NewLogger("coa.runtime") type MQTTBindingConfig struct { - BrokerAddress string `json:"brokerAddress"` - ClientID string `json:"clientID"` - RequestTopic string `json:"requestTopic"` - ResponseTopic string `json:"responseTopic"` + BrokerAddress string `json:"brokerAddress"` + ClientID string `json:"clientID"` + RequestTopic string `json:"requestTopic"` + ResponseTopic string `json:"responseTopic"` + TimeoutSeconds int `json:"timeoutSeconds,omitempty"` + KeepAliveSeconds int `json:"keepAliveSeconds,omitempty"` + PingTimeoutSeconds int `json:"pingTimeoutSeconds,omitempty"` + // TLS/Certificate configuration fields + UseTLS string `json:"useTLS,omitempty"` + CACertPath string `json:"caCertPath,omitempty"` + ClientCertPath string `json:"clientCertPath,omitempty"` + ClientKeyPath string `json:"clientKeyPath,omitempty"` + InsecureSkipVerify string `json:"insecureSkipVerify,omitempty"` + Username string `json:"username,omitempty"` + Password string `json:"password,omitempty"` } type MQTTBinding struct { @@ -44,13 +60,56 @@ func (m *MQTTBinding) Launch(config MQTTBindingConfig, endpoints []v1alpha2.Endp routeTable[route] = endpoint } + // Set default values + if config.KeepAliveSeconds <= 0 { + config.KeepAliveSeconds = 2 + } + if config.PingTimeoutSeconds <= 0 { + config.PingTimeoutSeconds = 1 + } + opts := gmqtt.NewClientOptions().AddBroker(config.BrokerAddress).SetClientID(config.ClientID) - opts.SetKeepAlive(2 * time.Second) - opts.SetPingTimeout(1 * time.Second) + opts.SetKeepAlive(time.Duration(config.KeepAliveSeconds) * time.Second) + opts.SetPingTimeout(time.Duration(config.PingTimeoutSeconds) * time.Second) opts.CleanSession = false + + // Configure authentication + if config.Username != "" { + opts.SetUsername(config.Username) + } + if config.Password != "" { + opts.SetPassword(config.Password) + } + + // Configure TLS if enabled + if config.UseTLS == "true" { + tlsConfig, err := m.createTLSConfig(config) + if err != nil { + log.Errorf("MQTT Binding: failed to create TLS config - %+v", err) + return v1alpha2.NewCOAError(err, "failed to create TLS config", v1alpha2.InternalError) + } + opts.SetTLSConfig(tlsConfig) + } + m.MQTTClient = gmqtt.NewClient(opts) if token := m.MQTTClient.Connect(); token.Wait() && token.Error() != nil { - return v1alpha2.NewCOAError(token.Error(), "failed to connect to MQTT broker", v1alpha2.InternalError) + connErr := token.Error() + log.Errorf("MQTT Binding: failed to connect to MQTT broker - %+v", connErr) + + // Provide specific guidance for common TLS errors + if strings.Contains(connErr.Error(), "certificate signed by unknown authority") { + log.Errorf("MQTT Binding: TLS certificate verification failed. Common solutions:") + log.Errorf("MQTT Binding: 1. Set 'caCertPath' to the path of your broker's CA certificate") + log.Errorf("MQTT Binding: 2. Set 'insecureSkipVerify' to 'true' for testing (not recommended for production)") + log.Errorf("MQTT Binding: 3. Ensure your broker certificate is issued by a trusted CA") + } else if strings.Contains(connErr.Error(), "tls:") { + log.Errorf("MQTT Binding: TLS connection error. Check your TLS configuration:") + log.Errorf("MQTT Binding: - Broker address should use 'ssl://' or 'tls://' prefix for TLS connections") + log.Errorf("MQTT Binding: - Verify CA certificate path and format") + log.Errorf("MQTT Binding: - Check client certificate and key paths if using mutual TLS") + } + + return v1alpha2.NewCOAError(connErr, "failed to connect to MQTT broker", v1alpha2.InternalError) } if token := m.MQTTClient.Subscribe(config.RequestTopic, 0, func(client gmqtt.Client, msg gmqtt.Message) { @@ -99,6 +158,83 @@ func (m *MQTTBinding) Launch(config MQTTBindingConfig, endpoints []v1alpha2.Endp return nil } +// createTLSConfig creates a TLS configuration for MQTT client authentication +func (m *MQTTBinding) createTLSConfig(config MQTTBindingConfig) (*tls.Config, error) { + insecureSkipVerify := config.InsecureSkipVerify == "true" + + tlsConfig := &tls.Config{ + InsecureSkipVerify: insecureSkipVerify, + } + + // Load CA certificate if provided + if config.CACertPath != "" { + log.Infof("MQTT Binding: attempting to load CA certificate from %s", config.CACertPath) + + caCert, err := ioutil.ReadFile(config.CACertPath) + if err != nil { + log.Errorf("MQTT Binding: failed to read CA certificate - %+v", err) + return nil, fmt.Errorf("failed to read CA certificate: %w", err) + } + + // Verify the CA cert content + log.Infof("MQTT Binding: CA certificate file size: %d bytes", len(caCert)) + if len(caCert) == 0 { + return nil, fmt.Errorf("CA certificate file is empty") + } + + // Validate that the file contains valid PEM data + if !isCertificatePEM(caCert) { + log.Errorf("MQTT Binding: CA certificate file does not contain valid PEM data") + return nil, fmt.Errorf("CA certificate file does not contain valid PEM data") + } + + caCertPool := x509.NewCertPool() + if !caCertPool.AppendCertsFromPEM(caCert) { + log.Errorf("MQTT Binding: failed to parse CA certificate - invalid PEM format or corrupted certificate") + return nil, fmt.Errorf("failed to parse CA certificate - invalid PEM format or corrupted certificate") + } + tlsConfig.RootCAs = caCertPool + log.Infof("MQTT Binding: successfully loaded CA certificate from %s", config.CACertPath) + } else { + if !insecureSkipVerify { + log.Warn("MQTT Binding: no CA certificate path provided - using system CA pool. If connection fails with 'certificate signed by unknown authority', either provide a CA certificate or set insecureSkipVerify to true") + } else { + log.Infof("MQTT Binding: TLS certificate verification disabled (insecureSkipVerify=true)") + } + } + + // Load client certificate and key if provided + if config.ClientCertPath != "" && config.ClientKeyPath != "" { + clientCert, err := tls.LoadX509KeyPair(config.ClientCertPath, config.ClientKeyPath) + if err != nil { + log.Errorf("MQTT Binding: failed to load client certificate and key - %+v", err) + return nil, fmt.Errorf("failed to load client certificate and key: %w", err) + } + tlsConfig.Certificates = []tls.Certificate{clientCert} + log.Infof("MQTT Binding: loaded client certificate from %s and key from %s", + config.ClientCertPath, config.ClientKeyPath) + } else if config.ClientCertPath != "" || config.ClientKeyPath != "" { + // Both cert and key must be provided together + return nil, fmt.Errorf("both clientCertPath and clientKeyPath must be provided for client certificate authentication") + } + + return tlsConfig, nil +} + +// isCertificatePEM checks if the given data contains valid PEM formatted certificate data +func isCertificatePEM(data []byte) bool { + // Check if the data contains PEM headers + dataStr := string(data) + if !strings.Contains(dataStr, "-----BEGIN CERTIFICATE-----") || + !strings.Contains(dataStr, "-----END CERTIFICATE-----") { + return false + } + + // Try to decode the PEM block + block, _ := pem.Decode(data) + return block != nil && block.Type == "CERTIFICATE" +} + // Shutdown stops the MQTT binding func (m *MQTTBinding) Shutdown(ctx context.Context) error { m.MQTTClient.Disconnect(1000)