Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
DaniD3v committed Apr 26, 2023
0 parents commit 65b9da9
Show file tree
Hide file tree
Showing 11 changed files with 370 additions and 0 deletions.
21 changes: 21 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2023-2024 DaniD3v

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
30 changes: 30 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# digiDownload
API to download books from [http://digi4school.at](http://digi4school.at)
`pip install digiDownload`

# Console Menu
Built-in CLI menu:
`python -m digiDownload`

```
Select the books you want to download:
1: [ ] Mathematik mit technischen Anwendungen
2: [x] das deutschbuch.
R: Register new book.
F: Finish selection.
```

# Async
This library makes extensive use of asyncio, allowing your code to be more efficient.

# Future plans
- Add synchronous Book/Session class wrappers to make this more accessible for beginners.
- Allow for downloading all the volumes of an E-Book instead of simply using the first one.

# Compatibility
Due to the inconsistency of digi4school this library only supports a limited set of books.
Because I can only test the library with the books I have access to, I don't even know which books will work.

- Books hosted directly on digi4school.at or hpthek.at will likely work
- There is limited compatibility with books that have multiple volumes.
6 changes: 6 additions & 0 deletions digiDownload/AdBlockCookiePolicy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from http.cookiejar import DefaultCookiePolicy, Cookie


class AdBlockPolicy(DefaultCookiePolicy):
    """Cookie policy that refuses to store the ``ad_session_id`` cookie."""

    def set_ok(self, cookie: Cookie, _) -> bool:
        """Return ``False`` for the ``ad_session_id`` cookie, ``True`` otherwise.

        The second positional argument (the request) is ignored.
        """
        if cookie.name == "ad_session_id":
            return False
        return True
149 changes: 149 additions & 0 deletions digiDownload/Book.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
from digiDownload.LTIParser import LTIForm

from httpx import AsyncClient, Response
from bs4 import BeautifulSoup
from svglib.svglib import svg2rlg
from reportlab.graphics import renderPDF
from reportlab.pdfgen.canvas import Canvas
from PyPDF2 import PdfMerger
from io import BytesIO

from base64 import encodebytes
import asyncio


def _increment_page(page: str or int):
return page+1 if isinstance(page, int) else page


def get_digi4school_url(book_id: str, extra: str):
    """Return a URL builder for a book served from ``a.digi4school.at``.

    The returned callable takes ``(page, ending)`` and yields
    ``https://a.digi4school.at/ebook/<book_id>/<extra><page><ending>``, where
    integer pages are first passed through ``_increment_page``.
    """
    def build_url(page, ending):
        page_part = _increment_page(page)
        return f"https://a.digi4school.at/ebook/{book_id}/{extra}{page_part}{ending}"

    return build_url


def get_hpthek_url(book_id: str, extra: str):
    """Return a URL builder for a book served from ``a.hpthek.at``.

    The returned callable takes ``(page, ending)``. Unlike the digi4school
    layout, the page number appears twice: once as a directory (followed by
    ``/`` unless ``page`` is the empty string) and once as the file stem.
    """
    def build_url(page, ending):
        page_part = _increment_page(page)
        separator = '' if page == '' else '/'
        return f"https://a.hpthek.at/ebook/{book_id}/{page_part}{separator}{extra}{page_part}{ending}"

    return build_url


class Book:
    """A single e-book from the user's shelf that can be rendered to PDF.

    Instances are meant to be built with :meth:`create`, which performs the
    LTI launch requests and determines the URL scheme and page count.
    """

    # Maps the host serving a book to the factory that builds its page URLs.
    urls = {
        "a.digi4school.at": get_digi4school_url,
        "a.hpthek.at": get_hpthek_url,
    }

    def __init__(self, client: AsyncClient):
        """Store the HTTP client; all other fields are filled in by :meth:`create`."""
        self._client = client

        self.publisher = None
        self.title = None
        self.cover = None

        self._code = None
        self._id = None
        self._content_id = None

        self._url = None
        self._pages = None

    @classmethod
    async def create(cls, client: AsyncClient, html: BeautifulSoup) -> "Book | None":
        """Build a book from its shelf entry.

        Args:
            client: Logged-in HTTP client used for every request.
            html: The shelf element of the book; it must carry ``data-code``
                and ``data-id`` attributes and contain the publisher span,
                an ``h1`` title and a cover ``img``.

        Returns:
            The initialised book, or ``None`` (after printing a hint) when the
            book is served from an unknown host or its page count cannot be
            determined.
        """
        self = cls(client)

        self.publisher = html.find("span", {"class": "publisher"}).text
        self.title = html.find("h1").text
        self.cover = html.find("img")["src"]

        self._code = html["data-code"]
        self._id = html["data-id"]

        # Two chained LTI launch forms have to be submitted to reach the book.
        launch_form = LTIForm((await client.get(f"https://digi4school.at/ebook/{self._code}")).text)
        first_form = LTIForm((await launch_form.send(client)).text)
        book_response = await first_form.send(client)

        self._content_id = first_form["resource_link_id"]

        host = book_response.url.host
        try:
            url_factory = Book.urls[host]
        except KeyError:
            print(f"Undocumented url: {host} (Book: {self.title})\nPlease open a Github issue with this url and the book title.")
            return None
        self._url = url_factory(self._content_id, "")

        main_page = (await client.get(self._url("", ""))).text  # don't remove the / at the end of the url
        if main_page.split('\n')[0] == "<html>":  # checks if there are multiple volumes
            soup = BeautifulSoup(main_page, "html.parser")
            extra = '/'.join(soup.find("a")["href"].split("/")[:-1]) + '/'

            self._url = url_factory(self._content_id, extra)
            main_page = (await client.get(self._url("", ""))).text

            # TODO actually make multiple volumes work instead of simply taking the first one

        page_labels = BeautifulSoup(main_page, "html.parser").find("meta", {"name": "pageLabels"})
        if page_labels is not None:
            # NOTE(review): counting commas gives one less than the number of
            # comma-separated labels - confirm this is the intended page count.
            self._pages = page_labels['content'].count(',')
        else:
            pos = main_page.find("IDRViewer.makeNavBar(")
            if pos == -1:
                print(f"Couldn't find the page count. (Book: {self.title})\nPlease open a Github issue with the book title.")
                return None
            # The first argument of IDRViewer.makeNavBar(...) is used as the page count.
            self._pages = int(main_page[pos:].split('(')[1].split(',')[0])

        return self

    async def _get_page(self, page: int) -> Response:
        """Fetch the raw SVG response of the zero-based ``page``."""
        return await self._client.get(self._url(page, ".svg"))

    async def _get_images(self, page: int, svg: BeautifulSoup):
        """Download every ``<image>`` referenced by a page's SVG concurrently.

        Yields:
            ``(image_tag, response)`` pairs, in document order, for each
            download whose ``Content-Type`` starts with ``image/``.
        """
        images = svg.find_all("image")
        downloads = []

        for image in images:
            url_ending = image["xlink:href"]
            # A reference with two slashes has its first path segment dropped.
            if url_ending.count('/') == 2:
                url_ending = '/'.join(url_ending.split('/')[1:])

            url = self._url(page, '/' + url_ending)
            # NOTE(review): this looks like it was meant to be an ``Accept``
            # header; left unchanged because it may alter the served format.
            downloads.append(asyncio.create_task(self._client.get(url, headers={"Content-Type": "image/avif,image/webp,*/*"})))

        # zip keeps each tag paired with its own download without a list search.
        for image, download in zip(images, downloads):
            resp = await download
            if resp.headers.get("Content-Type", "").startswith("image/"):
                yield image, resp

    async def get_page_svg(self, page: int) -> str:
        """Return the SVG of ``page`` with all images inlined as base64 data URIs."""
        soup = BeautifulSoup((await self._get_page(page)).text, "xml")

        async for image, resp in self._get_images(page, soup):
            image["xlink:href"] = f"data:{resp.headers['Content-Type']};base64,{encodebytes(resp.content).decode('utf-8')}"

        return str(soup)

    async def get_page_pdf(self, page: int) -> BytesIO:
        """Render ``page`` to a single-page PDF held in memory.

        Returns:
            A buffer with the rendered PDF. When rendering fails with an
            ``AttributeError`` an empty reportlab canvas is saved instead.
        """
        svg = await self.get_page_svg(page)

        buffer = BytesIO()
        try:
            drawing = svg2rlg(BytesIO(svg.encode("utf-8")))
            renderPDF.drawToFile(drawing, buffer)

        except AttributeError:
            # Presumably the SVG could not be converted; start from a fresh
            # buffer so no partial output precedes the placeholder.
            buffer = BytesIO()
            canvas = Canvas(buffer)
            canvas.save()

        return buffer

    async def get_pdf(self, show_progress: bool = False) -> BytesIO:
        """Download all pages concurrently and merge them into one PDF.

        Args:
            show_progress: When true, print a progress line once per second.

        Returns:
            A buffer containing the merged PDF.
        """
        total = self._pages
        tasks = [asyncio.create_task(self.get_page_pdf(page)) for page in range(total)]

        def report_progress() -> int:
            # The denominator is the number of tasks actually started.
            finished = sum(task.done() for task in tasks)
            percent = finished / total * 100 if total else 100.0
            print(f"Downloading {self.title}: {percent:.2f}% ({finished}/{total})", end='\r')
            return finished

        async def progress_updater():
            while report_progress() < total:
                await asyncio.sleep(1)

        updater = asyncio.create_task(progress_updater()) if show_progress else None

        merger = PdfMerger()
        try:
            for task in tasks:
                result = await task
                if result is not None:
                    merger.append(result)

            buffer = BytesIO()
            merger.write(buffer)
        finally:
            merger.close()
            # No-op for finished tasks; stops leftovers if a page failed.
            for task in tasks:
                task.cancel()
            if updater is not None:
                updater.cancel()

        if show_progress:
            report_progress()  # make sure the final state is shown
        return buffer
23 changes: 23 additions & 0 deletions digiDownload/LTIParser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from digiDownload.exceptions import NotAnLtiLaunchForm

from httpx import AsyncClient, Response
from bs4 import BeautifulSoup


class LTIForm:
    """Parsed representation of an HTML ``ltiLaunchForm`` that can be re-submitted."""

    def __init__(self, content: str):
        """Parse ``content`` and extract the form's target and input fields.

        Args:
            content: HTML text expected to contain a form named ``ltiLaunchForm``.

        Raises:
            NotAnLtiLaunchForm: If there is no form, the first form has a
                different name, or it has no ``action``.
        """
        soup = BeautifulSoup(content, "html.parser")
        form = soup.form

        # A page without any <form> must raise the documented error too,
        # not a TypeError from subscripting None.
        if form is None or form.get("name") != "ltiLaunchForm" or form.get("action") is None:
            raise NotAnLtiLaunchForm("Not a lti launch form.")

        self.url = form["action"]
        # Fall back to the HTML defaults when the attributes are omitted.
        self.method = form.get("method", "get")
        self.content_type = form.get("enctype", "application/x-www-form-urlencoded")

        # Inputs without a name cannot be submitted; a missing value is empty.
        self.data = {
            field["name"]: field.get("value", "")
            for field in soup.find_all("input")
            if field.has_attr("name")
        }

    def __getitem__(self, item: str) -> str:
        """Return the value of the input field named ``item``."""
        return self.data[item]

    async def send(self, client: AsyncClient) -> Response:
        """Submit the form with ``client`` and return the response."""
        return await client.request(self.method, self.url, headers={"Content-Type": self.content_type}, data=self.data)
52 changes: 52 additions & 0 deletions digiDownload/Session.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from digiDownload.AdBlockCookiePolicy import AdBlockPolicy
from digiDownload.exceptions import InvalidCredentials
from digiDownload.Book import Book

import httpx
from bs4 import BeautifulSoup

import asyncio
from http import cookiejar


class Session:
    """A logged-in digi4school session."""

    def __init__(self, client: httpx.AsyncClient):
        """Wrap an already authenticated HTTP client."""
        self._client = client

    @classmethod
    async def create(cls, email: str, password: str, remember_login: bool = False):
        """Log in and return a new session.

        Args:
            email: Account e-mail address.
            password: Account password.
            remember_login: Sent to the server as the ``indefinite`` flag.

        Raises:
            InvalidCredentials: If the server does not answer ``200`` with body ``OK``.
        """
        client = httpx.AsyncClient(cookies=cookiejar.CookieJar(policy=AdBlockPolicy()), timeout=15)
        try:
            resp = await client.post("https://digi4school.at/br/xhr/login",
                                     headers={"Content-Type": "application/x-www-form-urlencoded"},
                                     data={"email": email, "password": password, "indefinite": int(remember_login)})

            if resp.status_code != 200 or resp.content != b"OK":
                raise InvalidCredentials(f"Login failed. Are you sure you entered the correct credentials? {resp.status_code}: {resp.reason_phrase}")
        except BaseException:
            # Nobody else holds the client yet, so release it before propagating.
            await client.aclose()
            raise

        return cls(client)

    async def close(self) -> None:
        """Close the underlying HTTP client."""
        await self._client.aclose()

    async def get_books(self):
        """Asynchronously yield every usable book on the user's shelf.

        All books are initialised concurrently and yielded in shelf order;
        books for which ``Book.create`` returns ``None`` are skipped.
        """
        resp = await self._client.get("https://digi4school.at/ebooks")
        soup = BeautifulSoup(resp.text, "html.parser")

        shelf = soup.find("div", {"id": "shelf"})
        if shelf is None:
            return

        # Only direct child tags carrying a book code are shelf entries;
        # this skips whitespace text nodes between them.
        entries = shelf.find_all(attrs={"data-code": True}, recursive=False)
        tasks = [asyncio.create_task(Book.create(self._client, entry)) for entry in entries]

        for task in tasks:
            result = await task
            if isinstance(result, list):
                for volume in result:
                    yield volume
            elif result is not None:
                yield result

    async def redeem_code(self, code: str) -> str:
        """Redeem a book code and return a human-readable status message."""
        resp = (await self._client.post("https://digi4school.at/br/xhr/einloesen",
                                        headers={"Content-Type": "application/x-www-form-urlencoded"},
                                        data={"code": code})).json()

        if resp["err"] != 0:
            if "msg" not in resp:
                return "Unknown Error"
            parts = resp["msg"].split(':')
            # Without a colon there is no prefix to strip; return the message as is.
            return parts[1][1:] if len(parts) > 1 else resp["msg"]

        return f"Successfully redeemed {code[:4]}-{code[4:8]}-{code[8:12]}-{code[12:16]}"
9 changes: 9 additions & 0 deletions digiDownload/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@

# NOTE(review): inside a package's __init__.py, __name__ is the package name,
# and ``python -m digiDownload`` runs ``digiDownload/__main__.py``. This guard
# therefore only fires when this file is executed directly as a script;
# consider moving the entry point to a ``__main__.py``.
if __name__ == "__main__":
    from asyncio import run as run_async

    from digiDownload.cli_tool import run

    run_async(run())
    # SystemExit does not depend on the ``exit`` helper injected by ``site``.
    raise SystemExit(0)

import digiDownload.Session
import digiDownload.exceptions
49 changes: 49 additions & 0 deletions digiDownload/cli_tool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from digiDownload.Session import Session

import os
from getpass import getpass


async def run():
try: session = await Session.create(os.environ["email"], os.environ["password"])
except KeyError: session = await Session.create(input("EMail: "), getpass("Password: "))
books = [(b, False) async for b in session.get_books()]

path = f"{os.getcwd()}"
if not os.path.exists(path): os.mkdir(path)

def menu(books: list) -> bool: # False -> continue, True -> finish
print("\nSelect the books you want to download:")
for i, (b, s) in enumerate(books): print(f"{i + 1}: [{'x' if s else ' '}] {b.title}")
print("R: Register new book.")
print("F: Finish selection.")
print("Q: Exit")

selection = input(": ")
if selection.isnumeric():
selection = int(selection) - 1

try: books[selection] = (books[selection][0], not books[selection][1])
except IndexError: return False

else:
match selection.lower():
case 'r':
err = session.redeem_code(input("code: "))
if err is not None: print(err)
# noinspection PyUnusedLocal
books = [(b, False) for b in session.get_books()]
case 'f': return True
case 'q': exit(0)

return False

while not menu(books): pass

for book in [b for b, s in books if s]:
book_content = (await book.get_pdf(True)).getbuffer().tobytes()

with open(os.path.join(path, f"{book.title.replace('/', '')}.pdf"), "w+b") as f:
f.write(book_content)

print(f"\nDownloaded {book.title}")
3 changes: 3 additions & 0 deletions digiDownload/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@

class InvalidCredentials(Exception):
    """Raised when the login request is rejected."""
class NotAnLtiLaunchForm(Exception):
    """Raised when parsed HTML does not contain a form named ``ltiLaunchForm``."""
3 changes: 3 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Static package metadata that accompanies setup.py.
[metadata]
# NOTE(review): newer setuptools releases appear to prefer
# ``long_description = file: README.md`` and ``license_files`` over the two
# keys below - confirm against the setuptools version in use.
description-file = README.md
license_file = LICENSE
25 changes: 25 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import setuptools

# Package definition for digiDownload.
setuptools.setup(
    name="digiDownload",
    url="https://github.com/DaniD3v/digiDownload",
    author="DaniD3v",

    description="API to download books from digi4school.at.",
    keywords=["digi4school", "books", "api"],

    version="1.0.2",
    license='MIT',

    packages=["digiDownload"],
    # Every entry ends with a comma: adjacent string literals are silently
    # concatenated, which previously merged "svglib" and "beautifulsoup4"
    # into one bogus requirement.
    install_requires=[
        "httpx",
        "lxml",
        "reportlab",
        "PyPDF2",
        "svglib",
        "beautifulsoup4",
    ],

    download_url='https://github.com/DaniD3v/digiDownload/archive/refs/tags/1.0.2.tar.gz',
)

0 comments on commit 65b9da9

Please sign in to comment.