feat(ScrapeEvents): log queue events

This commit is contained in:
Gergo Moricz 2024-07-24 18:44:14 +02:00
parent 4d35ad073c
commit 60c74357df
3 changed files with 29 additions and 1 deletion

View File

@@ -13,6 +13,7 @@ import { checkAlerts } from "./services/alerts";
import Redis from "ioredis";
import { redisRateLimitClient } from "./services/rate-limiter";
import { Logger } from "./lib/logger";
import { ScrapeEvents } from "./lib/scrape-events";
const { createBullBoard } = require("@bull-board/api");
const { BullAdapter } = require("@bull-board/api/bullAdapter");
@@ -325,3 +326,12 @@ if (cluster.isMaster) {
Logger.info(`Worker ${process.pid} started`);
}
const wsq = getWebScraperQueue();
wsq.on("waiting", j => ScrapeEvents.logJobEvent(j, "waiting"));
wsq.on("active", j => ScrapeEvents.logJobEvent(j, "active"));
wsq.on("completed", j => ScrapeEvents.logJobEvent(j, "completed"));
wsq.on("paused", j => ScrapeEvents.logJobEvent(j, "paused"));
wsq.on("resumed", j => ScrapeEvents.logJobEvent(j, "resumed"));
wsq.on("removed", j => ScrapeEvents.logJobEvent(j, "removed"));

View File

@@ -1,3 +1,4 @@
import { Job, JobId } from "bull";
import type { baseScrapers } from "../scraper/WebScraper/single_url";
import { supabase_service as supabase } from "../services/supabase";
@@ -24,7 +25,7 @@ export type ScrapeScrapeEvent = {
export type ScrapeQueueEvent = {
type: "queue",
event: "created" | "started" | "interrupted" | "finished",
event: "waiting" | "active" | "completed" | "paused" | "resumed" | "removed",
worker?: string,
}
@@ -58,4 +59,12 @@ export class ScrapeEvents {
}
}).eq("id", logId);
}
static async logJobEvent(job: Job | JobId, event: ScrapeQueueEvent["event"]) {
await this.insert(((job as any).id ? (job as any).id : job) as string, {
type: "queue",
event,
worker: process.env.FLY_MACHINE_ID,
});
}
}

View File

@@ -8,6 +8,7 @@ import { logJob } from "./logging/log_job";
import { initSDK } from '@hyperdx/node-opentelemetry';
import { Job } from "bull";
import { Logger } from "../lib/logger";
import { ScrapeEvents } from "../lib/scrape-events";
if (process.env.ENV === 'production') {
initSDK({
@@ -20,6 +21,7 @@ const wsq = getWebScraperQueue();
async function processJob(job: Job, done) {
Logger.debug(`🐂 Worker taking job ${job.id}`);
try {
job.progress({
current: 1,
@@ -114,3 +116,10 @@ wsq.process(
Math.floor(Number(process.env.NUM_WORKERS_PER_QUEUE ?? 8)),
processJob
);
wsq.on("waiting", j => ScrapeEvents.logJobEvent(j, "waiting"));
wsq.on("active", j => ScrapeEvents.logJobEvent(j, "active"));
wsq.on("completed", j => ScrapeEvents.logJobEvent(j, "completed"));
wsq.on("paused", j => ScrapeEvents.logJobEvent(j, "paused"));
wsq.on("resumed", j => ScrapeEvents.logJobEvent(j, "resumed"));
wsq.on("removed", j => ScrapeEvents.logJobEvent(j, "removed"));