From c96c05392dbb583e7d0b8ab827f5efcfe312c2c0 Mon Sep 17 00:00:00 2001
From: crocodilestick <105450872+crocodilestick@users.noreply.github.com>
Date: Sat, 4 Jan 2025 14:57:13 +0100
Subject: [PATCH] Fixed advanced search css & added total book count to heading
on Books page as suggested by jadehawk in Issue #153
---
root/app/calibre-web/cps/cwa_functions.py | 2 -
.../app/calibre-web/cps/templates/layout.html | 2 +-
root/app/calibre-web/cps/web.py | 1695 +++++++++++++++++
3 files changed, 1696 insertions(+), 3 deletions(-)
create mode 100644 root/app/calibre-web/cps/web.py
diff --git a/root/app/calibre-web/cps/cwa_functions.py b/root/app/calibre-web/cps/cwa_functions.py
index 50ab5d6..c1fa588 100644
--- a/root/app/calibre-web/cps/cwa_functions.py
+++ b/root/app/calibre-web/cps/cwa_functions.py
@@ -393,8 +393,6 @@ def kill_epub_fixer(queue):
os.remove(tempfile.gettempdir() + '/kindle_epub_fixer.lock')
except FileNotFoundError:
...
- # # Empty tmp conversion dir of half finished files
- # empty_tmp_con_dir(get_tmp_conversion_dir())
# Remove the trigger file that triggered this block
try:
os.remove(trigger_file)
diff --git a/root/app/calibre-web/cps/templates/layout.html b/root/app/calibre-web/cps/templates/layout.html
index 66a219f..dad5901 100644
--- a/root/app/calibre-web/cps/templates/layout.html
+++ b/root/app/calibre-web/cps/templates/layout.html
@@ -59,7 +59,7 @@
{% if current_user.is_authenticated or g.allow_anonymous %}
{% endif %}
diff --git a/root/app/calibre-web/cps/web.py b/root/app/calibre-web/cps/web.py
new file mode 100644
index 0000000..558d768
--- /dev/null
+++ b/root/app/calibre-web/cps/web.py
@@ -0,0 +1,1695 @@
+# This file is part of the Calibre-Web (https://github.com/janeczku/calibre-web)
+# Copyright (C) 2018-2019 OzzieIsaacs, cervinko, jkrehm, bodybybuddha, ok11,
+# andy29485, idalin, Kyosfonica, wuqi, Kennyl, lemmsh,
+# falgh1, grunjol, csitko, ytils, xybydy, trasba, vrabe,
+# ruben-herold, marblepebble, JackED42, SiphonSquirrel,
+# apetresc, nanu-c, mutschler
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+import os
+import json
+import mimetypes
+import chardet # dependency of requests
+import copy
+import importlib
+
+# CWA Imports
+import sqlite3
+import json
+
+from flask import Blueprint, jsonify
+from flask import request, redirect, send_from_directory, make_response, flash, abort, url_for, Response
+from flask import session as flask_session
+from flask_babel import gettext as _
+from flask_babel import get_locale
+from .cw_login import login_user, logout_user, current_user
+from flask_limiter import RateLimitExceeded
+from flask_limiter.util import get_remote_address
+from sqlalchemy.exc import IntegrityError, InvalidRequestError, OperationalError
+from sqlalchemy.sql.expression import text, func, false, not_, and_, or_
+from sqlalchemy.orm.attributes import flag_modified
+from sqlalchemy.sql.functions import coalesce
+from werkzeug.datastructures import Headers
+from werkzeug.security import generate_password_hash, check_password_hash
+
+from . import constants, logger, isoLanguages, services
+from . import db, ub, config, app
+from . import calibre_db, kobo_sync_status
+from .search import render_search_results, render_adv_search_results
+from .gdriveutils import getFileFromEbooksFolder, do_gdrive_download
+from .helper import check_valid_domain, check_email, check_username, \
+ get_book_cover, get_series_cover_thumbnail, get_download_link, send_mail, generate_random_password, \
+ send_registration_mail, check_send_to_ereader, check_read_formats, tags_filters, reset_password, valid_email, \
+ edit_book_read_status, valid_password
+from .pagination import Pagination
+from .redirect import get_redirect_location
+from .cw_babel import get_available_locale
+from .usermanagement import login_required_if_no_ano
+from .kobo_sync_status import remove_synced_book
+from .render_template import render_title_template
+from .kobo_sync_status import change_archived_books
+from . import limiter
+from .services.worker import WorkerThread
+from .tasks_status import render_task_status
+from .usermanagement import user_login_required
+from .string_helper import strip_whitespaces
+
+
+feature_support = {
+ 'ldap': bool(services.ldap),
+ 'goodreads': bool(services.goodreads_support),
+ 'kobo': bool(services.kobo)
+}
+
+try:
+ from .oauth_bb import oauth_check, register_user_with_oauth, logout_oauth_user, get_oauth_status
+
+ feature_support['oauth'] = True
+except ImportError:
+ feature_support['oauth'] = False
+ oauth_check = {}
+ register_user_with_oauth = logout_oauth_user = get_oauth_status = None
+
+from functools import wraps
+
+try:
+ from natsort import natsorted as sort
+except ImportError:
+ sort = sorted # Just use regular sort then, may cause issues with badly named pages in cbz/cbr files
+
+
+sql_version = importlib.metadata.version("sqlalchemy")
+sqlalchemy_version2 = ([int(x) for x in sql_version.split('.')] >= [2, 0, 0])
+
+
+@app.after_request
+def add_security_headers(resp):
+ default_src = ([host.strip() for host in config.config_trustedhosts.split(',') if host] +
+ ["'self'", "'unsafe-inline'", "'unsafe-eval'"])
+ csp = "default-src " + ' '.join(default_src)
+ if request.endpoint == "web.read_book" and config.config_use_google_drive:
+ csp +=" blob: "
+ csp += "; font-src 'self' data:"
+ if request.endpoint == "web.read_book":
+ csp += " blob: "
+ csp += "; img-src 'self'"
+ if request.path.startswith("/author/") and config.config_use_goodreads:
+ csp += " images.gr-assets.com i.gr-assets.com s.gr-assets.com"
+ csp += " data:"
+ if request.endpoint == "edit-book.show_edit_book" or config.config_use_google_drive:
+ csp += " *"
+ if request.endpoint == "web.read_book":
+ csp += " blob: ; style-src-elem 'self' blob: 'unsafe-inline'"
+ csp += "; object-src 'none';"
+ resp.headers['Content-Security-Policy'] = csp
+ resp.headers['X-Content-Type-Options'] = 'nosniff'
+ resp.headers['X-Frame-Options'] = 'SAMEORIGIN'
+ resp.headers['X-XSS-Protection'] = '1; mode=block'
+ resp.headers['Strict-Transport-Security'] = 'max-age=31536000';
+ return resp
+
+
+web = Blueprint('web', __name__)
+
+log = logger.create()
+
+
+# ################################### Login logic and rights management ###############################################
+
+
+def download_required(f):
+ @wraps(f)
+ def inner(*args, **kwargs):
+ if current_user.role_download():
+ return f(*args, **kwargs)
+ abort(403)
+
+ return inner
+
+
+def viewer_required(f):
+ @wraps(f)
+ def inner(*args, **kwargs):
+ if current_user.role_viewer():
+ return f(*args, **kwargs)
+ abort(403)
+
+ return inner
+
+
+# ################################### data provider functions #########################################################
+
+
+@web.route("/ajax/emailstat")
+@user_login_required
+def get_email_status_json():
+ tasks = WorkerThread.get_instance().tasks
+ return jsonify(render_task_status(tasks))
+
+
+@web.route("/ajax/bookmark//", methods=['POST'])
+@user_login_required
+def set_bookmark(book_id, book_format):
+ bookmark_key = request.form["bookmark"]
+ ub.session.query(ub.Bookmark).filter(and_(ub.Bookmark.user_id == int(current_user.id),
+ ub.Bookmark.book_id == book_id,
+ ub.Bookmark.format == book_format)).delete()
+ if not bookmark_key:
+ ub.session_commit()
+ return "", 204
+
+ l_bookmark = ub.Bookmark(user_id=current_user.id,
+ book_id=book_id,
+ format=book_format,
+ bookmark_key=bookmark_key)
+ ub.session.merge(l_bookmark)
+ ub.session_commit("Bookmark for user {} in book {} created".format(current_user.id, book_id))
+ return "", 201
+
+
+@web.route("/ajax/toggleread/", methods=['POST'])
+@user_login_required
+def toggle_read(book_id):
+ message = edit_book_read_status(book_id)
+ if message:
+ return message, 400
+ else:
+ return message
+
+
+@web.route("/ajax/togglearchived/", methods=['POST'])
+@user_login_required
+def toggle_archived(book_id):
+ change_archived_books(book_id, message="Book {} archive bit toggled".format(book_id))
+ # Remove book from syncd books list to force resync (?)
+ remove_synced_book(book_id)
+ return ""
+
+
+@web.route("/ajax/view", methods=["POST"])
+@login_required_if_no_ano
+def update_view():
+ to_save = request.get_json()
+ try:
+ for element in to_save:
+ for param in to_save[element]:
+ current_user.set_view_property(element, param, to_save[element][param])
+ except Exception as ex:
+ log.error("Could not save view_settings: %r %r: %e", request, to_save, ex)
+ return "Invalid request", 400
+ return "1", 200
+
+
+'''
+@web.route("/ajax/getcomic///")
+@user_login_required
+def get_comic_book(book_id, book_format, page):
+ book = calibre_db.get_book(book_id)
+ if not book:
+ return "", 204
+ else:
+ for bookformat in book.data:
+ if bookformat.format.lower() == book_format.lower():
+ cbr_file = os.path.join(config.config_calibre_dir, book.path, bookformat.name) + "." + book_format
+ if book_format in ("cbr", "rar"):
+ if feature_support['rar'] == True:
+ rarfile.UNRAR_TOOL = config.config_rarfile_location
+ try:
+ rf = rarfile.RarFile(cbr_file)
+ names = sort(rf.namelist())
+ extract = lambda page: rf.read(names[page])
+ except:
+ # rarfile not valid
+ log.error('Unrar binary not found, or unable to decompress file %s', cbr_file)
+ return "", 204
+ else:
+ log.info('Unrar is not supported please install python rarfile extension')
+ # no support means return nothing
+ return "", 204
+ elif book_format in ("cbz", "zip"):
+ zf = zipfile.ZipFile(cbr_file)
+ names=sort(zf.namelist())
+ extract = lambda page: zf.read(names[page])
+ elif book_format in ("cbt", "tar"):
+ tf = tarfile.TarFile(cbr_file)
+ names=sort(tf.getnames())
+ extract = lambda page: tf.extractfile(names[page]).read()
+ else:
+ log.error('unsupported comic format')
+ return "", 204
+
+ b64 = codecs.encode(extract(page), 'base64').decode()
+ ext = names[page].rpartition('.')[-1]
+ if ext not in ('png', 'gif', 'jpg', 'jpeg', 'webp'):
+ ext = 'png'
+ extractedfile="data:image/" + ext + ";base64," + b64
+ fileData={"name": names[page], "page":page, "last":len(names)-1, "content": extractedfile}
+ return make_response(json.dumps(fileData))
+ return "", 204
+'''
+
+
+# ################################### Typeahead ##################################################################
+
+
+@web.route("/get_authors_json", methods=['GET'])
+@login_required_if_no_ano
+def get_authors_json():
+ return calibre_db.get_typeahead(db.Authors, request.args.get('q'), ('|', ','))
+
+
+@web.route("/get_publishers_json", methods=['GET'])
+@login_required_if_no_ano
+def get_publishers_json():
+ return calibre_db.get_typeahead(db.Publishers, request.args.get('q'), ('|', ','))
+
+
+@web.route("/get_tags_json", methods=['GET'])
+@login_required_if_no_ano
+def get_tags_json():
+ return calibre_db.get_typeahead(db.Tags, request.args.get('q'), tag_filter=tags_filters())
+
+
+@web.route("/get_series_json", methods=['GET'])
+@login_required_if_no_ano
+def get_series_json():
+ return calibre_db.get_typeahead(db.Series, request.args.get('q'))
+
+
+@web.route("/get_languages_json", methods=['GET'])
+@login_required_if_no_ano
+def get_languages_json():
+ query = (request.args.get('q') or '').lower()
+ language_names = isoLanguages.get_language_names(get_locale())
+ entries_start = [s for key, s in language_names.items() if s.lower().startswith(query.lower())]
+ if len(entries_start) < 5:
+ entries = [s for key, s in language_names.items() if query in s.lower()]
+ entries_start.extend(entries[0:(5 - len(entries_start))])
+ entries_start = list(set(entries_start))
+ json_dumps = json.dumps([dict(name=r) for r in entries_start[0:5]])
+ return json_dumps
+
+
+@web.route("/get_matching_tags", methods=['GET'])
+@login_required_if_no_ano
+def get_matching_tags():
+ tag_dict = {'tags': []}
+ q = calibre_db.session.query(db.Books).filter(calibre_db.common_filters(True))
+ calibre_db.create_functions()
+ # calibre_db.session.connection().connection.connection.create_function("lower", 1, db.lcase)
+ author_input = request.args.get('authors') or ''
+ title_input = request.args.get('title') or ''
+ include_tag_inputs = request.args.getlist('include_tag') or ''
+ exclude_tag_inputs = request.args.getlist('exclude_tag') or ''
+ q = q.filter(db.Books.authors.any(func.lower(db.Authors.name).ilike("%" + author_input + "%")),
+ func.lower(db.Books.title).ilike("%" + title_input + "%"))
+ if len(include_tag_inputs) > 0:
+ for tag in include_tag_inputs:
+ q = q.filter(db.Books.tags.any(db.Tags.id == tag))
+ if len(exclude_tag_inputs) > 0:
+ for tag in exclude_tag_inputs:
+ q = q.filter(not_(db.Books.tags.any(db.Tags.id == tag)))
+ for book in q:
+ for tag in book.tags:
+ if tag.id not in tag_dict['tags']:
+ tag_dict['tags'].append(tag.id)
+ json_dumps = json.dumps(tag_dict)
+ return json_dumps
+
+
+def generate_char_list(entries): # data_colum, db_link):
+ char_list = list()
+ for entry in entries:
+ upper_char = entry[0].name[0].upper()
+ if upper_char not in char_list:
+ char_list.append(upper_char)
+ return char_list
+
+
+def query_char_list(data_colum, db_link):
+ results = (calibre_db.session.query(func.upper(func.substr(data_colum, 1, 1)).label('char'))
+ .join(db_link).join(db.Books).filter(calibre_db.common_filters())
+ .group_by(func.upper(func.substr(data_colum, 1, 1))).all())
+ return results
+
+
+def get_sort_function(sort_param, data):
+ order = [db.Books.timestamp.desc()]
+ if sort_param == 'stored':
+ sort_param = current_user.get_view_property(data, 'stored')
+ else:
+ current_user.set_view_property(data, 'stored', sort_param)
+ if sort_param == 'pubnew':
+ order = [db.Books.pubdate.desc()]
+ if sort_param == 'pubold':
+ order = [db.Books.pubdate]
+ if sort_param == 'abc':
+ order = [db.Books.sort]
+ if sort_param == 'zyx':
+ order = [db.Books.sort.desc()]
+ if sort_param == 'new':
+ order = [db.Books.timestamp.desc()]
+ if sort_param == 'old':
+ order = [db.Books.timestamp]
+ if sort_param == 'authaz':
+ order = [db.Books.author_sort.asc(), db.Series.name, db.Books.series_index]
+ if sort_param == 'authza':
+ order = [db.Books.author_sort.desc(), db.Series.name.desc(), db.Books.series_index.desc()]
+ if sort_param == 'seriesasc':
+ order = [db.Books.series_index.asc()]
+ if sort_param == 'seriesdesc':
+ order = [db.Books.series_index.desc()]
+ if sort_param == 'hotdesc':
+ order = [func.count(ub.Downloads.book_id).desc()]
+ if sort_param == 'hotasc':
+ order = [func.count(ub.Downloads.book_id).asc()]
+ if sort_param is None:
+ sort_param = "new"
+ return order, sort_param
+
+
+def cwa_get_library_location() -> str:
+ dirs = {}
+ with open('/app/calibre-web-automated/dirs.json', 'r') as f:
+ dirs: dict[str, str] = json.load(f)
+ library_dir = f"{dirs['calibre_library_dir']}/"
+ return library_dir
+
+def cwa_get_num_books_in_library() -> int:
+ try:
+ # Path to user's Calibre library's metadata.db
+ db_path = cwa_get_library_location() + "metadata.db"
+ # Connect to the SQLite database
+ conn = sqlite3.connect(db_path)
+ cursor = conn.cursor()
+ # Query the number of books
+ cursor.execute("SELECT COUNT(*) FROM books")
+ count = cursor.fetchone()[0]
+ # Close the connection
+ conn.close()
+ # Return the result
+ return count
+ except Exception:
+ return 0
+
+
+def render_books_list(data, sort_param, book_id, page):
+ order = get_sort_function(sort_param, data)
+ if data == "rated":
+ return render_rated_books(page, book_id, order=order)
+ elif data == "discover":
+ return render_discover_books(book_id)
+ elif data == "unread":
+ return render_read_books(page, False, order=order)
+ elif data == "read":
+ return render_read_books(page, True, order=order)
+ elif data == "hot":
+ return render_hot_books(page, order)
+ elif data == "download":
+ return render_downloaded_books(page, order, book_id)
+ elif data == "author":
+ return render_author_books(page, book_id, order)
+ elif data == "publisher":
+ return render_publisher_books(page, book_id, order)
+ elif data == "series":
+ return render_series_books(page, book_id, order)
+ elif data == "ratings":
+ return render_ratings_books(page, book_id, order)
+ elif data == "formats":
+ return render_formats_books(page, book_id, order)
+ elif data == "category":
+ return render_category_books(page, book_id, order)
+ elif data == "language":
+ return render_language_books(page, book_id, order)
+ elif data == "archived":
+ return render_archived_books(page, order)
+ elif data == "search":
+ term = request.args.get('query', None)
+ offset = int(int(config.config_books_per_page) * (page - 1))
+ return render_search_results(term, offset, order, config.config_books_per_page)
+ elif data == "advsearch":
+ term = json.loads(flask_session.get('query', '{}'))
+ offset = int(int(config.config_books_per_page) * (page - 1))
+ return render_adv_search_results(term, offset, order, config.config_books_per_page)
+ else:
+ website = data or "newest"
+ entries, random, pagination = calibre_db.fill_indexpage(page, 0, db.Books, True, order[0],
+ True, config.config_read_column,
+ db.books_series_link,
+ db.Books.id == db.books_series_link.c.book,
+ db.Series)
+ return render_title_template('index.html', random=random, entries=entries, pagination=pagination,
+ title=_(f"Books ({cwa_get_num_books_in_library():,})"), page=website, order=order[1])
+
+
+def render_rated_books(page, book_id, order):
+ if current_user.check_visibility(constants.SIDEBAR_BEST_RATED):
+ entries, random, pagination = calibre_db.fill_indexpage(page, 0,
+ db.Books,
+ db.Books.ratings.any(db.Ratings.rating > 9),
+ order[0],
+ True, config.config_read_column,
+ db.books_series_link,
+ db.Books.id == db.books_series_link.c.book,
+ db.Series)
+
+ return render_title_template('index.html', random=random, entries=entries, pagination=pagination,
+ id=book_id, title=_("Top Rated Books"), page="rated", order=order[1])
+ else:
+ abort(404)
+
+
+def render_discover_books(book_id):
+ if current_user.check_visibility(constants.SIDEBAR_RANDOM):
+ entries, __, ___ = calibre_db.fill_indexpage(1, 0, db.Books, True, [func.randomblob(2)],
+ join_archive_read=True,
+ config_read_column=config.config_read_column)
+ pagination = Pagination(1, config.config_books_per_page, config.config_books_per_page)
+ return render_title_template('index.html', random=false(), entries=entries, pagination=pagination, id=book_id,
+ title=_("Discover (Random Books)"), page="discover")
+ else:
+ abort(404)
+
+
+def render_hot_books(page, order):
+ if current_user.check_visibility(constants.SIDEBAR_HOT):
+ if order[1] not in ['hotasc', 'hotdesc']:
+ # Unary expression comparison only working (for this expression) in sqlalchemy 1.4+
+ # if not (order[0][0].compare(func.count(ub.Downloads.book_id).desc()) or
+ # order[0][0].compare(func.count(ub.Downloads.book_id).asc())):
+ order = [func.count(ub.Downloads.book_id).desc()], 'hotdesc'
+ if current_user.show_detail_random():
+ random_query = calibre_db.generate_linked_query(config.config_read_column, db.Books)
+ random = (random_query.filter(calibre_db.common_filters())
+ .order_by(func.random())
+ .limit(config.config_random_books).all())
+ else:
+ random = false()
+
+ off = int(int(config.config_books_per_page) * (page - 1))
+ all_books = ub.session.query(ub.Downloads, func.count(ub.Downloads.book_id)) \
+ .order_by(*order[0]).group_by(ub.Downloads.book_id)
+ hot_books = all_books.offset(off).limit(config.config_books_per_page)
+ entries = list()
+ for book in hot_books:
+ query = calibre_db.generate_linked_query(config.config_read_column, db.Books)
+ download_book = query.filter(calibre_db.common_filters()).filter(
+ book.Downloads.book_id == db.Books.id).first()
+ if download_book:
+ entries.append(download_book)
+ else:
+ ub.delete_download(book.Downloads.book_id)
+ num_books = entries.__len__()
+ pagination = Pagination(page, config.config_books_per_page, num_books)
+ return render_title_template('index.html', random=random, entries=entries, pagination=pagination,
+ title=_("Hot Books (Most Downloaded)"), page="hot", order=order[1])
+ else:
+ abort(404)
+
+
+def render_downloaded_books(page, order, user_id):
+ if current_user.role_admin():
+ user_id = int(user_id)
+ else:
+ user_id = current_user.id
+ user = ub.session.query(ub.User).filter(ub.User.id == user_id).first()
+ if current_user.check_visibility(constants.SIDEBAR_DOWNLOAD) and user:
+ entries, random, pagination = calibre_db.fill_indexpage(page,
+ 0,
+ db.Books,
+ ub.Downloads.user_id == user_id,
+ order[0],
+ True, config.config_read_column,
+ db.books_series_link,
+ db.Books.id == db.books_series_link.c.book,
+ db.Series,
+ ub.Downloads, db.Books.id == ub.Downloads.book_id)
+ for book in entries:
+ if not (calibre_db.session.query(db.Books).filter(calibre_db.common_filters())
+ .filter(db.Books.id == book.Books.id).first()):
+ ub.delete_download(book.Books.id)
+ return render_title_template('index.html',
+ random=random,
+ entries=entries,
+ pagination=pagination,
+ id=user_id,
+ title=_("Downloaded books by %(user)s", user=user.name),
+ page="download",
+ order=order[1])
+ else:
+ abort(404)
+
+
+def render_author_books(page, author_id, order):
+ entries, __, pagination = calibre_db.fill_indexpage(page, 0,
+ db.Books,
+ db.Books.authors.any(db.Authors.id == author_id),
+ [order[0][0], db.Series.name, db.Books.series_index],
+ True, config.config_read_column,
+ db.books_series_link,
+ db.books_series_link.c.book == db.Books.id,
+ db.Series)
+ if entries is None or not len(entries):
+ flash(_("Oops! Selected book is unavailable. File does not exist or is not accessible"),
+ category="error")
+ return redirect(url_for("web.index"))
+ if sqlalchemy_version2:
+ author = calibre_db.session.get(db.Authors, author_id)
+ else:
+ author = calibre_db.session.query(db.Authors).get(author_id)
+ author_name = author.name.replace('|', ',')
+
+ author_info = None
+ other_books = []
+ if services.goodreads_support and config.config_use_goodreads:
+ author_info = services.goodreads_support.get_author_info(author_name)
+ book_entries = [entry.Books for entry in entries]
+ other_books = services.goodreads_support.get_other_books(author_info, book_entries)
+ return render_title_template('author.html', entries=entries, pagination=pagination, id=author_id,
+ title=_("Author: %(name)s", name=author_name), author=author_info,
+ other_books=other_books, page="author", order=order[1])
+
+
+def render_publisher_books(page, book_id, order):
+ if book_id == '-1':
+ entries, random, pagination = calibre_db.fill_indexpage(page, 0,
+ db.Books,
+ db.Publishers.name == None,
+ [db.Series.name, order[0][0], db.Books.series_index],
+ True, config.config_read_column,
+ db.books_publishers_link,
+ db.Books.id == db.books_publishers_link.c.book,
+ db.Publishers,
+ db.books_series_link,
+ db.Books.id == db.books_series_link.c.book,
+ db.Series)
+ publisher = _("None")
+ else:
+ publisher = calibre_db.session.query(db.Publishers).filter(db.Publishers.id == book_id).first()
+ if publisher:
+ entries, random, pagination = calibre_db.fill_indexpage(page, 0,
+ db.Books,
+ db.Books.publishers.any(
+ db.Publishers.id == book_id),
+ [db.Series.name, order[0][0],
+ db.Books.series_index],
+ True, config.config_read_column,
+ db.books_series_link,
+ db.Books.id == db.books_series_link.c.book,
+ db.Series)
+ publisher = publisher.name
+ else:
+ abort(404)
+
+ return render_title_template('index.html', random=random, entries=entries, pagination=pagination, id=book_id,
+ title=_("Publisher: %(name)s", name=publisher),
+ page="publisher",
+ order=order[1])
+
+
+def render_series_books(page, book_id, order):
+ if book_id == '-1':
+ entries, random, pagination = calibre_db.fill_indexpage(page, 0,
+ db.Books,
+ db.Series.name == None,
+ [order[0][0]],
+ True, config.config_read_column,
+ db.books_series_link,
+ db.Books.id == db.books_series_link.c.book,
+ db.Series)
+ series_name = _("None")
+ else:
+ series_name = calibre_db.session.query(db.Series).filter(db.Series.id == book_id).first()
+ if series_name:
+ entries, random, pagination = calibre_db.fill_indexpage(page, 0,
+ db.Books,
+ db.Books.series.any(db.Series.id == book_id),
+ [order[0][0]],
+ True, config.config_read_column)
+ series_name = series_name.name
+ else:
+ abort(404)
+ return render_title_template('index.html', random=random, pagination=pagination, entries=entries, id=book_id,
+ title=_("Series: %(serie)s", serie=series_name), page="series", order=order[1])
+
+
+def render_ratings_books(page, book_id, order):
+ if book_id == '-1':
+ db_filter = coalesce(db.Ratings.rating, 0) < 1
+ entries, random, pagination = calibre_db.fill_indexpage(page, 0,
+ db.Books,
+ db_filter,
+ [order[0][0]],
+ True, config.config_read_column,
+ db.books_ratings_link,
+ db.Books.id == db.books_ratings_link.c.book,
+ db.Ratings)
+ title = _("Rating: None")
+ else:
+ name = calibre_db.session.query(db.Ratings).filter(db.Ratings.id == book_id).first()
+ if name:
+ entries, random, pagination = calibre_db.fill_indexpage(page, 0,
+ db.Books,
+ db.Books.ratings.any(db.Ratings.id == book_id),
+ [order[0][0]],
+ True, config.config_read_column)
+ title = _("Rating: %(rating)s stars", rating=int(name.rating / 2))
+ else:
+ abort(404)
+ return render_title_template('index.html', random=random, pagination=pagination, entries=entries, id=book_id,
+ title=title, page="ratings", order=order[1])
+
+
+def render_formats_books(page, book_id, order):
+ if book_id == '-1':
+ name = _("None")
+ entries, random, pagination = calibre_db.fill_indexpage(page, 0,
+ db.Books,
+ db.Data.format == None,
+ [order[0][0]],
+ True, config.config_read_column,
+ db.Data)
+
+ else:
+ name = calibre_db.session.query(db.Data).filter(db.Data.format == book_id.upper()).first()
+ if name:
+ name = name.format
+ entries, random, pagination = calibre_db.fill_indexpage(page, 0,
+ db.Books,
+ db.Books.data.any(
+ db.Data.format == book_id.upper()),
+ [order[0][0]],
+ True, config.config_read_column)
+ else:
+ abort(404)
+
+ return render_title_template('index.html', random=random, pagination=pagination, entries=entries, id=book_id,
+ title=_("File format: %(format)s", format=name),
+ page="formats",
+ order=order[1])
+
+
+def render_category_books(page, book_id, order):
+ if book_id == '-1':
+ entries, random, pagination = calibre_db.fill_indexpage(page, 0,
+ db.Books,
+ db.Tags.name == None,
+ [order[0][0], db.Series.name, db.Books.series_index],
+ True, config.config_read_column,
+ db.books_tags_link,
+ db.Books.id == db.books_tags_link.c.book,
+ db.Tags,
+ db.books_series_link,
+ db.Books.id == db.books_series_link.c.book,
+ db.Series)
+ tagsname = _("None")
+ else:
+ tagsname = calibre_db.session.query(db.Tags).filter(db.Tags.id == book_id).first()
+ if tagsname:
+ entries, random, pagination = calibre_db.fill_indexpage(page, 0,
+ db.Books,
+ db.Books.tags.any(db.Tags.id == book_id),
+ [order[0][0], db.Series.name,
+ db.Books.series_index],
+ True, config.config_read_column,
+ db.books_series_link,
+ db.Books.id == db.books_series_link.c.book,
+ db.Series)
+ tagsname = tagsname.name
+ else:
+ abort(404)
+ return render_title_template('index.html', random=random, entries=entries, pagination=pagination, id=book_id,
+ title=_("Category: %(name)s", name=tagsname), page="category", order=order[1])
+
+
+def render_language_books(page, name, order):
+ try:
+ if name.lower() != "none":
+ lang_name = isoLanguages.get_language_name(get_locale(), name)
+ if lang_name == "Unknown":
+ abort(404)
+ else:
+ lang_name = _("None")
+ except KeyError:
+ abort(404)
+ if name == "none":
+ entries, random, pagination = calibre_db.fill_indexpage(page, 0,
+ db.Books,
+ db.Languages.lang_code == None,
+ [order[0][0]],
+ True, config.config_read_column,
+ db.books_languages_link,
+ db.Books.id == db.books_languages_link.c.book,
+ db.Languages)
+ else:
+ entries, random, pagination = calibre_db.fill_indexpage(page, 0,
+ db.Books,
+ db.Books.languages.any(db.Languages.lang_code == name),
+ [order[0][0]],
+ True, config.config_read_column)
+ return render_title_template('index.html', random=random, entries=entries, pagination=pagination, id=name,
+ title=_("Language: %(name)s", name=lang_name), page="language", order=order[1])
+
+
+def render_read_books(page, are_read, as_xml=False, order=None):
+ sort_param = order[0] if order else []
+ if not config.config_read_column:
+ if are_read:
+ db_filter = and_(ub.ReadBook.user_id == int(current_user.id),
+ ub.ReadBook.read_status == ub.ReadBook.STATUS_FINISHED)
+ else:
+ db_filter = coalesce(ub.ReadBook.read_status, 0) != ub.ReadBook.STATUS_FINISHED
+ else:
+ try:
+ if are_read:
+ db_filter = db.cc_classes[config.config_read_column].value == True
+ else:
+ db_filter = coalesce(db.cc_classes[config.config_read_column].value, False) != True
+ except (KeyError, AttributeError, IndexError):
+ log.error("Custom Column No.{} does not exist in calibre database".format(config.config_read_column))
+ if not as_xml:
+ flash(_("Custom Column No.%(column)d does not exist in calibre database",
+ column=config.config_read_column),
+ category="error")
+ return redirect(url_for("web.index"))
+ return [] # ToDo: Handle error Case for opds
+
+ entries, random, pagination = calibre_db.fill_indexpage(page, 0,
+ db.Books,
+ db_filter,
+ sort_param,
+ True, config.config_read_column,
+ db.books_series_link,
+ db.Books.id == db.books_series_link.c.book,
+ db.Series)
+
+ if as_xml:
+ return entries, pagination
+ else:
+ if are_read:
+ name = _('Read Books') + ' (' + str(pagination.total_count) + ')'
+ page_name = "read"
+ else:
+ name = _('Unread Books') + ' (' + str(pagination.total_count) + ')'
+ page_name = "unread"
+ return render_title_template('index.html', random=random, entries=entries, pagination=pagination,
+ title=name, page=page_name, order=order[1])
+
+
+def render_archived_books(page, sort_param):
+ order = sort_param[0] or []
+ archived_books = (ub.session.query(ub.ArchivedBook)
+ .filter(ub.ArchivedBook.user_id == int(current_user.id))
+ .filter(ub.ArchivedBook.is_archived == True)
+ .all())
+ archived_book_ids = [archived_book.book_id for archived_book in archived_books]
+
+ archived_filter = db.Books.id.in_(archived_book_ids)
+
+ entries, random, pagination = calibre_db.fill_indexpage_with_archived_books(page, db.Books,
+ 0,
+ archived_filter,
+ order,
+ True,
+ True, config.config_read_column)
+
+ name = _('Archived Books') + ' (' + str(len(archived_book_ids)) + ')'
+ page_name = "archived"
+ return render_title_template('index.html', random=random, entries=entries, pagination=pagination,
+ title=name, page=page_name, order=sort_param[1])
+
+
+# ################################### View Books list ##################################################################
+
+
+@web.route("/", defaults={'page': 1})
+@web.route('/page/')
+@login_required_if_no_ano
+def index(page):
+ sort_param = (request.args.get('sort') or 'stored').lower()
+ return render_books_list("newest", sort_param, 1, page)
+
+
+@web.route('//', defaults={'page': 1, 'book_id': 1})
+@web.route('///', defaults={'page': 1, 'book_id': 1})
+@web.route('///', defaults={'page': 1})
+@web.route('////')
+@login_required_if_no_ano
+def books_list(data, sort_param, book_id, page):
+ return render_books_list(data, sort_param, book_id, page)
+
+
+@web.route("/table")
+@user_login_required
+def books_table():
+ visibility = current_user.view_settings.get('table', {})
+ cc = calibre_db.get_cc_columns(config, filter_config_custom_read=True)
+ return render_title_template('book_table.html', title=_("Books List"), cc=cc, page="book_table",
+ visiblility=visibility)
+
+
+@web.route("/ajax/listbooks")
+@user_login_required
+def list_books():
+ off = int(request.args.get("offset") or 0)
+ limit = int(request.args.get("limit") or config.config_books_per_page)
+ search_param = request.args.get("search")
+ sort_param = request.args.get("sort", "id")
+ order = request.args.get("order", "").lower()
+ state = None
+ join = tuple()
+
+ if sort_param == "state":
+ state = json.loads(request.args.get("state", "[]"))
+ elif sort_param == "tags":
+ order = [db.Tags.name.asc()] if order == "asc" else [db.Tags.name.desc()]
+ join = db.books_tags_link, db.Books.id == db.books_tags_link.c.book, db.Tags
+ elif sort_param == "series":
+ order = [db.Series.name.asc()] if order == "asc" else [db.Series.name.desc()]
+ join = db.books_series_link, db.Books.id == db.books_series_link.c.book, db.Series
+ elif sort_param == "publishers":
+ order = [db.Publishers.name.asc()] if order == "asc" else [db.Publishers.name.desc()]
+ join = db.books_publishers_link, db.Books.id == db.books_publishers_link.c.book, db.Publishers
+ elif sort_param == "authors":
+ order = [db.Authors.name.asc(), db.Series.name, db.Books.series_index] if order == "asc" \
+ else [db.Authors.name.desc(), db.Series.name.desc(), db.Books.series_index.desc()]
+ join = db.books_authors_link, db.Books.id == db.books_authors_link.c.book, db.Authors, db.books_series_link, \
+ db.Books.id == db.books_series_link.c.book, db.Series
+ elif sort_param == "languages":
+ order = [db.Languages.lang_code.asc()] if order == "asc" else [db.Languages.lang_code.desc()]
+ join = db.books_languages_link, db.Books.id == db.books_languages_link.c.book, db.Languages
+ elif order and sort_param in ["sort", "title", "authors_sort", "series_index"]:
+ order = [text(sort_param + " " + order)]
+ elif not state:
+ order = [db.Books.timestamp.desc()]
+
+ total_count = filtered_count = calibre_db.session.query(db.Books).filter(
+ calibre_db.common_filters(allow_show_archived=True)).count()
+ if state is not None:
+ if search_param:
+ books = calibre_db.search_query(search_param, config).all()
+ filtered_count = len(books)
+ else:
+ query = calibre_db.generate_linked_query(config.config_read_column, db.Books)
+ books = query.filter(calibre_db.common_filters(allow_show_archived=True)).all()
+ entries = calibre_db.get_checkbox_sorted(books, state, off, limit, order, True)
+ elif search_param:
+ entries, filtered_count, __ = calibre_db.get_search_results(search_param,
+ config,
+ off,
+ [order, ''],
+ limit,
+ *join)
+ else:
+ entries, __, __ = calibre_db.fill_indexpage_with_archived_books((int(off) / (int(limit)) + 1),
+ db.Books,
+ limit,
+ True,
+ order,
+ True,
+ True,
+ config.config_read_column,
+ *join)
+
+ result = list()
+ for entry in entries:
+ val = entry[0]
+ val.is_archived = entry[1] is True
+ val.read_status = entry[2] == ub.ReadBook.STATUS_FINISHED
+ for lang_index in range(0, len(val.languages)):
+ val.languages[lang_index].language_name = isoLanguages.get_language_name(get_locale(), val.languages[
+ lang_index].lang_code)
+ result.append(val)
+
+ table_entries = {'totalNotFiltered': total_count, 'total': filtered_count, "rows": result}
+ js_list = json.dumps(table_entries, cls=db.AlchemyEncoder)
+
+ response = make_response(js_list)
+ response.headers["Content-Type"] = "application/json; charset=utf-8"
+ return response
+
+
+@web.route("/ajax/table_settings", methods=['POST'])
+@user_login_required
+def update_table_settings():
+ current_user.view_settings['table'] = json.loads(request.data)
+ try:
+ try:
+ flag_modified(current_user, "view_settings")
+ except AttributeError:
+ pass
+ ub.session.commit()
+ except (InvalidRequestError, OperationalError):
+ log.error("Invalid request received: %r ", request, )
+ return "Invalid request", 400
+ return ""
+
+
+@web.route("/author")
+@login_required_if_no_ano
+def author_list():
+ if current_user.check_visibility(constants.SIDEBAR_AUTHOR):
+ if current_user.get_view_property('author', 'dir') == 'desc':
+ order = db.Authors.sort.desc()
+ order_no = 0
+ else:
+ order = db.Authors.sort.asc()
+ order_no = 1
+ entries = calibre_db.session.query(db.Authors, func.count('books_authors_link.book').label('count')) \
+ .join(db.books_authors_link).join(db.Books).filter(calibre_db.common_filters()) \
+ .group_by(text('books_authors_link.author')).order_by(order).all()
+ char_list = query_char_list(db.Authors.sort, db.books_authors_link)
+ # If not creating a copy, readonly databases can not display authornames with "|" in it as changing the name
+ # starts a change session
+ author_copy = copy.deepcopy(entries)
+ for entry in author_copy:
+ entry.Authors.name = entry.Authors.name.replace('|', ',')
+ return render_title_template('list.html', entries=author_copy, folder='web.books_list', charlist=char_list,
+ title="Authors", page="authorlist", data='author', order=order_no)
+ else:
+ abort(404)
+
+
+@web.route("/downloadlist")
+@login_required_if_no_ano
+def download_list():
+ if current_user.get_view_property('download', 'dir') == 'desc':
+ order = ub.User.name.desc()
+ order_no = 0
+ else:
+ order = ub.User.name.asc()
+ order_no = 1
+ if current_user.check_visibility(constants.SIDEBAR_DOWNLOAD) and current_user.role_admin():
+ entries = ub.session.query(ub.User, func.count(ub.Downloads.book_id).label('count')) \
+ .join(ub.Downloads).group_by(ub.Downloads.user_id).order_by(order).all()
+ char_list = ub.session.query(func.upper(func.substr(ub.User.name, 1, 1)).label('char')) \
+ .filter(ub.User.role.op('&')(constants.ROLE_ANONYMOUS) != constants.ROLE_ANONYMOUS) \
+ .group_by(func.upper(func.substr(ub.User.name, 1, 1))).all()
+ return render_title_template('list.html', entries=entries, folder='web.books_list', charlist=char_list,
+ title=_("Downloads"), page="downloadlist", data="download", order=order_no)
+ else:
+ abort(404)
+
+
+@web.route("/publisher")
+@login_required_if_no_ano
+def publisher_list():
+ if current_user.get_view_property('publisher', 'dir') == 'desc':
+ order = db.Publishers.name.desc()
+ order_no = 0
+ else:
+ order = db.Publishers.name.asc()
+ order_no = 1
+ if current_user.check_visibility(constants.SIDEBAR_PUBLISHER):
+ entries = calibre_db.session.query(db.Publishers, func.count('books_publishers_link.book').label('count')) \
+ .join(db.books_publishers_link).join(db.Books).filter(calibre_db.common_filters()) \
+ .group_by(text('books_publishers_link.publisher')).order_by(order).all()
+ no_publisher_count = (calibre_db.session.query(db.Books)
+ .outerjoin(db.books_publishers_link).outerjoin(db.Publishers)
+ .filter(db.Publishers.name == None)
+ .filter(calibre_db.common_filters())
+ .count())
+ if no_publisher_count:
+ entries.append([db.Category(_("None"), "-1"), no_publisher_count])
+ entries = sorted(entries, key=lambda x: x[0].name.lower(), reverse=not order_no)
+ char_list = generate_char_list(entries)
+ return render_title_template('list.html', entries=entries, folder='web.books_list', charlist=char_list,
+ title=_("Publishers"), page="publisherlist", data="publisher", order=order_no)
+ else:
+ abort(404)
+
+
+@web.route("/series")
+@login_required_if_no_ano
+def series_list():
+ if current_user.check_visibility(constants.SIDEBAR_SERIES):
+ if current_user.get_view_property('series', 'dir') == 'desc':
+ order = db.Series.sort.desc()
+ order_no = 0
+ else:
+ order = db.Series.sort.asc()
+ order_no = 1
+ char_list = query_char_list(db.Series.sort, db.books_series_link)
+ if current_user.get_view_property('series', 'series_view') == 'list':
+ entries = calibre_db.session.query(db.Series, func.count('books_series_link.book').label('count')) \
+ .join(db.books_series_link).join(db.Books).filter(calibre_db.common_filters()) \
+ .group_by(text('books_series_link.series')).order_by(order).all()
+ no_series_count = (calibre_db.session.query(db.Books)
+ .outerjoin(db.books_series_link).outerjoin(db.Series)
+ .filter(db.Series.name == None)
+ .filter(calibre_db.common_filters())
+ .count())
+ if no_series_count:
+ entries.append([db.Category(_("None"), "-1"), no_series_count])
+ entries = sorted(entries, key=lambda x: x[0].name.lower(), reverse=not order_no)
+ return render_title_template('list.html',
+ entries=entries,
+ folder='web.books_list',
+ charlist=char_list,
+ title=_("Series"),
+ page="serieslist",
+ data="series", order=order_no)
+ else:
+ entries = (calibre_db.session.query(db.Books, func.count('books_series_link').label('count'),
+ func.max(db.Books.series_index), db.Books.id)
+ .join(db.books_series_link).join(db.Series).filter(calibre_db.common_filters())
+ .group_by(text('books_series_link.series'))
+ .having(or_(func.max(db.Books.series_index), db.Books.series_index==""))
+ .order_by(order)
+ .all())
+ return render_title_template('grid.html', entries=entries, folder='web.books_list', charlist=char_list,
+ title=_("Series"), page="serieslist", data="series", bodyClass="grid-view",
+ order=order_no)
+ else:
+ abort(404)
+
+
+@web.route("/ratings")
+@login_required_if_no_ano
+def ratings_list():
+ if current_user.check_visibility(constants.SIDEBAR_RATING):
+ if current_user.get_view_property('ratings', 'dir') == 'desc':
+ order = db.Ratings.rating.desc()
+ order_no = 0
+ else:
+ order = db.Ratings.rating.asc()
+ order_no = 1
+ entries = calibre_db.session.query(db.Ratings, func.count('books_ratings_link.book').label('count'),
+ (db.Ratings.rating / 2).label('name')) \
+ .join(db.books_ratings_link).join(db.Books).filter(calibre_db.common_filters()) \
+ .filter(db.Ratings.rating > 0) \
+ .group_by(text('books_ratings_link.rating')).order_by(order).all()
+ no_rating_count = (calibre_db.session.query(db.Books)
+ .outerjoin(db.books_ratings_link).outerjoin(db.Ratings)
+ .filter(or_(db.Ratings.rating == None, db.Ratings.rating == 0))
+ .filter(calibre_db.common_filters())
+ .count())
+ if no_rating_count:
+ entries.append([db.Category(_("None"), "-1", -1), no_rating_count])
+ entries = sorted(entries, key=lambda x: x[0].rating, reverse=not order_no)
+ return render_title_template('list.html', entries=entries, folder='web.books_list', charlist=list(),
+ title=_("Ratings list"), page="ratingslist", data="ratings", order=order_no)
+ else:
+ abort(404)
+
+
+@web.route("/formats")
+@login_required_if_no_ano
+def formats_list():
+ if current_user.check_visibility(constants.SIDEBAR_FORMAT):
+ if current_user.get_view_property('formats', 'dir') == 'desc':
+ order = db.Data.format.desc()
+ order_no = 0
+ else:
+ order = db.Data.format.asc()
+ order_no = 1
+ entries = calibre_db.session.query(db.Data,
+ func.count('data.book').label('count'),
+ db.Data.format.label('format')) \
+ .join(db.Books).filter(calibre_db.common_filters()) \
+ .group_by(db.Data.format).order_by(order).all()
+ no_format_count = (calibre_db.session.query(db.Books).outerjoin(db.Data)
+ .filter(db.Data.format == None)
+ .filter(calibre_db.common_filters())
+ .count())
+ if no_format_count:
+ entries.append([db.Category(_("None"), "-1"), no_format_count])
+ return render_title_template('list.html', entries=entries, folder='web.books_list', charlist=list(),
+ title=_("File formats list"), page="formatslist", data="formats", order=order_no)
+ else:
+ abort(404)
+
+
+@web.route("/language")
+@login_required_if_no_ano
+def language_overview():
+ if current_user.check_visibility(constants.SIDEBAR_LANGUAGE) and current_user.filter_language() == "all":
+ order_no = 0 if current_user.get_view_property('language', 'dir') == 'desc' else 1
+ languages = calibre_db.speaking_language(reverse_order=not order_no, with_count=True)
+ char_list = generate_char_list(languages)
+ return render_title_template('list.html', entries=languages, folder='web.books_list', charlist=char_list,
+ title=_("Languages"), page="langlist", data="language", order=order_no)
+ else:
+ abort(404)
+
+
+@web.route("/category")
+@login_required_if_no_ano
+def category_list():
+ if current_user.check_visibility(constants.SIDEBAR_CATEGORY):
+ if current_user.get_view_property('category', 'dir') == 'desc':
+ order = db.Tags.name.desc()
+ order_no = 0
+ else:
+ order = db.Tags.name.asc()
+ order_no = 1
+ entries = calibre_db.session.query(db.Tags, func.count('books_tags_link.book').label('count')) \
+ .join(db.books_tags_link).join(db.Books).order_by(order).filter(calibre_db.common_filters()) \
+ .group_by(db.Tags.id).all()
+ no_tag_count = (calibre_db.session.query(db.Books)
+ .outerjoin(db.books_tags_link).outerjoin(db.Tags)
+ .filter(db.Tags.name == None)
+ .filter(calibre_db.common_filters())
+ .count())
+ if no_tag_count:
+ entries.append([db.Category(_("None"), "-1"), no_tag_count])
+ entries = sorted(entries, key=lambda x: x[0].name.lower(), reverse=not order_no)
+ char_list = generate_char_list(entries)
+ return render_title_template('list.html', entries=entries, folder='web.books_list', charlist=char_list,
+ title=_("Categories"), page="catlist", data="category", order=order_no)
+ else:
+ abort(404)
+
+
+
+
+# ################################### Download/Send ##################################################################
+
+
+@web.route("/cover/")
+@web.route("/cover//")
+@login_required_if_no_ano
+def get_cover(book_id, resolution=None):
+ resolutions = {
+ 'og': constants.COVER_THUMBNAIL_ORIGINAL,
+ 'sm': constants.COVER_THUMBNAIL_SMALL,
+ 'md': constants.COVER_THUMBNAIL_MEDIUM,
+ 'lg': constants.COVER_THUMBNAIL_LARGE,
+ }
+ cover_resolution = resolutions.get(resolution, None)
+ return get_book_cover(book_id, cover_resolution)
+
+
+@web.route("/series_cover/")
+@web.route("/series_cover//")
+@login_required_if_no_ano
+def get_series_cover(series_id, resolution=None):
+ resolutions = {
+ 'og': constants.COVER_THUMBNAIL_ORIGINAL,
+ 'sm': constants.COVER_THUMBNAIL_SMALL,
+ 'md': constants.COVER_THUMBNAIL_MEDIUM,
+ 'lg': constants.COVER_THUMBNAIL_LARGE,
+ }
+ cover_resolution = resolutions.get(resolution, None)
+ return get_series_cover_thumbnail(series_id, cover_resolution)
+
+
+
+@web.route("/robots.txt")
+def get_robots():
+ try:
+ return send_from_directory(constants.STATIC_DIR, "robots.txt")
+ except PermissionError:
+ log.error("No permission to access robots.txt file.")
+ abort(403)
+
+
+@web.route("/show//", defaults={'anyname': 'None'})
+@web.route("/show///")
+@login_required_if_no_ano
+@viewer_required
+def serve_book(book_id, book_format, anyname):
+ book_format = book_format.split(".")[0]
+ book = calibre_db.get_book(book_id)
+ data = calibre_db.get_book_format(book_id, book_format.upper())
+ if not data:
+ return "File not in Database"
+ range_header = request.headers.get('Range', None)
+
+ if config.config_use_google_drive:
+ try:
+ headers = Headers()
+ headers["Content-Type"] = mimetypes.types_map.get('.' + book_format, "application/octet-stream")
+ if not range_header:
+ log.info('Serving book: %s', data.name)
+ headers['Accept-Ranges'] = 'bytes'
+ df = getFileFromEbooksFolder(book.path, data.name + "." + book_format)
+ return do_gdrive_download(df, headers, (book_format.upper() == 'TXT'))
+ except AttributeError as ex:
+ log.error_or_exception(ex)
+ return "File Not Found"
+ else:
+ if book_format.upper() == 'TXT':
+ log.info('Serving book: %s', data.name)
+ try:
+ rawdata = open(os.path.join(config.get_book_path(), book.path, data.name + "." + book_format),
+ "rb").read()
+ result = chardet.detect(rawdata)
+ try:
+ text_data = rawdata.decode(result['encoding']).encode('utf-8')
+ except UnicodeDecodeError as e:
+ log.error("Encoding error in text file {}: {}".format(book.id, e))
+ if "surrogate" in e.reason:
+ text_data = rawdata.decode(result['encoding'], 'surrogatepass').encode('utf-8', 'surrogatepass')
+ else:
+ text_data = rawdata.decode(result['encoding'], 'ignore').encode('utf-8', 'ignore')
+ return make_response(text_data)
+ except FileNotFoundError:
+ log.error("File Not Found")
+ return "File Not Found"
+ # enable byte range read of pdf
+ response = make_response(
+ send_from_directory(os.path.join(config.get_book_path(), book.path), data.name + "." + book_format))
+ if not range_header:
+ log.info('Serving book: %s', data.name)
+ response.headers['Accept-Ranges'] = 'bytes'
+ return response
+
+
+@web.route("/download//", defaults={'anyname': 'None'})
+@web.route("/download///")
+@login_required_if_no_ano
+@download_required
+def download_link(book_id, book_format, anyname):
+ client = "kobo" if "Kobo" in request.headers.get('User-Agent') else ""
+ return get_download_link(book_id, book_format, client)
+
+
+@web.route('/send///', methods=["POST"])
+@login_required_if_no_ano
+@download_required
+def send_to_ereader(book_id, book_format, convert):
+ if not config.get_mail_server_configured():
+ response = [{'type': "danger", 'message': _("Please configure the SMTP mail settings first...")}]
+ return Response(json.dumps(response), mimetype='application/json')
+ elif current_user.kindle_mail:
+ result = send_mail(book_id, book_format, convert, current_user.kindle_mail, config.get_book_path(),
+ current_user.name)
+ if result is None:
+ ub.update_download(book_id, int(current_user.id))
+ response = [{'type': "success", 'message': _("Success! Book queued for sending to %(eReadermail)s",
+ eReadermail=current_user.kindle_mail)}]
+ else:
+ response = [{'type': "danger", 'message': _("Oops! There was an error sending book: %(res)s", res=result)}]
+ else:
+ response = [{'type': "danger", 'message': _("Oops! Please update your profile with a valid eReader Email.")}]
+ return Response(json.dumps(response), mimetype='application/json')
+
+
+# ################################### Login Logout ##################################################################
+
+@web.route('/register', methods=['POST'])
+@limiter.limit("40/day", key_func=get_remote_address)
+@limiter.limit("3/minute", key_func=get_remote_address)
+def register_post():
+ if not config.config_public_reg:
+ abort(404)
+ to_save = request.form.to_dict()
+ try:
+ limiter.check()
+ except RateLimitExceeded:
+ flash(_(u"Please wait one minute to register next user"), category="error")
+ return render_title_template('register.html', config=config, title=_("Register"), page="register")
+ except (ConnectionError, Exception) as e:
+ log.error("Connection error to limiter backend: %s", e)
+ flash(_("Connection error to limiter backend, please contact your administrator"), category="error")
+ return render_title_template('register.html', config=config, title=_("Register"), page="register")
+ if current_user is not None and current_user.is_authenticated:
+ return redirect(url_for('web.index'))
+ if not config.get_mail_server_configured():
+ flash(_("Oops! Email server is not configured, please contact your administrator."), category="error")
+ return render_title_template('register.html', title=_("Register"), page="register")
+ nickname = strip_whitespaces(to_save.get("email", "")) if config.config_register_email else to_save.get('name')
+ if not nickname or not to_save.get("email"):
+ flash(_("Oops! Please complete all fields."), category="error")
+ return render_title_template('register.html', title=_("Register"), page="register")
+ try:
+ nickname = check_username(nickname)
+ email = check_email(to_save.get("email", ""))
+ except Exception as ex:
+ flash(str(ex), category="error")
+ return render_title_template('register.html', title=_("Register"), page="register")
+
+ content = ub.User()
+ if check_valid_domain(email):
+ content.name = nickname
+ content.email = email
+ password = generate_random_password(config.config_password_min_length)
+ content.password = generate_password_hash(password)
+ content.role = config.config_default_role
+ content.locale = config.config_default_locale
+ content.sidebar_view = config.config_default_show
+ try:
+ ub.session.add(content)
+ ub.session.commit()
+ if feature_support['oauth']:
+ register_user_with_oauth(content)
+ send_registration_mail(strip_whitespaces(to_save.get("email", "")), nickname, password)
+ except Exception:
+ ub.session.rollback()
+ flash(_("Oops! An unknown error occurred. Please try again later."), category="error")
+ return render_title_template('register.html', title=_("Register"), page="register")
+ else:
+ flash(_("Oops! Your Email is not allowed."), category="error")
+ log.warning('Registering failed for user "{}" Email: {}'.format(nickname, to_save.get("email","")))
+ return render_title_template('register.html', title=_("Register"), page="register")
+ flash(_("Success! Confirmation Email has been sent."), category="success")
+ return redirect(url_for('web.login'))
+
+
+@web.route('/register', methods=['GET'])
+def register():
+ if not config.config_public_reg:
+ abort(404)
+ if current_user is not None and current_user.is_authenticated:
+ return redirect(url_for('web.index'))
+ if not config.get_mail_server_configured():
+ flash(_("Oops! Email server is not configured, please contact your administrator."), category="error")
+ return render_title_template('register.html', title=_("Register"), page="register")
+ if feature_support['oauth']:
+ register_user_with_oauth()
+ return render_title_template('register.html', config=config, title=_("Register"), page="register")
+
+
+def handle_login_user(user, remember, message, category):
+ login_user(user, remember=remember)
+ flash(message, category=category)
+ [limiter.limiter.storage.clear(k.key) for k in limiter.current_limits]
+ return redirect(get_redirect_location(request.form.get('next', None), "web.index"))
+
+
+def render_login(username="", password=""):
+ next_url = request.args.get('next', default=url_for("web.index"), type=str)
+ if url_for("web.logout") == next_url:
+ next_url = url_for("web.index")
+ return render_title_template('login.html',
+ title=_("Login"),
+ next_url=next_url,
+ config=config,
+ username=username,
+ password=password,
+ oauth_check=oauth_check,
+ mail=config.get_mail_server_configured(), page="login")
+
+
+@web.route('/login', methods=['GET'])
+def login():
+ if current_user is not None and current_user.is_authenticated:
+ return redirect(url_for('web.index'))
+ if config.config_login_type == constants.LOGIN_LDAP and not services.ldap:
+ log.error(u"Cannot activate LDAP authentication")
+ flash(_(u"Cannot activate LDAP authentication"), category="error")
+ return render_login()
+
+
+@web.route('/login', methods=['POST'])
+@limiter.limit("40/day", key_func=lambda: strip_whitespaces(request.form.get('username', "")).lower())
+@limiter.limit("3/minute", key_func=lambda: strip_whitespaces(request.form.get('username', "")).lower())
+def login_post():
+ form = request.form.to_dict()
+ username = strip_whitespaces(form.get('username', "")).lower().replace("\n","").replace("\r","")
+ try:
+ limiter.check()
+ except RateLimitExceeded:
+ flash(_("Please wait one minute before next login"), category="error")
+ return render_login(username, form.get("password", ""))
+ except (ConnectionError, Exception) as e:
+ log.error("Connection error to limiter backend: %s", e)
+ flash(_("Connection error to limiter backend, please contact your administrator"), category="error")
+ return render_login(username, form.get("password", ""))
+ if current_user is not None and current_user.is_authenticated:
+ return redirect(url_for('web.index'))
+ if config.config_login_type == constants.LOGIN_LDAP and not services.ldap:
+ log.error(u"Cannot activate LDAP authentication")
+ flash(_(u"Cannot activate LDAP authentication"), category="error")
+ user = ub.session.query(ub.User).filter(func.lower(ub.User.name) == username).first()
+ remember_me = bool(form.get('remember_me'))
+ if config.config_login_type == constants.LOGIN_LDAP and services.ldap and user and form['password'] != "":
+ login_result, error = services.ldap.bind_user(username, form['password'])
+ if login_result:
+ log.debug(u"You are now logged in as: '{}'".format(user.name))
+ return handle_login_user(user,
+ remember_me,
+ _(u"you are now logged in as: '%(nickname)s'", nickname=user.name),
+ "success")
+ elif login_result is None and user and check_password_hash(str(user.password), form['password']) \
+ and user.name != "Guest":
+ log.info("Local Fallback Login as: '{}'".format(user.name))
+ return handle_login_user(user,
+ remember_me,
+ _(u"Fallback Login as: '%(nickname)s', "
+ u"LDAP Server not reachable, or user not known", nickname=user.name),
+ "warning")
+ elif login_result is None:
+ log.info(error)
+ flash(_(u"Could not login: %(message)s", message=error), category="error")
+ else:
+ ip_address = request.headers.get('X-Forwarded-For', request.remote_addr)
+ log.warning('LDAP Login failed for user "%s" IP-address: %s', username, ip_address)
+ flash(_(u"Wrong Username or Password"), category="error")
+ else:
+ ip_address = request.headers.get('X-Forwarded-For', request.remote_addr)
+ if form.get('forgot', "") == 'forgot':
+ if user is not None and user.name != "Guest":
+ ret, __ = reset_password(user.id)
+ if ret == 1:
+ flash(_(u"New Password was sent to your email address"), category="info")
+ log.info('Password reset for user "%s" IP-address: %s', username, ip_address)
+ else:
+ log.error(u"An unknown error occurred. Please try again later")
+ flash(_(u"An unknown error occurred. Please try again later."), category="error")
+ else:
+ flash(_(u"Please enter valid username to reset password"), category="error")
+ log.warning('Username missing for password reset IP-address: %s', ip_address)
+ else:
+ if user and check_password_hash(str(user.password), form['password']) and user.name != "Guest":
+ config.config_is_initial = False
+ log.debug(u"You are now logged in as: '{}'".format(user.name))
+ return handle_login_user(user,
+ remember_me,
+ _(u"You are now logged in as: '%(nickname)s'", nickname=user.name),
+ "success")
+ else:
+ log.warning('Login failed for user "{}" IP-address: {}'.format(username, ip_address))
+ flash(_(u"Wrong Username or Password"), category="error")
+ return render_login(username, form.get("password", ""))
+
+
+@web.route('/logout')
+@user_login_required
+def logout():
+ if current_user is not None and current_user.is_authenticated:
+ ub.delete_user_session(current_user.id, flask_session.get('_id', ""))
+ logout_user()
+ if feature_support['oauth'] and (config.config_login_type == 2 or config.config_login_type == 3):
+ logout_oauth_user()
+ log.debug("User logged out")
+ if config.config_anonbrowse:
+ location = get_redirect_location(request.args.get('next', None), "web.login")
+ else:
+ location = None
+ if location:
+ return redirect(location)
+ else:
+ return redirect(url_for('web.login'))
+
+
+# ################################### Users own configuration #########################################################
+def change_profile(kobo_support, local_oauth_check, oauth_status, translations, languages):
+ to_save = request.form.to_dict()
+ current_user.random_books = 0
+ try:
+ if current_user.role_passwd() or current_user.role_admin():
+ if to_save.get("password", "") != "":
+ current_user.password = generate_password_hash(valid_password(to_save.get("password")))
+ if to_save.get("kindle_mail", current_user.kindle_mail) != current_user.kindle_mail:
+ current_user.kindle_mail = valid_email(to_save.get("kindle_mail"))
+ new_email = valid_email(to_save.get("email", current_user.email))
+ if not new_email:
+ raise Exception(_("Email can't be empty and has to be a valid Email"))
+ if new_email != current_user.email:
+ current_user.email = check_email(new_email)
+ if current_user.role_admin():
+ if to_save.get("name", current_user.name) != current_user.name:
+ # Query username, if not existing, change
+ current_user.name = check_username(to_save.get("name"))
+ current_user.random_books = 1 if to_save.get("show_random") == "on" else 0
+ current_user.default_language = to_save.get("default_language", "all")
+ current_user.locale = to_save.get("locale", "en")
+ old_state = current_user.kobo_only_shelves_sync
+ # 1 -> 0: nothing has to be done
+ # 0 -> 1: all synced books have to be added to archived books, + currently synced shelfs which
+ # don't have to be synced have to be removed (added to Shelf archive)
+ current_user.kobo_only_shelves_sync = int(to_save.get("kobo_only_shelves_sync") == "on") or 0
+ if old_state == 0 and current_user.kobo_only_shelves_sync == 1:
+ kobo_sync_status.update_on_sync_shelfs(current_user.id)
+
+ except Exception as ex:
+ flash(str(ex), category="error")
+ return render_title_template("user_edit.html",
+ content=current_user,
+ config=config,
+ translations=translations,
+ profile=1,
+ languages=languages,
+ title=_("%(name)s's Profile", name=current_user.name),
+ page="me",
+ kobo_support=kobo_support,
+ registered_oauth=local_oauth_check,
+ oauth_status=oauth_status)
+
+ val = 0
+ for key, __ in to_save.items():
+ if key.startswith('show'):
+ val += int(key[5:])
+ current_user.sidebar_view = val
+ if to_save.get("Show_detail_random"):
+ current_user.sidebar_view += constants.DETAIL_RANDOM
+
+ try:
+ ub.session.commit()
+ flash(_("Success! Profile Updated"), category="success")
+ log.debug("Profile updated")
+ except IntegrityError:
+ ub.session.rollback()
+ flash(_("Oops! An account already exists for this Email."), category="error")
+ log.debug("Found an existing account for this Email")
+ except OperationalError as e:
+ ub.session.rollback()
+ log.error("Database error: %s", e)
+ flash(_("Oops! Database Error: %(error)s.", error=e), category="error")
+
+
+@web.route("/me", methods=["GET", "POST"])
+@user_login_required
+def profile():
+ languages = calibre_db.speaking_language()
+ translations = get_available_locale()
+ kobo_support = feature_support['kobo'] and config.config_kobo_sync
+ if feature_support['oauth'] and config.config_login_type == 2:
+ oauth_status = get_oauth_status()
+ local_oauth_check = oauth_check
+ else:
+ oauth_status = None
+ local_oauth_check = {}
+
+ if request.method == "POST":
+ change_profile(kobo_support, local_oauth_check, oauth_status, translations, languages)
+ return render_title_template("user_edit.html",
+ translations=translations,
+ profile=1,
+ languages=languages,
+ content=current_user,
+ config=config,
+ kobo_support=kobo_support,
+ title=_("%(name)s's Profile", name=current_user.name),
+ page="me",
+ registered_oauth=local_oauth_check,
+ oauth_status=oauth_status)
+
+
+# ###################################Show single book ##################################################################
+
+
+@web.route("/read//")
+@login_required_if_no_ano
+@viewer_required
+def read_book(book_id, book_format):
+ book = calibre_db.get_filtered_book(book_id)
+
+ if not book:
+ flash(_("Oops! Selected book is unavailable. File does not exist or is not accessible"),
+ category="error")
+ log.debug("Selected book is unavailable. File does not exist or is not accessible")
+ return redirect(url_for("web.index"))
+
+ book.ordered_authors = calibre_db.order_authors([book], False)
+
+ # check if book has a bookmark
+ bookmark = None
+ if current_user.is_authenticated:
+ bookmark = ub.session.query(ub.Bookmark).filter(and_(ub.Bookmark.user_id == int(current_user.id),
+ ub.Bookmark.book_id == book_id,
+ ub.Bookmark.format == book_format.upper())).first()
+ if book_format.lower() == "epub":
+ log.debug("Start epub reader for %d", book_id)
+ return render_title_template('read.html', bookid=book_id, title=book.title, bookmark=bookmark)
+ elif book_format.lower() == "pdf":
+ log.debug("Start pdf reader for %d", book_id)
+ return render_title_template('readpdf.html', pdffile=book_id, title=book.title)
+ elif book_format.lower() == "txt":
+ log.debug("Start txt reader for %d", book_id)
+ return render_title_template('readtxt.html', txtfile=book_id, title=book.title)
+ elif book_format.lower() in ["djvu", "djv"]:
+ log.debug("Start djvu reader for %d", book_id)
+ return render_title_template('readdjvu.html', djvufile=book_id, title=book.title,
+ extension=book_format.lower())
+ else:
+ for fileExt in constants.EXTENSIONS_AUDIO:
+ if book_format.lower() == fileExt:
+ entries = calibre_db.get_filtered_book(book_id)
+ log.debug("Start mp3 listening for %d", book_id)
+ return render_title_template('listenmp3.html', mp3file=book_id, audioformat=book_format.lower(),
+ entry=entries, bookmark=bookmark)
+ for fileExt in ["cbr", "cbt", "cbz"]:
+ if book_format.lower() == fileExt:
+ all_name = str(book_id)
+ title = book.title
+ if len(book.series):
+ title = title + " - " + book.series[0].name
+ if book.series_index:
+ title = title + " #" + '{0:.2f}'.format(book.series_index).rstrip('0').rstrip('.')
+ log.debug("Start comic reader for %d", book_id)
+ return render_title_template('readcbr.html', comicfile=all_name, title=title,
+ extension=fileExt, bookmark=bookmark)
+ log.debug("Selected book is unavailable. File does not exist or is not accessible")
+ flash(_("Oops! Selected book is unavailable. File does not exist or is not accessible"),
+ category="error")
+ return redirect(url_for("web.index"))
+
+
+@web.route("/book/")
+@login_required_if_no_ano
+def show_book(book_id):
+ entries = calibre_db.get_book_read_archived(book_id, config.config_read_column, allow_show_archived=True)
+ if entries:
+ read_book = entries[1]
+ archived_book = entries[2]
+ entry = entries[0]
+ entry.read_status = read_book == ub.ReadBook.STATUS_FINISHED
+ entry.is_archived = archived_book
+ for lang_index in range(0, len(entry.languages)):
+ entry.languages[lang_index].language_name = isoLanguages.get_language_name(get_locale(), entry.languages[
+ lang_index].lang_code)
+ cc = calibre_db.get_cc_columns(config, filter_config_custom_read=True)
+ book_in_shelves = []
+ shelves = ub.session.query(ub.BookShelf).filter(ub.BookShelf.book_id == book_id).all()
+ for sh in shelves:
+ book_in_shelves.append(sh.shelf)
+
+ entry.tags = sort(entry.tags, key=lambda tag: tag.name)
+
+ entry.ordered_authors = calibre_db.order_authors([entry])
+
+ entry.email_share_list = check_send_to_ereader(entry)
+ entry.reader_list = check_read_formats(entry)
+
+ entry.audio_entries = []
+ for media_format in entry.data:
+ if media_format.format.lower() in constants.EXTENSIONS_AUDIO:
+ entry.audio_entries.append(media_format.format.lower())
+
+ return render_title_template('detail.html',
+ entry=entry,
+ cc=cc,
+ is_xhr=request.headers.get('X-Requested-With') == 'XMLHttpRequest',
+ title=entry.title,
+ books_shelfs=book_in_shelves,
+ page="book")
+ else:
+ log.debug("Selected book is unavailable. File does not exist or is not accessible")
+ flash(_("Oops! Selected book is unavailable. File does not exist or is not accessible"),
+ category="error")
+ return redirect(url_for("web.index"))