From 67b1c1ad5da726dcda292f6a4a368c194ac02327 Mon Sep 17 00:00:00 2001 From: Dmitry Orlov Date: Sat, 19 Aug 2023 01:47:49 +0300 Subject: [PATCH 01/17] add clone --- README.rst | 55 ++++++++++++++++++++++++++++++++++++++++ aiofile/__init__.py | 6 +++-- aiofile/aio.py | 16 ++++++++++++ aiofile/utils.py | 39 ++++++++++++++++++++++++++++- aiofile/version.py | 2 +- tests/test_aio.py | 61 ++++++++++++++++++++++++++++++++++++++++++--- 6 files changed, 172 insertions(+), 7 deletions(-) diff --git a/README.rst b/README.rst index 8412949..ca1320f 100644 --- a/README.rst +++ b/README.rst @@ -262,6 +262,61 @@ these files using compatible context objects. asyncio.run(main()) + +``clone`` helper +~~~~~~~~~~~~~~~~ + +Asynchronous context at a low level supports a limited number of concurrency operations, no matter how many +file descriptors are open. This means that you can make a second file-like object with its own offset stub +for one descriptor without opening the file several times. + +.. code-block:: python + + """ + This example counts multiple hash functions from the file passed as the first argument. + The hash functions are counted competitively, and the results are printed in the order of hashing completion. 
+ """ + import asyncio + import hashlib + import sys + + import aiofile + + + async def hasher(name, hash_func, afp): + loop = asyncio.get_running_loop() + async for chunk in afp.iter_chunked(2 ** 20): + await loop.run_in_executor(None, hash_func.update, chunk) + print(name, hash_func.hexdigest()) + + + async def main(): + async with aiofile.async_open(sys.argv[1], "rb") as source: + hashers = [ + ("MD5", hashlib.md5()), + ("SHA1", hashlib.sha1()), + ("SHA256", hashlib.sha256()), + ("SHA512", hashlib.sha512()), + ("SHA3 224", hashlib.sha3_224()), + ("SHA3 256", hashlib.sha3_256()), + ("SHA3 512", hashlib.sha3_512()), + ("Blake2b", hashlib.blake2b()), + ] + + await asyncio.gather(*[ + hasher(name, hash_func, await aiofile.clone(source)) + for name, hash_func in hashers + ]) + + + asyncio.run(main()) + + +.. note:: + + In fact this will most likely perform very bad under windows, so if that's your target platform it's not + worth it to apply this optimization. + Low-level API ++++++++++++++ diff --git a/aiofile/__init__.py b/aiofile/__init__.py index 50c257d..d99494d 100644 --- a/aiofile/__init__.py +++ b/aiofile/__init__.py @@ -1,7 +1,7 @@ from .aio import AIOFile from .utils import ( - BinaryFileWrapper, FileIOWrapperBase, LineReader, Reader, TextFileWrapper, - Writer, async_open, + BinaryFileWrapper, FileIOCloner, FileIOWrapperBase, LineReader, Reader, + TextFileWrapper, Writer, async_open, clone, ) from .version import ( __author__, __version__, author_info, package_info, package_license, @@ -13,6 +13,7 @@ "AIOFile", "BinaryFileWrapper", "FileIOWrapperBase", + "FileIOCloner", "LineReader", "Reader", "TextFileWrapper", @@ -21,6 +22,7 @@ "__version__", "async_open", "author_info", + "clone", "package_info", "package_license", "project_home", diff --git a/aiofile/aio.py b/aiofile/aio.py index c9e542b..0273c67 100644 --- a/aiofile/aio.py +++ b/aiofile/aio.py @@ -139,6 +139,8 @@ def __init__( self._file_obj_owner = True self._encoding = encoding self._executor = 
executor + self._clone_lock = asyncio.Lock() + self._clones = 0 @classmethod def from_fp(cls, fp: FileIOType, **kwargs: Any) -> "AIOFile": @@ -193,11 +195,25 @@ async def close(self) -> None: if self._file_obj is None or not self._file_obj_owner: return + async with self._clone_lock: + if self._clones > 0: + self._clones -= 1 + return + if self.mode.writable: await self.fdsync() await self._run_in_thread(self._file_obj.close) + async def clone(self) -> "AIOFile": + """ + Increases the clone count by one, as long as the clone count is greater than zero, all ``self.close()`` + calls will only decrease the clone count without really closing anything. + """ + async with self._clone_lock: + self._clones += 1 + return self + def fileno(self) -> int: if self._file_obj is None: raise asyncio.InvalidStateError("AIOFile closed") diff --git a/aiofile/utils.py b/aiofile/utils.py index 236672d..166a714 100644 --- a/aiofile/utils.py +++ b/aiofile/utils.py @@ -5,7 +5,7 @@ from abc import ABC, abstractmethod from pathlib import Path from types import MappingProxyType -from typing import Any, Generator, Tuple, Union +from typing import Any, Generator, Generic, Optional, Tuple, TypeVar, Union from .aio import AIOFile, FileIOType @@ -349,13 +349,50 @@ def async_open( return BinaryFileWrapper(afp) +T = TypeVar("T", bound=FileIOWrapperBase) + + +class FileIOCloner(Generic[T]): + def __init__(self, file: T): + self.source_afp = file + self.cloned_afp: Optional[T] = None + self.cloned_lock = asyncio.Lock() + + async def __clone(self): + async with self.cloned_lock: + if self.cloned_afp is not None: + return + + self.cloned_afp = self.source_afp.__class__( + await self.source_afp.file.clone(), + ) + return self.cloned_afp + + def __await__(self) -> Generator[Any, None, T]: + return self.__clone().__await__() + + async def __aenter__(self) -> T: + if self.cloned_afp is None: + await self.__clone() + return self.cloned_afp + + async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: 
Any) -> None: + await self.cloned_afp.close() + + +def clone(afp: FileIOWrapperBase) -> FileIOCloner[FileIOWrapperBase]: + return FileIOCloner(afp) + + __all__ = ( "BinaryFileWrapper", + "FileIOCloner", "FileIOWrapperBase", "LineReader", "Reader", "TextFileWrapper", "Writer", "async_open", + "clone", "unicode_reader", ) diff --git a/aiofile/version.py b/aiofile/version.py index fa26a58..3557429 100644 --- a/aiofile/version.py +++ b/aiofile/version.py @@ -9,7 +9,7 @@ team_email = "me@mosquito.su" -version_info = (3, 8, 8) +version_info = (3, 9, 0) __author__ = ", ".join("{} <{}>".format(*info) for info in author_info) __version__ = ".".join(map(str, version_info)) diff --git a/tests/test_aio.py b/tests/test_aio.py index cc34562..16eaa69 100644 --- a/tests/test_aio.py +++ b/tests/test_aio.py @@ -12,9 +12,9 @@ import caio import pytest -from aiofile import AIOFile -from aiofile.utils import ( - BinaryFileWrapper, LineReader, Reader, TextFileWrapper, Writer, +from aiofile import ( + AIOFile, BinaryFileWrapper, LineReader, Reader, TextFileWrapper, Writer, + clone, ) from .impl import split_by @@ -626,3 +626,58 @@ async def test_open_non_existent_file_with_append( numbers.append(int(line.strip())) assert numbers == list(range(20)) + + +async def test_clone( + async_open, tmp_path: Path, +): + tmp_fpath = tmp_path / "test.txt" + + async with async_open(tmp_fpath, "w") as afp: + for i in range(1000, 1003): + await afp.write(str(i)) + await afp.write("\n") + + async with async_open(tmp_fpath, "r") as afp: + assert await afp.read(5) == "1000\n" + + async with clone(afp) as cloned: + assert await cloned.read(5) == "1000\n" + + async with clone(afp) as cloned2: + assert await cloned2.read(5) == "1000\n" + + assert await cloned.read(5) == await afp.read(5) == "1001\n" + + +async def test_clone_close( + async_open, tmp_path: Path, +): + tmp_fpath = tmp_path / "test.txt" + + async with async_open(tmp_fpath, "w") as afp: + for i in range(10): + await afp.write(str(i)) + await 
afp.write("\n") + + async with async_open(tmp_fpath, "r") as afp: + await afp.read(10) + + afp_clone = await clone(afp) + await afp_clone.close() + + assert await afp.read() + + async with async_open(tmp_fpath, "r") as afp: + await afp.read(10) + + afp_clone = await clone(afp) + await afp_clone.close() + + assert await afp.read() + + async with async_open(tmp_fpath, "r") as afp: + await afp.read(10) + + async with clone(afp): + assert await afp.read() From c5ef8de19a9c5d70e63126d71f2fa77a019c6d9e Mon Sep 17 00:00:00 2001 From: Dmitry Orlov Date: Sat, 19 Aug 2023 01:48:04 +0300 Subject: [PATCH 02/17] use NamedTuple --- aiofile/aio.py | 183 ++++++++++++++++++++++++------------------------- setup.py | 4 +- 2 files changed, 93 insertions(+), 94 deletions(-) diff --git a/aiofile/aio.py b/aiofile/aio.py index 0273c67..0d34bd6 100644 --- a/aiofile/aio.py +++ b/aiofile/aio.py @@ -1,13 +1,12 @@ import asyncio import os -from collections import namedtuple from concurrent.futures import Executor from functools import partial from os import strerror from pathlib import Path from typing import ( - Any, Awaitable, BinaryIO, Callable, Dict, Generator, Optional, TextIO, - TypeVar, Union, + IO, Any, Awaitable, BinaryIO, Callable, Dict, Generator, Optional, TextIO, + TypeVar, Union, NamedTuple, ) from weakref import finalize @@ -20,97 +19,95 @@ AIO_FILE_NOT_OPENED = -1 AIO_FILE_CLOSED = -2 -FileIOType = Union[TextIO, BinaryIO] - -FileMode = namedtuple( - "FileMode", ( - "readable", - "writable", - "plus", - "appending", - "created", - "flags", - "binary", - ), -) +FileIOType = Union[TextIO, BinaryIO, IO] + + +class FileMode(NamedTuple): + readable: bool + writable: bool + plus: bool + appending: bool + created: bool + flags: int + binary: bool + + @classmethod + def parse(cls, mode) -> "FileMode": + """ Rewritten from `cpython fileno`_ + .. _cpython fileio: https://bit.ly/2JY2cnp + """ -def parse_mode(mode: str) -> FileMode: # noqa: C901 - """ Rewritten from `cpython fileno`_ - - .. 
_cpython fileio: https://bit.ly/2JY2cnp - """ - - flags = os.O_RDONLY - - rwa = False - writable = False - readable = False - plus = False - appending = False - created = False - binary = False - - for m in mode: - if m == "x": - rwa = True - created = True - writable = True - flags |= os.O_EXCL | os.O_CREAT - - if m == "r": - if rwa: - raise Exception("Bad mode") - - rwa = True - readable = True - - if m == "w": - if rwa: - raise Exception("Bad mode") - - rwa = True - writable = True - - flags |= os.O_CREAT | os.O_TRUNC - - if m == "a": - if rwa: - raise Exception("Bad mode") - rwa = True - writable = True - appending = True - flags |= os.O_CREAT | os.O_APPEND - - if m == "+": - if plus: - raise Exception("Bad mode") - readable = True - writable = True - plus = True - - if m == "b": - binary = True - if hasattr(os, "O_BINARY"): - flags |= os.O_BINARY - - if readable and writable: - flags |= os.O_RDWR - - elif readable: - flags |= os.O_RDONLY - else: - flags |= os.O_WRONLY - - return FileMode( - readable=readable, - writable=writable, - plus=plus, - appending=appending, - created=created, - flags=flags, - binary=binary, - ) + flags = os.O_RDONLY + + rwa = False + writable = False + readable = False + plus = False + appending = False + created = False + binary = False + + for m in mode: + if m == "x": + rwa = True + created = True + writable = True + flags |= os.O_EXCL | os.O_CREAT + + if m == "r": + if rwa: + raise Exception("Bad mode") + + rwa = True + readable = True + + if m == "w": + if rwa: + raise Exception("Bad mode") + + rwa = True + writable = True + + flags |= os.O_CREAT | os.O_TRUNC + + if m == "a": + if rwa: + raise Exception("Bad mode") + rwa = True + writable = True + appending = True + flags |= os.O_CREAT | os.O_APPEND + + if m == "+": + if plus: + raise Exception("Bad mode") + readable = True + writable = True + plus = True + + if m == "b": + binary = True + if hasattr(os, "O_BINARY"): + flags |= os.O_BINARY + + if readable and writable: + flags |= 
os.O_RDWR + + elif readable: + flags |= os.O_RDONLY + else: + flags |= os.O_WRONLY + + return cls( + readable=readable, + writable=writable, + plus=plus, + appending=appending, + created=created, + flags=flags, + binary=binary, + ) class AIOFile: @@ -133,7 +130,7 @@ def __init__( self._fname = str(filename) self._open_mode = mode - self.mode = parse_mode(mode) + self.mode = FileMode.parse(mode) self._file_obj = None self._file_obj_owner = True diff --git a/setup.py b/setup.py index 86edf5f..42c833b 100644 --- a/setup.py +++ b/setup.py @@ -58,5 +58,7 @@ "coveralls", ], }, - install_requires=["caio~=0.9.0"], + install_requires=[ + "caio~=0.9.0" + ], ) From d9fed2c4378701be20cca9944b0be15fd4fe6dd2 Mon Sep 17 00:00:00 2001 From: Dmitry Orlov Date: Sat, 19 Aug 2023 01:48:23 +0300 Subject: [PATCH 03/17] reformat --- aiofile/aio.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/aiofile/aio.py b/aiofile/aio.py index 0d34bd6..074c2ca 100644 --- a/aiofile/aio.py +++ b/aiofile/aio.py @@ -5,8 +5,8 @@ from os import strerror from pathlib import Path from typing import ( - IO, Any, Awaitable, BinaryIO, Callable, Dict, Generator, Optional, TextIO, - TypeVar, Union, NamedTuple, + IO, Any, Awaitable, BinaryIO, Callable, Dict, Generator, NamedTuple, + Optional, TextIO, TypeVar, Union, ) from weakref import finalize From 26aee325d9cc177d489091a69679062ac58dee39 Mon Sep 17 00:00:00 2001 From: Dmitry Orlov Date: Sat, 19 Aug 2023 01:52:47 +0300 Subject: [PATCH 04/17] mypy fixes --- aiofile/aio.py | 2 +- aiofile/utils.py | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/aiofile/aio.py b/aiofile/aio.py index 074c2ca..688c5a5 100644 --- a/aiofile/aio.py +++ b/aiofile/aio.py @@ -32,7 +32,7 @@ class FileMode(NamedTuple): binary: bool @classmethod - def parse(cls, mode) -> "FileMode": + def parse(cls, mode: str) -> "FileMode": """ Rewritten from `cpython fileno`_ .. 
_cpython fileio: https://bit.ly/2JY2cnp diff --git a/aiofile/utils.py b/aiofile/utils.py index 166a714..325a970 100644 --- a/aiofile/utils.py +++ b/aiofile/utils.py @@ -358,10 +358,10 @@ def __init__(self, file: T): self.cloned_afp: Optional[T] = None self.cloned_lock = asyncio.Lock() - async def __clone(self): + async def __clone(self) -> T: async with self.cloned_lock: if self.cloned_afp is not None: - return + return self.cloned_afp self.cloned_afp = self.source_afp.__class__( await self.source_afp.file.clone(), @@ -373,10 +373,12 @@ def __await__(self) -> Generator[Any, None, T]: async def __aenter__(self) -> T: if self.cloned_afp is None: - await self.__clone() + return await self.__clone() return self.cloned_afp async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: + if self.cloned_afp is None: + return await self.cloned_afp.close() From 11e6d982b1c30e00a7cc37534f70e4719376b52b Mon Sep 17 00:00:00 2001 From: Dmitry Orlov Date: Sat, 19 Aug 2023 01:54:00 +0300 Subject: [PATCH 05/17] pylama fixes --- aiofile/aio.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aiofile/aio.py b/aiofile/aio.py index 688c5a5..4f239eb 100644 --- a/aiofile/aio.py +++ b/aiofile/aio.py @@ -32,7 +32,7 @@ class FileMode(NamedTuple): binary: bool @classmethod - def parse(cls, mode: str) -> "FileMode": + def parse(cls, mode: str) -> "FileMode": # noqa: C901 """ Rewritten from `cpython fileno`_ .. 
_cpython fileio: https://bit.ly/2JY2cnp From da22fdf2d14fb29dc364c6a9b65a29e21f152338 Mon Sep 17 00:00:00 2001 From: Dmitry Orlov Date: Sat, 19 Aug 2023 01:54:37 +0300 Subject: [PATCH 06/17] pylama fixes --- aiofile/aio.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/aiofile/aio.py b/aiofile/aio.py index 4f239eb..29f821b 100644 --- a/aiofile/aio.py +++ b/aiofile/aio.py @@ -204,8 +204,10 @@ async def close(self) -> None: async def clone(self) -> "AIOFile": """ - Increases the clone count by one, as long as the clone count is greater than zero, all ``self.close()`` - calls will only decrease the clone count without really closing anything. + Increases the clone count by one, as long as the clone + count is greater than zero, all ``self.close()`` + calls will only decrease the clone count without + really closing anything. """ async with self._clone_lock: self._clones += 1 From cda62fa67729f2791df77d11cf480bf6c7d4fb8f Mon Sep 17 00:00:00 2001 From: Dmitry Orlov Date: Sat, 19 Aug 2023 02:02:22 +0300 Subject: [PATCH 07/17] add toxenv --- .github/workflows/tests.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 9724267..2b659f5 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -108,6 +108,7 @@ jobs: - name: tox run: tox env: + TOXENV: ${{ matrix.toxenv }} COVERALLS_PARALLEL: 'true' COVERALLS_SERVICE_NAME: github GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} From 0a405374163ac0fa5e81f4dbd53abe820f9a57f6 Mon Sep 17 00:00:00 2001 From: Dmitry Orlov Date: Tue, 22 Aug 2023 16:18:20 +0300 Subject: [PATCH 08/17] do not store IO object use os.open instead --- aiofile/aio.py | 63 ++++++++++++++++++++--------------------------- tests/test_aio.py | 4 +-- 2 files changed, 29 insertions(+), 38 deletions(-) diff --git a/aiofile/aio.py b/aiofile/aio.py index 29f821b..b6ac48c 100644 --- a/aiofile/aio.py +++ b/aiofile/aio.py @@ -111,12 +111,10 @@ def parse(cls, mode: 
str) -> "FileMode": # noqa: C901 class AIOFile: - _file_obj: Optional[FileIOType] - _file_obj_owner: bool + _fileno: int _encoding: str _executor: Optional[Executor] mode: FileMode - __open_result: "Optional[asyncio.Future[FileIOType]]" def __init__( self, filename: Union[str, Path], @@ -125,26 +123,21 @@ def __init__( executor: Optional[Executor] = None, ): self.__context = context or get_default_context() - self.__open_result = None self._fname = str(filename) - self._open_mode = mode self.mode = FileMode.parse(mode) - self._file_obj = None - self._file_obj_owner = True + self._fileno = -1 self._encoding = encoding self._executor = executor - self._clone_lock = asyncio.Lock() + self._lock = asyncio.Lock() self._clones = 0 @classmethod def from_fp(cls, fp: FileIOType, **kwargs: Any) -> "AIOFile": afp = cls(fp.name, fp.mode, **kwargs) - afp._file_obj = fp - afp._open_mode = fp.mode - afp._file_obj_owner = False + afp._fileno = os.dup(fp.fileno()) return afp def _run_in_thread( @@ -167,40 +160,34 @@ def encoding(self) -> str: return self._encoding async def open(self) -> Optional[int]: - if self._file_obj is not None: - if self._file_obj.closed: - raise asyncio.InvalidStateError("AIOFile closed") - return None - - if self.__open_result is None: - self.__open_result = self._run_in_thread( - open, + async with self._lock: + if self._fileno > 0: + return None + + self._fileno = await self._run_in_thread( + os.open, self._fname, - self._open_mode, + self.mode.flags, ) - self._file_obj = await self.__open_result - self.__open_result = None - return self._file_obj.fileno() - - await self.__open_result - return None + return self._fileno def __repr__(self) -> str: return "" % self._fname async def close(self) -> None: - if self._file_obj is None or not self._file_obj_owner: - return + async with self._lock: + if self._fileno < 0: + return - async with self._clone_lock: if self._clones > 0: self._clones -= 1 return - if self.mode.writable: - await self.fdsync() + if 
self.mode.writable: + await self.fdsync() - await self._run_in_thread(self._file_obj.close) + await self._run_in_thread(os.close, self._fileno) + self._fileno = -1 async def clone(self) -> "AIOFile": """ @@ -209,14 +196,14 @@ async def clone(self) -> "AIOFile": calls will only decrease the clone count without really closing anything. """ - async with self._clone_lock: + async with self._lock: self._clones += 1 return self def fileno(self) -> int: - if self._file_obj is None: - raise asyncio.InvalidStateError("AIOFile closed") - return self._file_obj.fileno() + if self._fileno < 0: + raise asyncio.InvalidStateError(f"Not opened {self.__class__.__name__}") + return self._fileno def __await__(self) -> Generator[None, Any, "AIOFile"]: yield from self.open().__await__() @@ -314,6 +301,10 @@ def truncate(self, length: int = 0) -> Awaitable[None]: os.ftruncate, self.fileno(), length, ) + def __del__(self) -> None: + if self._fileno > 0: + os.close(self._fileno) + ContextStoreType = Dict[asyncio.AbstractEventLoop, caio.AsyncioContext] DEFAULT_CONTEXT_STORE: ContextStoreType = {} diff --git a/tests/test_aio.py b/tests/test_aio.py index 16eaa69..08c98a1 100644 --- a/tests/test_aio.py +++ b/tests/test_aio.py @@ -212,8 +212,8 @@ async def test_sequential_open(aio_file_maker, temp_file): finally: await file.close() - with pytest.raises(asyncio.InvalidStateError): - await file.open() + assert await file.open() is not None + await file.close() async def test_parallel_open(aio_file_maker, temp_file): From 9222e1a1245cc04c8b84fbdb46ec5ee6f7e4b9ec Mon Sep 17 00:00:00 2001 From: Dmitry Orlov Date: Tue, 22 Aug 2023 16:42:45 +0300 Subject: [PATCH 09/17] fixes --- aiofile/aio.py | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/aiofile/aio.py b/aiofile/aio.py index b6ac48c..ec3451b 100644 --- a/aiofile/aio.py +++ b/aiofile/aio.py @@ -1,5 +1,6 @@ import asyncio import os +import sys from concurrent.futures import Executor from functools import 
partial from os import strerror @@ -118,7 +119,7 @@ class AIOFile: def __init__( self, filename: Union[str, Path], - mode: str = "r", encoding: str = "utf-8", + mode: str = "r", encoding: str = sys.getdefaultencoding(), context: Optional[AsyncioContextBase] = None, executor: Optional[Executor] = None, ): @@ -159,16 +160,14 @@ def loop(self) -> asyncio.AbstractEventLoop: def encoding(self) -> str: return self._encoding + def __open(self) -> int: + return os.open(self._fname, self.mode.flags) + async def open(self) -> Optional[int]: async with self._lock: if self._fileno > 0: return None - - self._fileno = await self._run_in_thread( - os.open, - self._fname, - self.mode.flags, - ) + self._fileno = await self._run_in_thread(self.__open) return self._fileno def __repr__(self) -> str: @@ -220,17 +219,16 @@ async def read(self, size: int = -1, offset: int = 0) -> Union[bytes, str]: data = await self.read_bytes(size, offset) return data if self.mode.binary else self.decode_bytes(data) + async def stat(self) -> os.stat_result: + return await self._run_in_thread(os.fstat, self.fileno()) + async def read_bytes(self, size: int = -1, offset: int = 0) -> bytes: if size < -1: raise ValueError("Unsupported value %d for size" % size) if size == -1: - size = ( - await self._run_in_thread( - os.stat, - self.fileno(), - ) - ).st_size + stat = await self.stat() + size = stat.st_size return await self.__context.read(size, self.fileno(), offset) From d95295259e8d12c3c369f31ff03c206a7a31129e Mon Sep 17 00:00:00 2001 From: Dmitry Orlov Date: Tue, 22 Aug 2023 18:19:38 +0300 Subject: [PATCH 10/17] always open in binary mode --- aiofile/aio.py | 10 ++++++++-- aiofile/utils.py | 5 +++-- tests/test_aio.py | 4 ++-- 3 files changed, 13 insertions(+), 6 deletions(-) diff --git a/aiofile/aio.py b/aiofile/aio.py index ec3451b..1088407 100644 --- a/aiofile/aio.py +++ b/aiofile/aio.py @@ -23,6 +23,9 @@ FileIOType = Union[TextIO, BinaryIO, IO] + + + class FileMode(NamedTuple): readable: bool writable: 
bool @@ -89,8 +92,11 @@ def parse(cls, mode: str) -> "FileMode": # noqa: C901 if m == "b": binary = True - if hasattr(os, "O_BINARY"): - flags |= os.O_BINARY + + # always add the binary flag because the asynchronous API only works with bytes, + # we must always open the file in binary mode. + if hasattr(os, "O_BINARY"): + flags |= os.O_BINARY if readable and writable: flags |= os.O_RDWR diff --git a/aiofile/utils.py b/aiofile/utils.py index 325a970..123ae08 100644 --- a/aiofile/utils.py +++ b/aiofile/utils.py @@ -94,13 +94,14 @@ def __init__(self, aio_file: AIOFile, offset: int = 0): self.__aio_file = aio_file self.__lock = asyncio.Lock() - async def __call__(self, data: Union[str, bytes]) -> None: + async def __call__(self, data: Union[str, bytes]) -> int: async with self.__lock: if isinstance(data, str): data = self.__aio_file.encode_bytes(data) - await self.__aio_file.write_bytes(data, self.__offset) + result = await self.__aio_file.write_bytes(data, self.__offset) self.__offset += len(data) + return result class LineReader(collections.abc.AsyncIterable): diff --git a/tests/test_aio.py b/tests/test_aio.py index 08c98a1..45094a9 100644 --- a/tests/test_aio.py +++ b/tests/test_aio.py @@ -232,13 +232,13 @@ async def test_line_reader(aio_file_maker, temp_file, uuid): writer = Writer(afp) - max_length = 1000 + max_length = 10 chunk = b64encode(os.urandom(max_length)).decode() lines = [chunk[:i] for i in range(max_length)] for line in lines: await writer(line) - await writer("\n") + print(await writer("\n")) await afp.fsync() read_lines = [] From 54d71edf8de21dba7f6e750c1b95ef152126ae3c Mon Sep 17 00:00:00 2001 From: Dmitry Orlov Date: Tue, 22 Aug 2023 19:05:51 +0300 Subject: [PATCH 11/17] from_fp is now async def --- .editorconfig | 23 +++++++++++++++++++++++ aiofile/aio.py | 35 +++++++++++++++++++++++------------ tests/test_aio.py | 10 ++++++++++ 3 files changed, 56 insertions(+), 12 deletions(-) create mode 100644 .editorconfig diff --git a/.editorconfig 
b/.editorconfig new file mode 100644 index 0000000..91a320f --- /dev/null +++ b/.editorconfig @@ -0,0 +1,23 @@ +root = true + +[*] +end_of_line = lf +insert_final_newline = true +charset = utf-8 +trim_trailing_whitespace = true + +[*.{py,yml}] +indent_style = space +max_line_length = 79 + +[*.py] +indent_size = 4 + +[*.rst] +indent_size = 3 + +[Makefile] +indent_style = tab + +[*.yml] +indent_size = 2 diff --git a/aiofile/aio.py b/aiofile/aio.py index 1088407..d0b02dd 100644 --- a/aiofile/aio.py +++ b/aiofile/aio.py @@ -23,9 +23,6 @@ FileIOType = Union[TextIO, BinaryIO, IO] - - - class FileMode(NamedTuple): readable: bool writable: bool @@ -93,10 +90,11 @@ def parse(cls, mode: str) -> "FileMode": # noqa: C901 if m == "b": binary = True - # always add the binary flag because the asynchronous API only works with bytes, - # we must always open the file in binary mode. - if hasattr(os, "O_BINARY"): - flags |= os.O_BINARY + if hasattr(os, "O_BINARY"): + # always add the binary flag because the asynchronous + # API only works with bytes, we must always open the + # file in binary mode. + flags |= os.O_BINARY if readable and writable: flags |= os.O_RDWR @@ -141,11 +139,24 @@ def __init__( self._lock = asyncio.Lock() self._clones = 0 - @classmethod - def from_fp(cls, fp: FileIOType, **kwargs: Any) -> "AIOFile": - afp = cls(fp.name, fp.mode, **kwargs) - afp._fileno = os.dup(fp.fileno()) - return afp + if hasattr(os, 'O_BINARY'): + # In windows, the file may already be opened in text mode, and you + # will have to reopen it in binary mode. + # Unlike unix windows does not allow you to delete an already + # opened file, so it is relatively safe to open a file by name. 
+ @classmethod + async def from_fp(cls, fp: FileIOType, **kwargs: Any) -> "AIOFile": + afp = cls(fp.name, fp.mode, **kwargs) + afp._fileno = await afp._run_in_thread( + os.open, fp.name, afp.mode.flags + ) + return afp + else: + @classmethod + async def from_fp(cls, fp: FileIOType, **kwargs: Any) -> "AIOFile": + afp = cls(fp.name, fp.mode, **kwargs) + afp._fileno = await afp._run_in_thread(os.dup, fp.fileno()) + return afp def _run_in_thread( self, func: "Callable[..., _T]", *args: Any, **kwargs: Any diff --git a/tests/test_aio.py b/tests/test_aio.py index 45094a9..3fbad2b 100644 --- a/tests/test_aio.py +++ b/tests/test_aio.py @@ -681,3 +681,13 @@ async def test_clone_close( async with clone(afp): assert await afp.read() + + +async def test_from_fp(tmp_path): + tmp_fpath = tmp_path / 'test.txt' + + with open(tmp_fpath, 'w+') as fp: + afp = await AIOFile.from_fp(fp) + + assert await afp.write("Hello world") + assert await afp.read() == "Hello world" From e47ef1653ebe03017f88e89efcf2f13c1c52e64d Mon Sep 17 00:00:00 2001 From: Dmitry Orlov Date: Tue, 22 Aug 2023 19:06:20 +0300 Subject: [PATCH 12/17] bump major version --- aiofile/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aiofile/version.py b/aiofile/version.py index 3557429..34781e6 100644 --- a/aiofile/version.py +++ b/aiofile/version.py @@ -9,7 +9,7 @@ team_email = "me@mosquito.su" -version_info = (3, 9, 0) +version_info = (4, 0, 0) __author__ = ", ".join("{} <{}>".format(*info) for info in author_info) __version__ = ".".join(map(str, version_info)) From 6e420b818371333b30da39637486d89e215b24cf Mon Sep 17 00:00:00 2001 From: Dmitry Orlov Date: Tue, 22 Aug 2023 19:14:12 +0300 Subject: [PATCH 13/17] add limitations section --- README.rst | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/README.rst b/README.rst index ca1320f..d3c0cf1 100644 --- a/README.rst +++ b/README.rst @@ -517,6 +517,19 @@ Async CSV Dict Reader asyncio.run(main()) +Limitations +----------- + +The 
underlying library ``caio`` uses multiple implementations of asynchronous +IO to ensure that it works across operating systems. + +This imposes a limitation on the mode of working with files; files are +always opened in binary mode, and string encoding and decoding takes place +in the library, not in the operating system. + +Thus the presence of ``b`` in the file opening mode determines the behavior +of the ``read`` and ``write`` functions, while the file descriptor will +always be opened in binary mode. .. _troubleshooting: From 507797813d83e2091364a37cf44d6b85aac8a08d Mon Sep 17 00:00:00 2001 From: Dmitry Orlov Date: Wed, 23 Aug 2023 12:05:58 +0300 Subject: [PATCH 14/17] revert public-api changes --- aiofile/aio.py | 74 ++++++++++++++++++++++++++++------------------ aiofile/utils.py | 11 ++----- aiofile/version.py | 2 +- tests/test_aio.py | 12 +------- 4 files changed, 51 insertions(+), 48 deletions(-) diff --git a/aiofile/aio.py b/aiofile/aio.py index d0b02dd..553b5d8 100644 --- a/aiofile/aio.py +++ b/aiofile/aio.py @@ -1,10 +1,11 @@ import asyncio import os import sys +import warnings from concurrent.futures import Executor from functools import partial from os import strerror -from pathlib import Path +from pathlib import PurePath from typing import ( IO, Any, Awaitable, BinaryIO, Callable, Dict, Generator, NamedTuple, Optional, TextIO, TypeVar, Union, @@ -122,16 +123,26 @@ class AIOFile: mode: FileMode def __init__( - self, filename: Union[str, Path], + self, file_specifier: Union[str, PurePath, FileIOType], mode: str = "r", encoding: str = sys.getdefaultencoding(), context: Optional[AsyncioContextBase] = None, executor: Optional[Executor] = None, ): self.__context = context or get_default_context() - self._fname = str(filename) + self.__file_specifier = file_specifier + self.__is_fp = all(( + hasattr(self.__file_specifier, "name"), + hasattr(self.__file_specifier, "mode"), + hasattr(self.__file_specifier, "fileno"), + )) - self.mode = FileMode.parse(mode) + 
if self.__is_fp: + self._fname = self.__file_specifier.name + self.mode = FileMode.parse(self.__file_specifier.mode) + else: + self._fname = self.__file_specifier + self.mode = FileMode.parse(mode) self._fileno = -1 self._encoding = encoding @@ -139,24 +150,37 @@ def __init__( self._lock = asyncio.Lock() self._clones = 0 + @property + def name(self) -> str: + return self._fname + + @property + def loop(self) -> asyncio.AbstractEventLoop: + return self.__context.loop + + @property + def encoding(self) -> str: + return self._encoding + + @classmethod + def from_fp(cls, fp: FileIOType) -> "AIOFile": + warnings.warn( + "Classmethod is deprecated. Do not use this anymore. " + "Just pass file-like as a first argument", + DeprecationWarning + ) + return cls(fp, mode=fp.mode) + if hasattr(os, 'O_BINARY'): # In windows, the file may already be opened in text mode, and you # will have to reopen it in binary mode. # Unlike unix windows does not allow you to delete an already # opened file, so it is relatively safe to open a file by name. 
- @classmethod - async def from_fp(cls, fp: FileIOType, **kwargs: Any) -> "AIOFile": - afp = cls(fp.name, fp.mode, **kwargs) - afp._fileno = await afp._run_in_thread( - os.open, fp.name, afp.mode.flags - ) - return afp + def _open_fp(self, fp: FileIOType) -> int: + return os.open(fp.name, self.mode.flags) else: - @classmethod - async def from_fp(cls, fp: FileIOType, **kwargs: Any) -> "AIOFile": - afp = cls(fp.name, fp.mode, **kwargs) - afp._fileno = await afp._run_in_thread(os.dup, fp.fileno()) - return afp + def _open_fp(self, fp: FileIOType) -> int: + return os.dup(fp.fileno()) def _run_in_thread( self, func: "Callable[..., _T]", *args: Any, **kwargs: Any @@ -165,19 +189,13 @@ def _run_in_thread( self._executor, partial(func, *args, **kwargs), ) - @property - def name(self) -> str: - return self._fname - - @property - def loop(self) -> asyncio.AbstractEventLoop: - return self.__context.loop - - @property - def encoding(self) -> str: - return self._encoding - def __open(self) -> int: + if self.__is_fp: + result = self._open_fp(self.__file_specifier) + # remove linked object after first open + self.__file_specifier = self._fname + self.__is_fp = False + return result return os.open(self._fname, self.mode.flags) async def open(self) -> Optional[int]: diff --git a/aiofile/utils.py b/aiofile/utils.py index 123ae08..a65ff30 100644 --- a/aiofile/utils.py +++ b/aiofile/utils.py @@ -3,7 +3,7 @@ import io import os from abc import ABC, abstractmethod -from pathlib import Path +from pathlib import PurePath from types import MappingProxyType from typing import Any, Generator, Generic, Optional, Tuple, TypeVar, Union @@ -334,15 +334,10 @@ async def readline(self, size: int = -1, newline: str = "\n") -> str: def async_open( - file_specifier: Union[str, Path, FileIOType], + file_specifier: Union[str, PurePath, FileIOType], mode: str = "r", *args: Any, **kwargs: Any ) -> Union[BinaryFileWrapper, TextFileWrapper]: - if isinstance(file_specifier, (str, Path)): - afp = 
AIOFile(str(file_specifier), mode, *args, **kwargs) - else: - if args: - raise ValueError("Arguments denied when IO[Any] opening.") - afp = AIOFile.from_fp(file_specifier, **kwargs) + afp = AIOFile(file_specifier, mode, *args, **kwargs) if not afp.mode.binary: return TextFileWrapper(afp) diff --git a/aiofile/version.py b/aiofile/version.py index 34781e6..3557429 100644 --- a/aiofile/version.py +++ b/aiofile/version.py @@ -9,7 +9,7 @@ team_email = "me@mosquito.su" -version_info = (4, 0, 0) +version_info = (3, 9, 0) __author__ = ", ".join("{} <{}>".format(*info) for info in author_info) __version__ = ".".join(map(str, version_info)) diff --git a/tests/test_aio.py b/tests/test_aio.py index 3fbad2b..6408748 100644 --- a/tests/test_aio.py +++ b/tests/test_aio.py @@ -238,7 +238,7 @@ async def test_line_reader(aio_file_maker, temp_file, uuid): for line in lines: await writer(line) - print(await writer("\n")) + await writer("\n") await afp.fsync() read_lines = [] @@ -681,13 +681,3 @@ async def test_clone_close( async with clone(afp): assert await afp.read() - - -async def test_from_fp(tmp_path): - tmp_fpath = tmp_path / 'test.txt' - - with open(tmp_fpath, 'w+') as fp: - afp = await AIOFile.from_fp(fp) - - assert await afp.write("Hello world") - assert await afp.read() == "Hello world" From 808fb640fd66de22d316110af17ff65397515193 Mon Sep 17 00:00:00 2001 From: Dmitry Orlov Date: Wed, 23 Aug 2023 12:22:53 +0300 Subject: [PATCH 15/17] mypy fixes --- aiofile/aio.py | 29 ++++++++++++----------------- tests/test_aio.py | 6 +++--- 2 files changed, 15 insertions(+), 20 deletions(-) diff --git a/aiofile/aio.py b/aiofile/aio.py index 553b5d8..2c7a1fe 100644 --- a/aiofile/aio.py +++ b/aiofile/aio.py @@ -129,20 +129,14 @@ def __init__( executor: Optional[Executor] = None, ): self.__context = context or get_default_context() - self.__file_specifier = file_specifier - self.__is_fp = all(( - hasattr(self.__file_specifier, "name"), - hasattr(self.__file_specifier, "mode"), - 
hasattr(self.__file_specifier, "fileno"), - )) - if self.__is_fp: + if isinstance(self.__file_specifier, (str, PurePath)): + self._fname = str(self.__file_specifier) + self.mode = FileMode.parse(mode) + else: self._fname = self.__file_specifier.name self.mode = FileMode.parse(self.__file_specifier.mode) - else: - self._fname = self.__file_specifier - self.mode = FileMode.parse(mode) self._fileno = -1 self._encoding = encoding @@ -190,13 +184,14 @@ def _run_in_thread( ) def __open(self) -> int: - if self.__is_fp: - result = self._open_fp(self.__file_specifier) - # remove linked object after first open - self.__file_specifier = self._fname - self.__is_fp = False - return result - return os.open(self._fname, self.mode.flags) + if isinstance(self.__file_specifier, (str, PurePath)): + return os.open(self._fname, self.mode.flags) + + result = self._open_fp(self.__file_specifier) + # remove linked object after first open + self.__file_specifier = self._fname + self.__is_fp = False + return result async def open(self) -> Optional[int]: async with self._lock: diff --git a/tests/test_aio.py b/tests/test_aio.py index 6408748..876e66e 100644 --- a/tests/test_aio.py +++ b/tests/test_aio.py @@ -284,8 +284,8 @@ async def test_truncate(aio_file_maker, temp_file): @pytest.mark.parametrize("size", [1, 2, 3, 5, 10, 20, 100, 1000, 2000, 5000]) -async def test_modes(size, aio_file_maker, tmpdir): - tmpfile = tmpdir.join("test.txt") +async def test_modes(size, aio_file_maker, tmp_path): + tmpfile = tmp_path / "test.txt" async with aio_file_maker(tmpfile, "w") as afp: await afp.write("foo") @@ -302,7 +302,7 @@ async def test_modes(size, aio_file_maker, tmpdir): data = dict((str(i), i)for i in range(size)) - tmpfile = tmpdir.join("test.json") + tmpfile = tmp_path / "test.json" async with aio_file_maker(tmpfile, "w") as afp: await afp.write(json.dumps(data, indent=1)) From 22182f80929326a9971c43440f77c9c23c440981 Mon Sep 17 00:00:00 2001 From: Dmitry Orlov Date: Wed, 23 Aug 2023 12:23:13 
+0300 Subject: [PATCH 16/17] allow to change mode for file-like objects --- aiofile/aio.py | 6 +++--- aiofile/utils.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/aiofile/aio.py b/aiofile/aio.py index 2c7a1fe..eca5051 100644 --- a/aiofile/aio.py +++ b/aiofile/aio.py @@ -124,7 +124,7 @@ class AIOFile: def __init__( self, file_specifier: Union[str, PurePath, FileIOType], - mode: str = "r", encoding: str = sys.getdefaultencoding(), + mode: str = '', encoding: str = sys.getdefaultencoding(), context: Optional[AsyncioContextBase] = None, executor: Optional[Executor] = None, ): @@ -133,10 +133,10 @@ def __init__( if isinstance(self.__file_specifier, (str, PurePath)): self._fname = str(self.__file_specifier) - self.mode = FileMode.parse(mode) + self.mode = FileMode.parse(mode or "r") else: self._fname = self.__file_specifier.name - self.mode = FileMode.parse(self.__file_specifier.mode) + self.mode = FileMode.parse(mode or self.__file_specifier.mode) self._fileno = -1 self._encoding = encoding diff --git a/aiofile/utils.py b/aiofile/utils.py index a65ff30..461c484 100644 --- a/aiofile/utils.py +++ b/aiofile/utils.py @@ -335,7 +335,7 @@ async def readline(self, size: int = -1, newline: str = "\n") -> str: def async_open( file_specifier: Union[str, PurePath, FileIOType], - mode: str = "r", *args: Any, **kwargs: Any + mode: str = "", *args: Any, **kwargs: Any ) -> Union[BinaryFileWrapper, TextFileWrapper]: afp = AIOFile(file_specifier, mode, *args, **kwargs) From 38cb295a9b68648118ccc1fb9d4e5b9bcdfabcd8 Mon Sep 17 00:00:00 2001 From: Dmitry Orlov Date: Wed, 23 Aug 2023 12:53:46 +0300 Subject: [PATCH 17/17] README fixes --- README.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.rst b/README.rst index d3c0cf1..e28ce8a 100644 --- a/README.rst +++ b/README.rst @@ -37,7 +37,7 @@ Features * Since version 2.0.0 using `caio`_, which contains linux ``libaio`` and two thread-based implementations (c-based and pure-python). 
-* AIOFile has no internal pointer. You should pass ``offset`` and +* ``AIOFile`` has no internal pointer. You should pass ``offset`` and ``chunk_size`` for each operation or use helpers (Reader or Writer). The simples way is to use ``async_open`` for creating object with file-like interface. @@ -57,7 +57,7 @@ Limitations * Linux native AIO implementation is not able to open special files. Asynchronous operations against special fs like ``/proc/`` ``/sys/`` are not - supported by the kernel. It's not a `aiofile`s or `caio` issue. + supported by the kernel. It's not an ``aiofile`` or ``caio`` issue. In this cases, you might switch to thread-based implementations (see troubleshooting_ section). However, when used on supported file systems, the linux implementation has a