Skip to content

Commit d12cd98

Browse files
committed
[unfinished] card locking for safe sidecar updates
- created a locking mechanism to ensure cards are locked to individual subprocesses. - introduced `LockableCardDict` which helps lock cards per subprocess - introduced `CardStateManager` to save the state of components/data updates during task execution ## life-cycle of cards 1. Pre-task execution : - @card decorator instantiates a CardComponentCollector with a LockableCardDict set to `pre_task_startup=True` and empty dictionary for it's `card_component_store` - This ensures that new CardComponentManagers can be trivially added by the main process. - Once main process is done adding all cards, we run `finalize` method : - it will set `pre_task_startup=False` and set post_task_startup=True - it will instantiate the locks. Won't check if they are already possessed by others since the code is being run in the main process and task execution has not started yet; (All lock alteration happens once user code starts executing). - Upon finishing finalize `current.card` will also dump the state of `current.card` for being available to any outside subprocesses. 2. Task Execution: - Main process : - Which ever cards the main thread update's will be locked by the main thread. - The cards which are not accessed / updated by the main thread, can be locked by other processes. - Locked cards will provide the same type of object back to the user, but the `card_proc` method will be replaced with a method that will warn the user that the card is locked. - Subprocesses: - calls the `get_runtime_card` method to get the `current.card` object - the method will seek out the `current.card`'s state location and load the state. - the state of the card specifies what ever task spec's component collector's state - will be able to call render_runtime and refresh from the cli - `refresh` will also dump the state of the components to disk. - [todo] when finished will call `current.card.done()` which will dump the state of the components/data-updates to disk - [todo] no other processes will also be able to lock "done" cards - [todo] If a subprocess is gracefully shutdown, then: - [todo] the card manager will call `current.card.done()` which will avoid writes from other processes 3. Post Task Execution: - sub processes: - Should have **ideally** ended and released all locks and ran the finalize method. - If not then what ever is the last component state dumped in the subprocess will be the components loaded for the card. - Main process: - For each card, - call the render method. - [todo] If the card was locked by an outside process then seek it's component state and load it. - if the card
1 parent 17ec166 commit d12cd98

File tree

5 files changed

+363
-47
lines changed

5 files changed

+363
-47
lines changed

metaflow/plugins/cards/card_creator.py

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,12 @@
99
ASYNC_TIMEOUT = 30
1010

1111

12+
def warning_message(message, logger=None, ts=False):
13+
msg = "[@card WARNING] %s" % message
14+
if logger:
15+
logger(msg, timestamp=ts, bad=True)
16+
17+
1218
class CardProcessManager:
1319
"""
1420
This class is responsible for managing the card creation processes.
@@ -44,18 +50,25 @@ def _remove_card_process(cls, carduuid):
4450

4551

4652
class CardCreator:
53+
54+
state_manager = None # of type `CardStateManager`
55+
4756
def __init__(
4857
self,
49-
top_level_options=None,
58+
base_command=None,
5059
pathspec=None,
5160
# TODO Add a pathspec somewhere here so that it can instantiate correctly.
5261
):
53-
self._top_level_options = top_level_options
62+
self._base_command = base_command
5463
self._pathspec = pathspec
5564

65+
@property
66+
def pathspec(self):
67+
return self._pathspec
68+
5669
def _dump_state_to_dict(self):
5770
return {
58-
"top_level_options": self._top_level_options,
71+
"base_command": self._base_command,
5972
"pathspec": self._pathspec,
6073
}
6174

@@ -124,12 +137,13 @@ def _run_cards_subprocess(
124137
json.dump(data, data_file)
125138
data_file.seek(0)
126139

127-
executable = sys.executable
128-
cmd = [
129-
executable,
130-
sys.argv[0],
131-
]
132-
cmd += self._top_level_options + [
140+
if self.state_manager is not None:
141+
self.state_manager._save_card_state(
142+
card_uuid, components=component_strings, data=data
143+
)
144+
145+
cmd = []
146+
cmd += self._base_command + [
133147
"card",
134148
"create",
135149
runspec,
@@ -169,6 +183,7 @@ def _run_cards_subprocess(
169183
timeout=decorator_attributes["timeout"],
170184
wait=wait,
171185
)
186+
# warning_message(str(cmd), logger)
172187
if fail:
173188
resp = "" if response is None else response.decode("utf-8")
174189
logger(

metaflow/plugins/cards/card_decorator.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -83,10 +83,6 @@ def __init__(self, *args, **kwargs):
8383
self._current_step = None
8484
self._state_manager = None # This is set on a per TASK basis
8585

86-
@classmethod
87-
def _set_collector_state_dump_file(cls, file):
88-
cls._collector_state_dump_file = file
89-
9086
@classmethod
9187
def _set_card_creator(cls, card_creator):
9288
cls.card_creator = card_creator
@@ -186,7 +182,7 @@ def task_pre_step(
186182
self._register_event("pre-step")
187183
self._set_card_creator(
188184
CardCreator(
189-
top_level_options=self._create_top_level_args(),
185+
base_command=self._create_base_command(),
190186
pathspec=pathspec,
191187
)
192188
)
@@ -230,7 +226,8 @@ def task_exception(
230226
def _dump_state_of_collector_to_file(self, pathspec):
231227
# Create a directory in the DATASTORE_LOCAL_DIR like `DATASTORE_LOCAL_DIR/`
232228
self._state_manager = CardStateManager(pathspec)
233-
self._state_manager.save(current.card._dump_state_to_dict())
229+
self.card_creator.state_manager = self._state_manager
230+
self._state_manager.component_collector.save(current.card._dump_state_to_dict())
234231

235232
def task_finished(
236233
self, step_name, flow, graph, is_task_ok, retry_count, max_user_code_retries
@@ -250,6 +247,7 @@ def task_finished(
250247
if is_task_ok:
251248
self.card_creator.create(mode="render", **create_options)
252249
self.card_creator.create(mode="refresh", final=True, **create_options)
250+
current.card._finished()
253251
if self._state_manager is not None:
254252
self._state_manager.done()
255253

@@ -264,7 +262,7 @@ def _options(mapping):
264262
if not isinstance(value, bool):
265263
yield to_unicode(value)
266264

267-
def _create_top_level_args(self):
265+
def _create_base_command(self):
268266
top_level_options = {
269267
"quiet": True,
270268
"metadata": self._metadata.TYPE,
@@ -277,4 +275,6 @@ def _create_top_level_args(self):
277275
# We don't provide --with as all execution is taking place in
278276
# the context of the main process
279277
}
280-
return list(self._options(top_level_options))
278+
executable = sys.executable
279+
cmd = [executable, sys.argv[0]]
280+
return cmd + list(self._options(top_level_options))
Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
from .runtime_collector_state import _get_card_state_directory
2+
import fcntl
3+
import os
4+
import functools
5+
6+
7+
class CardLock:
8+
9+
POSSIBLE_STATES = ["locked_by_me", "unlocked", "locked_by_other"]
10+
11+
lock_base_path = None
12+
13+
def __init__(self, carduuid):
14+
self._carduuid = carduuid
15+
self._lock_path = os.path.join(self.lock_base_path, self._carduuid + ".lock")
16+
self._lock_file = None
17+
self._current_state = "unlocked"
18+
19+
def _try_to_acquire_lock(self):
20+
try:
21+
lock_file = open(self._lock_path, "w")
22+
fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
23+
return lock_file
24+
except BlockingIOError:
25+
return False
26+
27+
@property
28+
def owned_by_others(self):
29+
return self._current_state == "locked_by_other"
30+
31+
@property
32+
def owns_the_lock(self):
33+
return self._current_state == "locked_by_me"
34+
35+
@property
36+
def seems_unlocked(self):
37+
return self._current_state == "unlocked"
38+
39+
def test_lockability(self):
40+
if self.owns_the_lock:
41+
return True
42+
elif self.seems_unlocked:
43+
_lock_file = self._try_to_acquire_lock()
44+
if _lock_file:
45+
_lock_file.close()
46+
return True
47+
self._current_state = "locked_by_other"
48+
return False
49+
else:
50+
return False
51+
52+
def lock(self):
53+
if self.owns_the_lock:
54+
return True
55+
elif self.seems_unlocked:
56+
self._lock_file = self._try_to_acquire_lock()
57+
if self._lock_file:
58+
self._current_state = "locked_by_me"
59+
return True
60+
else:
61+
self._current_state = "locked_by_other"
62+
return False
63+
else:
64+
return False
65+
66+
def unlock(self):
67+
if self.owns_the_lock:
68+
fcntl.flock(self._lock_file, fcntl.LOCK_UN)
69+
self._lock_file.close()
70+
self._current_state = "unlocked"
71+
return True
72+
else:
73+
return False
74+
75+
def __del__(self):
76+
if self.owns_the_lock:
77+
self.unlock()
78+
79+
80+
class LockableCardDict:
81+
"""
82+
This is a class that will help us ensure that when `current.card` interface is called in an outside subprocess,
83+
then we are able to call the `current.card` interface in outside subprocesses without any issues.
84+
85+
Some constraints which the `LockableCardDict` aims to fulfill:
86+
87+
> No two processes can lock the same card at the same time. Once a card is locked by a process, no other process can lock it.
88+
(Meaning the first process to lock the card locks it for-ever. No other process can lock it after that).
89+
90+
The life-cycle of cards is as follows:
91+
1. Pre-task execution :
92+
- @card decorator instantiates a CardComponentCollector with a LockableCardDict set to `pre_task_startup=True` and empty dictionary for it's `card_component_store`
93+
- This ensures that new CardComponentManagers can be trivially added by the main process.
94+
- Once main process is done adding all cards, we run `finalize` method :
95+
- it will set `pre_task_startup=False` and set post_task_startup=True
96+
- it will instantiate the locks. Won't check if they are already possessed by others since the code is being run in the main process and
97+
task execution has not started yet; (All lock alteration happens once user code starts executing).
98+
- Upon finishing finalize `current.card` will also dump the state of `current.card` for being available to any outside subprocesses.
99+
2. Task Execution:
100+
- Main process :
101+
- Which ever cards the main thread update's will be locked by the main thread.
102+
- The cards which are not accessed / updated by the main thread, can be locked by other processes.
103+
- Locked cards will provide the same type of object back to the user, but the `card_proc` method will be replaced with a
104+
method that will warn the user that the card is locked.
105+
- Subprocesses:
106+
- calls the `get_runtime_card` method to get the `current.card` object
107+
- the method will seek out the `current.card`'s state location and load the state.
108+
- the state of the card specifies what ever task spec's component collector's state
109+
- will be able to call render_runtime and refresh from the cli
110+
- `refresh` will also dump the state of the components to disk.
111+
- [todo] when finished will call `current.card.done()` which will dump the state of the components/data-updates to disk
112+
- [todo] no other processes will also be able to lock "done" cards
113+
- [todo] If a subprocess is gracefully shutdown, then:
114+
- [todo] the card manager will call `current.card.done()` which will avoid writes from other processes
115+
3. Post Task Execution:
116+
- sub processes:
117+
- Should have **ideally** ended and released all locks and ran the finalize method.
118+
- If not then what ever is the last component state dumped in the subprocess will be the components loaded for the card.
119+
- Main process:
120+
- For each card,
121+
- call the render method.
122+
- [todo] If the card was locked by an outside process then seek it's component state and load it.
123+
- if the card was not locked by an outside process then use the in-memory component state of the card.
124+
"""
125+
126+
def __init__(
127+
self,
128+
card_component_store,
129+
lock_base_path,
130+
pre_task_startup=False,
131+
post_task_startup=True,
132+
):
133+
self._card_component_store = card_component_store
134+
self._card_locks = {}
135+
self._lock_base_path = lock_base_path
136+
self._pre_task_startup = pre_task_startup
137+
self._post_task_startup = post_task_startup
138+
self._init_locks()
139+
140+
def _init_locks(self):
141+
if self._pre_task_startup:
142+
return
143+
144+
CardLock.lock_base_path = self._lock_base_path
145+
for carduuid in self._card_component_store:
146+
self._card_locks[carduuid] = CardLock(carduuid)
147+
148+
if not self._post_task_startup:
149+
return
150+
151+
# Since locks are initializing here we can test lockability here.
152+
# As this code may be instantiated in a subprocess outside the main process,
153+
# we need to ensure that when then LockableCardDict initializes, it can keeps only
154+
# unlocked cards in the card_component_store.
155+
# At this point when the code is running, the owning process of this object will
156+
# not have any locks that it owns since all objects are being initialized.
157+
for carduuid in self._card_component_store:
158+
self._card_locks[carduuid].test_lockability()
159+
if not self._card_locks[carduuid].seems_unlocked:
160+
self._disable_rendering_for_locked_card(carduuid)
161+
162+
def finalize(self):
163+
# This method will only be called once by the CardComponent collector in scope of the
164+
# @card decorator
165+
self._pre_task_startup = False
166+
self._init_locks()
167+
self._post_task_startup = True
168+
169+
def _disable_rendering_for_locked_card(self, carduuid):
170+
def _wrap_method(instance, method_name):
171+
def wrapper(*args, **kwargs):
172+
instance._warning(
173+
"Card is locked by another process. "
174+
"This card will not be rendered/refreshed."
175+
)
176+
return None
177+
178+
# Bind the new method to the instance
179+
setattr(instance, method_name, wrapper)
180+
181+
_wrap_method(self._card_component_store[carduuid], "_card_proc")
182+
183+
def _lock_check_for_getter(self, key):
184+
# Here the key is uuid, so it's an internal things.
185+
# Users will never have access to this object
186+
assert key in self._card_locks, "Card not found in card store."
187+
if self._post_task_startup:
188+
if self._card_locks[key].seems_unlocked:
189+
if not self._card_locks[key].lock():
190+
# If a card was locked then the in-memory status
191+
# here will also flip to locked.
192+
self._disable_rendering_for_locked_card(key)
193+
194+
def _lock_check_for_setter(self, key):
195+
if self._post_task_startup:
196+
if self._card_locks[key].seems_unlocked:
197+
if not self._card_locks[key].lock():
198+
return False
199+
elif self._card_locks[key].owned_by_others:
200+
return False
201+
return True
202+
203+
def unlock_everything(self):
204+
for key in self._card_locks:
205+
self._card_locks[key].unlock()
206+
207+
def __contains__(self, key):
208+
return key in self._card_component_store
209+
210+
def __getitem__(self, key):
211+
self._lock_check_for_getter(key)
212+
return self._card_component_store[key]
213+
214+
def __setitem__(self, key, value):
215+
self._card_component_store[key] = value
216+
if not self._lock_check_for_setter(key):
217+
self._disable_rendering_for_locked_card(key)
218+
219+
def items(self):
220+
return self._card_component_store.items()
221+
222+
def __len__(self):
223+
return len(self._card_component_store)

0 commit comments

Comments
 (0)