-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcleanup.py
80 lines (63 loc) · 2.26 KB
/
cleanup.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import datetime
import time
import concurrent.futures
import threading
import arvados
import arvados.util
arv_client = arvados.api()

# Cleanup plan:
#   1. Gather the UUIDs of arvados-cwl-runner container requests that were
#      submitted from inside another container (requesting_container_uuid set).
#   2. Walk intermediate/log collections last modified more than a week ago.
#   3. Delete each such collection unless its container_request is one of the
#      UUIDs gathered in step 1.
protected_requests = {
    request["uuid"]
    for request in arvados.util.keyset_list_all(
        arv_client.container_requests().list,
        filters=[
            ["command", "like", "[\"arvados-cwl-runner%"],
            ["requesting_container_uuid", "!=", None],
        ],
        select=["uuid"],
    )
}
# Cutoff: only collections last modified before this instant are candidates.
# Use an aware UTC "now" (datetime.utcnow() is deprecated since Python 3.12)
# and format it explicitly: isoformat() on an aware datetime would append
# "+00:00", and it drops the fractional part when microsecond == 0. The result
# always looks like "2024-01-01T12:00:00.000000Z".
before = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=7)
before_timestamp = before.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
# Deletions are submitted to a pool of 4 worker threads.
executor = concurrent.futures.ThreadPoolExecutor(4)
class AtomicCounter:
    """Integer counter whose increments are serialized by a lock.

    `value` may be read directly; `increment` is the only mutator.
    """

    def __init__(self, initial=0):
        """Start the counter at *initial* (0 when omitted)."""
        self._lock = threading.Lock()
        self.value = initial

    def increment(self, num=1):
        """Add *num* (1 when omitted) while holding the lock.

        Returns the counter's value immediately after this addition.
        """
        self._lock.acquire()
        try:
            self.value = self.value + num
            updated = self.value
        finally:
            self._lock.release()
        return updated
# Progress counter shared with the worker threads that run delete_item.
count = AtomicCounter(initial=0)
# Bounds how many deletions can be queued on the executor at once: the
# submitting loop takes a slot before each submit and delete_item gives it
# back, so pending futures (and the memory they hold) cannot grow unbounded.
limiter = threading.Semaphore(value=1001)
def delete_item(uuid):
    """Delete one collection by UUID and report progress every 1000 deletions.

    Runs on an executor worker thread. One `limiter` slot (acquired by the
    submitting loop) is always released, even if the API call fails. Without
    that guarantee, each failure would leak a slot, and after enough failures
    the submitting loop would block forever in `limiter.acquire()`.

    Failures are printed instead of raised. The futures returned by
    `executor.submit` are never inspected, so a raised exception would be
    silently discarded.

    :param uuid: UUID of the collection to delete.
    """
    try:
        arv_client.collections().delete(uuid=uuid).execute()
    except Exception as e:
        # Worker-thread boundary: report the failure and move on so one bad
        # collection does not stop the cleanup.
        print("failed to delete", uuid, e, flush=True)
    else:
        # Only successful deletions are counted.
        v = count.increment(1)
        if v % 1000 == 0:
            print(v, flush=True)
    finally:
        limiter.release()
# Walk candidate collections and queue deletion of every one whose
# container_request is not protected. Each submission first takes a limiter
# slot, which delete_item gives back when it finishes.
skip = 0
start = time.time()
print("start", flush=True)
for col in arvados.util.keyset_list_all(
        arv_client.collections().list,
        filters=[
            ["properties.type", "in", ["intermediate", "log"]],
            ["properties", "exists", "container_request"],
            ["modified_at", "<", before_timestamp],
        ],
        select=["uuid", "properties"]):
    if col["properties"]["container_request"] in protected_requests:
        skip += 1
        continue
    # Blocks while the maximum number of deletions are already queued.
    limiter.acquire()
    executor.submit(delete_item, col["uuid"])
# Wait for every queued deletion to finish so count.value is final.
executor.shutdown(wait=True)
# Report totals plus the wall-clock duration measured from `start`.
print("count", count.value, "skip", skip,
      "elapsed", "%.1fs" % (time.time() - start))