[SCRAPER]: Integration stopper and Structure change
amadolid committed Jul 4, 2024
1 parent a0e7cfc commit 8722eea
Showing 4 changed files with 137 additions and 30 deletions.
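This commit wires a stop mechanism into both the async and sync scrapers through a Process helper imported from jac_misc.scraper.utils (presumably the fourth changed file, which is not expanded in this view). Only four members are visible in the diff: Process.add, Process.can_continue, Process.has_to_stop, and the Process.Stopped exception. Below is a minimal sketch of what such a stopper registry could look like; the storage and cleanup details are assumptions, not the actual utils.py implementation.

class Process:
    """Hypothetical sketch of the stop registry assumed by this commit."""

    # trigger_ids that a client has asked to stop (assumed storage)
    stopped: set = set()

    class Stopped(Exception):
        """Raised when a stop has been requested for the running trigger_id."""

    @classmethod
    def add(cls, trigger_id: str):
        # called by the scrape_stop action / endpoint
        cls.stopped.add(trigger_id)

    @classmethod
    def can_continue(cls, trigger_id: str) -> bool:
        # the scrape loops keep pulling pages only while this is True
        return trigger_id not in cls.stopped

    @classmethod
    def has_to_stop(cls, trigger_id: str):
        # checkpoint between the goto, getters, and crawler steps
        if trigger_id in cls.stopped:
            raise cls.Stopped(f"scraper {trigger_id} was requested to stop")

In the scrape loops below, can_continue gates the next page, has_to_stop aborts mid-page, and the Stopped handler records the URL with the error, notifies the client with a "stopped" status, and breaks out of the loop.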
47 changes: 36 additions & 11 deletions jaseci_ai_kit/jac_misc/jac_misc/scraper/async_scraper.py
@@ -1,16 +1,16 @@
import asyncio
import websocket

from uuid import uuid4, UUID
from os import getenv
from re import search
from orjson import dumps
from json import loads
from logging import exception, error
from logging import exception
from playwright.async_api import async_playwright, Page
from websocket import create_connection

from jac_misc.scraper.utils import (
Process,
add_url,
add_crawl,
get_script,
@@ -56,7 +56,7 @@ def notify_client(
"processing": processing,
"pending": [p["goto"]["url"] for p in pages],
"scanned": urls["scanned"],
"scraped": urls["scraped"],
"scraped": list(urls["scraped"]),
}
if content:
data["content"] = content
@@ -91,39 +91,63 @@ def custom_notify_client(self, target: str, data: dict, trial: int = 0):


async def scrape(
pages: list, pre_configs: list = [], detailed: bool = False, target: str = None
pages: list,
pre_configs: list = [],
detailed: bool = False,
target: str = None,
trigger_id: str = None,
):
content = ""
urls = {"scanned": [], "scanned_urls": set(), "scraped": [], "crawled": set()}
urls = {
"url": pages[0]["goto"]["url"],
"scanned": {},
"scanned_urls": set(),
"scraped": set(),
"crawled": set(),
}

ws = Client()
trigger_id = uuid4()
trigger_id = trigger_id or str(uuid4())

async with async_playwright() as aspw:
browser = await aspw.chromium.launch()
page = await browser.new_page()

while pages:
while pages and Process.can_continue(trigger_id):
try:
pg: dict = pages.pop(0)

pg_goto = pg.get("goto") or {}
url = pg_goto.get("url") or "N/A"
page.source = url

Process.has_to_stop(trigger_id)

ws.notify_client(
target, trigger_id, pages, urls, {"url": url, "status": "started"}
)

await goto(page, pg_goto, urls)

Process.has_to_stop(trigger_id)

content += await getters(page, pg.get("getters") or [], urls)

Process.has_to_stop(trigger_id)

await crawler(page, pg.get("crawler") or {}, urls, pages, pre_configs)

ws.notify_client(
target, trigger_id, pages, urls, {"url": url, "status": "completed"}
)
except Process.Stopped as e:
add_url(page, urls, error=str(e))

ws.notify_client(
target, trigger_id, pages, urls, {"url": url, "status": "stopped"}
)

break
except Exception as e:
add_url(page, urls, error=str(e))

@@ -140,6 +164,7 @@ async def scrape(

if detailed:
return {
"url": urls["url"],
"content": content,
"scanned": urls["scanned"],
"scraped": urls["scraped"],
@@ -156,7 +181,7 @@ async def goto(page: Page, specs: dict, urls: dict):
print(f'[goto]: loading {specs["url"]}')

await page.goto(**specs)
add_url(page, urls)
add_url(page, urls, await page.title())

await run_scripts(page, post, urls)

@@ -191,7 +216,7 @@ async def getters(page: Page, specss: list[dict], urls: dict):
elif method == "custom":
expression = f'{{{specs.get("expression")}}}'
elif method == "none":
expression = '""'
expression = ""
else:
expression = f"""{{
clone = document.body.cloneNode(true);
@@ -202,7 +227,7 @@
if expression:
print(f"[getters]: getting content from {page.url}")
content += await page.evaluate(f"() =>{expression}")
add_url(page, urls, expression)
add_url(page, urls, await page.title(), expression)

await run_scripts(page, post, urls)

@@ -258,7 +283,7 @@ async def run_scripts(page: Page, scripts: list[dict], urls: dict):
method = script.pop("method", "evalutate") or "evaluate"
print(f"[script]: running method {method}\n{str(script)}")
await getattr(page, method)(**script)
add_url(page, urls)
add_url(page, urls, await page.title())


async def scrape_preview(page: dict, target: str = None):
41 changes: 36 additions & 5 deletions jaseci_ai_kit/jac_misc/jac_misc/scraper/scraper.py
@@ -12,6 +12,7 @@
scrape as async_scrape,
scrape_preview as async_scrape_preview,
)
from jac_misc.scraper.utils import Process


if any(["uvicorn" in arg for arg in argv]):
@@ -21,13 +22,17 @@ class ScraperRequest(BaseModel):
pre_configs: list = []
detailed: bool = False
target: str = None
trigger_id: str = None
is_async: bool = False

class ScraperPreviewRequest(BaseModel):
page: dict
target: str = None
is_async: bool = False

class ScraperStopper(BaseModel):
trigger_id: str = None

app = FastAPI()

@app.post("/setup/")
@@ -38,10 +43,14 @@ def setup():
async def scrape(sr: ScraperRequest):
if sr.is_async:
task = asyncio.create_task(
async_scrape(sr.pages, sr.pre_configs, sr.detailed, sr.target)
async_scrape(
sr.pages, sr.pre_configs, sr.detailed, sr.target, sr.trigger_id
)
)
return {"task": task.get_name()}
return await async_scrape(sr.pages, sr.pre_configs, sr.detailed, sr.target)
return await async_scrape(
sr.pages, sr.pre_configs, sr.detailed, sr.target, sr.trigger_id
)

@app.post("/scrape_preview/")
async def scrape_preview(spr: ScraperPreviewRequest):
@@ -50,12 +59,25 @@ async def scrape_preview(spr: ScraperPreviewRequest):
return {"task": task.get_name()}
return await async_scrape_preview(spr.page, spr.target)

@app.post("/scrape_stop/")
async def scrape_stop(ss: ScraperStopper):
Process.add(ss.trigger_id)
return {"message": f"Successfully added {ss.trigger_id}"}

@app.get("/jaseci_actions_spec/")
def action_list():
return {
"wbs.setup": [],
"wbs.scrape": ["pages", "pre_configs", "detailed", "target", "is_async"],
"wbs.scrape": [
"pages",
"pre_configs",
"detailed",
"target",
"trigger_id",
"is_async",
],
"wbs.scrape_preview": ["page", "target", "is_async"],
"wbs.scrape_stop": ["trigger_id"],
}

else:
@@ -66,10 +88,19 @@ def setup():

@jaseci_action(act_group=["wbs"])
def scrape(
pages: list, pre_configs: list = [], detailed: bool = False, target: str = None
pages: list,
pre_configs: list = [],
detailed: bool = False,
target: str = None,
trigger_id: str = None,
):
return sync_scrape(pages, pre_configs, detailed, target)
return sync_scrape(pages, pre_configs, detailed, target, trigger_id)

@jaseci_action(act_group=["wbs"])
def scrape_preview(page: dict, target: str = None):
return sync_scrape_preview(page, target)

@jaseci_action(act_group=["wbs"])
def scrape_stop(trigger_id):
Process.add(trigger_id)
return {"message": f"Successfully added {trigger_id}"}
44 changes: 35 additions & 9 deletions jaseci_ai_kit/jac_misc/jac_misc/scraper/sync_scraper.py
@@ -4,6 +4,7 @@

from jaseci.jsorc.jsorc import JsOrc
from jac_misc.scraper.utils import (
Process,
add_url,
add_crawl,
get_script,
@@ -33,7 +34,7 @@ def notify_client(
"processing": processing,
"pending": [p["goto"]["url"] for p in pages],
"scanned": urls["scanned"],
"scraped": urls["scraped"],
"scraped": list(urls["scraped"]),
}
if content:
data["content"] = content
@@ -42,38 +43,62 @@


def scrape(
pages: list, pre_configs: list = [], detailed: bool = False, target: str = None
pages: list,
pre_configs: list = [],
detailed: bool = False,
target: str = None,
trigger_id: str = None,
):
content = ""
urls = {"scanned": [], "scanned_urls": set(), "scraped": [], "crawled": set()}
urls = {
"url": pages[0]["goto"]["url"],
"scanned": {},
"scanned_urls": set(),
"scraped": set(),
"crawled": set(),
}

trigger_id = uuid4()
trigger_id = trigger_id or str(uuid4())

with sync_playwright() as spw:
browser = spw.chromium.launch()
page = browser.new_page()

while pages:
while pages and Process.can_continue(trigger_id):
try:
pg: dict = pages.pop(0)

pg_goto = pg.get("goto") or {}
url = pg_goto.get("url") or "N/A"
page.source = url

Process.has_to_stop(trigger_id)

notify_client(
target, trigger_id, pages, urls, {"url": url, "status": "started"}
)

goto(page, pg_goto, urls)

Process.has_to_stop(trigger_id)

content += getters(page, pg.get("getters") or [], urls)

Process.has_to_stop(trigger_id)

crawler(page, pg.get("crawler") or {}, urls, pages, pre_configs)

notify_client(
target, trigger_id, pages, urls, {"url": url, "status": "completed"}
)
except Process.Stopped as e:
add_url(page, urls, error=str(e))

notify_client(
target, trigger_id, pages, urls, {"url": url, "status": "stopped"}
)

break
except Exception as e:
add_url(page, urls, error=str(e))

@@ -89,6 +114,7 @@ def scrape(

if detailed:
return {
"url": urls["url"],
"content": content,
"scanned": urls["scanned"],
"scraped": urls["scraped"],
@@ -104,7 +130,7 @@ def goto(page: Page, specs: dict, urls: dict):
print(f'[goto]: loading {specs["url"]}')

page.goto(**specs)
add_url(page, urls)
add_url(page, urls, page.title())

run_scripts(page, post, urls)

@@ -139,7 +165,7 @@ def getters(page: Page, specss: list[dict], urls: dict):
elif method == "custom":
expression = f'{{{specs.get("expression")}}}'
elif method == "none":
expression = '""'
expression = ""
else:
expression = f"""{{
clone = document.body.cloneNode(true);
@@ -150,7 +176,7 @@
if expression:
print(f"[getters]: getting content from {page.url}")
content += page.evaluate(f"() =>{expression}")
add_url(page, urls, expression)
add_url(page, urls, page.title(), expression)

run_scripts(page, post, urls)

@@ -204,7 +230,7 @@ def run_scripts(page: Page, scripts: list[dict], urls: dict):
method = script.pop("method", "evalutate") or "evaluate"
print(f"[script]: running method {method}\n{str(script)}")
getattr(page, method)(**script)
add_url(page, urls)
add_url(page, urls, page.title())


def scrape_preview(page: dict, target: str):