feat: bulk scrape

Gergő Móricz 2024-10-17 19:40:18 +02:00
parent 081d7407b3
commit 03b37998fd
6 changed files with 140 additions and 17 deletions
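For context, a client call against the endpoint this commit adds might look like the sketch below. The host, API key, and bearer-token header are placeholders/assumptions and not part of this commit; only the route, payload shape, and response shape come from the code in the diff.

// Hypothetical client sketch for the new bulk scrape endpoint.
// HOST, API_KEY, and the Authorization scheme are assumptions.
const HOST = "https://firecrawl.example.com"; // placeholder
const API_KEY = "fc-YOUR-KEY";                // placeholder

async function startBulkScrape(urls: string[]): Promise<{ success: boolean; id: string; url: string }> {
  const res = await fetch(`${HOST}/v1/bulk/scrape`, {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      Authorization: `Bearer ${API_KEY}`, // assumed auth scheme
    },
    body: JSON.stringify({ urls }), // scrapeOptions fields (formats, headers, ...) may also be included
  });
  return res.json(); // success shape: { success, id, url }
}

// The returned `url` points at GET /v1/bulk/scrape/:jobId, which reuses the
// crawl status controller to report progress.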

View File

@@ -0,0 +1,99 @@
import { Response } from "express";
import { v4 as uuidv4 } from "uuid";
import {
BulkScrapeRequest,
bulkScrapeRequestSchema,
CrawlResponse,
legacyScrapeOptions,
RequestWithAuth,
} from "./types";
import {
addCrawlJobs,
lockURLs,
saveCrawl,
StoredCrawl,
} from "../../lib/crawl-redis";
import { logCrawl } from "../../services/logging/crawl_log";
import { getScrapeQueue } from "../../services/queue-service";
import { getJobPriority } from "../../lib/job-priority";
export async function bulkScrapeController(
req: RequestWithAuth<{}, CrawlResponse, BulkScrapeRequest>,
res: Response<CrawlResponse>
) {
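// Validate and normalize the incoming body against the bulk scrape schema (throws on invalid input).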
req.body = bulkScrapeRequestSchema.parse(req.body);
const id = uuidv4();
await logCrawl(id, req.auth.team_id);
let { remainingCredits } = req.account;
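// When DB-backed authentication is disabled there is no credit accounting, so treat credits as unlimited.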
const useDbAuthentication = process.env.USE_DB_AUTHENTICATION === 'true';
if (!useDbAuthentication) {
remainingCredits = Infinity;
}
const pageOptions = legacyScrapeOptions(req.body);
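// Store a crawl record for this bulk request; crawlerOptions stays null to distinguish a bulk scrape from a real crawl.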
const sc: StoredCrawl = {
crawlerOptions: null,
pageOptions,
team_id: req.auth.team_id,
createdAt: Date.now(),
plan: req.auth.plan,
};
await saveCrawl(id, sc);
let jobPriority = 20;
// If the request has more than 1000 URLs, compute a team-specific job priority
// (base 21); otherwise keep the default priority of 20.
if (req.body.urls.length > 1000) {
jobPriority = await getJobPriority({ plan: req.auth.plan, team_id: req.auth.team_id, basePriority: 21 });
}
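// Create one scrape job per URL, all tagged with the same crawl_id so they are tracked together.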
const jobs = req.body.urls.map((x) => {
const uuid = uuidv4();
return {
name: uuid,
data: {
url: x,
mode: "single_urls",
team_id: req.auth.team_id,
plan: req.auth.plan,
crawlerOptions: null,
pageOptions,
origin: "api",
crawl_id: id,
sitemapped: true,
v1: true,
},
opts: {
jobId: uuid,
priority: jobPriority,
},
};
});
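// Record the URLs and job IDs on the crawl before enqueueing, then push all jobs to the scrape queue in one batch.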
await lockURLs(
id,
jobs.map((x) => x.data.url)
);
await addCrawlJobs(
id,
jobs.map((x) => x.opts.jobId)
);
await getScrapeQueue().addBulk(jobs);
const protocol = process.env.ENV === "local" ? req.protocol : "https";
return res.status(200).json({
success: true,
id,
url: `${protocol}://${req.get("host")}/v1/bulk/scrape/${id}`,
});
}

View File

@@ -141,19 +141,29 @@ export const scrapeRequestSchema = scrapeOptions.extend({
return obj;
});
// export type ScrapeRequest = {
// url: string;
// formats?: Format[];
// headers?: { [K: string]: string };
// includeTags?: string[];
// excludeTags?: string[];
// onlyMainContent?: boolean;
// timeout?: number;
// waitFor?: number;
// }
export type ScrapeRequest = z.infer<typeof scrapeRequestSchema>;
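// Bulk scrape requests take an array of URLs plus the same scrape options (and extract/timeout rules) as a single scrape.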
export const bulkScrapeRequestSchema = scrapeOptions.extend({
urls: url.array(),
origin: z.string().optional().default("api"),
}).strict(strictMessage).refine(
(obj) => {
const hasExtractFormat = obj.formats?.includes("extract");
const hasExtractOptions = obj.extract !== undefined;
return (hasExtractFormat && hasExtractOptions) || (!hasExtractFormat && !hasExtractOptions);
},
{
message: "When 'extract' format is specified, 'extract' options must be provided, and vice versa",
}
).transform((obj) => {
if ((obj.formats?.includes("extract") || obj.extract) && !obj.timeout) {
return { ...obj, timeout: 60000 };
}
return obj;
});
export type BulkScrapeRequest = z.infer<typeof bulkScrapeRequestSchema>;
const crawlerOptions = z.object({
includePaths: z.string().array().default([]),
excludePaths: z.string().array().default([]),

View File

@@ -3,7 +3,7 @@ import { redisConnection } from "../services/queue-service";
import { Logger } from "./logger";
export type StoredCrawl = {
- originUrl: string;
+ originUrl?: string;
crawlerOptions: any;
pageOptions: any;
team_id: string;

View File

@@ -112,7 +112,7 @@ export async function runWebScraper({
}
// remove docs with empty content
- const filteredDocs = crawlerOptions.returnOnlyUrls
+ const filteredDocs = crawlerOptions?.returnOnlyUrls
? docs.map((doc) => {
if (doc.metadata.sourceURL) {
return { url: doc.metadata.sourceURL };

View File

@@ -17,6 +17,7 @@ import { crawlCancelController } from "../controllers/v1/crawl-cancel";
import { Logger } from "../lib/logger";
import { scrapeStatusController } from "../controllers/v1/scrape-status";
import { concurrencyCheckController } from "../controllers/v1/concurrency-check";
import { bulkScrapeController } from "../controllers/v1/bulk-scrape";
// import { crawlPreviewController } from "../../src/controllers/v1/crawlPreview";
// import { crawlJobStatusPreviewController } from "../../src/controllers/v1/status";
// import { searchController } from "../../src/controllers/v1/search";
@@ -122,6 +123,15 @@ v1Router.post(
wrap(crawlController)
);
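// Bulk scrape submission, sharing the crawl rate limiter plus the credit, blocklist, and idempotency middleware.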
v1Router.post(
"/bulk/scrape",
authMiddleware(RateLimiterMode.Crawl),
checkCreditsMiddleware(),
blocklistMiddleware,
idempotencyMiddleware,
wrap(bulkScrapeController)
);
v1Router.post(
"/map",
authMiddleware(RateLimiterMode.Map),
@@ -136,6 +146,12 @@ v1Router.get(
wrap(crawlStatusController)
);
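// Bulk scrape status is served by the crawl status controller, since bulk jobs are stored as crawls.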
v1Router.get(
"/bulk/scrape/:jobId",
authMiddleware(RateLimiterMode.CrawlStatus),
wrap(crawlStatusController)
);
v1Router.get(
"/scrape/:jobId",
wrap(scrapeStatusController)

View File

@@ -365,7 +365,7 @@ async function processJob(job: Job, token: string) {
const sc = (await getCrawl(job.data.crawl_id)) as StoredCrawl;
- if (!job.data.sitemapped) {
+ if (!job.data.sitemapped && job.data.crawlerOptions !== null) {
if (!sc.cancelled) {
const crawler = crawlToCrawler(job.data.crawl_id, sc);
@@ -414,9 +414,7 @@ async function processJob(job: Job, token: string) {
}
}
- if (await finishCrawl(job.data.crawl_id)) {
+ if (await finishCrawl(job.data.crawl_id) && job.data.crawlerOptions !== null) {
if (!job.data.v1) {
const jobIDs = await getCrawlJobs(job.data.crawl_id);