Skip to content

Commit

Permalink
- async support
Browse files Browse the repository at this point in the history
- added async tests
  • Loading branch information
EchterAlsFake committed Jan 11, 2025
1 parent a9b738d commit b647a70
Show file tree
Hide file tree
Showing 11 changed files with 176 additions and 203 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
name: EPorner API test
name: Async API test

permissions:
contents: read
pull-requests: write

on:
push:
Expand All @@ -20,7 +24,7 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install pytest
pip install pytest pytest-asyncio
pip install .
- name: Test with pytest
run: |
Expand Down
7 changes: 3 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

<div align="center">
<a href="https://pepy.tech/project/Eporner-API"><img src="https://static.pepy.tech/badge/Eporner-API" alt="Downloads"></a>
<a href="https://pepy.tech/project/Eporner-API-async"><img src="https://static.pepy.tech/badge/Eporner-API-async" alt="Downloads"></a> <span style="font-size: 20px">Async</span>
<a href="https://github.com/EchterAlsFake/EPorner_API/workflows/"><img src="https://github.com/EchterAlsFake/EPorner_API/workflows/CodeQL/badge.svg" alt="CodeQL Analysis"/></a>
<a href="https://github.com/EchterAlsFake/EPorner_API/workflows/"><img src="https://github.com/EchterAlsFake/EPorner_API/actions/workflows/tests.yml/badge.svg" alt="API Tests"/></a>
<a href="https://github.com/EchterAlsFake/EPorner_API/workflows/"><img src="https://github.com/EchterAlsFake/EPorner_API/actions/workflows/tests.yml/badge.svg" alt="Sync API Tests"/></a>
<a href="https://github.com/EchterAlsFake/EPorner_API/workflows/"><img src="https://github.com/EchterAlsFake/EPorner_API/actions/workflows/async-tests.yml/badge.svg?branch=async" alt="Async API Tests"/></a>
</div>

# Description
Expand Down Expand Up @@ -33,9 +35,6 @@ EPorner API is an API for EPorner, which allows you to fetch information from vi
More will be coming in the next versions!

> [!NOTE]
> GitHub tests are currently failing for an unknown reason; the API itself works correctly.
# Quickstart

### Have a look at the [Documentation](https://github.com/EchterAlsFake/API_Docs/blob/master/Porn_APIs/EPorner.md) for more details
Expand Down
9 changes: 8 additions & 1 deletion README/Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,11 @@ Quality.BEST would translate to "best"
- Fixed #4, but I am not entirely sure whether this issue is completely fixed
- updated to eaf base api v2
- fixed pornstar pagination
- fixed quality selection
- fixed quality selection

# 1.8.1 - 1.8.4
- async support
- fixed downloading
- fixed quality selection
- fixed tests
- added async tests
217 changes: 121 additions & 96 deletions eporner_api/eporner_api.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import html
import asyncio
import json
import logging
import argparse
Expand All @@ -22,7 +22,7 @@
from bs4 import BeautifulSoup
from urllib.parse import urljoin
from base_api.base import BaseCore
from typing import Generator, Union
from typing import Union, List
from functools import cached_property

"""
Expand Down Expand Up @@ -64,21 +64,84 @@ def disable_logging():
logger.setLevel(logging.CRITICAL)


def extract_json_from_html(html_content):
    """
    Extract and merge all <script type="application/ld+json"> payloads from a page.

    :param html_content: Raw HTML of a video page.
    :return: Flattened dict of the combined JSON-LD data (see flatten_json).
    :raises InvalidVideo: if a JSON-LD block cannot be parsed.
    """
    soup = BeautifulSoup(html_content, 'html.parser')
    script_tags = soup.find_all('script', {'type': 'application/ld+json'})

    combined_data = {}

    for script in script_tags:
        # Tag.string is None for empty tags or tags containing nested markup;
        # skip those instead of crashing with AttributeError on .strip().
        if script.string is None:
            continue
        json_text = script.string.strip()
        try:
            data = json.loads(json_text)

        except json.decoder.JSONDecodeError:
            raise InvalidVideo("""
JSONDecodeError: I need your help to fix this error. Please report the URL you've used on GitHub. Thanks :)""")

        # Later blocks override earlier duplicate keys.
        combined_data.update(data)

    cleaned_dictionary = flatten_json(combined_data)
    return cleaned_dictionary


def flatten_json(nested_json, parent_key='', sep='_'):
    """
    Flatten a nested JSON dictionary. Duplicate keys will be overridden.

    :param nested_json: The nested JSON dictionary to be flattened.
    :param parent_key: The base key to use for the flattened keys.
    :param sep: The separator between nested keys.
    :return: A flattened dictionary.
    """
    flat = {}
    for key, value in nested_json.items():
        compound_key = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            # Recurse into nested dicts; later entries overwrite duplicates.
            flat.update(flatten_json(value, compound_key, sep=sep))
        else:
            flat[compound_key] = value
    return flat


class Video:
def __init__(self, url: str, enable_html_scraping: bool = False, content: str = None, json_data: dict = None,
             html_json_data: dict = None) -> None:
    """
    Plain data holder; all network fetching happens in the async factory
    Video.create(), so construction never blocks.

    :param url: Video page URL or bare video ID.
    :param enable_html_scraping: Whether ToS-violating HTML-scraping features are enabled.
    :param content: Pre-fetched page HTML (or None when scraping is disabled).
    :param json_data: Parsed V2 API response for this video.
    :param html_json_data: Flattened JSON-LD data scraped from the page HTML (or None).
    """
    # NOTE(review): the stale pre-async lines (self.raw_json_data(),
    # self.request_html_content()) are dropped — those methods were removed
    # in this commit and would raise AttributeError if still called here.
    self.url = url
    self.enable_html = enable_html_scraping
    self.html_content = content
    self.html_json_data = html_json_data
    self.json_data = json_data

@classmethod
async def create(cls, url: str, enable_html_scraping: bool = False):
    """
    Async alternate constructor: fetch the V2 API data (and optionally the
    HTML page) and return a fully initialized Video.

    :param url: Video page URL (https://...) or a bare video ID.
    :param enable_html_scraping: Also fetch and parse the page HTML
        (against EPorner.com ToS).
    :return: Video
    :raises VideoDisabled: if the page reports a copyright takedown.
    """
    html_content = None
    if enable_html_scraping:
        html_content = await core.fetch(url)
        # "deletedfile" marker in the page means a copyright takedown.
        for marker in REGEX_VIDEO_DISABLED.findall(html_content):
            if marker == "deletedfile":
                raise VideoDisabled("Video has been removed because of a Copyright claim")

    if str(url).startswith("https://"):
        # Try the primary ID pattern first, then the alternate URL form.
        match = REGEX_ID.search(url)
        if match is None:
            match = REGEX_ID_ALTERNATE.search(url)
        video_id = match.group(1)
    else:
        # A bare video ID was passed instead of a URL (previously this branch
        # referenced an unbound `id` variable and raised NameError).
        video_id = url

    data = await core.fetch(f"{ROOT_URL}{API_VIDEO_ID}?id={video_id}&thumbsize=medium&format=json")
    json_data = json.loads(data)
    # Only parse HTML JSON-LD when we actually fetched the page.
    html_json_data = extract_json_from_html(html_content) if html_content is not None else None

    return cls(url, enable_html_scraping, html_content, json_data, html_json_data)

@cached_property
def video_id(self) -> str:
Expand All @@ -102,16 +165,6 @@ def video_id(self) -> str:
else:
return self.url # Assuming this is a video ID (hopefully)

def raw_json_data(self) -> dict:
    """
    Uses the V2 API to retrieve information from a video
    :return: parsed JSON response from the V2 video endpoint
    """
    # NOTE(review): pre-async version deleted by this commit — fetching now
    # happens in the async factory Video.create(). Shown here as removed
    # diff context only.

    data = core.fetch(f"{ROOT_URL}{API_VIDEO_ID}?id={self.video_id}&thumbsize=medium&format=json")
    parsed_data = json.loads(data)
    return parsed_data

@cached_property
def tags(self) -> list:
tags = []
Expand Down Expand Up @@ -170,54 +223,6 @@ def thumbnail(self):
The following methods are using HTML scraping. This is against the ToS from EPorner.com!
"""

def request_html_content(self):
    """Fetch the video page HTML into self.html_content (HTML scraping must be enabled)."""
    # NOTE(review): pre-async version deleted by this commit; the fetch moved
    # into Video.create(). Uses html.unescape — the `import html` line was
    # also removed in this commit, so this code could no longer run anyway.
    if not self.enable_html:
        raise HTML_IS_DISABLED("HTML content is disabled! See Documentation for more details")

    self.html_content = html.unescape(core.fetch(self.url))


def extract_json_from_html(self) -> dict:
    """Parse all JSON-LD <script> blocks from self.html_content into one flattened dict."""
    # NOTE(review): pre-async method deleted by this commit in favor of the
    # module-level extract_json_from_html(html_content) helper.
    if not self.enable_html:
        raise HTML_IS_DISABLED("HTML content is disabled! See Documentation for more details")

    soup = BeautifulSoup(self.html_content, 'html.parser')
    script_tags = soup.find_all('script', {'type': 'application/ld+json'})

    combined_data = {}

    for script in script_tags:
        # NOTE(review): script.string can be None for empty tags — would raise
        # AttributeError here; the replacement helper should guard this.
        json_text = script.string.strip()
        try:
            data = json.loads(json_text)

        except json.decoder.JSONDecodeError:
            raise InvalidVideo("""
JSONDecodeError: I need your help to fix this error. Please report the URL you've used on GitHub. Thanks :)""")

        combined_data.update(data)

    cleaned_dictionary = self.flatten_json(combined_data)
    return cleaned_dictionary

def flatten_json(self, nested_json, parent_key='', sep='_'):
    """
    Flatten a nested JSON dictionary. Duplicate keys will be overridden.
    :param nested_json: The nested JSON dictionary to be flattened.
    :param parent_key: The base key to use for the flattened keys.
    :param sep: The separator between nested keys.
    :return: A flattened dictionary.
    """
    # NOTE(review): pre-async method deleted by this commit; replaced by the
    # identical module-level flatten_json() helper (self is unused here).
    items = []
    for k, v in nested_json.items():
        new_key = f"{parent_key}{sep}{k}" if parent_key else k
        if isinstance(v, dict):
            # Recurse and splice child items; later keys override duplicates.
            items.extend(self.flatten_json(v, new_key, sep=sep).items())
        else:
            items.append((new_key, v))
    return dict(items)

@cached_property
def bitrate(self) -> str:
"""Return the bitrate of the video? (I don't know)"""
Expand Down Expand Up @@ -331,18 +336,18 @@ def direct_download_link(self, quality, mode) -> str:

return urljoin("https://eporner.com", str(url))

def download(self, quality, path, callback=None, mode=Encoding.mp4_h264, no_title=False):
async def download(self, quality, path, callback=None, mode=Encoding.mp4_h264, no_title=False):
if not self.enable_html:
raise HTML_IS_DISABLED("HTML content is disabled! See Documentation for more details")

response_redirect_url = core.fetch(self.direct_download_link(quality, mode),
response_redirect_url = await core.fetch(self.direct_download_link(quality, mode),
allow_redirects=True, get_response=True)

if no_title is False:
path = os.path.join(path, f"{self.title}.mp4")

try:
core.legacy_download(url=str(response_redirect_url.url), callback=callback, path=path)
await core.legacy_download(url=str(response_redirect_url.url), callback=callback, path=path)
return True

except Exception:
Expand All @@ -353,26 +358,31 @@ def download(self, quality, path, callback=None, mode=Encoding.mp4_h264, no_titl


class Pornstar:
def __init__(self, url: str, enable_html_scraping: bool = False, content: str = None):
    """
    Plain data holder; the profile page is fetched in the async factory
    Pornstar.create(), so construction never blocks.

    :param url: Pornstar profile URL.
    :param enable_html_scraping: Whether ToS-violating HTML-scraping features are enabled.
    :param content: Pre-fetched profile page HTML (or None).
    """
    # NOTE(review): the stale pre-async line `self.html_content = core.fetch(self.url)`
    # is dropped — it performed blocking I/O in __init__ and belongs to the
    # removed sync version.
    self.url = url
    self.enable_html_scraping = enable_html_scraping
    self.html_content = content

@classmethod
async def create(cls, url, enable_html_scraping=False):
    """Async alternate constructor: fetch the profile page, then build the Pornstar."""
    page_content = await core.fetch(url)
    return cls(url, enable_html_scraping=enable_html_scraping, content=page_content)

async def videos(self, pages: int = 0) -> List[Video]:
    """
    Collect Video objects from the pornstar's listing pages (fetched concurrently).

    :param pages: Number of listing pages to scan; 0 derives it from video_amount.
    :return: List of Video objects.
    """
    if pages == 0:
        # One page contains 37 videos. Use ceiling integer division: the
        # original float result of `int(...) / 37` made range() raise TypeError.
        pages = (int(self.video_amount) + 36) // 37

    urls = []
    # NOTE(review): range(1, pages) visits pages 1..pages-1, mirroring the
    # original loop bounds — confirm whether the last page should be included.
    for page in range(1, pages):
        response = await core.fetch(urljoin(self.url + "/", str(page)))
        for scraped in REGEX_SCRAPE_VIDEO_URLS.findall(response):
            # Scraped paths are relative and carry a thumbnail prefix to strip.
            urls.append(f"https://www.eporner.com{scraped}".replace("EPTHBN/", ""))

    video_tasks = [asyncio.create_task(Client.get_video(url, enable_html_scraping=self.enable_html_scraping))
                   for url in urls]
    return await asyncio.gather(*video_tasks)

@cached_property
def name(self) -> str:
Expand Down Expand Up @@ -481,41 +491,56 @@ def biography(self) -> str:
class Client:

@classmethod
async def get_video(cls, url: str, enable_html_scraping: bool = False) -> Video:
    """Returns the Video object for a given URL (async — delegates to Video.create)."""
    # NOTE(review): the stale sync definition and its `return Video(...)` line
    # from the pre-async version are dropped; only the async path remains.
    return await Video.create(url, enable_html_scraping=enable_html_scraping)

@classmethod
async def search_videos(cls, query: str, sorting_gay: Union[str, Gay], sorting_order: Union[str, Order],
                        sorting_low_quality: Union[str, LowQuality],
                        page: int, per_page: int, enable_html_scraping: bool = False) -> List[Video]:
    """
    Search EPorner via the V2 API and return the matching videos.

    :param query: Search term.
    :param sorting_gay: Gay-content filter (Gay enum or its string value).
    :param sorting_order: Result ordering (Order enum or its string value).
    :param sorting_low_quality: Low-quality filter (LowQuality enum or its string value).
    :param page: Result page number.
    :param per_page: Results per page.
    :param enable_html_scraping: Passed through to each Video.
    :return: List of Video objects (fetched concurrently).
    """
    # Fixed query string: the original sent "&%page=" (a literal percent sign),
    # so the page parameter never reached the API.
    response = await core.fetch(f"{ROOT_URL}{API_SEARCH}?query={query}&per_page={per_page}&page={page}"
                                f"&thumbsize=medium&order={sorting_order}&gay={sorting_gay}&lq="
                                f"{sorting_low_quality}&format=json")

    json_data = json.loads(response)
    # Each entry's "url" field is the video page URL.
    video_urls = [video_["url"] for video_ in json_data.get("videos", [])]

    video_tasks = [asyncio.create_task(cls.get_video(url=url, enable_html_scraping=enable_html_scraping))
                   for url in video_urls]
    return await asyncio.gather(*video_tasks)


@classmethod
async def get_videos_by_category(cls, category: Union[str, Category], enable_html_scraping: bool = False,
                                 pages: int = 1) -> List[Video]:
    """
    Scrape a category's listing pages and return the videos found.

    :param category: Category enum or its string value.
    :param enable_html_scraping: Passed through to each Video.
    :param pages: Number of listing pages to scan (pages 0..pages-1).
    :return: List of Video objects (fetched concurrently).
    """
    # NOTE(review): the stale sync generator lines (fixed range(100) loop,
    # per-URL `yield Video(...)`) from the pre-async version are dropped.
    video_urls = []

    for page in range(pages):
        response = await core.fetch(f"{ROOT_URL}cat/{category}/{page}")
        for scraped in REGEX_SCRAPE_VIDEO_URLS.findall(response):
            # Scraped paths are relative and carry a thumbnail prefix to strip.
            video_urls.append(f"https://www.eporner.com{scraped}".replace("EPTHBN/", ""))

    video_tasks = [asyncio.create_task(cls.get_video(url=url, enable_html_scraping=enable_html_scraping))
                   for url in video_urls]
    return await asyncio.gather(*video_tasks)


@classmethod
async def get_pornstar(cls, url: str, enable_html_scraping: bool = True) -> Pornstar:
    """Returns the Pornstar object for a given profile URL (async — delegates to Pornstar.create)."""
    # NOTE(review): the stale sync definition and `return Pornstar(...)` line
    # from the pre-async version are dropped; only the async path remains.
    return await Pornstar.create(url, enable_html_scraping)


def main():
async def main():
parser = argparse.ArgumentParser(description="API Command Line Interface")
parser.add_argument("--download", metavar="URL (str)", type=str, help="URL to download from")
parser.add_argument("--quality", metavar="best,half,worst", type=str, help="The video quality (best,half,worst)",
Expand All @@ -532,8 +557,8 @@ def main():

if args.download:
client = Client()
video = client.get_video(args.download, enable_html_scraping=True)
video.download(quality=args.quality, path=args.output, no_title=no_title)
video = await client.get_video(args.download, enable_html_scraping=True)
await video.download(quality=args.quality, path=args.output, no_title=no_title)

if args.file:
videos = []
Expand All @@ -543,11 +568,11 @@ def main():
content = file.read().splitlines()

for url in content:
videos.append(client.get_video(url, enable_html_scraping=True))
videos.append(await client.get_video(url, enable_html_scraping=True))

for video in videos:
video.download(quality=args.quality, path=args.output, no_title=no_title)
await video.download(quality=args.quality, path=args.output, no_title=no_title)


if __name__ == "__main__":
    # CLI entry point: main() is a coroutine since the async migration, so it
    # must be driven by asyncio.run. The stale bare `main()` call from the
    # sync version is dropped — it would only create an un-awaited coroutine.
    asyncio.run(main())
Loading

0 comments on commit b647a70

Please sign in to comment.