Nick: bull mq

This commit is contained in:
Nicolas 2024-07-30 13:27:23 -04:00
parent 46bcbd931f
commit 7e002a8b06
16 changed files with 330 additions and 99 deletions

View File

@ -29,7 +29,6 @@
"@jest/globals": "^29.7.0", "@jest/globals": "^29.7.0",
"@tsconfig/recommended": "^1.0.3", "@tsconfig/recommended": "^1.0.3",
"@types/body-parser": "^1.19.2", "@types/body-parser": "^1.19.2",
"@types/bull": "^4.10.0",
"@types/cors": "^2.8.13", "@types/cors": "^2.8.13",
"@types/express": "^4.17.17", "@types/express": "^4.17.17",
"@types/jest": "^29.5.12", "@types/jest": "^29.5.12",
@ -63,7 +62,7 @@
"async-mutex": "^0.5.0", "async-mutex": "^0.5.0",
"axios": "^1.3.4", "axios": "^1.3.4",
"bottleneck": "^2.19.5", "bottleneck": "^2.19.5",
"bull": "^4.15.0", "bullmq": "^5.11.0",
"cacheable-lookup": "^6.1.0", "cacheable-lookup": "^6.1.0",
"cheerio": "^1.0.0-rc.12", "cheerio": "^1.0.0-rc.12",
"cohere": "^1.1.1", "cohere": "^1.1.1",
@ -98,6 +97,7 @@
"robots-parser": "^3.0.1", "robots-parser": "^3.0.1",
"scrapingbee": "^1.7.4", "scrapingbee": "^1.7.4",
"stripe": "^16.1.0", "stripe": "^16.1.0",
"systeminformation": "^5.22.11",
"turndown": "^7.1.3", "turndown": "^7.1.3",
"turndown-plugin-gfm": "^1.0.2", "turndown-plugin-gfm": "^1.0.2",
"typesense": "^1.5.4", "typesense": "^1.5.4",

View File

@ -56,9 +56,9 @@ importers:
bottleneck: bottleneck:
specifier: ^2.19.5 specifier: ^2.19.5
version: 2.19.5 version: 2.19.5
bull: bullmq:
specifier: ^4.15.0 specifier: ^5.11.0
version: 4.15.0 version: 5.11.0
cacheable-lookup: cacheable-lookup:
specifier: ^6.1.0 specifier: ^6.1.0
version: 6.1.0 version: 6.1.0
@ -161,6 +161,9 @@ importers:
stripe: stripe:
specifier: ^16.1.0 specifier: ^16.1.0
version: 16.1.0 version: 16.1.0
systeminformation:
specifier: ^5.22.11
version: 5.22.11
turndown: turndown:
specifier: ^7.1.3 specifier: ^7.1.3
version: 7.2.0 version: 7.2.0
@ -201,9 +204,6 @@ importers:
'@types/body-parser': '@types/body-parser':
specifier: ^1.19.2 specifier: ^1.19.2
version: 1.19.5 version: 1.19.5
'@types/bull':
specifier: ^4.10.0
version: 4.10.0
'@types/cors': '@types/cors':
specifier: ^2.8.13 specifier: ^2.8.13
version: 2.8.17 version: 2.8.17
@ -1535,10 +1535,6 @@ packages:
'@types/body-parser@1.19.5': '@types/body-parser@1.19.5':
resolution: {integrity: sha512-fB3Zu92ucau0iQ0JMCFQE7b/dv8Ot07NI3KaZIkIUNXq82k4eBAqUaneXfleGY9JWskeS9y+u0nXMyspcuQrCg==} resolution: {integrity: sha512-fB3Zu92ucau0iQ0JMCFQE7b/dv8Ot07NI3KaZIkIUNXq82k4eBAqUaneXfleGY9JWskeS9y+u0nXMyspcuQrCg==}
'@types/bull@4.10.0':
resolution: {integrity: sha512-RkYW8K2H3J76HT6twmHYbzJ0GtLDDotpLP9ah9gtiA7zfF6peBH1l5fEiK0oeIZ3/642M7Jcb9sPmor8Vf4w6g==}
deprecated: This is a stub types definition. bull provides its own type definitions, so you do not need this installed.
'@types/bunyan@1.8.9': '@types/bunyan@1.8.9':
resolution: {integrity: sha512-ZqS9JGpBxVOvsawzmVt30sP++gSQMTejCkIAQ3VdadOcRE8izTyW66hufvwLeH+YEGP6Js2AW7Gz+RMyvrEbmw==} resolution: {integrity: sha512-ZqS9JGpBxVOvsawzmVt30sP++gSQMTejCkIAQ3VdadOcRE8izTyW66hufvwLeH+YEGP6Js2AW7Gz+RMyvrEbmw==}
@ -1935,9 +1931,8 @@ packages:
buffer@6.0.3: buffer@6.0.3:
resolution: {integrity: sha512-FTiCpNxtwiZZHEZbcbTIcZjERVICn9yq/pDFkTl95/AxzD1naBctN7YO68riM/gLSDY7sdrMby8hofADYuuqOA==} resolution: {integrity: sha512-FTiCpNxtwiZZHEZbcbTIcZjERVICn9yq/pDFkTl95/AxzD1naBctN7YO68riM/gLSDY7sdrMby8hofADYuuqOA==}
bull@4.15.0: bullmq@5.11.0:
resolution: {integrity: sha512-nOEAfUXwUXtFbRPQP3bWCwpQ/NAerAu2Nym/ucv5C1E+Qh2x6RGdKKsYIfZam4mYncayTynTUN/HLhRgGi2N8w==} resolution: {integrity: sha512-qVzyWGZqie3VHaYEgRXhId/j8ebfmj6MExEJyUByMsUJA5pVciVle3hKLer5fyMwtQ8lTMP7GwhXV/NZ+HzlRA==}
engines: {node: '>=12'}
bytes@3.1.2: bytes@3.1.2:
resolution: {integrity: sha512-/Nf7TyzTx6S3yRJObOAV7956r8cr2+Oj8AC5dt8wSP3BQAoeX58NoHyCU8P8zGkNXStjTSi6fzO6F0pBdcYbEg==} resolution: {integrity: sha512-/Nf7TyzTx6S3yRJObOAV7956r8cr2+Oj8AC5dt8wSP3BQAoeX58NoHyCU8P8zGkNXStjTSi6fzO6F0pBdcYbEg==}
@ -2559,10 +2554,6 @@ packages:
resolution: {integrity: sha512-pjzuKtY64GYfWizNAJ0fr9VqttZkNiK2iS430LtIHzjBEr6bX8Am2zm4sW4Ro5wjWW5cAlRL1qAMTcXbjNAO2Q==} resolution: {integrity: sha512-pjzuKtY64GYfWizNAJ0fr9VqttZkNiK2iS430LtIHzjBEr6bX8Am2zm4sW4Ro5wjWW5cAlRL1qAMTcXbjNAO2Q==}
engines: {node: '>=8.0.0'} engines: {node: '>=8.0.0'}
get-port@5.1.1:
resolution: {integrity: sha512-g/Q1aTSDOxFpchXC4i8ZWvxA1lnPqx/JHqcpIw0/LX9T8x/GBbi6YnlN5nhaKIFkT8oFsscUKgDJYxfwfS6QsQ==}
engines: {node: '>=8'}
get-stream@5.2.0: get-stream@5.2.0:
resolution: {integrity: sha512-nBF+F1rAZVCu/p7rjzgA+Yb4lfYXrpl7a6VmJrU8wF9I1CKvP/QwPNZHnOlwbTkY6dvtFIzFMSyQXbLoTQPRpA==} resolution: {integrity: sha512-nBF+F1rAZVCu/p7rjzgA+Yb4lfYXrpl7a6VmJrU8wF9I1CKvP/QwPNZHnOlwbTkY6dvtFIzFMSyQXbLoTQPRpA==}
engines: {node: '>=8'} engines: {node: '>=8'}
@ -3533,6 +3524,9 @@ packages:
resolution: {integrity: sha512-dBpDMdxv9Irdq66304OLfEmQ9tbNRFnFTuZiLo+bD+r332bBmMJ8GBLXklIXXgxd3+v9+KUnZaUR5PJMa75Gsg==} resolution: {integrity: sha512-dBpDMdxv9Irdq66304OLfEmQ9tbNRFnFTuZiLo+bD+r332bBmMJ8GBLXklIXXgxd3+v9+KUnZaUR5PJMa75Gsg==}
engines: {node: '>= 0.4.0'} engines: {node: '>= 0.4.0'}
node-abort-controller@3.1.1:
resolution: {integrity: sha512-AGK2yQKIjRuqnc6VkX2Xj5d+QW8xZ87pa1UK6yA6ouUyuxfHuMP6umE5QK7UmTeOAymo+Zx1Fxiuw9rVx8taHQ==}
node-domexception@1.0.0: node-domexception@1.0.0:
resolution: {integrity: sha512-/jKZoMpw0F8GRwl4/eLROPA3cfcXtLApP0QzLmUT/HuPCZWyB7IY9ZrMeKw2O/nFIqPQB3PVM9aYm0F312AXDQ==} resolution: {integrity: sha512-/jKZoMpw0F8GRwl4/eLROPA3cfcXtLApP0QzLmUT/HuPCZWyB7IY9ZrMeKw2O/nFIqPQB3PVM9aYm0F312AXDQ==}
engines: {node: '>=10.5.0'} engines: {node: '>=10.5.0'}
@ -4258,6 +4252,12 @@ packages:
resolution: {integrity: sha512-SzRP5LQ6Ts2G5NyAa/jg16s8e3R7rfdFjizy1zeoecYWw+nGL+YA1xZvW/+iJmidBGSdLkuvdwTYEyJEb+EiUw==} resolution: {integrity: sha512-SzRP5LQ6Ts2G5NyAa/jg16s8e3R7rfdFjizy1zeoecYWw+nGL+YA1xZvW/+iJmidBGSdLkuvdwTYEyJEb+EiUw==}
engines: {node: '>=0.2.6'} engines: {node: '>=0.2.6'}
systeminformation@5.22.11:
resolution: {integrity: sha512-aLws5yi4KCHTb0BVvbodQY5bY8eW4asMRDTxTW46hqw9lGjACX6TlLdJrkdoHYRB0qs+MekqEq1zG7WDnWE8Ug==}
engines: {node: '>=8.0.0'}
os: [darwin, linux, win32, freebsd, openbsd, netbsd, sunos, android]
hasBin: true
tar-fs@3.0.5: tar-fs@3.0.5:
resolution: {integrity: sha512-JOgGAmZyMgbqpLwct7ZV8VzkEB6pxXFBVErLtb+XCOqzc6w1xiWKI9GVd6bwk68EX7eJ4DWmfXVmq8K2ziZTGg==} resolution: {integrity: sha512-JOgGAmZyMgbqpLwct7ZV8VzkEB6pxXFBVErLtb+XCOqzc6w1xiWKI9GVd6bwk68EX7eJ4DWmfXVmq8K2ziZTGg==}
@ -4450,10 +4450,6 @@ packages:
resolution: {integrity: sha512-8XkAphELsDnEGrDxUOHB3RGvXz6TeuYSGEZBOjtTtPm2lwhGBjLgOzLHB63IUWfBpNucQjND6d3AOudO+H3RWQ==} resolution: {integrity: sha512-8XkAphELsDnEGrDxUOHB3RGvXz6TeuYSGEZBOjtTtPm2lwhGBjLgOzLHB63IUWfBpNucQjND6d3AOudO+H3RWQ==}
hasBin: true hasBin: true
uuid@8.3.2:
resolution: {integrity: sha512-+NYs2QeMWy+GWFOEm9xnn6HCDp0l7QBD7ml8zLUmJ+93Q5NF0NocErnwkTkXVFNiX3/fpC6afS8Dhb/gz7R7eg==}
hasBin: true
uuid@9.0.1: uuid@9.0.1:
resolution: {integrity: sha512-b+1eJOlsR9K8HJpow9Ok3fiWOWSIcIzXodvv0rQjVoOVNpWMpxf1wZNpt4y9h10odCNrqnYp1OBzRktckBe3sA==} resolution: {integrity: sha512-b+1eJOlsR9K8HJpow9Ok3fiWOWSIcIzXodvv0rQjVoOVNpWMpxf1wZNpt4y9h10odCNrqnYp1OBzRktckBe3sA==}
hasBin: true hasBin: true
@ -6437,12 +6433,6 @@ snapshots:
'@types/connect': 3.4.38 '@types/connect': 3.4.38
'@types/node': 20.14.1 '@types/node': 20.14.1
'@types/bull@4.10.0':
dependencies:
bull: 4.15.0
transitivePeerDependencies:
- supports-color
'@types/bunyan@1.8.9': '@types/bunyan@1.8.9':
dependencies: dependencies:
'@types/node': 20.14.1 '@types/node': 20.14.1
@ -6913,15 +6903,15 @@ snapshots:
base64-js: 1.5.1 base64-js: 1.5.1
ieee754: 1.2.1 ieee754: 1.2.1
bull@4.15.0: bullmq@5.11.0:
dependencies: dependencies:
cron-parser: 4.9.0 cron-parser: 4.9.0
get-port: 5.1.1
ioredis: 5.4.1 ioredis: 5.4.1
lodash: 4.17.21
msgpackr: 1.10.2 msgpackr: 1.10.2
node-abort-controller: 3.1.1
semver: 7.6.2 semver: 7.6.2
uuid: 8.3.2 tslib: 2.6.3
uuid: 9.0.1
transitivePeerDependencies: transitivePeerDependencies:
- supports-color - supports-color
@ -7522,8 +7512,6 @@ snapshots:
get-package-type@0.1.0: {} get-package-type@0.1.0: {}
get-port@5.1.1: {}
get-stream@5.2.0: get-stream@5.2.0:
dependencies: dependencies:
pump: 3.0.0 pump: 3.0.0
@ -8605,6 +8593,8 @@ snapshots:
netmask@2.0.2: {} netmask@2.0.2: {}
node-abort-controller@3.1.1: {}
node-domexception@1.0.0: {} node-domexception@1.0.0: {}
node-ensure@0.0.0: {} node-ensure@0.0.0: {}
@ -9417,6 +9407,8 @@ snapshots:
sylvester@0.0.12: {} sylvester@0.0.12: {}
systeminformation@5.22.11: {}
tar-fs@3.0.5: tar-fs@3.0.5:
dependencies: dependencies:
pump: 3.0.0 pump: 3.0.0
@ -9589,8 +9581,6 @@ snapshots:
uuid@10.0.0: {} uuid@10.0.0: {}
uuid@8.3.2: {}
uuid@9.0.1: {} uuid@9.0.1: {}
v8-compile-cache-lib@3.0.1: {} v8-compile-cache-lib@3.0.1: {}

View File

@ -1,6 +1,6 @@
import { Request, Response } from "express"; import { Request, Response } from "express";
import { Job } from "bull"; import { Job } from "bullmq";
import { Logger } from "../../lib/logger"; import { Logger } from "../../lib/logger";
import { getWebScraperQueue } from "../../services/queue-service"; import { getWebScraperQueue } from "../../services/queue-service";
import { checkAlerts } from "../../services/alerts"; import { checkAlerts } from "../../services/alerts";

View File

@ -41,7 +41,15 @@ export async function crawlCancelController(req: Request, res: Response) {
} }
const jobState = await job.getState(); const jobState = await job.getState();
const { partialDocs } = await job.progress(); let progress = job.progress;
if(typeof progress !== 'object') {
progress = {
partialDocs: []
}
}
const {
partialDocs = []
} = progress as { partialDocs: any[] };
if (partialDocs && partialDocs.length > 0 && jobState === "active") { if (partialDocs && partialDocs.length > 0 && jobState === "active") {
Logger.info("Billing team for partial docs..."); Logger.info("Billing team for partial docs...");
@ -51,10 +59,11 @@ export async function crawlCancelController(req: Request, res: Response) {
} }
try { try {
await getWebScraperQueue().client.del(job.lockKey()); // TODO: FIX THIS by doing as a flag on the data?
await job.takeLock(); // await getWebScraperQueue().client.del(job.lockKey());
await job.discard(); // await job.takeLock();
await job.moveToFailed(Error("Job cancelled by user"), true); // await job.discard();
// await job.moveToFailed(Error("Job cancelled by user"), true);
} catch (error) { } catch (error) {
Logger.error(error); Logger.error(error);
} }

View File

@ -21,7 +21,23 @@ export async function crawlStatusController(req: Request, res: Response) {
return res.status(404).json({ error: "Job not found" }); return res.status(404).json({ error: "Job not found" });
} }
const { current, current_url, total, current_step, partialDocs } = await job.progress(); let progress = job.progress;
if(typeof progress !== 'object') {
progress = {
current: 0,
current_url: '',
total: 0,
current_step: '',
partialDocs: []
}
}
const {
current = 0,
current_url = '',
total = 0,
current_step = '',
partialDocs = []
} = progress as { current: number, current_url: string, total: number, current_step: string, partialDocs: any[] };
let data = job.returnvalue; let data = job.returnvalue;
if (process.env.USE_DB_AUTHENTICATION === "true") { if (process.env.USE_DB_AUTHENTICATION === "true") {

View File

@ -74,7 +74,7 @@ export async function crawlController(req: Request, res: Response) {
}); });
const docs = await a.getDocuments(false, (progress) => { const docs = await a.getDocuments(false, (progress) => {
job.progress({ job.updateProgress({
current: progress.current, current: progress.current,
total: progress.total, total: progress.total,
current_step: "SCRAPING", current_step: "SCRAPING",

View File

@ -76,7 +76,7 @@ export async function scrapeHelper(
} }
} }
wsq.on("global:completed", listener); // wsq.on("global:completed", listener);
const timeoutPromise = new Promise<{ success: boolean; error?: string; returnCode: number }>((_, reject) => const timeoutPromise = new Promise<{ success: boolean; error?: string; returnCode: number }>((_, reject) =>
setTimeout(() => reject({ success: false, error: "Request timed out. Increase the timeout by passing `timeout` param to the request.", returnCode: 408 }), timeout) setTimeout(() => reject({ success: false, error: "Request timed out. Increase the timeout by passing `timeout` param to the request.", returnCode: 408 }), timeout)

View File

@ -10,7 +10,24 @@ export async function crawlJobStatusPreviewController(req: Request, res: Respons
return res.status(404).json({ error: "Job not found" }); return res.status(404).json({ error: "Job not found" });
} }
const { current, current_url, total, current_step, partialDocs } = await job.progress(); let progress = job.progress;
if(typeof progress !== 'object') {
progress = {
current: 0,
current_url: '',
total: 0,
current_step: '',
partialDocs: []
}
}
const {
current = 0,
current_url = '',
total = 0,
current_step = '',
partialDocs = []
} = progress as { current: number, current_url: string, total: number, current_step: string, partialDocs: any[] };
let data = job.returnvalue; let data = job.returnvalue;
if (process.env.USE_DB_AUTHENTICATION === "true") { if (process.env.USE_DB_AUTHENTICATION === "true") {
const supabaseData = await supabaseGetJobById(req.params.jobId); const supabaseData = await supabaseGetJobById(req.params.jobId);
@ -21,7 +38,7 @@ export async function crawlJobStatusPreviewController(req: Request, res: Respons
} }
let jobStatus = await job.getState(); let jobStatus = await job.getState();
if (jobStatus === 'waiting' || jobStatus === 'stuck') { if (jobStatus === 'waiting' || jobStatus === 'delayed' || jobStatus === 'waiting-children' || jobStatus === 'unknown' || jobStatus === 'prioritized') {
jobStatus = 'active'; jobStatus = 'active';
} }

View File

@ -181,11 +181,11 @@ if (cluster.isMaster) {
Logger.info(`Worker ${process.pid} started`); Logger.info(`Worker ${process.pid} started`);
} }
const wsq = getWebScraperQueue(); // const wsq = getWebScraperQueue();
wsq.on("waiting", j => ScrapeEvents.logJobEvent(j, "waiting")); // wsq.on("waiting", j => ScrapeEvents.logJobEvent(j, "waiting"));
wsq.on("active", j => ScrapeEvents.logJobEvent(j, "active")); // wsq.on("active", j => ScrapeEvents.logJobEvent(j, "active"));
wsq.on("completed", j => ScrapeEvents.logJobEvent(j, "completed")); // wsq.on("completed", j => ScrapeEvents.logJobEvent(j, "completed"));
wsq.on("paused", j => ScrapeEvents.logJobEvent(j, "paused")); // wsq.on("paused", j => ScrapeEvents.logJobEvent(j, "paused"));
wsq.on("resumed", j => ScrapeEvents.logJobEvent(j, "resumed")); // wsq.on("resumed", j => ScrapeEvents.logJobEvent(j, "resumed"));
wsq.on("removed", j => ScrapeEvents.logJobEvent(j, "removed")); // wsq.on("removed", j => ScrapeEvents.logJobEvent(j, "removed"));

View File

@ -1,4 +1,4 @@
import { Job, JobId } from "bull"; import { Job } from "bullmq";
import type { baseScrapers } from "../scraper/WebScraper/single_url"; import type { baseScrapers } from "../scraper/WebScraper/single_url";
import { supabase_service as supabase } from "../services/supabase"; import { supabase_service as supabase } from "../services/supabase";
import { Logger } from "./logger"; import { Logger } from "./logger";
@ -70,7 +70,7 @@ export class ScrapeEvents {
} }
} }
static async logJobEvent(job: Job | JobId, event: ScrapeQueueEvent["event"]) { static async logJobEvent(job: Job | any, event: ScrapeQueueEvent["event"]) {
try { try {
await this.insert(((job as any).id ? (job as any).id : job) as string, { await this.insert(((job as any).id ? (job as any).id : job) as string, {
type: "queue", type: "queue",

View File

@ -1,4 +1,4 @@
import { Job } from "bull"; import { Job } from "bullmq";
import { import {
CrawlResult, CrawlResult,
WebScraperOptions, WebScraperOptions,
@ -15,8 +15,10 @@ import { ScrapeEvents } from "../lib/scrape-events";
export async function startWebScraperPipeline({ export async function startWebScraperPipeline({
job, job,
token,
}: { }: {
job: Job<WebScraperOptions>; job: Job<WebScraperOptions>;
token: string;
}) { }) {
let partialDocs: Document[] = []; let partialDocs: Document[] = [];
return (await runWebScraper({ return (await runWebScraper({
@ -31,17 +33,17 @@ export async function startWebScraperPipeline({
if (partialDocs.length > 50) { if (partialDocs.length > 50) {
partialDocs = partialDocs.slice(-50); partialDocs = partialDocs.slice(-50);
} }
job.progress({ ...progress, partialDocs: partialDocs }); job.updateProgress({ ...progress, partialDocs: partialDocs });
} }
}, },
onSuccess: (result) => { onSuccess: (result) => {
Logger.debug(`🐂 Job completed ${job.id}`); Logger.debug(`🐂 Job completed ${job.id}`);
saveJob(job, result); saveJob(job, result, token);
}, },
onError: (error) => { onError: (error) => {
Logger.error(`🐂 Job failed ${job.id}`); Logger.error(`🐂 Job failed ${job.id}`);
ScrapeEvents.logJobEvent(job, "failed"); ScrapeEvents.logJobEvent(job, "failed");
job.moveToFailed(error); job.moveToFailed(error, token, false);
}, },
team_id: job.data.team_id, team_id: job.data.team_id,
bull_job_id: job.id.toString(), bull_job_id: job.id.toString(),
@ -121,7 +123,7 @@ export async function runWebScraper({
} }
} }
const saveJob = async (job: Job, result: any) => { const saveJob = async (job: Job, result: any, token: string) => {
try { try {
if (process.env.USE_DB_AUTHENTICATION === "true") { if (process.env.USE_DB_AUTHENTICATION === "true") {
const { data, error } = await supabase_service const { data, error } = await supabase_service
@ -131,13 +133,13 @@ const saveJob = async (job: Job, result: any) => {
if (error) throw new Error(error.message); if (error) throw new Error(error.message);
try { try {
await job.moveToCompleted(null); await job.moveToCompleted(null, token, false);
} catch (error) { } catch (error) {
// I think the job won't exist here anymore // I think the job won't exist here anymore
} }
} else { } else {
try { try {
await job.moveToCompleted(result); await job.moveToCompleted(result, token, false);
} catch (error) { } catch (error) {
// I think the job won't exist here anymore // I think the job won't exist here anymore
} }

View File

@ -36,9 +36,8 @@ export async function checkAlerts() {
const checkWaitingQueue = async () => { const checkWaitingQueue = async () => {
const webScraperQueue = getWebScraperQueue(); const webScraperQueue = getWebScraperQueue();
const waitingJobs = await webScraperQueue.getWaitingCount(); const waitingJobs = await webScraperQueue.getWaitingCount();
const paused = await webScraperQueue.getPausedCount();
if (waitingJobs !== paused && waitingJobs > Number(process.env.ALERT_NUM_WAITING_JOBS)) { if (waitingJobs > Number(process.env.ALERT_NUM_WAITING_JOBS)) {
Logger.warn( Logger.warn(
`Alert: Number of waiting jobs is over ${process.env.ALERT_NUM_WAITING_JOBS}. Current waiting jobs: ${waitingJobs}.` `Alert: Number of waiting jobs is over ${process.env.ALERT_NUM_WAITING_JOBS}. Current waiting jobs: ${waitingJobs}.`
); );

View File

@ -1,4 +1,4 @@
import { Job, Queue } from "bull"; import { Job, Queue } from "bullmq";
import { import {
getWebScraperQueue, getWebScraperQueue,
} from "./queue-service"; } from "./queue-service";
@ -10,7 +10,7 @@ export async function addWebScraperJob(
options: any = {}, options: any = {},
jobId: string = uuidv4(), jobId: string = uuidv4(),
): Promise<Job> { ): Promise<Job> {
return await getWebScraperQueue().add(webScraperOptions, { return await getWebScraperQueue().add(jobId, webScraperOptions, {
...options, ...options,
jobId, jobId,
}); });

View File

@ -1,23 +1,41 @@
import Queue from "bull"; import { Queue } from "bullmq";
import { Queue as BullQueue } from "bull";
import { Logger } from "../lib/logger"; import { Logger } from "../lib/logger";
import IORedis from "ioredis";
import { Worker } from "bullmq";
import systemMonitor from "./system-monitor";
import { v4 as uuidv4 } from "uuid";
let webScraperQueue: BullQueue; let webScraperQueue: Queue;
// Shared ioredis connection, built from REDIS_URL, that the web-scraper queue is created on.
export const redisConnection = new IORedis(process.env.REDIS_URL, {
// NOTE(review): BullMQ's connection docs ask for `maxRetriesPerRequest: null` on
// connections used by workers — presumably that is why it is set here; confirm.
maxRetriesPerRequest: null,
});
// Name under which the web-scraper queue is created.
// NOTE(review): the surrounding braces look like a Redis Cluster hash tag — confirm.
export const webScraperQueueName = "{webscraperQueue}";
export function getWebScraperQueue() { export function getWebScraperQueue() {
if (!webScraperQueue) { if (!webScraperQueue) {
webScraperQueue = new Queue("web-scraper", process.env.REDIS_URL, { webScraperQueue = new Queue(
settings: { webScraperQueueName,
lockDuration: 1 * 60 * 1000, // 1 minute in milliseconds, {
lockRenewTime: 15 * 1000, // 15 seconds in milliseconds connection: redisConnection,
stalledInterval: 30 * 1000,
maxStalledCount: 10,
},
defaultJobOptions:{
attempts: 5
} }
}); // {
// settings: {
// lockDuration: 1 * 60 * 1000, // 1 minute in milliseconds,
// lockRenewTime: 15 * 1000, // 15 seconds in milliseconds
// stalledInterval: 30 * 1000,
// maxStalledCount: 10,
// },
// defaultJobOptions:{
// attempts: 5
// }
// }
);
Logger.info("Web scraper queue created"); Logger.info("Web scraper queue created");
} }
return webScraperQueue; return webScraperQueue;
} }

View File

@ -1,14 +1,17 @@
import { CustomError } from "../lib/custom-error"; import { CustomError } from "../lib/custom-error";
import { getWebScraperQueue } from "./queue-service"; import { getWebScraperQueue, redisConnection, webScraperQueueName } from "./queue-service";
import "dotenv/config"; import "dotenv/config";
import { logtail } from "./logtail"; import { logtail } from "./logtail";
import { startWebScraperPipeline } from "../main/runWebScraper"; import { startWebScraperPipeline } from "../main/runWebScraper";
import { callWebhook } from "./webhook"; import { callWebhook } from "./webhook";
import { logJob } from "./logging/log_job"; import { logJob } from "./logging/log_job";
import { initSDK } from '@hyperdx/node-opentelemetry'; import { initSDK } from '@hyperdx/node-opentelemetry';
import { Job } from "bull"; import { Job } from "bullmq";
import { Logger } from "../lib/logger"; import { Logger } from "../lib/logger";
import { ScrapeEvents } from "../lib/scrape-events"; import { ScrapeEvents } from "../lib/scrape-events";
import { Worker } from "bullmq";
import systemMonitor from "./system-monitor";
import { v4 as uuidv4 } from "uuid";
if (process.env.ENV === 'production') { if (process.env.ENV === 'production') {
initSDK({ initSDK({
@ -16,21 +19,115 @@ if (process.env.ENV === 'production') {
additionalInstrumentations: [], additionalInstrumentations: [],
}); });
} }
/** Resolves after `ms` milliseconds; used to pace the job polling loop. */
const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

/**
 * Reads a numeric setting from the environment.
 *
 * The given variable names are tried in order; the first one whose value
 * converts to a non-zero number wins. Unset, non-numeric and `0` values are
 * skipped (the same semantics as `Number(value) || fallback`).
 *
 * @param fallback Value returned when none of the variables yields a number.
 * @param names Environment variable names, highest priority first.
 * @returns The resolved setting.
 */
const envNumber = (fallback: number, ...names: string[]): number => {
  for (const name of names) {
    const value = Number(process.env[name]);
    if (value) {
      return value;
    }
  }
  return fallback;
};

// All values below are in milliseconds.
const workerLockDuration = envNumber(60000, "WORKER_LOCK_DURATION");
const workerStalledCheckInterval = envNumber(
  30000,
  "WORKER_STALLED_CHECK_INTERVAL"
);
// How often a running job extends its lock, and by how much.
const jobLockExtendInterval = envNumber(15000, "JOB_LOCK_EXTEND_INTERVAL");
const jobLockExtensionTime = envNumber(15000, "JOB_LOCK_EXTENSION_TIME");
// Back-off while the system monitor refuses new work.
const cantAcceptConnectionInterval = envNumber(
  2000,
  "CANT_ACCEPT_CONNECTION_INTERVAL"
);
// Pause between polls when the queue returned no job.
const connectionMonitorInterval = envNumber(10, "CONNECTION_MONITOR_INTERVAL");
// Pause after a job was picked up. This used to read only
// CONNECTION_MONITOR_INTERVAL, so it could not be tuned separately; that
// variable is kept as a fallback so existing deployments behave the same.
const gotJobInterval = envNumber(
  20,
  "GOT_JOB_INTERVAL",
  "CONNECTION_MONITOR_INTERVAL"
);
const wsq = getWebScraperQueue(); const wsq = getWebScraperQueue();
async function processJob(job: Job, done) { const processJobInternal = async (token: string, job: Job) => {
const extendLockInterval = setInterval(async () => {
await job.extendLock(token, jobLockExtensionTime);
}, jobLockExtendInterval);
try {
const result = await processJob(job, token);
// await resultQueue.add('resultJob', result,{jobId: job.id});
console.log("🐂 Job completed", result);
console.log({token})
console.log(await job.getState())
// await job.moveToCompleted(result, token, false); //3rd arg fetchNext
} catch (error) {
console.log("Job failed, error:", error);
await job.moveToFailed(error, token, false);
} finally {
clearInterval(extendLockInterval);
}
};
// Raised once SIGINT is received; the polling loop checks it before taking a new job.
let isShuttingDown = false;

/** SIGINT handler: announces the shutdown and raises the shutdown flag. */
function handleSigint(): void {
  console.log("Received SIGINT. Shutting down gracefully...");
  isShuttingDown = true;
}

process.on("SIGINT", handleSigint);
/**
 * Polling loop that fetches jobs from the web-scraper queue by hand.
 *
 * The BullMQ worker is created without a processor function; jobs are pulled
 * with `getNextJob` under a fresh token per iteration and handed to
 * `processJobInternal`, which runs in the background so the loop can keep
 * polling. Before every fetch the system monitor is asked whether another
 * job may be accepted. The loop ends once `isShuttingDown` is set.
 */
const workerFun = async () => {
  const worker = new Worker(webScraperQueueName, null, {
    connection: redisConnection,
    // Use the env-backed settings declared above instead of repeating their
    // defaults (60s lock, 30s stalled check) as literals.
    lockDuration: workerLockDuration,
    stalledInterval: workerStalledCheckInterval,
    maxStalledCount: 10, // 10 times
  });

  worker.startStalledCheckTimer();

  const monitor = await systemMonitor;

  while (true) {
    if (isShuttingDown) {
      console.log("No longer accepting new jobs. SIGINT");
      break;
    }

    // One token per fetch; it is passed on to whoever processes the job.
    const token = uuidv4();

    const canAcceptConnection = await monitor.acceptConnection();
    if (!canAcceptConnection) {
      console.log("Cant accept connection");
      await sleep(cantAcceptConnectionInterval); // more sleep
      continue;
    }

    const job = await worker.getNextJob(token);
    if (job) {
      // Deliberately not awaited so several jobs can run concurrently. The
      // catch keeps a failure inside processJobInternal's own error path
      // from surfacing as an unhandled promise rejection.
      processJobInternal(token, job).catch((error) => {
        Logger.error(`🐂 Job ${job.id} processing rejected - ${error}`);
      });
      await sleep(gotJobInterval);
    } else {
      await sleep(connectionMonitorInterval);
    }
  }
};

workerFun();
async function processJob(job: Job, token: string) {
Logger.debug(`🐂 Worker taking job ${job.id}`); Logger.debug(`🐂 Worker taking job ${job.id}`);
try { try {
job.progress({ console.log("🐂 Updating progress");
console.log({job})
job.updateProgress({
current: 1, current: 1,
total: 100, total: 100,
current_step: "SCRAPING", current_step: "SCRAPING",
current_url: "", current_url: "",
}); });
const start = Date.now(); const start = Date.now();
const { success, message, docs } = await startWebScraperPipeline({ job }); const { success, message, docs } = await startWebScraperPipeline({ job, token });
const end = Date.now(); const end = Date.now();
const timeTakenInSeconds = (end - start) / 1000; const timeTakenInSeconds = (end - start) / 1000;
@ -64,10 +161,11 @@ async function processJob(job: Job, done) {
origin: job.data.origin, origin: job.data.origin,
}); });
Logger.debug(`🐂 Job done ${job.id}`); Logger.debug(`🐂 Job done ${job.id}`);
done(null, data); // done(null, data);
return data;
} catch (error) { } catch (error) {
Logger.error(`🐂 Job errored ${job.id} - ${error}`); Logger.error(`🐂 Job errored ${job.id} - ${error}`);
if (await getWebScraperQueue().isPaused(false)) { if (await getWebScraperQueue().isPaused()) {
Logger.debug("🐂Queue is paused, ignoring"); Logger.debug("🐂Queue is paused, ignoring");
return; return;
} }
@ -112,18 +210,19 @@ async function processJob(job: Job, done) {
pageOptions: job.data.pageOptions, pageOptions: job.data.pageOptions,
origin: job.data.origin, origin: job.data.origin,
}); });
done(null, data); // done(null, data);
return data;
} }
} }
wsq.process( // wsq.process(
Math.floor(Number(process.env.NUM_WORKERS_PER_QUEUE ?? 8)), // Math.floor(Number(process.env.NUM_WORKERS_PER_QUEUE ?? 8)),
processJob // processJob
); // );
wsq.on("waiting", j => ScrapeEvents.logJobEvent(j, "waiting")); // wsq.on("waiting", j => ScrapeEvents.logJobEvent(j, "waiting"));
wsq.on("active", j => ScrapeEvents.logJobEvent(j, "active")); // wsq.on("active", j => ScrapeEvents.logJobEvent(j, "active"));
wsq.on("completed", j => ScrapeEvents.logJobEvent(j, "completed")); // wsq.on("completed", j => ScrapeEvents.logJobEvent(j, "completed"));
wsq.on("paused", j => ScrapeEvents.logJobEvent(j, "paused")); // wsq.on("paused", j => ScrapeEvents.logJobEvent(j, "paused"));
wsq.on("resumed", j => ScrapeEvents.logJobEvent(j, "resumed")); // wsq.on("resumed", j => ScrapeEvents.logJobEvent(j, "resumed"));
wsq.on("removed", j => ScrapeEvents.logJobEvent(j, "removed")); // wsq.on("removed", j => ScrapeEvents.logJobEvent(j, "removed"));

View File

@ -0,0 +1,81 @@
import si from 'systeminformation';
import { Mutex } from "async-mutex";
/**
 * Reads a floating-point setting from the environment.
 *
 * @param name Environment variable to read.
 * @param fallback Value used when the variable is unset, empty, or does not
 *   parse as a number. Without this guard an unparsable value would become
 *   NaN, and every `usage < limit` comparison against it would be false.
 * @returns The parsed value or `fallback`.
 */
const readFloatEnv = (name: string, fallback: number): number => {
  const raw = process.env[name];
  if (!raw) {
    return fallback;
  }
  const parsed = parseFloat(raw);
  return Number.isNaN(parsed) ? fallback : parsed;
};

// Usage limits expressed as fractions (0.8 = 80%).
const MAX_CPU = readFloatEnv("MAX_CPU", 0.8);
const MAX_RAM = readFloatEnv("MAX_RAM", 0.8);
// How long a measurement is reused, in milliseconds (compared against Date.now() deltas).
const CACHE_DURATION = readFloatEnv("SYS_INFO_MAX_CACHE_DURATION", 150);
/**
 * Singleton that samples CPU load and memory usage through `systeminformation`
 * and decides whether more work may be accepted.
 *
 * Each measurement is cached and reused while it is younger than
 * CACHE_DURATION milliseconds.
 */
class SystemMonitor {
  private static instance: SystemMonitor;
  private static instanceMutex = new Mutex();

  private cpuUsageCache: number | null = null;
  private memoryUsageCache: number | null = null;
  private lastCpuCheck: number = 0;
  private lastMemoryCheck: number = 0;

  private constructor() {}

  /** Returns the shared instance, creating it under the mutex on first use. */
  public static async getInstance(): Promise<SystemMonitor> {
    if (!SystemMonitor.instance) {
      await this.instanceMutex.runExclusive(async () => {
        // Re-check inside the lock: another caller may have created it meanwhile.
        if (SystemMonitor.instance) {
          return;
        }
        SystemMonitor.instance = new SystemMonitor();
      });
    }
    return SystemMonitor.instance;
  }

  /** Used memory as a fraction of total: (total - available) / total. */
  private async checkMemoryUsage() {
    const checkedAt = Date.now();
    if (
      this.memoryUsageCache !== null &&
      checkedAt - this.lastMemoryCheck < CACHE_DURATION
    ) {
      return this.memoryUsageCache;
    }

    const { total, available } = await si.mem();
    const usedFraction = (total - available) / total;

    this.lastMemoryCheck = checkedAt;
    this.memoryUsageCache = usedFraction;
    return usedFraction;
  }

  /** Current CPU load as reported by `si.currentLoad()`, divided by 100. */
  private async checkCpuUsage() {
    const checkedAt = Date.now();
    if (
      this.cpuUsageCache !== null &&
      checkedAt - this.lastCpuCheck < CACHE_DURATION
    ) {
      return this.cpuUsageCache;
    }

    const { currentLoad } = await si.currentLoad();
    const loadFraction = currentLoad / 100;

    this.lastCpuCheck = checkedAt;
    this.cpuUsageCache = loadFraction;
    return loadFraction;
  }

  /** True only when CPU usage is below MAX_CPU and memory usage is below MAX_RAM. */
  public async acceptConnection() {
    // Both measurements are always taken, CPU first.
    const cpuWithinLimit = (await this.checkCpuUsage()) < MAX_CPU;
    const ramWithinLimit = (await this.checkMemoryUsage()) < MAX_RAM;
    return cpuWithinLimit && ramWithinLimit;
  }

  /** Drops both cached measurements so the next check samples again. */
  public clearCache() {
    this.lastCpuCheck = 0;
    this.lastMemoryCheck = 0;
    this.cpuUsageCache = null;
    this.memoryUsageCache = null;
  }
}

export default SystemMonitor.getInstance();