fix: prevent synced invoices from getting updated by outdated webhooks #68

Open
wants to merge 5 commits into
base: main
2 changes: 1 addition & 1 deletion apps/node-fastify/src/test/test-utils.ts
@@ -11,7 +11,7 @@ export async function fetchInvoicesFromDatabase(postgresClient: PostgresClient,

const placeholders = invoiceIds.map((_, index) => `$${index + 1}`).join(',');
const result = await postgresClient.query(
- `SELECT id, invoice_number, customer_id, total, currency, status, updated_at FROM orb.invoices WHERE id IN (${placeholders})`,
+ `SELECT id, invoice_number, customer_id, total, currency, status, updated_at, last_synced_at FROM orb.invoices WHERE id IN (${placeholders})`,
invoiceIds
);
return result.rows;
156 changes: 148 additions & 8 deletions apps/node-fastify/src/test/webhooks.test.ts
@@ -182,6 +182,10 @@ describe('POST /webhooks', () => {

const webhookData = JSON.parse(payload);
webhookData.type = webhookType;

const webhookTimestamp = new Date('2025-01-15T10:30:00.000Z').toISOString();
webhookData.created_at = webhookTimestamp;

payload = JSON.stringify(webhookData);

const invoiceId = webhookData.invoice.id;
@@ -200,6 +204,10 @@ describe('POST /webhooks', () => {
expect(invoice.total).toBe(webhookData.invoice.amount_due);
expect(invoice.currency).toBe(webhookData.invoice.currency);
expect(invoice.status).toBe(webhookData.invoice.status);

// Verify that last_synced_at gets set to the webhook timestamp for new invoices
expect(invoice.last_synced_at).toBeDefined();
expect(new Date(invoice.last_synced_at).toISOString()).toBe(webhookTimestamp);
});

it('should update an existing invoice when webhook arrives', async () => {
@@ -229,8 +237,14 @@ describe('POST /webhooks', () => {
status: initialStatus,
};

- // Store the initial invoice in the database
- await syncInvoices(postgresClient, [initialInvoiceData]);
+ // Store the initial invoice in the database with an old timestamp
+ const oldTimestamp = new Date('2025-01-10T10:00:00.000Z').toISOString();
+ await syncInvoices(postgresClient, [initialInvoiceData], oldTimestamp);
+
+ // Verify the initial invoice was created with the old timestamp
+ const [initialInvoice] = await fetchInvoicesFromDatabase(orbSync.postgresClient, [invoiceId]);
+ expect(initialInvoice).toBeDefined();
+ expect(new Date(initialInvoice.last_synced_at).toISOString()).toBe(oldTimestamp);

// Now update the webhook data with new values
const updatedAmount = 1500;
@@ -240,6 +254,10 @@ describe('POST /webhooks', () => {
webhookData.invoice.status = updatedStatus;
webhookData.invoice.paid_at = new Date().toISOString();

// Set a newer webhook timestamp that should trigger an update
const newWebhookTimestamp = new Date('2025-01-15T10:30:00.000Z').toISOString();
webhookData.created_at = newWebhookTimestamp;

payload = JSON.stringify(webhookData);

// Send the webhook with updated data
@@ -252,13 +270,135 @@ describe('POST /webhooks', () => {
});

// Verify that the invoice was updated in the database
- const [invoice] = await fetchInvoicesFromDatabase(orbSync.postgresClient, [invoiceId]);
- expect(invoice).toBeDefined();
- expect(Number(invoice.total)).toBe(updatedAmount);
- expect(invoice.status).toBe(updatedStatus);
+ const [updatedInvoice] = await fetchInvoicesFromDatabase(orbSync.postgresClient, [invoiceId]);
+ expect(updatedInvoice).toBeDefined();
+ expect(Number(updatedInvoice.total)).toBe(updatedAmount);
+ expect(updatedInvoice.status).toBe(updatedStatus);

// Verify that the updated_at timestamp was changed
- expect(invoice.updated_at).toBeDefined();
- expect(new Date(invoice.updated_at).getTime()).toBeGreaterThan(new Date(webhookData.invoice.created_at).getTime());
+ expect(updatedInvoice.updated_at).toBeDefined();
+ expect(new Date(updatedInvoice.updated_at).getTime()).toBeGreaterThan(new Date(webhookData.invoice.created_at).getTime());

// Verify that last_synced_at was updated to the new webhook timestamp
expect(updatedInvoice.last_synced_at).toBeDefined();
expect(new Date(updatedInvoice.last_synced_at).toISOString()).toBe(newWebhookTimestamp);

// Verify that the new timestamp is newer than the old timestamp
expect(new Date(updatedInvoice.last_synced_at).getTime()).toBeGreaterThan(new Date(oldTimestamp).getTime());
});

it('should NOT update invoice when webhook timestamp is older than last_synced_at', async () => {
let payload = loadWebhookPayload('invoice');
const postgresClient = orbSync.postgresClient;

const webhookData = JSON.parse(payload);
const invoiceId = webhookData.invoice.id;
await deleteTestData(orbSync.postgresClient, 'invoices', [invoiceId]);

webhookData.type = 'invoice.payment_succeeded';

// Insert an invoice with a "new" timestamp and known values
const originalAmount = 2000;
const originalStatus = 'paid';
webhookData.invoice.amount_due = originalAmount.toString();
webhookData.invoice.total = originalAmount.toString();
webhookData.invoice.status = originalStatus;

const newTimestamp = new Date('2025-01-15T10:30:00.000Z').toISOString();
const initialInvoiceData = {
...webhookData.invoice,
amount_due: originalAmount.toString(),
total: originalAmount.toString(),
status: originalStatus,
};
await syncInvoices(postgresClient, [initialInvoiceData], newTimestamp);

// Verify the invoice was created with the new timestamp
const [initialInvoice] = await fetchInvoicesFromDatabase(orbSync.postgresClient, [invoiceId]);
expect(initialInvoice).toBeDefined();
expect(Number(initialInvoice.total)).toBe(originalAmount);
expect(initialInvoice.status).toBe(originalStatus);
expect(new Date(initialInvoice.last_synced_at).toISOString()).toBe(newTimestamp);

// Now attempt to update with an older webhook timestamp and different values
const outdatedAmount = 1000;
const outdatedStatus = 'pending';
webhookData.invoice.amount_due = outdatedAmount.toString();
webhookData.invoice.total = outdatedAmount.toString();
webhookData.invoice.status = outdatedStatus;
webhookData.invoice.paid_at = undefined;
const oldWebhookTimestamp = new Date('2025-01-10T10:00:00.000Z').toISOString();
webhookData.created_at = oldWebhookTimestamp;
payload = JSON.stringify(webhookData);

// Send the webhook with the outdated data
const response = await sendWebhookRequest(payload);
expect(response.statusCode).toBe(200);
const data = response.json();
expect(data).toMatchObject({ received: true });

// Fetch the invoice again and verify it was NOT updated
const [afterWebhookInvoice] = await fetchInvoicesFromDatabase(orbSync.postgresClient, [invoiceId]);
expect(afterWebhookInvoice).toBeDefined();
// Data should remain unchanged
expect(Number(afterWebhookInvoice.total)).toBe(originalAmount);
expect(afterWebhookInvoice.status).toBe(originalStatus);
// last_synced_at should remain the new timestamp
expect(new Date(afterWebhookInvoice.last_synced_at).toISOString()).toBe(newTimestamp);
});

it('should ignore outdated webhook for invoice when last_synced_at is newer, even with multiple invoices', async () => {
const postgresClient = orbSync.postgresClient;
let payload = loadWebhookPayload('invoice');
const webhookData = JSON.parse(payload);

webhookData.type = 'invoice.payment_succeeded';

// Prepare two invoices with different ids
const invoiceId1 = webhookData.invoice.id;
const invoiceId2 = invoiceId1 + '_other';

// Insert both invoices with different last_synced_at values
const webhookInvoice1Timestamp = new Date('2025-01-10T09:30:00.000Z').toISOString(); // This is older than invoice1Timestamp, so it should not update invoice1
const invoice1Timestamp = new Date('2025-01-10T10:00:00.000Z').toISOString();
const invoice2Timestamp = new Date('2025-01-10T10:30:00.000Z').toISOString();

// Invoice 1: target of the outdated webhook; it must NOT be updated
const invoiceData1 = {
...webhookData.invoice,
id: invoiceId1,
total: '1500',
status: 'paid',
};

// Invoice 2 data is irrelevant for this test
const invoiceData2 = {
...webhookData.invoice,
id: invoiceId2
};

await deleteTestData(postgresClient, 'invoices', [invoiceId1, invoiceId2]);
await syncInvoices(postgresClient, [invoiceData1], invoice1Timestamp);
await syncInvoices(postgresClient, [invoiceData2], invoice2Timestamp);

// Now, send a webhook for invoiceId1 with an older timestamp and outdated values
webhookData.invoice.id = invoiceId1;
webhookData.invoice.total = '1000';
webhookData.invoice.status = 'pending';
webhookData.created_at = webhookInvoice1Timestamp;
payload = JSON.stringify(webhookData);

const response = await sendWebhookRequest(payload);
expect(response.statusCode).toBe(200);

// Fetch invoice 1 (invoice 2 only exists so that the table holds another row with a different last_synced_at)
const [updatedInvoice1] = await fetchInvoicesFromDatabase(postgresClient, [invoiceId1]);

// Invoice 1 should NOT be updated because the webhook timestamp is older than the invoice's last_synced_at
expect(updatedInvoice1).toBeDefined();
expect(updatedInvoice1.total).toBe('1500');
expect(updatedInvoice1.status).toBe('paid');
expect(new Date(updatedInvoice1.last_synced_at).toISOString()).toBe(invoice1Timestamp);

});
});
77 changes: 77 additions & 0 deletions packages/orb-sync-lib/src/database/postgres.ts
@@ -47,6 +47,51 @@ export class PostgresClient {
return results.flatMap((it) => it.rows);
}

async upsertManyWithTimestampProtection<
T extends {
[Key: string]: any; // eslint-disable-line @typescript-eslint/no-explicit-any
},
>(
entries: T[],
table: string,
tableSchema: JsonSchema,
syncTimestamp: string
Collaborator

When doing a manual sync, we can always pass new Date() as syncTimestamp, given that we sync at that specific time. In that case we would ALWAYS have a syncTimestamp, so this separate method would not be necessary. Or are there any cases I am missing here?

Contributor Author

@ignaciodob ignaciodob Jun 27, 2025

A separate method is not strictly necessary, but my idea was to keep the new functionality contained to the invoices first, especially given that we don't have tests for the other events yet. In case something goes wrong, syncing up only the invoices would be easier.

But yeah, eventually, I don't see a reason why we wouldn't do the same for every other entity.

Collaborator

Fine by me to get this out 👍

): Promise<T[]> {
if (!entries.length) return [];

// Max 5 in parallel to avoid exhausting connection pool
const chunkSize = 5;
const results: pg.QueryResult<T>[] = [];

for (let i = 0; i < entries.length; i += chunkSize) {
const chunk = entries.slice(i, i + chunkSize);

const queries: Promise<pg.QueryResult<T>>[] = [];
chunk.forEach((entry) => {
// Inject the values
const cleansed = this.cleanseArrayField(entry, tableSchema);
// Add last_synced_at to the cleansed data for SQL parameter binding
cleansed.last_synced_at = syncTimestamp;

const upsertSql = this.constructUpsertWithTimestampProtectionSql(
this.config.schema,
table,
tableSchema
);

const prepared = sql(upsertSql, {
useNullForMissing: true,
})(cleansed);

queries.push(this.pool.query(prepared.text, prepared.values));
});

results.push(...(await Promise.all(queries)));
}

return results.flatMap((it) => it.rows);
}
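
The chunking used by upsertManyWithTimestampProtection (at most 5 queries in flight so the connection pool is not exhausted) can be sketched in isolation. This is a minimal illustration, not part of the PR: `runInChunks` and its parameters are hypothetical names, and `task` stands in for the per-entry `pool.query` call.

```typescript
// Hypothetical helper illustrating the chunked-parallelism pattern above.
// Runs `task` for every item, with at most `chunkSize` promises pending at once.
async function runInChunks<T, R>(
  items: readonly T[],
  chunkSize: number,
  task: (item: T) => Promise<R>
): Promise<R[]> {
  if (chunkSize < 1) throw new RangeError('chunkSize must be at least 1');

  const results: R[] = [];
  for (let i = 0; i < items.length; i += chunkSize) {
    const chunk = items.slice(i, i + chunkSize);
    // Start the whole chunk in parallel, then wait for all of it
    // before starting the next chunk. Result order matches input order.
    results.push(...(await Promise.all(chunk.map(task))));
  }
  return results;
}
```

Note that each chunk waits for its slowest query before the next chunk starts, which trades some throughput for a simple upper bound on concurrency.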

private constructUpsertSql = (schema: string, table: string, tableSchema: JsonSchema): string => {
const conflict = 'id';
const properties = tableSchema.properties;
@@ -72,6 +117,38 @@ export class PostgresClient {
;`;
};

private constructUpsertWithTimestampProtectionSql = (
schema: string,
table: string,
tableSchema: JsonSchema
): string => {
const conflict = 'id';
const properties = tableSchema.properties;

// The WHERE clause in ON CONFLICT DO UPDATE only applies to the conflicting row
// (the row being updated), not to all rows in the table. PostgreSQL ensures that
// the condition is evaluated only for the specific row that conflicts with the INSERT.
return `
INSERT INTO "${schema}"."${table}" (
${Object.keys(properties)
.map((x) => `"${x}"`)
.join(',')}
)
VALUES (
${Object.keys(properties)
.map((x) => `:${x}`)
.join(',')}
)
ON CONFLICT (${conflict}) DO UPDATE SET
${Object.keys(properties)
.filter((x) => x !== 'last_synced_at')
.map((x) => `"${x}" = EXCLUDED."${x}"`)
.join(',')},
last_synced_at = :last_synced_at
WHERE "${table}"."last_synced_at" IS NULL
Collaborator

@kevcodez kevcodez Jun 27, 2025

I think this will upsert if ANY entry in the table matches this condition, not necessarily the row with this id. We could do something like:

WHERE NOT EXISTS (select 1 from "${table}" where id = :id AND "${table}"."last_synced_at" > :last_synced_at);

Collaborator

The integration tests likely did not catch this (if true), given you test for a single entry

Contributor Author

Great catch! You're right - I'll add a test to cover this too

Contributor Author

@ignaciodob ignaciodob Jun 27, 2025

I had to do a deep dive into the Postgres docs, but it seems that Postgres actually scopes the WHERE clause in ON CONFLICT DO UPDATE very narrowly.

Specifically:

  • The WHERE clause only runs after a conflict is detected (i.e. once a row with matching constraint is found).
  • At that point, it only has access to two rows:
    • The conflicting row from the table (via ${table} or alias)
    • The proposed insert row (via excluded)

So even though it looks like ${table}."last_synced_at" could match any row, it actually only refers to the row that caused the conflict.

Source:

  • PostgreSQL Docs on INSERT
    “The SET and WHERE clauses in ON CONFLICT DO UPDATE have access to the existing row using the table’s name (or an alias), and to the row proposed for insertion using the special excluded table.”
  • This comment on StackOverflow also says the same thing and has a clearer example: https://stackoverflow.com/a/44579804

That said, I find this to be quite non-intuitive (even a bit misleading 😅). I've added a test to verify that this works and reviewed it thoroughly, and I believe it addresses this case. I've added a comment too.

Let me know what you think :)

Collaborator

Sgtm, also tested this locally and it makes sense

OR "${table}"."last_synced_at" < :last_synced_at;`;
};
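
The guard in the ON CONFLICT ... WHERE clause, and the row scoping discussed in the review thread, can be modelled in memory. This is only a sketch under stated assumptions: a `Map` keyed by id stands in for the Postgres table, and `InvoiceRow` and `upsertWithTimestampProtectionModel` are hypothetical names that do not exist in the PR.

```typescript
// Hypothetical in-memory model of the upsert; NOT the library's implementation.
interface InvoiceRow {
  id: string;
  total: string;
  status: string;
  last_synced_at: string | null; // ISO 8601 timestamp, or null if never synced
}

// Models: INSERT ... ON CONFLICT (id) DO UPDATE SET ...
//         WHERE last_synced_at IS NULL OR last_synced_at < :last_synced_at
// Returns true when the incoming row was written, false when it was ignored.
function upsertWithTimestampProtectionModel(
  table: Map<string, InvoiceRow>,
  incoming: InvoiceRow & { last_synced_at: string }
): boolean {
  const existing = table.get(incoming.id);
  if (!existing) {
    table.set(incoming.id, { ...incoming }); // no conflict: plain INSERT
    return true;
  }
  // Only the conflicting row is inspected; other rows never enter the check.
  const incomingIsNewer =
    existing.last_synced_at === null ||
    Date.parse(existing.last_synced_at) < Date.parse(incoming.last_synced_at);
  if (incomingIsNewer) {
    table.set(incoming.id, { ...incoming });
  }
  return incomingIsNewer;
}
```

With two rows in the map, an outdated write for one id is rejected regardless of the other row's timestamp, which is the behaviour the multi-invoice test is meant to cover.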

/**
* Updates a subscription's billing cycle dates, provided that the current end date is in the past (i.e. the subscription
* data in the database being stale).
4 changes: 2 additions & 2 deletions packages/orb-sync-lib/src/orb-sync.ts
@@ -173,7 +173,7 @@ export class OrbSync {
const invoice = webhook.invoice;
this.config.logger?.info(`Received webhook ${webhook.id}: ${parsedData.type} for invoice ${invoice.id}`);

- await syncInvoices(this.postgresClient, [invoice]);
+ await syncInvoices(this.postgresClient, [invoice], webhook.created_at);

const billingCycle = getBillingCycleFromInvoice(invoice);
if (billingCycle && invoice.subscription) {
@@ -201,7 +201,7 @@ export class OrbSync {

this.config.logger?.info(`Received webhook ${webhook.id}: ${webhook.type} for invoice ${webhook.invoice.id}`);

- await syncInvoices(this.postgresClient, [webhook.invoice]);
+ await syncInvoices(this.postgresClient, [webhook.invoice], webhook.created_at);
break;
}

1 change: 1 addition & 0 deletions packages/orb-sync-lib/src/schemas/invoice.ts
@@ -43,6 +43,7 @@ export const invoiceSchema: JsonSchema = {
sync_failed_at: { type: 'string' },
voided_at: { type: 'string' },
will_auto_issue: { type: 'boolean' },
last_synced_at: { type: 'string' },
Collaborator

The name last_synced_at might be a little confusing, as this field is either

  • the timestamp when a manual sync was triggered (matches the name), or
  • the timestamp of the entity change, not when it was synced - i.e. entity changed at 21:00:00, webhook came in at 21:15:00 - so we synced it at 21:15:00, but we will be using 21:00:00 as date

I'm okay with the name as-is, but maybe we can come up with a more fitting name. Just a nit, though.

Contributor Author

Thanks for flagging this!

> Timestamp of the entity change, not when it was synced - i.e. entity changed at 21:00:00, webhook came in at 21:15:00 - so we synced it at 21:15:00, but we will be using 21:00:00 as date

We will actually update last_synced_at to the time of the webhook creation, not the time when it arrived. We're using the webhook's created_at timestamp.

For example, taken from a recent Orb webhook:

{
  "id": "<invoice_id>",
  "created_at": "2025-06-27T17:54:17.144536+00:00",
  "type": "invoice.issued",
  "invoice": {
    "created_at": "2025-06-27T17:54:15+00:00",
    ...
   }
}

We'll use the first one as the last_synced_at, which is two seconds after the Invoice entity was created/updated.
In the webhook handler:

 await syncInvoices(this.postgresClient, [webhook.invoice], webhook.created_at);

Notice that we're using webhook.created_at and not webhook.invoice.created_at

So last_synced_at = “we synced this entity because of an event that occurred at this time”
I think that last_synced_at still makes sense in this case, what do you think?

Happy to make the change if it doesn't make sense! :D

},
required: ['id'],
} as const;
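
The timestamp choice discussed in this thread (the webhook's own created_at for webhook-driven syncs, the current time for manual syncs) can be sketched as a small helper. This is an illustration only: `InvoiceWebhook` is a trimmed, hypothetical type, and `resolveSyncTimestamp` does not exist in the PR. It also uses `??` where the PR's syncInvoices uses `||`, so an empty string would be treated differently.

```typescript
// Hypothetical, trimmed-down shape of an invoice webhook (assumption, not Orb's full type).
interface InvoiceWebhook {
  id: string;
  created_at: string; // when the webhook event was created
  type: string;
  invoice: { id: string; created_at: string }; // when the invoice entity was created
}

// Picks the value to store in last_synced_at.
// Webhook-driven sync: the webhook's created_at (NOT webhook.invoice.created_at).
// Manual sync (no webhook): the current time, injectable for testing.
function resolveSyncTimestamp(
  webhook?: InvoiceWebhook,
  now: () => Date = () => new Date()
): string {
  return webhook?.created_at ?? now().toISOString();
}
```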
10 changes: 7 additions & 3 deletions packages/orb-sync-lib/src/sync/invoices.ts
@@ -6,15 +6,19 @@ import { Invoice } from 'orb-billing/resources';

const TABLE = 'invoices';

- export async function syncInvoices(postgresClient: PostgresClient, invoices: Invoice[]) {
- return postgresClient.upsertMany(
+ export async function syncInvoices(postgresClient: PostgresClient, invoices: Invoice[], syncTimestamp?: string) {
+ const timestamp = syncTimestamp || new Date().toISOString();
+
+ return postgresClient.upsertManyWithTimestampProtection(
invoices.map((invoice) => ({
...invoice,
customer_id: invoice.customer.id,
subscription_id: invoice.subscription?.id,
+ last_synced_at: timestamp,
})),
TABLE,
- invoiceSchema
+ invoiceSchema,
+ timestamp
);
}
