From 6d48dbcd38a5a8173b6917a38338bf296dfc23e5 Mon Sep 17 00:00:00 2001
From: Gergő Móricz
Date: Thu, 22 Aug 2024 16:47:38 +0200
Subject: [PATCH] feat(sentry): add trace continuity for queue

---
 apps/api/src/controllers/crawl.ts        |  9 +++-
 apps/api/src/controllers/scrape.ts       |  6 +--
 apps/api/src/controllers/status.ts       |  2 -
 apps/api/src/main/runWebScraper.ts       |  1 -
 apps/api/src/scraper/WebScraper/index.ts |  1 -
 apps/api/src/services/queue-jobs.ts      | 39 ++++++++++++++--
 apps/api/src/services/queue-worker.ts    | 59 +++++++++++++++++++-----
 apps/api/src/services/sentry.ts          |  3 +-
 8 files changed, 95 insertions(+), 25 deletions(-)

diff --git a/apps/api/src/controllers/crawl.ts b/apps/api/src/controllers/crawl.ts
index c299dc01..c5f440e2 100644
--- a/apps/api/src/controllers/crawl.ts
+++ b/apps/api/src/controllers/crawl.ts
@@ -194,7 +194,14 @@ export async function crawlController(req: Request, res: Response) {
       id,
       jobs.map((x) => x.opts.jobId)
     );
-    await getScrapeQueue().addBulk(jobs);
+    if (Sentry.isInitialized()) {
+      for (const job of jobs) {
+        // add with sentry instrumentation
+        await addScrapeJob(job.data as any, {}, job.opts.jobId);
+      }
+    } else {
+      await getScrapeQueue().addBulk(jobs);
+    }
   } else {
     await lockURL(id, sc, url);
     const job = await addScrapeJob(
diff --git a/apps/api/src/controllers/scrape.ts b/apps/api/src/controllers/scrape.ts
index b0004276..3666fc1a 100644
--- a/apps/api/src/controllers/scrape.ts
+++ b/apps/api/src/controllers/scrape.ts
@@ -50,12 +50,12 @@ export async function scrapeHelper(
 
   let doc;
 
-  const err = await Sentry.startSpanManual({ name: "Wait for job to finish", op: "bullmq.wait", attributes: { job: jobId } }, async (span) => {
+  const err = await Sentry.startSpan({ name: "Wait for job to finish", op: "bullmq.wait", attributes: { job: jobId } }, async (span) => {
     try {
       doc = (await job.waitUntilFinished(scrapeQueueEvents, timeout))[0]
     } catch (e) {
       if (e instanceof Error && e.message.startsWith("Job wait")) {
-        span.setAttribute("timedOut", true).end();
+        span.setAttribute("timedOut", true);
         return {
           success: false,
           error: "Request timed out",
@@ -65,7 +65,7 @@
         throw e;
       }
     }
-    span.setAttribute("result", JSON.stringify(doc)).end();
+    span.setAttribute("result", JSON.stringify(doc));
     return null;
   });
 
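A note on the scrape.ts hunk above: in Sentry's JavaScript SDK v8, Sentry.startSpan() ends the span automatically once its callback settles, while Sentry.startSpanManual() leaves calling span.end() to the caller. That is why the chained .end() calls are removed together with the switch of helpers. A minimal standalone sketch of the difference (illustrative only, not part of the patch):

    import * as Sentry from "@sentry/node";

    async function demo() {
      // startSpan: the SDK ends the span when the callback resolves or throws.
      await Sentry.startSpan({ name: "auto-ended" }, async (span) => {
        span.setAttribute("key", "value"); // no span.end() needed
      });

      // startSpanManual: the caller owns the span's lifetime.
      await Sentry.startSpanManual({ name: "manually-ended" }, async (span) => {
        try {
          // ... do the actual work here ...
        } finally {
          span.end(); // must be ended explicitly
        }
      });
    }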
diff --git a/apps/api/src/controllers/status.ts b/apps/api/src/controllers/status.ts
index c3ca906f..362f1f24 100644
--- a/apps/api/src/controllers/status.ts
+++ b/apps/api/src/controllers/status.ts
@@ -1,8 +1,6 @@
 import { Request, Response } from "express";
 import { Logger } from "../../src/lib/logger";
 import { getCrawl, getCrawlJobs } from "../../src/lib/crawl-redis";
-import { getScrapeQueue } from "../../src/services/queue-service";
-import { supabaseGetJobById } from "../../src/lib/supabase-jobs";
 import { getJobs } from "./crawl-status";
 import * as Sentry from "@sentry/node";
 
diff --git a/apps/api/src/main/runWebScraper.ts b/apps/api/src/main/runWebScraper.ts
index 2be05bd5..aea7876e 100644
--- a/apps/api/src/main/runWebScraper.ts
+++ b/apps/api/src/main/runWebScraper.ts
@@ -12,7 +12,6 @@ import { Document } from "../lib/entities";
 import { supabase_service } from "../services/supabase";
 import { Logger } from "../lib/logger";
 import { ScrapeEvents } from "../lib/scrape-events";
-import { getScrapeQueue } from "../services/queue-service";
 
 export async function startWebScraperPipeline({
   job,
diff --git a/apps/api/src/scraper/WebScraper/index.ts b/apps/api/src/scraper/WebScraper/index.ts
index 65247df1..38d0cc32 100644
--- a/apps/api/src/scraper/WebScraper/index.ts
+++ b/apps/api/src/scraper/WebScraper/index.ts
@@ -16,7 +16,6 @@ import {
   replacePathsWithAbsolutePaths,
 } from "./utils/replacePaths";
 import { generateCompletions } from "../../lib/LLM-extraction";
-import { getScrapeQueue } from "../../../src/services/queue-service";
 import { fetchAndProcessDocx } from "./utils/docxProcessor";
 import { getAdjustedMaxDepth, getURLDepth } from "./utils/maxDepthUtils";
 import { Logger } from "../../lib/logger";
diff --git a/apps/api/src/services/queue-jobs.ts b/apps/api/src/services/queue-jobs.ts
index 3099da68..33997890 100644
--- a/apps/api/src/services/queue-jobs.ts
+++ b/apps/api/src/services/queue-jobs.ts
@@ -2,11 +2,12 @@ import { Job, Queue } from "bullmq";
 import { getScrapeQueue } from "./queue-service";
 import { v4 as uuidv4 } from "uuid";
 import { WebScraperOptions } from "../types";
+import * as Sentry from "@sentry/node";
 
-export async function addScrapeJob(
-  webScraperOptions: WebScraperOptions,
-  options: any = {},
-  jobId: string = uuidv4(),
+async function addScrapeJobRaw(
+  webScraperOptions: any,
+  options: any,
+  jobId: string,
 ): Promise<Job> {
   return await getScrapeQueue().add(jobId, webScraperOptions, {
     priority: webScraperOptions.crawl_id ? 20 : 10,
@@ -15,3 +16,33 @@
   });
 }
 
+export async function addScrapeJob(
+  webScraperOptions: WebScraperOptions,
+  options: any = {},
+  jobId: string = uuidv4(),
+): Promise<Job> {
+  if (Sentry.isInitialized()) {
+    const size = JSON.stringify(webScraperOptions).length;
+    return await Sentry.startSpan({
+      name: "Add scrape job",
+      op: "queue.publish",
+      attributes: {
+        "messaging.message.id": jobId,
+        "messaging.destination.name": getScrapeQueue().name,
+        "messaging.message.body.size": size,
+      },
+    }, async (span) => {
+      return await addScrapeJobRaw({
+        ...webScraperOptions,
+        sentry: {
+          trace: Sentry.spanToTraceHeader(span),
+          baggage: Sentry.spanToBaggageHeader(span),
+          size,
+        },
+      }, options, jobId);
+    });
+  } else {
+    return await addScrapeJobRaw(webScraperOptions, options, jobId);
+  }
+}
+
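For reference, the sentry envelope that the new addScrapeJob attaches rides along in the job's data and is consumed by the worker in the next diff. Assuming Sentry's documented header formats, an enqueued payload would look roughly like this (field names from the hunk above; all values illustrative):

    // Hypothetical shape of a job's data once the trace envelope is attached.
    const jobData = {
      // ...the original WebScraperOptions fields (url, crawl_id, ...)
      sentry: {
        // sentry-trace header value: "<traceId>-<spanId>-<sampled flag>"
        trace: "771a43a4192642f0b136d5159a501700-9c2a6db8c79068a2-1",
        // W3C baggage string carrying Sentry's dynamic sampling context
        baggage: "sentry-trace_id=771a43a4192642f0b136d5159a501700,sentry-sample_rate=0.045",
        size: 1234, // JSON.stringify(webScraperOptions).length
      },
    };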
diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts
index 2086d0a6..a7d20383 100644
--- a/apps/api/src/services/queue-worker.ts
+++ b/apps/api/src/services/queue-worker.ts
@@ -50,6 +50,7 @@ const processJobInternal = async (token: string, job: Job) => {
     await job.extendLock(token, jobLockExtensionTime);
   }, jobLockExtendInterval);
 
+  let err = null;
   try {
     const result = await processJob(job, token);
     try{
@@ -62,11 +63,14 @@
     }
   } catch (error) {
     console.log("Job failed, error:", error);
-
+    Sentry.captureException(error);
+    err = error;
    await job.moveToFailed(error, token, false);
   } finally {
     clearInterval(extendLockInterval);
   }
+
+  return err;
 };
 
 let isShuttingDown = false;
@@ -76,7 +80,7 @@ process.on("SIGINT", () => {
   isShuttingDown = true;
 });
 
-const workerFun = async (queueName: string, processJobInternal: (token: string, job: Job) => Promise<void>) => {
+const workerFun = async (queueName: string, processJobInternal: (token: string, job: Job) => Promise<any>) => {
   const worker = new Worker(queueName, null, {
     connection: redisConnection,
     lockDuration: 1 * 60 * 1000, // 1 minute
@@ -104,16 +108,47 @@ const workerFun = async (queueName: string, processJobInternal: (token: string,
     const job = await worker.getNextJob(token);
 
     if (job) {
-      Sentry.startSpan({
-        name: "Scrape job",
-        op: "bullmq.job",
-        attributes: {
-          job: job.id,
-          worker: process.env.FLY_MACHINE_ID ?? worker.id,
-        },
-      }, async () => {
-        await processJobInternal(token, job);
-      });
+      if (job.data && job.data.sentry && Sentry.isInitialized()) {
+        Sentry.continueTrace({ sentryTrace: job.data.sentry.trace, baggage: job.data.sentry.baggage }, () => {
+          Sentry.startSpan({
+            name: "Scrape job",
+            attributes: {
+              job: job.id,
+              worker: process.env.FLY_MACHINE_ID ?? worker.id,
+            },
+          }, async (span) => {
+            await Sentry.startSpan({
+              name: "Process scrape job",
+              op: "queue.process",
+              attributes: {
+                "messaging.message.id": job.id,
+                "messaging.destination.name": getScrapeQueue().name,
+                "messaging.message.body.size": job.data.sentry.size,
+                "messaging.message.receive.latency": Date.now() - (job.processedOn ?? job.timestamp),
+                "messaging.message.retry.count": job.attemptsMade,
+              }
+            }, async () => {
+              const res = await processJobInternal(token, job);
+              if (res !== null) {
+                span.setStatus({ code: 2 }); // ERROR
+              } else {
+                span.setStatus({ code: 1 }); // OK
+              }
+            });
+          });
+        });
+      } else {
+        Sentry.startSpan({
+          name: "Scrape job",
+          attributes: {
+            job: job.id,
+            worker: process.env.FLY_MACHINE_ID ?? worker.id,
+          },
+        }, () => {
+          processJobInternal(token, job);
+        });
+      }
+
       await sleep(gotJobInterval);
     } else {
       await sleep(connectionMonitorInterval);
diff --git a/apps/api/src/services/sentry.ts b/apps/api/src/services/sentry.ts
index 1292773a..176d3d4b 100644
--- a/apps/api/src/services/sentry.ts
+++ b/apps/api/src/services/sentry.ts
@@ -10,8 +10,9 @@ if (process.env.SENTRY_DSN) {
     integrations: [
       nodeProfilingIntegration(),
     ],
-    tracesSampleRate: 0.045,
+    tracesSampleRate: process.env.SENTRY_ENVIRONMENT === "dev" ? 1.0 : 0.045,
     profilesSampleRate: 1.0,
     serverName: process.env.FLY_MACHINE_ID,
+    environment: process.env.SENTRY_ENVIRONMENT ?? "production",
   });
 }
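Taken together, the queue-jobs.ts and queue-worker.ts changes implement Sentry's distributed-tracing pattern for message queues: the producer publishes its span's propagation headers with the message, and the consumer resumes that trace before opening its processing span, so enqueueing and processing appear as one trace. A condensed standalone sketch of the round trip (queue plumbing elided; publish/consume are hypothetical names):

    import * as Sentry from "@sentry/node";

    // Producer: wrap the enqueue in a queue.publish span and ship its headers.
    async function publish(add: (data: any) => Promise<void>, data: any) {
      await Sentry.startSpan({ name: "Add scrape job", op: "queue.publish" }, async (span) => {
        await add({
          ...data,
          sentry: {
            trace: Sentry.spanToTraceHeader(span),
            baggage: Sentry.spanToBaggageHeader(span),
          },
        });
      });
    }

    // Consumer: resume the producer's trace, then open a queue.process span inside it.
    async function consume(jobData: any, work: () => Promise<void>) {
      await Sentry.continueTrace(
        { sentryTrace: jobData.sentry.trace, baggage: jobData.sentry.baggage },
        () => Sentry.startSpan({ name: "Process scrape job", op: "queue.process" }, work),
      );
    }

The sentry.ts change complements this: with SENTRY_ENVIRONMENT=dev, tracesSampleRate becomes 1.0, so every trace is kept while verifying the continuity end to end.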