bugfix for pdfs and logging pdf events, also added trycatchs for docx

This commit is contained in:
rafaelsideguide 2024-07-29 14:13:46 -03:00
parent 4c9d62f6d3
commit 49e3e64787
3 changed files with 129 additions and 30 deletions

View File

@ -20,6 +20,7 @@ import { getWebScraperQueue } from "../../../src/services/queue-service";
import { fetchAndProcessDocx } from "./utils/docxProcessor";
import { getAdjustedMaxDepth, getURLDepth } from "./utils/maxDepthUtils";
import { Logger } from "../../lib/logger";
import { ScrapeEvents } from "../../lib/scrape-events";
export class WebScraperDataProvider {
private jobId: string;
@ -316,10 +317,28 @@ export class WebScraperDataProvider {
private async fetchPdfDocuments(pdfLinks: string[]): Promise<Document[]> {
return Promise.all(
pdfLinks.map(async (pdfLink) => {
const timer = Date.now();
const logInsertPromise = ScrapeEvents.insert(this.jobId, {
type: "scrape",
url: pdfLink,
worker: process.env.FLY_MACHINE_ID,
method: "pdf-scrape",
result: null,
});
const { content, pageStatusCode, pageError } = await fetchAndProcessPdf(
pdfLink,
this.pageOptions.parsePDF
);
const insertedLogId = await logInsertPromise;
ScrapeEvents.updateScrapeResult(insertedLogId, {
response_size: content.length,
success: !(pageStatusCode && pageStatusCode >= 400) && !!content && (content.trim().length >= 100),
error: pageError,
response_code: pageStatusCode,
time_taken: Date.now() - timer,
});
return {
content: content,
metadata: { sourceURL: pdfLink, pageStatusCode, pageError },
@ -330,12 +349,32 @@ export class WebScraperDataProvider {
}
private async fetchDocxDocuments(docxLinks: string[]): Promise<Document[]> {
return Promise.all(
docxLinks.map(async (p) => {
const { content, pageStatusCode, pageError } =
await fetchAndProcessDocx(p);
docxLinks.map(async (docxLink) => {
const timer = Date.now();
const logInsertPromise = ScrapeEvents.insert(this.jobId, {
type: "scrape",
url: docxLink,
worker: process.env.FLY_MACHINE_ID,
method: "docx-scrape",
result: null,
});
const { content, pageStatusCode, pageError } = await fetchAndProcessDocx(
docxLink
);
const insertedLogId = await logInsertPromise;
ScrapeEvents.updateScrapeResult(insertedLogId, {
response_size: content.length,
success: !(pageStatusCode && pageStatusCode >= 400) && !!content && (content.trim().length >= 100),
error: pageError,
response_code: pageStatusCode,
time_taken: Date.now() - timer,
});
return {
content,
metadata: { sourceURL: p, pageStatusCode, pageError },
metadata: { sourceURL: docxLink, pageStatusCode, pageError },
provider: "web-scraper",
};
})

View File

@ -4,38 +4,76 @@ import { createWriteStream } from "node:fs";
import path from "path";
import os from "os";
import mammoth from "mammoth";
import { Logger } from "../../../lib/logger";
export async function fetchAndProcessDocx(url: string): Promise<{ content: string; pageStatusCode: number; pageError: string }> {
const { tempFilePath, pageStatusCode, pageError } = await downloadDocx(url);
const content = await processDocxToText(tempFilePath);
fs.unlinkSync(tempFilePath); // Clean up the temporary file
let tempFilePath = '';
let pageStatusCode = 200;
let pageError = '';
let content = '';
try {
const downloadResult = await downloadDocx(url);
tempFilePath = downloadResult.tempFilePath;
pageStatusCode = downloadResult.pageStatusCode;
pageError = downloadResult.pageError;
content = await processDocxToText(tempFilePath);
} catch (error) {
Logger.error(`Failed to fetch and process DOCX: ${error.message}`);
pageStatusCode = 500;
pageError = error.message;
content = '';
} finally {
if (tempFilePath) {
fs.unlinkSync(tempFilePath); // Clean up the temporary file
}
}
return { content, pageStatusCode, pageError };
}
async function downloadDocx(url: string): Promise<{ tempFilePath: string; pageStatusCode: number; pageError: string }> {
const response = await axios({
url,
method: "GET",
responseType: "stream",
});
try {
const response = await axios({
url,
method: "GET",
responseType: "stream",
});
const tempFilePath = path.join(os.tmpdir(), `tempDocx-${Date.now()}.docx`);
const writer = createWriteStream(tempFilePath);
const tempFilePath = path.join(os.tmpdir(), `tempDocx-${Date.now()}.docx`);
const writer = createWriteStream(tempFilePath);
response.data.pipe(writer);
response.data.pipe(writer);
return new Promise((resolve, reject) => {
writer.on("finish", () => resolve({ tempFilePath, pageStatusCode: response.status, pageError: response.statusText != "OK" ? response.statusText : undefined }));
writer.on("error", reject);
});
return new Promise((resolve, reject) => {
writer.on("finish", () => resolve({ tempFilePath, pageStatusCode: response.status, pageError: response.statusText != "OK" ? response.statusText : undefined }));
writer.on("error", () => {
Logger.error('Failed to write DOCX file to disk');
reject(new Error('Failed to write DOCX file to disk'));
});
});
} catch (error) {
Logger.error(`Failed to download DOCX: ${error.message}`);
return { tempFilePath: "", pageStatusCode: 500, pageError: error.message };
}
}
export async function processDocxToText(filePath: string): Promise<string> {
const content = await extractTextFromDocx(filePath);
return content;
try {
const content = await extractTextFromDocx(filePath);
return content;
} catch (error) {
Logger.error(`Failed to process DOCX to text: ${error.message}`);
return "";
}
}
async function extractTextFromDocx(filePath: string): Promise<string> {
const result = await mammoth.extractRawText({ path: filePath });
return result.value;
try {
const result = await mammoth.extractRawText({ path: filePath });
return result.value;
} catch (error) {
Logger.error(`Failed to extract text from DOCX: ${error.message}`);
return "";
}
}

View File

@ -76,7 +76,6 @@ export async function processPdfToText(filePath: string, parsePDF: boolean): Pro
let attempt = 0;
const maxAttempts = 10; // Maximum number of attempts
let resultAvailable = false;
while (attempt < maxAttempts && !resultAvailable) {
try {
resultResponse = await axios.get(resultUrl, { headers, timeout: (axiosTimeout * 2) });
@ -90,13 +89,22 @@ export async function processPdfToText(filePath: string, parsePDF: boolean): Pro
} catch (error) {
Logger.debug("Error fetching result w/ LlamaIndex");
attempt++;
if (attempt >= maxAttempts) {
Logger.error("Max attempts reached, unable to fetch result.");
break; // Exit the loop if max attempts are reached
}
await new Promise((resolve) => setTimeout(resolve, 500)); // Wait for 0.5 seconds before retrying
// You may want to handle specific errors differently
}
}
if (!resultAvailable) {
content = await processPdf(filePath);
try {
content = await processPdf(filePath);
} catch (error) {
Logger.error(`Failed to process PDF: ${error}`);
content = "";
}
}
content = resultResponse.data[resultType];
} catch (error) {
@ -104,15 +112,29 @@ export async function processPdfToText(filePath: string, parsePDF: boolean): Pro
content = await processPdf(filePath);
}
} else if (parsePDF) {
content = await processPdf(filePath);
try {
content = await processPdf(filePath);
} catch (error) {
Logger.error(`Failed to process PDF: ${error}`);
content = "";
}
} else {
content = fs.readFileSync(filePath, "utf-8");
try {
content = fs.readFileSync(filePath, "utf-8");
} catch (error) {
Logger.error(`Failed to read PDF file: ${error}`);
content = "";
}
}
return content;
}
async function processPdf(file: string) {
const fileContent = fs.readFileSync(file);
const data = await pdf(fileContent);
return data.text;
try {
const fileContent = fs.readFileSync(file);
const data = await pdf(fileContent);
return data.text;
} catch (error) {
throw error;
}
}