Mirror of https://github.com/mendableai/firecrawl.git, synced 2024-11-16 11:42:24 +08:00

Commit 8160c311c0: fix queue stuck bug via lock setting changes
Parent: 8d5ebc9b9f

@@ -8,9 +8,6 @@ primary_region = 'mia'
 kill_signal = 'SIGINT'
 kill_timeout = '30s'
 
-[deploy]
-release_command = 'node dist/src/trigger-shutdown.js https://staging-firecrawl-scraper-js.fly.dev'
-
 [build]
 
 [processes]

@@ -8,9 +8,6 @@ primary_region = 'mia'
 kill_signal = 'SIGINT'
 kill_timeout = '30s'
 
-[deploy]
-release_command = 'node dist/src/trigger-shutdown.js https://api.firecrawl.dev'
-
 [build]
 
 [processes]

@@ -19,8 +19,8 @@
     "mongo-docker": "docker run -d -p 2717:27017 -v ./mongo-data:/data/db --name mongodb mongo:latest",
     "mongo-docker-console": "docker exec -it mongodb mongosh",
     "run-example": "npx ts-node src/example.ts",
-    "deploy:fly": "flyctl deploy && node postdeploy.js https://api.firecrawl.dev",
-    "deploy:fly:staging": "fly deploy -c fly.staging.toml && node postdeploy.js https://staging-firecrawl-scraper-js.fly.dev"
+    "deploy:fly": "flyctl deploy",
+    "deploy:fly:staging": "fly deploy -c fly.staging.toml"
   },
   "author": "",
   "license": "ISC",

@@ -1,11 +0,0 @@
-require("dotenv").config();
-
-fetch(process.argv[2] + "/admin/" + process.env.BULL_AUTH_KEY + "/unpause", {
-  method: "POST"
-}).then(async x => {
-  console.log(await x.text());
-  process.exit(0);
-}).catch(e => {
-  console.error(e);
-  process.exit(1);
-});

@@ -119,63 +119,6 @@ if (cluster.isMaster) {
     }
   });
 
-  app.post(`/admin/${process.env.BULL_AUTH_KEY}/shutdown`, async (req, res) => {
-    // return res.status(200).json({ ok: true });
-    try {
-      console.log("Gracefully shutting down...");
-      await getWebScraperQueue().pause(false, true);
-      res.json({ ok: true });
-    } catch (error) {
-      console.error(error);
-      return res.status(500).json({ error: error.message });
-    }
-  });
-
-  app.post(`/admin/${process.env.BULL_AUTH_KEY}/unpause`, async (req, res) => {
-    try {
-      const wsq = getWebScraperQueue();
-
-      const jobs = await wsq.getActive();
-
-      console.log("Requeueing", jobs.length, "jobs...");
-
-      if (jobs.length > 0) {
-        console.log(" Removing", jobs.length, "jobs...");
-
-        await Promise.all(
-          jobs.map(async (x) => {
-            try {
-              await wsq.client.del(await x.lockKey());
-              await x.takeLock();
-              await x.moveToFailed({ message: "interrupted" });
-              await x.remove();
-            } catch (e) {
-              console.warn("Failed to remove job", x.id, e);
-            }
-          })
-        );
-
-        console.log(" Re-adding", jobs.length, "jobs...");
-        await wsq.addBulk(
-          jobs.map((x) => ({
-            data: x.data,
-            opts: {
-              jobId: x.id,
-            },
-          }))
-        );
-
-        console.log(" Done!");
-      }
-
-      await getWebScraperQueue().resume(false);
-      res.json({ ok: true });
-    } catch (error) {
-      console.error(error);
-      return res.status(500).json({ error: error.message });
-    }
-  });
-
   app.get(`/serverHealthCheck`, async (req, res) => {
     try {
       const webScraperQueue = getWebScraperQueue();

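The two deleted /admin endpoints were the manual recovery path: /shutdown paused the queue before a deploy, and /unpause force-released the locks of active jobs, moved them to failed, removed them, and re-added them under the same jobId. With stalled-job handling configured on the queue itself (next hunk), an operator who still wants visibility into interrupted jobs could subscribe to Bull's worker events instead. A minimal sketch, not part of this commit; the import path for getWebScraperQueue is assumed:

import { getWebScraperQueue } from "./services/queue-service"; // path assumed

const wsq = getWebScraperQueue();

// Emitted when Bull notices that a job's lock was not renewed in time;
// the job is put back in the queue (up to maxStalledCount times).
wsq.on("stalled", (job) => {
  console.warn("Job stalled and was re-queued", job.id);
});

// Emitted when a job fails, including jobs failed after stalling too often.
wsq.on("failed", (job, err) => {
  console.error("Job failed", job.id, err.message);
});
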
@@ -7,8 +7,10 @@ export function getWebScraperQueue() {
   if (!webScraperQueue) {
     webScraperQueue = new Queue("web-scraper", process.env.REDIS_URL, {
       settings: {
-        lockDuration: 2 * 60 * 60 * 1000, // 2 hours in milliseconds,
-        lockRenewTime: 30 * 60 * 1000, // 30 minutes in milliseconds
+        lockDuration: 2 * 60 * 1000, // 1 minute in milliseconds,
+        lockRenewTime: 15 * 1000, // 15 seconds in milliseconds
+        stalledInterval: 30 * 1000,
+        maxStalledCount: 10,
       },
     });
     console.log("Web scraper queue created");

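These settings are Bull's advanced lock and stall options, and they are the "lock setting changes" named in the commit message. Before, a worker held a two-hour lock on each job and only renewed it every 30 minutes, so a worker that died mid-job could leave that job locked, and the queue apparently stuck, for up to two hours. The new values keep the lock short and the renewal frequent, and stalledInterval plus maxStalledCount let Bull reclaim abandoned jobs on its own. Note that 2 * 60 * 1000 is two minutes, despite the inline comment. A rough sketch of how the four options interact, assuming Bull's documented semantics:

import Queue from "bull";

// Sketch only: values mirror the new settings, comments describe Bull's
// documented behavior for each option.
const queue = new Queue("web-scraper", process.env.REDIS_URL, {
  settings: {
    // How long a worker may hold a job before it is considered stalled
    // if the lock is not renewed.
    lockDuration: 2 * 60 * 1000, // 2 minutes
    // How often an active worker renews its lock; must stay well under
    // lockDuration or healthy jobs will look stalled.
    lockRenewTime: 15 * 1000, // 15 seconds
    // How often Bull scans for jobs whose locks have expired.
    stalledInterval: 30 * 1000, // 30 seconds
    // How many times a stalled job is put back in the queue before it is
    // failed outright.
    maxStalledCount: 10,
  },
});
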
@@ -6,6 +6,7 @@ import { startWebScraperPipeline } from "../main/runWebScraper";
 import { callWebhook } from "./webhook";
 import { logJob } from "./logging/log_job";
 import { initSDK } from '@hyperdx/node-opentelemetry';
+import { Job } from "bull";
 
 if(process.env.ENV === 'production') {
   initSDK({

@@ -16,93 +17,99 @@ if(process.env.ENV === 'production') {
 
 const wsq = getWebScraperQueue();
 
-wsq.process(
-  Math.floor(Number(process.env.NUM_WORKERS_PER_QUEUE ?? 8)),
-  async function (job, done) {
+async function processJob(job: Job, done) {
+  console.log("taking job", job.id);
   try {
     job.progress({
       current: 1,
       total: 100,
       current_step: "SCRAPING",
       current_url: "",
     });
     const start = Date.now();
     const { success, message, docs } = await startWebScraperPipeline({ job });
     const end = Date.now();
     const timeTakenInSeconds = (end - start) / 1000;
 
     const data = {
       success: success,
       result: {
         links: docs.map((doc) => {
           return { content: doc, source: doc?.metadata?.sourceURL ?? doc?.url ?? "" };
         }),
       },
       project_id: job.data.project_id,
       error: message /* etc... */,
     };
 
     await callWebhook(job.data.team_id, job.id as string, data);
 
     await logJob({
       job_id: job.id as string,
       success: success,
       message: message,
       num_docs: docs.length,
       docs: docs,
       time_taken: timeTakenInSeconds,
       team_id: job.data.team_id,
       mode: "crawl",
       url: job.data.url,
       crawlerOptions: job.data.crawlerOptions,
       pageOptions: job.data.pageOptions,
       origin: job.data.origin,
     });
+    console.log("job done", job.id);
     done(null, data);
   } catch (error) {
+    console.log("job errored", job.id, error);
     if (await getWebScraperQueue().isPaused(false)) {
+      console.log("queue is paused, ignoring");
       return;
     }
 
     if (error instanceof CustomError) {
       // Here we handle the error, then save the failed job
       console.error(error.message); // or any other error handling
 
       logtail.error("Custom error while ingesting", {
         job_id: job.id,
         error: error.message,
         dataIngestionJob: error.dataIngestionJob,
       });
     }
     console.log(error);
 
     logtail.error("Overall error ingesting", {
       job_id: job.id,
       error: error.message,
     });
 
     const data = {
       success: false,
       project_id: job.data.project_id,
       error:
         "Something went wrong... Contact help@mendable.ai or try again." /* etc... */,
     };
     await callWebhook(job.data.team_id, job.id as string, data);
     await logJob({
       job_id: job.id as string,
       success: false,
       message: typeof error === 'string' ? error : (error.message ?? "Something went wrong... Contact help@mendable.ai"),
       num_docs: 0,
       docs: [],
       time_taken: 0,
       team_id: job.data.team_id,
       mode: "crawl",
       url: job.data.url,
       crawlerOptions: job.data.crawlerOptions,
       pageOptions: job.data.pageOptions,
       origin: job.data.origin,
     });
     done(null, data);
   }
-  }
-);
+}
+
+wsq.process(
+  Math.floor(Number(process.env.NUM_WORKERS_PER_QUEUE ?? 8)),
+  processJob
+);

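The worker body is unchanged apart from the extra logging; the handler is extracted from the wsq.process() call into a named processJob function and then registered with the same concurrency. One side effect, not stated in the commit, is that a named, typed handler can be invoked directly with a stubbed job, which the old inline anonymous function could not. A hypothetical sketch; DoneCallback and the stubbed fields are assumptions for illustration:

import { Job, DoneCallback } from "bull";

// Hypothetical example handler with the same shape as processJob; it is not
// part of the commit and only shows direct invocation of a named handler.
async function exampleHandler(job: Job, done: DoneCallback): Promise<void> {
  console.log("taking job", job.id);
  done(null, { ok: true });
}

// Minimal stub standing in for a real Bull job; cast because only the fields
// the handler touches are provided.
const fakeJob = { id: "test-1", data: { url: "https://example.com" } } as unknown as Job;

exampleHandler(fakeJob, (err, result) => {
  console.log("handler finished", err ?? result);
});
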
@@ -1,9 +0,0 @@
-fetch(process.argv[2] + "/admin/" + process.env.BULL_AUTH_KEY + "/shutdown", {
-  method: "POST"
-}).then(async x => {
-  console.log(await x.text());
-  process.exit(0);
-}).catch(e => {
-  console.error(e);
-  process.exit(1);
-});