Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Split and add extra configuration to export_data method #580

Merged
merged 25 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
0a84a72
Splitted and added extra onfiguration to export_data
deshansh Oct 7, 2024
f56aeb4
Code refactor + review comments
deshansh Oct 8, 2024
84ffb1a
chore(deps): update dependency typescript to v5.6.2 (#578)
renovate[bot] Oct 7, 2024
3d83ed3
chore(deps): update dependency eslint-plugin-react to v7.37.1 (#577)
renovate[bot] Oct 7, 2024
8b653a2
ci: install Poetry to setup-python environment (#579)
vdusek Oct 8, 2024
33acd8c
fix: Workaround for JSON value typing problems (#581)
janbuchar Oct 8, 2024
96f7788
chore(release): Update changelog and package version [skip ci]
Oct 8, 2024
fc0d1b0
ci: use python version instead of executable path (#582)
vdusek Oct 8, 2024
15c536d
ci: fix python version reference in pipx install (#583)
vdusek Oct 10, 2024
b12324e
feat: Key-value store context helpers (#584)
janbuchar Oct 14, 2024
08097ba
chore(release): Update changelog and package version [skip ci]
Oct 14, 2024
4a5dc9e
ci: rm unused pipeline (#591)
vdusek Oct 15, 2024
7d08a19
ci: use shared workflows and refactor them (#593)
vdusek Oct 16, 2024
bc7358e
ci: fix build and deploy docs workflow (#594)
vdusek Oct 16, 2024
800ac52
chore: Automatic docs theme update [skip ci]
github-actions[bot] Oct 16, 2024
39f9cd9
chore: fix doc commands in Makefile (#595)
vdusek Oct 16, 2024
92fa67d
Splitted and added extra onfiguration to export_data
deshansh Oct 7, 2024
3f2c4da
Merge remote-tracking branch 'origin' into implement-extra-configurat…
deshansh Oct 19, 2024
079ea5e
Merge remote-tracking branch 'origin/master' into implement-extra-con…
janbuchar Oct 31, 2024
588b7ce
(Sort of) bring back BasicCrawler.export_data
janbuchar Oct 31, 2024
231e0ce
Typo
janbuchar Oct 31, 2024
d05c084
Docs, validation
janbuchar Oct 31, 2024
26010d8
Remove conflict marker residue
janbuchar Oct 31, 2024
756c32b
Add all possible export kwargs
janbuchar Oct 31, 2024
ac4fbb3
Fix type
janbuchar Oct 31, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/examples/code/export_entire_dataset_to_file_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async def request_handler(context: BeautifulSoupCrawlingContext) -> None:
await crawler.run(['https://crawlee.dev'])

# Export the entire dataset to a CSV file.
await crawler.export_data('results.csv')
await crawler.export_data_csv(path='results.csv')


if __name__ == '__main__':
Expand Down
2 changes: 1 addition & 1 deletion docs/examples/code/export_entire_dataset_to_file_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async def request_handler(context: BeautifulSoupCrawlingContext) -> None:
await crawler.run(['https://crawlee.dev'])

# Export the entire dataset to a JSON file.
await crawler.export_data('results.json')
await crawler.export_data_json(path='results.json')


if __name__ == '__main__':
Expand Down
2 changes: 1 addition & 1 deletion docs/examples/code/parsel_crawler.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ async def request_handler(context: ParselCrawlingContext) -> None:
await crawler.run(['https://github.com'])

# Export the entire dataset to a JSON file.
await crawler.export_data('results.json')
await crawler.export_data_json(path='results.json')


if __name__ == '__main__':
Expand Down
64 changes: 59 additions & 5 deletions src/crawlee/basic_crawler/_basic_crawler.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from datetime import timedelta
from functools import partial
from pathlib import Path
from typing import TYPE_CHECKING, Any, AsyncContextManager, Callable, Generic, Literal, Union, cast
from typing import TYPE_CHECKING, Any, AsyncContextManager, Callable, Generic, Union, cast
from urllib.parse import ParseResult, urlparse

from tldextract import TLDExtract
Expand Down Expand Up @@ -55,7 +55,7 @@
from crawlee.proxy_configuration import ProxyConfiguration, ProxyInfo
from crawlee.sessions import Session
from crawlee.statistics import FinalStatistics, StatisticsState
from crawlee.storages._dataset import GetDataKwargs, PushDataKwargs
from crawlee.storages._dataset import ExportDataCsvKwargs, ExportDataJsonKwargs, GetDataKwargs, PushDataKwargs
from crawlee.storages._request_provider import RequestProvider

TCrawlingContext = TypeVar('TCrawlingContext', bound=BasicCrawlingContext, default=BasicCrawlingContext)
Expand Down Expand Up @@ -531,26 +531,80 @@ async def get_data(
async def export_data(
janbuchar marked this conversation as resolved.
Show resolved Hide resolved
self,
path: str | Path,
content_type: Literal['json', 'csv'] | None = None,
dataset_id: str | None = None,
dataset_name: str | None = None,
) -> None:
"""Export data from a dataset.

This helper method simplifies the process of exporting data from a dataset. It opens the specified
dataset and then exports the data based on the provided parameters. If you need to pass options
specific to the output format, use the `export_data_csv` or `export_data_json` method instead.

Args:
path: The destination path.
dataset_id: The ID of the dataset.
dataset_name: The name of the dataset.
"""
dataset = await self.get_dataset(id=dataset_id, name=dataset_name)

path = path if isinstance(path, Path) else Path(path)
destination = path.open('w', newline='')

if path.suffix == '.csv':
await dataset.write_to_csv(destination)
elif path.suffix == '.json':
await dataset.write_to_json(destination)
else:
raise ValueError(f'Unsupported file extension: {path.suffix}')

async def export_data_csv(
self,
path: str | Path,
janbuchar marked this conversation as resolved.
Show resolved Hide resolved
*,
dataset_id: str | None = None,
dataset_name: str | None = None,
**kwargs: Unpack[ExportDataCsvKwargs],
janbuchar marked this conversation as resolved.
Show resolved Hide resolved
) -> None:
"""Export data from a dataset to a CSV file.

This helper method simplifies the process of exporting data from a dataset in csv format. It opens the specified
dataset and then exports the data based on the provided parameters.

Args:
path: The destination path.
content_type: The output format.
dataset_id: The ID of the dataset.
dataset_name: The name of the dataset.
kwargs: Extra configurations for dumping/writing in csv format.
"""
dataset = await self.get_dataset(id=dataset_id, name=dataset_name)
path = path if isinstance(path, Path) else Path(path)

return await dataset.write_to_csv(path.open('w', newline=''), **kwargs)

async def export_data_json(
self,
path: str | Path,
janbuchar marked this conversation as resolved.
Show resolved Hide resolved
*,
dataset_id: str | None = None,
dataset_name: str | None = None,
**kwargs: Unpack[ExportDataJsonKwargs],
janbuchar marked this conversation as resolved.
Show resolved Hide resolved
) -> None:
"""Export data from a dataset to a JSON file.

This helper method simplifies the process of exporting data from a dataset in json format. It opens the
specified dataset and then exports the data based on the provided parameters.

Args:
path: The destination path
dataset_id: The ID of the dataset.
dataset_name: The name of the dataset.
kwargs: Extra configurations for dumping/writing in json format.
"""
dataset = await self.get_dataset(id=dataset_id, name=dataset_name)
path = path if isinstance(path, Path) else Path(path)

final_content_type = content_type or ('csv' if path.suffix == '.csv' else 'json')
return await dataset.write_to(final_content_type, path.open('w', newline=''))
return await dataset.write_to_json(path.open('w', newline=''), **kwargs)

async def _push_data(
self,
Expand Down
75 changes: 63 additions & 12 deletions src/crawlee/storages/_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,34 @@ class ExportToKwargs(TypedDict):
"""Name of the key-value store to save the exported file."""


class ExportDataJsonKwargs(TypedDict):
"""Keyword arguments for dataset's `export_data_json` method.

Args:
ensure_ascii: Whether non-ASCII characters should be escaped in the output JSON string.
indent: Specifies the number of spaces to use for indentation in the pretty-printed JSON output.
sort_keys: Specifies whether the output JSON object should have its keys sorted alphabetically.
"""

ensure_ascii: NotRequired[bool]
indent: NotRequired[int]
sort_keys: NotRequired[bool]


class ExportDataCsvKwargs(TypedDict):
"""Keyword arguments for dataset's `export_data_csv` method.

Args:
delimiter: A character that separates fields in the CSV file.
quotechar: A character used to enclose fields containing special characters like the delimiter.
quoting: An integer that defines how quotes should be applied.
"""

delimiter: NotRequired[str]
quotechar: NotRequired[str]
quoting: NotRequired[int]
janbuchar marked this conversation as resolved.
Show resolved Hide resolved


class Dataset(BaseStorage):
"""Represents an append-only structured storage, ideal for tabular data similar to database tables.

Expand Down Expand Up @@ -212,12 +240,12 @@ async def get_data(self, **kwargs: Unpack[GetDataKwargs]) -> DatasetItemsListPag
# https://github.com/apify/apify-sdk-python/issues/140
return await self._resource_client.list_items(**kwargs)

async def write_to(self, content_type: Literal['json', 'csv'], destination: TextIO) -> None:
async def write_to_csv(self, destination: TextIO, **kwargs: Unpack[ExportDataCsvKwargs]) -> None:
"""Exports the entire dataset into an arbitrary stream.

Args:
content_type: Specifies the output format.
destination: The stream into which the dataset contents should be written.
kwargs: Additional keyword arguments for `csv.writer`.
"""
items: list[dict] = []
limit = 1000
Expand All @@ -230,16 +258,34 @@ async def write_to(self, content_type: Literal['json', 'csv'], destination: Text
break
offset += list_items.count

if content_type == 'csv':
if items:
writer = csv.writer(destination, quoting=csv.QUOTE_MINIMAL)
writer.writerows([items[0].keys(), *[item.values() for item in items]])
else:
logger.warning('Attempting to export an empty dataset - no file will be created')
elif content_type == 'json':
json.dump(items, destination)
if items:
writer = csv.writer(destination, **kwargs)
writer.writerows([items[0].keys(), *[item.values() for item in items]])
else:
logger.warning('Attempting to export an empty dataset - no file will be created')

async def write_to_json(self, destination: TextIO, **kwargs: Unpack[ExportDataJsonKwargs]) -> None:
"""Exports the entire dataset into an arbitrary stream.

Args:
destination: The stream into which the dataset contents should be written.
kwargs: Additional keyword arguments for `json.dump`.
"""
items: list[dict] = []
limit = 1000
offset = 0
vdusek marked this conversation as resolved.
Show resolved Hide resolved

while True:
list_items = await self._resource_client.list_items(limit=limit, offset=offset)
items.extend(list_items.items)
if list_items.total <= offset + list_items.count:
break
offset += list_items.count

if items:
json.dump(items, destination, **kwargs)
else:
raise ValueError(f'Unsupported content type: {content_type}')
logger.warning('Attempting to export an empty dataset - no file will be created')

async def export_to(self, **kwargs: Unpack[ExportToKwargs]) -> None:
"""Exports the entire dataset into a specified file stored under a key in a key-value store.
Expand All @@ -260,7 +306,12 @@ async def export_to(self, **kwargs: Unpack[ExportToKwargs]) -> None:
key_value_store = await KeyValueStore.open(id=to_key_value_store_id, name=to_key_value_store_name)

output = io.StringIO()
await self.write_to(content_type, output)
if content_type == 'csv':
await self.write_to_csv(output)
elif content_type == 'json':
await self.write_to_json(output)
else:
raise ValueError('Unsupported content type, expecting CSV or JSON')

if content_type == 'csv':
await key_value_store.set_value(key, output.getvalue(), 'text/csv')
Expand Down
35 changes: 31 additions & 4 deletions tests/unit/basic_crawler/test_basic_crawler.py
Original file line number Diff line number Diff line change
Expand Up @@ -585,8 +585,8 @@ async def test_crawler_push_and_export_data(tmp_path: Path) -> None:
await dataset.push_data([{'id': 0, 'test': 'test'}, {'id': 1, 'test': 'test'}])
await dataset.push_data({'id': 2, 'test': 'test'})

await crawler.export_data(tmp_path / 'dataset.json')
await crawler.export_data(tmp_path / 'dataset.csv')
await crawler.export_data_json(path=tmp_path / 'dataset.json')
await crawler.export_data_csv(path=tmp_path / 'dataset.csv')

assert json.load((tmp_path / 'dataset.json').open()) == [
{'id': 0, 'test': 'test'},
Expand All @@ -606,8 +606,8 @@ async def handler(context: BasicCrawlingContext) -> None:

await crawler.run([f'{httpbin}/1'])

await crawler.export_data(tmp_path / 'dataset.json')
await crawler.export_data(tmp_path / 'dataset.csv')
await crawler.export_data_json(path=tmp_path / 'dataset.json')
await crawler.export_data_csv(path=tmp_path / 'dataset.csv')

assert json.load((tmp_path / 'dataset.json').open()) == [
{'id': 0, 'test': 'test'},
Expand All @@ -618,6 +618,33 @@ async def handler(context: BasicCrawlingContext) -> None:
assert (tmp_path / 'dataset.csv').read_bytes() == b'id,test\r\n0,test\r\n1,test\r\n2,test\r\n'


async def test_crawler_push_and_export_data_and_json_dump_parameter(httpbin: str, tmp_path: Path) -> None:
crawler = BasicCrawler()

@crawler.router.default_handler
async def handler(context: BasicCrawlingContext) -> None:
await context.push_data([{'id': 0, 'test': 'test'}, {'id': 1, 'test': 'test'}])
await context.push_data({'id': 2, 'test': 'test'})

await crawler.run([f'{httpbin}/1'])

await crawler.export_data_json(path=tmp_path / 'dataset.json', indent=3)

with (tmp_path / 'dataset.json').open() as json_file:
exported_json_str = json_file.read()

# Expected data in JSON format with 3 spaces indent
expected_data = [
{'id': 0, 'test': 'test'},
{'id': 1, 'test': 'test'},
{'id': 2, 'test': 'test'},
]
expected_json_str = json.dumps(expected_data, indent=3)

# Assert that the exported JSON string matches the expected JSON string
assert exported_json_str == expected_json_str


async def test_context_update_kv_store() -> None:
crawler = BasicCrawler()

Expand Down
Loading