Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 28 additions & 2 deletions cronjobs/src/commands/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def call_parallel(func, args_list, max_workers=PARALLEL_REQUESTS):
return results


def fetch_all_changesets(client):
def fetch_all_changesets(client, with_workspace_buckets: bool = False):
"""
Return the `/changeset` responses for all collections listed
in the `monitor/changes` endpoint.
Expand All @@ -92,11 +92,37 @@ def fetch_all_changesets(client):
"""
monitor_changeset = client.get_changeset("monitor", "changes", bust_cache=True)
print("%s collections" % len(monitor_changeset["changes"]))

args_list = [
(c["bucket"], c["collection"], c["last_modified"])
for c in monitor_changeset["changes"]
]

if with_workspace_buckets:
# For each collection exposed in the monitor/changes endpoint,
# we will look for its corresponding workspace bucket using the
# info exposed in the `signer` capability.
server_info = client.server_info()
try:
resources = server_info["capabilities"]["signer"]["resources"]
except KeyError:
raise RuntimeError(
"Cannot fetch workspace buckets: signer not enabled on server"
)

# Walk through all monitored changesets, and for each one,
# add the corresponding workspace collection. We do this only using the
# destination, not the preview, to avoid adding them twice.
for monitored_changeset in monitor_changeset["changes"]:
bucket = monitored_changeset["bucket"]
for resource in resources:
if bucket == resource["destination"]["bucket"]:
source_bucket = resource["source"]["bucket"]
args_list.append(
# _expected=0 for workspace collections.
(source_bucket, monitored_changeset["collection"], 0)
)
break

all_changesets = call_parallel(
lambda bid, cid, ts: client.get_changeset(bid, cid, _expected=ts), args_list
)
Expand Down
5 changes: 3 additions & 2 deletions cronjobs/src/commands/expire_orphan_attachments.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"STORAGE_BUCKET_NAME", f"remote-settings-{REALM}-{ENVIRONMENT}-attachments"
)
BATCH_SIZE = int(os.getenv("BATCH_SIZE", "100"))
AUTH = os.getenv("AUTH")


def expire_orphan_attachments(event, context):
Expand All @@ -28,8 +29,8 @@ def expire_orphan_attachments(event, context):
Our `git_export` job will then also query GCS objects in order
to purge files from the tree that 404s on the server.
"""
client = KintoClient(server_url=SERVER)
all_changesets = fetch_all_changesets(client)
client = KintoClient(server_url=SERVER, auth=AUTH)
all_changesets = fetch_all_changesets(client, with_workspace_buckets=True)

attachments = set()
total_size = 0
Expand Down
84 changes: 84 additions & 0 deletions cronjobs/tests/commands/test_commands.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import responses
from commands import KintoClient, fetch_all_changesets


@responses.activate
def test_fetch_all_changesets():
    """With ``with_workspace_buckets=True``, the signed collections from
    monitor/changes are fetched along with their workspace counterparts,
    resolved through the server's ``signer`` capability."""
    server = "http://testserver:9999/v1/"

    # Signer capability maps the signed `main` bucket back to its
    # `main-workspace` source (the preview bucket is not used for lookup).
    signer_resource = {
        "source": {"bucket": "main-workspace"},
        "preview": {"bucket": "main-preview"},
        "destination": {"bucket": "main"},
    }
    responses.add(
        responses.GET,
        server,
        json={"capabilities": {"signer": {"resources": [signer_resource]}}},
    )

    # monitor/changes exposes one signed and one preview collection.
    monitor_entries = [
        {
            "id": "a",
            "bucket": "main",
            "collection": "search-config",
            "last_modified": 1620000000000,
        },
        {
            "id": "b",
            "bucket": "main-preview",
            "collection": "search-config",
            "last_modified": 1620000001000,
        },
    ]
    responses.add(
        responses.GET,
        server + "buckets/monitor/collections/changes/changeset",
        json={"changes": monitor_entries},
    )

    # Each bucket serves a single record whose id/timestamp encode the
    # bucket's index, so the result order can be asserted precisely.
    for index, bucket_id in enumerate(("main-workspace", "main-preview", "main")):
        responses.add(
            responses.GET,
            f"http://testserver:9999/v1/buckets/{bucket_id}/collections/search-config/changeset",
            json={"changes": [{"id": f"record-{index}", "last_modified": index}]},
        )

    client = KintoClient(
        server_url=server,
        auth=("user", "pass"),
    )
    results = fetch_all_changesets(client, with_workspace_buckets=True)

    # Order follows the request list: signed (`main`), preview, then the
    # appended workspace collection.
    expected = [
        {"changes": [{"id": "record-2", "last_modified": 2}]},
        {"changes": [{"id": "record-1", "last_modified": 1}]},
        {"changes": [{"id": "record-0", "last_modified": 0}]},
    ]
    assert results == expected
3 changes: 2 additions & 1 deletion cronjobs/tests/commands/test_expire_orphan_attachments.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from datetime import datetime
from unittest.mock import MagicMock, patch
from unittest.mock import ANY, MagicMock, patch

import pytest
import responses
Expand Down Expand Up @@ -84,6 +84,7 @@ def patch(self):
expire_orphan_attachments(None, None)

assert patched_blobs == {"folder1/orphan1.bin", "folder2/orphan2.png"}
mock_fetch_all_changesets.assert_called_with(ANY, with_workspace_buckets=True)


@responses.activate
Expand Down
Loading