Skip to content

Commit

Permalink
added ruff's D202 check (#61)
Browse files Browse the repository at this point in the history
Signed-off-by: Alexander Piskun <bigcat88@icloud.com>
  • Loading branch information
bigcat88 authored Aug 2, 2023
1 parent e501238 commit 9862eb0
Show file tree
Hide file tree
Showing 14 changed files with 1 addition and 106 deletions.
8 changes: 0 additions & 8 deletions nc_py_api/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ def disable(self, app_name: str) -> None:
:param app_name: id of the application.
.. note:: Does not work in NextcloudApp mode, only for Nextcloud client mode."""

if not app_name:
raise ValueError("`app_name` parameter can not be empty")
self._session.ocs(method="DELETE", path=f"{ENDPOINT}/{app_name}")
Expand All @@ -49,7 +48,6 @@ def enable(self, app_name: str) -> None:
:param app_name: id of the application.
.. note:: Does not work in NextcloudApp mode, only for Nextcloud client mode."""

if not app_name:
raise ValueError("`app_name` parameter can not be empty")
self._session.ocs(method="POST", path=f"{ENDPOINT}/{app_name}")
Expand All @@ -59,7 +57,6 @@ def get_list(self, enabled: Optional[bool] = None) -> list[str]:
:param enabled: filter to list all/only enabled/only disabled applications.
"""

params = None
if enabled is not None:
params = {"filter": "enabled" if enabled else "disabled"}
Expand All @@ -71,7 +68,6 @@ def is_installed(self, app_name: str) -> bool:
:param app_name: id of the application.
"""

if not app_name:
raise ValueError("`app_name` parameter can not be empty")
return app_name in self.get_list()
Expand All @@ -81,7 +77,6 @@ def is_enabled(self, app_name: str) -> bool:
:param app_name: id of the application.
"""

if not app_name:
raise ValueError("`app_name` parameter can not be empty")
return app_name in self.get_list(enabled=True)
Expand All @@ -91,19 +86,16 @@ def is_disabled(self, app_name: str) -> bool:
:param app_name: id of the application.
"""

if not app_name:
raise ValueError("`app_name` parameter can not be empty")
return app_name in self.get_list(enabled=False)

def ex_app_get_list(self) -> list[str]:
"""Gets the list of the external applications IDs installed on the server."""

require_capabilities("app_ecosystem_v2", self._session.capabilities)
return self._session.ocs(method="GET", path=f"{APP_V2_BASIC_URL}/ex-app/all", params={"extended": 0})

def ex_app_get_info(self) -> list[ExAppInfo]:
"""Gets information of the external applications installed on the server."""

require_capabilities("app_ecosystem_v2", self._session.capabilities)
return self._session.ocs(method="GET", path=f"{APP_V2_BASIC_URL}/ex-app/all", params={"extended": 1})
15 changes: 0 additions & 15 deletions nc_py_api/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ def listdir(self, path: Union[str, FsNode] = "", depth: int = 1, exclude_self=Tr
:param exclude_self: boolean value indicating whether the `path` itself should be excluded from the list or not.
Default = **True**.
"""

if exclude_self and not depth:
raise ValueError("Wrong input parameters, query will return nothing.")
properties = PROPFIND_PROPERTIES
Expand All @@ -83,14 +82,12 @@ def by_id(self, file_id: Union[int, str, FsNode]) -> Optional[FsNode]:
:param file_id: can be full file ID with Nextcloud instance ID or only clear file ID.
"""

file_id = file_id.file_id if isinstance(file_id, FsNode) else file_id
result = self.find(req=["eq", "fileid", file_id])
return result[0] if result else None

def by_path(self, path: Union[str, FsNode]) -> Optional[FsNode]:
"""Returns :py:class:`~nc_py_api.files_defs.FsNode` by exact path if any."""

path = path.user_path if isinstance(path, FsNode) else path
result = self.listdir(path, depth=0, exclude_self=False)
return result[0] if result else None
Expand All @@ -101,7 +98,6 @@ def find(self, req: list, path: Union[str, FsNode] = "") -> list[FsNode]:
:param req: list of conditions to search for. Detailed description here...
:param path: path where to search from. Default = **""**.
"""

# `req` possible keys: "name", "mime", "last_modified", "size", "favorite", "fileid"
path = path.user_path if isinstance(path, FsNode) else path
root = ElementTree.Element(
Expand Down Expand Up @@ -129,7 +125,6 @@ def download(self, path: Union[str, FsNode]) -> bytes:
:param path: path to download file.
"""

path = path.user_path if isinstance(path, FsNode) else path
response = self._session.dav("GET", self._dav_get_obj_path(self._session.user, path))
check_error(response.status_code, f"download: user={self._session.user}, path={path}")
Expand All @@ -143,7 +138,6 @@ def download2stream(self, path: Union[str, FsNode], fp, **kwargs) -> None:
The object must implement the ``file.write`` method and be able to write binary data.
:param kwargs: **chunk_size** an int value specifying chunk size to write. Default = **4Mb**
"""

path = path.user_path if isinstance(path, FsNode) else path
if isinstance(fp, (str, Path)):
with builtins.open(fp, "wb") as f:
Expand All @@ -159,7 +153,6 @@ def upload(self, path: Union[str, FsNode], content: Union[bytes, str]) -> FsNode
:param path: file's upload path.
:param content: content to create the file. If it is a string, it will be encoded into bytes using UTF-8.
"""

path = path.user_path if isinstance(path, FsNode) else path
full_path = self._dav_get_obj_path(self._session.user, path)
response = self._session.dav("PUT", full_path, data=content)
Expand All @@ -174,7 +167,6 @@ def upload_stream(self, path: Union[str, FsNode], fp, **kwargs) -> FsNode:
The object must implement the ``file.read`` method providing data with str or bytes type.
:param kwargs: **chunk_size** an int value specifying chunk size to read. Default = **4Mb**
"""

path = path.user_path if isinstance(path, FsNode) else path
if isinstance(fp, (str, Path)):
with builtins.open(fp, "rb") as f:
Expand All @@ -189,7 +181,6 @@ def mkdir(self, path: Union[str, FsNode]) -> FsNode:
:param path: path of the directory to be created.
"""

path = path.user_path if isinstance(path, FsNode) else path
full_path = self._dav_get_obj_path(self._session.user, path)
response = self._session.dav("MKCOL", full_path)
Expand All @@ -204,7 +195,6 @@ def makedirs(self, path: Union[str, FsNode], exist_ok=False) -> Optional[FsNode]
:param exist_ok: ignore error if any of pathname components already exists.
:returns: `FsNode` if directory was created or ``None`` if it was already created.
"""

_path = ""
path = path.user_path if isinstance(path, FsNode) else path
result = None
Expand All @@ -226,7 +216,6 @@ def delete(self, path: Union[str, FsNode], not_fail=False) -> None:
:param path: path to delete.
:param not_fail: if set to ``True`` and the object is not found, it does not raise an exception.
"""

path = path.user_path if isinstance(path, FsNode) else path
response = self._session.dav("DELETE", self._dav_get_obj_path(self._session.user, path))
if response.status_code == 404 and not_fail:
Expand All @@ -241,7 +230,6 @@ def move(self, path_src: Union[str, FsNode], path_dest: Union[str, FsNode], over
:param overwrite: if ``True`` and the destination object already exists, it gets overwritten.
Default = **False**.
"""

path_src = path_src.user_path if isinstance(path_src, FsNode) else path_src
full_dest_path = self._dav_get_obj_path(
self._session.user, path_dest.user_path if isinstance(path_dest, FsNode) else path_dest
Expand All @@ -264,7 +252,6 @@ def copy(self, path_src: Union[str, FsNode], path_dest: Union[str, FsNode], over
:param overwrite: if ``True`` and the destination object already exists, it gets overwritten.
Default = **False**.
"""

path_src = path_src.user_path if isinstance(path_src, FsNode) else path_src
full_dest_path = self._dav_get_obj_path(
self._session.user, path_dest.user_path if isinstance(path_dest, FsNode) else path_dest
Expand All @@ -281,7 +268,6 @@ def copy(self, path_src: Union[str, FsNode], path_dest: Union[str, FsNode], over

def listfav(self) -> list[FsNode]:
"""Returns a list of the current user's favorite files."""

root = ElementTree.Element(
"oc:filter-files",
attrib={"xmlns:d": "DAV:", "xmlns:oc": "http://owncloud.org/ns", "xmlns:nc": "http://nextcloud.org/ns"},
Expand All @@ -301,7 +287,6 @@ def setfav(self, path: Union[str, FsNode], value: Union[int, bool]) -> None:
:param path: path to the object to set the state.
:param value: value to set for the ``favourite`` state.
"""

path = path.user_path if isinstance(path, FsNode) else path
root = ElementTree.Element(
"d:propertyupdate",
Expand Down
14 changes: 0 additions & 14 deletions nc_py_api/files_defs.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ def last_modified(self) -> datetime:
"""Time when the object was last modified.
.. note:: ETag if more preferable way to check if the object was changed."""

return self._last_modified

@last_modified.setter
Expand Down Expand Up @@ -77,7 +76,6 @@ def __init__(self, full_path: str, **kwargs):
@property
def is_dir(self) -> bool:
"""Returns ``True`` for the directories, ``False`` otherwise."""

return self.full_path.endswith("/")

def __str__(self):
Expand All @@ -95,69 +93,58 @@ def __eq__(self, other):
def has_extra(self) -> bool:
"""Flag indicating whether this ``FsNode`` was obtained by the `mkdir` or `upload`
methods and does not contain extended information."""

return bool(self.info.permissions)

@property
def name(self) -> str:
"""Returns last ``pathname`` component."""

return self.full_path.rstrip("/").rsplit("/", maxsplit=1)[-1]

@property
def user(self) -> str:
"""Returns user ID extracted from the `full_path`."""

return self.full_path.lstrip("/").split("/", maxsplit=2)[1]

@property
def user_path(self) -> str:
"""Returns path relative to the user's root directory."""

return self.full_path.lstrip("/").split("/", maxsplit=2)[-1]

@property
def is_shared(self) -> bool:
"""Check if a file or folder is shared."""

return self.info.permissions.find("S") != -1

@property
def is_shareable(self) -> bool:
"""Check if a file or folder can be shared."""

return self.info.permissions.find("R") != -1

@property
def is_mounted(self) -> bool:
"""Check if a file or folder is mounted."""

return self.info.permissions.find("M") != -1

@property
def is_readable(self) -> bool:
"""Check if the file or folder is readable."""

return self.info.permissions.find("G") != -1

@property
def is_deletable(self) -> bool:
"""Check if a file or folder can be deleted."""

return self.info.permissions.find("D") != -1

@property
def is_updatable(self) -> bool:
"""Check if file/directory is writable."""

if self.is_dir:
return self.info.permissions.find("NV") != -1
return self.info.permissions.find("W") != -1

@property
def is_creatable(self) -> bool:
"""Check whether new files or folders can be created inside this folder."""

if not self.is_dir:
return False
return self.info.permissions.find("CK") != -1
Expand Down Expand Up @@ -233,7 +220,6 @@ def share_type(self) -> ShareType:
@property
def permissions(self) -> SharePermissions:
"""Recipient permissions."""

return SharePermissions(int(self.raw_data["permissions"]))

@property
Expand Down
4 changes: 0 additions & 4 deletions nc_py_api/files_sharing.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@ def __init__(self, session: NcSessionBasic):
@property
def available(self) -> bool:
"""Returns True if the Nextcloud instance supports this feature, False otherwise."""

return not check_capabilities("files_sharing", self._session.capabilities)

def get_list(
self, shared_with_me=False, reshares=False, subfiles=False, path: Union[str, FsNode] = ""
) -> list[Share]:
"""Returns lists of shares."""

require_capabilities("files_sharing", self._session.capabilities)
path = path.user_path if isinstance(path, FsNode) else path
params = {
Expand Down Expand Up @@ -65,7 +63,6 @@ def create(
* ``note`` - string with note, if any. default = ``""``
* ``label`` - string with label, if any. default = ``""``
"""

require_capabilities("files_sharing", self._session.capabilities)
path = path.user_path if isinstance(path, FsNode) else path
params = {
Expand Down Expand Up @@ -94,6 +91,5 @@ def delete(self, share_id: Union[int, Share]) -> None:
:param share_id: The Share object or an ID of the share.
"""

share_id = share_id.share_id if isinstance(share_id, Share) else share_id
self._session.ocs(method="DELETE", path=f"{ENDPOINT_BASE}/shares/{share_id}")
13 changes: 0 additions & 13 deletions nc_py_api/nextcloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,34 +36,29 @@ def _init_api(self, session: NcSessionBasic):
@property
def capabilities(self) -> dict:
"""Returns the capabilities of the Nextcloud instance."""

return self._session.capabilities

@property
def srv_version(self) -> ServerVersion:
"""Returns dictionary with the server version."""

return self._session.nc_version

def check_capabilities(self, capabilities: Union[str, list[str]]) -> list[str]:
"""Returns the list with missing capabilities if any.
:param capabilities: one or more features to check for."""

return check_capabilities(capabilities, self.capabilities)

def update_server_info(self) -> None:
"""Updates the capabilities and the Nextcloud version.
*In normal cases, it is called automatically and there is no need to call it manually.*
"""

self._session.update_server_info()

@property
def theme(self) -> Optional[ThemingInfo]:
"""Returns Theme information."""

return get_parsed_theme(self.capabilities["theming"]) if "theming" in self.capabilities else None


Expand All @@ -76,14 +71,12 @@ class Nextcloud(NextcloudBasic):

def __init__(self, **kwargs):
""":param dsdada: ddsdsds"""

self._session = NcSession(**kwargs)
self._init_api(self._session)

@property
def user(self) -> str:
"""Returns current user name."""

return self._session.user


Expand Down Expand Up @@ -114,7 +107,6 @@ def log(self, log_lvl: LogLvl, content: str) -> None:
:param log_lvl: level of the log, content belongs to.
:param content: string to write into the log.
"""

if self.check_capabilities("app_ecosystem_v2"):
return
if int(log_lvl) < self.capabilities["app_ecosystem_v2"].get("loglevel", 0):
Expand All @@ -125,14 +117,12 @@ def log(self, log_lvl: LogLvl, content: str) -> None:

def users_list(self) -> list[str]:
"""Returns list of users on the Nextcloud instance. **Available** only for ``System`` applications."""

return self._session.ocs("GET", path=f"{APP_V2_BASIC_URL}/users", params={"format": "json"})

def scope_allowed(self, scope: ApiScope) -> bool:
"""Check if API scope is avalaible for application.
Useful for applications which declare ``Optional`` scopes, to check if they are allowed for them."""

if self.check_capabilities("app_ecosystem_v2"):
return False
return scope in self.capabilities["app_ecosystem_v2"]["scopes"]
Expand All @@ -143,7 +133,6 @@ def user(self) -> str:
*System Applications* can set it and impersonate the user. For normal applications, it is set automatically.
"""

return self._session.user

@user.setter
Expand All @@ -155,7 +144,6 @@ def user(self, value: str):
@property
def app_cfg(self) -> AppConfig:
"""Returns deploy config, with AppEcosystem version, Application version and name."""

return self._session.cfg

def request_sign_check(self, request: Request) -> bool:
Expand All @@ -165,7 +153,6 @@ def request_sign_check(self, request: Request) -> bool:
.. note:: In most cases ``nc: Annotated[NextcloudApp, Depends(nc_app)]`` should be used.
"""

try:
self._session.sign_check(request)
except ValueError as e:
Expand Down
Loading

0 comments on commit 9862eb0

Please sign in to comment.