Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/integrations/gcp/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ type Metadata struct {
AccessTokenExpiresAt string `json:"accessTokenExpiresAt"`
PubSubTopic string `json:"pubsubTopic,omitempty"`
PubSubSubscription string `json:"pubsubSubscription,omitempty"`
EventsBaseURL string `json:"eventsBaseUrl,omitempty"`
}
140 changes: 140 additions & 0 deletions pkg/integrations/gcp/gcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,14 @@ func (g *GCP) Configuration() []configuration.Field {
func (g *GCP) Components() []core.Component {
return []core.Component{
&compute.CreateVM{},
&gcppubsub.PublishMessage{},
}
}

func (g *GCP) Triggers() []core.Trigger {
return []core.Trigger{
&compute.OnVMInstance{},
&gcppubsub.OnTopicMessage{},
}
}

Expand Down Expand Up @@ -202,6 +204,7 @@ func (g *GCP) syncWIF(ctx core.SyncContext, config Configuration) error {
ClientEmail: "",
AuthMethod: gcpcommon.AuthMethodWIF,
AccessTokenExpiresAt: expiresAt.Format(time.RFC3339),
EventsBaseURL: ctx.WebhooksBaseURL,
}
ctx.Integration.SetMetadata(metadata)

Expand Down Expand Up @@ -241,6 +244,7 @@ func (g *GCP) syncServiceAccountKey(ctx core.SyncContext, config Configuration)
return fmt.Errorf("invalid service account key: %w", err)
}
metadata.AuthMethod = gcpcommon.AuthMethodServiceAccountKey
metadata.EventsBaseURL = ctx.WebhooksBaseURL

if err := ctx.Integration.SetSecret(gcpcommon.SecretNameServiceAccountKey, keyJSON); err != nil {
return fmt.Errorf("failed to store service account key: %w", err)
Expand Down Expand Up @@ -423,6 +427,8 @@ func (g *GCP) ListResources(resourceType string, ctx core.ListResourcesContext)
p := ctx.Parameters

switch resourceType {
case ResourceTypePubSubTopic:
return listPubSubTopicResources(reqCtx, client)
case compute.ResourceTypeRegion:
return compute.ListRegionResources(reqCtx, client)
case compute.ResourceTypeZone:
Expand Down Expand Up @@ -457,6 +463,11 @@ func (g *GCP) ListResources(resourceType string, ctx core.ListResourcesContext)
}

func (g *GCP) HandleRequest(ctx core.HTTPRequestContext) {
if strings.HasSuffix(ctx.Request.URL.Path, "/events/pubsub") {
g.handlePubSubTopicEvent(ctx)
return
}

if strings.HasSuffix(ctx.Request.URL.Path, "/events") {
g.handleEvent(ctx)
return
Expand All @@ -465,6 +476,27 @@ func (g *GCP) HandleRequest(ctx core.HTTPRequestContext) {
ctx.Response.WriteHeader(http.StatusNotFound)
}

const ResourceTypePubSubTopic = "pubsubTopic"

func listPubSubTopicResources(ctx context.Context, client *gcpcommon.Client) ([]core.IntegrationResource, error) {
topics, err := gcppubsub.ListTopics(ctx, client, client.ProjectID())
if err != nil {
return nil, fmt.Errorf("list Pub/Sub topics: %w", err)
}

resources := make([]core.IntegrationResource, 0, len(topics))
for _, t := range topics {
parts := strings.Split(t.Name, "/")
shortName := parts[len(parts)-1]
resources = append(resources, core.IntegrationResource{
Type: ResourceTypePubSubTopic,
Name: shortName,
ID: shortName,
})
}
return resources, nil
}

// AuditLogEvent is the normalized event structure extracted from a Cloud Logging
// audit log entry, used both for subscription pattern matching and as the message
// payload delivered to triggers via OnIntegrationMessage.
Expand Down Expand Up @@ -608,6 +640,114 @@ func (g *GCP) subscriptionApplies(subscription core.IntegrationSubscriptionConte
return true
}

// PubSubTopicEvent represents a message received from a user-specified Pub/Sub topic.
type PubSubTopicEvent struct {
Topic string `json:"topic" mapstructure:"topic"`
MessageID string `json:"messageId" mapstructure:"messageId"`
Data any `json:"data" mapstructure:"data"`
}

// PubSubTopicSubscriptionPattern is the subscription pattern for user-specified topics.
type PubSubTopicSubscriptionPattern struct {
Type string `json:"type" mapstructure:"type"`
Topic string `json:"topic" mapstructure:"topic"`
}

func (g *GCP) handlePubSubTopicEvent(ctx core.HTTPRequestContext) {
token := ctx.Request.URL.Query().Get("token")
if token == "" {
ctx.Response.WriteHeader(http.StatusBadRequest)
return
}

secrets, err := ctx.Integration.GetSecrets()
if err != nil {
ctx.Response.WriteHeader(http.StatusInternalServerError)
return
}

var secret string
for _, s := range secrets {
if s.Name == PubSubSecretName {
secret = string(s.Value)
break
}
}

if token != secret {
ctx.Response.WriteHeader(http.StatusForbidden)
return
}

topic := ctx.Request.URL.Query().Get("topic")
if topic == "" {
ctx.Response.WriteHeader(http.StatusBadRequest)
return
}

body, err := io.ReadAll(ctx.Request.Body)
if err != nil {
ctx.Response.WriteHeader(http.StatusInternalServerError)
return
}

var pushMsg pubsubPushMessage
if err := json.Unmarshal(body, &pushMsg); err != nil {
ctx.Response.WriteHeader(http.StatusBadRequest)
return
}

decoded, err := base64Decode(pushMsg.Message.Data)
if err != nil {
ctx.Logger.Warnf("failed to decode Pub/Sub message data: %v", err)
ctx.Response.WriteHeader(http.StatusOK)
return
}

var data any
if err := json.Unmarshal(decoded, &data); err != nil {
data = string(decoded)
}

event := PubSubTopicEvent{
Topic: topic,
MessageID: pushMsg.Message.MessageID,
Data: data,
}

subscriptions, err := ctx.Integration.ListSubscriptions()
if err != nil {
ctx.Logger.Errorf("error listing subscriptions: %v", err)
ctx.Response.WriteHeader(http.StatusInternalServerError)
return
}

for _, subscription := range subscriptions {
if !g.topicSubscriptionApplies(subscription, topic) {
continue
}

if err := subscription.SendMessage(event); err != nil {
ctx.Logger.Errorf("error sending message to subscription: %v", err)
}
}

ctx.Response.WriteHeader(http.StatusOK)
}

func (g *GCP) topicSubscriptionApplies(subscription core.IntegrationSubscriptionContext, topic string) bool {
var pattern PubSubTopicSubscriptionPattern
if err := mapstructure.Decode(subscription.Configuration(), &pattern); err != nil {
return false
}

if pattern.Type != "pubsub.topic" {
return false
}

return pattern.Topic == topic
}

func base64Decode(s string) ([]byte, error) {
return base64.StdEncoding.DecodeString(s)
}
64 changes: 64 additions & 0 deletions pkg/integrations/gcp/gcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,70 @@ import (
"github.com/stretchr/testify/require"
)

func Test_GCPIntegrationComponentsAndTriggers(t *testing.T) {
g := &GCP{}

components := g.Components()
require.Len(t, components, 2)

componentNames := make([]string, 0, len(components))
for _, c := range components {
componentNames = append(componentNames, c.Name())
}
assert.Contains(t, componentNames, "gcp.createVM")
assert.Contains(t, componentNames, "gcp.pubsub.publishMessage")

triggers := g.Triggers()
require.Len(t, triggers, 2)

triggerNames := make([]string, 0, len(triggers))
for _, tr := range triggers {
triggerNames = append(triggerNames, tr.Name())
}
assert.Contains(t, triggerNames, "gcp.compute.onVMInstance")
assert.Contains(t, triggerNames, "gcp.pubsub.onTopicMessage")
}

func Test_topicSubscriptionApplies(t *testing.T) {
g := &GCP{}

t.Run("matches pubsub.topic pattern with correct topic", func(t *testing.T) {
sub := &mockSubscription{config: map[string]any{
"type": "pubsub.topic",
"topic": "my-topic",
}}
assert.True(t, g.topicSubscriptionApplies(sub, "my-topic"))
})

t.Run("does not match different topic", func(t *testing.T) {
sub := &mockSubscription{config: map[string]any{
"type": "pubsub.topic",
"topic": "my-topic",
}}
assert.False(t, g.topicSubscriptionApplies(sub, "other-topic"))
})

t.Run("does not match audit log pattern", func(t *testing.T) {
sub := &mockSubscription{config: map[string]any{
"serviceName": "compute.googleapis.com",
"methodName": "v1.compute.instances.insert",
}}
assert.False(t, g.topicSubscriptionApplies(sub, "my-topic"))
})
}

type mockSubscription struct {
config any
}

func (m *mockSubscription) Configuration() any {
return m.config
}

func (m *mockSubscription) SendMessage(msg any) error {
return nil
}

func Test_validateAndParseServiceAccountKey(t *testing.T) {
t.Run("valid key returns metadata", func(t *testing.T) {
key := []byte(`{
Expand Down
82 changes: 82 additions & 0 deletions pkg/integrations/gcp/pubsub/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pubsub

import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"strings"
Expand Down Expand Up @@ -75,6 +76,87 @@ func DeleteSubscription(ctx context.Context, client *common.Client, projectID, s
return err
}

// --- Pub/Sub Publish ---

type publishRequest struct {
Messages []publishMessage `json:"messages"`
}

type publishMessage struct {
Data string `json:"data"`
}

type publishResponse struct {
MessageIDs []string `json:"messageIds"`
}

func Publish(ctx context.Context, client *common.Client, projectID, topicID, data string) (string, error) {
encoded := base64Encode(data)
url := fmt.Sprintf("%s/projects/%s/topics/%s:publish", pubsubBaseURL, projectID, topicID)
req := publishRequest{
Messages: []publishMessage{
{Data: encoded},
},
}
raw, err := json.Marshal(req)
if err != nil {
return "", fmt.Errorf("marshal publish body: %w", err)
}
resp, err := client.ExecRequest(ctx, "POST", url, strings.NewReader(string(raw)))
if err != nil {
return "", err
}

var pr publishResponse
if err := json.Unmarshal(resp, &pr); err != nil {
return "", fmt.Errorf("parse publish response: %w", err)
}
if len(pr.MessageIDs) == 0 {
return "", fmt.Errorf("no message IDs returned from publish")
}
return pr.MessageIDs[0], nil
}

func base64Encode(s string) string {
return base64.StdEncoding.EncodeToString([]byte(s))
}

// --- Pub/Sub List Topics ---

type topicListResponse struct {
Topics []TopicItem `json:"topics"`
NextPageToken string `json:"nextPageToken"`
}

type TopicItem struct {
Name string `json:"name"`
}

func ListTopics(ctx context.Context, client *common.Client, projectID string) ([]TopicItem, error) {
var all []TopicItem
pageToken := ""
for {
url := fmt.Sprintf("%s/projects/%s/topics?pageSize=100", pubsubBaseURL, projectID)
if pageToken != "" {
url += "&pageToken=" + pageToken
}
resp, err := client.ExecRequest(ctx, "GET", url, nil)
if err != nil {
return nil, err
}
var lr topicListResponse
if err := json.Unmarshal(resp, &lr); err != nil {
return nil, fmt.Errorf("parse topics list: %w", err)
}
all = append(all, lr.Topics...)
if lr.NextPageToken == "" {
break
}
pageToken = lr.NextPageToken
}
return all, nil
}

// --- Cloud Logging Sink ---

type sinkRequest struct {
Expand Down
Loading
Loading