131 changes: 64 additions & 67 deletions bin/check-workspace-attachments.py
@@ -7,9 +7,9 @@
"""

import asyncio
import datetime
import itertools
import os
import tempfile

import kinto_http
import requests
@@ -56,6 +56,53 @@ async def fetch(url)
return [url for url, success in zip(urls, results) if not success]


def rewrite_from_scratch(bucket, blob_name):
"""
Since GCS does not let us remove the `custom_time` field which
is used in the retention policy rule, we rewrite the object from
scratch.

Note: `custom_time` is not overridable during `rewrite()`.
"""
with tempfile.NamedTemporaryFile(dir="/tmp", delete=False) as tmp:
tmp_path = tmp.name
print(f"Rewrite gs://{STORAGE_BUCKET_NAME}/{blob_name} using backup at {tmp_path}", end=" ")
# Download
blob = bucket.blob(blob_name)
blob.download_to_filename(tmp_path)
print(".", end="")
# Delete all generations
versions = bucket.list_blobs(prefix=blob_name, versions=True)
print(".", end="")
bucket.delete_blobs(list(versions))
print(".", end="")
# Re-upload (same object name, new generation)
new_blob = bucket.blob(blob_name)
new_blob.metadata = blob.metadata
new_blob.content_type = blob.content_type
new_blob.upload_from_filename(tmp_path)
print(".", end="")
new_blob.reload()
assert new_blob.custom_time is None, (
f"{blob_name} has custom time as {new_blob.custom_time}"
)
print(". Done.")
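
A usage sketch of the helper above, with a hypothetical bucket and object name; the comments restate the GCS constraint the docstring relies on, and the failing patch is only there to illustrate it:

from google.cloud import storage

storage_client = storage.Client()
bucket = storage_client.bucket("example-attachments-bucket")   # hypothetical name
blob = bucket.get_blob("bundles/example-attachment.bin")       # hypothetical object

# Once set, custom_time may only move forward in time; GCS does not accept
# clearing it with a metadata patch.
blob.custom_time = None
try:
    blob.patch()
except Exception as exc:
    print("GCS refused to clear custom_time:", exc)

# Hence the helper: download the object, delete every generation, re-upload.
rewrite_from_scratch(bucket, "bundles/example-attachment.bin")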


async def list_all_attachments(client, collections):
records = await asyncio.gather(
*(client.get_records(bucket=bid, collection=cid) for bid, cid in collections)
)
return set(
[
r["attachment"]["location"]
for records in records
for r in records
if "attachment" in r
]
)
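
For illustration, a minimal way to call the new list_all_attachments helper; the (bucket, collection) ids below are hypothetical and the call has to run inside an async context:

async def example():
    # Reuses the script's SERVER_URL / AUTH globals.
    client = kinto_http.AsyncClient(server_url=SERVER_URL, auth=AUTH)
    # Records of each (bucket, collection) pair are fetched concurrently and the
    # set of attachment locations referenced by those records is returned.
    locations = await list_all_attachments(
        client,
        [("main-workspace", "example-collection"), ("main-workspace", "other-collection")],
    )
    print(f"{len(locations)} attachment locations found")

asyncio.run(example())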


async def main():
client = kinto_http.AsyncClient(server_url=SERVER_URL, auth=AUTH)

@@ -67,21 +114,7 @@ async def main():
all_collections = list(itertools.chain.from_iterable(results))
print(len(all_collections), "collections to analyze")

# List all attachments in workspace buckets.
all_workspace_records = await asyncio.gather(
*(
client.get_records(bucket=bid, collection=cid)
for bid, cid in all_collections
)
)
all_workspace_attachments = set(
[
r["attachment"]["location"]
for records in all_workspace_records
for r in records
if "attachment" in r
]
)
all_workspace_attachments = await list_all_attachments(client, all_collections)
print(f"Found {len(all_workspace_attachments)} draft attachments in total")

# Now list all attachments in preview buckets.
@@ -97,19 +130,8 @@
for bid, cid in all_collections
if cid not in without_preview_collection
]
all_preview_records = await asyncio.gather(
*(
client.get_records(bucket=bid, collection=cid)
for bid, cid in all_preview_collections
)
)
all_preview_attachments = set(
[
r["attachment"]["location"]
for records in all_preview_records
for r in records
if "attachment" in r
]
all_preview_attachments = await list_all_attachments(
client, all_preview_collections
)

# Now list attachments in main buckets.
@@ -118,34 +140,17 @@
"security-state-staging": "security-state",
"staging": "blocklists",
}
all_main_collections = [
(main_buckets[bid], cid)
for bid, cid in all_collections
if cid not in without_preview_collection
]
all_main_records = await asyncio.gather(
*(
client.get_records(bucket=bid, collection=cid)
for bid, cid in all_main_collections
)
)
all_main_attachments = set(
[
r["attachment"]["location"]
for records in all_main_records
for r in records
if "attachment" in r
]
)
all_main_collections = [(main_buckets[bid], cid) for bid, cid in all_collections]
all_main_attachments = await list_all_attachments(client, all_main_collections)

# Check which of the only_workspace_attachments are missing from the server.
# We only check these since they are not checked by our Telescope attachments checks.
only_workspace_attachments = (
all_workspace_attachments - all_preview_attachments - all_main_attachments
)
print(
f"{len(only_workspace_attachments)} attachments are only referenced in workspace buckets"
)

# Check which of the only_workspace_attachments are missing from the server.
server_info = await client.server_info()
base_url = server_info["capabilities"]["attachments"]["base_url"]
missing = await check_urls(
@@ -155,7 +160,7 @@
print("\n".join(missing))

# Now check which GCS attachments are marked for deletion. And make sure that we do not
# have attachments referenced in preview and main buckets that are marked for deletion.
# have attachments referenced in buckets that are marked for deletion.
print("Checking GCS for deletion marks...")
storage_client = storage.Client()
bucket = storage_client.bucket(STORAGE_BUCKET_NAME)
@@ -166,30 +171,22 @@
print(f"{len(marked_for_deletion)} attachments are marked for deletion in GCS.")
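
The code that populates marked_for_deletion is collapsed in this view; as a rough sketch only (an assumption, not the script's actual implementation), a custom_time-based deletion mark can be detected, and the kind of lifecycle rule that makes it meaningful expressed, like this:

# Assumption: objects carrying a custom_time are the ones a lifecycle rule on
# the bucket will eventually delete.
marked_for_deletion = {
    blob.name for blob in bucket.list_blobs() if blob.custom_time is not None
}

# A lifecycle rule of that kind, expressed with the Python client; the 30-day
# window is a made-up value, not taken from this repository.
bucket.add_lifecycle_delete_rule(days_since_custom_time=30)
bucket.patch()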

# Now check which of the only_workspace_attachments are marked for deletion.
to_postpone_deletion = []
to_reset = set()
for live_attachments in (
all_workspace_attachments,
all_preview_attachments,
all_main_attachments,
):
if marked := marked_for_deletion & live_attachments:
to_postpone_deletion.extend(marked)
if marked := (marked_for_deletion & live_attachments):
to_reset.update(marked)

if to_postpone_deletion:
if to_reset:
print(
f"⚠️ {len(to_postpone_deletion)} attachments referenced in workspace/preview/main buckets are marked for deletion in GCS."
f"⚠️ {len(to_reset)} attachments referenced in workspace/preview/main buckets are marked for deletion in GCS."
)
with storage_client.batch():
for blob_name in to_postpone_deletion:
blob = bucket.blob(blob_name)
# Once set, custom_time cannot be removed. We set it to date in the future.
blob.custom_time = datetime.datetime.now(
datetime.timezone.utc
) + datetime.timedelta(days=POSTPONE_DELETION_MARK_DAYS)
blob.patch()
print(
f"Postponed deletion mark of gs://{STORAGE_BUCKET_NAME}/{blob.name} to {blob.custom_time}"
)
for blob_name in to_reset:
rewrite_from_scratch(bucket, blob_name)
print(f"Removed deletion mark of gs://{STORAGE_BUCKET_NAME}/{blob_name}")
else:
print(
"✅ No attachment referenced in workspace/preview/main buckets is marked for deletion in GCS."
4 changes: 3 additions & 1 deletion cronjobs/src/commands/git_export.py
@@ -499,7 +499,9 @@ def process_attachments(
if existing := existing_attachments.get(location):
existing_hash, existing_size = existing
if existing_hash != hash or existing_size != size:
print(f"Bundle {path} {'is new' if existing_hash is None else 'has changed'}")
print(
f"Bundle {path} {'is new' if existing_hash is None else 'has changed'}"
)
changed_attachments.append((hash, size, url))
return changed_attachments, common_content

2 changes: 1 addition & 1 deletion cronjobs/tests/commands/test_git_export.py
@@ -84,7 +84,7 @@ def mock_rs_server_content():
},
"config": {
"modified": "2024-01-01T00:00:00Z",
}
},
},
)
