This commit is contained in:
yanlong.wang 2024-04-11 17:14:41 +08:00
parent b2f8b11cdc
commit 94e65381bd
No known key found for this signature in database
GPG Key ID: C0A623C0BADF9F37
6 changed files with 303 additions and 74 deletions

View File

@ -32,7 +32,7 @@
"archiver": "^6.0.1",
"axios": "^1.3.3",
"bcrypt": "^5.1.0",
"civkit": "^0.6.5-be430ac",
"civkit": "^0.6.5-326469b",
"cors": "^2.8.5",
"dayjs": "^1.11.9",
"express": "^4.19.2",

View File

@ -1,9 +1,10 @@
import { marshalErrorLike, RPCHost, RPCReflection } from 'civkit';
import { assignTransferProtocolMeta, marshalErrorLike, RPCHost, RPCReflection } from 'civkit';
import { singleton } from 'tsyringe';
import { CloudHTTPv2, Logger, OutputServerEventStream, Param, RPCReflect } from '../shared';
import { CloudHTTPv2, Ctx, Logger, OutputServerEventStream, RPCReflect } from '../shared';
import _ from 'lodash';
import { PuppeteerControl } from '../services/puppeteer';
import { PageSnapshot, PuppeteerControl } from '../services/puppeteer';
import TurnDownService from 'turndown';
import { Request, Response } from 'express';
@singleton()
@ -25,43 +26,115 @@ export class CrawlerHost extends RPCHost {
this.emit('ready');
}
/**
 * Render a page snapshot as the plain-text report sent back to clients.
 *
 * The body is the Readability-extracted HTML (`snapshot.parsed.content`)
 * converted to Markdown by `turnDownService` when that HTML is present;
 * otherwise the snapshot's raw `text` is used as-is.
 *
 * @param snapshot - Snapshot yielded by the puppeteer scraper.
 * @returns A text block with `Title`, `URL Source` and `Markdown Content` sections.
 */
formatSnapshot(snapshot: PageSnapshot) {
    const readableHtml = snapshot.parsed?.content;

    let body: string;
    if (readableHtml) {
        body = this.turnDownService.turndown(readableHtml);
    } else {
        body = snapshot.text;
    }

    // Prefer the title extracted by Readability, then the document title.
    const heading = (snapshot.parsed?.title || snapshot.title || '').trim();
    const source = snapshot.href.trim();

    return `Title: ${heading}
URL Source: ${source}
Markdown Content:
${body.trim()}
`;
}
@CloudHTTPv2({
exportInGroup: ['crawler'],
runtime: {
memory: '4GiB',
timeoutSeconds: 540,
},
httpMethod: ['get', 'post'],
returnType: OutputServerEventStream,
returnType: [String, OutputServerEventStream],
})
async crawl(
@RPCReflect() rpcReflect: RPCReflection,
@Param('url', { required: true }) url: string
@Ctx() ctx: {
req: Request,
res: Response,
},
) {
await this.serviceReady();
const sseStream = new OutputServerEventStream();
const url = new URL(ctx.req.url, `${ctx.req.protocol}://${ctx.req.headers.host}`);
const rawPath = url.pathname.split('/').filter(Boolean);
const host = rawPath.shift();
const urlToCrawl = new URL(`${ctx.req.protocol}://${host}/${rawPath.join('/')}`);
urlToCrawl.search = url.search;
rpcReflect.return(sseStream);
if (!ctx.req.accepts('text/plain') && ctx.req.accepts('text/event-stream')) {
const sseStream = new OutputServerEventStream();
rpcReflect.return(sseStream);
try {
for await (const scrapped of this.puppeteerControl.scrap(url)) {
const content = typeof scrapped.snapshot === 'string' ? scrapped.snapshot : (scrapped.snapshot as any)?.content;
if (!content) {
continue;
try {
for await (const scrapped of this.puppeteerControl.scrap(urlToCrawl.toString())) {
if (!scrapped) {
continue;
}
const formatted = this.formatSnapshot(scrapped);
if (scrapped.screenshot) {
sseStream.write({
event: 'screenshot',
data: scrapped.screenshot.toString('base64'),
});
}
sseStream.write({
event: 'data',
data: formatted,
});
}
const text = this.turnDownService.turndown(typeof scrapped.snapshot === 'string' ? scrapped.snapshot : (scrapped.snapshot as any)?.content);
} catch (err: any) {
this.logger.error(`Failed to crawl ${url}`, { err: marshalErrorLike(err) });
sseStream.write({
event: 'data',
data: text,
event: 'error',
data: marshalErrorLike(err),
});
}
} catch (err: any) {
this.logger.error(`Failed to crawl ${url}`, { err: marshalErrorLike(err) });
sseStream.write({
event: 'error',
data: err,
});
sseStream.end();
return sseStream;
}
sseStream.end();
if (!ctx.req.accepts('text/plain') && (ctx.req.accepts('text/json') || ctx.req.accepts('application/json'))) {
for await (const scrapped of this.puppeteerControl.scrap(urlToCrawl.toString())) {
if (!scrapped?.parsed?.content) {
continue;
}
return sseStream;
const formatted = this.formatSnapshot(scrapped);
if (scrapped.screenshot) {
return [
{
type: 'image_url', image_url: {
url: `data:image/jpeg;base64,${scrapped.screenshot.toString('base64')}`,
}
},
{ type: 'text', content: formatted },
];
}
return formatted;
}
}
for await (const scrapped of this.puppeteerControl.scrap(urlToCrawl.toString())) {
if (!scrapped?.parsed?.content) {
continue;
}
const formatted = this.formatSnapshot(scrapped);
return assignTransferProtocolMeta(formatted, { contentType: 'text/plain', envelope: null });
}
throw new Error('Unreachable');
}

View File

@ -0,0 +1,59 @@
import { Also, parseJSONText, Prop } from 'civkit';
import { FirestoreRecord } from '../shared/lib/firestore';
import _ from 'lodash';
/**
 * Firestore record holding one crawl result, stored in the `crawled` collection.
 *
 * The fields listed in `patchedFields` are persisted as JSON strings:
 * `degradeForFireStore()` serializes them on the way out and `from()` parses
 * them back on the way in.
 */
@Also({
    dictOf: Object
})
export class Crawled extends FirestoreRecord {
    static override collectionName = 'crawled';

    override _id!: string;

    /** URL that was crawled. */
    @Prop({
        required: true
    })
    url!: string;

    /** Digest used as the lookup key for a crawled URL. */
    @Prop({
        required: true
    })
    urlPathDigest!: string;

    /** Captured snapshot payload; kept as a JSON string while at rest. */
    @Prop()
    snapshot!: any;

    @Prop()
    createdAt!: Date;

    @Prop()
    expireAt!: Date;

    /** Names of the fields that are JSON-encoded before being written. */
    static patchedFields = [
        'snapshot'
    ];

    /**
     * Build a `Crawled` from raw data, decoding JSON-string patched fields.
     *
     * The caller's `input` is never modified: a shallow copy is made the first
     * time a field actually needs decoding, and `input` is passed through
     * untouched when nothing needs decoding.
     *
     * @param input - Raw record data, e.g. as read back from Firestore.
     * @returns The constructed record.
     */
    static override from(input: any) {
        let source = input;
        for (const field of this.patchedFields) {
            if (typeof input[field] !== 'string') {
                continue;
            }
            if (source === input) {
                // Copy lazily so the caller's object is left as it was.
                source = { ...input };
            }
            source[field] = parseJSONText(input[field]);
        }

        return super.from(source) as Crawled;
    }

    /**
     * Produce a plain shallow copy of this record suitable for writing, with
     * every object-valued patched field replaced by its JSON string.
     *
     * `null` is left as `null`: `typeof null` is `'object'`, and serializing
     * it would store the literal string `"null"` instead.
     *
     * @returns A shallow copy of this record with patched fields stringified.
     */
    override degradeForFireStore() {
        const copy: any = { ...this };
        for (const field of (this.constructor as typeof Crawled).patchedFields) {
            const value = copy[field];
            if (value !== null && typeof value === 'object') {
                copy[field] = JSON.stringify(value);
            }
        }

        return copy;
    }

    [k: string]: any;
}

View File

@ -1,32 +1,31 @@
import 'reflect-metadata';
import * as functions from 'firebase-functions';
import { initializeApp } from 'firebase-admin/app';
initializeApp();
import secretExposer from './shared/services/secrets';
export const onUserCreated = functions
.runWith({ secrets: [...secretExposer.bundle], memory: '512MB' })
.auth.user()
.onCreate(async (user) => {
// export const onUserCreated = functions
// .runWith({ secrets: [...secretExposer.bundle], memory: '512MB' })
// .auth.user()
// .onCreate(async (user) => {
return null;
});
// return null;
// });
export const onUserLogin = functions
.runWith({ secrets: [...secretExposer.bundle], memory: '512MB' })
.auth.user()
.beforeSignIn(async (user, _ctx) => {
// export const onUserLogin = functions
// .runWith({ secrets: [...secretExposer.bundle], memory: '512MB' })
// .auth.user()
// .beforeSignIn(async (user, _ctx) => {
return;
});
// return;
// });
import { loadModulesDynamically, registry } from './shared';
import path from 'path';
loadModulesDynamically(path.resolve(__dirname, 'cloud-functions'));
Object.assign(exports, registry.exportAll());
Object.assign(exports, registry.exportGrouped({
memory: '1GiB',
memory: '4GiB',
timeoutSeconds: 540,
}));
registry.title = 'url2text';

View File

@ -1,14 +1,36 @@
import { AsyncService, Defer } from 'civkit';
import { AsyncService, Defer, HashManager, marshalErrorLike } from 'civkit';
import { container, singleton } from 'tsyringe';
import puppeteer, { Browser } from 'puppeteer';
import { Logger } from '../shared/services/logger';
import genericPool from 'generic-pool';
import os from 'os';
import fs from 'fs';
import { Crawled } from '../db/crawled';
// Source text of Mozilla's Readability.js, read synchronously once at module load.
const READABILITY_JS = fs.readFileSync(require.resolve('@mozilla/readability/Readability.js'), 'utf-8');
/**
 * Snapshot of a rendered page as reported by the in-page `giveSnapshot()`
 * helper, optionally accompanied by a screenshot.
 */
export interface PageSnapshot {
    /** `document.title` of the page. */
    title: string;
    /** `document.location.href` at the time of the snapshot. */
    href: string;
    /** Serialized markup of the document element. */
    html: string;
    /** `innerText` of the document body. */
    text: string;
    /** Result of Readability's `parse()`; `null` when there is no parsed article. */
    parsed: {
        title: string;
        content: string;
        textContent: string;
        length: number;
        excerpt: string;
        byline: string;
        dir: string;
        siteName: string;
        lang: string;
        publishedTime: string;
    } | null;
    /** Screenshot image bytes, when one was captured. */
    screenshot?: Buffer;
}
// Shared hasher configured with 'md5' and 'hex'; used to derive the digest of a normalized URL.
const md5Hasher = new HashManager('md5', 'hex');
@singleton()
export class PuppeteerControl extends AsyncService {
@ -24,11 +46,14 @@ export class PuppeteerControl extends AsyncService {
await page.browserContext().close();
},
validate: async (page) => {
return this.browser.connected && !page.isClosed();
return page.browser().connected && !page.isClosed();
}
}, {
max: Math.ceil(os.freemem() / 1024 * 1024 * 1024),
min: 0,
max: 1 + Math.floor(os.freemem() / 1024 * 1024 * 1024),
min: 1,
acquireTimeoutMillis: 15_000,
testOnBorrow: true,
testOnReturn: true,
});
constructor(protected globalLogger: Logger) {
@ -39,7 +64,11 @@ export class PuppeteerControl extends AsyncService {
await this.dependencyReady();
if (this.browser) {
await this.browser.close();
if (this.browser.connected) {
await this.browser.close();
} else {
this.browser.process()?.kill();
}
}
this.browser = await puppeteer.launch({
headless: true,
@ -49,6 +78,7 @@ export class PuppeteerControl extends AsyncService {
this.logger.warn(`Browser disconnected`);
this.emit('crippled');
});
this.logger.info(`Browser launched: ${this.browser.process()?.pid}`);
this.emit('ready');
}
@ -58,26 +88,33 @@ export class PuppeteerControl extends AsyncService {
const dedicatedContext = await this.browser.createBrowserContext();
const page = await dedicatedContext.newPage();
await page.setUserAgent(`Slackbot-LinkExpanding 1.0 (+https://api.slack.com/robots)`);
await page.setViewport({ width: 1920, height: 1080 });
await page.exposeFunction('reportSnapshot', (snapshot: any) => {
const preparations = [];
preparations.push(page.setUserAgent(`Slackbot-LinkExpanding 1.0 (+https://api.slack.com/robots)`));
preparations.push(page.setViewport({ width: 1920, height: 1080 }));
preparations.push(page.exposeFunction('reportSnapshot', (snapshot: any) => {
page.emit('snapshot', snapshot);
});
await page.evaluateOnNewDocument(READABILITY_JS);
await page.evaluateOnNewDocument(() => {
function giveSnapshot() {
// @ts-expect-error
return new Readability(document.cloneNode(true)).parse();
};
}));
preparations.push(page.evaluateOnNewDocument(READABILITY_JS));
preparations.push(page.evaluateOnNewDocument(`
function giveSnapshot() {
return {
title: document.title,
href: document.location.href,
html: document.documentElement.outerHTML,
text: document.body.innerText,
parsed: new Readability(document.cloneNode(true)).parse(),
};
}
`));
preparations.push(page.evaluateOnNewDocument(() => {
let aftershot: any;
const handlePageLoad = () => {
// @ts-expect-error
if (document.readyState !== 'complete' && document.readyState !== 'interactive') {
return;
}
// @ts-expect-error
const parsed = giveSnapshot();
if (parsed) {
// @ts-expect-error
@ -97,16 +134,50 @@ export class PuppeteerControl extends AsyncService {
document.addEventListener('readystatechange', handlePageLoad);
// @ts-expect-error
document.addEventListener('load', handlePageLoad);
});
}));
await Promise.all(preparations);
// TODO: further setup the page;
return page;
}
async *scrap(url: string) {
async *scrap(url: string, noCache: string | boolean = false) {
const parsedUrl = new URL(url);
parsedUrl.search = '';
parsedUrl.hash = '';
const normalizedUrl = parsedUrl.toString().toLowerCase();
const digest = md5Hasher.hash(normalizedUrl);
this.logger.info(`Scraping ${url}, normalized digest: ${digest}`, { url, digest });
let snapshot: PageSnapshot | undefined;
let screenshot: Buffer | undefined;
if (!noCache) {
const cached = (await Crawled.fromFirestoreQuery(Crawled.COLLECTION.where('urlPathDigest', '==', digest).orderBy('createdAt', 'desc').limit(1)))?.[0];
if (cached && cached.createdAt.valueOf() > (Date.now() - 1000 * 300)) {
const age = Date.now() - cached.createdAt.valueOf();
this.logger.info(`Cache hit for ${url}, normalized digest: ${digest}, ${age}ms old`, { url, digest, age });
snapshot = {
...cached.snapshot
};
if (snapshot) {
delete snapshot.screenshot;
}
screenshot = cached.snapshot?.screenshot ? Buffer.from(cached.snapshot.screenshot, 'base64') : undefined;
yield {
...cached.snapshot,
screenshot: cached.snapshot?.screenshot ? Buffer.from(cached.snapshot.screenshot, 'base64') : undefined
};
return;
}
}
const page = await this.pagePool.acquire();
let snapshot: unknown;
let nextSnapshotDeferred = Defer();
let finalized = false;
const hdl = (s: any) => {
@ -118,30 +189,57 @@ export class PuppeteerControl extends AsyncService {
nextSnapshotDeferred = Defer();
};
page.on('snapshot', hdl);
const gotoPromise = page.goto(url, { waitUntil: 'networkidle0', timeout: 30_000 });
gotoPromise.finally(() => finalized = true);
const gotoPromise = page.goto(url, { waitUntil: ['load', 'domcontentloaded', 'networkidle0'], timeout: 30_000 })
.then(async (r) => {
screenshot = await page.screenshot({
type: 'jpeg',
quality: 85,
});
snapshot = await page.evaluate('giveSnapshot()') as PageSnapshot;
this.logger.info(`Snapshot of ${url} done`, { url, digest, title: snapshot?.title, href: snapshot?.href });
const nowDate = new Date();
Crawled.save(
Crawled.from({
url,
createdAt: nowDate,
expireAt: new Date(nowDate.valueOf() + 1000 * 3600 * 24 * 7),
urlPathDigest: digest,
snapshot: { ...snapshot, screenshot: screenshot?.toString('base64') || '' },
}).degradeForFireStore()
).catch((err) => {
this.logger.warn(`Failed to save snapshot`, { err: marshalErrorLike(err) });
});
return r;
});
gotoPromise.catch((err) => {
this.logger.warn(`Browsing of ${url} not fully done`, { err: marshalErrorLike(err) });
}).finally(() => {
finalized = true;
});
try {
while (true) {
await Promise.race([nextSnapshotDeferred.promise, gotoPromise]);
const screenshot = await page.screenshot();
if (finalized) {
await gotoPromise;
snapshot = await page.evaluate('new Readability(document.cloneNode(true)).parse()');
yield { snapshot, screenshot };
yield { ...snapshot, screenshot };
break;
}
yield { snapshot, screenshot };
yield snapshot;
}
} catch (_err) {
void 0;
} finally {
page.off('snapshot', hdl);
await this.pagePool.destroy(page);
gotoPromise.finally(() => {
page.off('snapshot', hdl);
this.pagePool.destroy(page).catch((err) => {
this.logger.warn(`Failed to destroy page`, { err: marshalErrorLike(err) });
});
});
}
}
}
const puppeteerControl = container.resolve(PuppeteerControl);