
feat(webhook): Implement webhook functionality for bookmark events #852

Merged · 5 commits · Jan 19, 2025
4 changes: 4 additions & 0 deletions apps/workers/crawlerWorker.ts
@@ -55,6 +55,7 @@ import {
OpenAIQueue,
triggerSearchReindex,
triggerVideoWorker,
triggerWebhookWorker,
zCrawlLinkRequestSchema,
} from "@hoarder/shared/queues";
import { BookmarkTypes } from "@hoarder/shared/types/bookmarks";
@@ -770,6 +771,9 @@ async function runCrawler(job: DequeuedJob<ZCrawlLinkRequest>) {
// Trigger a potential download of a video from the URL
await triggerVideoWorker(bookmarkId, url);

// Trigger a webhook
await triggerWebhookWorker(bookmarkId, "crawled");

// Do the archival as a separate last step as it has the potential for failure
await archivalLogic();
}
34 changes: 23 additions & 11 deletions apps/workers/index.ts
@@ -13,21 +13,31 @@ import { shutdownPromise } from "./exit";
import { OpenAiWorker } from "./openaiWorker";
import { SearchIndexingWorker } from "./searchWorker";
import { VideoWorker } from "./videoWorker";
import { WebhookWorker } from "./webhookWorker";

async function main() {
logger.info(`Workers version: ${serverConfig.serverVersion ?? "not set"}`);
runQueueDBMigrations();

const [crawler, openai, search, tidyAssets, video, feed, assetPreprocessing] =
[
await CrawlerWorker.build(),
OpenAiWorker.build(),
SearchIndexingWorker.build(),
TidyAssetsWorker.build(),
VideoWorker.build(),
FeedWorker.build(),
AssetPreprocessingWorker.build(),
];
const [
crawler,
openai,
search,
tidyAssets,
video,
feed,
assetPreprocessing,
webhook,
] = [
await CrawlerWorker.build(),
OpenAiWorker.build(),
SearchIndexingWorker.build(),
TidyAssetsWorker.build(),
VideoWorker.build(),
FeedWorker.build(),
AssetPreprocessingWorker.build(),
WebhookWorker.build(),
];
FeedRefreshingWorker.start();

await Promise.any([
@@ -39,11 +49,12 @@ async function main() {
video.run(),
feed.run(),
assetPreprocessing.run(),
webhook.run(),
]),
shutdownPromise,
]);
logger.info(
"Shutting down crawler, openai, tidyAssets, video, feed, assetPreprocessing and search workers ...",
"Shutting down crawler, openai, tidyAssets, video, feed, assetPreprocessing, webhook and search workers ...",
);

FeedRefreshingWorker.stop();
@@ -54,6 +65,7 @@
video.stop();
feed.stop();
assetPreprocessing.stop();
webhook.stop();
}

main();
136 changes: 136 additions & 0 deletions apps/workers/webhookWorker.ts
@@ -0,0 +1,136 @@
import { eq } from "drizzle-orm";
import { DequeuedJob, Runner } from "liteque";
import fetch from "node-fetch";

import { db } from "@hoarder/db";
import { bookmarks } from "@hoarder/db/schema";
import serverConfig from "@hoarder/shared/config";
import logger from "@hoarder/shared/logger";
import {
WebhookQueue,
ZWebhookRequest,
zWebhookRequestSchema,
} from "@hoarder/shared/queues";

export class WebhookWorker {
static build() {
logger.info("Starting webhook worker ...");
const worker = new Runner<ZWebhookRequest>(
WebhookQueue,
{
run: runWebhook,
onComplete: async (job) => {
const jobId = job.id;
logger.info(`[webhook][${jobId}] Completed successfully`);
return Promise.resolve();
},
onError: async (job) => {
const jobId = job.id;
logger.error(
`[webhook][${jobId}] webhook job failed: ${job.error}\n${job.error.stack}`,
);
return Promise.resolve();
},
},
{
concurrency: 1,
pollIntervalMs: 1000,
timeoutSecs:
serverConfig.webhook.timeoutSec *
(serverConfig.webhook.retryTimes + 1) +
1, // account for every retry attempt (each with its own timeout) plus 1 extra second of slack
validator: zWebhookRequestSchema,
},
);

return worker;
}
}

async function fetchBookmark(linkId: string) {
return await db.query.bookmarks.findFirst({
where: eq(bookmarks.id, linkId),
with: {
link: true,
text: true,
asset: true,
},
});
}

async function runWebhook(job: DequeuedJob<ZWebhookRequest>) {
const jobId = job.id;
const webhookUrls = serverConfig.webhook.urls;
if (!webhookUrls) {
logger.info(
`[webhook][${jobId}] No webhook urls configured. Skipping webhook job.`,
);
return;
}
const webhookToken = serverConfig.webhook.token;
const webhookTimeoutSec = serverConfig.webhook.timeoutSec;

const { bookmarkId } = job.data;
const bookmark = await fetchBookmark(bookmarkId);
if (!bookmark) {
throw new Error(
`[webhook][${jobId}] bookmark with id ${bookmarkId} was not found`,
);
}

logger.info(
`[webhook][${jobId}] Starting a webhook job for bookmark with id "${bookmark.id}"`,
);

await Promise.allSettled(
webhookUrls.map(async (url) => {
const maxRetries = serverConfig.webhook.retryTimes;
let attempt = 0;
let success = false;

while (attempt < maxRetries && !success) {
try {
const response = await fetch(url, {
method: "POST",
headers: {
"Content-Type": "application/json",
...(webhookToken
? {
Authorization: `Bearer ${webhookToken}`,
}
: {}),
},
body: JSON.stringify({
jobId,
bookmarkId,
userId: bookmark.userId,
url: bookmark.link ? bookmark.link.url : undefined,
type: bookmark.type,
operation: job.data.operation,
}),
signal: AbortSignal.timeout(webhookTimeoutSec * 1000),
});

if (!response.ok) {
logger.error(
`Webhook call to ${url} failed with status: ${response.status}`,
);
} else {
logger.info(`[webhook][${jobId}] Webhook to ${url} call succeeded`);
success = true;
}
} catch (error) {
logger.error(
`[webhook][${jobId}] Webhook to ${url} call failed: ${error}`,
);
}
attempt++;
if (!success && attempt < maxRetries) {
logger.info(
`[webhook][${jobId}] Retrying webhook call to ${url}, attempt ${attempt + 1}`,
);
}
}
Collaborator:

What should we do if the webhook response is a non-success status code? In that case, do we want to retry or not? I think we probably should?

Contributor (author):

I added retry logic and a WEBHOOK_RETRY_TIMES config option.

}),
);
}
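With the default configuration (WEBHOOK_TIMEOUT_SEC=5, WEBHOOK_RETRY_TIMES=3), the `timeoutSecs` computed above works out to 5 × (3 + 1) + 1 = 21 seconds, i.e. the longest a single webhook job may run across all of its retry attempts.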
33 changes: 33 additions & 0 deletions docs/docs/03-configuration.md
@@ -95,3 +95,36 @@ Hoarder uses [tesseract.js](https://github.com/naptha/tesseract.js) to extract t
| OCR_CACHE_DIR | No | $TEMP_DIR | The dir where tesseract will download its models. By default, those models are not persisted and stored in the OS' temp dir. |
| OCR_LANGS | No | eng | Comma separated list of the language codes that you want tesseract to support. You can find the language codes [here](https://tesseract-ocr.github.io/tessdoc/Data-Files-in-different-versions.html). Set to empty string to disable OCR. |
| OCR_CONFIDENCE_THRESHOLD | No | 50 | A number between 0 and 100 indicating the minimum acceptable confidence from tesseract. If tesseract's confidence is lower than this value, extracted text won't be stored. |

## Webhook Configs

You can use webhooks to trigger actions when bookmarks change (only the _crawled_ event is supported for now).

| Name                | Required | Default | Description                                                                                         |
| ------------------- | -------- | ------- | --------------------------------------------------------------------------------------------------- |
| WEBHOOK_URLS        | No       |         | The URLs of the webhooks to trigger, separated by commas.                                            |
| WEBHOOK_TOKEN       | No       |         | The token to use for authentication. It will appear in the Authorization header as a Bearer token.   |
| WEBHOOK_TIMEOUT_SEC | No       | 5       | The timeout for each webhook request, in seconds.                                                     |
| WEBHOOK_RETRY_TIMES | No       | 3       | The number of times to retry the webhook request.                                                     |

:::info

- When a URL is added to Hoarder, the webhook is triggered after the URL has been crawled.
- WEBHOOK_TOKEN is used for authentication. It will appear in the Authorization header as a Bearer token:
```
Authorization: Bearer <WEBHOOK_TOKEN>
```
- The webhook request body is JSON and contains the job id (useful for idempotence), the bookmark id, the bookmark type, the user id, the url, and the operation:

```json
{
"jobId": 123,
"type": "link",
"bookmarkId": "exampleBookmarkId",
"userId": "exampleUserId",
"url": "https://example.com",
"operation": "crawled"
}
```

:::
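For reference, a minimal receiver could look like the sketch below. This is an illustration only and not part of this PR: the Express server, endpoint path, and port are assumptions; only the Authorization header and payload shape follow the documentation above.

```typescript
// Minimal webhook receiver sketch (illustration only, not part of this PR).
// Assumes Express; the endpoint path and port are arbitrary.
import express from "express";

const app = express();
app.use(express.json());

// Same value as Hoarder's WEBHOOK_TOKEN (optional on both sides).
const EXPECTED_TOKEN = process.env.WEBHOOK_TOKEN;

app.post("/hoarder-webhook", (req, res) => {
  // Verify the Bearer token if one is configured.
  if (
    EXPECTED_TOKEN &&
    req.headers.authorization !== `Bearer ${EXPECTED_TOKEN}`
  ) {
    return res.status(401).send("invalid token");
  }

  // Payload shape as documented above.
  const { jobId, bookmarkId, userId, url, type, operation } = req.body;
  console.log(
    `job ${jobId}: bookmark ${bookmarkId} (${type}) ${operation} -> ${url}`,
  );

  // Respond with a 2xx quickly; non-2xx responses are retried by the worker
  // (up to WEBHOOK_RETRY_TIMES attempts).
  res.sendStatus(200);
});

app.listen(8080);
```

Since retries are not deduplicated on the receiving side, the `jobId` field can be used to make the handler idempotent.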
14 changes: 14 additions & 0 deletions packages/shared/config.ts
@@ -56,6 +56,14 @@ const allEnv = z.object({
DATA_DIR: z.string().default(""),
MAX_ASSET_SIZE_MB: z.coerce.number().default(4),
INFERENCE_LANG: z.string().default("english"),
WEBHOOK_URLS: z
.string()
.transform((val) => val.split(","))
.pipe(z.array(z.string().url()))
.optional(),
WEBHOOK_TOKEN: z.string().optional(),
WEBHOOK_TIMEOUT_SEC: z.coerce.number().default(5),
WEBHOOK_RETRY_TIMES: z.coerce.number().int().min(0).default(3),
// Build only flag
SERVER_VERSION: z.string().optional(),
DISABLE_NEW_RELEASE_CHECK: stringBool("false"),
@@ -134,6 +142,12 @@ const serverConfigSchema = allEnv.transform((val) => {
serverVersion: val.SERVER_VERSION,
disableNewReleaseCheck: val.DISABLE_NEW_RELEASE_CHECK,
usingLegacySeparateContainers: val.USING_LEGACY_SEPARATE_CONTAINERS,
webhook: {
urls: val.WEBHOOK_URLS,
token: val.WEBHOOK_TOKEN,
timeoutSec: val.WEBHOOK_TIMEOUT_SEC,
retryTimes: val.WEBHOOK_RETRY_TIMES,
},
};
});

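As an aside, the `WEBHOOK_URLS` schema above can be exercised in isolation. A small sketch of its parsing behaviour, assuming `zod` is importable (illustrative only, not part of this PR):

```typescript
// Illustration only: how the WEBHOOK_URLS schema parses a comma-separated value.
import { z } from "zod";

const webhookUrls = z
  .string()
  .transform((val) => val.split(","))
  .pipe(z.array(z.string().url()))
  .optional();

// A comma-separated list parses into an array of URLs:
webhookUrls.parse("https://a.example/hook,https://b.example/hook");
// => ["https://a.example/hook", "https://b.example/hook"]

// Leaving WEBHOOK_URLS unset yields undefined, which the worker treats as
// "no webhook urls configured" and skips the job:
webhookUrls.parse(undefined); // => undefined

// An empty segment (e.g. from a trailing comma) fails validation, because
// every split segment must itself be a valid URL:
webhookUrls.parse("https://a.example/hook,"); // throws ZodError
```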
27 changes: 27 additions & 0 deletions packages/shared/queues.ts
@@ -158,3 +158,30 @@ export const AssetPreprocessingQueue =
keepFailedJobs: false,
},
);

//Webhook worker
export const zWebhookRequestSchema = z.object({
bookmarkId: z.string(),
operation: z.enum(["crawled"]),
});
export type ZWebhookRequest = z.infer<typeof zWebhookRequestSchema>;
export const WebhookQueue = new SqliteQueue<ZWebhookRequest>(
"webhook_queue",
queueDB,
{
defaultJobArgs: {
numRetries: 3,
},
keepFailedJobs: false,
},
);

export async function triggerWebhookWorker(
bookmarkId: string,
operation: "crawled",
) {
await WebhookQueue.enqueue({
bookmarkId,
operation,
});
}