diff --git a/pkg/integrations/gcp/common/common.go b/pkg/integrations/gcp/common/common.go index 5b5b0cbb6d..ded355bc62 100644 --- a/pkg/integrations/gcp/common/common.go +++ b/pkg/integrations/gcp/common/common.go @@ -20,4 +20,5 @@ type Metadata struct { AccessTokenExpiresAt string `json:"accessTokenExpiresAt"` PubSubTopic string `json:"pubsubTopic,omitempty"` PubSubSubscription string `json:"pubsubSubscription,omitempty"` + EventsBaseURL string `json:"eventsBaseUrl,omitempty"` } diff --git a/pkg/integrations/gcp/gcp.go b/pkg/integrations/gcp/gcp.go index e9eb63f14d..8d55f1b445 100644 --- a/pkg/integrations/gcp/gcp.go +++ b/pkg/integrations/gcp/gcp.go @@ -140,12 +140,14 @@ func (g *GCP) Configuration() []configuration.Field { func (g *GCP) Components() []core.Component { return []core.Component{ &compute.CreateVM{}, + &gcppubsub.PublishMessage{}, } } func (g *GCP) Triggers() []core.Trigger { return []core.Trigger{ &compute.OnVMInstance{}, + &gcppubsub.OnTopicMessage{}, } } @@ -202,6 +204,7 @@ func (g *GCP) syncWIF(ctx core.SyncContext, config Configuration) error { ClientEmail: "", AuthMethod: gcpcommon.AuthMethodWIF, AccessTokenExpiresAt: expiresAt.Format(time.RFC3339), + EventsBaseURL: ctx.WebhooksBaseURL, } ctx.Integration.SetMetadata(metadata) @@ -241,6 +244,7 @@ func (g *GCP) syncServiceAccountKey(ctx core.SyncContext, config Configuration) return fmt.Errorf("invalid service account key: %w", err) } metadata.AuthMethod = gcpcommon.AuthMethodServiceAccountKey + metadata.EventsBaseURL = ctx.WebhooksBaseURL if err := ctx.Integration.SetSecret(gcpcommon.SecretNameServiceAccountKey, keyJSON); err != nil { return fmt.Errorf("failed to store service account key: %w", err) @@ -423,6 +427,8 @@ func (g *GCP) ListResources(resourceType string, ctx core.ListResourcesContext) p := ctx.Parameters switch resourceType { + case ResourceTypePubSubTopic: + return listPubSubTopicResources(reqCtx, client) case compute.ResourceTypeRegion: return compute.ListRegionResources(reqCtx, client) case compute.ResourceTypeZone: @@ -457,6 +463,11 @@ func (g *GCP) ListResources(resourceType string, ctx core.ListResourcesContext) } func (g *GCP) HandleRequest(ctx core.HTTPRequestContext) { + if strings.HasSuffix(ctx.Request.URL.Path, "/events/pubsub") { + g.handlePubSubTopicEvent(ctx) + return + } + if strings.HasSuffix(ctx.Request.URL.Path, "/events") { g.handleEvent(ctx) return @@ -465,6 +476,27 @@ func (g *GCP) HandleRequest(ctx core.HTTPRequestContext) { ctx.Response.WriteHeader(http.StatusNotFound) } +const ResourceTypePubSubTopic = "pubsubTopic" + +func listPubSubTopicResources(ctx context.Context, client *gcpcommon.Client) ([]core.IntegrationResource, error) { + topics, err := gcppubsub.ListTopics(ctx, client, client.ProjectID()) + if err != nil { + return nil, fmt.Errorf("list Pub/Sub topics: %w", err) + } + + resources := make([]core.IntegrationResource, 0, len(topics)) + for _, t := range topics { + parts := strings.Split(t.Name, "/") + shortName := parts[len(parts)-1] + resources = append(resources, core.IntegrationResource{ + Type: ResourceTypePubSubTopic, + Name: shortName, + ID: shortName, + }) + } + return resources, nil +} + // AuditLogEvent is the normalized event structure extracted from a Cloud Logging // audit log entry, used both for subscription pattern matching and as the message // payload delivered to triggers via OnIntegrationMessage. @@ -608,6 +640,114 @@ func (g *GCP) subscriptionApplies(subscription core.IntegrationSubscriptionConte return true } +// PubSubTopicEvent represents a message received from a user-specified Pub/Sub topic. +type PubSubTopicEvent struct { + Topic string `json:"topic" mapstructure:"topic"` + MessageID string `json:"messageId" mapstructure:"messageId"` + Data any `json:"data" mapstructure:"data"` +} + +// PubSubTopicSubscriptionPattern is the subscription pattern for user-specified topics. +type PubSubTopicSubscriptionPattern struct { + Type string `json:"type" mapstructure:"type"` + Topic string `json:"topic" mapstructure:"topic"` +} + +func (g *GCP) handlePubSubTopicEvent(ctx core.HTTPRequestContext) { + token := ctx.Request.URL.Query().Get("token") + if token == "" { + ctx.Response.WriteHeader(http.StatusBadRequest) + return + } + + secrets, err := ctx.Integration.GetSecrets() + if err != nil { + ctx.Response.WriteHeader(http.StatusInternalServerError) + return + } + + var secret string + for _, s := range secrets { + if s.Name == PubSubSecretName { + secret = string(s.Value) + break + } + } + + if token != secret { + ctx.Response.WriteHeader(http.StatusForbidden) + return + } + + topic := ctx.Request.URL.Query().Get("topic") + if topic == "" { + ctx.Response.WriteHeader(http.StatusBadRequest) + return + } + + body, err := io.ReadAll(ctx.Request.Body) + if err != nil { + ctx.Response.WriteHeader(http.StatusInternalServerError) + return + } + + var pushMsg pubsubPushMessage + if err := json.Unmarshal(body, &pushMsg); err != nil { + ctx.Response.WriteHeader(http.StatusBadRequest) + return + } + + decoded, err := base64Decode(pushMsg.Message.Data) + if err != nil { + ctx.Logger.Warnf("failed to decode Pub/Sub message data: %v", err) + ctx.Response.WriteHeader(http.StatusOK) + return + } + + var data any + if err := json.Unmarshal(decoded, &data); err != nil { + data = string(decoded) + } + + event := PubSubTopicEvent{ + Topic: topic, + MessageID: pushMsg.Message.MessageID, + Data: data, + } + + subscriptions, err := ctx.Integration.ListSubscriptions() + if err != nil { + ctx.Logger.Errorf("error listing subscriptions: %v", err) + ctx.Response.WriteHeader(http.StatusInternalServerError) + return + } + + for _, subscription := range subscriptions { + if !g.topicSubscriptionApplies(subscription, topic) { + continue + } + + if err := subscription.SendMessage(event); err != nil { + ctx.Logger.Errorf("error sending message to subscription: %v", err) + } + } + + ctx.Response.WriteHeader(http.StatusOK) +} + +func (g *GCP) topicSubscriptionApplies(subscription core.IntegrationSubscriptionContext, topic string) bool { + var pattern PubSubTopicSubscriptionPattern + if err := mapstructure.Decode(subscription.Configuration(), &pattern); err != nil { + return false + } + + if pattern.Type != "pubsub.topic" { + return false + } + + return pattern.Topic == topic +} + func base64Decode(s string) ([]byte, error) { return base64.StdEncoding.DecodeString(s) } diff --git a/pkg/integrations/gcp/gcp_test.go b/pkg/integrations/gcp/gcp_test.go index f66fb2a309..49f02b96ec 100644 --- a/pkg/integrations/gcp/gcp_test.go +++ b/pkg/integrations/gcp/gcp_test.go @@ -7,6 +7,70 @@ import ( "github.com/stretchr/testify/require" ) +func Test_GCPIntegrationComponentsAndTriggers(t *testing.T) { + g := &GCP{} + + components := g.Components() + require.Len(t, components, 2) + + componentNames := make([]string, 0, len(components)) + for _, c := range components { + componentNames = append(componentNames, c.Name()) + } + assert.Contains(t, componentNames, "gcp.createVM") + assert.Contains(t, componentNames, "gcp.pubsub.publishMessage") + + triggers := g.Triggers() + require.Len(t, triggers, 2) + + triggerNames := make([]string, 0, len(triggers)) + for _, tr := range triggers { + triggerNames = append(triggerNames, tr.Name()) + } + assert.Contains(t, triggerNames, "gcp.compute.onVMInstance") + assert.Contains(t, triggerNames, "gcp.pubsub.onTopicMessage") +} + +func Test_topicSubscriptionApplies(t *testing.T) { + g := &GCP{} + + t.Run("matches pubsub.topic pattern with correct topic", func(t *testing.T) { + sub := &mockSubscription{config: map[string]any{ + "type": "pubsub.topic", + "topic": "my-topic", + }} + assert.True(t, g.topicSubscriptionApplies(sub, "my-topic")) + }) + + t.Run("does not match different topic", func(t *testing.T) { + sub := &mockSubscription{config: map[string]any{ + "type": "pubsub.topic", + "topic": "my-topic", + }} + assert.False(t, g.topicSubscriptionApplies(sub, "other-topic")) + }) + + t.Run("does not match audit log pattern", func(t *testing.T) { + sub := &mockSubscription{config: map[string]any{ + "serviceName": "compute.googleapis.com", + "methodName": "v1.compute.instances.insert", + }} + assert.False(t, g.topicSubscriptionApplies(sub, "my-topic")) + }) +} + +type mockSubscription struct { + config any +} + +func (m *mockSubscription) Configuration() any { + return m.config +} + +func (m *mockSubscription) SendMessage(msg any) error { + return nil +} + func Test_validateAndParseServiceAccountKey(t *testing.T) { t.Run("valid key returns metadata", func(t *testing.T) { key := []byte(`{ diff --git a/pkg/integrations/gcp/pubsub/client.go b/pkg/integrations/gcp/pubsub/client.go index 3bcd58a064..244adbee6a 100644 --- a/pkg/integrations/gcp/pubsub/client.go +++ b/pkg/integrations/gcp/pubsub/client.go @@ -2,6 +2,7 @@ package pubsub import ( "context" + "encoding/base64" "encoding/json" "fmt" "strings" @@ -75,6 +76,87 @@ func DeleteSubscription(ctx context.Context, client *common.Client, projectID, s return err } +// --- Pub/Sub Publish --- + +type publishRequest struct { + Messages []publishMessage `json:"messages"` +} + +type publishMessage struct { + Data string `json:"data"` +} + +type publishResponse struct { + MessageIDs []string `json:"messageIds"` +} + +func Publish(ctx context.Context, client *common.Client, projectID, topicID, data string) (string, error) { + encoded := base64Encode(data) + url := fmt.Sprintf("%s/projects/%s/topics/%s:publish", pubsubBaseURL, projectID, topicID) + req := publishRequest{ + Messages: []publishMessage{ + {Data: encoded}, + }, + } + raw, err := json.Marshal(req) + if err != nil { + return "", fmt.Errorf("marshal publish body: %w", err) + } + resp, err := client.ExecRequest(ctx, "POST", url, strings.NewReader(string(raw))) + if err != nil { + return "", err + } + + var pr publishResponse + if err := json.Unmarshal(resp, &pr); err != nil { + return "", fmt.Errorf("parse publish response: %w", err) + } + if len(pr.MessageIDs) == 0 { + return "", fmt.Errorf("no message IDs returned from publish") + } + return pr.MessageIDs[0], nil +} + +func base64Encode(s string) string { + return base64.StdEncoding.EncodeToString([]byte(s)) +} + +// --- Pub/Sub List Topics --- + +type topicListResponse struct { + Topics []TopicItem `json:"topics"` + NextPageToken string `json:"nextPageToken"` +} + +type TopicItem struct { + Name string `json:"name"` +} + +func ListTopics(ctx context.Context, client *common.Client, projectID string) ([]TopicItem, error) { + var all []TopicItem + pageToken := "" + for { + url := fmt.Sprintf("%s/projects/%s/topics?pageSize=100", pubsubBaseURL, projectID) + if pageToken != "" { + url += "&pageToken=" + pageToken + } + resp, err := client.ExecRequest(ctx, "GET", url, nil) + if err != nil { + return nil, err + } + var lr topicListResponse + if err := json.Unmarshal(resp, &lr); err != nil { + return nil, fmt.Errorf("parse topics list: %w", err) + } + all = append(all, lr.Topics...) + if lr.NextPageToken == "" { + break + } + pageToken = lr.NextPageToken + } + return all, nil +} + // --- Cloud Logging Sink --- type sinkRequest struct { diff --git a/pkg/integrations/gcp/pubsub/on_topic_message.go b/pkg/integrations/gcp/pubsub/on_topic_message.go new file mode 100644 index 0000000000..936e32184b --- /dev/null +++ b/pkg/integrations/gcp/pubsub/on_topic_message.go @@ -0,0 +1,269 @@ +package pubsub + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/mitchellh/mapstructure" + "github.com/superplanehq/superplane/pkg/configuration" + "github.com/superplanehq/superplane/pkg/core" + gcpcommon "github.com/superplanehq/superplane/pkg/integrations/gcp/common" +) + +const EmittedEventType = "gcp.pubsub.topicMessage" + +type OnTopicMessage struct{} + +type OnTopicMessageConfiguration struct { + Topic string `json:"topic" mapstructure:"topic"` +} + +type OnTopicMessageMetadata struct { + Topic string `json:"topic" mapstructure:"topic"` + SubscriptionID string `json:"subscriptionId" mapstructure:"subscriptionId"` + PushSubID string `json:"pushSubId" mapstructure:"pushSubId"` +} + +func (t *OnTopicMessage) Name() string { + return "gcp.pubsub.onTopicMessage" +} + +func (t *OnTopicMessage) Label() string { + return "Pub/Sub • On Topic Message" +} + +func (t *OnTopicMessage) Description() string { + return "Listen to messages published to a Google Cloud Pub/Sub topic" +} + +func (t *OnTopicMessage) Documentation() string { + return `The On Topic Message trigger starts a workflow execution when a message is published to a Google Cloud Pub/Sub topic. + +## Use Cases + +- **Event-driven automation**: React to messages published by other services +- **Data pipeline triggers**: Start workflows when new data arrives in a topic +- **Cross-service orchestration**: Coordinate workflows across microservices using Pub/Sub + +## How it works + +During setup, SuperPlane creates a push subscription on the specified topic. When messages are published to the topic, they are delivered to SuperPlane and trigger workflow executions. + +## Setup + +Ensure the Pub/Sub API is enabled and the integration's service account has ` + "`roles/pubsub.subscriber`" + ` and ` + "`roles/pubsub.viewer`" + ` permissions on the topic.` +} + +func (t *OnTopicMessage) Icon() string { + return "gcp" +} + +func (t *OnTopicMessage) Color() string { + return "gray" +} + +func (t *OnTopicMessage) ExampleData() map[string]any { + return map[string]any{ + "topic": "my-topic", + "messageId": "12345678901234", + "data": map[string]any{"key": "value"}, + } +} + +func (t *OnTopicMessage) Configuration() []configuration.Field { + return []configuration.Field{ + { + Name: "topic", + Label: "Topic", + Type: configuration.FieldTypeString, + Required: true, + Description: "Pub/Sub topic ID to listen to (e.g. my-topic). The project is inferred from the integration.", + Placeholder: "e.g. my-topic", + }, + } +} + +func (t *OnTopicMessage) Setup(ctx core.TriggerContext) error { + if ctx.Integration == nil { + return fmt.Errorf("connect the GCP integration to this trigger to enable Pub/Sub event delivery") + } + + var config OnTopicMessageConfiguration + if err := mapstructure.Decode(ctx.Configuration, &config); err != nil { + return fmt.Errorf("failed to decode configuration: %w", err) + } + + topic := strings.TrimSpace(config.Topic) + if topic == "" { + return fmt.Errorf("topic is required") + } + + var metadata OnTopicMessageMetadata + if err := mapstructure.Decode(ctx.Metadata.Get(), &metadata); err != nil { + return fmt.Errorf("failed to decode metadata: %w", err) + } + + if metadata.Topic == topic && metadata.SubscriptionID != "" && metadata.PushSubID != "" { + return nil + } + + subscriptionID, err := ctx.Integration.Subscribe(TopicSubscriptionPattern(topic)) + if err != nil { + return fmt.Errorf("failed to subscribe: %w", err) + } + + pushSubID := "sp-pubsub-" + sanitizeID(subscriptionID.String()) + + if err := ctx.Metadata.Set(OnTopicMessageMetadata{ + Topic: topic, + SubscriptionID: subscriptionID.String(), + PushSubID: pushSubID, + }); err != nil { + return fmt.Errorf("failed to set metadata: %w", err) + } + + return ctx.Requests.ScheduleActionCall("provisionPushSubscription", map[string]any{ + "topic": topic, + "pushSubId": pushSubID, + }, 2*time.Second) +} + +func (t *OnTopicMessage) Actions() []core.Action { + return []core.Action{ + {Name: "provisionPushSubscription"}, + } +} + +func (t *OnTopicMessage) HandleAction(ctx core.TriggerActionContext) (map[string]any, error) { + if ctx.Name != "provisionPushSubscription" { + return nil, fmt.Errorf("unknown action: %s", ctx.Name) + } + + return t.provisionPushSubscription(ctx) +} + +func (t *OnTopicMessage) provisionPushSubscription(ctx core.TriggerActionContext) (map[string]any, error) { + meta, err := integrationMetadata(ctx.Integration) + if err != nil { + return nil, err + } + + topic, _ := ctx.Parameters["topic"].(string) + if topic == "" { + return nil, fmt.Errorf("topic parameter is required") + } + + pushSubID, _ := ctx.Parameters["pushSubId"].(string) + if pushSubID == "" { + return nil, fmt.Errorf("pushSubId parameter is required") + } + + client, err := gcpcommon.NewClient(ctx.HTTP, ctx.Integration) + if err != nil { + return nil, fmt.Errorf("create GCP client: %w", err) + } + + projectID := client.ProjectID() + + secret, err := eventsSecret(ctx.Integration) + if err != nil { + return nil, fmt.Errorf("get events secret: %w", err) + } + + baseURL := meta.EventsBaseURL + if baseURL == "" { + return nil, fmt.Errorf("events base URL not configured; re-sync the GCP integration") + } + + pushEndpoint := fmt.Sprintf("%s/api/v1/integrations/%s/events/pubsub?token=%s&topic=%s", + baseURL, ctx.Integration.ID(), secret, topic) + + reqCtx := context.Background() + if err := CreatePushSubscription(reqCtx, client, projectID, pushSubID, topic, pushEndpoint); err != nil { + return nil, fmt.Errorf("create push subscription: %w", err) + } + + return nil, nil +} + +func (t *OnTopicMessage) OnIntegrationMessage(ctx core.IntegrationMessageContext) error { + return ctx.Events.Emit(EmittedEventType, ctx.Message) +} + +func (t *OnTopicMessage) Cleanup(ctx core.TriggerContext) error { + var metadata OnTopicMessageMetadata + if err := mapstructure.Decode(ctx.Metadata.Get(), &metadata); err != nil || metadata.PushSubID == "" { + return nil + } + + if ctx.Integration == nil { + return nil + } + + client, err := gcpcommon.NewClient(ctx.HTTP, ctx.Integration) + if err != nil { + ctx.Logger.Warnf("failed to create GCP client for push subscription cleanup: %v", err) + return nil + } + + if err := DeleteSubscription(context.Background(), client, client.ProjectID(), metadata.PushSubID); err != nil { + if !gcpcommon.IsNotFoundError(err) { + ctx.Logger.Warnf("failed to delete push subscription %s: %v", metadata.PushSubID, err) + } + } + + return nil +} + +func (t *OnTopicMessage) HandleWebhook(ctx core.WebhookRequestContext) (int, error) { + return 200, nil +} + +func TopicSubscriptionPattern(topic string) map[string]any { + return map[string]any{ + "type": "pubsub.topic", + "topic": topic, + } +} + +func integrationMetadata(integration core.IntegrationContext) (*gcpcommon.Metadata, error) { + var m gcpcommon.Metadata + if err := mapstructure.Decode(integration.GetMetadata(), &m); err != nil { + return nil, fmt.Errorf("failed to read integration metadata: %w", err) + } + if m.ProjectID == "" { + return nil, fmt.Errorf("integration metadata does not contain a project ID; re-sync the GCP integration") + } + return &m, nil +} + +func eventsSecret(integration core.IntegrationContext) (string, error) { + secrets, err := integration.GetSecrets() + if err != nil { + return "", err + } + + for _, s := range secrets { + if s.Name == "pubsub.events.secret" { + return string(s.Value), nil + } + } + + return "", fmt.Errorf("events secret not found; re-sync the GCP integration") +} + +func sanitizeID(s string) string { + var b strings.Builder + for _, c := range strings.ToLower(s) { + if (c >= 'a' && c <= 'z') || (c >= '0' && c <= '9') || c == '-' { + b.WriteRune(c) + } + } + result := b.String() + if len(result) > 60 { + result = result[:60] + } + return result +} diff --git a/pkg/integrations/gcp/pubsub/on_topic_message_test.go b/pkg/integrations/gcp/pubsub/on_topic_message_test.go new file mode 100644 index 0000000000..1eaf79eee8 --- /dev/null +++ b/pkg/integrations/gcp/pubsub/on_topic_message_test.go @@ -0,0 +1,70 @@ +package pubsub + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func Test_OnTopicMessageConfiguration(t *testing.T) { + trigger := &OnTopicMessage{} + fields := trigger.Configuration() + require.NotEmpty(t, fields) + + names := make([]string, 0, len(fields)) + for _, f := range fields { + names = append(names, f.Name) + } + assert.Contains(t, names, "topic") +} + +func Test_OnTopicMessageName(t *testing.T) { + trigger := &OnTopicMessage{} + assert.Equal(t, "gcp.pubsub.onTopicMessage", trigger.Name()) +} + +func Test_OnTopicMessageLabel(t *testing.T) { + trigger := &OnTopicMessage{} + assert.Equal(t, "Pub/Sub • On Topic Message", trigger.Label()) +} + +func Test_OnTopicMessageExampleData(t *testing.T) { + trigger := &OnTopicMessage{} + example := trigger.ExampleData() + require.NotNil(t, example) + assert.NotEmpty(t, example["topic"]) + assert.NotEmpty(t, example["messageId"]) +} + +func Test_TopicSubscriptionPattern(t *testing.T) { + pattern := TopicSubscriptionPattern("my-topic") + assert.Equal(t, "pubsub.topic", pattern["type"]) + assert.Equal(t, "my-topic", pattern["topic"]) +} + +func Test_SanitizeID(t *testing.T) { + t.Run("keeps alphanumeric and dashes", func(t *testing.T) { + result := sanitizeID("abc-123-def") + assert.Equal(t, "abc-123-def", result) + }) + + t.Run("lowercases input", func(t *testing.T) { + result := sanitizeID("ABC-DEF") + assert.Equal(t, "abc-def", result) + }) + + t.Run("removes special characters", func(t *testing.T) { + result := sanitizeID("abc!@#$%^&*()def") + assert.Equal(t, "abcdef", result) + }) + + t.Run("truncates to 60 characters", func(t *testing.T) { + long := "a" + for i := 0; i < 70; i++ { + long += "b" + } + result := sanitizeID(long) + assert.Len(t, result, 60) + }) +} diff --git a/pkg/integrations/gcp/pubsub/publish_message.go b/pkg/integrations/gcp/pubsub/publish_message.go new file mode 100644 index 0000000000..c25a74b703 --- /dev/null +++ b/pkg/integrations/gcp/pubsub/publish_message.go @@ -0,0 +1,228 @@ +package pubsub + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "strings" + + "github.com/google/uuid" + "github.com/mitchellh/mapstructure" + "github.com/superplanehq/superplane/pkg/configuration" + "github.com/superplanehq/superplane/pkg/core" + gcpcommon "github.com/superplanehq/superplane/pkg/integrations/gcp/common" +) + +const ( + PublishFormatJSON = "json" + PublishFormatText = "text" + + publishMessagePayloadType = "gcp.pubsub.message.published" + publishMessageOutputChannel = "default" +) + +type PublishMessage struct{} + +type PublishMessageConfiguration struct { + Topic string `json:"topic" mapstructure:"topic"` + Format string `json:"format" mapstructure:"format"` + JSON *any `json:"json,omitempty" mapstructure:"json"` + Text *string `json:"text,omitempty" mapstructure:"text"` +} + +func (c *PublishMessage) Name() string { + return "gcp.pubsub.publishMessage" +} + +func (c *PublishMessage) Label() string { + return "Pub/Sub • Publish Message" +} + +func (c *PublishMessage) Description() string { + return "Publish a message to a Google Cloud Pub/Sub topic" +} + +func (c *PublishMessage) Documentation() string { + return `The Publish Message component sends a message to a Google Cloud Pub/Sub topic. + +## Use Cases + +- **Event fan-out**: Broadcast workflow results to multiple subscribers +- **Cross-service communication**: Trigger downstream services through Pub/Sub +- **Data pipelines**: Feed data into streaming or batch processing pipelines + +## Output + +Emits the published message ID returned by the Pub/Sub API.` +} + +func (c *PublishMessage) Icon() string { + return "gcp" +} + +func (c *PublishMessage) Color() string { + return "gray" +} + +func (c *PublishMessage) ExampleOutput() map[string]any { + return map[string]any{ + "messageId": "12345678901234", + "topic": "my-topic", + } +} + +func (c *PublishMessage) OutputChannels(configuration any) []core.OutputChannel { + return []core.OutputChannel{core.DefaultOutputChannel} +} + +func (c *PublishMessage) Configuration() []configuration.Field { + return []configuration.Field{ + { + Name: "topic", + Label: "Topic", + Type: configuration.FieldTypeString, + Required: true, + Description: "Pub/Sub topic ID (e.g. my-topic). The project is inferred from the integration.", + Placeholder: "e.g. my-topic", + }, + { + Name: "format", + Label: "Message Format", + Type: configuration.FieldTypeSelect, + Required: true, + Default: PublishFormatJSON, + TypeOptions: &configuration.TypeOptions{ + Select: &configuration.SelectTypeOptions{ + Options: []configuration.FieldOption{ + {Value: PublishFormatJSON, Label: "JSON"}, + {Value: PublishFormatText, Label: "Text"}, + }, + }, + }, + }, + { + Name: "json", + Label: "JSON Message", + Type: configuration.FieldTypeObject, + Required: false, + Default: map[string]any{}, + VisibilityConditions: []configuration.VisibilityCondition{ + {Field: "topic", Values: []string{"*"}}, + {Field: "format", Values: []string{PublishFormatJSON}}, + }, + }, + { + Name: "text", + Label: "Text Message", + Type: configuration.FieldTypeText, + Required: false, + VisibilityConditions: []configuration.VisibilityCondition{ + {Field: "format", Values: []string{PublishFormatText}}, + }, + }, + } +} + +func (c *PublishMessage) Setup(ctx core.SetupContext) error { + var config PublishMessageConfiguration + if err := mapstructure.Decode(ctx.Configuration, &config); err != nil { + return fmt.Errorf("failed to decode configuration: %w", err) + } + + if strings.TrimSpace(config.Topic) == "" { + return fmt.Errorf("topic is required") + } + + if config.Format == "" { + return fmt.Errorf("message format is required") + } + + if config.Format == PublishFormatJSON && config.JSON == nil { + return fmt.Errorf("JSON message is required") + } + + if config.Format == PublishFormatText && config.Text == nil { + return fmt.Errorf("text message is required") + } + + return nil +} + +func (c *PublishMessage) ProcessQueueItem(ctx core.ProcessQueueContext) (*uuid.UUID, error) { + return ctx.DefaultProcessing() +} + +func (c *PublishMessage) Execute(ctx core.ExecutionContext) error { + var config PublishMessageConfiguration + if err := mapstructure.Decode(ctx.Configuration, &config); err != nil { + return ctx.ExecutionState.Fail("error", fmt.Sprintf("failed to decode configuration: %v", err)) + } + + topic := strings.TrimSpace(config.Topic) + if topic == "" { + return ctx.ExecutionState.Fail("error", "topic is required") + } + + data, err := c.buildMessageData(config) + if err != nil { + return ctx.ExecutionState.Fail("error", fmt.Sprintf("failed to build message data: %v", err)) + } + + client, err := gcpcommon.NewClient(ctx.HTTP, ctx.Integration) + if err != nil { + return ctx.ExecutionState.Fail("error", fmt.Sprintf("failed to create GCP client: %v", err)) + } + + result, err := Publish(context.Background(), client, client.ProjectID(), topic, data) + if err != nil { + return ctx.ExecutionState.Fail("error", fmt.Sprintf("failed to publish message to topic %q: %v", topic, err)) + } + + payload := map[string]any{ + "messageId": result, + "topic": topic, + } + + return ctx.ExecutionState.Emit(publishMessageOutputChannel, publishMessagePayloadType, []any{payload}) +} + +func (c *PublishMessage) Actions() []core.Action { + return nil +} + +func (c *PublishMessage) HandleAction(ctx core.ActionContext) error { + return nil +} + +func (c *PublishMessage) HandleWebhook(ctx core.WebhookRequestContext) (int, error) { + return http.StatusOK, nil +} + +func (c *PublishMessage) Cancel(ctx core.ExecutionContext) error { + return nil +} + +func (c *PublishMessage) Cleanup(ctx core.SetupContext) error { + return nil +} + +func (c *PublishMessage) buildMessageData(config PublishMessageConfiguration) (string, error) { + if config.Format == PublishFormatText { + if config.Text == nil { + return "", fmt.Errorf("text message is required") + } + return *config.Text, nil + } + + if config.JSON == nil { + return "", fmt.Errorf("JSON message is required") + } + + data, err := json.Marshal(config.JSON) + if err != nil { + return "", fmt.Errorf("failed to marshal JSON message: %w", err) + } + + return string(data), nil +} diff --git a/pkg/integrations/gcp/pubsub/publish_message_test.go b/pkg/integrations/gcp/pubsub/publish_message_test.go new file mode 100644 index 0000000000..0c0b3323c9 --- /dev/null +++ b/pkg/integrations/gcp/pubsub/publish_message_test.go @@ -0,0 +1,88 @@ +package pubsub + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func Test_PublishMessageConfiguration(t *testing.T) { + c := &PublishMessage{} + fields := c.Configuration() + require.NotEmpty(t, fields) + + names := make([]string, 0, len(fields)) + for _, f := range fields { + names = append(names, f.Name) + } + assert.Contains(t, names, "topic") + assert.Contains(t, names, "format") + assert.Contains(t, names, "json") + assert.Contains(t, names, "text") +} + +func Test_PublishMessageName(t *testing.T) { + c := &PublishMessage{} + assert.Equal(t, "gcp.pubsub.publishMessage", c.Name()) +} + +func Test_PublishMessageLabel(t *testing.T) { + c := &PublishMessage{} + assert.Equal(t, "Pub/Sub • Publish Message", c.Label()) +} + +func Test_PublishMessageOutputChannels(t *testing.T) { + c := &PublishMessage{} + channels := c.OutputChannels(nil) + require.Len(t, channels, 1) + assert.Equal(t, "default", channels[0].Name) +} + +func Test_PublishMessageBuildMessageData(t *testing.T) { + c := &PublishMessage{} + + t.Run("text format", func(t *testing.T) { + text := "hello world" + data, err := c.buildMessageData(PublishMessageConfiguration{ + Format: PublishFormatText, + Text: &text, + }) + require.NoError(t, err) + assert.Equal(t, "hello world", data) + }) + + t.Run("json format", func(t *testing.T) { + jsonData := any(map[string]any{"key": "value"}) + data, err := c.buildMessageData(PublishMessageConfiguration{ + Format: PublishFormatJSON, + JSON: &jsonData, + }) + require.NoError(t, err) + assert.JSONEq(t, `{"key":"value"}`, data) + }) + + t.Run("text format without text returns error", func(t *testing.T) { + _, err := c.buildMessageData(PublishMessageConfiguration{ + Format: PublishFormatText, + Text: nil, + }) + require.Error(t, err) + }) + + t.Run("json format without json returns error", func(t *testing.T) { + _, err := c.buildMessageData(PublishMessageConfiguration{ + Format: PublishFormatJSON, + JSON: nil, + }) + require.Error(t, err) + }) +} + +func Test_PublishMessageExampleOutput(t *testing.T) { + c := &PublishMessage{} + example := c.ExampleOutput() + require.NotNil(t, example) + assert.NotEmpty(t, example["messageId"]) + assert.NotEmpty(t, example["topic"]) +} diff --git a/web_src/src/pages/workflowv2/mappers/gcp/index.ts b/web_src/src/pages/workflowv2/mappers/gcp/index.ts index 4b0e4f7538..a8ae64fa52 100644 --- a/web_src/src/pages/workflowv2/mappers/gcp/index.ts +++ b/web_src/src/pages/workflowv2/mappers/gcp/index.ts @@ -2,17 +2,22 @@ import { ComponentBaseMapper, CustomFieldRenderer, EventStateRegistry, TriggerRe import { baseMapper } from "./base"; import { buildActionStateRegistry } from "../utils"; import { onVMInstanceTriggerRenderer } from "./on_vm_instance"; +import { publishMessageMapper } from "./publish_message"; +import { onTopicMessageTriggerRenderer } from "./on_topic_message"; export const componentMappers: Record = { createVM: baseMapper, + "pubsub.publishMessage": publishMessageMapper, }; export const triggerRenderers: Record = { onVMInstance: onVMInstanceTriggerRenderer, + "pubsub.onTopicMessage": onTopicMessageTriggerRenderer, }; export const eventStateRegistry: Record = { createVM: buildActionStateRegistry("completed"), + "pubsub.publishMessage": buildActionStateRegistry("published"), }; export const customFieldRenderers: Record = {}; diff --git a/web_src/src/pages/workflowv2/mappers/gcp/on_topic_message.ts b/web_src/src/pages/workflowv2/mappers/gcp/on_topic_message.ts new file mode 100644 index 0000000000..9763099787 --- /dev/null +++ b/web_src/src/pages/workflowv2/mappers/gcp/on_topic_message.ts @@ -0,0 +1,65 @@ +import { getColorClass, getBackgroundColorClass } from "@/utils/colors"; +import { TriggerEventContext, TriggerRenderer, TriggerRendererContext } from "../types"; +import { TriggerProps } from "@/ui/trigger"; +import { flattenObject } from "@/lib/utils"; +import { formatTimeAgo } from "@/utils/date"; +import gcpIcon from "@/assets/icons/integrations/gcp.svg"; + +interface OnTopicMessageMetadata { + topic?: string; +} + +interface OnTopicMessageEventData { + topic?: string; + messageId?: string; + data?: unknown; +} + +export const onTopicMessageTriggerRenderer: TriggerRenderer = { + getTitleAndSubtitle: (context: TriggerEventContext): { title: string; subtitle: string } => { + const data = context.event?.data as OnTopicMessageEventData | undefined; + const topic = data?.topic ?? ""; + const messageId = data?.messageId ?? ""; + const title = topic ? `Message on ${topic}` : "Pub/Sub message"; + const subtitle = messageId ? `ID: ${messageId}` : ""; + return { title, subtitle }; + }, + + getRootEventValues: (context: TriggerEventContext): Record => { + return flattenObject(context.event?.data || {}); + }, + + getTriggerProps: (context: TriggerRendererContext): TriggerProps => { + const { node, definition, lastEvent } = context; + const metadata = node.metadata as unknown as OnTopicMessageMetadata | undefined; + + const metadataItems = []; + if (metadata?.topic) { + metadataItems.push({ icon: "message-square", label: metadata.topic }); + } + + const props: TriggerProps = { + title: node.name || definition.label || "Pub/Sub • On Topic Message", + iconSrc: gcpIcon, + iconSlug: definition.icon || "cloud", + iconColor: getColorClass("black"), + collapsedBackground: getBackgroundColorClass(definition.color ?? "gray"), + metadata: metadataItems, + }; + + if (lastEvent) { + const eventData = lastEvent.data as OnTopicMessageEventData | undefined; + const topic = eventData?.topic ?? ""; + + props.lastEventData = { + title: topic ? `Message on ${topic}` : "Pub/Sub message", + subtitle: formatTimeAgo(new Date(lastEvent.createdAt)), + receivedAt: new Date(lastEvent.createdAt), + state: "triggered", + eventId: lastEvent.id, + }; + } + + return props; + }, +}; diff --git a/web_src/src/pages/workflowv2/mappers/gcp/publish_message.ts b/web_src/src/pages/workflowv2/mappers/gcp/publish_message.ts new file mode 100644 index 0000000000..1c2d9c2f65 --- /dev/null +++ b/web_src/src/pages/workflowv2/mappers/gcp/publish_message.ts @@ -0,0 +1,87 @@ +import { ComponentBaseProps, EventSection } from "@/ui/componentBase"; +import { getState, getStateMap, getTriggerRenderer } from ".."; +import { + ComponentBaseContext, + ComponentBaseMapper, + ExecutionDetailsContext, + ExecutionInfo, + NodeInfo, + OutputPayload, + SubtitleContext, +} from "../types"; +import gcpIcon from "@/assets/icons/integrations/gcp.svg"; +import { formatTimeAgo } from "@/utils/date"; + +interface PublishMessageConfiguration { + topic?: string; + format?: string; +} + +export const publishMessageMapper: ComponentBaseMapper = { + props(context: ComponentBaseContext): ComponentBaseProps { + const lastExecution = context.lastExecutions.length > 0 ? context.lastExecutions[0] : null; + const componentName = "gcp.pubsub.publishMessage"; + const config = context.node.configuration as unknown as PublishMessageConfiguration | undefined; + + const metadata = []; + if (config?.topic) { + metadata.push({ icon: "message-square", label: config.topic }); + } + + return { + iconSrc: gcpIcon, + iconSlug: context.componentDefinition?.icon ?? "cloud", + collapsedBackground: "bg-white", + collapsed: context.node.isCollapsed, + title: context.node.name || context.componentDefinition?.label || "Pub/Sub • Publish Message", + metadata, + eventSections: lastExecution ? buildEventSections(context.nodes, lastExecution, componentName) : undefined, + includeEmptyState: !lastExecution, + eventStateMap: getStateMap(componentName), + }; + }, + + getExecutionDetails(context: ExecutionDetailsContext): Record { + const details: Record = {}; + const outputs = context.execution.outputs as { default?: OutputPayload[] } | undefined; + const payload = outputs?.default?.[0]; + const data = payload?.data as { topic?: string; messageId?: string } | undefined; + + if (context.execution.createdAt) { + details["Started At"] = new Date(context.execution.createdAt).toLocaleString(); + } + + if (data?.topic) { + details["Topic"] = data.topic; + } + + if (data?.messageId) { + details["Message ID"] = data.messageId; + } + + return details; + }, + + subtitle(context: SubtitleContext): string { + const timestamp = context.execution.updatedAt || context.execution.createdAt; + return timestamp ? formatTimeAgo(new Date(timestamp)) : ""; + }, +}; + +function buildEventSections(nodes: NodeInfo[], execution: ExecutionInfo, componentName: string): EventSection[] { + const rootTriggerNode = nodes.find((n) => n.id === execution.rootEvent?.nodeId); + const rootTriggerRenderer = getTriggerRenderer(rootTriggerNode?.componentName!); + const { title } = rootTriggerRenderer.getTitleAndSubtitle({ event: execution.rootEvent }); + const subtitleTimestamp = execution.updatedAt || execution.createdAt; + const eventSubtitle = subtitleTimestamp ? formatTimeAgo(new Date(subtitleTimestamp)) : ""; + + return [ + { + receivedAt: new Date(execution.createdAt!), + eventTitle: title, + eventSubtitle, + eventState: getState(componentName)(execution), + eventId: execution.rootEvent!.id!, + }, + ]; +}