Skip to content

Commit 04d51a1

Browse files
committed
fix: prevent synced invoices from getting updated by outdated webhooks
1 parent 87b309a commit 04d51a1

File tree

7 files changed

+199
-14
lines changed

7 files changed

+199
-14
lines changed

apps/node-fastify/src/test/test-utils.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ export async function fetchInvoicesFromDatabase(postgresClient: PostgresClient,
1111

1212
const placeholders = invoiceIds.map((_, index) => `$${index + 1}`).join(',');
1313
const result = await postgresClient.query(
14-
`SELECT id, invoice_number, customer_id, total, currency, status, updated_at FROM orb.invoices WHERE id IN (${placeholders})`,
14+
`SELECT id, invoice_number, customer_id, total, currency, status, updated_at, last_synced_at FROM orb.invoices WHERE id IN (${placeholders})`,
1515
invoiceIds
1616
);
1717
return result.rows;

apps/node-fastify/src/test/webhooks.test.ts

Lines changed: 93 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,10 @@ describe('POST /webhooks', () => {
182182

183183
const webhookData = JSON.parse(payload);
184184
webhookData.type = webhookType;
185+
186+
const webhookTimestamp = new Date('2025-01-15T10:30:00.000Z').toISOString();
187+
webhookData.created_at = webhookTimestamp;
188+
185189
payload = JSON.stringify(webhookData);
186190

187191
const invoiceId = webhookData.invoice.id;
@@ -200,6 +204,10 @@ describe('POST /webhooks', () => {
200204
expect(invoice.total).toBe(webhookData.invoice.amount_due);
201205
expect(invoice.currency).toBe(webhookData.invoice.currency);
202206
expect(invoice.status).toBe(webhookData.invoice.status);
207+
208+
// Verify that last_synced_at gets set to the webhook timestamp for new invoices
209+
expect(invoice.last_synced_at).toBeDefined();
210+
expect(new Date(invoice.last_synced_at).toISOString()).toBe(webhookTimestamp);
203211
});
204212

205213
it('should update an existing invoice when webhook arrives', async () => {
@@ -229,8 +237,14 @@ describe('POST /webhooks', () => {
229237
status: initialStatus,
230238
};
231239

232-
// Store the initial invoice in the database
233-
await syncInvoices(postgresClient, [initialInvoiceData]);
240+
// Store the initial invoice in the database with an old timestamp
241+
const oldTimestamp = new Date('2025-01-10T10:00:00.000Z').toISOString();
242+
await syncInvoices(postgresClient, [initialInvoiceData], oldTimestamp);
243+
244+
// Verify the initial invoice was created with the old timestamp
245+
const [initialInvoice] = await fetchInvoicesFromDatabase(orbSync.postgresClient, [invoiceId]);
246+
expect(initialInvoice).toBeDefined();
247+
expect(new Date(initialInvoice.last_synced_at).toISOString()).toBe(oldTimestamp);
234248

235249
// Now update the webhook data with new values
236250
const updatedAmount = 1500;
@@ -240,6 +254,10 @@ describe('POST /webhooks', () => {
240254
webhookData.invoice.status = updatedStatus;
241255
webhookData.invoice.paid_at = new Date().toISOString();
242256

257+
// Set a newer webhook timestamp that should trigger an update
258+
const newWebhookTimestamp = new Date('2025-01-15T10:30:00.000Z').toISOString();
259+
webhookData.created_at = newWebhookTimestamp;
260+
243261
payload = JSON.stringify(webhookData);
244262

245263
// Send the webhook with updated data
@@ -252,13 +270,80 @@ describe('POST /webhooks', () => {
252270
});
253271

254272
// Verify that the invoice was updated in the database
255-
const [invoice] = await fetchInvoicesFromDatabase(orbSync.postgresClient, [invoiceId]);
256-
expect(invoice).toBeDefined();
257-
expect(Number(invoice.total)).toBe(updatedAmount);
258-
expect(invoice.status).toBe(updatedStatus);
273+
const [updatedInvoice] = await fetchInvoicesFromDatabase(orbSync.postgresClient, [invoiceId]);
274+
expect(updatedInvoice).toBeDefined();
275+
expect(Number(updatedInvoice.total)).toBe(updatedAmount);
276+
expect(updatedInvoice.status).toBe(updatedStatus);
259277

260278
// Verify that the updated_at timestamp was changed
261-
expect(invoice.updated_at).toBeDefined();
262-
expect(new Date(invoice.updated_at).getTime()).toBeGreaterThan(new Date(webhookData.invoice.created_at).getTime());
279+
expect(updatedInvoice.updated_at).toBeDefined();
280+
expect(new Date(updatedInvoice.updated_at).getTime()).toBeGreaterThan(new Date(webhookData.invoice.created_at).getTime());
281+
282+
// Verify that last_synced_at was updated to the new webhook timestamp
283+
expect(updatedInvoice.last_synced_at).toBeDefined();
284+
expect(new Date(updatedInvoice.last_synced_at).toISOString()).toBe(newWebhookTimestamp);
285+
286+
// Verify that the new timestamp is newer than the old timestamp
287+
expect(new Date(updatedInvoice.last_synced_at).getTime()).toBeGreaterThan(new Date(oldTimestamp).getTime());
288+
});
289+
290+
it('should NOT update invoice when webhook timestamp is older than last_synced_at', async () => {
291+
let payload = loadWebhookPayload('invoice');
292+
const postgresClient = orbSync.postgresClient;
293+
294+
const webhookData = JSON.parse(payload);
295+
const invoiceId = webhookData.invoice.id;
296+
await deleteTestData(orbSync.postgresClient, 'invoices', [invoiceId]);
297+
298+
webhookData.type = 'invoice.payment_succeeded';
299+
300+
// Insert an invoice with a "new" timestamp and known values
301+
const originalAmount = 2000;
302+
const originalStatus = 'paid';
303+
webhookData.invoice.amount_due = originalAmount.toString();
304+
webhookData.invoice.total = originalAmount.toString();
305+
webhookData.invoice.status = originalStatus;
306+
307+
const newTimestamp = new Date('2025-01-15T10:30:00.000Z').toISOString();
308+
const initialInvoiceData = {
309+
...webhookData.invoice,
310+
amount_due: originalAmount.toString(),
311+
total: originalAmount.toString(),
312+
status: originalStatus,
313+
};
314+
await syncInvoices(postgresClient, [initialInvoiceData], newTimestamp);
315+
316+
// Verify the invoice was created with the new timestamp
317+
const [initialInvoice] = await fetchInvoicesFromDatabase(orbSync.postgresClient, [invoiceId]);
318+
expect(initialInvoice).toBeDefined();
319+
expect(Number(initialInvoice.total)).toBe(originalAmount);
320+
expect(initialInvoice.status).toBe(originalStatus);
321+
expect(new Date(initialInvoice.last_synced_at).toISOString()).toBe(newTimestamp);
322+
323+
// Now attempt to update with an older webhook timestamp and different values
324+
const outdatedAmount = 1000;
325+
const outdatedStatus = 'pending';
326+
webhookData.invoice.amount_due = outdatedAmount.toString();
327+
webhookData.invoice.total = outdatedAmount.toString();
328+
webhookData.invoice.status = outdatedStatus;
329+
webhookData.invoice.paid_at = undefined;
330+
const oldWebhookTimestamp = new Date('2025-01-10T10:00:00.000Z').toISOString();
331+
webhookData.created_at = oldWebhookTimestamp;
332+
payload = JSON.stringify(webhookData);
333+
334+
// Send the webhook with the outdated data
335+
const response = await sendWebhookRequest(payload);
336+
expect(response.statusCode).toBe(200);
337+
const data = response.json();
338+
expect(data).toMatchObject({ received: true });
339+
340+
// Fetch the invoice again and verify it was NOT updated
341+
const [afterWebhookInvoice] = await fetchInvoicesFromDatabase(orbSync.postgresClient, [invoiceId]);
342+
expect(afterWebhookInvoice).toBeDefined();
343+
// Data should remain unchanged
344+
expect(Number(afterWebhookInvoice.total)).toBe(originalAmount);
345+
expect(afterWebhookInvoice.status).toBe(originalStatus);
346+
// last_synced_at should remain the new timestamp
347+
expect(new Date(afterWebhookInvoice.last_synced_at).toISOString()).toBe(newTimestamp);
263348
});
264349
});
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
-- Add last_synced_at column to all relevant entity tables
2+
-- This column tracks when each entity was last synchronized with Orb.
3+
-- It enables timestamp-based protection to prevent old webhooks from overwriting newer data.
4+
5+
alter table orb.invoices
6+
add column last_synced_at timestamptz;
7+
8+
alter table orb.customers
9+
add column last_synced_at timestamptz;
10+
11+
alter table orb.subscriptions
12+
add column last_synced_at timestamptz;
13+
14+
alter table orb.credit_notes
15+
add column last_synced_at timestamptz;
16+
17+
alter table orb.plans
18+
add column last_synced_at timestamptz;
19+
20+
alter table orb.billable_metrics
21+
add column last_synced_at timestamptz;

packages/orb-sync-lib/src/database/postgres.ts

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,51 @@ export class PostgresClient {
4747
return results.flatMap((it) => it.rows);
4848
}
4949

50+
async upsertManyWithTimestampProtection<
51+
T extends {
52+
[Key: string]: any; // eslint-disable-line @typescript-eslint/no-explicit-any
53+
},
54+
>(
55+
entries: T[],
56+
table: string,
57+
tableSchema: JsonSchema,
58+
syncTimestamp: string
59+
): Promise<T[]> {
60+
if (!entries.length) return [];
61+
62+
// Max 5 in parallel to avoid exhausting connection pool
63+
const chunkSize = 5;
64+
const results: pg.QueryResult<T>[] = [];
65+
66+
for (let i = 0; i < entries.length; i += chunkSize) {
67+
const chunk = entries.slice(i, i + chunkSize);
68+
69+
const queries: Promise<pg.QueryResult<T>>[] = [];
70+
chunk.forEach((entry) => {
71+
// Inject the values
72+
const cleansed = this.cleanseArrayField(entry, tableSchema);
73+
// Add last_synced_at to the cleansed data for SQL parameter binding
74+
cleansed.last_synced_at = syncTimestamp;
75+
76+
const upsertSql = this.constructUpsertWithTimestampProtectionSql(
77+
this.config.schema,
78+
table,
79+
tableSchema
80+
);
81+
82+
const prepared = sql(upsertSql, {
83+
useNullForMissing: true,
84+
})(cleansed);
85+
86+
queries.push(this.pool.query(prepared.text, prepared.values));
87+
});
88+
89+
results.push(...(await Promise.all(queries)));
90+
}
91+
92+
return results.flatMap((it) => it.rows);
93+
}
94+
5095
private constructUpsertSql = (schema: string, table: string, tableSchema: JsonSchema): string => {
5196
const conflict = 'id';
5297
const properties = tableSchema.properties;
@@ -72,6 +117,35 @@ export class PostgresClient {
72117
;`;
73118
};
74119

120+
private constructUpsertWithTimestampProtectionSql = (
121+
schema: string,
122+
table: string,
123+
tableSchema: JsonSchema
124+
): string => {
125+
const conflict = 'id';
126+
const properties = tableSchema.properties;
127+
128+
return `
129+
INSERT INTO "${schema}"."${table}" (
130+
${Object.keys(properties)
131+
.map((x) => `"${x}"`)
132+
.join(',')}
133+
)
134+
VALUES (
135+
${Object.keys(properties)
136+
.map((x) => `:${x}`)
137+
.join(',')}
138+
)
139+
ON CONFLICT (${conflict}) DO UPDATE SET
140+
${Object.keys(properties)
141+
.filter((x) => x !== 'last_synced_at')
142+
.map((x) => `"${x}" = EXCLUDED."${x}"`)
143+
.join(',')},
144+
last_synced_at = :last_synced_at
145+
WHERE "${table}"."last_synced_at" IS NULL
146+
OR "${table}"."last_synced_at" < :last_synced_at;`;
147+
};
148+
75149
/**
76150
* Updates a subscription's billing cycle dates, provided that the current end date is in the past (i.e. the subscription
77151
* data in the database being stale).

packages/orb-sync-lib/src/orb-sync.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ export class OrbSync {
173173
const invoice = webhook.invoice;
174174
this.config.logger?.info(`Received webhook ${webhook.id}: ${parsedData.type} for invoice ${invoice.id}`);
175175

176-
await syncInvoices(this.postgresClient, [invoice]);
176+
await syncInvoices(this.postgresClient, [invoice], webhook.created_at);
177177

178178
const billingCycle = getBillingCycleFromInvoice(invoice);
179179
if (billingCycle && invoice.subscription) {
@@ -201,7 +201,7 @@ export class OrbSync {
201201

202202
this.config.logger?.info(`Received webhook ${webhook.id}: ${webhook.type} for invoice ${webhook.invoice.id}`);
203203

204-
await syncInvoices(this.postgresClient, [webhook.invoice]);
204+
await syncInvoices(this.postgresClient, [webhook.invoice], webhook.created_at);
205205
break;
206206
}
207207

packages/orb-sync-lib/src/schemas/invoice.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ export const invoiceSchema: JsonSchema = {
4343
sync_failed_at: { type: 'string' },
4444
voided_at: { type: 'string' },
4545
will_auto_issue: { type: 'boolean' },
46+
last_synced_at: { type: 'string' },
4647
},
4748
required: ['id'],
4849
} as const;

packages/orb-sync-lib/src/sync/invoices.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,19 @@ import { Invoice } from 'orb-billing/resources';
66

77
const TABLE = 'invoices';
88

9-
export async function syncInvoices(postgresClient: PostgresClient, invoices: Invoice[]) {
10-
return postgresClient.upsertMany(
9+
export async function syncInvoices(postgresClient: PostgresClient, invoices: Invoice[], syncTimestamp?: string) {
10+
const timestamp = syncTimestamp || new Date().toISOString();
11+
12+
return postgresClient.upsertManyWithTimestampProtection(
1113
invoices.map((invoice) => ({
1214
...invoice,
1315
customer_id: invoice.customer.id,
1416
subscription_id: invoice.subscription?.id,
17+
last_synced_at: timestamp,
1518
})),
1619
TABLE,
17-
invoiceSchema
20+
invoiceSchema,
21+
timestamp
1822
);
1923
}
2024

0 commit comments

Comments
 (0)