Skip to content

Commit

Permalink
Make runner reentrant (#1037)
Browse files Browse the repository at this point in the history
The cloud runner is sometimes restarted unexpectedly by K8s. Prior to
this PR, a restart was handled by launching an entirely new pipeline run,
with a new id, that picked up where the old one left off. This PR changes
the behavior so that the restarted runner instead re-creates its internal
state and continues from where it left off. Unlike the previous behavior,
this should be transparent to end users.

Testing
-------

With the signal handlers in the StateMachineRunner disabled, so that it
wouldn't interpret `kubectl delete pod ...` as a cancellation, the
following tests were performed, using `kubectl delete pod` on the runner
pod to emulate evictions:

- With an entirely inline graph, interrupted mid-execution.
[Run](https://josh.dev-usw2-sematic0.sematic.cloud/runs/390bf91a57524b38acff77df6265fce7#run=e6d3cc3d90dc445d920c129ff8663be7&tab=source)
- With standalone functions, interrupted mid-execution.
[Run](https://josh.dev-usw2-sematic0.sematic.cloud/runs/41e720c95c95449f8e74a5f91e25a0ea#run=741f1d1d261446be88af714a9411a3ed&tab=source)
- With multiple base images, interrupted mid-execution.
[Run](https://josh.dev-usw2-sematic0.sematic.cloud/runs/41e720c95c95449f8e74a5f91e25a0ea#run=741f1d1d261446be88af714a9411a3ed&tab=source)
- With an implicit make_list, interrupted mid-execution.
[Run](https://josh.dev-usw2-sematic0.sematic.cloud/runs/d26de86afc9b401ba596690918db1b78)
- Validated that rerun-from-here still works.
[Run](https://josh.dev-usw2-sematic0.sematic.cloud/runs/9034aa1bc6eb4e238d8477d8a172a7a7)

---------

Co-authored-by: Josh Bauer <josh@sematic.dev>
  • Loading branch information
augray and Josh Bauer authored Aug 14, 2023
1 parent 1530201 commit 23bd529
Show file tree
Hide file tree
Showing 12 changed files with 572 additions and 169 deletions.
1 change: 0 additions & 1 deletion sematic/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,6 @@ sematic_py_lib(
"//sematic/db/models:edge",
"//sematic/db/models:factories",
"//sematic/db/models:run",
"//sematic/resolvers:type_utils",
"//sematic/utils:algorithms",
"//sematic/utils:memoized_property",
],
Expand Down
1 change: 1 addition & 0 deletions sematic/db/models/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ sematic_py_lib(
":user",
"//sematic:abstract_future",
"//sematic/scheduling:job_details",
"//sematic/resolvers:type_utils",
"//sematic/types:serialization",
"//sematic/types/types:union",
"//sematic/utils:hashing",
Expand Down
74 changes: 74 additions & 0 deletions sematic/db/models/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from sematic.db.models.resolution import Resolution, ResolutionStatus
from sematic.db.models.run import Run
from sematic.db.models.user import User
from sematic.resolvers.type_utils import make_list_type, make_tuple_type
from sematic.scheduling.job_details import JobDetails, JobKindString, JobStatus
from sematic.types.serialization import (
get_json_encodable_summary,
Expand All @@ -36,6 +37,8 @@ def make_run_from_future(future: AbstractFuture) -> Run:
"""
Create a Run model instance from a future.
"""
# Note: when updating this you likely need to also update
# initialize_future_from_run below.
run = Run(
id=future.id,
original_run_id=future.original_future_id,
Expand Down Expand Up @@ -66,6 +69,77 @@ def make_run_from_future(future: AbstractFuture) -> Run:
return run


def initialize_future_from_run(
    run: Run, kwargs: Dict[str, Any], use_same_id: bool = True
) -> AbstractFuture:
    """
    Build a future that mirrors the given run.

    Only state that is fully captured by the run itself, or by the
    arguments of its Sematic Function, is copied onto the future. Anything
    that needs a wider view of the run graph, or an external source of
    information, is left untouched. This makes the function mainly useful
    as a building block for code that reconstructs futures with the whole
    run graph at hand (ex: see graph.py).

    Fields that are deliberately NOT copied from the run (this list is not
    necessarily exhaustive):

    - resolved_kwargs: requires full graph access
    - value: requires full graph access
    - parent_future: requires full graph access
    - nested_future: requires full graph access
    - standalone: May be set by the run's function; see Issue #1032
    - cache: May be set by the run's function; see Issue #1032
    - timeout_mins: May be set by the run's function; see Issue #1032
    - retry_settings: May be set by the run's function; see Issue #1032

    Parameters
    ----------
    run:
        The run whose properties are copied onto the future.
    kwargs:
        The keyword arguments the future is constructed with.
    use_same_id:
        Whether the new future should take over the id of the provided
        run. Defaults to True.

    Returns
    -------
    A future created from the given run.
    """
    # Note: changes here most likely call for a matching change in
    # make_run_from_future above.
    future_factory = run.get_func()
    function_path = run.function_path

    # _make_list and _make_tuple are not decorated functions but factories
    # that generate futures dynamically, so they cannot simply be called
    # with the keyword arguments. The positional values are recovered from
    # the dict, relying on insertion order (guaranteed as of Python 3.7).
    if function_path == "sematic.function._make_list":
        elements = list(kwargs.values())
        future = future_factory(make_list_type(elements), elements)  # type: ignore
    elif function_path == "sematic.function._make_tuple":
        members = tuple(kwargs.values())
        future = future_factory(make_tuple_type(members), members)  # type: ignore
    else:
        future = future_factory(**kwargs)

    if use_same_id:
        future.id = run.id

    props = future.props
    props.name = run.name
    props.tags = json.loads(str(run.tags))

    # The run may carry its state either as a FutureState member or as the
    # member's name; the future always receives the enum member.
    state = run.future_state
    props.state = FutureState[state] if isinstance(state, str) else state

    props.resource_requirements = run.resource_requirements

    started_at = run.started_at
    if started_at is not None:
        props.scheduled_epoch_time = started_at.timestamp()

    return future


def clone_root_run(
run: Run, edges: List[Edge], artifacts_override: Optional[Dict[str, str]] = None
) -> Tuple[Run, List[Edge]]:
Expand Down
58 changes: 58 additions & 0 deletions sematic/db/models/tests/test_factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import hashlib
import json
import time
from datetime import datetime, timedelta, timezone
from typing import List, Optional, Union

# Third-party
Expand All @@ -13,6 +14,7 @@
from sematic.db.models.factories import (
clone_resolution,
clone_root_run,
initialize_future_from_run,
make_artifact,
make_job,
make_run_from_future,
Expand Down Expand Up @@ -47,6 +49,11 @@ def f():
pass # Some note


# Minimal two-argument function wrapped by `func`; the tests in this module
# use it when they need a function that takes keyword arguments.
@func
def f2(a: int, b: int) -> int:
    return a + b


def test_make_run_from_future():
future = f()
parent_future = f()
Expand Down Expand Up @@ -255,6 +262,57 @@ def test_clone_resolution(resolution: Resolution): # noqa: F811
assert cloned_resolution.build_config == resolution.build_config


def test_initialize_future_from_run():
    """
    Check that a future rebuilt from a run picks up the run's id, name,
    tags, state, resource requirements and start time, that the id can be
    left out on request, and that the _make_list special case yields a
    future whose function returns the original values as a list.
    """
    base_time = datetime(2023, 8, 10, tzinfo=timezone(timedelta(hours=4)))
    expected_requirements = ResourceRequirements(
        kubernetes=KubernetesResourceRequirements(
            node_selector={"foo": "bar"},
        )
    )

    run = Run(  # noqa: F811
        id="theid",
        original_run_id=None,
        future_state=FutureState.RAN,
        name="the name",
        function_path=f"{f2.__module__}.{f2.__name__}",
        parent_id="parentid",
        root_id="rootid",
        description="the description",
        tags=["foo", "bar"],
        nested_future_id="nestedid",
        container_image_uri="imageuri",
        created_at=base_time,
        updated_at=base_time + timedelta(hours=2),
        started_at=base_time + timedelta(hours=1),
        ended_at=base_time + timedelta(days=2),
        resolved_at=base_time + timedelta(days=2),
        failed_at=None,
        cache_key="cachekey",
    )
    run.resource_requirements = expected_requirements

    func_kwargs = {"a": 1, "b": 2}
    same_id_future = initialize_future_from_run(
        run, kwargs=func_kwargs, use_same_id=True
    )

    assert same_id_future.id == run.id
    assert same_id_future.function is f2
    assert same_id_future.props.name == run.name
    assert same_id_future.props.tags == json.loads(run.tags)  # type: ignore
    assert same_id_future.props.state == FutureState[run.future_state]  # type: ignore
    assert same_id_future.props.resource_requirements == expected_requirements
    # started_at is 2023-08-10T01:00+04:00, i.e. 2023-08-09T21:00Z.
    assert same_id_future.props.scheduled_epoch_time == 1691614800
    assert same_id_future.kwargs == func_kwargs

    # Without use_same_id the future must not reuse the run's id.
    fresh_id_future = initialize_future_from_run(
        run, func_kwargs, use_same_id=False
    )
    assert fresh_id_future.id != same_id_future.id

    # _make_list runs are rebuilt from the kwargs values, in order.
    run.function_path = "sematic.function._make_list"
    list_future = initialize_future_from_run(run, kwargs={"v0": 0, "v1": 1})
    assert list_future.function.execute(**list_future.kwargs) == [0, 1]


def test_new():
name = "foo"
namespace = "bar"
Expand Down
1 change: 1 addition & 0 deletions sematic/db/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ sematic_py_lib(
# buildifier: leave-alone
deps = [
"//sematic:abstract_future",
"//sematic:function",
"//sematic/db:db",
"//sematic/db:queries",
"//sematic/db/models:edge",
Expand Down
Loading

0 comments on commit 23bd529

Please sign in to comment.