From a4bd0f31a0b2652b6e4d65a45fbbf2129a9325fc Mon Sep 17 00:00:00 2001 From: cipres Date: Sat, 18 Nov 2023 05:43:13 +0100 Subject: [PATCH] Start integrating with py-ipfs-car-decoder * Add a coroutine in DagAPI to unpack a UnixFS CAR export to a directory * DagAPI.export() now accepts an optional Path to write the CAR to * Use asyncio_mode=auto for pytest * Turn "iclient" into an async fixture that closes the session on exit * GH workflow: test all python versions up to 3.11, test kubo 0.24.0 revbump to 0.6.5 --- .github/workflows/python-package.yml | 22 +++--- README.rst | 10 ++- aioipfs/__init__.py | 2 +- aioipfs/apis/__init__.py | 15 ++++ aioipfs/apis/dag.py | 55 ++++++++++++-- aioipfs/util.py | 42 +++++++++++ requirements-dev.txt | 1 - requirements.txt | 3 +- setup.py | 14 ++-- tests/test_client.py | 104 ++++++++++++++------------- tox.ini | 3 + 11 files changed, 193 insertions(+), 78 deletions(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 6007e41..71698d3 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -2,7 +2,7 @@ name: aioipfs-build on: push: - branches: [ master, devel, ci, kubo ] + branches: [ master, devel, ci, kubo, car ] pull_request: branches: [ master ] @@ -10,7 +10,7 @@ jobs: build: strategy: matrix: - python-version: [3.7, 3.8, 3.9] + python-version: ['3.7', '3.8', '3.9', '3.10', '3.11'] os: [ubuntu-latest, macos-latest, windows-latest] runs-on: ${{ matrix.os }} @@ -25,19 +25,14 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - pip install -r requirements-dev.txt - pip install -r requirements.txt - - - name: Lint with flake8 - run: | - flake8 aioipfs tests --count --select=E9,F63,F7,F82 --show-source --statistics + pip install '.[car,test]' - name: Fetch kubo (win) uses: engineerd/configurator@v0.0.8 if: startsWith(matrix.os, 'windows') with: name: ipfs.exe - url: "https://dist.ipfs.tech/kubo/v0.17.0/kubo_v0.17.0_windows-amd64.zip" + url: "https://dist.ipfs.tech/kubo/v0.24.0/kubo_v0.24.0_windows-amd64.zip" pathInArchive: "kubo/ipfs.exe" - name: Fetch kubo (linux) @@ -45,7 +40,7 @@ jobs: if: startsWith(matrix.os, 'ubuntu') with: name: ipfs - url: "https://dist.ipfs.tech/kubo/v0.17.0/kubo_v0.17.0_linux-amd64.tar.gz" + url: "https://dist.ipfs.tech/kubo/v0.24.0/kubo_v0.24.0_linux-amd64.tar.gz" pathInArchive: "kubo/ipfs" - name: Fetch kubo (macos) @@ -53,7 +48,7 @@ jobs: if: startsWith(matrix.os, 'macos') with: name: ipfs - url: "https://dist.ipfs.tech/kubo/v0.17.0/kubo_v0.17.0_darwin-amd64.tar.gz" + url: "https://dist.ipfs.tech/kubo/v0.24.0/kubo_v0.24.0_darwin-amd64.tar.gz" pathInArchive: "kubo/ipfs" - name: Test with pytest @@ -77,8 +72,7 @@ jobs: - name: Build wheel run: | python -m pip install --upgrade pip - pip install -r requirements-dev.txt - pip install -r requirements.txt + pip install '.[car,test]' python setup.py sdist bdist_wheel - name: Upload to PyPI @@ -88,4 +82,4 @@ jobs: TWINE_PASSWORD: ${{ secrets.PYPI_PASSWORD }} run: | pip install twine - twine upload dist/* + twine check dist/*.whl diff --git a/README.rst b/README.rst index 722a9f0..0b9d74a 100644 --- a/README.rst +++ b/README.rst @@ -5,7 +5,7 @@ aioipfs :info: Asynchronous IPFS_ client library **aioipfs** is a python3 library providing an asynchronous API for IPFS_. -Supported python versions: *3.6*, *3.7*, *3.8*, *3.9*, *3.10*, *3.11*. +Supported python versions: *3.6*, *3.7*, *3.8*, *3.9*, *3.10*, *3.11*, *3.12*. This library supports the `RPC API specifications `_ @@ -27,6 +27,14 @@ Installation pip install aioipfs +Support for CAR (Content-addressed Archives) decoding (with the +`ipfs-car-decoder package `_) +can be enabled with the *car* extra: + +.. code-block:: shell + + pip install 'aioipfs[car]' + By default the *json* module from the standard Python library is used to decode JSON messages, but orjson_ will be used if it is installed: diff --git a/aioipfs/__init__.py b/aioipfs/__init__.py index 9b9042f..7cd29f9 100644 --- a/aioipfs/__init__.py +++ b/aioipfs/__init__.py @@ -1,4 +1,4 @@ -__version__ = '0.6.4' +__version__ = '0.6.5' from yarl import URL from distutils.version import StrictVersion diff --git a/aioipfs/apis/__init__.py b/aioipfs/apis/__init__.py index 3bacb5b..a92fed1 100644 --- a/aioipfs/apis/__init__.py +++ b/aioipfs/apis/__init__.py @@ -11,6 +11,7 @@ from aioipfs.helpers import * # noqa from aioipfs.exceptions import * # noqa +from aioipfs.util import car_decoder, have_car_decoder DEFAULT_TIMEOUT = 60 * 60 @@ -102,6 +103,20 @@ async def fetch_raw(self, url, params={}, timeout=DEFAULT_TIMEOUT): return data + async def car_stream(self, url, params={}): + if not have_car_decoder: + raise Exception('The CAR decoding library is not available') + + stream = car_decoder.ChunkedMemoryByteStream() + + async with self.driver.session.post(url, params=params) as resp: + async for chunk, _ in resp.content.iter_chunks(): + await stream.append_bytes(chunk) + + await stream.mark_complete() + + return stream + async def fetch_json(self, url, params={}, timeout=DEFAULT_TIMEOUT): return await self.post(url, params=params, outformat='json') diff --git a/aioipfs/apis/dag.py b/aioipfs/apis/dag.py index dbfa8ac..bdb1890 100644 --- a/aioipfs/apis/dag.py +++ b/aioipfs/apis/dag.py @@ -1,4 +1,5 @@ import os.path +from pathlib import Path from aiohttp import payload from aioipfs.api import SubAPI @@ -6,6 +7,7 @@ from aioipfs.api import ARG_PARAM from aioipfs import multi from aioipfs import UnknownAPIError +from aioipfs import util class DagAPI(SubAPI): @@ -44,18 +46,63 @@ async def put(self, filename, return await self.post(self.url('dag/put'), mpwriter, params=params, outformat='json') - async def car_export(self, cid, progress=False): + async def car_export(self, cid: str, progress: bool = False, + output_path: Path = None): """ - Streams the selected DAG as a .car stream on stdout. + Streams the selected DAG as a .car stream and return it + as a raw buffer or write it to a file if output_path is + passed. - :param str cid: CID of a root to recursively export + :param str cid: Root CID of a DAG to recursively export + :param bool progress: Stream progress + :param Path output_path: Write the CAR data to this file (optional) """ - return await self.fetch_raw( + car_data = await self.fetch_raw( self.url('dag/export'), params={ARG_PARAM: cid, 'progress': boolarg(progress)} ) + if output_path is None: + return car_data + else: + with open(output_path, 'wb') as car: + car.write(car_data) + + export = car_export + + async def export_to_directory(self, + cid: str, + dst_dir: Path) -> bool: + """ + Export a UnixFS DAG to a CAR and unpack it to a directory + + :param str cid: CID of a UnixFS DAG to recursively export + :param Path dst_dir: Filesystem destination path + :rtype: bool + """ + + if not util.have_car_decoder: + raise util.CARDecoderMissing( + 'The CAR decoding library is not available') + + if not dst_dir.exists(): + dst_dir.mkdir(parents=True, exist_ok=True) + + stream = await self.car_stream( + self.url('dag/export'), + params={ARG_PARAM: cid, 'progress': boolarg(False)} + ) + + if not stream: + raise ValueError(f'Failed to get car stream for {cid}') + + await util.car_decoder.write_car_filesystem_to_path( + cid, stream, str(dst_dir) + ) + + return True + async def get(self, objpath, output_codec=None): """ Get a DAG node from IPFS diff --git a/aioipfs/util.py b/aioipfs/util.py index a4f2111..584897b 100644 --- a/aioipfs/util.py +++ b/aioipfs/util.py @@ -1,5 +1,47 @@ import json from functools import reduce +from pathlib import Path + +try: + import ipfs_car_decoder as car_decoder + have_car_decoder = True +except Exception: + have_car_decoder = False + car_decoder = None + + +class CARDecoderMissing(Exception): + pass + + +def car_open(car_path: Path): + """ + Open a Content-Adressed aRchive file and return the CAR stream. + + :param Path car_path: CAR file path + """ + + if not have_car_decoder: + raise CARDecoderMissing() + + return car_decoder.FileByteStream(car_path) + + +async def car_bytes(stream, cid: str) -> bytes: + """ + CAR stream to bytes + + :param stream: CAR stream + :param str cid: CID of the UnixFS directory to export + :rtype: bytes + """ + + buff = b'' + + async for chunk in car_decoder.stream_bytes(cid, stream): + buff += chunk + + return buff class DotJSON(dict): diff --git a/requirements-dev.txt b/requirements-dev.txt index f5039e0..a026989 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1,4 +1,3 @@ -importlib_metadata pytest pytest-asyncio tox diff --git a/requirements.txt b/requirements.txt index 9625db9..0c0e52b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,8 @@ aiohttp>=3.7.4 aiofiles>=0.7.0 base58>=1.0.2 -gitignore-parser>=0.1.9 +gitignore-parser==0.1.9 multiaddr>=0.0.9 py-multibase>=1.0.3 py-multiformats-cid>=0.4.3 +setuptools>=67.7.0 diff --git a/setup.py b/setup.py index e2a642b..9d11ea9 100644 --- a/setup.py +++ b/setup.py @@ -7,11 +7,7 @@ from setuptools import setup from setuptools import find_packages -PY_VER = sys.version_info - -if PY_VER >= (3, 6): - pass -else: +if sys.version_info < (3, 6): raise RuntimeError("You need python3.6 or newer") with codecs.open(os.path.join(os.path.abspath(os.path.dirname( @@ -28,6 +24,9 @@ with open('requirements.txt') as f: install_reqs = f.read().splitlines() +with open('requirements-dev.txt') as f: + install_test_reqs = f.read().splitlines() + setup( name='aioipfs', version=version, @@ -42,7 +41,9 @@ include_package_data=False, install_requires=install_reqs, extras_require={ - 'orjson': ['orjson>=3.0'] + 'orjson': ['orjson>=3.0'], + 'car': ['ipfs-car-decoder==0.1.1'], + 'test': install_test_reqs }, classifiers=[ 'Programming Language :: Python', @@ -52,6 +53,7 @@ 'Programming Language :: Python :: 3.9', 'Programming Language :: Python :: 3.10', 'Programming Language :: Python :: 3.11', + 'Programming Language :: Python :: 3.12' 'Intended Audience :: Developers', 'Development Status :: 5 - Production/Stable', 'Natural Language :: English', diff --git a/tests/test_client.py b/tests/test_client.py index 3714cc7..b5eb903 100755 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -11,6 +11,7 @@ import platform import json import tarfile +import sys from pathlib import Path from multiaddr import Multiaddr @@ -18,7 +19,7 @@ import asyncio import aioipfs -from aioipfs.util import DotJSON +from aioipfs import util from aioipfs.multi import DirectoryListing @@ -32,7 +33,7 @@ def ipfs_config_get(): except Exception: return None else: - return DotJSON(cfg) + return util.DotJSON(cfg) def ipfs_config_replace(filep: str): @@ -104,7 +105,13 @@ def dir_hierarchy2(tmpdir): root.joinpath('d/.e/f').mkdir(parents=True) root.joinpath('d/.e/f/.file3').touch() root.joinpath('file1').touch() - root.joinpath('README.txt').touch() + + readme = root.joinpath('README.txt') + readme.touch() + + with open(readme, 'wt') as f: + f.write('Hello') + root.joinpath('README2.txt').touch() root.joinpath('.file2').touch() ign = root.joinpath('.gitignore') @@ -176,9 +183,11 @@ def ipfs_peerid(ipfsdaemon): return ipfs_getconfig_var('Identity.PeerID').strip() -@pytest.fixture() -def iclient(event_loop): - return aioipfs.AsyncIPFS(port=apiport, loop=event_loop) +@pytest.fixture(autouse=True) +async def iclient(event_loop): + client = aioipfs.AsyncIPFS(port=apiport, loop=event_loop) + yield client + await client.close() class TestClientConstructor: @@ -283,13 +292,11 @@ async def test_basic(self, event_loop, ipfsdaemon, iclient): await iclient.id() await iclient.core.version() await iclient.commands() - await iclient.close() @pytest.mark.asyncio async def test_bootstrap(self, event_loop, ipfsdaemon, iclient): tmpdir, sp = ipfsdaemon await iclient.bootstrap.list() - await iclient.close() @pytest.mark.asyncio async def test_swarm(self, event_loop, ipfsdaemon, iclient): @@ -298,8 +305,6 @@ async def test_swarm(self, event_loop, ipfsdaemon, iclient): await iclient.swarm.addrs_local() await iclient.swarm.addrs_listen() - await iclient.close() - @pytest.mark.asyncio async def test_swarm_resources(self, event_loop, ipfsdaemon, iclient): if await iclient.agent_version_get() < \ @@ -326,8 +331,6 @@ async def test_swarm_peering(self, event_loop, ipfsdaemon, iclient): with pytest.raises(aioipfs.APIError): await iclient.swarm.peering.rm('nothere') - await iclient.close() - @pytest.mark.asyncio async def test_refs(self, event_loop, ipfsdaemon, iclient, testfile1): @@ -339,14 +342,11 @@ async def test_refs(self, event_loop, ipfsdaemon, iclient, async for refobj in iclient.refs.local(): assert 'Ref' in refobj - await iclient.close() - @pytest.mark.asyncio async def test_block1(self, event_loop, ipfsdaemon, iclient, testfile1): reply = await iclient.block.put(testfile1) data = await iclient.block.get(reply['Key']) assert data.decode() == testfile1.read() - await iclient.close() @pytest.mark.asyncio async def test_add(self, event_loop, ipfsdaemon, iclient, testfile1, @@ -388,8 +388,6 @@ async def test_add(self, event_loop, ipfsdaemon, iclient, testfile1, await iclient.add_str('test', to_files='/wslash') assert (await iclient.files.read('/wslash')).decode() == 'test' - await iclient.close() - @pytest.mark.asyncio async def test_hidden(self, event_loop, ipfsdaemon, iclient, dir_hierarchy1): @@ -438,7 +436,6 @@ async def test_addtar(self, event_loop, ipfsdaemon, iclient, fetched = await iclient.tar.cat(tarhash) f = tmpdir.join('new.tar') f.write(fetched) - await iclient.close() @pytest.mark.asyncio @pytest.mark.parametrize('order', ['gin', 'tonic']) @@ -456,7 +453,6 @@ async def test_addjson(self, event_loop, ipfsdaemon, iclient, data = await iclient.cat(h) assert data.decode() == json.dumps(json1) - await iclient.close() @pytest.mark.asyncio @pytest.mark.parametrize('data', [b'234098dsfkj2doidf0']) @@ -471,8 +467,6 @@ async def test_addbytes(self, event_loop, ipfsdaemon, iclient, data): reply = await iclient.add_bytes(data, cid_version=1, hash='sha2-512') assert reply['Hash'] == 'bafkrgqdao6vujlzh4z6o7mzgv3jnydftv2of5jy32yufswk7bnvwaq7oyaizo6gnditr4okfphi2cguz2cack27rsjfzuybm57knagzjl6m34' # noqa - await iclient.close() - @pytest.mark.asyncio @pytest.mark.parametrize('data', [b'234098dsfkj2doidf0']) async def test_dag(self, event_loop, ipfsdaemon, iclient, tmpdir, data): @@ -484,7 +478,6 @@ async def test_dag(self, event_loop, ipfsdaemon, iclient, tmpdir, data): reply = await iclient.dag.put(filedag) assert 'Cid' in reply - await iclient.close() @pytest.mark.asyncio @pytest.mark.parametrize('data', [b'234098dsfkj2doidf0']) @@ -515,13 +508,51 @@ async def test_car(self, event_loop, ipfsdaemon, iclient, tmpdir, data): os.close(carfd) os.unlink(filecar) + @pytest.mark.asyncio + @pytest.mark.skipif(sys.version_info < (3, 11), + reason='Need python >= 3.11 for CAR decoding') + async def test_car_fs_export(self, event_loop, ipfsdaemon, iclient, + tmpdir, testfile1): + """ + Test unpacking a UnixFS CAR export to a directory + by using the /dag/export endpoint + """ + + if await iclient.agent_version_get() < \ + aioipfs.IpfsDaemonVersion('0.20.0'): + pytest.skip('Not testing CAR export for this version of kubo') + + dst = Path(tmpdir).joinpath('unpacked') + dst.mkdir(parents=True, exist_ok=True) + + cids = [added['Hash'] async for added in + iclient.add(str(testfile1), wrap=True, cid_version=1)] + top_cid = cids[-1] + + path = Path(tmpdir).joinpath('export.car') + await iclient.dag.export(top_cid, output_path=path) + + assert path.is_file() + + # Test the function that reads a CAR file + stream = util.car_open(path) + assert stream + + data = await util.car_bytes(stream, top_cid) + assert data == b'POIEKJDOOOPIDMWOPIMPOWE()=ds129084bjcy' + + assert await iclient.dag.export_to_directory(top_cid, dst) is True + + fp = dst.joinpath(cids[0]) + assert fp.is_file() + assert fp.read_text() == 'POIEKJDOOOPIDMWOPIMPOWE()=ds129084bjcy' + @pytest.mark.asyncio @pytest.mark.skipif(platform.system() == 'Windows', reason='This kubo API is not available on your OS') async def test_diag(self, event_loop, ipfsdaemon, iclient, tmpdir): reply = await iclient.diag.sys() assert 'diskinfo' in reply - await iclient.close() @pytest.mark.asyncio @pytest.mark.parametrize('data', [b'0123456789']) @@ -532,7 +563,6 @@ async def test_catoffset(self, event_loop, ipfsdaemon, iclient, assert raw.decode() == '456789' raw = await iclient.cat(entry['Hash'], offset=2, length=3) assert raw.decode() == '234' - await iclient.close() @pytest.mark.asyncio async def test_get(self, event_loop, ipfsdaemon, @@ -546,7 +576,6 @@ async def test_get(self, event_loop, ipfsdaemon, assert result is True assert cid in os.listdir(tmpdir) - await iclient.close() @pytest.mark.asyncio async def test_multiget(self, event_loop, ipfsdaemon, @@ -568,8 +597,6 @@ async def test_multiget(self, event_loop, ipfsdaemon, status, read, clength = result assert status in [0, 1] - await iclient.close() - @pytest.mark.asyncio async def test_multibase(self, event_loop, ipfsdaemon, iclient, tmpdir, testfile1): @@ -597,8 +624,6 @@ async def test_multibase(self, event_loop, ipfsdaemon, iclient, reply = await iclient.multibase.transcode(str(encp)) assert isinstance(reply, str) - await iclient.close() - @pytest.mark.asyncio @pytest.mark.parametrize('topic', ['aioipfs.pytest']) @pytest.mark.parametrize('msgdata', ['test', @@ -664,8 +689,6 @@ async def subtask(): await asyncio.sleep(0.5) assert t.result() is True - await iclient.close() - @pytest.mark.asyncio async def test_routing(self, event_loop, ipfsdaemon, iclient): if await iclient.agent_version_get() < \ @@ -680,14 +703,11 @@ async def test_routing(self, event_loop, ipfsdaemon, iclient): provs = [p async for p in iclient.routing.findprovs(reply['Hash'])] assert len(provs) > 0 - await iclient.close() - @pytest.mark.asyncio async def test_stats(self, event_loop, ipfsdaemon, iclient): await iclient.stats.bw() await iclient.stats.bitswap() await iclient.stats.repo() - await iclient.close() @pytest.mark.asyncio @pytest.mark.parametrize('protocol', ['/x/test']) @@ -711,7 +731,6 @@ async def test_p2p(self, event_loop, ipfsdaemon, iclient, protocol, await iclient.p2p.listener_close(protocol) listeners = await iclient.p2p.listener_ls() assert listeners['Listeners'] is None - await iclient.close() @pytest.mark.asyncio @pytest.mark.parametrize('protocol', ['/x/test']) @@ -725,8 +744,6 @@ async def test_p2p_dial(self, event_loop, ipfsdaemon, iclient, allow_loopback=True) as ctx: assert ctx.maddr == Multiaddr(address) - await iclient.close() - @pytest.mark.asyncio @pytest.mark.parametrize('keysize', [2048, 4096]) async def test_keys(self, event_loop, ipfsdaemon, iclient, @@ -752,20 +769,16 @@ async def test_keys(self, event_loop, ipfsdaemon, iclient, ) assert reply['Name'] == impname - await iclient.close() - @pytest.mark.asyncio async def test_bitswap(self, event_loop, ipfsdaemon, iclient): await iclient.bitswap.wantlist() stats = await iclient.bitswap.stat() assert 'Wantlist' in stats assert 'DataSent' in stats - await iclient.close() @pytest.mark.asyncio async def test_filestore(self, event_loop, ipfsdaemon, iclient): await iclient.filestore.dups() - await iclient.close() @pytest.mark.asyncio @pytest.mark.parametrize('obj', [b'0123456789']) @@ -790,7 +803,6 @@ async def test_files_rw(self, event_loop, ipfsdaemon, iclient, obj, offset=5) data = await iclient.files.read('/test3', offset=5, count=3) assert data == otro - await iclient.close() @pytest.mark.asyncio @pytest.mark.parametrize('obj', [b'0123456789']) @@ -806,8 +818,6 @@ async def test_files_cp(self, event_loop, ipfsdaemon, iclient, obj): data = await iclient.files.read('/test9') assert data == obj - await iclient.close() - @pytest.mark.asyncio @pytest.mark.parametrize('obj1', [b'0123456789']) @pytest.mark.parametrize('obj2', [b'0a1b2c3d4e5']) @@ -841,7 +851,6 @@ async def test_object(self, event_loop, ipfsdaemon, iclient, obj1, obj2, rm = await iclient.object.patch.rm_link(r2['Hash'], 'obj1') dag = await iclient.object.get(rm['Hash']) assert len(dag['Links']) == 1 - await iclient.close() @pytest.mark.asyncio async def test_name_inspect(self, event_loop, ipfsdaemon, iclient): @@ -901,8 +910,6 @@ async def test_config(self, event_loop, ipfsdaemon, iclient, tmpdir): result = await iclient.config.config('Bootstrap') assert result['Value'] == [] - await iclient.close() - @pytest.mark.asyncio async def test_cidapi(self, event_loop, ipfsdaemon, iclient, testfile1): async for added in iclient.add(str(testfile1), cid_version=1): @@ -916,7 +923,6 @@ async def test_cidapi(self, event_loop, ipfsdaemon, iclient, testfile1): await iclient.cid.codecs() await iclient.cid.bases() await iclient.cid.hashes() - await iclient.close() @pytest.mark.asyncio @pytest.mark.skip(reason='This test relies on specific network conditions') @@ -957,8 +963,6 @@ async def test_pin_remote(self, event_loop, ipfsdaemon, iclient, res = await iclient.pin.remote.service.ls() assert len(res['RemoteServices']) == 0 - await iclient.close() - class TestMultipart: def test_dirlisting(self, dir_hierarchy2): diff --git a/tox.ini b/tox.ini index 6b947fe..79379db 100644 --- a/tox.ini +++ b/tox.ini @@ -16,3 +16,6 @@ commands = [flake8] ignore = F403, F405, E722, W504 + +[pytest] +asyncio_mode = auto