Merge pull request #824 from mendableai/mog/concurrency-limit-2

concurrency limit fix
Nicolas, 2024-10-30 17:14:20 -03:00, committed by GitHub
commit f5c58e0c51
10 changed files with 162 additions and 102 deletions

View File

@@ -193,13 +193,9 @@ export async function crawlController(req: Request, res: Response) {
id,
jobs.map((x) => x.opts.jobId)
);
if (Sentry.isInitialized()) {
for (const job of jobs) {
// add with sentry instrumentation
await addScrapeJob(job.data as any, {}, job.opts.jobId);
}
} else {
await getScrapeQueue().addBulk(jobs);
for (const job of jobs) {
// add with sentry instrumentation
await addScrapeJob(job.data as any, {}, job.opts.jobId);
}
} else {
await lockURL(id, sc, url);
@@ -207,7 +203,8 @@ export async function crawlController(req: Request, res: Response) {
// Not needed, first one should be 15.
// const jobPriority = await getJobPriority({plan, team_id, basePriority: 10})
const job = await addScrapeJob(
const jobId = uuidv4();
await addScrapeJob(
{
url,
mode: "single_urls",
@@ -220,9 +217,10 @@ export async function crawlController(req: Request, res: Response) {
},
{
priority: 15, // prioritize request 0 of crawl jobs same as scrape jobs
}
},
jobId,
);
await addCrawlJob(id, job.id);
await addCrawlJob(id, jobId);
}
res.json({ jobId: id });
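
Orientation note (illustrative, not part of the patch): addScrapeJob no longer returns a BullMQ Job, so callers now mint the job id up front with uuidv4() and reuse it for addCrawlJob and any later lookups. The same shape repeats in crawlPreviewController and the v1 crawl controller below. A minimal sketch of the single-URL path, assuming the crawl-redis helpers are imported as in the existing controllers; the helper name enqueueSingleUrl is invented:

```ts
import { v4 as uuidv4 } from "uuid";
import { addScrapeJob } from "../../services/queue-jobs";
import { addCrawlJob, lockURL } from "../../lib/crawl-redis";

// Sketch: the controller owns the job id, so it can be recorded against the crawl
// without the Job object that addScrapeJob used to return.
async function enqueueSingleUrl(crawlId: string, sc: any, url: string, payload: any): Promise<string> {
  await lockURL(crawlId, sc, url);
  const jobId = uuidv4();
  await addScrapeJob(payload, { priority: 15 }, jobId); // priority 15 = request 0 of a crawl
  await addCrawlJob(crawlId, jobId);
  return jobId;
}
```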

View File

@@ -103,7 +103,8 @@ export async function crawlPreviewController(req: Request, res: Response) {
if (sitemap !== null) {
for (const url of sitemap.map(x => x.url)) {
await lockURL(id, sc, url);
const job = await addScrapeJob({
const jobId = uuidv4();
await addScrapeJob({
url,
mode: "single_urls",
crawlerOptions: crawlerOptions,
@@ -113,12 +114,13 @@ export async function crawlPreviewController(req: Request, res: Response) {
origin: "website-preview",
crawl_id: id,
sitemapped: true,
});
await addCrawlJob(id, job.id);
}, {}, jobId);
await addCrawlJob(id, jobId);
}
} else {
await lockURL(id, sc, url);
const job = await addScrapeJob({
const jobId = uuidv4();
await addScrapeJob({
url,
mode: "single_urls",
crawlerOptions: crawlerOptions,
@@ -127,8 +129,8 @@ export async function crawlPreviewController(req: Request, res: Response) {
pageOptions: pageOptions,
origin: "website-preview",
crawl_id: id,
});
await addCrawlJob(id, job.id);
}, {}, jobId);
await addCrawlJob(id, jobId);
}
res.json({ jobId: id });

View File

@@ -54,7 +54,7 @@ export async function scrapeHelper(
const jobPriority = await getJobPriority({ plan, team_id, basePriority: 10 });
const job = await addScrapeJob(
await addScrapeJob(
{
url,
mode: "single_urls",
@@ -81,7 +81,7 @@ export async function scrapeHelper(
},
async (span) => {
try {
doc = (await waitForJob(job.id, timeout))[0];
doc = (await waitForJob(jobId, timeout))[0];
} catch (e) {
if (e instanceof Error && e.message.startsWith("Job wait")) {
span.setAttribute("timedOut", true);
@@ -116,10 +116,10 @@ export async function scrapeHelper(
return err;
}
await job.remove();
await getScrapeQueue().remove(jobId);
if (!doc) {
console.error("!!! PANIC DOC IS", doc, job);
console.error("!!! PANIC DOC IS", doc);
return {
success: true,
error: "No page found",
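
Illustrative sketch (not part of the patch): without a Job handle, waiting and cleanup are keyed by the job id, so waitForJob(jobId, timeout) replaces waitForJob(job.id, timeout) and getScrapeQueue().remove(jobId) replaces job.remove(). A condensed version of that flow, error handling trimmed; scrapeOnce is an invented name:

```ts
import { v4 as uuidv4 } from "uuid";
import { addScrapeJob, waitForJob } from "../../services/queue-jobs";
import { getScrapeQueue } from "../../services/queue-service";

// Sketch: enqueue by id, wait by id, remove by id.
async function scrapeOnce(payload: any, timeout: number, jobPriority: number): Promise<any> {
  const jobId = uuidv4();
  await addScrapeJob(payload, {}, jobId, jobPriority);
  const doc = ((await waitForJob(jobId, timeout)) as any[])[0]; // waitForJob resolves with the job's return value
  await getScrapeQueue().remove(jobId);                         // BullMQ Queue#remove(jobId) drops the finished job
  return doc;
}
```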

View File

@@ -99,24 +99,19 @@ export async function searchHelper(
};
})
let jobs = [];
if (Sentry.isInitialized()) {
for (const job of jobDatas) {
// add with sentry instrumentation
jobs.push(await addScrapeJob(job.data as any, {}, job.opts.jobId, job.opts.priority));
}
} else {
jobs = await getScrapeQueue().addBulk(jobDatas);
await getScrapeQueue().addBulk(jobs);
// TODO: addScrapeJobs
for (const job of jobDatas) {
await addScrapeJob(job.data as any, {}, job.opts.jobId, job.opts.priority)
}
const docs = (await Promise.all(jobs.map(x => waitForJob(x.id, 60000)))).map(x => x[0]);
const docs = (await Promise.all(jobDatas.map(x => waitForJob(x.opts.jobId, 60000)))).map(x => x[0]);
if (docs.length === 0) {
return { success: true, error: "No search results found", returnCode: 200 };
}
await Promise.all(jobs.map(x => x.remove()));
const sq = getScrapeQueue();
await Promise.all(jobDatas.map(x => sq.remove(x.opts.jobId)));
// make sure doc.content is not empty
const filteredDocs = docs.filter(
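
Illustrative sketch of the fan-out this hunk settles on: each search result is enqueued individually under a pre-assigned jobId, the documents are gathered with Promise.all over waitForJob, and the finished jobs are removed from the queue by id. scrapeMany is an invented name:

```ts
import { addScrapeJob, waitForJob } from "../../services/queue-jobs";
import { getScrapeQueue } from "../../services/queue-service";

type QueuedJob = { data: any; opts: { jobId: string; priority: number } };

// Sketch: enqueue every job by its pre-assigned id, wait for all of them,
// then clean the finished jobs out of the queue.
async function scrapeMany(jobDatas: QueuedJob[], timeoutMs = 60000): Promise<any[]> {
  for (const job of jobDatas) {
    await addScrapeJob(job.data, {}, job.opts.jobId, job.opts.priority);
  }
  const docs = (await Promise.all(
    jobDatas.map((x) => waitForJob(x.opts.jobId, timeoutMs) as Promise<any[]>)
  )).map((x) => x[0]);
  const sq = getScrapeQueue();
  await Promise.all(jobDatas.map((x) => sq.remove(x.opts.jobId)));
  return docs;
}
```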

View File

@@ -17,6 +17,7 @@ import {
import { logCrawl } from "../../services/logging/crawl_log";
import { getScrapeQueue } from "../../services/queue-service";
import { getJobPriority } from "../../lib/job-priority";
import { addScrapeJobs } from "../../services/queue-jobs";
export async function batchScrapeController(
req: RequestWithAuth<{}, CrawlResponse, BatchScrapeRequest>,
@@ -58,12 +59,10 @@ export async function batchScrapeController(
}
const jobs = req.body.urls.map((x) => {
const uuid = uuidv4();
return {
name: uuid,
data: {
url: x,
mode: "single_urls",
mode: "single_urls" as const,
team_id: req.auth.team_id,
plan: req.auth.plan,
crawlerOptions: null,
@@ -75,7 +74,7 @@ export async function batchScrapeController(
v1: true,
},
opts: {
jobId: uuid,
jobId: uuidv4(),
priority: 20,
},
};
@@ -89,7 +88,7 @@ export async function batchScrapeController(
id,
jobs.map((x) => x.opts.jobId)
);
await getScrapeQueue().addBulk(jobs);
await addScrapeJobs(jobs);
const protocol = process.env.ENV === "local" ? req.protocol : "https";
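
Illustrative note: addScrapeJobs, added later in this commit in the queue-jobs service, accepts exactly the array shape built here, { data, opts: { jobId, priority } }, and currently maps it over addScrapeJob. A trimmed sketch with placeholder values; enqueueBatch is a hypothetical wrapper and the data payload is abbreviated:

```ts
import { v4 as uuidv4 } from "uuid";
import { addScrapeJobs } from "../../services/queue-jobs";

// Sketch: one entry per URL, each with its own pre-assigned job id.
async function enqueueBatch(urls: string[], team_id: string, plan: string): Promise<string[]> {
  const jobs = urls.map((url) => ({
    data: {
      url,
      mode: "single_urls" as const,
      team_id,
      plan,
      crawlerOptions: null,
      v1: true,
    } as any, // abbreviated payload for illustration; the real controller fills in the full scrape options
    opts: {
      jobId: uuidv4(),
      priority: 20,
    },
  }));
  await addScrapeJobs(jobs);
  return jobs.map((j) => j.opts.jobId);
}
```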

View File

@@ -137,7 +137,8 @@ export async function crawlController(
await getScrapeQueue().addBulk(jobs);
} else {
await lockURL(id, sc, req.body.url);
const job = await addScrapeJob(
const jobId = uuidv4();
await addScrapeJob(
{
url: req.body.url,
mode: "single_urls",
@@ -152,9 +153,10 @@ export async function crawlController(
},
{
priority: 15,
}
},
jobId,
);
await addCrawlJob(id, job.id);
await addCrawlJob(id, jobId);
}
if(req.body.webhook) {

View File

@@ -17,6 +17,7 @@ import { addScrapeJob, waitForJob } from "../../services/queue-jobs";
import { logJob } from "../../services/logging/log_job";
import { getJobPriority } from "../../lib/job-priority";
import { PlanType } from "../../types";
import { getScrapeQueue } from "../../services/queue-service";
export async function scrapeController(
req: RequestWithAuth<{}, ScrapeResponse, ScrapeRequest>,
@@ -38,7 +39,7 @@ export async function scrapeController(
basePriority: 10,
});
const job = await addScrapeJob(
await addScrapeJob(
{
url: req.body.url,
mode: "single_urls",
@@ -59,7 +60,7 @@ export async function scrapeController(
let doc: any | undefined;
try {
doc = (await waitForJob(job.id, timeout + totalWait))[0];
doc = (await waitForJob(jobId, timeout + totalWait))[0];
} catch (e) {
Logger.error(`Error in scrapeController: ${e}`);
if (e instanceof Error && e.message.startsWith("Job wait")) {
@@ -79,10 +80,10 @@ export async function scrapeController(
}
}
await job.remove();
await getScrapeQueue().remove(jobId);
if (!doc) {
console.error("!!! PANIC DOC IS", doc, job);
console.error("!!! PANIC DOC IS", doc);
return res.status(200).json({
success: true,
warning: "No page found",

View File

@@ -0,0 +1,48 @@
import { getRateLimiterPoints } from "../services/rate-limiter";
import { redisConnection } from "../services/queue-service";
import { RateLimiterMode } from "../types";
import { JobsOptions } from "bullmq";
const constructKey = (team_id: string) => "concurrency-limiter:" + team_id;
const constructQueueKey = (team_id: string) => "concurrency-limit-queue:" + team_id;
const stalledJobTimeoutMs = 2 * 60 * 1000;
export function getConcurrencyLimitMax(plan: string): number {
return getRateLimiterPoints(RateLimiterMode.Scrape, undefined, plan);
}
export async function cleanOldConcurrencyLimitEntries(team_id: string, now: number = Date.now()) {
await redisConnection.zremrangebyscore(constructKey(team_id), -Infinity, now);
}
export async function getConcurrencyLimitActiveJobs(team_id: string, now: number = Date.now()): Promise<string[]> {
return await redisConnection.zrangebyscore(constructKey(team_id), now, Infinity);
}
export async function pushConcurrencyLimitActiveJob(team_id: string, id: string, now: number = Date.now()) {
await redisConnection.zadd(constructKey(team_id), now + stalledJobTimeoutMs, id);
}
export async function removeConcurrencyLimitActiveJob(team_id: string, id: string) {
await redisConnection.zrem(constructKey(team_id), id);
}
export type ConcurrencyLimitedJob = {
id: string;
data: any;
opts: JobsOptions;
priority?: number;
}
export async function takeConcurrencyLimitedJob(team_id: string): Promise<ConcurrencyLimitedJob | null> {
const res = await redisConnection.zmpop(1, constructQueueKey(team_id), "MIN");
if (res === null || res === undefined) {
return null;
}
return JSON.parse(res[1][0][0]);
}
export async function pushConcurrencyLimitedJob(team_id: string, job: ConcurrencyLimitedJob) {
await redisConnection.zadd(constructQueueKey(team_id), job.priority ?? 1, JSON.stringify(job));
}
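
Illustrative walkthrough of the data model above (not part of the patch): both keys are Redis sorted sets. The active set concurrency-limiter:<team> scores each job id with now + 2 minutes, so zremrangebyscore(-inf, now) sweeps stalled leases and zrangebyscore(now, +inf) counts what is genuinely in flight. The waiting set concurrency-limit-queue:<team> scores serialized jobs by their priority, and ZMPOP ... MIN pops the lowest score first, which under BullMQ's lower-number-wins convention is the most urgent job. A sketch against the raw connection; inspectTeam is a hypothetical helper:

```ts
import { redisConnection } from "../services/queue-service";

// Sketch of the Redis-level semantics behind the helpers above:
// - "concurrency-limiter:<team>"      (active set)  member = job id, score = now + 2 min (a lease)
// - "concurrency-limit-queue:<team>"  (waiting set) member = serialized job, score = priority
async function inspectTeam(team_id: string) {
  const activeKey = "concurrency-limiter:" + team_id;
  const queueKey = "concurrency-limit-queue:" + team_id;
  const now = Date.now();

  // Leases whose score is <= now are treated as stalled and swept away...
  await redisConnection.zremrangebyscore(activeKey, -Infinity, now);
  // ...so only entries scored in the future count as "active".
  const inFlight = await redisConnection.zrangebyscore(activeKey, now, Infinity);

  // ZMPOP ... MIN pops the lowest-scored member, i.e. the most urgent waiting job.
  // The reply is [key, [[member, score]]], which is why the module reads res[1][0][0].
  const res = (await redisConnection.zmpop(1, queueKey, "MIN")) as any;
  const next = res ? JSON.parse(res[1][0][0]) : null;

  return { inFlight, next };
}
```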

View File

@@ -1,20 +1,47 @@
import { Job, Queue } from "bullmq";
import { Job, JobsOptions } from "bullmq";
import { getScrapeQueue } from "./queue-service";
import { v4 as uuidv4 } from "uuid";
import { WebScraperOptions } from "../types";
import * as Sentry from "@sentry/node";
import { cleanOldConcurrencyLimitEntries, getConcurrencyLimitActiveJobs, getConcurrencyLimitMax, pushConcurrencyLimitActiveJob, pushConcurrencyLimitedJob } from "../lib/concurrency-limit";
async function addScrapeJobRaw(
webScraperOptions: any,
options: any,
jobId: string,
jobPriority: number = 10
): Promise<Job> {
return await getScrapeQueue().add(jobId, webScraperOptions, {
...options,
priority: jobPriority,
jobId,
});
) {
let concurrencyLimited = false;
if (webScraperOptions && webScraperOptions.team_id && webScraperOptions.plan) {
const now = Date.now();
const limit = await getConcurrencyLimitMax(webScraperOptions.plan);
cleanOldConcurrencyLimitEntries(webScraperOptions.team_id, now);
concurrencyLimited = (await getConcurrencyLimitActiveJobs(webScraperOptions.team_id, now)).length >= limit;
}
if (concurrencyLimited) {
await pushConcurrencyLimitedJob(webScraperOptions.team_id, {
id: jobId,
data: webScraperOptions,
opts: {
...options,
priority: jobPriority,
jobId: jobId,
},
priority: jobPriority,
});
} else {
if (webScraperOptions && webScraperOptions.team_id && webScraperOptions.plan) {
await pushConcurrencyLimitActiveJob(webScraperOptions.team_id, jobId);
}
await getScrapeQueue().add(jobId, webScraperOptions, {
...options,
priority: jobPriority,
jobId,
});
}
}
export async function addScrapeJob(
@@ -22,8 +49,7 @@ export async function addScrapeJob(
options: any = {},
jobId: string = uuidv4(),
jobPriority: number = 10
): Promise<Job> {
) {
if (Sentry.isInitialized()) {
const size = JSON.stringify(webScraperOptions).length;
return await Sentry.startSpan({
@@ -35,7 +61,7 @@ export async function addScrapeJob(
"messaging.message.body.size": size,
},
}, async (span) => {
return await addScrapeJobRaw({
await addScrapeJobRaw({
...webScraperOptions,
sentry: {
trace: Sentry.spanToTraceHeader(span),
@@ -45,10 +71,23 @@ export async function addScrapeJob(
}, options, jobId, jobPriority);
});
} else {
return await addScrapeJobRaw(webScraperOptions, options, jobId, jobPriority);
await addScrapeJobRaw(webScraperOptions, options, jobId, jobPriority);
}
}
export async function addScrapeJobs(
jobs: {
data: WebScraperOptions,
opts: {
jobId: string,
priority: number,
},
}[],
) {
// TODO: better
await Promise.all(jobs.map(job => addScrapeJob(job.data, job.opts, job.opts.jobId, job.opts.priority)));
}
export function waitForJob(jobId: string, timeout: number) {
return new Promise((resolve, reject) => {
const start = Date.now();
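
For readers skimming the diff, a condensed, illustrative restatement of the gate inside addScrapeJobRaw (Sentry wrapping and option plumbing stripped; gateAndEnqueue is a hypothetical name): if the team is under its plan limit, claim a slot and enqueue immediately; otherwise park the job in the team's waiting set:

```ts
import {
  cleanOldConcurrencyLimitEntries,
  getConcurrencyLimitActiveJobs,
  getConcurrencyLimitMax,
  pushConcurrencyLimitActiveJob,
  pushConcurrencyLimitedJob,
} from "../lib/concurrency-limit";
import { getScrapeQueue } from "./queue-service";

// Sketch: claim a concurrency slot and enqueue, or park the job until a slot frees up.
async function gateAndEnqueue(team_id: string, plan: string, jobId: string, data: any, priority = 10) {
  const now = Date.now();
  await cleanOldConcurrencyLimitEntries(team_id, now);             // sweep stalled leases first
  const active = await getConcurrencyLimitActiveJobs(team_id, now);
  const limit = getConcurrencyLimitMax(plan);                      // derived from the plan's scrape rate limit

  if (active.length >= limit) {
    // Over the plan's limit: park the job, scored by priority, in the waiting set.
    await pushConcurrencyLimitedJob(team_id, { id: jobId, data, opts: { priority, jobId }, priority });
    return "parked";
  }

  await pushConcurrencyLimitActiveJob(team_id, jobId, now);        // claim a slot (2-minute lease)
  await getScrapeQueue().add(jobId, data, { priority, jobId });
  return "enqueued";
}
```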

View File

@@ -38,6 +38,7 @@ import { PlanType, RateLimiterMode } from "../types";
import { getJobs } from "../../src/controllers/v1/crawl-status";
import { configDotenv } from "dotenv";
import { getRateLimiterPoints } from "./rate-limiter";
import { cleanOldConcurrencyLimitEntries, pushConcurrencyLimitActiveJob, removeConcurrencyLimitActiveJob, takeConcurrencyLimitedJob } from "../lib/concurrency-limit";
configDotenv();
if (process.env.ENV === "production") {
@@ -135,46 +136,27 @@ const workerFun = async (
const job = await worker.getNextJob(token);
if (job) {
const concurrencyLimiterKey = "concurrency-limiter:" + job.data?.team_id;
async function afterJobDone(job: Job<any, any, string>) {
if (job.id && job.data && job.data.team_id && job.data.plan) {
await removeConcurrencyLimitActiveJob(job.data.team_id, job.id);
cleanOldConcurrencyLimitEntries(job.data.team_id);
if (job.data && job.data.team_id && job.data.plan) {
const concurrencyLimiterThrottledKey = "concurrency-limiter:" + job.data.team_id + ":throttled";
const concurrencyLimit = getRateLimiterPoints(RateLimiterMode.Scrape, undefined, job.data.plan);
const now = Date.now();
const stalledJobTimeoutMs = 2 * 60 * 1000;
const throttledJobTimeoutMs = 10 * 60 * 1000;
// Queue up next job, if it exists
// No need to check if we're under the limit here -- if the current job is finished,
// we are 1 under the limit, assuming the job insertion logic never over-inserts. - MG
const nextJob = await takeConcurrencyLimitedJob(job.data.team_id);
if (nextJob !== null) {
await pushConcurrencyLimitActiveJob(job.data.team_id, nextJob.id);
redisConnection.zremrangebyscore(concurrencyLimiterThrottledKey, -Infinity, now);
redisConnection.zremrangebyscore(concurrencyLimiterKey, -Infinity, now);
const activeJobsOfTeam = await redisConnection.zrangebyscore(concurrencyLimiterKey, now, Infinity);
if (activeJobsOfTeam.length >= concurrencyLimit) {
// Nick: removed the log because it was too spammy, tested and confirmed that the job is added back to the queue
// Logger.info("Moving job " + job.id + " back the queue -- concurrency limit hit");
// Concurrency limit hit, throttles the job
await redisConnection.zadd(concurrencyLimiterThrottledKey, now + throttledJobTimeoutMs, job.id);
// We move to failed with a specific error
await job.moveToFailed(new Error("Concurrency limit hit"), token, false);
// Remove the job from the queue
await job.remove();
// Increment the priority of the job exponentially by 5%, Note: max bull priority is 2 million
const newJobPriority = Math.min(Math.round((job.opts.priority ?? 10) * 1.05), 20000);
// Add the job back to the queue with the new priority
await queue.add(job.name, {
...job.data,
concurrencyLimitHit: true,
}, {
...job.opts,
jobId: job.id,
priority: newJobPriority, // exponential backoff for stuck jobs
});
// await sleep(gotJobInterval);
continue;
} else {
// If we are not throttled, add the job back to the queue with the new priority
await redisConnection.zadd(concurrencyLimiterKey, now + stalledJobTimeoutMs, job.id);
// Remove the job from the throttled list
await redisConnection.zrem(concurrencyLimiterThrottledKey, job.id);
await queue.add(nextJob.id, {
...nextJob.data,
concurrencyLimitHit: true,
}, {
...nextJob.opts,
jobId: nextJob.id,
priority: nextJob.priority,
});
}
}
}
@@ -212,9 +194,7 @@ const workerFun = async (
try {
res = await processJobInternal(token, job);
} finally {
if (job.id && job.data && job.data.team_id) {
await redisConnection.zrem(concurrencyLimiterKey, job.id);
}
await afterJobDone(job)
}
if (res !== null) {
@@ -239,11 +219,7 @@ const workerFun = async (
},
() => {
processJobInternal(token, job)
.finally(() => {
if (job.id && job.data && job.data.team_id) {
redisConnection.zrem(concurrencyLimiterKey, job.id);
}
});
.finally(() => afterJobDone(job));
}
);
}
@@ -391,7 +367,7 @@ async function processJob(job: Job, token: string) {
// console.log("base priority: ", job.data.crawl_id ? 20 : 10)
// console.log("job priority: " , jobPriority, "\n\n\n")
const newJob = await addScrapeJob(
await addScrapeJob(
{
url: link,
mode: "single_urls",
@@ -409,7 +385,7 @@ async function processJob(job: Job, token: string) {
jobPriority
);
await addCrawlJob(job.data.crawl_id, newJob.id);
await addCrawlJob(job.data.crawl_id, jobId);
}
}
}
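
Orientation note (illustrative, not part of the patch): the old throttle-and-requeue path, which moved the job to failed, deleted it, and re-added it with an exponentially bumped priority, is gone. afterJobDone now releases the finished job's slot and promotes at most one parked job for the same team, relying on the insertion-side gate to keep the active count at or below the limit. A condensed sketch; releaseAndPromote is a hypothetical name:

```ts
import { Job, Queue } from "bullmq";
import {
  cleanOldConcurrencyLimitEntries,
  pushConcurrencyLimitActiveJob,
  removeConcurrencyLimitActiveJob,
  takeConcurrencyLimitedJob,
} from "../lib/concurrency-limit";

// Sketch of afterJobDone: release this job's slot, then promote at most one
// waiting job for the same team into the real queue.
async function releaseAndPromote(queue: Queue, job: Job) {
  if (!job.id || !job.data?.team_id || !job.data?.plan) return;

  await removeConcurrencyLimitActiveJob(job.data.team_id, job.id);
  await cleanOldConcurrencyLimitEntries(job.data.team_id);

  // Finishing a job means the team is at least one under its limit,
  // so the next parked job (if any) can take the freed slot.
  const nextJob = await takeConcurrencyLimitedJob(job.data.team_id);
  if (nextJob !== null) {
    await pushConcurrencyLimitActiveJob(job.data.team_id, nextJob.id);
    await queue.add(nextJob.id, { ...nextJob.data, concurrencyLimitHit: true }, {
      ...nextJob.opts,
      jobId: nextJob.id,
      priority: nextJob.priority,
    });
  }
}
```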