Skip to content

Commit a32274f

Browse files
authored
fix: Make ApifyFileSystemStorageClient compatible with the apify cli (#677)
### Description - Make sure that storage from `ApifyFileSystemStorageClient` does not get purged twice due to storage from `FileSystemStorageClient` pointing to the same location. (Those storage clients will have the same cache key and thus there can be only one.) - Ensure that `Actor` will open input containing KVS on initialization to ensure that an aware storage client is used. - Support any possible pre-existing input key and file that is defined through `Configuration.input_key`. Different input files will have different handling based on their suffix: - ".json" is parsed as json. - ".txt" is opened as plain text - everything else is opened as bytes - without extension is tried to be parsed as json first, but falls back to bytes - Create a metadata file for the valid pre-existing input file, without modifying the input file (otherwise, cli might detect the change to the input, which would be a false positive) - Raise an error if two valid pre-existing input files exist in the expected storage directory. - CLI does not respect env variables with the input key so far. TODO: apify/apify-cli#960 ### Issues Closes: apify/crawlee-python#621 Related to: [#INPUT.json Automatically Deleted on Each Run (Python SDK Local Storage Issue)](#686) ### Testing - Added unit tests. - Manually tested with [apify-cli@1.1.2-beta.20](https://www.npmjs.com/package/apify-cli/v/1.1.2-beta.20) - npx apify-cli@1.1.2-beta.20 run -i {\"a\":\"c\"} with pre-existing input file or without input and multiple times in a row - npx apify-cli@1.1.2-beta.20 run with pre-existing input file or without input and multiple times in a row
1 parent 68012c7 commit a32274f

File tree

4 files changed

+238
-42
lines changed

4 files changed

+238
-42
lines changed

src/apify/_actor.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,10 @@ async def __aenter__(self) -> Self:
192192

193193
# Mark initialization as complete and update global state.
194194
self._is_initialized = True
195+
196+
if not Actor.is_at_home():
197+
# Make sure that the input related KVS is initialized to ensure that the input aware client is used
198+
await self.open_key_value_store()
195199
return self
196200

197201
async def __aexit__(
Lines changed: 83 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,18 @@
11
import asyncio
22
import json
33
import logging
4+
from itertools import chain
5+
from pathlib import Path
46

5-
from more_itertools import flatten
67
from typing_extensions import Self, override
78

89
from crawlee._consts import METADATA_FILENAME
10+
from crawlee._utils.file import atomic_write, infer_mime_type, json_dumps
911
from crawlee.configuration import Configuration as CrawleeConfiguration
1012
from crawlee.storage_clients._file_system import FileSystemKeyValueStoreClient
11-
from crawlee.storage_clients.models import KeyValueStoreRecord
13+
from crawlee.storage_clients.models import KeyValueStoreMetadata, KeyValueStoreRecord, KeyValueStoreRecordMetadata
1214

13-
from apify._configuration import Configuration
15+
from apify._configuration import Configuration as ApifyConfiguration
1416

1517
logger = logging.getLogger(__name__)
1618

@@ -22,6 +24,18 @@ class ApifyFileSystemKeyValueStoreClient(FileSystemKeyValueStoreClient):
2224
directory, except for the metadata file and the `INPUT.json` file.
2325
"""
2426

27+
def __init__(
28+
self,
29+
*,
30+
metadata: KeyValueStoreMetadata,
31+
path_to_kvs: Path,
32+
lock: asyncio.Lock,
33+
) -> None:
34+
super().__init__(metadata=metadata, path_to_kvs=path_to_kvs, lock=lock)
35+
global_configuration = ApifyConfiguration.get_global_configuration()
36+
self._input_key = global_configuration.input_key
37+
self._input_key_filename = global_configuration.input_key
38+
2539
@override
2640
@classmethod
2741
async def open(
@@ -34,7 +48,18 @@ async def open(
3448
) -> Self:
3549
client = await super().open(id=id, name=name, alias=alias, configuration=configuration)
3650

37-
await client._sanitize_input_json_files() # noqa: SLF001 - it's okay, this is a factory method
51+
if isinstance(configuration, ApifyConfiguration):
52+
client._input_key = configuration.input_key # noqa: SLF001 - it's okay, this is a factory method
53+
input_key_filename = cls._get_input_key_file_name(
54+
path_to_kvs=client.path_to_kvs, configuration=configuration
55+
)
56+
client._input_key_filename = input_key_filename # noqa: SLF001 - it's okay, this is a factory method
57+
input_file_path = client.path_to_kvs / input_key_filename
58+
input_file_metadata_path = client.path_to_kvs / f'{input_file_path}.{METADATA_FILENAME}'
59+
if input_file_path.exists() and not input_file_metadata_path.exists():
60+
await cls._create_missing_metadata_for_input_file(
61+
key=configuration.input_key, record_path=input_file_path
62+
)
3863

3964
return client
4065

@@ -43,14 +68,10 @@ async def purge(self) -> None:
4368
"""Purges the key-value store by deleting all its contents.
4469
4570
It deletes all files in the key-value store directory, except for the metadata file and
46-
the `INPUT.json` file. It also updates the metadata to reflect that the store has been purged.
71+
the input related file and its metadata.
4772
"""
48-
configuration = Configuration.get_global_configuration()
49-
5073
async with self._lock:
51-
files_to_keep = set(
52-
flatten([key, f'{key}.{METADATA_FILENAME}'] for key in configuration.input_key_candidates)
53-
)
74+
files_to_keep = {self._input_key_filename, f'{self._input_key_filename}.{METADATA_FILENAME}'}
5475
files_to_keep.add(METADATA_FILENAME)
5576

5677
for file_path in self.path_to_kvs.glob('*'):
@@ -64,40 +85,61 @@ async def purge(self) -> None:
6485
update_modified_at=True,
6586
)
6687

67-
async def _sanitize_input_json_files(self) -> None:
68-
"""Handle missing metadata for input files."""
69-
configuration = Configuration.get_global_configuration()
70-
alternative_keys = configuration.input_key_candidates - {configuration.canonical_input_key}
71-
72-
if (self.path_to_kvs / configuration.canonical_input_key).exists():
73-
# Refresh metadata to prevent inconsistencies
74-
input_data = await asyncio.to_thread(
75-
lambda: json.loads((self.path_to_kvs / configuration.canonical_input_key).read_text())
76-
)
77-
await self.set_value(key=configuration.canonical_input_key, value=input_data)
88+
@override
89+
async def get_value(self, *, key: str) -> KeyValueStoreRecord | None:
90+
if key == self._input_key:
91+
# Potentially point to custom input file name instead
92+
key = self._input_key_filename
93+
return await super().get_value(key=key)
7894

79-
for alternative_key in alternative_keys:
80-
if (alternative_input_file := self.path_to_kvs / alternative_key).exists():
81-
logger.warning(f'Redundant input file found: {alternative_input_file}')
95+
@staticmethod
96+
async def _create_missing_metadata_for_input_file(key: str, record_path: Path) -> None:
97+
# Read the actual value
98+
try:
99+
content = await asyncio.to_thread(record_path.read_bytes)
100+
except FileNotFoundError:
101+
logger.warning(f'Input file disparaged on path: "{record_path}"')
102+
return
103+
104+
# Figure out the metadata from the file content
105+
size = len(content)
106+
if record_path.suffix == '.json':
107+
value = json.loads(content.decode('utf-8'))
108+
elif record_path.suffix == '.txt':
109+
value = content.decode('utf-8')
110+
elif record_path.suffix == '':
111+
try:
112+
value = json.loads(content.decode('utf-8'))
113+
except json.JSONDecodeError:
114+
value = content
82115
else:
83-
for alternative_key in alternative_keys:
84-
alternative_input_file = self.path_to_kvs / alternative_key
116+
value = content
85117

86-
# Only process files that actually exist
87-
if alternative_input_file.exists():
88-
# Refresh metadata to prevent inconsistencies
89-
with alternative_input_file.open() as f:
90-
input_data = await asyncio.to_thread(lambda: json.load(f))
91-
await self.set_value(key=alternative_key, value=input_data)
118+
content_type = infer_mime_type(value)
92119

93-
@override
94-
async def get_value(self, *, key: str) -> KeyValueStoreRecord | None:
95-
configuration = Configuration.get_global_configuration()
120+
record_metadata = KeyValueStoreRecordMetadata(key=key, content_type=content_type, size=size)
121+
record_metadata_filepath = record_path.with_name(f'{record_path.name}.{METADATA_FILENAME}')
122+
record_metadata_content = await json_dumps(record_metadata.model_dump())
96123

97-
if key in configuration.input_key_candidates:
98-
for candidate in configuration.input_key_candidates:
99-
value = await super().get_value(key=candidate)
100-
if value is not None:
101-
return value
124+
# Write the record metadata to the file.
125+
await atomic_write(record_metadata_filepath, record_metadata_content)
102126

103-
return await super().get_value(key=key)
127+
@staticmethod
128+
def _get_input_key_file_name(path_to_kvs: Path, configuration: ApifyConfiguration) -> str:
129+
found_input_files = set()
130+
for file_path in chain(
131+
path_to_kvs.glob(f'{configuration.input_key}.*'), path_to_kvs.glob(f'{configuration.input_key}')
132+
):
133+
if str(file_path).endswith(METADATA_FILENAME):
134+
# Ignore metadata files
135+
continue
136+
found_input_files.add(file_path.name)
137+
138+
if len(found_input_files) > 1:
139+
raise RuntimeError(f'Only one input file is allowed. Following input files found: {found_input_files}')
140+
141+
if len(found_input_files) == 1:
142+
return found_input_files.pop()
143+
144+
# No custom input file found, return the default input key
145+
return configuration.input_key

src/apify/storage_clients/_file_system/_storage_client.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
from ._key_value_store_client import ApifyFileSystemKeyValueStoreClient
1111

1212
if TYPE_CHECKING:
13+
from collections.abc import Hashable
14+
1315
from crawlee.storage_clients._file_system import FileSystemKeyValueStoreClient
1416

1517

@@ -21,6 +23,13 @@ class ApifyFileSystemStorageClient(FileSystemStorageClient):
2123
except for the metadata file and the `INPUT.json` file.
2224
"""
2325

26+
@override
27+
def get_storage_client_cache_key(self, configuration: Configuration) -> Hashable:
28+
# Ensure same cache key as the `FileSystemStorageClient` to prevent potential purging of the path twice.
29+
# If `FileSystemStorageClient` opens the storage first, it will be used even in successive open calls by
30+
# `ApifyFileSystemStorageClient` and vice versa.
31+
return FileSystemStorageClient().get_storage_client_cache_key(configuration)
32+
2433
@override
2534
async def create_kvs_client(
2635
self,

tests/unit/test_apify_storages.py

Lines changed: 142 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,27 @@
1+
import asyncio
2+
import json
13
from datetime import datetime, timezone
4+
from pathlib import Path
25
from unittest import mock
36
from unittest.mock import AsyncMock
47

58
import pytest
69

10+
from crawlee.storage_clients import FileSystemStorageClient
11+
from crawlee.storage_clients._file_system import FileSystemKeyValueStoreClient
712
from crawlee.storage_clients.models import StorageMetadata
813
from crawlee.storages._base import Storage
914

10-
from apify import Configuration
15+
from apify import Actor, Configuration
1116
from apify.storage_clients import ApifyStorageClient
1217
from apify.storage_clients._apify import ApifyDatasetClient, ApifyKeyValueStoreClient, ApifyRequestQueueClient
18+
from apify.storage_clients._file_system import ApifyFileSystemKeyValueStoreClient, ApifyFileSystemStorageClient
1319
from apify.storages import Dataset, KeyValueStore, RequestQueue
1420

21+
EXAMPLE_JSON_INPUT = json.dumps({'key': 'value'})
22+
EXAMPLE_TXT_INPUT = 'Best input ever'
23+
EXAMPLE_BYTES_INPUT = b'High quality bytes'
24+
1525

1626
@pytest.mark.parametrize(
1727
('storage', '_storage_client'),
@@ -61,3 +71,134 @@ def create_metadata(id: str) -> StorageMetadata:
6171
# Equivalent configuration results in same storage clients.
6272
assert storage_1 is storage_4
6373
assert storage_3 is storage_5
74+
75+
76+
async def test_no_double_purge_for_filesystem_storage_client() -> None:
77+
expected_value = 'some value'
78+
expected_key = 'some key'
79+
80+
async with Actor():
81+
await Actor.set_value(expected_key, expected_value)
82+
# RQ uses KVS under the hood for persistence, so it will try to open same default KVS as it was already opened,
83+
# but based on different client - FileSystemStorageClient.
84+
await Actor.open_request_queue()
85+
assert expected_value == await Actor.get_value(expected_key)
86+
87+
88+
async def test_first_filesystem_storage_client_wins() -> None:
89+
"""Test that when two different FileSystemStorageClient variants are used to open the same storage, they both use
90+
the same client that was used to open the storage first"""
91+
kvs_1 = await KeyValueStore.open(storage_client=ApifyFileSystemStorageClient())
92+
kvs_2 = await KeyValueStore.open(storage_client=FileSystemStorageClient())
93+
94+
kvs_3 = await KeyValueStore.open(name='a', storage_client=FileSystemStorageClient())
95+
kvs_4 = await KeyValueStore.open(name='a', storage_client=ApifyFileSystemStorageClient())
96+
97+
assert kvs_1 is kvs_2
98+
assert type(kvs_2._client) is ApifyFileSystemKeyValueStoreClient
99+
100+
assert kvs_3 is kvs_4
101+
assert type(kvs_4._client) is FileSystemKeyValueStoreClient
102+
103+
104+
@pytest.fixture(params=['INPUT', 'FOO'])
105+
def input_test_configuration(tmp_path: Path, request: pytest.FixtureRequest) -> Configuration:
106+
configuration = Configuration()
107+
configuration.input_key = request.param
108+
configuration.storage_dir = str(tmp_path)
109+
# Explicitly demand purge. Input file should survive this.
110+
configuration.purge_on_start = True
111+
112+
# Create custom key file without metadata in the KVS directory
113+
(tmp_path / 'key_value_stores' / 'default').mkdir(parents=True)
114+
return configuration
115+
116+
117+
async def test_multiple_input_file_formats_cause_error(input_test_configuration: Configuration) -> None:
118+
"""Test that having multiple input files causes an error, for example: `INPUT` and `INPUT.json`"""
119+
120+
# Create two input files in the KVS directory
121+
kvs_path = Path(input_test_configuration.storage_dir) / 'key_value_stores' / 'default'
122+
(kvs_path / f'{input_test_configuration.input_key}').write_bytes(EXAMPLE_BYTES_INPUT)
123+
(kvs_path / f'{input_test_configuration.input_key}.json').write_text(EXAMPLE_JSON_INPUT)
124+
125+
with pytest.raises(RuntimeError, match=r'Only one input file is allowed. Following input files found: .*'):
126+
await KeyValueStore.open(
127+
storage_client=ApifyFileSystemStorageClient(),
128+
configuration=input_test_configuration,
129+
)
130+
131+
132+
async def test_txt_input_missing_metadata(input_test_configuration: Configuration) -> None:
133+
"""Test that files with missing metadata can be used, and metadata is recreated."""
134+
135+
# Create custom key file without metadata in the KVS directory
136+
kvs_path = Path(input_test_configuration.storage_dir) / 'key_value_stores' / 'default'
137+
input_file = kvs_path / f'{input_test_configuration.input_key}.txt'
138+
input_file.write_text(EXAMPLE_TXT_INPUT)
139+
last_modified = input_file.stat().st_mtime
140+
141+
# Make sure that filesystem has enough time to detect changes
142+
await asyncio.sleep(1)
143+
144+
kvs = await KeyValueStore.open(
145+
storage_client=ApifyFileSystemStorageClient(), configuration=input_test_configuration
146+
)
147+
assert await kvs.get_value(input_test_configuration.input_key) == EXAMPLE_TXT_INPUT
148+
assert last_modified == input_file.stat().st_mtime, 'File was modified or recreated.'
149+
150+
151+
@pytest.mark.parametrize('suffix', [('.json'), ('')])
152+
async def test_json_input_missing_metadata(input_test_configuration: Configuration, suffix: str) -> None:
153+
"""Test that files with missing metadata can be used, and metadata is recreated."""
154+
155+
# Create custom key file without metadata in the KVS directory
156+
kvs_path = Path(input_test_configuration.storage_dir) / 'key_value_stores' / 'default'
157+
input_file = kvs_path / f'{input_test_configuration.input_key}{suffix}'
158+
input_file.write_text(EXAMPLE_JSON_INPUT)
159+
last_modified = input_file.stat().st_mtime
160+
161+
# Make sure that filesystem has enough time to detect changes
162+
await asyncio.sleep(1)
163+
164+
kvs = await KeyValueStore.open(
165+
storage_client=ApifyFileSystemStorageClient(), configuration=input_test_configuration
166+
)
167+
assert json.loads(EXAMPLE_JSON_INPUT) == await kvs.get_value(input_test_configuration.input_key)
168+
assert last_modified == input_file.stat().st_mtime, 'File was modified or recreated.'
169+
170+
171+
@pytest.mark.parametrize('suffix', [('.bin'), (''), ('.whatever')])
172+
async def test_bytes_input_missing_metadata(input_test_configuration: Configuration, suffix: str) -> None:
173+
"""Test that files with missing metadata can be used, and metadata is recreated."""
174+
175+
# Create custom key file without metadata in the KVS directory
176+
kvs_path = Path(input_test_configuration.storage_dir) / 'key_value_stores' / 'default'
177+
input_file = kvs_path / f'{input_test_configuration.input_key}{suffix}'
178+
input_file.write_bytes(EXAMPLE_BYTES_INPUT)
179+
last_modified = input_file.stat().st_mtime
180+
181+
# Make sure that filesystem has enough time to detect changes
182+
await asyncio.sleep(1)
183+
184+
kvs = await KeyValueStore.open(
185+
storage_client=ApifyFileSystemStorageClient(), configuration=input_test_configuration
186+
)
187+
assert await kvs.get_value(input_test_configuration.input_key) == EXAMPLE_BYTES_INPUT
188+
assert last_modified == input_file.stat().st_mtime, 'File was modified or recreated.'
189+
190+
191+
async def test_pre_existing_input_not_deleted_in_actor_context(input_test_configuration: Configuration) -> None:
192+
"""Test that pre-existing INPUT file is never deleted as long as the Actor context was started first."""
193+
194+
# Create custom key file without metadata in the KVS directory
195+
kvs_path = Path(input_test_configuration.storage_dir) / 'key_value_stores' / 'default'
196+
input_file = kvs_path / f'{input_test_configuration.input_key}'
197+
input_file.write_bytes(EXAMPLE_BYTES_INPUT)
198+
199+
async with Actor(configuration=input_test_configuration):
200+
# Storage client that is not aware of the input file and could delete it during purge.
201+
storage_client = FileSystemStorageClient()
202+
# Unless already implicitly opened by Actor, the input file would be deleted.
203+
await KeyValueStore.open(storage_client=storage_client, configuration=input_test_configuration)
204+
assert await Actor.get_input() == EXAMPLE_BYTES_INPUT

0 commit comments

Comments
 (0)