
Commit 5f6f2b8

[SCRAPER]: Integration stopper and Structure change
1 parent a0e7cfc commit 5f6f2b8

4 files changed: 132 additions & 27 deletions

jaseci_ai_kit/jac_misc/jac_misc/scraper/async_scraper.py

Lines changed: 35 additions & 10 deletions
@@ -1,16 +1,16 @@
-import asyncio
 import websocket

 from uuid import uuid4, UUID
 from os import getenv
 from re import search
 from orjson import dumps
 from json import loads
-from logging import exception, error
+from logging import exception
 from playwright.async_api import async_playwright, Page
 from websocket import create_connection

 from jac_misc.scraper.utils import (
+    Process,
     add_url,
     add_crawl,
     get_script,
@@ -91,39 +91,63 @@ def custom_notify_client(self, target: str, data: dict, trial: int = 0):


 async def scrape(
-    pages: list, pre_configs: list = [], detailed: bool = False, target: str = None
+    pages: list,
+    pre_configs: list = [],
+    detailed: bool = False,
+    target: str = None,
+    trigger_id: str = None,
 ):
     content = ""
-    urls = {"scanned": [], "scanned_urls": set(), "scraped": [], "crawled": set()}
+    urls = {
+        "url": pages[0]["goto"]["url"],
+        "scanned": {},
+        "scanned_urls": set(),
+        "scraped": set(),
+        "crawled": set(),
+    }

     ws = Client()
-    trigger_id = uuid4()
+    trigger_id = trigger_id or str(uuid4())

     async with async_playwright() as aspw:
         browser = await aspw.chromium.launch()
         page = await browser.new_page()

-        while pages:
+        while pages and Process.can_continue(trigger_id):
             try:
                 pg: dict = pages.pop(0)

                 pg_goto = pg.get("goto") or {}
                 url = pg_goto.get("url") or "N/A"
                 page.source = url

+                Process.has_to_stop(trigger_id)
+
                 ws.notify_client(
                     target, trigger_id, pages, urls, {"url": url, "status": "started"}
                 )

                 await goto(page, pg_goto, urls)

+                Process.has_to_stop(trigger_id)
+
                 content += await getters(page, pg.get("getters") or [], urls)

+                Process.has_to_stop(trigger_id)
+
                 await crawler(page, pg.get("crawler") or {}, urls, pages, pre_configs)

                 ws.notify_client(
                     target, trigger_id, pages, urls, {"url": url, "status": "completed"}
                 )
+            except Process.Stopped as e:
+                add_url(page, urls, error=str(e))
+
+                ws.notify_client(
+                    target, trigger_id, pages, urls, {"url": url, "status": "stopped"}
+                )
+
+                break
             except Exception as e:
                 add_url(page, urls, error=str(e))

@@ -140,6 +164,7 @@ async def scrape(

     if detailed:
         return {
+            "url": urls["url"],
             "content": content,
             "scanned": urls["scanned"],
             "scraped": urls["scraped"],
@@ -156,7 +181,7 @@ async def goto(page: Page, specs: dict, urls: dict):
         print(f'[goto]: loading {specs["url"]}')

         await page.goto(**specs)
-        add_url(page, urls)
+        add_url(page, urls, await page.title())

         await run_scripts(page, post, urls)

@@ -191,7 +216,7 @@ async def getters(page: Page, specss: list[dict], urls: dict):
             elif method == "custom":
                 expression = f'{{{specs.get("expression")}}}'
             elif method == "none":
-                expression = '""'
+                expression = ""
             else:
                 expression = f"""{{
                     clone = document.body.cloneNode(true);
@@ -202,7 +227,7 @@ async def getters(page: Page, specss: list[dict], urls: dict):
             if expression:
                 print(f"[getters]: getting content from {page.url}")
                 content += await page.evaluate(f"() =>{expression}")
-                add_url(page, urls, expression)
+                add_url(page, urls, await page.title(), expression)

             await run_scripts(page, post, urls)

@@ -258,7 +283,7 @@ async def run_scripts(page: Page, scripts: list[dict], urls: dict):
             method = script.pop("method", "evalutate") or "evaluate"
             print(f"[script]: running method {method}\n{str(script)}")
             await getattr(page, method)(**script)
-            add_url(page, urls)
+            add_url(page, urls, await page.title())


 async def scrape_preview(page: dict, target: str = None):

jaseci_ai_kit/jac_misc/jac_misc/scraper/scraper.py

Lines changed: 33 additions & 4 deletions
@@ -12,6 +12,7 @@
     scrape as async_scrape,
     scrape_preview as async_scrape_preview,
 )
+from jac_misc.scraper.utils import Process


 if any(["uvicorn" in arg for arg in argv]):
@@ -21,13 +22,17 @@ class ScraperRequest(BaseModel):
         pre_configs: list = []
         detailed: bool = False
         target: str = None
+        trigger_id: str = None
         is_async: bool = False

     class ScraperPreviewRequest(BaseModel):
         page: dict
         target: str = None
         is_async: bool = False

+    class ScraperStopper(BaseModel):
+        trigger_id: str = None
+
     app = FastAPI()

     @app.post("/setup/")
@@ -38,7 +43,9 @@ def setup():
     async def scrape(sr: ScraperRequest):
         if sr.is_async:
             task = asyncio.create_task(
-                async_scrape(sr.pages, sr.pre_configs, sr.detailed, sr.target)
+                async_scrape(
+                    sr.pages, sr.pre_configs, sr.detailed, sr.target, sr.trigger_id
+                )
             )
             return {"task": task.get_name()}
         return await async_scrape(sr.pages, sr.pre_configs, sr.detailed, sr.target)
@@ -50,12 +57,25 @@ async def scrape_preview(spr: ScraperPreviewRequest):
             return {"task": task.get_name()}
         return await async_scrape_preview(spr.page, spr.target)

+    @app.post("/scrape_stop/")
+    async def scrape_stop(ss: ScraperStopper):
+        Process.add(ss.trigger_id)
+        return {"message": f"Successfully added {ss.trigger_id}"}
+
     @app.get("/jaseci_actions_spec/")
     def action_list():
         return {
             "wbs.setup": [],
-            "wbs.scrape": ["pages", "pre_configs", "detailed", "target", "is_async"],
+            "wbs.scrape": [
+                "pages",
+                "pre_configs",
+                "detailed",
+                "target",
+                "trigger_id",
+                "is_async",
+            ],
             "wbs.scrape_preview": ["page", "target", "is_async"],
+            "wbs.scrape_stop": ["trigger_id"],
         }

 else:
@@ -66,10 +86,19 @@ def setup():

     @jaseci_action(act_group=["wbs"])
     def scrape(
-        pages: list, pre_configs: list = [], detailed: bool = False, target: str = None
+        pages: list,
+        pre_configs: list = [],
+        detailed: bool = False,
+        target: str = None,
+        trigger_id: str = None,
     ):
-        return sync_scrape(pages, pre_configs, detailed, target)
+        return sync_scrape(pages, pre_configs, detailed, target, trigger_id)

     @jaseci_action(act_group=["wbs"])
     def scrape_preview(page: dict, target: str = None):
         return sync_scrape_preview(page, target)
+
+    @jaseci_action(act_group=["wbs"])
+    def scrape_stop(trigger_id):
+        Process.add(trigger_id)
+        return {"message": f"Successfully added {trigger_id}"}
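In uvicorn mode this adds a /scrape_stop/ endpoint next to /scrape/; in module mode the same hook is exposed as the wbs.scrape_stop action. A hedged sketch of how a client might drive it over HTTP, assuming the service listens on http://localhost:8000 (host, port, and the trigger id are assumptions for the example):

import requests

BASE = "http://localhost:8000"  # assumed address of the scraper service

# Start an async scrape with a caller-chosen trigger_id so it can be stopped later.
requests.post(
    f"{BASE}/scrape/",
    json={
        "pages": [{"goto": {"url": "https://example.com"}}],
        "detailed": True,
        "trigger_id": "run-42",
        "is_async": True,
    },
)

# Queue a stop request; the running scrape exits at its next Process checkpoint
# and reports status "stopped" to the notify target.
requests.post(f"{BASE}/scrape_stop/", json={"trigger_id": "run-42"})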

jaseci_ai_kit/jac_misc/jac_misc/scraper/sync_scraper.py

Lines changed: 34 additions & 8 deletions
@@ -4,6 +4,7 @@

 from jaseci.jsorc.jsorc import JsOrc
 from jac_misc.scraper.utils import (
+    Process,
     add_url,
     add_crawl,
     get_script,
@@ -42,38 +43,62 @@ def notify_client(


 def scrape(
-    pages: list, pre_configs: list = [], detailed: bool = False, target: str = None
+    pages: list,
+    pre_configs: list = [],
+    detailed: bool = False,
+    target: str = None,
+    trigger_id: str = None,
 ):
     content = ""
-    urls = {"scanned": [], "scanned_urls": set(), "scraped": [], "crawled": set()}
+    urls = {
+        "url": pages[0]["goto"]["url"],
+        "scanned": {},
+        "scanned_urls": set(),
+        "scraped": set(),
+        "crawled": set(),
+    }

-    trigger_id = uuid4()
+    trigger_id = trigger_id or str(uuid4())

     with sync_playwright() as spw:
         browser = spw.chromium.launch()
         page = browser.new_page()

-        while pages:
+        while pages and Process.can_continue(trigger_id):
             try:
                 pg: dict = pages.pop(0)

                 pg_goto = pg.get("goto") or {}
                 url = pg_goto.get("url") or "N/A"
                 page.source = url

+                Process.has_to_stop(trigger_id)
+
                 notify_client(
                     target, trigger_id, pages, urls, {"url": url, "status": "started"}
                 )

                 goto(page, pg_goto, urls)

+                Process.has_to_stop(trigger_id)
+
                 content += getters(page, pg.get("getters") or [], urls)

+                Process.has_to_stop(trigger_id)
+
                 crawler(page, pg.get("crawler") or {}, urls, pages, pre_configs)

                 notify_client(
                     target, trigger_id, pages, urls, {"url": url, "status": "completed"}
                 )
+            except Process.Stopped as e:
+                add_url(page, urls, error=str(e))
+
+                notify_client(
+                    target, trigger_id, pages, urls, {"url": url, "status": "stopped"}
+                )
+
+                break
             except Exception as e:
                 add_url(page, urls, error=str(e))

@@ -89,6 +114,7 @@ def scrape(

     if detailed:
         return {
+            "url": urls["url"],
             "content": content,
             "scanned": urls["scanned"],
             "scraped": urls["scraped"],
@@ -104,7 +130,7 @@ def goto(page: Page, specs: dict, urls: dict):
         print(f'[goto]: loading {specs["url"]}')

         page.goto(**specs)
-        add_url(page, urls)
+        add_url(page, urls, page.title())

         run_scripts(page, post, urls)

@@ -139,7 +165,7 @@ def getters(page: Page, specss: list[dict], urls: dict):
             elif method == "custom":
                 expression = f'{{{specs.get("expression")}}}'
             elif method == "none":
-                expression = '""'
+                expression = ""
             else:
                 expression = f"""{{
                     clone = document.body.cloneNode(true);
@@ -150,7 +176,7 @@ def getters(page: Page, specss: list[dict], urls: dict):
             if expression:
                 print(f"[getters]: getting content from {page.url}")
                 content += page.evaluate(f"() =>{expression}")
-                add_url(page, urls, expression)
+                add_url(page, urls, page.title(), expression)

             run_scripts(page, post, urls)

@@ -204,7 +230,7 @@ def run_scripts(page: Page, scripts: list[dict], urls: dict):
             method = script.pop("method", "evalutate") or "evaluate"
             print(f"[script]: running method {method}\n{str(script)}")
             getattr(page, method)(**script)
-            add_url(page, urls)
+            add_url(page, urls, page.title())


 def scrape_preview(page: dict, target: str):
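The sync loop mirrors the async one: Process.can_continue gates the next page and Process.has_to_stop is checked between the goto/getters/crawler steps. A condensed, hedged sketch of that control flow, with a hypothetical do_step standing in for those calls (assumes jac_misc is importable):

from jac_misc.scraper.utils import Process


def do_step(step: str, url: str):
    # hypothetical stand-in for goto/getters/crawler
    print(f"[{step}]: {url}")


def scrape_like_loop(pages: list, trigger_id: str):
    while pages and Process.can_continue(trigger_id):
        url = pages.pop(0)
        try:
            for step in ("goto", "getters", "crawler"):
                Process.has_to_stop(trigger_id)  # checkpoint before each step
                do_step(step, url)
        except Process.Stopped:
            print(f"stopped while processing {url}")
            break


scrape_like_loop(["https://example.com"], "demo")   # runs to completion
Process.add("demo")                                 # request a stop for "demo"
scrape_like_loop(["https://example.org"], "demo")   # exits before doing any work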

jaseci_ai_kit/jac_misc/jac_misc/scraper/utils.py

Lines changed: 30 additions & 5 deletions
@@ -2,22 +2,47 @@
 from copy import deepcopy


-def add_url(page, urls: dict, scraped: bool = False, error: str = None):
+class Process:
+    queue = set()
+
+    class Stopped(Exception):
+        pass
+
+    @staticmethod
+    def add(trigger_id: str):
+        Process.queue.add(trigger_id)
+
+    @staticmethod
+    def can_continue(trigger_id: str):
+        if can := (trigger_id in Process.queue):
+            Process.queue.remove(trigger_id)
+        return not can
+
+    @staticmethod
+    def has_to_stop(trigger_id: str):
+        if trigger_id in Process.queue:
+            Process.queue.remove(trigger_id)
+            raise Process.Stopped()
+
+
+def add_url(
+    page, urls: dict, title: str = None, scraped: bool = False, error: str = None
+):
     url = page.url
     source = page.source
     if url:
         if url not in urls["scanned_urls"]:
             urls["scanned_urls"].add(url)

-            scan = {"url": url}
+            scan = {"title": title}
             if error:
                 scan["error"] = error
             if url != source:
                 scan["source"] = source
-            urls["scanned"].append(scan)
+            urls["scanned"][url] = scan

-        if scraped and url not in urls["scraped"]:
-            urls["scraped"].append(url)
+        if scraped:
+            urls["scraped"].add(url)


 def add_crawl(pages: list, pre_configs: list, urls: dict, url: str, def_crawl: dict):
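Note that both checks consume the stop flag: can_continue removes the trigger id when it finds it (returning False once), and has_to_stop removes it before raising. A small demonstration of those one-shot semantics, assuming jac_misc is importable:

from jac_misc.scraper.utils import Process

Process.add("abc")
assert Process.can_continue("abc") is False   # flag found: stop, and the flag is consumed
assert Process.can_continue("abc") is True    # flag already consumed: free to continue

Process.add("abc")
try:
    Process.has_to_stop("abc")                # raises once, removing the flag
except Process.Stopped:
    print("stop requested for abc")
Process.has_to_stop("abc")                    # no flag left, so this is a no-op

One consequence of this design is that a stop requested while no run is active for that trigger_id stays queued and will halt the next run for that id at its first check.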
