mirror of
https://github.com/mendableai/firecrawl.git
synced 2024-11-16 03:32:22 +08:00
Nick: admin router
This commit is contained in:
parent
a75d6889c7
commit
2014d9dd2e
87
apps/api/src/controllers/admin/queue.ts
Normal file
87
apps/api/src/controllers/admin/queue.ts
Normal file
|
@ -0,0 +1,87 @@
|
|||
import { Request, Response } from "express";
|
||||
|
||||
import { Job } from "bull";
|
||||
import { Logger } from "../../lib/logger";
|
||||
import { getWebScraperQueue } from "../../services/queue-service";
|
||||
import { checkAlerts } from "../../services/alerts";
|
||||
|
||||
export async function cleanBefore24hCompleteJobsController(
|
||||
req: Request,
|
||||
res: Response
|
||||
) {
|
||||
Logger.info("🐂 Cleaning jobs older than 24h");
|
||||
try {
|
||||
const webScraperQueue = getWebScraperQueue();
|
||||
const batchSize = 10;
|
||||
const numberOfBatches = 9; // Adjust based on your needs
|
||||
const completedJobsPromises: Promise<Job[]>[] = [];
|
||||
for (let i = 0; i < numberOfBatches; i++) {
|
||||
completedJobsPromises.push(
|
||||
webScraperQueue.getJobs(
|
||||
["completed"],
|
||||
i * batchSize,
|
||||
i * batchSize + batchSize,
|
||||
true
|
||||
)
|
||||
);
|
||||
}
|
||||
const completedJobs: Job[] = (
|
||||
await Promise.all(completedJobsPromises)
|
||||
).flat();
|
||||
const before24hJobs =
|
||||
completedJobs.filter(
|
||||
(job) => job.finishedOn < Date.now() - 24 * 60 * 60 * 1000
|
||||
) || [];
|
||||
|
||||
let count = 0;
|
||||
|
||||
if (!before24hJobs) {
|
||||
return res.status(200).send(`No jobs to remove.`);
|
||||
}
|
||||
|
||||
for (const job of before24hJobs) {
|
||||
try {
|
||||
await job.remove();
|
||||
count++;
|
||||
} catch (jobError) {
|
||||
Logger.error(`🐂 Failed to remove job with ID ${job.id}: ${jobError}`);
|
||||
}
|
||||
}
|
||||
return res.status(200).send(`Removed ${count} completed jobs.`);
|
||||
} catch (error) {
|
||||
Logger.error(`🐂 Failed to clean last 24h complete jobs: ${error}`);
|
||||
return res.status(500).send("Failed to clean jobs");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
export async function checkQueuesController(req: Request, res: Response) {
|
||||
try {
|
||||
await checkAlerts();
|
||||
return res.status(200).send("Alerts initialized");
|
||||
} catch (error) {
|
||||
Logger.debug(`Failed to initialize alerts: ${error}`);
|
||||
return res.status(500).send("Failed to initialize alerts");
|
||||
}
|
||||
}
|
||||
|
||||
// Use this as a "health check" that way we dont destroy the server
|
||||
export async function queuesController(req: Request, res: Response) {
|
||||
try {
|
||||
const webScraperQueue = getWebScraperQueue();
|
||||
|
||||
const [webScraperActive] = await Promise.all([
|
||||
webScraperQueue.getActiveCount(),
|
||||
]);
|
||||
|
||||
const noActiveJobs = webScraperActive === 0;
|
||||
// 200 if no active jobs, 503 if there are active jobs
|
||||
return res.status(noActiveJobs ? 200 : 500).json({
|
||||
webScraperActive,
|
||||
noActiveJobs,
|
||||
});
|
||||
} catch (error) {
|
||||
Logger.error(error);
|
||||
return res.status(500).json({ error: error.message });
|
||||
}
|
||||
}
|
86
apps/api/src/controllers/admin/redis-health.ts
Normal file
86
apps/api/src/controllers/admin/redis-health.ts
Normal file
|
@ -0,0 +1,86 @@
|
|||
import { Request, Response } from "express";
|
||||
import Redis from "ioredis";
|
||||
import { Logger } from "../../lib/logger";
|
||||
import { sendSlackWebhook } from "../../services/alerts/slack";
|
||||
import { redisRateLimitClient } from "../../services/rate-limiter";
|
||||
|
||||
export async function redisHealthController(req: Request, res: Response) {
|
||||
const retryOperation = async (operation, retries = 3) => {
|
||||
for (let attempt = 1; attempt <= retries; attempt++) {
|
||||
try {
|
||||
return await operation();
|
||||
} catch (error) {
|
||||
if (attempt === retries) throw error;
|
||||
Logger.warn(`Attempt ${attempt} failed: ${error.message}. Retrying...`);
|
||||
await new Promise((resolve) => setTimeout(resolve, 2000)); // Wait 2 seconds before retrying
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
try {
|
||||
const queueRedis = new Redis(process.env.REDIS_URL);
|
||||
|
||||
const testKey = "test";
|
||||
const testValue = "test";
|
||||
|
||||
// Test queueRedis
|
||||
let queueRedisHealth;
|
||||
try {
|
||||
await retryOperation(() => queueRedis.set(testKey, testValue));
|
||||
queueRedisHealth = await retryOperation(() => queueRedis.get(testKey));
|
||||
await retryOperation(() => queueRedis.del(testKey));
|
||||
} catch (error) {
|
||||
Logger.error(`queueRedis health check failed: ${error}`);
|
||||
queueRedisHealth = null;
|
||||
}
|
||||
|
||||
// Test redisRateLimitClient
|
||||
let redisRateLimitHealth;
|
||||
try {
|
||||
await retryOperation(() => redisRateLimitClient.set(testKey, testValue));
|
||||
redisRateLimitHealth = await retryOperation(() =>
|
||||
redisRateLimitClient.get(testKey)
|
||||
);
|
||||
await retryOperation(() => redisRateLimitClient.del(testKey));
|
||||
} catch (error) {
|
||||
Logger.error(`redisRateLimitClient health check failed: ${error}`);
|
||||
redisRateLimitHealth = null;
|
||||
}
|
||||
|
||||
const healthStatus = {
|
||||
queueRedis: queueRedisHealth === testValue ? "healthy" : "unhealthy",
|
||||
redisRateLimitClient:
|
||||
redisRateLimitHealth === testValue ? "healthy" : "unhealthy",
|
||||
};
|
||||
|
||||
if (
|
||||
healthStatus.queueRedis === "healthy" &&
|
||||
healthStatus.redisRateLimitClient === "healthy"
|
||||
) {
|
||||
Logger.info("Both Redis instances are healthy");
|
||||
return res.status(200).json({ status: "healthy", details: healthStatus });
|
||||
} else {
|
||||
Logger.info(
|
||||
`Redis instances health check: ${JSON.stringify(healthStatus)}`
|
||||
);
|
||||
await sendSlackWebhook(
|
||||
`[REDIS DOWN] Redis instances health check: ${JSON.stringify(
|
||||
healthStatus
|
||||
)}`,
|
||||
true
|
||||
);
|
||||
return res
|
||||
.status(500)
|
||||
.json({ status: "unhealthy", details: healthStatus });
|
||||
}
|
||||
} catch (error) {
|
||||
Logger.error(`Redis health check failed: ${error}`);
|
||||
await sendSlackWebhook(
|
||||
`[REDIS DOWN] Redis instances health check: ${error.message}`,
|
||||
true
|
||||
);
|
||||
return res
|
||||
.status(500)
|
||||
.json({ status: "unhealthy", message: error.message });
|
||||
}
|
||||
}
|
|
@ -7,12 +7,8 @@ import { v0Router } from "./routes/v0";
|
|||
import { initSDK } from "@hyperdx/node-opentelemetry";
|
||||
import cluster from "cluster";
|
||||
import os from "os";
|
||||
import { Job } from "bull";
|
||||
import { sendSlackWebhook } from "./services/alerts/slack";
|
||||
import { checkAlerts } from "./services/alerts";
|
||||
import Redis from "ioredis";
|
||||
import { redisRateLimitClient } from "./services/rate-limiter";
|
||||
import { Logger } from "./lib/logger";
|
||||
import { adminRouter } from "./routes/admin";
|
||||
|
||||
const { createBullBoard } = require("@bull-board/api");
|
||||
const { BullAdapter } = require("@bull-board/api/bullAdapter");
|
||||
|
@ -46,7 +42,6 @@ if (cluster.isMaster) {
|
|||
|
||||
app.use(cors()); // Add this line to enable CORS
|
||||
|
||||
|
||||
const serverAdapter = new ExpressAdapter();
|
||||
serverAdapter.setBasePath(`/admin/${process.env.BULL_AUTH_KEY}/queues`);
|
||||
|
||||
|
@ -71,6 +66,7 @@ if (cluster.isMaster) {
|
|||
|
||||
// register router
|
||||
app.use(v0Router);
|
||||
app.use(adminRouter);
|
||||
|
||||
const DEFAULT_PORT = process.env.PORT ?? 3002;
|
||||
const HOST = process.env.HOST ?? "localhost";
|
||||
|
@ -94,27 +90,6 @@ if (cluster.isMaster) {
|
|||
startServer();
|
||||
}
|
||||
|
||||
// Use this as a "health check" that way we dont destroy the server
|
||||
app.get(`/admin/${process.env.BULL_AUTH_KEY}/queues`, async (req, res) => {
|
||||
try {
|
||||
const webScraperQueue = getWebScraperQueue();
|
||||
|
||||
const [webScraperActive] = await Promise.all([
|
||||
webScraperQueue.getActiveCount(),
|
||||
]);
|
||||
|
||||
const noActiveJobs = webScraperActive === 0;
|
||||
// 200 if no active jobs, 503 if there are active jobs
|
||||
return res.status(noActiveJobs ? 200 : 500).json({
|
||||
webScraperActive,
|
||||
noActiveJobs,
|
||||
});
|
||||
} catch (error) {
|
||||
Logger.error(error);
|
||||
return res.status(500).json({ error: error.message });
|
||||
}
|
||||
});
|
||||
|
||||
app.get(`/serverHealthCheck`, async (req, res) => {
|
||||
try {
|
||||
const webScraperQueue = getWebScraperQueue();
|
||||
|
@ -187,141 +162,9 @@ if (cluster.isMaster) {
|
|||
}
|
||||
});
|
||||
|
||||
app.get(
|
||||
`/admin/${process.env.BULL_AUTH_KEY}/check-queues`,
|
||||
async (req, res) => {
|
||||
try {
|
||||
await checkAlerts();
|
||||
return res.status(200).send("Alerts initialized");
|
||||
} catch (error) {
|
||||
Logger.debug(`Failed to initialize alerts: ${error}`);
|
||||
return res.status(500).send("Failed to initialize alerts");
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
app.get(
|
||||
`/admin/${process.env.BULL_AUTH_KEY}/clean-before-24h-complete-jobs`,
|
||||
async (req, res) => {
|
||||
Logger.info("🐂 Cleaning jobs older than 24h");
|
||||
try {
|
||||
const webScraperQueue = getWebScraperQueue();
|
||||
const batchSize = 10;
|
||||
const numberOfBatches = 9; // Adjust based on your needs
|
||||
const completedJobsPromises: Promise<Job[]>[] = [];
|
||||
for (let i = 0; i < numberOfBatches; i++) {
|
||||
completedJobsPromises.push(
|
||||
webScraperQueue.getJobs(
|
||||
["completed"],
|
||||
i * batchSize,
|
||||
i * batchSize + batchSize,
|
||||
true
|
||||
)
|
||||
);
|
||||
}
|
||||
const completedJobs: Job[] = (
|
||||
await Promise.all(completedJobsPromises)
|
||||
).flat();
|
||||
const before24hJobs =
|
||||
completedJobs.filter(
|
||||
(job) => job.finishedOn < Date.now() - 24 * 60 * 60 * 1000
|
||||
) || [];
|
||||
|
||||
let count = 0;
|
||||
|
||||
if (!before24hJobs) {
|
||||
return res.status(200).send(`No jobs to remove.`);
|
||||
}
|
||||
|
||||
for (const job of before24hJobs) {
|
||||
try {
|
||||
await job.remove();
|
||||
count++;
|
||||
} catch (jobError) {
|
||||
Logger.error(`🐂 Failed to remove job with ID ${job.id}: ${jobError}` );
|
||||
}
|
||||
}
|
||||
return res.status(200).send(`Removed ${count} completed jobs.`);
|
||||
} catch (error) {
|
||||
Logger.error(`🐂 Failed to clean last 24h complete jobs: ${error}`);
|
||||
return res.status(500).send("Failed to clean jobs");
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
app.get("/is-production", (req, res) => {
|
||||
res.send({ isProduction: global.isProduction });
|
||||
});
|
||||
|
||||
app.get(
|
||||
`/admin/${process.env.BULL_AUTH_KEY}/redis-health`,
|
||||
async (req, res) => {
|
||||
try {
|
||||
const queueRedis = new Redis(process.env.REDIS_URL);
|
||||
|
||||
const testKey = "test";
|
||||
const testValue = "test";
|
||||
|
||||
// Test queueRedis
|
||||
let queueRedisHealth;
|
||||
try {
|
||||
await queueRedis.set(testKey, testValue);
|
||||
queueRedisHealth = await queueRedis.get(testKey);
|
||||
await queueRedis.del(testKey);
|
||||
} catch (error) {
|
||||
Logger.error(`queueRedis health check failed: ${error}`);
|
||||
queueRedisHealth = null;
|
||||
}
|
||||
|
||||
// Test redisRateLimitClient
|
||||
let redisRateLimitHealth;
|
||||
try {
|
||||
await redisRateLimitClient.set(testKey, testValue);
|
||||
redisRateLimitHealth = await redisRateLimitClient.get(testKey);
|
||||
await redisRateLimitClient.del(testKey);
|
||||
} catch (error) {
|
||||
Logger.error(`redisRateLimitClient health check failed: ${error}`);
|
||||
redisRateLimitHealth = null;
|
||||
}
|
||||
|
||||
const healthStatus = {
|
||||
queueRedis: queueRedisHealth === testValue ? "healthy" : "unhealthy",
|
||||
redisRateLimitClient:
|
||||
redisRateLimitHealth === testValue ? "healthy" : "unhealthy",
|
||||
};
|
||||
|
||||
if (
|
||||
healthStatus.queueRedis === "healthy" &&
|
||||
healthStatus.redisRateLimitClient === "healthy"
|
||||
) {
|
||||
Logger.info("Both Redis instances are healthy");
|
||||
return res
|
||||
.status(200)
|
||||
.json({ status: "healthy", details: healthStatus });
|
||||
} else {
|
||||
Logger.info(`Redis instances health check: ${JSON.stringify(healthStatus)}`);
|
||||
await sendSlackWebhook(
|
||||
`[REDIS DOWN] Redis instances health check: ${JSON.stringify(
|
||||
healthStatus
|
||||
)}`,
|
||||
true
|
||||
);
|
||||
return res
|
||||
.status(500)
|
||||
.json({ status: "unhealthy", details: healthStatus });
|
||||
}
|
||||
} catch (error) {
|
||||
Logger.error(`Redis health check failed: ${error}`);
|
||||
await sendSlackWebhook(
|
||||
`[REDIS DOWN] Redis instances health check: ${error.message}`,
|
||||
true
|
||||
);
|
||||
return res
|
||||
.status(500)
|
||||
.json({ status: "unhealthy", message: error.message });
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
Logger.info(`Worker ${process.pid} started`);
|
||||
}
|
||||
|
|
29
apps/api/src/routes/admin.ts
Normal file
29
apps/api/src/routes/admin.ts
Normal file
|
@ -0,0 +1,29 @@
|
|||
import express from "express";
|
||||
import { redisHealthController } from "../controllers/admin/redis-health";
|
||||
import {
|
||||
checkQueuesController,
|
||||
cleanBefore24hCompleteJobsController,
|
||||
queuesController,
|
||||
} from "../controllers/admin/queue";
|
||||
|
||||
export const adminRouter = express.Router();
|
||||
|
||||
adminRouter.post(
|
||||
`/admin/${process.env.BULL_AUTH_KEY}/redis-health`,
|
||||
redisHealthController
|
||||
);
|
||||
|
||||
adminRouter.post(
|
||||
`/admin/${process.env.BULL_AUTH_KEY}/clean-before-24h-complete-jobs`,
|
||||
cleanBefore24hCompleteJobsController
|
||||
);
|
||||
|
||||
adminRouter.post(
|
||||
`/admin/${process.env.BULL_AUTH_KEY}/check-queues`,
|
||||
checkQueuesController
|
||||
);
|
||||
|
||||
adminRouter.post(
|
||||
`/admin/${process.env.BULL_AUTH_KEY}/queues`,
|
||||
queuesController
|
||||
);
|
Loading…
Reference in New Issue
Block a user