Skip to content

Commit 8f1dcda

Browse files
Implement V1 compat API for PubSub
1 parent 2c2c78d commit 8f1dcda

File tree

3 files changed

+208
-1
lines changed

3 files changed

+208
-1
lines changed

spec/v2/providers/pubsub.spec.ts

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,40 @@ describe("onMessagePublished", () => {
170170
expect(json).to.deep.equal({ hello: "world" });
171171
});
172172

173+
it("should construct a CloudEvent with the correct context", async () => {
174+
const publishTime = new Date().toISOString();
175+
const message = {
176+
messageId: "uuid",
177+
data: Buffer.from(JSON.stringify({ hello: "world" })).toString("base64"),
178+
publishTime,
179+
};
180+
const data: pubsub.MessagePublishedData = {
181+
message: message as any,
182+
subscription: "projects/aProject/subscriptions/aSubscription",
183+
};
184+
const event: CloudEvent<pubsub.MessagePublishedData> = {
185+
specversion: "1.0",
186+
id: "uuid",
187+
time: publishTime,
188+
type: "google.cloud.pubsub.topic.v1.messagePublished",
189+
source: "//pubsub.googleapis.com/projects/aProject/topics/topic",
190+
data,
191+
};
192+
193+
let receivedEvent: CloudEvent<pubsub.MessagePublishedData<any>>;
194+
const func = pubsub.onMessagePublished("topic", (e) => {
195+
receivedEvent = e;
196+
});
197+
198+
await func(event);
199+
200+
expect(receivedEvent.id).to.equal("uuid");
201+
expect(receivedEvent.time).to.equal(publishTime);
202+
expect(receivedEvent.type).to.equal("google.cloud.pubsub.topic.v1.messagePublished");
203+
expect(receivedEvent.source).to.equal("//pubsub.googleapis.com/projects/aProject/topics/topic");
204+
expect(receivedEvent.data.message.json).to.deep.equal({ hello: "world" });
205+
});
206+
173207
// These tests pass if the transpiler works
174208
it("allows desirable syntax", () => {
175209
pubsub.onMessagePublished<string>(
@@ -193,4 +227,103 @@ describe("onMessagePublished", () => {
193227
(event: CloudEvent<pubsub.MessagePublishedData>) => undefined
194228
);
195229
});
230+
231+
it("should not modify a CloudEvent that already has a context", async () => {
232+
const publishTime = new Date().toISOString();
233+
const message = {
234+
messageId: "uuid",
235+
data: Buffer.from(JSON.stringify({ hello: "world" })).toString("base64"),
236+
publishTime,
237+
};
238+
const data: pubsub.MessagePublishedData = {
239+
message: message as any,
240+
subscription: "projects/aProject/subscriptions/aSubscription",
241+
};
242+
const existingContext = {
243+
eventId: "custom-id",
244+
timestamp: publishTime,
245+
eventType: "custom.type",
246+
resource: "custom/resource",
247+
params: {},
248+
};
249+
const event: CloudEvent<pubsub.MessagePublishedData> = {
250+
specversion: "1.0",
251+
id: "uuid",
252+
time: publishTime,
253+
type: "google.cloud.pubsub.topic.v1.messagePublished",
254+
source: "//pubsub.googleapis.com/projects/aProject/topics/topic",
255+
data,
256+
context: existingContext as any,
257+
};
258+
259+
let receivedEvent: CloudEvent<pubsub.MessagePublishedData<any>>;
260+
const func = pubsub.onMessagePublished("topic", (e) => {
261+
receivedEvent = e;
262+
});
263+
264+
await func(event);
265+
266+
expect(receivedEvent.context).to.deep.equal(existingContext);
267+
});
268+
269+
it("should use GCLOUD_PROJECT as fallback for resource name", async () => {
270+
const publishTime = new Date().toISOString();
271+
const message = {
272+
messageId: "uuid",
273+
data: Buffer.from(JSON.stringify({ hello: "world" })).toString("base64"),
274+
publishTime,
275+
};
276+
const data: pubsub.MessagePublishedData = {
277+
message: message as any,
278+
subscription: "projects/aProject/subscriptions/aSubscription",
279+
};
280+
const event: CloudEvent<pubsub.MessagePublishedData> = {
281+
specversion: "1.0",
282+
id: "uuid",
283+
time: publishTime,
284+
type: "google.cloud.pubsub.topic.v1.messagePublished",
285+
source: "//pubsub.googleapis.com/topics/topic", // Malformed source
286+
data,
287+
};
288+
289+
let receivedEvent: CloudEvent<pubsub.MessagePublishedData<any>>;
290+
const func = pubsub.onMessagePublished("topic", (e) => {
291+
receivedEvent = e;
292+
});
293+
294+
await func(event);
295+
296+
expect(receivedEvent.context.resource.name).to.equal("projects/aProject/topics/topic");
297+
});
298+
299+
it("should use 'unknown-project' as fallback for resource name", async () => {
300+
delete process.env.GCLOUD_PROJECT;
301+
const publishTime = new Date().toISOString();
302+
const message = {
303+
messageId: "uuid",
304+
data: Buffer.from(JSON.stringify({ hello: "world" })).toString("base64"),
305+
publishTime,
306+
};
307+
const data: pubsub.MessagePublishedData = {
308+
message: message as any,
309+
subscription: "projects/aProject/subscriptions/aSubscription",
310+
};
311+
const event: CloudEvent<pubsub.MessagePublishedData> = {
312+
specversion: "1.0",
313+
id: "uuid",
314+
time: publishTime,
315+
type: "google.cloud.pubsub.topic.v1.messagePublished",
316+
source: "//pubsub.googleapis.com/topics/topic", // Malformed source
317+
data,
318+
};
319+
320+
let receivedEvent: CloudEvent<pubsub.MessagePublishedData<any>>;
321+
const func = pubsub.onMessagePublished("topic", (e) => {
322+
receivedEvent = e;
323+
});
324+
325+
await func(event);
326+
327+
expect(receivedEvent.context.resource.name).to.equal("project/unknown-project/topics/topic");
328+
});
196329
});

src/v2/core.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
* @packageDocumentation
2626
*/
2727

28+
import type { EventContext } from "../v1/cloud-functions";
2829
import { Change } from "../common/change";
2930
import { ManifestEndpoint } from "../runtime/manifest";
3031

@@ -91,6 +92,13 @@ export interface CloudEvent<T> {
9192

9293
/** Information about this specific event. */
9394
data: T;
95+
96+
/** V1- compatible context of this event.
97+
*
98+
* This getter is added at runtime for V1 compatibility.
99+
* May be undefined it not set by a provider
100+
*/
101+
readonly context?: EventContext;
94102
}
95103

96104
/**

src/v2/providers/pubsub.ts

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import { Expression } from "../../params";
3434
import * as options from "../options";
3535
import { SecretParam } from "../../params/types";
3636
import { withInit } from "../../common/onInit";
37+
import type { EventContext, Resource } from "../../v1/cloud-functions";
3738

3839
/**
3940
* Google Cloud Pub/Sub is a globally distributed message bus that automatically scales as you need it.
@@ -304,7 +305,9 @@ export function onMessagePublished<T = any>(
304305
subscription: string;
305306
};
306307
messagePublishedData.message = new Message(messagePublishedData.message);
307-
return wrapTraceContext(withInit(handler))(raw as CloudEvent<MessagePublishedData<T>>);
308+
const event = raw as CloudEvent<MessagePublishedData<T>>;
309+
attachPubSubContext(event, topic);
310+
return wrapTraceContext(withInit(handler))(event);
308311
};
309312

310313
func.run = handler;
@@ -353,3 +356,66 @@ export function onMessagePublished<T = any>(
353356

354357
return func;
355358
}
359+
360+
/**
361+
* @internal
362+
*
363+
* Adds a v1-style context to the event.
364+
*
365+
* @param event - The event to add the context to.
366+
* @param topic - The topic the event is for.
367+
*/
368+
function attachPubSubContext<T>(event: CloudEvent<MessagePublishedData<T>>, topic: string) {
369+
if ("context" in event && event.context) {
370+
return;
371+
}
372+
373+
const resourceName = getResourceName(event, topic);
374+
const resource: Resource = {
375+
376+
service: "pubsub.googleapis.com",
377+
name: resourceName ?? "",
378+
379+
};
380+
381+
const context: EventContext = {
382+
eventId: event.id,
383+
timestamp: event.time,
384+
resource,
385+
eventType: "google.cloud.pubsub.topic.v1.messagePublished",
386+
params: {}
387+
388+
};
389+
390+
Object.defineProperty(event, "context", {
391+
392+
get: () => context,
393+
enumerable: false,
394+
configurable: false,
395+
396+
});
397+
}
398+
399+
/**
400+
* @internal
401+
*
402+
* Extracts the resource name from the event source.
403+
*
404+
* @param event - The event to extract the resource name from.
405+
* @param topic - The topic the event is for.
406+
* @returns The resource name.
407+
*/
408+
function getResourceName(event: CloudEvent<MessagePublishedData<any>>, topic: string) {
409+
const match = event.source?.match(/projects\/([^/]+)\/topics\/([^/]+)/);
410+
const project =
411+
match?.[1] ?? process.env.GCLOUD_PROJECT ?? process.env.GCLOUD_PROJECT_ID ?? process.env.PROJECT_ID;
412+
413+
const topicName = match?.[2] ?? topic;
414+
415+
if (!project) {
416+
return `project/unknown-project/topics/${topicName}`;
417+
}
418+
419+
return `projects/${project}/topics/${topicName}`;
420+
421+
}

0 commit comments

Comments
 (0)