feat(sentry): add trace continuity for queue

This commit is contained in:
Gergő Móricz 2024-08-22 16:47:38 +02:00
parent 6d92b8524d
commit 6d48dbcd38
8 changed files with 95 additions and 25 deletions

View File

@ -194,7 +194,14 @@ export async function crawlController(req: Request, res: Response) {
id,
jobs.map((x) => x.opts.jobId)
);
if (Sentry.isInitialized()) {
for (const job of jobs) {
// add with sentry instrumentation
await addScrapeJob(job.data as any, {}, job.opts.jobId);
}
} else {
await getScrapeQueue().addBulk(jobs);
}
} else {
await lockURL(id, sc, url);
const job = await addScrapeJob(

View File

@ -50,12 +50,12 @@ export async function scrapeHelper(
let doc;
const err = await Sentry.startSpanManual({ name: "Wait for job to finish", op: "bullmq.wait", attributes: { job: jobId } }, async (span) => {
const err = await Sentry.startSpan({ name: "Wait for job to finish", op: "bullmq.wait", attributes: { job: jobId } }, async (span) => {
try {
doc = (await job.waitUntilFinished(scrapeQueueEvents, timeout))[0]
} catch (e) {
if (e instanceof Error && e.message.startsWith("Job wait")) {
span.setAttribute("timedOut", true).end();
span.setAttribute("timedOut", true);
return {
success: false,
error: "Request timed out",
@ -65,7 +65,7 @@ export async function scrapeHelper(
throw e;
}
}
span.setAttribute("result", JSON.stringify(doc)).end();
span.setAttribute("result", JSON.stringify(doc));
return null;
});

View File

@ -1,8 +1,6 @@
import { Request, Response } from "express";
import { Logger } from "../../src/lib/logger";
import { getCrawl, getCrawlJobs } from "../../src/lib/crawl-redis";
import { getScrapeQueue } from "../../src/services/queue-service";
import { supabaseGetJobById } from "../../src/lib/supabase-jobs";
import { getJobs } from "./crawl-status";
import * as Sentry from "@sentry/node";

View File

@ -12,7 +12,6 @@ import { Document } from "../lib/entities";
import { supabase_service } from "../services/supabase";
import { Logger } from "../lib/logger";
import { ScrapeEvents } from "../lib/scrape-events";
import { getScrapeQueue } from "../services/queue-service";
export async function startWebScraperPipeline({
job,

View File

@ -16,7 +16,6 @@ import {
replacePathsWithAbsolutePaths,
} from "./utils/replacePaths";
import { generateCompletions } from "../../lib/LLM-extraction";
import { getScrapeQueue } from "../../../src/services/queue-service";
import { fetchAndProcessDocx } from "./utils/docxProcessor";
import { getAdjustedMaxDepth, getURLDepth } from "./utils/maxDepthUtils";
import { Logger } from "../../lib/logger";

View File

@ -2,11 +2,12 @@ import { Job, Queue } from "bullmq";
import { getScrapeQueue } from "./queue-service";
import { v4 as uuidv4 } from "uuid";
import { WebScraperOptions } from "../types";
import * as Sentry from "@sentry/node";
export async function addScrapeJob(
webScraperOptions: WebScraperOptions,
options: any = {},
jobId: string = uuidv4(),
async function addScrapeJobRaw(
webScraperOptions: any,
options: any,
jobId: string,
): Promise<Job> {
return await getScrapeQueue().add(jobId, webScraperOptions, {
priority: webScraperOptions.crawl_id ? 20 : 10,
@ -15,3 +16,33 @@ export async function addScrapeJob(
});
}
export async function addScrapeJob(
webScraperOptions: WebScraperOptions,
options: any = {},
jobId: string = uuidv4(),
): Promise<Job> {
if (Sentry.isInitialized()) {
const size = JSON.stringify(webScraperOptions).length;
return await Sentry.startSpan({
name: "Add scrape job",
op: "queue.publish",
attributes: {
"messaging.message.id": jobId,
"messaging.destination.name": getScrapeQueue().name,
"messaging.message.body.size": size,
},
}, async (span) => {
return await addScrapeJobRaw({
...webScraperOptions,
sentry: {
trace: Sentry.spanToTraceHeader(span),
baggage: Sentry.spanToBaggageHeader(span),
size,
},
}, options, jobId);
});
} else {
return await addScrapeJobRaw(webScraperOptions, options, jobId);
}
}

View File

@ -50,6 +50,7 @@ const processJobInternal = async (token: string, job: Job) => {
await job.extendLock(token, jobLockExtensionTime);
}, jobLockExtendInterval);
let err = null;
try {
const result = await processJob(job, token);
try{
@ -62,11 +63,14 @@ const processJobInternal = async (token: string, job: Job) => {
}
} catch (error) {
console.log("Job failed, error:", error);
Sentry.captureException(error);
err = error;
await job.moveToFailed(error, token, false);
} finally {
clearInterval(extendLockInterval);
}
return err;
};
let isShuttingDown = false;
@ -76,7 +80,7 @@ process.on("SIGINT", () => {
isShuttingDown = true;
});
const workerFun = async (queueName: string, processJobInternal: (token: string, job: Job) => Promise<void>) => {
const workerFun = async (queueName: string, processJobInternal: (token: string, job: Job) => Promise<any>) => {
const worker = new Worker(queueName, null, {
connection: redisConnection,
lockDuration: 1 * 60 * 1000, // 1 minute
@ -104,16 +108,47 @@ const workerFun = async (queueName: string, processJobInternal: (token: string,
const job = await worker.getNextJob(token);
if (job) {
if (job.data && job.data.sentry && Sentry.isInitialized()) {
Sentry.continueTrace({ sentryTrace: job.data.sentry.trace, baggage: job.data.sentry.baggage }, () => {
Sentry.startSpan({
name: "Scrape job",
op: "bullmq.job",
attributes: {
job: job.id,
worker: process.env.FLY_MACHINE_ID ?? worker.id,
},
}, async (span) => {
await Sentry.startSpan({
name: "Process scrape job",
op: "queue.process",
attributes: {
"messaging.message.id": job.id,
"messaging.destination.name": getScrapeQueue().name,
"messaging.message.body.size": job.data.sentry.size,
"messaging.message.receive.latency": Date.now() - (job.processedOn ?? job.timestamp),
"messaging.message.retry.count": job.attemptsMade,
}
}, async () => {
await processJobInternal(token, job);
const res = await processJobInternal(token, job);
if (res !== null) {
span.setStatus({ code: 2 }); // ERROR
} else {
span.setStatus({ code: 1 }); // OK
}
});
});
});
} else {
Sentry.startSpan({
name: "Scrape job",
attributes: {
job: job.id,
worker: process.env.FLY_MACHINE_ID ?? worker.id,
},
}, () => {
processJobInternal(token, job);
});
}
await sleep(gotJobInterval);
} else {
await sleep(connectionMonitorInterval);

View File

@ -10,8 +10,9 @@ if (process.env.SENTRY_DSN) {
integrations: [
nodeProfilingIntegration(),
],
tracesSampleRate: 0.045,
tracesSampleRate: process.env.SENTRY_ENVIRONMENT === "dev" ? 1.0 : 0.045,
profilesSampleRate: 1.0,
serverName: process.env.FLY_MACHINE_ID,
environment: process.env.SENTRY_ENVIRONMENT ?? "production",
});
}