diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..91a320f --- /dev/null +++ b/.editorconfig @@ -0,0 +1,23 @@ +root = true + +[*] +end_of_line = lf +insert_final_newline = true +charset = utf-8 +trim_trailing_whitespace = true + +[*.{py,yml}] +indent_style = space +max_line_length = 79 + +[*.py] +indent_size = 4 + +[*.rst] +indent_size = 3 + +[Makefile] +indent_style = tab + +[*.yml] +indent_size = 2 diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 9724267..2b659f5 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -108,6 +108,7 @@ jobs: - name: tox run: tox env: + TOXENV: ${{ matrix.toxenv }} COVERALLS_PARALLEL: 'true' COVERALLS_SERVICE_NAME: github GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/README.rst b/README.rst index 8412949..e28ce8a 100644 --- a/README.rst +++ b/README.rst @@ -37,7 +37,7 @@ Features * Since version 2.0.0 using `caio`_, which contains linux ``libaio`` and two thread-based implementations (c-based and pure-python). -* AIOFile has no internal pointer. You should pass ``offset`` and +* ``AIOFile`` has no internal pointer. You should pass ``offset`` and ``chunk_size`` for each operation or use helpers (Reader or Writer). The simples way is to use ``async_open`` for creating object with file-like interface. @@ -57,7 +57,7 @@ Limitations * Linux native AIO implementation is not able to open special files. Asynchronous operations against special fs like ``/proc/`` ``/sys/`` are not - supported by the kernel. It's not a `aiofile`s or `caio` issue. + supported by the kernel. It's not a ``aiofile`` or ``caio`` issue. In this cases, you might switch to thread-based implementations (see troubleshooting_ section). However, when used on supported file systems, the linux implementation has a @@ -262,6 +262,61 @@ these files using compatible context objects. 
asyncio.run(main()) + +``clone`` helper +~~~~~~~~~~~~~~~~ + +The asynchronous context supports only a limited number of concurrent operations at a low level, no matter how many +file descriptors are open. This means that you can make a second file-like object with its own offset stub +for one descriptor without opening the file several times. + +.. code-block:: python + + """ + This example computes several hashes of the file passed as the first argument. + The hashes are computed concurrently, and the results are printed in the order in which hashing completes. + """ + import asyncio + import hashlib + import sys + + import aiofile + + + async def hasher(name, hash_func, afp): + loop = asyncio.get_running_loop() + async for chunk in afp.iter_chunked(2 ** 20): + await loop.run_in_executor(None, hash_func.update, chunk) + print(name, hash_func.hexdigest()) + + + async def main(): + async with aiofile.async_open(sys.argv[1], "rb") as source: + hashers = [ + ("MD5", hashlib.md5()), + ("SHA1", hashlib.sha1()), + ("SHA256", hashlib.sha256()), + ("SHA512", hashlib.sha512()), + ("SHA3 224", hashlib.sha3_224()), + ("SHA3 256", hashlib.sha3_256()), + ("SHA3 512", hashlib.sha3_512()), + ("Blake2b", hashlib.blake2b()), + ] + + await asyncio.gather(*[ + hasher(name, hash_func, await aiofile.clone(source)) + for name, hash_func in hashers + ]) + + + asyncio.run(main()) + + +.. note:: + + In practice, this will most likely perform very badly on Windows, so if that is your target platform, + this optimization is not worth applying. + Low-level API ++++++++++++++ @@ -462,6 +517,19 @@ Async CSV Dict Reader asyncio.run(main()) +Limitations +----------- + +The underlying library ``caio`` uses multiple implementations of asynchronous +IO to ensure that it works across operating systems. + +This imposes a limitation on the mode of working with files; files are +always opened in binary mode, and string encoding and decoding take place +in the library, not in the operating system. 
+ +Thus the presence of ``b`` in the file opening mode determines the behavior +of the ``read`` and ``write`` functions, while the file descriptor will +always be opened in binary mode. .. _troubleshooting: diff --git a/aiofile/__init__.py b/aiofile/__init__.py index 50c257d..d99494d 100644 --- a/aiofile/__init__.py +++ b/aiofile/__init__.py @@ -1,7 +1,7 @@ from .aio import AIOFile from .utils import ( - BinaryFileWrapper, FileIOWrapperBase, LineReader, Reader, TextFileWrapper, - Writer, async_open, + BinaryFileWrapper, FileIOCloner, FileIOWrapperBase, LineReader, Reader, + TextFileWrapper, Writer, async_open, clone, ) from .version import ( __author__, __version__, author_info, package_info, package_license, @@ -13,6 +13,7 @@ "AIOFile", "BinaryFileWrapper", "FileIOWrapperBase", + "FileIOCloner", "LineReader", "Reader", "TextFileWrapper", @@ -21,6 +22,7 @@ "__version__", "async_open", "author_info", + "clone", "package_info", "package_license", "project_home", diff --git a/aiofile/aio.py b/aiofile/aio.py index c9e542b..eca5051 100644 --- a/aiofile/aio.py +++ b/aiofile/aio.py @@ -1,13 +1,14 @@ import asyncio import os -from collections import namedtuple +import sys +import warnings from concurrent.futures import Executor from functools import partial from os import strerror -from pathlib import Path +from pathlib import PurePath from typing import ( - Any, Awaitable, BinaryIO, Callable, Dict, Generator, Optional, TextIO, - TypeVar, Union, + IO, Any, Awaitable, BinaryIO, Callable, Dict, Generator, NamedTuple, + Optional, TextIO, TypeVar, Union, ) from weakref import finalize @@ -20,140 +21,128 @@ AIO_FILE_NOT_OPENED = -1 AIO_FILE_CLOSED = -2 -FileIOType = Union[TextIO, BinaryIO] - -FileMode = namedtuple( - "FileMode", ( - "readable", - "writable", - "plus", - "appending", - "created", - "flags", - "binary", - ), -) - - -def parse_mode(mode: str) -> FileMode: # noqa: C901 - """ Rewritten from `cpython fileno`_ - - .. 
_cpython fileio: https://bit.ly/2JY2cnp - """ - - flags = os.O_RDONLY - - rwa = False - writable = False - readable = False - plus = False - appending = False - created = False - binary = False - - for m in mode: - if m == "x": - rwa = True - created = True - writable = True - flags |= os.O_EXCL | os.O_CREAT - - if m == "r": - if rwa: - raise Exception("Bad mode") - - rwa = True - readable = True - - if m == "w": - if rwa: - raise Exception("Bad mode") - - rwa = True - writable = True - - flags |= os.O_CREAT | os.O_TRUNC - - if m == "a": - if rwa: - raise Exception("Bad mode") - rwa = True - writable = True - appending = True - flags |= os.O_CREAT | os.O_APPEND +FileIOType = Union[TextIO, BinaryIO, IO] - if m == "+": - if plus: - raise Exception("Bad mode") - readable = True - writable = True - plus = True - if m == "b": - binary = True - if hasattr(os, "O_BINARY"): - flags |= os.O_BINARY +class FileMode(NamedTuple): + readable: bool + writable: bool + plus: bool + appending: bool + created: bool + flags: int + binary: bool - if readable and writable: - flags |= os.O_RDWR - - elif readable: - flags |= os.O_RDONLY - else: - flags |= os.O_WRONLY - - return FileMode( - readable=readable, - writable=writable, - plus=plus, - appending=appending, - created=created, - flags=flags, - binary=binary, - ) + @classmethod + def parse(cls, mode: str) -> "FileMode": # noqa: C901 + """ Rewritten from `cpython fileno`_ + + .. 
_cpython fileio: https://bit.ly/2JY2cnp + """ + + flags = os.O_RDONLY + + rwa = False + writable = False + readable = False + plus = False + appending = False + created = False + binary = False + + for m in mode: + if m == "x": + rwa = True + created = True + writable = True + flags |= os.O_EXCL | os.O_CREAT + + if m == "r": + if rwa: + raise Exception("Bad mode") + + rwa = True + readable = True + + if m == "w": + if rwa: + raise Exception("Bad mode") + + rwa = True + writable = True + + flags |= os.O_CREAT | os.O_TRUNC + + if m == "a": + if rwa: + raise Exception("Bad mode") + rwa = True + writable = True + appending = True + flags |= os.O_CREAT | os.O_APPEND + + if m == "+": + if plus: + raise Exception("Bad mode") + readable = True + writable = True + plus = True + + if m == "b": + binary = True + + if hasattr(os, "O_BINARY"): + # always add the binary flag because the asynchronous + # API only works with bytes, we must always open the + # file in binary mode. + flags |= os.O_BINARY + + if readable and writable: + flags |= os.O_RDWR + + elif readable: + flags |= os.O_RDONLY + else: + flags |= os.O_WRONLY + + return cls( + readable=readable, + writable=writable, + plus=plus, + appending=appending, + created=created, + flags=flags, + binary=binary, + ) class AIOFile: - _file_obj: Optional[FileIOType] - _file_obj_owner: bool + _fileno: int _encoding: str _executor: Optional[Executor] mode: FileMode - __open_result: "Optional[asyncio.Future[FileIOType]]" def __init__( - self, filename: Union[str, Path], - mode: str = "r", encoding: str = "utf-8", + self, file_specifier: Union[str, PurePath, FileIOType], + mode: str = '', encoding: str = sys.getdefaultencoding(), context: Optional[AsyncioContextBase] = None, executor: Optional[Executor] = None, ): self.__context = context or get_default_context() - self.__open_result = None + self.__file_specifier = file_specifier - self._fname = str(filename) - self._open_mode = mode - - self.mode = parse_mode(mode) + if 
isinstance(self.__file_specifier, (str, PurePath)): + self._fname = str(self.__file_specifier) + self.mode = FileMode.parse(mode or "r") + else: + self._fname = self.__file_specifier.name + self.mode = FileMode.parse(mode or self.__file_specifier.mode) - self._file_obj = None - self._file_obj_owner = True + self._fileno = -1 self._encoding = encoding self._executor = executor - - @classmethod - def from_fp(cls, fp: FileIOType, **kwargs: Any) -> "AIOFile": - afp = cls(fp.name, fp.mode, **kwargs) - afp._file_obj = fp - afp._open_mode = fp.mode - afp._file_obj_owner = False - return afp - - def _run_in_thread( - self, func: "Callable[..., _T]", *args: Any, **kwargs: Any - ) -> "asyncio.Future[_T]": - return self.__context.loop.run_in_executor( - self._executor, partial(func, *args, **kwargs), - ) + self._lock = asyncio.Lock() + self._clones = 0 @property def name(self) -> str: @@ -167,41 +156,83 @@ def loop(self) -> asyncio.AbstractEventLoop: def encoding(self) -> str: return self._encoding - async def open(self) -> Optional[int]: - if self._file_obj is not None: - if self._file_obj.closed: - raise asyncio.InvalidStateError("AIOFile closed") - return None - - if self.__open_result is None: - self.__open_result = self._run_in_thread( - open, - self._fname, - self._open_mode, - ) - self._file_obj = await self.__open_result - self.__open_result = None - return self._file_obj.fileno() + @classmethod + def from_fp(cls, fp: FileIOType) -> "AIOFile": + warnings.warn( + "Classmethod is deprecated. Do not use this anymore. " + "Just pass file-like as a first argument", + DeprecationWarning + ) + return cls(fp, mode=fp.mode) + + if hasattr(os, 'O_BINARY'): + # In windows, the file may already be opened in text mode, and you + # will have to reopen it in binary mode. + # Unlike unix windows does not allow you to delete an already + # opened file, so it is relatively safe to open a file by name. 
+ def _open_fp(self, fp: FileIOType) -> int: + return os.open(fp.name, self.mode.flags) + else: + def _open_fp(self, fp: FileIOType) -> int: + return os.dup(fp.fileno()) - await self.__open_result - return None + def _run_in_thread( + self, func: "Callable[..., _T]", *args: Any, **kwargs: Any + ) -> "asyncio.Future[_T]": + return self.__context.loop.run_in_executor( + self._executor, partial(func, *args, **kwargs), + ) + + def __open(self) -> int: + if isinstance(self.__file_specifier, (str, PurePath)): + return os.open(self._fname, self.mode.flags) + + result = self._open_fp(self.__file_specifier) + # remove linked object after first open + self.__file_specifier = self._fname + self.__is_fp = False + return result + + async def open(self) -> Optional[int]: + async with self._lock: + if self._fileno > 0: + return None + self._fileno = await self._run_in_thread(self.__open) + return self._fileno def __repr__(self) -> str: return "" % self._fname async def close(self) -> None: - if self._file_obj is None or not self._file_obj_owner: - return - - if self.mode.writable: - await self.fdsync() - - await self._run_in_thread(self._file_obj.close) + async with self._lock: + if self._fileno < 0: + return + + if self._clones > 0: + self._clones -= 1 + return + + if self.mode.writable: + await self.fdsync() + + await self._run_in_thread(os.close, self._fileno) + self._fileno = -1 + + async def clone(self) -> "AIOFile": + """ + Increases the clone count by one, as long as the clone + count is greater than zero, all ``self.close()`` + calls will only decrease the clone count without + really closing anything. 
+ """ + async with self._lock: + self._clones += 1 + return self def fileno(self) -> int: - if self._file_obj is None: - raise asyncio.InvalidStateError("AIOFile closed") - return self._file_obj.fileno() + if self._fileno < 0: + raise asyncio.InvalidStateError(f"Not opened {self.__class__.__name__}") + return self._fileno def __await__(self) -> Generator[None, Any, "AIOFile"]: yield from self.open().__await__() @@ -218,17 +249,16 @@ async def read(self, size: int = -1, offset: int = 0) -> Union[bytes, str]: data = await self.read_bytes(size, offset) return data if self.mode.binary else self.decode_bytes(data) + async def stat(self) -> os.stat_result: + return await self._run_in_thread(os.fstat, self.fileno()) + async def read_bytes(self, size: int = -1, offset: int = 0) -> bytes: if size < -1: raise ValueError("Unsupported value %d for size" % size) if size == -1: - size = ( - await self._run_in_thread( - os.stat, - self.fileno(), - ) - ).st_size + stat = await self.stat() + size = stat.st_size return await self.__context.read(size, self.fileno(), offset) @@ -299,6 +329,10 @@ def truncate(self, length: int = 0) -> Awaitable[None]: os.ftruncate, self.fileno(), length, ) + def __del__(self) -> None: + if self._fileno > 0: + os.close(self._fileno) + ContextStoreType = Dict[asyncio.AbstractEventLoop, caio.AsyncioContext] DEFAULT_CONTEXT_STORE: ContextStoreType = {} diff --git a/aiofile/utils.py b/aiofile/utils.py index 236672d..461c484 100644 --- a/aiofile/utils.py +++ b/aiofile/utils.py @@ -3,9 +3,9 @@ import io import os from abc import ABC, abstractmethod -from pathlib import Path +from pathlib import PurePath from types import MappingProxyType -from typing import Any, Generator, Tuple, Union +from typing import Any, Generator, Generic, Optional, Tuple, TypeVar, Union from .aio import AIOFile, FileIOType @@ -94,13 +94,14 @@ def __init__(self, aio_file: AIOFile, offset: int = 0): self.__aio_file = aio_file self.__lock = asyncio.Lock() - async def __call__(self, data: 
Union[str, bytes]) -> None: + async def __call__(self, data: Union[str, bytes]) -> int: async with self.__lock: if isinstance(data, str): data = self.__aio_file.encode_bytes(data) - await self.__aio_file.write_bytes(data, self.__offset) + result = await self.__aio_file.write_bytes(data, self.__offset) self.__offset += len(data) + return result class LineReader(collections.abc.AsyncIterable): @@ -333,15 +334,10 @@ async def readline(self, size: int = -1, newline: str = "\n") -> str: def async_open( - file_specifier: Union[str, Path, FileIOType], - mode: str = "r", *args: Any, **kwargs: Any + file_specifier: Union[str, PurePath, FileIOType], + mode: str = "", *args: Any, **kwargs: Any ) -> Union[BinaryFileWrapper, TextFileWrapper]: - if isinstance(file_specifier, (str, Path)): - afp = AIOFile(str(file_specifier), mode, *args, **kwargs) - else: - if args: - raise ValueError("Arguments denied when IO[Any] opening.") - afp = AIOFile.from_fp(file_specifier, **kwargs) + afp = AIOFile(file_specifier, mode, *args, **kwargs) if not afp.mode.binary: return TextFileWrapper(afp) @@ -349,13 +345,52 @@ def async_open( return BinaryFileWrapper(afp) +T = TypeVar("T", bound=FileIOWrapperBase) + + +class FileIOCloner(Generic[T]): + def __init__(self, file: T): + self.source_afp = file + self.cloned_afp: Optional[T] = None + self.cloned_lock = asyncio.Lock() + + async def __clone(self) -> T: + async with self.cloned_lock: + if self.cloned_afp is not None: + return self.cloned_afp + + self.cloned_afp = self.source_afp.__class__( + await self.source_afp.file.clone(), + ) + return self.cloned_afp + + def __await__(self) -> Generator[Any, None, T]: + return self.__clone().__await__() + + async def __aenter__(self) -> T: + if self.cloned_afp is None: + return await self.__clone() + return self.cloned_afp + + async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: + if self.cloned_afp is None: + return + await self.cloned_afp.close() + + +def clone(afp: 
FileIOWrapperBase) -> FileIOCloner[FileIOWrapperBase]: + return FileIOCloner(afp) + + __all__ = ( "BinaryFileWrapper", + "FileIOCloner", "FileIOWrapperBase", "LineReader", "Reader", "TextFileWrapper", "Writer", "async_open", + "clone", "unicode_reader", ) diff --git a/aiofile/version.py b/aiofile/version.py index fa26a58..3557429 100644 --- a/aiofile/version.py +++ b/aiofile/version.py @@ -9,7 +9,7 @@ team_email = "me@mosquito.su" -version_info = (3, 8, 8) +version_info = (3, 9, 0) __author__ = ", ".join("{} <{}>".format(*info) for info in author_info) __version__ = ".".join(map(str, version_info)) diff --git a/setup.py b/setup.py index 86edf5f..42c833b 100644 --- a/setup.py +++ b/setup.py @@ -58,5 +58,7 @@ "coveralls", ], }, - install_requires=["caio~=0.9.0"], + install_requires=[ + "caio~=0.9.0" + ], ) diff --git a/tests/test_aio.py b/tests/test_aio.py index cc34562..876e66e 100644 --- a/tests/test_aio.py +++ b/tests/test_aio.py @@ -12,9 +12,9 @@ import caio import pytest -from aiofile import AIOFile -from aiofile.utils import ( - BinaryFileWrapper, LineReader, Reader, TextFileWrapper, Writer, +from aiofile import ( + AIOFile, BinaryFileWrapper, LineReader, Reader, TextFileWrapper, Writer, + clone, ) from .impl import split_by @@ -212,8 +212,8 @@ async def test_sequential_open(aio_file_maker, temp_file): finally: await file.close() - with pytest.raises(asyncio.InvalidStateError): - await file.open() + assert await file.open() is not None + await file.close() async def test_parallel_open(aio_file_maker, temp_file): @@ -232,7 +232,7 @@ async def test_line_reader(aio_file_maker, temp_file, uuid): writer = Writer(afp) - max_length = 1000 + max_length = 10 chunk = b64encode(os.urandom(max_length)).decode() lines = [chunk[:i] for i in range(max_length)] @@ -284,8 +284,8 @@ async def test_truncate(aio_file_maker, temp_file): @pytest.mark.parametrize("size", [1, 2, 3, 5, 10, 20, 100, 1000, 2000, 5000]) -async def test_modes(size, aio_file_maker, tmpdir): - tmpfile = 
tmpdir.join("test.txt") +async def test_modes(size, aio_file_maker, tmp_path): + tmpfile = tmp_path / "test.txt" async with aio_file_maker(tmpfile, "w") as afp: await afp.write("foo") @@ -302,7 +302,7 @@ async def test_modes(size, aio_file_maker, tmpdir): data = dict((str(i), i)for i in range(size)) - tmpfile = tmpdir.join("test.json") + tmpfile = tmp_path / "test.json" async with aio_file_maker(tmpfile, "w") as afp: await afp.write(json.dumps(data, indent=1)) @@ -626,3 +626,58 @@ async def test_open_non_existent_file_with_append( numbers.append(int(line.strip())) assert numbers == list(range(20)) + + +async def test_clone( + async_open, tmp_path: Path, +): + tmp_fpath = tmp_path / "test.txt" + + async with async_open(tmp_fpath, "w") as afp: + for i in range(1000, 1003): + await afp.write(str(i)) + await afp.write("\n") + + async with async_open(tmp_fpath, "r") as afp: + assert await afp.read(5) == "1000\n" + + async with clone(afp) as cloned: + assert await cloned.read(5) == "1000\n" + + async with clone(afp) as cloned2: + assert await cloned2.read(5) == "1000\n" + + assert await cloned.read(5) == await afp.read(5) == "1001\n" + + +async def test_clone_close( + async_open, tmp_path: Path, +): + tmp_fpath = tmp_path / "test.txt" + + async with async_open(tmp_fpath, "w") as afp: + for i in range(10): + await afp.write(str(i)) + await afp.write("\n") + + async with async_open(tmp_fpath, "r") as afp: + await afp.read(10) + + afp_clone = await clone(afp) + await afp_clone.close() + + assert await afp.read() + + async with async_open(tmp_fpath, "r") as afp: + await afp.read(10) + + afp_clone = await clone(afp) + await afp_clone.close() + + assert await afp.read() + + async with async_open(tmp_fpath, "r") as afp: + await afp.read(10) + + async with clone(afp): + assert await afp.read()