diff --git a/jaseci_ai_kit/jac_misc/jac_misc/scraper/async_scraper.py b/jaseci_ai_kit/jac_misc/jac_misc/scraper/async_scraper.py
index d0f39c330c..7c1fd3defe 100644
--- a/jaseci_ai_kit/jac_misc/jac_misc/scraper/async_scraper.py
+++ b/jaseci_ai_kit/jac_misc/jac_misc/scraper/async_scraper.py
@@ -1,4 +1,3 @@
-import asyncio
 import websocket
 
 from uuid import uuid4, UUID
@@ -6,11 +5,12 @@
 from re import search
 from orjson import dumps
 from json import loads
-from logging import exception, error
+from logging import exception
 
 from playwright.async_api import async_playwright, Page
 from websocket import create_connection
 from jac_misc.scraper.utils import (
+    Process,
     add_url,
     add_crawl,
     get_script,
@@ -56,7 +56,7 @@ def notify_client(
             "processing": processing,
             "pending": [p["goto"]["url"] for p in pages],
             "scanned": urls["scanned"],
-            "scraped": urls["scraped"],
+            "scraped": list(urls["scraped"]),
         }
         if content:
             data["content"] = content
@@ -91,19 +91,29 @@ def custom_notify_client(self, target: str, data: dict, trial: int = 0):
 async def scrape(
-    pages: list, pre_configs: list = [], detailed: bool = False, target: str = None
+    pages: list,
+    pre_configs: list = [],
+    detailed: bool = False,
+    target: str = None,
+    trigger_id: str = None,
 ):
     content = ""
-    urls = {"scanned": [], "scanned_urls": set(), "scraped": [], "crawled": set()}
+    urls = {
+        "url": pages[0]["goto"]["url"],
+        "scanned": {},
+        "scanned_urls": set(),
+        "scraped": set(),
+        "crawled": set(),
+    }
 
     ws = Client()
-    trigger_id = uuid4()
+    trigger_id = trigger_id or str(uuid4())
 
     async with async_playwright() as aspw:
         browser = await aspw.chromium.launch()
         page = await browser.new_page()
 
-        while pages:
+        while pages and Process.can_continue(trigger_id):
             try:
                 pg: dict = pages.pop(0)
@@ -111,19 +121,33 @@ async def scrape(
                 url = pg_goto.get("url") or "N/A"
                 page.source = url
 
+                Process.has_to_stop(trigger_id)
+
                 ws.notify_client(
                     target, trigger_id, pages, urls, {"url": url, "status": "started"}
                 )
 
                 await goto(page, pg_goto, urls)
 
+                Process.has_to_stop(trigger_id)
+
                 content += await getters(page, pg.get("getters") or [], urls)
 
+                Process.has_to_stop(trigger_id)
+
                 await crawler(page, pg.get("crawler") or {}, urls, pages, pre_configs)
 
                 ws.notify_client(
                     target, trigger_id, pages, urls, {"url": url, "status": "completed"}
                 )
+            except Process.Stopped as e:
+                add_url(page, urls, error=str(e))
+
+                ws.notify_client(
+                    target, trigger_id, pages, urls, {"url": url, "status": "stopped"}
+                )
+
+                break
             except Exception as e:
                 add_url(page, urls, error=str(e))
@@ -140,6 +164,7 @@ async def scrape(
 
     if detailed:
         return {
+            "url": urls["url"],
             "content": content,
             "scanned": urls["scanned"],
             "scraped": urls["scraped"],
@@ -156,7 +181,7 @@ async def goto(page: Page, specs: dict, urls: dict):
     print(f'[goto]: loading {specs["url"]}')
     await page.goto(**specs)
-    add_url(page, urls)
+    add_url(page, urls, await page.title())
 
     await run_scripts(page, post, urls)
@@ -191,7 +216,7 @@ async def getters(page: Page, specss: list[dict], urls: dict):
         elif method == "custom":
             expression = f'{{{specs.get("expression")}}}'
         elif method == "none":
-            expression = '""'
+            expression = ""
         else:
             expression = f"""{{
                 clone = document.body.cloneNode(true);
@@ -202,7 +227,7 @@ async def getters(page: Page, specss: list[dict], urls: dict):
         if expression:
             print(f"[getters]: getting content from {page.url}")
             content += await page.evaluate(f"() =>{expression}")
-            add_url(page, urls, expression)
+            add_url(page, urls, await page.title(), expression)
 
         await run_scripts(page, post, urls)
@@ -258,7 +283,7 @@ async def run_scripts(page: Page, scripts: list[dict], urls: dict):
         method = script.pop("method", "evalutate") or "evaluate"
         print(f"[script]: running method {method}\n{str(script)}")
         await getattr(page, method)(**script)
-        add_url(page, urls)
+        add_url(page, urls, await page.title())
 
 
 async def scrape_preview(page: dict, target: str = None):
diff --git a/jaseci_ai_kit/jac_misc/jac_misc/scraper/scraper.py b/jaseci_ai_kit/jac_misc/jac_misc/scraper/scraper.py
index 0d0b95ee77..fd73d9644c 100644
--- a/jaseci_ai_kit/jac_misc/jac_misc/scraper/scraper.py
+++ b/jaseci_ai_kit/jac_misc/jac_misc/scraper/scraper.py
@@ -12,6 +12,7 @@
     scrape as async_scrape,
     scrape_preview as async_scrape_preview,
 )
+from jac_misc.scraper.utils import Process
 
 
 if any(["uvicorn" in arg for arg in argv]):
@@ -21,6 +22,7 @@ class ScraperRequest(BaseModel):
         pre_configs: list = []
         detailed: bool = False
         target: str = None
+        trigger_id: str = None
         is_async: bool = False
 
     class ScraperPreviewRequest(BaseModel):
@@ -28,6 +30,9 @@ class ScraperPreviewRequest(BaseModel):
         target: str = None
         is_async: bool = False
 
+    class ScraperStopper(BaseModel):
+        trigger_id: str = None
+
     app = FastAPI()
 
     @app.post("/setup/")
@@ -38,10 +43,14 @@ def setup():
     async def scrape(sr: ScraperRequest):
         if sr.is_async:
             task = asyncio.create_task(
-                async_scrape(sr.pages, sr.pre_configs, sr.detailed, sr.target)
+                async_scrape(
+                    sr.pages, sr.pre_configs, sr.detailed, sr.target, sr.trigger_id
+                )
             )
             return {"task": task.get_name()}
-        return await async_scrape(sr.pages, sr.pre_configs, sr.detailed, sr.target)
+        return await async_scrape(
+            sr.pages, sr.pre_configs, sr.detailed, sr.target, sr.trigger_id
+        )
 
     @app.post("/scrape_preview/")
     async def scrape_preview(spr: ScraperPreviewRequest):
@@ -50,12 +59,25 @@
             return {"task": task.get_name()}
         return await async_scrape_preview(spr.page, spr.target)
 
+    @app.post("/scrape_stop/")
+    async def scrape_stop(ss: ScraperStopper):
+        Process.add(ss.trigger_id)
+        return {"message": f"Successfully added {ss.trigger_id}"}
+
     @app.get("/jaseci_actions_spec/")
     def action_list():
         return {
             "wbs.setup": [],
-            "wbs.scrape": ["pages", "pre_configs", "detailed", "target", "is_async"],
+            "wbs.scrape": [
+                "pages",
+                "pre_configs",
+                "detailed",
+                "target",
+                "trigger_id",
+                "is_async",
+            ],
             "wbs.scrape_preview": ["page", "target", "is_async"],
+            "wbs.scrape_stop": ["trigger_id"],
         }
 
 else:
@@ -66,10 +88,19 @@ def setup():
 
     @jaseci_action(act_group=["wbs"])
     def scrape(
-        pages: list, pre_configs: list = [], detailed: bool = False, target: str = None
+        pages: list,
+        pre_configs: list = [],
+        detailed: bool = False,
+        target: str = None,
+        trigger_id: str = None,
     ):
-        return sync_scrape(pages, pre_configs, detailed, target)
+        return sync_scrape(pages, pre_configs, detailed, target, trigger_id)
 
     @jaseci_action(act_group=["wbs"])
     def scrape_preview(page: dict, target: str = None):
         return sync_scrape_preview(page, target)
+
+    @jaseci_action(act_group=["wbs"])
+    def scrape_stop(trigger_id):
+        Process.add(trigger_id)
+        return {"message": f"Successfully added {trigger_id}"}
diff --git a/jaseci_ai_kit/jac_misc/jac_misc/scraper/sync_scraper.py b/jaseci_ai_kit/jac_misc/jac_misc/scraper/sync_scraper.py
index 1b773a24a9..3e515e699b 100644
--- a/jaseci_ai_kit/jac_misc/jac_misc/scraper/sync_scraper.py
+++ b/jaseci_ai_kit/jac_misc/jac_misc/scraper/sync_scraper.py
@@ -4,6 +4,7 @@
 from jaseci.jsorc.jsorc import JsOrc
 
 from jac_misc.scraper.utils import (
+    Process,
     add_url,
     add_crawl,
     get_script,
@@ -33,7 +34,7 @@ def notify_client(
         "processing": processing,
         "pending": [p["goto"]["url"] for p in pages],
         "scanned": urls["scanned"],
-        "scraped": urls["scraped"],
+        "scraped": list(urls["scraped"]),
     }
     if content:
         data["content"] = content
@@ -42,18 +43,28 @@
 def scrape(
-    pages: list, pre_configs: list = [], detailed: bool = False, target: str = None
+    pages: list,
+    pre_configs: list = [],
+    detailed: bool = False,
+    target: str = None,
+    trigger_id: str = None,
 ):
     content = ""
-    urls = {"scanned": [], "scanned_urls": set(), "scraped": [], "crawled": set()}
+    urls = {
+        "url": pages[0]["goto"]["url"],
+        "scanned": {},
+        "scanned_urls": set(),
+        "scraped": set(),
+        "crawled": set(),
+    }
 
-    trigger_id = uuid4()
+    trigger_id = trigger_id or str(uuid4())
 
     with sync_playwright() as spw:
         browser = spw.chromium.launch()
         page = browser.new_page()
 
-        while pages:
+        while pages and Process.can_continue(trigger_id):
             try:
                 pg: dict = pages.pop(0)
@@ -61,19 +72,33 @@ def scrape(
                 url = pg_goto.get("url") or "N/A"
                 page.source = url
 
+                Process.has_to_stop(trigger_id)
+
                 notify_client(
                     target, trigger_id, pages, urls, {"url": url, "status": "started"}
                 )
 
                 goto(page, pg_goto, urls)
 
+                Process.has_to_stop(trigger_id)
+
                 content += getters(page, pg.get("getters") or [], urls)
 
+                Process.has_to_stop(trigger_id)
+
                 crawler(page, pg.get("crawler") or {}, urls, pages, pre_configs)
 
                 notify_client(
                     target, trigger_id, pages, urls, {"url": url, "status": "completed"}
                 )
+            except Process.Stopped as e:
+                add_url(page, urls, error=str(e))
+
+                notify_client(
+                    target, trigger_id, pages, urls, {"url": url, "status": "stopped"}
+                )
+
+                break
             except Exception as e:
                 add_url(page, urls, error=str(e))
@@ -89,6 +114,7 @@ def scrape(
 
     if detailed:
         return {
+            "url": urls["url"],
             "content": content,
             "scanned": urls["scanned"],
             "scraped": urls["scraped"],
@@ -104,7 +130,7 @@ def goto(page: Page, specs: dict, urls: dict):
     print(f'[goto]: loading {specs["url"]}')
     page.goto(**specs)
-    add_url(page, urls)
+    add_url(page, urls, page.title())
 
     run_scripts(page, post, urls)
@@ -139,7 +165,7 @@ def getters(page: Page, specss: list[dict], urls: dict):
         elif method == "custom":
             expression = f'{{{specs.get("expression")}}}'
         elif method == "none":
-            expression = '""'
+            expression = ""
         else:
             expression = f"""{{
                 clone = document.body.cloneNode(true);
@@ -150,7 +176,7 @@ def getters(page: Page, specss: list[dict], urls: dict):
         if expression:
             print(f"[getters]: getting content from {page.url}")
             content += page.evaluate(f"() =>{expression}")
-            add_url(page, urls, expression)
+            add_url(page, urls, page.title(), expression)
 
         run_scripts(page, post, urls)
@@ -204,7 +230,7 @@ def run_scripts(page: Page, scripts: list[dict], urls: dict):
         method = script.pop("method", "evalutate") or "evaluate"
         print(f"[script]: running method {method}\n{str(script)}")
         getattr(page, method)(**script)
-        add_url(page, urls)
+        add_url(page, urls, page.title())
 
 
 def scrape_preview(page: dict, target: str):
diff --git a/jaseci_ai_kit/jac_misc/jac_misc/scraper/utils.py b/jaseci_ai_kit/jac_misc/jac_misc/scraper/utils.py
index 845edcf437..44d23fe7f7 100644
--- a/jaseci_ai_kit/jac_misc/jac_misc/scraper/utils.py
+++ b/jaseci_ai_kit/jac_misc/jac_misc/scraper/utils.py
@@ -2,22 +2,47 @@
 from copy import deepcopy
 
 
-def add_url(page, urls: dict, scraped: bool = False, error: str = None):
+class Process:
+    queue = set()
+
+    class Stopped(Exception):
+        pass
+
+    @staticmethod
+    def add(trigger_id: str):
+        Process.queue.add(trigger_id)
+
+    @staticmethod
+    def can_continue(trigger_id: str):
+        if can := (trigger_id in Process.queue):
+            Process.queue.remove(trigger_id)
+        return not can
+
+    @staticmethod
+    def has_to_stop(trigger_id: str):
+        if trigger_id in Process.queue:
+            Process.queue.remove(trigger_id)
+            raise Process.Stopped()
+
+
+def add_url(
+    page, urls: dict, title: str = None, scraped: bool = False, error: str = None
+):
     url = page.url
     source = page.source
     if url:
         if url not in urls["scanned_urls"]:
             urls["scanned_urls"].add(url)
 
-            scan = {"url": url}
+            scan = {"title": title}
             if error:
                 scan["error"] = error
             if url != source:
                 scan["source"] = source
-            urls["scanned"].append(scan)
+            urls["scanned"][url] = scan
 
-        if scraped and url not in urls["scraped"]:
-            urls["scraped"].append(url)
+        if scraped:
+            urls["scraped"].add(url)
 
 
 def add_crawl(pages: list, pre_configs: list, urls: dict, url: str, def_crawl: dict):
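
Usage sketch (not part of the patch; the host/port and page spec below are assumptions for illustration): with these changes a caller can pass its own trigger_id when starting a scrape and later interrupt the run through the new scrape_stop action. scrape_stop queues the id in Process, and the scrape loop checks Process.can_continue()/Process.has_to_stop() between the goto, getters, and crawler steps, so the run ends at its next checkpoint with a "stopped" notification rather than immediately.

    import requests

    BASE = "http://localhost:8000"  # assumed uvicorn host/port for the FastAPI branch of scraper.py
    trigger_id = "my-scrape-run"

    # start an async scrape tagged with our own trigger_id
    requests.post(
        f"{BASE}/scrape/",
        json={
            "pages": [{"goto": {"url": "https://example.com"}}],
            "detailed": True,
            "trigger_id": trigger_id,
            "is_async": True,
        },
    )

    # ... later, ask the running scrape to stop at its next checkpoint
    requests.post(f"{BASE}/scrape_stop/", json={"trigger_id": trigger_id})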