From 30d5a8512443cebaa4e0467531a57d0855eab554 Mon Sep 17 00:00:00 2001
From: Mathieu Leplatre
Date: Thu, 15 Jan 2026 11:38:44 +0100
Subject: [PATCH] Do not expire attachments if used in workspace bucket

---
 cronjobs/src/commands/__init__.py             | 30 ++++++-
 .../src/commands/expire_orphan_attachments.py |  5 +-
 cronjobs/tests/commands/test_commands.py      | 84 +++++++++++++++++++
 .../test_expire_orphan_attachments.py         |  3 +-
 4 files changed, 117 insertions(+), 5 deletions(-)
 create mode 100644 cronjobs/tests/commands/test_commands.py

diff --git a/cronjobs/src/commands/__init__.py b/cronjobs/src/commands/__init__.py
index 132a1f13..30d8a253 100644
--- a/cronjobs/src/commands/__init__.py
+++ b/cronjobs/src/commands/__init__.py
@@ -83,7 +83,7 @@ def call_parallel(func, args_list, max_workers=PARALLEL_REQUESTS):
     return results
 
 
-def fetch_all_changesets(client):
+def fetch_all_changesets(client, with_workspace_buckets: bool = False):
     """
     Return the `/changeset` responses for all collections listed in the
     `monitor/changes` endpoint.
@@ -92,11 +92,37 @@
     """
     monitor_changeset = client.get_changeset("monitor", "changes", bust_cache=True)
     print("%s collections" % len(monitor_changeset["changes"]))
-
     args_list = [
         (c["bucket"], c["collection"], c["last_modified"])
        for c in monitor_changeset["changes"]
     ]
+
+    if with_workspace_buckets:
+        # For each collection exposed in the monitor/changes endpoint,
+        # we will look for its corresponding workspace bucket using the
+        # info exposed in the `signer` capability.
+        server_info = client.server_info()
+        try:
+            resources = server_info["capabilities"]["signer"]["resources"]
+        except KeyError:
+            raise RuntimeError(
+                "Cannot fetch workspace buckets: signer not enabled on server"
+            )
+
+        # Walk through all monitored changesets, and for each one, add the
+        # corresponding workspace collection. We only match on the destination
+        # bucket, not the preview, to avoid adding the same collection twice.
+        for monitored_changeset in monitor_changeset["changes"]:
+            bucket = monitored_changeset["bucket"]
+            for resource in resources:
+                if bucket == resource["destination"]["bucket"]:
+                    source_bucket = resource["source"]["bucket"]
+                    args_list.append(
+                        # _expected=0 for workspace collections.
+                        (source_bucket, monitored_changeset["collection"], 0)
+                    )
+                    break
+
     all_changesets = call_parallel(
         lambda bid, cid, ts: client.get_changeset(bid, cid, _expected=ts), args_list
     )
diff --git a/cronjobs/src/commands/expire_orphan_attachments.py b/cronjobs/src/commands/expire_orphan_attachments.py
index 5bec75e0..d25cf722 100644
--- a/cronjobs/src/commands/expire_orphan_attachments.py
+++ b/cronjobs/src/commands/expire_orphan_attachments.py
@@ -17,6 +17,7 @@
     "STORAGE_BUCKET_NAME", f"remote-settings-{REALM}-{ENVIRONMENT}-attachments"
 )
 BATCH_SIZE = int(os.getenv("BATCH_SIZE", "100"))
+AUTH = os.getenv("AUTH")
 
 
 def expire_orphan_attachments(event, context):
@@ -28,8 +29,8 @@ def expire_orphan_attachments(event, context):
     Our `git_export` job will then also query GCS objects in order to purge
     files from the tree that 404s on the server.
""" - client = KintoClient(server_url=SERVER) - all_changesets = fetch_all_changesets(client) + client = KintoClient(server_url=SERVER, auth=AUTH) + all_changesets = fetch_all_changesets(client, with_workspace_buckets=True) attachments = set() total_size = 0 diff --git a/cronjobs/tests/commands/test_commands.py b/cronjobs/tests/commands/test_commands.py new file mode 100644 index 00000000..236e766b --- /dev/null +++ b/cronjobs/tests/commands/test_commands.py @@ -0,0 +1,84 @@ +import responses +from commands import KintoClient, fetch_all_changesets + + +@responses.activate +def test_fetch_all_changesets(): + responses.add( + responses.GET, + "http://testserver:9999/v1/", + json={ + "capabilities": { + "signer": { + "resources": [ + { + "source": { + "bucket": "main-workspace", + }, + "preview": { + "bucket": "main-preview", + }, + "destination": { + "bucket": "main", + }, + } + ] + } + } + }, + ) + responses.add( + responses.GET, + "http://testserver:9999/v1/buckets/monitor/collections/changes/changeset", + json={ + "changes": [ + { + "id": "a", + "bucket": "main", + "collection": "search-config", + "last_modified": 1620000000000, + }, + { + "id": "b", + "bucket": "main-preview", + "collection": "search-config", + "last_modified": 1620000001000, + }, + ] + }, + ) + + for i, bucket in enumerate(["main-workspace", "main-preview", "main"]): + responses.add( + responses.GET, + f"http://testserver:9999/v1/buckets/{bucket}/collections/search-config/changeset", + json={ + "changes": [ + {"id": f"record-{i}", "last_modified": i}, + ] + }, + ) + + client = KintoClient( + server_url="http://testserver:9999/v1/", + auth=("user", "pass"), + ) + results = fetch_all_changesets(client, with_workspace_buckets=True) + + assert results == [ + { + "changes": [ + {"id": "record-2", "last_modified": 2}, + ] + }, + { + "changes": [ + {"id": "record-1", "last_modified": 1}, + ] + }, + { + "changes": [ + {"id": "record-0", "last_modified": 0}, + ] + }, + ] diff --git a/cronjobs/tests/commands/test_expire_orphan_attachments.py b/cronjobs/tests/commands/test_expire_orphan_attachments.py index 6027c55e..f2bb11b9 100644 --- a/cronjobs/tests/commands/test_expire_orphan_attachments.py +++ b/cronjobs/tests/commands/test_expire_orphan_attachments.py @@ -1,5 +1,5 @@ from datetime import datetime -from unittest.mock import MagicMock, patch +from unittest.mock import ANY, MagicMock, patch import pytest import responses @@ -84,6 +84,7 @@ def patch(self): expire_orphan_attachments(None, None) assert patched_blobs == {"folder1/orphan1.bin", "folder2/orphan2.png"} + mock_fetch_all_changesets.assert_called_with(ANY, with_workspace_buckets=True) @responses.activate