-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
1.2版本重构了引擎并规范函数调用
- Loading branch information
Showing
2 changed files
with
384 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,281 @@ | ||
import json
import os
import re
import subprocess
import time
from pathlib import Path

import requests
from bs4 import BeautifulSoup
from readability import Document
from tqdm import tqdm
|
||
|
||
brower = requests.Session() # shared HTTP session (name "brower" kept — referenced throughout the file)
# Desktop User-Agent used for all requests issued through `gets`.
headers = {
    'User-Agent': 'Mozilla/5.0 (X11; Linux aarch64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.20 Safari/537.36'
}
# Mobile User-Agent — not used in this chunk; presumably for phone-layout pages (TODO confirm).
headers_phone = {
    'User-Agent': 'Mozilla/5.0 (Linux; Android 8.0.0; PRA-AL00X Build/HONORPRA-AL00X; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/77.0.3865.120 MQQBrowser/6.2 TBS/045714 Mobile Safari/537.36'
}
api_url = 'http://url2api.applinzi.com/'  # URL2Article API base URL (trial or paid plan)
|
||
|
||
# HTTP GET with bounded retries.
def gets(url, retries=3):
    """Fetch *url* through the shared session and return the body text.

    Returns 'null' when the page does not exist (HTTP 404) or when all
    *retries* attempts fail, so callers can uniformly test `data == 'null'`.
    (The old version recursed on failure and discarded the recursive
    return value, so persistent errors yielded None and could recurse
    without bound.)
    """
    for _ in range(retries):
        try:
            response = brower.get(url=url, headers=headers, timeout=5)
        except requests.RequestException:
            # network-level failure: wait briefly and retry
            print(url + '访问崩溃,请检查网络')
            time.sleep(1)
            continue
        if response.status_code == 200:
            return response.text
        if response.status_code == 404:
            print('网址不存在')
            return 'null'
        # other HTTP status: log and retry
        print(response)
        print(url + '访问出错')
        time.sleep(1)
    return 'null'
|
||
# Sanitize a string for use as a file/directory name.
def clean_name(strs):
    """Strip characters that are illegal or awkward in file names.

    The original pattern r'/:<>?/\\^|@& ' was an alternation, not a
    character class, so it removed almost none of the characters it
    listed; a real character class is used instead. Spaces become
    underscores and non-printable characters are dropped (the original
    announced this in a comment but never did it).
    """
    strs = re.sub(r'[/\\:<>?*"^|@&]', '', strs)
    strs = strs.replace(' ', '_')
    # drop non-printable (invisible) characters
    strs = ''.join(ch for ch in strs if ch.isprintable())
    return strs
|
||
|
||
# Deduplicate a download list, dropping 'null' placeholders.
def dic(info):
    """Return *info* with duplicates and 'null' entries removed, order kept."""
    unique = []
    for item in info:
        if item != 'null' and item not in unique:
            unique.append(item)
    return unique
|
||
# Read or persist the download list (downlist.json).
def downlist(set='null'):
    """With the default sentinel, load and return the saved download list
    (an empty list when no file exists); otherwise persist *set* as the
    new download list.

    The parameter keeps its historical name `set` for backward
    compatibility with keyword callers.
    """
    if set == 'null':
        if os.path.exists('downlist.json'):
            with open('downlist.json', 'r') as f:
                saved = json.load(f)  # renamed: the old local shadowed builtin `list`
            return saved
        return []
    with open('downlist.json', 'w') as f:
        json.dump(set, f)
|
||
def get_set():
    """Load and return the settings dict stored in set.json."""
    with open('set.json', 'r') as handle:
        return json.load(handle)
|
||
def save_set(set):
    """Persist the settings dict *set* to set.json."""
    serialized = json.dumps(set)
    with open('set.json', 'w') as handle:
        handle.write(serialized)
|
||
|
||
# readability-based article extraction
def clean_1(data):
    """Extract the main article from raw page HTML *data*.

    Returns a dict with:
      'title' — page title,
      'html'  — article HTML with <figure>/lazy-image markup repaired,
      'txt'   — plain text (paragraph ends become newlines, tags stripped);
    all three fields are 'null' when extraction fails.

    Fixes over the original: the bare `except` is narrowed, the duplicated
    `return fine` is gone, and `doc.summary()` is computed once instead of twice.
    """
    try:
        doc = Document(data)
        summary = doc.summary()  # extracted article HTML
        title = doc.title()
        # Repair markup so <img> tags load when the page is viewed locally.
        html = summary
        for broken in ('<figure class="img-box" contenteditable="false">',
                       '<figure contenteditable="false" class="img-box">'):
            html = html.replace(broken, '<figure>')
        html = html.replace('data-src', 'src')
        html = html.replace('src="//', 'src="https://')
        # Plain-text rendition from the unmodified summary.
        txt = summary.replace('</p>', '\n')
        txt = re.sub(r'</?\w+[^>]*>', '', txt)  # strip remaining <...> tags
        return {'title': title, 'html': html, 'txt': txt}
    except Exception:  # was a bare except; readability/lxml raise on bad input
        return {'title': 'null', 'html': 'null', 'txt': 'null'}
|
||
# url2io API extraction
def clean_2(url):
    """Extract an article via the URL2Article web API.

    Returns {'title': ..., 'html': ...} from the API response. Lets
    requests build the query string: the old manual concatenation broke
    for target URLs containing '&' or '#'.
    """
    token = 'MeTlIPmSRjmWgaNa9GzDAw'  # developer token — should be kept secret, not hard-coded
    fields = ','.join(['next', 'txt'])  # optional response fields
    result = brower.get(api_url + 'article',
                        params={'token': token, 'url': url, 'fields': fields}).json()
    return {'title': result['title'], 'html': result['content']}
|
||
def save_json(name, data):
    """Serialize *data* as JSON to file *name*.

    The file was already opened as UTF-8, so also pass ensure_ascii=False:
    otherwise json.dump escapes every non-ASCII character and the
    encoding argument is pointless.
    """
    with open(name, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False)
|
||
def save_txt(name, data):
    """Write the string *data* to file *name*, UTF-8 encoded."""
    with open(name, 'w', encoding='utf-8') as out:
        out.write(data)
|
||
# Download a list of image URLs into a directory.
def down_img(img, save_dir):
    """Download every URL in *img* into *save_dir* with wget.

    Writes the URL list to <save_dir>/url.txt first. A value of 'null'
    means "no images" and is a no-op. Uses subprocess.run with an
    argument list instead of os.system string concatenation, which was
    shell-injectable through a crafted URL.
    """
    save_dir = str(save_dir)
    if img == 'null':
        return
    with open(Path(save_dir + '/url.txt'), 'w') as f:
        f.write('\n'.join(img))
    for url in tqdm(img, desc='图片下载中:', unit='img'):
        # -N: only re-fetch when newer; -nv: quiet; check=False keeps the
        # old best-effort behavior (a failed download does not raise)
        subprocess.run(['wget', '-N', '-nv', url, '-P', save_dir], check=False)
    return
|
||
# Lofter post metadata extraction.
def lofter_info(data):
    """Parse a Lofter post page (raw HTML string *data*) and return a dict
    with 'html' (the raw page), 'title' and 'image' (deduped image URL
    list, or 'null' when the page has no gallery).

    WARNING: control flow here is exception-driven — the fallback paths
    below deliberately rely on NameError for variables that were never
    bound. Restructure with great care.
    """
    try:
        html_txt = data
        sp = BeautifulSoup(data,'lxml')
        # --- title extraction ---
        title_data = sp.find_all(name="meta")
        title = 'null'
        # NOTE(review): this loop rebinds `data` (the parameter) to each meta tag.
        for data in title_data:
            if "Description" in str(data):
                title_1 = data['content']
                break
        title_2 = re.findall("<title>(.*?)</title>",html_txt)[0]
        # Prefer the shorter of the two title candidates.
        # NOTE(review): if no "Description" meta exists, title_1 is unbound and
        # the NameError is caught by the outer except — this is load-bearing.
        if len(title_1) > len(title_2):
            title = title_2
        else:
            title = title_1
        if len(title) >= 60:
            title = str(title)[0:60]  # cap length (title later becomes a directory name)
        # --- image extraction (gallery markup only) ---
        image_data = sp.find_all(imggroup="gal")
        if len(image_data) != 0:
            image_url = re.findall('src="(https://.*?g)\?',str(image_data))
        # --- assemble result ---
        info = {}
        info['html'] = html_txt
        info['title'] = title
        if len(image_data) != 0:
            info['image'] = dic(image_url)
        if len(image_data) == 0:
            info['image'] = 'null'
    except:
        # Fallback 1: reuse whatever partial results were bound before the
        # failure; raises again (NameError) when title/image_data are unbound.
        try:
            info = {}
            info['html'] = data
            info['title'] = title
            if len(image_data) != 0:
                info['image'] = dic(image_url)
            if len(image_data) == 0:
                info['image'] = 'null'
            print(info['title'])
            return info
        except:
            # Fallback 2: nothing usable — return an all-'null' record.
            info = {}
            info['html'] = 'null'
            info['title'] = 'null'
            info['image'] = 'null'
            print(info['title'])
            return info
    print(info['title'])
    return info
|
||
# Collect every post URL from a paginated archive page.
def lofter_post_list(url):
    """Walk *url*, then *url*?page=2, 3, ... collecting unique post links.

    Returns the deduplicated list of post URLs, or 'null' when a page
    cannot be fetched. Stops when a page contributes no new posts.
    """
    pattern = re.compile(r'"(https://.*?\.lofter\.com/post/.*?_.*?)"')  # raw string (was unescaped)
    page = 0
    post = []
    while True:
        page += 1
        post_url = url if page == 1 else url + '?page=' + str(page)
        data = gets(post_url)
        # gets() returns 'null' on 404/exhausted retries; also guard against
        # None so pattern.findall never receives a non-string.
        if not data or data == 'null':
            return 'null'
        post_list = pattern.findall(data)
        print('正在解析第'+str(page)+'页')
        post_list = dic(post_list)
        merged = dic(post + post_list)
        if len(merged) == len(post):  # nothing new on this page — done
            break
        post = merged
        print('本页提取'+str(len(post_list))+'个文章')
        print('已提取'+str(len(post))+'个文章')
    print('共提取'+str(len(post))+'个文章')
    return post
|
||
# Persist one post (metadata, text, HTML, images) under local_dir.
def lofter_down(data, local_dir):
    """Localize one post described by *data* ({'html','title','image',...}).

    Creates a uniquely named directory under *local_dir*, saves entry.json,
    downloads images (when any), rewrites the image URLs inside the HTML
    to local /img/ paths, and writes index.html / index.txt.
    """
    text = clean_1(data['html'])  # readability pass over the raw page
    data['txt'] = text['txt']
    if data['title'] == 'null':
        return
    print('数据分析完毕')
    # Unique target directory: append 1, 2, ... while the name is taken.
    save_dir = str(Path(str(local_dir) + '/' + clean_name(data['title'])).resolve())
    if Path(save_dir).exists():
        counter = 1
        while Path(save_dir + str(counter)).exists():
            counter += 1
        save_dir = save_dir + str(counter)
    os.makedirs(save_dir, exist_ok=True)

    # Save the metadata so the post can be re-processed later.
    print('保存元数据')
    save_json(Path(save_dir + '/entry.json'), data)

    # BUG FIX: when data['image'] == 'null', len('null') != 0 passed the
    # old test and the rewrite loop below then replaced the characters
    # 'n'/'u'/'l' throughout the HTML, corrupting it.
    if data['image'] != 'null' and len(data['image']) != 0:
        print('启动媒体数据本地化')
        os.makedirs(Path(save_dir + '/img'), exist_ok=True)
        down_img(data['image'], Path(save_dir + '/img'))
        # Repair file names the downloader produced from percent-encoded URLs.
        print('修正下载文件名错误')
        for change_name in os.listdir(Path(save_dir + '/img')):
            if 'img%2F' in change_name:
                true_name = change_name.replace('%2F', '')
                os.rename(Path(save_dir + '/img/' + change_name),
                          Path(save_dir + '/img/' + true_name))
        print('媒体文件下载完毕')
        # Point the HTML at the downloaded local copies.
        for url in data['image']:
            data['html'] = data['html'].replace(url, '/img/' + url.split('/').pop())

    # Write the browsable/plain-text renditions.
    if data['html'] != 'null':
        save_txt(Path(save_dir + '/index.html'), data['html'])
    if data['txt'] != 'null':
        save_txt(Path(save_dir + '/index.txt'), data['txt'])
    print('索引链接创建完成')
    print('本地化完成')
    return
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
import requests | ||
from readability import Document | ||
from bs4 import BeautifulSoup | ||
|
||
from pathlib import Path | ||
import os | ||
import time | ||
import sys | ||
from urllib.parse import quote | ||
|
||
from tqdm import tqdm | ||
|
||
import core | ||
|
||
# --- initialisation -------------------------------------------------
# Today's date (yy/mm/dd); computed here but not used in this chunk.
local_time = time.strftime("%y/%m/%d", time.localtime())
# First run: create set.json with the default download directory (./Download).
# NOTE(review): the global name `set` shadows the builtin; kept as-is because
# the rest of this script reads it by that name.
if not Path('set.json').exists():
    local_dir = str(Path(str(Path.cwd().resolve())+'/Download'))
    set = {}
    set['download'] = local_dir
    core.save_set(set)
if Path('set.json').exists():
    set = core.get_set()
|
||
|
||
def start_download(answer):
    """Dispatch a pasted URL: single post, whole blog, or invalid input.

    Returns 'fine' on success (or user-cancelled batch) and 'error' for
    a bad link. Fixes over the original: the batch branch now returns
    'fine' instead of falling through to the error return, and the
    post-parse failure check tests info['title'] (lofter_info always
    returns a dict, so `info == 'null'` could never be true).
    """
    if '/post/' in answer:  # single-post download
        info = core.gets(answer)
        info = core.lofter_info(info)
        if info['title'] == 'null':
            print('链接错误,请重试')
            return 'error'
        print(info['title']+' 下载中')
        core.lofter_down(info, set['download'])
        print(info['title']+' 下载完成')
        return 'fine'
    elif 'lofter.com' in answer:  # whole blog / archive page
        tasks = core.lofter_post_list(answer)  # renamed: old local shadowed builtin `list`
        if tasks == 'null':
            print('链接错误,请重试')
            return 'error'
        answer = input('批量下载任务解析完成,共有'+str(len(tasks))+'个任务(回车开始下载,输入任意内容退出下载:')
        if not str(answer) == '':
            print('下载已退出')
            return 'fine'
        print('批量下载任务已开始,共有'+str(len(tasks))+'个任务')
        for url in tqdm(tasks, desc='批量下载进行中:', unit='doc'):
            info = core.gets(url)
            if info == 'null':
                print('链接错误,自动跳过')
                continue
            info = core.lofter_info(info)
            if info['title'] == 'null':  # lofter_info signals failure via 'title'
                print('链接错误,自动跳过')
                continue
            core.lofter_down(info, set['download'])
            print(info['title']+' 下载完成')
        print('批量任务已完成,共下载'+str(len(tasks))+'个文章')
        return 'fine'  # BUG FIX: used to fall through to the error return below
    print('链接错误,请重试')
    return 'error'
|
||
|
||
|
||
|
||
# --- TUI / main loop -------------------------------------------------
print('LofterSaver 1.2 Dev')
print('Power by python')
print('Made in Mr.G')
print('程序初始化。。。。')

while True:
    answer = input('请粘贴需解析地址(输入set调整设置,输入exit退出):')
    if answer == 'set':
        # Settings sub-menu: view/change the download directory.
        while True:
            set = core.get_set()
            print('1.当前下载目录:'+set['download'])
            answer = input('输入1修改设置(输入0退出设置):')
            if answer == '0':
                break
            if answer == '1':
                # BUG FIX: the closing '):' of the prompt used to be inside
                # the Path argument, so the default path was shown mangled.
                answer = input('输入新指定的下载目录(若留空指定为'+str(Path(str(Path.cwd().resolve())+'/Download'))+'):')
                if answer == '':
                    # Empty input restores the default ./Download directory.
                    set['download'] = str(Path(str(Path.cwd().resolve())+'/Download'))
                    core.save_set(set)
                    print('已修改下载目录')
                    continue
                os.makedirs(answer, exist_ok=True)
                if os.path.exists(answer):
                    print('下载目录可用')
                    set['download'] = answer
                    core.save_set(set)
                    print('已修改下载目录')
                continue
            print('输入不正确')
        # BUG FIX: leaving the settings menu used to fall through and feed
        # '0' to start_download, printing an error and exiting the program.
        continue
    if answer == 'exit':
        sys.exit()
    code = start_download(answer)
    # NOTE(review): the program exits after a single download attempt —
    # looks intentional for 1.2, kept as-is.
    exit()