Merge pull request #484 from mendableai/nsc/fix-scaling-fix-email
Some checks are pending
Fly Deploy / Pre-deploy checks (push) Waiting to run
Fly Deploy / Test Suite (push) Blocked by required conditions
Fly Deploy / Python SDK Tests (push) Blocked by required conditions
Fly Deploy / JavaScript SDK Tests (push) Blocked by required conditions
Fly Deploy / Deploy app (push) Blocked by required conditions
Fly Deploy / Build and publish Python SDK (push) Blocked by required conditions
Fly Deploy / Build and publish JavaScript SDK (push) Blocked by required conditions

Fix parallelization issues with caching
This commit is contained in:
Nicolas 2024-07-30 19:21:00 -04:00 committed by GitHub
commit 4ab7fb007a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 82 additions and 24 deletions

View File

@ -94,6 +94,7 @@
"promptable": "^0.0.10", "promptable": "^0.0.10",
"puppeteer": "^22.12.1", "puppeteer": "^22.12.1",
"rate-limiter-flexible": "2.4.2", "rate-limiter-flexible": "2.4.2",
"redlock": "5.0.0-beta.2",
"resend": "^3.4.0", "resend": "^3.4.0",
"robots-parser": "^3.0.1", "robots-parser": "^3.0.1",
"scrapingbee": "^1.7.4", "scrapingbee": "^1.7.4",

View File

@ -149,6 +149,9 @@ importers:
rate-limiter-flexible: rate-limiter-flexible:
specifier: 2.4.2 specifier: 2.4.2
version: 2.4.2 version: 2.4.2
redlock:
specifier: 5.0.0-beta.2
version: 5.0.0-beta.2
resend: resend:
specifier: ^3.4.0 specifier: ^3.4.0
version: 3.4.0 version: 3.4.0
@ -3533,6 +3536,9 @@ packages:
resolution: {integrity: sha512-dBpDMdxv9Irdq66304OLfEmQ9tbNRFnFTuZiLo+bD+r332bBmMJ8GBLXklIXXgxd3+v9+KUnZaUR5PJMa75Gsg==} resolution: {integrity: sha512-dBpDMdxv9Irdq66304OLfEmQ9tbNRFnFTuZiLo+bD+r332bBmMJ8GBLXklIXXgxd3+v9+KUnZaUR5PJMa75Gsg==}
engines: {node: '>= 0.4.0'} engines: {node: '>= 0.4.0'}
node-abort-controller@3.1.1:
resolution: {integrity: sha512-AGK2yQKIjRuqnc6VkX2Xj5d+QW8xZ87pa1UK6yA6ouUyuxfHuMP6umE5QK7UmTeOAymo+Zx1Fxiuw9rVx8taHQ==}
node-domexception@1.0.0: node-domexception@1.0.0:
resolution: {integrity: sha512-/jKZoMpw0F8GRwl4/eLROPA3cfcXtLApP0QzLmUT/HuPCZWyB7IY9ZrMeKw2O/nFIqPQB3PVM9aYm0F312AXDQ==} resolution: {integrity: sha512-/jKZoMpw0F8GRwl4/eLROPA3cfcXtLApP0QzLmUT/HuPCZWyB7IY9ZrMeKw2O/nFIqPQB3PVM9aYm0F312AXDQ==}
engines: {node: '>=10.5.0'} engines: {node: '>=10.5.0'}
@ -3956,6 +3962,10 @@ packages:
redis@4.6.14: redis@4.6.14:
resolution: {integrity: sha512-GrNg/e33HtsQwNXL7kJT+iNFPSwE1IPmd7wzV3j4f2z0EYxZfZE7FVTmUysgAtqQQtg5NXF5SNLR9OdO/UHOfw==} resolution: {integrity: sha512-GrNg/e33HtsQwNXL7kJT+iNFPSwE1IPmd7wzV3j4f2z0EYxZfZE7FVTmUysgAtqQQtg5NXF5SNLR9OdO/UHOfw==}
redlock@5.0.0-beta.2:
resolution: {integrity: sha512-2RDWXg5jgRptDrB1w9O/JgSZC0j7y4SlaXnor93H/UJm/QyDiFgBKNtrh0TI6oCXqYSaSoXxFh6Sd3VtYfhRXw==}
engines: {node: '>=12'}
regenerator-runtime@0.14.1: regenerator-runtime@0.14.1:
resolution: {integrity: sha512-dYnhHh0nJoMfnkZs6GmmhFknAGRrLznOu5nc9ML+EJxGvrx6H7teuevqVqCuPcPK//3eDrrjQhehXVx9cnkGdw==} resolution: {integrity: sha512-dYnhHh0nJoMfnkZs6GmmhFknAGRrLznOu5nc9ML+EJxGvrx6H7teuevqVqCuPcPK//3eDrrjQhehXVx9cnkGdw==}
@ -8605,6 +8615,8 @@ snapshots:
netmask@2.0.2: {} netmask@2.0.2: {}
node-abort-controller@3.1.1: {}
node-domexception@1.0.0: {} node-domexception@1.0.0: {}
node-ensure@0.0.0: {} node-ensure@0.0.0: {}
@ -9108,6 +9120,10 @@ snapshots:
'@redis/search': 1.1.6(@redis/client@1.5.16) '@redis/search': 1.1.6(@redis/client@1.5.16)
'@redis/time-series': 1.0.5(@redis/client@1.5.16) '@redis/time-series': 1.0.5(@redis/client@1.5.16)
redlock@5.0.0-beta.2:
dependencies:
node-abort-controller: 3.1.1
regenerator-runtime@0.14.1: {} regenerator-runtime@0.14.1: {}
require-directory@2.1.1: {} require-directory@2.1.1: {}

View File

@ -118,18 +118,12 @@ export async function scrapeController(req: Request, res: Response) {
} catch (error) { } catch (error) {
Logger.error(error); Logger.error(error);
earlyReturn = true; earlyReturn = true;
return res.status(402).json({ error: "Error checking team credits. Please contact hello@firecrawl.com for help." }); return res.status(500).json({ error: "Error checking team credits. Please contact hello@firecrawl.com for help." });
} }
}; };
// Async check saves 500ms in average case await checkCredits();
// Don't async check in llm extraction mode as it could be expensive
if (extractorOptions.mode.includes("llm-extraction")) {
await checkCredits();
} else {
checkCredits();
}
const jobId = uuidv4(); const jobId = uuidv4();

View File

@ -46,7 +46,7 @@ export class ScrapeEvents {
}).select().single(); }).select().single();
return (result.data as any).id; return (result.data as any).id;
} catch (error) { } catch (error) {
Logger.error(`Error inserting scrape event: ${error}`); // Logger.error(`Error inserting scrape event: ${error}`);
return null; return null;
} }
} }

View File

@ -23,8 +23,8 @@ describe('scrapSingleUrl', () => {
}, 10000); }, 10000);
}); });
it('should return a list of links on the mendable.ai page', async () => { it('should return a list of links on the firecrawl.ai page', async () => {
const url = 'https://mendable.ai'; const url = 'https://example.com';
const pageOptions: PageOptions = { includeHtml: true }; const pageOptions: PageOptions = { includeHtml: true };
const result = await scrapSingleUrl("TEST", url, pageOptions); const result = await scrapSingleUrl("TEST", url, pageOptions);
@ -33,5 +33,5 @@ it('should return a list of links on the mendable.ai page', async () => {
expect(result.linksOnPage).toBeDefined(); expect(result.linksOnPage).toBeDefined();
expect(Array.isArray(result.linksOnPage)).toBe(true); expect(Array.isArray(result.linksOnPage)).toBe(true);
expect(result.linksOnPage.length).toBeGreaterThan(0); expect(result.linksOnPage.length).toBeGreaterThan(0);
expect(result.linksOnPage).toContain('https://mendable.ai/blog') expect(result.linksOnPage).toContain('https://www.iana.org/domains/example')
}, 10000); }, 10000);

View File

@ -3,9 +3,38 @@ import { withAuth } from "../../lib/withAuth";
import { sendNotification } from "../notification/email_notification"; import { sendNotification } from "../notification/email_notification";
import { supabase_service } from "../supabase"; import { supabase_service } from "../supabase";
import { Logger } from "../../lib/logger"; import { Logger } from "../../lib/logger";
import { getValue, setValue } from "../redis";
import Redlock from "redlock";
import Client from "ioredis";
const FREE_CREDITS = 500; const FREE_CREDITS = 500;
const redlock = new Redlock(
// You should have one client for each independent redis node
// or cluster.
[new Client(process.env.REDIS_RATE_LIMIT_URL)],
{
// The expected clock drift; for more details see:
// http://redis.io/topics/distlock
driftFactor: 0.01, // multiplied by lock ttl to determine drift time
// The max number of times Redlock will attempt to lock a resource
// before erroring.
retryCount: 5,
// the time in ms between attempts
retryDelay: 100, // time in ms
// the max time in ms randomly added to retries
// to improve performance under high contention
// see https://www.awsarchitectureblog.com/2015/03/backoff.html
retryJitter: 200, // time in ms
// The minimum remaining time on a lock before an extension is automatically
// attempted with the `using` API.
automaticExtensionThreshold: 500, // time in ms
}
);
export async function billTeam(team_id: string, credits: number) { export async function billTeam(team_id: string, credits: number) {
return withAuth(supaBillTeam)(team_id, credits); return withAuth(supaBillTeam)(team_id, credits);
} }
@ -254,23 +283,41 @@ export async function supaCheckTeamCredits(team_id: string, credits: number) {
} }
let totalCreditsUsed = 0; let totalCreditsUsed = 0;
const cacheKey = `credit_usage_${subscription.id}_${subscription.current_period_start}_${subscription.current_period_end}_lc`;
const redLockKey = `lock_${cacheKey}`;
const lockTTL = 10000; // 10 seconds
try { try {
const { data: creditUsages, error: creditUsageError } = const lock = await redlock.acquire([redLockKey], lockTTL);
await supabase_service.rpc("get_credit_usage_2", {
sub_id: subscription.id,
start_time: subscription.current_period_start,
end_time: subscription.current_period_end,
});
if (creditUsageError) { try {
Logger.error(`Error calculating credit usage: ${creditUsageError}`); const cachedCreditUsage = await getValue(cacheKey);
}
if (creditUsages && creditUsages.length > 0) { if (cachedCreditUsage) {
totalCreditsUsed = creditUsages[0].total_credits_used; totalCreditsUsed = parseInt(cachedCreditUsage);
} else {
const { data: creditUsages, error: creditUsageError } =
await supabase_service.rpc("get_credit_usage_2", {
sub_id: subscription.id,
start_time: subscription.current_period_start,
end_time: subscription.current_period_end,
});
if (creditUsageError) {
Logger.error(`Error calculating credit usage: ${creditUsageError}`);
}
if (creditUsages && creditUsages.length > 0) {
totalCreditsUsed = creditUsages[0].total_credits_used;
await setValue(cacheKey, totalCreditsUsed.toString(), 1800); // Cache for 30 minutes
// Logger.info(`Cache set for credit usage: ${totalCreditsUsed}`);
}
}
} finally {
await lock.release();
} }
} catch (error) { } catch (error) {
Logger.error(`Error calculating credit usage: ${error}`); Logger.error(`Error acquiring lock or calculating credit usage: ${error}`);
} }
// Adjust total credits used by subtracting coupon value // Adjust total credits used by subtracting coupon value