feat(webhook): Implement webhook functionality for bookmark events #852

Merged: 5 commits, Jan 19, 2025

Changes from 1 commit
4 changes: 4 additions & 0 deletions apps/workers/crawlerWorker.ts
@@ -54,6 +54,7 @@ import {
OpenAIQueue,
triggerSearchReindex,
triggerVideoWorker,
triggerWebhookWorker,
zCrawlLinkRequestSchema,
} from "@hoarder/shared/queues";
import { BookmarkTypes } from "@hoarder/shared/types/bookmarks";
@@ -746,6 +747,9 @@ async function runCrawler(job: DequeuedJob<ZCrawlLinkRequest>) {
// Trigger a potential download of a video from the URL
await triggerVideoWorker(bookmarkId, url);

// Trigger a webhook
await triggerWebhookWorker(bookmarkId, url, "create");

Collaborator:

This handles the case for URLs though. I think we should also trigger the webhook for non URL bookmarks which don't go through the crawling workflows. You'll need to trigger those from packages/trpc/routers/bookmarks.ts (at the end of the createBookmark handler).
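A sketch of what that suggestion could look like (illustrative only: the in-memory queue below stands in for triggerWebhookWorker, and the real createBookmark handler in packages/trpc/routers/bookmarks.ts has a different shape):

```typescript
// Illustrative only: bookmarks that never enter the crawl pipeline
// (text, asset) would need their webhook enqueued directly at creation.
type BookmarkType = "link" | "text" | "asset";

interface WebhookJob {
  bookmarkId: string;
  operation: "create";
}

const webhookQueue: WebhookJob[] = [];

// Stand-in for triggerWebhookWorker as introduced in this PR.
async function enqueueWebhook(bookmarkId: string, operation: "create") {
  webhookQueue.push({ bookmarkId, operation });
}

// Hypothetical tail of a createBookmark handler: link bookmarks fire
// their webhook after crawling completes, everything else fires here.
async function afterCreateBookmark(bookmark: {
  id: string;
  type: BookmarkType;
}) {
  if (bookmark.type !== "link") {
    await enqueueWebhook(bookmark.id, "create");
  }
}
```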

Contributor Author:

Since the question of which events to support needs more debate and consideration, I've constrained it to "crawled": for now we only support the event that signals crawl completion. This keeps the design extensible and avoids making the current change unnecessarily complex, as @mkarliner suggested.

@mkarliner (Jan 17, 2025):

Been thinking about this the last few days.
May I suggest an event of 'edited' or 'changed'?
Here is the use case:
I intend to use these events to post a stream of my bookmarks to social media, in particular the Fediverse/Mastodon, through the ActivityPub protocol. I don't actually want an unfiltered stream to be posted; I want to curate it at some level.
So, with a 'changed' event, I can manually change a bookmark in some way that indicates to my bot that this is a bookmark I want to share, and it will pick up the change and publish it.

It's worth noting that there are now Fediverse - Threads - Bluesky bridges, so this solution would get Hoarder bookmarks to a potentially large audience.

Alternatively, if the event specified which list the bookmark was added to, that would also work.

// Do the archival as a separate last step as it has the potential for failure
await archivalLogic();
}
8 changes: 6 additions & 2 deletions apps/workers/index.ts
@@ -13,12 +13,13 @@ import { shutdownPromise } from "./exit";
import { OpenAiWorker } from "./openaiWorker";
import { SearchIndexingWorker } from "./searchWorker";
import { VideoWorker } from "./videoWorker";
import { WebhookWorker } from "./webhookWorker";

async function main() {
logger.info(`Workers version: ${serverConfig.serverVersion ?? "not set"}`);
runQueueDBMigrations();

const [crawler, openai, search, tidyAssets, video, feed, assetPreprocessing] =
const [crawler, openai, search, tidyAssets, video, feed, assetPreprocessing, webhook] =
[
await CrawlerWorker.build(),
OpenAiWorker.build(),
@@ -27,6 +28,7 @@
VideoWorker.build(),
FeedWorker.build(),
AssetPreprocessingWorker.build(),
WebhookWorker.build(),
];
FeedRefreshingWorker.start();

@@ -39,11 +41,12 @@
video.run(),
feed.run(),
assetPreprocessing.run(),
webhook.run(),
]),
shutdownPromise,
]);
logger.info(
"Shutting down crawler, openai, tidyAssets, video, feed, assetPreprocessing and search workers ...",
"Shutting down crawler, openai, tidyAssets, video, feed, assetPreprocessing, webhook and search workers ...",
);

FeedRefreshingWorker.stop();
@@ -54,6 +57,7 @@
video.stop();
feed.stop();
assetPreprocessing.stop();
webhook.stop();
}

main();
109 changes: 109 additions & 0 deletions apps/workers/webhookWorker.ts
@@ -0,0 +1,109 @@
import { DequeuedJob, Runner } from "liteque";
import { db } from "@hoarder/db";
import { bookmarks } from "@hoarder/db/schema";
import serverConfig from "@hoarder/shared/config";
import logger from "@hoarder/shared/logger";
import { WebhookQueue, zWebhookRequestSchema, ZWebhookRequest } from "@hoarder/shared/queues";
import { eq } from "drizzle-orm";
import fetch, { Response } from 'node-fetch';

export class WebhookWorker {
static build() {
logger.info("Starting webhook worker ...");
const worker = new Runner<ZWebhookRequest>(
WebhookQueue,
{
run: runWebhook,
onComplete: async (job) => {
const jobId = job.id;
logger.info(`[webhook][${jobId}] Completed successfully`);
},
onError: async (job) => {
const jobId = job.id;
logger.error(
`[webhook][${jobId}] webhook job failed: ${job.error}\n${job.error.stack}`,
)
},
},
{
concurrency: 1,
pollIntervalMs: 1000,
timeoutSecs: serverConfig.inference.jobTimeoutSec,
Collaborator:

Reusing the inference timeout here is probably not intentional; maybe use the webhook.timeout / 1000 server setting instead?

Contributor Author:

I changed it to:

timeoutSecs: serverConfig.webhook.timeout / 1000 * (serverConfig.webhook.retryTimes + 1) * serverConfig.webhook.urls.length + 1,

This accounts for the retry count, the number of URLs, and the per-request timeout, plus 1 second of headroom for everything else.
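The arithmetic behind that budget can be sketched as a small helper (the function name is illustrative; the parameters follow the serverConfig.webhook fields introduced in this PR):

```typescript
// Worst-case job budget in seconds: each URL may be attempted up to
// (retryTimes + 1) times, each attempt bounded by timeoutMs; one extra
// second covers bookkeeping around the calls.
function webhookJobTimeoutSecs(
  timeoutMs: number,
  retryTimes: number,
  urlCount: number,
): number {
  return (timeoutMs / 1000) * (retryTimes + 1) * urlCount + 1;
}

// With the values discussed here: 5000 ms timeout, 3 retries, 2 URLs.
console.log(webhookJobTimeoutSecs(5000, 3, 2)); // → 41
```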

},
);

return worker;
}
}


async function fetchBookmark(linkId: string) {
return await db.query.bookmarks.findFirst({
where: eq(bookmarks.id, linkId),
with: {
link: true,
text: true,
asset: true,
},
});
}

async function runWebhook(job: DequeuedJob<ZWebhookRequest>) {
const jobId = job.id;
const webhookUrls = serverConfig.webhook.urls;
const webhookToken = serverConfig.webhook.token;
const webhookTimeout = serverConfig.webhook.timeout;

const request = zWebhookRequestSchema.safeParse(job.data);
if (!request.success) {
throw new Error(
`[webhook][${jobId}] Got malformed job request: ${request.error.toString()}`,
);
}

const { bookmarkId } = request.data;
const bookmark = await fetchBookmark(bookmarkId);
if (!bookmark) {
throw new Error(
`[webhook][${jobId}] bookmark with id ${bookmarkId} was not found`,
);
}

logger.info(
`[webhook][${jobId}] Starting a webhook job for bookmark with id "${bookmark.id}"`,
);

for (const url of webhookUrls) {
try {
const response = await Promise.race([
Collaborator:

I think instead of Promise.race, we can use AbortSignal.timeout here so that the fetch itself get cancelled as well.

Contributor Author:

Good suggestion, I improved this.
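For reference, the AbortSignal.timeout variant looks roughly like this (a sketch assuming Node 18+, where fetch and AbortSignal.timeout are globals; the function name is illustrative):

```typescript
// Sketch: let the timeout cancel the request itself, rather than racing
// the fetch against a detached timer promise (which leaves the request
// running even after the race is lost).
async function postWebhook(
  url: string,
  token: string,
  payload: unknown,
  timeoutMs: number,
): Promise<number> {
  const response = await fetch(url, {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      Authorization: `Bearer ${token}`,
    },
    body: JSON.stringify(payload),
    // Aborts the in-flight request after timeoutMs with a TimeoutError.
    signal: AbortSignal.timeout(timeoutMs),
  });
  return response.status;
}
```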

fetch(url, {
method: "POST",
headers: {
"Content-Type": "application/json",
"Authorization": `Bearer ${webhookToken}`,
},
body: JSON.stringify({
jobId, bookmarkId, userId: bookmark.userId, url: bookmark.link.url, operation: job.data.operation
Collaborator:

Let's include bookmark type here as well.

Contributor Author:

Yes, added

})
}),
new Promise((_, reject) =>
setTimeout(() => reject(new Error(`Request timed out in ${webhookTimeout}ms`)), webhookTimeout)
)
]);

if (!(response instanceof Response)) {
throw new Error(`Webhook call to ${url} failed: response is not a Response object`);
}

if (!response.ok) {
logger.error(`Webhook call to ${url} failed with status: ${response.status}`);
}
Collaborator:

What should we do if the webhook response is a non success status code? In that case, do we want to retry or not? I think we probably should?

Contributor Author:

I added retry logic and a WEBHOOK_RETRY_TIMES config.
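A minimal shape for that retry loop might be the following (names are illustrative; the actual implementation lives in the later commits of this PR):

```typescript
// Sketch: try a webhook call up to retryTimes + 1 times, treating a
// non-2xx status as a retryable failure just like a network error.
async function postWithRetry(
  attempt: () => Promise<{ ok: boolean; status: number }>,
  retryTimes: number,
): Promise<void> {
  let lastError: unknown;
  for (let tries = 0; tries <= retryTimes; tries++) {
    try {
      const response = await attempt();
      if (response.ok) return; // success: stop retrying
      lastError = new Error(`webhook returned status ${response.status}`);
    } catch (error) {
      lastError = error; // timeout or network error: retry
    }
  }
  throw lastError; // all attempts exhausted
}
```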


logger.info(`[webhook][${jobId}] Webhook to ${url} call succeeded`);
} catch (error) {
logger.error(`[webhook][${jobId}] Webhook to ${url} call failed: ${error}`);
throw error;
}
}
}
30 changes: 30 additions & 0 deletions docs/docs/03-configuration.md
@@ -95,3 +95,33 @@ Hoarder uses [tesseract.js](https://github.com/naptha/tesseract.js) to extract t
| OCR_CACHE_DIR | No | $TEMP_DIR | The dir where tesseract will download its models. By default, those models are not persisted and stored in the OS' temp dir. |
| OCR_LANGS | No | eng | Comma separated list of the language codes that you want tesseract to support. You can find the language codes [here](https://tesseract-ocr.github.io/tessdoc/Data-Files-in-different-versions.html). Set to empty string to disable OCR. |
| OCR_CONFIDENCE_THRESHOLD | No | 50 | A number between 0 and 100 indicating the minimum acceptable confidence from tesseract. If tesseract's confidence is lower than this value, extracted text won't be stored. |

## Webhook Configs

You can use webhooks to trigger actions when bookmarks are changed (only the *create* event is supported for now).

| Name | Required | Default | Description |
| --------------- | -------- | ------- | ---------------------------------------------------------------------------------------------- |
| WEBHOOK_URLS | No | | The urls of the webhooks to trigger, separated by commas. |
| WEBHOOK_TOKEN | No | | The token to use for authentication. It will appear in the Authorization header as a Bearer token. |
| WEBHOOK_TIMEOUT | No | 5000 | The timeout for the webhook request in milliseconds. |

:::info

- If a URL is added to Hoarder, the webhook will be triggered after it is crawled.
- The WEBHOOK_TOKEN is used for authentication. It will appear in the Authorization header as a Bearer token.
```
Authorization: Bearer <WEBHOOK_TOKEN>
```
- The webhook will be triggered with the job id (used for idempotent), bookmark id, the user id, the url and the operation in JSON format in the body.
Collaborator:

Suggested change
- The webhook will be triggered with the job id (used for idempotent), bookmark id, the user id, the url and the operation in JSON format in the body.
- The webhook will be triggered with the job id (used for idempotence), bookmark id, the user id, the url and the operation in JSON format in the body.

Contributor Author:

Good, changed.

Contributor Author:

And also some other doc improvements.


```json
{
"jobId": 123,
"bookmarkId": "exampleBookmarkId",
"userId": "exampleUserId",
"url": "https://example.com",
"operation": "create"
}
```
:::
23 changes: 17 additions & 6 deletions packages/shared/config.ts
@@ -56,6 +56,12 @@ const allEnv = z.object({
DATA_DIR: z.string().default(""),
MAX_ASSET_SIZE_MB: z.coerce.number().default(4),
INFERENCE_LANG: z.string().default("english"),
WEBHOOK_URLS: z.preprocess(
(val) => typeof val === "string" ? val.split(",") : val,
z.array(z.string().url())
Collaborator:

Hmmm, not sure if the use of preprocess here is right. It seems more correct to do:

z.string().transform(val => val.split(",")).pipe(z.array(z.string().url()))

Contributor Author:

Yes, I changed it and added a default value.
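The semantics of that schema can be sketched without zod, as an approximation (hypothetical helper, for illustration only; z.string().url() is stricter than the URL constructor):

```typescript
// Sketch of the WEBHOOK_URLS parsing semantics: an unset variable
// defaults to [], otherwise the value is split on commas and every
// entry must parse as a URL.
function parseWebhookUrls(raw: string | undefined): string[] {
  if (raw === undefined) return []; // mirrors .default([])
  const urls = raw.split(",");
  for (const url of urls) {
    try {
      new URL(url); // throws on entries that are not valid URLs
    } catch {
      throw new Error(`WEBHOOK_URLS contains an invalid url: ${url}`);
    }
  }
  return urls;
}
```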

).default([]),
WEBHOOK_TOKEN: z.string().default(""),
WEBHOOK_TIMEOUT: z.coerce.number().default(5000),
// Build only flag
SERVER_VERSION: z.string().optional(),
DISABLE_NEW_RELEASE_CHECK: stringBool("false"),
@@ -118,22 +124,27 @@ const serverConfigSchema = allEnv.transform((val) => {
},
meilisearch: val.MEILI_ADDR
? {
address: val.MEILI_ADDR,
key: val.MEILI_MASTER_KEY,
}
address: val.MEILI_ADDR,
key: val.MEILI_MASTER_KEY,
}
: undefined,
logLevel: val.LOG_LEVEL,
demoMode: val.DEMO_MODE
? {
email: val.DEMO_MODE_EMAIL,
password: val.DEMO_MODE_PASSWORD,
}
email: val.DEMO_MODE_EMAIL,
password: val.DEMO_MODE_PASSWORD,
}
: undefined,
dataDir: val.DATA_DIR,
maxAssetSizeMb: val.MAX_ASSET_SIZE_MB,
serverVersion: val.SERVER_VERSION,
disableNewReleaseCheck: val.DISABLE_NEW_RELEASE_CHECK,
usingLegacySeparateContainers: val.USING_LEGACY_SEPARATE_CONTAINERS,
webhook: {
urls: val.WEBHOOK_URLS,
token: val.WEBHOOK_TOKEN,
timeout: val.WEBHOOK_TIMEOUT,
},
};
});

27 changes: 27 additions & 0 deletions packages/shared/queues.ts
@@ -158,3 +158,30 @@ export const AssetPreprocessingQueue =
keepFailedJobs: false,
},
);


//Webhook worker
export const zWebhookRequestSchema = z.object({
bookmarkId: z.string(),
url: z.string(),
Collaborator:

I don't think the url should be included here. We have bookmarks of other types, and you're already fetching the bookmark in the worker anyways.

Contributor Author:

Yes, I removed it and also removed it from triggerWebhookWorker.

operation: z.enum(["create"]),
});
export type ZWebhookRequest = z.infer<typeof zWebhookRequestSchema>;
export const WebhookQueue = new SqliteQueue<ZWebhookRequest>(
"webhook_queue",
queueDB,
{
defaultJobArgs: {
numRetries: 1,
Collaborator:

I feel like we probably should retry on failure? Maybe 3 retries?

Contributor Author:

I changed it to 3.

},
keepFailedJobs: false,
},
);

export async function triggerWebhookWorker(bookmarkId: string, url: string, operation: "create") {
await WebhookQueue.enqueue({
bookmarkId,
url,
operation,
});
}