Use identity check for comparison to a singleton #4

Open · wants to merge 2 commits into base: master
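The diff below replaces equality comparisons against singleton values (== True, == False) with identity checks (is True, is False), and normalizes string quoting and line wrapping along the way. A minimal standalone sketch of the pattern (illustration only, with placeholder values, not code from this repository); pycodestyle flags the equality form as E712, and PEP 8 prefers a plain truthiness test for booleans over comparing to True or False at all:

import os

main = True
path_file = "examplecom/index.html"  # placeholder path for illustration

# Equality comparison against a singleton (flagged as E712 by pycodestyle):
if main == True:
    pass

# Identity check, the form this pull request switches to:
if main is True:
    pass

# Plain truthiness test, which PEP 8 recommends for booleans:
if main:
    pass

# The same idea for the negated check used in check_exists():
missing = os.path.exists(path_file) is False   # identity check
missing = not os.path.exists(path_file)        # simpler equivalent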
main.py: 121 changes (68 additions, 53 deletions)
@@ -4,12 +4,12 @@
# __license__ = "GPL"
# __version__ = "2.1.0"
# __email__ = "a2FpdG9raWQxNDEyLmNvbmFuQGdtYWlsLmNvbQ=="
__black_list_type__ = ['.php']
__black_list_type__ = [".php"]
__status_code__ = [200, 404]
__clone_all__ = False
__zip__ = False
__headless__ = False
__clone_url__ = 'https://themesbrand.com/velzon/html/default/index.html'
__clone_url__ = "https://themesbrand.com/velzon/html/default/index.html"

import os
import os.path
@@ -29,14 +29,20 @@
def extract_info_url(url, main=False):
data_url = urlparse(url)
domain = data_url.netloc
path_file = domain.replace('.', '') + os.path.split(data_url.path)[0] + '/'
path_file = domain.replace(".", "") + os.path.split(data_url.path)[0] + "/"
file_name = os.path.split(data_url.path)[1]
scheme = data_url.scheme
url_ori = url.replace(file_name, '')
black_list = ['', '/']
if main == True and file_name in black_list:
file_name = 'index.html'
return {"domain": domain, "path": path_file, "file_name": file_name, "scheme": scheme, "url": url_ori}
url_ori = url.replace(file_name, "")
black_list = ["", "/"]
if main is True and file_name in black_list:
file_name = "index.html"
return {
"domain": domain,
"path": path_file,
"file_name": file_name,
"scheme": scheme,
"url": url_ori,
}


def get_all_file_paths(directory):
@@ -49,13 +55,13 @@ def get_all_file_paths(directory):


def compress(path_folder):
print(f'Compression files... {str(path_folder)}.zip')
print(f"Compression files... {str(path_folder)}.zip")
directory = path_folder
file_paths = get_all_file_paths(directory)
with ZipFile(f'{path_folder}.zip', 'w') as zip:
with ZipFile(f"{path_folder}.zip", "w") as zip:
for file in file_paths:
zip.write(file)
print('All files zipped successfully!')
print("All files zipped successfully!")


def check_invalid(file_name):
@@ -66,7 +72,7 @@ def check_invalid(file_name):


class File:
info_url = ''
info_url = ""

def __init__(self, url):
self.url = url
@@ -78,61 +84,67 @@ def download_file(self, url, headers):
if url == self.url:
info_url = extract_info_url(url, True)

if info_url['file_name'][-4:] not in __black_list_type__:
file_name = info_url['file_name']
black_list = ['', '/']
if info_url["file_name"][-4:] not in __black_list_type__:
file_name = info_url["file_name"]
black_list = ["", "/"]
if file_name in black_list:
file_name = 'index.html'
path_file = info_url['path'] + file_name
file_name = "index.html"
path_file = info_url["path"] + file_name
if not os.path.exists(path_file):
r = requests.get(url, headers=headers)
os.makedirs(os.path.dirname(path_file), exist_ok=True)
with open(path_file, 'wb') as f:
with open(path_file, "wb") as f:
f.write(r.content)

def check_exists(self, url):
info_url = extract_info_url(url)
if info_url['domain'] != self.info_url['domain']:
if info_url["domain"] != self.info_url["domain"]:
return False
path_file = info_url['path'] + info_url['file_name']
return os.path.exists(path_file) == False
path_file = info_url["path"] + info_url["file_name"]
return os.path.exists(path_file) is False

def get_all_urls_in_page(self, page_source):
result = []
source = BeautifulSoup(page_source, 'html.parser')
source = BeautifulSoup(page_source, "html.parser")
try:
data_a = source.find_all("a")
except Exception:
data_a = None
a_tag = []
for a in data_a:
if a.get('href') != '' and a.get('href') != '#' and str(a.get('href')) not in a_tag and check_invalid(str(a.get('href'))) is not None:
a_tag.append(a.get('href'))
if (
a.get("href") != ""
and a.get("href") != "#"
and str(a.get("href")) not in a_tag
and check_invalid(str(a.get("href"))) is not None
):
a_tag.append(a.get("href"))

for href in a_tag:
domain = urlparse(href).netloc
if domain == '':
if len(href.split('../')) > 1:
cut = self.info_url['url'].split('/')[-(len(href.split('../'))):]
link = self.info_url['url']
if domain == "":
if len(href.split("../")) > 1:
cut = self.info_url["url"].split(
"/")[-(len(href.split("../"))):]
link = self.info_url["url"]
for text in cut:
if text != '':
link = link.replace(f'{str(text)}/', '')
result.append(link + href.replace('../', ''))
elif href[:1] == '/':
link = re.split('[\/]+', self.info_url['url'])[:2]
link = f'{str(link[0])}//{str(link[1])}'
if text != "":
link = link.replace(f"{str(text)}/", "")
result.append(link + href.replace("../", ""))
elif href[:1] == "/":
link = re.split("[\/]+", self.info_url["url"])[:2]
link = f"{str(link[0])}//{str(link[1])}"
result.append(link + href)
else:
result.append(self.info_url['url'] + href)
if domain == self.info_url['domain']:
result.append(self.info_url["url"] + href)
if domain == self.info_url["domain"]:
result.append(href)
return result


class BrowserClone(File):
driver = ''
page_source = ''
driver = ""
page_source = ""
all_url = []
url_down = []
headers = {}
@@ -143,23 +155,26 @@ def __init__(self, url):
self.open_browser()

def open_browser(self):
print('============================== Begin ==============================')
print("============================== Begin ==============================")
options = webdriver.ChromeOptions()
if __headless__:
options.add_argument('--headless')
options.add_argument("--headless")
options.add_argument("--no-sandbox")
options.add_experimental_option("useAutomationExtension", False)
options.add_experimental_option("excludeSwitches", ["enable-automation"])
options.page_load_strategy = 'none'
options.add_experimental_option(
"excludeSwitches", ["enable-automation"])
options.page_load_strategy = "none"

self.driver = webdriver.Chrome(chrome_options=options, executable_path=ChromeDriverManager().install())
self.driver = webdriver.Chrome(
chrome_options=options, executable_path=ChromeDriverManager().install()
)
self.driver.get(self.url)
print('Waiting 30s to make sure the page has finished loading...')
print("Waiting 30s to make sure the page has finished loading...")
time.sleep(30)
self.set_page_source()
self.extract_file()

print('Getting all the links to crawl...')
print("Getting all the links to crawl...")
all_urls_in_page = super().get_all_urls_in_page(self.page_source)
for url_in_page in all_urls_in_page:
self.all_url.append(url_in_page)
@@ -172,18 +187,18 @@ def open_browser(self):
self.driver.get(url)
self.extract_file()

print('Get all the links done!')
print("Get all the links done!")
self.extract_file(True)

if __zip__:
url_info = extract_info_url(self.url, True)
folder = './' + url_info['domain'].replace('.', '')
folder = "./" + url_info["domain"].replace(".", "")
compress(folder)
try:
shutil.rmtree(folder, ignore_errors=True)
except OSError as e:
print(f"Error: {folder} : {e.strerror}")
print('============================== End Game ==============================')
print("============================== End Game ==============================")

def extract_html(self, url):
super().__init__(url)
@@ -197,21 +212,21 @@ def extract_html(self, url):
def extract_file(self, down=False):
for request in self.driver.requests:
if (
request.response
and request.response.status_code in __status_code__
and request.url not in self.url_down
request.response
and request.response.status_code in __status_code__
and request.url not in self.url_down
):
self.url_down.append(request.url)
if down:
print('Save files...')
print("Save files...")
super().__init__(self.url)
data = list(set(self.url_down))
with tqdm(total=len(data)) as pbar:
for file in data:
if super().check_exists(file):
super().download_file(file, self.headers)
pbar.update(1)
print('Save files Done!')
print("Save files Done!")

def set_page_source(self):
for _ in range(5):
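For a concrete view of the "main is True" branch, here is the updated extract_info_url from this diff exercised against a hypothetical URL (the URL and the printed values are illustrative; the function body is copied verbatim from the new version shown above):

import os.path
from urllib.parse import urlparse


def extract_info_url(url, main=False):
    data_url = urlparse(url)
    domain = data_url.netloc
    path_file = domain.replace(".", "") + os.path.split(data_url.path)[0] + "/"
    file_name = os.path.split(data_url.path)[1]
    scheme = data_url.scheme
    url_ori = url.replace(file_name, "")
    black_list = ["", "/"]
    if main is True and file_name in black_list:
        file_name = "index.html"
    return {
        "domain": domain,
        "path": path_file,
        "file_name": file_name,
        "scheme": scheme,
        "url": url_ori,
    }


info = extract_info_url("https://example.com/velzon/html/default/", main=True)
print(info["file_name"])  # index.html  (the identity-checked branch fills in the default)
print(info["path"])       # examplecom/velzon/html/default/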