Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions spec/v2/providers/pubsub.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,45 @@
expect(json).to.deep.equal({ hello: "world" });
});

it("should construct a CloudEvent with the correct context and message", async () => {
const publishTime = new Date().toISOString();
const messagePayload = {
messageId: "uuid",
data: Buffer.from(JSON.stringify({ hello: "world" })).toString("base64"),
publishTime,
};
const data: pubsub.MessagePublishedData = {
message: messagePayload as any,
subscription: "projects/aProject/subscriptions/aSubscription",
};
const event: CloudEvent<pubsub.MessagePublishedData> = {
specversion: "1.0",
id: "uuid",
time: publishTime,
type: "google.cloud.pubsub.topic.v1.messagePublished",
source: "//pubsub.googleapis.com/projects/aProject/topics/topic",
data,
};

let destructuredMessage: pubsub.Message<any>;
let context: any;
const func = pubsub.onMessagePublished("topic", (e) => {
({ message: destructuredMessage, context } = e as any);
});

await func(event);

expect(destructuredMessage.json).to.deep.equal({ hello: "world" });
expect(context).to.exist;
expect(context.eventId).to.equal("uuid");
expect(context.timestamp).to.equal(publishTime);
expect(context.eventType).to.equal("google.cloud.pubsub.topic.v1.messagePublished");
expect(context.resource).to.deep.equal({
service: "pubsub.googleapis.com",
name: "projects/aProject/topics/topic",
});
});

// These tests pass if the transpiler works
it("allows desirable syntax", () => {
pubsub.onMessagePublished<string>(
Expand All @@ -193,4 +232,40 @@
(event: CloudEvent<pubsub.MessagePublishedData>) => undefined
);
});


Check failure on line 236 in spec/v2/providers/pubsub.spec.ts

View workflow job for this annotation

GitHub Actions / lint (22.x)

Delete `⏎⏎⏎⏎⏎`




it("should use 'unknown-project' as fallback for resource name", async () => {
delete process.env.GCLOUD_PROJECT;
const publishTime = new Date().toISOString();
const message = {
messageId: "uuid",
data: Buffer.from(JSON.stringify({ hello: "world" })).toString("base64"),
publishTime,
};
const data: pubsub.MessagePublishedData = {
message: message as any,
subscription: "projects/aProject/subscriptions/aSubscription",
};
const event: CloudEvent<pubsub.MessagePublishedData> = {
specversion: "1.0",
id: "uuid",
time: publishTime,
type: "google.cloud.pubsub.topic.v1.messagePublished",
source: "//pubsub.googleapis.com/topics/topic", // Malformed source
data,
};

let receivedEvent: CloudEvent<pubsub.MessagePublishedData<any>>;
const func = pubsub.onMessagePublished("topic", (e) => {
receivedEvent = e;
});

await func(event);

expect(receivedEvent.context.resource.name).to.equal("projects/unknown-project/topics/topic");
});
});
8 changes: 8 additions & 0 deletions src/v2/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
* @packageDocumentation
*/

import type { EventContext } from "../v1/cloud-functions";
import { Change } from "../common/change";
import { ManifestEndpoint } from "../runtime/manifest";

Expand Down Expand Up @@ -91,6 +92,13 @@

/** Information about this specific event. */
data: T;

/** V1- compatible context of this event.
*

Check failure on line 97 in src/v2/core.ts

View workflow job for this annotation

GitHub Actions / lint (22.x)

Delete `·`
* This getter is added at runtime for V1 compatibility.
* May be undefined it not set by a provider
*/
readonly context?: EventContext;
}

/**
Expand Down
72 changes: 71 additions & 1 deletion src/v2/providers/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import * as options from "../options";
import { SecretParam } from "../../params/types";
import { withInit } from "../../common/onInit";
import type { EventContext, Resource } from "../../v1/cloud-functions";

/**
* Google Cloud Pub/Sub is a globally distributed message bus that automatically scales as you need it.
Expand Down Expand Up @@ -304,7 +305,9 @@
subscription: string;
};
messagePublishedData.message = new Message(messagePublishedData.message);
return wrapTraceContext(withInit(handler))(raw as CloudEvent<MessagePublishedData<T>>);
const event = raw as CloudEvent<MessagePublishedData<T>>;
attachPubSubContext(event, topic);
return wrapTraceContext(withInit(handler))(event);
};

func.run = handler;
Expand Down Expand Up @@ -353,3 +356,70 @@

return func;
}

/**
* @internal
*
* Adds a v1-style context to the event.
*
* @param event - The event to add the context to.
* @param topic - The topic the event is for.
*/
function attachPubSubContext<T>(event: CloudEvent<MessagePublishedData<T>>, topic: string) {
if ("context" in event && event.context) {
throw new Error("Unexpected context in event.");
}

const resourceName = getResourceName(event, topic);
const resource: Resource = {

Check failure on line 375 in src/v2/providers/pubsub.ts

View workflow job for this annotation

GitHub Actions / lint (22.x)

Delete `⏎`
service: "pubsub.googleapis.com",
name: resourceName ?? "",

Check failure on line 377 in src/v2/providers/pubsub.ts

View workflow job for this annotation

GitHub Actions / lint (22.x)

Delete `⏎`

};

const context: EventContext = {
eventId: event.id,
timestamp: event.time,
resource,
eventType: "google.cloud.pubsub.topic.v1.messagePublished",
params: {}

Check failure on line 386 in src/v2/providers/pubsub.ts

View workflow job for this annotation

GitHub Actions / lint (22.x)

Replace `⏎` with `,`

};

Object.defineProperty(event, "context", {

Check failure on line 390 in src/v2/providers/pubsub.ts

View workflow job for this annotation

GitHub Actions / lint (22.x)

Delete `⏎`

get: () => context,
enumerable: false,
configurable: false,

Check failure on line 394 in src/v2/providers/pubsub.ts

View workflow job for this annotation

GitHub Actions / lint (22.x)

Delete `⏎`

});

Object.defineProperty(event, "message", {
get: () => (event.data as MessagePublishedData<T>).message,

Check failure on line 399 in src/v2/providers/pubsub.ts

View workflow job for this annotation

GitHub Actions / lint (22.x)

This assertion is unnecessary since it does not change the type of the expression
enumerable: false,
configurable: false,
});
}

/**
* @internal
*
* Extracts the resource name from the event source.
*
* @param event - The event to extract the resource name from.
* @param topic - The topic the event is for.
* @returns The resource name.
*/
function getResourceName(event: CloudEvent<MessagePublishedData<any>>, topic: string) {
const match = event.source?.match(/projects\/([^/]+)\/topics\/([^/]+)/);
const project = match?.[1];
const topicName = match?.[2] ?? topic;

if (!project) {
return `projects/unknown-project/topics/${topicName}`;
}

return `projects/${project}/topics/${topicName}`;

Check failure on line 424 in src/v2/providers/pubsub.ts

View workflow job for this annotation

GitHub Actions / lint (22.x)

Replace `⏎}` with `}⏎`
}
Loading